apache beam でマルチプロセスで実行
apache beam で python でマルチプロセスで実行する方法のメモ。
簡易にマルチプロセスを実現するため、ここでは DirectRunner を使用します。
Pipeline のオプションに以下を指定することで、マルチプロセスでプログラムを実行することができます。
runner = 'DirectRunner'
direct_running_mode = 'multi_processing'
direct_num_workers = {ワーカー数}
■入力データ
今回読み込み対象としているファイルは以下のようなデータで、
単語の出現をカウントします。
■プログラム
apache beam で python でマルチプロセスで実行する方法のメモ。
簡易にマルチプロセスを実現するため、ここでは DirectRunner を使用します。
Pipeline のオプションに以下を指定することで、マルチプロセスでプログラムを実行することができます。
runner = 'DirectRunner'
direct_running_mode = 'multi_processing'
direct_num_workers = {ワーカー数}
■入力データ
今回読み込み対象としているファイルは以下のようなデータで、
単語の出現をカウントします。
Confidence NN B-NP in IN B-PP the DT B-NP pound NN I-NP is VBZ B-VP
■プログラム
import sys import os import re import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.options.pipeline_options import DirectOptions input_file = '~/nltk_data/corpora/conll2000/train.txt' output_file = 'data/output.txt' class ParseLine(beam.DoFn): def __init__(self): self.re_chomp = re.compile('[\r\n]+$') def process(self, line): line = self.re_chomp.sub('', line) items = line.split(' ') if len(items) < 3: return kw = items[0].lower() yield (kw, 1) opts = PipelineOptions() opts.view_as(StandardOptions).runner = 'DirectRunner' opts.view_as(DirectOptions).direct_running_mode = 'multi_processing' opts.view_as(DirectOptions).direct_num_workers = 2 with beam.Pipeline(options=opts) as p: (p | beam.io.ReadFromText(input_file) | beam.ParDo(ParseLine()) | beam.CombinePerKey(sum) | beam.io.WriteToText(output_file) ) </pre> ■実行結果('existing', 13) ('to', 5080) ('as', 972) ...