This project has retired. For details, please refer to its
Attic page.
TestGraphPartitioner xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph;
20
21 import static org.apache.giraph.examples.GeneratedVertexReader.READER_VERTICES;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertTrue;
24
25 import java.io.IOException;
26
27 import org.apache.giraph.conf.GiraphConfiguration;
28 import org.apache.giraph.conf.GiraphConstants;
29 import org.apache.giraph.examples.SimpleCheckpoint;
30 import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
31 import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat;
32 import org.apache.giraph.job.GiraphJob;
33 import org.apache.giraph.partition.HashRangePartitionerFactory;
34 import org.apache.giraph.partition.PartitionBalancer;
35 import org.apache.giraph.partition.SimpleLongRangePartitionerFactory;
36 import org.apache.hadoop.fs.FileStatus;
37 import org.apache.hadoop.fs.FileSystem;
38 import org.apache.hadoop.fs.Path;
39 import org.junit.Test;
40
41
42
43
44 public class TestGraphPartitioner extends BspCase {
45 public TestGraphPartitioner() {
46 super(TestGraphPartitioner.class.getName());
47 }
48
49 private void verifyOutput(FileSystem fs, Path outputPath)
50 throws IOException {
51
52 final int correctLen = 120;
53 if (runningInDistributedMode()) {
54 FileStatus[] fileStatusArr = fs.listStatus(outputPath);
55 int totalLen = 0;
56 for (FileStatus fileStatus : fileStatusArr) {
57 if (fileStatus.getPath().toString().contains("/part-m-")) {
58 totalLen += fileStatus.getLen();
59 }
60 }
61 assertEquals(correctLen, totalLen);
62 }
63 }
64
65
66
67
68
69
70
71
72
73 @Test
74 public void testPartitioners()
75 throws IOException, InterruptedException, ClassNotFoundException {
76 Path outputPath = getTempPath("testVertexBalancer");
77 GiraphConfiguration conf = new GiraphConfiguration();
78 conf.setComputationClass(
79 SimpleCheckpoint.SimpleCheckpointComputation.class);
80 conf.setWorkerContextClass(
81 SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
82 conf.setMasterComputeClass(
83 SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
84 conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
85 conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
86 GiraphJob job = prepareJob("testVertexBalancer", conf, outputPath);
87
88 job.getConfiguration().set(
89 PartitionBalancer.PARTITION_BALANCE_ALGORITHM,
90 PartitionBalancer.VERTICES_BALANCE_ALGORITHM);
91
92 assertTrue(job.run(true));
93 FileSystem hdfs = FileSystem.get(job.getConfiguration());
94
95 conf = new GiraphConfiguration();
96 conf.setComputationClass(
97 SimpleCheckpoint.SimpleCheckpointComputation.class);
98 conf.setWorkerContextClass(
99 SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
100 conf.setMasterComputeClass(
101 SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
102 conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
103 conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
104 outputPath = getTempPath("testHashPartitioner");
105 job = prepareJob("testHashPartitioner", conf, outputPath);
106 assertTrue(job.run(true));
107 verifyOutput(hdfs, outputPath);
108
109 job = new GiraphJob("testHashRangePartitioner");
110 setupConfiguration(job);
111 job.getConfiguration().setComputationClass(
112 SimpleCheckpoint.SimpleCheckpointComputation.class);
113 job.getConfiguration().setWorkerContextClass(
114 SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
115 job.getConfiguration().setMasterComputeClass(
116 SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
117 job.getConfiguration().setVertexInputFormatClass(
118 SimpleSuperstepVertexInputFormat.class);
119 job.getConfiguration().setVertexOutputFormatClass(
120 SimpleSuperstepVertexOutputFormat.class);
121 job.getConfiguration().setGraphPartitionerFactoryClass(
122 HashRangePartitionerFactory.class);
123 outputPath = getTempPath("testHashRangePartitioner");
124 removeAndSetOutput(job, outputPath);
125 assertTrue(job.run(true));
126 verifyOutput(hdfs, outputPath);
127
128 job = new GiraphJob("testSimpleRangePartitioner");
129 setupConfiguration(job);
130 job.getConfiguration().setComputationClass(
131 SimpleCheckpoint.SimpleCheckpointComputation.class);
132 job.getConfiguration().setWorkerContextClass(
133 SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
134 job.getConfiguration().setMasterComputeClass(
135 SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
136 job.getConfiguration().setVertexInputFormatClass(
137 SimpleSuperstepVertexInputFormat.class);
138 job.getConfiguration().setVertexOutputFormatClass(
139 SimpleSuperstepVertexOutputFormat.class);
140
141 job.getConfiguration().setGraphPartitionerFactoryClass(
142 SimpleLongRangePartitionerFactory.class);
143 long readerVertices =
144 READER_VERTICES.getWithDefault(job.getConfiguration(), -1L);
145 job.getConfiguration().setLong(
146 GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, readerVertices);
147
148 outputPath = getTempPath("testSimpleRangePartitioner");
149 removeAndSetOutput(job, outputPath);
150 assertTrue(job.run(true));
151 verifyOutput(hdfs, outputPath);
152 }
153 }