This project has retired. For details please refer to its Attic page.
StripingUtils xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.library.striping;
19  
20  import org.apache.giraph.block_app.framework.block.Block;
21  import org.apache.giraph.block_app.framework.block.FilteringBlock;
22  import org.apache.giraph.block_app.framework.block.SequenceBlock;
23  import org.apache.giraph.function.Function;
24  import org.apache.giraph.function.Predicate;
25  import org.apache.giraph.function.primitive.Int2ObjFunction;
26  import org.apache.giraph.function.primitive.Obj2IntFunction;
27  import org.apache.giraph.function.vertex.SupplierFromVertex;
28  import org.apache.giraph.graph.Vertex;
29  import org.apache.hadoop.io.LongWritable;
30  import org.apache.hadoop.io.Writable;
31  import org.apache.hadoop.io.WritableComparable;
32  
33  import com.google.common.base.Preconditions;
34  
35  /**
36   * Utility functions for doing superstep striping.
37   *
38   * We need to make sure that partitioning (which uses mod for distributing
39   * data across workers) is independent from striping itself. So we are using
40   * fastHash function below, taken from https://code.google.com/p/fast-hash/.
41   */
42  public class StripingUtils {
43    private StripingUtils() { }
44  
45    /* The MIT License
46  
47    Copyright (C) 2012 Zilong Tan (eric.zltan@gmail.com)
48  
49    Permission is hereby granted, free of charge, to any person
50    obtaining a copy of this software and associated documentation
51    files (the "Software"), to deal in the Software without
52    restriction, including without limitation the rights to use, copy,
53    modify, merge, publish, distribute, sublicense, and/or sell copies
54    of the Software, and to permit persons to whom the Software is
55    furnished to do so, subject to the following conditions:
56  
57    The above copyright notice and this permission notice shall be
58    included in all copies or substantial portions of the Software.
59  
60    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
61    EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
62    MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
63    NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
64    BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
65    ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
66    CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
67    SOFTWARE.
68    */
69    /**
70     * Returns 32-bit hash of a given value.
71     *
72     * Fast and generally good hashing function, adapted from C++ implementation:
73     * https://code.google.com/p/fast-hash/
74     */
75    public static int fastHash(long h) {
76      h ^= h >> 23;
77      h *= 0x2127599bf4325c37L;
78      h ^= h >> 47;
79      return ((int) (h - (h >> 32))) & 0x7fffffff;
80    }
81  
82    /**
83     * Returns number in [0, stripes) range, from given input {@code value}.
84     */
85    public static int fastStripe(long value, int stripes) {
86      return fastHash(value) % stripes;
87    }
88  
89    /**
90     * Fast hash-based striping for LongWritable IDs, returns a function
91     * that for a given ID returns it's stripe index.
92     */
93    public static
94    Obj2IntFunction<LongWritable> fastHashStriping(final int stripes) {
95      return new Obj2IntFunction<LongWritable>() {
96        @Override
97        public int apply(LongWritable id) {
98          return fastStripe(id.get(), stripes);
99        }
100     };
101   }
102 
103   /**
104    * Fast hash-based striping for LongWritable IDs, returns a function
105    * that for a given stripe index returns a predicate checking whether ID is
106    * in that stripe.
107    */
108   public static
109   Int2ObjFunction<Predicate<LongWritable>> fastHashStripingPredicate(
110       final int stripes) {
111     return new Int2ObjFunction<Predicate<LongWritable>>() {
112       @Override
113       public Predicate<LongWritable> apply(final int stripe) {
114         return new Predicate<LongWritable>() {
115           @Override
116           public boolean apply(LongWritable id) {
117             return fastStripe(id.get(), stripes) == stripe;
118           }
119         };
120       }
121     };
122   }
123 
124   /**
125    * Generate striped block, with given number of {@code stripes},
126    * using given {@code blockGenerator} to generate block for each stripe.
127    *
128    * @param stripes Number of stripes
129    * @param blockGenerator Function given predicate representing whether
130    *                       ID is in current stripe, should return Block
131    *                       for current stripe
132    * @return Resulting block
133    */
134   public static Block generateStripedBlock(
135       int stripes,
136       Function<Predicate<LongWritable>, Block> blockGenerator) {
137     return generateStripedBlockImpl(
138         stripes, blockGenerator,
139         StripingUtils.fastHashStripingPredicate(stripes));
140   }
141 
142   /**
143    * Generate striped block, with given number of {@code stripes},
144    * using given {@code blockGenerator} to generate block for each stripe,
145    * and using striping based on given {@code stripeSupplier}.
146    *
147    * @param stripes Number of stripes
148    * @param blockGenerator Function given predicate representing whether
149    *                       ID is in current stripe, should return Block
150    *                       for current stripe
151    * @param stripeSupplier Function given number of stripes,
152    *                       generates a function that given stripe index,
153    *                       returns predicate checking whether ID is in that
154    *                       stripe.
155    * @return Resulting block
156    */
157   public static <I extends WritableComparable>
158   Block generateStripedBlock(
159       int stripes,
160       Function<Predicate<I>, Block> blockGenerator,
161       Int2ObjFunction<Int2ObjFunction<Predicate<I>>> stripeSupplier) {
162     return generateStripedBlockImpl(
163         stripes, blockGenerator, stripeSupplier.apply(stripes));
164   }
165 
166   /**
167    * Stripe given block, by calling vertexSend only in it's corresponding
168    * stripe. All other methods are called number of stripes times.
169    *
170    * @param stripes Number of stripes
171    * @param block Block to stripe
172    * @return Resulting block
173    */
174   public static Block stripeBlockBySenders(
175       int stripes,
176       Block block) {
177     return generateStripedBlockImpl(
178         stripes,
179         StripingUtils.<LongWritable>createSingleStripeBySendersFunction(block),
180         StripingUtils.fastHashStripingPredicate(stripes));
181   }
182 
183   /**
184    * Given a block, creates a function that will given a predicate filter
185    * calls to vertexSend function based on that predicate.
186    *
187    * Useful to be combined with generateStripedBlock to stripe blocks.
188    */
189   public static <I extends WritableComparable> Function<Predicate<I>, Block>
190       createSingleStripeBySendersFunction(final Block block) {
191     return new Function<Predicate<I>, Block>() {
192       @Override
193       public Block apply(final Predicate<I> stripePredicate) {
194         return FilteringBlock.createSendFiltering(
195             new SupplierFromVertex<I, Writable, Writable, Boolean>() {
196               @Override
197               public Boolean get(Vertex<I, Writable, Writable> vertex) {
198                 return stripePredicate.apply(vertex.getId());
199               }
200             }, block);
201       }
202     };
203   }
204 
205   private static <I extends WritableComparable>
206   Block generateStripedBlockImpl(
207       int stripes,
208       Function<Predicate<I>, Block> blockGenerator,
209       Int2ObjFunction<Predicate<I>> stripeSupplier) {
210     Preconditions.checkArgument(stripes >= 1);
211     if (stripes == 1) {
212       return blockGenerator.apply(new Predicate<I>() {
213         @Override
214         public boolean apply(I input) {
215           return true;
216         }
217       });
218     }
219     Block[] blocks = new Block[stripes];
220     for (int i = 0; i < stripes; i++) {
221       blocks[i] = blockGenerator.apply(stripeSupplier.apply(i));
222     }
223     return new SequenceBlock(blocks);
224   }
225 }