Article From:https://www.cnblogs.com/longfurcat/p/9906437.html

Preface

  In the previous two blog posts, the execution process of tasks using FutureTask and cancel method implemented by FutureTask are introduced. This article will introduce the results of thread task acquisition.

Series catalogue

  • Unveiling the Mystery of Future: Mission Cancellation
  • Unveiling the Mystery of Future: Task Execution
  • Unveiling the Mystery of Future: Acquisition of Result

Getting Program Running Result by Get Method

  We know that the most important operation using the Future interface is to get the result of the task, and the corresponding method of this operation is get. But the question is, if I call the get method, the task is not finished yet? The answer is, when it’s done, the current thread will be blocked until the task is completed.Completion (Note that completion here refers to the end of a task, which is due to an exception.),getMethod returns. Main thread (Threads that are not performing tasks)It’s awakened and then it continues to run.

 

Flexible get method

  Someone may ask, if I call the get method, the task will take a long time to complete, then my main thread will not waste some time? Yes, if the main thread is busy, then it is really efficient. But there is also a parameter get method, which takes the waiting time as a parameter.If the time is over and the task is not completed, the main thread will continue to execute, and then the task results will be retrieved at a later time. (Of course, if the main thread relies on the result of this task to continue executing, it will have to wait honestly.

FutureTaskBlocking model

  To understand the implementation of get method, we must first understand how it blocked. As mentioned in the previous blog post, FutureTask has WaitNode field waiters, which actually refers to a WaitNode section.The head node of a one-way list of points. As shown in the figure:

waitNodeThe class code is as follows:

static final class WaitNode {
    volatile Thread thread; //thread
    volatile WaitNode next; //Next node//Constructor to get a reference to the current thread of execution
    WaitNode() { thread = Thread.currentThread(); }
}

WaitNodeWhat is the role of retaining thread references?

The answer is to wake up the waiting thread after the task is completed. When FutureTask executes the run method of callable, it notifies all waiting threads that the finishCompletion method is executed.

private void finishCompletion() {
    //Traversing Waiting Nodes
    for (WaitNode q; (q = waiters) != null;) {
        //Set the waiters reference of FutureTask to null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { 
            //Wake up all waiting threads
            for (;;) {
                //Remove the thread corresponding to the node
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t); //Wake up the corresponding thread
                }
                //Get the next node
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    //Call hook function done, which is an empty method, and subclasses can be implemented according to requirements.
    done();

    callable = null;       
}

Thread Blocking Ways – Park and UnPark

  park/unParkIt is also used to control the waiting state of threads. Familiarly, wait / notify is also used to control the waiting state of threads. Wait / notify is a conditional queue of an object. To block a thread, or join a waiting queue, you must first acquire the lock of the object.

       Unlike wait ()/notify, Park and unpark operate threads directly without acquiring the lock of the object.in my opinionThis is why park/unPark is used here, not wait/notify All, because acquiring locks requires additional overhead.

getThe concrete realization of the method

The following is the implementation of get method in FutureTask

public V get() throws InterruptedException, ExecutionException {
    //Get the current task status
    int s = state;
    //If it's NEW or COMPLETING, that is, it's not over yet, call awaitDone to block.
    if (s <= COMPLETING)
        s = awaitDone(false, 0L); //Note that the parameters here represent non-timeout waiting, and if the task is not finished, the program will be stuck here all the time.//If awaitDone returns, that is, the task has ended, the result is returned according to the status of the task.
    return report(s);
}

The following is the implementation of awaitDone called in the get method

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    //Calculate the end point according to the timeout time
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    //Waiting node
    WaitNode q = null;
    //Whether to join the waiting queue
    boolean queued = false;
    //It's not spinning that prevents the method from returning. Instead, spin CAS is used to change the state. If it succeeds, once is enough.
    for (;;) {
        //If this thread is interrupted, remove the slave node from the waiting queue
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        //If the state is greater than COMPLETING, that is to say, the task has ended and the task state is returned.
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        //The first loop, q is null, creates nodes
        else if (q == null)
            q = new WaitNode();
        //If you haven't joined the waiting queue, join
        else if (!queued)
            //q.next = waiters The return value of the expression is the value on the left, that is, waiters.//This means that if the waiters of the current object are waiters, it is assigned Q.
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q); 
        //If it is a timeout wait, parkNanos is called, and the thread will be awakened after a specified time
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        //If you do not wait overtime and have joined the waiting queue, then use park to suspend the current thread
        else
            LockSupport.park(this);
    }
}

A lot of people may think that this cycle is a little confused, and I started to look big. But we can look at the execution of code in several scenarios.

Note: In the second for loop, the second if-else block is a large block, only one at a time.

Several Implementation Situations

1. The current thread successfully joins the waiting queue and is blocked. After a period of time, the task is completed and the thread is awakened.

 

 

2. After the current thread joins the queue, the task is completed before it is blocked.

3. Because of the influence of other threads joining the waiting queue, the current thread can not join the waiting queue.

Here we show that if other threads join the waiting queue before this thread, because of the memory visibility, the waiters value seen by the current thread does not change in time, so unlike its actual value, CAS operation will fail.

Why must CAS succeed?The answer is that if it doesn’t succeed, there will be thread security problems, and the structure of the linked list will be confused. I won’t go into details here.

 

Obtain results based on task status

   We already know that FutureTask has an Object field outcome, which is the result of task execution. When the task is completed, the result is assigned to it. The following is FutureTask’s run method:

public void run() {
    //When the task starts to execute, set the runner field of FutureTask to indicate the thread that executes it.
    if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))
        return;
    try {
        //Acquire specific tasks
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran; //Has the task been run out?
            try {
                //Operation task
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                //If an exception occurs during the run of the task, ran = false indicates that the run is not complete
                ran = false;
                //Set exception=> set task status to exception and assign exception information to outcome, that is, task result//This method calls finishCompletion
                setException(ex);
            }
            //If the run is complete, assign the result to the result
            if (ran)
                set(result); //This method calls finishCompletion
        }
    } finally {
        //Now that the thread has "completed" the current task, it abandons the reference to prevent it from performing other tasks.
        runner = null; 
        //Retrieving task status
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

As we can see from the previous article, when the task “completes”, the thread that obtains the result will be awakened. Back to the get method, it gets the state of the task and gets the result according to the state of the task. That is, the report method:

private V report(int s) throws ExecutionException {
    //Get results
    Object x = outcome;
    //If the task is completed properly
    if (s == NORMAL)
        //Mandatory conversion to the corresponding type and return
        return (V)x;
    //If the task status is CANCELLED, INTERRUPTING, INTERRUPTED, it is cancelled through the cacel method.//Returns cancelled exceptions
    if (s >= CANCELLED)
        throw new CancellationException();
    //If it is due to an exception interrupt, throw specific exception information
    throw new ExecutionException((Throwable)x);
}

 

Leave a Reply

Your email address will not be published. Required fields are marked *