1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.zk;
2021import java.io.IOException;
2223import org.apache.hadoop.util.Progressable;
24import org.apache.log4j.Logger;
25import org.apache.zookeeper.KeeperException;
26import org.apache.zookeeper.CreateMode;
27import org.apache.zookeeper.data.ACL;
28import org.apache.zookeeper.data.Stat;
2930import java.util.ArrayList;
31import java.util.Collections;
32import java.util.Comparator;
33import java.util.List;
3435import org.apache.zookeeper.Watcher;
36import org.apache.zookeeper.ZooKeeper;
3738/**39 * ZooKeeper provides only atomic operations. ZooKeeperExt provides additional40 * non-atomic operations that are useful. It also provides wrappers to41 * deal with ConnectionLossException. All methods of this class42 * should be thread-safe.43 */44publicclassZooKeeperExt {
45/** Length of the ZK sequence number */46publicstaticfinalint SEQUENCE_NUMBER_LENGTH = 10;
47/** Internal logger */48privatestaticfinal Logger LOG = Logger.getLogger(ZooKeeperExt.class);
49/** Internal ZooKeeper */50privatefinal ZooKeeper zooKeeper;
51/** Ensure we have progress */52privatefinal Progressable progressable;
53/** Number of max attempts to retry when failing due to connection loss */54privatefinalint maxRetryAttempts;
55/** Milliseconds to wait before trying again due to connection loss */56privatefinalint retryWaitMsecs;
5758/**59 * Constructor to connect to ZooKeeper, does not make progress60 *61 * @param connectString Comma separated host:port pairs, each corresponding62 * to a zk server. e.g.63 * "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional64 * chroot suffix is used the example would look65 * like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"66 * where the client would be rooted at "/app/a" and all paths67 * would be relative to this root - ie getting/setting/etc...68 * "/foo/bar" would result in operations being run on69 * "/app/a/foo/bar" (from the server perspective).70 * @param sessionTimeout Session timeout in milliseconds71 * @param maxRetryAttempts Max retry attempts during connection loss72 * @param retryWaitMsecs Msecs to wait when retrying due to connection73 * loss74 * @param watcher A watcher object which will be notified of state changes,75 * may also be notified for node events76 * @throws IOException77 */78publicZooKeeperExt(String connectString,
79int sessionTimeout,
80int maxRetryAttempts,
81int retryWaitMsecs,
82 Watcher watcher) throws IOException {
83this(connectString, sessionTimeout, maxRetryAttempts,
84 retryWaitMsecs, watcher, null);
85 }
8687/**88 * Constructor to connect to ZooKeeper, make progress89 *90 * @param connectString Comma separated host:port pairs, each corresponding91 * to a zk server. e.g.92 * "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional93 * chroot suffix is used the example would look94 * like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"95 * where the client would be rooted at "/app/a" and all paths96 * would be relative to this root - ie getting/setting/etc...97 * "/foo/bar" would result in operations being run on98 * "/app/a/foo/bar" (from the server perspective).99 * @param sessionTimeout Session timeout in milliseconds100 * @param maxRetryAttempts Max retry attempts during connection loss101 * @param retryWaitMsecs Msecs to wait when retrying due to connection102 * loss103 * @param watcher A watcher object which will be notified of state changes,104 * may also be notified for node events105 * @param progressable Makes progress for longer operations106 * @throws IOException107 */108publicZooKeeperExt(String connectString,
109int sessionTimeout,
110int maxRetryAttempts,
111int retryWaitMsecs,
112 Watcher watcher,
113 Progressable progressable) throws IOException {
114this.zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
115this.progressable = progressable;
116this.maxRetryAttempts = maxRetryAttempts;
117this.retryWaitMsecs = retryWaitMsecs;
118 }
119120/**121 * Provides a possibility of a creating a path consisting of more than one122 * znode (not atomic). If recursive is false, operates exactly the123 * same as create().124 *125 * @param path path to create126 * @param data data to set on the final znode127 * @param acl acls on each znode created128 * @param createMode only affects the final znode129 * @param recursive if true, creates all ancestors130 * @return Actual created path131 * @throws KeeperException132 * @throws InterruptedException133 */134public String createExt(
135final String path,
136 byte[] data,
137 List<ACL> acl,
138 CreateMode createMode,
139boolean recursive) throws KeeperException, InterruptedException {
140if (LOG.isDebugEnabled()) {
141 LOG.debug("createExt: Creating path " + path);
142 }
143144int attempt = 0;
145while (attempt < maxRetryAttempts) {
146try {
147if (!recursive) {
148return zooKeeper.create(path, data, acl, createMode);
149 }
150151try {
152return zooKeeper.create(path, data, acl, createMode);
153 } catch (KeeperException.NoNodeException e) {
154if (LOG.isDebugEnabled()) {
155 LOG.debug("createExt: Cannot directly create node " + path);
156 }
157 }
158159int pos = path.indexOf("/", 1);
160for (; pos != -1; pos = path.indexOf("/", pos + 1)) {
161try {
162if (progressable != null) {
163 progressable.progress();
164 }
165 String filePath = path.substring(0, pos);
166if (zooKeeper.exists(filePath, false) == null) {
167 zooKeeper.create(
168 filePath, null, acl, CreateMode.PERSISTENT);
169 }
170 } catch (KeeperException.NodeExistsException e) {
171if (LOG.isDebugEnabled()) {
172 LOG.debug("createExt: Znode " + path.substring(0, pos) +
173" already exists");
174 }
175 }
176 }
177return zooKeeper.create(path, data, acl, createMode);
178 } catch (KeeperException.ConnectionLossException e) {
179 LOG.warn("createExt: Connection loss on attempt " + attempt + ", " +
180"waiting " + retryWaitMsecs + " msecs before retrying.", e);
181 }
182 ++attempt;
183 Thread.sleep(retryWaitMsecs);
184 }
185thrownew IllegalStateException("createExt: Failed to create " + path +
186" after " + attempt + " tries!");
187 }
188189/**190 * Data structure for handling the output of createOrSet()191 */192publicstaticclassPathStat {
193/** Path to created znode (if any) */194private String path;
195/** Stat from set znode (if any) */196private Stat stat;
197198/**199 * Put in results from createOrSet()200 *201 * @param path Path to created znode (or null)202 * @param stat Stat from set znode (if set)203 */204publicPathStat(String path, Stat stat) {
205this.path = path;
206this.stat = stat;
207 }
208209/**210 * Get the path of the created znode if it was created.211 *212 * @return Path of created znode or null if not created213 */214public String getPath() {
215return path;
216 }
217218/**219 * Get the stat of the set znode if set220 *221 * @return Stat of set znode or null if not set222 */223public Stat getStat() {
224return stat;
225 }
226 }
227228/**229 * Create a znode. Set the znode if the created znode already exists.230 *231 * @param path path to create232 * @param data data to set on the final znode233 * @param acl acls on each znode created234 * @param createMode only affects the final znode235 * @param recursive if true, creates all ancestors236 * @param version Version to set if setting237 * @return Path of created znode or Stat of set znode238 * @throws InterruptedException239 * @throws KeeperException240 */241publicPathStat createOrSetExt(final String path,
242 byte[] data,
243 List<ACL> acl,
244 CreateMode createMode,
245boolean recursive,
246int version) throws KeeperException, InterruptedException {
247 String createdPath = null;
248 Stat setStat = null;
249try {
250 createdPath = createExt(path, data, acl, createMode, recursive);
251 } catch (KeeperException.NodeExistsException e) {
252if (LOG.isDebugEnabled()) {
253 LOG.debug("createOrSet: Node exists on path " + path);
254 }
255 setStat = zooKeeper.setData(path, data, version);
256 }
257returnnewPathStat(createdPath, setStat);
258 }
259260/**261 * Create a znode if there is no other znode there262 *263 * @param path path to create264 * @param data data to set on the final znode265 * @param acl acls on each znode created266 * @param createMode only affects the final znode267 * @param recursive if true, creates all ancestors268 * @return Path of created znode or Stat of set znode269 * @throws InterruptedException270 * @throws KeeperException271 */272publicPathStat createOnceExt(final String path,
273 byte[] data,
274 List<ACL> acl,
275 CreateMode createMode,
276boolean recursive) throws KeeperException, InterruptedException {
277 String createdPath = null;
278 Stat setStat = null;
279try {
280 createdPath = createExt(path, data, acl, createMode, recursive);
281 } catch (KeeperException.NodeExistsException e) {
282if (LOG.isDebugEnabled()) {
283 LOG.debug("createOnceExt: Node already exists on path " + path);
284 }
285 }
286returnnewPathStat(createdPath, setStat);
287 }
288289/**290 * Delete a path recursively. When the deletion is recursive, it is a291 * non-atomic operation, hence, not part of ZooKeeper.292 * @param path path to remove (i.e. /tmp will remove /tmp/1 and /tmp/2)293 * @param version expected version (-1 for all)294 * @param recursive if true, remove all children, otherwise behave like295 * remove()296 * @throws InterruptedException297 * @throws KeeperException298 */299publicvoid deleteExt(final String path, int version, boolean recursive)
300throws InterruptedException, KeeperException {
301int attempt = 0;
302while (attempt < maxRetryAttempts) {
303try {
304if (!recursive) {
305 zooKeeper.delete(path, version);
306return;
307 }
308309try {
310 zooKeeper.delete(path, version);
311return;
312 } catch (KeeperException.NotEmptyException e) {
313if (LOG.isDebugEnabled()) {
314 LOG.debug("deleteExt: Cannot directly remove node " + path);
315 }
316 }
317318 List<String> childList = zooKeeper.getChildren(path, false);
319for (String child : childList) {
320if (progressable != null) {
321 progressable.progress();
322 }
323 deleteExt(path + "/" + child, -1, true);
324 }
325326 zooKeeper.delete(path, version);
327return;
328 } catch (KeeperException.ConnectionLossException e) {
329 LOG.warn("deleteExt: Connection loss on attempt " +
330 attempt + ", waiting " + retryWaitMsecs +
331" msecs before retrying.", e);
332 }
333 ++attempt;
334 Thread.sleep(retryWaitMsecs);
335 }
336thrownew IllegalStateException("deleteExt: Failed to delete " + path +
337" after " + attempt + " tries!");
338 }
339340/**341 * Return the stat of the node of the given path. Return null if no such a342 * node exists.343 * <p>344 * If the watch is true and the call is successful (no exception is thrown),345 * a watch will be left on the node with the given path. The watch will be346 * triggered by a successful operation that creates/delete the node or sets347 * the data on the node.348 *349 * @param path350 * the node path351 * @param watch352 * whether need to watch this node353 * @return the stat of the node of the given path; return null if no such a354 * node exists.355 * @throws KeeperException If the server signals an error356 * @throws InterruptedException If the server transaction is interrupted.357 */358public Stat exists(String path, boolean watch) throws KeeperException,
359 InterruptedException {
360int attempt = 0;
361while (attempt < maxRetryAttempts) {
362try {
363return zooKeeper.exists(path, watch);
364 } catch (KeeperException.ConnectionLossException e) {
365 LOG.warn("exists: Connection loss on attempt " +
366 attempt + ", waiting " + retryWaitMsecs +
367" msecs before retrying.", e);
368 }
369 ++attempt;
370 Thread.sleep(retryWaitMsecs);
371 }
372thrownew IllegalStateException("exists: Failed to check " + path +
373" after " + attempt + " tries!");
374 }
375376/**377 * Return the stat of the node of the given path. Return null if no such a378 * node exists.379 * <p>380 * If the watch is non-null and the call is successful (no exception is381 * thrown), a watch will be left on the node with the given path. The382 * watch will be triggered by a successful operation that383 * creates/delete the node or sets the data on the node.384 *385 * @param path the node path386 * @param watcher explicit watcher387 * @return the stat of the node of the given path; return null if no such a388 * node exists.389 * @throws KeeperException If the server signals an error390 * @throws InterruptedException If the server transaction is interrupted.391 * @throws IllegalArgumentException if an invalid path is specified392 */393public Stat exists(final String path, Watcher watcher)
394throws KeeperException, InterruptedException {
395int attempt = 0;
396while (attempt < maxRetryAttempts) {
397try {
398return zooKeeper.exists(path, watcher);
399 } catch (KeeperException.ConnectionLossException e) {
400 LOG.warn("exists: Connection loss on attempt " +
401 attempt + ", waiting " + retryWaitMsecs +
402" msecs before retrying.", e);
403 }
404 ++attempt;
405 Thread.sleep(retryWaitMsecs);
406 }
407thrownew IllegalStateException("exists: Failed to check " + path +
408" after " + attempt + " tries!");
409 }
410411/**412 * Return the data and the stat of the node of the given path.413 * <p>414 * If the watch is non-null and the call is successful (no exception is415 * thrown), a watch will be left on the node with the given path. The watch416 * will be triggered by a successful operation that sets data on the node, or417 * deletes the node.418 * <p>419 * A KeeperException with error code KeeperException.NoNode will be thrown420 * if no node with the given path exists.421 *422 * @param path the given path423 * @param watcher explicit watcher424 * @param stat the stat of the node425 * @return the data of the node426 * @throws KeeperException If the server signals an error with a non-zero427 * error code428 * @throws InterruptedException If the server transaction is interrupted.429 * @throws IllegalArgumentException if an invalid path is specified430 */431public byte[] getData(final String path, Watcher watcher, Stat stat)
432throws KeeperException, InterruptedException {
433int attempt = 0;
434while (attempt < maxRetryAttempts) {
435try {
436return zooKeeper.getData(path, watcher, stat);
437 } catch (KeeperException.ConnectionLossException e) {
438 LOG.warn("getData: Connection loss on attempt " +
439 attempt + ", waiting " + retryWaitMsecs +
440" msecs before retrying.", e);
441 }
442 ++attempt;
443 Thread.sleep(retryWaitMsecs);
444 }
445thrownew IllegalStateException("getData: Failed to get " + path +
446" after " + attempt + " tries!");
447 }
448449/**450 * Return the data and the stat of the node of the given path.451 * <p>452 * If the watch is true and the call is successful (no exception is453 * thrown), a watch will be left on the node with the given path. The watch454 * will be triggered by a successful operation that sets data on the node, or455 * deletes the node.456 * <p>457 * A KeeperException with error code KeeperException.NoNode will be thrown458 * if no node with the given path exists.459 *460 * @param path the given path461 * @param watch whether need to watch this node462 * @param stat the stat of the node463 * @return the data of the node464 * @throws KeeperException If the server signals an error with a non-zero465 * error code466 * @throws InterruptedException If the server transaction is interrupted.467 */468public byte[] getData(String path, boolean watch, Stat stat)
469throws KeeperException, InterruptedException {
470int attempt = 0;
471while (attempt < maxRetryAttempts) {
472try {
473return zooKeeper.getData(path, watch, stat);
474 } catch (KeeperException.ConnectionLossException e) {
475 LOG.warn("getData: Connection loss on attempt " +
476 attempt + ", waiting " + retryWaitMsecs +
477" msecs before retrying.", e);
478 }
479 ++attempt;
480 Thread.sleep(retryWaitMsecs);
481 }
482thrownew IllegalStateException("getData: Failed to get " + path +
483" after " + attempt + " tries!");
484 }
485486/**487 * Get the children of the path with extensions.488 * Extension 1: Sort the children based on sequence number489 * Extension 2: Get the full path instead of relative path490 *491 * @param path path to znode492 * @param watch set the watch?493 * @param sequenceSorted sort by the sequence number494 * @param fullPath if true, get the fully znode path back495 * @return list of children496 * @throws InterruptedException497 * @throws KeeperException498 */499public List<String> getChildrenExt(
500final String path,
501boolean watch,
502boolean sequenceSorted,
503boolean fullPath) throws KeeperException, InterruptedException {
504int attempt = 0;
505while (attempt < maxRetryAttempts) {
506try {
507 List<String> childList = zooKeeper.getChildren(path, watch);
508/* Sort children according to the sequence number, if desired */509if (sequenceSorted) {
510 Collections.sort(childList, new Comparator<String>() {
511publicint compare(String s1, String s2) {
512if ((s1.length() <= SEQUENCE_NUMBER_LENGTH) ||
513 (s2.length() <= SEQUENCE_NUMBER_LENGTH)) {
514thrownew RuntimeException(
515"getChildrenExt: Invalid length for sequence " +
516" sorting > " +
517 SEQUENCE_NUMBER_LENGTH +
518" for s1 (" +
519 s1.length() + ") or s2 (" + s2.length() + ")");
520 }
521int s1sequenceNumber = Integer.parseInt(
522 s1.substring(s1.length() -
523 SEQUENCE_NUMBER_LENGTH));
524int s2sequenceNumber = Integer.parseInt(
525 s2.substring(s2.length() -
526 SEQUENCE_NUMBER_LENGTH));
527return s1sequenceNumber - s2sequenceNumber;
528 }
529 });
530 }
531if (fullPath) {
532 List<String> fullChildList = new ArrayList<String>();
533for (String child : childList) {
534 fullChildList.add(path + "/" + child);
535 }
536return fullChildList;
537 }
538return childList;
539 } catch (KeeperException.ConnectionLossException e) {
540 LOG.warn("getChildrenExt: Connection loss on attempt " +
541 attempt + ", waiting " + retryWaitMsecs +
542" msecs before retrying.", e);
543 }
544 ++attempt;
545 Thread.sleep(retryWaitMsecs);
546 }
547thrownew IllegalStateException("createExt: Failed to create " + path +
548" after " + attempt + " tries!");
549 }
550551/**552 * Close this client object. Once the client is closed, its session becomes553 * invalid. All the ephemeral nodes in the ZooKeeper server associated with554 * the session will be removed. The watches left on those nodes (and on555 * their parents) will be triggered.556 *557 * @throws InterruptedException558 */559publicvoid close() throws InterruptedException {
560 zooKeeper.close();
561 }
562 }