1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.comm.flow_control;
2021importstatic com.google.common.base.Preconditions.checkState;
22importstatic org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
23import com.google.common.collect.Lists;
24import com.google.common.collect.Maps;
25import com.google.common.collect.Sets;
26import org.apache.commons.lang3.tuple.ImmutablePair;
27import org.apache.commons.lang3.tuple.MutablePair;
28import org.apache.commons.lang3.tuple.Pair;
29import org.apache.giraph.comm.netty.NettyClient;
30import org.apache.giraph.comm.netty.handler.AckSignalFlag;
31import org.apache.giraph.comm.requests.SendResumeRequest;
32import org.apache.giraph.comm.requests.WritableRequest;
33import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
34import org.apache.giraph.conf.IntConfOption;
35import org.apache.giraph.utils.AdjustableSemaphore;
36import org.apache.giraph.utils.ThreadUtils;
37import org.apache.log4j.Logger;
3839import java.util.ArrayDeque;
40import java.util.ArrayList;
41import java.util.Collections;
42import java.util.Comparator;
43import java.util.Deque;
44import java.util.Map;
45import java.util.Set;
46import java.util.concurrent.ArrayBlockingQueue;
47import java.util.concurrent.BlockingQueue;
48import java.util.concurrent.ConcurrentMap;
49import java.util.concurrent.Semaphore;
50import java.util.concurrent.TimeUnit;
51import java.util.concurrent.atomic.AtomicInteger;
/**
 * Representation of credit-based flow control policy. With this policy there
 * can be a limited number of open requests from any worker x to any other
 * worker y. This number is called 'credit'. Let's denote this number by
 * C{x->y}. This implementation assumes that for a particular worker W, all
 * values of C{x->W} are the same. Let's denote this value by CR_W. CR_W may
 * change due to other reasons (e.g. memory pressure observed in an
 * out-of-core mechanism). However, CR_W is always in range [0, MAX_CR], where
 * MAX_CR is a user-defined constant. Note that MAX_CR should be representable
 * by at most 14 bits.
 *
 * In this implementation, the value of CR_W is announced to other workers
 * along with every ACK response envelope going out of W. Therefore, for
 * non-zero values of CR_W, other workers know the instant value of CR_W, hence
 * they can control the number of open requests they have to W. However, it is
 * possible that W announces 0 as CR_W. In this case, other workers stop
 * opening more requests to W, hence they will not get any new response
 * envelope from W. This means other workers should be notified with a
 * dedicated request to resume sending more requests once CR_W becomes
 * non-zero. In this implementation, once CR_W is announced as 0 to a
 * particular worker U, we keep U in a set, so later on we can send a 'resume
 * signal' to U once CR_W becomes non-zero. Sending resume signals is done
 * through a separate thread.
 */
public class CreditBasedFlowControl implements FlowControl {
78/**79 * Maximum number of requests we can have per worker without confirmation80 * (i.e. open requests)81 */82publicstaticfinalIntConfOption MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER =
83newIntConfOption("giraph.maxOpenRequestsPerWorker", 20,
84"Maximum number of requests without confirmation we can have per " +
85"worker");
86/** Aggregate number of in-memory unsent requests */87publicstaticfinalIntConfOption MAX_NUM_OF_UNSENT_REQUESTS =
88newIntConfOption("giraph.maxNumberOfUnsentRequests", 2000,
89"Maximum number of unsent requests we can keep in memory");
90/**91 * Time interval to wait on unsent requests cahce until we find a spot in it92 */93publicstaticfinalIntConfOption UNSENT_CACHE_WAIT_INTERVAL =
94newIntConfOption("giraph.unsentCacheWaitInterval", 1000,
95"Time interval to wait on unsent requests cache (in milliseconds)");
96/** Class logger */97privatestaticfinal Logger LOG =
98 Logger.getLogger(CreditBasedFlowControl.class);
99100/** Waiting interval on unsent requests cache until it frees up */101privatefinalint unsentWaitMsecs;
102/** Waiting interval for checking outstanding requests msecs */103privatefinalint waitingRequestMsecs;
104/**105 * Maximum number of open requests each worker can have to this work at each106 * moment (CR_W -define above- for this worker)107 */108privatevolatileshort maxOpenRequestsPerWorker;
109/** Total number of unsent, cached requests */110privatefinal AtomicInteger aggregateUnsentRequests = new AtomicInteger(0);
111/**112 * Map of requests permits per worker. Keys in the map are worker ids and113 * values are pairs (X, Y) where:114 * X: is the semaphore to control the number of open requests for a115 * particular worker. Basically, the number of available permits on a116 * semaphore is the credit available for the worker associated with that117 * semaphore.118 * Y: is the timestamp of the latest message (resume signal or ACK response)119 * that changed the number of permits in the semaphore.120 * The idea behind keeping a timestamp is to avoid any issue that may happen121 * due to out-of-order message delivery. For example, consider this scenario:122 * an ACK response is sent to a worker announcing the credit is 0. Later on,123 * a resume signal announcing a non-zero credit is sent to the same worker.124 * Now, if the resume signal receives before the ACK message, the worker125 * would incorrectly assume credit value of 0, and would avoid sending any126 * messages, which may lead to a live-lock.127 *128 * The timestamp value is simply the request id generated by NettyClient.129 * These ids are generated in consecutive order, hence simulating the concept130 * of timestamp. However, the timestamp value should be sent along with131 * any ACK response envelope. The ACK response envelope is already very small132 * (maybe 10-20 bytes). So, the timestamp value should not add much overhead133 * to it. Instead of sending the whole long value request id (8 bytes) as the134 * timestamp, we can simply send the least significant 2 bytes of it. This is135 * a valid timestamp, as the credit value can be 0x3FFF (=16383) at most. This136 * means there will be at most 0x3FFF messages on the fly at each moment,137 * which means that the timestamp value sent by all messages in fly will fall138 * into a range of size 0x3FFF. So, it is enough to only consider timestamp139 * values twice as big as the mentioned range to be able to accurately140 * determine ordering even when values wrap around. 
This means we only need to141 * consider 15 least significant bits of request ids as timestamp values.142 *143 * The ACK response value contains following information (from least144 * significant to most significant):145 * - 16 bits timestamp146 * - 14 bits credit value147 * - 1 bit specifying whether one end of communication is master and hence148 * credit based flow control should be ignored149 * - 1 bit response flag150 */151privatefinal ConcurrentMap<Integer, Pair<AdjustableSemaphore, Integer>>
152 perWorkerOpenRequestMap = Maps.newConcurrentMap();
153/** Map of unsent cached requests per worker */154privatefinal ConcurrentMap<Integer, Deque<WritableRequest>>
155 perWorkerUnsentRequestMap = Maps.newConcurrentMap();
156/**157 * Set of workers that should be notified to resume sending more requests if158 * the credit becomes non-zero159 */160privatefinal Set<Integer> workersToResume = Sets.newHashSet();
161/**162 * Resume signals are not using any credit, so they should be treated163 * differently than normal requests. Resume signals should be ignored in164 * accounting for credits in credit-based flow control. The following map165 * keeps sets of request ids, for resume signals sent to other workers that166 * are still "open". The set of request ids used for resume signals for a167 * worker is important so we can determine if a received response is for a168 * resume signal or not.169 */170privatefinal ConcurrentMap<Integer, Set<Long>> resumeRequestsId =
171 Maps.newConcurrentMap();
172/**173 * Queue of the cached requests to be sent out. The queue keeps pairs of174 * (destination id, request). The thread-safe blocking queue is used here for175 * the sake of simplicity. The blocking queue should be bounded (with bounds176 * no more than user-defined max number of unsent/cached requests) to avoid177 * excessive memory footprint.178 */179privatefinal BlockingQueue<Pair<Integer, WritableRequest>> toBeSent;
180/**181 * Semaphore to control number of cached unsent requests. Maximum number of182 * permits of this semaphore should be equal to MAX_NUM_OF_UNSENT_REQUESTS.183 */184privatefinal Semaphore unsentRequestPermit;
185/** Netty client used for sending requests */186privatefinalNettyClient nettyClient;
187188/**189 * Constructor190 * @param conf configuration191 * @param nettyClient netty client192 * @param exceptionHandler Exception handler193 */194publicCreditBasedFlowControl(ImmutableClassesGiraphConfiguration conf,
195finalNettyClient nettyClient,
196 Thread.UncaughtExceptionHandler
197 exceptionHandler) {
198this.nettyClient = nettyClient;
199 maxOpenRequestsPerWorker =
200 (short) MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
201 checkState(maxOpenRequestsPerWorker < 0x4000 &&
202 maxOpenRequestsPerWorker > 0, "NettyClient: max number of open " +
203"requests should be in range (0, " + 0x4FFF + ")");
204int maxUnsentRequests = MAX_NUM_OF_UNSENT_REQUESTS.get(conf);
205 unsentRequestPermit = new Semaphore(maxUnsentRequests);
206this.toBeSent = new ArrayBlockingQueue<Pair<Integer, WritableRequest>>(
207 maxUnsentRequests);
208 unsentWaitMsecs = UNSENT_CACHE_WAIT_INTERVAL.get(conf);
209 waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
210211// Thread to handle/send resume signals when necessary212 ThreadUtils.startThread(new Runnable() {
213 @Override
214publicvoid run() {
215while (true) {
216synchronized (workersToResume) {
217for (Integer workerId : workersToResume) {
218if (maxOpenRequestsPerWorker != 0) {
219 sendResumeSignal(workerId);
220 } else {
221break;
222 }
223 }
224try {
225 workersToResume.wait();
226 } catch (InterruptedException e) {
227thrownew IllegalStateException("run: caught exception " +
228"while waiting for resume-sender thread to be notified!",
229 e);
230 }
231 }
232 }
233 }
234 }, "resume-sender", exceptionHandler);
235236// Thread to handle/send cached requests237 ThreadUtils.startThread(new Runnable() {
238 @Override
239publicvoid run() {
240while (true) {
241 Pair<Integer, WritableRequest> pair = null;
242try {
243 pair = toBeSent.take();
244 } catch (InterruptedException e) {
245thrownew IllegalStateException("run: failed while waiting to " +
246"take an element from the request queue!", e);
247 }
248int taskId = pair.getLeft();
249WritableRequest request = pair.getRight();
250 nettyClient.doSend(taskId, request);
251if (aggregateUnsentRequests.decrementAndGet() == 0) {
252synchronized (aggregateUnsentRequests) {
253 aggregateUnsentRequests.notifyAll();
254 }
255 }
256 }
257 }
258 }, "cached-req-sender", exceptionHandler);
259 }
260261/**262 * Send resume signal request to a given worker263 *264 * @param workerId id of the worker to send the resume signal to265 */266privatevoid sendResumeSignal(int workerId) {
267if (maxOpenRequestsPerWorker == 0) {
268 LOG.warn("sendResumeSignal: method called while the max open requests " +
269"for worker " + workerId + " is still 0");
270return;
271 }
272WritableRequest request = newSendResumeRequest(maxOpenRequestsPerWorker);
273 Long resumeId = nettyClient.doSend(workerId, request);
274 checkState(resumeId != null);
275if (LOG.isDebugEnabled()) {
276 LOG.debug("sendResumeSignal: sending signal to worker " + workerId +
277" with credit=" + maxOpenRequestsPerWorker + ", ID=" +
278 (resumeId & 0xFFFF));
279 }
280 resumeRequestsId.get(workerId).add(resumeId);
281 }
282283 @Override
284publicvoid sendRequest(int destTaskId, WritableRequest request) {
285 Pair<AdjustableSemaphore, Integer> pair =
286 perWorkerOpenRequestMap.get(destTaskId);
287// Check if this is the first time sending a request to a worker. If so, we288// should the worker id to necessary bookkeeping data structure.289if (pair == null) {
290 pair = new MutablePair<>(
291newAdjustableSemaphore(maxOpenRequestsPerWorker), -1);
292 Pair<AdjustableSemaphore, Integer> temp =
293 perWorkerOpenRequestMap.putIfAbsent(destTaskId, pair);
294 perWorkerUnsentRequestMap.putIfAbsent(
295 destTaskId, new ArrayDeque<WritableRequest>());
296 resumeRequestsId.putIfAbsent(
297 destTaskId, Sets.<Long>newConcurrentHashSet());
298if (temp != null) {
299 pair = temp;
300 }
301 }
302AdjustableSemaphore openRequestPermit = pair.getLeft();
303// Try to reserve a spot for the request amongst the open requests of304// the destination worker.305boolean shouldSend = openRequestPermit.tryAcquire();
306boolean shouldCache = false;
307while (!shouldSend) {
308// We should not send the request, and should cache the request instead.309// It may be possible that the unsent message cache is also full, so we310// should try to acquire a space on the cache, and if there is no extra311// space in unsent request cache, we should wait until some space312// become available. However, it is possible that during the time we are313// waiting on the unsent messages cache, actual buffer for open requests314// frees up space.315try {
316 shouldCache = unsentRequestPermit.tryAcquire(unsentWaitMsecs,
317 TimeUnit.MILLISECONDS);
318 } catch (InterruptedException e) {
319thrownew IllegalStateException("shouldSend: failed " +
320"while waiting on the unsent request cache to have some more " +
321"room for extra unsent requests!");
322 }
323if (shouldCache) {
324break;
325 }
326// We may have an open spot in the meantime that we were waiting on the327// unsent requests.328 shouldSend = openRequestPermit.tryAcquire();
329if (shouldSend) {
330break;
331 }
332// The current thread will be at this point only if it could not make333// space amongst open requests for the destination worker and has been334// timed-out in trying to acquire a space amongst unsent messages. So,335// we should report logs, report progress, and check for request336// failures.337 nettyClient.logAndSanityCheck();
338 }
339// Either shouldSend == true or shouldCache == true340if (shouldCache) {
341 Deque<WritableRequest> unsentRequests =
342 perWorkerUnsentRequestMap.get(destTaskId);
343// This synchronize block is necessary for the following reason:344// Once we are at this point, it means there was no room for this345// request to become an open request, hence we have to put it into346// unsent cache. Consider the case that since last time we checked if347// there is any room for an additional open request so far, all open348// requests are delivered and their acknowledgements are also processed.349// Now, if we put this request in the unsent cache, it is not being350// considered to become an open request, as the only one who checks351// on this matter would be the one who receives an acknowledgment for an352// open request for the destination worker. So, a lock is necessary353// to forcefully serialize the execution if this scenario is about to354// happen.355synchronized (unsentRequests) {
356 shouldSend = openRequestPermit.tryAcquire();
357if (!shouldSend) {
358 aggregateUnsentRequests.getAndIncrement();
359 unsentRequests.add(request);
360return;
361 }
362 }
363// We found a spot amongst open requests to send this request. So, this364// request won't be cached anymore.365 unsentRequestPermit.release();
366 }
367 nettyClient.doSend(destTaskId, request);
368 }
369370/**371 * Whether response specifies that credit should be ignored372 *373 * @param response response received374 * @return true iff credit should be ignored, false otherwise375 */376privateboolean shouldIgnoreCredit(int response) {
377return ((short) ((response >> (14 + 16)) & 1)) == 1;
378 }
379380/**381 * Get the credit from a response382 *383 * @param response response received384 * @return credit from the received response385 */386privateshort getCredit(int response) {
387return (short) ((response >> 16) & 0x3FFF);
388 }
389390/**391 * Get the timestamp from a response392 *393 * @param response response received394 * @return timestamp from the received response395 */396privateint getTimestamp(int response) {
397return response & 0xFFFF;
398 }
399400/**401 * Get the response flag from a response402 *403 * @param response response received404 * @return AckSignalFlag coming with the response405 */406 @Override
407publicAckSignalFlag getAckSignalFlag(int response) {
408return AckSignalFlag.values()[(response >> (16 + 14 + 1)) & 1];
409 }
410411 @Override
412publicint calculateResponse(AckSignalFlag flag, int taskId) {
413boolean ignoreCredit = nettyClient.masterInvolved(taskId);
414if (!ignoreCredit && maxOpenRequestsPerWorker == 0) {
415synchronized (workersToResume) {
416 workersToResume.add(taskId);
417 }
418 }
419int timestamp = (int) (nettyClient.getNextRequestId(taskId) & 0xFFFF);
420return (flag.ordinal() << (16 + 14 + 1)) |
421 ((ignoreCredit ? 1 : 0) << (16 + 14)) |
422 (maxOpenRequestsPerWorker << 16) |
423 timestamp;
424 }
425426 @Override
427publicvoid logInfo() {
428if (LOG.isInfoEnabled()) {
429// Count how many unsent requests each task has430 Map<Integer, Integer> unsentRequestCounts = Maps.newHashMap();
431for (Map.Entry<Integer, Deque<WritableRequest>> entry :
432 perWorkerUnsentRequestMap.entrySet()) {
433 unsentRequestCounts.put(entry.getKey(), entry.getValue().size());
434 }
435 ArrayList<Map.Entry<Integer, Integer>> sorted =
436 Lists.newArrayList(unsentRequestCounts.entrySet());
437 Collections.sort(sorted, new Comparator<Map.Entry<Integer, Integer>>() {
438 @Override
439publicint compare(Map.Entry<Integer, Integer> entry1,
440 Map.Entry<Integer, Integer> entry2) {
441int value1 = entry1.getValue();
442int value2 = entry2.getValue();
443return (value1 < value2) ? 1 : ((value1 == value2) ? 0 : -1);
444 }
445 });
446 StringBuilder message = new StringBuilder();
447 message.append("logInfo: ").append(aggregateUnsentRequests.get())
448 .append(" unsent requests in total. ");
449int itemsToPrint = Math.min(10, sorted.size());
450for (int i = 0; i < itemsToPrint; ++i) {
451 message.append(sorted.get(i).getValue())
452 .append(" unsent requests for taskId=")
453 .append(sorted.get(i).getKey()).append(" (credit=")
454 .append(perWorkerOpenRequestMap.get(sorted.get(i).getKey())
455 .getKey().getMaxPermits())
456 .append("), ");
457 }
458 LOG.info(message);
459 }
460 }
461462 @Override
463publicvoid waitAllRequests() {
464while (true) {
465synchronized (aggregateUnsentRequests) {
466if (aggregateUnsentRequests.get() == 0) {
467break;
468 }
469try {
470 aggregateUnsentRequests.wait(waitingRequestMsecs);
471 } catch (InterruptedException e) {
472thrownew IllegalStateException("waitAllRequests: failed while " +
473"waiting on open/cached requests");
474 }
475 }
476if (aggregateUnsentRequests.get() == 0) {
477break;
478 }
479 nettyClient.logAndSanityCheck();
480 }
481 }
482483 @Override
484publicint getNumberOfUnsentRequests() {
485return aggregateUnsentRequests.get();
486 }
487488 @Override
489publicvoid messageAckReceived(int taskId, long requestId, int response) {
490boolean ignoreCredit = shouldIgnoreCredit(response);
491short credit = getCredit(response);
492int timestamp = getTimestamp(response);
493 MutablePair<AdjustableSemaphore, Integer> pair =
494 (MutablePair<AdjustableSemaphore, Integer>)
495 perWorkerOpenRequestMap.get(taskId);
496AdjustableSemaphore openRequestPermit = pair.getLeft();
497// Release a permit on open requests if we received ACK of a request other498// than a Resume request (resume requests are always sent regardless of499// number of open requests)500if (!resumeRequestsId.get(taskId).remove(requestId)) {
501 openRequestPermit.release();
502 } elseif (LOG.isDebugEnabled()) {
503 LOG.debug("messageAckReceived: ACK of resume received from " + taskId +
504" timestamp=" + timestamp);
505 }
506if (!ignoreCredit) {
507synchronized (pair) {
508if (compareTimestamps(timestamp, pair.getRight()) > 0) {
509 pair.setRight(timestamp);
510 openRequestPermit.setMaxPermits(credit);
511 } elseif (LOG.isDebugEnabled()) {
512 LOG.debug("messageAckReceived: received out-of-order messages." +
513"Received timestamp=" + timestamp + " and current timestamp=" +
514 pair.getRight());
515 }
516 }
517 }
518// Since we received a response and we changed the credit of the sender519// client, we may be able to send some more requests to the sender520// client. So, we try to send as much request as we can to the sender521// client.522 trySendCachedRequests(taskId);
523 }
524525/**526 * Try to send as much as cached requests to a given worker527 *528 * @param taskId id of the worker to send cached requests to529 */530privatevoid trySendCachedRequests(int taskId) {
531 Deque<WritableRequest> requestDeque =
532 perWorkerUnsentRequestMap.get(taskId);
533AdjustableSemaphore openRequestPermit =
534 perWorkerOpenRequestMap.get(taskId).getLeft();
535while (true) {
536WritableRequest request;
537synchronized (requestDeque) {
538 request = requestDeque.pollFirst();
539if (request == null) {
540break;
541 }
542// See whether the sender client has any unused credit543if (!openRequestPermit.tryAcquire()) {
544 requestDeque.offerFirst(request);
545break;
546 }
547 }
548 unsentRequestPermit.release();
549// At this point, we have a request, and we reserved a credit for the550// sender client. So, we put the request in a queue to be sent to the551// client.552try {
553 toBeSent.put(
554new ImmutablePair<Integer, WritableRequest>(taskId, request));
555 } catch (InterruptedException e) {
556thrownew IllegalStateException("trySendCachedRequests: failed while" +
557"waiting to put element in send queue!", e);
558 }
559 }
560 }
561562/**563 * Update the max credit that is announced to other workers564 *565 * @param newCredit new credit566 */567publicvoid updateCredit(short newCredit) {
568 newCredit = (short) Math.max(0, Math.min(0x3FFF, newCredit));
569// Check whether we should send resume signals to some workers570if (maxOpenRequestsPerWorker == 0 && newCredit != 0) {
571 maxOpenRequestsPerWorker = newCredit;
572synchronized (workersToResume) {
573 workersToResume.notifyAll();
574 }
575 } else {
576 maxOpenRequestsPerWorker = newCredit;
577 }
578 }
579580/**581 * Compare two timestamps accounting for wrap around. Note that the timestamp582 * values should be in a range that fits into 14 bits values. This means if583 * the difference of the two given timestamp is large, we are dealing with one584 * value being wrapped around.585 *586 * @param ts1 first timestamp587 * @param ts2 second timestamp588 * @return positive value if first timestamp is later than second timestamp,589 * negative otherwise590 */591privateint compareTimestamps(int ts1, int ts2) {
592int diff = ts1 - ts2;
593if (Math.abs(diff) < 0x7FFF) {
594return diff;
595 } else {
596return -diff;
597 }
598 }
599600/**601 * Process a resume signal came from a given worker602 *603 * @param clientId id of the worker that sent the signal604 * @param credit the credit value sent along with the resume signal605 * @param requestId timestamp (request id) of the resume signal606 */607publicvoid processResumeSignal(int clientId, short credit, long requestId) {
608int timestamp = (int) (requestId & 0xFFFF);
609if (LOG.isDebugEnabled()) {
610 LOG.debug("processResumeSignal: resume signal from " + clientId +
611" with timestamp=" + timestamp);
612 }
613 MutablePair<AdjustableSemaphore, Integer> pair =
614 (MutablePair<AdjustableSemaphore, Integer>)
615 perWorkerOpenRequestMap.get(clientId);
616synchronized (pair) {
617if (compareTimestamps(timestamp, pair.getRight()) > 0) {
618 pair.setRight(timestamp);
619 pair.getLeft().setMaxPermits(credit);
620 } elseif (LOG.isDebugEnabled()) {
621 LOG.debug("processResumeSignal: received out-of-order messages. " +
622"Received timestamp=" + timestamp + " and current timestamp=" +
623 pair.getRight());
624 }
625 }
626 trySendCachedRequests(clientId);
627 }
628 }