1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.comm.messages.primitives.long_id;
2021import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
22import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
2324import java.util.List;
2526import org.apache.giraph.comm.messages.PartitionSplitInfo;
27import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
28import org.apache.giraph.factories.MessageValueFactory;
29import org.apache.giraph.graph.Vertex;
30import org.apache.giraph.partition.Partition;
31import org.apache.hadoop.io.LongWritable;
32import org.apache.hadoop.io.Writable;
3334/**35 * Special message store to be used when ids are LongWritable and no combiner36 * is used.37 * Uses fastutil primitive maps in order to decrease number of objects and38 * get better performance.39 *40 * @param <M> message type41 * @param <L> list type42 */43publicabstractclass LongAbstractListStore<M extends Writable,
44 L extends List> extends LongAbstractStore<M, L> {
45/**46 * Map used to store messages for nascent vertices i.e., ones47 * that did not exist at the start of current superstep but will get48 * created because of sending message to a non-existent vertex id49 */50privatefinal51 Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<L>> nascentMap;
5253/**54 * Constructor55 *56 * @param messageValueFactory Factory for creating message values57 * @param partitionInfo Partition split info58 * @param config Hadoop configuration59 */60publicLongAbstractListStore(
61 MessageValueFactory<M> messageValueFactory,
62 PartitionSplitInfo<LongWritable> partitionInfo,
63 ImmutableClassesGiraphConfiguration<LongWritable,
64 Writable, Writable> config) {
65super(messageValueFactory, partitionInfo, config);
66 populateMap();
6768// create map for vertex ids (i.e., nascent vertices) not known yet69 nascentMap = new Int2ObjectOpenHashMap<>();
70for (int partitionId : partitionInfo.getPartitionIds()) {
71 nascentMap.put(partitionId, new Long2ObjectOpenHashMap<L>());
72 }
73 }
7475/**76 * Populate the map with all vertexIds for each partition77 */78privatevoid populateMap() { // TODO - can parallelize?79// populate with vertex ids already known80 partitionInfo.startIteration();
81while (true) {
82Partition partition = partitionInfo.getNextPartition();
83if (partition == null) {
84break;
85 }
86 Long2ObjectOpenHashMap<L> partitionMap = map.get(partition.getId());
87for (Object obj : partition) {
88Vertex vertex = (Vertex) obj;
89 LongWritable vertexId = (LongWritable) vertex.getId();
90 partitionMap.put(vertexId.get(), createList());
91 }
92 partitionInfo.putPartition(partition);
93 }
94 }
9596/**97 * Create an instance of L98 * @return instance of L99 */100protectedabstract L createList();
101102/**103 * Get list for the current vertexId104 *105 * @param vertexId vertex id106 * @return list for current vertexId107 */108protected L getList(LongWritable vertexId) {
109long id = vertexId.get();
110int partitionId = partitionInfo.getPartitionId(vertexId);
111 Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
112 L list = partitionMap.get(id);
113if (list == null) {
114 Long2ObjectOpenHashMap<L> nascentPartitionMap =
115 nascentMap.get(partitionId);
116// assumption: not many nascent vertices are created117// so overall synchronization is negligible118synchronized (nascentPartitionMap) {
119 list = nascentPartitionMap.get(id);
120if (list == null) {
121 list = createList();
122 nascentPartitionMap.put(id, list);
123 }
124return list;
125 }
126 }
127return list;
128 }
129130 @Override
131publicvoid finalizeStore() {
132for (int partitionId : nascentMap.keySet()) {
133// nascent vertices are present only in nascent map134 map.get(partitionId).putAll(nascentMap.get(partitionId));
135 }
136 nascentMap.clear();
137 }
138139 @Override
140publicboolean hasMessagesForVertex(LongWritable vertexId) {
141int partitionId = partitionInfo.getPartitionId(vertexId);
142 Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
143 L list = partitionMap.get(vertexId.get());
144if (list != null && !list.isEmpty()) {
145returntrue;
146 }
147 Long2ObjectOpenHashMap<L> nascentMessages = nascentMap.get(partitionId);
148return nascentMessages != null &&
149 nascentMessages.containsKey(vertexId.get());
150 }
151152// TODO - discussion153/*154 some approaches for ensuring correctness with parallel inserts155 - current approach: uses a small extra bit of memory by pre-populating156 map & pushes everything map cannot handle to nascentMap157 at the beginning of next superstep compute a single threaded finalizeStore is158 called (so little extra memory + 1 sequential finish ops)159 - used striped parallel fast utils instead (unsure of perf)160 - use concurrent map (every get gets far slower)161 - use reader writer locks (unsure of perf)162 (code looks something like underneath)163164 private final ReadWriteLock rwl = new ReentrantReadWriteLock();165 rwl.readLock().lock();166 L list = partitionMap.get(vertexId);167 if (list == null) {168 rwl.readLock().unlock();169 rwl.writeLock().lock();170 if (partitionMap.get(vertexId) == null) {171 list = createList();172 partitionMap.put(vertexId, list);173 }174 rwl.readLock().lock();175 rwl.writeLock().unlock();176 }177 rwl.readLock().unlock();178 - adopted from the article179http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/locks/\180 ReentrantReadWriteLock.html181 */182 }