This project has retired. For details please refer to its
Attic page.
OutOfCoreIOScheduler xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.ooc;
20
21 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22 import org.apache.giraph.conf.IntConfOption;
23 import org.apache.giraph.ooc.command.IOCommand;
24 import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
25 import org.apache.giraph.ooc.command.StoreDataBufferIOCommand;
26 import org.apache.giraph.ooc.command.StoreIncomingMessageIOCommand;
27 import org.apache.giraph.ooc.command.StorePartitionIOCommand;
28 import org.apache.giraph.ooc.command.WaitIOCommand;
29 import org.apache.giraph.ooc.policy.OutOfCoreOracle;
30 import org.apache.log4j.Logger;
31
32 import java.util.ArrayList;
33 import java.util.Arrays;
34 import java.util.List;
35 import java.util.Queue;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37
38 import static com.google.common.base.Preconditions.checkNotNull;
39
40
41
42
43 public class OutOfCoreIOScheduler {
44
45
46
47
48
49 public static final IntConfOption OOC_WAIT_INTERVAL =
50 new IntConfOption("giraph.oocWaitInterval", 1000,
51 "Duration (in milliseconds) which IO threads in out-of-core " +
52 "mechanism would wait until a command becomes available");
53
54 private static final Logger LOG =
55 Logger.getLogger(OutOfCoreIOScheduler.class);
56
57 private final OutOfCoreEngine oocEngine;
58
59 private final int waitInterval;
60
61
62
63
64 private final List<Queue<IOCommand>> threadLoadCommandQueue;
65
66 private volatile boolean shouldTerminate;
67
68
69
70
71
72
73
74
75 OutOfCoreIOScheduler(final ImmutableClassesGiraphConfiguration conf,
76 OutOfCoreEngine oocEngine, int numDisks) {
77 this.oocEngine = oocEngine;
78 this.waitInterval = OOC_WAIT_INTERVAL.get(conf);
79 threadLoadCommandQueue = new ArrayList<>(numDisks);
80 for (int i = 0; i < numDisks; ++i) {
81 threadLoadCommandQueue.add(
82 new ConcurrentLinkedQueue<IOCommand>());
83 }
84 shouldTerminate = false;
85 }
86
87
88
89
90
91
92
93 public IOCommand getNextIOCommand(int threadId) {
94 if (shouldTerminate) {
95 return null;
96 }
97 IOCommand command = null;
98 do {
99 if (command != null && LOG.isInfoEnabled()) {
100 LOG.info("getNextIOCommand: command " + command + " was proposed to " +
101 "the oracle, but got denied. Generating another command!");
102 }
103 OutOfCoreOracle.IOAction[] actions =
104 oocEngine.getOracle().getNextIOActions();
105 if (LOG.isDebugEnabled()) {
106 LOG.debug("getNextIOCommand: actions are " + Arrays.toString(actions));
107 }
108
109 if (!threadLoadCommandQueue.get(threadId).isEmpty()) {
110
111 boolean canLoad = false;
112 for (OutOfCoreOracle.IOAction action : actions) {
113 if (action == OutOfCoreOracle.IOAction.LOAD_PARTITION ||
114 action == OutOfCoreOracle.IOAction.LOAD_UNPROCESSED_PARTITION ||
115 action == OutOfCoreOracle.IOAction.LOAD_TO_SWAP_PARTITION ||
116 action == OutOfCoreOracle.IOAction.URGENT_LOAD_PARTITION) {
117 canLoad = true;
118 break;
119 }
120 }
121 if (canLoad) {
122 command = threadLoadCommandQueue.get(threadId).poll();
123 checkNotNull(command);
124 if (oocEngine.getOracle().approve(command)) {
125 return command;
126 } else {
127
128
129 threadLoadCommandQueue.get(threadId).offer(command);
130 }
131 }
132 }
133 command = null;
134 for (OutOfCoreOracle.IOAction action : actions) {
135 Integer partitionId;
136 switch (action) {
137 case STORE_MESSAGES_AND_BUFFERS:
138 partitionId = oocEngine.getMetaPartitionManager()
139 .getOffloadPartitionBufferId(threadId);
140 if (partitionId != null) {
141 command = new StoreDataBufferIOCommand(oocEngine, partitionId,
142 StoreDataBufferIOCommand.DataBufferType.PARTITION);
143 } else {
144 partitionId = oocEngine.getMetaPartitionManager()
145 .getOffloadMessageBufferId(threadId);
146 if (partitionId != null) {
147 command = new StoreDataBufferIOCommand(oocEngine, partitionId,
148 StoreDataBufferIOCommand.DataBufferType.MESSAGE);
149 } else {
150 partitionId = oocEngine.getMetaPartitionManager()
151 .getOffloadMessageId(threadId);
152 if (partitionId != null) {
153 command = new StoreIncomingMessageIOCommand(oocEngine,
154 partitionId);
155 }
156 }
157 }
158 break;
159 case STORE_PROCESSED_PARTITION:
160 partitionId = oocEngine.getMetaPartitionManager()
161 .getOffloadPartitionId(threadId);
162 if (partitionId != null &&
163 oocEngine.getMetaPartitionManager()
164 .isPartitionProcessed(partitionId)) {
165 command = new StorePartitionIOCommand(oocEngine, partitionId);
166 }
167 break;
168 case STORE_PARTITION:
169 partitionId = oocEngine.getMetaPartitionManager()
170 .getOffloadPartitionId(threadId);
171 if (partitionId != null) {
172 command = new StorePartitionIOCommand(oocEngine, partitionId);
173 }
174 break;
175 case LOAD_UNPROCESSED_PARTITION:
176 partitionId = oocEngine.getMetaPartitionManager()
177 .getLoadPartitionId(threadId);
178 if (partitionId != null &&
179 !oocEngine.getMetaPartitionManager()
180 .isPartitionProcessed(partitionId)) {
181 command = new LoadPartitionIOCommand(oocEngine, partitionId,
182 oocEngine.getSuperstep());
183 }
184 break;
185 case LOAD_TO_SWAP_PARTITION:
186 partitionId = oocEngine.getMetaPartitionManager()
187 .getLoadPartitionId(threadId);
188 if (partitionId != null &&
189 !oocEngine.getMetaPartitionManager()
190 .isPartitionProcessed(partitionId) &&
191 oocEngine.getMetaPartitionManager().hasProcessedOnMemory()) {
192 command = new LoadPartitionIOCommand(oocEngine, partitionId,
193 oocEngine.getSuperstep());
194 }
195 break;
196 case LOAD_PARTITION:
197 partitionId = oocEngine.getMetaPartitionManager()
198 .getLoadPartitionId(threadId);
199 if (partitionId != null) {
200 if (oocEngine.getMetaPartitionManager()
201 .isPartitionProcessed(partitionId)) {
202 command = new LoadPartitionIOCommand(oocEngine, partitionId,
203 oocEngine.getSuperstep() + 1);
204 } else {
205 command = new LoadPartitionIOCommand(oocEngine, partitionId,
206 oocEngine.getSuperstep());
207 }
208 }
209 break;
210 case URGENT_LOAD_PARTITION:
211
212 break;
213 default:
214 throw new IllegalStateException("getNextIOCommand: the IO action " +
215 "is not defined!");
216 }
217 if (command != null) {
218 break;
219 }
220 }
221 if (command == null) {
222 command = new WaitIOCommand(oocEngine, waitInterval);
223 }
224 } while (!oocEngine.getOracle().approve(command));
225 return command;
226 }
227
228
229
230
231
232
233 public void ioCommandCompleted(IOCommand command) {
234 oocEngine.ioCommandCompleted(command);
235 }
236
237
238
239
240
241
242 public void addIOCommand(IOCommand ioCommand) {
243 if (ioCommand instanceof LoadPartitionIOCommand) {
244 int ownerThread = oocEngine.getMetaPartitionManager()
245 .getOwnerThreadId(ioCommand.getPartitionId());
246 threadLoadCommandQueue.get(ownerThread).offer(ioCommand);
247 } else {
248 throw new IllegalStateException("addIOCommand: IO command type is not " +
249 "supported for addition");
250 }
251 }
252
253
254
255
256 public void shutdown() {
257 shouldTerminate = true;
258 if (LOG.isInfoEnabled()) {
259 LOG.info("shutdown: OutOfCoreIOScheduler shutting down!");
260 }
261 }
262 }