1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.giraph.worker; 20 21 import org.apache.giraph.comm.WorkerClient; 22 import org.apache.giraph.comm.requests.AskForInputSplitRequest; 23 import org.apache.giraph.io.InputType; 24 25 import java.util.EnumMap; 26 import java.util.Map; 27 import java.util.concurrent.BlockingQueue; 28 import java.util.concurrent.LinkedBlockingQueue; 29 30 /** 31 * Requests splits from master and keeps track of them 32 */ 33 public class WorkerInputSplitsHandler { 34 /** Worker info of this worker */ 35 private final WorkerInfo workerInfo; 36 /** Task id of master */ 37 private final int masterTaskId; 38 /** Worker client, used for communication */ 39 private final WorkerClient workerClient; 40 /** Map with currently available splits received from master */ 41 private final Map<InputType, BlockingQueue<byte[]>> availableInputSplits; 42 43 /** 44 * Constructor 45 * 46 * @param workerInfo Worker info of this worker 47 * @param masterTaskId Task id of master 48 * @param workerClient Worker client, used for communication 49 */ 50 public WorkerInputSplitsHandler(WorkerInfo workerInfo, int masterTaskId, 51 WorkerClient workerClient) { 52 this.workerInfo = workerInfo; 53 this.masterTaskId = masterTaskId; 54 this.workerClient = workerClient; 55 availableInputSplits = new EnumMap<>(InputType.class); 56 for (InputType inputType : InputType.values()) { 57 availableInputSplits.put( 58 inputType, new LinkedBlockingQueue<byte[]>()); 59 } 60 } 61 62 /** 63 * Called when an input split has been received from master, adding it to 64 * the map 65 * 66 * @param splitType Type of split 67 * @param serializedInputSplit Split 68 */ 69 public void receivedInputSplit(InputType splitType, 70 byte[] serializedInputSplit) { 71 try { 72 availableInputSplits.get(splitType).put(serializedInputSplit); 73 } catch (InterruptedException e) { 74 throw new IllegalStateException("Interrupted", e); 75 } 76 } 77 78 /** 79 * Try to reserve an InputSplit for loading. While InputSplits exists that 80 * are not finished, wait until they are. 81 * 82 * NOTE: iterations on the InputSplit list only halt for each worker when it 83 * has scanned the entire list once and found every split marked RESERVED. 84 * When a worker fails, its Ephemeral RESERVED znodes will disappear, 85 * allowing other iterating workers to claim it's previously read splits. 86 * Only when the last worker left iterating on the list fails can a danger 87 * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently 88 * causes job failure, this is OK. As the failure model evolves, this 89 * behavior might need to change. We could add watches on 90 * inputSplitFinishedNodes and stop iterating only when all these nodes 91 * have been created. 92 * 93 * @param splitType Type of split 94 * @param isFirstSplit Whether this is the first split input thread reads 95 * @return reserved InputSplit or null if no unfinished InputSplits exist 96 */ 97 public byte[] reserveInputSplit(InputType splitType, boolean isFirstSplit) { 98 // Send request 99 workerClient.sendWritableRequest(masterTaskId, 100 new AskForInputSplitRequest( 101 splitType, workerInfo.getTaskId(), isFirstSplit)); 102 try { 103 // Wait for some split to become available 104 byte[] serializedInputSplit = availableInputSplits.get(splitType).take(); 105 return serializedInputSplit.length == 0 ? null : serializedInputSplit; 106 } catch (InterruptedException e) { 107 throw new IllegalStateException("Interrupted", e); 108 } 109 } 110 }