Hadoopのソースを読もう

大分前に,ストップしていたソースを別なところから読み始めることにする.HDFSを担うDataNode.javaのmain()から.


try {
StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
DataNode datanode = createDataNode(args, null);
if (datanode != null)
datanode.join();
} catch (Throwable e) {
LOG.error(StringUtils.stringifyException(e));
System.exit(-1);
}
で,createDataNode(...)が呼び出されますね.

static DataNode createDataNode(String args[],
Configuration conf) throws IOException {
if (conf == null)
conf = new Configuration();
if (!parseArguments(args, conf)) {
printUsage();
return null;
}
return run(conf);
}
ということで,run(conf)です.省略するけど,この中で,スレッド化して,Thread.startです.で,

// start dataXceiveServer
dataXceiveServer.start();
が呼び出されます.これが,まぁレシーバになるわけです.あれですね,accept()を発行して,受け付けたら,スレッド立てるわけですね.

this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
で,dataXceiveServerは,初期化時に作られています.Daemonは,Threadのラッパーですね.ということで,

public void run() {
try {
while (shouldRun) {
Socket s = ss.accept();
//s.setSoTimeout(READ_TIMEOUT);
xceiverCount.incr();
new Daemon(new DataXceiver(s)).start();
}
ss.close();
} catch (IOException ie) {
LOG.info("Exiting DataXceiveServer due to " + ie.toString());
}
}
接続があれば,新しいスレッド作成して,クライアントに対応させます.まぁだから,データブロックを要求してきたやつにスレッドを作るわけですね.new Daemon(new DataXceiver(s)).start();ってのが重要です.これから先が,プロトコル処理になります.