1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.giraph.comm.messages; 20 21 import java.io.DataInput; 22 import java.io.DataOutput; 23 import java.io.IOException; 24 25 import org.apache.giraph.utils.VertexIdMessages; 26 import org.apache.hadoop.io.Writable; 27 import org.apache.hadoop.io.WritableComparable; 28 29 /** 30 * Message store 31 * 32 * @param <I> Vertex id 33 * @param <M> Message data 34 */ 35 public interface MessageStore<I extends WritableComparable, 36 M extends Writable> { 37 /** 38 * True if this message-store encodes messages as a list of long pointers 39 * to compact serialized messages 40 * 41 * @return true if we encode messages as a list of pointers 42 */ 43 boolean isPointerListEncoding(); 44 45 /** 46 * Gets messages for a vertex. The lifetime of every message is only 47 * guaranteed until the iterator's next() method is called. Do not hold 48 * references to objects returned by this iterator. 49 * 50 * @param vertexId Vertex id for which we want to get messages 51 * @return Iterable of messages for a vertex id 52 */ 53 Iterable<M> getVertexMessages(I vertexId); 54 55 /** 56 * Clears messages for a vertex. 57 * 58 * @param vertexId Vertex id for which we want to clear messages 59 */ 60 void clearVertexMessages(I vertexId); 61 62 /** 63 * Clears all resources used by this store. 64 */ 65 void clearAll(); 66 67 /** 68 * Check if we have messages for some vertex 69 * 70 * @param vertexId Id of vertex which we want to check 71 * @return True iff we have messages for vertex with required id 72 */ 73 boolean hasMessagesForVertex(I vertexId); 74 75 /** 76 * Check if we have messages for some partition 77 * 78 * @param partitionId Id of partition which we want to check 79 * @return True iff we have messages for the given partition 80 */ 81 boolean hasMessagesForPartition(int partitionId); 82 83 /** 84 * Adds messages for partition 85 * 86 * @param partitionId Id of partition 87 * @param messages Collection of vertex ids and messages we want to add 88 */ 89 void addPartitionMessages( 90 int partitionId, VertexIdMessages<I, M> messages); 91 92 /** 93 * Adds a message for a particular vertex 94 * The method is used by InternalMessageStore to send local messages; for 95 * the general case, use a more efficient addPartitionMessages 96 * 97 * @param vertexId Id of target vertex 98 * @param message A message to send 99 * @throws IOException 100 */ 101 void addMessage(I vertexId, M message) throws IOException; 102 103 /** 104 * Called before start of computation in bspworker 105 * Since it is run from a single thread while the store is not being 106 * accessed by any other thread - this is ensured to be thread-safe 107 */ 108 void finalizeStore(); 109 110 /** 111 * Gets vertex ids from selected partition which we have messages for 112 * 113 * @param partitionId Id of partition 114 * @return Iterable over vertex ids which we have messages for 115 */ 116 Iterable<I> getPartitionDestinationVertices(int partitionId); 117 118 /** 119 * Clears messages for a partition. 120 * 121 * @param partitionId Partition id for which we want to clear messages 122 */ 123 void clearPartition(int partitionId); 124 125 /** 126 * Serialize messages for one partition. 127 * 128 * @param out {@link DataOutput} to serialize this object into 129 * @param partitionId Id of partition 130 * @throws IOException 131 */ 132 void writePartition(DataOutput out, int partitionId) throws IOException; 133 134 /** 135 * Deserialize messages for one partition 136 * 137 * @param in {@link DataInput} to deserialize this object 138 * from. 139 * @param partitionId Id of partition 140 * @throws IOException 141 */ 142 void readFieldsForPartition(DataInput in, 143 int partitionId) throws IOException; 144 }