/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.ooc.command;
2021import com.google.common.base.Preconditions;
22import org.apache.giraph.bsp.BspService;
23import org.apache.giraph.comm.messages.MessageStore;
24import org.apache.giraph.ooc.OutOfCoreEngine;
25import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
26import org.apache.giraph.ooc.data.DiskBackedMessageStore;
27import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
2829import java.io.IOException;
3031/**32 * IOCommand to load partition data, edge data (if in INPUT_SUPERSTEP), and33 * message data (if in compute supersteps). Also, this command can be used to34 * prefetch a partition to be processed in the next superstep.35 */36publicclassLoadPartitionIOCommandextendsIOCommand {
37/**38 * Which superstep this partition should be loaded for? (can be current39 * superstep or next superstep -- in case of prefetching).40 */41privatefinallong superstep;
4243/**44 * Constructor45 *46 * @param oocEngine out-of-core engine47 * @param partitionId id of the partition to be loaded48 * @param superstep superstep to load the partition for49 */50publicLoadPartitionIOCommand(OutOfCoreEngine oocEngine, int partitionId,
51long superstep) {
52super(oocEngine, partitionId);
53this.superstep = superstep;
54 }
5556 @Override
57publicboolean execute() throws IOException {
58boolean executed = false;
59if (oocEngine.getMetaPartitionManager()
60 .startLoadingPartition(partitionId, superstep)) {
61long currentSuperstep = oocEngine.getSuperstep();
62DiskBackedPartitionStore partitionStore =
63 (DiskBackedPartitionStore)
64 oocEngine.getServerData().getPartitionStore();
65 numBytesTransferred +=
66 partitionStore.loadPartitionData(partitionId);
67if (currentSuperstep == BspService.INPUT_SUPERSTEP &&
68 superstep == currentSuperstep) {
69DiskBackedEdgeStore edgeStore =
70 (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
71 numBytesTransferred +=
72 edgeStore.loadPartitionData(partitionId);
73 }
74MessageStore messageStore;
75if (currentSuperstep == superstep) {
76 messageStore = oocEngine.getServerData().getCurrentMessageStore();
77 } else {
78 Preconditions.checkState(superstep == currentSuperstep + 1);
79 messageStore = oocEngine.getServerData().getIncomingMessageStore();
80 }
81if (messageStore != null) {
82 numBytesTransferred += ((DiskBackedMessageStore) messageStore)
83 .loadPartitionData(partitionId);
84 }
85 oocEngine.getMetaPartitionManager()
86 .doneLoadingPartition(partitionId, superstep);
87 executed = true;
88 }
89return executed;
90 }
9192 @Override
93publicIOCommandType getType() {
94return IOCommandType.LOAD_PARTITION;
95 }
9697 @Override
98public String toString() {
99return"LoadPartitionIOCommand: (partitionId = " + partitionId + ", " +
100"superstep = " + superstep + ")";
101 }
102 }