python thread tutorial

python thread tutorial

参考链接

1. python 线程与进程的区别

进程是程序(软件,应用)的一个执行实例,每个运行中的程序,可以同时创建多个进程,但至少要有一个。每个进程都提供执行程序所需的所有资源,都有一个虚拟的地址空间、可执行的代码、操作系统的接口、安全的上下文(记录启动该进程的用户和权限等等)、唯一的进程ID、环境变量、优先级类、最小和最大的工作空间(内存空间)。进程可以包含线程,并且每个进程必须有至少一个线程。每个进程启动时都会最先产生一个线程,即主线程,然后主线程会再创建其他的子线程。

线程,有时被称为轻量级进程(Lightweight Process,LWP),是程序执行流的最小单元。一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组成。另外,线程是进程中的一个实体,是被系统独立调度和分派的基本单位,线程自己不独立拥有系统资源,但它可与同属一个进程的其它线程共享该进程所拥有的全部资源。每一个应用程序都至少有一个进程和一个线程。在单个程序中同时运行多个线程完成不同的被划分成一块一块的工作,称为多线程。

举个例子,某公司要生产一种产品,于是在生产基地建设了很多厂房,每个厂房内又有多条流水生产线。所有厂房配合将整个产品生产出来,单个厂房内的流水线负责生产所属厂房的产品部件,每个厂房都拥有自己的材料库,厂房内的生产线共享这些材料。公司要实现生产必须拥有至少一个厂房一条生产线。换成计算机的概念,那么这家公司就是应用程序,厂房就是应用程序的进程,生产线就是某个进程的一个线程。

线程的特点:

线程是一个execution context(执行上下文),即一个cpu执行时所需要的一串指令。假设你正在读一本书,没有读完,你想休息一下,但是你想在回来时继续先前的进度。有一个方法就是记下页数、行数与字数这三个数值,这些数值就是execution context。如果你的室友在你休息的时候,使用相同的方法读这本书。你和她只需要这三个数字记下来就可以在交替的时间共同阅读这本书了。

线程的工作方式与此类似。CPU会给你一个在同一时间能够做多个运算的幻觉,实际上它在每个运算上只花了极少的时间,本质上CPU同一时刻只能干一件事,所谓的多线程和并发处理只是假象。CPU能这样做是因为它有每个任务的execution context,就像你能够和你朋友共享同一本书一样。

进程与线程区别:

  • 同一个进程中的线程共享同一内存空间,但进程之间的内存空间是独立的。
  • 同一个进程中的所有线程的数据是共享的,但进程之间的数据是独立的。
  • 对主线程的修改可能会影响其他线程的行为,但是父进程的修改(除了删除以外)不会影响其他子进程。
  • 线程是一个上下文的执行指令,而进程则是与运算相关的一簇资源。
  • 同一个进程的线程之间可以直接通信,但是进程之间的交流需要借助中间代理来实现。
  • 创建新的线程很容易,但是创建新的进程需要对父进程做一次复制。
  • 一个线程可以操作同一进程的其他线程,但是进程只能操作其子进程。
  • 线程启动速度快,进程启动速度慢(但是两者运行速度没有可比性)。

2. 简单Python实列

爬取相关常见网站主页,并且获取其中的title

1
2
3
4
5
6
7
8
9
10
11
import threading
import requests
import timefrom bs4
import BeautifulSoup
hosts = "http://baidu.com", "http://cn.bing.com", "http://taobao.com","http://jd.com"
start = time.time()
for host in hosts:
url = requests.get(host)
soup = BeautifulSoup(url.text)
print(soup.findAll([ 'title' ]))
print("Elapsed Time: %s" % (time.time()- start))

结果如下:

1
2
3
4
[<title>微软 Bing 搜索 - 国内版</title>]
[<title>淘宝网 - 淘!我喜欢</title>]
[<title>京东(JD.COM)-正品低价、品质保障、配送及时、轻松购物!</title>]
Elapsed Time: 4.66728138923645

thread优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import time
import queue
import requests
import threading
from bs4 import BeautifulSoup

hosts = "http://baidu.com", "http://cn.bing.com", "http://taobao.com","http://jd.com"
Queue = queue.Queue()
out_queue = queue.Queue()

class ThreadUrl(threading.Thread):
"""Threaded Url Grab"""
def __init__(self, queue, outqueue):
threading.Thread.__init__(self)
self.queue = queue
self.out_queue = outqueue

def run(self):
while True:
#grabs host from queue
host = self.queue.get()

#grabs urls of hosts and then grabs chunk of webpage
url = requests.get(host)
chunk = url.text
#place chunk into out queue
self.out_queue.put(chunk)

#signals to queue job is done
self.queue.task_done()

class DatamineThread(threading.Thread):
"""Threaded Url Grab"""
def __init__(self, out_queue):
threading.Thread.__init__(self)
self.out_queue = out_queue

def run(self):
while True:
#grabs host from queue
chunk = self.out_queue.get()

#parse the chunk
soup = BeautifulSoup(chunk)
print (soup.findAll(['title']))

#signals to queue job is done
self.out_queue.task_done()
start = time.time()
def main():
#spawn a pool of threads, and pass them queue instance
for i in range(5):
t = ThreadUrl(Queue, out_queue)
t.setDaemon(True)
t.start()

#populate queue with data
for host in hosts:
Queue.put(host)

for i in range(5):
dt = DatamineThread(out_queue)
dt.setDaemon(True)
dt.start()


#wait on the queue until everything has been processed
Queue.join()
out_queue.join()
main()
print ("Elapsed Time: %s" % (time.time() - start))

使用thread结果

1
2
3
4
[<title>微软 Bing 搜索 - 国内版</title>]
[<title>京东(JD.COM)-正品低价、品质保障、配送及时、轻松购物!</title>]
[<title>淘宝网 - 淘!我喜欢</title>]
Elapsed Time: 1.513925552368164

3. 生产者消费者模型

​ 代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import time
import random
import logging

import queue
import threading
logging.basicConfig(level=logging.DEBUG,
format='(%(threadName)-9s) %(message)s',)

BUF_SIZE = 10
q = queue.Queue(BUF_SIZE)
class ProducerThread(threading.Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, verbose = True):
super(ProducerThread,self).__init__()
self.target = target
self.name = name

def run(self):
while True:
if not q.full():
rand = random.randint(1,10)
q.put(rand)
logging.debug('Putting '+ str(rand) + ':' + str(q.qsize()) + ' item in queue')
time.sleep(random.random())



class ConsumerThread(threading.Thread):
def __init__( self, group = None, target = None, name = None,
args = (), kwargs = None, verbose = True ):
super(ConsumerThread, self).__init__()
self.target = target
self.name = name

def run(self):
while True:
if not q.empty():
item = q.get()
logging.debug('Getting ' + str(item)
+ ' : ' + str(q.qsize()) + ' items in queue')
time.sleep(random.random())

if __name__ == '__main__':
p = ProducerThread( name = 'producer')
c = ConsumerThread( name = 'consumer')
p.start()
time.sleep(2)
c.start()
time.sleep(2)

4. Python queue工具的认识

​ queue是python中的标准库,俗称队列。

​ 在python中,多个线程之间的数据是共享的,多个线程进行数据交换的时候,不能够保证数据的安全性和一致性,所以当多个线程需要进行数据交换的时候,队列就出现了,队列可以完美解决线程间的数据交换,保证线程间数据的安全性和一致性。

注意: 在python2.x中,模块名为Queue

queue模块有三种队列及构造函数

Python queue模块的FIFO队列先进先出。 queue.Queue(maxsize)

LIFO类似于堆,即先进后出。 queue.LifoQueue(maxsize)

还有一种是优先级队列级别越低越先出来。 queue.PriorityQueue(maxsize)

queue模块中的常用方法

queue.qsize() 返回队列的大小

queue.empty() 如果队列为空,返回True,反之False

queue.full() 如果队列满了,返回True,反之False

queue.full 与 maxsize 大小对应

queue.get([block[, timeout]])获取队列,立即取出一个元素, timeout超时时间

queue.put(item[, timeout]]) 写入队列,立即放入一个元素, timeout超时时间

queue.get_nowait() 相当于queue.get(False)

queue.put_nowait(item) 相当于queue.put(item, False)

queue.join() 阻塞调用线程,直到队列中的所有任务被处理掉, 实际上意味着等到队列为空,再执行别的操作

queue.task_done() 在完成一项工作之后,queue.task_done()函数向任务已经完成的队列发送一个信号

代码实例

以下代码在Python3下通过

创建队列

1
2
import queue
q = queue.Queue()

empty方法(如果队列为空,返回True)

1
2
3
4
import queue
q = queue.Queue()
print(q.empty())
#输出:True

full方法(如果队列满了,返回True)

1
2
3
4
5
import queue
q = queue.Queue(1) #指定队列大小
q.put('a')
print(q.full())
#输出:True

put方法和get方法

1
2
3
4
5
6
import queue
q = queue.Queue()
q.put('a')
q.put('b')
print(q.get())
#输出:a

qsize方法(返回队列里元素个数)

1
2
3
4
5
6
import queue
q = queue.Queue()
q.put('a')
q.put('b')
print(q.qsize())
#输出:2

5. multiprocessing 进程

Python中的multiprocess提供了Process类,实现进程相关的功能。但是它基于fork机制,因此不被windows平台支持。想要在windows中运行,必须使用if __name__ == '__main__:的方式,显然这只能用于调试和学习,不能用于实际环境。

另外,在multiprocess中你既可以import大写的Process,也可以import小写的process,这两者是完全不同的东西。这种情况在Python中很多,请一定要小心和注意。

下面是一个简单的多进程例子,Process类的用法和Thread类几乎一模一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import os
import multiprocessing

def foo(i):
# 同样的参数传递方法
print("这里是 ", multiprocessing.current_process().name)
print('模块名称:', __name__)
print('父进程 id:', os.getppid()) # 获取父进程id
print('当前子进程 id:', os.getpid()) # 获取自己的进程id
print('------------------------')

if __name__ == '__main__':

for i in range(5):
p = multiprocessing.Process(target=foo, args=(i,))
p.start()

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
这里是  Process-2
模块名称: __mp_main__
父进程 id: 880
当前子进程 id: 5260
--------------
这里是 Process-3
模块名称: __mp_main__
父进程 id: 880
当前子进程 id: 4912
--------------
这里是 Process-4
模块名称: __mp_main__
父进程 id: 880
当前子进程 id: 5176
--------------
这里是 Process-1
模块名称: __mp_main__
父进程 id: 880
当前子进程 id: 5380
--------------
这里是 Process-5
模块名称: __mp_main__
父进程 id: 880
当前子进程 id: 3520
--------------

1. 进程间的数据共享

在Linux中,每个子进程的数据都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据。

创建一个进程需要非常大的开销,每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的,要想共享数据,一般通过中间件来实现。

下面我们尝试用一个全局列表来实现进程间的数据共享:

1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Process

lis = []

def foo(i):
lis.append(i)
print("This is Process ", i," and lis is ", lis, " and lis.address is ", id(lis))

if __name__ == '__main__':
for i in range(5):
p = Process(target=foo, args=(i,))
p.start()
print("The end of list_1:", lis)

运行结果:

1
2
3
4
5
6
The end of list_1: []
This is Process 2 and lis is [2] and lis.address is 40356744
This is Process 1 and lis is [1] and lis.address is 40291208
This is Process 0 and lis is [0] and lis.address is 40291208
This is Process 3 and lis is [3] and lis.address is 40225672
This is Process 4 and lis is [4] and lis.address is 40291208

可以看到,全局列表lis没有起到任何作用,在主进程和子进程中,lis指向内存中不同的列表。

想要在进程之间进行数据共享可以使用QueuesArrayManager这三个multiprocess模块提供的类。

1.1 使用Array共享数据

对于Array数组类,括号内的“i”表示它内部的元素全部是int类型,而不是指字符“i”,数组内的元素可以预先指定,也可以只指定数组的长度。Array类在实例化的时候必须指定数组的数据类型和数组的大小,类似temp = Array('i', 5)。对于数据类型有下面的对应关系:

1
2
3
4
5
6
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double

看下面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process
from multiprocessing import Array

def func(i,temp):
temp[0] += 100
print("进程%s " % i, ' 修改数组第一个元素后----->', temp[0])

if __name__ == '__main__':
temp = Array('i', [1, 2, 3, 4])
for i in range(10):
p = Process(target=func, args=(i, temp))
p.start()

运行结果:

1
2
3
4
5
6
7
8
9
10
进程2   修改数组第一个元素后-----> 101
进程4 修改数组第一个元素后-----> 201
进程5 修改数组第一个元素后-----> 301
进程3 修改数组第一个元素后-----> 401
进程1 修改数组第一个元素后-----> 501
进程6 修改数组第一个元素后-----> 601
进程9 修改数组第一个元素后-----> 701
进程8 修改数组第一个元素后-----> 801
进程0 修改数组第一个元素后-----> 901
进程7 修改数组第一个元素后-----> 1001

1.2 使用Manager共享数据

通过Manager类也可以实现进程间数据的共享。Manager()返回的manager对象提供一个服务进程,使得其他进程可以通过代理的方式操作Python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多种格式。

1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Process
from multiprocessing import Manager

def func(i, dic):
dic["num"] = 100+i
print(dic.items())

if __name__ == '__main__':
dic = Manager().dict()
for i in range(10):
p = Process(target=func, args=(i, dic))
p.start()
p.join()

运行结果:

1
2
3
4
5
6
7
8
9
10
[('num', 100)]
[('num', 101)]
[('num', 102)]
[('num', 103)]
[('num', 104)]
[('num', 105)]
[('num', 106)]
[('num', 107)]
[('num', 108)]
[('num', 109)]

1.3 使用queues的Queue类共享数据

multiprocessing是一个包,它内部又一个queues模块,提供了一个Queue队列类,可以实现进程间的数据共享,如下例所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import multiprocessing
from multiprocessing import Process
from multiprocessing import queues

def func(i, q):
ret = q.get()
print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i, ret, i))
q.put(i)

if __name__ == "__main__":
lis = queues.Queue(20, ctx=multiprocessing)
lis.put(0)
for i in range(10):
p = Process(target=func, args=(i, lis,))
p.start()

运行结果:

1
2
3
4
5
6
7
8
9
10
进程1从队列里获取了一个0,然后又向队列里放入了一个1
进程4从队列里获取了一个1,然后又向队列里放入了一个4
进程2从队列里获取了一个4,然后又向队列里放入了一个2
进程6从队列里获取了一个2,然后又向队列里放入了一个6
进程0从队列里获取了一个6,然后又向队列里放入了一个0
进程5从队列里获取了一个0,然后又向队列里放入了一个5
进程9从队列里获取了一个5,然后又向队列里放入了一个9
进程7从队列里获取了一个9,然后又向队列里放入了一个7
进程3从队列里获取了一个7,然后又向队列里放入了一个3
进程8从队列里获取了一个3,然后又向队列里放入了一个8

关于queue和Queue,在Python库中非常频繁的出现,很容易就搞混淆了。甚至是multiprocessing自己还有一个Queue类(大写的Q),一样能实现queues.Queue的功能,导入方式是from multiprocessing import Queue

2. 进程锁

为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。与threading类似,在multiprocessing里也有同名的锁类RLockLockEventConditionSemaphore,连用法都是一样样的,这一点非常友好!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Process
from multiprocessing import Array
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
import time

def func(i,lis,lc):
lc.acquire()
lis[0] = lis[0] - 1
time.sleep(1)
print('say hi', lis[0])
lc.release()

if __name__ == "__main__":
array = Array('i', 1)
array[0] = 10
lock = RLock()
for i in range(10):
p = Process(target=func, args=(i, array, lock))
p.start()

运行结果:

1
2
3
4
5
6
7
8
9
10
say hi 9
say hi 8
say hi 7
say hi 6
say hi 5
say hi 4
say hi 3
say hi 2
say hi 1
say hi 0

3. 进程池Pool类

进程启动的开销比较大,过多的创建新进程会消耗大量的内存空间。仿照线程池的做法,我们可以使用进程池控制内存开销。

比较幸运的是,Python给我们内置了一个进程池,不需要像线程池那样要自己写,你只需要简单的from multiprocessing import Pool导入就行。进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中常用的方法:

  • apply() 同步执行(串行)
  • apply_async() 异步执行(并行)
  • terminate() 立刻关闭进程池
  • join() 主进程等待所有子进程执行完毕。必须在close或terminate()之后。
  • close() 等待所有进程结束后,才关闭进程池。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from multiprocessing import Pool
import time

def func(args):
time.sleep(1)
print("正在执行进程 ", args)

if __name__ == '__main__':

p = Pool(5) # 创建一个包含5个进程的进程池

for i in range(30):
p.apply_async(func=func, args=(i,))

p.close() # 等子进程执行完毕后关闭进程池
# time.sleep(2)
# p.terminate() # 立刻关闭进程池
p.join()
来发评论吧~
Powered By Valine
v1.5.2