1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.yarn;
1920importstatic org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
2122import com.google.common.collect.ImmutableList;
23import com.google.common.collect.Maps;
2425import com.google.common.collect.Sets;
26import java.util.Set;
27import org.apache.giraph.conf.GiraphConfiguration;
28import org.apache.giraph.conf.GiraphConstants;
29import org.apache.hadoop.conf.Configuration;
30import org.apache.hadoop.fs.FileStatus;
31import org.apache.hadoop.fs.FileSystem;
32import org.apache.hadoop.fs.Path;
33import org.apache.hadoop.io.DataOutputBuffer;
34import org.apache.hadoop.security.Credentials;
35import org.apache.hadoop.security.UserGroupInformation;
36import org.apache.hadoop.security.token.Token;
37import org.apache.hadoop.yarn.api.ApplicationConstants;
38import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
39import org.apache.hadoop.yarn.api.records.ApplicationId;
40import org.apache.hadoop.yarn.api.records.ApplicationReport;
41import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
42import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
43import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
44import org.apache.hadoop.yarn.api.records.LocalResource;
45import org.apache.hadoop.yarn.api.records.NodeReport;
46import org.apache.hadoop.yarn.api.records.NodeState;
47import org.apache.hadoop.yarn.api.records.Resource;
48import org.apache.hadoop.yarn.api.records.YarnApplicationState;
49import org.apache.hadoop.yarn.client.api.YarnClient;
50import org.apache.hadoop.yarn.client.api.YarnClientApplication;
51import org.apache.hadoop.yarn.conf.YarnConfiguration;
52import org.apache.hadoop.yarn.exceptions.YarnException;
53import org.apache.hadoop.yarn.util.Records;
5455import org.apache.log4j.Logger;
5657import java.io.IOException;
58import java.util.List;
59import java.util.Map;
60import java.nio.ByteBuffer;
6162/**63 * The initial launcher for a YARN-based Giraph job. This class attempts to64 * configure and send a request to the ResourceManager for a single65 * application container to host GiraphApplicationMaster. The RPC connection66 * between the RM and GiraphYarnClient is the YARN ApplicationManager.67 */68publicclassGiraphYarnClient {
69static {
70 Configuration.addDefaultResource("giraph-site.xml");
71 }
72/** Class logger */73privatestaticfinal Logger LOG = Logger.getLogger(GiraphYarnClient.class);
74/** Sleep time between silent progress checks */75privatestaticfinalint JOB_STATUS_INTERVAL_MSECS = 800;
76/** Memory (in MB) to allocate for our ApplicationMaster container */77privatestaticfinalint YARN_APP_MASTER_MEMORY_MB = 512;
7879/** human-readable job name */80privatefinal String jobName;
81/** Helper configuration from the job */82privatefinalGiraphConfiguration giraphConf;
83/** ApplicationId object (needed for RPC to ResourceManager) */84private ApplicationId appId;
85/** # of sleeps between progress reports to client */86privateint reportCounter;
87/** Yarn client object */88private YarnClient yarnClient;
/**
 * Creates a client for a single Giraph-on-YARN job. The output directory
 * check runs first, then a YarnClient is created and initialised with the
 * supplied configuration (it is not started until {@code run}).
 *
 * @param giraphConf User-defined configuration
 * @param jobName User-defined job name
 * @throws IOException declared on the constructor signature for callers
 */
public GiraphYarnClient(GiraphConfiguration giraphConf, String jobName)
  throws IOException {
  this.giraphConf = giraphConf;
  this.jobName = jobName;
  // no application id exists until the ResourceManager hands one out in run()
  this.appId = null;
  this.reportCounter = 0;
  verifyOutputDirDoesNotExist();
  this.yarnClient = YarnClient.createYarnClient();
  this.yarnClient.init(giraphConf);
}
106107/**108 * Submit a request to the Hadoop YARN cluster's ResourceManager109 * to obtain an application container. This will run our ApplicationMaster,110 * which will in turn request app containers for Giraphs' master and all111 * worker tasks.112 * @param verbose Not implemented yet, to provide compatibility w/GiraphJob113 * @return true if job is successful114 */115publicboolean run(finalboolean verbose) throws YarnException, IOException {
116// init our connection to YARN ResourceManager RPC117 LOG.info("Running Client");
118 yarnClient.start();
119// request an application id from the RM120// Get a new application id121 YarnClientApplication app = yarnClient.createApplication();
122 GetNewApplicationResponse getNewAppResponse = app.
123 getNewApplicationResponse();
124 checkPerNodeResourcesAvailable(getNewAppResponse);
125// configure our request for an exec container for GiraphApplicationMaster126 ApplicationSubmissionContext appContext = app.
127 getApplicationSubmissionContext();
128 appId = appContext.getApplicationId();
129//createAppSubmissionContext(appContext);130 appContext.setApplicationId(appId);
131 appContext.setApplicationName(jobName);
132 LOG.info("Obtained new Application ID: " + appId);
133// sanity check134 applyConfigsForYarnGiraphJob();
135136 ContainerLaunchContext containerContext = buildContainerLaunchContext();
137 appContext.setResource(buildContainerMemory());
138 appContext.setAMContainerSpec(containerContext);
139 LOG.info("ApplicationSumbissionContext for GiraphApplicationMaster " +
140"launch container is populated.");
141//TODO: priority and queue142// Set the priority for the application master143//Priority pri = Records.newRecord(Priority.class);144// TODO - what is the range for priority? how to decide?145//pri.setPriority(amPriority);146//appContext.setPriority(pri);147148// Set the queue to which this application is to be submitted in the RM149//appContext.setQueue(amQueue);150151// make the request, blow up if fail, loop and report job progress if not152try {
153 LOG.info("Submitting application to ASM");
154// obtain an "updated copy" of the appId for status checks/job kill later155 appId = yarnClient.submitApplication(appContext);
156 LOG.info("Got new appId after submission :" + appId);
157 } catch (YarnException yre) {
158// TODO159// Try submitting the same request again160// app submission failure?161thrownew RuntimeException("submitApplication(appContext) FAILED.", yre);
162 }
163 LOG.info("GiraphApplicationMaster container request was submitted to " +
164"ResourceManager for job: " + jobName);
165return awaitGiraphJobCompletion();
166 }
167168/**169 * Without Hadoop MR to check for us, make sure the output dir doesn't exist!170 */171privatevoid verifyOutputDirDoesNotExist() {
172 Path outDir = null;
173try {
174 FileSystem fs = FileSystem.get(giraphConf);
175 String errorMsg = "__ERROR_NO_OUTPUT_DIR_SET__";
176 outDir =
177new Path(fs.getHomeDirectory(), giraphConf.get(OUTDIR, errorMsg));
178 FileStatus outStatus = fs.getFileStatus(outDir);
179if (outStatus.isDirectory() || outStatus.isFile() ||
180 outStatus.isSymlink()) {
181thrownew IllegalStateException("Path " + outDir + " already exists.");
182 }
183 } catch (IOException ioe) {
184 LOG.info("Final output path is: " + outDir);
185 }
186 }
187188/**189 * Configuration settings we need to customize for a Giraph on YARN190 * job. We need to call this EARLY in the job, before the GiraphConfiguration191 * is exported to HDFS for localization in each task container.192 */193privatevoid applyConfigsForYarnGiraphJob() {
194 GiraphConstants.IS_PURE_YARN_JOB.set(giraphConf, true);
195 GiraphConstants.SPLIT_MASTER_WORKER.set(giraphConf, true);
196 giraphConf.set("mapred.job.id", "giraph_yarn_" + appId); // ZK app base path197 }
198199/**200 * Utility to make sure we have the cluster resources we need to run this201 * job. If they are not available, we should die here before too much setup.202 * @param cluster the GetNewApplicationResponse from the YARN RM.203 */204privatevoid checkPerNodeResourcesAvailable(
205final GetNewApplicationResponse cluster) throws YarnException, IOException {
206// are there enough containers to go around for our Giraph job?207 List<NodeReport> nodes = null;
208long totalAvailable = 0;
209try {
210 nodes = yarnClient.getNodeReports(NodeState.RUNNING);
211 } catch (YarnException yre) {
212thrownew RuntimeException("GiraphYarnClient could not connect with " +
213"the YARN ResourceManager to determine the number of available " +
214"application containers.", yre);
215 }
216for (NodeReport node : nodes) {
217 LOG.info("Got node report from ASM for" +
218", nodeId=" + node.getNodeId() +
219", nodeAddress " + node.getHttpAddress() +
220", nodeRackName " + node.getRackName() +
221", nodeNumContainers " + node.getNumContainers());
222 totalAvailable += node.getCapability().getMemory();
223 }
224// 1 master + all workers in -w command line arg225finalint workers = giraphConf.getMaxWorkers() + 1;
226 checkAndAdjustPerTaskHeapSize(cluster);
227finallong totalAsk =
228 giraphConf.getYarnTaskHeapMb() * workers;
229if (totalAsk > totalAvailable) {
230thrownew IllegalStateException("Giraph's estimated cluster heap " +
231 totalAsk + "MB ask is greater than the current available cluster " +
232"heap of " + totalAvailable + "MB. Aborting Job.");
233 }
234 }
235236/**237 * Adjust the user-supplied <code>-yh</code> and <code>-w</code>238 * settings if they are too small or large for the current cluster,239 * and re-record the new settings in the GiraphConfiguration for export.240 * @param gnar the GetNewAppResponse from the YARN ResourceManager.241 */242privatevoid checkAndAdjustPerTaskHeapSize(
243final GetNewApplicationResponse gnar) {
244// do we have the right heap size on these cluster nodes to run our job?245//TODO:246//final int minCapacity = gnar.getMinimumResourceCapability().getMemory();247finalint maxCapacity = gnar.getMaximumResourceCapability().getMemory();
248// make sure heap size is OK for this cluster's available containers249int giraphMem = giraphConf.getYarnTaskHeapMb();
250if (giraphMem == GiraphConstants.GIRAPH_YARN_TASK_HEAP_MB_DEFAULT) {
251 LOG.info("Defaulting per-task heap size to " + giraphMem + "MB.");
252 }
253if (giraphMem > maxCapacity) {
254 LOG.info("Giraph's request of heap MB per-task is more than the " +
255"maximum; downgrading Giraph to" + maxCapacity + "MB.");
256 giraphMem = maxCapacity;
257 }
258/*if (giraphMem < minCapacity) { //TODO:259 LOG.info("Giraph's request of heap MB per-task is less than the " +260 "minimum; upgrading Giraph to " + minCapacity + "MB.");261 giraphMem = minCapacity;262 }*/263 giraphConf.setYarnTaskHeapMb(giraphMem); // record any changes made264 }
265266/**267 * Kill time for the client, report progress occasionally, and otherwise268 * just sleep and wait for the job to finish. If no AM response, kill the app.269 * @return true if job run is successful.270 */271privateboolean awaitGiraphJobCompletion() throws YarnException, IOException {
272boolean done;
273 ApplicationReport report = null;
274try {
275do {
276try {
277 Thread.sleep(JOB_STATUS_INTERVAL_MSECS);
278 } catch (InterruptedException ir) {
279 LOG.info("Progress reporter's sleep was interrupted!", ir);
280 }
281 report = yarnClient.getApplicationReport(appId);
282 done = checkProgress(report);
283 } while (!done);
284if (!giraphConf.metricsEnabled()) {
285 cleanupJarCache();
286 }
287 } catch (IOException ex) {
288final String diagnostics = (null == report) ? "" :
289"Diagnostics: " + report.getDiagnostics();
290 LOG.error("Fatal fault encountered, failing " + jobName + ". " +
291 diagnostics, ex);
292try {
293 LOG.error("FORCIBLY KILLING Application from AppMaster.");
294 yarnClient.killApplication(appId);
295 } catch (YarnException yre) {
296 LOG.error("Exception raised in attempt to kill application.", yre);
297 }
298return false;
299 }
300return printFinalJobReport();
301 }
302303/**304 * Deletes the HDFS cache in YARN, which replaces DistributedCache of Hadoop.305 * If metrics are enabled this will not get called (so you can examine cache.)306 * @throws IOException if bad things happen.307 */308privatevoid cleanupJarCache() throws IOException {
309 FileSystem fs = FileSystem.get(giraphConf);
310 Path baseCacheDir = YarnUtils.getFsCachePath(fs, appId);
311if (fs.exists(baseCacheDir)) {
312 LOG.info("Cleaning up HDFS distributed cache directory for Giraph job.");
313 fs.delete(baseCacheDir, true); // stuff inside314 fs.delete(baseCacheDir, false); // dir itself315 }
316 }
317318/**319 * Print final formatted job report for local client that initiated this run.320 * @return true for app success, false for failure.321 */322privateboolean printFinalJobReport() throws YarnException, IOException {
323 ApplicationReport report;
324try {
325 report = yarnClient.getApplicationReport(appId);
326 FinalApplicationStatus finalAppStatus =
327 report.getFinalApplicationStatus();
328finallong secs =
329 (report.getFinishTime() - report.getStartTime()) / 1000L;
330final String time = String.format("%d minutes, %d seconds.",
331 secs / 60L, secs % 60L);
332 LOG.info("Completed " + jobName + ": " +
333 finalAppStatus.name() + ", total running time: " + time);
334 } catch (YarnException yre) {
335 LOG.error("Exception encountered while attempting to request " +
336"a final job report for " + jobName , yre);
337return false;
338 }
339returntrue;
340 }
341342/**343 * Compose the ContainerLaunchContext for the Application Master.344 * @return the CLC object populated and configured.345 */346private ContainerLaunchContext buildContainerLaunchContext()
347throws IOException {
348 ContainerLaunchContext appMasterContainer =
349 Records.newRecord(ContainerLaunchContext.class);
350 appMasterContainer.setEnvironment(buildEnvironment());
351 appMasterContainer.setLocalResources(buildLocalResourceMap());
352 appMasterContainer.setCommands(buildAppMasterExecCommand());
353//appMasterContainer.setResource(buildContainerMemory());354//appMasterContainer.setUser(ApplicationConstants.Environment.USER.name());355 setToken(appMasterContainer);
356return appMasterContainer;
357 }
358359/**360 * Set delegation tokens for AM container361 * @param amContainer AM container362 * @return363 */364privatevoid setToken(ContainerLaunchContext amContainer) throws IOException {
365// Setup security tokens366if (UserGroupInformation.isSecurityEnabled()) {
367 Credentials credentials = new Credentials();
368 String tokenRenewer = giraphConf.get(YarnConfiguration.RM_PRINCIPAL);
369if (tokenRenewer == null || tokenRenewer.length() == 0) {
370thrownew IOException(
371"Can't get Master Kerberos principal for the RM to use as renewer");
372 }
373 FileSystem fs = FileSystem.get(giraphConf);
374// For now, only getting tokens for the default file-system.375final Token<?> [] tokens =
376 fs.addDelegationTokens(tokenRenewer, credentials);
377if (tokens != null) {
378for (Token<?> token : tokens) {
379 LOG.info("Got dt for " + fs.getUri() + "; " + token);
380 }
381 }
382 DataOutputBuffer dob = new DataOutputBuffer();
383 credentials.writeTokenStorageToStream(dob);
384 ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
385 amContainer.setTokens(fsTokens);
386 }
387 }
388389/**390 * Assess whether job is already finished/failed and 'done' flag needs to be391 * set, prints progress display for client if all is going well.392 * @param report the application report to assess.393 * @return true if job report indicates the job run is over.394 */395privateboolean checkProgress(final ApplicationReport report) {
396 YarnApplicationState jobState = report.getYarnApplicationState();
397if (jobState == YarnApplicationState.FINISHED ||
398 jobState == YarnApplicationState.KILLED) {
399returntrue;
400 } elseif (jobState == YarnApplicationState.FAILED) {
401 LOG.error(jobName + " reports FAILED state, diagnostics show: " +
402 report.getDiagnostics());
403returntrue;
404 } else {
405if (reportCounter++ % 5 == 0) {
406 displayJobReport(report);
407 }
408 }
409return false;
410 }
411412/**413 * Display a formatted summary of the job progress report from the AM.414 * @param report the report to display.415 */416privatevoid displayJobReport(final ApplicationReport report) {
417if (null == report) {
418thrownew IllegalStateException("[*] Latest ApplicationReport for job " +
419 jobName + " was not received by the local client.");
420 }
421finalfloat elapsed =
422 (System.currentTimeMillis() - report.getStartTime()) / 1000.0f;
423 LOG.info(jobName + ", Elapsed: " + String.format("%.2f secs", elapsed));
424 LOG.info(report.getCurrentApplicationAttemptId() + ", State: " +
425 report.getYarnApplicationState().name() + ", Containers used: " +
426 report.getApplicationResourceUsageReport().getNumUsedContainers());
427 }
428429/**430 * Utility to produce the command line to activate the AM from the shell.431 * @return A <code>List<String></code> of shell commands to execute in432 * the container allocated to us by the RM to host our App Master.433 */434private List<String> buildAppMasterExecCommand() {
435// 'gam-' prefix is for GiraphApplicationMaster in log file names436return ImmutableList.of("${JAVA_HOME}/bin/java " +
437"-Xmx" + YARN_APP_MASTER_MEMORY_MB + "M " +
438"-Xms" + YARN_APP_MASTER_MEMORY_MB + "M " + // TODO: REMOVE examples jar!439//TODO: Make constant440"-cp .:${CLASSPATH} org.apache.giraph.yarn.GiraphApplicationMaster " +
441"1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/gam-stdout.log " +
442"2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/gam-stderr.log "443 );
444 }
445446/**447 * Register all local jar files from GiraphConstants.GIRAPH_YARN_LIBJARS448 * in the LocalResources map, copy to HDFS on that same registered path.449 * @param map the LocalResources list to populate.450 */451privatevoid addLocalJarsToResourceMap(Map<String, LocalResource> map)
452throws IOException {
453 Set<String> jars = Sets.newHashSet();
454 LOG.info("LIB JARS :" + giraphConf.getYarnLibJars());
455 String[] libJars = giraphConf.getYarnLibJars().split(",");
456for (String libJar : libJars) {
457 jars.add(libJar);
458 }
459 FileSystem fs = FileSystem.get(giraphConf);
460 Path baseDir = YarnUtils.getFsCachePath(fs, appId);
461for (Path jar : YarnUtils.getLocalFiles(jars)) {
462 Path dest = new Path(baseDir, jar.getName());
463 LOG.info("Made local resource for :" + jar + " to " + dest);
464 fs.copyFromLocalFile(false, true, jar, dest);
465 YarnUtils.addFileToResourceMap(map, fs, dest);
466 }
467 }
468469/**470 * Construct the memory requirements for the AppMaster's container request.471 * @return A Resource that wraps the memory request.472 */473private Resource buildContainerMemory() {
474 Resource capability = Records.newRecord(Resource.class);
475 capability.setMemory(YARN_APP_MASTER_MEMORY_MB); //Configurable thru CLI?476return capability;
477 }
478479/**480 * Create the mapping of environment vars that will be visible to the481 * ApplicationMaster in its remote app container.482 * @return a map of environment vars to set up for the AppMaster.483 */484private Map<String, String> buildEnvironment() {
485 Map<String, String> environment =
486 Maps.<String, String>newHashMap();
487 LOG.info("Set the environment for the application master");
488 YarnUtils.addLocalClasspathToEnv(environment, giraphConf);
489//TODO: add the runtime classpath needed for tests to work490 LOG.info("Environment for AM :" + environment);
491return environment;
492 }
493494/**495 * Create the mapping of files and JARs to send to the GiraphApplicationMaster496 * and from there on to the Giraph tasks.497 * @return the map of jars to local resource paths for transport498 * to the host container that will run our AppMaster.499 */500private Map<String, LocalResource> buildLocalResourceMap() {
501// set local resources for the application master502// local files or archives as needed503// In this scenario, the jar file for the application master504//is part of the local resources505 Map<String, LocalResource> localResources =
506 Maps.<String, LocalResource>newHashMap();
507 LOG.info("buildLocalResourceMap ....");
508try {
509// export the GiraphConfiguration to HDFS for localization to remote tasks510//Ques: Merge the following two method511 YarnUtils.exportGiraphConfiguration(giraphConf, appId);
512 YarnUtils.addGiraphConfToLocalResourceMap(
513 giraphConf, appId, localResources);
514// add jars from '-yj' cmd-line arg to resource map for localization515 addLocalJarsToResourceMap(localResources);
516//TODO: log4j?517return localResources;
518 } catch (IOException ioe) {
519thrownew IllegalStateException("Failed to build LocalResouce map.", ioe);
520 }
521 }
522523 }