1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.comm.requests;
2021import java.io.IOException;
2223import org.apache.giraph.comm.GlobalCommType;
24import org.apache.giraph.comm.ServerData;
25import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
26import org.apache.giraph.utils.UnsafeByteArrayInputStream;
27import org.apache.hadoop.io.LongWritable;
28import org.apache.hadoop.io.Writable;
2930/**31 * Request to send partial aggregated values for current superstep (values32 * which were computed by one worker's vertices)33 */34publicclassSendWorkerAggregatorsRequestextends35ByteArrayWithSenderTaskIdRequestimplementsWorkerRequest {
3637/**38 * Constructor39 *40 * @param data Serialized aggregator data41 * @param senderTaskId Sender task id42 */43publicSendWorkerAggregatorsRequest(byte[] data, int senderTaskId) {
44super(data, senderTaskId);
45 }
4647/**48 * Constructor used for reflection only49 */50publicSendWorkerAggregatorsRequest() {
51 }
5253 @Override
54publicvoid doRequest(ServerData serverData) {
55UnsafeByteArrayInputStream input = getUnsafeByteArrayInput();
56OwnerAggregatorServerData aggregatorData =
57 serverData.getOwnerAggregatorData();
58try {
59int num = input.readInt();
60for (int i = 0; i < num; i++) {
61 String name = input.readUTF();
62GlobalCommType type = GlobalCommType.values()[input.readByte()];
63if (type == GlobalCommType.SPECIAL_COUNT) {
64 LongWritable value = new LongWritable();
65 value.readFields(input);
66 aggregatorData.receivedRequestCountFromWorker(
67 value.get(),
68 getSenderTaskId());
69 } elseif (type == GlobalCommType.REDUCED_VALUE) {
70 Writable value = aggregatorData.createInitialValue(name);
71 value.readFields(input);
72 aggregatorData.reduce(name, value);
73 } else {
74thrownew IllegalStateException(
75"SendWorkerAggregatorsRequest received " + type);
76 }
77 }
78 } catch (IOException e) {
79thrownew IllegalStateException("doRequest: " +
80"IOException occurred while processing request", e);
81 }
82 aggregatorData.receivedRequestFromWorker();
83 }
8485 @Override
86publicRequestType getType() {
87return RequestType.SEND_WORKER_AGGREGATORS_REQUEST;
88 }
89 }