1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.conf;
1920import org.apache.giraph.aggregators.AggregatorWriter;
21import org.apache.giraph.aggregators.TextAggregatorWriter;
22import org.apache.giraph.bsp.BspOutputFormat;
23import org.apache.giraph.bsp.checkpoints.CheckpointSupportedChecker;
24import org.apache.giraph.bsp.checkpoints.DefaultCheckpointSupportedChecker;
25import org.apache.giraph.combiner.MessageCombiner;
26import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
27import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
28import org.apache.giraph.comm.messages.MessageStoreFactory;
29import org.apache.giraph.edge.ByteArrayEdges;
30import org.apache.giraph.edge.DefaultCreateSourceVertexCallback;
31import org.apache.giraph.edge.CreateSourceVertexCallback;
32import org.apache.giraph.edge.EdgeStoreFactory;
33import org.apache.giraph.edge.InMemoryEdgeStoreFactory;
34import org.apache.giraph.edge.OutEdges;
35import org.apache.giraph.factories.ComputationFactory;
36import org.apache.giraph.factories.DefaultComputationFactory;
37import org.apache.giraph.factories.DefaultEdgeValueFactory;
38import org.apache.giraph.factories.DefaultInputOutEdgesFactory;
39import org.apache.giraph.factories.DefaultMessageValueFactory;
40import org.apache.giraph.factories.DefaultOutEdgesFactory;
41import org.apache.giraph.factories.DefaultVertexIdFactory;
42import org.apache.giraph.factories.DefaultVertexValueFactory;
43import org.apache.giraph.factories.EdgeValueFactory;
44import org.apache.giraph.factories.MessageValueFactory;
45import org.apache.giraph.factories.OutEdgesFactory;
46import org.apache.giraph.factories.VertexIdFactory;
47import org.apache.giraph.factories.VertexValueFactory;
48import org.apache.giraph.graph.Computation;
49import org.apache.giraph.graph.DefaultVertex;
50import org.apache.giraph.graph.DefaultVertexResolver;
51import org.apache.giraph.graph.DefaultVertexValueCombiner;
52import org.apache.giraph.graph.JobProgressTrackerClient;
53import org.apache.giraph.graph.Language;
54import org.apache.giraph.graph.MapperObserver;
55import org.apache.giraph.graph.RetryableJobProgressTrackerClient;
56import org.apache.giraph.graph.Vertex;
57import org.apache.giraph.graph.VertexResolver;
58import org.apache.giraph.graph.VertexValueCombiner;
59import org.apache.giraph.io.EdgeInputFormat;
60import org.apache.giraph.io.EdgeOutputFormat;
61import org.apache.giraph.io.MappingInputFormat;
62import org.apache.giraph.io.VertexInputFormat;
63import org.apache.giraph.io.VertexOutputFormat;
64import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
65import org.apache.giraph.io.filters.DefaultVertexInputFilter;
66import org.apache.giraph.io.filters.EdgeInputFilter;
67import org.apache.giraph.io.filters.VertexInputFilter;
68import org.apache.giraph.job.DefaultGiraphJobRetryChecker;
69import org.apache.giraph.job.DefaultJobObserver;
70import org.apache.giraph.job.DefaultJobProgressTrackerService;
71import org.apache.giraph.job.GiraphJobObserver;
72import org.apache.giraph.job.GiraphJobRetryChecker;
73import org.apache.giraph.job.HaltApplicationUtils;
74import org.apache.giraph.job.JobProgressTrackerService;
75import org.apache.giraph.mapping.MappingStore;
76import org.apache.giraph.mapping.MappingStoreOps;
77import org.apache.giraph.mapping.translate.TranslateEdge;
78import org.apache.giraph.master.DefaultMasterCompute;
79import org.apache.giraph.master.MasterCompute;
80import org.apache.giraph.master.MasterObserver;
81import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
82import org.apache.giraph.ooc.persistence.LocalDiskDataAccessor;
83import org.apache.giraph.ooc.policy.MemoryEstimatorOracle;
84import org.apache.giraph.ooc.policy.OutOfCoreOracle;
85import org.apache.giraph.partition.GraphPartitionerFactory;
86import org.apache.giraph.partition.HashPartitionerFactory;
87import org.apache.giraph.partition.Partition;
88import org.apache.giraph.partition.SimplePartition;
89import org.apache.giraph.utils.GcObserver;
90import org.apache.giraph.worker.DefaultWorkerContext;
91import org.apache.giraph.worker.WorkerContext;
92import org.apache.giraph.worker.WorkerObserver;
93import org.apache.hadoop.io.Writable;
94import org.apache.hadoop.io.WritableComparable;
95import org.apache.hadoop.mapreduce.OutputFormat;
9697importstatic java.util.concurrent.TimeUnit.HOURS;
98importstatic java.util.concurrent.TimeUnit.MINUTES;
99importstatic java.util.concurrent.TimeUnit.SECONDS;
100101/**102 * Constants used all over Giraph for configuration.103 */104// CHECKSTYLE: stop InterfaceIsTypeCheck105publicinterfaceGiraphConstants {
106/** 1KB in bytes */107int ONE_KB = 1024;
108/** 1MB in bytes */109int ONE_MB = 1024 * 1024;
110111/** Mapping related information */112 ClassConfOption<MappingStore> MAPPING_STORE_CLASS =
113 ClassConfOption.create("giraph.mappingStoreClass", null,
114 MappingStore.class, "MappingStore Class");
115116/** Class to use for performing read operations on mapping store */117 ClassConfOption<MappingStoreOps> MAPPING_STORE_OPS_CLASS =
118 ClassConfOption.create("giraph.mappingStoreOpsClass", null,
119 MappingStoreOps.class, "MappingStoreOps class");
120121/** Upper value of LongByteMappingStore */122IntConfOption LB_MAPPINGSTORE_UPPER =
123newIntConfOption("giraph.lbMappingStoreUpper", -1,
124"'upper' value used by lbmappingstore");
125/** Lower value of LongByteMappingStore */126IntConfOption LB_MAPPINGSTORE_LOWER =
127newIntConfOption("giraph.lbMappingStoreLower", -1,
128"'lower' value used by lbMappingstore");
129/** Class used to conduct expensive edge translation during vertex input */130ClassConfOption EDGE_TRANSLATION_CLASS =
131 ClassConfOption.create("giraph.edgeTranslationClass", null,
132 TranslateEdge.class, "Class used to conduct expensive edge " +
133"translation during vertex input phase");
134135/**Computation class - required */136 ClassConfOption<Computation> COMPUTATION_CLASS =
137 ClassConfOption.create("giraph.computationClass", null,
138 Computation.class, "Computation class - required");
139/**Computation factory class - optional */140 ClassConfOption<ComputationFactory> COMPUTATION_FACTORY_CLASS =
141 ClassConfOption.create("giraph.computation.factory.class",
142 DefaultComputationFactory.class, ComputationFactory.class,
143"Computation factory class - optional");
144145/** TypesHolder, used if Computation not set - optional */146 ClassConfOption<TypesHolder> TYPES_HOLDER_CLASS =
147 ClassConfOption.create("giraph.typesHolder", null,
148 TypesHolder.class,
149"TypesHolder, used if Computation not set - optional");
150151/**Edge Store Factory */152 ClassConfOption<EdgeStoreFactory> EDGE_STORE_FACTORY_CLASS =
153 ClassConfOption.create("giraph.edgeStoreFactoryClass",
154 InMemoryEdgeStoreFactory.class,
155 EdgeStoreFactory.class,
156"Edge Store Factory class to use for creating edgeStore");
157158/** Message Store Factory */159 ClassConfOption<MessageStoreFactory> MESSAGE_STORE_FACTORY_CLASS =
160 ClassConfOption.create("giraph.messageStoreFactoryClass",
161 InMemoryMessageStoreFactory.class,
162 MessageStoreFactory.class,
163"Message Store Factory Class that is to be used");
164165/**Language user's graph types are implemented in */166 PerGraphTypeEnumConfOption<Language> GRAPH_TYPE_LANGUAGES =
167 PerGraphTypeEnumConfOption.create("giraph.types.language",
168 Language.class, Language.JAVA,
169"Language user graph types (IVEMM) are implemented in");
170171/** Whether user graph types need wrappers */172PerGraphTypeBooleanConfOption GRAPH_TYPES_NEEDS_WRAPPERS =
173newPerGraphTypeBooleanConfOption("giraph.jython.type.wrappers",
174 false, "Whether user graph types (IVEMM) need Jython wrappers");
175176/**Vertex id factory class - optional */177 ClassConfOption<VertexIdFactory> VERTEX_ID_FACTORY_CLASS =
178 ClassConfOption.create("giraph.vertexIdFactoryClass",
179 DefaultVertexIdFactory.class, VertexIdFactory.class,
180"Vertex ID factory class - optional");
181/**Vertex value factory class - optional */182 ClassConfOption<VertexValueFactory> VERTEX_VALUE_FACTORY_CLASS =
183 ClassConfOption.create("giraph.vertexValueFactoryClass",
184 DefaultVertexValueFactory.class, VertexValueFactory.class,
185"Vertex value factory class - optional");
186/**Edge value factory class - optional */187 ClassConfOption<EdgeValueFactory> EDGE_VALUE_FACTORY_CLASS =
188 ClassConfOption.create("giraph.edgeValueFactoryClass",
189 DefaultEdgeValueFactory.class, EdgeValueFactory.class,
190"Edge value factory class - optional");
191/** Outgoing message value factory class - optional */192 ClassConfOption<MessageValueFactory>
193 OUTGOING_MESSAGE_VALUE_FACTORY_CLASS =
194 ClassConfOption.create("giraph.outgoingMessageValueFactoryClass",
195 DefaultMessageValueFactory.class, MessageValueFactory.class,
196"Outgoing message value factory class - optional");
197198/**Vertex edges class - optional */199 ClassConfOption<OutEdges> VERTEX_EDGES_CLASS =
200 ClassConfOption.create("giraph.outEdgesClass", ByteArrayEdges.class,
201 OutEdges.class, "Vertex edges class - optional");
202/**Vertex edges class to be used during edge input only - optional */203 ClassConfOption<OutEdges> INPUT_VERTEX_EDGES_CLASS =
204 ClassConfOption.create("giraph.inputOutEdgesClass",
205 ByteArrayEdges.class, OutEdges.class,
206"Vertex edges class to be used during edge input only - optional");
207/**OutEdges factory class - optional */208 ClassConfOption<OutEdgesFactory> VERTEX_EDGES_FACTORY_CLASS =
209 ClassConfOption.create("giraph.outEdgesFactoryClass",
210 DefaultOutEdgesFactory.class, OutEdgesFactory.class,
211"OutEdges factory class - optional");
212/**OutEdges for input factory class - optional */213 ClassConfOption<OutEdgesFactory> INPUT_VERTEX_EDGES_FACTORY_CLASS =
214 ClassConfOption.create("giraph.inputOutEdgesFactoryClass",
215 DefaultInputOutEdgesFactory.class, OutEdgesFactory.class,
216"OutEdges for input factory class - optional");
217218/** Class for Master - optional */219 ClassConfOption<MasterCompute> MASTER_COMPUTE_CLASS =
220 ClassConfOption.create("giraph.masterComputeClass",
221 DefaultMasterCompute.class, MasterCompute.class,
222"Class for Master - optional");
223/** Classes for Master Observer - optional */224 ClassConfOption<MasterObserver> MASTER_OBSERVER_CLASSES =
225 ClassConfOption.create("giraph.master.observers",
226null, MasterObserver.class, "Classes for Master Observer - optional");
227/** Classes for Worker Observer - optional */228 ClassConfOption<WorkerObserver> WORKER_OBSERVER_CLASSES =
229 ClassConfOption.create("giraph.worker.observers", null,
230 WorkerObserver.class, "Classes for Worker Observer - optional");
231/** Classes for Mapper Observer - optional */232 ClassConfOption<MapperObserver> MAPPER_OBSERVER_CLASSES =
233 ClassConfOption.create("giraph.mapper.observers", null,
234 MapperObserver.class, "Classes for Mapper Observer - optional");
235/** Classes for GC Observer - optional */236 ClassConfOption<GcObserver> GC_OBSERVER_CLASSES =
237 ClassConfOption.create("giraph.gc.observers", null,
238 GcObserver.class, "Classes for GC oObserver - optional");
239/** Message combiner class - optional */240 ClassConfOption<MessageCombiner> MESSAGE_COMBINER_CLASS =
241 ClassConfOption.create("giraph.messageCombinerClass", null,
242 MessageCombiner.class, "Message combiner class - optional");
243/**Vertex resolver class - optional */244 ClassConfOption<VertexResolver> VERTEX_RESOLVER_CLASS =
245 ClassConfOption.create("giraph.vertexResolverClass",
246 DefaultVertexResolver.class, VertexResolver.class,
247"Vertex resolver class - optional");
248/**Vertex value combiner class - optional */249 ClassConfOption<VertexValueCombiner> VERTEX_VALUE_COMBINER_CLASS =
250 ClassConfOption.create("giraph.vertexValueCombinerClass",
251 DefaultVertexValueCombiner.class, VertexValueCombiner.class,
252"Vertex value combiner class - optional");
253254/** Which language computation is implemented in */255 EnumConfOption<Language> COMPUTATION_LANGUAGE =
256 EnumConfOption.create("giraph.computation.language",
257 Language.class, Language.JAVA,
258"Which language computation is implemented in");
259260/**261 * Option of whether to create vertexes that were not existent before but262 * received messages263 */264BooleanConfOption RESOLVER_CREATE_VERTEX_ON_MSGS =
265newBooleanConfOption("giraph.vertex.resolver.create.on.msgs", true,
266"Option of whether to create vertexes that were not existent " +
267"before but received messages");
268/** Graph partitioner factory class - optional */269 ClassConfOption<GraphPartitionerFactory> GRAPH_PARTITIONER_FACTORY_CLASS =
270 ClassConfOption.create("giraph.graphPartitionerFactoryClass",
271 HashPartitionerFactory.class, GraphPartitionerFactory.class,
272"Graph partitioner factory class - optional");
273274/** Observer class to watch over job status - optional */275 ClassConfOption<GiraphJobObserver> JOB_OBSERVER_CLASS =
276 ClassConfOption.create("giraph.jobObserverClass",
277 DefaultJobObserver.class, GiraphJobObserver.class,
278"Observer class to watch over job status - optional");
279280/** Observer class to watch over job status - optional */281 ClassConfOption<GiraphJobRetryChecker> JOB_RETRY_CHECKER_CLASS =
282 ClassConfOption.create("giraph.jobRetryCheckerClass",
283 DefaultGiraphJobRetryChecker.class, GiraphJobRetryChecker.class,
284"Class which decides whether a failed job should be retried - " +
285"optional");
286287/**288 * Maximum allowed time for job to run after getting all resources before it289 * will be killed, in milliseconds (-1 if it has no limit)290 */291LongConfOption MAX_ALLOWED_JOB_TIME_MS =
292newLongConfOption("giraph.maxAllowedJobTimeMilliseconds", -1,
293"Maximum allowed time for job to run after getting all resources " +
294"before it will be killed, in milliseconds " +
295"(-1 if it has no limit)");
296297// At least one of the input format classes is required.298/**VertexInputFormat class */299 ClassConfOption<VertexInputFormat> VERTEX_INPUT_FORMAT_CLASS =
300 ClassConfOption.create("giraph.vertexInputFormatClass", null,
301 VertexInputFormat.class, "VertexInputFormat class (at least " +
302"one of the input format classes is required)");
303/**EdgeInputFormat class */304 ClassConfOption<EdgeInputFormat> EDGE_INPUT_FORMAT_CLASS =
305 ClassConfOption.create("giraph.edgeInputFormatClass", null,
306 EdgeInputFormat.class, "EdgeInputFormat class");
307/**MappingInputFormat class */308 ClassConfOption<MappingInputFormat> MAPPING_INPUT_FORMAT_CLASS =
309 ClassConfOption.create("giraph.mappingInputFormatClass", null,
310 MappingInputFormat.class, "MappingInputFormat class");
311312/**EdgeInputFilter class */313 ClassConfOption<EdgeInputFilter> EDGE_INPUT_FILTER_CLASS =
314 ClassConfOption.create("giraph.edgeInputFilterClass",
315 DefaultEdgeInputFilter.class, EdgeInputFilter.class,
316"EdgeInputFilter class");
317/**VertexInputFilter class */318 ClassConfOption<VertexInputFilter> VERTEX_INPUT_FILTER_CLASS =
319 ClassConfOption.create("giraph.vertexInputFilterClass",
320 DefaultVertexInputFilter.class, VertexInputFilter.class,
321"VertexInputFilter class");
322/**Vertex class */323 ClassConfOption<Vertex> VERTEX_CLASS =
324 ClassConfOption.create("giraph.vertexClass",
325 DefaultVertex.class, Vertex.class,
326"Vertex class");
327/**VertexOutputFormat class */328 ClassConfOption<VertexOutputFormat> VERTEX_OUTPUT_FORMAT_CLASS =
329 ClassConfOption.create("giraph.vertexOutputFormatClass", null,
330 VertexOutputFormat.class, "VertexOutputFormat class");
331/**EdgeOutputFormat sub-directory */332StrConfOption VERTEX_OUTPUT_FORMAT_SUBDIR =
333newStrConfOption("giraph.vertex.output.subdir", "",
334"VertexOutputFormat sub-directory");
335/**EdgeOutputFormat class */336 ClassConfOption<EdgeOutputFormat> EDGE_OUTPUT_FORMAT_CLASS =
337 ClassConfOption.create("giraph.edgeOutputFormatClass", null,
338 EdgeOutputFormat.class, "EdgeOutputFormat class");
339/**EdgeOutputFormat sub-directory */340StrConfOption EDGE_OUTPUT_FORMAT_SUBDIR =
341newStrConfOption("giraph.edge.output.subdir", "",
342"EdgeOutputFormat sub-directory");
343344/** GiraphTextOuputFormat Separator */345StrConfOption GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR =
346newStrConfOption("giraph.textoutputformat.separator", "\t",
347"GiraphTextOuputFormat Separator");
348/** Reverse values in the output */349BooleanConfOption GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE =
350newBooleanConfOption("giraph.textoutputformat.reverse", false,
351"Reverse values in the output");
352353/**354 * If you use this option, instead of having saving vertices in the end of355 * application, saveVertex will be called right after each vertex.compute()356 * is called.357 * NOTE: This feature doesn't work well with checkpointing - if you restart358 * from a checkpoint you won't have any output from previous supersteps.359 */360BooleanConfOption DO_OUTPUT_DURING_COMPUTATION =
361newBooleanConfOption("giraph.doOutputDuringComputation", false,
362"If you use this option, instead of having saving vertices in the " +
363"end of application, saveVertex will be called right after each " +
364"vertex.compute() is called." +
365"NOTE: This feature doesn't work well with checkpointing - if you " +
366"restart from a checkpoint you won't have any ouptut from previous " +
367"supresteps.");
368/**369 * Vertex output format thread-safe - if your VertexOutputFormat allows370 * several vertexWriters to be created and written to in parallel,371 * you should set this to true.372 */373BooleanConfOption VERTEX_OUTPUT_FORMAT_THREAD_SAFE =
374newBooleanConfOption("giraph.vertexOutputFormatThreadSafe", false,
375"Vertex output format thread-safe - if your VertexOutputFormat " +
376"allows several vertexWriters to be created and written to in " +
377"parallel, you should set this to true.");
378/** Number of threads for writing output in the end of the application */379IntConfOption NUM_OUTPUT_THREADS =
380newIntConfOption("giraph.numOutputThreads", 1,
381"Number of threads for writing output in the end of the application");
382383/** conf key for comma-separated list of jars to export to YARN workers */384StrConfOption GIRAPH_YARN_LIBJARS =
385newStrConfOption("giraph.yarn.libjars", "",
386"conf key for comma-separated list of jars to export to YARN workers");
387/** Name of the XML file that will export our Configuration to YARN workers */388 String GIRAPH_YARN_CONF_FILE = "giraph-conf.xml";
389/** Giraph default heap size for all tasks when running on YARN profile */390int GIRAPH_YARN_TASK_HEAP_MB_DEFAULT = 1024;
391/** Name of Giraph property for user-configurable heap memory per worker */392IntConfOption GIRAPH_YARN_TASK_HEAP_MB = newIntConfOption(
393"giraph.yarn.task.heap.mb", GIRAPH_YARN_TASK_HEAP_MB_DEFAULT,
394"Name of Giraph property for user-configurable heap memory per worker");
395/** Default priority level in YARN for our task containers */396int GIRAPH_YARN_PRIORITY = 10;
397/** Is this a pure YARN job (i.e. no MapReduce layer managing Giraph tasks) */398BooleanConfOption IS_PURE_YARN_JOB =
399newBooleanConfOption("giraph.pure.yarn.job", false,
400"Is this a pure YARN job (i.e. no MapReduce layer managing Giraph " +
401"tasks)");
402403/**Vertex index class */404 ClassConfOption<WritableComparable> VERTEX_ID_CLASS =
405 ClassConfOption.create("giraph.vertexIdClass", null,
406 WritableComparable.class, "Vertex index class");
407/**Vertex value class */408 ClassConfOption<Writable> VERTEX_VALUE_CLASS =
409 ClassConfOption.create("giraph.vertexValueClass", null, Writable.class,
410"Vertex value class");
411/**Edge value class */412 ClassConfOption<Writable> EDGE_VALUE_CLASS =
413 ClassConfOption.create("giraph.edgeValueClass", null, Writable.class,
414"Edge value class");
415/** Outgoing message value class */416 ClassConfOption<Writable> OUTGOING_MESSAGE_VALUE_CLASS =
417 ClassConfOption.create("giraph.outgoingMessageValueClass", null,
418 Writable.class, "Outgoing message value class");
419/** Worker context class */420 ClassConfOption<WorkerContext> WORKER_CONTEXT_CLASS =
421 ClassConfOption.create("giraph.workerContextClass",
422 DefaultWorkerContext.class, WorkerContext.class,
423"Worker contextclass");
424/**AggregatorWriter class - optional */425 ClassConfOption<AggregatorWriter> AGGREGATOR_WRITER_CLASS =
426 ClassConfOption.create("giraph.aggregatorWriterClass",
427 TextAggregatorWriter.class, AggregatorWriter.class,
428"AggregatorWriter class - optional");
429430/**Partition class - optional */431 ClassConfOption<Partition> PARTITION_CLASS =
432 ClassConfOption.create("giraph.partitionClass", SimplePartition.class,
433 Partition.class, "Partition class - optional");
434435/**436 * Minimum number of simultaneous workers before this job can run (int)437 */438 String MIN_WORKERS = "giraph.minWorkers";
439/**440 * Maximum number of simultaneous worker tasks started by this job (int).441 */442 String MAX_WORKERS = "giraph.maxWorkers";
443444/**445 * Separate the workers and the master tasks. This is required446 * to support dynamic recovery. (boolean)447 */448BooleanConfOption SPLIT_MASTER_WORKER =
449newBooleanConfOption("giraph.SplitMasterWorker", true,
450"Separate the workers and the master tasks. This is required to " +
451"support dynamic recovery. (boolean)");
452453/** Indicates whether this job is run in an internal unit test */454BooleanConfOption LOCAL_TEST_MODE =
455newBooleanConfOption("giraph.localTestMode", false,
456"Indicates whether this job is run in an internal unit test");
457458/** Override the Hadoop log level and set the desired log level. */459StrConfOption LOG_LEVEL = newStrConfOption("giraph.logLevel", "info",
460"Override the Hadoop log level and set the desired log level.");
461462/** Use thread level debugging? */463BooleanConfOption LOG_THREAD_LAYOUT =
464newBooleanConfOption("giraph.logThreadLayout", false,
465"Use thread level debugging?");
466467/** Configuration key to enable jmap printing */468BooleanConfOption JMAP_ENABLE =
469newBooleanConfOption("giraph.jmap.histo.enable", false,
470"Configuration key to enable jmap printing");
471472/** Configuration key for msec to sleep between calls */473IntConfOption JMAP_SLEEP_MILLIS =
474newIntConfOption("giraph.jmap.histo.msec", SECONDS.toMillis(30),
475"Configuration key for msec to sleep between calls");
476477/** Configuration key for how many lines to print */478IntConfOption JMAP_PRINT_LINES =
479newIntConfOption("giraph.jmap.histo.print_lines", 30,
480"Configuration key for how many lines to print");
481482/**483 * Configuration key for printing live objects only484 * This option will trigger Full GC for every jmap dump485 * and so can significantly hinder performance.486 */487BooleanConfOption JMAP_LIVE_ONLY =
488newBooleanConfOption("giraph.jmap.histo.live", false,
489"Only print live objects in jmap?");
490491/**492 * Option used by ReactiveJMapHistoDumper to check for an imminent493 * OOM in worker or master process494 */495IntConfOption MIN_FREE_MBS_ON_HEAP =
496newIntConfOption("giraph.heap.minFreeMb", 128, "Option used by " +
497"worker and master observers to check for imminent OOM exception");
498/**499 * Option can be used to enable reactively dumping jmap histo when500 * OOM is imminent501 */502BooleanConfOption REACTIVE_JMAP_ENABLE =
503newBooleanConfOption("giraph.heap.enableReactiveJmapDumping", false,
504"Option to enable dumping jmap histogram reactively based on " +
505"free memory on heap");
506507/**508 * Minimum percent of the maximum number of workers that have responded509 * in order to continue progressing. (float)510 */511FloatConfOption MIN_PERCENT_RESPONDED =
512newFloatConfOption("giraph.minPercentResponded", 100.0f,
513"Minimum percent of the maximum number of workers that have " +
514"responded in order to continue progressing. (float)");
515516/** Enable the Metrics system */517BooleanConfOption METRICS_ENABLE =
518newBooleanConfOption("giraph.metrics.enable", false,
519"Enable the Metrics system");
520521/** Directory in HDFS to write master metrics to, instead of stderr */522StrConfOption METRICS_DIRECTORY =
523newStrConfOption("giraph.metrics.directory", "",
524"Directory in HDFS to write master metrics to, instead of stderr");
525526/**527 * ZooKeeper comma-separated list (if not set,528 * will start up ZooKeeper locally). Consider that after locally-starting529 * zookeeper, this parameter will updated the configuration with the corrent530 * configuration value.531 */532StrConfOption ZOOKEEPER_LIST =
533newStrConfOption("giraph.zkList", "",
534"ZooKeeper comma-separated list (if not set, will start up " +
535"ZooKeeper locally). Consider that after locally-starting " +
536"zookeeper, this parameter will updated the configuration with " +
537"the corrent configuration value.");
538539/**540 * Zookeeper List will always hold a value during the computation while541 * this option provides information regarding whether the zookeeper was542 * internally started or externally provided.543 */544BooleanConfOption ZOOKEEPER_IS_EXTERNAL =
545newBooleanConfOption("giraph.zkIsExternal", true,
546"Zookeeper List will always hold a value during " +
547"the computation while this option provides " +
548"information regarding whether the zookeeper was " +
549"internally started or externally provided.");
550551/** ZooKeeper session millisecond timeout */552IntConfOption ZOOKEEPER_SESSION_TIMEOUT =
553newIntConfOption("giraph.zkSessionMsecTimeout", MINUTES.toMillis(1),
554"ZooKeeper session millisecond timeout");
555556/** Polling interval to check for the ZooKeeper server data */557IntConfOption ZOOKEEPER_SERVERLIST_POLL_MSECS =
558newIntConfOption("giraph.zkServerlistPollMsecs", SECONDS.toMillis(3),
559"Polling interval to check for the ZooKeeper server data");
560561/** ZooKeeper port to use */562IntConfOption ZOOKEEPER_SERVER_PORT =
563newIntConfOption("giraph.zkServerPort", 22181, "ZooKeeper port to use");
564565/** Local ZooKeeper directory to use */566 String ZOOKEEPER_DIR = "giraph.zkDir";
567568/** Max attempts for handling ZooKeeper connection loss */569IntConfOption ZOOKEEPER_OPS_MAX_ATTEMPTS =
570newIntConfOption("giraph.zkOpsMaxAttempts", 3,
571"Max attempts for handling ZooKeeper connection loss");
572573/**574 * Msecs to wait before retrying a failed ZooKeeper op due to connection loss.575 */576IntConfOption ZOOKEEPER_OPS_RETRY_WAIT_MSECS =
577newIntConfOption("giraph.zkOpsRetryWaitMsecs", SECONDS.toMillis(5),
578"Msecs to wait before retrying a failed ZooKeeper op due to " +
579"connection loss.");
580581/** TCP backlog (defaults to number of workers) */582IntConfOption TCP_BACKLOG = newIntConfOption("giraph.tcpBacklog", 1,
583"TCP backlog (defaults to number of workers)");
584585/** Use netty pooled memory buffer allocator */586BooleanConfOption NETTY_USE_POOLED_ALLOCATOR = newBooleanConfOption(
587"giraph.useNettyPooledAllocator", false, "Should netty use pooled " +
588"memory allocator?");
589590/** Use direct memory buffers in netty */591BooleanConfOption NETTY_USE_DIRECT_MEMORY = newBooleanConfOption(
592"giraph.useNettyDirectMemory", false, "Should netty use direct " +
593"memory buffers");
594595/** How big to make the encoder buffer? */596IntConfOption NETTY_REQUEST_ENCODER_BUFFER_SIZE =
597newIntConfOption("giraph.nettyRequestEncoderBufferSize", 32 * ONE_KB,
598"How big to make the encoder buffer?");
599600/** Netty client threads */601IntConfOption NETTY_CLIENT_THREADS =
602newIntConfOption("giraph.nettyClientThreads", 4, "Netty client threads");
603604/** Netty server threads */605IntConfOption NETTY_SERVER_THREADS =
606newIntConfOption("giraph.nettyServerThreads", 16,
607"Netty server threads");
608609/** Use the execution handler in netty on the client? */610BooleanConfOption NETTY_CLIENT_USE_EXECUTION_HANDLER =
611newBooleanConfOption("giraph.nettyClientUseExecutionHandler", true,
612"Use the execution handler in netty on the client?");
613614/** Netty client execution threads (execution handler) */615IntConfOption NETTY_CLIENT_EXECUTION_THREADS =
616newIntConfOption("giraph.nettyClientExecutionThreads", 8,
617"Netty client execution threads (execution handler)");
618619/** Where to place the netty client execution handle? */620StrConfOption NETTY_CLIENT_EXECUTION_AFTER_HANDLER =
621newStrConfOption("giraph.nettyClientExecutionAfterHandler",
622"request-encoder",
623"Where to place the netty client execution handle?");
624625/** Use the execution handler in netty on the server? */626BooleanConfOption NETTY_SERVER_USE_EXECUTION_HANDLER =
627newBooleanConfOption("giraph.nettyServerUseExecutionHandler", true,
628"Use the execution handler in netty on the server?");
629630/** Netty server execution threads (execution handler) */631IntConfOption NETTY_SERVER_EXECUTION_THREADS =
632newIntConfOption("giraph.nettyServerExecutionThreads", 8,
633"Netty server execution threads (execution handler)");
634635/** Where to place the netty server execution handle? */636StrConfOption NETTY_SERVER_EXECUTION_AFTER_HANDLER =
637newStrConfOption("giraph.nettyServerExecutionAfterHandler",
638"requestFrameDecoder",
639"Where to place the netty server execution handle?");
640641/** Netty simulate a first request closed */642BooleanConfOption NETTY_SIMULATE_FIRST_REQUEST_CLOSED =
643newBooleanConfOption("giraph.nettySimulateFirstRequestClosed", false,
644"Netty simulate a first request closed");
645646/** Netty simulate a first response failed */647BooleanConfOption NETTY_SIMULATE_FIRST_RESPONSE_FAILED =
648newBooleanConfOption("giraph.nettySimulateFirstResponseFailed", false,
649"Netty simulate a first response failed");
650651/** Netty - set which compression to use */652StrConfOption NETTY_COMPRESSION_ALGORITHM =
653newStrConfOption("giraph.nettyCompressionAlgorithm", "",
654"Which compression algorithm to use in netty");
655656/** Max resolve address attempts */657IntConfOption MAX_RESOLVE_ADDRESS_ATTEMPTS =
658newIntConfOption("giraph.maxResolveAddressAttempts", 5,
659"Max resolve address attempts");
660661/** Msecs to wait between waiting for all requests to finish */662IntConfOption WAITING_REQUEST_MSECS =
663newIntConfOption("giraph.waitingRequestMsecs", SECONDS.toMillis(15),
664"Msecs to wait between waiting for all requests to finish");
665666/** Millseconds to wait for an event before continuing */667IntConfOption EVENT_WAIT_MSECS =
668newIntConfOption("giraph.eventWaitMsecs", SECONDS.toMillis(30),
669"Millseconds to wait for an event before continuing");
670671/**672 * Maximum milliseconds to wait before giving up trying to get the minimum673 * number of workers before a superstep (int).674 */675IntConfOption MAX_MASTER_SUPERSTEP_WAIT_MSECS =
676newIntConfOption("giraph.maxMasterSuperstepWaitMsecs",
677 MINUTES.toMillis(10),
678"Maximum milliseconds to wait before giving up trying to get the " +
679"minimum number of workers before a superstep (int).");
680681/**682 * Maximum milliseconds to wait before giving up waiting for the workers to683 * write the counters to the Zookeeper after a superstep684 */685IntConfOption MAX_COUNTER_WAIT_MSECS = newIntConfOption(
686"giraph.maxCounterWaitMsecs", MINUTES.toMillis(2),
687"Maximum milliseconds to wait before giving up waiting for" +
688"the workers to write their counters to the " +
689"zookeeper after a superstep");
690691/** Milliseconds for a request to complete (or else resend) */692IntConfOption MAX_REQUEST_MILLISECONDS =
693newIntConfOption("giraph.maxRequestMilliseconds", MINUTES.toMillis(10),
694"Milliseconds for a request to complete (or else resend)");
695696/**697 * Whether to resend request which timed out or fail the job if timeout698 * happens699 */700BooleanConfOption RESEND_TIMED_OUT_REQUESTS =
701newBooleanConfOption("giraph.resendTimedOutRequests", true,
702"Whether to resend request which timed out or fail the job if " +
703"timeout happens");
704705/** Netty max connection failures */706IntConfOption NETTY_MAX_CONNECTION_FAILURES =
707newIntConfOption("giraph.nettyMaxConnectionFailures", 1000,
708"Netty max connection failures");
709710/** How long to wait before trying to reconnect failed connections */711IntConfOption WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS =
712newIntConfOption("giraph.waitTimeBetweenConnectionRetriesMs", 500,
713"");
714715/** Initial port to start using for the IPC communication */716IntConfOption IPC_INITIAL_PORT =
717newIntConfOption("giraph.ipcInitialPort", 30000,
718"Initial port to start using for the IPC communication");
719720/** Maximum bind attempts for different IPC ports */721IntConfOption MAX_IPC_PORT_BIND_ATTEMPTS =
722newIntConfOption("giraph.maxIpcPortBindAttempts", 20,
723"Maximum bind attempts for different IPC ports");
724/**725 * Fail first IPC port binding attempt, simulate binding failure726 * on real grid testing727 */728BooleanConfOption FAIL_FIRST_IPC_PORT_BIND_ATTEMPT =
729newBooleanConfOption("giraph.failFirstIpcPortBindAttempt", false,
730"Fail first IPC port binding attempt, simulate binding failure " +
731"on real grid testing");
732733/** Client send buffer size */734IntConfOption CLIENT_SEND_BUFFER_SIZE =
735newIntConfOption("giraph.clientSendBufferSize", 512 * ONE_KB,
736"Client send buffer size");
737738/** Client receive buffer size */739IntConfOption CLIENT_RECEIVE_BUFFER_SIZE =
740newIntConfOption("giraph.clientReceiveBufferSize", 32 * ONE_KB,
741"Client receive buffer size");
742743/** Server send buffer size */744IntConfOption SERVER_SEND_BUFFER_SIZE =
745newIntConfOption("giraph.serverSendBufferSize", 32 * ONE_KB,
746"Server send buffer size");
747748/** Server receive buffer size */749IntConfOption SERVER_RECEIVE_BUFFER_SIZE =
750newIntConfOption("giraph.serverReceiveBufferSize", 512 * ONE_KB,
751"Server receive buffer size");
752753/** Maximum size of messages (in bytes) per peer before flush */754IntConfOption MAX_MSG_REQUEST_SIZE =
755newIntConfOption("giraph.msgRequestSize", 512 * ONE_KB,
756"Maximum size of messages (in bytes) per peer before flush");
757758/**759 * How much bigger than the average per partition size to make initial per760 * partition buffers.761 * If this value is A, message request size is M,762 * and a worker has P partitions, than its initial partition buffer size763 * will be (M / P) * (1 + A).764 */765FloatConfOption ADDITIONAL_MSG_REQUEST_SIZE =
766newFloatConfOption("giraph.additionalMsgRequestSize", 0.2f,
767"How much bigger than the average per partition size to make " +
768"initial per partition buffers. If this value is A, message " +
769"request size is M, and a worker has P partitions, than its " +
770"initial partition buffer size will be (M / P) * (1 + A).");
771772773/** Warn if msg request size exceeds default size by this factor */774FloatConfOption REQUEST_SIZE_WARNING_THRESHOLD = newFloatConfOption(
775"giraph.msgRequestWarningThreshold", 2.0f,
776"If request sizes are bigger than the buffer size by this factor " +
777"warnings are printed to the log and to the command line");
778779/** Maximum size of vertices (in bytes) per peer before flush */780IntConfOption MAX_VERTEX_REQUEST_SIZE =
781newIntConfOption("giraph.vertexRequestSize", 512 * ONE_KB,
782"Maximum size of vertices (in bytes) per peer before flush");
783784/**785 * Additional size (expressed as a ratio) of each per-partition buffer on786 * top of the average size for vertices.787 */788FloatConfOption ADDITIONAL_VERTEX_REQUEST_SIZE =
789newFloatConfOption("giraph.additionalVertexRequestSize", 0.2f,
790"Additional size (expressed as a ratio) of each per-partition " +
791"buffer on top of the average size.");
792793/** Maximum size of edges (in bytes) per peer before flush */794IntConfOption MAX_EDGE_REQUEST_SIZE =
795newIntConfOption("giraph.edgeRequestSize", 512 * ONE_KB,
796"Maximum size of edges (in bytes) per peer before flush");
797798/**799 * Additional size (expressed as a ratio) of each per-partition buffer on800 * top of the average size for edges.801 */802FloatConfOption ADDITIONAL_EDGE_REQUEST_SIZE =
803newFloatConfOption("giraph.additionalEdgeRequestSize", 0.2f,
804"Additional size (expressed as a ratio) of each per-partition " +
805"buffer on top of the average size.");
806807/** Maximum number of mutations per partition before flush */808IntConfOption MAX_MUTATIONS_PER_REQUEST =
809newIntConfOption("giraph.maxMutationsPerRequest", 100,
810"Maximum number of mutations per partition before flush");
811812/**813 * Use message size encoding (typically better for complex objects,814 * not meant for primitive wrapped messages)815 */816BooleanConfOption USE_MESSAGE_SIZE_ENCODING =
817newBooleanConfOption("giraph.useMessageSizeEncoding", false,
818"Use message size encoding (typically better for complex objects, " +
819"not meant for primitive wrapped messages)");
820821/** Number of channels used per server */822IntConfOption CHANNELS_PER_SERVER =
823newIntConfOption("giraph.channelsPerServer", 1,
824"Number of channels used per server");
825826/** Number of flush threads per peer */827 String MSG_NUM_FLUSH_THREADS = "giraph.msgNumFlushThreads";
828829/** Number of threads for vertex computation */830IntConfOption NUM_COMPUTE_THREADS =
831newIntConfOption("giraph.numComputeThreads", 1,
832"Number of threads for vertex computation");
833834/** Number of threads for input split loading */835IntConfOption NUM_INPUT_THREADS =
836newIntConfOption("giraph.numInputThreads", 1,
837"Number of threads for input split loading");
838839/** Minimum stragglers of the superstep before printing them out */840IntConfOption PARTITION_LONG_TAIL_MIN_PRINT =
841newIntConfOption("giraph.partitionLongTailMinPrint", 1,
842"Minimum stragglers of the superstep before printing them out");
843844/** Use superstep counters? (boolean) */845BooleanConfOption USE_SUPERSTEP_COUNTERS =
846newBooleanConfOption("giraph.useSuperstepCounters", true,
847"Use superstep counters? (boolean)");
848849/**850 * Input split sample percent - Used only for sampling and testing, rather851 * than an actual job. The idea is that to test, you might only want a852 * fraction of the actual input splits from your VertexInputFormat to853 * load (values should be [0, 100]).854 */855FloatConfOption INPUT_SPLIT_SAMPLE_PERCENT =
856newFloatConfOption("giraph.inputSplitSamplePercent", 100f,
857"Input split sample percent - Used only for sampling and testing, " +
858"rather than an actual job. The idea is that to test, you might " +
859"only want a fraction of the actual input splits from your " +
860"VertexInputFormat to load (values should be [0, 100]).");
861862/**863 * To limit outlier vertex input splits from producing too many vertices or864 * to help with testing, the number of vertices loaded from an input split865 * can be limited. By default, everything is loaded.866 */867LongConfOption INPUT_SPLIT_MAX_VERTICES =
868newLongConfOption("giraph.InputSplitMaxVertices", -1,
869"To limit outlier vertex input splits from producing too many " +
870"vertices or to help with testing, the number of vertices " +
871"loaded from an input split can be limited. By default, " +
872"everything is loaded.");
873874/**875 * To limit outlier vertex input splits from producing too many vertices or876 * to help with testing, the number of edges loaded from an input split877 * can be limited. By default, everything is loaded.878 */879LongConfOption INPUT_SPLIT_MAX_EDGES =
880newLongConfOption("giraph.InputSplitMaxEdges", -1,
881"To limit outlier vertex input splits from producing too many " +
882"vertices or to help with testing, the number of edges loaded " +
883"from an input split can be limited. By default, everything is " +
884"loaded.");
885886/**887 * To minimize network usage when reading input splits,888 * each worker can prioritize splits that reside on its host.889 * This, however, comes at the cost of increased load on ZooKeeper.890 * Hence, users with a lot of splits and input threads (or with891 * configurations that can't exploit locality) may want to disable it.892 */893BooleanConfOption USE_INPUT_SPLIT_LOCALITY =
894newBooleanConfOption("giraph.useInputSplitLocality", true,
895"To minimize network usage when reading input splits, each worker " +
896"can prioritize splits that reside on its host. " +
897"This, however, comes at the cost of increased load on ZooKeeper. " +
898"Hence, users with a lot of splits and input threads (or with " +
899"configurations that can't exploit locality) may want to disable " +
900"it.");
901902/** Multiplier for the current workers squared */903FloatConfOption PARTITION_COUNT_MULTIPLIER =
904newFloatConfOption("giraph.masterPartitionCountMultiplier", 1.0f,
905"Multiplier for the current workers squared");
906907/** Minimum number of partitions to have per compute thread */908IntConfOption MIN_PARTITIONS_PER_COMPUTE_THREAD =
909newIntConfOption("giraph.minPartitionsPerComputeThread", 1,
910"Minimum number of partitions to have per compute thread");
911912/** Overrides default partition count calculation if not -1 */913IntConfOption USER_PARTITION_COUNT =
914newIntConfOption("giraph.userPartitionCount", -1,
915"Overrides default partition count calculation if not -1");
916917/** Vertex key space size for918 * {@link org.apache.giraph.partition.WorkerGraphPartitionerImpl}919 */920 String PARTITION_VERTEX_KEY_SPACE_SIZE = "giraph.vertexKeySpaceSize";
921922/**923 * How often to checkpoint (i.e. 0, means no checkpoint,924 * 1 means every superstep, 2 is every two supersteps, etc.).925 */926IntConfOption CHECKPOINT_FREQUENCY =
927newIntConfOption("giraph.checkpointFrequency", 0,
928"How often to checkpoint (i.e. 0, means no checkpoint, 1 means " +
929"every superstep, 2 is every two supersteps, etc.).");
930931/**932 * Delete checkpoints after a successful job run?933 */934BooleanConfOption CLEANUP_CHECKPOINTS_AFTER_SUCCESS =
935newBooleanConfOption("giraph.cleanupCheckpointsAfterSuccess", true,
936"Delete checkpoints after a successful job run?");
937938/**939 * An application can be restarted manually by selecting a superstep. The940 * corresponding checkpoint must exist for this to work. The user should941 * set a long value. Default is start from scratch.942 */943 String RESTART_SUPERSTEP = "giraph.restartSuperstep";
944945/**946 * If application is restarted manually we need to specify job ID947 * to restart from.948 */949StrConfOption RESTART_JOB_ID = newStrConfOption("giraph.restart.jobId",
950null, "Which job ID should I try to restart?");
951952/**953 * Base ZNode for Giraph's state in the ZooKeeper cluster. Must be a root954 * znode on the cluster beginning with "/"955 */956 String BASE_ZNODE_KEY = "giraph.zkBaseZNode";
957958/**959 * If ZOOKEEPER_LIST is not set, then use this directory to manage960 * ZooKeeper961 */962StrConfOption ZOOKEEPER_MANAGER_DIRECTORY =
963newStrConfOption("giraph.zkManagerDirectory",
964"_bsp/_defaultZkManagerDir",
965"If ZOOKEEPER_LIST is not set, then use this directory to manage " +
966"ZooKeeper");
967968/** Number of ZooKeeper client connection attempts before giving up. */969IntConfOption ZOOKEEPER_CONNECTION_ATTEMPTS =
970newIntConfOption("giraph.zkConnectionAttempts", 10,
971"Number of ZooKeeper client connection attempts before giving up.");
972973/** This directory has/stores the available checkpoint files in HDFS. */974StrConfOption CHECKPOINT_DIRECTORY =
975newStrConfOption("giraph.checkpointDirectory", "_bsp/_checkpoints/",
976"This directory has/stores the available checkpoint files in HDFS.");
977978/**979 * Comma-separated list of directories in the local filesystem for980 * out-of-core partitions.981 */982StrConfOption PARTITIONS_DIRECTORY =
983newStrConfOption("giraph.partitionsDirectory", "_bsp/_partitions",
984"Comma-separated list of directories in the local filesystem for " +
985"out-of-core partitions.");
986987/**988 * Number of IO threads used in out-of-core mechanism. If local disk is used989 * for spilling data to and reading data from, this number should be equal to990 * the number of available disks on each machine. In such case, one should991 * use giraph.partitionsDirectory to specify directories mounted on different992 * disks.993 */994IntConfOption NUM_OUT_OF_CORE_THREADS =
995newIntConfOption("giraph.numOutOfCoreThreads", 1, "Number of IO " +
996"threads used in out-of-core mechanism. If using local disk to " +
997"spill data, this should be equal to the number of available " +
998"disks. In such case, use giraph.partitionsDirectory to specify " +
999"mount points on different disks.");
10001001/** Enable out-of-core graph. */1002BooleanConfOption USE_OUT_OF_CORE_GRAPH =
1003newBooleanConfOption("giraph.useOutOfCoreGraph", false,
1004"Enable out-of-core graph.");
10051006/** Data accessor resource/object */1007 ClassConfOption<OutOfCoreDataAccessor> OUT_OF_CORE_DATA_ACCESSOR =
1008 ClassConfOption.create("giraph.outOfCoreDataAccessor",
1009 LocalDiskDataAccessor.class, OutOfCoreDataAccessor.class,
1010"Data accessor used in out-of-core computation (local-disk, " +
1011"in-memory, HDFS, etc.)");
10121013/**1014 * Out-of-core oracle that is to be used for adaptive out-of-core engine. If1015 * the `MAX_PARTITIONS_IN_MEMORY` is already set, this will be over-written1016 * to be `FixedPartitionsOracle`.1017 */1018 ClassConfOption<OutOfCoreOracle> OUT_OF_CORE_ORACLE =
1019 ClassConfOption.create("giraph.outOfCoreOracle",
1020 MemoryEstimatorOracle.class, OutOfCoreOracle.class,
1021"Out-of-core oracle that is to be used for adaptive out-of-core " +
1022"engine");
10231024/** Maximum number of partitions to hold in memory for each worker. */1025IntConfOption MAX_PARTITIONS_IN_MEMORY =
1026newIntConfOption("giraph.maxPartitionsInMemory", 0,
1027"Maximum number of partitions to hold in memory for each worker. By" +
1028" default it is set to 0 (for adaptive out-of-core mechanism");
10291030/** Directory to write YourKit snapshots to */1031 String YOURKIT_OUTPUT_DIR = "giraph.yourkit.outputDir";
1032/** Default directory to write YourKit snapshots to */1033 String YOURKIT_OUTPUT_DIR_DEFAULT = "/tmp/giraph/%JOB_ID%/%TASK_ID%";
10341035/** Keep the zookeeper output for debugging? Default is to remove it. */1036BooleanConfOption KEEP_ZOOKEEPER_DATA =
1037newBooleanConfOption("giraph.keepZooKeeperData", false,
1038"Keep the zookeeper output for debugging? Default is to remove it.");
1039/** Default ZooKeeper snap count. */1040int DEFAULT_ZOOKEEPER_SNAP_COUNT = 50000;
1041/** Default ZooKeeper tick time. */1042int DEFAULT_ZOOKEEPER_TICK_TIME = 6000;
1043/** Default ZooKeeper maximum client connections. */1044int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000;
1045/** Number of snapshots to be retained after purge */1046int ZOOKEEPER_SNAP_RETAIN_COUNT = 3;
1047/** Zookeeper purge interval in hours */1048int ZOOKEEPER_PURGE_INTERVAL = 1;
1049/** ZooKeeper minimum session timeout */1050IntConfOption ZOOKEEPER_MIN_SESSION_TIMEOUT =
1051newIntConfOption("giraph.zKMinSessionTimeout", MINUTES.toMillis(10),
1052"ZooKeeper minimum session timeout");
1053/** ZooKeeper maximum session timeout */1054IntConfOption ZOOKEEPER_MAX_SESSION_TIMEOUT =
1055newIntConfOption("giraph.zkMaxSessionTimeout", MINUTES.toMillis(15),
1056"ZooKeeper maximum session timeout");
10571058/** ZooKeeper force sync */1059BooleanConfOption ZOOKEEPER_FORCE_SYNC =
1060newBooleanConfOption("giraph.zKForceSync", false,
1061"ZooKeeper force sync");
10621063/** ZooKeeper skip ACLs */1064BooleanConfOption ZOOKEEPER_SKIP_ACL =
1065newBooleanConfOption("giraph.ZkSkipAcl", true, "ZooKeeper skip ACLs");
10661067/**1068 * Whether to use SASL with DIGEST and Hadoop Job Tokens to authenticate1069 * and authorize Netty BSP Clients to Servers.1070 */1071BooleanConfOption AUTHENTICATE =
1072newBooleanConfOption("giraph.authenticate", false,
1073"Whether to use SASL with DIGEST and Hadoop Job Tokens to " +
1074"authenticate and authorize Netty BSP Clients to Servers.");
10751076/** Use unsafe serialization? */1077BooleanConfOption USE_UNSAFE_SERIALIZATION =
1078newBooleanConfOption("giraph.useUnsafeSerialization", true,
1079"Use unsafe serialization?");
10801081/**1082 * Use BigDataIO for messages? If there are super-vertices in the1083 * graph which receive a lot of messages (total serialized size of messages1084 * goes beyond the maximum size of a byte array), setting this option to true1085 * will remove that limit. The maximum memory available for a single vertex1086 * will be limited to the maximum heap size available.1087 */1088BooleanConfOption USE_BIG_DATA_IO_FOR_MESSAGES =
1089newBooleanConfOption("giraph.useBigDataIOForMessages", false,
1090"Use BigDataIO for messages?");
10911092/**1093 * Maximum number of attempts a master/worker will retry before killing1094 * the job. This directly maps to the number of map task attempts in1095 * Hadoop.1096 */1097IntConfOption MAX_TASK_ATTEMPTS =
1098newIntConfOption("mapred.map.max.attempts", -1,
1099"Maximum number of attempts a master/worker will retry before " +
1100"killing the job. This directly maps to the number of map task " +
1101"attempts in Hadoop.");
11021103/** Interface to use for hostname resolution */1104StrConfOption DNS_INTERFACE =
1105newStrConfOption("giraph.dns.interface", "default",
1106"Interface to use for hostname resolution");
1107/** Server for hostname resolution */1108StrConfOption DNS_NAMESERVER =
1109newStrConfOption("giraph.dns.nameserver", "default",
1110"Server for hostname resolution");
11111112/**1113 * The application will halt after this many supersteps is completed. For1114 * instance, if it is set to 3, the application will run at most 0, 1,1115 * and 2 supersteps and then go into the shutdown superstep.1116 */1117IntConfOption MAX_NUMBER_OF_SUPERSTEPS =
1118newIntConfOption("giraph.maxNumberOfSupersteps", 1,
1119"The application will halt after this many supersteps is " +
1120"completed. For instance, if it is set to 3, the application will " +
1121"run at most 0, 1, and 2 supersteps and then go into the shutdown " +
1122"superstep.");
11231124/**1125 * The application will not mutate the graph topology (the edges). It is used1126 * to optimise out-of-core graph, by not writing back edges every time.1127 */1128BooleanConfOption STATIC_GRAPH =
1129newBooleanConfOption("giraph.isStaticGraph", false,
1130"The application will not mutate the graph topology (the edges). " +
1131"It is used to optimise out-of-core graph, by not writing back " +
1132"edges every time.");
11331134/**1135 * This option will tell which message encode & store enum to use when1136 * combining is not enabled1137 */1138 EnumConfOption<MessageEncodeAndStoreType> MESSAGE_ENCODE_AND_STORE_TYPE =
1139 EnumConfOption.create("giraph.messageEncodeAndStoreType",
1140 MessageEncodeAndStoreType.class,
1141 MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION,
1142"Select the message_encode_and_store_type to use");
11431144/**1145 * This option can be used to specify if a source vertex present in edge1146 * input but not in vertex input can be created1147 */1148BooleanConfOption CREATE_EDGE_SOURCE_VERTICES =
1149newBooleanConfOption("giraph.createEdgeSourceVertices", true,
1150"Create a source vertex if present in edge input but not " +
1151"necessarily in vertex input");
11521153/**1154 * Defines a call back that can be used to make decisions on1155 * whether the vertex should be created or not in the runtime.1156 */1157 ClassConfOption<CreateSourceVertexCallback>
1158 CREATE_EDGE_SOURCE_VERTICES_CALLBACK =
1159 ClassConfOption.create("giraph.createEdgeSourceVerticesCallback",
1160 DefaultCreateSourceVertexCallback.class,
1161 CreateSourceVertexCallback.class,
1162"Decide whether we should create a source vertex when id is " +
1163"present in the edge input but not in vertex input");
11641165/**1166 * This counter group will contain one counter whose name is the ZooKeeper1167 * server:port which this job is using1168 */1169 String ZOOKEEPER_SERVER_PORT_COUNTER_GROUP = "Zookeeper server:port";
11701171/**1172 * This counter group will contain one counter whose name is the ZooKeeper1173 * node path which should be created to trigger computation halt1174 */1175 String ZOOKEEPER_HALT_NODE_COUNTER_GROUP = "Zookeeper halt node";
11761177/**1178 * This counter group will contain one counter whose name is the ZooKeeper1179 * node path which contains all data about this job1180 */1181 String ZOOKEEPER_BASE_PATH_COUNTER_GROUP = "Zookeeper base path";
11821183/**1184 * Which class to use to write instructions on how to halt the application1185 */1186 ClassConfOption<HaltApplicationUtils.HaltInstructionsWriter>
1187 HALT_INSTRUCTIONS_WRITER_CLASS = ClassConfOption.create(
1188"giraph.haltInstructionsWriter",
1189 HaltApplicationUtils.DefaultHaltInstructionsWriter.class,
1190 HaltApplicationUtils.HaltInstructionsWriter.class,
1191"Class used to write instructions on how to halt the application");
11921193/**1194 * Maximum timeout (in milliseconds) for waiting for all tasks1195 * to complete after the job is done. Defaults to 15 minutes.1196 */1197IntConfOption WAIT_TASK_DONE_TIMEOUT_MS =
1198newIntConfOption("giraph.waitTaskDoneTimeoutMs", MINUTES.toMillis(15),
1199"Maximum timeout (in ms) for waiting for all all tasks to " +
1200"complete");
12011202/** Whether to track job progress on client or not */1203BooleanConfOption TRACK_JOB_PROGRESS_ON_CLIENT =
1204newBooleanConfOption("giraph.trackJobProgressOnClient", false,
1205"Whether to track job progress on client or not");
12061207/** Class to use as the job progress client */1208 ClassConfOption<JobProgressTrackerClient> JOB_PROGRESS_TRACKER_CLIENT_CLASS =
1209 ClassConfOption.create("giraph.jobProgressTrackerClientClass",
1210 RetryableJobProgressTrackerClient.class,
1211 JobProgressTrackerClient.class,
1212"Class to use to make calls to the job progress tracker service");
12131214/** Class to use to track job progress on client */1215 ClassConfOption<JobProgressTrackerService>
1216 JOB_PROGRESS_TRACKER_SERVICE_CLASS =
1217 ClassConfOption.create("giraph.jobProgressTrackerServiceClass",
1218 DefaultJobProgressTrackerService.class,
1219 JobProgressTrackerService.class,
1220"Class to use to track job progress on client");
12211222/**1223 * Minimum number of vertices to compute before adding to worker progress.1224 */1225LongConfOption VERTICES_TO_UPDATE_PROGRESS =
1226newLongConfOption("giraph.VerticesToUpdateProgress", 100000,
1227"Minimum number of vertices to compute before " +
1228"updating worker progress");
122912301231/** Number of retries for creating the HDFS files */1232IntConfOption HDFS_FILE_CREATION_RETRIES =
1233newIntConfOption("giraph.hdfs.file.creation.retries", 10,
1234"Retries to create an HDFS file before failing");
12351236/** Number of milliseconds to wait before retrying HDFS file creation */1237IntConfOption HDFS_FILE_CREATION_RETRY_WAIT_MS =
1238newIntConfOption("giraph.hdfs.file.creation.retry.wait.ms", 30_000,
1239"Milliseconds to wait prior to retrying creation of an HDFS file");
12401241/** Number of threads for writing and reading checkpoints */1242IntConfOption NUM_CHECKPOINT_IO_THREADS =
1243newIntConfOption("giraph.checkpoint.io.threads", 8,
1244"Number of threads for writing and reading checkpoints");
12451246/**1247 * Compression algorithm to be used for checkpointing.1248 * Defined by extension for hadoop compatibility reasons.1249 */1250StrConfOption CHECKPOINT_COMPRESSION_CODEC =
1251newStrConfOption("giraph.checkpoint.compression.codec",
1252".deflate",
1253"Defines compression algorithm we will be using for " +
1254"storing checkpoint. Available options include but " +
1255"not restricted to: .deflate, .gz, .bz2, .lzo");
12561257/**1258 * Defines if and when checkpointing is supported by this job.1259 * By default checkpointing is always supported unless output during the1260 * computation is enabled.1261 */1262 ClassConfOption<CheckpointSupportedChecker> CHECKPOINT_SUPPORTED_CHECKER =
1263 ClassConfOption.create("giraph.checkpoint.supported.checker",
1264 DefaultCheckpointSupportedChecker.class,
1265 CheckpointSupportedChecker.class,
1266"This is the way to specify if checkpointing is " +
1267"supported by the job");
126812691270/** Number of threads to use in async message store, 0 means1271 * we should not use async message processing */1272IntConfOption ASYNC_MESSAGE_STORE_THREADS_COUNT =
1273newIntConfOption("giraph.async.message.store.threads", 0,
1274"Number of threads to be used in async message store.");
12751276/**Output format class for hadoop to use (for committing) */1277 ClassConfOption<OutputFormat> HADOOP_OUTPUT_FORMAT_CLASS =
1278 ClassConfOption.create("giraph.hadoopOutputFormatClass",
1279 BspOutputFormat.class, OutputFormat.class,
1280"Output format class for hadoop to use (for committing)");
12811282/**1283 * For worker to worker communication we can use IPs or host names, by1284 * default prefer IPs.1285 */1286BooleanConfOption PREFER_IP_ADDRESSES =
1287newBooleanConfOption("giraph.preferIP", false,
1288"Prefer IP addresses instead of host names");
12891290/**1291 * Timeout for "waitForever", when we need to wait for zookeeper.1292 * Since we should never really have to wait forever.1293 * We should only wait some reasonable but large amount of time.1294 */1295LongConfOption WAIT_ZOOKEEPER_TIMEOUT_MSEC =
1296newLongConfOption("giraph.waitZookeeperTimeoutMsec",
1297 MINUTES.toMillis(15),
1298"How long should we stay in waitForever loops in various " +
1299"places that require network connection");
13001301/**1302 * Timeout for "waitForever", when we need to wait for other workers1303 * to complete their job.1304 * Since we should never really have to wait forever.1305 * We should only wait some reasonable but large amount of time.1306 */1307LongConfOption WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC =
1308newLongConfOption("giraph.waitForOtherWorkersMsec",
1309 HOURS.toMillis(48),
1310"How long should workers wait to finish superstep");
13111312/** Number of supersteps job will run for */1313IntConfOption SUPERSTEP_COUNT = newIntConfOption("giraph.numSupersteps", -1,
1314"Number of supersteps job will run for");
13151316/** Whether to disable GiraphClassResolver which is an efficient1317 * implementation of kryo class resolver. By default this resolver is used by1318 * KryoSimpleWritable and KryoSimpleWrapper, and can be disabled with this1319 * option */1320BooleanConfOption DISABLE_GIRAPH_CLASS_RESOLVER =
1321newBooleanConfOption("giraph.disableGiraphClassResolver", false,
1322"Disables GiraphClassResolver, which is a custom implementation " +
1323"of kryo class resolver that avoids writing class names to the " +
1324"underlying stream for faster serialization.");
13251326/**1327 * Path where jmap exists1328 */1329StrConfOption JMAP_PATH = newStrConfOption("giraph.jmapPath", "jmap",
1330"Path to use for invoking jmap");
13311332/**1333 * Whether to fail the job or just warn when input is empty1334 */1335BooleanConfOption FAIL_ON_EMPTY_INPUT = newBooleanConfOption(
1336"giraph.failOnEmptyInput", true,
1337"Whether to fail the job or just warn when input is empty");
1338 }
1339// CHECKSTYLE: resume InterfaceIsTypeCheck