/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.comm.requests;

import java.io.IOException;

import org.apache.giraph.comm.GlobalCommType;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;

/**
 * Request to send final aggregated values from worker which owns them to
 * other workers.
 *
 * The payload, as consumed by {@link #doRequest(ServerData)}, is an int
 * entry count followed by that many entries, each made of a UTF name, a
 * one-byte {@link GlobalCommType} ordinal and a serialized
 * {@link Writable} value.
 */
public class SendAggregatorsToWorkerRequest extends
    ByteArrayWithSenderTaskIdRequest implements WorkerRequest {

  /**
   * Constructor
   *
   * @param data Serialized aggregator data
   * @param senderTaskId Sender task id
   */
  public SendAggregatorsToWorkerRequest(byte[] data, int senderTaskId) {
    super(data, senderTaskId);
  }

  /**
   * Constructor used for reflection only
   */
  public SendAggregatorsToWorkerRequest() {
  }

  /**
   * Decode every entry carried by this request and hand it to the
   * aggregator data of the receiving side, then signal that one more
   * request from a worker has been processed.
   *
   * Entries typed {@link GlobalCommType#SPECIAL_COUNT} are treated as a
   * request count: their value is cast to {@link LongWritable} and reported
   * together with the sender task id. All other entries are forwarded as
   * named values.
   *
   * @param serverData Server data from which the aggregator data is taken
   * @throws IllegalStateException if reading the payload fails with an
   *         {@link IOException}, or if an entry carries a type ordinal that
   *         does not match any {@link GlobalCommType} constant
   */
  @Override
  public void doRequest(ServerData serverData) {
    UnsafeByteArrayInputStream input = getUnsafeByteArrayInput();
    AllAggregatorServerData aggregatorData =
        serverData.getAllAggregatorData();
    // Enum values() hands back a fresh array on each call, so fetch it
    // once instead of once per entry.
    GlobalCommType[] types = GlobalCommType.values();
    try {
      int num = input.readInt();
      for (int i = 0; i < num; i++) {
        String name = input.readUTF();
        int typeOrdinal = input.readByte();
        // Fail with context rather than a bare index error when the
        // stream carries an ordinal this side does not know.
        if (typeOrdinal < 0 || typeOrdinal >= types.length) {
          throw new IllegalStateException("doRequest: " +
              "Unknown GlobalCommType ordinal " + typeOrdinal +
              " for entry '" + name + "'");
        }
        GlobalCommType type = types[typeOrdinal];
        // NOTE(review): conf is not declared in this class; presumably it
        // is inherited from a superclass - confirm.
        Writable value = WritableUtils.readWritableObject(input, conf);
        if (type == GlobalCommType.SPECIAL_COUNT) {
          aggregatorData.receivedRequestCountFromWorker(
              ((LongWritable) value).get(),
              getSenderTaskId());
        } else {
          aggregatorData.receiveValueFromMaster(name, type, value);
        }
      }
    } catch (IOException e) {
      throw new IllegalStateException("doRequest: " +
          "IOException occurred while processing request", e);
    }
    aggregatorData.receivedRequestFromWorker();
  }

  @Override
  public RequestType getType() {
    return RequestType.SEND_AGGREGATORS_TO_WORKER_REQUEST;
  }
}