Hadoopのソースを読もう

で,JobInProgressのコンストラクタを見てみるとなんだか色々やっているので,new JobInProgress()でgrepしてみると,JobTracker::submitJobのなかにだけありました.

JobInProgress job = new JobInProgress(jobFile, this, this.conf);


登録したときに,JobInProgressを作ってるんですね.それ以外には,JobInProgressのメンバ変数に適宜設定しているだけみたい.



緩和及第.ちょっともう一回最初から追いかけてみたところ,結局,JobTracker::submitJob(String jobFile)でjobのスタートになるみたい.元々,submitJob自体はJobSubmissionProtocolインターフェースで宣言されていて,かつ,JobTrackerはそれを継承しています.なので実態は,JobTracker::submitJob.でここで勘違いしていたのがjobIdがstringであること.間違えて,

public RunningJob submitJob(JobConf job) throws FileNotFoundException
のほうをよんでしまったわけですね.



で,

public synchronized JobStatus submitJob(String jobFile) throws IOException {

totalSubmissions++;

JobInProgress job = new JobInProgress(jobFile, this, this.conf);

synchronized (jobs) {

synchronized (jobsByPriority) {

synchronized (jobInitQueue) {

jobs.put(job.getProfile().getJobId(), job);

jobsByPriority.add(job);

jobInitQueue.add(job);

resortPriority();

jobInitQueue.notifyAll();

}

}

}

myMetrics.submitJob();

return job.getStatus();

}


この関数の上のコメントには,この関数で,実際に仕事がスタートするよってなことが書いてあります.何気にjavaは久しぶりなので,synchronized修飾子忘れた.いわゆるmutex lockをやってくれるわけですね.よくわからないけど,totalSubmissions++は大丈夫なんだろうか...アトミックなのかな++演算子って.まぁいいや,細かいことを気にしていると先に進まない.jobs.put(..)で,マップにjobid,jobを登録してますね.これで,後で状況見るときに使えるんですね.

jobsByPriority.add(job);

jobInitQueue.add(job);

resortPriority();

jobInitQueue.notifyAll();


この辺で多分,mapやreduceをどこのノードでやらせるのかといったことをやるんですね.後で追いかけよう.

myMetrics.submitJob();はJobTrackerのコンストラクタにおいて,

myMetrics = new JobTrackerMetrics(jobConf);


と入れられてますね.で,その中では,

synchronized void submitJob() {

++numJobsSubmitted;

}


拍子抜け.やっぱり,jobInitQueue.add(job)とnotifyAll()が本命みたいだ.