Das Java Concurrent Package

 

Das Java Concurrent Package

Überblick

Das Paket java.util.concurrent

Semaphore

CyclicBarrier

CountDownLatch

Exchanger

 

Überblick

 

Mit dem neuen Java in der Version 1.5/5.0 oder einfach nur Tiger, sind viele Neuerungen verbunden. In diesem Artikel möchte ich auf ein neues Paket eingehen, welches ein ganzes Bündel nützlicher Klassen bereithält, welche die nebenläufige Programmierung vereinfachen sollen. Ich versuche die Klassen anhand einfacher Beispiele verständlich zu erklären. Auf die Hintergründe über das Paket und den eigentlichen Vater, Doug Lea, werde ich bewusst nicht eingehen. Interessiert Sie vermutlich auch nicht besonders ;-).

 

Das Paket java.util.concurrent

 

Das Paket bietet wie bereits erwähnt eine Vielzahl von nützlichen Klassen für die nebenläufige Programmierung. Der Begriff nebenläufige Programmierung und Kenntnisse über die Thread Programmierung setze ich bereits voraus. Also, los geht’s!

 

Semaphore

 

Ein Semaphor ist übersetzt mein Fremdwörterbuch mit Signalmast mit verstellbaren Flügeln. Können Sie sich darunter etwas vorstellen? Das Prinzip ist jedoch ganz einfach an einem Beispiel erklärt, welches wir bestimmt alle kennen. Sie haben für das Wochenende alles in ihren Einkaufswagen gepackt und müssen nun nur noch zur Kasse und bezahlen. Leider gibt es im Supermarkt nur ein paar wenige Kassen und es bilden sich lange Schlangen. Ärgerlich! Die Kassen übernehmen in unserem Beispiel die Rolle des Semaphors. Sie lassen immer nur eine gewisse Anzahl von Kunden durch, die anderen müssen warten, bis wieder eine Kasse frei wird. Dasselbe Prinzip kennen wir auch vom Bankschalter, der Post, dem Skilift, usw.

 

Abbildung 1 Es können nur immer eine bestimmte Anzahl Threads gleichzeitig aktiv sein

 

Abbildung 2 Sobald wieder ein „Permit“ verfügbar ist, wird der nächste wartende Thread aktiviert

 

Solche Probleme mit den altbekannten Bordmitteln von Java zu lösen ist aufwändig und fehleranfällig. Die Klasse java.util.concurrent.Semaphore nimmt uns diese Arbeit ab. Schauen wir uns dies an einem für die Informatik etwas realistischerem Beispiel an. Nehmen wir an, wir hätten einen Pool aus n Datenbankverbindungen und ein paar hundert Threads, welche eine solche Verbindung benötigen. Achtung, die folgende Java Klasse ist nicht lauffähig, sie ist noch nicht fertig ausprogrammiert!

 

package ch.menzsoft.concurrent;

 

import java.sql.Connection;

import java.util.concurrent.Semaphore;

 

public class DatabasePool {

 

    private Semaphore semaphore;

   

    private Connection[] connections;

 

    public DatabasePool(int n) {

        semaphore = new Semaphore(n);

        connections = new Connection[n];

        // hier werden alle connections initialisiert...

    }

 

    public Connection getConnection() throws InterruptedException {

        semaphore.acquire();

        return getAvailableConnection();

    }

 

    public void releaseConnection(Connection con) {

        release(con);

        semaphore.release();

    }

 

    private synchronized void release(Connection con) {

        // die Verbindung wieder als unbenutzt kennzeichnen...

    }

 

    private synchronized Connection getAvailableConnection() {

        // eine freie Verbindung zurückgeben...

        return null;

    }

 

}

 

Sobald ein Thread eine Verbindung benötigt, ruft er die Methode getConnection auf und sobald eine solche Verbindung frei wird, bekommt er diese zurück. Sobald der Thread die Verbindung nicht mehr benötigt, gibt er sie wieder frei indem er die Methode releaseConnection aufruft. Nun werden die bereits wartenden Threads bedient. Das ist eigentlich alles was sich hinter einem Semaphor verbirgt. Selbstverständlich bietet die Klasse noch einen Haufen weiterer nützlicher Methoden, welche jedoch am Grundkonzept nichts ändern.

 

CyclicBarrier

 

Ein CyclicBarrier ist eine weitere Synchronisationshilfe. Mit dem CyclicBarrier löst man das Problem, dass eine bestimmte Anzahl Threads an einem bestimmten Punkt aufeinander warten müssen. Sobald alle Threads diese Barriere erreicht haben, kann optional eine bestimmte Aktion ausgeführt werden, bevor die wartenden Threads wieder aufgeweckt werden. Ihre klassische Anwendung findet man in der Aufgabenteilung. Man hat ein bestimmtes Problem, welches sich in unabhängige Teilprobleme aufteilen lässt. Sobald all diese Teilprobleme gelöst sind, werden die Ergebnisse zusammengeführt und als eine Lösung präsentiert.

 

Es handelt sich also um eine Art Barriere, wie man dem Namen entnehmen kann. Doch warum zyklisch (cyclic)? Ganz einfach, man kann diese Barriere nach getaner Arbeit wieder verwenden.

 

Abbildung 3 Die Threads warten, bis alle die Barriere erreicht haben

 

Abbildung 4 Sobald der letzte Thread die Barriere erreicht hat, werden alle wieder aktiviert

 

Auch hier wieder ein Beispiel, das man sich leicht merken kann. Wenn man ein Auto zusammenbauen möchte, benötigt man verschiedene Einzelkomponenten. Man braucht einen Motor, ein Chassis, vier Räder und ein Lenkrad. Diese Teile können unabhängig voneinander, also von verschiedenen Leuten gleichzeitig gebaut werden. Das Auto können wir jedoch erst zusammensetzen, wenn wir alle Teile beisammen haben. Sobald ein Teil fertig ist, wird gewartet. Das zusammensetzen wäre dann die gemeinsame Aktion, welche aufgerufen wird, sobald alle Teile fertig sind. Danach werden alle wartenden Threads wieder aktiviert. Im obigen Beispiel hätten diese jedoch nichts mehr zu tun.

 

package ch.menzsoft.concurrent;

 

import java.util.concurrent.BrokenBarrierException;

import java.util.concurrent.CyclicBarrier;

 

public class CyclicBarrierSample {

 

    private CyclicBarrier barrier;

 

    private class Worker implements Runnable {

 

        public void run() {

            System.out.println("Löse Teilproblem..");

            try {

                barrier.await();

            } catch (InterruptedException e) {

                e.printStackTrace();

            } catch (BrokenBarrierException e) {

                e.printStackTrace();

            }

 

        }

 

    }

 

    public CyclicBarrierSample() {

        int parties = 5;

        barrier = new CyclicBarrier(parties, new Runnable() {

 

            public void run() {

                System.out.println("Baue alles zusammen..");

            }

 

        });

 

        for (int i = 0; i < parties; i++) {

            Thread t = new Thread(new Worker());

            t.start();

        }

 

    }

 

    public static void main(String[] args) {

        new CyclicBarrierSample();

    }

 

}

 

Produziert den folgenden Output:

 

Löse Teilproblem..

Löse Teilproblem..

Löse Teilproblem..

Löse Teilproblem..

Löse Teilproblem..

Baue alles zusammen..

 

CountDownLatch

 

Ein CountDownLatch ist einem CyclicBarrier in einiger Hinsicht ähnlich. Im Prinzip geht es darum, dass eine unbestimmte Anzahl von Threads darauf wartet, bis der Countdown abgeschlossen ist. Ähnliches kann man auch mit dem CyclicBarrier erreichen. Beim CyclicBarrier wird einfach bei jedem await Aufruf der Countdown dekrementiert, die Threads warten also auf sich selbst. Beim CountDownLatch wartet eine Gruppe von Threads auf den Startschuss von einem oder mehreren anderen Threads.

 

Abbildung 5 Die Threads warten, bis der Countdown null erreicht hat

 

Abbildung 6 Die wartenden Threads werden nach dem „Startschuss“ aktiviert

 

Wieder ein Beispiel: Eine unbekannte Anzahl Personen möchte in ein Einkaufscenter, leider sind die Geschäfte noch geschlossen. Sobald die Geschäfte öffnen, wachen all die wartenden Einkaufswütigen auf und erledigen ihre Einkäufe. In diesem Beispiel dauert der Countdown lediglich 1. Das Beispiel kann man einfach ausbauen indem wir annehmen, dass die Käufer erst in die Geschäfte stürmen, wenn alle n Türen geöffnet haben. Der Countdown dauert dann n.

 

package ch.menzsoft.concurrent;

 

import java.util.concurrent.CountDownLatch;

 

public class CountDownLatchSample {

 

    private CountDownLatch latch;

 

    private int shops = 5;

 

    private class Shopper implements Runnable {

 

        public void run() {

            try {

                latch.await();

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

            System.out.println("Juhu, shopping! ;-)");

        }

 

    }

 

    private class Shop implements Runnable {

 

        public void run() {

            System.out.println("Opening shop..");

            latch.countDown();

        }

 

    }

 

    public CountDownLatchSample() {

        latch = new CountDownLatch(shops);

    }

 

    private void simulate() {

        int shopper = 10;

        for (int i = 0; i < shopper; i++) {

            Thread t = new Thread(new Shopper());

            t.start();

        }

 

        for (int i = 0; i < shops; i++) {

            Thread t = new Thread(new Shop());

            t.start();

        }

    }

 

    public static void main(String[] args) {

        new CountDownLatchSample().simulate();

    }

 

}

 

Dieses Programm produziert den folgenden Output:

 

Opening shop..

Opening shop..

Opening shop..

Opening shop..

Opening shop..

Juhu, shopping! ;-)

Juhu, shopping! ;-)

Juhu, shopping! ;-)

Juhu, shopping! ;-)

Juhu, shopping! ;-)

Juhu, shopping! ;-)

Juhu, shopping! ;-)

Juhu, shopping! ;-)

Juhu, shopping! ;-)

Juhu, shopping! ;-)

 

Exchanger

 

Mit einem Exchanger können immer zwei Threads an einem bestimmten Punkt Objekte untereinander austauschen. Sobald beide Threads die Methode exchange aufrufen, werden die Objekte getauscht. Und so läuft es ab:

 

Thread1

Thread2

Fülle lokalen Buffer

Fülle lokalen Buffer

Rufe exchange auf

..

..

Rufe exchange auf

Erhalte Buffer von Thread2

Erhalte Buffer von Thread1

 

Viel mehr gibt’s nicht zu erklären. Sehen Sie das Beispiel genau durch. Achten Sie darauf, dass ruhig mehr als zwei Threads den gleichen Exchanger benutzen können, Sie müssen einfach sicherstellen, dass am Ende nicht noch ein Thread am warten ist und nie wieder irgendwelche Daten bekommt.

 

package ch.menzsoft.concurrent;

 

import java.util.concurrent.Exchanger;

 

public class ExchangerTest {

 

    private Exchanger exchanger;

 

    private class Worker implements Runnable {

 

        int buffer;

 

        int threadNr;

 

        public Worker(int threadNr) {

            this.threadNr = threadNr;

        }

 

        public void run() {

            for (int i = 0; i < 5; i++) {

                try {

                    Integer b = (Integer) exchanger.exchange(new Integer(buffer));

                    System.out.println("Thread " + threadNr + ", got " + b);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

                buffer++;

            }

        }

 

    }

 

    public ExchangerTest() {

        exchanger = new Exchanger();

 

        for (int i = 0; i < 2; i++) {

            Thread t = new Thread(new Worker(i));

            t.start();

        }

    }

 

    public static void main(String[] args) {

        new ExchangerTest();

    }

 

}

 

Das Programm erzeugt den folgenden, erwarteten Output:

 

Thread 1, got 0

Thread 0, got 0

Thread 0, got 1

Thread 1, got 1

Thread 1, got 2

Thread 0, got 2

Thread 0, got 3

Thread 1, got 3

Thread 0, got 4

Thread 1, got 4

 

© Christian Menz