This project has retired. For details please refer to its
Attic page.
AbstractBlockFactory xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework;
19
20 import java.util.List;
21
22 import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
23 import org.apache.giraph.conf.BulkConfigurator;
24 import org.apache.giraph.conf.GiraphConfiguration;
25 import org.apache.giraph.conf.GiraphConstants;
26 import org.apache.giraph.conf.StrConfOption;
27 import org.apache.giraph.edge.IdAndNullArrayEdges;
28 import org.apache.giraph.edge.IdAndValueArrayEdges;
29 import org.apache.giraph.edge.LongDiffNullArrayEdges;
30 import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
31 import org.apache.giraph.types.ops.TypeOps;
32 import org.apache.giraph.types.ops.TypeOpsUtils;
33 import org.apache.giraph.utils.ReflectionUtils;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.io.LongWritable;
36 import org.apache.hadoop.io.NullWritable;
37 import org.apache.hadoop.io.Writable;
38 import org.apache.hadoop.io.WritableComparable;
39
40
41
42
43
44
45
46
47 public abstract class AbstractBlockFactory<S> implements BlockFactory<S> {
48
49
50
51
52 public static final StrConfOption CONFIGURATORS = new StrConfOption(
53 "digraph.block_factory_configurators", null, "");
54
55 @Override
56 public List<String> getGcJavaOpts(Configuration conf) {
57 return null;
58 }
59
60 @Override
61 public final void initConfig(GiraphConfiguration conf) {
62 initConfigurators(conf);
63 GiraphConstants.VERTEX_ID_CLASS.setIfUnset(conf, getVertexIDClass(conf));
64 GiraphConstants.VERTEX_VALUE_CLASS.setIfUnset(
65 conf, getVertexValueClass(conf));
66 GiraphConstants.EDGE_VALUE_CLASS.setIfUnset(conf, getEdgeValueClass(conf));
67 GiraphConstants.RESOLVER_CREATE_VERTEX_ON_MSGS.setIfUnset(
68 conf, shouldCreateVertexOnMsgs(conf));
69 if (shouldSendOneMessageToAll(conf)) {
70 GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.setIfUnset(
71 conf, MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION);
72 }
73
74 BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS.setIfUnset(
75 conf, getWorkerContextValueClass(conf));
76
77
78 if (!GiraphConstants.VERTEX_EDGES_CLASS.contains(conf)) {
79 @SuppressWarnings("rawtypes")
80 Class<? extends WritableComparable> vertexIDClass =
81 GiraphConstants.VERTEX_ID_CLASS.get(conf);
82 Class<? extends Writable> edgeValueClass =
83 GiraphConstants.EDGE_VALUE_CLASS.get(conf);
84
85
86 @SuppressWarnings("rawtypes")
87 PrimitiveIdTypeOps<? extends WritableComparable> idTypeOps =
88 TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIDClass);
89 if (edgeValueClass.equals(NullWritable.class)) {
90 if (vertexIDClass.equals(LongWritable.class)) {
91 GiraphConstants.VERTEX_EDGES_CLASS.set(
92 conf, LongDiffNullArrayEdges.class);
93 } else if (idTypeOps != null) {
94 GiraphConstants.VERTEX_EDGES_CLASS.set(
95 conf, IdAndNullArrayEdges.class);
96 }
97 } else {
98 TypeOps<?> edgeValueTypeOps =
99 TypeOpsUtils.getTypeOpsOrNull(edgeValueClass);
100 if (edgeValueTypeOps != null && idTypeOps != null) {
101 GiraphConstants.VERTEX_EDGES_CLASS.set(
102 conf, IdAndValueArrayEdges.class);
103 }
104 }
105 }
106
107 additionalInitConfig(conf);
108 }
109
110 @Override
111 public void registerOutputs(GiraphConfiguration conf) {
112 }
113
114 private void initConfigurators(GiraphConfiguration conf) {
115 String configurators = CONFIGURATORS.get(conf);
116 if (configurators != null) {
117 String[] split = configurators.split(",");
118 for (String configurator : split) {
119 runConfigurator(conf, configurator);
120 }
121 }
122 }
123
124 private void runConfigurator(GiraphConfiguration conf, String configurator) {
125 String[] packages = getConvenienceConfiguratorPackages();
126 String[] prefixes = new String[packages.length + 1];
127 prefixes[0] = "";
128 for (int i = 0; i < packages.length; i++) {
129 prefixes[i + 1] = packages[i] + ".";
130 }
131
132 for (String prefix : prefixes) {
133 try {
134 @SuppressWarnings({ "unchecked", "rawtypes" })
135 Class<BulkConfigurator> confClass =
136 (Class) Class.forName(prefix + configurator);
137 BulkConfigurator c = ReflectionUtils.newInstance(confClass);
138 c.configure(conf);
139 return;
140
141
142 } catch (ClassNotFoundException e) {
143 }
144
145 }
146 throw new IllegalStateException(
147 "Configurator " + configurator + " not found");
148 }
149
150
151
152
153
154 protected void additionalInitConfig(GiraphConfiguration conf) {
155 }
156
157
158
159
160 @SuppressWarnings("rawtypes")
161 protected abstract Class<? extends WritableComparable> getVertexIDClass(
162 GiraphConfiguration conf);
163
164
165
166
167 protected abstract Class<? extends Writable> getVertexValueClass(
168 GiraphConfiguration conf);
169
170
171
172
173 protected abstract Class<? extends Writable> getEdgeValueClass(
174 GiraphConfiguration conf);
175
176
177
178
179 protected Class<?> getWorkerContextValueClass(GiraphConfiguration conf) {
180 return Object.class;
181 }
182
183
184
185
186
187 protected boolean shouldCreateVertexOnMsgs(GiraphConfiguration conf) {
188 return true;
189 }
190
191
192 protected boolean shouldSendOneMessageToAll(GiraphConfiguration conf) {
193 return false;
194 }
195
196
197
198
199
200
201 protected String[] getConvenienceConfiguratorPackages() {
202 return new String[] { };
203 }
204 }