SchwabenCode Blog Post

C# Warteschlangen – Die BlockingCollection

Oft werden Warteschlangen benötigt, um Aufgaben zu Verwalten oder durch etwaige Parallelität die Performance zu Steigern. Mit .NET 4.0 hat Microsoft einen großen Schritt in diese Richtung getan und den Entwicklern durch die sogenannten ConcurrentCollections viel Arbeit abgenommen.

Die für diesen Beitrag interessante Klasse ist die BlockingCollection, die den sogenannten Producer and Consumer-Pattern umsetzt. Mit Hilfe dieses Patterns können sehr einfach und effizient Aufgaben in Warteschlangen gelegt und von anderen Threads zur Verarbeitung entnommen werden.

Die Funktionweise ist meist, dass eine Schleife in Thread Nr. 1 die Warteschlange füllt und mehrere Threads (Nr. 2 - Nr. x) solange warten, bis die Warteschlange neue Elemente erhält. Die konsumierenden Threads legen sich dann so lange schlafen, bis neue Elemente eintrudeln um diese zu verarbeiten - ohne dabei die CPU zu belasten.

Das Umsetzen dieses Patterns unter .NET 3.5 erforderte vom Entwickler noch relativ viel Code, der zudem Stoplerfallen durch die Thread-Synchronisation beachten musste.

  1    /// <summary>
  2    /// Element für die Warteschlange
  3    /// </summary>
  4    public class MyWarteschlangenElement
  5    {
  6        public MyWarteschlangenElement( Int32 zahl1, Int32 zahl2 )
  7        {
  8            Zahl1 = zahl1;
  9            Zahl2 = zahl2;
 10        }
 11        public Int32 Zahl1 { get; private set; }
 12        public Int32 Zahl2 { get; private set; }
 13    }
 14
 15    public class MyWarteschlange
 16    {
 17        private readonly object _warteschlangenSynchronisierer = new object( );
 18
 19        /// <summary>
 20        /// Alle aktiven konsumierenden Threads
 21        /// </summary>
 22        private    readonly Thread[] _arbeitendeThreads;
 23
 24        /// <summary>
 25        /// Warteschlange
 26        /// </summary>
 27        private   readonly Queue<MyWarteschlangenElement> _itemQ = new Queue<MyWarteschlangenElement>( );
 28
 29        public MyWarteschlange( int anzahlThreads )  // Anzahl Threads = Anzahl der Konsumenten, die Elemente 
 30        {                                            // aus der Warteschlange nehmen und bearbeiten
 31            _arbeitendeThreads = new Thread[ anzahlThreads ];
 32
 33            // Erstelle für jeden Arbeiter einen eignen Thread
 34            for ( var i = 0 ; i < anzahlThreads ; i++ )
 35            {
 36                ( _arbeitendeThreads[ i ] = new Thread( Consume ) ).Start( );
 37            }
 38        }
 39
 40        /// <summary>
 41        /// Für ein Element, hier zwei Zahlen, zur Warteschlange hinzu
 42        /// </summary>
 43        /// <param name="warteschlangenElement"></param>
 44        public void AufgabeHinzufuegen( MyWarteschlangenElement warteschlangenElement )
 45        {
 46            // Ein lock benötigen wir, da die Warteschlange hier nicht Thread-sicher ist
 47            lock ( _warteschlangenSynchronisierer )
 48            {
 49                // Element zur Warteschlange hinzufügen
 50                _itemQ.Enqueue( warteschlangenElement );
 51
 52                // Bekanntgeben, dass ein neues Element hinzugefügt wurde
 53                Monitor.Pulse( _warteschlangenSynchronisierer );
 54            }
 55        }
 56
 57        /// <summary>
 58        /// Beendet die Warteschlange
 59        /// </summary>
 60        public void Abschliessen( )
 61        {
 62            // Wir definieren null als das Element, das den verarbeitenden Threads sagt, dass nun abgeschlossen werden soll
 63            // Aufgrund des locks und des Pulse müssen wir für jeden Thread ein null einwerfern
 64            for ( var i = 0 ; i < _arbeitendeThreads.Length ; i++ )
 65            {
 66                AufgabeHinzufuegen( null );
 67            }
 68
 69            // Warten, bis sich alle Threads beendet haben
 70            foreach ( var worker in _arbeitendeThreads )
 71            {
 72                worker.Join( );
 73            }
 74        }
 75
 76        /// <summary>
 77        /// Verarbeitet ein Element
 78        /// </summary>
 79        private void Consume( )
 80        {
 81            // Dauerhaft schauen, ob ein Element in der Warteschlange auf die Verarbeitung wartet
 82            while ( true )
 83            {
 84                MyWarteschlangenElement zubearbeitendesElement;
 85
 86                // Ein lock benötigen wir, da die Warteschlange hier nicht Thread-sicher ist
 87                lock ( _warteschlangenSynchronisierer )
 88                {
 89                    while ( _itemQ.Count == 0 )
 90                    {
 91                        // Wir warten hier, bis ein neues Element der Warteschlange hinzugefügt wurde
 92                        // Dies schont die CPU, da nicht ständig das while(true) durchlaufen wird!
 93                        Monitor.Wait( _warteschlangenSynchronisierer );
 94                    }
 95
 96                    // Wir holen ein Element aus der Warteschlange
 97                    zubearbeitendesElement = _itemQ.Dequeue( );
 98                }
 99
100                // Wir haben null als Abbruchkriterium definiert; schauen, ob wir abbrechen sollen
101                if ( zubearbeitendesElement == null )
102                {
103                    // Thread beenden
104                    return;
105                }
106
107                // Ausführung der Aufgabe
108                BerechneSumme( zubearbeitendesElement );
109            }
110        }
111
112        /// <summary>
113        /// Berechnet die Summe zweier Zahlen
114        /// </summary>
115        /// <param name="warteschlangenElement"></param>
116        private static void BerechneSumme( MyWarteschlangenElement warteschlangenElement )
117        {
118            // Berechne die Summe aus zahl1 und zahl2
119            var summe = warteschlangenElement.Zahl1 + warteschlangenElement.Zahl2;
120        }
121        ```
122        
123Unter **.NET 4.0** ist die Implementierung nun deutlich einfacher.
124
125```csharp
126public class MyWarteschlange
127    {
128        /// <summary>
129        /// Warteschlange, die die Elemente zur Verarbeitung enthält
130        /// </summary>
131        private   readonly BlockingCollection<MyWarteschlangenElement> _warteschlange = new BlockingCollection<MyWarteschlangenElement>( );
132
133        /// <summary>
134        /// Mit Hilfe dessen kann die Bearbeitung beendet oder abgebrochen werden
135        /// </summary>
136        private static readonly CancellationTokenSource TokenSource = new CancellationTokenSource( );
137
138        /// <summary>
139        /// Array, um die aktuell konsumierenden Threads zwischen zu speichern
140        /// </summary>
141        private readonly Task[] _tasks;
142
143        public MyWarteschlange( int anzahlThreads )  // Anzahl Threads = Anzahl der Konsumenten, die Elemente 
144        {                                            // aus der Warteschlange nehmen und bearbeiten
145
146            // Array mit der Anzahl der Threads definieren
147            // Geringer 1 macht natürlich keinen Sinn und sollte abgefangen werden
148            _tasks = new Task[ anzahlThreads ];
149
150            for ( var i = 0 ; i < anzahlThreads ; i++ )
151            {
152                // Neuen Task anlegen, der Elemente für die Verarbeitung entnimmt
153                var consumeTask = Task.Factory.StartNew( ( ) =>
154                    {
155                        try
156                        {
157                            // GetConsumingEnumerable() blockiert hier so lange, bis ein Element entnommen werden konnte oder wenn die BlockingCollection komplettiert wurde.
158                            // Ebenso wird abgebrochen, wenn ein Abschließen, also das Beenden durch den CancellationTokenSource angefordert wurde
159                            foreach ( var warteschlangenElement in _warteschlange.GetConsumingEnumerable( TokenSource.Token ) )
160                            {
161                                // Schauen, ob Abschliessen() aufgerufen wurde
162                                if ( TokenSource.IsCancellationRequested )
163                                {
164                                    // Die Bearbeitung wurde abgebrochen
165                                    // Abschliessen() wurde ausgeführt
166                                    return;
167                                }
168
169                                // Berechnen der Summe des aktuellen Elements
170                                var summe = warteschlangenElement.Zahl1 + warteschlangenElement.Zahl2;
171                            }
172                        }
173                        catch ( OperationCanceledException )
174                        {
175                            // Die Bearbeitung wurde abgebrochen
176                            // Abschliessen() wurde ausgeführt
177                        }
178                    } );
179
180                // Neuen Task zur Liste der Tasks hinzufügen
181                _tasks[ i ] = consumeTask;
182            }
183        }
184
185        public void Hinzufuegen( MyWarteschlangenElement warteschlangenElement )
186        {
187            // Wir benötigen beim Hinzufügen kein Lock, da die BlockingCollection absolut Thread-sicher ist
188            _warteschlange.Add( warteschlangenElement );
189        }
190
191        public void Abschliessen( )
192        {
193            // Abbrechen anfordern
194            TokenSource.Cancel( );
195
196            // Warten, bis sich alle Tasks beendet haben
197            Task.WaitAll( _tasks );
198        }
199    }

Wir sehen, dass die Variante mit der BlockingCollection folgende Vorteile bietet:

  • Wir müssen uns um Locks nicht selbst kümmern > Thread-Sicherheit
  • Deutlich weniger Code-Aufwand
  • Und damit auch eine gesteigerte Übersicht und Code-Qualität<

Comments

Twitter Facebook LinkedIn WhatsApp