multiprocessing入門
multiprocessingといっても今まであったos.forkをうまくラップしてくれてる
というだけなのでプロセスをforkしたらどうなるの?っていうのを知らないと
扱えないと思われる。
forkすると子プロセスには今までの状態がコピーされるがその後はメモリ空間が
別々になる。
(当たり前の話)
だが開いているfdとかファイルエントリテーブルなどは共有される。
どのタイミングでforkするかが個人的には重要。
例ではPIPEはいっさい使ってない。
from multiprocessing import current_process, cpu_count, Process, Manager, Queue import time class Parent(object): def __init__(self, name): self.name = name self.queue = Queue() self.data = Manager().dict() self.data['start'] = True self.processe = [] self.parent_pid = current_process().pid def is_parent(self): return self.parent_pid == current_process().pid def _run(self, data, queue): i = 0 while self.data['start']: if not queue.empty(): pid = current_process().pid try: res = queue.get(timeout=1) except: return print("pid %d %d %s " % (pid,i,res)) i += 1 def start(self): for i in xrange(cpu_count() ): p = Process(target=self._run, args=(self.data, self.queue,)) self.processe.append(p) p.start() def set(self, val): self.queue.put(val, timeout=1) def stop(self): while not self.queue.empty(): time.sleep(0.5) self.data['start'] = False for p in self.processe: p.join() p = Parent('bob') p.start() for i in xrange(1000000): p.set(i) time.sleep(1) p.stop()
シンプルなケース。実行すると実行されてるpidを出力する。
プロセス間通信にはFIFO。
暇なプロセスが取り出しにいくことで分散化。
ただFIFOはサイズに制限がある+ロックがあるのでこのように細かくやり取り
するケースではあまり向かない。
上記だと自プロセスの分があるのでcpu_count -1で子供を作る方がよい。
from multiprocessing import current_process, cpu_count, Process, Manager, Queue import time class Parent(object): def __init__(self, name): self.name = name self.queue = Queue() self.data = Manager().dict() self.data['start'] = True self.processe = [] self.parent_pid = current_process().pid def f(self, data, queue): i = 0 while self.data['start']: if not queue.empty(): pid = current_process().pid try: res = queue.get(timeout=1) except: self.stop() print("pid %d %d %s is parent %s" % (pid,i,res,\ self.parent_pid == pid)) i += 1 else: self.stop() def start(self): for i in xrange(cpu_count() -1): p = Process(target=self.f, args=(self.data, self.queue,)) self.processe.append(p) p.start() self.f(self.data, self.queue) def set(self, val): self.queue.put(val) def stop(self): if self.parent_pid == current_process().pid: while not self.queue.empty(): time.sleep(0.5) self.data['start'] = False for p in self.processe: p.join() p = Parent('bob') for i in xrange(2000): p.set("bob") p.start()
親を自走させながらFIFOに書くのがめんどいの数を少なくした。
(ここを大きくすると止まります)
FIFOのロックに関して言えば一応O_NONBLOCKに対応するメソッド
put_nowait、get_nowaitを持っている。
こいつらを使うと書き込めない、取り出せない場合に即時、Full、Empty
といったエラーが返る。
Manager経由で共有なオブジェクトを生成し、終了条件に使っている。
多分、共有なオブジェクトに対するアクセスが遅いので別の方法がいい。
(whileのループで毎回評価してるので多分遅いはず)
とりあえずサーバ系のサンプル。
シンプル版
from multiprocessing import current_process, Process class Server(object): def __init__(self): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('localhost', 6000)) sock.listen(50) self.sock = sock self.process = [] def start(self): for i in xrange(1): p = Process(target=self.run) self.process.append(p) p.start() self.run() def run(self): while True: con, addr = self.sock.accept() data = con.recv(8192) con.sendall(data) print('process %d' % current_process().pid) con.close() Server().start()
マニア向け
from twisted.internet.protocol import Protocol, Factory from twisted.internet import reactor from multiprocessing import current_process, Process class Echo(Protocol): def dataReceived(self, data): print(current_process().pid) self.echo(data) def echo(self, data): self.transport.write(data) self.transport.loseConnection() class Server(object): def __init__(self): f = Factory() f.protocol = Echo reactor.listenTCP(6000, f) self.process = [] def start(self): for i in xrange(1): p = Process(target=self.run) self.process.append(p) #p.start() self.run() def run(self): reactor.run() Server().start()
特殊属性向け
from multiprocessing import current_process, Process from eventlet import api def handle_socket(client): print(current_process().pid) data = client.recv(8192) client.sendall(data) client.close() class Server(object): def __init__(self): server = api.tcp_listener(('0.0.0.0', 6000)) self.server = server self.process = [] def start(self): for i in xrange(1): p = Process(target=self.run) self.process.append(p) #p.start() self.run() def run(self): while True: new_sock, address = self.server.accept() api.spawn(handle_socket, new_sock) Server().start()
acceptでプロセスがひとつだけ起きてくれる前提のコードなので注意。
acceptのロックの調整のせいか、multiprocessにしてもそこまで性能あがらないっぽい。
(性能はそれなりにあがる。処理がシンプルすぎるせい)
既存のモジュールなどと併用する際には、内部のどこで何をやっているか具体的に
わからないとどのタイミングでforkするといいのかわからないので注意。
(当たり前。上記のどの例もlistenとaccept間にforkし、ちょんぼをしている)