Sunday, October 16, 2011

Java Concurrent API

If you have read the previous part of this article series, you know that even the simplest operations, such as incrementing a counter, require care (you must think about where to put volatile or synchronized). That is why the Java Concurrent API was created: it implements some common operations and patterns as atomic, thread-safe building blocks. All classes from this API live in the java.util.concurrent package and its sub-packages.

The first group of classes we will look at is in the java.util.concurrent.atomic sub-package. They implement thread-safe operations on single variables. Let's see how the counter example from the previous article can be rewritten using these classes:
import java.util.concurrent.atomic.AtomicInteger;

public class IdCounter {
    // Starts at 0; every update goes through an atomic method
    private final AtomicInteger counter = new AtomicInteger();

    public int increment() {
        // Atomically adds 1 and returns the updated value
        return counter.incrementAndGet();
    }
}
So AtomicInteger basically wraps a primitive int value and lets you perform thread-safe operations on it. You no longer need to think about volatile and synchronized for that variable; you just call the appropriate methods. The other classes in the java.util.concurrent.atomic package do basically the same for other types: long, boolean, arrays, and so on (AtomicLong, AtomicBoolean, AtomicIntegerArray).
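To see the thread-safety claim in action, here is a minimal sketch that hammers one AtomicInteger from several threads. The class name AtomicCounterDemo and the method countWithThreads are made up for this illustration, and the lambdas assume Java 8 or newer.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterDemo {

    // Starts `threads` threads that each call incrementAndGet() `perThread`
    // times, waits for all of them, and returns the final counter value.
    static int countWithThreads(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.incrementAndGet(); // atomic read-modify-write
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // wait until this worker has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 10_000)); // prints 40000
    }
}
```

With a plain int field and counter++ in place of the AtomicInteger, the same program could lose updates and print a smaller number.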

These classes provide the useful method compareAndSet(expectedValue, newValue) (the array variants take an additional index argument). It performs the following kind of code atomically and returns true if the value was updated:
if (variable == expectedValue) {
   variable = newValue;
}
Without appropriate synchronization, the above code is prone to race conditions: between the moment one thread compares the variable with expectedValue and the moment it assigns newValue, a second thread can change the variable, so the assignment is based on a stale check. In most cases this is not what you want.
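Two common ways to use compareAndSet can be sketched as follows: a flag that only one caller can flip, and a retry loop that updates a value only if nobody else changed it in the meantime. The class CompareAndSetDemo and the methods tryStart and updateMax are hypothetical names chosen for this example.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class CompareAndSetDemo {

    // Returns true only for the single caller that flips the flag
    // from false to true; every later caller gets false.
    static boolean tryStart(AtomicBoolean started) {
        return started.compareAndSet(false, true);
    }

    // Raises `max` to `candidate` if the candidate is larger and
    // returns the maximum after the call.
    static int updateMax(AtomicInteger max, int candidate) {
        while (true) {
            int current = max.get();
            if (candidate <= current) {
                return current; // nothing to update
            }
            if (max.compareAndSet(current, candidate)) {
                return candidate; // our update won
            }
            // Another thread changed the value between get() and
            // compareAndSet(); read it again and retry.
        }
    }

    public static void main(String[] args) {
        AtomicBoolean started = new AtomicBoolean(false);
        System.out.println(tryStart(started)); // prints true
        System.out.println(tryStart(started)); // prints false

        AtomicInteger max = new AtomicInteger(5);
        System.out.println(updateMax(max, 9)); // prints 9
        System.out.println(updateMax(max, 3)); // prints 9
    }
}
```

The retry loop is the usual shape of lock-free updates: read, compute, try to swap, and start over if the swap fails.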


Now it is time to look at the Producer-Consumer pattern and how the concurrent API makes its implementation a piece of cake. There are two roles here: the producer and the consumer. The producer produces work, and the consumer consumes the work created by the producer. Imagine a factory conveyor. A machine puts products on the conveyor. Then a person takes each product from the conveyor and does whatever is needed next. Here the machine is the producer and the person is the consumer. In programming terms, the conveyor is a buffer, and the producer and consumer are two separate threads that insert items into and take items from the shared buffer. As you know, extra care must be taken when two threads access a shared resource. Fortunately, the concurrent API provides the BlockingQueue interface. Its put and take methods "produce" into and "consume" from the queue. Both methods are blocking: put waits until there is free space in the queue, and take waits until at least one product is available. Here is a simple example of a Producer and a Consumer class. Exception handling is left off for clarity (both put and take can throw InterruptedException).
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {
    private final BlockingQueue<Product> queue;

    public Producer(BlockingQueue<Product> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        // Produce 1000 products
        for (int i = 0; i < 1000; i++) {
            // Blocks while the queue is full (InterruptedException handling omitted)
            queue.put(new Product());
            System.out.println("PRODUCED PRODUCT");
        }
    }
}

public class Consumer implements Runnable {
    private final BlockingQueue<Product> queue;

    public Consumer(BlockingQueue<Product> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            // Blocks while the queue is empty (InterruptedException handling omitted)
            Product product = queue.take();
            System.out.println("CONSUMED PRODUCT");
        }
    }
}

And here is the calling code:
// Bounded queue that holds at most 50 products
BlockingQueue<Product> queue = new ArrayBlockingQueue<>(50);

Producer producer = new Producer(queue);
new Thread(producer).start();

Consumer consumer = new Consumer(queue);
new Thread(consumer).start();

Here the capacity of the queue is 50, which means the queue can hold at most 50 products at any one time. ArrayBlockingQueue is used as the implementation of BlockingQueue, which means that the queue is backed by a fixed-size array.
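For completeness, here is one possible self-contained variant of the example with the exception handling filled in. It is only a sketch under a few assumptions that are not part of the classes above: the queue carries Integer values instead of Product objects, the producer sends a sentinel value (DONE) so the consumer knows when to stop, and the names ProducerConsumerDemo and run are invented for the illustration. The lambdas assume Java 8 or newer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumerDemo {
    // Sentinel telling the consumer to stop (an assumption of this sketch)
    private static final int DONE = -1;

    // Pushes `items` values through a queue of the given capacity and
    // returns how many values the consumer received.
    static int run(int items, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int[] consumed = new int[1]; // written by the consumer, read after join()

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(DONE); // signal that nothing more will come
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and exit
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (queue.take() != DONE) { // blocks while the queue is empty
                    consumed[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return consumed[0];
    }

    public static void main(String[] args) {
        System.out.println(run(1000, 50)); // prints 1000
    }
}
```

Restoring the interrupt flag in the catch blocks is a common convention; it lets code further up the call stack see that the thread was asked to stop.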

Thanks for reading the second part of the article series!

Stay tuned for the next part, which will cover other parts of the Java Concurrent API: ExecutorService and Semaphore.
