1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.giraph.ooc.command; 20 21 import org.apache.giraph.ooc.OutOfCoreEngine; 22 import org.apache.giraph.ooc.data.DiskBackedEdgeStore; 23 import org.apache.giraph.ooc.data.DiskBackedMessageStore; 24 import org.apache.giraph.ooc.data.DiskBackedPartitionStore; 25 26 import java.io.IOException; 27 28 /** 29 * IOCommand to store raw data buffers on disk. 30 */ 31 public class StoreDataBufferIOCommand extends IOCommand { 32 /** 33 * Types of raw data buffer to offload to disk (either vertices/edges buffer 34 * in INPUT_SUPERSTEP or incoming message buffer). 35 */ 36 public enum DataBufferType { PARTITION, MESSAGE }; 37 /** 38 * Type of the buffer to store on disk. 39 */ 40 private final DataBufferType type; 41 42 /** 43 * Constructor 44 * 45 * @param oocEngine out-of-core engine 46 * @param partitionId id of the partition to offload its buffers 47 * @param type type of the buffer to store on disk 48 */ 49 public StoreDataBufferIOCommand(OutOfCoreEngine oocEngine, 50 int partitionId, 51 DataBufferType type) { 52 super(oocEngine, partitionId); 53 this.type = type; 54 } 55 56 @Override 57 public boolean execute() throws IOException { 58 boolean executed = false; 59 if (oocEngine.getMetaPartitionManager() 60 .startOffloadingBuffer(partitionId)) { 61 switch (type) { 62 case PARTITION: 63 DiskBackedPartitionStore partitionStore = 64 (DiskBackedPartitionStore) 65 oocEngine.getServerData().getPartitionStore(); 66 numBytesTransferred += 67 partitionStore.offloadBuffers(partitionId); 68 DiskBackedEdgeStore edgeStore = 69 (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore(); 70 numBytesTransferred += edgeStore.offloadBuffers(partitionId); 71 break; 72 case MESSAGE: 73 DiskBackedMessageStore messageStore = 74 (DiskBackedMessageStore) 75 oocEngine.getServerData().getIncomingMessageStore(); 76 numBytesTransferred += 77 messageStore.offloadBuffers(partitionId); 78 break; 79 default: 80 throw new IllegalStateException("execute: requested data buffer type " + 81 "does not exist!"); 82 } 83 oocEngine.getMetaPartitionManager().doneOffloadingBuffer(partitionId); 84 executed = true; 85 } 86 return executed; 87 } 88 89 @Override 90 public IOCommandType getType() { 91 return IOCommandType.STORE_BUFFER; 92 } 93 94 @Override 95 public String toString() { 96 return "StoreDataBufferIOCommand: (partitionId = " + partitionId + ", " + 97 "type = " + type.name() + ")"; 98 } 99 }