This project has retired. For details please refer to its
Attic page.
BlockOutputHandle xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.output;
19
20 import java.util.HashMap;
21 import java.util.Map;
22 import java.util.Queue;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ConcurrentLinkedQueue;
25
26 import org.apache.giraph.block_app.framework.api.BlockOutputApi;
27 import org.apache.giraph.conf.GiraphConstants;
28 import org.apache.giraph.utils.CallableFactory;
29 import org.apache.giraph.utils.ProgressableUtils;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.util.Progressable;
32
33
34
35
36 @SuppressWarnings("unchecked")
37 public class BlockOutputHandle implements BlockOutputApi {
38 private transient Configuration conf;
39 private transient Progressable progressable;
40 private final Map<String, BlockOutputDesc> outputDescMap;
41 private final Map<String, Queue<BlockOutputWriter>> freeWriters =
42 new HashMap<>();
43 private final Map<String, Queue<BlockOutputWriter>> occupiedWriters =
44 new HashMap<>();
45
46 public BlockOutputHandle() {
47 outputDescMap = null;
48 }
49
50 public BlockOutputHandle(String jobIdentifier, Configuration conf,
51 Progressable hadoopProgressable) {
52 outputDescMap = BlockOutputFormat.createInitAndCheckOutputDescsMap(
53 conf, jobIdentifier);
54 for (Map.Entry<String, BlockOutputDesc> entry : outputDescMap.entrySet()) {
55 entry.getValue().preWriting();
56 freeWriters.put(entry.getKey(),
57 new ConcurrentLinkedQueue<BlockOutputWriter>());
58 occupiedWriters.put(entry.getKey(),
59 new ConcurrentLinkedQueue<BlockOutputWriter>());
60 }
61 initialize(conf, hadoopProgressable);
62 }
63
64 public void initialize(Configuration conf, Progressable progressable) {
65 this.conf = conf;
66 this.progressable = progressable;
67 }
68
69
70 @Override
71 public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
72 OD getOutputDesc(String confOption) {
73 if (outputDescMap == null) {
74 throw new IllegalArgumentException(
75 "Output cannot be used with checkpointing");
76 }
77 return (OD) outputDescMap.get(confOption);
78 }
79
80 @Override
81 public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
82 if (outputDescMap == null) {
83 throw new IllegalArgumentException(
84 "Output cannot be used with checkpointing");
85 }
86 OW outputWriter = (OW) freeWriters.get(confOption).poll();
87 if (outputWriter == null) {
88 outputWriter = (OW) outputDescMap.get(confOption).createOutputWriter(
89 conf, progressable);
90 }
91 occupiedWriters.get(confOption).add(outputWriter);
92 return outputWriter;
93 }
94
95 public void returnAllWriters() {
96 for (Map.Entry<String, Queue<BlockOutputWriter>> entry :
97 occupiedWriters.entrySet()) {
98 freeWriters.get(entry.getKey()).addAll(entry.getValue());
99 entry.getValue().clear();
100 }
101 }
102
103 public void closeAllWriters() {
104 final Queue<BlockOutputWriter> allWriters = new ConcurrentLinkedQueue<>();
105 for (Queue<BlockOutputWriter> blockOutputWriters : freeWriters.values()) {
106 allWriters.addAll(blockOutputWriters);
107 }
108 if (allWriters.isEmpty()) {
109 return;
110 }
111
112 CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
113 @Override
114 public Callable<Void> newCallable(int callableId) {
115 return new Callable<Void>() {
116 @Override
117 public Void call() throws Exception {
118 BlockOutputWriter writer = allWriters.poll();
119 while (writer != null) {
120 writer.close();
121 writer = allWriters.poll();
122 }
123 return null;
124 }
125 };
126 }
127 };
128 ProgressableUtils.getResultsWithNCallables(callableFactory,
129 Math.min(GiraphConstants.NUM_OUTPUT_THREADS.get(conf),
130 allWriters.size()), "close-writers-%d", progressable);
131
132 for (BlockOutputDesc outputDesc : outputDescMap.values()) {
133 outputDesc.postWriting();
134 }
135 }
136 }