dak ブログ

python、rubyなどのプログラミング、MySQL、サーバーの設定などの備忘録。レゴの写真も。

BigQuery でデータを JSON 化して配列に集約

2022-05-27 22:45:44 | GCP
BigQuery でデータ項目を struct() で JSON 化して、array_agg() で配列に集約する方法のメモ。

BigTable には以下のデータを登録します。
insert into tbl values ('item_01', 'store_01', 100);
insert into tbl values ('item_01', 'store_02', 110);
insert into tbl values ('item_01', 'store_03', 120);

insert into tbl values ('item_02', 'store_01', 200);
insert into tbl values ('item_02', 'store_02', 210);
insert into tbl values ('item_02', 'store_03', 220);

insert into tbl values ('item_03', 'store_04', 300);


以下のクエリで JSON 化したデータを配列に集約します。
struct(store_cd, price) で {"store_cd": ..., "price": ... } の JSON 形式に変換し、array_agg() で item_id が同じレコードを集約します。
select
   item_id
  , array_agg(store_info)
from (
select
   item_id
  , struct(store_cd, price) as store_info
from
  tbl
)
group by
  item_id
;


実行結果は以下の通りで、同一 item_id で {"store_cd": ..., "price": ...} を集約した配列が得られます。
{"item_id": "item_01", "store_infos": [{ "store_cd": "store_03", "price": "120"}, {"store_cd": "store_01", "price": "100"}, {"store_cd": "store_02", "price": "110"}]}
{"item_id": "item_02", "store_infos": [{"store_cd": "store_02", "price": "210"}, {"store_cd": "store_03", "price": "220"}, { "store_cd": "store_01", "price": "200"}]}
{"item_id": "item_03", "store_infos": [{"store_cd": "store_04", "price": "300"}]}



python で gzip ファイルを読み込む方法

2022-05-26 21:29:37 | python
python で gzip されたテキストファイルを読み込む方法のメモ。

■gzip.open() のパラメータでテキスト('rt')、エンコーディングを指定
import gzip

with gzip.open('test.txt.gz', 'rt', 'utf-8') as inst:
    for line in inst:
        print(line, end='')

■各行をバイト列として読み込み
import gzip

with gzip.open('test.txt.gz') as inst:
    for line in inst:
        line = line.decode('utf-8')
        print(line, end='')

■stdin から読み込み
import sys
import gzip

with gzip.open(sys.stdin.buffer, 'rt', 'utf-8') as inst:
    for line in inst:
        print(line, end='')


awk で指定値以上の値の行を抽出

2022-05-19 23:43:34 | linux
awk で指定値以上の値の行を抽出する方法のメモ。

■ファイル
abc     1
def     11
ghi     5


■コマンド
cat test1.txt \
| awk '$2 >= 10 {FS="\t"; OFS="\t"; print $1, $2}'


■実行結果
def     11


python でのマルチプロセスでのデータ処理

2022-05-16 21:00:15 | python
python でマルチプロセスでデータを処理するプログラムです。
mp = MultiProcessUtil(num_proc, func) で func を実行する num_proc の子プロセスを生成し、mp.start() で子プロセスを実行します。

親プロセスから子プロセスへは、mp.childs[i].p2c で送信します。
子プロセスの関数 func は 2 つの引数 (cinfo, args) をとり、cinfo.p2c で親プロセスから送信された情報を読み込みます。

親プロセスが mp.childs[i].p2c を close し、子プロセス側でデータの読み込みが完了すると、子プロセス側の読み込みループが終了します。

親プロセスは mp.join() を実行すると、子プロセスが終了するまで待ちます。
■ライブラリ
import os

class ChildInfo:
    def __init__(self):
        self.id = -1
        self.pid = None
        self.func = None
        self.args = None
        self.status = None
        self.p2c = None
        self.c2p = None
        self.p2c_r = None
        self.p2c_w = None
        self.c2p_r = None
        self.c2p_w = None

class MultiProcessUtil:
    def __init__(self, num_childs, func, args=None):
        self.num_childs = num_childs
        self.childs = [None] * num_childs
        self.func = func
        self.args = args  # len(args) == num_childs
        if self.args is None:
            self.args = [None] * self.num_childs

        for i in range(0, self.num_childs):
            cinfo = ChildInfo()
            cinfo.id = i
            cinfo.func = self.func
            cinfo.args = self.args[i]
            cinfo.p2c_r, cinfo.p2c_w = os.pipe()
            cinfo.c2p_r, cinfo.c2p_w = os.pipe()
            self.childs[i] = cinfo

    def start(self):
        for i in range(0, self.num_childs):
            cinfo = self.childs[i]

            pid = os.fork()
            if pid == 0:
                # child
                # 不要な fd を close
                for j in range(0, i):
                    os.close(self.childs[j].p2c_w)
                    os.close(self.childs[j].c2p_r)

                os.close(cinfo.p2c_w)
                cinfo.p2c_w = None
                os.close(cinfo.c2p_r)
                cinfo.c2p_r = None
                cinfo.p2c = os.fdopen(cinfo.p2c_r, 'r')
                cinfo.c2p = os.fdopen(cinfo.c2p_w, 'w')
                try:
                    res = cinfo.func(cinfo, cinfo.args)
                except Exception as e:
                    res = 1
                exit(res)
            else:
                # parent
                # 不要な fd を close
                cinfo.pid = pid
                os.close(cinfo.p2c_r)
                cinfo.p2c_r = None
                os.close(cinfo.c2p_w)
                cinfo.c2p_w = None
                cinfo.p2c = os.fdopen(cinfo.p2c_w, 'w')
                cinfo.c2p = os.fdopen(cinfo.c2p_r, 'r')

        return self.childs

    def join(self):
        for child in self.childs:
            pid, status = os.wait()
            child.status = status

■プログラム
import sys
import os
import re
from multi_process_util import MultiProcessUtil

def proc1(cinfo, args=None):
    print('run: %d' % (os.getpid()))

    for line in cinfo.p2c:
        line = re.sub('[\r\n]+$', '', line)
        print('[%d]: [%s]' % (os.getpid(), line))

    return 0

def main():
    num_procs = 2
    mp = MultiProcessUtil(num_procs, proc1)
    mp.start()

    line_no = -1
    for line in sys.stdin:
        line_no += 1
        c = line_no % num_procs
        #print('c: %d' % (c))
        #print(mp.childs[c].json())
        mp.childs[c].p2c.write(line)

    for child in mp.childs:
        child.p2c.close()

    mp.join()
    return 0

if __name__ == '__main__':
    res = main()
    exit(res)

■入力ファイル
abc
def
ghi
jkl
mno
pqr
stu
vwx
012
345
678

■実行結果
run: 8223
[8223]: [def]
run: 8222
[8222]: [abc]
[8223]: [jkl]
[8223]: [pqr]
[8223]: [vwx]
[8223]: [345]
[8222]: [ghi]
[8222]: [mno]
[8222]: [stu]
[8222]: [012]
[8222]: [678]


apache beam のマルチプロセスの動作確認

2022-05-10 20:45:23 | python
apache beam でマルチプロセスで実行し、各処理でプロセスIDを確認します。
ここでは以下の形式のデータを処理します。
Confidence NN B-NP
in IN B-PP
the DT B-NP

以下のプログラムでは、ParDo() で WordReader を並列化し、以降の処理をシリアルに実行します。
各処理でプロセスIDを取得し、最後に行番号、プロセスID、データを出力します。

■プログラム
import sys
import os
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import DirectOptions

input_file = '~/nltk_data/corpora/conll2000/train.txt'

class WordReader(beam.DoFn):
    def __init__(self):
        self.line_no = 0

    def process(self, line):
        self.line_no += 1
        items = line.split(' ')
        if len(items) < 3:
            return
        obj = {
            'line_no': self.line_no,
            'form': items[0],
            'pos': items[1],
            'tag': items[2],
            'pids': [os.getpid()],
        }
        yield obj

opts = PipelineOptions()
opts.view_as(StandardOptions).runner = 'DirectRunner'
opts.view_as(DirectOptions).direct_running_mode = 'multi_processing'
opts.view_as(DirectOptions).direct_num_workers = 2

def form_lower(obj):
    obj['form'] = obj['form'].lower()
    obj['pids'].append(os.getpid())
    return obj

def pos_lower(obj):
    obj['pos'] = obj['pos'].lower()
    obj['pids'].append(os.getpid())
    return obj

def tag_lower(obj):
    obj['tag'] = obj['tag'].lower()
    obj['pids'].append(os.getpid())
    return obj

def print_obj(obj):
    print("%d\t%s [%s, %s, %s]" %
          (obj['line_no'],
           obj['pids'],
           obj['form'],
           obj['pos'],
           obj['tag']
           ))
    return obj

with beam.Pipeline(options=opts) as p:
    (p
     | beam.io.ReadFromText(input_file)
     | beam.ParDo(WordReader())
     | beam.Map(form_lower)
     | beam.Map(pos_lower)
     | beam.Map(tag_lower)
     | beam.Map(print_obj)
    )
</pre>
■出力結果
WordReader が並列で実行されるため、行番号がプロセス毎に1から始まっています。
1       [3768, 3768, 3768, 3768] [its, prp$, b-np]
2       [3768, 3768, 3768, 3768] [existing, vbg, i-np]
3       [3768, 3768, 3768, 3768] [authorization, nn, i-np]
...
1       [3769, 3769, 3769, 3769] [confidence, nn, b-np]
2       [3769, 3769, 3769, 3769] [in, in, b-pp]
3       [3769, 3769, 3769, 3769] [the, dt, b-np]
...


apache beam によるパイプラインでのデータ加工処理の例

2022-05-09 23:26:48 | python
apache beam でパイプラインでデータ加工処理を行ってみました。
以下のプログラムでは jsonl のデータを読み込み、price を 2倍する処理と、price に 1000 を加算する処理をパイプラインでつないで実行しています。
■プログラム
import sys
import os
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import DirectOptions

input_file = 'data/test.jsonl'

opts = PipelineOptions()
opts.view_as(StandardOptions).runner = 'DirectRunner'
opts.view_as(DirectOptions).direct_running_mode = 'multi_processing'
opts.view_as(DirectOptions).direct_num_workers = 4

def price_mul2(jsonl_str):
    obj = json.loads(jsonl_str)
    obj['price'] *= 2
    jsonl_str = json.dumps(obj, ensure_ascii=False)
    return jsonl_str

def price_add1k(jsonl_str):
    obj = json.loads(jsonl_str)
    obj['price'] += 1000
    jsonl_str = json.dumps(obj, ensure_ascii=False)
    return jsonl_str

def print_obj(jsonl_str):
    print(jsonl_str)
    return jsonl_str

with beam.Pipeline(options=opts) as p:
    (p
     | beam.io.ReadFromText(input_file)
     | beam.Map(price_mul2)
     | beam.Map(price_add1k)
     | beam.Map(print_obj)
    )

■入力データ
{"id": "id_001", "title": "item 001", "price": 101}
{"id": "id_002", "title": "item 002", "price": 102}
{"id": "id_003", "title": "item 003", "price": 103}
{"id": "id_004", "title": "item 004", "price": 104}
{"id": "id_005", "title": "item 005", "price": 105}
{"id": "id_006", "title": "item 006", "price": 106}
{"id": "id_007", "title": "item 007", "price": 107}
{"id": "id_008", "title": "item 008", "price": 108}
{"id": "id_009", "title": "item 009", "price": 109}
{"id": "id_010", "title": "item 010", "price": 110}
{"id": "id_011", "title": "item 011", "price": 111}
{"id": "id_012", "title": "item 012", "price": 112}
{"id": "id_013", "title": "item 013", "price": 113}
{"id": "id_014", "title": "item 014", "price": 114}
{"id": "id_015", "title": "item 015", "price": 115}
{"id": "id_016", "title": "item 016", "price": 116}
{"id": "id_017", "title": "item 017", "price": 117}
{"id": "id_018", "title": "item 018", "price": 118}
{"id": "id_019", "title": "item 019", "price": 119}
{"id": "id_020", "title": "item 020", "price": 120}

■実行結果
{"id": "id_001", "title": "item 001", "price": 1202}
{"id": "id_002", "title": "item 002", "price": 1204}
{"id": "id_003", "title": "item 003", "price": 1206}
{"id": "id_004", "title": "item 004", "price": 1208}
{"id": "id_005", "title": "item 005", "price": 1210}
{"id": "id_006", "title": "item 006", "price": 1212}
{"id": "id_007", "title": "item 007", "price": 1214}
{"id": "id_008", "title": "item 008", "price": 1216}
{"id": "id_009", "title": "item 009", "price": 1218}
{"id": "id_010", "title": "item 010", "price": 1220}
{"id": "id_011", "title": "item 011", "price": 1222}
{"id": "id_012", "title": "item 012", "price": 1224}
{"id": "id_013", "title": "item 013", "price": 1226}
{"id": "id_014", "title": "item 014", "price": 1228}
{"id": "id_015", "title": "item 015", "price": 1230}
{"id": "id_016", "title": "item 016", "price": 1232}
{"id": "id_017", "title": "item 017", "price": 1234}
{"id": "id_018", "title": "item 018", "price": 1236}
{"id": "id_019", "title": "item 019", "price": 1238}
{"id": "id_020", "title": "item 020", "price": 1240}


python で XML を SAX でパーズ

2022-05-06 23:33:31 | python
python で XML を SAX でパーズして、JSONL で出力します。
■XML
<?xml version=&quote;1.0&quote;?>
<items>
  <item id=&quote;item1&quote;>
    <title>商品1</title>
    <price>100</price>
  </item>
  <item id=&quote;item2&quote;>
    <title>商品2</title>
    <price>500</price>
  </item>
  <item id=&quote;item3&quote;>
    <title>商品3</title>
    <price>1000</price>
  </item>
</items>

■プログラム
startElement() には開始タグを読み取った際の処理を記述し、
endElement() には終了タグを読み取った際の処理を記述します。
そして、characters() にはタグ内の文字列を読み取った際の処理を記述します。
import sys
import json
import xml.sax
import xml.sax.handler

class XmlToJsonHandler(xml.sax.handler.ContentHandler):
    def __init__(self):
        self.tags = []
        self.item = None

    def create_item(self):
        item = {
            'id': '',
            'title': '',
            'price': '',
        }
        return item

    def create_tag(self, name, attrs):
        tag = {
            'name': name,
            'attrs': attrs,
        }
        return tag

    def startElement(self, name, attrs):
        tag = self.create_tag(name, attrs)
        self.tags.append(tag)

        if name == 'item':
            self.item = self.create_item()
            self.item['id'] = attrs.get('id')

    def endElement(self, name):
        self.tags.pop()
        if name == 'item':
            print(json.dumps(self.item, ensure_ascii=False))

    def characters(self, text):
        name = self.tags[-1]['name']
        if name == 'title':
            self.item[name] = text
        elif name == 'price':
            self.item[name] = int(text)

def main():
    parser = xml.sax.make_parser()
    parser.setContentHandler(XmlToJsonHandler())
    parser.parse(sys.stdin)
    return 0

if __name__== '__main__':
    res = main()
    exit(res)

■実行結果
上記の XML は以下の JSONL に変換されます。
{"id": "item1", "title": "商品1", "price": 100}
{"id": "item2", "title": "商品2", "price": 500}
{"id": "item3", "title": "商品3", "price": 1000}

apache beam でマルチプロセスで実行

2022-05-02 21:39:22 | python
apache beam でマルチプロセスで実行

apache beam で python でマルチプロセスで実行する方法のメモ。
簡易にマルチプロセスを実現するため、ここでは DirectRunner を使用します。
Pipeline のオプションに以下を指定することで、マルチプロセスでプログラムを実行することができます。
 runner = 'DirectRunner'
 direct_running_mode = 'multi_processing'
 direct_num_workers = {ワーカー数}

■入力データ
今回読み込み対象としているファイルは以下のようなデータで、
単語の出現をカウントします。
Confidence NN B-NP
in IN B-PP
the DT B-NP
pound NN I-NP
is VBZ B-VP

■プログラム
import sys
import os
import re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import DirectOptions

input_file = '~/nltk_data/corpora/conll2000/train.txt'
output_file = 'data/output.txt'

class ParseLine(beam.DoFn):
    def __init__(self):
        self.re_chomp = re.compile('[\r\n]+$')

    def process(self, line):
        line = self.re_chomp.sub('', line)
        items = line.split(' ')
        if len(items) < 3:
            return

        kw = items[0].lower()
        yield (kw, 1)

opts = PipelineOptions()
opts.view_as(StandardOptions).runner = 'DirectRunner'
opts.view_as(DirectOptions).direct_running_mode = 'multi_processing'
opts.view_as(DirectOptions).direct_num_workers = 2

with beam.Pipeline(options=opts) as p:
    (p
     | beam.io.ReadFromText(input_file)
     | beam.ParDo(ParseLine())
     | beam.CombinePerKey(sum)
     | beam.io.WriteToText(output_file)
    )
</pre>
■実行結果
('existing', 13)
('to', 5080)
('as', 972)
...


python で Apache Beam を使ってみた

2022-05-01 16:54:00 | python
python での Apache Beam によるデータ処理のサンプルプログラム。

■プログラム1
import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | beam.Create(['abc', 'def', 'ghi', 'abc', 'def', 'abc'])
     | beam.Map(lambda str: (str, 1))
     | beam.CombinePerKey(sum)
     | beam.Map(print)
    )

リスト内の文字列の出現頻度をカウントするプログラムです。
Create([...]) で文字列のリストを生成します。
Map(lambda str: (str, 1)) で、各文字列を出現頻度1回として、データを生成します。
CombinePerKey(sum) では、タプルの先頭要素(=文字列)をキーとして、同じキーの出現頻度を合計します。
Map(print) で各文字列毎に集計結果を出力します。

■実行結果1
('abc', 3)
('def', 2)
('ghi', 1)

■プログラム2
import apache_beam as beam
import re

input = 'data/input_*.txt'
output = 'data/output.txt'

with beam.Pipeline() as p:
    (p
     | beam.io.ReadFromText(input)
     | beam.FlatMap(lambda line: re.findall(r'[a-zA-Z0-9]+', line))
     | beam.Map(lambda str: (str, 1))
     | beam.CombinePerKey(sum)
     | beam.io.WriteToText(output)
    )

プログラム1 と同様に文字列の出現数をカウントしますが、入出力がファイルになっています。
ファイルからの読み込みには io.ReadFromText() を使用します。
ファイル名に * を含めることができ、複数のファイルを処理対象にすることができます。
ファイルへの出力は io.WriteToText() を使用します。
出力ファイル名には -mmmmm-of-nnnnn の形式で全nnnnnファイルの通し番号が付与されます。

■入力ファイル
data/input_1.txt:
w1
w1 w2
w1 w2 w3
w1 w2 w3 w4
w1 w2 w3 w4 w5

data/input_2.txt:
w11
w11 w12
w11 w12 w13
w11 w12 w13 w14
w11 w12 w13 w14 w15

■出力ファイル
data/output.txt-00000-of-00001
('w1', 5)
('w2', 4)
('w3', 3)
('w4', 2)
('w5', 1)
('w11', 5)
('w12', 4)
('w13', 3)
('w14', 2)
('w15', 1)