/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.yarn;
1920import com.google.common.collect.Sets;
21import java.io.FileOutputStream;
22import java.util.Set;
23import org.apache.giraph.conf.GiraphConfiguration;
24import org.apache.giraph.conf.GiraphConstants;
25import org.apache.hadoop.fs.FileStatus;
26import org.apache.hadoop.fs.FileSystem;
27import org.apache.hadoop.fs.Path;
28import org.apache.hadoop.mapreduce.MRJobConfig;
29import org.apache.hadoop.yarn.api.records.ApplicationId;
30import org.apache.hadoop.yarn.api.records.LocalResource;
31import org.apache.hadoop.yarn.api.records.LocalResourceType;
32import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
33import org.apache.hadoop.yarn.conf.YarnConfiguration;
34import org.apache.hadoop.yarn.util.ConverterUtils;
35import org.apache.hadoop.yarn.util.Records;
36import org.apache.hadoop.util.StringUtils;
37import java.io.File;
38import java.io.IOException;
39import java.util.Map;
40import org.apache.log4j.Logger;
4142/**43 * Utilities that can only compile with versions of Hadoop that support YARN,44 * so they live here instead of o.a.g.utils package.45 */46publicclassYarnUtils {
47/** Class Logger */48privatestaticfinal Logger LOG = Logger.getLogger(YarnUtils.class);
49/** Default dir on HDFS (or equivalent) where LocalResources are stored */50privatestaticfinal String HDFS_RESOURCE_DIR = "giraph_yarn_jar_cache";
5152/** Private constructor, this is a utility class only */53privateYarnUtils() { /* no-op */ }5455/**56 * Populates the LocalResources list with the HDFS paths listed in57 * the conf under GiraphConstants.GIRAPH_YARN_LIBJARS, and the58 * GiraphConfiguration for this job. Also adds the Giraph default application59 * jar as determined by GiraphYarnClient.GIRAPH_CLIENT_JAR constant.60 * @param map the LocalResources list to populate.61 * @param giraphConf the configuration to use to select jars to include.62 * @param appId the ApplicationId, naming the the HDFS base dir for job jars.63 */64publicstaticvoid addFsResourcesToMap(Map<String, LocalResource> map,
65GiraphConfiguration giraphConf, ApplicationId appId) throws IOException {
66 FileSystem fs = FileSystem.get(giraphConf);
67 Path baseDir = YarnUtils.getFsCachePath(fs, appId);
68boolean coreJarFound = false;
69for (String fileName : giraphConf.getYarnLibJars().split(",")) {
70if (fileName.length() > 0) {
71 Path filePath = new Path(baseDir, fileName);
72 LOG.info("Adding " + fileName + " to LocalResources for export.to " +
73 filePath);
74if (fileName.contains("giraph-core")) {
75 coreJarFound = true;
76 }
77 addFileToResourceMap(map, fs, filePath);
78 }
79 }
80if (!coreJarFound) { // OK if you are running giraph-examples-jar-with-deps81 LOG.warn("Job jars (-yj option) didn't include giraph-core.");
82 }
83 Path confPath = new Path(baseDir, GiraphConstants.GIRAPH_YARN_CONF_FILE);
84 addFileToResourceMap(map, fs, confPath);
85 }
8687/**88 * Utility function to locate local JAR files and other resources89 * recursively in the dirs on the local CLASSPATH. Once all the files90 * named in <code>fileNames</code> are found, we stop and return the results.91 * @param fileNames the file name of the jars, without path information.92 * @return a set of Paths to the jar files requested in fileNames.93 */94publicstatic Set<Path> getLocalFiles(final Set<String> fileNames) {
95 Set<Path> jarPaths = Sets.newHashSet();
96 String classPath = ".:" + System.getenv("HADOOP_HOME");
97if (classPath.length() > 2) {
98 classPath += ":";
99 }
100 classPath += System.getenv("CLASSPATH");
101for (String baseDir : classPath.split(":")) {
102 LOG.info("Class path name " + baseDir);
103if (baseDir.length() > 0) {
104// lose the globbing chars that will fail in File#listFiles105finalint lastFileSep = baseDir.lastIndexOf("/");
106if (lastFileSep > 0) {
107 String test = baseDir.substring(lastFileSep);
108if (test.contains("*")) {
109 baseDir = baseDir.substring(0, lastFileSep);
110 }
111 }
112 LOG.info("base path checking " + baseDir);
113 populateJarList(new File(baseDir), jarPaths, fileNames);
114 }
115if (jarPaths.size() >= fileNames.size()) {
116break; // found a resource for each name in the input set, all done117 }
118 }
119return jarPaths;
120 }
121122/**123 * Start in the working directory and recursively locate all jars.124 * @param dir current directory to explore.125 * @param fileSet the list to populate.126 * @param fileNames file names to locate.127 */128privatestaticvoid populateJarList(final File dir,
129final Set<Path> fileSet, final Set<String> fileNames) {
130 File[] filesInThisDir = dir.listFiles();
131if (null == filesInThisDir) {
132return;
133 }
134for (File f : dir.listFiles()) {
135if (f.isDirectory()) {
136 populateJarList(f, fileSet, fileNames);
137 } elseif (f.isFile() && fileNames.contains(f.getName())) {
138 fileSet.add(new Path(f.getAbsolutePath()));
139 }
140 }
141 }
142143/**144 * Boilerplate to add a file to the local resources..145 * @param localResources the LocalResources map to populate.146 * @param fs handle to the HDFS file system.147 * @param target the file to send to the remote container.148 */149publicstaticvoid addFileToResourceMap(Map<String, LocalResource>
150 localResources, FileSystem fs, Path target)
151throws IOException {
152 LocalResource resource = Records.newRecord(LocalResource.class);
153 FileStatus destStatus = fs.getFileStatus(target);
154 resource.setResource(ConverterUtils.getYarnUrlFromURI(target.toUri()));
155 resource.setSize(destStatus.getLen());
156 resource.setTimestamp(destStatus.getModificationTime());
157 resource.setType(LocalResourceType.FILE); // use FILE, even for jars!158 resource.setVisibility(LocalResourceVisibility.APPLICATION);
159 localResources.put(target.getName(), resource);
160 LOG.info("Registered file in LocalResources :: " + target);
161 }
162163/**164 * Get the base HDFS dir we will be storing our LocalResources in.165 * @param fs the file system.166 * @param appId the ApplicationId under which our resources will be stored.167 * @return the path168 */169publicstatic Path getFsCachePath(final FileSystem fs,
170final ApplicationId appId) {
171returnnew Path(fs.getHomeDirectory(), HDFS_RESOURCE_DIR + "/" + appId);
172 }
173174/**175 * Popuate the environment string map to be added to the environment vars176 * in a remote execution container. Adds the local classpath to pick up177 * "yarn-site.xml" and "mapred-site.xml" stuff.178 * @param env the map of env var values.179 * @param giraphConf the GiraphConfiguration to pull values from.180 */181publicstaticvoid addLocalClasspathToEnv(final Map<String, String> env,
182finalGiraphConfiguration giraphConf) {
183 StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");
184for (String cpEntry : giraphConf.getStrings(
185 YarnConfiguration.YARN_APPLICATION_CLASSPATH,
186 YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
187 classPathEnv.append(':').append(cpEntry.trim()); //TODO: Separator188 }
189for (String cpEntry : giraphConf.getStrings(
190 MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
191 StringUtils.getStrings(
192 MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH))) {
193 classPathEnv.append(':').append(cpEntry.trim());
194 }
195// add the runtime classpath needed for tests to work196if (giraphConf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
197 classPathEnv.append(':').append(System.getenv("CLASSPATH"));
198 }
199 env.put("CLASSPATH", classPathEnv.toString());
200 }
201202/**203 * Populate the LocalResources list with the GiraphConf XML file's HDFS path.204 * @param giraphConf the GiraphConfifuration to export for worker tasks.205 * @param appId the ApplicationId for this YARN app.206 * @param localResourceMap the LocalResource map of files to export to tasks.207 */208publicstaticvoid addGiraphConfToLocalResourceMap(GiraphConfiguration209 giraphConf, ApplicationId appId, Map<String, LocalResource>
210 localResourceMap) throws IOException {
211 FileSystem fs = FileSystem.get(giraphConf);
212 Path hdfsConfPath = new Path(YarnUtils.getFsCachePath(fs, appId),
213 GiraphConstants.GIRAPH_YARN_CONF_FILE);
214 YarnUtils.addFileToResourceMap(localResourceMap, fs, hdfsConfPath);
215 }
216217/**218 * Export our populated GiraphConfiguration as an XML file to be used by the219 * ApplicationMaster's exec container, and register it with LocalResources.220 * @param giraphConf the current Configuration object to be published.221 * @param appId the ApplicationId to stamp this app's base HDFS resources dir.222 */223publicstaticvoid exportGiraphConfiguration(GiraphConfiguration giraphConf,
224 ApplicationId appId) throws IOException {
225 File confFile = new File(System.getProperty("java.io.tmpdir"),
226 GiraphConstants.GIRAPH_YARN_CONF_FILE);
227if (confFile.exists()) {
228if (!confFile.delete()) {
229 LOG.warn("Unable to delete file " + confFile);
230 }
231 }
232 String localConfPath = confFile.getAbsolutePath();
233 FileOutputStream fos = null;
234try {
235 fos = new FileOutputStream(localConfPath);
236 giraphConf.writeXml(fos);
237 FileSystem fs = FileSystem.get(giraphConf);
238 Path hdfsConfPath = new Path(YarnUtils.getFsCachePath(fs, appId),
239 GiraphConstants.GIRAPH_YARN_CONF_FILE);
240 fos.flush();
241 fs.copyFromLocalFile(false, true, new Path(localConfPath), hdfsConfPath);
242 } finally {
243if (null != fos) {
244 fos.close();
245 }
246 }
247 }
248 }