dak ブログ

python、rubyなどのプログラミング、MySQL、サーバーの設定などの備忘録。レゴの写真も。

python でのマルチプロセスでのデータ処理

2022-05-16 21:00:15 | python
python でマルチプロセスでデータを処理するプログラムです。
mp = MultiProcessUtil(num_proc, func) で func を実行する num_proc の子プロセスを生成し、mp.start() で子プロセスを実行します。

親プロセスから子プロセスへは、mp.childs[i].p2c で送信します。
子プロセスの関数 func は 2 つの引数 (cinfo, args) をとり、cinfo.p2c で親プロセスから送信された情報を読み込みます。

親プロセスが mp.childs[i].p2c を close し、子プロセス側でデータの読み込みが完了すると、子プロセス側の読み込みループが終了します。

親プロセスは mp.join() を実行すると、子プロセスが終了するまで待ちます。
■ライブラリ
import os

class ChildInfo:
    def __init__(self):
        self.id = -1
        self.pid = None
        self.func = None
        self.args = None
        self.status = None
        self.p2c = None
        self.c2p = None
        self.p2c_r = None
        self.p2c_w = None
        self.c2p_r = None
        self.c2p_w = None

class MultiProcessUtil:
    def __init__(self, num_childs, func, args=None):
        self.num_childs = num_childs
        self.childs = [None] * num_childs
        self.func = func
        self.args = args  # len(args) == num_childs
        if self.args is None:
            self.args = [None] * self.num_childs

        for i in range(0, self.num_childs):
            cinfo = ChildInfo()
            cinfo.id = i
            cinfo.func = self.func
            cinfo.args = self.args[i]
            cinfo.p2c_r, cinfo.p2c_w = os.pipe()
            cinfo.c2p_r, cinfo.c2p_w = os.pipe()
            self.childs[i] = cinfo

    def start(self):
        for i in range(0, self.num_childs):
            cinfo = self.childs[i]

            pid = os.fork()
            if pid == 0:
                # child
                # 不要な fd を close
                for j in range(0, i):
                    os.close(self.childs[j].p2c_w)
                    os.close(self.childs[j].c2p_r)

                os.close(cinfo.p2c_w)
                cinfo.p2c_w = None
                os.close(cinfo.c2p_r)
                cinfo.c2p_r = None
                cinfo.p2c = os.fdopen(cinfo.p2c_r, 'r')
                cinfo.c2p = os.fdopen(cinfo.c2p_w, 'w')
                try:
                    res = cinfo.func(cinfo, cinfo.args)
                except Exception as e:
                    res = 1
                exit(res)
            else:
                # parent
                # 不要な fd を close
                cinfo.pid = pid
                os.close(cinfo.p2c_r)
                cinfo.p2c_r = None
                os.close(cinfo.c2p_w)
                cinfo.c2p_w = None
                cinfo.p2c = os.fdopen(cinfo.p2c_w, 'w')
                cinfo.c2p = os.fdopen(cinfo.c2p_r, 'r')

        return self.childs

    def join(self):
        for child in self.childs:
            pid, status = os.wait()
            child.status = status

■プログラム
import sys
import os
import re
from multi_process_util import MultiProcessUtil

def proc1(cinfo, args=None):
    print('run: %d' % (os.getpid()))

    for line in cinfo.p2c:
        line = re.sub('[\r\n]+$', '', line)
        print('[%d]: [%s]' % (os.getpid(), line))

    return 0

def main():
    num_procs = 2
    mp = MultiProcessUtil(num_procs, proc1)
    mp.start()

    line_no = -1
    for line in sys.stdin:
        line_no += 1
        c = line_no % num_procs
        #print('c: %d' % (c))
        #print(mp.childs[c].json())
        mp.childs[c].p2c.write(line)

    for child in mp.childs:
        child.p2c.close()

    mp.join()
    return 0

if __name__ == '__main__':
    res = main()
    exit(res)

■入力ファイル
abc
def
ghi
jkl
mno
pqr
stu
vwx
012
345
678

■実行結果
run: 8223
[8223]: [def]
run: 8222
[8222]: [abc]
[8223]: [jkl]
[8223]: [pqr]
[8223]: [vwx]
[8223]: [345]
[8222]: [ghi]
[8222]: [mno]
[8222]: [stu]
[8222]: [012]
[8222]: [678]


この記事についてブログを書く
« apache beam のマルチプロセ... | トップ | awk で指定値以上の値の行を抽出 »

python」カテゴリの最新記事