This project has been retired. For details, please refer to its Attic page.
ServerData xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm;
20  
21  import static org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS;
22  
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.concurrent.ConcurrentMap;
28  
29  import org.apache.giraph.bsp.CentralizedServiceWorker;
30  import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
31  import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
32  import org.apache.giraph.comm.messages.MessageStore;
33  import org.apache.giraph.comm.messages.MessageStoreFactory;
34  import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
35  import org.apache.giraph.conf.GiraphConstants;
36  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
37  import org.apache.giraph.edge.EdgeStore;
38  import org.apache.giraph.edge.EdgeStoreFactory;
39  import org.apache.giraph.graph.Vertex;
40  import org.apache.giraph.graph.VertexMutations;
41  import org.apache.giraph.graph.VertexResolver;
42  import org.apache.giraph.ooc.OutOfCoreEngine;
43  import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
44  import org.apache.giraph.ooc.data.DiskBackedMessageStore;
45  import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
46  import org.apache.giraph.partition.Partition;
47  import org.apache.giraph.partition.PartitionStore;
48  import org.apache.giraph.partition.SimplePartitionStore;
49  import org.apache.giraph.utils.ReflectionUtils;
50  import org.apache.hadoop.io.Writable;
51  import org.apache.hadoop.io.WritableComparable;
52  import org.apache.hadoop.mapreduce.Mapper;
53  import org.apache.log4j.Logger;
54  
55  import com.google.common.collect.Iterables;
56  import com.google.common.collect.Maps;
57  
58  /**
59   * Anything that the server stores
60   *
61   * @param <I> Vertex id
62   * @param <V> Vertex data
63   * @param <E> Edge data
64   */
65  @SuppressWarnings("rawtypes")
66  public class ServerData<I extends WritableComparable,
67      V extends Writable, E extends Writable> {
68    /** Class logger */
69    private static final Logger LOG = Logger.getLogger(ServerData.class);
70    /** Configuration */
71    private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
72    /** Partition store for this worker. */
73    private volatile PartitionStore<I, V, E> partitionStore;
74    /** Edge store for this worker. */
75    private final EdgeStore<I, V, E> edgeStore;
76    /** Message store factory */
77    private final MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
78        messageStoreFactory;
79    /**
80     * Message store for incoming messages (messages which will be consumed
81     * in the next super step)
82     */
83    private volatile MessageStore<I, Writable> incomingMessageStore;
84    /**
85     * Message store for current messages (messages which we received in
86     * previous super step and which will be consumed in current super step)
87     */
88    private volatile MessageStore<I, Writable> currentMessageStore;
89    /**
90     * Map of partition ids to vertex mutations from other workers. These are
91     * mutations that should be applied before execution of *current* super step.
92     * (accesses to keys should be thread-safe as multiple threads may resolve
93     * mutations of different partitions at the same time)
94     */
95    private ConcurrentMap<Integer,
96        ConcurrentMap<I, VertexMutations<I, V, E>>>
97        oldPartitionMutations = Maps.newConcurrentMap();
98    /**
99     * Map of partition ids to vertex mutations from other workers. These are
100    * mutations that are coming from other workers as the execution goes one in a
101    * super step. These mutations should be applied in the *next* super step.
102    * (this should be thread-safe)
103    */
104   private ConcurrentMap<Integer,
105       ConcurrentMap<I, VertexMutations<I, V, E>>>
106       partitionMutations = Maps.newConcurrentMap();
107   /**
108    * Holds aggregators which current worker owns from current superstep
109    */
110   private final OwnerAggregatorServerData ownerAggregatorData;
111   /**
112    * Holds old aggregators from previous superstep
113    */
114   private final AllAggregatorServerData allAggregatorData;
115   /** Service worker */
116   private final CentralizedServiceWorker<I, V, E> serviceWorker;
117 
118   /** Store for current messages from other workers to this worker */
119   private volatile List<Writable> currentWorkerToWorkerMessages =
120       Collections.synchronizedList(new ArrayList<Writable>());
121   /** Store for message from other workers to this worker for next superstep */
122   private volatile List<Writable> incomingWorkerToWorkerMessages =
123       Collections.synchronizedList(new ArrayList<Writable>());
124 
125   /** Job context (for progress) */
126   private final Mapper<?, ?, ?, ?>.Context context;
127   /** Out-of-core engine */
128   private final OutOfCoreEngine oocEngine;
129 
130   /**
131    * Constructor.
132    *
133    * @param service Service worker
134    * @param workerServer Worker server
135    * @param conf Configuration
136    * @param context Mapper context
137    */
138   public ServerData(
139       CentralizedServiceWorker<I, V, E> service,
140       WorkerServer workerServer,
141       ImmutableClassesGiraphConfiguration<I, V, E> conf,
142       Mapper<?, ?, ?, ?>.Context context) {
143     this.serviceWorker = service;
144     this.conf = conf;
145     this.messageStoreFactory = createMessageStoreFactory();
146     EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory();
147     edgeStoreFactory.initialize(service, conf, context);
148     EdgeStore<I, V, E> inMemoryEdgeStore = edgeStoreFactory.newStore();
149     PartitionStore<I, V, E> inMemoryPartitionStore =
150         new SimplePartitionStore<I, V, E>(conf, context);
151     if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
152       oocEngine = new OutOfCoreEngine(conf, service, workerServer);
153       partitionStore =
154           new DiskBackedPartitionStore<I, V, E>(inMemoryPartitionStore,
155               conf, context, oocEngine);
156       edgeStore =
157           new DiskBackedEdgeStore<I, V, E>(inMemoryEdgeStore, conf, oocEngine);
158     } else {
159       partitionStore = inMemoryPartitionStore;
160       edgeStore = inMemoryEdgeStore;
161       oocEngine = null;
162     }
163     ownerAggregatorData = new OwnerAggregatorServerData(context);
164     allAggregatorData = new AllAggregatorServerData(context, conf);
165     this.context = context;
166   }
167 
168   /**
169    * Decide which message store should be used for current application,
170    * and create the factory for that store
171    *
172    * @return Message store factory
173    */
174   private MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
175   createMessageStoreFactory() {
176     Class<? extends MessageStoreFactory> messageStoreFactoryClass =
177         MESSAGE_STORE_FACTORY_CLASS.get(conf);
178 
179     MessageStoreFactory messageStoreFactoryInstance =
180         ReflectionUtils.newInstance(messageStoreFactoryClass);
181     messageStoreFactoryInstance.initialize(serviceWorker, conf);
182 
183     return messageStoreFactoryInstance;
184   }
185 
186   /**
187    * Return the out-of-core engine for this worker.
188    *
189    * @return The out-of-core engine
190    */
191   public OutOfCoreEngine getOocEngine() {
192     return oocEngine;
193   }
194 
195   /**
196    * Return the edge store for this worker.
197    *
198    * @return The edge store
199    */
200   public EdgeStore<I, V, E> getEdgeStore() {
201     return edgeStore;
202   }
203 
204   /**
205    * Return the partition store for this worker.
206    *
207    * @return The partition store
208    */
209   public PartitionStore<I, V, E> getPartitionStore() {
210     return partitionStore;
211   }
212 
213   /**
214    * Get message store for incoming messages (messages which will be consumed
215    * in the next super step)
216    *
217    * @param <M> Message data
218    * @return Incoming message store
219    */
220   public <M extends Writable> MessageStore<I, M> getIncomingMessageStore() {
221     return (MessageStore<I, M>) incomingMessageStore;
222   }
223 
224   /**
225    * Get message store for current messages (messages which we received in
226    * previous super step and which will be consumed in current super step)
227    *
228    * @param <M> Message data
229    * @return Current message store
230    */
231   public <M extends Writable> MessageStore<I, M> getCurrentMessageStore() {
232     return (MessageStore<I, M>) currentMessageStore;
233   }
234 
235   /**
236    * Re-initialize message stores.
237    * Discards old values if any.
238    */
239   public void resetMessageStores() {
240     if (currentMessageStore != null) {
241       currentMessageStore.clearAll();
242       currentMessageStore = null;
243     }
244     if (incomingMessageStore != null) {
245       incomingMessageStore.clearAll();
246       incomingMessageStore = null;
247     }
248     prepareSuperstep();
249   }
250 
251   /** Prepare for next superstep */
252   public void prepareSuperstep() {
253     if (currentMessageStore != null) {
254       currentMessageStore.clearAll();
255     }
256 
257     MessageStore<I, Writable> nextCurrentMessageStore;
258     MessageStore<I, Writable> nextIncomingMessageStore;
259     MessageStore<I, Writable> messageStore;
260 
261     // First create the necessary in-memory message stores. If out-of-core
262     // mechanism is enabled, we wrap the in-memory message stores within
263     // disk-backed messages stores.
264     if (incomingMessageStore != null) {
265       nextCurrentMessageStore = incomingMessageStore;
266     } else {
267       messageStore = messageStoreFactory.newStore(
268           conf.getIncomingMessageClasses());
269       if (oocEngine == null) {
270         nextCurrentMessageStore = messageStore;
271       } else {
272         nextCurrentMessageStore = new DiskBackedMessageStore<>(
273             conf, oocEngine, messageStore,
274             conf.getIncomingMessageClasses().useMessageCombiner(),
275             serviceWorker.getSuperstep());
276       }
277     }
278 
279     messageStore = messageStoreFactory.newStore(
280         conf.getOutgoingMessageClasses());
281     if (oocEngine == null) {
282       nextIncomingMessageStore = messageStore;
283     } else {
284       nextIncomingMessageStore = new DiskBackedMessageStore<>(
285           conf, oocEngine, messageStore,
286           conf.getOutgoingMessageClasses().useMessageCombiner(),
287           serviceWorker.getSuperstep() + 1);
288     }
289 
290     // If out-of-core engine is enabled, we avoid overlapping of out-of-core
291     // decisions with change of superstep. This avoidance is done to simplify
292     // the design and reduce excessive use of synchronization primitives.
293     if (oocEngine != null) {
294       oocEngine.getSuperstepLock().writeLock().lock();
295     }
296     currentMessageStore = nextCurrentMessageStore;
297     incomingMessageStore = nextIncomingMessageStore;
298     if (oocEngine != null) {
299       oocEngine.reset();
300       oocEngine.getSuperstepLock().writeLock().unlock();
301     }
302     currentMessageStore.finalizeStore();
303 
304     currentWorkerToWorkerMessages = incomingWorkerToWorkerMessages;
305     incomingWorkerToWorkerMessages =
306         Collections.synchronizedList(new ArrayList<Writable>());
307   }
308 
309   /**
310    * Get the vertex mutations (synchronize on the values)
311    *
312    * @return Vertex mutations
313    */
314   public ConcurrentMap<Integer, ConcurrentMap<I, VertexMutations<I, V, E>>>
315   getPartitionMutations() {
316     return partitionMutations;
317   }
318 
319   /**
320    * Get holder for aggregators which current worker owns
321    *
322    * @return Holder for aggregators which current worker owns
323    */
324   public OwnerAggregatorServerData getOwnerAggregatorData() {
325     return ownerAggregatorData;
326   }
327 
328   /**
329    * Get holder for aggregators from previous superstep
330    *
331    * @return Holder for aggregators from previous superstep
332    */
333   public AllAggregatorServerData getAllAggregatorData() {
334     return allAggregatorData;
335   }
336 
337   /**
338    * Get the reference of the service worker.
339    *
340    * @return CentralizedServiceWorker
341    */
342   public CentralizedServiceWorker<I, V, E> getServiceWorker() {
343     return this.serviceWorker;
344   }
345 
346   /**
347    * Get and clear worker to worker messages for this superstep. Can be
348    * called only once per superstep.
349    *
350    * @return List of messages for this worker
351    */
352   public List<Writable> getAndClearCurrentWorkerToWorkerMessages() {
353     List<Writable> ret = currentWorkerToWorkerMessages;
354     currentWorkerToWorkerMessages = null;
355     return ret;
356   }
357 
358   /**
359    * Add incoming message to this worker for next superstep. Thread-safe.
360    *
361    * @param message Message received
362    */
363   public void addIncomingWorkerToWorkerMessage(Writable message) {
364     incomingWorkerToWorkerMessages.add(message);
365   }
366 
367 
368   /**
369    * Get worker to worker messages received in previous superstep.
370    * @return list of current worker to worker messages.
371    */
372   public List<Writable> getCurrentWorkerToWorkerMessages() {
373     return currentWorkerToWorkerMessages;
374   }
375 
376   /**
377    * Prepare resolving mutation.
378    */
379   public void prepareResolveMutations() {
380     oldPartitionMutations = partitionMutations;
381     partitionMutations = Maps.newConcurrentMap();
382   }
383 
384   /**
385    * Resolve mutations specific for a partition. This method is called once
386    * per partition, before the computation for that partition starts.
387    * @param partition The partition to resolve mutations for
388    */
389   public void resolvePartitionMutation(Partition<I, V, E> partition) {
390     Integer partitionId = partition.getId();
391     VertexResolver<I, V, E> vertexResolver = conf.createVertexResolver();
392     ConcurrentMap<I, VertexMutations<I, V, E>> prevPartitionMutations =
393         oldPartitionMutations.get(partitionId);
394 
395     boolean ignoreExistingVertices =
396         conf.getIncomingMessageClasses().ignoreExistingVertices();
397 
398     // Resolve mutations that are explicitly sent for this partition
399     if (prevPartitionMutations != null) {
400       for (Map.Entry<I, VertexMutations<I, V, E>> entry : prevPartitionMutations
401           .entrySet()) {
402         I vertexId = entry.getKey();
403         Vertex<I, V, E> originalVertex = partition.getVertex(vertexId);
404         VertexMutations<I, V, E> vertexMutations = entry.getValue();
405         Vertex<I, V, E> vertex = vertexResolver.resolve(vertexId,
406             originalVertex, vertexMutations,
407             !ignoreExistingVertices &&
408             getCurrentMessageStore().hasMessagesForVertex(entry.getKey()));
409 
410         if (LOG.isDebugEnabled()) {
411           LOG.debug("resolvePartitionMutations: Resolved vertex index " +
412               vertexId + " in partition index " + partitionId +
413               " with original vertex " + originalVertex +
414               ", returned vertex " + vertex + " on superstep " +
415               serviceWorker.getSuperstep() + " with mutations " +
416               vertexMutations);
417         }
418 
419         if (vertex != null) {
420           partition.putVertex(vertex);
421         } else if (originalVertex != null) {
422           partition.removeVertex(vertexId);
423           if (!ignoreExistingVertices) {
424             getCurrentMessageStore().clearVertexMessages(vertexId);
425           }
426         }
427         context.progress();
428       }
429     }
430 
431     if (!ignoreExistingVertices) {
432       // Keep track of vertices which are not here in the partition, but have
433       // received messages
434       Iterable<I> destinations = getCurrentMessageStore().
435           getPartitionDestinationVertices(partitionId);
436       if (!Iterables.isEmpty(destinations)) {
437         for (I vertexId : destinations) {
438           if (partition.getVertex(vertexId) == null) {
439             Vertex<I, V, E> vertex =
440                 vertexResolver.resolve(vertexId, null, null, true);
441 
442             if (LOG.isDebugEnabled()) {
443               LOG.debug(
444                   "resolvePartitionMutations: A non-existing vertex has " +
445                   "message(s). Added vertex index " + vertexId +
446                   " in partition index " + partitionId +
447                   ", vertex = " + vertex + ", on superstep " +
448                   serviceWorker.getSuperstep());
449             }
450 
451             if (vertex != null) {
452               partition.putVertex(vertex);
453             }
454             context.progress();
455           }
456         }
457       }
458     }
459   }
460 
461   /**
462    * In case of async message store we have to wait for all messages
463    * to be processed before going into next superstep.
464    */
465   public void waitForComplete() {
466     if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
467       ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
468     }
469   }
470 }