goo blog サービス終了のお知らせ 

P2Pとかプログラミング全般とか

P2Pアプリ開発を目指していこうかと。
基本、週末更新なので遅々として進まず。

Version0.9.2 が出た

2009-05-04 12:50:52 | Overlay Weaver
4/18 に 0.9.1 がリリースされていたので本当なら触っておかないといけなかった
んだけど仕事があってほったらかしに。
そうこうしているうちに、英語のメーリングリストでの指摘から Chord の実装で
最善ではない次ホップを選んでしまう場合があるバグがあったとのこと。
そのことプラス、スレッドプールについて洞察が深まったとのこと
この2つが組み合わさっての 0.9.2 となった模様。

0.9.1 を見ていないので、本当はそちらでの修正だったのかもしれないけれど、
0.9 から 0.9.2 への変更でいくつかメソッドが修正になっていた。

(1)DHT.setTTLForPut の引数が long から int をとるようになった。
(2)AbstractMessagingProvider から Executors がなくなった。
(3)インターフェイス DHT から高レベルサービス(DHT の本質ではない、
 主に情報取得系のメソッド類)がインターフェイス ow.util.HighLevelService として分離。
(4)HighLevelService.getRoutingTableString が int 型の詳細レベルを
 引数として取るようになった。
 これは最終的には IDAddressPair の出力形式の指定となる。
 詳細レベルが 0 未満の場合、ID は 2 byte 分しか出力されない。
 詳細レベルが 0 未満の場合、InetMessagingAddress のポート出力は省略される。
(5)DHT.getLastRoutingResult が HighLevelService..getLastRoutingResults に変更。
 戻り値が RoutingResult の配列に変更。
(6)DHT.getLastKeyString が HighLevelService..getLastKeys として ID[] を返すように変更。

まだ変更はあるかも。
残念ながら、ow.messaging.tcp パッケージはスレッドプールを使うようには変更
されなかった模様。このあたりは根本的に直さないとダメかなぁ。

首藤さんが Windows で全スレッドのダンプを取るため苦戦している模様
jps + jstack(日本語 Doc を見ると印刷すると書かれてるけどたぶん誤訳で、単純に表示するだけのはず)
とか jconsole とかでダメなのかな。

自ノードのアドレス設定を行うと selfID が変更される

2008-12-23 22:39:13 | Overlay Weaver
ちょっと困ったことになった。
ネットワークカードにグローバルアドレスが割り振られている場合、つまり NAT 環境
でない場合は問題にならないが、NAT 環境だと問題が出る。まあ無視すればいい問題
でもあるのだが…。

NAT 環境の場合、DHTConfiguration.setSelfAddress には INADDR_ANY か、もしくは
NIC に割り当てられたアドレスを指定する必要がある(これは bind を行う際に
このアドレスが使用されるため)。
しかしこのアドレスは他ノードへのメッセージに送信元情報として付与される。
となると当然ながら INADDR_ANY やらプライベートアドレスをいつまでも設定して
おくわけにはいかない。
そこで何とかしてグローバルアドレスを手に入れて、MessageReceiver.setSelfAddress
を呼ぶ必要がある。しかしこれを呼んだ場合、selfID が変更されてしまう。
これは IterativeRoutingDriver, RecursiveRoutingDriver の親である
AbstractRoutingDriver の getSelfIDAddressPair が、現在 RoutingDriver
が知っているアドレスと MessageReceiver のアドレスが異なっていた場合、
IDAddressPair.setAddressAndRecalculateID を呼んでアドレスと ID の両方
を変更するためにおこる。
最終的にはアドレスのハッシュ値を取り、それを SHA1 に通して ID にする。
ID なんて勝手に割り振ってくれればいいや、という場合には問題にならないが、
変更してほしくないときには回避不能の大問題となる。IterativeRoutingDriver, RecursiveRoutingDriver
は final クラスだからメソッドのオーバーライドもできないし。
ソースを改変するしかないのかなぁ…。

OutOfMemory のデバッグ完了

2008-11-03 22:17:13 | Overlay Weaver
http://cid-7862a61060e90b1f.skydrive.live.com/browse.aspx/NicoCacheWithOverlayWeaver?view=details

OutOfMemory の原因は、他ノードへのキャッシュ要求をかけた際の返答を受け取る
TaskGet がハッシュ表に残ってしまうせいだった。本来なら返答を受けたらすぐに
ハッシュ表から削除するはずが、削除されていなかった。基本的なミス。
デバッグには JDK6 update 10 から標準添付された Java VisualVM を使った。
本当に便利だ、このツール。ヒープダンプすればどこがメモリを一番食ってるか
一目瞭然。もちろん対象インスタンスをどのクラスが保持しているかも分かる。
これはお勧め。ただこれを使うと RMI のあたりでログが出まくるので、なんらか
の方法で抑止しないとログが流れちゃうなぁ…。

デバッグ中

2008-10-13 20:09:17 | Overlay Weaver
http://cid-7862a61060e90b1f.skydrive.live.com/browse.aspx/NicoCacheWithOverlayWeaver?view=details

ある程度動くようになってきた。他ノードからキャッシュを受け取れるように
なったので、そこを中心にデバッグ中。
ファイルサイズの大きい動画だと OutOfMemory 例外が出てしまうようなので
調査中。
あと、サーバからのレスポンスヘッダがどうしても必要だとわかったため、そこ
のコードを追加した。毎回サーバに HEAD メソッドで問い合わせてしまうと
サーバ負荷の低減にならないが、偽のヘッダを返すようにしてしまうと、いざ
新しいレスポンスヘッダが必要なときに困ってしまう。
仕方がないので、一度 HEAD メソッドでレスポンスヘッダをもらい、それを
キャッシュして使いまわすようにした。あまり性質はよくないが、ここまで
が限界かと。一応、キャッシュすると「nicocache_nl_ow_response_header.txt」
というファイル名でカレントディレクトリにファイルを作るようにした。
これを削除すれば再起動せずとも HEAD を再発行するようしたので、必要で
あれば削除して、レスポンスヘッダを再取得するという形で納得することにした。


バイパスを作るには

2008-08-29 15:07:55 | Overlay Weaver
プロキシ側と OW 側をどうつなぐか、ある程度プログラムを作ったのでデバッグ
してないけどアップ。

http://cid-7862a61060e90b1f.skydrive.live.com/browse.aspx/NicoCacheWithOverlayWeaver?view=details

つなぐ際に以下を目的とした。

(1)プロキシ側の書き換えを最小限に。
(2)OW から得たキャッシュをローカルに保存できるように。
(3)なんらかの理由で OW から得られなかった場合、通常の HTTP へ fall back する。

これらを満たすとなると取れる方法は限られる。結局、HttpURLConnection
を継承したクラスを作ることにした。内部で OW 側からキャッシュを取得し、
もし失敗した場合には従来の HttpURLConnection インスタンスをそのまま
呼び出して fall back する。
組み込み先は dareka.processor.URLResource の transferTo メソッド。

デバッグした

2008-07-08 22:14:40 | Overlay Weaver
owemu で起動できるようにしたり、デバッグしたり。
http://cid-7862a61060e90b1f.skydrive.live.com/browse.aspx/NicoCacheWithOverlayWeaver?view=details

ログを java.util.logging 経由で出すようにしたので、logging.properties

java -Djava.util.logging.config.file=./logging.properties
みたいに指定して起動すべし。
今つかっているプロパティは、
handlers=java.util.logging.ConsoleHandler
java.util.logging.ConsoleHandler.level = ALL

#.level=WARNING
.level=ALL
dareka.level=WARNING
dareka.ow.level=ALL
# for OW.
dht.level=WARNING
routing.level=WARNING
directory.level=WARNING
id.level=WARNING
ipmulticast.level=WARNING
messaging.level=WARNING
statcollector.level=WARNING
util.level=WARNING
mcast.level=WARNING
mrouted.level=INFO

こんな感じ。Overlay Weaver の Logging が階層構造になってないせいで
個別にレベル指定しなきゃいけないのがうざい。getClass().toString()
で Logger.getLogger してくれれば ow.level=WARNING で抑止できる
ものを…。

メッセージピープのために

2008-06-01 21:54:14 | Overlay Weaver
DHT に put する場合、実際に DHT のネットワークにちゃんと参加できているのか、
それともまだ安定していない、もしくは参加できていないのかが重要になる。
標準ではそのような情報が得られないようなので、別の方法を考えてみた。

他のノードから REQ_SUCCESSOR, REQ_PREDECESSOR がくるはずなので、それを
検知して判断することを考える。まずはそのメッセージをどう受信するかだ。
以前書いたとおり、これらの Tag の値で MessageHandler を登録してしまうと
メッセージに対して null を返すほか無くなる(実際には応答メッセージを
返さなければならない)ので、なんとかして実際の MessageHandler より前に
ハンドラを登録する必要がある。

ここで JavaDoc やらドキュメントやらを眺めてみると ow.stat がある。
これは統計用にメッセージをフックしつつも実際のハンドラに影響を与えていない、
つまりは実際のハンドラよりも前に処理用ハンドラを登録することに成功している。
ただし ow.stat そのものは使えない。MessagingReporter はリポート用に
外部に接続を作成してしまう。そこまでする必要はない。なのでどうやって
ハンドラを登録しているのか調べてみた。
調査をしてみると、要するに Tag の値によってハンドラを呼び分ける MessageHandler
を登録するよりも前にフック用ハンドラを登録することでこれを実現していることがわかった。
Tag 値呼び分けハンドラは ow.routing.impl.AbstractRoutingDriver のコンストラクタ
で登録される。このクラスは IterativeRoutingDriver, RecursiveRoutingDriver
の親クラスのため、ルーティング方法が決定するより前に MessageReceiver を
作成できれば OK ということになる。

ow.stat ではどうやっているかというと、StatFactory.getMessagingCollector
で MessagingCollector を作成し、メンバメソッドの getMessagingProvider,
getMessageReceiver で作成している。これらで作成した時点ではルーティング用の
ドライバが作成されないので、フック用の MessageHandler が登録し放題となる。

と、これで問題なければよかったのだがそうは上手くいかなかった。テスト用に
owemu で実行したところ、NullPointerException 発生。調査したところ、
MessageReceiver に登録されているハンドラがフック用のもの1個だけになってる。
どうなってる?Tag 値呼び分けハンドラが登録されるはずでは?

さらに調べると、エミュレータ実行時の MessagingProvider である EmuMessagingProvider
の substitute() メソッドが新しいインスタンスを返している!
MessagingProvider は MessagingFactory.getProvider で作成される。普通なら
同一 Provide が返されるのだが、実装の中で substitute() を呼び出し、
null でなければ既に作成されているインスタンスの代わりにこれを返す。
これは要するにエミュレータ実行時に、仮想ノード毎に異なる Provide インスタンス
を返すための機能なのだと思う。
MessageReceiver は Provider によって生成されるが、通常はひとつ作れば
何度取得しようが同一インスタンスを返す。異なる Provide インスタンスになれば
当然異なる MessageReceiver を返す。
ow.stat に似せてフック用ハンドラを登録したとしても、DHT インスタンス
作成時に MessagingFactory.getProvider を呼ばれてしまうと、異なる Provider が
返され、フックが登録されていない MessageReceiver が作成され、そちらに
Tag 値呼び分けハンドラが登録されるわけだ。
で、どちらの MessageReceiver にメッセージが届けられるかといえば、フック用
ハンドラを登録した最初のインスタンス。こちらが emu0 のアドレスを振られるから。
で、こちらには Tag 値呼び分けハンドラがない→正常な要求メッセージに
null 値を返すしかなくなる→NullPointerException となるわけだ。
これはエミュレータでしかおきない。なぜならエミュレータ以外の TCP, UDP 用の
MessagingProvider では substitute() が同一インスタンスを返すため。
同一 Provider だから、最初にフック用ハンドラを登録した MessageReceiver を、
Tag 値呼び分けハンドラ登録時にも Provider が返してくれる。
ということで、エミュレータ実行時は ow.stat は使えないわけだね。

で、ここであきらめてはプライドが許さない。これでもプログラマの端くれ。矜持は
あるつもり。
要は DHT 作成時に MessagingFactory.getProvider を呼ばず、こちらが用意した
Provider を受け入れてくれればいいはず。しかしどこをさがしても MessagingProvider
を受けてくれない。で、StandardDHTImpl をゴニョゴニョ探したところ、
コンストラクタが2つあることに気づく。DHTConfiguration を必要とするものと、
RoutingService を必要とするもの。比較してみると、前者は要するに RoutingService
を作成することが最終目的になっているようだ。後者のコンストラクタを
使用すれば、MessagingFactory.getProvider を呼ばずに DHT が構築できる。
つまりエミュレータ環境でも受信メッセージフックができる。

で、なんとか RoutingService を作成して DHT を作ってみると…。やっぱり NullPointerException。
なんでじゃと思ってさらに調べると、RoutingAlgorithm が null になってた。
StandardDHTImpl のコンストラクタを良く調べてみると、RoutingAlgorithmProvider
の initializeAlgorithmInstance を呼ばないと AbstractRoutingDriver に
アルゴリズムが登録されないらしい(AbstractRoutingAlgorithm のコンストラクタと、
AbstractRoutingDriver.setRoutingAlgorithm を調べてみるべし。特に後者を
使用しているところを Eclipse で検索すれば、他で登録していないことがわかる)。
で、最終的にはちゃんとフックできましたとさ。お疲れ様。

もうこういう面倒な調査しないでもフック(というか peep?)できるように
して欲しいよ、ホント…。


// メッセージフック設定。
RoutingService routingSvc = null;
if (cmd.hasOption("sam")) {
    StatConfiguration statConf = StatFactory.getDefaultConfiguration();
    DHTConfiguration dhtConf = DHTFactory.getDefaultConfiguration();
    statConf.setMessagingTransport(null != transport ? transport
            : dhtConf.getMessagingTransport());
    if (null != selfAddressAndPort) {
        MessagingUtility.HostAndPort hostAndPort = MessagingUtility
                .parseHostnameAndPort(selfAddressAndPort, dhtConf.getSelfPort());
        statConf.setSelfAddress(hostAndPort.getHostName());
        statConf.setSelfPort(hostAndPort.getPort());
    }
    statConf.setDoUPnPNATTraversal(false);
    try {
        MessagingCollector messagingCollector = StatFactory
                .getMessagingCollector(statConf);
        messagingCollector.getMessageReceiver().addHandler(
            new MessageHandler() {
                public Message process(Message msg)
                        throws Exception {
                    showReceivedMessage(msg);
                    return null;
                }
            });
        // RoutingService 作成。
        RoutingServiceProvider svcProvider = RoutingServiceFactory.getProvider(
                null != routingStyle ? routingStyle : DHTConfiguration.DEFAULT_ROUTING_STYLE);
        RoutingAlgorithmProvider algoProvider = RoutingAlgorithmFactory.getProvider(
                null != algorithm ? algorithm : DHTConfiguration.DEFAULT_ROUTING_ALGORITHM);
        routingSvc = svcProvider.getService(svcProvider.getDefaultConfiguration(),
                messagingCollector.getMessagingProvider(),
                messagingCollector.getMessageReceiver(),
                algoProvider, algoProvider.getDefaultConfiguration(), selfID);
        // アルゴリズム作成。
        RoutingAlgorithmConfiguration algoConfig = algoProvider.getDefaultConfiguration();
        algoProvider.initializeAlgorithmInstance(algoConfig, routingSvc);
    } catch (Exception e1) {
        e1.printStackTrace();
    }
}

0.8.6出てる

2008-05-21 08:58:01 | Overlay Weaver
最新バージョン0.8.6が出てる。
0.8.3 からの比較で言うと、メモリ消費量の削減が大きな変更みたい。
0.8.5 で通信メッセージのサイズ削減が行われた。具体的にはシリアライズ時に gzip で圧縮するようになった。
Message.encode(Message msg) で、Message.GZIP_MESSAGE が true の場合、
    if (GZIP_MESSAGE) {
        os = new GZIPOutputStream(os);
    }

このようにしてシリアライズを圧縮してる。正直、どの程度縮むのか多少疑問だけど
テキストベースでキロバイト単位のメッセージならかなり効果的だとは思うので
うれしい修正だ。
Message.GZIP_MESSAGE が static final なのでメッセージ毎に圧縮非圧縮を選ぶ
ことはできない。さらに gzip 圧縮かどうかのフラグをメッセージのバイト列に
持たないので、ノード同士は Message.GZIP_MESSAGE が同じ値のものでしか
通信できない。このへんはちょっとだけ気をつけるべきかも。