Hadoopのソースを読もう
で,JobInProgressのコンストラクタを見てみるとなんだか色々やっているので,new JobInProgress()でgrepしてみると,JobTracker::submitJobのなかにだけありました.
JobInProgress job = new JobInProgress(jobFile, this, this.conf);
登録したときに,JobInProgressを作ってるんですね.それ以外には,JobInProgressのメンバ変数に適宜設定しているだけみたい.
緩和及第.ちょっともう一回最初から追いかけてみたところ,結局,JobTracker::submitJob(String jobFile)でjobのスタートになるみたい.元々,submitJob自体はJobSubmissionProtocolインターフェースで宣言されていて,かつ,JobTrackerはそれを継承しています.なので実態は,JobTracker::submitJob.でここで勘違いしていたのがjobIdがstringであること.間違えて,
public RunningJob submitJob(JobConf job) throws FileNotFoundExceptionのほうをよんでしまったわけですね.
で,
public synchronized JobStatus submitJob(String jobFile) throws IOException {
totalSubmissions++;
JobInProgress job = new JobInProgress(jobFile, this, this.conf);
synchronized (jobs) {
synchronized (jobsByPriority) {
synchronized (jobInitQueue) {
jobs.put(job.getProfile().getJobId(), job);
jobsByPriority.add(job);
jobInitQueue.add(job);
resortPriority();
jobInitQueue.notifyAll();
}
}
}
myMetrics.submitJob();
return job.getStatus();
}
この関数の上のコメントには,この関数で,実際に仕事がスタートするよってなことが書いてあります.何気にjavaは久しぶりなので,synchronized修飾子忘れた.いわゆるmutex lockをやってくれるわけですね.よくわからないけど,totalSubmissions++は大丈夫なんだろうか...アトミックなのかな++演算子って.まぁいいや,細かいことを気にしていると先に進まない.jobs.put(..)で,マップにjobid,jobを登録してますね.これで,後で状況見るときに使えるんですね.
jobsByPriority.add(job);
jobInitQueue.add(job);
resortPriority();
jobInitQueue.notifyAll();
この辺で多分,mapやreduceをどこのノードでやらせるのかといったことをやるんですね.後で追いかけよう.
myMetrics.submitJob();はJobTrackerのコンストラクタにおいて,
myMetrics = new JobTrackerMetrics(jobConf);
と入れられてますね.で,その中では,
synchronized void submitJob() {
++numJobsSubmitted;
}
拍子抜け.やっぱり,jobInitQueue.add(job)とnotifyAll()が本命みたいだ.