This project has retired. For details please refer to its
Attic page.
BlockMasterLogic xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.internal;
19
20 import java.util.HashSet;
21 import java.util.Iterator;
22 import java.util.Map;
23 import java.util.Map.Entry;
24 import java.util.TreeMap;
25
26 import org.apache.commons.lang3.time.DurationFormatUtils;
27 import org.apache.giraph.block_app.framework.BlockFactory;
28 import org.apache.giraph.block_app.framework.BlockUtils;
29 import org.apache.giraph.block_app.framework.api.BlockApiHandle;
30 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
31 import org.apache.giraph.block_app.framework.api.BlockOutputHandleAccessor;
32 import org.apache.giraph.block_app.framework.block.Block;
33 import org.apache.giraph.block_app.framework.block.BlockWithApiHandle;
34 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
35 import org.apache.giraph.conf.GiraphConfiguration;
36 import org.apache.giraph.function.Consumer;
37 import org.apache.giraph.writable.tuple.IntLongWritable;
38 import org.apache.log4j.Logger;
39 import com.google.common.base.Preconditions;
40
41
42
43
44
45
46
47
48 @SuppressWarnings("rawtypes")
49 public class BlockMasterLogic<S> {
50 private static final Logger LOG = Logger.getLogger(BlockMasterLogic.class);
51
52 private Iterator<AbstractPiece> pieceIterator;
53 private PairedPieceAndStage<S> previousPiece;
54 private transient BlockMasterApi masterApi;
55 private long lastTimestamp = -1;
56 private BlockWorkerPieces previousWorkerPieces;
57 private boolean computationDone;
58 private BlockApiHandle blockApiHandle;
59
60
61 private final TimeStatsPerEvent masterPerPieceTimeStats =
62 new TimeStatsPerEvent("master");
63
64 private final TimeStatsPerEvent workerPerPieceTimeStats =
65 new TimeStatsPerEvent("worker");
66
67
68
69
70
71 public void initialize(
72 GiraphConfiguration conf, final BlockMasterApi masterApi) {
73 BlockFactory<S> factory = BlockUtils.createBlockFactory(conf);
74 initialize(factory.createBlock(conf), factory.createExecutionStage(conf),
75 masterApi);
76 }
77
78
79
80
81
82 public void initialize(
83 Block executionBlock, S executionStage, final BlockMasterApi masterApi) {
84 this.masterApi = masterApi;
85 this.computationDone = false;
86
87 LOG.info("Executing application - " + executionBlock);
88 if (executionBlock instanceof BlockWithApiHandle) {
89 blockApiHandle =
90 ((BlockWithApiHandle) executionBlock).getBlockApiHandle();
91 }
92 if (blockApiHandle == null) {
93 blockApiHandle = new BlockApiHandle();
94 }
95 blockApiHandle.setMasterApi(masterApi);
96
97
98 executionBlock.forAllPossiblePieces(new Consumer<AbstractPiece>() {
99 private final HashSet<AbstractPiece> registeredPieces = new HashSet<>();
100 @SuppressWarnings("deprecation")
101 @Override
102 public void apply(AbstractPiece piece) {
103
104 if (registeredPieces.add(piece)) {
105 try {
106 piece.registerAggregators(masterApi);
107 } catch (InstantiationException | IllegalAccessException e) {
108 throw new RuntimeException(e);
109 }
110 }
111 }
112 });
113
114 pieceIterator = executionBlock.iterator();
115
116
117
118
119
120 previousPiece = new PairedPieceAndStage<>(null, executionStage);
121 }
122
123
124
125
126
127
128 public void initializeAfterRead(BlockMasterApi masterApi) {
129 this.masterApi = masterApi;
130 }
131
132
133
134
135
136
137
138
139
140 public BlockWorkerPieces<S> computeNext(long superstep) {
141 long beforeMaster = System.currentTimeMillis();
142 if (lastTimestamp != -1) {
143 BlockCounters.setWorkerTimeCounter(
144 previousWorkerPieces, superstep - 1,
145 beforeMaster - lastTimestamp, masterApi, workerPerPieceTimeStats);
146 }
147
148 if (previousPiece == null) {
149 postApplication();
150 return null;
151 } else {
152 boolean logExecutionStatus =
153 BlockUtils.LOG_EXECUTION_STATUS.get(masterApi.getConf());
154 if (logExecutionStatus) {
155 LOG.info("Master executing " + previousPiece +
156 ", in superstep " + superstep);
157 }
158 previousPiece.masterCompute(masterApi);
159 ((BlockOutputHandleAccessor) masterApi).getBlockOutputHandle().
160 returnAllWriters();
161 long afterMaster = System.currentTimeMillis();
162
163 if (previousPiece.getPiece() != null) {
164 BlockCounters.setMasterTimeCounter(
165 previousPiece, superstep, afterMaster - beforeMaster, masterApi,
166 masterPerPieceTimeStats);
167 }
168
169 PairedPieceAndStage<S> nextPiece;
170 if (pieceIterator.hasNext()) {
171 nextPiece = new PairedPieceAndStage<S>(
172 pieceIterator.next(), previousPiece.nextExecutionStage());
173 nextPiece.registerReducers(masterApi);
174 } else {
175 nextPiece = null;
176 }
177 BlockCounters.setStageCounters(
178 "Master finished stage: ", previousPiece.getExecutionStage(),
179 masterApi);
180 if (logExecutionStatus) {
181 LOG.info(
182 "Master passing next " + nextPiece + ", in superstep " + superstep);
183 }
184
185
186
187 BlockWorkerPieces<S> result;
188 if (previousPiece.getPiece() == null && nextPiece == null) {
189 postApplication();
190 result = null;
191 } else {
192 result = new BlockWorkerPieces<>(
193 previousPiece, nextPiece, blockApiHandle);
194 if (logExecutionStatus) {
195 LOG.info("Master in " + superstep + " superstep passing " +
196 result + " to be executed");
197 }
198 }
199
200 previousPiece = nextPiece;
201 lastTimestamp = afterMaster;
202 previousWorkerPieces = result;
203 return result;
204 }
205 }
206
207
208
209
210 private void postApplication() {
211 ((BlockOutputHandleAccessor) masterApi).getBlockOutputHandle().
212 closeAllWriters();
213 Preconditions.checkState(!computationDone);
214 computationDone = true;
215 IntLongWritable masterTimes = masterPerPieceTimeStats.logTimeSums();
216 IntLongWritable workerTimes = workerPerPieceTimeStats.logTimeSums();
217 LOG.info("Time split:\n" +
218 TimeStatsPerEvent.header() +
219 TimeStatsPerEvent.line(
220 masterTimes.getLeft().get(),
221 100.0 * masterTimes.getRight().get() /
222 (masterTimes.getRight().get() + workerTimes.getRight().get()),
223 masterTimes.getRight().get(),
224 "master") +
225 TimeStatsPerEvent.line(
226 workerTimes.getLeft().get(),
227 100.0 * workerTimes.getRight().get() /
228 (masterTimes.getRight().get() + workerTimes.getRight().get()),
229 workerTimes.getRight().get(),
230 "worker"));
231 }
232
233
234
235
236
237 public static class TimeStatsPerEvent {
238 private final String groupName;
239 private final Map<String, IntLongWritable> keyToCountAndTime =
240 new TreeMap<>();
241
242 public TimeStatsPerEvent(String groupName) {
243 this.groupName = groupName;
244 }
245
246 public void inc(String name, long millis) {
247 IntLongWritable val = keyToCountAndTime.get(name);
248 if (val == null) {
249 val = new IntLongWritable();
250 keyToCountAndTime.put(name, val);
251 }
252 val.getLeft().set(val.getLeft().get() + 1);
253 val.getRight().set(val.getRight().get() + millis);
254 }
255
256 public IntLongWritable logTimeSums() {
257 StringBuilder sb = new StringBuilder("Time sums " + groupName + ":\n");
258 sb.append(header());
259 long total = 0;
260 int count = 0;
261 for (Entry<String, IntLongWritable> entry :
262 keyToCountAndTime.entrySet()) {
263 total += entry.getValue().getRight().get();
264 count += entry.getValue().getLeft().get();
265 }
266
267 for (Entry<String, IntLongWritable> entry :
268 keyToCountAndTime.entrySet()) {
269 sb.append(line(
270 entry.getValue().getLeft().get(),
271 (100.0 * entry.getValue().getRight().get()) / total,
272 entry.getValue().getRight().get(),
273 entry.getKey()));
274 }
275 LOG.info(sb);
276 return new IntLongWritable(count, total);
277 }
278
279 public static String header() {
280 return String.format(
281 "%10s%10s%11s %s%n", "count", "time %", "time", "name");
282 }
283
284 public static String line(
285 int count, double percTime, long time, String name) {
286 return String.format("%10d%9.2f%%%11s %s%n",
287 count,
288 percTime,
289 DurationFormatUtils.formatDuration(time, "HH:mm:ss"),
290 name);
291 }
292 }
293 }