浅涉python与分布式

并行和分布式计算介绍

现代计算的特点,主板上安装多块处理器(每个处理器含有多个核心),这使得计算机能真正地实现并发。

一个处理器同一时间只能处理同一事务;后面章节我们会看到,当处理器快到一定程度,就可以给出同一时间进行多项任务的假象。若要真正实现同一时间多任务,就需要多个处理器。

另一个是高速计算机网络。它让无穷多的电脑实现了相互通讯。

并行计算

并行计算是同时使用多个处理器处理事务。

分布式计算

分布式计算是指同一时间使用多台计算机处理一个任务。只有当计算机之间互相连接时,才可以使用分布式计算。要记住,真正的瓶颈往往是数据而不是CPU。

并行和分布式计算的最明显的差异就是底层的内存架构和访问方式不同。

并行和四个处理器可以访问同一内存地址。对于分布式应用,不同的并发任务不能正常访问同一内存。原因是,一些任务是在这一台计算机运行,一些任务是在另一台计算机运行,它们是物理分隔的,通过网络进行数据传输。

现实中,计算机网络通讯就像一个纯粹的分布式内存架构。然而,每台计算机有多个处理器,运行着共享式内存架构。

分布式内存系统扩展性强、组建成本低:需要更高性能,扩展即可。另一优点是,处理器可以访问各自的内存,不必担心发生Race condition。

缺点是,开发者需要手动写数据传输的策略,需要考虑数据存储的位置。另外,不是所有算法都能容易移植到这种架构。

阿姆达尔定律

考虑一个部分并行的算法,称P为并行分量,S为序列分量(即非并行分量),P+S=100%T(n)为运行时间,处理器数量为n

对比T(n)和T(1)可以得到,分布式处理的加速比。

随着n的提高,加速的效果不让人满意。使用10个处理器,是9.2倍速。使用100个处理器,则是50倍速。使用1000个处理器,仅仅是91倍速。

阿姆达尔定律告诉我们两点:我们最快可以将倍速提高到多少;收益减少时,何时应该减少硬件资源的投入。

异步编程(非阻塞编程)

与传统的同步编程相比,异步编程或非阻塞编程,可以使性能获得极大提高。

理想的状态应该是安排一下任务,当一个任务等待I/O时,它处于悬停状态,就让另一个任务接管CPU。这就是异步(也称为事件驱动)编程。

使用多线程在不同的线程并行运行,也可以达到同样的效果。但是,有一个显著的不同:使用多线程时,是由操作系统决定哪个线程处于运行或悬停。然而,在异步编程中,每个任务可以自己决定是否放弃CPU。

另外,单单使用异步编程,我们不能做出真正的并发:同一时间仅仅有一个任务在运行。

另一点要注意的是,异步编程更善于处理I/O密集型任务,而不是CPU密集型任务(暂停任务不会使性能提高)。

任何异步代码都要精心选择非阻塞的库,以防使用阻塞代码。

协程

在Python中,让一个功能中途暂停的关键是使用协程。

协程就是一类函数,它可以通过yield,在指定位置暂停或继续任务。需要注意,尽管协程是强化的生成器,在概念意义上并不等于生成器。原因是,协程与迭代无关。另一不同点,生成器产生值,而协程消除值。

生成器就是一个callable,它生成一个结果序列,而不是返回结果。这是通过产生(通过yield关键字)值而不是返回值

1
2
3
4
def mygenerator(n):
while n:
n -= 1
yield n

next()从生成的序列产生一个值,本质上,生成器是简化的迭代器,免去了定义类中__iter____next__的方法。外,生成器是一次性操作,不能重复生成的序列。

__iter____next__方法,运行了迭代协议:前者返回了迭代的对象,后者逐个返回了序列中的元素。

1
2
3
4
5
6
7
8
9
10
class MyIterator(object):
def __init__(self, xs):
self.xs = xs
def __iter__(self):
return self
def __next__(self):
if self.xs:
return self.xs.pop(0)
else:
raise StopIteration

协程有三种主要的结构:

yield(): 用来暂停协程的执行

send(): 用来向协程传递数据(以让协程继续执行)

close():用来关闭协程

示例

1
2
3
4
5
6
7
8
9
10
11
def complain_about(substring):
print('Please talk to me!')
try:
while True:
# 执行到此处,控制点返回shell,直到外部send数据到yield处,传递给text
text = (yield)
if substring in text:
print('Oh no: I found a %s again!'
% (substring))
except GeneratorExit:
print('Ok, ok: I am quitting.')
1
2
3
4
5
6
7
8
9
10
11
>>> c = complain_about('Ruby')
>>> next(c)
Please talk to me!
>>> c.send('Test data')
>>> c.send('Some more random text')
>>> c.send('Test data with Ruby somewhere in it')
Oh no: I found a Ruby again!
>>> c.send('Stop complaining about Ruby or else!')
Oh no: I found a Ruby again!
>>> c.close()
Ok, ok: I am quitting. 复制ErrorOK!

通过 next 启动协程,close关闭。

词汇计数示例,文本来自http://www.gutenberg.org/cache/epub/2600/pg2600.txt。

逐行读取文件(通过cat函数);统计每行中substring的出现次数(grep协程);求和并打印数据(count协程)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from functools import wraps


def coroutine(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
c = fn(*args, **kwargs)
next(c)
return c

return wrapper


def cat(f, case_insensitive, child):
if case_insensitive:
line_processor = lambda l: l.lower()
else:
line_processor = lambda l: l
for line in f:
child.send(line_processor(line))


@coroutine
def grep(sub_str, case_insensitive, child):
if case_insensitive:
sub_str = sub_str.lower()
while True:
text = (yield) # 等待send发送的数据
child.send(text.count(sub_str))


@coroutine
def count(sub_str):
n = 0
try:
while True:
n += (yield)
except GeneratorExit:
print(sub_str, n)


@coroutine
def fanout(children):
# 多个目标同时输入,计数
while True:
data = (yield)
for child in children:
child.send(data)


if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-i', action='store_true', dest='case_insensitive')
parser.add_argument('pattern', type=str)
parser.add_argument('infile', type=argparse.FileType('r'))
args = parser.parse_args()

cat(args.infile, args.case_insensitive,
grep(args.pattern, args.case_insensitive, count(args.pattern)))

cat(
args.infile, args.case_insensitive,
fanout(
[grep(p, args.case_insensitive, count(p)) for p in args.pattern]))
1
python grep.py -i love pg2600.txt

并行计算

如何使用多个CPU进行并行编程的。具体目标是加速CPU密集型任务。

多线程

在单CPU系统中,使用多线程并不是真正的并行,在给定时间只有一个线程在运行。只有在多CPU计算机上,线程才是并行的。

尽管Python的线程是OS原生的,全局锁却使特定时间只有一个是运行的。

当一个协程或进程等待I/O时,让另一个运行CPU,也可以达到并发的效果。当一个任务需要占用CPU大量时间时,CPU Bound,就不会有多大提高。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from threading import Thread
from queue import Queue
import urllib.request
URL = 'http://finance.yahoo.com/d/quotes.csv?s={}=X&f=p'


def get_rate(pair, outq, url_tmplt=URL):
with urllib.request.urlopen(url_tmplt.format(pair)) as res:
body = res.read()
outq.put((pair, float(body.strip())))


if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('pairs', type=str, nargs='+')
args = parser.parse_args()
outputq = Queue()
for pair in args.pairs:
t = Thread(target=get_rate, kwargs={'pair': pair, 'outq': outputq})
t.daemon = True # 结束时,会自行回收线程资源
t.start()
for _ in args.pairs:
pair, rate = outputq.get()
print(pair, rate)
outputq.task_done()
outputq.join()

多进程

为避免全局锁对CPU制约型线程的影响,使用多进程。多进程有一些缺点,它必须启动Python的多个实例,启动时间长,耗费内存多。

多进程有它们各自的内存空间,使用的是无共享架构,数据访问十分清晰。

实现并行进程,python提供两个module:multiprocessingconcurrent.futures

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import concurrent.futures as cf


def fib(n):
if n <= 2:
return 1
elif n == 0:
return 0
elif n < 0:
raise Exception('fib(n) is undefined for n < 0')
return fib(n - 1) + fib(n - 2)


if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-n', type=int, default=1)
parser.add_argument('number', type=int, nargs='?', default=34)
args = parser.parse_args()
assert args.n >= 1, 'The number of threads has to be > 1'

with cf.ProcessPoolExecutor(max_workers=args.n) as pool:
results = pool.map(fib, [args.number] * args.n)

在四核处理器的计算机上运行时,可以实现真正的并行,运行一次到四次,时间差不多。

进程数比处理器数目多时,性能会急剧下降。

在工作进程之间交换数据,在学习C时,会用到

  • 管道 (使用最简单)
  • 信号 (开销最小)
  • 共享映射区 (无父子关系)
  • 本地套接字 (最稳定)

multiprocessing模块提供的方法是队列和管道。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import multiprocessing as mp


def fib(n):
if n <= 2:
return 1
elif n == 0:
return 0
elif n < 0:
raise Exception('fib(n) is undefined for n < 0')
return fib(n - 1) + fib(n - 2)


def worker(inq, outq):
"inq 任务队列, outq 输出队列"
while True:
data = inq.get()
if data is None: # 检测 哨兵值
return
fn, arg = data
outq.put(fn(arg))


if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-n', type=int, default=1)
parser.add_argument('number', type=int, nargs='?', default=34)
args = parser.parse_args()
assert args.n >= 1, 'The number of threads has to be > 1'

tasks = mp.Queue()
results = mp.Queue()
for i in range(args.n):
tasks.put((fib, args.number))
for i in range(args.n):
mp.Process(target=worker, args=(tasks, results)).start()
for i in range(args.n):
print(results.get())
for i in range(args.n): # 输入哨兵值,停止worker
tasks.put(None)

开发并行应用的主要难点就是控制数据访问,避免竞争条件或篡改共享数据。要明确何时停止。阿姆达尔定律指出,并行是收益递减的。并行化可能耗时巨大。一定要知道,哪段代码是需要并行化的,理论加速上限又是多少。

另外,避免收益递减的方法是增加任务量,提升并行任务的占比,这是古斯塔夫森定律的核心。

Celery

Celery(http://www.celeryproject.org)是用到的第一个第三方库。Celery是一个分布任务队列,就是一个以队列为基础的系统。

轻量化的队列任务调度包:https://python-rq.org/

分别在主机安装RabbitMQ (windows: exe erlang),ubuntu机器开启 redis-server ( sudo apt-get install redis-server ),python环境安装 celery,redis。(pip install celery[Redis])

就学习示例而言,自己搭建整个环境耗时太大,尤其在windows环境下!最好能云服务器环境,只管写代码,环境一般不会出问题。

直接在几个虚拟机上测试比较方便。

在机器1上:

1
2
3
4
5
6
7
8
9
import celery

app = celery.Celery('test',
broker='redis://192.168.56.104',
backend='redis://192.168.56.103')

@app.task
def echo(message):
return message

建立机器1,worker池

1
celery -A test worker --loglevel=info

celery命令会默认启动CPU数目相同的worker进程。worker会使用test模块中的应用app(我们可以使用实例的名字celery -A test.app worker),并使用INFO等级在控制台显示日志。

在机器2上(此处即为 master节点),保存一份test.py相同代码

进入 python 交互环境:

1
>>> from test import echo
1
2
>>> res = echo.delay('Python rocks!')
>>> res.result

可以在worker机器上执行程序,并返回结果

分布式任务队列

master-worker架构,有一个中间件层,中间件层使用多个任务请求队列(即任务队列),和一个用于存储结果的队列(即结果后台)。

主进程(也叫作clientproducer)将任务请求安插到某个任务队列,从结果后台获取数据。worker进程订阅任务队列以明确任务是什么,并把结果放到结果后台。

只管定制好 任务队列 和 结果后台,其他worker、producer如何变化、什么程序都无所谓。

也称作 Master Worker 模式:

Master Worker 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# master.py
import random, time, queue
from multiprocessing.managers import BaseManager

# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()


# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
pass


# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')

# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()

# 注册任务
for i in range(10):
n = random.randint(0, 10000)
print('Put task %d...' % n)
task.put(n)

# 读取结果:
print('Try get results...')
for i in range(10):
r = result.get(timeout=10)
print('Result: %s' % r)

# 关闭:
manager.shutdown()
print('master exit.')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# worker.py
import time, sys, queue
from multiprocessing.managers import BaseManager


# 可以在多个机器上同时开启多个worker
# 创建类似的QueueManager:
class QueueManager(BaseManager):
pass


# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)

# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()

# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()

# 从task队列取任务,并把结果写入result队列:
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n * n)
time.sleep(1)
result.put(r)
except queue.Queue.Empty:
print('task queue is empty.')

# 处理结束:
print('worker exit.')

使用中间件传递消息(基于网络),类似Go语言的channel(基于内存)。

另一种消息传递方式是,直接传递,Actor模型,一般有一个Global Schedule,设计的目的是计算。

Actor消息传递示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# actor.py
from queue import Queue
from threading import Thread, Event


class ActorExit(Exception):
pass


class Actor(object):
def __init__(self):
self._mailbox = Queue()

def send(self, msg):
"向_mailbox提交任务"
self._mailbox.put(msg)

def recv(self):
"从_mailbox获取任务"
msg = self._mailbox.get()
if msg is ActorExit:
raise ActorExit()
return msg

def close(self):
self.send(ActorExit)

def start(self):
"启动线程执行任务"
self._terminated = Event()
t = Thread(target=self._bootstrap)
t.daemon = True
t.start()

def _bootstrap(self):
try:
self.run()
except ActorExit:
pass
finally:
self._terminated.set()

def join(self):
self._terminated.wait()

def run(self):
'''
Run method to be implemented by the user
'''
while True:
msg = self.recv()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# test_actor.py
from .actor import Actor
from threading import Event


class Result(object):
def __init__(self):
self._evt = Event()
self._result = None

def set_result(self, value):
self._result = value
self._evt.set() # 当执行完成计算任务时,解除block

@property
def result(self):
self._evt.wait() # 等待计算结果程序的执行完成,thread block
return self._result


class Worker(Actor):
def submit(self, func, *args, **kwargs):
"注册任务"
r = Result()
self.send((func, args, kwargs, r))
return r

def run(self):
"重写actor的run逻辑,执行用户程序"
while True:
func, args, kwargs, r = self.recv()
r.set_result(func(*args, **kwargs))


if __name__ == '__main__':
worker = Worker()
worker.start()
r = worker.submit(pow, 2, 4)
print('it will not block')
print(r.result)

Ray

基于Master Slaves,Actor的分布式框架。DOC tutorials。分布计算、深度学习调参等等,可是请问 在下去哪里能领到更多的物理机器呢。。。

Global Scheduler:Master上启动了一个全局调度器,用于接收本地调度器提交的任务,并将任务分发给合适的本地任务调度器执行

Redis Server:Master上启动了一到多个Redis Server用于保存分布式任务的状态信息(ControlState),包括对象机器的映射、任务描述、任务debug信息等。

Local Scheduler:每个Slave上启动了一个本地调度器,用于提交任务到全局调度器,以及分配任务给当前机器的Worker进程

Worker:每个Slave上可以启动多个Worker进程执行分布式任务,并将计算结果存储到Object Store,每一个有全局唯一的 Object ID

Object Store:每个Slave上启动了一个Object Store存储只读数据对象,Worker可以通过共享内存的方式访问这些对象数据(通过Object ID),这样可以有效地减少内存拷贝和对象序列化成本。Object Store底层由Apache Arrow实现

Plasma:每个Slave上的Object Store都由一个名为Plasma的对象管理器进行管理,它可以在Worker访问本地Object Store上不存在的远程数据对象时,主动拉取其它Slave上的对象数据到当前机器

Ray简易环境搭建

  • 配置Conda

    1
    2
    3
    $ wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh

    $ sh Miniconda3-latest-Linux-x86_64.sh
  • 安装Ray,推荐python3.7,其他版本存在已知的Bug(官方)

    1
    2
    3
    4
    5
    $ conda create --name ray python=3.7

    $ conda activate ray

    $ pip install ray

Ray 集群搭建:

  • 部署Redis服务(下面假设部署在localhost:6379)
  • 选择任意一台主机作为Master启动 ray start --head --ip localhost --redis-port=6379
  • 在集群其他机器上启动 ray start --address=[head_node_address]:6379

运行一个Map Reduce示例。数据准备,下载Wiki数据,使用WikiExtractor解析:

1
2
$ wget https://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles-multistream9.xml-p1791081p2336422.bz2
$ python WikiExtractor.py -o /data enwiki-latest-pages-articles-multistream9.xml-p1791081p2336422.bz2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import argparse
from collections import Counter, defaultdict
import heapq
import numpy as np
import os
import ray

parser = argparse.ArgumentParser()
parser.add_argument("--num-mappers",
help="number of mapper actors used",
default=3,
type=int)
parser.add_argument("--num-reducers",
help="number of reducer actors used",
default=4,
type=int)

@ray.remote
class Mapper(object):
def __init__(self, content_stream):
self.content_stream = content_stream
self.num_articles_processed = 0
self.articles = []
self.word_counts = []

def get_new_article(self):
article = self.content_stream.next()
# Count the words and store the result.
self.word_counts.append(Counter(article.split(" ")))
self.num_articles_processed += 1

def get_range(self, article_index, keys):
"""keys: list of 2 chars
article_index:当前mapper处理的文章index,需要与GlobalScheduler中任务参数进行通信,
保证当前article在整个处理序列中处于article_index的位置"""
# Process more articles if this Mapper hasn't processed enough yet.
while self.num_articles_processed < article_index + 1:
self.get_new_article()
# Return the word counts from within a given character range.
return [(k, v) for k, v in self.word_counts[article_index].items()
if len(k) >= 1 and k[0] >= keys[0] and k[0] <= keys[1]]


@ray.remote
class Reducer(object):
def __init__(self, keys, *mappers):
"针对不同范围的开头字母区间,进行reduce"
self.mappers = mappers
self.keys = keys

def next_reduce_result(self, article_index):
word_count_sum = defaultdict(lambda: 0)
# Get the word counts for this Reducer's keys from all of the Mappers
# and aggregate the results.
count_ids = [
mapper.get_range.remote(article_index, self.keys)
for mapper in self.mappers
]
# From many Mappers
for count_id in count_ids:
for k, v in ray.get(count_id):
word_count_sum[k] += v
return word_count_sum


def get_content(file, floder='/data/'):
file = floder + file
f = open(file, 'r')
return f.read()


class Stream(object):
"数据流生成"

def __init__(self, max, folder):
"""max: 最大提取文件数量
folder: 文件夹名称
"""
self.index = 0
self.max = max
self.folder = folder
self.g = None

def init(self):
self.g = self.content()

def file(self):
return f"wiki_{0}{self.index}" if self.index < 10 else f"wiki_{self.index}"

def content(self):
while self.index < self.max:
yield get_content(self.file(), self.folder)
self.index += 1

def next(self):
"生成器"
if not self.g:
self.init()
return next(self.g)


if __name__ == "__main__":
MAX = 10
args = parser.parse_args()

ray.init()

# Create one streaming source of articles per mapper.
directory = os.path.dirname(os.path.realpath(__file__))
streams = []
folders = ['/data/AA/', '/data/AB/', '/data/AC/']
for i in range(args.num_mappers):
streams.append(Stream(MAX, folders[i % len(folders)]))

# Partition the keys among the reducers.
chunks = np.array_split([chr(i)
for i in range(ord("a"),
ord("z") + 1)], args.num_reducers)
keys = [[chunk[0], chunk[-1]] for chunk in chunks]

# Create a number of mappers.
mappers = [Mapper.remote(stream) for stream in streams]

# Create a number of reduces, each responsible for a different range of
# keys. This gives each Reducer actor a handle to each Mapper actor.
reducers = [Reducer.remote(key, *mappers) for key in keys]

# Most frequent 10 words.
article_index = 0
while True:
print("article index = {}".format(article_index))
wordcounts = {}
counts = ray.get([
reducer.next_reduce_result.remote(article_index)
for reducer in reducers
])
for count in counts:
wordcounts.update(count)
most_frequent_words = heapq.nlargest(10,
wordcounts,
key=wordcounts.get)
for word in most_frequent_words:
print(" ", word, wordcounts[word])
article_index += 1

然后,在每个节点保存一份代码,启动worker和master节点

1
2
3
4
ray start --head --ip localhost --redis-port=6379 

# 修改head_node_address为 master 节点的ip地址
ray start --address=[head_node_address]:6379

在master运行程序得到结果。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!