Article from: https://www.cnblogs.com/huangjuncong/p/9218194.html

This article analyzes the principle of the JDK's LinkedBlockingQueue, a blocking queue backed by a linked list. How does LinkedBlockingQueue use two exclusive ReentrantLock locks and their corresponding condition variables to keep enqueue and dequeue operations thread-safe when many threads are involved? And why does it use two locks instead of one?

LinkedBlockingQueue is a blocking queue implemented with exclusive locks. First, look at the class diagram structure of LinkedBlockingQueue, as shown in the following figure:

[Figure: class diagram of LinkedBlockingQueue]

As shown in the class diagram, LinkedBlockingQueue is implemented with a singly linked list. Two Node references, head and last, store the first and last nodes, and an atomic variable count with an initial value of 0 records the number of elements in the queue.

In addition, there are two ReentrantLock instances that make the enqueue and dequeue operations atomic. takeLock ensures that only one thread at a time can remove elements from the queue, and other threads must wait. putLock ensures that only one thread at a time can acquire the lock to add elements, and other threads must wait.

notEmpty and notFull are condition variables (Condition objects, not semaphores). Each has an internal condition queue that holds the threads blocked while dequeuing and enqueuing, respectively.

To put it plainly, this is a producer-consumer model.
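The producer-consumer behaviour described above can be observed with a small, self-contained sketch. The class name ProducerConsumerDemo and the method run are made up for illustration; only LinkedBlockingQueue, put and take come from the JDK. One thread puts the numbers 1..items into a small bounded queue while another takes them out and adds them up.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: one producer and one consumer share a bounded queue.
class ProducerConsumerDemo {

    // Sends 1..items through a queue of the given capacity and returns
    // the sum of everything the consumer received.
    static int run(int items, int capacity) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        int[] sum = new int[1];

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= items; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    sum[0] += queue.take(); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join, the consumer's writes to sum are visible
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(run(100, 4)); // 1 + 2 + ... + 100 = 5050
    }
}
```

Because the capacity (4) is much smaller than the number of items, the producer is regularly blocked on a full queue and the consumer on an empty one, yet no element is lost or duplicated.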

 

Let’s look at the source code of the exclusive lock first.

    /** Lock that must be held when performing take, poll, and other dequeue operations */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** When the queue is empty, a thread performing a dequeue operation (such as take) waits in this condition queue */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock that must be held when performing put, offer, and other enqueue operations */
    private final ReentrantLock putLock = new ReentrantLock();

    /** When the queue is full, a thread performing an enqueue operation (such as put) waits in this condition queue */
    private final Condition notFull = putLock.newCondition();

    /** The current number of elements in the queue */
    private final AtomicInteger count = new AtomicInteger(0);

 

Next, look at the no-argument constructor of LinkedBlockingQueue, which is as follows:

// Defined in java.lang.Integer
public static final int   MAX_VALUE = 0x7fffffff;

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // Initialize head and last so that both point to the sentinel node
    last = head = new Node<E>(null);
}

As the source code shows, the default capacity of the queue is 0x7fffffff (Integer.MAX_VALUE). The user can also specify a capacity, so to some extent LinkedBlockingQueue can be regarded as a bounded blocking queue.
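The constructor behaviour just described can be checked with a short sketch. CapacityDemo and its method names are hypothetical helpers; remainingCapacity is the JDK method that reports the capacity minus the current element count.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class for the two constructors.
class CapacityDemo {

    // Remaining capacity of an empty queue built with the no-argument constructor.
    static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // Returns true if the constructor rejects the given capacity.
    static boolean rejects(int capacity) {
        try {
            new LinkedBlockingQueue<String>(capacity);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE); // true
        System.out.println(rejects(0));                             // true
        System.out.println(rejects(1));                             // false
    }
}
```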

 

Next, we look at the source code of the main methods of LinkedBlockingQueue:

  1. offer operation: inserts an element at the tail of the queue. If the queue has free capacity, the element is inserted and true is returned. If the queue is full, the element is discarded and false is returned. If the element e is null, a NullPointerException is thrown. Note also that this method is non-blocking. The source code is as follows:

public boolean offer(E e) {

        //(1) Throw NullPointerException if the element is null
        if (e == null) throw new NullPointerException();

        //(2) If the queue is currently full, discard the element and return false
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;

        //(3) Build the new node and acquire the exclusive putLock
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            //(4) If the queue is not full, enqueue the node and increment the element count
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                //(5) Still free space after this insert: wake one waiting producer
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            //(6) Release the lock
            putLock.unlock();
        }
        //(7) The queue was empty before this insert: wake one waiting consumer
        if (c == 0)
            signalNotEmpty();
        //(8) c >= 0 only if the element was actually enqueued
        return c >= 0;
}

private void enqueue(Node<E> node) {
    last = last.next = node;
}

Code (2): if the queue is currently full, the element is discarded and false is returned.

Code (3) acquires putLock. Once the current thread holds the lock, other threads that call put or offer are blocked (the blocked threads are placed in the AQS blocking queue of putLock).

Code (4) rechecks whether the queue is full, because between the check in code (2) and the acquisition of putLock, other threads may have added elements through put or offer. Only if the recheck confirms that the queue is not full is the new element enqueued and the counter incremented.

Code (5): if the queue still has free space after the new element is enqueued, one thread in notFull's condition queue is woken up. Such a thread blocked by calling notFull.await() (for example, a put call that found the queue full). Because the queue now has free space, one enqueuing thread can be woken up early.

Code (6) releases the acquired putLock. Note that the lock must be released in the finally block, because finally is executed even if the try block throws an exception. After the lock is released, one of the other threads blocked in put or offer gets a chance to acquire it.

Code (7): c == 0 means that the queue was empty before this element was added, so when the lock is released in code (6) the queue contains at least one element. Since the queue now has an element, signalNotEmpty is executed. The source code of signalNotEmpty is as follows:

private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

As the code above shows, signalNotEmpty wakes up one thread in notEmpty's condition queue that blocked by calling notEmpty.await() (for example, a take call on an empty queue). It also illustrates that a thread must first acquire the corresponding lock before calling a method on a condition variable.

offer summary: the offer method uses putLock to make the enqueue operation and the increment of the element count atomic.
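The non-blocking contract of offer can be seen in a brief sketch. OfferDemo and its methods are illustrative names only; the queue calls are the JDK's.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class for offer on a queue with capacity 2.
class OfferDemo {

    // Offers three elements to a queue that only has room for two.
    static boolean[] offerThree() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        return new boolean[] {
            queue.offer("a"), // accepted
            queue.offer("b"), // accepted, queue is now full
            queue.offer("c")  // rejected immediately instead of blocking
        };
    }

    // Returns true if offering null raises NullPointerException.
    static boolean offerNullThrows() {
        try {
            new LinkedBlockingQueue<String>(2).offer(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        boolean[] results = offerThree();
        System.out.println(results[0] + " " + results[1] + " " + results[2]); // true true false
        System.out.println(offerNullThrows());                               // true
    }
}
```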

 

  2. put operation: inserts an element at the tail of the queue. If the queue has free space, the element is inserted and the method returns immediately. If the queue is full, the current thread blocks until the queue has free space, then inserts the element and returns. If another thread sets the interrupt flag while the thread is blocked, the blocked thread throws InterruptedException and returns. If the element e is null, a NullPointerException is thrown. Unlike offer, put returns void. The source code is as follows:

  public void put(E e) throws InterruptedException {
        //(1) Throw NullPointerException if the element is null
        if (e == null) throw new NullPointerException();
        //(2) Build the new node and acquire the exclusive putLock
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            //(3) If the queue is full, wait
            while (count.get() == capacity) {
                notFull.await();
            }
            //(4) Enqueue the node and increment count
            enqueue(node);
            c = count.getAndIncrement();
            //(5) Still free space after this insert: wake one waiting producer
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            //(6) Release the lock
            putLock.unlock();
        }
        //(7) The queue was empty before this insert: wake one waiting consumer
        if (c == 0)
            signalNotEmpty();
    }

Code (2) uses putLock.lockInterruptibly() to acquire the exclusive lock. Unlike the lock acquisition in the offer method, this acquisition can be interrupted: if another thread sets the interrupt flag while the current thread is acquiring the lock, the current thread throws InterruptedException.

Therefore, the put operation can be interrupted while it is acquiring the lock.
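The interruptibility of put can be demonstrated with a minimal sketch. PutInterruptDemo is a hypothetical class; the interrupt may reach the producer while it is acquiring the lock or while it is waiting in notFull.await(), and in both cases put is expected to throw InterruptedException.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: interrupt a producer that cannot complete its put.
class PutInterruptDemo {

    // Returns true if the put call ended with InterruptedException.
    static boolean blockedPutIsInterrupted() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("first"); // the queue is now full
        AtomicBoolean interrupted = new AtomicBoolean(false);

        Thread producer = new Thread(() -> {
            try {
                queue.put("second"); // cannot complete: nobody removes "first"
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        producer.start();
        producer.interrupt(); // set the interrupt flag of the producer
        try {
            producer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(blockedPutIsInterrupted());
    }
}
```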

Code (3): if the queue is currently full, notFull.await() puts the current thread into notFull's condition queue. The current thread is suspended and releases the putLock it holds. Because putLock has been released, other threads now have a chance to acquire it.

Code (3) checks whether the queue is full. Why use a while loop instead of an if statement?

The reason is spurious wakeups: in some cases notFull.await() may return even though no other thread has called notFull's signal method.

With a plain if statement, a spurious wakeup would go straight on to code (4): the element would be enqueued and the counter incremented even though the queue is already full. The number of elements would then exceed the configured capacity, and the program would misbehave.

With a while loop, whenever notFull.await() returns, the loop checks again whether the queue is full and, if it is, waits again.
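To make the interplay of the two locks and the while-loop checks easier to follow, here is a stripped-down sketch of a two-lock bounded queue. It is modelled on the put and take listings in this article but is not the JDK implementation: TwoLockQueue, its signal helper and the demo method are hypothetical, and features such as offer, poll, remove and iterators are omitted.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified sketch of the two-lock design; not the JDK class.
class TwoLockQueue<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger(0);
    private Node<E> head; // sentinel node; head.item is always null
    private Node<E> last;
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    TwoLockQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<>(null);
    }

    void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<>(e);
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) { // while, not if: recheck after every wakeup
                notFull.await();
            }
            last = last.next = node;          // producers only touch the tail
            c = count.getAndIncrement();
            if (c + 1 < capacity) notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0) signal(takeLock, notEmpty); // queue was empty: wake a consumer
    }

    E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            Node<E> first = head.next;        // consumers only touch the head
            head = first;                     // first becomes the new sentinel
            x = first.item;
            first.item = null;
            c = count.getAndDecrement();
            if (c > 1) notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) signal(putLock, notFull); // queue was full: wake a producer
        return x;
    }

    int size() { return count.get(); }

    // A condition may only be signalled while its own lock is held.
    private static void signal(ReentrantLock lock, Condition condition) {
        lock.lock();
        try { condition.signal(); } finally { lock.unlock(); }
    }

    // Pushes 1..n through a capacity-2 queue and reports whether FIFO order held.
    static boolean demo(int n) {
        TwoLockQueue<Integer> queue = new TwoLockQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) queue.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true);
        producer.start();
        try {
            for (int i = 1; i <= n; i++) {
                if (queue.take() != i) return false;
            }
            producer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return queue.size() == 0;
    }
}
```

In this sketch a producer and a consumer can run at the same time because put only modifies last under putLock and take only modifies head under takeLock; the shared AtomicInteger count is the only state both sides update.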

 

  3. poll operation: retrieves and removes the element at the head of the queue, and returns null if the queue is empty. This method is non-blocking. The source code is as follows:

  public E poll() {
        //(1) If the queue is empty, return null
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        //(2) Acquire the exclusive takeLock
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            //(3) If the queue is not empty, dequeue and decrement count
            if (count.get() > 0) {//3.1
                x = dequeue();//3.2
                c = count.getAndDecrement();//3.3
                //(4) Still elements left after this removal: wake one waiting consumer
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            //(5) Release the lock
            takeLock.unlock();
        }
        //(6) The queue was full before this removal: wake one waiting producer
        if (c == capacity)
            signalNotFull();
        //(7) Return the removed element, or null if nothing was removed
        return x;
    }
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

Code (1): if the queue is currently empty, null is returned directly.

Code (2) acquires the exclusive lock takeLock. Once the current thread holds the lock, other threads that call poll or take are blocked.

Code (3): if the queue is not empty, the dequeue operation is performed and then the counter is decremented.

Code (4): c > 1 means that the queue is still not empty after the current thread has removed one element (c is the number of elements before the removal). In that case, one thread that blocked in notEmpty's condition queue by calling take can be woken up.

Code (5) releases the lock. The lock must be released in the finally block.

Code (6): c == capacity means that the queue was full before the current thread removed the head element. After the removal the queue has at least one free position, so signalNotFull can be called to wake up one thread that blocked in notFull's condition queue by calling put. The source code of signalNotFull is as follows:

  private void signalNotFull() {
          final ReentrantLock putLock = this.putLock;
          putLock.lock();
          try {
              notFull.signal();
          } finally {
              putLock.unlock();
          }
   }

The poll logic is relatively simple. Note that removing an element only operates on the head node of the queue.
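A short usage sketch of poll follows. PollDemo and its methods are illustrative names; the loop relies on poll returning null once the queue is empty, which is safe because the queue never stores null elements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class for the non-blocking poll method.
class PollDemo {

    // poll on an empty queue returns null immediately.
    static String pollEmpty() {
        return new LinkedBlockingQueue<String>().poll();
    }

    // Drains a queue with poll and returns the elements in removal order.
    static List<String> drainWithPoll() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("a");
        queue.offer("b");
        List<String> drained = new ArrayList<>();
        String item;
        while ((item = queue.poll()) != null) { // null signals an empty queue
            drained.add(item);
        }
        return drained;
    }

    public static void main(String[] args) {
        System.out.println(pollEmpty());     // null
        System.out.println(drainWithPoll()); // [a, b]
    }
}
```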

 

  4. peek operation: retrieves the element at the head of the queue without removing it, and returns null if the queue is empty. This method is non-blocking. The source code is as follows:

  public E peek() {
        //(1)
        if (count.get() == 0)
            return null;
        //(2)
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            //(3)
            if (first == null)
                return null;
            else
            //(4)
                return first.item;
        } finally {
           //(5)
            takeLock.unlock();
        }
    }

Note that code (3) still has to check whether first is null; code (4) cannot be executed directly.

Normally, reaching code (2) means that the queue is not empty. However, code (1) and code (2) are not atomic: after code (1) has determined that the queue is not empty and before the lock is acquired in code (2), other threads may perform poll or take operations that empty the queue. When the current thread then acquires the lock, executing first.item directly would throw a NullPointerException.
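The difference between peek and poll can be seen in a small sketch. PeekDemo and its methods are hypothetical names used only for this illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: peek reads the head but leaves the queue unchanged.
class PeekDemo {

    // Returns {first peek, second peek, size afterwards} as strings.
    static String[] peekTwice() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("a");
        queue.offer("b");
        String first = queue.peek();
        String second = queue.peek(); // same head again: nothing was removed
        return new String[] { first, second, String.valueOf(queue.size()) };
    }

    // peek on an empty queue returns null instead of blocking.
    static String peekEmpty() {
        return new LinkedBlockingQueue<String>().peek();
    }

    public static void main(String[] args) {
        System.out.println(String.join(",", peekTwice())); // a,a,2
        System.out.println(peekEmpty());                   // null
    }
}
```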

 

  5. take operation: retrieves the element at the head of the queue and removes it. If the queue is empty, the calling thread blocks until the queue is no longer empty and then returns the element. If another thread sets the interrupt flag while the thread is blocked, the blocked thread throws InterruptedException and returns. The source code is as follows:

  public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        //(1) Acquire the lock; this acquisition can be interrupted
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            //(2) If the queue is empty, block and suspend the current thread
            while (count.get() == 0) {
                notEmpty.await();
            }
            //(3) Dequeue and decrement count
            x = dequeue();
            c = count.getAndDecrement();
            //(4)
            if (c > 1)
                notEmpty.signal();
        } finally {
           //(5)
            takeLock.unlock();
        }
        //(6)
        if (c == capacity)
            signalNotFull();
        //(7)
        return x;
    }

Code (1): the current thread acquires the exclusive lock, and other threads that call take or poll are blocked.

Code (2): if the queue is empty, the current thread is suspended and placed in notEmpty's condition queue.

Code (3) performs the dequeue operation and decrements the count.

Code (4): c > 1 means that the queue is still not empty after the removal, so one thread that blocked in notEmpty's condition queue by calling take is woken up.

Code (5) releases the lock.

Code (6): c == capacity means that the queue was full before the removal and now has at least one free position, so one thread that blocked in the condition queue of the condition variable notFull by calling put is woken up.
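The blocking behaviour of take can be sketched as follows. TakeDemo is a hypothetical class, and the 50 ms sleep is only an assumption to make it likely that the consumer is already waiting when the element arrives; the result is the same either way.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: a consumer waits in take until a producer adds an element.
class TakeDemo {

    // Returns the element the consumer thread obtained from take.
    static String takeWaitsForPut() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        String[] result = new String[1];

        Thread consumer = new Thread(() -> {
            try {
                result[0] = queue.take(); // blocks while the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            Thread.sleep(50);      // assumption: enough time for the consumer to block
            queue.put("hello");    // wakes the consumer through notEmpty
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(takeWaitsForPut()); // hello
    }
}
```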

 

  6. remove operation: deletes the specified element from the queue. It returns true if an element was deleted and false otherwise. The source code is as follows:

public boolean remove(Object o) {
    if (o == null) return false;

    //(1) Acquire both locks
    fullyLock();
    try {

        //(2) Traverse the queue; if the element is found, delete it and return true
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
             //(3)
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        //(4) Not found: return false
        return false;
    } finally {
        //(5) Release both locks
        fullyUnlock();
    }
}

Code (1) acquires both locks through fullyLock. Once the current thread holds them, any enqueue or dequeue operation by another thread is blocked. The source code of fullyLock is as follows:

void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

Code (2) traverses the queue looking for the element to delete. If the element is not found, false is returned directly. If it is found, the unlink operation is performed. The source code of unlink is as follows:

  void unlink(Node<E> p, Node<E> trail) {
      p.item = null;
      trail.next = p.next;
      if (last == p)
          last = trail;
      // If the queue was full before the deletion, wake up a waiting thread
      if (count.getAndDecrement() == capacity)
          notFull.signal();
    }

As you can see, if the queue was full before the deletion and therefore has free space afterwards, one thread that blocked in notFull's condition queue by calling put is woken up.

Code (5) calls the fullyUnlock method to release both locks in the reverse order of acquisition. The source code is as follows:

void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}
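From the caller's point of view, remove(Object) behaves as sketched below. RemoveDemo and its helpers are hypothetical names; the sample queue contains "a" twice to show that only the first match is removed, which follows from the return true inside the loop of the remove listing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class for remove(Object).
class RemoveDemo {

    // Builds the queue [a, b, a].
    static LinkedBlockingQueue<String> sample() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("a");
        queue.offer("b");
        queue.offer("a");
        return queue;
    }

    // Result of remove on the sample queue for the given element.
    static boolean removeResult(String element) {
        return sample().remove(element);
    }

    // Contents of the sample queue, head first, after one remove call.
    static List<String> afterRemove(String element) {
        LinkedBlockingQueue<String> queue = sample();
        queue.remove(element);
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println(removeResult("a")); // true
        System.out.println(removeResult("z")); // false
        System.out.println(afterRemove("a"));  // [b, a]
    }
}
```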

 

 

  7. size operation: returns the current number of elements in the queue. The source code is as follows:

public int size() {
   return count.get();
}

 

 

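Conclusion: the size method of LinkedBlockingQueue is more accurate than that of ConcurrentLinkedQueue, because count is an atomic variable that is only updated while a lock is held, so size simply reads the counter instead of traversing the list.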
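As a rough check of the counter, the sketch below lets several threads offer elements concurrently and reads size after all of them have finished. SizeDemo and its method are hypothetical; the example only shows that no increment is lost, not how size behaves while writers are still running.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: concurrent offers followed by a single size call.
class SizeDemo {

    // Starts the given number of threads, each offering perThread elements,
    // waits for all of them and returns the final size.
    static int sizeAfterConcurrentOffers(int threads, int perThread) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    queue.offer(i); // enqueue and count++ happen under putLock
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return queue.size(); // reads the AtomicInteger count
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterConcurrentOffers(4, 1000)); // 4000
    }
}
```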

Finally, a diagram is used to deepen the understanding of LinkedBlockingQueue.

[Figure: diagram of LinkedBlockingQueue]


This raises a question: why does ConcurrentLinkedQueue need to traverse the linked list to compute its size instead of using an atomic variable?

The reason is that keeping the element count in an atomic variable requires the enqueue or dequeue operation and the update of the atomic variable to be performed together as one atomic operation. ConcurrentLinkedQueue uses a lock-free CAS algorithm, so it cannot guarantee this.
