DHT に put する場合、実際に DHT のネットワークにちゃんと参加できているのか、
それともまだ安定していない、もしくは参加できていないのかが重要になる。
標準ではそのような情報が得られないようなので、別の方法を考えてみた。
他のノードから REQ_SUCCESSOR, REQ_PREDECESSOR がくるはずなので、それを
検知して判断することを考える。まずはそのメッセージをどう受信するかだ。
以前書いたとおり、これらの Tag の値で MessageHandler を登録してしまうと
メッセージに対して null を返すほか無くなる(実際には応答メッセージを
返さなければならない)ので、なんとかして実際の MessageHandler より前に
ハンドラを登録する必要がある。
ここで JavaDoc やらドキュメントやらを眺めてみると ow.stat がある。
これは統計用にメッセージをフックしつつも実際のハンドラに影響を与えていない、
つまりは実際のハンドラよりも前に処理用ハンドラを登録することに成功している。
ただし ow.stat そのものは使えない。MessagingReporter はリポート用に
外部に接続を作成してしまう。そこまでする必要はない。なのでどうやって
ハンドラを登録しているのか調べてみた。
調査をしてみると、要するに Tag の値によってハンドラを呼び分ける MessageHandler
を登録するよりも前にフック用ハンドラを登録することでこれを実現していることがわかった。
Tag 値呼び分けハンドラは ow.routing.impl.AbstractRoutingDriver のコンストラクタ
で登録される。このクラスは IterativeRoutingDriver, RecursiveRoutingDriver
の親クラスのため、ルーティング方法が決定するより前に MessageReceiver を
作成できれば OK ということになる。
ow.stat ではどうやっているかというと、StatFactory.getMessagingCollector
で MessagingCollector を作成し、メンバメソッドの getMessagingProvider,
getMessageReceiver で作成している。これらで作成した時点ではルーティング用の
ドライバが作成されないので、フック用の MessageHandler が登録し放題となる。
と、これで問題なければよかったのだがそうは上手くいかなかった。テスト用に
owemu で実行したところ、NullPointerException 発生。調査したところ、
MessageReceiver に登録されているハンドラがフック用のもの1個だけになってる。
どうなってる?Tag 値呼び分けハンドラが登録されるはずでは?
さらに調べると、エミュレータ実行時の MessagingProvider である EmuMessagingProvider
の substitute() メソッドが新しいインスタンスを返している!
MessagingProvider は MessagingFactory.getProvider で作成される。普通なら
同一 Provide が返されるのだが、実装の中で substitute() を呼び出し、
null でなければ既に作成されているインスタンスの代わりにこれを返す。
これは要するにエミュレータ実行時に、仮想ノード毎に異なる Provide インスタンス
を返すための機能なのだと思う。
MessageReceiver は Provider によって生成されるが、通常はひとつ作れば
何度取得しようが同一インスタンスを返す。異なる Provide インスタンスになれば
当然異なる MessageReceiver を返す。
ow.stat に似せてフック用ハンドラを登録したとしても、DHT インスタンス
作成時に MessagingFactory.getProvider を呼ばれてしまうと、異なる Provider が
返され、フックが登録されていない MessageReceiver が作成され、そちらに
Tag 値呼び分けハンドラが登録されるわけだ。
で、どちらの MessageReceiver にメッセージが届けられるかといえば、フック用
ハンドラを登録した最初のインスタンス。こちらが emu0 のアドレスを振られるから。
で、こちらには Tag 値呼び分けハンドラがない→正常な要求メッセージに
null 値を返すしかなくなる→NullPointerException となるわけだ。
これはエミュレータでしかおきない。なぜならエミュレータ以外の TCP, UDP 用の
MessagingProvider では substitute() が同一インスタンスを返すため。
同一 Provider だから、最初にフック用ハンドラを登録した MessageReceiver を、
Tag 値呼び分けハンドラ登録時にも Provider が返してくれる。
ということで、エミュレータ実行時は ow.stat は使えないわけだね。
で、ここであきらめてはプライドが許さない。これでもプログラマの端くれ。矜持は
あるつもり。
要は DHT 作成時に MessagingFactory.getProvider を呼ばず、こちらが用意した
Provider を受け入れてくれればいいはず。しかしどこをさがしても MessagingProvider
を受けてくれない。で、StandardDHTImpl をゴニョゴニョ探したところ、
コンストラクタが2つあることに気づく。DHTConfiguration を必要とするものと、
RoutingService を必要とするもの。比較してみると、前者は要するに RoutingService
を作成することが最終目的になっているようだ。後者のコンストラクタを
使用すれば、MessagingFactory.getProvider を呼ばずに DHT が構築できる。
つまりエミュレータ環境でも受信メッセージフックができる。
で、なんとか RoutingService を作成して DHT を作ってみると…。やっぱり NullPointerException。
なんでじゃと思ってさらに調べると、RoutingAlgorithm が null になってた。
StandardDHTImpl のコンストラクタを良く調べてみると、RoutingAlgorithmProvider
の initializeAlgorithmInstance を呼ばないと AbstractRoutingDriver に
アルゴリズムが登録されないらしい(AbstractRoutingAlgorithm のコンストラクタと、
AbstractRoutingDriver.setRoutingAlgorithm を調べてみるべし。特に後者を
使用しているところを Eclipse で検索すれば、他で登録していないことがわかる)。
で、最終的にはちゃんとフックできましたとさ。お疲れ様。
もうこういう面倒な調査しないでもフック(というか peep?)できるように
して欲しいよ、ホント…。
// メッセージフック設定。
RoutingService routingSvc = null;
if (cmd.hasOption("sam")) {
StatConfiguration statConf = StatFactory.getDefaultConfiguration();
DHTConfiguration dhtConf = DHTFactory.getDefaultConfiguration();
statConf.setMessagingTransport(null != transport ? transport
: dhtConf.getMessagingTransport());
if (null != selfAddressAndPort) {
MessagingUtility.HostAndPort hostAndPort = MessagingUtility
.parseHostnameAndPort(selfAddressAndPort, dhtConf.getSelfPort());
statConf.setSelfAddress(hostAndPort.getHostName());
statConf.setSelfPort(hostAndPort.getPort());
}
statConf.setDoUPnPNATTraversal(false);
try {
MessagingCollector messagingCollector = StatFactory
.getMessagingCollector(statConf);
messagingCollector.getMessageReceiver().addHandler(
new MessageHandler() {
public Message process(Message msg)
throws Exception {
showReceivedMessage(msg);
return null;
}
});
// RoutingService 作成。
RoutingServiceProvider svcProvider = RoutingServiceFactory.getProvider(
null != routingStyle ? routingStyle : DHTConfiguration.DEFAULT_ROUTING_STYLE);
RoutingAlgorithmProvider algoProvider = RoutingAlgorithmFactory.getProvider(
null != algorithm ? algorithm : DHTConfiguration.DEFAULT_ROUTING_ALGORITHM);
routingSvc = svcProvider.getService(svcProvider.getDefaultConfiguration(),
messagingCollector.getMessagingProvider(),
messagingCollector.getMessageReceiver(),
algoProvider, algoProvider.getDefaultConfiguration(), selfID);
// アルゴリズム作成。
RoutingAlgorithmConfiguration algoConfig = algoProvider.getDefaultConfiguration();
algoProvider.initializeAlgorithmInstance(algoConfig, routingSvc);
} catch (Exception e1) {
e1.printStackTrace();
}
}