001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.security;
020
021import java.io.IOException;
022import java.util.HashSet;
023import java.util.Set;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.apache.hadoop.classification.InterfaceAudience;
028import org.apache.hadoop.classification.InterfaceStability;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.io.Text;
033import org.apache.hadoop.mapred.JobConf;
034import org.apache.hadoop.mapred.Master;
035import org.apache.hadoop.mapreduce.MRJobConfig;
036import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
037import org.apache.hadoop.security.Credentials;
038import org.apache.hadoop.security.UserGroupInformation;
039import org.apache.hadoop.security.token.Token;
040import org.apache.hadoop.security.token.TokenIdentifier;
041
042
043/**
044 * This class provides user facing APIs for transferring secrets from
045 * the job client to the tasks.
046 * The secrets can be stored just before submission of jobs and read during
047 * the task execution.  
048 */
049@InterfaceAudience.Public
050@InterfaceStability.Evolving
051public class TokenCache {
052  
053  private static final Log LOG = LogFactory.getLog(TokenCache.class);
054
055  
056  /**
057   * auxiliary method to get user's secret keys..
058   * @param alias
059   * @return secret key from the storage
060   */
061  public static byte[] getSecretKey(Credentials credentials, Text alias) {
062    if(credentials == null)
063      return null;
064    return credentials.getSecretKey(alias);
065  }
066  
067  /**
068   * Convenience method to obtain delegation tokens from namenodes 
069   * corresponding to the paths passed.
070   * @param credentials
071   * @param ps array of paths
072   * @param conf configuration
073   * @throws IOException
074   */
075  public static void obtainTokensForNamenodes(Credentials credentials,
076      Path[] ps, Configuration conf) throws IOException {
077    if (!UserGroupInformation.isSecurityEnabled()) {
078      return;
079    }
080    obtainTokensForNamenodesInternal(credentials, ps, conf);
081  }
082
083  /**
084   * Remove jobtoken referrals which don't make sense in the context
085   * of the task execution.
086   *
087   * @param conf
088   */
089  public static void cleanUpTokenReferral(Configuration conf) {
090    conf.unset(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
091  }
092
093  static void obtainTokensForNamenodesInternal(Credentials credentials,
094      Path[] ps, Configuration conf) throws IOException {
095    Set<FileSystem> fsSet = new HashSet<FileSystem>();
096    for(Path p: ps) {
097      fsSet.add(p.getFileSystem(conf));
098    }
099    for (FileSystem fs : fsSet) {
100      obtainTokensForNamenodesInternal(fs, credentials, conf);
101    }
102  }
103
104  static boolean isTokenRenewalExcluded(FileSystem fs, Configuration conf) {
105    String [] nns =
106        conf.getStrings(MRJobConfig.JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE);
107    if (nns != null) {
108      String host = fs.getUri().getHost();
109      for(int i=0; i< nns.length; i++) {
110        if (nns[i].equals(host)) {
111          return true;
112        }
113      }
114    }
115    return false;
116  }
117
118  /**
119   * get delegation token for a specific FS
120   * @param fs
121   * @param credentials
122   * @param p
123   * @param conf
124   * @throws IOException
125   */
126  static void obtainTokensForNamenodesInternal(FileSystem fs, 
127      Credentials credentials, Configuration conf) throws IOException {
128    // RM skips renewing token with empty renewer
129    String delegTokenRenewer = "";
130    if (!isTokenRenewalExcluded(fs, conf)) {
131      delegTokenRenewer = Master.getMasterPrincipal(conf);
132      if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
133        throw new IOException(
134            "Can't get Master Kerberos principal for use as renewer");
135      }
136    }
137
138    mergeBinaryTokens(credentials, conf);
139
140    final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer,
141                                                     credentials);
142    if (tokens != null) {
143      for (Token<?> token : tokens) {
144        LOG.info("Got dt for " + fs.getUri() + "; "+token);
145      }
146    }
147  }
148
149  private static void mergeBinaryTokens(Credentials creds, Configuration conf) {
150    String binaryTokenFilename =
151        conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
152    if (binaryTokenFilename != null) {
153      Credentials binary;
154      try {
155        binary = Credentials.readTokenStorageFile(
156            FileSystem.getLocal(conf).makeQualified(
157                new Path(binaryTokenFilename)),
158            conf);
159      } catch (IOException e) {
160        throw new RuntimeException(e);
161      }
162      // supplement existing tokens with the tokens in the binary file
163      creds.mergeAll(binary);
164    }
165  }
166  
167  /**
168   * file name used on HDFS for generated job token
169   */
170  @InterfaceAudience.Private
171  public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
172
173  /**
174   * conf setting for job tokens cache file name
175   */
176  @InterfaceAudience.Private
177  public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
178  private static final Text JOB_TOKEN = new Text("JobToken");
179  private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken");
180  private static final Text ENC_SPILL_KEY = new Text("MapReduceEncryptedSpillKey");
181  
182  /**
183   * load job token from a file
184   * @deprecated Use {@link Credentials#readTokenStorageFile} instead,
185   * this method is included for compatibility against Hadoop-1.
186   * @param conf
187   * @throws IOException
188   */
189  @InterfaceAudience.Private
190  @Deprecated
191  public static Credentials loadTokens(String jobTokenFile, JobConf conf)
192  throws IOException {
193    Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
194
195    Credentials ts = Credentials.readTokenStorageFile(localJobTokenFile, conf);
196
197    if(LOG.isDebugEnabled()) {
198      LOG.debug("Task: Loaded jobTokenFile from: "+
199          localJobTokenFile.toUri().getPath() 
200          +"; num of sec keys  = " + ts.numberOfSecretKeys() +
201          " Number of tokens " +  ts.numberOfTokens());
202    }
203    return ts;
204  }
205  
206  /**
207   * load job token from a file
208   * @deprecated Use {@link Credentials#readTokenStorageFile} instead,
209   * this method is included for compatibility against Hadoop-1.
210   * @param conf
211   * @throws IOException
212   */
213  @InterfaceAudience.Private
214  @Deprecated
215  public static Credentials loadTokens(String jobTokenFile, Configuration conf)
216      throws IOException {
217    return loadTokens(jobTokenFile, new JobConf(conf));
218  }
219  
220  /**
221   * store job token
222   * @param t
223   */
224  @InterfaceAudience.Private
225  public static void setJobToken(Token<? extends TokenIdentifier> t, 
226      Credentials credentials) {
227    credentials.addToken(JOB_TOKEN, t);
228  }
229  /**
230   * 
231   * @return job token
232   */
233  @SuppressWarnings("unchecked")
234  @InterfaceAudience.Private
235  public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
236    return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
237  }
238
239  @InterfaceAudience.Private
240  public static void setShuffleSecretKey(byte[] key, Credentials credentials) {
241    credentials.addSecretKey(SHUFFLE_TOKEN, key);
242  }
243
244  @InterfaceAudience.Private
245  public static byte[] getShuffleSecretKey(Credentials credentials) {
246    return getSecretKey(credentials, SHUFFLE_TOKEN);
247  }
248
249  @InterfaceAudience.Private
250  public static void setEncryptedSpillKey(byte[] key, Credentials credentials) {
251    credentials.addSecretKey(ENC_SPILL_KEY, key);
252  }
253
254  @InterfaceAudience.Private
255  public static byte[] getEncryptedSpillKey(Credentials credentials) {
256    return getSecretKey(credentials, ENC_SPILL_KEY);
257  }
258  /**
259   * @deprecated Use {@link Credentials#getToken(org.apache.hadoop.io.Text)}
260   * instead, this method is included for compatibility against Hadoop-1
261   * @param namenode
262   * @return delegation token
263   */
264  @InterfaceAudience.Private
265  @Deprecated
266  public static
267      Token<?> getDelegationToken(
268          Credentials credentials, String namenode) {
269    return (Token<?>) credentials.getToken(new Text(
270      namenode));
271  }
272}