多线程

1.使用threading模块

import random
import time
import threading
# 新线程执行的代码,放在一个函数中:
def thread_run(args):
    print('%s is running...' % threading.current_thread().name)
    for i in args:
        print ('%s args: %s' % (threading.current_thread().name,i))
        time.sleep(random.random())
    print( '%s ended.' % threading.current_thread().name)

print ('%s is running...' % threading.current_thread().name)
t1 = threading.Thread(target=thread_run, name='Thread_1',args=(['a','b','c'],))
t2 = threading.Thread(target=thread_run, name='Thread_2',args=(['aa','bb','cc'],))
t1.start()
t2.start()
t1.join()
t2.join()
print ('%s ended.' % threading.current_thread().name)

运行结果:

MainThread is running...
Thread_1 is running...
Thread_1 args: a
Thread_2 is running...
Thread_2 args: aa
Thread_1 args: b
Thread_2 args: bb
Thread_1 args: c
Thread_2 args: cc
Thread_1 ended.
Thread_2 ended.
MainThread ended.
[Finished in 1.8s]

2.继承threading.Thread

import random
import threading
import time
#继承Thread类
class myThread(threading.Thread):
    def __init__(self,name,args):
        threading.Thread.__init__(self,name=name)
        self.args = args

    def run(self):
        print('%s is running...' % (threading.current_thread().name))
        for a in self.args:
                print ('%s args: %s' % (threading.current_thread().name,a))
                time.sleep(random.random())
        print ('%s ended.' % (threading.current_thread().name))

print ('%s is running...' % (threading.current_thread().name))
t1 = myThread(name='Thread_1',args=['a','b','c'])
t2 = myThread(name='Thread_2',args=['aa','bb','cc'])
t1.start()
t2.start()
t1.join()
t2.join()
print ('%s ended.' % (threading.current_thread().name))

运行结果:

MainThread is running...
Thread_1 is running...
Thread_2 is running...
Thread_1 args: a
Thread_2 args: aa
Thread_2 args: bb
Thread_1 args: b
Thread_2 args: cc
Thread_1 args: c
Thread_1 ended.
Thread_2 ended.
MainThread ended.
[Finished in 2.2s]

3.线程同步

import threading
#import random
#使用RLock,通过acquire和release锁定和释放资源
mylock = threading.RLock()
num=0
class myThread(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self,name=name)

    def run(self):
        global num
        while True:
            mylock.acquire()
            print ('%s locked, Number: %d'%(threading.current_thread().name, num))
            if num>=5:
                mylock.release()
                print ('%s released, Number: %d'%(threading.current_thread().name, num))
                break
            num+=1
            print ('%s released, Number: %d'%(threading.current_thread().name, num))
            mylock.release()


if __name__== '__main__':
    thread1 = myThread('Thread_1')
    thread2 = myThread('Thread_2')
    thread1.start()
    thread2.start()

运行结果:

Thread_1 locked, Number: 0
Thread_1 released, Number: 1
Thread_1 locked, Number: 1
Thread_1 released, Number: 2
Thread_1 locked, Number: 2
Thread_1 released, Number: 3
Thread_1 locked, Number: 3
Thread_1 released, Number: 4
Thread_1 locked, Number: 4
Thread_1 released, Number: 5
Thread_1 locked, Number: 5
Thread_1 released, Number: 5
Thread_2 locked, Number: 5
Thread_2 released, Number: 5
[Finished in 0.1s]

多进程

1.使用os模块中的fork方式

操作系统将当前进程(父进程)复制出一份进程(子进程),这两个进程几乎完全相同,于是fork方法分别在父进程和子进程中返回。子进程中永远返回0,父进程中返回的是子进程的ID

import os
if __name__ == '__main__':
    print ('this is main progress (%s) start ...'%(os.getpid()))
    pid = os.fork()
    if pid < 0:
        print ('fork error')
    elif pid == 0:
        print( 'child process: %s and parent process is (%s)'%(os.getpid(),os.getppid()))
    else:
        print ('parent process: %s created a child process (%s).'%(os.getpid(),pid))

运行结果:

this is main progress (15450) start ...
parent process: 15450 created a child process (15454).
child process: 15454 and parent process is (15450)
[Finished in 0.1s]

2.使用multiprocessing模块

用start()方法启动进程,用join()方法实现进程间的同步。

import os
from multiprocessing import Process
# 子进程要执行的代码,放在一个函数中
def run_proc(name):
    print ('child process %s (%s) is running...' % (name, os.getpid()))

if __name__ == '__main__':
    print ('parent process %s.' % os.getpid())
    p_list=[]
    for i in range(5):
        p = Process(target=run_proc, args=(str(i),))
        p_list.append(p)
        print ('process will start.')
        p_list[i].start()
    for p in p_list:
        p.join()
    print ('process end.')

运行结果:

parent process 15679.
process will start.
process will start.
child process 0 (15683) is running...
process will start.
child process 1 (15684) is running...
process will start.
child process 2 (15685) is running...
process will start.
child process 4 (15687) is running...
child process 3 (15686) is running...
process end.
[Finished in 0.1s]

3.进程池对象

Pool提供指定数量的进程供用户调用,默认大小是CPU的核数。当有新的请求提交到Pool中时,如果池未满,那么就创建一个新的进程执行该请求;若池中的进程数已达到最大值,那么该请求就保持等待,直到池中有进程结束,才会创建新的进程来处理它。

from multiprocessing import Pool
import os, time, random

def run_task(name):
    print( 'task %s (pid = %s) is running...' % (name, os.getpid()))
    time.sleep(random.random() * 50)
    print ('task %s ended.' % name)

if __name__=='__main__':
    print ('main process %s.' % os.getpid())
    p = Pool(processes=5)
    for i in range(8):
        p.apply_async(run_task, args=(i,))
    print ('Waiting for all subprocesses ended...')
    p.close()
    p.join()
    print ('all subprocesses ended.')

运行结果:

main process 16119.
Waiting for all subprocesses ended...
task 0 (pid = 16123) is running...
task 1 (pid = 16124) is running...
task 2 (pid = 16125) is running...
task 3 (pid = 16126) is running...
task 4 (pid = 16127) is running...
task 3 ended.
task 5 (pid = 16126) is running...
task 0 ended.
task 6 (pid = 16123) is running...
task 5 ended.
task 7 (pid = 16126) is running...
task 7 ended.
task 2 ended.
task 1 ended.
task 4 ended.
task 6 ended.
all subprocesses ended.
[Finished in 70.3s]

4.进程间通信

1)queue

queue是多进程安全的队列,使用queue可以实现多进程之间的数据传递。

from multiprocessing import Process, Queue
import os, time, random

# 写进程代码:
def proc_write(q,args):
    print('process(%s) is writing...' % os.getpid())
    for a in args:
        q.put(a)
        print('put %s to queue...' % a)
        time.sleep(random.random())

# 读进程代码:
def proc_read(q):
    print('process(%s) is reading...' % os.getpid())
    while (True):
        a = q.get(True)
        print('get %s from queue.' % a)

if __name__=='__main__':
    # 父进程创建queue,并传给各个子进程:
    q = Queue()
    proc_writer1 = Process(target=proc_write, args=(q,['a', 'b', 'c']))
    proc_writer2 = Process(target=proc_write, args=(q,['aa','bb','cc']))
    proc_reader = Process(target=proc_read, args=(q,))
    # 启动子进程proc_writer,写入:
    proc_writer1.start()
    proc_writer2.start()
    # 启动子进程proc_reader,读取:
    proc_reader.start()
    # 等待proc_writer结束:
    proc_writer1.join()
    proc_writer2.join()
    # proc_reader进程里是死循环,无法等待其结束,只能强行终止:
    proc_reader.terminate()

运行结果:

process(16709) is writing...
process(16710) is writing...
put a to queue...
put aa to queue...
process(16712) is reading...
get a from queue.
get aa from queue.
put b to queue...
get b from queue.
put bb to queue...
get bb from queue.
put cc to queue...
get cc from queue.
put c to queue...
get c from queue.
[Finished in 1.9s]

2.pipe

pipe常用来在两个进程间进行通信,两个进程分别位于管道的两端。pipe方法返回(conn1,conn2)代表管道的两个端。

 duplex参数为True(默认值表示这个管道是全双工模式,也就是说conn1和conn2均可收发。     若duplex为False,conn1只负责接收消息,conn2只负责发送消息。

 send和recv方法分别是发送和接收消息的方法。

import multiprocessing
import random
import time,os

def proc_send(pipe,args):
    for arg in args:
        print ("process(%s) send: %s" %(os.getpid(),arg))
        pipe.send(arg)
        time.sleep(random.random())

def proc_recv(pipe):
    while True:
        print ("process(%s) rev:%s" %(os.getpid(),pipe.recv()))
        time.sleep(random.random())

if __name__ == "__main__":
        pipe = multiprocessing.Pipe()
        p1 = multiprocessing.Process(target=proc_send, args=(pipe[0],['aa'+str(i) 
         for i in range(10) ]))
        p2 = multiprocessing.Process(target=proc_recv, args=(pipe[1],))
        p1.start()
        p2.start()
        p1.join()
        p2.join()

运行结果:

process(17181) send: aa0
process(17182) rev:aa0
process(17181) send: aa1
process(17181) send: aa2
process(17182) rev:aa1
process(17181) send: aa3
process(17181) send: aa4
process(17182) rev:aa2
process(17182) rev:aa3
process(17181) send: aa5
process(17182) rev:aa4
process(17181) send: aa6
process(17182) rev:aa5
process(17181) send: aa7
process(17181) send: aa8
process(17182) rev:aa6
process(17181) send: aa9
process(17182) rev:aa7
process(17182) rev:aa8
process(17182) rev:aa9

协程

Python通过yield提供了对协程的基本支持,但是不完全。

gevent是一个基于协程的Python网络函数库,提供了比较完善的协程支持。

(1)使用gevent中的spawn方法和joinall方法。

spawn方法是用来形成协程,joinall方法就是添加这些协程任务,并且启动运行。从运行结果来看,3个网络操作是并发执行的,而且结束顺序不同,但其实只有一个线程。

#gevent的使用流程
from gevent import monkey; monkey.patch_all()
import gevent
import urllib.request, urllib.error, urllib.parse

def run_task(url):
    print('Visit --> %s' % url)
    try:
        response = urllib.request.urlopen(url)
        data = response.read()
        print('%d bytes received from %s.' % (len(data), url))
    except Exception as e:
        print(e)
if __name__=='__main__':
    urls = ['https://github.com/','https://www.python.org/','http://xueyp.github.io/']
    greenlets = [gevent.spawn(run_task, url) for url in urls  ]
    gevent.joinall(greenlets)```

运行结果:

Visit --> https://github.com/
Visit --> https://www.python.org/
Visit --> http://xueyp.github.io/
49115 bytes received from https://www.python.org/.
53784 bytes received from https://github.com/.
7101 bytes received from http://xueyp.github.io/.
[Finished in 2.5s]

(2)池可以对动态数量的greenlet进行并发管理(限制并发数).

from gevent import monkey
monkey.patch_all()
import urllib.request, urllib.error, urllib.parse
from gevent.pool import Pool

def run_task(url):
    print('Visit --> %s' % url)
    try:
        response = urllib.request.urlopen(url)
        data = response.read()
        print('%d bytes received from %s.' % (len(data), url))
    except Exception as e:
        print(e)
    return '%s is fetched'% url

if __name__=='__main__':
    pool = Pool(3)
    urls = ['https://github.com/','https://www.python.org/','http://xueyp.github.io/','http://baidu.com','http://163.com/']
    results = pool.map(run_task,urls)
    print(results)

运行结果:

Visit --> https://github.com/
Visit --> https://www.python.org/
Visit --> http://xueyp.github.io/
49115 bytes received from https://www.python.org/.
Visit --> http://baidu.com
7101 bytes received from http://xueyp.github.io/.
Visit --> http://163.com/
81 bytes received from http://baidu.com.
53784 bytes received from https://github.com/.
688437 bytes received from http://163.com/.
['https://github.com/ is fetched', 'https://www.python.org/ is fetched', 'http://xueyp.github.io/ is fetched', 'http://baidu.com is fetched', 'http://163.com/ is fetched']
[Finished in 2.2s]

分布式编程

创建分布式进程服务端需要分为五个步骤:

1.建立用来进行进程间的通信的Queue

 服务进程创建作为传递任务给任务进程的通道:任务队列task_queue

 服务进程创建结果队列作为任务进程完成任务后回复服务进程的通道:result_queue

 在分布式多进程环境下,必须通过由Queuemanager获得的Queue接口来添加任务。

2.把第一步中建立的队列注册在网络中,供其他进程(主机)注册。

3.建立一个对象(Queuemanager(BaseManager))实例manager,绑定端口和验证口令并启动。

4.各主机获得通过网络访问的Queue对象,即把网络队列实体化成可以使用的本地队列。

5.创建任务到“本地”队列中,自动上传任务到网络队列中,分配给任务进程进行处理。

 贴个网上找的python2的代码:

#coding:utf-8
#server.py
import random,time,Queue
from multiprocessing.managers import BaseManager
#实现第一步:建立task_queue和result_queue,用来存放任务和结果
task_queue=Queue.Queue()
result_queue=Queue.Queue()

class Queuemanager(BaseManager):
    pass
#实现第二步:把创建的两个队列注册在网络上,利用register方法,callable参数关联了Queue对象,
# 将Queue对象在网络中暴露
Queuemanager.register('get_task_queue',callable=lambda:task_queue)
Queuemanager.register('get_result_queue',callable=lambda:result_queue)

#实现第三步:绑定端口8001,设置验证口令‘qiye’。这个相当于对象的初始化,并启动
manager=Queuemanager(address=('',8001),authkey='qiye')
manager.start()

#实现第四步:通过管理实例的方法获得通过网络访问的Queue对象
task=manager.get_task_queue()
result=manager.get_result_queue()

#实现第五步:添加任务
for url in ["ImageUrl_"+str(i) for i in range(10)]:
    print 'put task %s ...' %url
    task.put(url) 
#获取返回结果
print 'try get result...'
for i in range(10):
    print 'result is %s' %result.get(timeout=10)
#关闭管理
manager.shutdown()

创建任务进程的步骤相对较少,需要四个步骤:

1.使用QueueManager注册用于获取Queue的方法名称,任务进程只能通过名称来在网络上获取Queue。

2.连接服务器,端口和验证口令注意保持与服务进程中完全一致。

3.从网络上获取Queue,进行本地化。

4.从task队列获取任务,并把结果写入result队列。

#coding:utf-8
#worker.py
import time
from multiprocessing.managers import BaseManager
# 创建类似的QueueManager:
class QueueManager(BaseManager):
    pass
# 实现第一步:使用QueueManager注册获取Queue的方法名称
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 实现第二步:连接到服务器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证口令注意保持与服务进程设置的完全一致:
m = QueueManager(address=(server_addr, 8001), authkey='qiye')
# 从网络连接:
m.connect()
# 实现第三步:获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 实现第四步:从task队列取任务,并把结果写入result队列:
while(not task.empty()):
        image_url = task.get(True,timeout=5)
        print('run task download %s...' % image_url)
        time.sleep(1)
        result.put('%s--->success'%image_url)

# 处理结束:
print('worker exit.')

版权声明:本文为博主原创文章,转载请注明出处。 旭日酒馆