This project has been retired. For details, please refer to its
Attic page.
AddressesAndPartitionsWritable xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.graph;
20
21 import org.apache.giraph.partition.PartitionOwner;
22 import org.apache.giraph.master.MasterInfo;
23 import org.apache.giraph.utils.ReflectionUtils;
24 import org.apache.giraph.utils.WritableUtils;
25 import org.apache.giraph.worker.WorkerInfo;
26 import org.apache.hadoop.io.Writable;
27
28 import com.google.common.collect.Iterables;
29 import com.google.common.collect.Lists;
30 import com.google.common.collect.Maps;
31
32 import java.io.DataInput;
33 import java.io.DataOutput;
34 import java.io.IOException;
35 import java.util.Collection;
36 import java.util.List;
37 import java.util.Map;
38
39
40
41
42 public class AddressesAndPartitionsWritable implements Writable {
43
44 private MasterInfo masterInfo;
45
46 private List<WorkerInfo> workerInfos;
47
48 private Collection<PartitionOwner> partitionOwners;
49
50
51
52
53
54
55
56
57 public AddressesAndPartitionsWritable(MasterInfo masterInfo,
58 List<WorkerInfo> workerInfos,
59 Collection<PartitionOwner> partitionOwners) {
60 this.masterInfo = masterInfo;
61 this.workerInfos = workerInfos;
62 this.partitionOwners = partitionOwners;
63 }
64
65
66 public AddressesAndPartitionsWritable() {
67 }
68
69
70
71
72
73
74 public MasterInfo getMasterInfo() {
75 return masterInfo;
76 }
77
78
79
80
81
82
83 public List<WorkerInfo> getWorkerInfos() {
84 return workerInfos;
85 }
86
87
88
89
90
91
92 public Collection<PartitionOwner> getPartitionOwners() {
93 return partitionOwners;
94 }
95
96 @Override
97 public void write(DataOutput output) throws IOException {
98 masterInfo.write(output);
99
100 output.writeInt(workerInfos.size());
101 for (WorkerInfo workerInfo : workerInfos) {
102 workerInfo.write(output);
103 }
104
105 Map<Integer, WorkerInfo> workerInfoMap = getAsWorkerInfoMap(workerInfos);
106
107
108 List<WorkerInfo> previousWorkerInfos = Lists.newArrayList();
109 for (PartitionOwner partitionOwner : partitionOwners) {
110 if (partitionOwner.getPreviousWorkerInfo() != null) {
111 if (!workerInfoMap.containsKey(
112 partitionOwner.getPreviousWorkerInfo().getTaskId())) {
113 previousWorkerInfos.add(partitionOwner.getPreviousWorkerInfo());
114 }
115 }
116 }
117 output.writeInt(previousWorkerInfos.size());
118 for (WorkerInfo workerInfo : previousWorkerInfos) {
119 workerInfo.write(output);
120 }
121
122 output.writeInt(partitionOwners.size());
123 if (partitionOwners.size() > 0) {
124 WritableUtils.writeClass(
125 partitionOwners.iterator().next().getClass(), output);
126 }
127 for (PartitionOwner partitionOwner : partitionOwners) {
128 partitionOwner.writeWithWorkerIds(output);
129 }
130 }
131
132 @Override
133 public void readFields(DataInput input) throws IOException {
134 masterInfo = new MasterInfo();
135 masterInfo.readFields(input);
136
137 int workerInfosSize = input.readInt();
138 workerInfos = Lists.newArrayListWithCapacity(workerInfosSize);
139 for (int i = 0; i < workerInfosSize; i++) {
140 WorkerInfo workerInfo = new WorkerInfo();
141 workerInfo.readFields(input);
142 workerInfos.add(workerInfo);
143 }
144
145 Map<Integer, WorkerInfo> workerInfoMap = getAsWorkerInfoMap(workerInfos);
146 int additionalWorkerInfos = input.readInt();
147 for (int i = 0; i < additionalWorkerInfos; i++) {
148 WorkerInfo workerInfo = new WorkerInfo();
149 workerInfo.readFields(input);
150 workerInfoMap.put(workerInfo.getTaskId(), workerInfo);
151 }
152
153 int partitionOwnersSize = input.readInt();
154 Class<PartitionOwner> partitionOwnerClass = null;
155 if (partitionOwnersSize > 0) {
156 partitionOwnerClass = WritableUtils.readClass(input);
157 }
158 partitionOwners = Lists.newArrayListWithCapacity(partitionOwnersSize);
159 for (int i = 0; i < partitionOwnersSize; i++) {
160 PartitionOwner partitionOwner =
161 ReflectionUtils.newInstance(partitionOwnerClass);
162 partitionOwner.readFieldsWithWorkerIds(input, workerInfoMap);
163 partitionOwners.add(partitionOwner);
164 }
165 }
166
167
168
169
170
171
172
173 private static Map<Integer, WorkerInfo> getAsWorkerInfoMap(
174 Iterable<WorkerInfo> workerInfos) {
175 Map<Integer, WorkerInfo> workerInfoMap =
176 Maps.newHashMapWithExpectedSize(Iterables.size(workerInfos));
177 for (WorkerInfo workerInfo : workerInfos) {
178 workerInfoMap.put(workerInfo.getTaskId(), workerInfo);
179 }
180 return workerInfoMap;
181 }
182 }