Das Paket java.util.concurrent
Mit dem
neuen Java in der Version 1.5/5.0 oder einfach nur Tiger, sind viele Neuerungen
verbunden. In diesem Artikel möchte ich auf ein neues Paket eingehen, welches
ein ganzes Bündel nützlicher Klassen bereithält, welche die nebenläufige
Programmierung vereinfachen sollen. Ich versuche die Klassen anhand einfacher
Beispiele verständlich zu erklären. Auf die Hintergründe über das Paket und den
eigentlichen Vater, Doug Lea, werde ich bewusst nicht eingehen. Interessiert
Sie vermutlich auch nicht besonders ;-).
Das Paket
bietet wie bereits erwähnt eine Vielzahl von nützlichen Klassen für die
nebenläufige Programmierung. Der Begriff nebenläufige Programmierung und
Kenntnisse über die Thread Programmierung setze ich
bereits voraus. Also, los geht’s!
Ein
Semaphor ist übersetzt mein Fremdwörterbuch mit Signalmast mit verstellbaren
Flügeln. Können Sie sich darunter etwas vorstellen? Das Prinzip ist jedoch ganz
einfach an einem Beispiel erklärt, welches wir bestimmt alle kennen. Sie haben für
das Wochenende alles in ihren Einkaufswagen gepackt und müssen nun nur noch zur
Kasse und bezahlen. Leider gibt es im Supermarkt nur ein paar wenige Kassen und
es bilden sich lange Schlangen. Ärgerlich! Die Kassen übernehmen in unserem
Beispiel die Rolle des Semaphors. Sie lassen immer nur eine gewisse Anzahl von
Kunden durch, die anderen müssen warten, bis wieder eine Kasse frei wird.
Dasselbe Prinzip kennen wir auch vom Bankschalter, der Post, dem Skilift, usw.

Abbildung 1 Es können nur immer eine bestimmte Anzahl Threads gleichzeitig aktiv sein

Abbildung 2
Sobald wieder ein „Permit“ verfügbar ist, wird der
nächste wartende Thread aktiviert
Solche
Probleme mit den altbekannten Bordmitteln von Java zu lösen ist aufwändig und
fehleranfällig. Die Klasse java.util.concurrent.Semaphore nimmt uns diese Arbeit ab.
Schauen wir uns dies an einem für die Informatik etwas realistischerem Beispiel
an. Nehmen wir an, wir hätten einen Pool aus n Datenbankverbindungen und ein paar hundert Threads,
welche eine solche Verbindung benötigen. Achtung, die folgende Java Klasse ist
nicht lauffähig, sie ist noch nicht fertig ausprogrammiert!
|
package ch.menzsoft.concurrent; import java.sql.Connection; import java.util.concurrent.Semaphore; public class DatabasePool
{ private Semaphore semaphore; private Connection[]
connections; public DatabasePool(int n) { semaphore =
new Semaphore(n); connections
= new Connection[n]; // hier werden alle connections initialisiert... } public Connection
getConnection() throws InterruptedException
{ semaphore.acquire(); return getAvailableConnection(); } public void releaseConnection(Connection con) { release(con); semaphore.release(); } private synchronized void release(Connection con) { // die Verbindung wieder
als unbenutzt kennzeichnen... } private synchronized Connection
getAvailableConnection() { // eine freie Verbindung
zurückgeben... return null; } } |
Sobald ein Thread eine Verbindung benötigt, ruft er die Methode getConnection auf
und sobald eine solche Verbindung frei wird, bekommt er diese zurück. Sobald
der Thread die Verbindung nicht mehr benötigt, gibt
er sie wieder frei indem er die Methode releaseConnection aufruft. Nun werden die bereits wartenden Threads bedient. Das ist eigentlich alles was sich hinter
einem Semaphor verbirgt. Selbstverständlich bietet die Klasse noch einen Haufen
weiterer nützlicher Methoden, welche jedoch am Grundkonzept nichts ändern.
Ein CyclicBarrier ist eine weitere Synchronisationshilfe. Mit
dem CyclicBarrier löst man das Problem, dass eine
bestimmte Anzahl Threads an einem bestimmten Punkt aufeinander
warten müssen. Sobald alle Threads diese Barriere erreicht
haben, kann optional eine bestimmte Aktion ausgeführt werden, bevor die
wartenden Threads wieder aufgeweckt werden. Ihre
klassische Anwendung findet man in der Aufgabenteilung. Man hat ein bestimmtes
Problem, welches sich in unabhängige Teilprobleme aufteilen lässt. Sobald all
diese Teilprobleme gelöst sind, werden die Ergebnisse zusammengeführt und als eine
Lösung präsentiert.
Es handelt
sich also um eine Art Barriere, wie man dem Namen entnehmen kann. Doch warum zyklisch
(cyclic)? Ganz einfach, man kann diese Barriere nach
getaner Arbeit wieder verwenden.

Abbildung 3 Die Threads warten, bis alle die Barriere erreicht haben

Abbildung 4
Sobald der letzte Thread die Barriere erreicht hat,
werden alle wieder aktiviert
Auch hier
wieder ein Beispiel, das man sich leicht merken kann. Wenn man ein Auto
zusammenbauen möchte, benötigt man verschiedene Einzelkomponenten. Man braucht
einen Motor, ein Chassis, vier Räder und ein Lenkrad. Diese Teile können unabhängig
voneinander, also von verschiedenen Leuten gleichzeitig gebaut werden. Das Auto
können wir jedoch erst zusammensetzen, wenn wir alle Teile beisammen haben. Sobald
ein Teil fertig ist, wird gewartet. Das zusammensetzen wäre dann die gemeinsame
Aktion, welche aufgerufen wird, sobald alle Teile fertig sind. Danach werden
alle wartenden Threads wieder aktiviert. Im obigen
Beispiel hätten diese jedoch nichts mehr zu tun.
|
package ch.menzsoft.concurrent; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierSample
{ private CyclicBarrier
barrier; private class Worker
implements Runnable
{ public void run()
{ System.out.println("Löse
Teilproblem.."); try { barrier.await(); } catch (InterruptedException
e) { e.printStackTrace(); } catch (BrokenBarrierException
e) { e.printStackTrace(); } } } public CyclicBarrierSample()
{ int parties
= 5; barrier = new CyclicBarrier(parties, new Runnable()
{ public void run()
{ System.out.println("Baue
alles zusammen.."); } }); for (int i = 0; i < parties; i++) { Thread
t = new Thread(new Worker()); t.start(); } } public static void main(String[]
args) { new CyclicBarrierSample(); } } |
Produziert
den folgenden Output:
|
Löse
Teilproblem.. Löse
Teilproblem.. Löse
Teilproblem.. Löse
Teilproblem.. Löse
Teilproblem.. Baue alles
zusammen.. |
Ein CountDownLatch ist einem CyclicBarrier
in einiger Hinsicht ähnlich. Im Prinzip geht es darum, dass eine unbestimmte
Anzahl von Threads darauf wartet, bis der Countdown
abgeschlossen ist. Ähnliches kann man auch mit dem CyclicBarrier
erreichen. Beim CyclicBarrier wird einfach bei jedem await Aufruf der
Countdown dekrementiert, die Threads
warten also auf sich selbst. Beim CountDownLatch wartet
eine Gruppe von Threads auf den Startschuss von einem
oder mehreren anderen Threads.

Abbildung 5
Die Threads warten, bis der Countdown null erreicht
hat

Abbildung 6
Die wartenden Threads werden nach dem „Startschuss“
aktiviert
Wieder ein
Beispiel: Eine unbekannte Anzahl Personen möchte in ein Einkaufscenter, leider
sind die Geschäfte noch geschlossen. Sobald die Geschäfte öffnen, wachen all
die wartenden Einkaufswütigen auf und erledigen ihre Einkäufe. In diesem
Beispiel dauert der Countdown lediglich 1. Das Beispiel kann man einfach
ausbauen indem wir annehmen, dass die Käufer erst in die Geschäfte stürmen,
wenn alle n Türen geöffnet haben. Der
Countdown dauert dann n.
|
package ch.menzsoft.concurrent; import java.util.concurrent.CountDownLatch; public class CountDownLatchSample
{ private CountDownLatch
latch; private int shops
= 5; private class Shopper
implements Runnable
{ public void run()
{ try { latch.await(); } catch (InterruptedException
e) { e.printStackTrace(); } System.out.println("Juhu, shopping! ;-)"); } } private class Shop implements Runnable
{ public void run()
{ System.out.println("Opening shop.."); latch.countDown(); } } public CountDownLatchSample()
{ latch = new CountDownLatch(shops); } private void simulate()
{ int shopper
= 10; for (int i = 0; i < shopper; i++) { Thread
t = new Thread(new Shopper()); t.start(); } for (int i = 0; i < shops; i++) { Thread
t = new Thread(new Shop()); t.start(); } } public static void main(String[]
args) { new CountDownLatchSample().simulate(); } } |
Dieses
Programm produziert den folgenden Output:
|
Opening shop.. Opening shop.. Opening shop.. Opening shop.. Opening shop.. Juhu, shopping! ;-) Juhu, shopping! ;-) Juhu, shopping! ;-) Juhu, shopping! ;-) Juhu, shopping! ;-) Juhu, shopping! ;-) Juhu, shopping! ;-) Juhu, shopping! ;-) Juhu, shopping! ;-) Juhu, shopping! ;-) |
Mit einem Exchanger können immer zwei Threads
an einem bestimmten Punkt Objekte untereinander austauschen. Sobald beide Threads die Methode exchange aufrufen, werden die Objekte getauscht. Und so
läuft es ab:
|
Thread1 |
Thread2 |
|
Fülle
lokalen Buffer |
Fülle
lokalen Buffer |
|
Rufe exchange auf |
.. |
|
.. |
Rufe exchange auf |
|
Erhalte
Buffer von Thread2 |
Erhalte
Buffer von Thread1 |
Viel mehr
gibt’s nicht zu erklären. Sehen Sie das Beispiel genau durch. Achten Sie
darauf, dass ruhig mehr als zwei Threads den gleichen
Exchanger benutzen können, Sie müssen einfach
sicherstellen, dass am Ende nicht noch ein Thread am
warten ist und nie wieder irgendwelche Daten bekommt.
|
package ch.menzsoft.concurrent; import java.util.concurrent.Exchanger; public class ExchangerTest
{ private Exchanger exchanger; private class Worker implements Runnable
{ int buffer; int threadNr; public Worker(int threadNr)
{ this.threadNr = threadNr; } public void run() { for (int i =
0; i < 5; i++) { try { Integer b = (Integer) exchanger.exchange(new Integer(buffer)); System.out.println("Thread
" + threadNr + ", got " + b); } catch (InterruptedException
e) { e.printStackTrace(); } buffer++; } } } public ExchangerTest()
{ exchanger = new Exchanger(); for (int i =
0; i < 2; i++) { Thread t = new Thread(new Worker(i)); t.start(); } } public static void main(String[] args) { new ExchangerTest(); } } |
Das
Programm erzeugt den folgenden, erwarteten Output:
|
Thread 1, got 0 Thread 0, got 0 Thread 0, got 1 Thread 1, got 1 Thread 1, got 2 Thread 0, got 2 Thread 0, got 3 Thread 1, got 3 Thread 0, got 4 Thread 1, got 4 |
© Christian
Menz