This project has retired. For details please refer to its
Attic page.
NettyMasterClient xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.netty;
20
21 import java.io.IOException;
22
23 import org.apache.giraph.bsp.CentralizedServiceMaster;
24 import org.apache.giraph.comm.GlobalCommType;
25 import org.apache.giraph.comm.MasterClient;
26 import org.apache.giraph.comm.aggregators.AggregatorUtils;
27 import org.apache.giraph.comm.aggregators.SendGlobalCommCache;
28 import org.apache.giraph.comm.flow_control.FlowControl;
29 import org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest;
30 import org.apache.giraph.comm.requests.WritableRequest;
31 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
32 import org.apache.giraph.worker.WorkerInfo;
33 import org.apache.hadoop.io.Writable;
34 import org.apache.hadoop.mapreduce.Mapper;
35 import org.apache.hadoop.util.Progressable;
36
37
38
39
40 public class NettyMasterClient implements MasterClient {
41
42 private final NettyClient nettyClient;
43
44 private final CentralizedServiceMaster<?, ?, ?> service;
45
46 private final SendGlobalCommCache sendGlobalCommCache =
47 new SendGlobalCommCache(true);
48
49 private final int maxBytesPerAggregatorRequest;
50
51 private final Progressable progressable;
52
53
54
55
56
57
58
59
60
61
62 public NettyMasterClient(Mapper<?, ?, ?, ?>.Context context,
63 ImmutableClassesGiraphConfiguration configuration,
64 CentralizedServiceMaster<?, ?, ?> service,
65 Thread.UncaughtExceptionHandler exceptionHandler) {
66 this.nettyClient =
67 new NettyClient(context, configuration, service.getMasterInfo(),
68 exceptionHandler);
69 this.service = service;
70 this.progressable = context;
71 maxBytesPerAggregatorRequest = configuration.getInt(
72 AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
73 AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT);
74 }
75
76 @Override
77 public void openConnections() {
78 nettyClient.connectAllAddresses(service.getWorkerInfoList());
79 }
80
81 @Override
82 public void sendToOwner(String name, GlobalCommType sendType, Writable object)
83 throws IOException {
84 WorkerInfo owner =
85 AggregatorUtils.getOwner(name, service.getWorkerInfoList());
86 int currentSize = sendGlobalCommCache.addValue(owner.getTaskId(),
87 name, sendType, object);
88 if (currentSize >= maxBytesPerAggregatorRequest) {
89 flushAggregatorsToWorker(owner);
90 }
91 }
92
93 @Override
94 public void finishSendingValues() throws IOException {
95 for (WorkerInfo worker : service.getWorkerInfoList()) {
96 sendGlobalCommCache.addSpecialCount(worker.getTaskId());
97 flushAggregatorsToWorker(worker);
98 progressable.progress();
99 }
100 sendGlobalCommCache.reset();
101 }
102
103
104
105
106
107
108 private void flushAggregatorsToWorker(WorkerInfo worker) {
109 byte[] data =
110 sendGlobalCommCache.removeSerialized(worker.getTaskId());
111 nettyClient.sendWritableRequest(
112 worker.getTaskId(), new SendAggregatorsToOwnerRequest(data,
113 service.getMasterInfo().getTaskId()));
114 }
115
116 @Override
117 public void flush() {
118 nettyClient.waitAllRequests();
119 }
120
121 @Override
122 public void sendWritableRequest(int destTaskId, WritableRequest request) {
123 nettyClient.sendWritableRequest(destTaskId, request);
124 }
125
126 @Override
127 public void closeConnections() {
128 nettyClient.stop();
129 }
130
131 @Override
132 public FlowControl getFlowControl() {
133 return nettyClient.getFlowControl();
134 }
135 }