This project has retired. For details please refer to its
Attic page.
BspCase xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph;
20
21 import org.apache.giraph.conf.GiraphConfiguration;
22 import org.apache.giraph.conf.GiraphConstants;
23 import org.apache.giraph.io.formats.FileOutputFormatUtil;
24 import org.apache.giraph.job.GiraphJob;
25 import org.apache.giraph.utils.FileUtils;
26 import org.apache.giraph.zk.ZooKeeperExt;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.fs.FSDataInputStream;
29 import org.apache.hadoop.fs.FileStatus;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.fs.PathFilter;
33 import org.apache.zookeeper.WatchedEvent;
34 import org.apache.zookeeper.Watcher;
35 import org.junit.After;
36 import org.junit.Before;
37
38 import com.google.common.base.Charsets;
39 import com.google.common.base.Preconditions;
40 import com.google.common.io.Closeables;
41
42 import java.io.BufferedReader;
43 import java.io.IOException;
44 import java.io.InputStreamReader;
45 import java.util.List;
46
47
48
49
50 @SuppressWarnings("unchecked")
51 public class BspCase implements Watcher {
52
53 private final String jobTracker =
54 System.getProperty("prop.mapred.job.tracker");
55
56 private final String jarLocation =
57 System.getProperty("prop.jarLocation", "");
58
59 private int numWorkers = 1;
60
61 private final String zkList = System.getProperty("prop.zookeeper.list");
62 private String testName;
63
64
65 static final Path DEFAULT_TEMP_DIR =
66 new Path(System.getProperty("java.io.tmpdir"), "_giraphTests");
67
68 public static final String READER_VERTICES_OPT =
69 "GeneratedVertexReader.reader_vertices";
70
71
72 static final PathFilter PARTS_FILTER = new PathFilter() {
73 @Override
74 public boolean accept(Path path) {
75 return path.getName().startsWith("part-");
76 }
77 };
78
79
80
81
82 public final Configuration setupConfiguration(GiraphJob job)
83 throws IOException {
84 GiraphConfiguration conf = job.getConfiguration();
85 conf.set("mapred.jar", getJarLocation());
86
87
88 if (runningInDistributedMode()) {
89 System.out.println("setupConfiguration: Sending job to job tracker " +
90 jobTracker + " with jar path " + getJarLocation()
91 + " for " + getName());
92 conf.set("mapred.job.tracker", jobTracker);
93 conf.setWorkerConfiguration(getNumWorkers(), getNumWorkers(), 100.0f);
94 }
95 else {
96 System.out.println("setupConfiguration: Using local job runner with " +
97 "location " + getJarLocation() + " for " + getName());
98 conf.setWorkerConfiguration(1, 1, 100.0f);
99
100 GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
101 GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
102 }
103 conf.setMaxMasterSuperstepWaitMsecs(30 * 1000);
104 conf.setEventWaitMsecs(3 * 1000);
105 GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.set(conf, 500);
106 if (getZooKeeperList() != null) {
107 conf.setZooKeeperConfiguration(getZooKeeperList());
108 }
109
110 conf.setLong(READER_VERTICES_OPT, 5);
111
112
113 Path zookeeperDir = getTempPath("_bspZooKeeper");
114 Path zkManagerDir = getTempPath("_defaultZkManagerDir");
115 Path checkPointDir = getTempPath("_checkpoints");
116
117
118 FileUtils.deletePath(conf, zookeeperDir);
119 FileUtils.deletePath(conf, zkManagerDir);
120 FileUtils.deletePath(conf, checkPointDir);
121
122 conf.set(GiraphConstants.ZOOKEEPER_DIR, zookeeperDir.toString());
123 GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
124 zkManagerDir.toString());
125 GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkPointDir.toString());
126
127 return conf;
128 }
129
130
131
132
133
134
135
136 protected Path getTempPath(String name) {
137 return new Path(DEFAULT_TEMP_DIR, name);
138 }
139
140
141
142
143
144
145
146
147
148 protected GiraphJob prepareJob(String name, GiraphConfiguration conf)
149 throws IOException {
150 return prepareJob(name, conf, null);
151 }
152
153
154
155
156
157
158
159
160
161
162 protected GiraphJob prepareJob(String name, GiraphConfiguration conf,
163 Path outputPath)
164 throws IOException {
165 GiraphJob job = new GiraphJob(conf, name);
166 setupConfiguration(job);
167 if (outputPath != null) {
168 removeAndSetOutput(job, outputPath);
169 }
170 return job;
171 }
172
173 private String getName() {
174 return testName;
175 }
176
177
178
179
180
181
182 public BspCase(String testName) {
183 this.testName = testName;
184 }
185
186
187
188
189
190
191 public int getNumWorkers() {
192 return numWorkers;
193 }
194
195
196
197
198 public String getZooKeeperList() {
199 return zkList;
200 }
201
202
203
204
205
206
207 String getJarLocation() {
208 return jarLocation;
209 }
210
211
212
213
214
215
216 public boolean runningInDistributedMode() {
217 return jobTracker != null;
218 }
219
220
221
222
223
224
225
226
227
228 public static FileStatus getSinglePartFileStatus(Configuration conf,
229 Path partDirPath) throws IOException {
230 FileSystem fs = FileSystem.get(conf);
231 FileStatus singlePartFileStatus = null;
232 int partFiles = 0;
233 for (FileStatus fileStatus : fs.listStatus(partDirPath)) {
234 if (fileStatus.getPath().getName().equals("part-m-00000")) {
235 singlePartFileStatus = fileStatus;
236 }
237 if (fileStatus.getPath().getName().startsWith("part-m-")) {
238 ++partFiles;
239 }
240 }
241
242 Preconditions.checkState(partFiles == 1, "getSinglePartFile: Part file " +
243 "count should be 1, but is " + partFiles);
244
245 return singlePartFileStatus;
246 }
247
248
249
250
251
252
253
254
255
256
257 public int getNumResults(Configuration conf, Path outputPath)
258 throws IOException {
259 FileSystem fs = FileSystem.get(conf);
260 int numResults = 0;
261 for (FileStatus status : fs.listStatus(outputPath, PARTS_FILTER)) {
262 FSDataInputStream in = null;
263 BufferedReader reader = null;
264 try {
265 in = fs.open(status.getPath());
266 reader = new BufferedReader(new InputStreamReader(in, Charsets.UTF_8));
267 while (reader.readLine() != null) {
268 numResults++;
269 }
270 } finally {
271 Closeables.close(in, true);
272 Closeables.close(reader, true);
273 }
274 }
275 return numResults;
276 }
277
278 @Before
279 public void setUp() {
280 if (runningInDistributedMode()) {
281 System.out.println("setUp: Setting tasks to 3 for " + getName() +
282 " since JobTracker exists...");
283 numWorkers = 3;
284 }
285 try {
286 cleanupTemporaryFiles();
287
288 if (zkList == null) {
289 return;
290 }
291 ZooKeeperExt zooKeeperExt =
292 new ZooKeeperExt(zkList, 30 * 1000, 0, 0, this);
293 List<String> rootChildren =
294 zooKeeperExt.getChildrenExt("/", false, false, true);
295 for (String rootChild : rootChildren) {
296 if (rootChild.startsWith("/_hadoopBsp")) {
297 List<String> children =
298 zooKeeperExt.getChildrenExt(rootChild, false, false, true);
299 for (String child: children) {
300 if (child.contains("job_local_")) {
301 System.out.println("Cleaning up " + child);
302 zooKeeperExt.deleteExt(child, -1, true);
303 }
304 }
305 }
306 }
307 zooKeeperExt.close();
308 } catch (Exception e) {
309 throw new RuntimeException(e);
310 }
311 }
312
313 @After
314 public void tearDown() throws IOException {
315 cleanupTemporaryFiles();
316 }
317
318
319
320
321 private void cleanupTemporaryFiles() throws IOException {
322 FileUtils.deletePath(new Configuration(), DEFAULT_TEMP_DIR);
323 }
324
325 @Override
326 public void process(WatchedEvent event) {
327
328 }
329
330
331
332
333
334
335
336
337
338
339 public static void removeAndSetOutput(GiraphJob job,
340 Path outputPath) throws IOException {
341 FileUtils.deletePath(job.getConfiguration(), outputPath);
342 FileOutputFormatUtil.setOutputPath(job.getInternalJob(), outputPath);
343 }
344
345 public static String getCallingMethodName() {
346 return Thread.currentThread().getStackTrace()[2].getMethodName();
347 }
348 }