- An introduction
- Introduction to the Second Cooperative Program
- Three Greenlet
- Introduction to Four Gevents
- Synchronization and Asynchronization of Five Gevents
- Examples of Six Gevents
- Examples of Seven Gevents
The theme of this section is to achieve concurrency on a single thread basis, that is, with only one main thread (obviously only one CPU available), so we need to review it first.The essence of concurrency: switching + saving state
cpuRunning a task cuts it off to perform other tasks (switching is mandatory by the operating system) in two cases, one is that the task is blocked, the other is that the task takes too long to compute or a higher priority program replaces it.
ps：In introducing the process theory, we refer to the three execution states of the process, and the thread is the execution unit, so we can also understand the above figure as the three states of the thread.
One: The second case does not improve efficiency, just in order to make the CPU rain and dew, and achieve the effect that all tasks seem to be executed at the same time. If multiple tasks are pure computing, this switching will reduce efficiency. So we can verify it based on yield. YieldIt’s a way to save the running state of a task in a single thread. Let’s review it briefly.
#1 yiledYou can save state. The state of yield is similar to the state of thread saved by the operating system, but yield is code-level controlled and lightweight.# 2 send can transfer the result of one function to another function to realize the switching between programs in a single thread'''1. Co-operation:Single-threaded concurrencyControlling switching and saving status of multiple tasks in an applicationAdvantage:Application-level switching is much faster than operating system switchingDisadvantages:MultipleOnce a task has a blockage and is not cut, the entire thread is blocked in place.No other tasks in this thread can be performedOnce a coroutine is introduced, it is necessary to detect all IO behavior in a single thread.Implement switching when IO is encounteredNo less than one, thinking that once a task blocked, the whole thread blocked.Other tasks can be computed, but they can't work.2. Aims of the Co-Procedure:Want to achieve concurrency in a single threadConcurrency refers to multiple tasksIt looks like it's running at the same time.Concurrent = switch + save state''Serial executionImport timeDef func1 ():For I in range (10000000):I+1Def func2 ():For I in range (10000000):I+1Start = time. time ()Func1 ()Func2 ()StoP = time. time ()Print (stop - start)# Concurrent execution based on YieldImport timeDef func1 ():While True:YieldDef func2 ():G=func1 ()For I in range (10000000):I+1Next (g)Start=tIme.time ()Func2 ()Stop = time. time ()Print (stop-start)
2. Switching in the first case. When the task encounters io, it is cut to task 2 to perform, so that the computation of task 2 can be completed by using the time of task 1 blocking. This is why the efficiency is improved.
import time def func1(): while True: print('func1') yield def func2(): g=func1() for i in range(10000000): i+1 next(g) time.sleep(3) print('func2') start=time.time() func2() stop=time.time() print(stop-start)
For single-threaded applications, it is inevitable that there will be IO operations, but if we can control multiple tasks under single-threaded applications (i.e. user program level, not operating system level), we can switch to another task to calculate when one task encounters IO blockage, which ensures the line.Cheng can be in the ready state to the greatest extent, that is, it can be executed by the CPU at any time, which is equivalent to hiding our IO operations at the user program level to the greatest extent, thus confusing the operating system, so that it can see: the thread seems to have been computing, less io, so that more will beThe execution privileges of the CPU are assigned to our threads.
The essence of a collaboration is that in a single thread, the user controls one task to switch to another task to execute when the IO blockage occurs, so as to improve efficiency. In order to achieve it, we need to find a solution that can satisfy the following conditions at the same time:
#1. The switch between multiple tasks can be controlled, and the state of the task can be saved before the switch, so that the task can be re-run, and the execution can be continued based on the suspended position.# 2. As a supplement to 1: you can detect IO operations and switch only when you encounter IO operations
Introduction to the Second Cooperative Program
COOPERATION: It is concurrent under single thread, also known as micro-thread, fibre. The English name is Coroutine.
One sentence explains what threads are: A coroutine is a user-mode lightweight thread, that is, a coroutine is scheduled by the user program itself.
It should be emphasized that:
#1. pythonThreads are at the kernel level, i.e. they are scheduled under the control of the operating system (for example, if a single thread encounters IO or if the execution time is too long, it will be forced to surrender the CPU execution privileges and switch other threads to run).# 2. Open the process within a single thread, and once you encounter io, it will be controlled from the application level (rather than the operating system).Switch to improve efficiency (!!! The switching of non-io operations is independent of efficiency.Contrasting with the switching of operating system control threads, users control the switching of the consortium within a single thread
The advantages are as follows:
#1. The switching overhead of the protocol is less, it belongs to program level switching, and the operating system is totally imperceptible, so it is lighter.# 2. Concurrency can be achieved within a single thread, maximizing the use of CPU
The shortcomings are as follows:
#1. The essence of a coroutine is that it can't use multiple cores in a single thread. It can be a program that opens multiple processes, multiple threads in each process, and coroutines in each thread.# 2. A collaboration refers to a single thread, so once the collaboration blockages, the entire thread will be blocked.
Summarize the characteristics of the process:
Concurrency must be implemented in only one single thread
Modifying shared data without locking
Save the context stack of multiple control flows in the user program
Addition: A protocol automatically switches IO operations to other protocols when it encounters them (how to detect IO, yield and Greenlet can not be achieved, so gevent module (select mechanism) is used)
If we have 20 tasks in a single thread, it’s too cumbersome to use the yield generator to switch between multiple tasks (we need to initialize the generator once, then call send…). Very troublesome), and using the Greenlet module can be very simpleDirect switching of these 20 tasks on a single site
pip3 install greenlet
from greenlet import greenlet def eat(name): print('%s eat 1' %name) g2.switch('egon') print('%s eat 2' %name) g2.switch() def play(name): print('%s play 1' %name) g1.switch() print('%s play 2' %name) g1=greenlet(eat) g2=greenlet(play) g1.switch('egon')#You can pass in parameters at the first switch, and you don't need them later.
Simple switching (in the absence of IO or repeated operations to open up memory space) can slow down the execution of programs.
#Sequential executionImport timeDef F1 ():Res=1For I in range (100000000):Res+=iDef F2 ():ReS=1For I in range (100000000):Res*=iStart = time. time ()F1 ()F2 ()Stop = time. time ()PrinT('run time is% s'% (stop-start)#10.985628366470337Switch overFrom Greenlet import GreenletImport timeDef F1 ():Res=1For I in range (100000000):Res+=iG2.switch ()Def F2 ():Res=1For I in range (100000000):Res*=iG1.switch ()Start = time. time ()G1=greeNLet (F1)G2 = Greenlet (f2)G1.switch ()Stop = time. time ()Print ('run time is% s'% (stop-start)# 52.763017892837524
greenletIt only provides a more convenient switching mode than generator. If IO is encountered when a task is cut to execute, it will block in place. It is still not solved the problem of IO automatic switching to improve efficiency.
The code of these 20 tasks in a single thread usually has both computational and blocking operations. We can use blocking time to execute task 2 when we encounter blocking in task 1. Only in this way can we improve efficiency, which uses the Gevent module.
Introduction to Four Gevents
pip3 install gevent
Gevent It is a third-party library that can easily implement concurrent synchronous or asynchronous programming through gevent. The main mode used in gevent is Greenlet. It is a lightweight protocol that accesses Python in the form of C extension module. Greenlet runs in the main programOperating system processes are internal, but they are scheduled collaboratively.
#usageG1 = gevent. spawn (func, 1, 2, 3, x = 4, y = 5) creates a co-process object g1. The first parameter in parentheses of span is the function name, such as eat, which can be followed by several parameters, such as position argument or keyword argument, which are passed on.For function eatG2 = gevent. spawn (func2)G1. join ()# Waiting for G1 to endG2. join ()# Waiting for G2 to end# or two steps above: gevent. joinall ([g]1, g2])G1. value # Gets the return value of func1
Automatically switch tasks when IO congestion occurs
import gevent def eat(name): print('%s eat 1' %name) gevent.sleep(2) print('%s eat 2' %name) def play(name): print('%s play 1' %name) gevent.sleep(1) print('%s play 2' %name) g1=gevent.spawn(eat,'egon') g2=gevent.spawn(play,name='egon') g1.join() g2.join() #Or gevent. join all ([g1, g2])Print ('main')
The example gevent. sleep (2) simulates an IO blocking that gevent recognizes.
And time. sleep (2) or other blockages, gevent is not directly identifiable need to use the following line of code, patch, you can identify.
from gevent import monkey;monkey.patch_all()Must be placed in front of the patched, such as time, socket module
Or we simply remember that to use gevent, we need to put from gevent import monkey; monkey. patch_all() at the beginning of the file.
from gevent import monkey;monkey.patch_all() import gevent import time def eat(): print('eat food 1') time.sleep(2) print('eat food 2') def play(): print('play 1') time.sleep(1) print('play 2') g1=gevent.spawn(eat) g2=gevent.spawn(play_phone) gevent.joinall([g1,g2]) print('Main ""
We can use threading. current_thread (). getName () to view each G1 and g2, and the result is DummyThread-n, or pseudo-thread.
Synchronization and Asynchronization of Five Gevents
from gevent import spawn,joinall,monkey;monkey.patch_all() import time def task(pid): """ Some non-deterministic task """ time.sleep(0.5) print('Task %s done' % pid) def synchronous(): for i in range(10): task(i) def asynchronous(): g_l=[spawn(task,i) for i in range(10)] joinall(g_l) if __name__ == '__main__': print('Synchronous:') synchronous() print('Asynchronous:') asynchronous() #An important part of the above program is to encapsulate task functions into gevent. spawn of Greenlet internal threads. The initialized Greenlet list is stored in the array threads, which is passed to the gevent. join all function, andThe blocker blocks the current process and executes all given greenlets. The execution process will only continue down after all greenlets have been executed.
Examples of Six Gevents
from gevent import monkey;monkey.patch_all() import gevent import requests import time def get_page(url): print('GET: %s' %url) response=requests.get(url) if response.status_code == 200: print('%d bytes received from %s' %(len(response.text),url)) start_time=time.time() gevent.joinall([ gevent.spawn(get_page,'https://www.python.org/'), gevent.spawn(get_page,'https://www.yahoo.com/'), gevent.spawn(get_page,'https://github.com/'), ]) stop_time=time.time() print('run time is %s' %(stop_time-start_time))
Examples of Seven Gevents
Implementing socket concurrency (from gevent import monkey; monkey. patch_all() under single thread through gevent must be put before importing socket module, otherwise gevent cannot recognize soBlocking of cket)
from gevent import monkey;monkey.patch_all() from socket import * import gevent #If you don't want to patch money. patch_all(), you can use the socket that comes with gevent.\ from gevent import socket# s = socket. socket ()Def seRver (server_ip, port):S = socket (AF_INET, SOCK_STREAM)S. setsockopt (SOL_SOCKET, SO_REUSEADDR, 1)S. bind ((server_ip, port))S.listen (5)While True:Conn, addr = S. accept ()GeVent. spawn (talk, conn, addr)Def talk (conn, addr):Try:While True:Res=conn.rECV (1024)Print ('client%s:% s msg:% s'% (addr , addr , res))Conn. send (res).upper ())Exception as e:Print (E)Finally:Conn.close ()If __name__=='u main_':Server ('127.0.0.1', 8080)
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
Multithread concurrent multiple clients
from threading import Thread from socket import * import threading def client(server_ip,port): c=socket(AF_INET,SOCK_STREAM) #The socket object must be added to the function, that is, the local namespace, which is shared by all threads outside the function. If we share a socket object, the client port will always be the same.C. connect ((server_ip, port))Count=0While True:C. send ('% s say hello% s') (threading. current_thread (). getName (), couNt). encode ('utf-8')MSG = C. recv (1024)Print (msg. decode ('utf-8'))Count+=1IF_u name_=='u main_':For I in range (500):T = Thread (target = client, args = 127.0.0.1',8080)T.start ()