Asakusaソースコードリーディング(第二回)、ThunderGate編のメモです。
資料: ThunderGate
Togetter:Asakusaソースコードリーディング(第二回)
ThunderGateはHDFSと外部のRDB(今のところはMySQL)とのデータ転送を行う部分。Asakusaフレームワークが「トランザクションをサポート」と言っているのは、ThunderGateにかかっている。
やはりHadoopを使う上ではHDFSと外部とのデータ配信がひとつのポイントになるので、重要。
説明してくださったのは、ThunderGateのアーキテクチャー設計者の埋金さん。
テレメータ(社会インフラ)の開発だとか、RDBでHAクラスターを組んだりとか、UMLaut/J-XML(XML-EDI)を担当してたりしたそう。
(UMLautはウルシステムズさんのJ2EE・Strutsベースのフレームワーク。ふふふ、触ったことあるから知ってますよ…)
ThunderGateの思想としては、RDBがデータ永続層で、HDFSは作業場。
HDFSはNameNodeがSPoFだし、バックアップ/リストアのノウハウが不足。障害が起きた時の復旧が、Hadoopに詳しい人なら復旧できるのかもしれないけれども、実運用に入った際の運用担当者が復旧できるとは限らない(というかまず無理)。
KVSも未知数だし、RDBMSは信頼性が高いので、こういう仕組みになった。
ThunderGateにおけるRDBMSとHDFSの関係は、
RDB→HDFS: インポート
HDFS→RDB: エクスポート
HDFSがおかしくなったら、HDFSをフォーマットして最初から実行する(データ本体はRDBに永続化されているから)。
ThunderGateの制限として、オンラインや他のAsakusaバッチとは排他する。
ThunderGate自身はスケールアウトしないが、数GB程度のデータであれば大丈夫。数百GBは今後の課題。TB級にはフィットしない。TBは無理でもGBは保証しようという考え方になっている。
PBともなると、他の方法を採る必要があるだろう。
対象RDBMSは、OSSということで商用RDBMSでなくMySQLを選択。MySQLとOracleとSybaseを検証しており、MySQLでも大丈夫という判断。
ただ、やはりOracleで使えるようにして欲しいという要望は出ているそう。(僕も自分が担当していた業務がOracleを使っていたので、Oracleを想定してた)
ThunderGateの使用イメージは、企業VLANにRDB(MySQL)を置き、ファイアーウォールをはさんでHadoopクラスターVLANとつなぐ。
MySQLからデータを取り出すのがImporter、それをHDFSに書き込むのがExtractor。
逆にHDFSから取り出すのがCollector、MySQLへ取り込むのがExporter。
(Importer→Extractor間とCollector→Exporter間は、sshでデータ転送している)
障害があったときにデータを戻すのがRecoverer。
最後にHDFSからファイルを消すのがCleaner。
基本的に、各処理(プロセス)はMonkeyMagicから起動される。
HDFSのファイルはテンポラリー的な位置付けなので、理屈上は、そのファイルを使うバッチが終了したら、ファイルを消してもよい。ただし、別ジョブがそのファイルを使うかもしれない(使わないという判断をするのは難しい)し、障害発生時のリカバリーにも使えるので、残している。
RDBへのアクセスに関しては、標準SQL・JDBC標準APIを使って検証したそうだが、Oracleは性能が良かったがMySQL・Sybaseは遅かったらしい。
そこで、MySQLへのアクセスでは、MySQLのロード文とかを使っている。
Importer(MySQL→HDFS)は、
- ロック取得
- MySQLからselect into outfileでTSVファイル作成
- TSVをzipにしてExtractorへ転送(100Mbpsでは効果があったが1Gbpsではオーバーヘッドの方が大きかった為、デフォルトではzip圧縮はオフになっている)
- TSVをSequenceFileに変換してHDFSへ格納(SequenceFileの作成をDBサーバー側で行っても大差無い)
Extractorが分散構造になっていないのは、分散してもDB側は1つなのでボトルネックになる為。
Exporter(HDFS→MySQL)は、概ねwImporterの逆。
- CollectorがHDFSのSequenceFileをTSV化、zipにしてExporterへ転送(Ashigelコンパイラによるメタデータが使用される)
- DBサーバー上でTSVファイル生成
- MySQL上のワークテーブルへロード
- ワークテーブルから対象テーブルへコピー(このとき、重複チェックも実施)
- ロック解除
ワークテーブルを使っている箇所だけ、Importer/Exporterは対称になっていない。
ロックは、いわばレコードロック。ImporterではWHERE条件によって転送対象を絞る想定なので、転送対象(=処理対象)をロックしておき、他ジョブで更新できないようにするという考えらしい。
このロックは、ロック用テーブルのフラグ項目の更新で行う。(RDBMSのロック機構を使っているわけではない。長時間のロックになる可能性がある為)
テーブルロックのような仕組みも用意されているが、たぶん使わないんだそうで^^;
また、楽観的ロックも使用可能。(更新時刻だかバージョン番号だかを比較に使用)
既にロックされている場合にどう動くかは、実行する際に指定する(3通り)。
処理対象レコードが1万件あったら、ロックテーブルにも1万件のレコードが作られることになり、そのフラグ項目更新となると、けっこうコストが高い。処理対象テーブル1つに対しロック用テーブルも1つ用意されるので、対象テーブルが100個あったらロックテーブルも100個存在することになる。
したがってロック有無の調査をするのも大変なので、IMPORT_RECORD_LOCKというテーブルが調査用に用意されている。
ロールバック/ロールフォワード
RDBへ結果を反映させるとき、直接テーブルを更新せずワークテーブルに一旦入れている理由は、1トランザクションで大量レコード入れるとパフォーマンスが悪いから。
ワークテーブルへは(メモリーに入る単位で)複数回のトランザクションに分けて入れる。
ワークテーブルへ入れている最中に失敗したら、ワークテーブルを消してやり直し。(ロールバック)
ワークテーブルに入れた後であれば、ロールフォワード。
また、オートインクリメントのあるテーブルだと、もし直接更新していたら再実行時に不整合になる可能性もあるので、ワークテーブルを使用している。
(これらの理由でワークテーブルを使うことになったので、ついでに重複チェックも行うようにした)
そういった処理も行っている為、ExportはImportの3倍くらいの時間がかかっている。
1~2GBのデータであればそんなにかからないが、TBくらい来たらアウトかも。
実績値としては、Importが20MB/s。MySQLのロード/dumpが30~70MB/s、SSH転送が50MB/s、ギガビットイーサは100MB/s。Importはいずれにも達していないので、遅い。
高速化案としては、
- 並列化できる箇所もある
- Sqoopだとダンプファイルを使わず直接Streamに書いているようだが、標準APIでは無理かも
- SSHをやめる(認証はともかく、データ転送まで暗号化する必要は無いのでは?という考え)
- bzip圧縮だと遅かったが、LZOだといいかも?(未検証)
- 転送を二重化
- DBサーバーをHDFSクライアントとして直接使う
- 複数ノードからHDFSへ転送(現状ではDBが遅いので1ノードから転送している)
- ExporterのSQL実行の並列化(ただし、Oracleは4並列が速かったが、MySQLは並列なしが速かった)
ただし、これで数倍の速度になったとしても、もっと大きいデータになったら焼け石に水なので、他の方法を検討する必要あり。
- キャッシュ使用。バッチの実行までにHDFSにデータを置くようにする。SequenceFileを差分更新するとか、HBaseを使うとか。ただしトランザクション(ロールバック?)とは相性が悪い。
- MySQLがボトルネックになるなら、MySQLインスタンスを増やす(レプリカ)とか。レプリカ作成の速度や完全な複製であることの保証・壊れたときの復旧時間などが懸念事項(レプリカなら、壊れても縮退する(遅くなる)だけかも)
- シャーディングはトランザクションが面倒?
- データの種類に応じて転送方法を変える。マスターやトランザクションはRDB→ThunderGate→HDFS、変更されないデータはThunderGate以外からロード(例えば過去ログをテープからHDFSへ直接ロード)
- 全部RDBに置くのはもったいないので、例外的なデータ・使用しそうにないデータは消したらどうか?→不要そうでも消せない→例外処理のハンドリングをきっちり作る必要がある→データを消すのとプログラムを作っておくのと、どちらのコストが高いか?
SqoopはRDBが複数あるなら速いらしい。
AsakusaがSqoopを採用しなかった理由は、トランザクションが出来ない事と、HadoopからDBへのアクセスが出来ない事。
ThunderGateはロックするサーバーが1つだけという制約があり、複数のThunderGateを実行することは出来ない。Exporterもトランザクションがあるので、1つしか動かせない。
ちょっと休憩をはさんで、ここから本当にソースリーディング(笑)
Oracle対応(MySQLをOracleに置き換える)、HBase対応(DBサーバー側あるいはHadoop側にHBaseを置く)をするとしたらどこを変更する必要があるかを念頭に。
各プログラムの構造は同じような感じなので、分かりやすい。
Oracle対応としては、ImporterのダンプSQLをOracle用のSQLに変え、ResultSetでループしてファイルへ書き出すようにする。ExporterもSQL修正。MySQLのオートインクリメントはOracleの機能を使うよう変更要。もし実行速度が遅いなら、各DBMSの機能を生かすよう修正が必要。(SQL*Loaderをプロセス起動するとか)
SSH転送部分は面白い構造になっていた。
ImporterからSSH経由でリモートでHadoopクラスター上のExtractorプロセスを起動。パイプラインを使ってzipデータを転送している。つまり一旦zipファイルを作っているのではなく、Streamを使って、Importerの標準出力にzipデータを出力し、Extractorの標準入力からそのzipデータを受け取っている。(したがって、デバッグ用でも、うかつにSystem.out.printlnで出力するとzipデータが壊れる事に!^^;)
なお、DBサーバーの方がセキュリティーが強いので、プロセスはDBサーバー側から起動する。(Collectorも、Exporterから起動)
分散処理プログラムではログ出力がボトルネックになったりすることがあるようだが、ThunderGateでは特別なことはしていない。データ量が増えてもログ出力量が増えるわけでもないので。
また、ワークテーブルは自動的に毎回作られる。
ちなみにソースを見ていると面白いのが色々あって、
- zipの圧縮レベル設定がコメントアウトされていた。この状態で検証して大丈夫?^^;(ちなみにSTORED(無圧縮)が実際に使用されているのは初めて見た)
- 「TODO マルチスレッド化」→別プログラムでやってみたけど効果が無かったから、このTODO部分では実装しなかったらしい
- close()の例外をキャッチしたブロックが、コメント「ここで例外が発生した場合は握りつぶす」とe.printStackTrace()
- 例外用の定数名がEXCEPRION(エクセプリオン、格好いい!今度どこかで使おうかな(爆))
close()での例外握りつぶしは自分もよくそういう実装をするけれども、例えばzip解凍のclose()の中でCRCチェックをやって例外が出るとかなっていたら、握りつぶすのは良くないだろうなぁ。少なくともログは出さないとね。
tryブロックで例外が発生していたらその例外にclose時の例外も追加、とか出来ると格好いいんだけどな(笑)
そういえば後から気付いたんだけど、ThunderGateが使用する管理用テーブルや各プロセスの引数にMonkeyMagicのIDが使われているようだけど、MonkeyMagicが無いと動かせないのかな…?
今回は割と機能がイメージしやすかった為か、分かりやすかったです。
ありがとうございました。
P.S.
今回は帰りの電車が止まってた。品川なので迂回はしやすいから、まだましだったけど(苦笑)
いちおう、Experimental Shellというのがあって、ThunderGateと連携したAsakusaのジョブをシェルスクリプトから流すことができるようになっています。スクリプトはAsakusa DSLのコンパイル結果から自動生成されます。実装は確認してないですが、Experimental Shellで実行するとpid的なものが入るのだろうと。