Asynchronous Producer Consumer with BlockingQueue in Java

19 12 2015

What is it

Producer Consumer pattern is used when one module (producer) produces events, messages or any other kind of data with various pace and the other module (consumer) processes it when the data occur. The asynchronous version means that the producer does not wait until consumer processes each item – just sends it to consumer and forgets about it. The consumer consumes it when its turn comes.

How it works? BlockingQueue

Technically Producer puts events into FIFO BlockingQueue and Consumer takes the event one by one. The queue instance is shared among both actors, so that they can access is. BlockingQueue API has two main operations:

  • put() used by Producer to add new item (if the queue is bounded and is full, the put() method will block the Producer thread until there is a space to add new item)
  • take() used by Consumer to take next event to process (if the queue is empty, the consumer will be blocked until the event occur)

One more API method that is a good practice to use over the take() is:

  • offer() used by the producer to put() with maximum timeout to block producer thread when the queue is full. The offer() method returns boolean result saying if put was successful or not. This value should be interpreted in your code.

There are two BlockingQueue implementations:

  • ArrayBlockingQueue that has to be bounded and is more performant (use it if your case uses constant queue size)
  • LinkedBlockingQueue that can be bounded or not (in fact it is then bounded to Integer.MAX_VALUE) but can be less performant because of that

BlockingQueue is thread-safe and does not accept null values.

Work Unit

It is also a good practice to wrap your messages put in the queue in a wrapper that holds the message and returns it when consumer needs it. The reason to introduce this level of indirection is to be able to add additional data to the Producer Consumer mechanism that your message does not need. That can be:

  • performance metrics indicating when and how fast your messages are processed
  • testing to track the changes in your object

Work unit implementation according to this practice is:

public class MyEventWorkUnit<T> {

    private T myEventWorkUnit;

    public MyEventWorkUnit(T myEventWorkUnit) {
        this.myEventWorkUnit = myEventWorkUnit;

    public T getWorkUnit() {
        return myEventWorkUnit;

Then your BlockingQueue holds this type:

BlockingQueue<MyEventWorkUnit<MyEvent>> queue

The MyEvent class is a simple object that can handle itself when asked and has externally generated id. Notice that handle() method sleeps thread to simulate that handling events takes certain amount of time:

public class MyEvent {

    private static final Logger log = Logger.getLogger(MethodHandles.lookup().lookupClass().getSimpleName());

    private final int eventId;

    public MyEvent(int eventId) {
        this.eventId = eventId;

    public void handle() {"Serving event with id: " + eventId);
        try {
        } catch (InterruptedException e) {


Producer Consumer implementation

My Producer generates 10 events and puts it into queue with offer() method with a timeout. This is the implementation:

public class EventProducer extends Thread {
    private final BlockingQueue<MyEventWorkUnit<MyEvent>> queue;
    private AtomicInteger idGenerator;

    public static final Logger log = Logger.getLogger(MethodHandles.lookup().lookupClass().getName());

    public EventProducer(BlockingQueue<MyEventWorkUnit<MyEvent>> queue) {
        this.queue = queue;
        idGenerator = new AtomicInteger();

    public void run() {
        for (int i = 0; i < 10; i++) {
            boolean wasAdded = false;
            MyEvent eventToAdd = new MyEvent(idGenerator.getAndIncrement());
            try {
                wasAdded = queue.offer(new MyEventWorkUnit<>(eventToAdd), 100, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
      "Adding Thread was interrupted");
            } finally {
                handleAddResult(wasAdded, eventToAdd);

My consumer simply takes events from the queue and handles them:

public class EventConsumer extends Thread{
    private final BlockingQueue<MyEventWorkUnit<MyEvent>> queue;

    public EventConsumer(BlockingQueue<MyEventWorkUnit<MyEvent>> queue) {
        this.queue = queue;

    public void run() {
            try {
                MyEventWorkUnit<MyEvent> workUnit = queue.take();
            } catch (InterruptedException e) {

Producer Consumer Construction

I create the ArrayBlockingQueue instance with size bounded to 3 items. This is not enough to handle fast-producing and slow consuming Threads. I limit the size to show how offer() timeouts helps to handle such situations:

BlockingQueue<MyEventWorkUnit<MyEvent>> queue = new ArrayBlockingQueue<>(3);

EventProducer producer = new EventProducer(queue);
EventConsumer consumer = new EventConsumer(queue);


The result

Notice that:
1. the event (id=0) is immediately added and served. Service takes some time
2. the events with ids 1, 2, 3 are added to the queue, event with id=0 is still being served
3. events 4, 5, 6, and 7 are not added to the queue since it’s full so the timeout occur
4. event id=1 is served by the consumer. There is room for one more event in the queue
5. event id = 8 is added successfully to the queue, event id=9 is rejected due to timeout
6. all events from the queue are handled

INFO: Event was added to the queue: MyEvent{eventId=0}
INFO: Serving event with id: 0
INFO: Event was added to the queue: MyEvent{eventId=1}
INFO: Event was added to the queue: MyEvent{eventId=2}
INFO: Event was added to the queue: MyEvent{eventId=3}
WARNING: Unable to add event to queue due to timeout: MyEvent{eventId=4}
WARNING: Unable to add event to queue due to timeout: MyEvent{eventId=5}
WARNING: Unable to add event to queue due to timeout: MyEvent{eventId=6}
WARNING: Unable to add event to queue due to timeout: MyEvent{eventId=7}
INFO: Serving event with id: 1
INFO: Event was added to the queue: MyEvent{eventId=8}
WARNING: Unable to add event to queue due to timeout: MyEvent{eventId=9}
INFO: Serving event with id: 2
INFO: Serving event with id: 3
INFO: Serving event with id: 8

Source Code
You can run this sample on your own. Get the conde from my GitHub:


This post was inspired by the book ‘Well Grounded Java Developer’ and sites:,




Give Your feedback:

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )


Connecting to %s

%d bloggers like this: