1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.block_app.framework.internal;
1920import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
21import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
22import org.apache.giraph.block_app.framework.piece.AbstractPiece.InnerVertexSender;
23import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
24import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
25import org.apache.giraph.graph.Vertex;
2627/**28 * Block execution logic on workers.29 */30 @SuppressWarnings({ "rawtypes", "unchecked" })
31publicclassBlockWorkerLogic {
32privatefinalBlockWorkerPieces pieces;
3334privatetransientVertexReceiver receiveFunctions;
35privatetransient InnerVertexSender sendFunctions;
3637publicBlockWorkerLogic(BlockWorkerPieces pieces) {
38this.pieces = pieces;
39 }
4041publicvoid preSuperstep(
42BlockWorkerReceiveApi receiveApi, BlockWorkerSendApi sendApi) {
43 pieces.getBlockApiHandle().setWorkerReceiveApi(receiveApi);
44 pieces.getBlockApiHandle().setWorkerSendApi(sendApi);
45if (pieces.getReceiver() != null) {
46 receiveFunctions = pieces.getReceiver().getVertexReceiver(receiveApi);
47 }
48if (pieces.getSender() != null) {
49 sendFunctions = pieces.getSender().getVertexSender(sendApi);
50 }
51 }
5253publicvoid compute(Vertex vertex, Iterable messages) {
54if (receiveFunctions != null) {
55 receiveFunctions.vertexReceive(vertex, messages);
56 }
57if (sendFunctions != null) {
58 sendFunctions.vertexSend(vertex);
59 }
60 }
6162publicvoid postSuperstep() {
63if (receiveFunctions instanceof VertexPostprocessor) {
64 ((VertexPostprocessor) receiveFunctions).postprocess();
65 }
66if (sendFunctions != null) {
67 sendFunctions.postprocess();
68 }
69 }
70 }