April 17, 2017

#Collections: Part 2-All about BlockingQueue!


java.util.concurrent.BlockingQueue interface is added in Java 1.5 along with various other concurrent Utility classes like ConcurrentHashMap, Counting Semaphore, CopyOnWriteArrrayList.

  • BlockingQueue not only stores elements, but also supports flow control by introducing blocking if either BlockingQueue is full or empty.
  • The insertion of elements takes place when the queue is not completely filled otherwise the thread performing the operation will get blocked. It will wait for till the space is available in the queue to accommodate the new element. And when an element has to be dequeued from the queue, the operation gets blocked if the queue is empty. It will wait till an element is inserted into the queue.
  • In simple words, the take() method of BlockingQueue will block if Queue is empty and put() method of BlockingQueue will block if Queue is full. This will enhance flow control by activating blocking, in case a thread is trying to dequeue an empty queue or enqueue a full queue. In either case, this interface comes in handy. 
  • Because of this property BlockingQueue is an ideal choice for implementing Producer-Consumer design pattern, where one thread insert element into BlockingQueue and other thread consumes it.
  • BlockingQueue in Java doesn't allow null elements, implementation of BlockingQueue like ArrayBlockingQueue, LinkedBlockingQueue throws NullPointerException when you try to add null on queue.
Types Of Blocking Queue

Unbounded Queue: An unbounded Queue is one which is initialized without capacity, actually by default it initialized with Integer.MAX_VALUE. In the case of an unbounded blocking queue, the queue will never block because it could grow to a very large size. when you add elements its size grows.

Syntax:
BlockingQueue blockingQueue = new LinkedBlockingDeque();

Bounded Queue: A bounded queue is created by passing the capacity of the queue in constructor and call to put() will be blocked if BlockingQueue is full.

Syntax:
BlockingQueue blockingQueue = new LinkedBlockingDeque(5);

e.g:

/*Bounded BlockingQueue */
BlockingQueue < String > namebQueue = new ArrayBlockingQueue < String > (3);
namebQueue.put("Catherine Bell");
System.out.println("Catherine Bell is added in the list");
namebQueue.put("James Denton");
System.out.println("James Denton is added in the list");
namebQueue.put("Bailee Madison");
System.out.println("Bailee Madison is added in the list");
namebQueue.put("Rhys Matthew Bond"); //list is full, so this will not be added
System.out.println("Rhys Matthew Bond is added in the list");

Output will be:
Catherine Bell is added in the list
James Denton is added in the list
Bailee Madison is added in the list

Hierarchy of BlockingQueue


Basic Operations Of BlockingQueue

BlockingQueue implementations like ArrayBlockingQueue, LinkedBlockingQueue and PriorityBlockingQueue are thread-safe. All queuing methods use concurrency control and internal locks to perform operation atomically.

The blocking queue interface has a number of methods, some of which are described here:
  • BlockingQueue interface extends Collection, Queue and Iterable interface which provides all Collection and Queue related methods like poll(), and peak().
  • take() method will help in retrieving and removing the head of the queue by waiting for an element to become available, in case the queue is empty.
  • Unlike take(), the peek() method returns head of the queue without removing it, 'E poll(long timeout, TimeUnit unit)' also retrieves and removes elements from head but can wait till specified time if Queue is empty.
  • Since BlockingQueue also extends Collection, bulk Collection operations like addAll(), containsAll() are not performed atomically until any BlockingQueue implementation specifically supports it. So the call to addAll() may fail after inserting a couple of elements.
  • contains(value) method is used to check for the specific elements present in the array blocking queue, if the element is present then it returns true else it returns false.
  • remainingCapacity() method of BlockingQueue, returns the remaining space in BlockingQueue, which can be filled without blocking.
  • offer(E e) method of BlockingQueue insert object into queue if possible and return true if success and false if fail, unlike add(E e) method which throws IllegalStateException if it fails to insert object into BlockingQueue. Use offer() over add() wherever possible.
  • offer(E e, long timeout, TimeUnit unit) will insert the specified element into the queue. In case the queue is full, it will wait up to the specified wait time for the space to become available.
  • int drainTo(Collection < ? super E > c) method will remove all available elements from the queue and add them to the specified collection. i.e it move the contents of the first blocking queue to other blocking queue, making the first array blocking queue empty.
  • Where as int drainTo(Collection < ? super E > c, int maxElements) method will remove at the given number of available elements from the queue and add them into the specified collection.
  • remove() method deletes the elements from the queue from the front i.e. the element which is first inserted into the queue is removed first. It is of void type.
  • boolean remove(Object o) method will remove a single instance of the specified element from the queue only if it is present.
How can we implement a Blocking Queue in Java?

To implement a blocking queue we need to make sure:
  • It is always thread-safe.
  • It can hold arbitrary data
  • Producer has to wait if the queue is already full
  • Consumer has to be wait if no item is present in the queue.
Why and when do we need to use BlockingQueue?

BlockingQueue in Java is considered as the thread-safe collection because it can be helpful in multi-threading operations.

Let's assume, one thread is inserting elements to the queue and another thread is removing elements. If the first thread is slow, then the blocking queue can make the second thread wait until the first thread completes its operation. 

Because of it's behavior we can use BlockingQueue to solve producer consumer problem.

How can we Implement producer consumer using BlockingQueue?

The Producer-Consumer problem is one of the classic problems in the multithreading world. Because of inter-thread communication, it's a bit tricky to implement it. There are many ways to solve this either by using the wait() and notify() method, or by using Semaphore. The simplest way to solve this problem is by using BlockingQueue.

For this we need to create two threads that will simulate producer and consumer. And we will use a shared BlockingQueue, instead of the shared object.

If we use BlockingQueue we don't need to put any thread synchronization code, because put() will block if the queue has reached its capacity and take() method will block if the queue is empty. 

GIT URL: Producer-Consumer using BQueue

Keep learning and growing!

-K Himaanshu Shuklaa..

No comments:

Post a Comment