C# - Simple example implementing concurrent queue
1 minute read
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
public class ActionQueueItem
{
public ActionQueueItem ( Action operation , string name , int priority )
{
Operation = operation ;
Delay = 500 ; // 500 milliseconds
Name = name ;
Priority = priority ;
}
public ActionQueueItem ( Action operation , string name , int delay , int priority )
{
Operation = operation ;
Name = name ;
Delay = delay ;
Priority = priority ;
}
public Action Operation { get ; private set ; }
public int Delay { get ; set ; }
public string Name { get ; set ; }
public int Priority { get ; set ; }
}
public class ActionDelayQueue
{
ManualResetEvent m _ stopEvent = new ManualResetEvent ( false ) ;
ManualResetEvent m _ enqueuEvent = new ManualResetEvent ( false ) ;
Thread m _ processTask ;
List < ActionQueueItem > m _ queue = new List < ActionQueueItem > () ;
public ActionDelayQueue ()
{
}
public bool Start ()
{
Stop () ;
m _ stopEvent . Reset () ;
m _ processTask = new Thread ( Process ) ;
m _ processTask . Start () ;
return true ;
}
public bool Stop ()
{
if ( m _ processTask != null )
{
m _ stopEvent . Set () ;
if ( ! m _ stopEvent . WaitOne ( 1000 ))
m _ processTask . Abort () ;
m _ processTask = null ;
return true ;
}
return false ;
}
public void Enqueue ( ActionQueueItem item )
{
lock ( m _ queue )
{
if ( item . Priority >= 5 )
{
m _ queue . Insert ( 0 , item ) ;
}
else
m _ queue . Add ( item ) ;
m _ enqueuEvent . Set () ;
}
}
private void Process ()
{
try
{
WaitHandle [] list = new WaitHandle [] { m _ stopEvent , m _ enqueuEvent } ;
while ( true )
{
var res = WaitHandle . WaitAny ( list ) ;
if ( 0 == res )
break ;
else if ( 1 == res )
{
ActionQueueItem action = null ;
lock ( m _ queue )
{
if ( m _ queue . Count > 0 )
{
var currentTime = DateTime . Now ;
action = m _ queue [ 0 ] ;
m _ queue . RemoveAt ( 0 ) ;
if ( m _ queue . Count <= 0 )
m _ enqueuEvent . Reset () ;
}
}
if ( action != null )
{
Thread . Sleep ( action . Delay ) ;
action . Operation () ;
}
}
}
}
catch ( Exception e )
{
Trace . WriteLine ( e . Message ) ;
}
}
}