/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.zk;
1920import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
21import org.apache.giraph.conf.GiraphConstants;
22import org.apache.giraph.utils.ThreadUtils;
23import org.apache.log4j.Logger;
24import org.apache.zookeeper.jmx.ManagedUtil;
25import org.apache.zookeeper.server.DatadirCleanupManager;
26import org.apache.zookeeper.server.ServerCnxnFactory;
27import org.apache.zookeeper.server.ZooKeeperServer;
28import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
29import org.apache.zookeeper.server.quorum.QuorumPeerMain;
3031import javax.management.JMException;
32import java.io.File;
33import java.io.IOException;
3435/**36 * Zookeeper wrapper that starts zookeeper withing master process.37 */38publicclassInProcessZooKeeperRunner39extendsDefaultImmutableClassesGiraphConfigurable40implementsZooKeeperRunner {
4142/** Class logger */43privatestaticfinal Logger LOG =
44 Logger.getLogger(InProcessZooKeeperRunner.class);
45/**46 * Wrapper for zookeeper quorum.47 */48privateQuorumRunner quorumRunner = newQuorumRunner();
4950 @Override
51publicint start(String zkDir, ZookeeperConfig config) throws IOException {
52return quorumRunner.start(config);
53 }
5455 @Override
56publicvoid stop() {
57try {
58 quorumRunner.stop();
59 } catch (InterruptedException e) {
60 LOG.error("Unable to cleanly shutdown zookeeper", e);
61 }
62 }
6364 @Override
65publicvoid cleanup() {
66 }
6768/**69 * Wrapper around zookeeper quorum. Does not necessarily70 * starts quorum, if there is only one server in config file71 * will only start zookeeper.72 */73privatestaticclassQuorumRunnerextends QuorumPeerMain {
7475/**76 * ZooKeeper server wrapper.77 */78privateZooKeeperServerRunner serverRunner;
7980/**81 * Starts quorum and/or zookeeper service.82 * @param config quorum and zookeeper configuration83 * @return zookeeper port84 * @throws IOException if can't start zookeeper85 */86publicint start(ZookeeperConfig config) throws IOException {
87 serverRunner = newZooKeeperServerRunner();
88//Make sure zookeeper starts first and purge manager last89//This is important because zookeeper creates a folder90//strucutre on the local disk. Purge manager also tries91//to create it but from a different thread and can run into92//race condition. See FileTxnSnapLog source code for details.93int port = serverRunner.start(config);
94// Start and schedule the the purge task95 DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
96 config
97 .getDataDir(), config.getDataLogDir(),
98 GiraphConstants.ZOOKEEPER_SNAP_RETAIN_COUNT,
99 GiraphConstants.ZOOKEEPER_PURGE_INTERVAL);
100 purgeMgr.start();
101102return port;
103 }
104105/**106 * Stop quorum and/or zookeeper.107 * @throws InterruptedException108 */109publicvoid stop() throws InterruptedException {
110if (quorumPeer != null) {
111 quorumPeer.shutdown();
112 quorumPeer.join();
113 } elseif (serverRunner != null) {
114 serverRunner.stop();
115 } else {
116 LOG.warn("Neither quorum nor server is set");
117 }
118 }
119 }
120121/**122 * Wrapper around zookeeper service.123 */124publicstaticclassZooKeeperServerRunner {
125/**126 * Reference to zookeeper factory.127 */128private ServerCnxnFactory cnxnFactory;
129/**130 * Reference to zookeeper server.131 */132private ZooKeeperServer zkServer;
133134/**135 * Start zookeeper service.136 * @param config zookeeper configuration137 * formatted properly138 * @return the port zookeeper has started on.139 * @throws IOException140 */141publicint start(ZookeeperConfig config) throws IOException {
142 LOG.warn("Either no config or no quorum defined in config, " +
143"running in process");
144try {
145 ManagedUtil.registerLog4jMBeans();
146 } catch (JMException e) {
147 LOG.warn("Unable to register log4j JMX control", e);
148 }
149150 runFromConfig(config);
151 ThreadUtils.startThread(new Runnable() {
152 @Override
153publicvoid run() {
154try {
155 cnxnFactory.join();
156if (zkServer.isRunning()) {
157 zkServer.shutdown();
158 }
159 } catch (InterruptedException e) {
160 LOG.error(e.getMessage(), e);
161 }
162163 }
164 }, "zk-thread");
165return zkServer.getClientPort();
166 }
167168169/**170 * Run from a ServerConfig.171 * @param config ServerConfig to use.172 * @throws IOException173 */174publicvoid runFromConfig(ZookeeperConfig config) throws IOException {
175 LOG.info("Starting server");
176try {
177// Note that this thread isn't going to be doing anything else,178// so rather than spawning another thread, we will just call179// run() in this thread.180// create a file logger url from the command line args181 zkServer = new ZooKeeperServer();
182183 FileTxnSnapLog ftxn = new FileTxnSnapLog(new184 File(config.getDataLogDir()), new File(config.getDataDir()));
185 zkServer.setTxnLogFactory(ftxn);
186 zkServer.setTickTime(GiraphConstants.DEFAULT_ZOOKEEPER_TICK_TIME);
187 zkServer.setMinSessionTimeout(config.getMinSessionTimeout());
188 zkServer.setMaxSessionTimeout(config.getMaxSessionTimeout());
189 cnxnFactory = ServerCnxnFactory.createFactory();
190 cnxnFactory.configure(config.getClientPortAddress(),
191 GiraphConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS);
192 cnxnFactory.startup(zkServer);
193 } catch (InterruptedException e) {
194// warn, but generally this is ok195 LOG.warn("Server interrupted", e);
196 }
197 }
198199200/**201 * Stop zookeeper service.202 */203publicvoid stop() {
204 cnxnFactory.shutdown();
205 }
206 }
207 }