goo blog サービス終了のお知らせ 

P2Pとかプログラミング全般とか

P2Pアプリ開発を目指していこうかと。
基本、週末更新なので遅々として進まず。

「情報共有(P2P)研究会」おつかれさまでした

2008-02-29 20:58:06 | P2P
行ってきた。
細かい感想は後日。全体的には、P2P入門からトラフィック量問題まで幅広い内容で楽しめた。
もっとディープに実装レベルの解説とかするのかとドキドキしてたがそんなことはなく、
P2P入門編なお話も多かったのでP2Pにあまり詳しく無い人でもわかりやすかった
と思う。もちろん実装レベルの話もあったので、そのあたりはプログラミングやって
ないとチンプンカンプンだったと思う。
ちなみにかなりボケナスな質問してたのが私。あはは…。

参加者はざっと40人ほどだったと思う。申し込み自体は87人あったそうだ。さすがに80人は来てなかったと思う。
女性もちらほら見受けられた。オスばっかりだと思ってたのでちょっと吃驚。
質問してた人は NEC とか NTT グループとかドワンゴ(!)とかそういう人たち。なんかいろんな人が来てるんだな。

懇親会は当日申し込みでも参加できたんだけど、よくよく考えたら何かしゃべれるほどの
知識もないのに参加したら、部屋の隅でウーロン茶ちびちび飲んでるだけになるかと思って
参加しなかった。

ところで、誰も JXTA について言及してなかった。たまには思い出してあげてくださいね。JXTA だって構造化オーバーレイの実装なんだから。

あ、あと首藤さんが延々とキーボード打ってたのが印象的。すごい量だったけど、何を打ってたんだろう?

DHTインスタンスを作る (ソースは分割して掲載 その2)

2008-02-23 23:01:53 | P2P
        // 初期接続アドレス
        String contactHostAndPort = null;
        int contactPort = -1;
        String contactString = null;
        if (cmd.getArgs().length >= 1) {
            contactHostAndPort = cmd.getArgs()[0];
            join = true;

            if (args.length >= 2)
                contactPort = Integer.parseInt(args[1]);
        }

        // initialize a DHT
        DHTConfiguration config = DHTFactory.getDefaultConfiguration();
        if (transport != null) config.setMessagingTransport(transport);
        if (algorithm != null) config.setRoutingAlgorithm(algorithm);
        if (routingStyle != null) config.setRoutingStyle(routingStyle);
        if (fWorkDir != null) config.setWorkingDirectory(fWorkDir.getPath());
        if (selfAddressAndPort != null) {
            MessagingUtility.HostAndPort hostAndPort =
                MessagingUtility.parseHostnameAndPort(selfAddressAndPort, config.getSelfPort());

            config.setSelfAddress(hostAndPort.getHostName());
            config.setSelfPort(hostAndPort.getPort());
        }
        if (noUPnP) config.setDoUPnPNATTraversal(false);

        DHT<Serializable> dht = DHTFactory.getDHT((short)0, (short)0, config, selfID);  // throws Exception

        StringBuilder sb = new StringBuilder();
        sb.append("DHT configuration:\n");
        sb.append("  hostname:port:     ").append(dht.getSelfIDAddressPair().getAddress()).append('\n');
        sb.append("  transport type:    ").append(config.getMessagingTransport()).append('\n');
        sb.append("  routing algorithm: ").append(config.getRoutingAlgorithm()).append('\n');
        sb.append("  routing style:     ").append(config.getRoutingStyle()).append('\n');
        sb.append("  directory type:    ").append(config.getDirectoryType()).append('\n');
        sb.append("  working directory: ").append(config.getWorkingDirectory()).append('\n');
        System.out.print(sb);

        try {
            if (statCollectorAddressAndPort != null) {
                MessagingUtility.HostAndPort hostAndPort =
                    MessagingUtility.parseHostnameAndPort(statCollectorAddressAndPort, config.getSelfPort());

                dht.setStatCollectorAddress(hostAndPort.getHostName(), hostAndPort.getPort());
            }

            if (join) {
                if (contactPort >= 0) {
                    dht.joinOverlay(contactHostAndPort, contactPort);
                    contactString = contactHostAndPort + " : " + contactPort;
                }
                else {
                    try {
                        dht.joinOverlay(contactHostAndPort);
                        contactString = contactHostAndPort;
                    }
                    catch (IllegalArgumentException e) {    // port is not specified
                        contactPort = config.getContactPort();
                        dht.joinOverlay(contactHostAndPort, contactPort);
                        contactString = contactHostAndPort + ":" + contactPort;
                    }
                }
            }
        }
        catch (UnknownHostException e) {
            System.err.println("A hostname could not be resolved: " + contactHostAndPort);
            e.printStackTrace();
            System.exit(1);
        }

        if (join) {
            System.out.println("  initial contact:   " + contactString);
        }

        System.out.println("A DHT started.");
        System.out.flush();

        return dht;
    }

}

だぁ、文字数制限があるとめんどくせぇ!
どっか別のブログに移動しようかしらん。

来週は put データ、送信メッセージを決めることにしよう。では。

DHTインスタンスを作る (ソースは分割して掲載 その1)

2008-02-23 22:58:06 | P2P
まず DHT を作るところを。
一応、シンプルに作ってみた(というか、ow.tool.dhtshell を切り張りしたというか…)ので、使いまわせるかと。
アルゴリズムは TCP, Koorde, Recursive で固定。
Koorde は Successor List が少なくてすむ(その代わりホップ数が多い)し、Chord ベースなので churn 耐性が高めというところを買って。
Recursive なのは、Iterative だと何度も接続を繰り返しそうだから。
基本的に接続数は少なくするのが良いっていう貧乏性なのでこういう選択に。
Successor List が大きいとそれだけ色々なノードに接続しに行く可能性が高まり、
切断と接続が繰り返されるんじゃないかな~って心配がある。
アルゴリズムは後々変更するかも。
public class Main implements EmulatorControllable {

    public Writer invoke(int id, String[] args, PrintStream out)
            throws Throwable {
        // TODO 自動生成されたメソッド・スタブ
        // Emulator で起動するとこのメソッドが呼ばれる。つまり、main メソッドは決して呼ばれない。
        return null;
    }

    /**
     * @param args コマンドライン引数
     */
    public static void main(String[] args) {
        new Main().start(args);
    }
    private void start(String[] args) {

    }

    private DHT<Serializable> initialize(String[] args) throws Exception {
        CommandLine cmd = null;
        try {
            Options opts = new Options();
            opts.addOption("d", "directory", true, "working directory");
            opts.addOption("i", "id", true, "self ID");
            opts.addOption("s", "selfaddress", true, "self IP address");
            opts.addOption("t", "interactive", false, "change to interactive mode");
            opts.addOption("ul", "useLocal", false, "use Local Address(not use Global Address)");
            cmd = new PosixParser().parse(opts, args);
        }
        catch (ParseException e) {
            System.out.println("There is an invalid option.");
            e.printStackTrace();
            System.exit(1);
        }

        String transport = "TCP";
        String algorithm = "Koorde";
        String routingStyle = "Recursive";
        File fWorkDir = null;
        ID selfID = null;
        String statCollectorAddressAndPort = null;
        String selfAddressAndPort = null;
        boolean noUPnP = true;

        boolean join = false;

        String s = cmd.getOptionValue('d');
        if (null != s) {
            fWorkDir = new File(s);
            if (fWorkDir.exists() && fWorkDir.isDirectory()) {
            }else{
                fWorkDir = null;
            }
        }
        if (null == fWorkDir) {
            fWorkDir = new File(".");
        }
        s = cmd.getOptionValue('i');
        if (null != s) {
            selfID = ID.getID(s, s.length() / 2);
        }

        // 待ち受けポート
        selfAddressAndPort = "37845";
        final boolean useLocalAddress = cmd.hasOption("ul");
        if (useLocalAddress) {
            selfAddressAndPort = InetAddress.getLocalHost().getHostAddress() + selfAddressAndPort;
        } else {
            // 自ノードアドレスを得る。
            String[] ipGetPage = {
                "http://www.goo.ne.jp/index.html",
                "http://www.cybersyndrome.net/evc.html",
                "http://www.cman.jp/network/support/go_access.cgi",
            };
            byte[] buf = new byte[1024];
            Pattern ptnIP = Pattern.compile("((\\d{1,3}\\.){3}\\d{1,3})");
            Proxy proxy = Proxy.NO_PROXY;
            for (String host : ipGetPage) {
                try {
                    URL u = new URI(host).toURL();
                    URLConnection uc = u.openConnection(proxy);
                    uc.setReadTimeout(3);
                    InputStream in = uc.getInputStream();
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    while (true) {
                        int n = in.read(buf);
                        if (n <= 0) {
                            break;
                        }
                        baos.write(buf, 0, n);
                    }
                    in.close();
                    s = new String(baos.toByteArray(), "ASCII");
                    Matcher m = ptnIP.matcher(s);
                    if (m.matches()) {
                        selfAddressAndPort = m.group(1) + ":" + selfAddressAndPort;
                        break;
                    }
                } catch (Exception e) {
                    // TODO エラー
                }
            }
        }


…まだソースは続く…。

実装する機能

2008-02-20 23:05:44 | P2P
P2Pニコ動キャッシュ共有プログラムに必要な実装機能を書いておこう。

一定時間(1時間?)毎に所持している flv ファイルの名称と自ノード情報を put する。

頻繁な離脱(churn)への耐性をつけるため、一定時間毎に put する。
put の key は sm???? をハッシュ値にしたもの、value は自ノードの IDAddressPair かな。
登録するのはエコノミー以外の flv、かつ完全ファイルのみ、かな。

目的の flv ファイルの情報を get し、要求先ノードを調べる。

要求先ノードに flv ファイルの名称と取得範囲を伝える。

これは put ではなくメッセージ送信になる。できれば put と同じように DHT のルーティングで届けるほうがいいかも。
直接送信するとそのノードに接続することになるけど、できれば接続するノードは Successor List にあるノードだけにしたほうが put するときと相性が良さそうだから。
要求は最大でも 64kb 単位で行う。flv は通常 数 mb あるから、それを一気に送ることは無理。細切れで取得する。

要求メッセージを受け取ったら、対象 flv ファイルの情報をメッセージにつめて要求元ノードに送信する。

これは DHT ルーティングせず直接送る予定。要求そのものは大きくても数 kb のメッセージだが、実際のデータは細切れにしても 64 kb 以上。これをルーティングしたらネットワーク帯域を食いすぎる。


だいたいこんな機能を実装予定。
よく見直したら、これって簡易的な P2P ファイル交換機能…。
…ま、flv 限定ってことで。

メッセージを受け取るには

2008-02-20 22:45:18 | Overlay Weaver
忘れてた。メッセージを受け取る方法を書いておかねば。
ow.messaging.MessageHandler インターフェイスを実装したクラスをメッセージの
タグ番号と共に登録することで、メッセージ受信時にコールバックされる。
ow.dht.DHT のインスタンスを dht、MessageHandler のインスタンスを handler とすると、
 dht.getRoutingService().addMessageHandler(tag, handler);
で登録できる。もちろん tag は 0 以上 255 以下だ。

実際には、Map<Integer, List<MessageHandler>> 型の ow.routing.impl.AbstractRoutingDriver.handlerTable 変数に
格納される。つまり1つのタグ値に対して複数の MessageHandler を登録できる。
ただ、ここの実装がちょっと微妙。登録された MessageHandler は、
ow.routing.impl.AbstractRoutingDriver.RoutingDrvMessageHandler.process
メソッドで振り分け処理がなされて呼び出されるんだけど、複数の MessageHandler
がある場合、最後に登録されたハンドラの process メソッドの戻り値を
メッセージの送信元に返すことになる。これだと、最初に登録されたハンドラの
戻り値を返したい、後から登録したハンドラはイベントドリブンのトリガに使いたい
だけ、って時に困る。最初の戻り値を潰す事になるからだ。
たとえば dht.put されたこと(つまり自ノードに put データが来たこと)を
知りたくて ow.messaging.Tag.PUT.getNumber() に addMessageHandler した場合、
本来のハンドラ ow.dht.impl.StandardDHTImpl.PutMessageHandler.process は
DHTMessageFactory.getDHTReplyMessage() の戻り値を put の返答として返すん
だけど、後から登録したハンドラの戻り値に潰されてしまう。
なので、できれば MessageHandler.process の戻り値が null の場合は前のハンドラ
の戻り値を使用する(潰さない)ようにしてほしいなぁ、と。
その旨を Sourceforge の Feature Requests にこっそり匿名で書いといたんだけど、
返事なし。仕方ないので、メッセージ到着を検知したい場合は自分でソースを書き換えよう。

メッセージを送るには その6 TCPの受信でスレッドいっぱい

2008-02-17 18:29:16 | Overlay Weaver
TCP のコネクションプールで補足。プールする接続数は ow.messaging.tcp.TCPMessagingConfiguration
で決まるんだけど、その数は動的に変更可能。TCPMessagingConfiguration.setConnectionPoolSize(int size)
で設定する。

で、受信なんだけど ow.messaging.tcp.TCPMessageReceiver で行ってる。具体的には
start() メソッドでスレッドを起動し、run() メソッドで接続の accept を待つ。
accept した接続はどうするかというと、内部クラスの TCPMessageHandler クラスの
インスタンスを生成、そのインスタンスに渡してそれをスレッド起動(!)する。
ということは接続されればされるほどスレッドがどんどん増えると…。
これは看過できない。しかもこちらから積極的に接続を閉じることはしない。
うーん…。

データの受信、これもちょっと…。具体的には ow.messaging.Message.decode(ByteChannel in)
にソケットチャンネルを渡して終わり。そのメソッド内では必要分の情報を受信する
まで channel からデータを読み出す。ブロッキングソケットだから、データが受信
できるまでそのスレッドは止まり続けるわけだ。だからこその別スレッド起動なんだけど。

まあ実装は正しいんだけど、スレッドが大量に起動する可能性があるのはいただけないなぁ。
要はメッセージ境界がハッキリしないからブロッキングソケットで必要分が届くまで
受信し続けるって実装なんじゃないかな、想像だけど。
だったら送信する際にバイト配列にしたメッセージ情報のバイト数を相手に送信して、
メッセージ境界をはっきりさせればいいのでは?1スレッドで複数の受信ソケットを
select なりで扱い、データを溜め込む。メッセージ境界まで読み込んだら decode
する。これでいいと思うのだが。

とまあ6回もメッセージ送受信で書いた結論としては、標準実装の TCP で頑張ってみようということ。
次からは少しずつ実装を行っていけたらいいな。では次の土曜日にまた。

メッセージを送るには その5 標準実装のTCP

2008-02-16 23:27:27 | Overlay Weaver
OW 標準実装の TCP はちょっと不思議な感じ。まあ調べればちゃんと理由はあるんだけど。
トランスポート層として TCP を使用し、1つのノードと送受信する場合を考える。
こちらからそのノードへ送信しようとすると、コネクションが張られる。これは ow.messaging.tcp.ConnectionPool でプールされるので再利用可能。
もしプール数が ow.messaging.tcp.TCPMessagingConfiguration.getConnectionPoolSize()
を超えると、ランダムでプールされている中から切断するコネクションが選ばれる。
ランダムってのがちょっと悩むところ。使用しなくなって一番時間が経過した
コネクションの方がよさそうなんだけどな。まあそれは大きな問題ではなかろう。
では受信。あるノードに送信したとする。そうするとそのノードへのコネクション
はすでにプールされている。その状態で相手ノードがメッセージを送りたいとする。
すでに張られているコネクションを利用するのか?

利用しないんだな。

実は OW の TCP 実装は一方通行。全二重(もう古い言い方なんだろうか。私は
世代的には半二重を知ってる世代なもんで)のはずなのになんともったいない。
と貧乏根性を最大発揮してもしかたない。なぜこうなっているのか?
たぶん接続してきた相手が、いったい誰なのかわからないからだろう。
2つのノードが同一の IPv4 アドレスを持つこと、つまりあるマシンが2つの
P2P を起動することは十分に可能だ(ポート変えりゃ簡単だ)。そうなると、
接続してきた元の IPv4 アドレスからだけでは相手を判断できない。
もちろん接続してきたそのコネクションのポート番号を調べても無駄だ。
そのポート番号はランダムに割り当てられたものなのだから。
そうなると、接続された側は相手が誰だかわからない、だからそのコネクション
へは送信しない(できない)わけだ。たとえメッセージをそこから受信し、
そのメッセージの src を取り出しても相手のアドレスとは限らない。
延々とルーティングでたらいまわしにされたメッセージかもしれないからだ(
以前、メッセージの src を詐称できちゃうんじゃ、って書いてたのは間違いで、
実際には送信元を示すために必要なのだ。メッセージは直接相手に届く場合だけじゃ
なく、転々と転送されて届けられることもあるから、送信元情報の格納が必須)。

送信は Encode された byte 列を書き込むだけ。ここはぜひとも channel で並列処理
したいところだ。で、受信なんだけどここはちょっと気にしておきたいってことで
明日に続く。たぶん明日だろう、きっと。

メッセージを送るには その4

2008-02-13 22:18:17 | Overlay Weaver
Message の tag が byte 型の範囲内って書いたんだけど、ちょっと違ったので
補足。
byte 型だと -128~127 だけど、tag は 0~255 でした。Message.decode の実装
を見ればわかるけど、
 int tag = buf.get() & 0xff;
ってことで、0~255。
初めて OW でアプリ作るとき、意外にここにはまるんだよね。tag が int だから
適当に 65536 とか指定しちゃうと、ぜんぜんメッセージとどかないんだよね。

ちなみに OW の内部で使用しているタグは、
 ow.messaging.Tag
に public static final で定義されてるので参照しよう。
60個以上あるので、だいたい 0~70 あたりは tag の値として使用できない
ってことになる。自前アプリのメッセージタグは 255 から使うのが吉。

今日は平日なのでこんなところ。次回は TCP の実装の気になるところと
システムメッセージをフックしようとしても現在の実装のままだと上手く
いかないよ、ってところを書くと思うので以下続く。

メッセージを送るには その3

2008-02-11 20:56:31 | Overlay Weaver
OW はトランスポート層を取り替えることができる。
トランスポート層の実装は ow.messaging.MessageSender とか
ow.messaging.MessageReceiver を実装してるので、これらを JavaDoc で見れば
どういう通信手段があるかわかる。
現在のところ、TCP, UDP, シミュレーション, 分散シミュレーションの4種がある。

シミュレーション用実装は、メッセージを送る場合、Sender が Receiver に
直接データを渡しちゃう実装。なのでパケットロスとかはありえない。
分散シミュレーションは、自分に送る場合はシミュレーションと同様に直接
渡しちゃうが、他ノードへは UDP もしくは TCP で送るらしい。なんか入り組んでて
完全には判らなかった。というか、ポート番号変えて自ノードで複数 OW を起動
すりゃいいんじゃ…。

実際にインターネットで運用する気なら、最終的には TCP もしくは UDP となる
わけで、あとはどう使い分けるか。
と以前は思ってたのだが、どうにも UDP じゃ無理っぽいと思い始めている。
UDP でやるには結局のところ、パケットロスの補償が要るわけだが、だったら
TCP でやりゃいいじゃん、車輪再発明し過ぎという意見がある。
http://www.kt.rim.or.jp/~ksk/sock-faq/indexj.html
ここの5章 UDP の項目を見れば UDP でっていう気力が萎えてくる。
(ちなみに同一HPの http://www.kt.rim.or.jp/~ksk/wskfaq-ja/index.html
もオススメ。このHPの FAQ は、TCP/IP を扱う技術者は必読だと思う)
さらに UDP は、どのバイト数のパケットなら相手に届くかわからない。経路の途中
で断片化されるとパケットロスの危険が高まる。TCP なら Path MTU Discovery で
できる限りの効率で通信ができる。
http://www.microsoft.com/japan/technet/community/columns/cableguy/cg0704.mspx
http://wakita.no-ip.com/server/ProblemMTU.html

実際、Infoseek の無料 HP に申し込んで Perl スクリプトを動かし、自宅に向けて
パケットを送ってみたところ、1426byte のパケットなら届いたが 1427byte は
届かなかった。自宅は Flet's ADSL なので、MTU 1454byte が効いているものと
思われる。IP パケットのヘッダ 20byte と UDP のヘッダ 8byte のあわせて
28byte。1454-28=1426byte となってるわけだ。
じゃあ全員が全員このバイト数で良いかというと、そうもいかないだろう。経路
の途中でもっと小さい UDP パケットしか受け付けない経路があるかもしれない。
どこでも質の高い経路、ってわけじゃないし。
そう考えると、UDP でやる利点ってパケット単位で受け取れる(ストリーム志向
じゃない)ってのが最大の利点なのかなぁ、と。

OW では1メッセージを 1 UDP パケットで送るのが標準実装。ってことは、ちょっと
でもデカい情報は送れないってことになる。
とまあこれで UDP じゃダメだなぁってこと、決定。
小さい情報しか送らないとしても、DHT で get しようとしたら同一キーに複数の情報が存在しましたってなったらそれで 1426 byte 超えちゃうこともありうる。

しかし同一 LAN 内に限るのであれば、たぶん問題なく使えるんだろうな。以前、
仕事で UDP でメッセージをやり取りしたんだけど、デカいパケットでも同一 LAN 内
ならぜんぜん問題なかったしね。

UDPについてはこんなところか。TCPについては以下続く。たぶん今週末あたりになるけど。