/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.comm.requests;
2021import org.apache.giraph.comm.ServerData;
22import org.apache.hadoop.io.Text;
23import org.apache.hadoop.io.Writable;
24import org.apache.hadoop.io.WritableComparable;
2526import java.io.DataInput;
27import java.io.DataOutput;
28import java.io.IOException;
2930/** Request which sends any Writable message from one worker to another */31publicclassSendWorkerToWorkerMessageRequestextendsWritableRequest32implements WorkerRequest<WritableComparable, Writable, Writable> {
33/** Message sent */34private Writable message;
3536/**37 * Default constructor, for reflection38 */39publicSendWorkerToWorkerMessageRequest() {
40 }
4142/**43 * Constructor with message44 *45 * @param message Message sent46 */47publicSendWorkerToWorkerMessageRequest(Writable message) {
48this.message = message;
49 }
5051 @Override
52publicRequestType getType() {
53return RequestType.SEND_WORKER_TO_WORKER_MESSAGE_REQUEST;
54 }
5556 @Override
57void writeRequest(DataOutput output) throws IOException {
58 Text.writeString(output, message.getClass().getName());
59 message.write(output);
60 }
6162 @Override
63void readFieldsRequest(DataInput input) throws IOException {
64 String className = Text.readString(input);
65try {
66 message = (Writable) Class.forName(className).newInstance();
67 message.readFields(input);
68 } catch (InstantiationException | IllegalAccessException |
69 ClassNotFoundException e) {
70thrownew IllegalStateException(
71"readFieldsRequest: Exception occurred", e);
72 }
73 }
7475 @Override
76publicvoid doRequest(
77 ServerData<WritableComparable, Writable, Writable> serverData) {
78 serverData.addIncomingWorkerToWorkerMessage(message);
79 }
80 }