This project has retired. For details please refer to its
Attic page.
ServerData xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm;
20
21 import static org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS;
22
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.ConcurrentMap;
28
29 import org.apache.giraph.bsp.CentralizedServiceWorker;
30 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
31 import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
32 import org.apache.giraph.comm.messages.MessageStore;
33 import org.apache.giraph.comm.messages.MessageStoreFactory;
34 import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
35 import org.apache.giraph.conf.GiraphConstants;
36 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
37 import org.apache.giraph.edge.EdgeStore;
38 import org.apache.giraph.edge.EdgeStoreFactory;
39 import org.apache.giraph.graph.Vertex;
40 import org.apache.giraph.graph.VertexMutations;
41 import org.apache.giraph.graph.VertexResolver;
42 import org.apache.giraph.ooc.OutOfCoreEngine;
43 import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
44 import org.apache.giraph.ooc.data.DiskBackedMessageStore;
45 import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
46 import org.apache.giraph.partition.Partition;
47 import org.apache.giraph.partition.PartitionStore;
48 import org.apache.giraph.partition.SimplePartitionStore;
49 import org.apache.giraph.utils.ReflectionUtils;
50 import org.apache.hadoop.io.Writable;
51 import org.apache.hadoop.io.WritableComparable;
52 import org.apache.hadoop.mapreduce.Mapper;
53 import org.apache.log4j.Logger;
54
55 import com.google.common.collect.Iterables;
56 import com.google.common.collect.Maps;
57
58
59
60
61
62
63
64
65 @SuppressWarnings("rawtypes")
66 public class ServerData<I extends WritableComparable,
67 V extends Writable, E extends Writable> {
68
69 private static final Logger LOG = Logger.getLogger(ServerData.class);
70
71 private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
72
73 private volatile PartitionStore<I, V, E> partitionStore;
74
75 private final EdgeStore<I, V, E> edgeStore;
76
77 private final MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
78 messageStoreFactory;
79
80
81
82
83 private volatile MessageStore<I, Writable> incomingMessageStore;
84
85
86
87
88 private volatile MessageStore<I, Writable> currentMessageStore;
89
90
91
92
93
94
95 private ConcurrentMap<Integer,
96 ConcurrentMap<I, VertexMutations<I, V, E>>>
97 oldPartitionMutations = Maps.newConcurrentMap();
98
99
100
101
102
103
104 private ConcurrentMap<Integer,
105 ConcurrentMap<I, VertexMutations<I, V, E>>>
106 partitionMutations = Maps.newConcurrentMap();
107
108
109
110 private final OwnerAggregatorServerData ownerAggregatorData;
111
112
113
114 private final AllAggregatorServerData allAggregatorData;
115
116 private final CentralizedServiceWorker<I, V, E> serviceWorker;
117
118
119 private volatile List<Writable> currentWorkerToWorkerMessages =
120 Collections.synchronizedList(new ArrayList<Writable>());
121
122 private volatile List<Writable> incomingWorkerToWorkerMessages =
123 Collections.synchronizedList(new ArrayList<Writable>());
124
125
126 private final Mapper<?, ?, ?, ?>.Context context;
127
128 private final OutOfCoreEngine oocEngine;
129
130
131
132
133
134
135
136
137
138 public ServerData(
139 CentralizedServiceWorker<I, V, E> service,
140 WorkerServer workerServer,
141 ImmutableClassesGiraphConfiguration<I, V, E> conf,
142 Mapper<?, ?, ?, ?>.Context context) {
143 this.serviceWorker = service;
144 this.conf = conf;
145 this.messageStoreFactory = createMessageStoreFactory();
146 EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory();
147 edgeStoreFactory.initialize(service, conf, context);
148 EdgeStore<I, V, E> inMemoryEdgeStore = edgeStoreFactory.newStore();
149 PartitionStore<I, V, E> inMemoryPartitionStore =
150 new SimplePartitionStore<I, V, E>(conf, context);
151 if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
152 oocEngine = new OutOfCoreEngine(conf, service, workerServer);
153 partitionStore =
154 new DiskBackedPartitionStore<I, V, E>(inMemoryPartitionStore,
155 conf, context, oocEngine);
156 edgeStore =
157 new DiskBackedEdgeStore<I, V, E>(inMemoryEdgeStore, conf, oocEngine);
158 } else {
159 partitionStore = inMemoryPartitionStore;
160 edgeStore = inMemoryEdgeStore;
161 oocEngine = null;
162 }
163 ownerAggregatorData = new OwnerAggregatorServerData(context);
164 allAggregatorData = new AllAggregatorServerData(context, conf);
165 this.context = context;
166 }
167
168
169
170
171
172
173
174 private MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
175 createMessageStoreFactory() {
176 Class<? extends MessageStoreFactory> messageStoreFactoryClass =
177 MESSAGE_STORE_FACTORY_CLASS.get(conf);
178
179 MessageStoreFactory messageStoreFactoryInstance =
180 ReflectionUtils.newInstance(messageStoreFactoryClass);
181 messageStoreFactoryInstance.initialize(serviceWorker, conf);
182
183 return messageStoreFactoryInstance;
184 }
185
186
187
188
189
190
191 public OutOfCoreEngine getOocEngine() {
192 return oocEngine;
193 }
194
195
196
197
198
199
200 public EdgeStore<I, V, E> getEdgeStore() {
201 return edgeStore;
202 }
203
204
205
206
207
208
209 public PartitionStore<I, V, E> getPartitionStore() {
210 return partitionStore;
211 }
212
213
214
215
216
217
218
219
220 public <M extends Writable> MessageStore<I, M> getIncomingMessageStore() {
221 return (MessageStore<I, M>) incomingMessageStore;
222 }
223
224
225
226
227
228
229
230
231 public <M extends Writable> MessageStore<I, M> getCurrentMessageStore() {
232 return (MessageStore<I, M>) currentMessageStore;
233 }
234
235
236
237
238
239 public void resetMessageStores() {
240 if (currentMessageStore != null) {
241 currentMessageStore.clearAll();
242 currentMessageStore = null;
243 }
244 if (incomingMessageStore != null) {
245 incomingMessageStore.clearAll();
246 incomingMessageStore = null;
247 }
248 prepareSuperstep();
249 }
250
251
252 public void prepareSuperstep() {
253 if (currentMessageStore != null) {
254 currentMessageStore.clearAll();
255 }
256
257 MessageStore<I, Writable> nextCurrentMessageStore;
258 MessageStore<I, Writable> nextIncomingMessageStore;
259 MessageStore<I, Writable> messageStore;
260
261
262
263
264 if (incomingMessageStore != null) {
265 nextCurrentMessageStore = incomingMessageStore;
266 } else {
267 messageStore = messageStoreFactory.newStore(
268 conf.getIncomingMessageClasses());
269 if (oocEngine == null) {
270 nextCurrentMessageStore = messageStore;
271 } else {
272 nextCurrentMessageStore = new DiskBackedMessageStore<>(
273 conf, oocEngine, messageStore,
274 conf.getIncomingMessageClasses().useMessageCombiner(),
275 serviceWorker.getSuperstep());
276 }
277 }
278
279 messageStore = messageStoreFactory.newStore(
280 conf.getOutgoingMessageClasses());
281 if (oocEngine == null) {
282 nextIncomingMessageStore = messageStore;
283 } else {
284 nextIncomingMessageStore = new DiskBackedMessageStore<>(
285 conf, oocEngine, messageStore,
286 conf.getOutgoingMessageClasses().useMessageCombiner(),
287 serviceWorker.getSuperstep() + 1);
288 }
289
290
291
292
293 if (oocEngine != null) {
294 oocEngine.getSuperstepLock().writeLock().lock();
295 }
296 currentMessageStore = nextCurrentMessageStore;
297 incomingMessageStore = nextIncomingMessageStore;
298 if (oocEngine != null) {
299 oocEngine.reset();
300 oocEngine.getSuperstepLock().writeLock().unlock();
301 }
302 currentMessageStore.finalizeStore();
303
304 currentWorkerToWorkerMessages = incomingWorkerToWorkerMessages;
305 incomingWorkerToWorkerMessages =
306 Collections.synchronizedList(new ArrayList<Writable>());
307 }
308
309
310
311
312
313
314 public ConcurrentMap<Integer, ConcurrentMap<I, VertexMutations<I, V, E>>>
315 getPartitionMutations() {
316 return partitionMutations;
317 }
318
319
320
321
322
323
324 public OwnerAggregatorServerData getOwnerAggregatorData() {
325 return ownerAggregatorData;
326 }
327
328
329
330
331
332
333 public AllAggregatorServerData getAllAggregatorData() {
334 return allAggregatorData;
335 }
336
337
338
339
340
341
342 public CentralizedServiceWorker<I, V, E> getServiceWorker() {
343 return this.serviceWorker;
344 }
345
346
347
348
349
350
351
352 public List<Writable> getAndClearCurrentWorkerToWorkerMessages() {
353 List<Writable> ret = currentWorkerToWorkerMessages;
354 currentWorkerToWorkerMessages = null;
355 return ret;
356 }
357
358
359
360
361
362
363 public void addIncomingWorkerToWorkerMessage(Writable message) {
364 incomingWorkerToWorkerMessages.add(message);
365 }
366
367
368
369
370
371
372 public List<Writable> getCurrentWorkerToWorkerMessages() {
373 return currentWorkerToWorkerMessages;
374 }
375
376
377
378
379 public void prepareResolveMutations() {
380 oldPartitionMutations = partitionMutations;
381 partitionMutations = Maps.newConcurrentMap();
382 }
383
384
385
386
387
388
389 public void resolvePartitionMutation(Partition<I, V, E> partition) {
390 Integer partitionId = partition.getId();
391 VertexResolver<I, V, E> vertexResolver = conf.createVertexResolver();
392 ConcurrentMap<I, VertexMutations<I, V, E>> prevPartitionMutations =
393 oldPartitionMutations.get(partitionId);
394
395 boolean ignoreExistingVertices =
396 conf.getIncomingMessageClasses().ignoreExistingVertices();
397
398
399 if (prevPartitionMutations != null) {
400 for (Map.Entry<I, VertexMutations<I, V, E>> entry : prevPartitionMutations
401 .entrySet()) {
402 I vertexId = entry.getKey();
403 Vertex<I, V, E> originalVertex = partition.getVertex(vertexId);
404 VertexMutations<I, V, E> vertexMutations = entry.getValue();
405 Vertex<I, V, E> vertex = vertexResolver.resolve(vertexId,
406 originalVertex, vertexMutations,
407 !ignoreExistingVertices &&
408 getCurrentMessageStore().hasMessagesForVertex(entry.getKey()));
409
410 if (LOG.isDebugEnabled()) {
411 LOG.debug("resolvePartitionMutations: Resolved vertex index " +
412 vertexId + " in partition index " + partitionId +
413 " with original vertex " + originalVertex +
414 ", returned vertex " + vertex + " on superstep " +
415 serviceWorker.getSuperstep() + " with mutations " +
416 vertexMutations);
417 }
418
419 if (vertex != null) {
420 partition.putVertex(vertex);
421 } else if (originalVertex != null) {
422 partition.removeVertex(vertexId);
423 if (!ignoreExistingVertices) {
424 getCurrentMessageStore().clearVertexMessages(vertexId);
425 }
426 }
427 context.progress();
428 }
429 }
430
431 if (!ignoreExistingVertices) {
432
433
434 Iterable<I> destinations = getCurrentMessageStore().
435 getPartitionDestinationVertices(partitionId);
436 if (!Iterables.isEmpty(destinations)) {
437 for (I vertexId : destinations) {
438 if (partition.getVertex(vertexId) == null) {
439 Vertex<I, V, E> vertex =
440 vertexResolver.resolve(vertexId, null, null, true);
441
442 if (LOG.isDebugEnabled()) {
443 LOG.debug(
444 "resolvePartitionMutations: A non-existing vertex has " +
445 "message(s). Added vertex index " + vertexId +
446 " in partition index " + partitionId +
447 ", vertex = " + vertex + ", on superstep " +
448 serviceWorker.getSuperstep());
449 }
450
451 if (vertex != null) {
452 partition.putVertex(vertex);
453 }
454 context.progress();
455 }
456 }
457 }
458 }
459 }
460
461
462
463
464
465 public void waitForComplete() {
466 if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
467 ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
468 }
469 }
470 }