/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.ooc.data;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.worker.BspServiceWorker;
import org.apache.giraph.worker.WorkerProgress;
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import static com.google.common.base.Preconditions.checkState;

/**
 * Class to keep meta-information about partition data, edge data, and message
 * data of each partition on a worker.
 */
public class MetaPartitionManager {
  /**
   * Flag representing that no partition is left to process in the current
   * iteration cycle over all partitions.
   */
  public static final int NO_PARTITION_TO_PROCESS = -1;

  /** Class logger */
  private static final Logger LOG =
      Logger.getLogger(MetaPartitionManager.class);
  /** Different storage states for data */
  private enum StorageState { IN_MEM, ON_DISK, IN_TRANSIT };
  /**
   * Different storage states for a partition as a whole (i.e. the partition
   * and its current messages)
   */
  private enum PartitionStorageState
    /**
     * Either both the partition and its current messages are in memory, or
     * both are on disk, or one part is on disk and the other is in memory.
     */
    { FULLY_IN_MEM, PARTIALLY_IN_MEM, FULLY_ON_DISK };
  /**
   * Different processing states for partitions. Processing states are reset
   * at the beginning of each iteration cycle over partitions.
   */
  private enum ProcessingState { PROCESSED, UNPROCESSED, IN_PROCESS };

  /**
   * Number of partitions in memory (both the partition and its current
   * messages are in memory)
   */
  private final AtomicInteger numInMemoryPartitions = new AtomicInteger(0);
  /**
   * Number of partitions that are partially in memory (either the partition
   * or its current messages is in memory and the other part is not)
   */
  private final AtomicInteger numPartiallyInMemoryPartitions =
      new AtomicInteger(0);
  /** Map (dictionary) of partitions to their meta information */
  private final ConcurrentMap<Integer, MetaPartition> partitions =
      Maps.newConcurrentMap();
  /** Reverse dictionaries of partitions assigned to each IO thread */
  private final List<MetaPartitionDictionary> perThreadPartitionDictionary;
  /**
   * For each IO thread, set of partition ids that are on disk and have
   * 'large enough' vertex/edge buffers to be offloaded to disk
   */
  private final List<Set<Integer>> perThreadVertexEdgeBuffers;
  /**
   * For each IO thread, set of partition ids that are on disk and have
   * 'large enough' message buffers to be offloaded to disk
   */
  private final List<Set<Integer>> perThreadMessageBuffers;
  /**
   * Out-of-core engine
   */
  private final OutOfCoreEngine oocEngine;
  /**
   * Number of processed partitions in the current iteration cycle over all
   * partitions
   */
  private final AtomicInteger numPartitionsProcessed = new AtomicInteger(0);
  /**
   * Random number generator used to choose a thread from which to pick one of
   * its partitions for processing
   */
  private final Random randomGenerator;
  /**
   * Lowest fraction of partitions in memory, relative to the total number of
   * available partitions. This is an indirect estimate of the amount of the
   * graph in memory, which can be used to estimate how many more machines are
   * needed to avoid out-of-core execution. At the beginning the entire graph
   * is in memory, so the fraction is 1. This fraction is calculated per
   * superstep.
   */
  private final AtomicDouble lowestGraphFractionInMemory =
      new AtomicDouble(1);
  /**
   * Map of partition ids to their indices. The index of a partition is the
   * order in which the partition has been inserted. Partitions are indexed as
   * 0, 1, 2, etc. This indexing is later used to find the id of the IO thread
   * that is responsible for handling a partition. Partitions are assigned to
   * IO threads in a round-robin fashion based on their indices.
   */
  private final ConcurrentMap<Integer, Integer> partitionIndex =
      Maps.newConcurrentMap();
  /**
   * Sequential counter used to assign indices to partitions as they are added
   */
  private final AtomicInteger indexCounter = new AtomicInteger(0);
  /** How many disks (i.e. IO threads) do we have? */
  private final int numIOThreads;

  /**
   * Constructor
   *
   * @param numIOThreads number of IO threads
   * @param oocEngine out-of-core engine
   */
  public MetaPartitionManager(int numIOThreads, OutOfCoreEngine oocEngine) {
    perThreadPartitionDictionary = new ArrayList<>(numIOThreads);
    perThreadVertexEdgeBuffers = new ArrayList<>(numIOThreads);
    perThreadMessageBuffers = new ArrayList<>(numIOThreads);
    for (int i = 0; i < numIOThreads; ++i) {
      perThreadPartitionDictionary.add(new MetaPartitionDictionary());
      perThreadMessageBuffers.add(Sets.<Integer>newConcurrentHashSet());
      perThreadVertexEdgeBuffers.add(Sets.<Integer>newConcurrentHashSet());
    }
    this.oocEngine = oocEngine;
    this.randomGenerator = new Random();
    this.numIOThreads = numIOThreads;
  }

  /**
   * @return number of partitions in memory
   */
  public int getNumInMemoryPartitions() {
    return numInMemoryPartitions.get();
  }

  /**
   * @return number of partitions that are partially in memory
   */
  public int getNumPartiallyInMemoryPartitions() {
    return numPartiallyInMemoryPartitions.get();
  }

  /**
   * Get total number of partitions
   *
   * @return total number of partitions
   */
  public int getNumPartitions() {
    return partitions.size();
  }

  /**
   * Since the statistics are based on estimates, we assume each partially
   * in-memory partition takes about half the memory footprint of a fully
   * in-memory partition.
   *
   * @return estimate of the fraction of the graph in memory
   */
  public double getGraphFractionInMemory() {
    return (getNumInMemoryPartitions() +
        getNumPartiallyInMemoryPartitions() / 2.0) / getNumPartitions();
  }
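
  // Worked example of the estimate above (the partition counts here are
  // hypothetical): a worker holding 10 partitions, 6 fully in memory and 2
  // partially in memory, reports (6 + 2 / 2.0) / 10 = 0.7, i.e. roughly 70%
  // of the graph in memory.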

  /**
   * Update the lowest fraction of the graph in memory, so that one of the
   * progress counters reflects more accurate information.
   */
  private synchronized void updateGraphFractionInMemory() {
    double graphInMemory = getGraphFractionInMemory();
    if (graphInMemory < lowestGraphFractionInMemory.get()) {
      lowestGraphFractionInMemory.set(graphInMemory);
      WorkerProgress.get().updateLowestGraphPercentageInMemory(
          (int) (graphInMemory * 100));
    }
  }

  /**
   * Update the book-keeping of the number of in-memory and partially
   * in-memory partitions based on the storage state of a partition and its
   * current messages before and after an update to its status.
   *
   * @param stateBefore the storage state of the partition and its current
   *                    messages before an update
   * @param stateAfter the storage state of the partition and its current
   *                   messages after an update
   */
  private void updateCounters(PartitionStorageState stateBefore,
      PartitionStorageState stateAfter) {
    numInMemoryPartitions.getAndAdd(
        ((stateAfter == PartitionStorageState.FULLY_IN_MEM) ? 1 : 0) -
        ((stateBefore == PartitionStorageState.FULLY_IN_MEM) ? 1 : 0));
    numPartiallyInMemoryPartitions.getAndAdd(
        ((stateAfter == PartitionStorageState.PARTIALLY_IN_MEM) ? 1 : 0) -
        ((stateBefore == PartitionStorageState.PARTIALLY_IN_MEM) ? 1 : 0));
  }
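
  // For example, a transition from FULLY_IN_MEM to PARTIALLY_IN_MEM adds -1
  // to numInMemoryPartitions and +1 to numPartiallyInMemoryPartitions, while
  // a transition between two FULLY_ON_DISK states leaves both counters
  // untouched (the states named here are just illustrative inputs).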

  /**
   * Whether a given partition is available on this worker
   *
   * @param partitionId id of the partition to check
   * @return true if the worker owns the partition, false otherwise
   */
  public boolean hasPartition(Integer partitionId) {
    return partitions.containsKey(partitionId);
  }

  /**
   * Return the ids of all available partitions as an iterable
   *
   * @return ids of all available partitions
   */
  public Iterable<Integer> getPartitionIds() {
    return partitions.keySet();
  }

  /**
   * Get the id of the IO thread that is responsible for a particular partition
   *
   * @param partitionId id of the given partition
   * @return id of the thread responsible for the given partition
   */
  public int getOwnerThreadId(int partitionId) {
    Integer index = partitionIndex.get(partitionId);
    checkState(index != null);
    return index % numIOThreads;
  }
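
  // Illustration of the round-robin assignment (numbers are hypothetical):
  // with numIOThreads = 4, partitions added in order receive indices
  // 0, 1, 2, 3, 4, 5, ... and are therefore owned by IO threads
  // 0, 1, 2, 3, 0, 1, ... respectively.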

  /**
   * Add a partition
   *
   * @param partitionId id of a partition to add
   */
  public void addPartition(int partitionId) {
    MetaPartition meta = new MetaPartition(partitionId);
    MetaPartition temp = partitions.putIfAbsent(partitionId, meta);
    // Check if the given partition is new
    if (temp == null) {
      int index = indexCounter.getAndIncrement();
      checkState(partitionIndex.putIfAbsent(partitionId, index) == null);
      int ownerThread = getOwnerThreadId(partitionId);
      perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
      numInMemoryPartitions.getAndIncrement();
    }
  }

  /**
   * Remove a partition. This method assumes that the partition has already
   * been retrieved and is in memory.
   *
   * @param partitionId id of a partition to remove
   */
  public void removePartition(Integer partitionId) {
    MetaPartition meta = partitions.remove(partitionId);
    int ownerThread = getOwnerThreadId(partitionId);
    perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
    checkState(!meta.isOnDisk());
    numInMemoryPartitions.getAndDecrement();
  }

  /**
   * Pops an entry from the specified set.
   *
   * @param set set to pop an entry from
   * @param <T> type of entries in the set
   * @return popped entry from the given set, or null if the set is empty
   */
  private static <T> T popFromSet(Set<T> set) {
    if (!set.isEmpty()) {
      Iterator<T> it = set.iterator();
      T entry = it.next();
      it.remove();
      return entry;
    }
    return null;
  }

  /**
   * Peeks an entry from the specified set.
   *
   * @param set set to peek an entry from
   * @param <T> type of entries in the set
   * @return peeked entry from the given set, or null if the set is empty
   */
  private static <T> T peekFromSet(Set<T> set) {
    if (!set.isEmpty()) {
      return set.iterator().next();
    }
    return null;
  }

  /**
   * Get the id of a partition to offload to disk. Prioritize offloading
   * processed partitions over unprocessed partitions. Also, prioritize
   * offloading partitions partially in memory over partitions fully in memory.
   *
   * @param threadId id of the thread that is going to store the partition on
   *                 disk
   * @return id of the partition to offload to disk, or null if there is none
   */
  public Integer getOffloadPartitionId(int threadId) {
    // First, look for a processed partition that is partially on disk
    MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
        ProcessingState.PROCESSED,
        StorageState.IN_MEM,
        StorageState.ON_DISK,
        null);
    if (meta != null) {
      return meta.getPartitionId();
    }
    meta = perThreadPartitionDictionary.get(threadId).lookup(
        ProcessingState.PROCESSED,
        StorageState.ON_DISK,
        StorageState.IN_MEM,
        null);
    if (meta != null) {
      return meta.getPartitionId();
    }
    // Second, look for a processed partition entirely in memory
    meta = perThreadPartitionDictionary.get(threadId).lookup(
        ProcessingState.PROCESSED,
        StorageState.IN_MEM,
        StorageState.IN_MEM,
        null);
    if (meta != null) {
      return meta.getPartitionId();
    }

    // Third, look for an unprocessed partition that is partially on disk
    meta = perThreadPartitionDictionary.get(threadId).lookup(
        ProcessingState.UNPROCESSED,
        StorageState.IN_MEM,
        StorageState.ON_DISK,
        null);
    if (meta != null) {
      return meta.getPartitionId();
    }
    meta = perThreadPartitionDictionary.get(threadId).lookup(
        ProcessingState.UNPROCESSED,
        StorageState.ON_DISK,
        StorageState.IN_MEM,
        null);
    if (meta != null) {
      return meta.getPartitionId();
    }
    // Fourth, look for an unprocessed partition entirely in memory
    meta = perThreadPartitionDictionary.get(threadId).lookup(
        ProcessingState.UNPROCESSED,
        StorageState.IN_MEM,
        StorageState.IN_MEM,
        null);
    if (meta != null) {
      return meta.getPartitionId();
    }
    return null;
  }

  /**
   * Get the id of a partition whose vertex/edge buffers should be offloaded
   * to disk
   *
   * @param threadId id of the thread that is going to store the buffers on
   *                 disk
   * @return id of the partition whose vertex/edge buffers should be offloaded
   *         to disk, or null if there is none
   */
  public Integer getOffloadPartitionBufferId(int threadId) {
    if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) {
      Integer partitionId =
          popFromSet(perThreadVertexEdgeBuffers.get(threadId));
      if (partitionId == null) {
        DiskBackedPartitionStore<?, ?, ?> partitionStore =
            (DiskBackedPartitionStore<?, ?, ?>) (oocEngine.getServerData()
                .getPartitionStore());
        perThreadVertexEdgeBuffers.get(threadId)
            .addAll(partitionStore.getCandidateBuffersToOffload(threadId));
        DiskBackedEdgeStore<?, ?, ?> edgeStore =
            (DiskBackedEdgeStore<?, ?, ?>) (oocEngine.getServerData())
                .getEdgeStore();
        perThreadVertexEdgeBuffers.get(threadId)
            .addAll(edgeStore.getCandidateBuffersToOffload(threadId));
        partitionId = popFromSet(perThreadVertexEdgeBuffers.get(threadId));
      }
      return partitionId;
    }
    return null;
  }

  /**
   * Get the id of a partition whose incoming message buffers should be
   * offloaded to disk
   *
   * @param threadId id of the thread that is going to store the buffers on
   *                 disk
   * @return id of the partition whose message buffers should be offloaded to
   *         disk, or null if there is none
   */
  public Integer getOffloadMessageBufferId(int threadId) {
    if (oocEngine.getSuperstep() != BspServiceWorker.INPUT_SUPERSTEP) {
      Integer partitionId =
          popFromSet(perThreadMessageBuffers.get(threadId));
      if (partitionId == null) {
        DiskBackedMessageStore<?, ?> messageStore =
            (DiskBackedMessageStore<?, ?>) (oocEngine.getServerData()
                .getIncomingMessageStore());
        if (messageStore != null) {
          perThreadMessageBuffers.get(threadId)
              .addAll(messageStore.getCandidateBuffersToOffload(threadId));
          partitionId = popFromSet(perThreadMessageBuffers.get(threadId));
        }
      }
      return partitionId;
    }
    return null;
  }

  /**
   * Get the id of a partition whose incoming messages should be offloaded to
   * disk. Prioritize offloading messages of partitions already on disk, and
   * then partitions in transit, over partitions in memory. Also, prioritize
   * processed partitions over unprocessed ones (processed partitions are more
   * likely to end up on disk than unprocessed ones).
   *
   * @param threadId id of the thread that is going to store the incoming
   *                 messages on disk
   * @return id of the partition whose messages should be offloaded to disk,
   *         or null if there is none
   */
  public Integer getOffloadMessageId(int threadId) {
    if (oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) {
      return null;
    }
    MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
        ProcessingState.PROCESSED,
        StorageState.ON_DISK,
        null,
        StorageState.IN_MEM);
    if (meta != null) {
      return meta.getPartitionId();
    }
    meta = perThreadPartitionDictionary.get(threadId).lookup(
        ProcessingState.PROCESSED,
        StorageState.IN_TRANSIT,
        null,
        StorageState.IN_MEM);
    if (meta != null) {
      return meta.getPartitionId();
    }
    meta = perThreadPartitionDictionary.get(threadId).lookup(
        ProcessingState.UNPROCESSED,
        StorageState.ON_DISK,
        null,
        StorageState.IN_MEM);
    if (meta != null) {
      return meta.getPartitionId();
    }
    meta = perThreadPartitionDictionary.get(threadId).lookup(
        ProcessingState.UNPROCESSED,
        StorageState.IN_TRANSIT,
        null,
        StorageState.IN_MEM);
    if (meta != null) {
      return meta.getPartitionId();
    }
    return null;
  }

  /**
   * Get the id of a partition to load its data into memory. Prioritize
   * loading an unprocessed partition over loading a processed partition.
   * Also, prioritize loading a partition partially in memory over a partition
   * entirely on disk.
   *
   * @param threadId id of the thread that is going to load the partition data
   * @return id of the partition to load into memory, or null if there is none
   */
  public Integer getLoadPartitionId(int threadId) {
    // First, look for an unprocessed partition partially in memory
    MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
        ProcessingState.UNPROCESSED,
        StorageState.IN_MEM,
        StorageState.ON_DISK,
        null);
    if (meta != null) {
      return meta.getPartitionId();
    }

    meta = perThreadPartitionDictionary.get(threadId).lookup(
        ProcessingState.UNPROCESSED,
        StorageState.ON_DISK,
        StorageState.IN_MEM,
        null);
    if (meta != null) {
      return meta.getPartitionId();
    }

    // Second, look for an unprocessed partition entirely on disk
    meta = perThreadPartitionDictionary.get(threadId).lookup(
        ProcessingState.UNPROCESSED,
        StorageState.ON_DISK,
        StorageState.ON_DISK,
        null);
    if (meta != null) {
      return meta.getPartitionId();
    }

    // Third, look for a processed partition partially in memory
    meta = perThreadPartitionDictionary.get(threadId).lookup(
        ProcessingState.PROCESSED,
        StorageState.IN_MEM,
        null,
        StorageState.ON_DISK);
    if (meta != null) {
      return meta.getPartitionId();
    }

    meta = perThreadPartitionDictionary.get(threadId).lookup(
        ProcessingState.PROCESSED,
        StorageState.ON_DISK,
        null,
        StorageState.IN_MEM);
    if (meta != null) {
      return meta.getPartitionId();
    }

    meta = perThreadPartitionDictionary.get(threadId).lookup(
        ProcessingState.PROCESSED,
        StorageState.ON_DISK,
        null,
        StorageState.ON_DISK);
    if (meta != null) {
      return meta.getPartitionId();
    }

    return null;
  }

  /**
   * Mark a partition as being 'IN_PROCESS'
   *
   * @param partitionId id of the partition to mark
   */
  public void markPartitionAsInProcess(int partitionId) {
    MetaPartition meta = partitions.get(partitionId);
    int ownerThread = getOwnerThreadId(partitionId);
    synchronized (meta) {
      perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
      meta.setProcessingState(ProcessingState.IN_PROCESS);
      perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
    }
  }

  /**
   * Whether there is any processed partition stored in memory (excluding those
   * that are prefetched to execute in the next superstep).
   *
   * @return true iff there is any processed partition in memory
   */
  public boolean hasProcessedOnMemory() {
    for (MetaPartitionDictionary dictionary : perThreadPartitionDictionary) {
      if (dictionary.hasProcessedOnMemory()) {
        return true;
      }
    }
    return false;
  }

  /**
   * Whether a partition is *processed* in the current iteration cycle over
   * partitions.
   *
   * @param partitionId id of the partition to check
   * @return true iff processing the given partition is done
   */
  public boolean isPartitionProcessed(Integer partitionId) {
    MetaPartition meta = partitions.get(partitionId);
    synchronized (meta) {
      return meta.getProcessingState() == ProcessingState.PROCESSED;
    }
  }

  /**
   * Mark a partition as 'PROCESSED'
   *
   * @param partitionId id of the partition to mark
   */
  public void setPartitionIsProcessed(int partitionId) {
    MetaPartition meta = partitions.get(partitionId);
    int ownerThread = getOwnerThreadId(partitionId);
    synchronized (meta) {
      perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
      meta.setProcessingState(ProcessingState.PROCESSED);
      perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
    }
    numPartitionsProcessed.getAndIncrement();
  }

  /**
   * Notify this meta store that the load of a partition for a specific
   * superstep is about to start.
   *
   * @param partitionId id of the partition to load into memory
   * @param superstep superstep for which the partition is needed
   * @return true iff loading the given partition is viable
   */
  public boolean startLoadingPartition(int partitionId, long superstep) {
    MetaPartition meta = partitions.get(partitionId);
    synchronized (meta) {
      boolean shouldLoad = meta.getPartitionState() == StorageState.ON_DISK;
      if (superstep == oocEngine.getSuperstep()) {
        shouldLoad |= meta.getCurrentMessagesState() == StorageState.ON_DISK;
      } else {
        shouldLoad |= meta.getIncomingMessagesState() == StorageState.ON_DISK;
      }
      return shouldLoad;
    }
  }

  /**
   * Notify this meta store that the load of a partition for a specific
   * superstep is completed.
   *
   * @param partitionId id of the partition for which the load is completed
   * @param superstep superstep for which the partition is loaded
   */
  public void doneLoadingPartition(int partitionId, long superstep) {
    MetaPartition meta = partitions.get(partitionId);
    int owner = getOwnerThreadId(partitionId);
    synchronized (meta) {
      PartitionStorageState stateBefore = meta.getPartitionStorageState();
      perThreadPartitionDictionary.get(owner).removePartition(meta);
      meta.setPartitionState(StorageState.IN_MEM);
      if (superstep == oocEngine.getSuperstep()) {
        meta.setCurrentMessagesState(StorageState.IN_MEM);
      } else {
        meta.setIncomingMessagesState(StorageState.IN_MEM);
      }
      PartitionStorageState stateAfter = meta.getPartitionStorageState();
      updateCounters(stateBefore, stateAfter);
      // Check whether the load was a prefetch of a partition from disk to
      // memory for the next superstep
      if (meta.getProcessingState() == ProcessingState.PROCESSED) {
        perThreadPartitionDictionary.get(owner).increaseNumPrefetch();
      }
      perThreadPartitionDictionary.get(owner).addPartition(meta);
    }
    updateGraphFractionInMemory();
  }

  /**
   * Notify this meta store that the offload of messages for a particular
   * partition is about to start.
   *
   * @param partitionId id of the partition whose messages are being offloaded
   * @return true iff offloading the messages of the given partition is viable
   */
  public boolean startOffloadingMessages(int partitionId) {
    MetaPartition meta = partitions.get(partitionId);
    int ownerThread = getOwnerThreadId(partitionId);
    synchronized (meta) {
      if (meta.getIncomingMessagesState() == StorageState.IN_MEM) {
        perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
        meta.setIncomingMessagesState(StorageState.IN_TRANSIT);
        perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
        return true;
      } else {
        return false;
      }
    }
  }

  /**
   * Notify this meta store that the offload of messages for a particular
   * partition is complete.
   *
   * @param partitionId id of the partition whose messages are offloaded to
   *                    disk
   */
  public void doneOffloadingMessages(int partitionId) {
    MetaPartition meta = partitions.get(partitionId);
    int ownerThread = getOwnerThreadId(partitionId);
    synchronized (meta) {
      perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
      meta.setIncomingMessagesState(StorageState.ON_DISK);
      perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
    }
  }

  /**
   * Notify this meta store that the offload of raw data buffers
   * (vertex/edges/messages) of a particular partition is about to start.
   *
   * @param partitionId id of the partition whose buffers are being offloaded
   * @return true iff offloading the buffers of the given partition is viable
   */
  public boolean startOffloadingBuffer(int partitionId) {
    // Do nothing
    return true;
  }

  /**
   * Notify this meta store that the offload of raw data buffers
   * (vertex/edges/messages) of a particular partition is completed.
   *
   * @param partitionId id of the partition whose buffers are offloaded
   */
  public void doneOffloadingBuffer(int partitionId) {
    // Do nothing
  }

  /**
   * Notify this meta store that the offload of a partition (partition data
   * and its current messages) is about to start.
   *
   * @param partitionId id of the partition whose data is being offloaded
   * @return true iff offloading the given partition is viable
   */
  public boolean startOffloadingPartition(int partitionId) {
    MetaPartition meta = partitions.get(partitionId);
    int owner = getOwnerThreadId(partitionId);
    synchronized (meta) {
      if (meta.getProcessingState() != ProcessingState.IN_PROCESS &&
          (meta.getPartitionState() == StorageState.IN_MEM ||
              meta.getCurrentMessagesState() == StorageState.IN_MEM)) {
        perThreadPartitionDictionary.get(owner).removePartition(meta);
        // We may only need to offload either the partition or the current
        // messages of that partition to disk. So, if either of the components
        // (the partition or its current messages) is already on disk, we
        // should not update its metadata.
        if (meta.getPartitionState() != StorageState.ON_DISK) {
          meta.setPartitionState(StorageState.IN_TRANSIT);
        }
        if (meta.getCurrentMessagesState() != StorageState.ON_DISK) {
          meta.setCurrentMessagesState(StorageState.IN_TRANSIT);
        }
        perThreadPartitionDictionary.get(owner).addPartition(meta);
        return true;
      } else {
        return false;
      }
    }
  }

  /**
   * Notify this meta store that the offload of a partition (partition data
   * and its current messages) is completed.
   *
   * @param partitionId id of the partition whose data is offloaded
   */
  public void doneOffloadingPartition(int partitionId) {
    MetaPartition meta = partitions.get(partitionId);
    int owner = getOwnerThreadId(partitionId);
    synchronized (meta) {
      // We either offload both the partition and its messages to disk, or we
      // only offload one of the two components.
      if (meta.getCurrentMessagesState() == StorageState.IN_TRANSIT &&
          meta.getPartitionState() == StorageState.IN_TRANSIT) {
        numInMemoryPartitions.getAndDecrement();
      } else {
        numPartiallyInMemoryPartitions.getAndDecrement();
      }
      perThreadPartitionDictionary.get(owner).removePartition(meta);
      meta.setPartitionState(StorageState.ON_DISK);
      meta.setCurrentMessagesState(StorageState.ON_DISK);
      perThreadPartitionDictionary.get(owner).addPartition(meta);
    }
    updateGraphFractionInMemory();
  }

  /**
   * Reset the meta store for a new iteration cycle over all partitions.
   * Note: this is not thread-safe and should be called from a single thread.
   */
  public void resetPartitions() {
    for (MetaPartition meta : partitions.values()) {
      int owner = getOwnerThreadId(meta.getPartitionId());
      perThreadPartitionDictionary.get(owner).removePartition(meta);
      meta.resetPartition();
      perThreadPartitionDictionary.get(owner).addPartition(meta);
    }
    for (MetaPartitionDictionary dictionary : perThreadPartitionDictionary) {
      dictionary.reset();
    }
    numPartitionsProcessed.set(0);
  }

  /**
   * Reset messages in the meta store.
   * Note: this is not thread-safe and should be called from a single thread.
   */
  public void resetMessages() {
    for (MetaPartition meta : partitions.values()) {
      int owner = getOwnerThreadId(meta.getPartitionId());
      perThreadPartitionDictionary.get(owner).removePartition(meta);
      PartitionStorageState stateBefore = meta.getPartitionStorageState();
      meta.resetMessages();
      PartitionStorageState stateAfter = meta.getPartitionStorageState();
      updateCounters(stateBefore, stateAfter);
      perThreadPartitionDictionary.get(owner).addPartition(meta);
    }
  }

  /**
   * Return the id of an unprocessed partition in memory. If all partitions
   * are processed, return NO_PARTITION_TO_PROCESS as a 'finish signal'. If
   * there are unprocessed partitions, but none of them is in memory, return
   * null.
   *
   * @return id of the partition to be processed next
   */
  public Integer getNextPartition() {
    if (numPartitionsProcessed.get() >= partitions.size()) {
      return NO_PARTITION_TO_PROCESS;
    }
    int numThreads = perThreadPartitionDictionary.size();
    int index = randomGenerator.nextInt(numThreads);
    int startIndex = index;
    MetaPartition meta;
    do {
      // We first look up a partition in the reverse dictionary. If there is a
      // partition with the given properties, we then check whether we can
      // return it as the next partition to process. If we cannot, there may
      // still be other partitions in the dictionary, so we continue looping
      // through all of them. Once all the partitions with our desired
      // properties have been examined, we break the loop.
      while (true) {
        meta = perThreadPartitionDictionary.get(index).lookup(
            ProcessingState.UNPROCESSED,
            StorageState.IN_MEM,
            StorageState.IN_MEM,
            null);
        if (meta != null) {
          // Here we should check whether 'meta' still has the same properties
          // as when it was looked up in the dictionary. It may have changed
          // between the time it was looked up and the moment the synchronized
          // block below is entered.
          synchronized (meta) {
            if (meta.getProcessingState() == ProcessingState.UNPROCESSED &&
                meta.getPartitionState() == StorageState.IN_MEM &&
                meta.getCurrentMessagesState() == StorageState.IN_MEM) {
              perThreadPartitionDictionary.get(index).removePartition(meta);
              meta.setProcessingState(ProcessingState.IN_PROCESS);
              perThreadPartitionDictionary.get(index).addPartition(meta);
              return meta.getPartitionId();
            }
          }
        } else {
          break;
        }
      }
      index = (index + 1) % numThreads;
    } while (index != startIndex);
    return null;
  }
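
  // Sketch of how a caller is expected to interpret the return value of
  // getNextPartition (the identifier 'metaPartitionManager' is hypothetical;
  // the real compute loop lives elsewhere in the out-of-core infrastructure):
  //
  //   Integer id = metaPartitionManager.getNextPartition();
  //   if (id == null) {
  //     // Unprocessed partitions remain, but none is in memory yet; retry
  //   } else if (id == MetaPartitionManager.NO_PARTITION_TO_PROCESS) {
  //     // The current iteration cycle over partitions is complete
  //   } else {
  //     // Process partition 'id' (it has already been marked IN_PROCESS)
  //   }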

  /**
   * Whether a partition is on disk (either its data or its current messages)
   *
   * @param partitionId id of the partition to check
   * @return true if the partition data or its current messages are on disk,
   *         false otherwise
   */
  public boolean isPartitionOnDisk(int partitionId) {
    MetaPartition meta = partitions.get(partitionId);
    synchronized (meta) {
      return meta.isOnDisk();
    }
  }

  /**
   * Representation of meta information of a partition
   */
  private static class MetaPartition {
    /** Id of the partition */
    private int partitionId;
    /** Storage state of incoming messages */
    private StorageState incomingMessagesState;
    /** Storage state of current messages */
    private StorageState currentMessagesState;
    /** Storage state of partition data */
    private StorageState partitionState;
    /** Processing state of a partition */
    private ProcessingState processingState;

    /**
     * Constructor
     *
     * @param partitionId id of the partition
     */
    public MetaPartition(int partitionId) {
      this.partitionId = partitionId;
      this.processingState = ProcessingState.UNPROCESSED;
      this.partitionState = StorageState.IN_MEM;
      this.currentMessagesState = StorageState.IN_MEM;
      this.incomingMessagesState = StorageState.IN_MEM;
    }

    @Override
    public String toString() {
      StringBuffer sb = new StringBuffer();
      sb.append("\nMetaData: {");
      sb.append("ID: " + partitionId + "; ");
      sb.append("Partition: " + partitionState + "; ");
      sb.append("Current Messages: " + currentMessagesState + "; ");
      sb.append("Incoming Messages: " + incomingMessagesState + "; ");
      sb.append("Processed? : " + processingState + "}");
      return sb.toString();
    }

    public int getPartitionId() {
      return partitionId;
    }

    public StorageState getIncomingMessagesState() {
      return incomingMessagesState;
    }

    public void setIncomingMessagesState(StorageState incomingMessagesState) {
      this.incomingMessagesState = incomingMessagesState;
    }

    public StorageState getCurrentMessagesState() {
      return currentMessagesState;
    }

    public void setCurrentMessagesState(StorageState currentMessagesState) {
      this.currentMessagesState = currentMessagesState;
    }

    public StorageState getPartitionState() {
      return partitionState;
    }

    public void setPartitionState(StorageState state) {
      this.partitionState = state;
    }

    public ProcessingState getProcessingState() {
      return processingState;
    }

    public void setProcessingState(ProcessingState processingState) {
      this.processingState = processingState;
    }

    /**
     * Whether the partition is on disk (either its data or its current
     * messages)
     *
     * @return true if the partition is on disk, false otherwise
     */
    public boolean isOnDisk() {
      return partitionState == StorageState.ON_DISK ||
          currentMessagesState == StorageState.ON_DISK;
    }

    /**
     * Reset the partition meta information for the next iteration cycle
     */
    public void resetPartition() {
      processingState = ProcessingState.UNPROCESSED;
    }

    /**
     * Reset messages meta information for the next iteration cycle
     */
    public void resetMessages() {
      currentMessagesState = incomingMessagesState;
      incomingMessagesState = StorageState.IN_MEM;
    }

    /**
     * @return the state of the partition and its current messages as a whole
     */
    public PartitionStorageState getPartitionStorageState() {
      if (partitionState == StorageState.ON_DISK &&
          currentMessagesState == StorageState.ON_DISK) {
        return PartitionStorageState.FULLY_ON_DISK;
      } else if (partitionState == StorageState.IN_MEM &&
          currentMessagesState == StorageState.IN_MEM) {
        return PartitionStorageState.FULLY_IN_MEM;
      } else {
        return PartitionStorageState.PARTIALLY_IN_MEM;
      }
    }
  }

  /**
   * Class representing a reverse dictionary for partitions. The main
   * operation of the reverse dictionary is to look up a partition with
   * certain properties. The responsibility of keeping the dictionary
   * consistent when a partition property changes is on the code that changes
   * the property. One can simply remove a partition from the dictionary,
   * change the property (or properties), and then add the partition back to
   * the dictionary.
   */
  private static class MetaPartitionDictionary {
    /**
     * Sets of partitions for each possible combination of properties. Each
     * partition has 4 properties, and each property can have any of 3
     * different values. The properties are as follows (in the order in which
     * they are used as the dimensions of the following 4-D array):
     *  - processing status (PROCESSED, UNPROCESSED, or IN_PROCESS)
     *  - partition storage status (IN_MEM, IN_TRANSIT, or ON_DISK)
     *  - current messages storage status (IN_MEM, IN_TRANSIT, or ON_DISK)
     *  - incoming messages storage status (IN_MEM, IN_TRANSIT, or ON_DISK)
     */
    private final Set<MetaPartition>[][][][] partitions =
        (Set<MetaPartition>[][][][]) new Set<?>[3][3][3][3];
    /**
     * Number of partitions that have been prefetched to be computed in the
     * next superstep
     */
    private final AtomicInteger numPrefetch = new AtomicInteger(0);

    /**
     * Constructor
     */
    public MetaPartitionDictionary() {
      for (int i = 0; i < 3; ++i) {
        for (int j = 0; j < 3; ++j) {
          for (int k = 0; k < 3; ++k) {
            for (int t = 0; t < 3; ++t) {
              partitions[i][j][k][t] = Sets.newLinkedHashSet();
            }
          }
        }
      }
    }

    /**
     * Get the partition set associated with the property combination that a
     * given partition has
     *
     * @param meta meta partition containing the properties of a partition
     * @return partition set with the same property combination as the given
     *         meta partition
     */
    private Set<MetaPartition> getSet(MetaPartition meta) {
      return partitions[meta.getProcessingState().ordinal()]
          [meta.getPartitionState().ordinal()]
          [meta.getCurrentMessagesState().ordinal()]
          [meta.getIncomingMessagesState().ordinal()];
    }

    /**
     * Add a partition to the dictionary
     *
     * @param meta meta information of the partition to add
     */
    public void addPartition(MetaPartition meta) {
      Set<MetaPartition> partitionSet = getSet(meta);
      synchronized (partitionSet) {
        partitionSet.add(meta);
      }
    }

    /**
     * Remove a partition from the dictionary
     *
     * @param meta meta information of the partition to remove
     */
    public void removePartition(MetaPartition meta) {
      Set<MetaPartition> partitionSet = getSet(meta);
      synchronized (partitionSet) {
        partitionSet.remove(meta);
      }
    }

    /**
     * Look up a partition with the given properties. One can use a wildcard
     * for a property in the lookup operation by passing null as that property.
     *
     * @param processingState processing state property
     * @param partitionStorageState partition storage property
     * @param currentMessagesState current messages storage property
     * @param incomingMessagesState incoming messages storage property
     * @return a meta partition in the dictionary with the given combination
     *         of properties. If there is no such partition, return null
     */
    public MetaPartition lookup(ProcessingState processingState,
        StorageState partitionStorageState,
        StorageState currentMessagesState,
        StorageState incomingMessagesState) {
      int iStart =
          (processingState == null) ? 0 : processingState.ordinal();
      int iEnd =
          (processingState == null) ? 3 : (processingState.ordinal() + 1);
      int jStart =
          (partitionStorageState == null) ? 0 : partitionStorageState.ordinal();
      int jEnd = (partitionStorageState == null) ? 3 :
          (partitionStorageState.ordinal() + 1);
      int kStart =
          (currentMessagesState == null) ? 0 : currentMessagesState.ordinal();
      int kEnd = (currentMessagesState == null) ? 3 :
          (currentMessagesState.ordinal() + 1);
      int tStart =
          (incomingMessagesState == null) ? 0 : incomingMessagesState.ordinal();
      int tEnd = (incomingMessagesState == null) ? 3 :
          (incomingMessagesState.ordinal() + 1);
      for (int i = iStart; i < iEnd; ++i) {
        for (int j = jStart; j < jEnd; ++j) {
          for (int k = kStart; k < kEnd; ++k) {
            for (int t = tStart; t < tEnd; ++t) {
              Set<MetaPartition> partitionSet = partitions[i][j][k][t];
              synchronized (partitionSet) {
                MetaPartition meta = peekFromSet(partitionSet);
                if (meta != null) {
                  return meta;
                }
              }
            }
          }
        }
      }
      return null;
    }
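
    // Example of a wildcard lookup, as used by getOffloadMessageId above:
    // passing null for the current-messages dimension matches partitions
    // regardless of where their current messages are stored:
    //
    //   lookup(ProcessingState.PROCESSED, StorageState.ON_DISK,
    //       null, StorageState.IN_MEM);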

    /**
     * Whether there is an in-memory partition that is already processed,
     * excluding those partitions that have been prefetched
     *
     * @return true if there is a processed in-memory partition
     */
    public boolean hasProcessedOnMemory() {
      int count = 0;
      for (int i = 0; i < 3; ++i) {
        for (int j = 0; j < 3; ++j) {
          Set<MetaPartition> partitionSet =
              partitions[ProcessingState.PROCESSED.ordinal()]
                  [StorageState.IN_MEM.ordinal()][i][j];
          synchronized (partitionSet) {
            count += partitionSet.size();
          }
        }
      }
      return count - numPrefetch.get() != 0;
    }

    /** Increase the number of prefetched partitions by 1 */
    public void increaseNumPrefetch() {
      numPrefetch.getAndIncrement();
    }

    /**
     * Reset the dictionary, preparing it for the next iteration cycle over
     * partitions
     */
    public void reset() {
      numPrefetch.set(0);
    }
  }
}