This project has retired. For details please refer to its
Attic page.
BspServiceWorker xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.worker;
20
21 import java.io.DataInputStream;
22 import java.io.DataOutputStream;
23 import java.io.IOException;
24 import java.nio.charset.Charset;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.HashSet;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Map.Entry;
32 import java.util.Queue;
33 import java.util.Set;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.ConcurrentLinkedQueue;
36 import java.util.concurrent.TimeUnit;
37
38 import net.iharder.Base64;
39
40 import org.apache.giraph.bsp.ApplicationState;
41 import org.apache.giraph.bsp.BspService;
42 import org.apache.giraph.bsp.CentralizedServiceWorker;
43 import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
44 import org.apache.giraph.comm.ServerData;
45 import org.apache.giraph.comm.WorkerClient;
46 import org.apache.giraph.comm.WorkerClientRequestProcessor;
47 import org.apache.giraph.comm.WorkerServer;
48 import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
49 import org.apache.giraph.comm.messages.MessageStore;
50 import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
51 import org.apache.giraph.comm.netty.NettyClient;
52 import org.apache.giraph.comm.netty.NettyWorkerAggregatorRequestProcessor;
53 import org.apache.giraph.comm.netty.NettyWorkerClient;
54 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
55 import org.apache.giraph.comm.netty.NettyWorkerServer;
56 import org.apache.giraph.comm.requests.PartitionStatsRequest;
57 import org.apache.giraph.conf.GiraphConstants;
58 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
59 import org.apache.giraph.counters.CustomCounter;
60 import org.apache.giraph.counters.CustomCounters;
61 import org.apache.giraph.edge.Edge;
62 import org.apache.giraph.graph.AddressesAndPartitionsWritable;
63 import org.apache.giraph.graph.FinishedSuperstepStats;
64 import org.apache.giraph.graph.GlobalStats;
65 import org.apache.giraph.graph.GraphTaskManager;
66 import org.apache.giraph.graph.Vertex;
67 import org.apache.giraph.graph.VertexEdgeCount;
68 import org.apache.giraph.io.EdgeOutputFormat;
69 import org.apache.giraph.io.EdgeWriter;
70 import org.apache.giraph.io.VertexOutputFormat;
71 import org.apache.giraph.io.VertexWriter;
72 import org.apache.giraph.io.superstep_output.SuperstepOutput;
73 import org.apache.giraph.mapping.translate.TranslateEdge;
74 import org.apache.giraph.master.MasterInfo;
75 import org.apache.giraph.master.SuperstepClasses;
76 import org.apache.giraph.metrics.GiraphMetrics;
77 import org.apache.giraph.metrics.GiraphTimer;
78 import org.apache.giraph.metrics.GiraphTimerContext;
79 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
80 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
81 import org.apache.giraph.metrics.WorkerSuperstepMetrics;
82 import org.apache.giraph.ooc.OutOfCoreEngine;
83 import org.apache.giraph.partition.Partition;
84 import org.apache.giraph.partition.PartitionExchange;
85 import org.apache.giraph.partition.PartitionOwner;
86 import org.apache.giraph.partition.PartitionStats;
87 import org.apache.giraph.partition.PartitionStore;
88 import org.apache.giraph.partition.WorkerGraphPartitioner;
89 import org.apache.giraph.utils.BlockingElementsSet;
90 import org.apache.giraph.utils.CallableFactory;
91 import org.apache.giraph.utils.CheckpointingUtils;
92 import org.apache.giraph.utils.JMapHistoDumper;
93 import org.apache.giraph.utils.LoggerUtils;
94 import org.apache.giraph.utils.MemoryUtils;
95 import org.apache.giraph.utils.ProgressableUtils;
96 import org.apache.giraph.utils.ReactiveJMapHistoDumper;
97 import org.apache.giraph.utils.WritableUtils;
98 import org.apache.giraph.zk.BspEvent;
99 import org.apache.giraph.zk.PredicateLock;
100 import org.apache.hadoop.fs.FSDataInputStream;
101 import org.apache.hadoop.fs.FSDataOutputStream;
102 import org.apache.hadoop.fs.Path;
103 import org.apache.hadoop.io.Writable;
104 import org.apache.hadoop.io.WritableComparable;
105 import org.apache.hadoop.io.compress.CompressionCodec;
106 import org.apache.hadoop.io.compress.CompressionCodecFactory;
107 import org.apache.hadoop.mapreduce.Counter;
108 import org.apache.hadoop.mapreduce.Mapper;
109 import org.apache.hadoop.mapreduce.OutputCommitter;
110 import org.apache.log4j.Level;
111 import org.apache.log4j.Logger;
112 import org.apache.zookeeper.CreateMode;
113 import org.apache.zookeeper.KeeperException;
114 import org.apache.zookeeper.WatchedEvent;
115 import org.apache.zookeeper.Watcher.Event.EventType;
116 import org.apache.zookeeper.ZooDefs.Ids;
117 import org.apache.zookeeper.data.Stat;
118 import org.json.JSONArray;
119 import org.json.JSONException;
120 import org.json.JSONObject;
121
122 import com.google.common.collect.Lists;
123
124 import static org.apache.giraph.graph.GraphTaskManager.isConnectionResetByPeer;
125
126
127
128
129
130
131
132
133 @SuppressWarnings("rawtypes")
134 public class BspServiceWorker<I extends WritableComparable,
135 V extends Writable, E extends Writable>
136 extends BspService<I, V, E>
137 implements CentralizedServiceWorker<I, V, E>,
138 ResetSuperstepMetricsObserver {
139
140 public static final String TIMER_WAIT_REQUESTS = "wait-requests-us";
141
142 private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
143
144 private String myHealthZnode;
145
146 private final WorkerInfo workerInfo;
147
148 private final WorkerGraphPartitioner<I, V, E> workerGraphPartitioner;
149
150 private final LocalData<I, V, E, ? extends Writable> localData;
151
152 private final TranslateEdge<I, E> translateEdge;
153
154 private final WorkerClient<I, V, E> workerClient;
155
156 private final WorkerServer<I, V, E> workerServer;
157
158 private final WorkerAggregatorRequestProcessor
159 workerAggregatorRequestProcessor;
160
161 private MasterInfo masterInfo = new MasterInfo();
162
163 private List<WorkerInfo> workerInfoList = Lists.newArrayList();
164
165 private final BspEvent partitionExchangeChildrenChanged;
166
167
168 private BlockingElementsSet<AddressesAndPartitionsWritable>
169 addressesAndPartitionsHolder = new BlockingElementsSet<>();
170
171
172 private final WorkerContext workerContext;
173
174
175 private final WorkerAggregatorHandler globalCommHandler;
176
177
178 private final SuperstepOutput<I, V, E> superstepOutput;
179
180
181 private final WorkerObserver[] observers;
182
183 private final WorkerProgressWriter workerProgressWriter;
184
185
186
187 private GiraphTimer wcPostSuperstepTimer;
188
189 private GiraphTimer waitRequestsTimer;
190
191
192 private final WorkerInputSplitsHandler inputSplitsHandler;
193
194
195 private final MemoryObserver memoryObserver;
196
197
198
199
200
201
202
203
204
205 public BspServiceWorker(
206 Mapper<?, ?, ?, ?>.Context context,
207 GraphTaskManager<I, V, E> graphTaskManager)
208 throws IOException, InterruptedException {
209 super(context, graphTaskManager);
210 ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
211 localData = new LocalData<>(conf);
212 translateEdge = getConfiguration().edgeTranslationInstance();
213 if (translateEdge != null) {
214 translateEdge.initialize(this);
215 }
216 partitionExchangeChildrenChanged = new PredicateLock(context);
217 registerBspEvent(partitionExchangeChildrenChanged);
218 workerGraphPartitioner =
219 getGraphPartitionerFactory().createWorkerGraphPartitioner();
220 workerInfo = new WorkerInfo();
221 workerServer = new NettyWorkerServer<I, V, E>(conf, this, context,
222 graphTaskManager.createUncaughtExceptionHandler(
223 (thread, throwable) -> {
224
225
226
227 return !isConnectionResetByPeer(throwable);
228 }
229 )
230 );
231 workerInfo.setInetSocketAddress(workerServer.getMyAddress(),
232 workerServer.getLocalHostOrIp());
233 workerInfo.setTaskId(getTaskId());
234 workerClient = new NettyWorkerClient<I, V, E>(context, conf, this,
235 graphTaskManager.createUncaughtExceptionHandler());
236 workerServer.setFlowControl(workerClient.getFlowControl());
237 OutOfCoreEngine oocEngine = workerServer.getServerData().getOocEngine();
238 if (oocEngine != null) {
239 oocEngine.setFlowControl(workerClient.getFlowControl());
240 }
241
242 workerAggregatorRequestProcessor =
243 new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this);
244
245 globalCommHandler = new WorkerAggregatorHandler(this, conf, context);
246
247 workerContext = conf.createWorkerContext();
248 workerContext.setWorkerGlobalCommUsage(globalCommHandler);
249
250 superstepOutput = conf.createSuperstepOutput(context);
251
252 if (conf.isJMapHistogramDumpEnabled()) {
253 conf.addWorkerObserverClass(JMapHistoDumper.class);
254 }
255 if (conf.isReactiveJmapHistogramDumpEnabled()) {
256 conf.addWorkerObserverClass(ReactiveJMapHistoDumper.class);
257 }
258 observers = conf.createWorkerObservers(context);
259
260 WorkerProgress.get().setTaskId(getTaskId());
261 workerProgressWriter = conf.trackJobProgressOnClient() ?
262 new WorkerProgressWriter(graphTaskManager.getJobProgressTracker()) :
263 null;
264
265 GiraphMetrics.get().addSuperstepResetObserver(this);
266
267 inputSplitsHandler = new WorkerInputSplitsHandler(
268 workerInfo, masterInfo.getTaskId(), workerClient);
269
270 memoryObserver = new MemoryObserver(getZkExt(), memoryObserverPath, conf);
271 }
272
273 @Override
274 public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
275 waitRequestsTimer = new GiraphTimer(superstepMetrics,
276 TIMER_WAIT_REQUESTS, TimeUnit.MICROSECONDS);
277 wcPostSuperstepTimer = new GiraphTimer(superstepMetrics,
278 "worker-context-post-superstep", TimeUnit.MICROSECONDS);
279 }
280
281 @Override
282 public WorkerContext getWorkerContext() {
283 return workerContext;
284 }
285
286 @Override
287 public WorkerObserver[] getWorkerObservers() {
288 return observers;
289 }
290
291 @Override
292 public WorkerClient<I, V, E> getWorkerClient() {
293 return workerClient;
294 }
295
296 public LocalData<I, V, E, ? extends Writable> getLocalData() {
297 return localData;
298 }
299
300 public TranslateEdge<I, E> getTranslateEdge() {
301 return translateEdge;
302 }
303
304
305
306
307
308
309
310
311
312 public boolean isHealthy() {
313 return true;
314 }
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332 private VertexEdgeCount loadInputSplits(
333 CallableFactory<VertexEdgeCount> inputSplitsCallableFactory)
334 throws KeeperException, InterruptedException {
335 VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
336 int numThreads = getConfiguration().getNumInputSplitsThreads();
337 if (LOG.isInfoEnabled()) {
338 LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
339 "originally " + getConfiguration().getNumInputSplitsThreads() +
340 " threads(s)");
341 }
342
343 List<VertexEdgeCount> results =
344 ProgressableUtils.getResultsWithNCallables(inputSplitsCallableFactory,
345 numThreads, "load-%d", getContext());
346 for (VertexEdgeCount result : results) {
347 vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(result);
348 }
349
350 workerClient.waitAllRequests();
351 return vertexEdgeCount;
352 }
353
354
355
356
357
358
359
360 private long loadMapping() throws KeeperException,
361 InterruptedException {
362 MappingInputSplitsCallableFactory<I, V, E, ? extends Writable>
363 inputSplitsCallableFactory =
364 new MappingInputSplitsCallableFactory<>(
365 getConfiguration().createWrappedMappingInputFormat(),
366 getContext(),
367 getConfiguration(),
368 this,
369 inputSplitsHandler);
370
371 long mappingsLoaded =
372 loadInputSplits(inputSplitsCallableFactory).getMappingCount();
373
374
375 localData.getMappingStore().postFilling();
376 return mappingsLoaded;
377 }
378
379
380
381
382
383
384
385 private VertexEdgeCount loadVertices() throws KeeperException,
386 InterruptedException {
387 VertexInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
388 new VertexInputSplitsCallableFactory<I, V, E>(
389 getConfiguration().createWrappedVertexInputFormat(),
390 getContext(),
391 getConfiguration(),
392 this,
393 inputSplitsHandler);
394
395 return loadInputSplits(inputSplitsCallableFactory);
396 }
397
398
399
400
401
402
403
404 private long loadEdges() throws KeeperException, InterruptedException {
405 EdgeInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
406 new EdgeInputSplitsCallableFactory<I, V, E>(
407 getConfiguration().createWrappedEdgeInputFormat(),
408 getContext(),
409 getConfiguration(),
410 this,
411 inputSplitsHandler);
412
413 return loadInputSplits(inputSplitsCallableFactory).getEdgeCount();
414 }
415
416 @Override
417 public MasterInfo getMasterInfo() {
418 return masterInfo;
419 }
420
421 @Override
422 public List<WorkerInfo> getWorkerInfoList() {
423 return workerInfoList;
424 }
425
426
427
428
429
430 private void markCurrentWorkerDoneReadingThenWaitForOthers() {
431 String workerInputSplitsDonePath =
432 inputSplitsWorkerDonePath + "/" + getWorkerInfo().getHostnameId();
433 try {
434 getZkExt().createExt(workerInputSplitsDonePath,
435 null,
436 Ids.OPEN_ACL_UNSAFE,
437 CreateMode.PERSISTENT,
438 true);
439 } catch (KeeperException e) {
440 throw new IllegalStateException(
441 "markCurrentWorkerDoneThenWaitForOthers: " +
442 "KeeperException creating worker done splits", e);
443 } catch (InterruptedException e) {
444 throw new IllegalStateException(
445 "markCurrentWorkerDoneThenWaitForOthers: " +
446 "InterruptedException creating worker done splits", e);
447 }
448 while (true) {
449 Stat inputSplitsDoneStat;
450 try {
451 inputSplitsDoneStat =
452 getZkExt().exists(inputSplitsAllDonePath, true);
453 } catch (KeeperException e) {
454 throw new IllegalStateException(
455 "markCurrentWorkerDoneThenWaitForOthers: " +
456 "KeeperException waiting on worker done splits", e);
457 } catch (InterruptedException e) {
458 throw new IllegalStateException(
459 "markCurrentWorkerDoneThenWaitForOthers: " +
460 "InterruptedException waiting on worker done splits", e);
461 }
462 if (inputSplitsDoneStat != null) {
463 break;
464 }
465 getInputSplitsAllDoneEvent().waitForTimeoutOrFail(
466 GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
467 getConfiguration()));
468 getInputSplitsAllDoneEvent().reset();
469 }
470 }
471
472 @Override
473 public FinishedSuperstepStats setup() {
474
475
476
477
478
479
480
481 if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
482 setCachedSuperstep(getRestartedSuperstep());
483 return new FinishedSuperstepStats(0, false, 0, 0, true,
484 CheckpointStatus.NONE);
485 }
486
487 JSONObject jobState = getJobState();
488 if (jobState != null) {
489 try {
490 if ((ApplicationState.valueOf(jobState.getString(JSONOBJ_STATE_KEY)) ==
491 ApplicationState.START_SUPERSTEP) &&
492 jobState.getLong(JSONOBJ_SUPERSTEP_KEY) ==
493 getSuperstep()) {
494 if (LOG.isInfoEnabled()) {
495 LOG.info("setup: Restarting from an automated " +
496 "checkpointed superstep " +
497 getSuperstep() + ", attempt " +
498 getApplicationAttempt());
499 }
500 setRestartedSuperstep(getSuperstep());
501 return new FinishedSuperstepStats(0, false, 0, 0, true,
502 CheckpointStatus.NONE);
503 }
504 } catch (JSONException e) {
505 throw new RuntimeException(
506 "setup: Failed to get key-values from " +
507 jobState.toString(), e);
508 }
509 }
510
511
512 Collection<? extends PartitionOwner> masterSetPartitionOwners =
513 startSuperstep();
514 workerGraphPartitioner.updatePartitionOwners(
515 getWorkerInfo(), masterSetPartitionOwners);
516 getPartitionStore().initialize();
517
518
519
520
521 workerClient.setup(getConfiguration().authenticate());
522
523
524
525
526 globalCommHandler.prepareSuperstep(workerAggregatorRequestProcessor);
527
528 VertexEdgeCount vertexEdgeCount;
529 long entriesLoaded;
530
531 if (getConfiguration().hasMappingInputFormat()) {
532 getContext().progress();
533 try {
534 entriesLoaded = loadMapping();
535
536
537 getGraphPartitionerFactory().initialize(localData);
538 } catch (InterruptedException e) {
539 throw new IllegalStateException(
540 "setup: loadMapping failed with InterruptedException", e);
541 } catch (KeeperException e) {
542 throw new IllegalStateException(
543 "setup: loadMapping failed with KeeperException", e);
544 }
545 getContext().progress();
546 if (LOG.isInfoEnabled()) {
547 LOG.info("setup: Finally loaded a total of " +
548 entriesLoaded + " entries from inputSplits");
549 }
550
551
552
553 localData.printStats();
554 }
555
556 if (getConfiguration().hasVertexInputFormat()) {
557 getContext().progress();
558 try {
559 vertexEdgeCount = loadVertices();
560 } catch (InterruptedException e) {
561 throw new IllegalStateException(
562 "setup: loadVertices failed with InterruptedException", e);
563 } catch (KeeperException e) {
564 throw new IllegalStateException(
565 "setup: loadVertices failed with KeeperException", e);
566 }
567 getContext().progress();
568 } else {
569 vertexEdgeCount = new VertexEdgeCount();
570 }
571 WorkerProgress.get().finishLoadingVertices();
572
573 if (getConfiguration().hasEdgeInputFormat()) {
574 getContext().progress();
575 try {
576 vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0, loadEdges());
577 } catch (InterruptedException e) {
578 throw new IllegalStateException(
579 "setup: loadEdges failed with InterruptedException", e);
580 } catch (KeeperException e) {
581 throw new IllegalStateException(
582 "setup: loadEdges failed with KeeperException", e);
583 }
584 getContext().progress();
585 }
586 WorkerProgress.get().finishLoadingEdges();
587
588 if (LOG.isInfoEnabled()) {
589 LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
590 }
591
592 markCurrentWorkerDoneReadingThenWaitForOthers();
593
594
595 for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
596 if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
597 !getPartitionStore().hasPartition(
598 partitionOwner.getPartitionId())) {
599 Partition<I, V, E> partition =
600 getConfiguration().createPartition(
601 partitionOwner.getPartitionId(), getContext());
602 getPartitionStore().addPartition(partition);
603 }
604 }
605
606
607 localData.removeMappingStoreIfPossible();
608
609 if (getConfiguration().hasEdgeInputFormat()) {
610
611 getServerData().getEdgeStore().moveEdgesToVertices();
612 }
613
614
615
616 List<PartitionStats> partitionStatsList =
617 new ArrayList<PartitionStats>();
618 PartitionStore<I, V, E> partitionStore = getPartitionStore();
619 for (Integer partitionId : partitionStore.getPartitionIds()) {
620 PartitionStats partitionStats =
621 new PartitionStats(partitionId,
622 partitionStore.getPartitionVertexCount(partitionId),
623 0,
624 partitionStore.getPartitionEdgeCount(partitionId),
625 0,
626 0,
627 workerInfo.getHostnameId());
628 partitionStatsList.add(partitionStats);
629 }
630 workerGraphPartitioner.finalizePartitionStats(
631 partitionStatsList, getPartitionStore());
632
633 return finishSuperstep(partitionStatsList, null);
634 }
635
636
637
638
639 private void registerHealth() {
640
641 String myHealthPath = null;
642 if (isHealthy()) {
643 myHealthPath = getWorkerInfoHealthyPath(getApplicationAttempt(),
644 getSuperstep());
645 } else {
646 myHealthPath = getWorkerInfoUnhealthyPath(getApplicationAttempt(),
647 getSuperstep());
648 }
649 myHealthPath = myHealthPath + "/" + workerInfo.getHostnameId();
650 try {
651 myHealthZnode = getZkExt().createExt(
652 myHealthPath,
653 WritableUtils.writeToByteArray(workerInfo),
654 Ids.OPEN_ACL_UNSAFE,
655 CreateMode.EPHEMERAL,
656 true);
657 } catch (KeeperException.NodeExistsException e) {
658 LOG.warn("registerHealth: myHealthPath already exists (likely " +
659 "from previous failure): " + myHealthPath +
660 ". Waiting for change in attempts " +
661 "to re-join the application");
662 getApplicationAttemptChangedEvent().waitForTimeoutOrFail(
663 GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(
664 getConfiguration()));
665 if (LOG.isInfoEnabled()) {
666 LOG.info("registerHealth: Got application " +
667 "attempt changed event, killing self");
668 }
669 throw new IllegalStateException(
670 "registerHealth: Trying " +
671 "to get the new application attempt by killing self", e);
672 } catch (KeeperException e) {
673 throw new IllegalStateException("Creating " + myHealthPath +
674 " failed with KeeperException", e);
675 } catch (InterruptedException e) {
676 throw new IllegalStateException("Creating " + myHealthPath +
677 " failed with InterruptedException", e);
678 }
679 if (LOG.isInfoEnabled()) {
680 LOG.info("registerHealth: Created my health node for attempt=" +
681 getApplicationAttempt() + ", superstep=" +
682 getSuperstep() + " with " + myHealthZnode +
683 " and workerInfo= " + workerInfo);
684 }
685 }
686
687
688
689
690 private void unregisterHealth() {
691 LOG.error("unregisterHealth: Got failure, unregistering health on " +
692 myHealthZnode + " on superstep " + getSuperstep());
693 try {
694 getZkExt().deleteExt(myHealthZnode, -1, false);
695 } catch (InterruptedException e) {
696 throw new IllegalStateException(
697 "unregisterHealth: InterruptedException - Couldn't delete " +
698 myHealthZnode, e);
699 } catch (KeeperException e) {
700 throw new IllegalStateException(
701 "unregisterHealth: KeeperException - Couldn't delete " +
702 myHealthZnode, e);
703 }
704 }
705
706 @Override
707 public void failureCleanup() {
708 unregisterHealth();
709 }
710
711 @Override
712 public Collection<? extends PartitionOwner> startSuperstep() {
713
714
715
716
717
718
719 if (getSuperstep() != INPUT_SUPERSTEP) {
720 workerServer.prepareSuperstep();
721 }
722
723 registerHealth();
724
725 AddressesAndPartitionsWritable addressesAndPartitions =
726 addressesAndPartitionsHolder.getElement(getContext());
727
728 workerInfoList.clear();
729 workerInfoList = addressesAndPartitions.getWorkerInfos();
730 masterInfo = addressesAndPartitions.getMasterInfo();
731 workerServer.resetBytesReceivedPerSuperstep();
732
733 if (LOG.isInfoEnabled()) {
734 LOG.info("startSuperstep: " + masterInfo);
735 }
736
737 getContext().setStatus("startSuperstep: " +
738 getGraphTaskManager().getGraphFunctions().toString() +
739 " - Attempt=" + getApplicationAttempt() +
740 ", Superstep=" + getSuperstep());
741
742 if (LOG.isDebugEnabled()) {
743 LOG.debug("startSuperstep: addressesAndPartitions" +
744 addressesAndPartitions.getWorkerInfos());
745 for (PartitionOwner partitionOwner : addressesAndPartitions
746 .getPartitionOwners()) {
747 LOG.debug(partitionOwner.getPartitionId() + " " +
748 partitionOwner.getWorkerInfo());
749 }
750 }
751
752 return addressesAndPartitions.getPartitionOwners();
753 }
754
755 @Override
756 public FinishedSuperstepStats finishSuperstep(
757 List<PartitionStats> partitionStatsList,
758 GiraphTimerContext superstepTimerContext) {
759
760
761
762
763
764
765
766
767
768
769
770
771 waitForRequestsToFinish();
772
773 getGraphTaskManager().notifyFinishedCommunication();
774
775 long workerSentMessages = 0;
776 long workerSentMessageBytes = 0;
777 long localVertices = 0;
778 for (PartitionStats partitionStats : partitionStatsList) {
779 workerSentMessages += partitionStats.getMessagesSentCount();
780 workerSentMessageBytes += partitionStats.getMessageBytesSentCount();
781 localVertices += partitionStats.getVertexCount();
782 }
783
784 if (getSuperstep() != INPUT_SUPERSTEP) {
785 postSuperstepCallbacks();
786 }
787
788 globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor);
789
790 MessageStore<I, Writable> incomingMessageStore =
791 getServerData().getIncomingMessageStore();
792 if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
793 ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
794 }
795
796 if (LOG.isInfoEnabled()) {
797 LOG.info("finishSuperstep: Superstep " + getSuperstep() +
798 ", messages = " + workerSentMessages + " " +
799 ", message bytes = " + workerSentMessageBytes + " , " +
800 MemoryUtils.getRuntimeMemoryStats());
801 }
802
803 if (superstepTimerContext != null) {
804 superstepTimerContext.stop();
805 }
806 writeFinshedSuperstepInfoToZK(partitionStatsList,
807 workerSentMessages, workerSentMessageBytes);
808
809
810
811 storeCountersInZooKeeper(false);
812
813 LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
814 "finishSuperstep: (waiting for rest " +
815 "of workers) " +
816 getGraphTaskManager().getGraphFunctions().toString() +
817 " - Attempt=" + getApplicationAttempt() +
818 ", Superstep=" + getSuperstep());
819
820 String superstepFinishedNode =
821 getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
822
823 waitForOtherWorkers(superstepFinishedNode);
824
825 GlobalStats globalStats = new GlobalStats();
826 SuperstepClasses superstepClasses = SuperstepClasses.createToRead(
827 getConfiguration());
828 WritableUtils.readFieldsFromZnode(
829 getZkExt(), superstepFinishedNode, false, null, globalStats,
830 superstepClasses);
831 if (LOG.isInfoEnabled()) {
832 LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
833 " with global stats " + globalStats + " and classes " +
834 superstepClasses);
835 }
836 getContext().setStatus("finishSuperstep: (all workers done) " +
837 getGraphTaskManager().getGraphFunctions().toString() +
838 " - Attempt=" + getApplicationAttempt() +
839 ", Superstep=" + getSuperstep());
840 incrCachedSuperstep();
841 getConfiguration().updateSuperstepClasses(superstepClasses);
842
843 return new FinishedSuperstepStats(
844 localVertices,
845 globalStats.getHaltComputation(),
846 globalStats.getVertexCount(),
847 globalStats.getEdgeCount(),
848 false,
849 globalStats.getCheckpointStatus());
850 }
851
852
853
854
855 private void postSuperstepCallbacks() {
856 GiraphTimerContext timerContext = wcPostSuperstepTimer.time();
857 getWorkerContext().postSuperstep();
858 timerContext.stop();
859 getContext().progress();
860
861 for (WorkerObserver obs : getWorkerObservers()) {
862 obs.postSuperstep(getSuperstep());
863 getContext().progress();
864 }
865 }
866
867
868
869
870 private void waitForRequestsToFinish() {
871 if (LOG.isInfoEnabled()) {
872 LOG.info("finishSuperstep: Waiting on all requests, superstep " +
873 getSuperstep() + " " +
874 MemoryUtils.getRuntimeMemoryStats());
875 }
876 GiraphTimerContext timerContext = waitRequestsTimer.time();
877 workerClient.waitAllRequests();
878 timerContext.stop();
879 }
880
881
882
883
884
885
886 private void waitForOtherWorkers(String superstepFinishedNode) {
887 try {
888 while (getZkExt().exists(superstepFinishedNode, true) == null) {
889 getSuperstepFinishedEvent().waitForTimeoutOrFail(
890 GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
891 getConfiguration()));
892 getSuperstepFinishedEvent().reset();
893 }
894 } catch (KeeperException e) {
895 throw new IllegalStateException(
896 "finishSuperstep: Failed while waiting for master to " +
897 "signal completion of superstep " + getSuperstep(), e);
898 } catch (InterruptedException e) {
899 throw new IllegalStateException(
900 "finishSuperstep: Failed while waiting for master to " +
901 "signal completion of superstep " + getSuperstep(), e);
902 }
903 }
904
905
906
907
908
909
910
911
912
913 private void writeFinshedSuperstepInfoToZK(
914 List<PartitionStats> partitionStatsList, long workerSentMessages,
915 long workerSentMessageBytes) {
916 Collection<PartitionStats> finalizedPartitionStats =
917 workerGraphPartitioner.finalizePartitionStats(
918 partitionStatsList, getPartitionStore());
919 workerClient.sendWritableRequest(masterInfo.getTaskId(),
920 new PartitionStatsRequest(finalizedPartitionStats));
921 WorkerSuperstepMetrics metrics = new WorkerSuperstepMetrics();
922 metrics.readFromRegistry();
923 byte[] metricsBytes = WritableUtils.writeToByteArray(metrics);
924
925 JSONObject workerFinishedInfoObj = new JSONObject();
926 try {
927 workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY, workerSentMessages);
928 workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGE_BYTES_KEY,
929 workerSentMessageBytes);
930 workerFinishedInfoObj.put(JSONOBJ_METRICS_KEY,
931 Base64.encodeBytes(metricsBytes));
932 } catch (JSONException e) {
933 throw new RuntimeException(e);
934 }
935
936 String finishedWorkerPath =
937 getWorkerMetricsFinishedPath(getApplicationAttempt(), getSuperstep()) +
938 "/" + workerInfo.getHostnameId();
939 try {
940 getZkExt().createExt(finishedWorkerPath,
941 workerFinishedInfoObj.toString().getBytes(Charset.defaultCharset()),
942 Ids.OPEN_ACL_UNSAFE,
943 CreateMode.PERSISTENT,
944 true);
945 } catch (KeeperException.NodeExistsException e) {
946 LOG.warn("finishSuperstep: finished worker path " +
947 finishedWorkerPath + " already exists!");
948 } catch (KeeperException e) {
949 throw new IllegalStateException("Creating " + finishedWorkerPath +
950 " failed with KeeperException", e);
951 } catch (InterruptedException e) {
952 throw new IllegalStateException("Creating " + finishedWorkerPath +
953 " failed with InterruptedException", e);
954 }
955 }
956
957
958
959
960
961
962
963
964 private void saveVertices(long numLocalVertices) throws IOException,
965 InterruptedException {
966 ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
967
968 if (conf.getVertexOutputFormatClass() == null) {
969 LOG.warn("saveVertices: " +
970 GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS +
971 " not specified -- there will be no saved output");
972 return;
973 }
974 if (conf.doOutputDuringComputation()) {
975 if (LOG.isInfoEnabled()) {
976 LOG.info("saveVertices: The option for doing output during " +
977 "computation is selected, so there will be no saving of the " +
978 "output in the end of application");
979 }
980 return;
981 }
982
983 final int numPartitions = getPartitionStore().getNumPartitions();
984 int numThreads = Math.min(getConfiguration().getNumOutputThreads(),
985 numPartitions);
986 LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
987 "saveVertices: Starting to save " + numLocalVertices + " vertices " +
988 "using " + numThreads + " threads");
989 final VertexOutputFormat<I, V, E> vertexOutputFormat =
990 getConfiguration().createWrappedVertexOutputFormat();
991 vertexOutputFormat.preWriting(getContext());
992
993 getPartitionStore().startIteration();
994
995 long verticesToStore = 0;
996 PartitionStore<I, V, E> partitionStore = getPartitionStore();
997 for (int partitionId : partitionStore.getPartitionIds()) {
998 verticesToStore += partitionStore.getPartitionVertexCount(partitionId);
999 }
1000 WorkerProgress.get().startStoring(
1001 verticesToStore, getPartitionStore().getNumPartitions());
1002
1003 CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
1004 @Override
1005 public Callable<Void> newCallable(int callableId) {
1006 return new Callable<Void>() {
1007
1008 private static final long VERTICES_TO_UPDATE_PROGRESS = 100000;
1009
1010 @Override
1011 public Void call() throws Exception {
1012 VertexWriter<I, V, E> vertexWriter =
1013 vertexOutputFormat.createVertexWriter(getContext());
1014 vertexWriter.setConf(getConfiguration());
1015 vertexWriter.initialize(getContext());
1016 long nextPrintVertices = 0;
1017 long nextUpdateProgressVertices = VERTICES_TO_UPDATE_PROGRESS;
1018 long nextPrintMsecs = System.currentTimeMillis() + 15000;
1019 int partitionIndex = 0;
1020 int numPartitions = getPartitionStore().getNumPartitions();
1021 while (true) {
1022 Partition<I, V, E> partition =
1023 getPartitionStore().getNextPartition();
1024 if (partition == null) {
1025 break;
1026 }
1027
1028 long verticesWritten = 0;
1029 for (Vertex<I, V, E> vertex : partition) {
1030 vertexWriter.writeVertex(vertex);
1031 ++verticesWritten;
1032
1033
1034 if (verticesWritten > nextPrintVertices &&
1035 System.currentTimeMillis() > nextPrintMsecs) {
1036 LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
1037 "saveVertices: Saved " + verticesWritten + " out of " +
1038 partition.getVertexCount() + " partition vertices, " +
1039 "on partition " + partitionIndex +
1040 " out of " + numPartitions);
1041 nextPrintMsecs = System.currentTimeMillis() + 15000;
1042 nextPrintVertices = verticesWritten + 250000;
1043 }
1044
1045 if (verticesWritten >= nextUpdateProgressVertices) {
1046 WorkerProgress.get().addVerticesStored(
1047 VERTICES_TO_UPDATE_PROGRESS);
1048 nextUpdateProgressVertices += VERTICES_TO_UPDATE_PROGRESS;
1049 }
1050 }
1051 getPartitionStore().putPartition(partition);
1052 ++partitionIndex;
1053 WorkerProgress.get().addVerticesStored(
1054 verticesWritten % VERTICES_TO_UPDATE_PROGRESS);
1055 WorkerProgress.get().incrementPartitionsStored();
1056 }
1057 vertexWriter.close(getContext());
1058 return null;
1059 }
1060 };
1061 }
1062 };
1063 ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
1064 "save-vertices-%d", getContext());
1065
1066 vertexOutputFormat.postWriting(getContext());
1067
1068 LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
1069 "saveVertices: Done saving vertices.");
1070
1071 if (getConfiguration().isPureYarnJob() &&
1072 getConfiguration().getVertexOutputFormatClass() != null) {
1073 try {
1074 OutputCommitter outputCommitter =
1075 vertexOutputFormat.getOutputCommitter(getContext());
1076 if (outputCommitter.needsTaskCommit(getContext())) {
1077 LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
1078 "OutputCommitter: committing task output.");
1079
1080
1081 outputCommitter.commitTask(getContext());
1082 }
1083 } catch (InterruptedException ie) {
1084 LOG.error("Interrupted while attempting to obtain " +
1085 "OutputCommitter.", ie);
1086 } catch (IOException ioe) {
1087 LOG.error("Master task's attempt to commit output has " +
1088 "FAILED.", ioe);
1089 }
1090 }
1091 }
1092
1093
1094
1095
1096
1097
1098
1099 private void saveEdges() throws IOException, InterruptedException {
1100 final ImmutableClassesGiraphConfiguration<I, V, E> conf =
1101 getConfiguration();
1102
1103 if (conf.getEdgeOutputFormatClass() == null) {
1104 LOG.warn("saveEdges: " +
1105 GiraphConstants.EDGE_OUTPUT_FORMAT_CLASS +
1106 "Make sure that the EdgeOutputFormat is not required.");
1107 return;
1108 }
1109
1110 final int numPartitions = getPartitionStore().getNumPartitions();
1111 int numThreads = Math.min(conf.getNumOutputThreads(),
1112 numPartitions);
1113 LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
1114 "saveEdges: Starting to save the edges using " +
1115 numThreads + " threads");
1116 final EdgeOutputFormat<I, V, E> edgeOutputFormat =
1117 conf.createWrappedEdgeOutputFormat();
1118 edgeOutputFormat.preWriting(getContext());
1119
1120 getPartitionStore().startIteration();
1121
1122 CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
1123 @Override
1124 public Callable<Void> newCallable(int callableId) {
1125 return new Callable<Void>() {
1126 @Override
1127 public Void call() throws Exception {
1128 EdgeWriter<I, V, E> edgeWriter =
1129 edgeOutputFormat.createEdgeWriter(getContext());
1130 edgeWriter.setConf(conf);
1131 edgeWriter.initialize(getContext());
1132
1133 long nextPrintVertices = 0;
1134 long nextPrintMsecs = System.currentTimeMillis() + 15000;
1135 int partitionIndex = 0;
1136 int numPartitions = getPartitionStore().getNumPartitions();
1137 while (true) {
1138 Partition<I, V, E> partition =
1139 getPartitionStore().getNextPartition();
1140 if (partition == null) {
1141 break;
1142 }
1143
1144 long vertices = 0;
1145 long edges = 0;
1146 long partitionEdgeCount = partition.getEdgeCount();
1147 for (Vertex<I, V, E> vertex : partition) {
1148 for (Edge<I, E> edge : vertex.getEdges()) {
1149 edgeWriter.writeEdge(vertex.getId(), vertex.getValue(), edge);
1150 ++edges;
1151 }
1152 ++vertices;
1153
1154
1155 if (vertices > nextPrintVertices &&
1156 System.currentTimeMillis() > nextPrintMsecs) {
1157 LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
1158 "saveEdges: Saved " + edges +
1159 " edges out of " + partitionEdgeCount +
1160 " partition edges, on partition " + partitionIndex +
1161 " out of " + numPartitions);
1162 nextPrintMsecs = System.currentTimeMillis() + 15000;
1163 nextPrintVertices = vertices + 250000;
1164 }
1165 }
1166 getPartitionStore().putPartition(partition);
1167 ++partitionIndex;
1168 }
1169 edgeWriter.close(getContext());
1170 return null;
1171 }
1172 };
1173 }
1174 };
1175 ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
1176 "save-vertices-%d", getContext());
1177
1178 edgeOutputFormat.postWriting(getContext());
1179
1180 LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
1181 "saveEdges: Done saving edges.");
1182
1183 if (conf.isPureYarnJob() &&
1184 conf.getVertexOutputFormatClass() != null) {
1185 try {
1186 OutputCommitter outputCommitter =
1187 edgeOutputFormat.getOutputCommitter(getContext());
1188 if (outputCommitter.needsTaskCommit(getContext())) {
1189 LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
1190 "OutputCommitter: committing task output.");
1191
1192
1193 outputCommitter.commitTask(getContext());
1194 }
1195 } catch (InterruptedException ie) {
1196 LOG.error("Interrupted while attempting to obtain " +
1197 "OutputCommitter.", ie);
1198 } catch (IOException ioe) {
1199 LOG.error("Master task's attempt to commit output has " +
1200 "FAILED.", ioe);
1201 }
1202 }
1203 }
1204
1205 @Override
1206 public void cleanup(FinishedSuperstepStats finishedSuperstepStats)
1207 throws IOException, InterruptedException {
1208 workerClient.closeConnections();
1209 setCachedSuperstep(getSuperstep() - 1);
1210 if (finishedSuperstepStats.getCheckpointStatus() !=
1211 CheckpointStatus.CHECKPOINT_AND_HALT) {
1212 saveVertices(finishedSuperstepStats.getLocalVertexCount());
1213 saveEdges();
1214 }
1215 WorkerProgress.get().finishStoring();
1216 if (workerProgressWriter != null) {
1217 workerProgressWriter.stop();
1218 }
1219 getPartitionStore().shutdown();
1220
1221
1222
1223
1224 String workerCleanedUpPath = cleanedUpPath + "/" +
1225 getTaskId() + WORKER_SUFFIX;
1226 try {
1227 String finalFinishedPath =
1228 getZkExt().createExt(workerCleanedUpPath,
1229 null,
1230 Ids.OPEN_ACL_UNSAFE,
1231 CreateMode.PERSISTENT,
1232 true);
1233 if (LOG.isInfoEnabled()) {
1234 LOG.info("cleanup: Notifying master its okay to cleanup with " +
1235 finalFinishedPath);
1236 }
1237 } catch (KeeperException.NodeExistsException e) {
1238 if (LOG.isInfoEnabled()) {
1239 LOG.info("cleanup: Couldn't create finished node '" +
1240 workerCleanedUpPath);
1241 }
1242 } catch (KeeperException e) {
1243
1244 LOG.error("cleanup: Got KeeperException on notification " +
1245 "to master about cleanup", e);
1246 } catch (InterruptedException e) {
1247
1248 LOG.error("cleanup: Got InterruptedException on notification " +
1249 "to master about cleanup", e);
1250 }
1251 }
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
/**
 * Publish this worker's counters to ZooKeeper.
 *
 * <p>Gathers two kinds of counters:
 * <ul>
 *   <li>the custom counters registered through {@link CustomCounters}, which
 *       are cleared in the process;</li>
 *   <li>every Netty client counter group/name, recorded with SUM
 *       aggregation.</li>
 * </ul>
 * Each value is refreshed from the task context. The counters are written
 * as a JSON array of Base64-encoded counters into a persistent node named
 * after this worker's hostname id. ZooKeeper failures are logged as
 * warnings, not thrown.
 *
 * @param allSuperstepsDone when true the node is written for superstep
 *        {@code getSuperstep() + 1}, otherwise for {@code getSuperstep()}
 */
public void storeCountersInZooKeeper(boolean allSuperstepsDone) {
  Set<CustomCounter> registered = CustomCounters.getAndClearCustomCounters();
  Mapper.Context taskContext = getContext();
  JSONArray payload = new JSONArray();

  // Custom counters: refresh the value from the task context, then encode.
  for (CustomCounter registeredCounter : registered) {
    Counter live = taskContext.getCounter(
        registeredCounter.getGroupName(), registeredCounter.getCounterName());
    registeredCounter.setValue(live.getValue());
    payload.put(Base64.encodeBytes(
        WritableUtils.writeToByteArray(registeredCounter)));
  }

  // Netty counters: wrap each one in a SUM-aggregated CustomCounter.
  Map<String, Set<String>> nettyGroups =
      NettyClient.getCounterGroupsAndNames();
  for (Map.Entry<String, Set<String>> group : nettyGroups.entrySet()) {
    String group Name = null;
  }
}
1315
1316
1317
1318
1319
1320
1321 public void closeZooKeeper() {
1322 try {
1323 getZkExt().close();
1324 } catch (InterruptedException e) {
1325
1326 LOG.error("cleanup: Zookeeper failed to close with " + e);
1327 }
1328
1329 if (getConfiguration().metricsEnabled()) {
1330 GiraphMetrics.get().dumpToStream(System.err);
1331 }
1332
1333
1334
1335
1336 workerServer.close();
1337 }
1338
1339 @Override
1340 public void storeCheckpoint() throws IOException {
1341 LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
1342 "storeCheckpoint: Starting checkpoint " +
1343 getGraphTaskManager().getGraphFunctions().toString() +
1344 " - Attempt=" + getApplicationAttempt() +
1345 ", Superstep=" + getSuperstep());
1346
1347
1348
1349 Path metadataFilePath = createCheckpointFilePathSafe(
1350 CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX);
1351 Path validFilePath = createCheckpointFilePathSafe(
1352 CheckpointingUtils.CHECKPOINT_VALID_POSTFIX);
1353 Path checkpointFilePath = createCheckpointFilePathSafe(
1354 CheckpointingUtils.CHECKPOINT_DATA_POSTFIX);
1355
1356
1357
1358
1359 FSDataOutputStream metadataOutputStream =
1360 getFs().create(metadataFilePath);
1361 metadataOutputStream.writeInt(getPartitionStore().getNumPartitions());
1362
1363 for (Integer partitionId : getPartitionStore().getPartitionIds()) {
1364 metadataOutputStream.writeInt(partitionId);
1365 }
1366 metadataOutputStream.close();
1367
1368 storeCheckpointVertices();
1369
1370 FSDataOutputStream checkpointOutputStream =
1371 getFs().create(checkpointFilePath);
1372 workerContext.write(checkpointOutputStream);
1373 getContext().progress();
1374
1375
1376
1377 for (Integer partitionId : getPartitionStore().getPartitionIds()) {
1378
1379 checkpointOutputStream.writeInt(partitionId);
1380 getServerData().getCurrentMessageStore()
1381 .writePartition(checkpointOutputStream, partitionId);
1382 getContext().progress();
1383
1384 }
1385
1386 List<Writable> w2wMessages =
1387 getServerData().getCurrentWorkerToWorkerMessages();
1388 WritableUtils.writeList(w2wMessages, checkpointOutputStream);
1389
1390 checkpointOutputStream.close();
1391
1392 getFs().createNewFile(validFilePath);
1393
1394
1395 String workerWroteCheckpoint =
1396 getWorkerWroteCheckpointPath(getApplicationAttempt(),
1397 getSuperstep()) + "/" + workerInfo.getHostnameId();
1398 try {
1399 getZkExt().createExt(workerWroteCheckpoint,
1400 new byte[0],
1401 Ids.OPEN_ACL_UNSAFE,
1402 CreateMode.PERSISTENT,
1403 true);
1404 } catch (KeeperException.NodeExistsException e) {
1405 LOG.warn("storeCheckpoint: wrote checkpoint worker path " +
1406 workerWroteCheckpoint + " already exists!");
1407 } catch (KeeperException e) {
1408 throw new IllegalStateException("Creating " + workerWroteCheckpoint +
1409 " failed with KeeperException", e);
1410 } catch (InterruptedException e) {
1411 throw new IllegalStateException("Creating " +
1412 workerWroteCheckpoint +
1413 " failed with InterruptedException", e);
1414 }
1415 }
1416
1417
1418
1419
1420
1421
1422
1423 private Path createCheckpointFilePathSafe(String name) throws IOException {
1424 Path validFilePath = new Path(getCheckpointBasePath(getSuperstep()) + '.' +
1425 getWorkerId(workerInfo) + name);
1426
1427
1428 if (getFs().delete(validFilePath, false)) {
1429 LOG.warn("storeCheckpoint: Removed " + name + " file " +
1430 validFilePath);
1431 }
1432 return validFilePath;
1433 }
1434
1435
1436
1437
1438
1439
1440
1441
1442 private Path getSavedCheckpoint(long superstep, String name) {
1443 return new Path(getSavedCheckpointBasePath(superstep) + '.' +
1444 getWorkerId(workerInfo) + name);
1445 }
1446
1447
1448
1449
1450
1451 private void storeCheckpointVertices() {
1452 final int numPartitions = getPartitionStore().getNumPartitions();
1453 int numThreads = Math.min(
1454 GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()),
1455 numPartitions);
1456
1457 getPartitionStore().startIteration();
1458
1459 final CompressionCodec codec =
1460 new CompressionCodecFactory(getConfiguration())
1461 .getCodec(new Path(
1462 GiraphConstants.CHECKPOINT_COMPRESSION_CODEC
1463 .get(getConfiguration())));
1464
1465 long t0 = System.currentTimeMillis();
1466
1467 CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
1468 @Override
1469 public Callable<Void> newCallable(int callableId) {
1470 return new Callable<Void>() {
1471
1472 @Override
1473 public Void call() throws Exception {
1474 while (true) {
1475 Partition<I, V, E> partition =
1476 getPartitionStore().getNextPartition();
1477 if (partition == null) {
1478 break;
1479 }
1480 Path path =
1481 createCheckpointFilePathSafe("_" + partition.getId() +
1482 CheckpointingUtils.CHECKPOINT_VERTICES_POSTFIX);
1483
1484 FSDataOutputStream uncompressedStream =
1485 getFs().create(path);
1486
1487
1488 DataOutputStream stream = codec == null ? uncompressedStream :
1489 new DataOutputStream(
1490 codec.createOutputStream(uncompressedStream));
1491
1492
1493 partition.write(stream);
1494
1495 getPartitionStore().putPartition(partition);
1496
1497 stream.close();
1498 uncompressedStream.close();
1499 }
1500 return null;
1501 }
1502
1503
1504 };
1505 }
1506 };
1507
1508 ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
1509 "checkpoint-vertices-%d", getContext());
1510
1511 LOG.info("Save checkpoint in " + (System.currentTimeMillis() - t0) +
1512 " ms, using " + numThreads + " threads");
1513 }
1514
1515
1516
1517
1518
1519
1520 private void loadCheckpointVertices(final long superstep,
1521 List<Integer> partitions) {
1522 int numThreads = Math.min(
1523 GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()),
1524 partitions.size());
1525
1526 final Queue<Integer> partitionIdQueue =
1527 new ConcurrentLinkedQueue<>(partitions);
1528
1529 final CompressionCodec codec =
1530 new CompressionCodecFactory(getConfiguration())
1531 .getCodec(new Path(
1532 GiraphConstants.CHECKPOINT_COMPRESSION_CODEC
1533 .get(getConfiguration())));
1534
1535 long t0 = System.currentTimeMillis();
1536
1537 CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
1538 @Override
1539 public Callable<Void> newCallable(int callableId) {
1540 return new Callable<Void>() {
1541
1542 @Override
1543 public Void call() throws Exception {
1544 while (!partitionIdQueue.isEmpty()) {
1545 Integer partitionId = partitionIdQueue.poll();
1546 if (partitionId == null) {
1547 break;
1548 }
1549 Path path =
1550 getSavedCheckpoint(superstep, "_" + partitionId +
1551 CheckpointingUtils.CHECKPOINT_VERTICES_POSTFIX);
1552
1553 FSDataInputStream compressedStream =
1554 getFs().open(path);
1555
1556 DataInputStream stream = codec == null ? compressedStream :
1557 new DataInputStream(
1558 codec.createInputStream(compressedStream));
1559
1560 Partition<I, V, E> partition =
1561 getConfiguration().createPartition(partitionId, getContext());
1562
1563 partition.readFields(stream);
1564
1565 getPartitionStore().addPartition(partition);
1566
1567 stream.close();
1568 }
1569 return null;
1570 }
1571
1572 };
1573 }
1574 };
1575
1576 ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
1577 "load-vertices-%d", getContext());
1578
1579 LOG.info("Loaded checkpoint in " + (System.currentTimeMillis() - t0) +
1580 " ms, using " + numThreads + " threads");
1581 }
1582
1583 @Override
1584 public VertexEdgeCount loadCheckpoint(long superstep) {
1585 Path metadataFilePath = getSavedCheckpoint(
1586 superstep, CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX);
1587
1588 Path checkpointFilePath = getSavedCheckpoint(
1589 superstep, CheckpointingUtils.CHECKPOINT_DATA_POSTFIX);
1590
1591
1592
1593
1594 try {
1595 DataInputStream metadataStream =
1596 getFs().open(metadataFilePath);
1597
1598 int partitions = metadataStream.readInt();
1599 List<Integer> partitionIds = new ArrayList<>(partitions);
1600 for (int i = 0; i < partitions; i++) {
1601 int partitionId = metadataStream.readInt();
1602 partitionIds.add(partitionId);
1603 }
1604
1605 loadCheckpointVertices(superstep, partitionIds);
1606
1607 getContext().progress();
1608
1609 metadataStream.close();
1610
1611 DataInputStream checkpointStream =
1612 getFs().open(checkpointFilePath);
1613 workerContext.readFields(checkpointStream);
1614
1615
1616 GlobalStats globalStats = new GlobalStats();
1617 SuperstepClasses superstepClasses = SuperstepClasses.createToRead(
1618 getConfiguration());
1619 String finalizedCheckpointPath = getSavedCheckpointBasePath(superstep) +
1620 CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX;
1621 DataInputStream finalizedStream =
1622 getFs().open(new Path(finalizedCheckpointPath));
1623 globalStats.readFields(finalizedStream);
1624 superstepClasses.readFields(finalizedStream);
1625 getConfiguration().updateSuperstepClasses(superstepClasses);
1626 getServerData().resetMessageStores();
1627
1628
1629
1630 for (int i = 0; i < partitions; i++) {
1631 int partitionId = checkpointStream.readInt();
1632 getServerData().getCurrentMessageStore()
1633 .readFieldsForPartition(checkpointStream, partitionId);
1634 }
1635
1636 List<Writable> w2wMessages = (List<Writable>) WritableUtils.readList(
1637 checkpointStream);
1638 getServerData().getCurrentWorkerToWorkerMessages().addAll(w2wMessages);
1639
1640 checkpointStream.close();
1641
1642 if (LOG.isInfoEnabled()) {
1643 LOG.info("loadCheckpoint: Loaded " +
1644 workerGraphPartitioner.getPartitionOwners().size() +
1645 " total.");
1646 }
1647
1648
1649
1650
1651
1652
1653 workerClient.setup(getConfiguration().authenticate());
1654
1655 return new VertexEdgeCount(globalStats.getVertexCount(),
1656 globalStats.getEdgeCount(), 0);
1657
1658 } catch (IOException e) {
1659 throw new RuntimeException(
1660 "loadCheckpoint: Failed for superstep=" + superstep, e);
1661 }
1662 }
1663
1664
1665
1666
1667
1668
1669
1670 private void sendWorkerPartitions(
1671 Map<WorkerInfo, List<Integer>> workerPartitionMap) {
1672 List<Entry<WorkerInfo, List<Integer>>> randomEntryList =
1673 new ArrayList<Entry<WorkerInfo, List<Integer>>>(
1674 workerPartitionMap.entrySet());
1675 Collections.shuffle(randomEntryList);
1676 WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
1677 new NettyWorkerClientRequestProcessor<I, V, E>(getContext(),
1678 getConfiguration(), this,
1679 false
1680 for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
1681 randomEntryList) {
1682 for (Integer partitionId : workerPartitionList.getValue()) {
1683 Partition<I, V, E> partition =
1684 getPartitionStore().removePartition(partitionId);
1685 if (partition == null) {
1686 throw new IllegalStateException(
1687 "sendWorkerPartitions: Couldn't find partition " +
1688 partitionId + " to send to " +
1689 workerPartitionList.getKey());
1690 }
1691 if (LOG.isInfoEnabled()) {
1692 LOG.info("sendWorkerPartitions: Sending worker " +
1693 workerPartitionList.getKey() + " partition " +
1694 partitionId);
1695 }
1696 workerClientRequestProcessor.sendPartitionRequest(
1697 workerPartitionList.getKey(),
1698 partition);
1699 }
1700 }
1701
1702 try {
1703 workerClientRequestProcessor.flush();
1704 workerClient.waitAllRequests();
1705 } catch (IOException e) {
1706 throw new IllegalStateException("sendWorkerPartitions: Flush failed", e);
1707 }
1708 String myPartitionExchangeDonePath =
1709 getPartitionExchangeWorkerPath(
1710 getApplicationAttempt(), getSuperstep(), getWorkerInfo());
1711 try {
1712 getZkExt().createExt(myPartitionExchangeDonePath,
1713 null,
1714 Ids.OPEN_ACL_UNSAFE,
1715 CreateMode.PERSISTENT,
1716 true);
1717 } catch (KeeperException e) {
1718 throw new IllegalStateException(
1719 "sendWorkerPartitions: KeeperException to create " +
1720 myPartitionExchangeDonePath, e);
1721 } catch (InterruptedException e) {
1722 throw new IllegalStateException(
1723 "sendWorkerPartitions: InterruptedException to create " +
1724 myPartitionExchangeDonePath, e);
1725 }
1726 if (LOG.isInfoEnabled()) {
1727 LOG.info("sendWorkerPartitions: Done sending all my partitions.");
1728 }
1729 }
1730
1731 @Override
1732 public final void exchangeVertexPartitions(
1733 Collection<? extends PartitionOwner> masterSetPartitionOwners) {
1734
1735
1736
1737
1738
1739
1740 PartitionExchange partitionExchange =
1741 workerGraphPartitioner.updatePartitionOwners(
1742 getWorkerInfo(), masterSetPartitionOwners);
1743 workerClient.openConnections();
1744
1745 Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
1746 partitionExchange.getSendWorkerPartitionMap();
1747 if (!getPartitionStore().isEmpty()) {
1748 sendWorkerPartitions(sendWorkerPartitionMap);
1749 }
1750
1751 Set<WorkerInfo> myDependencyWorkerSet =
1752 partitionExchange.getMyDependencyWorkerSet();
1753 Set<String> workerIdSet = new HashSet<String>();
1754 for (WorkerInfo tmpWorkerInfo : myDependencyWorkerSet) {
1755 if (!workerIdSet.add(tmpWorkerInfo.getHostnameId())) {
1756 throw new IllegalStateException(
1757 "exchangeVertexPartitions: Duplicate entry " + tmpWorkerInfo);
1758 }
1759 }
1760 if (myDependencyWorkerSet.isEmpty() && getPartitionStore().isEmpty()) {
1761 if (LOG.isInfoEnabled()) {
1762 LOG.info("exchangeVertexPartitions: Nothing to exchange, " +
1763 "exiting early");
1764 }
1765 return;
1766 }
1767
1768 String vertexExchangePath =
1769 getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
1770 List<String> workerDoneList;
1771 try {
1772 while (true) {
1773 workerDoneList = getZkExt().getChildrenExt(
1774 vertexExchangePath, true, false, false);
1775 workerIdSet.removeAll(workerDoneList);
1776 if (workerIdSet.isEmpty()) {
1777 break;
1778 }
1779 if (LOG.isInfoEnabled()) {
1780 LOG.info("exchangeVertexPartitions: Waiting for workers " +
1781 workerIdSet);
1782 }
1783 getPartitionExchangeChildrenChangedEvent().waitForTimeoutOrFail(
1784 GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
1785 getConfiguration()));
1786 getPartitionExchangeChildrenChangedEvent().reset();
1787 }
1788 } catch (KeeperException | InterruptedException e) {
1789 throw new RuntimeException(
1790 "exchangeVertexPartitions: Got runtime exception", e);
1791 }
1792
1793 if (LOG.isInfoEnabled()) {
1794 LOG.info("exchangeVertexPartitions: Done with exchange.");
1795 }
1796 }
1797
1798
1799
1800
1801
1802
1803 public final BspEvent getPartitionExchangeChildrenChangedEvent() {
1804 return partitionExchangeChildrenChanged;
1805 }
1806
1807 @Override
1808 protected boolean processEvent(WatchedEvent event) {
1809 boolean foundEvent = false;
1810 if (event.getPath().startsWith(masterJobStatePath) &&
1811 (event.getType() == EventType.NodeChildrenChanged)) {
1812 if (LOG.isInfoEnabled()) {
1813 LOG.info("processEvent: Job state changed, checking " +
1814 "to see if it needs to restart");
1815 }
1816 JSONObject jsonObj = getJobState();
1817
1818
1819 if (getConfiguration().isPureYarnJob() && null == jsonObj) {
1820 LOG.error("BspServiceWorker#getJobState() came back NULL.");
1821 return false;
1822 }
1823 try {
1824 if ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
1825 ApplicationState.START_SUPERSTEP) &&
1826 jsonObj.getLong(JSONOBJ_APPLICATION_ATTEMPT_KEY) !=
1827 getApplicationAttempt()) {
1828 LOG.fatal("processEvent: Worker will restart " +
1829 "from command - " + jsonObj.toString());
1830 System.exit(-1);
1831 }
1832 } catch (JSONException e) {
1833 throw new RuntimeException(
1834 "processEvent: Couldn't properly get job state from " +
1835 jsonObj.toString());
1836 }
1837 foundEvent = true;
1838 } else if (event.getPath().contains(PARTITION_EXCHANGE_DIR) &&
1839 event.getType() == EventType.NodeChildrenChanged) {
1840 if (LOG.isInfoEnabled()) {
1841 LOG.info("processEvent : partitionExchangeChildrenChanged " +
1842 "(at least one worker is done sending partitions)");
1843 }
1844 partitionExchangeChildrenChanged.signal();
1845 foundEvent = true;
1846 } else if (event.getPath().contains(MEMORY_OBSERVER_DIR) &&
1847 event.getType() == EventType.NodeChildrenChanged) {
1848 memoryObserver.callGc();
1849 foundEvent = true;
1850 }
1851
1852 return foundEvent;
1853 }
1854
1855 @Override
1856 public WorkerInfo getWorkerInfo() {
1857 return workerInfo;
1858 }
1859
1860 @Override
1861 public PartitionStore<I, V, E> getPartitionStore() {
1862 return getServerData().getPartitionStore();
1863 }
1864
1865 @Override
1866 public PartitionOwner getVertexPartitionOwner(I vertexId) {
1867 return workerGraphPartitioner.getPartitionOwner(vertexId);
1868 }
1869
1870 @Override
1871 public Iterable<? extends PartitionOwner> getPartitionOwners() {
1872 return workerGraphPartitioner.getPartitionOwners();
1873 }
1874
1875 @Override
1876 public int getPartitionId(I vertexId) {
1877 PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
1878 return partitionOwner.getPartitionId();
1879 }
1880
1881 @Override
1882 public boolean hasPartition(Integer partitionId) {
1883 return getPartitionStore().hasPartition(partitionId);
1884 }
1885
1886 @Override
1887 public Iterable<Integer> getPartitionIds() {
1888 return getPartitionStore().getPartitionIds();
1889 }
1890
1891 @Override
1892 public long getPartitionVertexCount(Integer partitionId) {
1893 return getPartitionStore().getPartitionVertexCount(partitionId);
1894 }
1895
1896 @Override
1897 public void startIteration() {
1898 getPartitionStore().startIteration();
1899 }
1900
1901 @Override
1902 public Partition getNextPartition() {
1903 return getPartitionStore().getNextPartition();
1904 }
1905
1906 @Override
1907 public void putPartition(Partition partition) {
1908 getPartitionStore().putPartition(partition);
1909 }
1910
1911 @Override
1912 public ServerData<I, V, E> getServerData() {
1913 return workerServer.getServerData();
1914 }
1915
1916
1917 @Override
1918 public WorkerAggregatorHandler getAggregatorHandler() {
1919 return globalCommHandler;
1920 }
1921
1922 @Override
1923 public void prepareSuperstep() {
1924 if (getSuperstep() != INPUT_SUPERSTEP) {
1925 globalCommHandler.prepareSuperstep(workerAggregatorRequestProcessor);
1926 }
1927 }
1928
1929 @Override
1930 public SuperstepOutput<I, V, E> getSuperstepOutput() {
1931 return superstepOutput;
1932 }
1933
1934 @Override
1935 public GlobalStats getGlobalStats() {
1936 GlobalStats globalStats = new GlobalStats();
1937 if (getSuperstep() > Math.max(INPUT_SUPERSTEP, getRestartedSuperstep())) {
1938 String superstepFinishedNode =
1939 getSuperstepFinishedPath(getApplicationAttempt(),
1940 getSuperstep() - 1);
1941 WritableUtils.readFieldsFromZnode(
1942 getZkExt(), superstepFinishedNode, false, null,
1943 globalStats);
1944 }
1945 return globalStats;
1946 }
1947
1948 @Override
1949 public WorkerInputSplitsHandler getInputSplitsHandler() {
1950 return inputSplitsHandler;
1951 }
1952
1953 @Override
1954 public void addressesAndPartitionsReceived(
1955 AddressesAndPartitionsWritable addressesAndPartitions) {
1956 addressesAndPartitionsHolder.offer(addressesAndPartitions);
1957 }
1958 }