This project has retired. For details please refer to its Attic page.
TaskIdsPermitsBarrier xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.utils;
20  
21  import org.apache.hadoop.util.Progressable;
22  import org.apache.log4j.Logger;
23  
24  import com.google.common.collect.Sets;
25  
26  import java.util.HashSet;
27  import java.util.Set;
28  
29  /**
30   * This barrier is used when we don't know from the start how many events
31   * we are waiting on. Instead we have a set of task ids, and each of those
32   * will, at some point in time, tell us how many events from it we should
33   * expect. The barrier will wait for all the tasks to notify it about that
34   * number of events, and then it will also wait for all the events to
35   * happen.
36   *
37   * requirePermits() corresponds to task notifying us how many events from it
38   * to expect, and releasePermits() notifies us about events happening.
39   *
40   * This class is currently used during preparation of aggregators.
41   *
42   * User must follow this protocol for concurrent access:
43   *
44   * (1) an object instance is constructed
45   * (2) arbitrarily many times
46   *     (2a) concurrent calls to requirePermits(), releasePermits() and
47   *          waitForRequiredPermits() are issued
48   *     (2b) waitForRequiredPermits() returns
49   *
50   * Note that the next cycle of calls to requirePermits() or releasePermits()
51   * cannot start until the previous call to waitForRequiredPermits()
52   * has returned.
53   *
54   * Methods of this class are thread-safe.
55   */
56  public class TaskIdsPermitsBarrier {
57    /** Class logger */
58    private static final Logger LOG =
59        Logger.getLogger(TaskIdsPermitsBarrier.class);
60    /** Msecs to refresh the progress meter */
61    private static final int MSEC_PERIOD = 10000;
62    /** Maximum number of task ids to list in the log */
63    private static final int MAX_TASK_IDS_TO_LOG = 10;
64    /** Progressable for reporting progress */
65    private final Progressable progressable;
66    /** Number of permits we are currently waiting for */
67    private long waitingOnPermits = 0;
68    /** Set of task ids which required permits already */
69    private final Set<Integer> arrivedTaskIds = new HashSet<Integer>();
70    /** Logger */
71    private final TimedLogger logger;
72  
73    /**
74     * Constructor
75     *
76     * @param progressable Progressable for reporting progress
77     */
78    public TaskIdsPermitsBarrier(Progressable progressable) {
79      this.progressable = progressable;
80      logger = new TimedLogger(MSEC_PERIOD, LOG);
81    }
82  
83    /**
84     * Wait until permits have been required desired number of times,
85     * and all required permits are available
86     *
87     * @param expectedTaskIds List of task ids which we are waiting permits from
88     */
89    public synchronized void waitForRequiredPermits(
90        Set<Integer> expectedTaskIds) {
91      while (arrivedTaskIds.size() < expectedTaskIds.size() ||
92          waitingOnPermits > 0) {
93        try {
94          wait(MSEC_PERIOD);
95        } catch (InterruptedException e) {
96          throw new IllegalStateException("waitForRequiredPermits: " +
97              "InterruptedException occurred");
98        }
99        progressable.progress();
100       if (LOG.isInfoEnabled()) {
101         if (arrivedTaskIds.size() < expectedTaskIds.size()) {
102           String logSuffix = "";
103           if (expectedTaskIds.size() - arrivedTaskIds.size() <=
104               MAX_TASK_IDS_TO_LOG) {
105             Sets.SetView<Integer> difference =
106                 Sets.difference(expectedTaskIds, arrivedTaskIds);
107             logSuffix = ", task ids: " + difference;
108           }
109           logger.info("waitForRequiredPermits: " +
110               "Waiting for " +
111               (expectedTaskIds.size() - arrivedTaskIds.size()) +
112               " more tasks to send their aggregator data" +
113               logSuffix);
114         } else {
115           logger.info("waitForRequiredPermits: " +
116               "Waiting for " + waitingOnPermits + " more aggregator requests");
117         }
118       }
119     }
120 
121     // Reset for the next time to use
122     arrivedTaskIds.clear();
123     waitingOnPermits = 0;
124   }
125 
126   /**
127    * Require more permits. This will increase the number of times permits
128    * were required. Doesn't wait for permits to become available.
129    *
130    * @param permits Number of permits to require
131    * @param taskId Task id which required permits
132    */
133   public synchronized void requirePermits(long permits, int taskId) {
134     arrivedTaskIds.add(taskId);
135     waitingOnPermits += permits;
136     notifyAll();
137   }
138 
139   /**
140    * Release one permit.
141    */
142   public synchronized void releaseOnePermit() {
143     releasePermits(1);
144   }
145 
146   /**
147    * Release some permits.
148    *
149    * @param permits Number of permits to release
150    */
151   public synchronized void releasePermits(long permits) {
152     waitingOnPermits -= permits;
153     notifyAll();
154   }
155 }