This project has retired. For details please refer to its
Attic page.
BlockWorkerPieces xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.internal;
19
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Objects;
23
24 import org.apache.giraph.block_app.framework.api.BlockApiHandle;
25 import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
26 import org.apache.giraph.conf.DefaultMessageClasses;
27 import org.apache.giraph.conf.GiraphConstants;
28 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
29 import org.apache.giraph.conf.MessageClasses;
30 import org.apache.giraph.counters.CustomCounter;
31 import org.apache.giraph.counters.CustomCounters;
32 import org.apache.giraph.factories.DefaultMessageValueFactory;
33 import org.apache.giraph.master.MasterCompute;
34 import org.apache.giraph.types.NoMessage;
35 import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
36 import org.apache.giraph.utils.WritableUtils;
37 import org.apache.giraph.worker.WorkerGlobalCommUsage;
38 import org.apache.giraph.writable.kryo.KryoWritableWrapper;
39 import org.apache.hadoop.io.IntWritable;
40 import org.apache.hadoop.io.Writable;
41 import org.apache.log4j.Logger;
42
43
44
45
46
47
48 @SuppressWarnings({ "rawtypes", "unchecked" })
49 public class BlockWorkerPieces<S> {
50 private static final Logger LOG = Logger.getLogger(BlockWorkerPieces.class);
51
52
53 private static final
54 String NEXT_WORKER_PIECES = "giraph.blocks.next_worker_pieces";
55
56
57 private static final String PASSED_WORKER_STATS_GROUP = "PassedWorker Stats";
58
59
60 private static final String TOTAL_SERIALISED_SIZE_NAME =
61 "total serialized size";
62
63
64 private static final String SPLIT_PARTS_NAME = "split parts";
65
66 private final PairedPieceAndStage<S> receiver;
67 private final PairedPieceAndStage<S> sender;
68 private final BlockApiHandle blockApiHandle;
69
70 public BlockWorkerPieces(
71 PairedPieceAndStage<S> receiver, PairedPieceAndStage<S> sender,
72 BlockApiHandle blockApiHandle) {
73 this.receiver = receiver;
74 this.sender = sender;
75 this.blockApiHandle = blockApiHandle;
76 }
77
78 public PairedPieceAndStage<S> getReceiver() {
79 return receiver;
80 }
81
82 public PairedPieceAndStage<S> getSender() {
83 return sender;
84 }
85
86 public BlockApiHandle getBlockApiHandle() {
87 return blockApiHandle;
88 }
89
90 public MessageClasses getOutgoingMessageClasses(
91 ImmutableClassesGiraphConfiguration conf) {
92 MessageClasses messageClasses;
93 if (sender == null || sender.getPiece() == null) {
94 messageClasses = new DefaultMessageClasses(
95 NoMessage.class,
96 DefaultMessageValueFactory.class,
97 null,
98 MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION);
99 } else {
100 messageClasses = sender.getPiece().getMessageClasses(conf);
101 }
102
103 messageClasses.verifyConsistent(conf);
104 return messageClasses;
105 }
106
107 @Override
108 public String toString() {
109 return "[receiver=" + receiver + ",sender=" + sender + "]";
110 }
111
112 public String toStringShort() {
113 String receiverString =
114 Objects.toString(receiver != null ? receiver.getPiece() : null);
115 String senderString =
116 Objects.toString(sender != null ? sender.getPiece() : null);
117 if (receiverString.equals(senderString)) {
118 return "[receiver&sender=" + receiverString + "]";
119 } else {
120 return "[receiver=" + receiverString + ",sender=" + senderString + "]";
121 }
122 }
123
124
125
126
127 public static <S> void setNextWorkerPieces(
128 MasterCompute master, BlockWorkerPieces<S> nextWorkerPieces) {
129 Writable toBroadcast = new KryoWritableWrapper<>(nextWorkerPieces);
130 byte[] data = WritableUtils.toByteArrayUnsafe(toBroadcast);
131
132
133 int overhead = 4096;
134 int singleSize = Math.max(
135 overhead,
136 GiraphConstants.MAX_MSG_REQUEST_SIZE.get(master.getConf()) - overhead);
137
138 ArrayList<byte[]> splittedData = new ArrayList<>();
139 if (data.length < singleSize) {
140 splittedData.add(data);
141 } else {
142 for (int start = 0; start < data.length; start += singleSize) {
143 splittedData.add(Arrays.copyOfRange(
144 data, start, Math.min(data.length, start + singleSize)));
145 }
146 }
147
148 LOG.info("Next worker piece - total serialized size: " + data.length +
149 ", split into " + splittedData.size());
150 CustomCounters.addCustomCounter(PASSED_WORKER_STATS_GROUP,
151 TOTAL_SERIALISED_SIZE_NAME, CustomCounter.Aggregation.SUM);
152 master.getContext().getCounter(PASSED_WORKER_STATS_GROUP,
153 TOTAL_SERIALISED_SIZE_NAME)
154 .increment(data.length);
155 CustomCounters.addCustomCounter(PASSED_WORKER_STATS_GROUP,
156 SPLIT_PARTS_NAME, CustomCounter.Aggregation.SUM);
157 master.getContext().getCounter(PASSED_WORKER_STATS_GROUP, SPLIT_PARTS_NAME)
158 .increment(splittedData.size());
159
160 master.broadcast(NEXT_WORKER_PIECES, new IntWritable(splittedData.size()));
161
162 for (int i = 0; i < splittedData.size(); i++) {
163 master.broadcast(NEXT_WORKER_PIECES + "_part_" + i,
164 KryoWritableWrapper.wrapIfNeeded(splittedData.get(i)));
165 }
166
167 master.setOutgoingMessageClasses(
168 nextWorkerPieces.getOutgoingMessageClasses(master.getConf()));
169 }
170
171 public static <S> BlockWorkerPieces<S> getNextWorkerPieces(
172 WorkerGlobalCommUsage worker) {
173 int splits = worker.<IntWritable>getBroadcast(NEXT_WORKER_PIECES).get();
174
175 int totalLength = 0;
176 ArrayList<byte[]> splittedData = new ArrayList<>();
177 for (int i = 0; i < splits; i++) {
178 byte[] cur = KryoWritableWrapper.<byte[]>unwrapIfNeeded(
179 worker.getBroadcast(NEXT_WORKER_PIECES + "_part_" + i));
180 splittedData.add(cur);
181 totalLength += cur.length;
182 }
183
184 byte[] merged;
185 if (splits == 1) {
186 merged = splittedData.get(0);
187 } else {
188 merged = new byte[totalLength];
189 int index = 0;
190 for (int i = 0; i < splits; i++) {
191 System.arraycopy(
192 splittedData.get(i), 0, merged, index, splittedData.get(i).length);
193 index += splittedData.get(i).length;
194 }
195 }
196
197 KryoWritableWrapper<BlockWorkerPieces<S>> wrapper =
198 new KryoWritableWrapper<>();
199 WritableUtils.fromByteArrayUnsafe(
200 merged, wrapper, new UnsafeReusableByteArrayInput());
201 return wrapper.get();
202 }
203 }