This project has retired. For details please refer to its
Attic page.
ImmutableClassesGiraphConfiguration xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.conf;
20
21 import io.netty.handler.codec.ByteToMessageDecoder;
22 import io.netty.handler.codec.MessageToByteEncoder;
23 import io.netty.handler.codec.compression.JdkZlibDecoder;
24 import io.netty.handler.codec.compression.JdkZlibEncoder;
25 import io.netty.handler.codec.compression.SnappyFramedDecoder;
26 import io.netty.handler.codec.compression.SnappyFramedEncoder;
27
28 import org.apache.giraph.aggregators.AggregatorWriter;
29 import org.apache.giraph.combiner.MessageCombiner;
30 import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
31 import org.apache.giraph.edge.Edge;
32 import org.apache.giraph.edge.EdgeFactory;
33 import org.apache.giraph.edge.EdgeStoreFactory;
34 import org.apache.giraph.edge.OutEdges;
35 import org.apache.giraph.edge.ReusableEdge;
36 import org.apache.giraph.factories.ComputationFactory;
37 import org.apache.giraph.factories.EdgeValueFactory;
38 import org.apache.giraph.factories.MessageValueFactory;
39 import org.apache.giraph.factories.OutEdgesFactory;
40 import org.apache.giraph.factories.ValueFactories;
41 import org.apache.giraph.factories.VertexIdFactory;
42 import org.apache.giraph.factories.VertexValueFactory;
43 import org.apache.giraph.graph.Computation;
44 import org.apache.giraph.graph.Language;
45 import org.apache.giraph.graph.MapperObserver;
46 import org.apache.giraph.graph.Vertex;
47 import org.apache.giraph.graph.VertexResolver;
48 import org.apache.giraph.graph.VertexValueCombiner;
49 import org.apache.giraph.io.EdgeInputFormat;
50 import org.apache.giraph.io.EdgeOutputFormat;
51 import org.apache.giraph.io.MappingInputFormat;
52 import org.apache.giraph.io.VertexInputFormat;
53 import org.apache.giraph.io.VertexOutputFormat;
54 import org.apache.giraph.io.filters.EdgeInputFilter;
55 import org.apache.giraph.io.filters.VertexInputFilter;
56 import org.apache.giraph.io.internal.WrappedEdgeInputFormat;
57 import org.apache.giraph.io.internal.WrappedEdgeOutputFormat;
58 import org.apache.giraph.io.internal.WrappedMappingInputFormat;
59 import org.apache.giraph.io.internal.WrappedVertexInputFormat;
60 import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
61 import org.apache.giraph.io.superstep_output.MultiThreadedSuperstepOutput;
62 import org.apache.giraph.io.superstep_output.NoOpSuperstepOutput;
63 import org.apache.giraph.io.superstep_output.SuperstepOutput;
64 import org.apache.giraph.io.superstep_output.SynchronizedSuperstepOutput;
65 import org.apache.giraph.job.GiraphJobObserver;
66 import org.apache.giraph.job.GiraphJobRetryChecker;
67 import org.apache.giraph.mapping.MappingStore;
68 import org.apache.giraph.mapping.MappingStoreOps;
69 import org.apache.giraph.mapping.translate.TranslateEdge;
70 import org.apache.giraph.master.MasterCompute;
71 import org.apache.giraph.master.MasterObserver;
72 import org.apache.giraph.master.SuperstepClasses;
73 import org.apache.giraph.partition.GraphPartitionerFactory;
74 import org.apache.giraph.partition.Partition;
75 import org.apache.giraph.utils.ExtendedByteArrayDataInput;
76 import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
77 import org.apache.giraph.utils.ExtendedDataInput;
78 import org.apache.giraph.utils.ExtendedDataOutput;
79 import org.apache.giraph.utils.GcObserver;
80 import org.apache.giraph.utils.ReflectionUtils;
81 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
82 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
83 import org.apache.giraph.utils.WritableUtils;
84 import org.apache.giraph.utils.io.BigDataInputOutput;
85 import org.apache.giraph.utils.io.DataInputOutput;
86 import org.apache.giraph.utils.io.ExtendedDataInputOutput;
87 import org.apache.giraph.worker.WorkerContext;
88 import org.apache.giraph.worker.WorkerObserver;
89 import org.apache.hadoop.conf.Configuration;
90 import org.apache.hadoop.io.NullWritable;
91 import org.apache.hadoop.io.Writable;
92 import org.apache.hadoop.io.WritableComparable;
93 import org.apache.hadoop.mapreduce.Mapper;
94 import org.apache.hadoop.util.Progressable;
95
96 import com.google.common.base.Preconditions;
97
98
99
100
101
102
103
104
105
106
107
108 @SuppressWarnings("unchecked")
109 public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
110 V extends Writable, E extends Writable> extends GiraphConfiguration {
111
112 private final GiraphClasses classes;
113
114 private Class<? extends Writable> mappingTargetClass = null;
115
116 private final ValueFactories<I, V, E> valueFactories;
117
118 private final OutEdgesFactory<I, E> outEdgesFactory;
119
120 private final OutEdgesFactory<I, E> inputOutEdgesFactory;
121
122 private final PerGraphTypeEnum<Language> valueLanguages;
123
124 private final PerGraphTypeBoolean valueNeedsWrappers;
125
126
127
128
129
130
131 private final boolean useUnsafeSerialization;
132
133
134
135
136 private final boolean useBigDataIOForMessages;
137
138 private final boolean isStaticGraph;
139
140 private final boolean useMessageSizeEncoding;
141
142
143
144
145
146
147
148 public ImmutableClassesGiraphConfiguration(Configuration conf) {
149 super(conf);
150 classes = new GiraphClasses<I, V, E>(conf);
151 useUnsafeSerialization = USE_UNSAFE_SERIALIZATION.get(this);
152 useBigDataIOForMessages = USE_BIG_DATA_IO_FOR_MESSAGES.get(this);
153 valueLanguages = PerGraphTypeEnum.readFromConf(
154 GiraphConstants.GRAPH_TYPE_LANGUAGES, conf);
155 valueNeedsWrappers = PerGraphTypeBoolean.readFromConf(
156 GiraphConstants.GRAPH_TYPES_NEEDS_WRAPPERS, conf);
157 isStaticGraph = GiraphConstants.STATIC_GRAPH.get(this);
158 valueFactories = new ValueFactories<I, V, E>(this);
159 outEdgesFactory = VERTEX_EDGES_FACTORY_CLASS.newInstance(this);
160 inputOutEdgesFactory = INPUT_VERTEX_EDGES_FACTORY_CLASS.newInstance(this);
161 useMessageSizeEncoding = USE_MESSAGE_SIZE_ENCODING.get(conf);
162 }
163
164
165
166
167
168
169 public void configureIfPossible(Object obj) {
170 if (obj instanceof GiraphConfigurationSettable) {
171 ((GiraphConfigurationSettable) obj).setConf(this);
172 }
173 }
174
175 public PerGraphTypeBoolean getValueNeedsWrappers() {
176 return valueNeedsWrappers;
177 }
178
179 public PerGraphTypeEnum<Language> getValueLanguages() {
180 return valueLanguages;
181 }
182
183
184
185
186
187
188 public Class<? extends TranslateEdge> edgeTranslationClass() {
189 return EDGE_TRANSLATION_CLASS.get(this);
190 }
191
192
193
194
195
196
197 public TranslateEdge<I, E> edgeTranslationInstance() {
198 if (edgeTranslationClass() != null) {
199 return ReflectionUtils.newInstance(edgeTranslationClass(), this);
200 }
201 return null;
202 }
203
204
205
206
207
208
209 public Class<? extends EdgeInputFilter<I, E>>
210 getEdgeInputFilterClass() {
211 return classes.getEdgeInputFilterClass();
212 }
213
214
215
216
217
218
219 public EdgeInputFilter getEdgeInputFilter() {
220 return ReflectionUtils.newInstance(getEdgeInputFilterClass(), this);
221 }
222
223
224
225
226
227
228 public Class<? extends VertexInputFilter<I, V, E>>
229 getVertexInputFilterClass() {
230 return classes.getVertexInputFilterClass();
231 }
232
233
234
235
236
237
238 public VertexInputFilter getVertexInputFilter() {
239 return ReflectionUtils.newInstance(getVertexInputFilterClass(), this);
240 }
241
242
243
244
245
246
247
248 public Class<? extends GraphPartitionerFactory<I, V, E>>
249 getGraphPartitionerClass() {
250 return classes.getGraphPartitionerFactoryClass();
251 }
252
253
254
255
256
257
258 public GraphPartitionerFactory<I, V, E> createGraphPartitioner() {
259 Class<? extends GraphPartitionerFactory<I, V, E>> klass =
260 classes.getGraphPartitionerFactoryClass();
261 return ReflectionUtils.newInstance(klass, this);
262 }
263
264 @Override
265 public boolean hasVertexInputFormat() {
266 return classes.hasVertexInputFormat();
267 }
268
269
270
271
272
273
274
275 public Class<? extends VertexInputFormat<I, V, E>>
276 getVertexInputFormatClass() {
277 return classes.getVertexInputFormatClass();
278 }
279
280
281
282
283
284
285
286
287 private VertexInputFormat<I, V, E> createVertexInputFormat() {
288 Class<? extends VertexInputFormat<I, V, E>> klass =
289 getVertexInputFormatClass();
290 return ReflectionUtils.newInstance(klass, this);
291 }
292
293
294
295
296
297
298
299
300 public WrappedVertexInputFormat<I, V, E> createWrappedVertexInputFormat() {
301 WrappedVertexInputFormat<I, V, E> wrappedVertexInputFormat =
302 new WrappedVertexInputFormat<I, V, E>(createVertexInputFormat());
303 configureIfPossible(wrappedVertexInputFormat);
304 return wrappedVertexInputFormat;
305 }
306
307 @Override
308 public void setVertexInputFormatClass(
309 Class<? extends VertexInputFormat> vertexInputFormatClass) {
310 super.setVertexInputFormatClass(vertexInputFormatClass);
311 classes.setVertexInputFormatClass(vertexInputFormatClass);
312 }
313
314 @Override
315 public boolean hasVertexOutputFormat() {
316 return classes.hasVertexOutputFormat();
317 }
318
319
320
321
322
323
324
325 public Class<? extends VertexOutputFormat<I, V, E>>
326 getVertexOutputFormatClass() {
327 return classes.getVertexOutputFormatClass();
328 }
329
330
331
332
333
334
335 public Class<? extends MappingInputFormat<I, V, E, ? extends Writable>>
336 getMappingInputFormatClass() {
337 return classes.getMappingInputFormatClass();
338 }
339
340
341
342
343
344
345 @Override
346 public void setMappingInputFormatClass(
347 Class<? extends MappingInputFormat> mappingInputFormatClass) {
348 super.setMappingInputFormatClass(mappingInputFormatClass);
349 classes.setMappingInputFormatClass(mappingInputFormatClass);
350 }
351
352
353
354
355
356
357 public boolean hasMappingInputFormat() {
358 return classes.hasMappingInputFormat();
359 }
360
361
362
363
364
365
366
367
368 private VertexOutputFormat<I, V, E> createVertexOutputFormat() {
369 Class<? extends VertexOutputFormat<I, V, E>> klass =
370 getVertexOutputFormatClass();
371 return ReflectionUtils.newInstance(klass, this);
372 }
373
374
375
376
377
378
379
380
381 private MappingInputFormat<I, V, E, ? extends Writable>
382 createMappingInputFormat() {
383 Class<? extends MappingInputFormat<I, V, E, ? extends Writable>> klass =
384 getMappingInputFormatClass();
385 return ReflectionUtils.newInstance(klass, this);
386 }
387
388
389
390
391
392
393
394
395 public WrappedVertexOutputFormat<I, V, E> createWrappedVertexOutputFormat() {
396 WrappedVertexOutputFormat<I, V, E> wrappedVertexOutputFormat =
397 new WrappedVertexOutputFormat<I, V, E>(createVertexOutputFormat());
398 configureIfPossible(wrappedVertexOutputFormat);
399 return wrappedVertexOutputFormat;
400 }
401
402
403
404
405
406
407
408
409 public WrappedMappingInputFormat<I, V, E, ? extends Writable>
410 createWrappedMappingInputFormat() {
411 WrappedMappingInputFormat<I, V, E, ? extends Writable>
412 wrappedMappingInputFormat =
413 new WrappedMappingInputFormat<>(createMappingInputFormat());
414 configureIfPossible(wrappedMappingInputFormat);
415 return wrappedMappingInputFormat;
416 }
417
418 @Override
419 public boolean hasEdgeOutputFormat() {
420 return classes.hasEdgeOutputFormat();
421 }
422
423
424
425
426
427
428
429 public Class<? extends EdgeOutputFormat<I, V, E>>
430 getEdgeOutputFormatClass() {
431 return classes.getEdgeOutputFormatClass();
432 }
433
434
435
436
437
438
439
440
441 private EdgeOutputFormat<I, V, E> createEdgeOutputFormat() {
442 Class<? extends EdgeOutputFormat<I, V, E>> klass =
443 getEdgeOutputFormatClass();
444 return ReflectionUtils.newInstance(klass, this);
445 }
446
447
448
449
450
451
452
453
454 public WrappedEdgeOutputFormat<I, V, E> createWrappedEdgeOutputFormat() {
455 WrappedEdgeOutputFormat<I, V, E> wrappedEdgeOutputFormat =
456 new WrappedEdgeOutputFormat<I, V, E>(createEdgeOutputFormat());
457 configureIfPossible(wrappedEdgeOutputFormat);
458 return wrappedEdgeOutputFormat;
459 }
460
461
462
463
464
465
466
467 public SuperstepOutput<I, V, E> createSuperstepOutput(
468 Mapper<?, ?, ?, ?>.Context context) {
469 if (doOutputDuringComputation()) {
470 if (vertexOutputFormatThreadSafe()) {
471 return new MultiThreadedSuperstepOutput<I, V, E>(this, context);
472 } else {
473 return new SynchronizedSuperstepOutput<I, V, E>(this, context);
474 }
475 } else {
476 return new NoOpSuperstepOutput<I, V, E>();
477 }
478 }
479
480 @Override
481 public boolean hasEdgeInputFormat() {
482 return classes.hasEdgeInputFormat();
483 }
484
485
486
487
488
489
490
491 public Class<? extends EdgeInputFormat<I, E>> getEdgeInputFormatClass() {
492 return classes.getEdgeInputFormatClass();
493 }
494
495
496
497
498
499
500
501
502 private EdgeInputFormat<I, E> createEdgeInputFormat() {
503 Class<? extends EdgeInputFormat<I, E>> klass = getEdgeInputFormatClass();
504 return ReflectionUtils.newInstance(klass, this);
505 }
506
507
508
509
510
511
512
513
514 public WrappedEdgeInputFormat<I, E> createWrappedEdgeInputFormat() {
515 WrappedEdgeInputFormat<I, E> wrappedEdgeInputFormat =
516 new WrappedEdgeInputFormat<I, E>(createEdgeInputFormat());
517 configureIfPossible(wrappedEdgeInputFormat);
518 return wrappedEdgeInputFormat;
519 }
520
521 @Override
522 public void setEdgeInputFormatClass(
523 Class<? extends EdgeInputFormat> edgeInputFormatClass) {
524 super.setEdgeInputFormatClass(edgeInputFormatClass);
525 classes.setEdgeInputFormatClass(edgeInputFormatClass);
526 }
527
528
529
530
531
532
533 public Class<? extends AggregatorWriter> getAggregatorWriterClass() {
534 return classes.getAggregatorWriterClass();
535 }
536
537
538
539
540
541
542 public AggregatorWriter createAggregatorWriter() {
543 return ReflectionUtils.newInstance(getAggregatorWriterClass(), this);
544 }
545
546
547
548
549
550
551
552 public Class<? extends VertexValueCombiner<V>>
553 getVertexValueCombinerClass() {
554 return classes.getVertexValueCombinerClass();
555 }
556
557
558
559
560
561
562 @SuppressWarnings("rawtypes")
563 public VertexValueCombiner<V> createVertexValueCombiner() {
564 return ReflectionUtils.newInstance(getVertexValueCombinerClass(), this);
565 }
566
567
568
569
570
571
572 public Class<? extends VertexResolver<I, V, E>> getVertexResolverClass() {
573 return classes.getVertexResolverClass();
574 }
575
576
577
578
579
580
581 public VertexResolver<I, V, E> createVertexResolver() {
582 return ReflectionUtils.newInstance(getVertexResolverClass(), this);
583 }
584
585
586
587
588
589
590 public Class<? extends WorkerContext> getWorkerContextClass() {
591 return classes.getWorkerContextClass();
592 }
593
594
595
596
597
598
599 public WorkerContext createWorkerContext() {
600 return ReflectionUtils.newInstance(getWorkerContextClass(), this);
601 }
602
603
604
605
606
607
608 public Class<? extends MasterCompute> getMasterComputeClass() {
609 return classes.getMasterComputeClass();
610 }
611
612
613
614
615
616
617 public MasterCompute createMasterCompute() {
618 return ReflectionUtils.newInstance(getMasterComputeClass(), this);
619 }
620
621 @Override
622 public Class<? extends Computation<I, V, E,
623 ? extends Writable, ? extends Writable>>
624 getComputationClass() {
625 return classes.getComputationClass();
626 }
627
628
629
630
631
632
633 @Override
634 public Class<? extends ComputationFactory<I, V, E,
635 ? extends Writable, ? extends Writable>>
636 getComputationFactoryClass() {
637 return classes.getComputationFactoryClass();
638 }
639
640
641
642
643
644
645 public ComputationFactory<I, V, E, ? extends Writable, ? extends Writable>
646 createComputationFactory() {
647 return ReflectionUtils.newInstance(getComputationFactoryClass(), this);
648 }
649
650
651
652
653
654
655 public Computation<I, V, E, ? extends Writable, ? extends Writable>
656 createComputation() {
657 return createComputationFactory().createComputation(this);
658 }
659
660
661
662
663
664
665 public GiraphTypes<I, V, E> getGiraphTypes() {
666 return classes.getGiraphTypes();
667 }
668
669
670
671
672
673
674 public Vertex<I, V, E> createVertex() {
675 Class vertexClass = classes.getVertexClass();
676 return (Vertex<I, V, E>) ReflectionUtils.newInstance(vertexClass, this);
677 }
678
679
680
681
682
683
684
685 public Class<I> getVertexIdClass() {
686 return classes.getVertexIdClass();
687 }
688
689
690
691
692
693
694 public VertexIdFactory<I> getVertexIdFactory() {
695 return valueFactories.getVertexIdFactory();
696 }
697
698
699
700
701
702
703 public I createVertexId() {
704 return getVertexIdFactory().newInstance();
705 }
706
707
708
709
710
711
712 public Class<V> getVertexValueClass() {
713 return classes.getVertexValueClass();
714 }
715
716
717
718
719
720
721 public VertexValueFactory<V> getVertexValueFactory() {
722 return valueFactories.getVertexValueFactory();
723 }
724
725
726
727
728
729
730 @SuppressWarnings("unchecked")
731 public V createVertexValue() {
732 return getVertexValueFactory().newInstance();
733 }
734
735
736
737
738
739
740 public Class<? extends VertexValueFactory<V>> getVertexValueFactoryClass() {
741 return (Class<? extends VertexValueFactory<V>>)
742 valueFactories.getVertexValueFactory().getClass();
743 }
744
745
746
747
748
749
750
751 public MasterObserver[] createMasterObservers(
752 Mapper<?, ?, ?, ?>.Context context) {
753 Class<? extends MasterObserver>[] klasses = getMasterObserverClasses();
754 MasterObserver[] objects = new MasterObserver[klasses.length];
755 for (int i = 0; i < klasses.length; ++i) {
756 objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
757 }
758 return objects;
759 }
760
761
762
763
764
765
766
767 public WorkerObserver[] createWorkerObservers(
768 Mapper<?, ?, ?, ?>.Context context) {
769 Class<? extends WorkerObserver>[] klasses = getWorkerObserverClasses();
770 WorkerObserver[] objects = new WorkerObserver[klasses.length];
771 for (int i = 0; i < klasses.length; ++i) {
772 objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
773 }
774 return objects;
775 }
776
777
778
779
780
781
782
783 public MapperObserver[] createMapperObservers(
784 Mapper<?, ?, ?, ?>.Context context) {
785 Class<? extends MapperObserver>[] klasses = getMapperObserverClasses();
786 MapperObserver[] objects = new MapperObserver[klasses.length];
787 for (int i = 0; i < klasses.length; ++i) {
788 objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
789 }
790 return objects;
791 }
792
793
794
795
796
797
798
799 public GcObserver[] createGcObservers(
800 Mapper<?, ?, ?, ?>.Context context) {
801 Class<? extends GcObserver>[] klasses = getGcObserverClasses();
802 GcObserver[] objects = new GcObserver[klasses.length];
803 for (int i = 0; i < klasses.length; ++i) {
804 objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
805 }
806 return objects;
807 }
808
809
810
811
812
813
814 public GiraphJobObserver getJobObserver() {
815 return ReflectionUtils.newInstance(getJobObserverClass(), this);
816 }
817
818
819
820
821
822
823 public GiraphJobRetryChecker getJobRetryChecker() {
824 return ReflectionUtils.newInstance(getJobRetryCheckerClass(), this);
825 }
826
827
828
829
830
831
832 public Class<E> getEdgeValueClass() {
833 return classes.getEdgeValueClass();
834 }
835
836
837
838
839
840
841 public boolean isEdgeValueNullWritable() {
842 return getEdgeValueClass() == NullWritable.class;
843 }
844
845
846
847
848
849
850 public EdgeValueFactory<E> getEdgeValueFactory() {
851 return valueFactories.getEdgeValueFactory();
852 }
853
854
855
856
857
858
859 public E createEdgeValue() {
860 return getEdgeValueFactory().newInstance();
861 }
862
863
864
865
866
867
868 public Edge<I, E> createEdge() {
869 if (isEdgeValueNullWritable()) {
870 return (Edge<I, E>) EdgeFactory.create(createVertexId());
871 } else {
872 return EdgeFactory.create(createVertexId(), createEdgeValue());
873 }
874 }
875
876
877
878
879
880
881
882
883 public Edge<I, E> createEdge(TranslateEdge<I, E>
884 translateEdge, Edge<I, E> edge) {
885 I translatedId = translateEdge.translateId(edge.getTargetVertexId());
886 if (isEdgeValueNullWritable()) {
887 return (Edge<I, E>) EdgeFactory.create(translatedId);
888 } else {
889 return EdgeFactory.create(translatedId,
890 translateEdge.cloneValue(edge.getValue()));
891 }
892 }
893
894
895
896
897
898
899 public ReusableEdge<I, E> createReusableEdge() {
900 if (isEdgeValueNullWritable()) {
901 return (ReusableEdge<I, E>) EdgeFactory.createReusable(createVertexId());
902 } else {
903 return EdgeFactory.createReusable(createVertexId(), createEdgeValue());
904 }
905 }
906
907
908
909
910
911
912 public EdgeStoreFactory<I, V, E> createEdgeStoreFactory() {
913 Class<? extends EdgeStoreFactory> edgeStoreFactoryClass =
914 EDGE_STORE_FACTORY_CLASS.get(this);
915 return ReflectionUtils.newInstance(edgeStoreFactoryClass);
916 }
917
918
919
920
921
922
923
924 public <M extends Writable> Class<M> getIncomingMessageValueClass() {
925 return classes.getIncomingMessageClasses().getMessageClass();
926 }
927
928
929
930
931
932
933
934 public <M extends Writable> Class<M> getOutgoingMessageValueClass() {
935 return classes.getOutgoingMessageClasses().getMessageClass();
936 }
937
938
939
940
941
942
943 public <M extends Writable>
944 MessageClasses<I, M> getIncomingMessageClasses() {
945 return classes.getIncomingMessageClasses();
946 }
947
948
949
950
951
952
953 public <M extends Writable>
954 MessageClasses<I, M> getOutgoingMessageClasses() {
955 return classes.getOutgoingMessageClasses();
956 }
957
958
959
960
961
962
963 public <M extends Writable>
964 MessageValueFactory<M> createOutgoingMessageValueFactory() {
965 return classes.getOutgoingMessageClasses().createMessageValueFactory(this);
966 }
967
968
969
970
971
972
973 public <M extends Writable>
974 MessageValueFactory<M> createIncomingMessageValueFactory() {
975 return classes.getIncomingMessageClasses().createMessageValueFactory(this);
976 }
977
978 @Override
979 public void setMessageCombinerClass(
980 Class<? extends MessageCombiner> messageCombinerClass) {
981 throw new IllegalArgumentException(
982 "Cannot set message combiner on ImmutableClassesGiraphConfigurable");
983 }
984
985
986
987
988
989
990
991 public <M extends Writable> MessageCombiner<? super I, M>
992 createOutgoingMessageCombiner() {
993 return classes.getOutgoingMessageClasses().createMessageCombiner(this);
994 }
995
996
997
998
999
1000
1001 public boolean useOutgoingMessageCombiner() {
1002 return classes.getOutgoingMessageClasses().useMessageCombiner();
1003 }
1004
1005
1006
1007
1008
1009 public MessageEncodeAndStoreType getOutgoingMessageEncodeAndStoreType() {
1010 return classes.getOutgoingMessageClasses().getMessageEncodeAndStoreType();
1011 }
1012
1013 @Override
1014 public Class<? extends OutEdges<I, E>> getOutEdgesClass() {
1015 return classes.getOutEdgesClass();
1016 }
1017
1018
1019
1020
1021
1022
1023
1024 public Class<? extends OutEdges<I, E>> getInputOutEdgesClass() {
1025 return classes.getInputOutEdgesClass();
1026 }
1027
1028
1029
1030
1031
1032
1033
1034
1035 public boolean useInputOutEdges() {
1036 return classes.getInputOutEdgesClass() != classes.getOutEdgesClass();
1037 }
1038
1039
1040
1041
1042
1043
1044 public Class<? extends MappingStore> getMappingStoreClass() {
1045 return MAPPING_STORE_CLASS.get(this);
1046 }
1047
1048
1049
1050
1051
1052
1053 public MappingStore<I, ? extends Writable> createMappingStore() {
1054 if (getMappingStoreClass() != null) {
1055 return ReflectionUtils.newInstance(getMappingStoreClass(), this);
1056 } else {
1057 return null;
1058 }
1059 }
1060
1061
1062
1063
1064
1065
1066 public Class<? extends MappingStoreOps> getMappingStoreOpsClass() {
1067 return MAPPING_STORE_OPS_CLASS.get(this);
1068 }
1069
1070
1071
1072
1073
1074
1075 public MappingStoreOps<I, ? extends Writable> createMappingStoreOps() {
1076 if (getMappingStoreOpsClass() != null) {
1077 return ReflectionUtils.newInstance(getMappingStoreOpsClass(), this);
1078 } else {
1079 return null;
1080 }
1081 }
1082
1083
1084
1085
1086
1087
1088 public Class<? extends Writable> getMappingTargetClass() {
1089 if (mappingTargetClass == null) {
1090 Class<?>[] classList = ReflectionUtils.getTypeArguments(
1091 MappingStore.class, getMappingStoreClass());
1092 Preconditions.checkArgument(classList.length == 2);
1093 mappingTargetClass = (Class<? extends Writable>) classList[1];
1094 }
1095 return mappingTargetClass;
1096 }
1097
1098
1099
1100
1101
1102
1103 public Writable createMappingTarget() {
1104 return WritableUtils.createWritable(getMappingTargetClass());
1105 }
1106
1107
1108
1109
1110
1111
1112 public OutEdges<I, E> createOutEdges() {
1113 return outEdgesFactory.newInstance();
1114 }
1115
1116
1117
1118
1119
1120
1121
1122 public OutEdges<I, E> createAndInitializeOutEdges() {
1123 OutEdges<I, E> outEdges = createOutEdges();
1124 outEdges.initialize();
1125 return outEdges;
1126 }
1127
1128
1129
1130
1131
1132
1133
1134
1135 public OutEdges<I, E> createAndInitializeOutEdges(int capacity) {
1136 OutEdges<I, E> outEdges = createOutEdges();
1137 outEdges.initialize(capacity);
1138 return outEdges;
1139 }
1140
1141
1142
1143
1144
1145
1146
1147
1148 public OutEdges<I, E> createAndInitializeOutEdges(
1149 Iterable<Edge<I, E>> edges) {
1150 OutEdges<I, E> outEdges = createOutEdges();
1151 outEdges.initialize(edges);
1152 return outEdges;
1153 }
1154
1155
1156
1157
1158
1159
1160
1161 public OutEdges<I, E> createInputOutEdges() {
1162 return inputOutEdgesFactory.newInstance();
1163 }
1164
1165
1166
1167
1168
1169
1170
1171 public OutEdges<I, E> createAndInitializeInputOutEdges() {
1172 OutEdges<I, E> outEdges = createInputOutEdges();
1173 outEdges.initialize();
1174 return outEdges;
1175 }
1176
1177
1178
1179
1180
1181
1182
1183
1184 public Partition<I, V, E> createPartition(
1185 int id, Progressable progressable) {
1186 Class<? extends Partition<I, V, E>> klass = classes.getPartitionClass();
1187 Partition<I, V, E> partition = ReflectionUtils.newInstance(klass, this);
1188 partition.initialize(id, progressable);
1189 return partition;
1190 }
1191
1192
1193
1194
1195
1196
1197 public boolean useUnsafeSerialization() {
1198 return useUnsafeSerialization;
1199 }
1200
1201
1202
1203
1204
1205
1206 public DataInputOutput createMessagesInputOutput() {
1207 if (useBigDataIOForMessages) {
1208 return new BigDataInputOutput(this);
1209 } else {
1210 return new ExtendedDataInputOutput(this);
1211 }
1212 }
1213
1214
1215
1216
1217
1218
1219 public ExtendedDataOutput createExtendedDataOutput() {
1220 if (useUnsafeSerialization) {
1221 return new UnsafeByteArrayOutputStream();
1222 } else {
1223 return new ExtendedByteArrayDataOutput();
1224 }
1225 }
1226
1227
1228
1229
1230
1231
1232
1233 public ExtendedDataOutput createExtendedDataOutput(int expectedSize) {
1234 if (useUnsafeSerialization) {
1235 return new UnsafeByteArrayOutputStream(expectedSize);
1236 } else {
1237 return new ExtendedByteArrayDataOutput(expectedSize);
1238 }
1239 }
1240
1241
1242
1243
1244
1245
1246
1247
1248 public ExtendedDataOutput createExtendedDataOutput(byte[] buf,
1249 int pos) {
1250 if (useUnsafeSerialization) {
1251 return new UnsafeByteArrayOutputStream(buf, pos);
1252 } else {
1253 return new ExtendedByteArrayDataOutput(buf, pos);
1254 }
1255 }
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265 public ExtendedDataInput createExtendedDataInput(
1266 byte[] buf, int off, int length) {
1267 if (useUnsafeSerialization) {
1268 return new UnsafeByteArrayInputStream(buf, off, length);
1269 } else {
1270 return new ExtendedByteArrayDataInput(buf, off, length);
1271 }
1272 }
1273
1274
1275
1276
1277
1278
1279
1280 public ExtendedDataInput createExtendedDataInput(byte[] buf) {
1281 if (useUnsafeSerialization) {
1282 return new UnsafeByteArrayInputStream(buf);
1283 } else {
1284 return new ExtendedByteArrayDataInput(buf);
1285 }
1286 }
1287
1288
1289
1290
1291
1292
1293
1294 public ExtendedDataInput createExtendedDataInput(
1295 ExtendedDataOutput extendedDataOutput) {
1296 return createExtendedDataInput(extendedDataOutput.getByteArray(), 0,
1297 extendedDataOutput.getPos());
1298 }
1299
1300
1301
1302
1303
1304
1305 public boolean getUseUnsafeSerialization() {
1306 return useUnsafeSerialization;
1307 }
1308
1309
1310
1311
1312
1313
1314 public void updateSuperstepClasses(SuperstepClasses superstepClasses) {
1315 superstepClasses.updateGiraphClasses(classes);
1316 }
1317
1318
1319
1320
1321
1322
1323 public boolean doCompression() {
1324 switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
1325 case "SNAPPY":
1326 return true;
1327 case "INFLATE":
1328 return true;
1329 default:
1330 return false;
1331 }
1332 }
1333
1334
1335
1336
1337
1338
1339 public MessageToByteEncoder getNettyCompressionEncoder() {
1340 switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
1341 case "SNAPPY":
1342 return new SnappyFramedEncoder();
1343 case "INFLATE":
1344 return new JdkZlibEncoder();
1345 default:
1346 return null;
1347 }
1348 }
1349
1350
1351
1352
1353
1354
1355 public ByteToMessageDecoder getNettyCompressionDecoder() {
1356 switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
1357 case "SNAPPY":
1358 return new SnappyFramedDecoder();
1359 case "INFLATE":
1360 return new JdkZlibDecoder();
1361 default:
1362 return null;
1363 }
1364 }
1365
1366
1367
1368
1369
1370
1371 public boolean isStaticGraph() {
1372 return isStaticGraph;
1373 }
1374
1375
1376
1377
1378 public String getJobId() {
1379 return get("mapred.job.id", "UnknownJob");
1380 }
1381
1382
1383
1384
1385
1386
1387
1388 public boolean useMessageSizeEncoding() {
1389 return useMessageSizeEncoding;
1390 }
1391 }