/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.yarn;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;

import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.protocolrecords
    .RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;

import org.apache.log4j.Logger;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * The YARN Application Master for Giraph is launched when the GiraphYarnClient
 * successfully requests an execution container from the Resource Manager. The
 * Application Master is provided by Giraph to manage all requests for
 * resources (worker nodes, memory, jar files, job configuration metadata,
 * etc.) that Giraph will need to perform the job. When Giraph runs in a
 * non-YARN context, the role of the Application Master is played by Hadoop
 * when it launches our GraphMappers (worker/master task nodes) to run the
 * job.
 */
public class GiraphApplicationMaster {
  /** Logger */
  private static final Logger LOG =
    Logger.getLogger(GiraphApplicationMaster.class);
  /** Exit code for YARN containers that were manually killed/aborted */
  private static final int YARN_ABORT_EXIT_STATUS = -100;
  /** Exit code for successfully run YARN containers */
  private static final int YARN_SUCCESS_EXIT_STATUS = 0;
  /** Millis to sleep between heartbeats during long loops */
  private static final int SLEEP_BETWEEN_HEARTBEATS_MSECS = 900;
  /** A reusable map of resources already in HDFS for each task to
   * copy-to-local env and use to launch each GiraphYarnTask. */
  private static Map<String, LocalResource> LOCAL_RESOURCES;
  /** Initialize the Configuration class with the resource file exported by
   * the YarnClient. We will need to export this resource to the tasks also.
   * Construct the HEARTBEAT to use to ping the RM about job progress/health.
   */
  // TODO: status updates for clients are not yet implemented
  /** Hostname of the container running this app master */
  private String appMasterHostname = "";
  /** Port on which the app master listens for status updates from clients */
  private int appMasterRpcPort = 0;
  /** Tracking url to which app master publishes info for clients to monitor */
  private String appMasterTrackingUrl = "";

  static {
    // pick up new conf XML file and populate it with stuff exported from
    // the client
    Configuration.addDefaultResource(GiraphConstants.GIRAPH_YARN_CONF_FILE);
  }

  /** GiraphApplicationMaster's application attempt id */
  private final ApplicationAttemptId appAttemptId;
  /** GiraphApplicationMaster container id. Leave me here, I'm very useful */
  private final ContainerId containerId;
  /** Number of containers Giraph needs (conf.getMaxWorkers() + 1 master) */
  private final int containersToLaunch;
  /** MB of JVM heap per Giraph task container */
  private final int heapPerContainer;
  /** Giraph configuration for this job, transported here by YARN framework */
  private final ImmutableClassesGiraphConfiguration giraphConf;
  /** Yarn configuration for this job */
  private final YarnConfiguration yarnConf;
  /** Completed containers counter */
  private final AtomicInteger completedCount;
  /** Failed containers counter */
  private final AtomicInteger failedCount;
  /** Number of containers requested (hopefully '-w' from our conf) */
  private final AtomicInteger allocatedCount;
  /** Number of successfully completed containers in this job run. */
  private final AtomicInteger successfulCount;
  /** The ACK #'s for AllocateRequests + heartbeats == last response # */
  private AtomicInteger lastResponseId;
  /** Buffer to store all tokens */
  private ByteBuffer allTokens;
  /** Executor to attempt asynchronous launches of Giraph containers */
  private ExecutorService executor;
  // YARN progress is a float between 0.0f and 1.0f
  /** Handle to communicate with the Resource Manager */
  @SuppressWarnings("rawtypes")
  private AMRMClientAsync amRMClient;
  /** Handle to communicate with the Node Manager */
  private NMClientAsync nmClientAsync;
  /** Listens to process responses from the Node Manager */
  private NMCallbackHandler containerListener;
  /** Whether all containers have finished */
  private volatile boolean done;

  /**
   * Construct the GiraphAppMaster, populate fields using env vars
   * set up by YARN framework in this execution container.
   * @param cId the ContainerId
   * @param aId the ApplicationAttemptId
   */
  protected GiraphApplicationMaster(ContainerId cId, ApplicationAttemptId aId)
    throws IOException {
    containerId = cId; // future good stuff will need me to operate.
    appAttemptId = aId;
    lastResponseId = new AtomicInteger(0);
    giraphConf =
      new ImmutableClassesGiraphConfiguration(new GiraphConfiguration());
    yarnConf = new YarnConfiguration(giraphConf);
    completedCount = new AtomicInteger(0);
    failedCount = new AtomicInteger(0);
    allocatedCount = new AtomicInteger(0);
    successfulCount = new AtomicInteger(0);
    containersToLaunch = giraphConf.getMaxWorkers() + 1;
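    // sized so every allocated container gets its own launcher thread and no
    // container start has to queue behind another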
    executor = Executors.newFixedThreadPool(containersToLaunch);
    heapPerContainer = giraphConf.getYarnTaskHeapMb();
    LOG.info("GiraphAM for ContainerId " + cId + " ApplicationAttemptId " +
      aId);
  }

  /**
   * Coordinates all requests for Giraph's worker/master task containers, and
   * manages application liveness heartbeat, completion status, teardown, etc.
   * @return success or failure
   */
  private boolean run() throws YarnException, IOException {
    boolean success = false;
    try {
      getAllTokens();
      registerRMCallBackHandler();
      registerNMCallbackHandler();
      registerAMToRM();
      madeAllContainerRequestToRM();
      LOG.info("Wait to finish ..");
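      // AMRMClientAsync heartbeats the RM from its own thread; this loop just
      // blocks until a callback flips the 'done' flag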
      while (!done) {
        try {
          Thread.sleep(200);
        } catch (InterruptedException ex) {
          LOG.error(ex);
          // TODO:
        }
      }
      LOG.info("Done " + done);
    } finally {
      // if we get here w/o problems, the executor is already long finished.
      if (null != executor && !executor.isTerminated()) {
        LOG.info("Forcefully terminating executors with done =:" + done);
        executor.shutdownNow(); // force kill, especially if we got here via a throw
      }
      success = finish();
    }
    return success;
  }

  /**
   * Called when the application is done.
   * @return true if all containers succeeded
   */
  private boolean finish() {
    // When the application completes, it should stop all running containers
    LOG.info("Application completed. Stopping running containers");
    nmClientAsync.stop();

    // When the application completes, it should send a finish application
    // signal to the RM
    LOG.info("Application completed. Signalling finish to RM");
    FinalApplicationStatus appStatus;
    String appMessage = null;
    boolean success = true;
    if (failedCount.get() == 0 &&
        completedCount.get() == containersToLaunch) {
      appStatus = FinalApplicationStatus.SUCCEEDED;
    } else {
      appStatus = FinalApplicationStatus.FAILED;
      appMessage = "Diagnostics: total=" + containersToLaunch +
        ", completed=" + completedCount.get() + ", failed=" +
        failedCount.get();
      success = false;
    }
    try {
      amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
    } catch (YarnException ex) {
      LOG.error("Failed to unregister application", ex);
    } catch (IOException e) {
      LOG.error("Failed to unregister application", e);
    }

    amRMClient.stop();
    return success;
  }

  /**
   * Add a container request to the RM for every container we need.
   */
  private void madeAllContainerRequestToRM() {
    // Ask the RM for containers; the requests stay outstanding until all
    // of them have been allocated and launched (success or failure).
    for (int i = 0; i < containersToLaunch; ++i) {
      ContainerRequest containerAsk = setupContainerAskForRM();
      amRMClient.addContainerRequest(containerAsk);
    }
  }

  /**
   * Setup the request that will be sent to the RM for the container ask.
   *
   * @return the setup ResourceRequest to be sent to RM
   */
  private ContainerRequest setupContainerAskForRM() {
    // set up requirements for hosts: null hosts/racks means any host will do
    // set the priority for the request
    Priority pri = Records.newRecord(Priority.class);
    // TODO - what is the range for priority? how to decide?
    pri.setPriority(GiraphConstants.GIRAPH_YARN_PRIORITY);

    // Set up resource type requirements
    // For now, only memory is supported so we set memory requirements
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(heapPerContainer);

    ContainerRequest request = new ContainerRequest(capability, null, null,
      pri);
    LOG.info("Requested container ask: " + request.toString());
    return request;
  }

  /**
   * Populate allTokens with the job's tokens, minus the AM->RM token,
   * for shipping to the task containers.
   */
  private void getAllTokens() throws IOException {
    Credentials credentials = UserGroupInformation.getCurrentUser()
      .getCredentials();
    // Remove the AM->RM token so that containers cannot access it.
    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
    while (iter.hasNext()) {
      Token<?> token = iter.next();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Token type: " + token.getKind());
      }
      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
        iter.remove();
      }
    }
    // serialize only after the AM->RM token has been removed; writing the
    // credentials first would leak that token to the containers
    DataOutputBuffer dob = new DataOutputBuffer();
    credentials.writeTokenStorageToStream(dob);
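    // setTokens(allTokens.slice()) later hands each ContainerLaunchContext an
    // independent read position over this one snapshot of the job's tokens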
    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
  }

  /**
   * Register the RM callback handler and start listening.
   */
  private void registerRMCallBackHandler() {
    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
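    // the first argument is the AM-to-RM heartbeat interval in milliseconds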
    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000,
      allocListener);
    amRMClient.init(yarnConf);
    amRMClient.start();
  }

  /**
   * Register the NM callback handler and start listening.
   */
  private void registerNMCallbackHandler() {
    containerListener = new NMCallbackHandler();
    nmClientAsync = new NMClientAsyncImpl(containerListener);
    nmClientAsync.init(yarnConf);
    nmClientAsync.start();
  }

  /**
   * Register AM to RM.
   * @return AM register response
   */
  private RegisterApplicationMasterResponse registerAMToRM()
    throws YarnException {
    // register Application Master with the YARN Resource Manager so we can
    // begin requesting resources.
    try {
      if (UserGroupInformation.isSecurityEnabled()) {
        LOG.info("SECURITY ENABLED ");
      }
      // TODO: provide actual call back details
      RegisterApplicationMasterResponse response = amRMClient
        .registerApplicationMaster(appMasterHostname,
          appMasterRpcPort, appMasterTrackingUrl);
      return response;
    } catch (IOException ioe) {
      throw new IllegalStateException(
        "GiraphApplicationMaster failed to register with RM.", ioe);
    }
  }

  /**
   * For each container successfully allocated, attempt to set up and launch
   * a Giraph worker/master task.
   * @param allocatedContainers the containers we have currently allocated.
   */
  private void startContainerLaunchingThreads(final List<Container>
    allocatedContainers) {
    for (Container allocatedContainer : allocatedContainers) {
      LOG.info("Launching command on a new container:" +
        " containerId=" + allocatedContainer.getId() +
        ", containerNode=" + allocatedContainer.getNodeId().getHost() +
        ":" + allocatedContainer.getNodeId().getPort() +
        ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() +
        ", containerResourceMemory=" +
        allocatedContainer.getResource().getMemory());
      // Launch and start the container on a separate thread to keep the main
      // thread unblocked as all containers may not be allocated at one go.
      LaunchContainerRunnable runnableLaunchContainer =
        new LaunchContainerRunnable(allocatedContainer, containerListener);
      executor.execute(runnableLaunchContainer);
    }
  }

  /**
   * Lazily compose the map of jar and file names to LocalResource records for
   * inclusion in GiraphYarnTask container requests. Can re-use the same map
   * as Giraph tasks need identical HDFS-based resources (jars etc.) to run.
   * @return the resource map for a ContainerLaunchContext
   */
  private synchronized Map<String, LocalResource> getTaskResourceMap() {
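    // synchronized: the launcher threads race to build this shared static
    // map; the first caller populates it and every later caller reuses it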
    // Set the local resources: just send the copies already in HDFS
    if (null == LOCAL_RESOURCES) {
      LOCAL_RESOURCES = Maps.newHashMap();
      try {
        // if you have to update the giraphConf for export to tasks, do it now
        updateGiraphConfForExport();
        YarnUtils.addFsResourcesToMap(LOCAL_RESOURCES, giraphConf,
          appAttemptId.getApplicationId());
      } catch (IOException ioe) {
        // fail fast, this container will never launch.
        throw new IllegalStateException("Could not configure the container " +
          "launch context for GiraphYarnTasks.", ioe);
      }
    }
    // else, return the prepopulated copy to reuse for each GiraphYarnTask
    return LOCAL_RESOURCES;
  }

  /**
   * If you're going to make ANY CHANGES to your local GiraphConfiguration
   * while running the GiraphApplicationMaster, put them here.
   * This method replaces the current XML file GiraphConfiguration
   * stored in HDFS with the copy you have modified locally in-memory.
   */
  private void updateGiraphConfForExport()
    throws IOException {
    // Giraph expects this MapReduce stuff
    giraphConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
      appAttemptId.getAttemptId());
    // now republish the giraph-conf.xml in HDFS
    YarnUtils.exportGiraphConfiguration(giraphConf,
      appAttemptId.getApplicationId());
  }

  /**
   * Application entry point.
   * @param args command-line args (set by GiraphYarnClient, if any)
   */
  public static void main(final String[] args) {
    boolean result = false;
    LOG.info("Starting GiraphAM ");
    String containerIdString = System.getenv().get(
      Environment.CONTAINER_ID.name());
    if (containerIdString == null) {
      // container id should always be set in the env by the framework
      throw new IllegalArgumentException("ContainerId not found in env vars.");
    }
    ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
    ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId();
    try {
      GiraphApplicationMaster giraphAppMaster =
        new GiraphApplicationMaster(containerId, appAttemptId);
      result = giraphAppMaster.run();
      // CHECKSTYLE: stop IllegalCatch
    } catch (Throwable t) {
      // CHECKSTYLE: resume IllegalCatch
      LOG.error("GiraphApplicationMaster caught a " +
        "top-level exception in main.", t);
      System.exit(1);
    }
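    // exit code contract: 0 = job succeeded, 1 = the AM itself crashed,
    // 2 = the AM ran cleanly but the job failed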
    if (result) {
      LOG.info("Giraph Application Master completed successfully. exiting");
      System.exit(0);
    } else {
      LOG.info("Giraph Application Master failed. exiting");
      System.exit(2);
    }
  }

  /**
   * Thread to connect to the {@link ContainerManager} and launch the container
   * that will house one of our Giraph worker (or master) tasks.
   */
  private class LaunchContainerRunnable implements Runnable {
    /** Allocated container */
    private Container container;
    /** NM listener */
    private NMCallbackHandler containerListener;

    /**
     * Constructor.
     * @param newGiraphTaskContainer Allocated container
     * @param containerListener container listener.
     */
    public LaunchContainerRunnable(final Container newGiraphTaskContainer,
      NMCallbackHandler containerListener) {
      this.container = newGiraphTaskContainer;
      this.containerListener = containerListener;
    }

    /**
     * Connects to the CM, sets up the container launch context
     * and eventually dispatches the container start request to the CM.
     */
    public void run() {
      // Connect to ContainerManager
      // configure the launcher for the Giraph task it will host
      ContainerLaunchContext ctx = buildContainerLaunchContext();
      // request CM to start this container as spec'd in ContainerLaunchContext
      containerListener.addContainer(container.getId(), container);
      nmClientAsync.startContainerAsync(container, ctx);
    }

    /**
     * Boilerplate to set up the ContainerLaunchContext to tell the Container
     * Manager how to launch our Giraph task in the execution container we
     * have already allocated.
     * @return a populated ContainerLaunchContext object.
     */
    private ContainerLaunchContext buildContainerLaunchContext() {
      LOG.info("Setting up container launch context for containerid=" +
        container.getId());
      ContainerLaunchContext launchContext = Records
        .newRecord(ContainerLaunchContext.class);
      // args inject the CLASSPATH, heap MB, and TaskAttemptID for launched task
      final List<String> commands = generateShellExecCommand();
      LOG.info("Container launch commands: " + commands.get(0));
      launchContext.setCommands(commands);
      // Set up tokens for the container too. We populate them mainly so
      // NodeManagers can download any files in the distributed file-system.
      // The tokens are also useful when, e.g., one runs a "hadoop dfs"-like
      // command.
      launchContext.setTokens(allTokens.slice());

      // add user information to the job
      String jobUserName = "ERROR_UNKNOWN_USER";
      UserGroupInformation ugi = null;
      try {
        ugi = UserGroupInformation.getCurrentUser();
        jobUserName = ugi.getUserName();
      } catch (IOException ioe) {
        jobUserName =
          System.getenv(ApplicationConstants.Environment.USER.name());
      }
      // launchContext.setUser(jobUserName);
      LOG.info("Setting username in ContainerLaunchContext to: " + jobUserName);
      // Set the environment variables to inject into remote task's container
      buildEnvironment(launchContext);
      // Set the local resources: just send the copies already in HDFS
      launchContext.setLocalResources(getTaskResourceMap());
      return launchContext;
    }

    /**
     * Generates the command line string used to launch our Giraph tasks.
     * @return the BASH shell commands to launch the job.
     */
    private List<String> generateShellExecCommand() {
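      // the four positional args (cluster timestamp, app id, container id,
      // attempt id) must stay in the order GiraphYarnTask expects to parse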
      return ImmutableList.of("java " +
        "-Xmx" + heapPerContainer + "M " +
        "-Xms" + heapPerContainer + "M " +
        "-cp .:${CLASSPATH} " +
        "org.apache.giraph.yarn.GiraphYarnTask " +
        appAttemptId.getApplicationId().getClusterTimestamp() + " " +
        appAttemptId.getApplicationId().getId() + " " +
        container.getId().getId() + " " +
        appAttemptId.getAttemptId() + " " +
        "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
        "/task-" + container.getId().getId() + "-stdout.log " +
        "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
        "/task-" + container.getId().getId() + "-stderr.log "
      );
    }

    /**
     * Utility to populate the environment vars we wish to inject into the new
     * container's env when the Giraph BSP task is executed.
     * @param launchContext the launch context which will set our environment
     * vars in the app master's execution container.
     */
    private void buildEnvironment(final ContainerLaunchContext launchContext) {
      Map<String, String> classPathForEnv = Maps.<String, String>newHashMap();
      // pick up the local classpath so when we instantiate a Configuration
      // remotely, we also get the "mapred-site.xml" and "yarn-site.xml"
      YarnUtils.addLocalClasspathToEnv(classPathForEnv, giraphConf);
      // set this map of env vars into the launch context.
      launchContext.setEnvironment(classPathForEnv);
    }
  }

  /**
   * CallbackHandler to process RM async calls
   */
  private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
    @SuppressWarnings("unchecked")
    @Override
    public void onContainersCompleted(List<ContainerStatus>
      completedContainers) {
      LOG.info("Got response from RM for container ask, completedCnt=" +
        completedContainers.size());
      for (ContainerStatus containerStatus : completedContainers) {
        LOG.info("Got container status for containerID=" +
          containerStatus.getContainerId() + ", state=" +
          containerStatus.getState() + ", exitStatus=" +
          containerStatus.getExitStatus() + ", diagnostics=" +
          containerStatus.getDiagnostics());
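        // -100 (YARN_ABORT_EXIT_STATUS) marks containers the framework
        // killed or released; those count as neither success nor failure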
        switch (containerStatus.getExitStatus()) {
        case YARN_SUCCESS_EXIT_STATUS:
          successfulCount.incrementAndGet();
          break;
        case YARN_ABORT_EXIT_STATUS:
          break; // not success or fail
        default:
          failedCount.incrementAndGet();
          break;
        }
        completedCount.incrementAndGet();
      }

      if (completedCount.get() == containersToLaunch) {
        done = true;
        LOG.info("All containers completed. done = " + done);
      } else {
        LOG.info("After completion of one container, current status is:" +
          " completedCount: " + completedCount.get() +
          " containersToLaunch: " + containersToLaunch +
          " successfulCount: " + successfulCount.get() +
          " failedCount: " + failedCount.get());
      }
    }

    @Override
    public void onContainersAllocated(List<Container> allocatedContainers) {
      LOG.info("Got response from RM for container ask, allocatedCnt=" +
        allocatedContainers.size());
      allocatedCount.addAndGet(allocatedContainers.size());
      LOG.info("Total # of containers allocated so far: " +
        allocatedCount.get() +
        " allocated out of " + containersToLaunch + " required.");
      startContainerLaunchingThreads(allocatedContainers);
    }

    @Override
    public void onShutdownRequest() {
      done = true;
    }

    @Override
    public void onNodesUpdated(List<NodeReport> updatedNodes) {
    }

    @Override
    public float getProgress() {
      // set progress to deliver to RM on next heartbeat
      float progress = (float) completedCount.get() /
        containersToLaunch;
      return progress;
    }

    @Override
    public void onError(Throwable e) {
      done = true;
      amRMClient.stop();
    }
  }

  /**
   * CallbackHandler to process NM async calls
   */
  private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
    /** Map of launched containers, keyed by container id */
    private ConcurrentMap<ContainerId, Container> containers =
      new ConcurrentHashMap<ContainerId, Container>();

    /**
     * Add a container.
     * @param containerId id of container
     * @param container container object
     */
    public void addContainer(ContainerId containerId, Container container) {
      containers.putIfAbsent(containerId, container);
    }

    @Override
    public void onContainerStopped(ContainerId containerId) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Succeeded to stop Container " + containerId);
      }
      containers.remove(containerId);
    }

    @Override
    public void onContainerStatusReceived(ContainerId containerId,
      ContainerStatus containerStatus) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Container Status: id=" + containerId + ", status=" +
          containerStatus);
      }
    }

    @Override
    public void onContainerStarted(ContainerId containerId,
      Map<String, ByteBuffer> allServiceResponse) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Succeeded to start Container " + containerId);
      }
      Container container = containers.get(containerId);
      if (container != null) {
        nmClientAsync.getContainerStatusAsync(containerId,
          container.getNodeId());
      }
    }

    @Override
    public void onStartContainerError(ContainerId containerId, Throwable t) {
      LOG.error("Failed to start Container " + containerId, t);
      containers.remove(containerId);
    }

    @Override
    public void onGetContainerStatusError(
      ContainerId containerId, Throwable t) {
      LOG.error("Failed to query the status of Container " + containerId, t);
    }

    @Override
    public void onStopContainerError(ContainerId containerId, Throwable t) {
      LOG.error("Failed to stop Container " + containerId, t);
      containers.remove(containerId);
    }
  }
}