Article from: https://www.cnblogs.com/qiaoqianshitou/p/9968891.html

Catalog

  • 1. Two Ways to Open Threads
  • 2. The Difference Between Opening Multiple Threads and Multiple Child Processes in One Process
  • 3. Practice
  • 4. Other Thread-Related Methods
  • 5. Daemon Threads
  • 6. Python GIL (Global Interpreter Lock)
  • 7. Synchronization Locks
  • 8. Deadlocks and Recursive Locks
  • 9. Semaphore
  • 10. Event
  • 11. Condition (For Understanding)
  • 12. Timers
  • 13. Thread queue
  • 14. Python Standard Module: concurrent.futures

1. Two Ways to Open Threads

# Way one
from threading import Thread
import time

def sayhi(name):
    time.sleep(2)
    print('%s say hello' % name)

if __name__ == '__main__':
    t = Thread(target=sayhi, args=('egon',))
    t.start()
    print('main thread')

# Way two
from threading import Thread
import time

class Sayhi(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)

if __name__ == '__main__':
    t = Sayhi('egon')
    t.start()
    print('main thread')

2. The Difference Between Opening Multiple Threads and Multiple Child Processes in One Process

from threading import Thread
from multiprocessing import Process
import os
def work():
    global n
    n=0

if __name__ == '__main__':
    # n=100
    # p=Process(target=work)
    # p.start()
    # p.join()
    # print('Master', n)  # the result is 100: the child process gets its own copy of n

    n = 1
    t = Thread(target=work)
    t.start()
    t.join()
    print('Master', n)  # the result is 0, because threads in the same process share the process's data

3. Practice

Exercise 1:

# Server side
import socket
import threading

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('127.0.0.1', 8080))
s.listen(5)

def action(conn):
    while True:
        data = conn.recv(1024)
        print(data)
        conn.send(data.upper())

if __name__ == '__main__':
    while True:
        conn, addr = s.accept()
        p = threading.Thread(target=action, args=(conn,))
        p.start()

# Client side
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('127.0.0.1', 8080))

while True:
    msg = input('>>: ').strip()
    if not msg:
        continue
    s.send(msg.encode('utf-8'))
    data = s.recv(1024)
    print(data)

Exercise 2: Three tasks, one receiving user input, one formatting user input into capitals, and one storing formatted results into files

from threading import Thread
msg_l=[]
format_l=[]
def talk():
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        msg_l.append(msg)

def format_msg():
    while True:
        if msg_l:
            res=msg_l.pop()
            format_l.append(res.upper())

def save():
    while True:
        if format_l:
            with open('db.txt','a',encoding='utf-8') as f:
                res=format_l.pop()
                f.write('%s\n' %res)

if __name__ == '__main__':
    t1=Thread(target=talk)
    t2=Thread(target=format_msg)
    t3=Thread(target=save)
    t1.start()
    t2.start()
    t3.start()

4. Other Thread-Related Methods

Methods of Thread instance objects:
# isAlive(): returns whether the thread is alive.
# getName(): returns the thread's name.
# setName(): sets the thread's name.

Some methods provided by the threading module:
# threading.currentThread(): returns the current Thread object.
# threading.enumerate(): returns a list of running threads. "Running" means after the thread has started and before it has ended; threads that have not started or have already finished are not included.
# threading.activeCount(): returns the number of running threads, with the same result as len(threading.enumerate()).

from threading import Thread
import threading
import os, time

def work():
    time.sleep(3)
    print(threading.current_thread().getName())

if __name__ == '__main__':
    # open a thread under the main process
    t = Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread())  # the main thread
    print(threading.enumerate())       # two running threads, including the main thread
    print(threading.active_count())
    print('main thread / main process')

    '''
    Printed results:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    2
    main thread / main process
    Thread-1
    '''

The main thread waits for the end of the sub-thread

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('aa',))
    t.start()
    t.join()
    print('Main thread')
    print(t.is_alive())

5. Daemon Threads

Both processes and threads follow the same rule: the daemon xxx is destroyed once the main xxx has finished running.

It should be emphasized that the trigger is the main xxx finishing its run, not being terminated.

#1 For the main process, "finished running" means the main process's code has finished.
#2 For the main thread, "finished running" means all the non-daemon threads in the main thread's process have finished; only then is the main thread considered finished.

Explain in detail:

#1 The main process runs out once its code ends (daemon processes are reclaimed at that point). The main process then waits until all non-daemon child processes have finished and reclaims their resources (otherwise zombie processes would appear) before it ends.
#2 The main thread runs out only after all the other non-daemon threads have finished; daemon threads are reclaimed at that point. The end of the main thread means the end of the process, at which point the process's resources are reclaimed as a whole, so the process must ensure all non-daemon threads have finished before it can end.
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('aa',))
    t.setDaemon(True)  # must be set before t.start()
    t.start()
    print('main thread')
    print(t.is_alive())
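The daemon behavior can be observed directly with a small sketch (the script text and function names here are my own, not from the article): run a daemon-thread script in a child interpreter and check that the daemon's print never appears, because the main thread exits first.

```python
import subprocess
import sys

# A script whose daemon thread sleeps longer than the main thread lives.
# When the main thread ends, the daemon is killed before it can print.
SCRIPT = """
from threading import Thread
import time

def sayhi():
    time.sleep(2)
    print('daemon says hello')   # never reached: the daemon dies with the main thread

t = Thread(target=sayhi)
t.daemon = True                  # must be set before t.start()
t.start()
print('main thread done')
"""

def run_daemon_demo():
    # run the script in a separate python process and capture its output
    out = subprocess.run([sys.executable, '-c', SCRIPT],
                         capture_output=True, text=True, timeout=30)
    return out.stdout

if __name__ == '__main__':
    print(run_daemon_demo())
```

Remove the `t.daemon = True` line and the child process would instead wait two seconds and print both lines.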

6. Python GIL (Global Interpreter Lock)

The first thing to be clear about is that the GIL is not a feature of the Python language; it is a concept introduced by one implementation of the Python interpreter (CPython). It is like C++: the language (grammar) standard is one thing, but the code can be compiled into executables by different compilers, such as GCC, Intel C++, Visual C++, and so on. The same goes for Python: the same piece of code can be run by different Python execution environments such as CPython, PyPy, or Psyco. JPython, for example, has no GIL. However, CPython is the default Python execution environment in most settings, so in many people's minds CPython is Python, and they naturally attribute the GIL to a flaw of the Python language. So let us be clear: the GIL is not a feature of Python, and Python does not depend on the GIL at all.

The essence of the GIL is a mutex lock, and all mutex locks are essentially the same: they turn concurrent execution into serial execution, so that shared data can only be modified by one task at a time, which guarantees data safety.

One thing is certain: to protect different data security, different locks should be added.

To understand the GIL, first be sure of one thing: every time you execute a python program, you get a separate process. For example, python test.py, python aaa.py, and python bbb.py each start a different python process.

In a python process, there are not only the main thread of test.py or other threads opened by the main thread, but also interpreter-level threads such as garbage collection opened by the interpreter. In short, all threads run in this process, no doubt.

#1 All data is shared, and the code, as a kind of data, is shared by all threads (all the code of test.py as well as all the code of the CPython interpreter). For example, test.py defines a function work (code shown below); all threads in the process can access the code of work, so we can open three threads whose target all points to that code. Being able to access it means being able to execute it.

#2 The tasks of all threads are executed by passing the task's code as a parameter to the interpreter's code. That is, before any thread can run its own task, the first thing it must do is access the interpreter's code.

Sum up:

If target = work for multiple threads, the execution process is

Multiple threads first access the interpreter’s code, that is, get execution privileges, and then give the target’s code to the interpreter’s code to execute.

The interpreter's code is shared by all threads, so the garbage-collection thread may also access the interpreter's code and execute it. This leads to a problem: for the same data 100, thread 1 may execute x = 100 at the very moment garbage collection is reclaiming 100. There is no clever way to solve this problem; the solution is lock processing. The GIL below guarantees that the python interpreter can only execute the code of one task at a time.

The GIL protects interpreter-level data; to protect the user's own data you still need to lock it yourself, as shown in the following figure.

With GIL, only one thread is executed in the same process at the same time.

Hearing this, some students immediately ask: processes can use multiple cores but have high overhead, while python's multi-threading has low overhead but cannot take advantage of multiple cores. Does that mean python is useless and PHP is the most powerful language?

Don't worry, we are not done yet.

To solve this problem, we need to agree on several points:

#1. Is the cpu used for computation, or for I/O?
#2. Multiple cpus mean multiple cores can compute in parallel, so multi-core improves computing performance.
#3. Once a cpu hits I/O blocking, it still has to wait, so multiple cores are of no use for I/O operations.

A worker is equivalent to a cpu. Computing is the worker doing work, and I/O blocking is the process of supplying the raw materials the worker needs. If no raw materials arrive while the worker is working, the worker has to stop until they do.

If most of the tasks you do in your factory are in the process of preparing raw materials (I/O-intensive), then no matter how many workers you have, it doesn’t make much sense. It’s better for you to let the workers do other jobs while waiting for the materials.

Conversely, if your factory has all the raw materials, of course, the more workers you have, the more efficient you are.

Conclusion:
For computing, the more CPUs the better, but for I/O, no more CPUs are useful.
Of course, for running a program, execution efficiency will certainly improve as cpus are added (however big or small the improvement, it always improves). This is because a program is basically never purely computational or purely I/O, so we can only judge whether a program is relatively compute-intensive or relatively I/O-intensive, and then further analyze whether python's multi-threading is useful for it.

# Analysis:
We have four tasks to process. To get a concurrent effect, the solutions can be:
  Solution 1: open four processes
  Solution 2: open four threads in one process

# Single-core case, the analysis results are:
  If the four tasks are compute-intensive, there are no extra cores for parallel computing, and creating processes only adds overhead: solution 2 wins.
  If the four tasks are I/O-intensive, creating processes is expensive and process switching is far slower than thread switching: solution 2 wins.

# Multi-core case, the analysis results are:
  If the four tasks are compute-intensive, multiple cores mean parallel computing, while in python a single process can only execute one thread at a time: solution 1 wins.
  If the four tasks are I/O-intensive, extra cores cannot solve the I/O problem: solution 2 wins.

# Conclusion:
Machines nowadays are multi-core. For compute-intensive tasks, python's multi-threading brings little performance improvement and can even be worse than serial execution (which has no heavy switching); but for I/O-intensive tasks, the efficiency gain is still significant.

7. Synchronization Locks

Three points need to be noted:

#1. Threads race for the GIL lock. The GIL is equivalent to the execution privilege; only after getting the execution privilege can a thread acquire the mutex Lock. Other threads can still grab the GIL, but if they find the Lock has not been released, they block, and even though they got the execution privilege they must hand the GIL back immediately.
#2. join waits for everything, i.e. the whole thing becomes serial, while a lock only serializes the part that modifies shared data. To ensure data safety, the fundamental principle is to turn concurrency into serial execution; both join and mutexes can achieve this, but the partial serialization of a mutex is undoubtedly more efficient.
#3. Be sure to read the classic analysis of the GIL vs mutexes at the end of this section.

GIL VS Lock:
A witty classmate may ask this question, because as you said before, Python already has a GIL to ensure that only one thread can execute at the same time. Why do we need a lock here?

First, we need to reach a consensus that the purpose of locks is to protect shared data, and that only one thread can modify shared data at the same time.

Then we can conclude that different locks should be added to protect different data.

Finally, the problem is clear. GIL and Lock are two locks, protecting data differently. The former is interpreter level (of course, protecting interpreter level data, such as garbage collection data), and the latter is protecting the data of applications developed by users themselves. It is obvious that GIL is not responsible for it.This can only be handled by user-defined locking, i.e. Lock.

Process analysis: All threads grab GIL locks, or execution privileges.
Thread 1 grabs the GIL lock, gets the execution privilege, starts executing, and then acquires a Lock. Before thread 1 finishes, and before it releases the Lock, thread 2 may grab the GIL and start executing; during execution it finds the Lock has not been released by thread 1, so thread 2 blocks and is stripped of the execution privilege. Thread 1 may then get the GIL back and continue executing normally until it releases the Lock... This produces the effect of serial execution.

Since it’s serial, let’s execute:

  t1.start()

  t1.join()

  t2.start()

  t2.join()

This is also serial execution. So why add a Lock instead? join waits for ALL of t1's code to finish, which is equivalent to locking all of t1's code, while a Lock only locks the part of the code that operates on shared data.
Explain in detail:

The Python interpreter reclaims memory automatically and periodically; you can think of it as a separate thread inside the interpreter that wakes up now and then and does a global poll to see which memory data can be cleared. Meanwhile, the threads of your own program are also running. Suppose your thread deletes a variable: at the moment the interpreter's garbage-collection thread is clearing that variable, another thread may just have reassigned the memory space that has not yet been cleared. As a result, the newly assigned data may be deleted. To solve this kind of problem, the Python interpreter simply and crudely locks: while one thread runs, no one else may move. This solves the problem above, and can be said to be a legacy of early versions of Python.
from threading import Thread
import os,time
def work():
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1
if __name__ == '__main__':
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

Locks are often used to synchronize access to shared resources. Create a Lock object for each shared resource; when you need to access the resource, call the acquire method to obtain the lock (if the lock has already been acquired by another thread, the current thread waits for it to be released). After accessing the resource, call the release method to release the lock:

import threading

R=threading.Lock()

R.acquire()
'''
Operations on shared data
'''
R.release()
from threading import Thread,Lock
import os,time
def work():
    global n
    lock.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n)  # the result must be 0; concurrent execution has become serial, trading execution speed for data safety

# Analysis:
#1. 100 threads race for the GIL lock, i.e. race for the execution privilege
#2. Some thread (call it thread 1) is bound to grab the GIL first, start running, and then grab the mutex with lock.acquire()
#3. Very likely, before thread 1 finishes, some thread 2 grabs the GIL and starts running, but thread 2 finds the mutex has not been released by thread 1, so it blocks and is forced to give up the execution privilege, i.e. release the GIL
#4. Until thread 1 grabs the GIL again, continues from where it paused, and releases the mutex normally; then the other threads repeat the process of 2, 3, 4

# Without lock: concurrent execution, fast, but data is not safe
from threading import current_thread, Thread, Lock
import os, time

def task():
    global n
    print('%s is running' % current_thread().getName())
    temp = n
    time.sleep(0.5)
    n = temp - 1

if __name__ == '__main__':
    n = 100
    lock = Lock()
    threads = []
    start_time = time.time()
    for i in range(100):
        t = Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time = time.time()
    print('main: %s  n: %s' % (stop_time - start_time, n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
main: 0.5216062068939209  n: 99
'''

# With lock: the unlocked parts run concurrently, the locked part runs serially; slower, but data is safe
from threading import current_thread, Thread, Lock
import os, time

def task():
    # unlocked code runs concurrently
    time.sleep(3)
    print('%s start to run' % current_thread().getName())
    global n
    # locked code runs serially
    lock.acquire()
    temp = n
    time.sleep(0.5)
    n = temp - 1
    lock.release()

if __name__ == '__main__':
    n = 100
    lock = Lock()
    threads = []
    start_time = time.time()
    for i in range(100):
        t = Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time = time.time()
    print('main: %s  n: %s' % (stop_time - start_time, n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
main: 53.294203758239746  n: 0
'''

# Some students may wonder: since locking makes the code run serially, why not just use join right after start and skip the lock? That would also give a serial effect.
# Yes, using join right after start does turn the execution of the 100 tasks into serial, and the final result of n is indeed 0, which is safe. But the problem is:
# join right after start: ALL the code in the task is serialized, while locking only serializes the part that modifies shared data.
# Both achieve data safety, but locking is clearly more efficient.
from threading import current_thread, Thread, Lock
import os, time

def task():
    time.sleep(3)
    print('%s start to run' % current_thread().getName())
    global n
    temp = n
    time.sleep(0.5)
    n = temp - 1

if __name__ == '__main__':
    n = 100
    lock = Lock()
    start_time = time.time()
    for i in range(100):
        t = Thread(target=task)
        t.start()
        t.join()
    stop_time = time.time()
    print('main: %s  n: %s' % (stop_time - start_time, n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
main: 350.6937336921692  n: 0   # how terribly time-consuming
'''

8. Deadlocks and Recursive Locks

Processes also have deadlocks and recursive locks; this was left out of the process chapter, so it is all covered here.

The so-called deadlock refers to the phenomenon in which two or more processes or threads, during execution, wait on each other because they compete for resources; without outside intervention they can never proceed. The system is then said to be in a deadlock state, and these processes forever waiting on each other are called deadlocked processes. For example:

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s got lock A\033[0m' % self.name)
        mutexB.acquire()
        print('\033[42m%s got lock B\033[0m' % self.name)
        mutexB.release()
        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s got lock B\033[0m' % self.name)
        time.sleep(2)
        mutexA.acquire()
        print('\033[44m%s got lock A\033[0m' % self.name)
        mutexA.release()
        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()

'''
Thread-1 got lock A
Thread-1 got lock B
Thread-1 got lock B
Thread-2 got lock A
Then it hangs: deadlock.
'''

The solution is recursive lock. In Python, in order to support multiple requests for the same resource in the same thread, Python provides a reentrant lock RLock.

An RLock internally maintains a Lock and a counter variable; the counter records the number of acquires, so the resource can be acquired multiple times. Only when all the acquires of one thread have been released can other threads get the resource. In the example above, if RLock is used instead of Lock, no deadlock occurs:

mutexA=mutexB=threading.RLock() # When a thread gets a lock, counter becomes 1; when it meets another lock inside, counter becomes 2; during that time all other threads can only wait until the thread releases all locks, i.e. counter drops back to 0
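The counter behavior described above can be sketched in a few lines (the function and variable names here are my own): the same thread can acquire an RLock repeatedly, which would deadlock with a plain Lock.

```python
import threading

rlock = threading.RLock()

def nested():
    # first acquire: internal counter goes to 1
    with rlock:
        # second acquire by the SAME thread: counter goes to 2, no deadlock
        with rlock:
            result = 'inner section reached'
    # both releases done: counter back to 0, other threads may now acquire
    return result

if __name__ == '__main__':
    print(nested())
```

Replace `threading.RLock()` with `threading.Lock()` and the inner `with` would block forever.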

9. Semaphore

The same as the process

A Semaphore manages a built-in counter:
The counter is decremented by 1 whenever acquire() is called;
The counter is incremented by 1 whenever release() is called;
The counter can never go below 0; when the counter is 0, acquire() blocks the thread until some other thread calls release().
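These counter rules can be checked deterministically with non-blocking acquires. A small sketch (the limit of 2 and the function name are my own choices):

```python
from threading import Semaphore

sm = Semaphore(2)          # counter starts at 2

def probe():
    first = sm.acquire(blocking=False)   # counter 2 -> 1, succeeds
    second = sm.acquire(blocking=False)  # counter 1 -> 0, succeeds
    third = sm.acquire(blocking=False)   # counter is 0: fails instead of blocking
    sm.release()                         # counter 0 -> 1
    fourth = sm.acquire(blocking=False)  # succeeds again
    sm.release()                         # restore the counter to its
    sm.release()                         # initial value of 2
    return first, second, third, fourth

if __name__ == '__main__':
    print(probe())
```

With `blocking=True` (the default), the third acquire would instead block until another thread released.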

Example: (At the same time, only five threads can obtain semaphore, which can limit the maximum number of connections to 5):

from threading import Thread,Semaphore
import threading
import time
# def func():
#     if sm.acquire():
#         print (threading.currentThread().getName() + ' get semaphore')
#         time.sleep(2)
#         sm.release()
def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()
if __name__ == '__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()

This is different from a process pool: Pool(4) produces at most four processes, and it is those same four processes from beginning to end; it does not spawn new ones. A semaphore, by contrast, spawns a whole bunch of threads/processes and merely limits how many run at the same time.
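That difference can be observed directly. In the sketch below (names are mine), a Semaphore(2) gates ten threads: only two are ever inside the guarded section at once, yet ten distinct threads are created, none reused.

```python
from threading import Thread, Semaphore, current_thread
import time

sm = Semaphore(2)   # at most two workers inside the guarded section at once
names = []

def worker():
    with sm:
        names.append(current_thread().name)
        time.sleep(0.05)

def run(n=10):
    names.clear()
    threads = [Thread(target=worker) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # ten distinct thread names: the semaphore limits concurrency
    # but, unlike a pool, it does not reuse threads
    return names

if __name__ == '__main__':
    result = run()
    print(len(result), len(set(result)))
```

A ThreadPoolExecutor(2) running the same ten tasks would show only two distinct worker names.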

10. Event

The same as the process

A key feature of threads is that each thread runs independently and its state is unpredictable. If other threads in a program need to decide their next step by judging the state of some thread, thread synchronization becomes very difficult. To solve these problems, we use the Event object from the threading library. The object contains a signal flag that can be set by threads, allowing threads to wait for certain events to happen. Initially, the signal flag in the Event object is False. If a thread waits on an Event object whose flag is False, the thread is blocked until the flag becomes True. When a thread sets the flag of an Event object to True, it wakes up all the threads waiting on that Event object. A thread that waits on an Event object whose flag is already True does not block at all and simply continues executing.

event.isSet(): returns the state value of the event;
event.wait(): blocks the thread if event.isSet() == False;
event.set(): sets the event's state value to True; all threads in the blocking pool are activated into the ready state, waiting to be scheduled by the operating system;
event.clear(): restores the event's state value to False.
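The four methods above can be exercised deterministically even in a single thread (a minimal sketch; the function name is mine). Note that wait(timeout) returns False on timeout while the flag is unset, and True immediately once it is set:

```python
from threading import Event

def event_demo():
    event = Event()
    states = []
    states.append(event.is_set())   # False: the initial flag is unset
    states.append(event.wait(0.1))  # blocks 0.1s, times out, returns False
    event.set()                     # flag -> True; would wake all waiters
    states.append(event.is_set())   # True
    states.append(event.wait())     # returns True immediately, no blocking
    event.clear()                   # flag back to False
    states.append(event.is_set())   # False
    return states

if __name__ == '__main__':
    print(event_demo())
```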


For example, several worker threads try to connect to MySQL. We want to make sure the MySQL service is up before letting the worker threads connect to the MySQL server; if a connection fails, they all try to reconnect. We can use the threading.Event mechanism to coordinate the connection attempts of the worker threads.

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('connection timed out')
        print('<%s> attempt %s to connect' % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count += 1
    print('<%s> connected successfully' % threading.current_thread().getName())

def check_mysql():
    print('\033[45m[%s] is checking mysql\033[0m' % threading.current_thread().getName())
    time.sleep(random.randint(2, 4))
    event.set()

if __name__ == '__main__':
    event = Event()
    conn1 = Thread(target=conn_mysql)
    conn2 = Thread(target=conn_mysql)
    check = Thread(target=check_mysql)
    conn1.start()
    conn2.start()
    check.start()

11. Condition (For Understanding)

A Condition makes threads wait, releasing n waiting threads only when a certain condition is met.

import threading
 
def run(n):
    con.acquire()
    con.wait()
    print("run the thread: %s" %n)
    con.release()
 
if __name__ == '__main__':
 
    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()
 
    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        con.acquire()
        con.notify(int(inp))
        con.release()
----------------------------------------
def condition_func():

    ret = False
    inp = input('>>>')
    if inp == '1':
        ret = True

    return ret


def run(n):
    con.acquire()
    con.wait_for(condition_func)
    print("run the thread: %s" %n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

12. Timers

A Timer performs an operation after a specified number of seconds.

from threading import Timer
 
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 second, "hello, world" will be printed
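A timer that has not fired yet can also be stopped with cancel(), which the verification-code example below relies on. A small sketch (the flag list and function names are mine):

```python
from threading import Timer

fired = []

def on_fire():
    fired.append(True)

def cancel_demo():
    fired.clear()
    t = Timer(5, on_fire)   # would call on_fire after 5 seconds
    t.start()
    t.cancel()              # stop it before the interval elapses
    t.join()                # a cancelled timer thread finishes immediately
    return fired            # empty: the callback never ran

if __name__ == '__main__':
    print(cancel_demo())
```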
------------------------------------------------------------------------------------
from threading import Timer
import random,time

class Code:
    def __init__(self):
        self.make_cache()

    def make_cache(self,interval=5):
        self.cache=self.make_code()
        print(self.cache)
        self.t=Timer(interval,self.make_cache)
        self.t.start()

    def make_code(self,n=4):
        res=''
        for i in range(n):
            s1=str(random.randint(0,9))
            s2=chr(random.randint(65,90))
            res+=random.choice([s1,s2])
        return res

    def check(self):
        while True:
            inp=input('>>: ').strip()
            if inp.upper() ==  self.cache:
                print('Verification success', end='\n')
                self.t.cancel()
                break

if __name__ == '__main__':
    obj = Code()
    obj.check()

13. Thread queue

queue Queue: import queue — used in the same way as the process Queue.

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
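That safe exchange is usually written as a producer/consumer pair. A sketch (names and the None sentinel convention are my own choices): one producer, one consumer, with task_done()/join() used to wait until every queued item has been processed.

```python
from threading import Thread
import queue

q = queue.Queue()
results = []

def producer(items):
    for item in items:
        q.put(item)

def consumer():
    while True:
        item = q.get()
        if item is None:        # sentinel: stop consuming
            q.task_done()
            break
        results.append(item.upper())
        q.task_done()

def run(items):
    results.clear()
    c = Thread(target=consumer)
    c.start()
    producer(items)
    q.put(None)                 # tell the consumer to stop
    q.join()                    # block until every put item was task_done'd
    c.join()
    return results

if __name__ == '__main__':
    print(run(['first', 'second', 'third']))
```

Because queue.Queue does its own locking internally, no explicit Lock is needed around put/get.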

class queue.Queue(maxsize=0) #FIFO

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())

Results (FIFO):
first
second
third

class queue.LifoQueue(maxsize=0) #last in fisrt out

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())

Results (LIFO):
third
second
first

class queue.PriorityQueue(maxsize=0) #Queues with priority when storing data

import queue

q=queue.PriorityQueue()
# put takes a tuple; the first element of the tuple is the priority (usually a number, or anything comparable). The smaller the number, the higher the priority.
q.put((20, 'a'))
q.put((10, 'b'))
q.put((30, 'c'))

print(q.get())
print(q.get())
print(q.get())

'''
Result (the smaller the number, the higher the priority; higher-priority items are dequeued first):
(10, 'b')
(20, 'a')
(30, 'c')
'''

14. Python Standard Module: concurrent.futures

#1 Introduction
The concurrent.futures module provides a highly encapsulated asynchronous call interface:
ThreadPoolExecutor: thread pool, providing asynchronous calls
ProcessPoolExecutor: process pool, providing asynchronous calls
Both implement the same interface, which is defined by the abstract Executor class.

#2 Basic methods
# submit(fn, *args, **kwargs)
Submits a task asynchronously.

# map(func, *iterables, timeout=None, chunksize=1)
Replaces the for loop + submit pattern.

# shutdown(wait=True)
Equivalent to the process pool's pool.close() + pool.join().
wait=True: wait for all tasks in the pool to finish and resources to be reclaimed before continuing.
wait=False: return immediately, without waiting for the tasks in the pool to finish.
Whatever the wait parameter is, the whole program still waits until all tasks are completed.
submit and map must come before shutdown.

# result(timeout=None)
Gets the result.

# add_done_callback(fn)
Attaches a callback.
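The basic methods above can be seen together in one minimal ThreadPoolExecutor sketch (the task and callback names are mine, not from the article): submit returns a Future, add_done_callback receives that Future when it completes, and the with block performs shutdown(wait=True).

```python
from concurrent.futures import ThreadPoolExecutor

done = []

def square(n):
    return n ** 2

def on_done(future):
    # the callback receives the Future object; call .result() on it
    done.append(future.result())

def run():
    done.clear()
    # the 'with' block calls executor.shutdown(wait=True) on exit
    with ThreadPoolExecutor(max_workers=3) as executor:
        f = executor.submit(square, 7)
        f.add_done_callback(on_done)
    # after shutdown, the future is finished and the callback has run
    return f.result(), done

if __name__ == '__main__':
    print(run())
```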

ProcessPoolExecutor

# usage
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os, time, random

def task(n):
    print('%s is running' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n**2

if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=3)
    futures = []
    for i in range(11):
        future = executor.submit(task, i)
        futures.append(future)
    executor.shutdown(True)
    print('++>')
    for future in futures:
        print(future.result())

map

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is running' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,12)) # map replaces for + submit
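One detail worth noting about map (a sketch with my own task): it yields results in the order the inputs were given, even if the tasks finish out of order.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_square(n):
    # later inputs finish first, but map still yields results in input order
    time.sleep(0.05 * (5 - n))
    return n ** 2

def run():
    with ThreadPoolExecutor(max_workers=5) as executor:
        return list(executor.map(slow_square, range(5)))

if __name__ == '__main__':
    print(run())
```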

callback

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<process %s> get %s' % (os.getpid(), url))
    response = requests.get(url)
    if response.status_code == 200:
        return {'url': url, 'text': response.text}

def parse_page(res):
    res = res.result()
    print('<process %s> parse %s' % (os.getpid(), res['url']))
    parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text']))
    with open('db.txt', 'a') as f:
        f.write(parse_res)

if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/',
    ]

    # p = Pool(3)
    # for url in urls:
    #     p.apply_async(get_page, args=(url,), callback=parse_page)
    # p.close()
    # p.join()

    p = ProcessPoolExecutor(3)
    for url in urls:
        # parse_page receives a future object obj; it needs obj.result() to get the actual result
        p.submit(get_page, url).add_done_callback(parse_page)
