dak ブログ

python、rubyなどのプログラミング、MySQL、サーバーの設定などの備忘録。レゴの写真も。

apache beam でマルチプロセスで実行

2022-05-02 21:39:22 | python
apache beam でマルチプロセスで実行

apache beam で python でマルチプロセスで実行する方法のメモ。
簡易にマルチプロセスを実現するため、ここでは DirectRunner を使用します。
Pipeline のオプションに以下を指定することで、マルチプロセスでプログラムを実行することができます。
 runner = 'DirectRunner'
 direct_running_mode = 'multi_processing'
 direct_num_workers = {ワーカー数}

■入力データ
今回読み込み対象としているファイルは以下のようなデータで、
単語の出現をカウントします。
Confidence NN B-NP
in IN B-PP
the DT B-NP
pound NN I-NP
is VBZ B-VP

■プログラム
import sys
import os
import re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import DirectOptions

input_file = '~/nltk_data/corpora/conll2000/train.txt'
output_file = 'data/output.txt'

class ParseLine(beam.DoFn):
    def __init__(self):
        self.re_chomp = re.compile('[\r\n]+$')

    def process(self, line):
        line = self.re_chomp.sub('', line)
        items = line.split(' ')
        if len(items) < 3:
            return

        kw = items[0].lower()
        yield (kw, 1)

opts = PipelineOptions()
opts.view_as(StandardOptions).runner = 'DirectRunner'
opts.view_as(DirectOptions).direct_running_mode = 'multi_processing'
opts.view_as(DirectOptions).direct_num_workers = 2

with beam.Pipeline(options=opts) as p:
    (p
     | beam.io.ReadFromText(input_file)
     | beam.ParDo(ParseLine())
     | beam.CombinePerKey(sum)
     | beam.io.WriteToText(output_file)
    )
</pre>
■実行結果
('existing', 13)
('to', 5080)
('as', 972)
...