/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.graph;
2021import java.io.DataInput;
22import java.io.DataOutput;
23import java.io.IOException;
2425import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
26import org.apache.giraph.partition.PartitionStats;
27import org.apache.hadoop.io.Writable;
2829/**30 * Aggregated stats by the master.31 */32publicclassGlobalStatsimplements Writable {
33/** All vertices in the application */34privatelong vertexCount = 0;
35/** All finished vertices in the last superstep */36privatelong finishedVertexCount = 0;
37/** All edges in the last superstep */38privatelong edgeCount = 0;
39/** All messages sent in the last superstep */40privatelong messageCount = 0;
41/** All message bytes sent in the last superstep */42privatelong messageBytesCount = 0;
43/** Whether the computation should be halted */44privateboolean haltComputation = false;
45/** Bytes of data stored to disk in the last superstep */46privatelong oocStoreBytesCount = 0;
47/** Bytes of data loaded to disk in the last superstep */48privatelong oocLoadBytesCount = 0;
49/** Lowest percentage of graph in memory throughout the execution */50privateint lowestGraphPercentageInMemory = 100;
51/**52 * Master's decision on whether we should checkpoint and53 * what to do next.54 */55privateCheckpointStatus checkpointStatus =
56 CheckpointStatus.NONE;
5758/**59 * Add the stats of a partition to the global stats.60 *61 * @param partitionStats Partition stats to be added.62 */63publicvoid addPartitionStats(PartitionStats partitionStats) {
64this.vertexCount += partitionStats.getVertexCount();
65this.finishedVertexCount += partitionStats.getFinishedVertexCount();
66this.edgeCount += partitionStats.getEdgeCount();
67 }
6869publiclong getVertexCount() {
70return vertexCount;
71 }
7273publiclong getFinishedVertexCount() {
74return finishedVertexCount;
75 }
7677publiclong getEdgeCount() {
78return edgeCount;
79 }
8081publiclong getMessageCount() {
82return messageCount;
83 }
8485publiclong getMessageBytesCount() {
86return messageBytesCount;
87 }
8889publicboolean getHaltComputation() {
90return haltComputation;
91 }
9293publicvoid setHaltComputation(boolean value) {
94 haltComputation = value;
95 }
9697publiclong getOocStoreBytesCount() {
98return oocStoreBytesCount;
99 }
100101publiclong getOocLoadBytesCount() {
102return oocLoadBytesCount;
103 }
104105publicCheckpointStatus getCheckpointStatus() {
106return checkpointStatus;
107 }
108109publicvoid setCheckpointStatus(CheckpointStatus checkpointStatus) {
110this.checkpointStatus = checkpointStatus;
111 }
112113publicint getLowestGraphPercentageInMemory() {
114return lowestGraphPercentageInMemory;
115 }
116117publicvoid setLowestGraphPercentageInMemory(
118int lowestGraphPercentageInMemory) {
119this.lowestGraphPercentageInMemory = lowestGraphPercentageInMemory;
120 }
121122/**123 * Add bytes loaded to the global stats.124 *125 * @param oocLoadBytesCount number of bytes to be added126 */127publicvoid addOocLoadBytesCount(long oocLoadBytesCount) {
128this.oocLoadBytesCount += oocLoadBytesCount;
129 }
130131/**132 * Add bytes stored to the global stats.133 *134 * @param oocStoreBytesCount number of bytes to be added135 */136publicvoid addOocStoreBytesCount(long oocStoreBytesCount) {
137this.oocStoreBytesCount += oocStoreBytesCount;
138 }
139140/**141 * Add messages to the global stats.142 *143 * @param messageCount Number of messages to be added.144 */145publicvoid addMessageCount(long messageCount) {
146this.messageCount += messageCount;
147 }
148149/**150 * Add messages to the global stats.151 *152 * @param msgBytesCount Number of message bytes to be added.153 */154publicvoid addMessageBytesCount(long msgBytesCount) {
155this.messageBytesCount += msgBytesCount;
156 }
157158 @Override
159publicvoid readFields(DataInput input) throws IOException {
160 vertexCount = input.readLong();
161 finishedVertexCount = input.readLong();
162 edgeCount = input.readLong();
163 messageCount = input.readLong();
164 messageBytesCount = input.readLong();
165 oocLoadBytesCount = input.readLong();
166 oocStoreBytesCount = input.readLong();
167 lowestGraphPercentageInMemory = input.readInt();
168 haltComputation = input.readBoolean();
169if (input.readBoolean()) {
170 checkpointStatus = CheckpointStatus.values()[input.readInt()];
171 } else {
172 checkpointStatus = null;
173 }
174 }
175176 @Override
177publicvoid write(DataOutput output) throws IOException {
178 output.writeLong(vertexCount);
179 output.writeLong(finishedVertexCount);
180 output.writeLong(edgeCount);
181 output.writeLong(messageCount);
182 output.writeLong(messageBytesCount);
183 output.writeLong(oocLoadBytesCount);
184 output.writeLong(oocStoreBytesCount);
185 output.writeInt(lowestGraphPercentageInMemory);
186 output.writeBoolean(haltComputation);
187 output.writeBoolean(checkpointStatus != null);
188if (checkpointStatus != null) {
189 output.writeInt(checkpointStatus.ordinal());
190 }
191 }
192193 @Override
194public String toString() {
195return"(vtx=" + vertexCount + ",finVtx=" +
196 finishedVertexCount + ",edges=" + edgeCount + ",msgCount=" +
197 messageCount + ",msgBytesCount=" +
198 messageBytesCount + ",haltComputation=" + haltComputation +
199", checkpointStatus=" + checkpointStatus + ')';
200 }
201 }