This project has retired. For details please refer to its
Attic page.
BlockOutputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.output;
19
20 import java.io.IOException;
21 import java.util.HashMap;
22 import java.util.Map;
23
24 import org.apache.giraph.bsp.BspOutputFormat;
25 import org.apache.giraph.conf.GiraphConfiguration;
26 import org.apache.giraph.conf.GiraphConstants;
27 import org.apache.giraph.conf.StrConfOption;
28 import org.apache.giraph.utils.ConfigurationObjectUtils;
29 import org.apache.giraph.utils.DefaultOutputCommitter;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.mapreduce.JobContext;
32 import org.apache.hadoop.mapreduce.OutputCommitter;
33 import org.apache.hadoop.mapreduce.TaskAttemptContext;
34
35
36
37
38
39 public class BlockOutputFormat extends BspOutputFormat {
40 private static final StrConfOption OUTPUT_CONF_OPTIONS = new StrConfOption(
41 "giraph.outputConfOptions", "",
42 "List of conf options for outputs used");
43
44 public static <OD> void addOutputDesc(OD outputDesc, String confOption,
45 GiraphConfiguration conf) {
46 GiraphConstants.HADOOP_OUTPUT_FORMAT_CLASS.set(conf,
47 BlockOutputFormat.class);
48 String currentOutputs = OUTPUT_CONF_OPTIONS.get(conf);
49 if (!currentOutputs.isEmpty()) {
50 currentOutputs = currentOutputs + ",";
51 }
52 OUTPUT_CONF_OPTIONS.set(conf, currentOutputs + confOption);
53 ConfigurationObjectUtils.setObjectKryo(outputDesc, confOption, conf);
54 }
55
56
57
58
59
60
61
62
63 public static String[] getOutputConfOptions(Configuration conf) {
64 String outputConfOptions = OUTPUT_CONF_OPTIONS.get(conf);
65 return outputConfOptions.isEmpty() ?
66 new String[0] : outputConfOptions.split(",");
67 }
68
69 public static <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
70 OD createInitAndCheckOutputDesc(String confOption, Configuration conf,
71 String jobIdentifier) {
72 OD outputDesc = ConfigurationObjectUtils.getObjectKryo(confOption, conf);
73 outputDesc.initializeAndCheck(jobIdentifier, conf);
74 return outputDesc;
75 }
76
77 public static Map<String, BlockOutputDesc>
78 createInitAndCheckOutputDescsMap(Configuration conf, String jobIdentifier) {
79 String[] outputConfOptions = getOutputConfOptions(conf);
80 Map<String, BlockOutputDesc> ret = new HashMap<>(outputConfOptions.length);
81 for (String outputConfOption : outputConfOptions) {
82 ret.put(outputConfOption,
83 createInitAndCheckOutputDesc(outputConfOption, conf, jobIdentifier));
84 }
85 return ret;
86 }
87
88 public static Map<String, BlockOutputDesc> createInitAndCheckOutputDescsMap(
89 JobContext jobContext) {
90 return createInitAndCheckOutputDescsMap(jobContext.getConfiguration(),
91 jobContext.getJobID().toString());
92 }
93
94 @Override
95 public void checkOutputSpecs(JobContext jobContext)
96 throws IOException, InterruptedException {
97 createInitAndCheckOutputDescsMap(jobContext);
98 }
99
100 @Override
101 public OutputCommitter getOutputCommitter(
102 TaskAttemptContext context) throws IOException, InterruptedException {
103 return new DefaultOutputCommitter() {
104 @Override
105 public void commit(JobContext jobContext) throws IOException {
106 Map<String, BlockOutputDesc> map =
107 createInitAndCheckOutputDescsMap(jobContext);
108 for (BlockOutputDesc outputDesc : map.values()) {
109 outputDesc.commit();
110 }
111 }
112 };
113 }
114 }