/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.ooc;
2021import com.sun.management.GarbageCollectionNotificationInfo;
22import com.yammer.metrics.core.Gauge;
23import org.apache.giraph.bsp.BspService;
24import org.apache.giraph.bsp.CentralizedServiceWorker;
25import org.apache.giraph.comm.NetworkMetrics;
26import org.apache.giraph.comm.ServerData;
27import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
28import org.apache.giraph.comm.flow_control.FlowControl;
29import org.apache.giraph.conf.GiraphConstants;
30import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
31import org.apache.giraph.metrics.GiraphMetrics;
32import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
33import org.apache.giraph.metrics.SuperstepMetricsRegistry;
34import org.apache.giraph.ooc.data.MetaPartitionManager;
35import org.apache.giraph.ooc.command.IOCommand;
36import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
37import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
38import org.apache.giraph.ooc.policy.FixedPartitionsOracle;
39import org.apache.giraph.ooc.policy.OutOfCoreOracle;
40import org.apache.giraph.utils.AdjustableSemaphore;
41import org.apache.giraph.worker.BspServiceWorker;
42import org.apache.log4j.Logger;
4344import java.lang.reflect.Constructor;
45import java.lang.reflect.InvocationTargetException;
46import java.util.concurrent.locks.ReadWriteLock;
47import java.util.concurrent.locks.ReentrantReadWriteLock;
4849importstatic com.google.common.base.Preconditions.checkState;
5051/**52 * Class to represent an out-of-core engine.53 */54publicclassOutOfCoreEngineimplementsResetSuperstepMetricsObserver {
55/**56 * Number of 'units of processing' after which an active thread should57 * check-in with the out-of-core engine in order to re-claim its permission to58 * stay active. For a compute thread, the 'unit of processing' is processing59 * of one vertex, and for an input thread, the 'unit of processing' is reading60 * a row of input data.61 */62publicstaticfinalint CHECK_IN_INTERVAL = (1 << 10) - 1;
63/** Name of metric for percentage of graph on disk */64publicstaticfinal String GRAPH_PERCENTAGE_IN_MEMORY = "ooc-graph-in-mem-%";
65/** Class logger. */66privatestaticfinal Logger LOG = Logger.getLogger(OutOfCoreEngine.class);
67/**68 * When getting partitions, how many milliseconds to wait if no partition was69 * available in memory70 */71privatestaticfinallong MSEC_TO_WAIT = 10000;
72/** Service worker */73privatefinal CentralizedServiceWorker<?, ?, ?> service;
74/** Flow control used in sending requests */75privateFlowControl flowControl;
76/** Scheduler for IO threads */77privatefinalOutOfCoreIOScheduler ioScheduler;
78/** Data structure to keep meta partition information */79privatefinalMetaPartitionManager metaPartitionManager;
80/** Out-of-core oracle (brain of out-of-core mechanism) */81privatefinalOutOfCoreOracle oracle;
82/** IO statistics collector */83privatefinalOutOfCoreIOStatistics statistics;
84/**85 * Global lock for entire superstep. This lock helps to avoid overlapping of86 * out-of-core decisions (what to do next to help the out-of-core mechanism)87 * with out-of-core operations (actual IO operations).88 */89privatefinal ReadWriteLock superstepLock = new ReentrantReadWriteLock();
90/** Data accessor object (DAO) used as persistence layer in out-of-core */91privatefinalOutOfCoreDataAccessor dataAccessor;
92/** Callable factory for IO threads */93privatefinalOutOfCoreIOCallableFactory oocIOCallableFactory;
94/**95 * Dummy object to wait on until a partition becomes available in memory96 * for processing97 */98privatefinal Object partitionAvailable = new Object();
99/** How many compute threads do we have? */100privateint numComputeThreads;
101/** How many threads (input/compute) are processing data? */102privatevolatileint numProcessingThreads;
103/** Semaphore used for controlling number of active threads at each moment */104privatefinalAdjustableSemaphore activeThreadsPermit;
105/**106 * Cached value for NettyClient.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER (max107 * credit used for credit-based flow-control mechanism)108 */109privatefinalshort maxRequestsCredit;
110/**111 * Generally, the logic in Giraph for change of the superstep happens in the112 * following order:113 * (1) Compute threads are done processing all partitions114 * (2) Superstep number increases115 * (3) New message store is created and message stores are prepared116 * (4) Iteration over partitions starts117 * Note that there are other operations happening at the same time as well as118 * the above operations, but the above operations are the ones which may119 * interfere with out-of-core operations. The goal of `superstepLock` is to120 * isolate operations 2, 3, and 4 from the rest of computations and IO121 * operations. Specifically, increasing the superstep counter (operation 2)122 * should be exclusive and no IO operation should happen at the same time.123 * This is due to the fact that prefetching mechanism uses superstep counter124 * as a mean to identify which data should be read. That being said, superstep125 * counter should be cached in out-of-core engine, and all IO operations and126 * out-of-core logic should access superstep counter through this cached127 * value.128 */129privatelong superstep;
130/**131 * Generally, the logic of a graph computations happens in the following order132 * with respect to `startIteration` and `reset` method:133 * ...134 * startIteration (for moving edges)135 * ...136 * reset (to prepare messages/partitions for superstep 0)137 * ...138 * startIteration (superstep 0)139 * ...140 * reset (to prepare messages/partitions for superstep 1)141 * ...142 *143 * However, in the unit tests, we usually consider only one superstep (usually144 * INPUT_SUPERSTEP), and we move through partitions multiple times. Out-of-145 * core mechanism works only if partitions are reset in a proper way. So,146 * we keep the following flag to reset partitions if necessary.147 */148privateboolean resetDone;
149150/**151 * Provides statistics about network traffic (e.g. received bytes per152 * superstep etc).153 */154privatefinalNetworkMetrics networkMetrics;
155156/**157 * Constructor158 *159 * @param conf Configuration160 * @param service Service worker161 * @param networkMetrics Interface for network stats162 */163publicOutOfCoreEngine(ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
164 CentralizedServiceWorker<?, ?, ?> service,
165NetworkMetrics networkMetrics) {
166this.service = service;
167this.networkMetrics = networkMetrics;
168 Class<? extends OutOfCoreDataAccessor> accessorClass =
169 GiraphConstants.OUT_OF_CORE_DATA_ACCESSOR.get(conf);
170try {
171 Constructor<?> constructor = accessorClass.getConstructor(
172 ImmutableClassesGiraphConfiguration.class);
173this.dataAccessor = (OutOfCoreDataAccessor) constructor.newInstance(conf);
174 } catch (NoSuchMethodException | InstantiationException |
175 InvocationTargetException | IllegalAccessException e) {
176thrownew IllegalStateException("OutOfCoreEngine: caught exception " +
177"while creating the data accessor instance!", e);
178 }
179int numIOThreads = dataAccessor.getNumAccessorThreads();
180this.oocIOCallableFactory =
181newOutOfCoreIOCallableFactory(this, numIOThreads,
182 service.getGraphTaskManager().createUncaughtExceptionHandler());
183this.ioScheduler = newOutOfCoreIOScheduler(conf, this, numIOThreads);
184this.metaPartitionManager = newMetaPartitionManager(numIOThreads, this);
185this.statistics = newOutOfCoreIOStatistics(conf, numIOThreads);
186int maxPartitionsInMemory =
187 GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
188 Class<? extends OutOfCoreOracle> oracleClass =
189 GiraphConstants.OUT_OF_CORE_ORACLE.get(conf);
190if (maxPartitionsInMemory != 0 &&
191 oracleClass != FixedPartitionsOracle.class) {
192 LOG.warn("OutOfCoreEngine: Max number of partitions in memory is set " +
193"but the out-of-core oracle used is not tailored for fixed " +
194"out-of-core policy. Setting the oracle to be FixedPartitionsOracle");
195 oracleClass = FixedPartitionsOracle.class;
196 }
197this.numComputeThreads = conf.getNumComputeThreads();
198// At the beginning of the execution, only input threads are processing data199this.numProcessingThreads = conf.getNumInputSplitsThreads();
200this.activeThreadsPermit = newAdjustableSemaphore(numProcessingThreads);
201this.maxRequestsCredit = (short)
202 CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
203this.superstep = BspService.INPUT_SUPERSTEP;
204this.resetDone = false;
205 GiraphMetrics.get().addSuperstepResetObserver(this);
206try {
207 Constructor<?> constructor = oracleClass.getConstructor(
208 ImmutableClassesGiraphConfiguration.class, OutOfCoreEngine.class);
209this.oracle = (OutOfCoreOracle) constructor.newInstance(conf, this);
210 } catch (NoSuchMethodException | IllegalAccessException |
211 InstantiationException | InvocationTargetException e) {
212thrownew IllegalStateException("OutOfCoreEngine: caught exception " +
213"while creating the oracle!", e);
214 }
215 }
216217/**218 * Initialize/Start the out-of-core engine.219 */220publicvoid initialize() {
221 dataAccessor.initialize();
222 oocIOCallableFactory.createCallable();
223 }
224225/**226 * Shutdown/Stop the out-of-core engine.227 */228publicvoid shutdown() {
229if (LOG.isInfoEnabled()) {
230 LOG.info("shutdown: out-of-core engine shutting down, signalling IO " +
231"threads to shutdown");
232 }
233 ioScheduler.shutdown();
234 oocIOCallableFactory.shutdown();
235 dataAccessor.shutdown();
236 }
237238/**239 * Get a reference to the server data240 *241 * @return ServerData242 */243publicServerData getServerData() {
244return service.getServerData();
245 }
246247/**248 * Get a reference to the service worker249 *250 * @return CentralizedServiceWorker251 */252publicCentralizedServiceWorker getServiceWorker() {
253return service;
254 }
255256/**257 * Get a reference to IO scheduler258 *259 * @return OutOfCoreIOScheduler260 */261publicOutOfCoreIOScheduler getIOScheduler() {
262return ioScheduler;
263 }
264265/**266 * Get a reference to meta partition information267 *268 * @return MetaPartitionManager269 */270publicMetaPartitionManager getMetaPartitionManager() {
271return metaPartitionManager;
272 }
273274/**275 * Get a reference to superstep lock276 *277 * @return read/write lock used for global superstep lock278 */279public ReadWriteLock getSuperstepLock() {
280return superstepLock;
281 }
282283/**284 * Get a reference to IO statistics collector285 *286 * @return IO statistics collector287 */288publicOutOfCoreIOStatistics getIOStatistics() {
289return statistics;
290 }
291292/**293 * Get a reference to out-of-core oracle294 *295 * @return out-of-core oracle296 */297publicOutOfCoreOracle getOracle() {
298return oracle;
299 }
300301/**302 * Get the id of the next partition to process in the current iteration over303 * all the partitions. If all partitions are already processed, this method304 * returns null.305 *306 * @return id of a partition to process. 'null' if all partitions are307 * processed in current iteration over partitions.308 */309public Integer getNextPartition() {
310 Integer partitionId;
311synchronized (partitionAvailable) {
312while ((partitionId = metaPartitionManager.getNextPartition()) == null) {
313try {
314if (LOG.isInfoEnabled()) {
315 LOG.info("getNextPartition: waiting until a partition becomes " +
316"available!");
317 }
318 partitionAvailable.wait(MSEC_TO_WAIT);
319 } catch (InterruptedException e) {
320thrownew IllegalStateException("getNextPartition: caught " +
321"InterruptedException while waiting to retrieve a partition to " +
322"process");
323 }
324 }
325if (partitionId == MetaPartitionManager.NO_PARTITION_TO_PROCESS) {
326 partitionAvailable.notifyAll();
327 partitionId = null;
328 }
329 }
330return partitionId;
331 }
332333/**334 * Notify out-of-core engine that processing of a particular partition is done335 *336 * @param partitionId id of the partition that its processing is done337 */338publicvoid doneProcessingPartition(int partitionId) {
339 metaPartitionManager.setPartitionIsProcessed(partitionId);
340if (LOG.isInfoEnabled()) {
341 LOG.info("doneProcessingPartition: processing partition " + partitionId +
342" is done!");
343 }
344 }
345346/**347 * Notify out-of-core engine that iteration cycle over all partitions is about348 * to begin.349 */350 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
351"UL_UNRELEASED_LOCK_EXCEPTION_PATH")
352publicvoid startIteration() {
353 oracle.startIteration();
354if (!resetDone) {
355 superstepLock.writeLock().lock();
356 metaPartitionManager.resetPartitions();
357 superstepLock.writeLock().unlock();
358 }
359if (superstep != BspServiceWorker.INPUT_SUPERSTEP &&
360 numProcessingThreads != numComputeThreads) {
361// This method is only executed by the main thread, and at this point362// no other input/compute thread is alive. So, all the permits in363// `activeThreadsPermit` is available. However, now that we are changing364// the maximum number of active threads, we need to adjust the number365// of available permits on `activeThreadsPermit`.366 activeThreadsPermit.setMaxPermits(activeThreadsPermit.availablePermits() *
367 numComputeThreads / numProcessingThreads);
368 numProcessingThreads = numComputeThreads;
369 }
370if (LOG.isInfoEnabled()) {
371 LOG.info("startIteration: with " +
372 metaPartitionManager.getNumInMemoryPartitions() +
373" partitions in memory and " +
374 activeThreadsPermit.availablePermits() + " active threads");
375 }
376 resetDone = false;
377 }
378379/**380 * Retrieve a particular partition. After this method is complete the381 * requested partition should be in memory.382 *383 * @param partitionId id of the partition to retrieve384 */385publicvoid retrievePartition(int partitionId) {
386if (metaPartitionManager.isPartitionOnDisk(partitionId)) {
387 ioScheduler.addIOCommand(newLoadPartitionIOCommand(this, partitionId,
388 superstep));
389synchronized (partitionAvailable) {
390while (metaPartitionManager.isPartitionOnDisk(partitionId)) {
391try {
392if (LOG.isInfoEnabled()) {
393 LOG.info("retrievePartition: waiting until partition " +
394 partitionId + " becomes available");
395 }
396 partitionAvailable.wait();
397 } catch (InterruptedException e) {
398thrownew IllegalStateException("retrievePartition: caught " +
399"InterruptedException while waiting to retrieve partition " +
400 partitionId);
401 }
402 }
403 }
404 }
405 }
406407/**408 * Notify out-of-core engine that an IO command is completed by an IO thread409 *410 * @param command the IO command that is completed411 */412publicvoid ioCommandCompleted(IOCommand command) {
413 oracle.commandCompleted(command);
414if (command instanceof LoadPartitionIOCommand) {
415// Notifying compute threads who are waiting for a partition to become416// available in memory to process.417synchronized (partitionAvailable) {
418 partitionAvailable.notifyAll();
419 }
420 }
421 }
422423/**424 * Update the fraction of processing threads that should remain active. It is425 * the responsibility of out-of-core oracle to update the number of active426 * threads.427 *428 * @param fraction the fraction of processing threads to remain active. This429 * number is in range [0, 1]430 */431publicvoid updateActiveThreadsFraction(double fraction) {
432 checkState(fraction >= 0 && fraction <= 1);
433int numActiveThreads = (int) (numProcessingThreads * fraction);
434if (LOG.isInfoEnabled()) {
435 LOG.info("updateActiveThreadsFraction: updating the number of active " +
436"threads to " + numActiveThreads);
437 }
438 activeThreadsPermit.setMaxPermits(numActiveThreads);
439 }
440441/**442 * A processing thread would check in with out-of-core engine every once in a443 * while to make sure that it can still remain active. It is the444 * responsibility of the out-of-core oracle to update the number of active445 * threads in a way that the computation never fails, and yet achieve the446 * optimal performance it can achieve.447 */448publicvoid activeThreadCheckIn() {
449 activeThreadsPermit.release();
450try {
451 activeThreadsPermit.acquire();
452 } catch (InterruptedException e) {
453 LOG.error("activeThreadCheckIn: exception while acquiring a permit to " +
454"remain an active thread");
455thrownew IllegalStateException(e);
456 }
457 }
458459/**460 * Notify the out-of-core engine that a processing (input/compute) thread has461 * started.462 */463publicvoid processingThreadStart() {
464try {
465 activeThreadsPermit.acquire();
466 } catch (InterruptedException e) {
467 LOG.error("processingThreadStart: exception while acquiring a permit to" +
468" start the processing thread!");
469thrownew IllegalStateException(e);
470 }
471 }
472473/**474 * Notify the out-of-core engine that a processing (input/compute) thread has475 * finished.476 */477publicvoid processingThreadFinish() {
478 activeThreadsPermit.release();
479 }
480481/**482 * Update the credit announced for this worker in Netty. The lower the credit483 * is, the lower rate incoming messages arrive at this worker. Thus, credit484 * is an indirect way of controlling amount of memory incoming messages would485 * take.486 *487 * @param fraction the fraction of max credits others can use to send requests488 * to this worker489 */490publicvoid updateRequestsCreditFraction(double fraction) {
491 checkState(fraction >= 0 && fraction <= 1);
492short newCredit = (short) (maxRequestsCredit * fraction);
493if (LOG.isInfoEnabled()) {
494 LOG.info("updateRequestsCreditFraction: updating the credit to " +
495 newCredit);
496 }
497if (flowControl != null) {
498 ((CreditBasedFlowControl) flowControl).updateCredit(newCredit);
499 }
500 }
501502/**503 * Reset partitions and messages meta data. Also, reset the cached value of504 * superstep counter.505 */506publicvoid reset() {
507 metaPartitionManager.resetPartitions();
508 metaPartitionManager.resetMessages();
509 superstep = service.getSuperstep();
510 resetDone = true;
511 }
512513/**514 * @return cached value of the superstep counter515 */516publiclong getSuperstep() {
517return superstep;
518 }
519520/**521 * Notify the out-of-core engine that a GC has just been completed522 *523 * @param info GC information524 */525publicvoid gcCompleted(GarbageCollectionNotificationInfo info) {
526 oracle.gcCompleted(info);
527 }
528529 @Override
530publicvoid newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
531 superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new Gauge<Double>() {
532 @Override
533public Double value() {
534return metaPartitionManager.getGraphFractionInMemory() * 100;
535 }
536 });
537 }
538539publicFlowControl getFlowControl() {
540return flowControl;
541 }
542543publicvoid setFlowControl(FlowControl flowControl) {
544this.flowControl = flowControl;
545 }
546547publicOutOfCoreDataAccessor getDataAccessor() {
548return dataAccessor;
549 }
550551publicNetworkMetrics getNetworkMetrics() {
552return networkMetrics;
553 }
554 }