This project has retired. For details please refer to its
Attic page.
PredicateLock xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.zk;
20
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.locks.Condition;
23 import java.util.concurrent.locks.Lock;
24 import java.util.concurrent.locks.ReentrantLock;
25
26 import org.apache.giraph.time.SystemTime;
27 import org.apache.giraph.time.Time;
28 import org.apache.hadoop.util.Progressable;
29 import org.apache.log4j.Logger;
30
31
32
33
34
35 public class PredicateLock implements BspEvent {
36
37 private static final Logger LOG = Logger.getLogger(PredicateLock.class);
38
39 private static final int DEFAULT_MSEC_PERIOD = 10000;
40
41 protected final Progressable progressable;
42
43 private final int msecPeriod;
44
45 private Lock lock = new ReentrantLock();
46
47 private Condition cond = lock.newCondition();
48
49 private boolean eventOccurred = false;
50
51 private final Time time;
52
53
54
55
56
57
58 public PredicateLock(Progressable progressable) {
59 this(progressable, DEFAULT_MSEC_PERIOD, SystemTime.get());
60 }
61
62
63
64
65
66
67
68
69 public PredicateLock(Progressable progressable, int msecPeriod, Time time) {
70 this.progressable = progressable;
71 this.msecPeriod = msecPeriod;
72 this.time = time;
73 }
74
75 @Override
76 public void reset() {
77 lock.lock();
78 try {
79 eventOccurred = false;
80 } finally {
81 lock.unlock();
82 }
83 }
84
85 @Override
86 public void signal() {
87 lock.lock();
88 try {
89 eventOccurred = true;
90 cond.signalAll();
91 } finally {
92 lock.unlock();
93 }
94 }
95
96 @Override
97 public boolean waitMsecs(int msecs) {
98 if (msecs < 0) {
99 throw new RuntimeException("waitMsecs: msecs cannot be negative!");
100 }
101 long maxMsecs = time.getMilliseconds() + msecs;
102 int curMsecTimeout = 0;
103 lock.lock();
104 try {
105 while (!eventOccurred) {
106 curMsecTimeout =
107 Math.min(msecs, msecPeriod);
108 if (LOG.isDebugEnabled()) {
109 LOG.debug("waitMsecs: Wait for " + curMsecTimeout);
110 }
111 try {
112 boolean signaled =
113 cond.await(curMsecTimeout, TimeUnit.MILLISECONDS);
114 if (LOG.isDebugEnabled()) {
115 LOG.debug("waitMsecs: Got timed signaled of " +
116 signaled);
117 }
118 } catch (InterruptedException e) {
119 throw new IllegalStateException(
120 "waitMsecs: Caught interrupted " +
121 "exception on cond.await() " +
122 curMsecTimeout, e);
123 }
124 if (time.getMilliseconds() > maxMsecs) {
125 return false;
126 }
127 msecs = Math.max(0, msecs - curMsecTimeout);
128 progressable.progress();
129 }
130 } finally {
131 lock.unlock();
132 }
133 return true;
134 }
135
136 @Override
137 public void waitForTimeoutOrFail(long timeout) {
138 long t0 = System.currentTimeMillis();
139 while (!waitMsecs(msecPeriod)) {
140 if (System.currentTimeMillis() > t0 + timeout) {
141 throw new RuntimeException("Timeout waiting");
142 }
143 progressable.progress();
144 }
145 }
146 }