Latches und Barriers sind einfache Thread Synchronisierungsmechanismen, die es erlauben, mehrere Threads warten zu lassen, bis eine Operation fertig ist. Latches und Barriers soll es in drei Geschmacksrichtungen in C++20 geben: std::latch, std::barrier und std::flex_barrier.
Zuerst einmal stellen sich für mich zwei Fragen:
- Worin unterscheiden sich die drei Mechanismen um Threads zu synchronisieren? Während ein std::latch nur einmal verwendet werden kann, können std::barrier und std::flex_barrier mehrmals verwendet werden. std::flex_barrier bietet mehr Flexibilität als std::barrier. Ein std::flex_barrier erlaubt es eine Aktion zu hinterlegen, wenn die Operation fertig ist.
- Was können Latches und Barriers, was Koordinationsmechanismen in C++11 und C++14 wie Futures und Bedingungsvariablen in Kombination mit Locks nicht konnten? Latches und Barriers können nicht mehr. Sie sind aber wesentlich einfacher in der Anwendung und performanter, da sie oft intern lockfreie Mechanismen verwenden.
Jetzt werde ich mir die drei Koordinationsmechanismen genauer anschauen.
Die kurzen Codebeispiele sind aus dem Proposal n4204.
std::latch
std::latch ist ein Abwärtszähler. Sein Wert wird im Konstruktor gesetzt. Ein Thread thread kann mit der Methode thread.count_down_and_wait den Zähler um 1 heruntersetzen und warten, bis dieser 0 erreicht hat oder er kann mit thread.count_down nur den Zähler um 1 heruntersetzen. Neben diesen beiden Methoden besitzt der std::latch die Methode thread.is_ready, um zu testen, ob der Zähler 0 ist und die Methode thread.wait. Mit thread.wait wartet (blockiert) er, bis der Zähler den Wert 0 besitzt. Da std::latch nicht erlaubt, den Zähler zu inkrementieren oder auch zurücksetzen, lässt er sich nur einmal verwenden.
Die Details zu std::latch lassen sich bereits auf cppreference.com nachlesen. Zum Abschluss noch ein Beispiel zum std::latch.
1 2 3 4 5 6 7 8 9 10 11 12 |
void DoWork(threadpool* pool) { latch completion_latch(NTASKS); for (int i = 0; i < NTASKS; ++i) { pool->add_task([&] { // perform work ... completion_latch.count_down(); })); } // Block until work is done completion_latch.wait(); } |
Der std::latch completion_latch wird im Konstruktor auf NTASKS (Zeile 2) gesetzt. Der Threadpool führt die NTASKS Aufgaben (Zeile 4 - 8) aus. Am Ende jeder Aufgabe (Zeile 7) wird der Zähler dekrementiert. Zeile 11 ist die Barriere für den Thread, der DoWork ausführt. Damit ist die Zeile die Barriere für den ganzen Workflow, der die NTASKS Aufgaben ausführt.
std::barrier
Ein std::barrier ist einem std::latch sehr ähnlich. Der feine Unterschied ist aber, dass ein std::barrier mehrmals verwendet werden kann und der Zähler dabei auf den alten Wert zurückgesetzt wird. Unmittelbar nachdem der Zähler den Wert 0 besitzt, findet eine sogenannte Abschlussphase (eng.: completion phase) statt. Die ist im Falle des std::barrier leer. Das ändert sich mit dem std::flex_barrier. std::barrier besitzt zwei interessante Methoden: std::arrive_and_wait und std::arrive_and_drop. Während std::arrive_and_wait am Synchronisationspunkt wartet, entfernt sich std::arrive_and_drop aus dem Synchronisationsmechanismus.
Bevor ich aber genauer auf den std::flex_barrier und die Abschlussphase eingehe, noch ein kleines Beispiel zum std::barrier.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
void DoWork() { Tasks& tasks; int n_threads; vector<thread*> workers; barrier task_barrier(n_threads); for (int i = 0; i < n_threads; ++i) { workers.push_back(new thread([&] { bool active = true; while(active) { Task task = tasks.get(); // perform task ... task_barrier.arrive_and_wait(); } }); } // Read each stage of the task until all stages are complete. while (!finished()) { GetNextStage(tasks); } } |
Der std::barrier task_barrier in Zeile 6 wird in diesem Beispiel dazu verwendet eine Anzahl von Threads zu koordinieren, die eine Aufgabe mehrmalig wiederholen. Die Anzahl der Threads ist n_threads (Zeile 3). Jeder Thread holt sich seine Aufgabe in Zeile 12 mittels task.get() ab, führt diese aus und wartet - sobald er seine Aufgabe erledigt hat (Zeile 15) - bis alle Threads ihre Aufgabe erledigt haben. Dann holt er sich eine neue Aufgabe in Zeile 12, solange active in Zeile 11 true ergibt.
std::flex_barrier
In dem Beispiel zu std::flex_barrier sind die Namen unglücklich gewählt. So wird der std::flex_barrier als notifying_barrier bezeichnet. Da das verwirrt, habe ich den Namen std::flex_barrier verwendet.
Der std::flex_barrier besitzt im Gegensatz zum std::barrier einen zusätzlichen Konstruktor. Dieser kann mit einer aufrufbaren Einheit parametrisiert werden, die automatisch in der Abschlussphase ausgeführt wird. Diese aufrufbare Einheit muss einen Wert zurückgeben. Dieser Wert setzt den Wert des Zählers in der Abschlussphase. Ein Wert von -1 bedeutet, dass in der nächsten Iteration der Zähler unverändert bleibt. Kleiner Wert wie -1 sind nicht zulässig.
Was geschieht genau in der Abschlussphase?
- Alle Threads sind blockiert.
- Ein Thread wird entsperrt (eng. unblocked) und führt die aufrufbare Einheit aus.
- Wenn die Abschlussphase fertig ist, werden alle Thread entsperrt.
Das Code Schnipsel zeigt std::flex_barrier in der Anwendung.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
void DoWork() { Tasks& tasks; int initial_threads; atomic<int> current_threads(initial_threads) vector<thread*> workers; // Create a flex_barrier, and set a lambda that will be // invoked every time the barrier counts down. If one or more // active threads have completed, reduce the number of threads. std::function rf = [&] { return current_threads;}; flex_barrier task_barrier(n_threads, rf); for (int i = 0; i < n_threads; ++i) { workers.push_back(new thread([&] { bool active = true; while(active) { Task task = tasks.get(); // perform task ... if (finished(task)) { current_threads--; active = false; } task_barrier.arrive_and_wait(); } }); } // Read each stage of the task until all stages are complete. while (!finished()) { GetNextStage(tasks); } } |
Das Beispiel verwendet eine ähnliche Strategie wie das Beispiel zum std::barrier. Die Besonderheit ist aber, dass beim std::flex_barrrier zur Laufzeit der Zähler angepasst wird. Dazu erhält der std::flex_barrier task_barrier in Zeile 11 eine Lambda-Funktion. Diese bindet ihre Variable current_thread per Referenz. In der Zeile 21 wird diese Variable dekrementiert und active auf false gesetzt, wenn der Thread seine Aufgabe erledigt hat. Damit wird auch der Zähler in der Abschlussphase entsprechend dekrementiert.
Eine Besonderheit besitzt std::flex_barrier gegenüber dem std::barrier und std::latch. Er kann als einziger der drei seinen Zähler erhöhen.
Die Details zu std::latch, std::barrier und std::flex_barrier lassen sich bereits auf cppreference.com nachlesen.
Wie geht's weiter?
Coroutinen sind verallgemeinerte Funktionen, die ihre Ausführung anhalten und wieder aufnehmen können. Dabei behalten sie ihren Zustand. Gerne werden sie verwendet um kooperatives Multitasking in Betriebssystemen, Ereignisschleifen in Eventsystemen, unendliche Listen oder Pipelines zu implementieren. Im nächsten Artikel gibt's die Details.
Go to Leanpub/cpplibrary "What every professional C++ programmer should know about the C++ standard library". Hole dir dein E-Book. Unterstütze meinen Blog.
Weiterlesen...