This project has retired. For details please refer to its
Attic page.
BspService xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.bsp;
20
21 import org.apache.giraph.conf.GiraphConstants;
22 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23 import org.apache.giraph.graph.GraphTaskManager;
24 import org.apache.giraph.job.JobProgressTracker;
25 import org.apache.giraph.partition.GraphPartitionerFactory;
26 import org.apache.giraph.utils.CheckpointingUtils;
27 import org.apache.giraph.worker.WorkerInfo;
28 import org.apache.giraph.writable.kryo.GiraphClassResolver;
29 import org.apache.giraph.zk.BspEvent;
30 import org.apache.giraph.zk.PredicateLock;
31 import org.apache.giraph.zk.ZooKeeperExt;
32 import org.apache.giraph.zk.ZooKeeperManager;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.io.Writable;
35 import org.apache.hadoop.io.WritableComparable;
36 import org.apache.hadoop.mapreduce.Mapper;
37 import org.apache.log4j.Logger;
38 import org.apache.zookeeper.CreateMode;
39 import org.apache.zookeeper.KeeperException;
40 import org.apache.zookeeper.WatchedEvent;
41 import org.apache.zookeeper.Watcher;
42 import org.apache.zookeeper.Watcher.Event.EventType;
43 import org.apache.zookeeper.Watcher.Event.KeeperState;
44 import org.apache.zookeeper.ZooDefs.Ids;
45 import org.json.JSONException;
46 import org.json.JSONObject;
47
48 import java.io.IOException;
49 import java.net.UnknownHostException;
50 import java.nio.charset.Charset;
51 import java.util.ArrayList;
52 import java.util.Collections;
53 import java.util.List;
54
55 import static org.apache.giraph.conf.GiraphConstants.RESTART_JOB_ID;
56
57
58
59
60
61
62
63
64 @SuppressWarnings("rawtypes")
65 public abstract class BspService<I extends WritableComparable,
66 V extends Writable, E extends Writable>
67 implements Watcher, CentralizedService<I, V, E> {
68
69 public static final long UNSET_SUPERSTEP = Long.MIN_VALUE;
70
71 public static final long INPUT_SUPERSTEP = -1;
72
73 public static final long UNSET_APPLICATION_ATTEMPT = Long.MIN_VALUE;
74
75 public static final String BASE_DIR = "/_hadoopBsp";
76
77 public static final String MASTER_JOB_STATE_NODE = "/_masterJobState";
78
79
80 public static final String INPUT_SPLITS_WORKER_DONE_DIR =
81 "/_inputSplitsWorkerDoneDir";
82
83 public static final String INPUT_SPLITS_ALL_DONE_NODE =
84 "/_inputSplitsAllDone";
85
86 public static final String KRYO_REGISTERED_CLASS_DIR =
87 "/_kryo";
88
89 public static final String APPLICATION_ATTEMPTS_DIR =
90 "/_applicationAttemptsDir";
91
92 public static final String MASTER_ELECTION_DIR = "/_masterElectionDir";
93
94 public static final String SUPERSTEP_DIR = "/_superstepDir";
95
96 public static final String COUNTERS_DIR = "/_counters";
97
98 public static final String METRICS_DIR = "/_metrics";
99
100 public static final String WORKER_HEALTHY_DIR = "/_workerHealthyDir";
101
102 public static final String WORKER_UNHEALTHY_DIR = "/_workerUnhealthyDir";
103
104 public static final String WORKER_WROTE_CHECKPOINT_DIR =
105 "/_workerWroteCheckpointDir";
106
107 public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir";
108
109 public static final String PARTITION_EXCHANGE_DIR =
110 "/_partitionExchangeDir";
111
112 public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished";
113
114 public static final String HALT_COMPUTATION_NODE = "/_haltComputation";
115
116 public static final String MEMORY_OBSERVER_DIR = "/_memoryObserver";
117
118 public static final String FORCE_CHECKPOINT_USER_FLAG = "/_checkpointAndStop";
119
120 public static final String CLEANED_UP_DIR = "/_cleanedUpDir";
121
122 public static final String JSONOBJ_NUM_MESSAGES_KEY = "_numMsgsKey";
123
124 public static final String JSONOBJ_NUM_MESSAGE_BYTES_KEY = "_numMsgBytesKey";
125
126 public static final String JSONOBJ_METRICS_KEY = "_metricsKey";
127
128
129 public static final String JSONOBJ_STATE_KEY = "_stateKey";
130
131 public static final String JSONOBJ_APPLICATION_ATTEMPT_KEY =
132 "_applicationAttemptKey";
133
134 public static final String JSONOBJ_SUPERSTEP_KEY =
135 "_superstepKey";
136
137 public static final String WORKER_SUFFIX = "_worker";
138
139 public static final String MASTER_SUFFIX = "_master";
140
141
142 private static final Logger LOG = Logger.getLogger(BspService.class);
143
144 protected final String basePath;
145
146 protected final String masterJobStatePath;
147
148 protected final String inputSplitsWorkerDonePath;
149
150 protected final String inputSplitsAllDonePath;
151
152 protected final String applicationAttemptsPath;
153
154 protected final String cleanedUpPath;
155
156 protected final String checkpointBasePath;
157
158 protected final String savedCheckpointBasePath;
159
160 protected final String masterElectionPath;
161
162 protected final String haltComputationPath;
163
164 protected final String memoryObserverPath;
165
166 protected final String kryoRegisteredClassPath;
167
168 private final ZooKeeperExt zk;
169
170 private final BspEvent connectedEvent;
171
172 private final BspEvent workerHealthRegistrationChanged;
173
174 private final BspEvent applicationAttemptChanged;
175
176 private final BspEvent inputSplitsWorkerDoneEvent;
177
178 private final BspEvent inputSplitsAllDoneEvent;
179
180 private final BspEvent superstepFinished;
181
182 private final BspEvent masterElectionChildrenChanged;
183
184 private final BspEvent cleanedUpChildrenChanged;
185
186
187 private final BspEvent writtenCountersToZK;
188
189 private final List<BspEvent> registeredBspEvents =
190 new ArrayList<BspEvent>();
191
192 private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
193
194 private final Mapper<?, ?, ?, ?>.Context context;
195
196 private long cachedSuperstep = UNSET_SUPERSTEP;
197
198 private long restartedSuperstep = UNSET_SUPERSTEP;
199
200 private long cachedApplicationAttempt = UNSET_APPLICATION_ATTEMPT;
201
202 private final String jobId;
203
204 private final int taskId;
205
206 private final String hostname;
207
208 private final String hostnameTaskId;
209
210 private final GraphPartitionerFactory<I, V, E> graphPartitionerFactory;
211
212 private final GraphTaskManager<I, V, E> graphTaskManager;
213
214 private final FileSystem fs;
215
216
217
218
219
220
221
222 public BspService(
223 Mapper<?, ?, ?, ?>.Context context,
224 GraphTaskManager<I, V, E> graphTaskManager) {
225 this.connectedEvent = new PredicateLock(context);
226 this.workerHealthRegistrationChanged = new PredicateLock(context);
227 this.applicationAttemptChanged = new PredicateLock(context);
228 this.inputSplitsWorkerDoneEvent = new PredicateLock(context);
229 this.inputSplitsAllDoneEvent = new PredicateLock(context);
230 this.superstepFinished = new PredicateLock(context);
231 this.masterElectionChildrenChanged = new PredicateLock(context);
232 this.cleanedUpChildrenChanged = new PredicateLock(context);
233 this.writtenCountersToZK = new PredicateLock(context);
234
235 registerBspEvent(connectedEvent);
236 registerBspEvent(workerHealthRegistrationChanged);
237 registerBspEvent(inputSplitsWorkerDoneEvent);
238 registerBspEvent(inputSplitsAllDoneEvent);
239 registerBspEvent(applicationAttemptChanged);
240 registerBspEvent(superstepFinished);
241 registerBspEvent(masterElectionChildrenChanged);
242 registerBspEvent(cleanedUpChildrenChanged);
243 registerBspEvent(writtenCountersToZK);
244
245 this.context = context;
246 this.graphTaskManager = graphTaskManager;
247 this.conf = graphTaskManager.getConf();
248
249 this.jobId = conf.getJobId();
250 this.restartedSuperstep = conf.getLong(
251 GiraphConstants.RESTART_SUPERSTEP, UNSET_SUPERSTEP);
252 try {
253 this.hostname = conf.getLocalHostname();
254 } catch (UnknownHostException e) {
255 throw new RuntimeException(e);
256 }
257 this.graphPartitionerFactory = conf.createGraphPartitioner();
258
259 basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
260 if (LOG.isInfoEnabled()) {
261 LOG.info(String.format("%s: %s",
262 GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP, basePath));
263 }
264 masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
265 inputSplitsWorkerDonePath = basePath + INPUT_SPLITS_WORKER_DONE_DIR;
266 inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE;
267 applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
268 cleanedUpPath = basePath + CLEANED_UP_DIR;
269 kryoRegisteredClassPath = basePath + KRYO_REGISTERED_CLASS_DIR;
270
271
272 String restartJobId = RESTART_JOB_ID.get(conf);
273
274 savedCheckpointBasePath =
275 CheckpointingUtils.getCheckpointBasePath(getConfiguration(),
276 restartJobId == null ? getJobId() : restartJobId);
277
278 checkpointBasePath = CheckpointingUtils.
279 getCheckpointBasePath(getConfiguration(), getJobId());
280
281 masterElectionPath = basePath + MASTER_ELECTION_DIR;
282 String serverPortList = graphTaskManager.getZookeeperList();
283 haltComputationPath = basePath + HALT_COMPUTATION_NODE;
284 memoryObserverPath = basePath + MEMORY_OBSERVER_DIR;
285 getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP,
286 haltComputationPath);
287 if (LOG.isInfoEnabled()) {
288 LOG.info("BspService: Path to create to halt is " + haltComputationPath);
289 }
290 if (LOG.isInfoEnabled()) {
291 LOG.info("BspService: Connecting to ZooKeeper with job " + jobId +
292 ", partition " + conf.getTaskPartition() + " on " + serverPortList);
293 }
294 try {
295 this.zk = new ZooKeeperExt(serverPortList,
296 conf.getZooKeeperSessionTimeout(),
297 conf.getZookeeperOpsMaxAttempts(),
298 conf.getZookeeperOpsRetryWaitMsecs(),
299 this,
300 context);
301 connectedEvent.waitForTimeoutOrFail(
302 GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(conf));
303 this.fs = FileSystem.get(getConfiguration());
304 } catch (IOException e) {
305 throw new RuntimeException(e);
306 }
307
308 boolean disableGiraphResolver =
309 GiraphConstants.DISABLE_GIRAPH_CLASS_RESOLVER.get(conf);
310 if (!disableGiraphResolver) {
311 GiraphClassResolver.setZookeeperInfo(zk, kryoRegisteredClassPath);
312 }
313 this.taskId = (int) getApplicationAttempt() * conf.getMaxWorkers() +
314 conf.getTaskPartition();
315 this.hostnameTaskId = hostname + "_" + getTaskId();
316
317
318 if (restartJobId != null &&
319 restartedSuperstep == UNSET_SUPERSTEP) {
320 try {
321 restartedSuperstep = getLastCheckpointedSuperstep();
322 } catch (IOException e) {
323 throw new RuntimeException(e);
324 }
325 }
326 this.cachedSuperstep = restartedSuperstep;
327 if ((restartedSuperstep != UNSET_SUPERSTEP) &&
328 (restartedSuperstep < 0)) {
329 throw new IllegalArgumentException(
330 "BspService: Invalid superstep to restart - " +
331 restartedSuperstep);
332 }
333 }
334
335
336
337
338
339
340
341 public static long getSuperstepFromPath(String path) {
342 int foundSuperstepStart = path.indexOf(SUPERSTEP_DIR);
343 if (foundSuperstepStart == -1) {
344 throw new IllegalArgumentException(
345 "getSuperstepFromPath: Cannot find " + SUPERSTEP_DIR +
346 "from " + path);
347 }
348 foundSuperstepStart += SUPERSTEP_DIR.length() + 1;
349 int endIndex = foundSuperstepStart +
350 path.substring(foundSuperstepStart).indexOf("/");
351 if (endIndex == -1) {
352 throw new IllegalArgumentException(
353 "getSuperstepFromPath: Cannot find end of superstep from " +
354 path);
355 }
356 if (LOG.isTraceEnabled()) {
357 LOG.trace("getSuperstepFromPath: Got path=" + path +
358 ", start=" + foundSuperstepStart + ", end=" + endIndex);
359 }
360 return Long.parseLong(path.substring(foundSuperstepStart, endIndex));
361 }
362
363
364
365
366
367
368
369 public static String getHealthyHostnameIdFromPath(String path) {
370 int foundWorkerHealthyStart = path.indexOf(WORKER_HEALTHY_DIR);
371 if (foundWorkerHealthyStart == -1) {
372 throw new IllegalArgumentException(
373 "getHealthyHostnameidFromPath: Couldn't find " +
374 WORKER_HEALTHY_DIR + " from " + path);
375 }
376 foundWorkerHealthyStart += WORKER_HEALTHY_DIR.length();
377 return path.substring(foundWorkerHealthyStart);
378 }
379
380
381
382
383
384
385
386
387 public final String getSuperstepPath(long attempt) {
388 return applicationAttemptsPath + "/" + attempt + SUPERSTEP_DIR;
389 }
390
391
392
393
394
395
396
397
398
399 public final String getWorkerInfoHealthyPath(long attempt,
400 long superstep) {
401 return applicationAttemptsPath + "/" + attempt +
402 SUPERSTEP_DIR + "/" + superstep + WORKER_HEALTHY_DIR;
403 }
404
405
406
407
408
409
410
411
412
413 public final String getWorkerInfoUnhealthyPath(long attempt,
414 long superstep) {
415 return applicationAttemptsPath + "/" + attempt +
416 SUPERSTEP_DIR + "/" + superstep + WORKER_UNHEALTHY_DIR;
417 }
418
419
420
421
422
423
424
425
426
427 public final String getWorkerWroteCheckpointPath(long attempt,
428 long superstep) {
429 return applicationAttemptsPath + "/" + attempt +
430 SUPERSTEP_DIR + "/" + superstep + WORKER_WROTE_CHECKPOINT_DIR;
431 }
432
433
434
435
436
437
438
439
440
441 public final String getWorkerMetricsFinishedPath(
442 long attempt, long superstep) {
443 return applicationAttemptsPath + "/" + attempt +
444 SUPERSTEP_DIR + "/" + superstep + WORKER_FINISHED_DIR + METRICS_DIR;
445 }
446
447
448
449
450
451
452
453
454
455 public final String getWorkerCountersFinishedPath(
456 long attempt, long superstep) {
457 return applicationAttemptsPath + "/" + attempt +
458 SUPERSTEP_DIR + "/" + superstep +
459 WORKER_FINISHED_DIR + COUNTERS_DIR;
460 }
461
462
463
464
465
466
467
468
469 public final String getPartitionExchangePath(long attempt,
470 long superstep) {
471 return applicationAttemptsPath + "/" + attempt +
472 SUPERSTEP_DIR + "/" + superstep + PARTITION_EXCHANGE_DIR;
473 }
474
475
476
477
478
479
480
481
482
483
484 public final String getPartitionExchangeWorkerPath(long attempt,
485 long superstep,
486 WorkerInfo workerInfo) {
487 return getPartitionExchangePath(attempt, superstep) +
488 "/" + workerInfo.getHostnameId();
489 }
490
491
492
493
494
495
496
497
498 public final String getSuperstepFinishedPath(long attempt, long superstep) {
499 return applicationAttemptsPath + "/" + attempt +
500 SUPERSTEP_DIR + "/" + superstep + SUPERSTEP_FINISHED_NODE;
501 }
502
503
504
505
506
507
508
509
510 public final String getCheckpointBasePath(long superstep) {
511 return checkpointBasePath + "/" + superstep;
512 }
513
514
515
516
517
518
519
520 public final String getSavedCheckpointBasePath(long superstep) {
521 return savedCheckpointBasePath + "/" + superstep;
522 }
523
524
525
526
527
528
529
530 public final ZooKeeperExt getZkExt() {
531 return zk;
532 }
533
534 @Override
535 public final long getRestartedSuperstep() {
536 return restartedSuperstep;
537 }
538
539
540
541
542
543
544 public final void setRestartedSuperstep(long superstep) {
545 if (superstep < INPUT_SUPERSTEP) {
546 throw new IllegalArgumentException(
547 "setRestartedSuperstep: Bad argument " + superstep);
548 }
549 restartedSuperstep = superstep;
550 }
551
552
553
554
555
556
557 public final FileSystem getFs() {
558 return fs;
559 }
560
561 public final ImmutableClassesGiraphConfiguration<I, V, E>
562 getConfiguration() {
563 return conf;
564 }
565
566 public final Mapper<?, ?, ?, ?>.Context getContext() {
567 return context;
568 }
569
570 public final String getHostname() {
571 return hostname;
572 }
573
574 public final String getHostnameTaskId() {
575 return hostnameTaskId;
576 }
577
578 public final int getTaskId() {
579 return taskId;
580 }
581
582 public final GraphTaskManager<I, V, E> getGraphTaskManager() {
583 return graphTaskManager;
584 }
585
586 public final BspEvent getWorkerHealthRegistrationChangedEvent() {
587 return workerHealthRegistrationChanged;
588 }
589
590 public final BspEvent getApplicationAttemptChangedEvent() {
591 return applicationAttemptChanged;
592 }
593
594 public final BspEvent getInputSplitsWorkerDoneEvent() {
595 return inputSplitsWorkerDoneEvent;
596 }
597
598 public final BspEvent getInputSplitsAllDoneEvent() {
599 return inputSplitsAllDoneEvent;
600 }
601
602 public final BspEvent getSuperstepFinishedEvent() {
603 return superstepFinished;
604 }
605
606
607 public final BspEvent getMasterElectionChildrenChangedEvent() {
608 return masterElectionChildrenChanged;
609 }
610
611 public final BspEvent getCleanedUpChildrenChangedEvent() {
612 return cleanedUpChildrenChanged;
613 }
614
615 public final BspEvent getWrittenCountersToZKEvent() {
616 return writtenCountersToZK;
617 }
618
619
620
621
622
623
624
625 public final JSONObject getJobState() {
626 try {
627 getZkExt().createExt(masterJobStatePath,
628 null,
629 Ids.OPEN_ACL_UNSAFE,
630 CreateMode.PERSISTENT,
631 true);
632 } catch (KeeperException.NodeExistsException e) {
633 LOG.info("getJobState: Job state already exists (" +
634 masterJobStatePath + ")");
635 } catch (KeeperException e) {
636 throw new IllegalStateException("Failed to create job state path " +
637 "due to KeeperException", e);
638 } catch (InterruptedException e) {
639 throw new IllegalStateException("Failed to create job state path " +
640 "due to InterruptedException", e);
641 }
642 String jobState = null;
643 try {
644 List<String> childList =
645 getZkExt().getChildrenExt(
646 masterJobStatePath, true, true, true);
647 if (childList.isEmpty()) {
648 return null;
649 }
650 jobState =
651 new String(getZkExt().getData(childList.get(childList.size() - 1),
652 true, null), Charset.defaultCharset());
653 } catch (KeeperException.NoNodeException e) {
654 LOG.info("getJobState: Job state path is empty! - " +
655 masterJobStatePath);
656 } catch (KeeperException e) {
657 throw new IllegalStateException("Failed to get job state path " +
658 "children due to KeeperException", e);
659 } catch (InterruptedException e) {
660 throw new IllegalStateException("Failed to get job state path " +
661 "children due to InterruptedException", e);
662 }
663 try {
664 return new JSONObject(jobState);
665 } catch (JSONException e) {
666 throw new RuntimeException(
667 "getJobState: Failed to parse job state " + jobState);
668 }
669 }
670
671
672
673
674
675
676 public final String getJobId() {
677 return jobId;
678 }
679
680
681
682
683
684
685 public final long getApplicationAttempt() {
686 if (cachedApplicationAttempt != UNSET_APPLICATION_ATTEMPT) {
687 return cachedApplicationAttempt;
688 }
689 try {
690 getZkExt().createExt(applicationAttemptsPath,
691 null,
692 Ids.OPEN_ACL_UNSAFE,
693 CreateMode.PERSISTENT,
694 true);
695 } catch (KeeperException.NodeExistsException e) {
696 LOG.info("getApplicationAttempt: Node " +
697 applicationAttemptsPath + " already exists!");
698 } catch (KeeperException e) {
699 throw new IllegalStateException("Couldn't create application " +
700 "attempts path due to KeeperException", e);
701 } catch (InterruptedException e) {
702 throw new IllegalStateException("Couldn't create application " +
703 "attempts path due to InterruptedException", e);
704 }
705 try {
706 List<String> attemptList =
707 getZkExt().getChildrenExt(
708 applicationAttemptsPath, true, false, false);
709 if (attemptList.isEmpty()) {
710 cachedApplicationAttempt = 0;
711 } else {
712 cachedApplicationAttempt =
713 Long.parseLong(Collections.max(attemptList));
714 }
715 } catch (KeeperException e) {
716 throw new IllegalStateException("Couldn't get application " +
717 "attempts to KeeperException", e);
718 } catch (InterruptedException e) {
719 throw new IllegalStateException("Couldn't get application " +
720 "attempts to InterruptedException", e);
721 }
722
723 return cachedApplicationAttempt;
724 }
725
726
727
728
729
730
731 public final long getSuperstep() {
732 if (cachedSuperstep != UNSET_SUPERSTEP) {
733 return cachedSuperstep;
734 }
735 String superstepPath = getSuperstepPath(getApplicationAttempt());
736 try {
737 getZkExt().createExt(superstepPath,
738 null,
739 Ids.OPEN_ACL_UNSAFE,
740 CreateMode.PERSISTENT,
741 true);
742 } catch (KeeperException.NodeExistsException e) {
743 if (LOG.isInfoEnabled()) {
744 LOG.info("getApplicationAttempt: Node " +
745 applicationAttemptsPath + " already exists!");
746 }
747 } catch (KeeperException e) {
748 throw new IllegalStateException(
749 "getSuperstep: KeeperException", e);
750 } catch (InterruptedException e) {
751 throw new IllegalStateException(
752 "getSuperstep: InterruptedException", e);
753 }
754
755 List<String> superstepList;
756 try {
757 superstepList =
758 getZkExt().getChildrenExt(superstepPath, true, false, false);
759 } catch (KeeperException e) {
760 throw new IllegalStateException(
761 "getSuperstep: KeeperException", e);
762 } catch (InterruptedException e) {
763 throw new IllegalStateException(
764 "getSuperstep: InterruptedException", e);
765 }
766 if (superstepList.isEmpty()) {
767 cachedSuperstep = INPUT_SUPERSTEP;
768 } else {
769 cachedSuperstep =
770 Long.parseLong(Collections.max(superstepList));
771 }
772
773 return cachedSuperstep;
774 }
775
776
777
778
779 public final void incrCachedSuperstep() {
780 if (cachedSuperstep == UNSET_SUPERSTEP) {
781 throw new IllegalStateException(
782 "incrSuperstep: Invalid unset cached superstep " +
783 UNSET_SUPERSTEP);
784 }
785 ++cachedSuperstep;
786 }
787
788
789
790
791
792
793
794 public final void setCachedSuperstep(long superstep) {
795 cachedSuperstep = superstep;
796 }
797
798
799
800
801
802
803
804 public final void setApplicationAttempt(long applicationAttempt) {
805 cachedApplicationAttempt = applicationAttempt;
806 String superstepPath = getSuperstepPath(cachedApplicationAttempt);
807 try {
808 getZkExt().createExt(superstepPath,
809 null,
810 Ids.OPEN_ACL_UNSAFE,
811 CreateMode.PERSISTENT,
812 true);
813 } catch (KeeperException.NodeExistsException e) {
814 throw new IllegalArgumentException(
815 "setApplicationAttempt: Attempt already exists! - " +
816 superstepPath, e);
817 } catch (KeeperException e) {
818 throw new RuntimeException(
819 "setApplicationAttempt: KeeperException - " +
820 superstepPath, e);
821 } catch (InterruptedException e) {
822 throw new RuntimeException(
823 "setApplicationAttempt: InterruptedException - " +
824 superstepPath, e);
825 }
826 }
827
828
829
830
831
832
833
834
835 public void registerBspEvent(BspEvent event) {
836 registeredBspEvents.add(event);
837 }
838
839
840
841
842
843
844 protected GraphPartitionerFactory<I, V, E> getGraphPartitionerFactory() {
845 return graphPartitionerFactory;
846 }
847
848
849
850
851
852
853
854
855 protected boolean processEvent(WatchedEvent event) {
856 return false;
857 }
858
859 @Override
860 public final void process(WatchedEvent event) {
861
862
863 if (LOG.isDebugEnabled()) {
864 LOG.debug("process: Got a new event, path = " + event.getPath() +
865 ", type = " + event.getType() + ", state = " +
866 event.getState());
867 }
868
869 if ((event.getPath() == null) && (event.getType() == EventType.None)) {
870 if (event.getState() == KeeperState.Disconnected) {
871
872 for (BspEvent bspEvent : registeredBspEvents) {
873 bspEvent.signal();
874 }
875 LOG.warn("process: Disconnected from ZooKeeper (will automatically " +
876 "try to recover) " + event);
877 } else if (event.getState() == KeeperState.SyncConnected) {
878 if (LOG.isInfoEnabled()) {
879 LOG.info("process: Asynchronous connection complete.");
880 }
881 connectedEvent.signal();
882 } else {
883 LOG.warn("process: Got unknown null path event " + event);
884 }
885 return;
886 }
887
888 boolean eventProcessed = false;
889 if (event.getPath().startsWith(masterJobStatePath)) {
890
891
892 masterElectionChildrenChanged.signal();
893 eventProcessed = true;
894 } else if ((event.getPath().contains(WORKER_HEALTHY_DIR) ||
895 event.getPath().contains(WORKER_UNHEALTHY_DIR)) &&
896 (event.getType() == EventType.NodeChildrenChanged)) {
897 if (LOG.isDebugEnabled()) {
898 LOG.debug("process: workerHealthRegistrationChanged " +
899 "(worker health reported - healthy/unhealthy )");
900 }
901 workerHealthRegistrationChanged.signal();
902 eventProcessed = true;
903 } else if (event.getPath().contains(INPUT_SPLITS_ALL_DONE_NODE) &&
904 event.getType() == EventType.NodeCreated) {
905 if (LOG.isInfoEnabled()) {
906 LOG.info("process: all input splits done");
907 }
908 inputSplitsAllDoneEvent.signal();
909 eventProcessed = true;
910 } else if (event.getPath().contains(INPUT_SPLITS_WORKER_DONE_DIR) &&
911 event.getType() == EventType.NodeChildrenChanged) {
912 if (LOG.isDebugEnabled()) {
913 LOG.debug("process: worker done reading input splits");
914 }
915 inputSplitsWorkerDoneEvent.signal();
916 eventProcessed = true;
917 } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) &&
918 event.getType() == EventType.NodeCreated) {
919 if (LOG.isInfoEnabled()) {
920 LOG.info("process: superstepFinished signaled");
921 }
922 superstepFinished.signal();
923 eventProcessed = true;
924 } else if (event.getPath().endsWith(applicationAttemptsPath) &&
925 event.getType() == EventType.NodeChildrenChanged) {
926 if (LOG.isInfoEnabled()) {
927 LOG.info("process: applicationAttemptChanged signaled");
928 }
929 applicationAttemptChanged.signal();
930 eventProcessed = true;
931 } else if (event.getPath().contains(MASTER_ELECTION_DIR) &&
932 event.getType() == EventType.NodeChildrenChanged) {
933 if (LOG.isInfoEnabled()) {
934 LOG.info("process: masterElectionChildrenChanged signaled");
935 }
936 masterElectionChildrenChanged.signal();
937 eventProcessed = true;
938 } else if (event.getPath().equals(cleanedUpPath) &&
939 event.getType() == EventType.NodeChildrenChanged) {
940 if (LOG.isInfoEnabled()) {
941 LOG.info("process: cleanedUpChildrenChanged signaled");
942 }
943 cleanedUpChildrenChanged.signal();
944 eventProcessed = true;
945 } else if (event.getPath().endsWith(COUNTERS_DIR) &&
946 event.getType() == EventType.NodeChildrenChanged) {
947 LOG.info("process: writtenCountersToZK signaled");
948 getWrittenCountersToZKEvent().signal();
949 eventProcessed = true;
950 }
951
952 if (!(processEvent(event)) && (!eventProcessed)) {
953 LOG.warn("process: Unknown and unprocessed event (path=" +
954 event.getPath() + ", type=" + event.getType() +
955 ", state=" + event.getState() + ")");
956 }
957 }
958
959
960
961
962
963
964
965 protected long getLastCheckpointedSuperstep() throws IOException {
966 return CheckpointingUtils.getLastCheckpointedSuperstep(getFs(),
967 savedCheckpointBasePath);
968 }
969
970 @Override
971 public JobProgressTracker getJobProgressTracker() {
972 return getGraphTaskManager().getJobProgressTracker();
973 }
974
975
976
977
978
979
980
981
982
983
984
985 protected int getWorkerId(WorkerInfo workerInfo) {
986 return getWorkerInfoList().indexOf(workerInfo);
987 }
988
989
990
991
992
993
994 protected WorkerInfo getWorkerInfoById(int id) {
995 return getWorkerInfoList().get(id);
996 }
997 }