This project has retired. For details please refer to its Attic page.
LocalityAwareInputSplitsMasterOrganizer xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.master.input;
20  
21  import org.apache.giraph.worker.WorkerInfo;
22  import org.apache.hadoop.mapreduce.InputSplit;
23  
24  import java.io.IOException;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.atomic.AtomicBoolean;
30  import java.util.concurrent.atomic.AtomicInteger;
31  
32  /**
33   * Input splits organizer for vertex and edge input splits on master, which
34   * uses locality information
35   */
36  public class LocalityAwareInputSplitsMasterOrganizer
37      implements InputSplitsMasterOrganizer {
38    /** All splits before this pointer were taken */
39    private final AtomicInteger listPointer = new AtomicInteger();
40    /** List of serialized splits */
41    private final List<byte[]> serializedSplits;
42    /** Array containing information about whether a split was taken or not */
43    private final AtomicBoolean[] splitsTaken;
44  
45    /** Map with preferred splits for each worker */
46    private final Map<Integer, ConcurrentLinkedQueue<Integer>>
47        workerToPreferredSplitsMap;
48  
49  
50    /**
51     * Constructor
52     *
53     * @param serializedSplits Serialized splits
54     * @param splits           Splits
55     * @param workers          List of workers
56     */
57    public LocalityAwareInputSplitsMasterOrganizer(List<byte[]> serializedSplits,
58        List<InputSplit> splits, List<WorkerInfo> workers) {
59      this.serializedSplits = serializedSplits;
60      splitsTaken = new AtomicBoolean[serializedSplits.size()];
61      // Mark all splits as not taken initially
62      for (int i = 0; i < serializedSplits.size(); i++) {
63        splitsTaken[i] = new AtomicBoolean(false);
64      }
65  
66      workerToPreferredSplitsMap = new HashMap<>();
67      for (WorkerInfo worker : workers) {
68        workerToPreferredSplitsMap.put(worker.getTaskId(),
69            new ConcurrentLinkedQueue<Integer>());
70      }
71      // Go through all splits
72      for (int i = 0; i < splits.size(); i++) {
73        try {
74          String[] locations = splits.get(i).getLocations();
75          // For every worker
76          for (WorkerInfo worker : workers) {
77            // Check splits locations
78            for (String location : locations) {
79              // If split is local for the worker, add it to preferred list
80              if (location.contains(worker.getHostname())) {
81                workerToPreferredSplitsMap.get(worker.getTaskId()).add(i);
82                break;
83              }
84            }
85          }
86        } catch (IOException | InterruptedException e) {
87          throw new IllegalStateException(
88              "Exception occurred while getting splits locations", e);
89        }
90      }
91    }
92  
93    @Override
94    public byte[] getSerializedSplitFor(int workerTaskId) {
95      ConcurrentLinkedQueue<Integer> preferredSplits =
96          workerToPreferredSplitsMap.get(workerTaskId);
97      // Try to find a local split
98      while (true) {
99        // Get position to check
100       Integer splitIndex = preferredSplits.poll();
101       // Check if all local splits were already processed for this worker
102       if (splitIndex == null) {
103         break;
104       }
105       // Try to reserve the split
106       if (splitsTaken[splitIndex].compareAndSet(false, true)) {
107         return serializedSplits.get(splitIndex);
108       }
109     }
110 
111     // No more local splits available, proceed linearly from splits list
112     while (true) {
113       // Get position to check
114       int splitIndex = listPointer.getAndIncrement();
115       // Check if all splits were already taken
116       if (splitIndex >= serializedSplits.size()) {
117         return null;
118       }
119       // Try to reserve the split
120       if (splitsTaken[splitIndex].compareAndSet(false, true)) {
121         return serializedSplits.get(splitIndex);
122       }
123     }
124   }
125 }