1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.giraph.utils; 20 21 import org.apache.hadoop.util.Progressable; 22 import org.apache.log4j.Logger; 23 24 import com.google.common.collect.Sets; 25 26 import java.util.HashSet; 27 import java.util.Set; 28 29 /** 30 * This barrier is used when we don't know how many events are we waiting on 31 * from the start. Instead we have a set of task ids, and each of those will, 32 * at some point of time, give the information about how many events from it 33 * should we expect. Barrier will be waiting for all the tasks to notify it 34 * about that number of events, and than it will also wait for all the events 35 * to happen. 36 * 37 * requirePermits() corresponds to task notifying us how many events from it 38 * to expect, and releasePermits() notifies us about events happening. 39 * 40 * This class is currently used during preparation of aggregators. 41 * 42 * User must follow this protocol for concurrent access: 43 * 44 * (1) an object instance is constructed 45 * (2) arbitrarily many times 46 * (2a) concurrent calls to requirePermits(), releasePermits() and 47 * waitForRequiredPermits() are issued 48 * (2b) waitForRequiredPermits() returns 49 * 50 * Note that the next cycle of calls to requirePermits() or releasePermits() 51 * cannot start until the previous call to waitForRequiredPermits() 52 * has returned. 53 * 54 * Methods of this class are thread-safe. 55 */ 56 public class TaskIdsPermitsBarrier { 57 /** Class logger */ 58 private static final Logger LOG = 59 Logger.getLogger(TaskIdsPermitsBarrier.class); 60 /** Msecs to refresh the progress meter */ 61 private static final int MSEC_PERIOD = 10000; 62 /** Maximum number of task ids to list in the log */ 63 private static final int MAX_TASK_IDS_TO_LOG = 10; 64 /** Progressable for reporting progress */ 65 private final Progressable progressable; 66 /** Number of permits we are currently waiting for */ 67 private long waitingOnPermits = 0; 68 /** Set of task ids which required permits already */ 69 private final Set<Integer> arrivedTaskIds = new HashSet<Integer>(); 70 /** Logger */ 71 private final TimedLogger logger; 72 73 /** 74 * Constructor 75 * 76 * @param progressable Progressable for reporting progress 77 */ 78 public TaskIdsPermitsBarrier(Progressable progressable) { 79 this.progressable = progressable; 80 logger = new TimedLogger(MSEC_PERIOD, LOG); 81 } 82 83 /** 84 * Wait until permits have been required desired number of times, 85 * and all required permits are available 86 * 87 * @param expectedTaskIds List of task ids which we are waiting permits from 88 */ 89 public synchronized void waitForRequiredPermits( 90 Set<Integer> expectedTaskIds) { 91 while (arrivedTaskIds.size() < expectedTaskIds.size() || 92 waitingOnPermits > 0) { 93 try { 94 wait(MSEC_PERIOD); 95 } catch (InterruptedException e) { 96 throw new IllegalStateException("waitForRequiredPermits: " + 97 "InterruptedException occurred"); 98 } 99 progressable.progress(); 100 if (LOG.isInfoEnabled()) { 101 if (arrivedTaskIds.size() < expectedTaskIds.size()) { 102 String logSuffix = ""; 103 if (expectedTaskIds.size() - arrivedTaskIds.size() <= 104 MAX_TASK_IDS_TO_LOG) { 105 Sets.SetView<Integer> difference = 106 Sets.difference(expectedTaskIds, arrivedTaskIds); 107 logSuffix = ", task ids: " + difference; 108 } 109 logger.info("waitForRequiredPermits: " + 110 "Waiting for " + 111 (expectedTaskIds.size() - arrivedTaskIds.size()) + 112 " more tasks to send their aggregator data" + 113 logSuffix); 114 } else { 115 logger.info("waitForRequiredPermits: " + 116 "Waiting for " + waitingOnPermits + " more aggregator requests"); 117 } 118 } 119 } 120 121 // Reset for the next time to use 122 arrivedTaskIds.clear(); 123 waitingOnPermits = 0; 124 } 125 126 /** 127 * Require more permits. This will increase the number of times permits 128 * were required. Doesn't wait for permits to become available. 129 * 130 * @param permits Number of permits to require 131 * @param taskId Task id which required permits 132 */ 133 public synchronized void requirePermits(long permits, int taskId) { 134 arrivedTaskIds.add(taskId); 135 waitingOnPermits += permits; 136 notifyAll(); 137 } 138 139 /** 140 * Release one permit. 141 */ 142 public synchronized void releaseOnePermit() { 143 releasePermits(1); 144 } 145 146 /** 147 * Release some permits. 148 * 149 * @param permits Number of permits to release 150 */ 151 public synchronized void releasePermits(long permits) { 152 waitingOnPermits -= permits; 153 notifyAll(); 154 } 155 }