This project has retired. For details please refer to its
Attic page.
NettyWorkerAggregatorRequestProcessor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.netty;
20
21 import java.io.IOException;
22
23 import org.apache.giraph.bsp.CentralizedServiceWorker;
24 import org.apache.giraph.comm.GlobalCommType;
25 import org.apache.giraph.comm.WorkerClient;
26 import org.apache.giraph.comm.aggregators.AggregatorUtils;
27 import org.apache.giraph.comm.aggregators.SendGlobalCommCache;
28 import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
29 import org.apache.giraph.comm.requests.SendAggregatorsToWorkerRequest;
30 import org.apache.giraph.comm.requests.SendReducedToMasterRequest;
31 import org.apache.giraph.comm.requests.SendWorkerAggregatorsRequest;
32 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
33 import org.apache.giraph.worker.WorkerInfo;
34 import org.apache.hadoop.io.Writable;
35 import org.apache.hadoop.util.Progressable;
36
37
38
39
40 public class NettyWorkerAggregatorRequestProcessor
41 implements WorkerAggregatorRequestProcessor {
42
43 private final Progressable progressable;
44
45 private final WorkerClient<?, ?, ?> workerClient;
46
47 private final CentralizedServiceWorker<?, ?, ?> serviceWorker;
48
49 private final SendGlobalCommCache sendReducedValuesCache =
50 new SendGlobalCommCache(false);
51
52 private final int maxBytesPerAggregatorRequest;
53
54
55
56
57
58
59
60
61 public NettyWorkerAggregatorRequestProcessor(
62 Progressable progressable,
63 ImmutableClassesGiraphConfiguration<?, ?, ?> configuration,
64 CentralizedServiceWorker<?, ?, ?> serviceWorker) {
65 this.serviceWorker = serviceWorker;
66 this.workerClient = serviceWorker.getWorkerClient();
67 this.progressable = progressable;
68 maxBytesPerAggregatorRequest = configuration.getInt(
69 AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
70 AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT);
71
72 }
73
74 @Override
75 public boolean sendReducedValue(String name,
76 Writable reducedValue) throws IOException {
77 WorkerInfo owner =
78 AggregatorUtils.getOwner(name,
79 serviceWorker.getWorkerInfoList());
80 if (isThisWorker(owner)) {
81 return false;
82 } else {
83 int currentSize = sendReducedValuesCache.addValue(owner.getTaskId(),
84 name, GlobalCommType.REDUCED_VALUE, reducedValue);
85 if (currentSize >= maxBytesPerAggregatorRequest) {
86 flushAggregatorsToWorker(owner);
87 }
88 return true;
89 }
90 }
91
92 @Override
93 public void flush() throws IOException {
94 for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
95 if (!isThisWorker(workerInfo)) {
96 sendReducedValuesCache.addSpecialCount(workerInfo.getTaskId());
97 flushAggregatorsToWorker(workerInfo);
98 progressable.progress();
99 }
100 }
101 sendReducedValuesCache.reset();
102 }
103
104
105
106
107
108
109 private void flushAggregatorsToWorker(WorkerInfo worker) {
110 byte[] data =
111 sendReducedValuesCache.removeSerialized(worker.getTaskId());
112 workerClient.sendWritableRequest(worker.getTaskId(),
113 new SendWorkerAggregatorsRequest(data,
114 serviceWorker.getWorkerInfo().getTaskId()));
115 }
116
117 @Override
118 public void sendReducedValuesToMaster(byte[] data) throws IOException {
119 workerClient.sendWritableRequest(serviceWorker.getMasterInfo().getTaskId(),
120 new SendReducedToMasterRequest(data));
121 }
122
123 @Override
124 public void distributeReducedValues(
125 Iterable<byte[]> aggregatorDataList) throws IOException {
126 for (byte[] aggregatorData : aggregatorDataList) {
127 for (WorkerInfo worker : serviceWorker.getWorkerInfoList()) {
128 if (!isThisWorker(worker)) {
129 SendAggregatorsToWorkerRequest request =
130 new SendAggregatorsToWorkerRequest(aggregatorData,
131 serviceWorker.getWorkerInfo().getTaskId());
132 workerClient.sendWritableRequest(worker.getTaskId(), request);
133 }
134 progressable.progress();
135 }
136 }
137 }
138
139
140
141
142
143
144
145 private boolean isThisWorker(WorkerInfo workerInfo) {
146 return serviceWorker.getWorkerInfo().getTaskId() == workerInfo.getTaskId();
147 }
148 }