/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.comm.requests;
2021import org.apache.giraph.comm.ServerData;
22import org.apache.giraph.io.InputType;
2324import java.io.DataInput;
25import java.io.DataOutput;
26import java.io.IOException;
2728/**29 * A request which master will send to workers to give them splits30 */31publicclassReplyWithInputSplitRequestextendsWritableRequest32implementsWorkerRequest {
33/** Type of input split */34privateInputType splitType;
35/** Serialized input split */36private byte[] serializedInputSplit;
3738/**39 * Constructor40 *41 * @param splitType Type of input split42 * @param serializedInputSplit Serialized input split43 */44publicReplyWithInputSplitRequest(InputType splitType,
45 byte[] serializedInputSplit) {
46this.splitType = splitType;
47this.serializedInputSplit = serializedInputSplit;
48 }
4950/**51 * Constructor used for reflection only52 */53publicReplyWithInputSplitRequest() {
54 }
5556 @Override
57void readFieldsRequest(DataInput in) throws IOException {
58 splitType = InputType.values()[in.readInt()];
59int size = in.readInt();
60 serializedInputSplit = new byte[size];
61 in.readFully(serializedInputSplit);
62 }
6364 @Override
65void writeRequest(DataOutput out) throws IOException {
66 out.writeInt(splitType.ordinal());
67 out.writeInt(serializedInputSplit.length);
68 out.write(serializedInputSplit);
69 }
7071 @Override
72publicvoid doRequest(ServerData serverData) {
73 serverData.getServiceWorker().getInputSplitsHandler().receivedInputSplit(
74 splitType, serializedInputSplit);
75 }
7677 @Override
78publicRequestType getType() {
79return RequestType.REPLY_WITH_INPUT_SPLIT_REQUEST;
80 }
81 }