/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.master.input;

import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.mapreduce.InputSplit;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
3132/**33 * Input splits organizer for vertex and edge input splits on master, which34 * uses locality information35 */36publicclassLocalityAwareInputSplitsMasterOrganizer37implementsInputSplitsMasterOrganizer {
38/** All splits before this pointer were taken */39privatefinal AtomicInteger listPointer = new AtomicInteger();
40/** List of serialized splits */41privatefinal List<byte[]> serializedSplits;
42/** Array containing information about whether a split was taken or not */43privatefinal AtomicBoolean[] splitsTaken;
4445/** Map with preferred splits for each worker */46privatefinal Map<Integer, ConcurrentLinkedQueue<Integer>>
47 workerToPreferredSplitsMap;
484950/**51 * Constructor52 *53 * @param serializedSplits Serialized splits54 * @param splits Splits55 * @param workers List of workers56 */57publicLocalityAwareInputSplitsMasterOrganizer(List<byte[]> serializedSplits,
58 List<InputSplit> splits, List<WorkerInfo> workers) {
59this.serializedSplits = serializedSplits;
60 splitsTaken = new AtomicBoolean[serializedSplits.size()];
61// Mark all splits as not taken initially62for (int i = 0; i < serializedSplits.size(); i++) {
63 splitsTaken[i] = new AtomicBoolean(false);
64 }
6566 workerToPreferredSplitsMap = new HashMap<>();
67for (WorkerInfo worker : workers) {
68 workerToPreferredSplitsMap.put(worker.getTaskId(),
69new ConcurrentLinkedQueue<Integer>());
70 }
71// Go through all splits72for (int i = 0; i < splits.size(); i++) {
73try {
74 String[] locations = splits.get(i).getLocations();
75// For every worker76for (WorkerInfo worker : workers) {
77// Check splits locations78for (String location : locations) {
79// If split is local for the worker, add it to preferred list80if (location.contains(worker.getHostname())) {
81 workerToPreferredSplitsMap.get(worker.getTaskId()).add(i);
82break;
83 }
84 }
85 }
86 } catch (IOException | InterruptedException e) {
87thrownew IllegalStateException(
88"Exception occurred while getting splits locations", e);
89 }
90 }
91 }
9293 @Override
94public byte[] getSerializedSplitFor(int workerTaskId) {
95 ConcurrentLinkedQueue<Integer> preferredSplits =
96 workerToPreferredSplitsMap.get(workerTaskId);
97// Try to find a local split98while (true) {
99// Get position to check100 Integer splitIndex = preferredSplits.poll();
101// Check if all local splits were already processed for this worker102if (splitIndex == null) {
103break;
104 }
105// Try to reserve the split106if (splitsTaken[splitIndex].compareAndSet(false, true)) {
107return serializedSplits.get(splitIndex);
108 }
109 }
110111// No more local splits available, proceed linearly from splits list112while (true) {
113// Get position to check114int splitIndex = listPointer.getAndIncrement();
115// Check if all splits were already taken116if (splitIndex >= serializedSplits.size()) {
117returnnull;
118 }
119// Try to reserve the split120if (splitsTaken[splitIndex].compareAndSet(false, true)) {
121return serializedSplits.get(splitIndex);
122 }
123 }
124 }
125 }