1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.mapping;
2021import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
2223import java.util.Arrays;
24import java.util.Map;
25import java.util.concurrent.ConcurrentMap;
26import java.util.concurrent.atomic.AtomicLong;
2728import javax.annotation.concurrent.ThreadSafe;
2930import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
31import org.apache.giraph.conf.GiraphConstants;
32import org.apache.hadoop.io.ByteWritable;
33import org.apache.hadoop.io.LongWritable;
34import org.apache.hadoop.io.Writable;
35import org.apache.log4j.Logger;
3637import com.google.common.collect.MapMaker;
3839/**40 *41 * An implementation of MappingStore<LongWritable, ByteWritable>42 *43 * Methods implemented here are thread safe by default because it is guaranteed44 * that each entry is written to only once.45 * It can represent up to a maximum of 254 workers46 * any byte passed is treated as unsigned47 */48 @ThreadSafe
49publicclassLongByteMappingStore50extends DefaultImmutableClassesGiraphConfigurable<LongWritable, Writable,
51 Writable> implements MappingStore<LongWritable, ByteWritable> {
52/** Logger instance */53privatestaticfinal Logger LOG = Logger.getLogger(
54 LongByteMappingStore.class);
5556/** Counts number of entries added */57privatefinal AtomicLong numEntries = new AtomicLong(0);
5859/** Id prefix to bytesArray index mapping */60private ConcurrentMap<Long, byte[]> concurrentIdToBytes;
61/** Primitive idToBytes for faster querying */62private Long2ObjectOpenHashMap<byte[]> idToBytes;
63/** Number of lower order bits */64privateint lower;
65/** Number of distinct prefixes */66privateint upper;
67/** Bit mask for lowerOrder suffix bits */68privateint lowerBitMask;
69/** LowerOrder bits count */70privateint lowerOrder;
7172 @Override
73publicvoid initialize() {
74 upper = GiraphConstants.LB_MAPPINGSTORE_UPPER.get(getConf());
75 lower = GiraphConstants.LB_MAPPINGSTORE_LOWER.get(getConf());
7677if ((lower & (lower - 1)) != 0) {
78thrownew IllegalStateException("lower not a power of two");
79 }
8081 lowerBitMask = lower - 1;
82 lowerOrder = Integer.numberOfTrailingZeros(lower); // log_2_(lower)83 concurrentIdToBytes = new MapMaker()
84 .initialCapacity(upper)
85 .concurrencyLevel(getConf().getNumInputSplitsThreads())
86 .makeMap();
87 idToBytes = new Long2ObjectOpenHashMap<>(upper);
88 }
8990/**91 * Auxiliary method to be used by getTarget92 *93 * @param vertexId vertexId94 * @return return byte value of target95 */96public byte getByteTarget(LongWritable vertexId) {
97long key = vertexId.get() >>> lowerOrder;
98int suffix = (int) (vertexId.get() & lowerBitMask);
99if (!idToBytes.containsKey(key)) {
100return -1;
101 }
102return idToBytes.get(key)[suffix];
103 }
104105 @Override
106publicvoid addEntry(LongWritable vertexId, ByteWritable target) {
107long key = vertexId.get() >>> lowerOrder;
108 byte[] bytes = concurrentIdToBytes.get(key);
109if (bytes == null) {
110 byte[] newBytes = new byte[lower];
111 Arrays.fill(newBytes, (byte) -1);
112 bytes = concurrentIdToBytes.putIfAbsent(key, newBytes);
113if (bytes == null) {
114 bytes = newBytes;
115 }
116 }
117 bytes[(int) (vertexId.get() & lowerBitMask)] = target.get();
118 numEntries.getAndIncrement(); // increment count119 }
120121 @Override
122public ByteWritable getTarget(LongWritable vertexId,
123 ByteWritable target) {
124 byte bval = getByteTarget(vertexId);
125if (bval == -1) { // worker not assigned by mapping126returnnull;
127 }
128 target.set(bval);
129return target;
130 }
131132 @Override
133publicvoid postFilling() {
134// not thread-safe135for (Map.Entry<Long, byte[]> entry : concurrentIdToBytes.entrySet()) {
136 idToBytes.put(entry.getKey(), entry.getValue());
137 }
138 concurrentIdToBytes.clear();
139 concurrentIdToBytes = null;
140 }
141142 @Override
143publiclong getStats() {
144return numEntries.longValue();
145 }
146 }