1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.block_app.library.striping;
1920import org.apache.giraph.block_app.framework.block.Block;
21import org.apache.giraph.block_app.framework.block.FilteringBlock;
22import org.apache.giraph.block_app.framework.block.SequenceBlock;
23import org.apache.giraph.function.Function;
24import org.apache.giraph.function.Predicate;
25import org.apache.giraph.function.primitive.Int2ObjFunction;
26import org.apache.giraph.function.primitive.Obj2IntFunction;
27import org.apache.giraph.function.vertex.SupplierFromVertex;
28import org.apache.giraph.graph.Vertex;
29import org.apache.hadoop.io.LongWritable;
30import org.apache.hadoop.io.Writable;
31import org.apache.hadoop.io.WritableComparable;
3233import com.google.common.base.Preconditions;
3435/**36 * Utility functions for doing superstep striping.37 *38 * We need to make sure that partitioning (which uses mod for distributing39 * data across workers) is independent from striping itself. So we are using40 * fastHash function below, taken from https://code.google.com/p/fast-hash/.41 */42publicclassStripingUtils {
43privateStripingUtils() { }
4445/* The MIT License4647 Copyright (C) 2012 Zilong Tan (eric.zltan@gmail.com)4849 Permission is hereby granted, free of charge, to any person50 obtaining a copy of this software and associated documentation51 files (the "Software"), to deal in the Software without52 restriction, including without limitation the rights to use, copy,53 modify, merge, publish, distribute, sublicense, and/or sell copies54 of the Software, and to permit persons to whom the Software is55 furnished to do so, subject to the following conditions:5657 The above copyright notice and this permission notice shall be58 included in all copies or substantial portions of the Software.5960 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,61 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF62 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND63 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS64 BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN65 ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN66 CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE67 SOFTWARE.68 */69/**70 * Returns 32-bit hash of a given value.71 *72 * Fast and generally good hashing function, adapted from C++ implementation:73 * https://code.google.com/p/fast-hash/74 */75publicstaticint fastHash(long h) {
76 h ^= h >> 23;
77 h *= 0x2127599bf4325c37L;
78 h ^= h >> 47;
79return ((int) (h - (h >> 32))) & 0x7fffffff;
80 }
8182/**83 * Returns number in [0, stripes) range, from given input {@code value}.84 */85publicstaticint fastStripe(long value, int stripes) {
86return fastHash(value) % stripes;
87 }
8889/**90 * Fast hash-based striping for LongWritable IDs, returns a function91 * that for a given ID returns it's stripe index.92 */93publicstatic94 Obj2IntFunction<LongWritable> fastHashStriping(finalint stripes) {
95returnnew Obj2IntFunction<LongWritable>() {
96 @Override
97publicint apply(LongWritable id) {
98return fastStripe(id.get(), stripes);
99 }
100 };
101 }
102103/**104 * Fast hash-based striping for LongWritable IDs, returns a function105 * that for a given stripe index returns a predicate checking whether ID is106 * in that stripe.107 */108publicstatic109 Int2ObjFunction<Predicate<LongWritable>> fastHashStripingPredicate(
110finalint stripes) {
111returnnew Int2ObjFunction<Predicate<LongWritable>>() {
112 @Override
113public Predicate<LongWritable> apply(finalint stripe) {
114returnnew Predicate<LongWritable>() {
115 @Override
116publicboolean apply(LongWritable id) {
117return fastStripe(id.get(), stripes) == stripe;
118 }
119 };
120 }
121 };
122 }
123124/**125 * Generate striped block, with given number of {@code stripes},126 * using given {@code blockGenerator} to generate block for each stripe.127 *128 * @param stripes Number of stripes129 * @param blockGenerator Function given predicate representing whether130 * ID is in current stripe, should return Block131 * for current stripe132 * @return Resulting block133 */134publicstaticBlock generateStripedBlock(
135int stripes,
136 Function<Predicate<LongWritable>, Block> blockGenerator) {
137return generateStripedBlockImpl(
138 stripes, blockGenerator,
139 StripingUtils.fastHashStripingPredicate(stripes));
140 }
141142/**143 * Generate striped block, with given number of {@code stripes},144 * using given {@code blockGenerator} to generate block for each stripe,145 * and using striping based on given {@code stripeSupplier}.146 *147 * @param stripes Number of stripes148 * @param blockGenerator Function given predicate representing whether149 * ID is in current stripe, should return Block150 * for current stripe151 * @param stripeSupplier Function given number of stripes,152 * generates a function that given stripe index,153 * returns predicate checking whether ID is in that154 * stripe.155 * @return Resulting block156 */157publicstatic <I extends WritableComparable>
158Block generateStripedBlock(
159int stripes,
160 Function<Predicate<I>, Block> blockGenerator,
161 Int2ObjFunction<Int2ObjFunction<Predicate<I>>> stripeSupplier) {
162return generateStripedBlockImpl(
163 stripes, blockGenerator, stripeSupplier.apply(stripes));
164 }
165166/**167 * Stripe given block, by calling vertexSend only in it's corresponding168 * stripe. All other methods are called number of stripes times.169 *170 * @param stripes Number of stripes171 * @param block Block to stripe172 * @return Resulting block173 */174publicstaticBlock stripeBlockBySenders(
175int stripes,
176Block block) {
177return generateStripedBlockImpl(
178 stripes,
179 StripingUtils.<LongWritable>createSingleStripeBySendersFunction(block),
180 StripingUtils.fastHashStripingPredicate(stripes));
181 }
182183/**184 * Given a block, creates a function that will given a predicate filter185 * calls to vertexSend function based on that predicate.186 *187 * Useful to be combined with generateStripedBlock to stripe blocks.188 */189publicstatic <I extends WritableComparable> Function<Predicate<I>, Block>
190 createSingleStripeBySendersFunction(finalBlock block) {
191returnnew Function<Predicate<I>, Block>() {
192 @Override
193publicBlock apply(final Predicate<I> stripePredicate) {
194return FilteringBlock.createSendFiltering(
195new SupplierFromVertex<I, Writable, Writable, Boolean>() {
196 @Override
197public Boolean get(Vertex<I, Writable, Writable> vertex) {
198return stripePredicate.apply(vertex.getId());
199 }
200 }, block);
201 }
202 };
203 }
204205privatestatic <I extends WritableComparable>
206Block generateStripedBlockImpl(
207int stripes,
208 Function<Predicate<I>, Block> blockGenerator,
209 Int2ObjFunction<Predicate<I>> stripeSupplier) {
210 Preconditions.checkArgument(stripes >= 1);
211if (stripes == 1) {
212return blockGenerator.apply(new Predicate<I>() {
213 @Override
214publicboolean apply(I input) {
215returntrue;
216 }
217 });
218 }
219Block[] blocks = newBlock[stripes];
220for (int i = 0; i < stripes; i++) {
221 blocks[i] = blockGenerator.apply(stripeSupplier.apply(i));
222 }
223returnnewSequenceBlock(blocks);
224 }
225 }