
Oft werden Warteschlangen benötigt, um Aufgaben zu Verwalten oder durch etwaige Parallelität die Performance zu Steigern. Mit .NET 4.0 hat Microsoft einen großen Schritt in diese Richtung getan und den Entwicklern durch die sogenannten ConcurrentCollections viel Arbeit abgenommen.
Die für diesen Beitrag interessante Klasse ist die BlockingCollection, die den sogenannten Producer and Consumer-Pattern umsetzt. Mit Hilfe dieses Patterns können sehr einfach und effizient Aufgaben in Warteschlangen gelegt und von anderen Threads zur Verarbeitung entnommen werden.
Die Funktionweise ist meist, dass eine Schleife in Thread Nr. 1 die Warteschlange füllt und mehrere Threads (Nr. 2 - Nr. x) solange warten, bis die Warteschlange neue Elemente erhält. Die konsumierenden Threads legen sich dann so lange schlafen, bis neue Elemente eintrudeln um diese zu verarbeiten - ohne dabei die CPU zu belasten.
Das Umsetzen dieses Patterns unter .NET 3.5 erforderte vom Entwickler noch relativ viel Code, der zudem Stoplerfallen durch die Thread-Synchronisation beachten musste.
1 /// <summary>
2 /// Element für die Warteschlange
3 /// </summary>
4 public class MyWarteschlangenElement
5 {
6 public MyWarteschlangenElement( Int32 zahl1, Int32 zahl2 )
7 {
8 Zahl1 = zahl1;
9 Zahl2 = zahl2;
10 }
11 public Int32 Zahl1 { get; private set; }
12 public Int32 Zahl2 { get; private set; }
13 }
14
15 public class MyWarteschlange
16 {
17 private readonly object _warteschlangenSynchronisierer = new object( );
18
19 /// <summary>
20 /// Alle aktiven konsumierenden Threads
21 /// </summary>
22 private readonly Thread[] _arbeitendeThreads;
23
24 /// <summary>
25 /// Warteschlange
26 /// </summary>
27 private readonly Queue<MyWarteschlangenElement> _itemQ = new Queue<MyWarteschlangenElement>( );
28
29 public MyWarteschlange( int anzahlThreads ) // Anzahl Threads = Anzahl der Konsumenten, die Elemente
30 { // aus der Warteschlange nehmen und bearbeiten
31 _arbeitendeThreads = new Thread[ anzahlThreads ];
32
33 // Erstelle für jeden Arbeiter einen eignen Thread
34 for ( var i = 0 ; i < anzahlThreads ; i++ )
35 {
36 ( _arbeitendeThreads[ i ] = new Thread( Consume ) ).Start( );
37 }
38 }
39
40 /// <summary>
41 /// Für ein Element, hier zwei Zahlen, zur Warteschlange hinzu
42 /// </summary>
43 /// <param name="warteschlangenElement"></param>
44 public void AufgabeHinzufuegen( MyWarteschlangenElement warteschlangenElement )
45 {
46 // Ein lock benötigen wir, da die Warteschlange hier nicht Thread-sicher ist
47 lock ( _warteschlangenSynchronisierer )
48 {
49 // Element zur Warteschlange hinzufügen
50 _itemQ.Enqueue( warteschlangenElement );
51
52 // Bekanntgeben, dass ein neues Element hinzugefügt wurde
53 Monitor.Pulse( _warteschlangenSynchronisierer );
54 }
55 }
56
57 /// <summary>
58 /// Beendet die Warteschlange
59 /// </summary>
60 public void Abschliessen( )
61 {
62 // Wir definieren null als das Element, das den verarbeitenden Threads sagt, dass nun abgeschlossen werden soll
63 // Aufgrund des locks und des Pulse müssen wir für jeden Thread ein null einwerfern
64 for ( var i = 0 ; i < _arbeitendeThreads.Length ; i++ )
65 {
66 AufgabeHinzufuegen( null );
67 }
68
69 // Warten, bis sich alle Threads beendet haben
70 foreach ( var worker in _arbeitendeThreads )
71 {
72 worker.Join( );
73 }
74 }
75
76 /// <summary>
77 /// Verarbeitet ein Element
78 /// </summary>
79 private void Consume( )
80 {
81 // Dauerhaft schauen, ob ein Element in der Warteschlange auf die Verarbeitung wartet
82 while ( true )
83 {
84 MyWarteschlangenElement zubearbeitendesElement;
85
86 // Ein lock benötigen wir, da die Warteschlange hier nicht Thread-sicher ist
87 lock ( _warteschlangenSynchronisierer )
88 {
89 while ( _itemQ.Count == 0 )
90 {
91 // Wir warten hier, bis ein neues Element der Warteschlange hinzugefügt wurde
92 // Dies schont die CPU, da nicht ständig das while(true) durchlaufen wird!
93 Monitor.Wait( _warteschlangenSynchronisierer );
94 }
95
96 // Wir holen ein Element aus der Warteschlange
97 zubearbeitendesElement = _itemQ.Dequeue( );
98 }
99
100 // Wir haben null als Abbruchkriterium definiert; schauen, ob wir abbrechen sollen
101 if ( zubearbeitendesElement == null )
102 {
103 // Thread beenden
104 return;
105 }
106
107 // Ausführung der Aufgabe
108 BerechneSumme( zubearbeitendesElement );
109 }
110 }
111
112 /// <summary>
113 /// Berechnet die Summe zweier Zahlen
114 /// </summary>
115 /// <param name="warteschlangenElement"></param>
116 private static void BerechneSumme( MyWarteschlangenElement warteschlangenElement )
117 {
118 // Berechne die Summe aus zahl1 und zahl2
119 var summe = warteschlangenElement.Zahl1 + warteschlangenElement.Zahl2;
120 }
121 ```
122
123Unter **.NET 4.0** ist die Implementierung nun deutlich einfacher.
124
125```csharp
126public class MyWarteschlange
127 {
128 /// <summary>
129 /// Warteschlange, die die Elemente zur Verarbeitung enthält
130 /// </summary>
131 private readonly BlockingCollection<MyWarteschlangenElement> _warteschlange = new BlockingCollection<MyWarteschlangenElement>( );
132
133 /// <summary>
134 /// Mit Hilfe dessen kann die Bearbeitung beendet oder abgebrochen werden
135 /// </summary>
136 private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource( );
137
138 /// <summary>
139 /// Array, um die aktuell konsumierenden Threads zwischen zu speichern
140 /// </summary>
141 private readonly Task[] _tasks;
142
143 public MyWarteschlange( int anzahlThreads ) // Anzahl Threads = Anzahl der Konsumenten, die Elemente
144 { // aus der Warteschlange nehmen und bearbeiten
145
146 // Array mit der Anzahl der Threads definieren
147 // Geringer 1 macht natürlich keinen Sinn und sollte abgefangen werden
148 _tasks = new Task[ anzahlThreads ];
149
150 for ( var i = 0 ; i < anzahlThreads ; i++ )
151 {
152 // Neuen Task anlegen, der Elemente für die Verarbeitung entnimmt
153 var consumeTask = Task.Factory.StartNew( ( ) =>
154 {
155 try
156 {
157 // GetConsumingEnumerable() blockiert hier so lange, bis ein Element entnommen werden konnte oder wenn die BlockingCollection komplettiert wurde.
158 // Ebenso wird abgebrochen, wenn ein Abschließen, also das Beenden durch den CancellationTokenSource angefordert wurde
159 foreach ( var warteschlangenElement in _warteschlange.GetConsumingEnumerable( TokenSource.Token ) )
160 {
161 // Schauen, ob Abschliessen() aufgerufen wurde
162 if ( TokenSource.IsCancellationRequested )
163 {
164 // Die Bearbeitung wurde abgebrochen
165 // Abschliessen() wurde ausgeführt
166 return;
167 }
168
169 // Berechnen der Summe des aktuellen Elements
170 var summe = warteschlangenElement.Zahl1 + warteschlangenElement.Zahl2;
171 }
172 }
173 catch ( OperationCanceledException )
174 {
175 // Die Bearbeitung wurde abgebrochen
176 // Abschliessen() wurde ausgeführt
177 }
178 } );
179
180 // Neuen Task zur Liste der Tasks hinzufügen
181 _tasks[ i ] = consumeTask;
182 }
183 }
184
185 public void Hinzufuegen( MyWarteschlangenElement warteschlangenElement )
186 {
187 // Wir benötigen beim Hinzufügen kein Lock, da die BlockingCollection absolut Thread-sicher ist
188 _warteschlange.Add( warteschlangenElement );
189 }
190
191 public void Abschliessen( )
192 {
193 // Abbrechen anfordern
194 TokenSource.Cancel( );
195
196 // Warten, bis sich alle Tasks beendet haben
197 Task.WaitAll( _tasks );
198 }
199 }
Wir sehen, dass die Variante mit der BlockingCollection folgende Vorteile bietet:
- Wir müssen uns um Locks nicht selbst kümmern > Thread-Sicherheit
- Deutlich weniger Code-Aufwand
- Und damit auch eine gesteigerte Übersicht und Code-Qualität<

Comments