1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.master.input;
2021import org.apache.giraph.comm.MasterClient;
22import org.apache.giraph.comm.requests.ReplyWithInputSplitRequest;
23import org.apache.giraph.conf.StrConfOption;
24import org.apache.giraph.io.GiraphInputFormat;
25import org.apache.giraph.io.InputType;
26import org.apache.giraph.worker.WorkerInfo;
27import org.apache.hadoop.mapreduce.Counter;
28import org.apache.hadoop.mapreduce.InputSplit;
29import org.apache.hadoop.mapreduce.Mapper;
3031import java.io.ByteArrayOutputStream;
32import java.io.DataOutput;
33import java.io.DataOutputStream;
34import java.io.IOException;
35import java.util.ArrayList;
36import java.util.EnumMap;
37import java.util.HashSet;
38import java.util.List;
39import java.util.Map;
40import java.util.Set;
41import java.util.concurrent.ConcurrentHashMap;
42import java.util.concurrent.CountDownLatch;
43import java.util.concurrent.atomic.AtomicInteger;
4445/**46 * Handler for input splits on master47 *48 * Since currently Giraph fails if worker fails while reading input, we49 * didn't complicate this part with retries yet, later it could be added by50 * keeping track of which worker got which split and then if worker dies put51 * these splits back to queues.52 */53publicclassMasterInputSplitsHandler {
54/**55 * Store in counters timestamps when we finished reading56 * these fractions of input57 */58publicstaticfinalStrConfOption DONE_FRACTIONS_TO_STORE_IN_COUNTERS =
59newStrConfOption("giraph.master.input.doneFractionsToStoreInCounters",
60"0.99,1", "Store in counters timestamps when we finished reading " +
61"these fractions of input");
62/** Map of counter group and names */63privatestatic Map<String, Set<String>> COUNTER_GROUP_AND_NAMES =
64new ConcurrentHashMap<>();
6566/** Whether to use locality information */67privatefinalboolean useLocality;
68/** Master client */69privateMasterClient masterClient;
70/** Master client */71private List<WorkerInfo> workers;
72/** Map of splits organizers for each split type */73private Map<InputType, InputSplitsMasterOrganizer> splitsMap =
74new EnumMap<>(InputType.class);
75/** Latches to say when one input splits type is ready to be accessed */76private Map<InputType, CountDownLatch> latchesMap =
77new EnumMap<>(InputType.class);
78/** Context for accessing counters */79privatefinal Mapper.Context context;
80/** How many splits per type are there total */81privatefinal Map<InputType, Integer> numSplitsPerType =
82new EnumMap<>(InputType.class);
83/** How many splits per type have been read so far */84privatefinal Map<InputType, AtomicInteger> numSplitsReadPerType =
85new EnumMap<>(InputType.class);
86/** Timestamps when various splits were created */87privatefinal Map<InputType, Long> splitsCreatedTimestamp =
88new EnumMap<>(InputType.class);
89/**90 * Store in counters timestamps when we finished reading91 * these fractions of input92 */93privatefinaldouble[] doneFractionsToStoreInCounters;
9495/**96 * Constructor97 *98 * @param useLocality Whether to use locality information or not99 * @param context Context for accessing counters100 */101publicMasterInputSplitsHandler(boolean useLocality, Mapper.Context context) {
102this.useLocality = useLocality;
103this.context = context;
104for (InputType inputType : InputType.values()) {
105 latchesMap.put(inputType, new CountDownLatch(1));
106 numSplitsReadPerType.put(inputType, new AtomicInteger(0));
107 }
108109 String[] tmp = DONE_FRACTIONS_TO_STORE_IN_COUNTERS.get(
110 context.getConfiguration()).split(",");
111 doneFractionsToStoreInCounters = newdouble[tmp.length];
112for (int i = 0; i < tmp.length; i++) {
113 doneFractionsToStoreInCounters[i] = Double.parseDouble(tmp[i].trim());
114 }
115 }
116117/**118 * Initialize119 *120 * @param masterClient Master client121 * @param workers List of workers122 */123publicvoid initialize(MasterClient masterClient, List<WorkerInfo> workers) {
124this.masterClient = masterClient;
125this.workers = workers;
126 }
127128/**129 * Add splits130 *131 * @param splitsType Type of splits132 * @param inputSplits Splits133 * @param inputFormat Format134 */135publicvoid addSplits(InputType splitsType, List<InputSplit> inputSplits,
136GiraphInputFormat inputFormat) {
137 splitsCreatedTimestamp.put(splitsType, System.currentTimeMillis());
138 List<byte[]> serializedSplits = new ArrayList<>();
139for (InputSplit inputSplit : inputSplits) {
140try {
141 ByteArrayOutputStream byteArrayOutputStream =
142new ByteArrayOutputStream();
143 DataOutput outputStream =
144new DataOutputStream(byteArrayOutputStream);
145 inputFormat.writeInputSplit(inputSplit, outputStream);
146 serializedSplits.add(byteArrayOutputStream.toByteArray());
147 } catch (IOException e) {
148thrownew IllegalStateException("IOException occurred", e);
149 }
150 }
151InputSplitsMasterOrganizer inputSplitsOrganizer;
152if (splitsType == InputType.MAPPING) {
153 inputSplitsOrganizer = newMappingInputSplitsMasterOrganizer(
154 serializedSplits, workers);
155 } else {
156 inputSplitsOrganizer = useLocality ?
157newLocalityAwareInputSplitsMasterOrganizer(serializedSplits,
158 inputSplits, workers) :
159newBasicInputSplitsMasterOrganizer(serializedSplits);
160 }
161 splitsMap.put(splitsType, inputSplitsOrganizer);
162 latchesMap.get(splitsType).countDown();
163 numSplitsPerType.put(splitsType, serializedSplits.size());
164 }
165166/**167 * Called after we receive a split request from some worker, should send168 * split back to it if available, or send it information that there is no169 * more available170 *171 * @param splitType Type of split requested172 * @param workerTaskId Id of worker who requested split173 * @param isFirstSplit Whether this is the first split a thread is requesting,174 * or this request indicates that previously requested input split was done175 */176publicvoid sendSplitTo(InputType splitType, int workerTaskId,
177boolean isFirstSplit) {
178try {
179// Make sure we don't try to retrieve splits before they were added180 latchesMap.get(splitType).await();
181 } catch (InterruptedException e) {
182thrownew IllegalStateException("Interrupted", e);
183 }
184 byte[] serializedInputSplit =
185 splitsMap.get(splitType).getSerializedSplitFor(workerTaskId);
186 masterClient.sendWritableRequest(workerTaskId,
187newReplyWithInputSplitRequest(splitType,
188 serializedInputSplit == null ? new byte[0] : serializedInputSplit));
189if (!isFirstSplit) {
190 incrementSplitsRead(splitType);
191 }
192 }
193194/**195 * Increment splits read196 *197 * @param splitType Type of split which was read198 */199privatevoid incrementSplitsRead(InputType splitType) {
200int splitsRead = numSplitsReadPerType.get(splitType).incrementAndGet();
201int splits = numSplitsPerType.get(splitType);
202for (int i = 0; i < doneFractionsToStoreInCounters.length; i++) {
203if (splitsRead == (int) (splits * doneFractionsToStoreInCounters[i])) {
204 splitFractionReached(
205 splitType, doneFractionsToStoreInCounters[i], context);
206 }
207 }
208 }
209210/**211 * Call when we reached some fraction of split type done to set the212 * timestamp counter213 *214 * @param inputType Type of input215 * @param fraction Which fraction of input type was done reading216 * @param context Context for accessing counters217 */218privatevoid splitFractionReached(
219InputType inputType, double fraction, Mapper.Context context) {
220 getSplitFractionDoneTimestampCounter(inputType, fraction, context).setValue(
221 System.currentTimeMillis() - splitsCreatedTimestamp.get(inputType));
222 }
223224/**225 * Get counter226 *227 * @param inputType Type of input for counter228 * @param fraction Fraction for counter229 * @param context Context to get counter from230 * @return Counter231 */232publicstatic Counter getSplitFractionDoneTimestampCounter(
233InputType inputType, double fraction, Mapper.Context context) {
234 String groupName = inputType.name() + " input";
235 String counterName = String.format("%.2f%% done time (ms)", fraction * 100);
236 Set<String> counters = COUNTER_GROUP_AND_NAMES.getOrDefault(
237 groupName, new HashSet<>());
238 counters.add(counterName);
239 COUNTER_GROUP_AND_NAMES.put(groupName, counters);
240return context.getCounter(groupName, counterName);
241 }
242243publicstatic Map<String, Set<String>> getCounterGroupAndNames() {
244return COUNTER_GROUP_AND_NAMES;
245 }
246 }