1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.comm.flow_control;
2021import com.yammer.metrics.core.Counter;
22import org.apache.giraph.comm.netty.NettyClient;
23import org.apache.giraph.comm.netty.handler.AckSignalFlag;
24import org.apache.giraph.comm.requests.WritableRequest;
25import org.apache.giraph.conf.FloatConfOption;
26import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
27import org.apache.giraph.conf.IntConfOption;
28import org.apache.giraph.metrics.GiraphMetrics;
29import org.apache.giraph.metrics.MetricNames;
30import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
31import org.apache.giraph.metrics.SuperstepMetricsRegistry;
32import org.apache.log4j.Logger;
3334import java.util.concurrent.atomic.AtomicInteger;
3536importstatic org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
3738/**39 * Representation of a flow control that limits the aggregate number of open40 * requests to all other workers to a constant user-defined value41 */42publicclassStaticFlowControlimplements43 FlowControl, ResetSuperstepMetricsObserver {
44/** Maximum number of requests without confirmation we should have */45publicstaticfinalIntConfOption MAX_NUMBER_OF_OPEN_REQUESTS =
46newIntConfOption("giraph.maxNumberOfOpenRequests", 10000,
47"Maximum number of requests without confirmation we should have");
48/**49 * After pausing a thread due to too large number of open requests,50 * which fraction of these requests need to be closed before we continue51 */52publicstaticfinalFloatConfOption53 FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING =
54newFloatConfOption("giraph.fractionOfRequestsToCloseBeforeProceeding",
55 0.2f, "Fraction of requests to close before proceeding");
56/** Class logger */57privatestaticfinal Logger LOG = Logger.getLogger(StaticFlowControl.class);
5859/** Maximum number of requests without confirmation we can have */60privatefinalint maxNumberOfOpenRequests;
61/**62 * Maximum number of requests that can be open after the pause in order to63 * proceed64 */65privatefinalint numberOfRequestsToProceed;
66/** Netty client used for sending requests */67privatefinalNettyClient nettyClient;
68/** Waiting interval for checking outstanding requests msecs */69privatefinalint waitingRequestMsecs;
70/** Dummy object to wait on until enough open requests get completed */71privatefinal Object requestSpotAvailable = new Object();
72/** Counter for time spent waiting on too many open requests */73private Counter timeWaitingOnOpenRequests;
74/** Number of threads waiting on too many open requests */75privatefinal AtomicInteger numWaitingThreads = new AtomicInteger(0);
7677/**78 * Constructor79 *80 * @param conf configuration81 * @param nettyClient netty client82 */83publicStaticFlowControl(ImmutableClassesGiraphConfiguration conf,
84NettyClient nettyClient) {
85this.nettyClient = nettyClient;
86 maxNumberOfOpenRequests = MAX_NUMBER_OF_OPEN_REQUESTS.get(conf);
87 numberOfRequestsToProceed = (int) (maxNumberOfOpenRequests *
88 (1 - FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING.get(conf)));
89if (LOG.isInfoEnabled()) {
90 LOG.info("StaticFlowControl: Limit number of open requests to " +
91 maxNumberOfOpenRequests + " and proceed when <= " +
92 numberOfRequestsToProceed);
93 }
94 waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
95 GiraphMetrics.get().addSuperstepResetObserver(this);
96 }
9798 @Override
99publicvoid newSuperstep(SuperstepMetricsRegistry metrics) {
100 timeWaitingOnOpenRequests = metrics.getCounter(
101 MetricNames.TIME_SPENT_WAITING_ON_TOO_MANY_OPEN_REQUESTS_MS);
102 }
103104 @Override
105publicvoid sendRequest(int destTaskId, WritableRequest request) {
106 nettyClient.doSend(destTaskId, request);
107if (nettyClient.getNumberOfOpenRequests() > maxNumberOfOpenRequests) {
108long startTime = System.currentTimeMillis();
109 waitSomeRequests();
110 timeWaitingOnOpenRequests.inc(System.currentTimeMillis() - startTime);
111 }
112 }
113114/**115 * Ensure that at most numberOfRequestsToProceed are not complete.116 * Periodically, check the state of every request. If we find the connection117 * failed, re-establish it and re-send the request.118 */119privatevoid waitSomeRequests() {
120 numWaitingThreads.getAndIncrement();
121while (nettyClient.getNumberOfOpenRequests() > numberOfRequestsToProceed) {
122// Wait for requests to complete for some time123synchronized (requestSpotAvailable) {
124if (nettyClient.getNumberOfOpenRequests() <=
125 numberOfRequestsToProceed) {
126break;
127 }
128try {
129 requestSpotAvailable.wait(waitingRequestMsecs);
130 } catch (InterruptedException e) {
131thrownew IllegalStateException("waitSomeRequests: Got unexpected " +
132"InterruptedException", e);
133 }
134 }
135 nettyClient.logAndSanityCheck();
136 }
137 numWaitingThreads.getAndDecrement();
138 }
139140 @Override
141publicvoid messageAckReceived(int taskId, long requestId, int response) {
142synchronized (requestSpotAvailable) {
143 requestSpotAvailable.notifyAll();
144 }
145 }
146147 @Override
148publicAckSignalFlag getAckSignalFlag(int response) {
149return AckSignalFlag.values()[response];
150 }
151152 @Override
153publicint calculateResponse(AckSignalFlag alreadyDone, int clientId) {
154return alreadyDone.ordinal();
155 }
156157 @Override
158publicvoid logInfo() {
159if (LOG.isInfoEnabled()) {
160 LOG.info("logInfo: " + numWaitingThreads.get() + " threads waiting " +
161"until number of open requests falls below " +
162 numberOfRequestsToProceed);
163 }
164 }
165166 @Override
167publicvoid waitAllRequests() {
168// This flow control policy does not keep any unsent request. All the open169// requests are in possession of the network client, so the network client170// will wait for them to complete. Thus, this flow control policy does not171// need to do anything regarding flushing the remaining requests.172 }
173174 @Override
175publicint getNumberOfUnsentRequests() {
176return 0;
177 }
178 }