/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.conf;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;

import java.net.InetAddress;
import java.net.UnknownHostException;

import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.edge.ReuseObjectsOutEdges;
import org.apache.giraph.factories.ComputationFactory;
import org.apache.giraph.factories.VertexValueFactory;
import org.apache.giraph.graph.Computation;
import org.apache.giraph.graph.MapperObserver;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.VertexValueCombiner;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeOutputFormat;
import org.apache.giraph.io.MappingInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.filters.EdgeInputFilter;
import org.apache.giraph.io.filters.VertexInputFilter;
import org.apache.giraph.job.GiraphJobObserver;
import org.apache.giraph.job.GiraphJobRetryChecker;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.ReusesObjectsPartition;
import org.apache.giraph.utils.GcObserver;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerObserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.net.DNS;

/**
 * Adds user methods specific to Giraph. This will be put into an
 * ImmutableClassesGiraphConfiguration that provides the configuration plus
 * the immutable classes.
 *
 * Keeps track of parameters which were set so they can easily be set in
 * another copy of the configuration.
 */
public class GiraphConfiguration extends Configuration
    implements GiraphConstants {
  /** ByteBufAllocator to be used by netty */
  private ByteBufAllocator nettyBufferAllocator = null;

  /**
   * Constructor that creates the configuration
   */
  public GiraphConfiguration() {
    configureHadoopSecurity();
  }

  /**
   * Constructor.
   *
   * @param conf Configuration
   */
  public GiraphConfiguration(Configuration conf) {
    super(conf);
    configureHadoopSecurity();
  }
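
  // Illustrative usage sketch (added commentary, not part of the original
  // source): a job is typically configured through this class before being
  // submitted via GiraphJob. The computation and I/O format classes below
  // are hypothetical placeholders for user-provided implementations.
  //
  //   GiraphConfiguration conf = new GiraphConfiguration();
  //   conf.setComputationClass(MyComputation.class);
  //   conf.setVertexInputFormatClass(MyVertexInputFormat.class);
  //   conf.setVertexOutputFormatClass(MyVertexOutputFormat.class);
  //   conf.setWorkerConfiguration(1, 4, 100.0f);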

  /**
   * Get name of computation being run. We leave this up to the
   * {@link ComputationFactory} to decide what to return.
   *
   * @return Name of computation being run
   */
  public String getComputationName() {
    ComputationFactory compFactory = ReflectionUtils.newInstance(
        getComputationFactoryClass());
    return compFactory.computationName(this);
  }

  /**
   * Get the user's subclassed {@link ComputationFactory}
   *
   * @return User's computation factory class
   */
  public Class<? extends ComputationFactory> getComputationFactoryClass() {
    return COMPUTATION_FACTORY_CLASS.get(this);
  }

  /**
   * Get the user's subclassed {@link Computation}
   *
   * @return User's computation class
   */
  public Class<? extends Computation> getComputationClass() {
    return COMPUTATION_CLASS.get(this);
  }

  /**
   * Set the computation class (required)
   *
   * @param computationClass Runs vertex computation
   */
  public void setComputationClass(
      Class<? extends Computation> computationClass) {
    COMPUTATION_CLASS.set(this, computationClass);
  }

  /**
   * Set the vertex value factory class
   *
   * @param vertexValueFactoryClass Creates default vertex values
   */
  public final void setVertexValueFactoryClass(
      Class<? extends VertexValueFactory> vertexValueFactoryClass) {
    VERTEX_VALUE_FACTORY_CLASS.set(this, vertexValueFactoryClass);
  }

  /**
   * Set the edge input filter class
   *
   * @param edgeFilterClass class to use
   */
  public void setEdgeInputFilterClass(
      Class<? extends EdgeInputFilter> edgeFilterClass) {
    EDGE_INPUT_FILTER_CLASS.set(this, edgeFilterClass);
  }

  /**
   * Set the vertex input filter class
   *
   * @param vertexFilterClass class to use
   */
  public void setVertexInputFilterClass(
      Class<? extends VertexInputFilter> vertexFilterClass) {
    VERTEX_INPUT_FILTER_CLASS.set(this, vertexFilterClass);
  }

  /**
   * Get the vertex edges class
   *
   * @return vertex edges class
   */
  public Class<? extends OutEdges> getOutEdgesClass() {
    return VERTEX_EDGES_CLASS.get(this);
  }

  /**
   * Set the vertex edges class
   *
   * @param outEdgesClass Determines the way edges are stored
   */
  public final void setOutEdgesClass(
      Class<? extends OutEdges> outEdgesClass) {
    VERTEX_EDGES_CLASS.set(this, outEdgesClass);
  }

  /**
   * Set the vertex implementation class
   *
   * @param vertexClass class of the vertex implementation
   */
  public final void setVertexClass(Class<? extends Vertex> vertexClass) {
    VERTEX_CLASS.set(this, vertexClass);
  }

  /**
   * Set the vertex edges class used during edge-based input (if different
   * from the one used during computation)
   *
   * @param inputOutEdgesClass Determines the way edges are stored
   */
  public final void setInputOutEdgesClass(
      Class<? extends OutEdges> inputOutEdgesClass) {
    INPUT_VERTEX_EDGES_CLASS.set(this, inputOutEdgesClass);
  }

  /**
   * True if the {@link org.apache.giraph.edge.OutEdges} implementation
   * copies the passed edges to its own data structure,
   * i.e. it doesn't keep references to Edge objects, target vertex ids or
   * edge values passed to add() or initialize().
   * This makes it possible to reuse edge objects passed to the data
   * structure, to minimize object instantiation (see for example
   * EdgeStore#addPartitionEdges()).
   *
   * @return True iff we can reuse the edge objects
   */
  public boolean reuseEdgeObjects() {
    return ReuseObjectsOutEdges.class.isAssignableFrom(
        getOutEdgesClass());
  }

  /**
   * True if the {@link Partition} implementation copies the passed vertices
   * to its own data structure, i.e. it doesn't keep references to Vertex
   * objects passed to it.
   * This makes it possible to reuse vertex objects passed to the data
   * structure, to minimize object instantiation.
   *
   * @return True iff we can reuse the vertex objects
   */
  public boolean reuseVertexObjects() {
    return ReusesObjectsPartition.class.isAssignableFrom(getPartitionClass());
  }

  /**
   * Get Partition class used
   * @return Partition class
   */
  public Class<? extends Partition> getPartitionClass() {
    return PARTITION_CLASS.get(this);
  }

  /**
   * Does the job have a {@link VertexInputFormat}?
   *
   * @return True iff a {@link VertexInputFormat} has been specified.
   */
  public boolean hasVertexInputFormat() {
    return VERTEX_INPUT_FORMAT_CLASS.get(this) != null;
  }

  /**
   * Set the vertex input format class (required)
   *
   * @param vertexInputFormatClass Determines how graph is input
   */
  public void setVertexInputFormatClass(
      Class<? extends VertexInputFormat> vertexInputFormatClass) {
    VERTEX_INPUT_FORMAT_CLASS.set(this, vertexInputFormatClass);
  }

  /**
   * Does the job have an {@link EdgeInputFormat}?
   *
   * @return True iff an {@link EdgeInputFormat} has been specified.
   */
  public boolean hasEdgeInputFormat() {
    return EDGE_INPUT_FORMAT_CLASS.get(this) != null;
  }

  /**
   * Set the edge input format class (required)
   *
   * @param edgeInputFormatClass Determines how graph is input
   */
  public void setEdgeInputFormatClass(
      Class<? extends EdgeInputFormat> edgeInputFormatClass) {
    EDGE_INPUT_FORMAT_CLASS.set(this, edgeInputFormatClass);
  }

  /**
   * Set the mapping input format class (optional)
   *
   * @param mappingInputFormatClass Determines how mappings are input
   */
  public void setMappingInputFormatClass(
      Class<? extends MappingInputFormat> mappingInputFormatClass) {
    MAPPING_INPUT_FORMAT_CLASS.set(this, mappingInputFormatClass);
  }

  /**
   * Set the master class (optional)
   *
   * @param masterComputeClass Runs master computation
   */
  public final void setMasterComputeClass(
      Class<? extends MasterCompute> masterComputeClass) {
    MASTER_COMPUTE_CLASS.set(this, masterComputeClass);
  }

  /**
   * Add a MasterObserver class (optional)
   *
   * @param masterObserverClass MasterObserver class to add.
   */
  public final void addMasterObserverClass(
      Class<? extends MasterObserver> masterObserverClass) {
    MASTER_OBSERVER_CLASSES.add(this, masterObserverClass);
  }

  /**
   * Add a WorkerObserver class (optional)
   *
   * @param workerObserverClass WorkerObserver class to add.
   */
  public final void addWorkerObserverClass(
      Class<? extends WorkerObserver> workerObserverClass) {
    WORKER_OBSERVER_CLASSES.add(this, workerObserverClass);
  }

  /**
   * Add a MapperObserver class (optional)
   *
   * @param mapperObserverClass MapperObserver class to add.
   */
  public final void addMapperObserverClass(
      Class<? extends MapperObserver> mapperObserverClass) {
    MAPPER_OBSERVER_CLASSES.add(this, mapperObserverClass);
  }

  /**
   * Add a GcObserver class (optional)
   *
   * @param gcObserverClass GcObserver class to add.
   */
  public final void addGcObserverClass(
      Class<? extends GcObserver> gcObserverClass) {
    GC_OBSERVER_CLASSES.add(this, gcObserverClass);
  }

  /**
   * Get job observer class
   *
   * @return GiraphJobObserver class set.
   */
  public Class<? extends GiraphJobObserver> getJobObserverClass() {
    return JOB_OBSERVER_CLASS.get(this);
  }

  /**
   * Set job observer class
   *
   * @param klass GiraphJobObserver class to set.
   */
  public void setJobObserverClass(Class<? extends GiraphJobObserver> klass) {
    JOB_OBSERVER_CLASS.set(this, klass);
  }

  /**
   * Get job retry checker class
   *
   * @return GiraphJobRetryChecker class set.
   */
  public Class<? extends GiraphJobRetryChecker> getJobRetryCheckerClass() {
    return JOB_RETRY_CHECKER_CLASS.get(this);
  }

  /**
   * Set job retry checker class
   *
   * @param klass GiraphJobRetryChecker class to set.
   */
  public void setJobRetryCheckerClass(
      Class<? extends GiraphJobRetryChecker> klass) {
    JOB_RETRY_CHECKER_CLASS.set(this, klass);
  }

  /**
   * Check whether to enable jmap dumping thread.
   *
   * @return true if jmap dumper is enabled.
   */
  public boolean isJMapHistogramDumpEnabled() {
    return JMAP_ENABLE.get(this);
  }

  /**
   * Check whether to enable heap memory supervisor thread
   *
   * @return true if jmap dumper is reactively enabled
   */
  public boolean isReactiveJmapHistogramDumpEnabled() {
    return REACTIVE_JMAP_ENABLE.get(this);
  }

  /**
   * Set mapping from a key name to a list of classes.
   *
   * @param name String key name to use.
   * @param xface interface of the classes being set.
   * @param klasses Classes to set.
   */
  public final void setClasses(String name, Class<?> xface,
                               Class<?> ... klasses) {
    String[] klassNames = new String[klasses.length];
    for (int i = 0; i < klasses.length; ++i) {
      Class<?> klass = klasses[i];
      if (!xface.isAssignableFrom(klass)) {
        throw new RuntimeException(klass + " does not implement " +
            xface.getName());
      }
      klassNames[i] = klasses[i].getName();
    }
    setStrings(name, klassNames);
  }
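
  // Illustrative sketch (added commentary, not from the original source) of
  // how setClasses validates entries against the given interface before
  // storing their names under one key. The key string and the observer
  // implementations below are hypothetical.
  //
  //   conf.setClasses("giraph.exampleMasterObservers", MasterObserver.class,
  //       LoggingMasterObserver.class, TimingMasterObserver.class);
  //   // Passing a class that does not implement MasterObserver here would
  //   // throw a RuntimeException instead of being stored.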

  /**
   * Does the job have a {@link VertexOutputFormat}?
   *
   * @return True iff a {@link VertexOutputFormat} has been specified.
   */
  public boolean hasVertexOutputFormat() {
    return VERTEX_OUTPUT_FORMAT_CLASS.get(this) != null;
  }

  /**
   * Set the vertex output format class (optional)
   *
   * @param vertexOutputFormatClass Determines how graph is output
   */
  public final void setVertexOutputFormatClass(
      Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
    VERTEX_OUTPUT_FORMAT_CLASS.set(this, vertexOutputFormatClass);
  }

  /**
   * Does the job have a {@link VertexOutputFormat} subdir?
   *
   * @return True iff a {@link VertexOutputFormat} subdir has been specified.
   */
  public boolean hasVertexOutputFormatSubdir() {
    return !VERTEX_OUTPUT_FORMAT_SUBDIR.get(this).isEmpty();
  }

  /**
   * Set the vertex output format path
   *
   * @param path path where the vertices will be written
   */
  public final void setVertexOutputFormatSubdir(String path) {
    VERTEX_OUTPUT_FORMAT_SUBDIR.set(this, path);
  }

  /**
   * Check if output should be done during computation
   *
   * @return True iff output should be done during computation
   */
  public final boolean doOutputDuringComputation() {
    return DO_OUTPUT_DURING_COMPUTATION.get(this);
  }

  /**
   * Set whether or not we should do output during computation
   *
   * @param doOutputDuringComputation True iff we want output to happen
   *                                  during computation
   */
  public final void setDoOutputDuringComputation(
      boolean doOutputDuringComputation) {
    DO_OUTPUT_DURING_COMPUTATION.set(this, doOutputDuringComputation);
  }

  /**
   * Check if VertexOutputFormat is thread-safe
   *
   * @return True iff VertexOutputFormat is thread-safe
   */
  public final boolean vertexOutputFormatThreadSafe() {
    return VERTEX_OUTPUT_FORMAT_THREAD_SAFE.get(this);
  }

  /**
   * Set whether or not selected VertexOutputFormat is thread-safe
   *
   * @param vertexOutputFormatThreadSafe True iff selected VertexOutputFormat
   *                                     is thread-safe
   */
  public final void setVertexOutputFormatThreadSafe(
      boolean vertexOutputFormatThreadSafe) {
    VERTEX_OUTPUT_FORMAT_THREAD_SAFE.set(this, vertexOutputFormatThreadSafe);
  }

  /**
   * Does the job have an {@link EdgeOutputFormat}?
   *
   * @return True iff an {@link EdgeOutputFormat} has been specified.
   */
  public boolean hasEdgeOutputFormat() {
    return EDGE_OUTPUT_FORMAT_CLASS.get(this) != null;
  }

  /**
   * Set the edge output format class (optional)
   *
   * @param edgeOutputFormatClass Determines how graph is output
   */
  public final void setEdgeOutputFormatClass(
      Class<? extends EdgeOutputFormat> edgeOutputFormatClass) {
    EDGE_OUTPUT_FORMAT_CLASS.set(this, edgeOutputFormatClass);
  }

  /**
   * Does the job have an {@link EdgeOutputFormat} subdir?
   *
   * @return True iff an {@link EdgeOutputFormat} subdir has been specified.
   */
  public boolean hasEdgeOutputFormatSubdir() {
    return !EDGE_OUTPUT_FORMAT_SUBDIR.get(this).isEmpty();
  }

  /**
   * Set the edge output format path
   *
   * @param path path where the edges will be written
   */
  public final void setEdgeOutputFormatSubdir(String path) {
    EDGE_OUTPUT_FORMAT_SUBDIR.set(this, path);
  }

  /**
   * Get the number of threads to use for writing output at the end of the
   * application. If the output format is not thread-safe, returns 1.
   *
   * @return Number of output threads
   */
  public final int getNumOutputThreads() {
    if (!vertexOutputFormatThreadSafe()) {
      return 1;
    } else {
      return NUM_OUTPUT_THREADS.get(this);
    }
  }

  /**
   * Set the number of threads to use for writing output at the end of the
   * application. Will be used only if {@link #vertexOutputFormatThreadSafe()}
   * is true.
   *
   * @param numOutputThreads Number of output threads
   */
  public void setNumOutputThreads(int numOutputThreads) {
    NUM_OUTPUT_THREADS.set(this, numOutputThreads);
  }

  /**
   * Set the message combiner class (optional)
   *
   * @param messageCombinerClass Determines how vertex messages are combined
   */
  public void setMessageCombinerClass(
      Class<? extends MessageCombiner> messageCombinerClass) {
    MESSAGE_COMBINER_CLASS.set(this, messageCombinerClass);
  }

  /**
   * Set the graph partitioner class (optional)
   *
   * @param graphPartitionerFactoryClass Determines how the graph is
   *        partitioned
   */
  public final void setGraphPartitionerFactoryClass(
      Class<? extends GraphPartitionerFactory> graphPartitionerFactoryClass) {
    GRAPH_PARTITIONER_FACTORY_CLASS.set(this, graphPartitionerFactoryClass);
  }

  /**
   * Set the vertex resolver class (optional)
   *
   * @param vertexResolverClass Determines how vertex mutations are resolved
   */
  public final void setVertexResolverClass(
      Class<? extends VertexResolver> vertexResolverClass) {
    VERTEX_RESOLVER_CLASS.set(this, vertexResolverClass);
  }

  /**
   * Whether to create a vertex that doesn't exist when it receives messages.
   * This only affects DefaultVertexResolver.
   *
   * @return true if we should create non-existent vertices that get messages.
   */
  public final boolean getResolverCreateVertexOnMessages() {
    return RESOLVER_CREATE_VERTEX_ON_MSGS.get(this);
  }

  /**
   * Set whether to create non-existent vertices when they receive messages.
   *
   * @param v true if we should create vertices when they get messages.
   */
  public final void setResolverCreateVertexOnMessages(boolean v) {
    RESOLVER_CREATE_VERTEX_ON_MSGS.set(this, v);
  }

  /**
   * Set the vertex value combiner class (optional)
   *
   * @param vertexValueCombinerClass Determines how vertex values are combined
   */
  public final void setVertexValueCombinerClass(
      Class<? extends VertexValueCombiner> vertexValueCombinerClass) {
    VERTEX_VALUE_COMBINER_CLASS.set(this, vertexValueCombinerClass);
  }

  /**
   * Set the worker context class (optional)
   *
   * @param workerContextClass Determines what code is executed on each
   *        worker before and after each superstep and computation
   */
  public final void setWorkerContextClass(
      Class<? extends WorkerContext> workerContextClass) {
    WORKER_CONTEXT_CLASS.set(this, workerContextClass);
  }

  /**
   * Set the aggregator writer class (optional)
   *
   * @param aggregatorWriterClass Determines how the aggregators are
   *        written to file at the end of the job
   */
  public final void setAggregatorWriterClass(
      Class<? extends AggregatorWriter> aggregatorWriterClass) {
    AGGREGATOR_WRITER_CLASS.set(this, aggregatorWriterClass);
  }

  /**
   * Set the partition class (optional)
   *
   * @param partitionClass Determines the way partitions are stored
   */
  public final void setPartitionClass(
      Class<? extends Partition> partitionClass) {
    PARTITION_CLASS.set(this, partitionClass);
  }

  /**
   * Set worker configuration for determining what is required for
   * a superstep.
   *
   * @param minWorkers Minimum workers to do a superstep
   * @param maxWorkers Maximum workers to do a superstep
   *        (max map tasks in job)
   * @param minPercentResponded 0 - 100 % of the workers required to
   *        have responded before continuing the superstep
   */
  public final void setWorkerConfiguration(int minWorkers,
                                           int maxWorkers,
                                           float minPercentResponded) {
    setInt(MIN_WORKERS, minWorkers);
    setInt(MAX_WORKERS, maxWorkers);
    MIN_PERCENT_RESPONDED.set(this, minPercentResponded);
  }
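
  // Illustrative example (added commentary, not from the original source):
  // require at least four workers, allow up to ten, and only continue a
  // superstep once every worker has responded. The numbers are arbitrary.
  //
  //   conf.setWorkerConfiguration(4, 10, 100.0f);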

  public final int getMinWorkers() {
    return getInt(MIN_WORKERS, -1);
  }

  public final int getMaxWorkers() {
    return getInt(MAX_WORKERS, -1);
  }

  public final float getMinPercentResponded() {
    return MIN_PERCENT_RESPONDED.get(this);
  }

  /**
   * How many mappers the job is asking for, taking into account whether the
   * master is running on the same mapper as a worker or not.
   *
   * @return How many mappers the job is asking for
   */
  public final int getMaxMappers() {
    return getMaxWorkers() + (SPLIT_MASTER_WORKER.get(this) ? 1 : 0);
  }
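
  // Worked example (added commentary, not from the original source): with
  // getMaxWorkers() == 10 and SPLIT_MASTER_WORKER enabled, the job asks for
  // 11 mappers (10 workers plus a dedicated master); with a combined
  // master/worker it asks for 10.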

  /**
   * Utilize an existing ZooKeeper service. If this is not set, ZooKeeper
   * will be dynamically started by Giraph for this job.
   *
   * @param serverList Comma separated list of servers and ports
   *        (i.e. zk1:2221,zk2:2221)
   */
  public final void setZooKeeperConfiguration(String serverList) {
    ZOOKEEPER_LIST.set(this, serverList);
  }

  /**
   * Getter for SPLIT_MASTER_WORKER flag.
   *
   * @return boolean flag value.
   */
  public final boolean getSplitMasterWorker() {
    return SPLIT_MASTER_WORKER.get(this);
  }

  /**
   * Get array of MasterObserver classes set in the configuration.
   *
   * @return array of MasterObserver classes.
   */
  public Class<? extends MasterObserver>[] getMasterObserverClasses() {
    return MASTER_OBSERVER_CLASSES.getArray(this);
  }

  /**
   * Get array of WorkerObserver classes set in configuration.
   *
   * @return array of WorkerObserver classes.
   */
  public Class<? extends WorkerObserver>[] getWorkerObserverClasses() {
    return WORKER_OBSERVER_CLASSES.getArray(this);
  }

  /**
   * Get array of MapperObserver classes set in configuration.
   *
   * @return array of MapperObserver classes.
   */
  public Class<? extends MapperObserver>[] getMapperObserverClasses() {
    return MAPPER_OBSERVER_CLASSES.getArray(this);
  }

  /**
   * Get array of GcObserver classes set in configuration.
   *
   * @return array of GcObserver classes.
   */
  public Class<? extends GcObserver>[] getGcObserverClasses() {
    return GC_OBSERVER_CLASSES.getArray(this);
  }

  /**
   * Whether to track, print, and aggregate metrics.
   *
   * @return true if metrics are enabled, false otherwise (default)
   */
  public boolean metricsEnabled() {
    return METRICS_ENABLE.isTrue(this);
  }

  /**
   * Get the task partition
   *
   * @return The task partition or -1 if not set
   */
  public int getTaskPartition() {
    return getInt("mapred.task.partition", -1);
  }

  /**
   * Is this a "pure YARN" Giraph job, or is a MapReduce layer (v1 or v2)
   * actually managing our cluster nodes, i.e. each task is a Mapper.
   *
   * @return TRUE if this is a pure YARN job.
   */
  public boolean isPureYarnJob() {
    return IS_PURE_YARN_JOB.get(this);
  }

  /**
   * Jars required in "Pure YARN" jobs (names only, no paths) should
   * be listed here in full, including Giraph framework jar(s).
   *
   * @return the comma-separated list of jar names for export to cluster.
   */
  public String getYarnLibJars() {
    return GIRAPH_YARN_LIBJARS.get(this);
  }

  /**
   * Populate jar list for Pure YARN jobs.
   *
   * @param jarList a comma-separated list of jar names
   */
  public void setYarnLibJars(String jarList) {
    GIRAPH_YARN_LIBJARS.set(this, jarList);
  }

  /**
   * Get heap size (in MB) for each task in our Giraph job run,
   * assuming this job will run on the "pure YARN" profile.
   *
   * @return the heap size for all tasks, in MB
   */
  public int getYarnTaskHeapMb() {
    return GIRAPH_YARN_TASK_HEAP_MB.get(this);
  }

  /**
   * Set heap size for Giraph tasks in our job run, assuming
   * the job will run on the "pure YARN" profile.
   *
   * @param heapMb the heap size for all tasks
   */
  public void setYarnTaskHeapMb(int heapMb) {
    GIRAPH_YARN_TASK_HEAP_MB.set(this, heapMb);
  }

  /**
   * Get the ZooKeeper list.
   *
   * @return ZooKeeper list of strings, comma separated or null if none set.
   */
  public String getZookeeperList() {
    return ZOOKEEPER_LIST.get(this);
  }

  /**
   * Set the ZooKeeper list to the provided list. This method is used when the
   * ZooKeeper is started internally and will set the zkIsExternal option to
   * false as well.
   *
   * @param zkList list of strings, comma separated of zookeeper servers
   */
  public void setZookeeperList(String zkList) {
    ZOOKEEPER_LIST.set(this, zkList);
    ZOOKEEPER_IS_EXTERNAL.set(this, false);
  }

  /**
   * Was ZooKeeper provided externally?
   *
   * @return true iff ZooKeeper was provided externally
   */
  public boolean isZookeeperExternal() {
    return ZOOKEEPER_IS_EXTERNAL.get(this);
  }

  /**
   * Get the log level configured for this job.
   *
   * @return Log level
   */
  public String getLocalLevel() {
    return LOG_LEVEL.get(this);
  }

  /**
   * Use the log thread layout option?
   *
   * @return True if using the log thread layout option, false otherwise
   */
  public boolean useLogThreadLayout() {
    return LOG_THREAD_LAYOUT.get(this);
  }

  /**
   * Is this job run as a local test?
   *
   * @return the test status as recorded in the Configuration
   */
  public boolean getLocalTestMode() {
    return LOCAL_TEST_MODE.get(this);
  }

  /**
   * Flag this job as a local test run.
   *
   * @param flag the test status for this job
   */
  public void setLocalTestMode(boolean flag) {
    LOCAL_TEST_MODE.set(this, flag);
  }

  public int getZooKeeperSessionTimeout() {
    return ZOOKEEPER_SESSION_TIMEOUT.get(this);
  }

  public int getZookeeperOpsMaxAttempts() {
    return ZOOKEEPER_OPS_MAX_ATTEMPTS.get(this);
  }

  public int getZookeeperOpsRetryWaitMsecs() {
    return ZOOKEEPER_OPS_RETRY_WAIT_MSECS.get(this);
  }

  public boolean getNettyServerUseExecutionHandler() {
    return NETTY_SERVER_USE_EXECUTION_HANDLER.get(this);
  }

  public int getNettyServerThreads() {
    return NETTY_SERVER_THREADS.get(this);
  }

  public int getNettyServerExecutionThreads() {
    return NETTY_SERVER_EXECUTION_THREADS.get(this);
  }

  /**
   * Get the netty server execution concurrency. This depends on whether the
   * netty server execution handler exists.
   *
   * @return Server concurrency
   */
  public int getNettyServerExecutionConcurrency() {
    if (getNettyServerUseExecutionHandler()) {
      return getNettyServerExecutionThreads();
    } else {
      return getNettyServerThreads();
    }
  }

  /**
   * Used by netty client and server to create ByteBufAllocator
   *
   * @return ByteBufAllocator
   */
  public ByteBufAllocator getNettyAllocator() {
    if (nettyBufferAllocator == null) {
      if (NETTY_USE_POOLED_ALLOCATOR.get(this)) { // Use pooled allocator
        nettyBufferAllocator = new PooledByteBufAllocator(
            NETTY_USE_DIRECT_MEMORY.get(this));
      } else { // Use un-pooled allocator
        // Note: Current default settings create un-pooled heap allocator
        nettyBufferAllocator = new UnpooledByteBufAllocator(
            NETTY_USE_DIRECT_MEMORY.get(this));
      }
    }
    return nettyBufferAllocator;
  }
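
  // Note (added commentary, not from the original source): both Netty
  // allocator constructors used above take a single boolean "preferDirect"
  // argument, so NETTY_USE_DIRECT_MEMORY chooses between direct (off-heap)
  // and heap buffers, while NETTY_USE_POOLED_ALLOCATOR chooses whether
  // buffers are pooled and reused or allocated fresh for each request.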

  public int getZookeeperConnectionAttempts() {
    return ZOOKEEPER_CONNECTION_ATTEMPTS.get(this);
  }

  public int getZooKeeperMinSessionTimeout() {
    return ZOOKEEPER_MIN_SESSION_TIMEOUT.get(this);
  }

  public int getZooKeeperMaxSessionTimeout() {
    return ZOOKEEPER_MAX_SESSION_TIMEOUT.get(this);
  }

  /**
   * Get the number of map tasks in this job
   *
   * @return Number of map tasks in this job
   */
  public int getMapTasks() {
    int mapTasks = getInt("mapred.map.tasks", -1);
    if (mapTasks == -1) {
      throw new IllegalStateException("getMapTasks: Failed to get the map " +
          "tasks!");
    }
    return mapTasks;
  }

  /**
   * Use authentication? (if supported)
   *
   * @return True if should authenticate, false otherwise
   */
  public boolean authenticate() {
    return AUTHENTICATE.get(this);
  }

  /**
   * Set the number of compute threads
   *
   * @param numComputeThreads Number of compute threads to use
   */
  public void setNumComputeThreads(int numComputeThreads) {
    NUM_COMPUTE_THREADS.set(this, numComputeThreads);
  }

  public int getNumComputeThreads() {
    return NUM_COMPUTE_THREADS.get(this);
  }

  /**
   * Set the number of input split threads
   *
   * @param numInputSplitsThreads Number of input split threads to use
   */
  public void setNumInputSplitsThreads(int numInputSplitsThreads) {
    NUM_INPUT_THREADS.set(this, numInputSplitsThreads);
  }

  public int getNumInputSplitsThreads() {
    return NUM_INPUT_THREADS.get(this);
  }

  public long getInputSplitMaxVertices() {
    return INPUT_SPLIT_MAX_VERTICES.get(this);
  }

  public long getInputSplitMaxEdges() {
    return INPUT_SPLIT_MAX_EDGES.get(this);
  }

  /**
   * Set whether to use unsafe serialization
   *
   * @param useUnsafeSerialization If true, use unsafe serialization
   */
  public void useUnsafeSerialization(boolean useUnsafeSerialization) {
    USE_UNSAFE_SERIALIZATION.set(this, useUnsafeSerialization);
  }

  /**
   * Set the checkpoint frequency, i.e. how many supersteps to wait before
   * checkpointing
   *
   * @param checkpointFrequency How often to checkpoint (0 means never)
   */
  public void setCheckpointFrequency(int checkpointFrequency) {
    CHECKPOINT_FREQUENCY.set(this, checkpointFrequency);
  }

  /**
   * Get the checkpoint frequency, i.e. how many supersteps to wait
   * before checkpointing
   *
   * @return Checkpoint frequency (0 means never)
   */
  public int getCheckpointFrequency() {
    return CHECKPOINT_FREQUENCY.get(this);
  }

  /**
   * Check if checkpointing is used
   *
   * @return True iff checkpointing is used
   */
  public boolean useCheckpointing() {
    return getCheckpointFrequency() != 0;
  }
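
  // Example (added commentary, not from the original source): a frequency of
  // 5 checkpoints the job every fifth superstep, while 0 disables
  // checkpointing entirely, as useCheckpointing() above reflects.
  //
  //   conf.setCheckpointFrequency(5);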

  /**
   * Set runtime checkpoint support checker.
   * The instance of this class will have to decide whether
   * checkpointing is allowed on current superstep.
   *
   * @param clazz checkpoint supported checker class
   */
  public void setCheckpointSupportedChecker(
      Class<? extends CheckpointSupportedChecker> clazz) {
    GiraphConstants.CHECKPOINT_SUPPORTED_CHECKER.set(this, clazz);
  }

  /**
   * Set the max task attempts
   *
   * @param maxTaskAttempts Max task attempts to use
   */
  public void setMaxTaskAttempts(int maxTaskAttempts) {
    MAX_TASK_ATTEMPTS.set(this, maxTaskAttempts);
  }

  /**
   * Get the max task attempts
   *
   * @return Max task attempts or -1, if not set
   */
  public int getMaxTaskAttempts() {
    return MAX_TASK_ATTEMPTS.get(this);
  }

  /**
   * Get the number of milliseconds to wait for an event before continuing on
   *
   * @return Number of milliseconds to wait for an event before continuing on
   */
  public int getEventWaitMsecs() {
    return EVENT_WAIT_MSECS.get(this);
  }

  /**
   * Set the number of milliseconds to wait for an event before continuing on
   *
   * @param eventWaitMsecs Number of milliseconds to wait for an event before
   *        continuing on
   */
  public void setEventWaitMsecs(int eventWaitMsecs) {
    EVENT_WAIT_MSECS.set(this, eventWaitMsecs);
  }

  /**
   * Get the maximum milliseconds to wait before giving up trying to get the
   * minimum number of workers before a superstep.
   *
   * @return Maximum milliseconds to wait before giving up trying to get the
   *         minimum number of workers before a superstep
   */
  public int getMaxMasterSuperstepWaitMsecs() {
    return MAX_MASTER_SUPERSTEP_WAIT_MSECS.get(this);
  }

  public int getMaxCounterWaitMsecs() {
    return MAX_COUNTER_WAIT_MSECS.get(this);
  }

  /**
   * Set the maximum milliseconds to wait before giving up trying to get the
   * minimum number of workers before a superstep.
   *
   * @param maxMasterSuperstepWaitMsecs Maximum milliseconds to wait before
   *        giving up trying to get the minimum number of workers before
   *        a superstep
   */
  public void setMaxMasterSuperstepWaitMsecs(int maxMasterSuperstepWaitMsecs) {
    MAX_MASTER_SUPERSTEP_WAIT_MSECS.set(this, maxMasterSuperstepWaitMsecs);
  }

  /**
   * Check environment for Hadoop security token location in case we are
   * executing the Giraph job on a MRv1 Hadoop cluster.
   */
  public void configureHadoopSecurity() {
    String hadoopTokenFilePath = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
    if (hadoopTokenFilePath != null) {
      set("mapreduce.job.credentials.binary", hadoopTokenFilePath);
    }
  }

  /**
   * Check if we want to prioritize input splits which reside on the host.
   *
   * @return True iff we want to use input split locality
   */
  public boolean useInputSplitLocality() {
    return USE_INPUT_SPLIT_LOCALITY.get(this);
  }

  /**
   * Get the local hostname on the given interface.
   *
   * @return The local hostname
   * @throws UnknownHostException IP address of a host could not be determined
   */
  public String getLocalHostname() throws UnknownHostException {
    return DNS.getDefaultHost(
        GiraphConstants.DNS_INTERFACE.get(this),
        GiraphConstants.DNS_NAMESERVER.get(this)).toLowerCase();
  }

  /**
   * Return local host name by default. Or local host IP if preferIP
   * option is set.
   *
   * @return local host name or IP
   * @throws UnknownHostException IP address of a host could not be determined
   */
  public String getLocalHostOrIp() throws UnknownHostException {
    if (GiraphConstants.PREFER_IP_ADDRESSES.get(this)) {
      return InetAddress.getLocalHost().getHostAddress();
    }
    return getLocalHostname();
  }

  /**
   * Set the maximum number of supersteps of this application. After this
   * many supersteps are executed, the application will shutdown.
   *
   * @param maxNumberOfSupersteps Maximum number of supersteps
   */
  public void setMaxNumberOfSupersteps(int maxNumberOfSupersteps) {
    MAX_NUMBER_OF_SUPERSTEPS.set(this, maxNumberOfSupersteps);
  }

  /**
   * Get the maximum number of supersteps of this application. After this
   * many supersteps are executed, the application will shutdown.
   *
   * @return Maximum number of supersteps
   */
  public int getMaxNumberOfSupersteps() {
    return MAX_NUMBER_OF_SUPERSTEPS.get(this);
  }

  /**
   * Get the output directory to write YourKit snapshots to
   *
   * @param context Map context
   * @return output directory
   */
  public String getYourKitOutputDir(Mapper.Context context) {
    final String cacheKey = "giraph.yourkit.outputDirCached";
    String outputDir = get(cacheKey);
    if (outputDir == null) {
      outputDir = getStringVars(YOURKIT_OUTPUT_DIR, YOURKIT_OUTPUT_DIR_DEFAULT,
          context);
      set(cacheKey, outputDir);
    }
    return outputDir;
  }

  /**
   * Get string, replacing variables in the output.
   *
   * %JOB_ID% => job id
   * %TASK_ID% => task id
   * %USER% => owning user name
   *
   * @param key name of key to lookup
   * @param context mapper context
   * @return value for key, with variables expanded
   */
  public String getStringVars(String key, Mapper.Context context) {
    return getStringVars(key, null, context);
  }

  /**
   * Get string, replacing variables in the output.
   *
   * %JOB_ID% => job id
   * %TASK_ID% => task id
   * %USER% => owning user name
   *
   * @param key name of key to lookup
   * @param defaultValue value to return if no mapping exists. This can also
   *        have variables, which will be substituted.
   * @param context mapper context
   * @return value for key, with variables expanded
   */
  public String getStringVars(String key, String defaultValue,
      Mapper.Context context) {
    String value = get(key);
    if (value == null) {
      if (defaultValue == null) {
        return null;
      }
      value = defaultValue;
    }
    value = value.replace("%JOB_ID%", context.getJobID().toString());
    value = value.replace("%TASK_ID%", context.getTaskAttemptID().toString());
    value = value.replace("%USER%", get("user.name", "unknown_user"));
    return value;
  }
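
  // Illustrative expansion (added commentary, not from the original source):
  // for a value such as "/tmp/giraph/%JOB_ID%/%TASK_ID%", the method above
  // substitutes the running job's id and task attempt id, yielding something
  // like
  // "/tmp/giraph/job_201801010000_0001/attempt_201801010000_0001_m_000002_0".
  // The path and ids here are made-up examples.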

  /**
   * Get option whether to create a source vertex present only in edge input
   *
   * @return CREATE_EDGE_SOURCE_VERTICES option
   */
  public boolean getCreateSourceVertex() {
    return CREATE_EDGE_SOURCE_VERTICES.get(this);
  }

  /**
   * Set option whether to create a source vertex present only in edge input
   *
   * @param createVertex create source vertex option
   */
  public void setCreateSourceVertex(boolean createVertex) {
    CREATE_EDGE_SOURCE_VERTICES.set(this, createVertex);
  }

  /**
   * Get the maximum timeout (in milliseconds) for waiting for all tasks
   * to complete after the job is done.
   *
   * @return Wait task done timeout in milliseconds.
   */
  public int getWaitTaskDoneTimeoutMs() {
    return WAIT_TASK_DONE_TIMEOUT_MS.get(this);
  }

  /**
   * Set the maximum timeout (in milliseconds) for waiting for all tasks
   * to complete after the job is done.
   *
   * @param ms Milliseconds to set
   */
  public void setWaitTaskDoneTimeoutMs(int ms) {
    WAIT_TASK_DONE_TIMEOUT_MS.set(this, ms);
  }

  /**
   * Check whether to track job progress on client or not
   *
   * @return True if job progress should be tracked on client
   */
  public boolean trackJobProgressOnClient() {
    return TRACK_JOB_PROGRESS_ON_CLIENT.get(this);
  }

  /**
   * @return Number of retries when creating an HDFS file before failing.
   */
  public int getHdfsFileCreationRetries() {
    return HDFS_FILE_CREATION_RETRIES.get(this);
  }

  /**
   * @return Milliseconds to wait before retrying an HDFS file creation
   *         operation.
   */
  public int getHdfsFileCreationRetryWaitMs() {
    return HDFS_FILE_CREATION_RETRY_WAIT_MS.get(this);
  }
}