YARN-11158. Support getDelegationToken, renewDelegationToken, cancelDelegationToken API's for Federation by slfan1989 · Pull Request #4595 · apache/hadoop · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

YARN-11158. Support getDelegationToken, renewDelegationToken, cancelDelegationToken API's for Federation #4595

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ protected SecretKey generateSecret() {

/**
* Compute HMAC of the identifier using the secret key and return the
* output as password
* output as password.
* @param identifier the bytes of the identifier
* @param key the secret key
* @return the bytes of the generated password
Expand All @@ -180,7 +180,7 @@ public static byte[] createPassword(byte[] identifier,
}

/**
* Convert the byte[] to a secret key
* Convert the byte[] to a secret key.
* @param key the byte[] to create a secret key from
* @return the secret key
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ message CredentialsProto {

message GetDelegationTokenRequestProto {
required string renewer = 1;
optional string clusterId = 2 [default = ""];
}

message GetDelegationTokenResponseProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,11 @@
<name>nfs3.mountd.port</name>
<value>4272</value>
</property>

<property>
<name>hadoop.zk.address</name>
<value>127.0.0.1:2181</value>
<description>Host:Port of the ZooKeeper server to be used.
</description>
</property>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -4107,6 +4107,10 @@ public static boolean isAclEnabled(Configuration conf) {
public static final long DEFAULT_ROUTER_WEBAPP_READ_TIMEOUT =
TimeUnit.SECONDS.toMillis(30);

public static final String ROUTER_KEYTAB = ROUTER_PREFIX + "keytab";

public static final String ROUTER_PRINCIPAL = ROUTER_PREFIX + "principal";

////////////////////////////////
// CSI Volume configs
////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,16 @@ protected <T> T createRMProxy(Class<T> protocol, Configuration config,
protected Token<AMRMTokenIdentifier> initializeUnmanagedAM(
ApplicationId appId) throws IOException, YarnException {
try {
UserGroupInformation appSubmitter =
UserGroupInformation.createRemoteUser(this.submitter);
UserGroupInformation appSubmitter;

if (UserGroupInformation.isSecurityEnabled()) {
appSubmitter =
UserGroupInformation.createProxyUser(this.submitter,
UserGroupInformation.getLoginUser());
} else {
appSubmitter =
UserGroupInformation.createRemoteUser(this.submitter);
}
this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf,
appSubmitter, null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ShutdownHookManager;
Expand Down Expand Up @@ -88,7 +89,8 @@ public Router() {
}

/**
 * Logs the Router in by handing the Router's configuration, together with
 * the {@link YarnConfiguration#ROUTER_KEYTAB} and
 * {@link YarnConfiguration#ROUTER_PRINCIPAL} configuration keys, to
 * {@link SecurityUtil#login}.
 *
 * @throws IOException propagated from {@code SecurityUtil.login}
 */
protected void doSecureLogin() throws IOException {
  final String keytabConfKey = YarnConfiguration.ROUTER_KEYTAB;
  final String principalConfKey = YarnConfiguration.ROUTER_PRINCIPAL;
  SecurityUtil.login(this.conf, keytabConfKey, principalConfKey);
}

@Override
Expand Down Expand Up @@ -140,12 +142,7 @@ protected void serviceStop() throws Exception {
}

/**
 * Stops this Router from a newly started thread, so the caller does not
 * wait for {@code stop()} to finish.
 */
protected void shutDown() {
  // Exactly one stopper thread is started; launching a second thread that
  // also calls stop() would only issue a redundant, concurrent stop request.
  new Thread(() -> Router.this.stop()).start();
}

protected RouterClientRMService createClientRMProxyService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,11 @@ public ClientRequestInterceptor getNextInterceptor() {
private void setupUser(String userName) {

try {
// Do not create a proxy user if user name matches the user name on
// Do not create a proxy user if userName matches the userName on
// current UGI
if (userName.equalsIgnoreCase(
if (UserGroupInformation.isSecurityEnabled()) {
user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
} else if (userName.equalsIgnoreCase(
UserGroupInformation.getCurrentUser().getUserName())) {
user = UserGroupInformation.getCurrentUser();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -1563,4 +1558,15 @@ protected SubClusterId getApplicationHomeSubCluster(
String.format("Can't Found applicationId = %s in any sub clusters", applicationId);
throw new YarnException(errorMsg);
}

/**
 * Decides whether the current caller may perform a delegation token
 * operation.
 *
 * <p>When security is disabled every caller is allowed. Otherwise the real
 * authentication method of the current user must be KERBEROS, KERBEROS_SSL
 * or CERTIFICATE.
 *
 * @return {@code true} if the operation is allowed, {@code false} otherwise
 * @throws IOException propagated from
 *         {@code UserGroupInformation.getCurrentUser()}
 */
private boolean isAllowedDelegationTokenOp() throws IOException {
  if (!UserGroupInformation.isSecurityEnabled()) {
    return true;
  }

  final UserGroupInformation.AuthenticationMethod realAuthMethod =
      UserGroupInformation.getCurrentUser().getRealAuthenticationMethod();

  // Reference comparison is sufficient for enum constants and, like a set
  // membership test, simply yields false for any other value.
  return realAuthMethod == UserGroupInformation.AuthenticationMethod.KERBEROS
      || realAuthMethod == UserGroupInformation.AuthenticationMethod.KERBEROS_SSL
      || realAuthMethod == UserGroupInformation.AuthenticationMethod.CERTIFICATE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
Expand Down Expand Up @@ -110,6 +111,9 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.router.security.DelegationTokenFetcher;
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.router.security.ZKDelegationTokenFetcher;
import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider;
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
import org.slf4j.Logger;
Expand All @@ -135,23 +139,46 @@ public class RouterClientRMService extends AbstractService

private Server server;
private InetSocketAddress listenerEndpoint;
private RouterDelegationTokenSecretManager routerDTSecretManager;
private DelegationTokenFetcher tokenFetcher;

// For each user we store an interceptors' pipeline.
// For performance issue we use LRU cache to keep in memory the newest ones
// and remove the oldest used ones.
private Map<String, RequestInterceptorChainWrapper> userPipelineMap;

private ZKCuratorManager zkManager;

public RouterClientRMService() {
super(RouterClientRMService.class.getName());
}

/**
 * Creates the Router's delegation token machinery before delegating to the
 * parent initialisation.
 *
 * <p>In order: reads the key update interval, token max lifetime and token
 * renew interval from the RM delegation settings in {@code conf}; builds the
 * {@link RouterDelegationTokenSecretManager}; creates and starts the
 * {@link ZKCuratorManager}; and creates the {@link ZKDelegationTokenFetcher}
 * wired to both.
 *
 * @param conf the configuration to read the settings from
 * @throws Exception if any of the steps above, or the parent
 *         initialisation, fails
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  final long keyUpdateInterval = conf.getLong(
      YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY,
      YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
  final long maxLifetime = conf.getLong(
      YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY,
      YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
  final long renewInterval = conf.getLong(
      YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
      YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);

  // The last argument is the fixed value passed as the secret manager's
  // token remover scan interval.
  this.routerDTSecretManager = new RouterDelegationTokenSecretManager(
      keyUpdateInterval, maxLifetime, renewInterval, 3600000);

  // The curator manager is started here, before the fetcher is constructed.
  this.zkManager = new ZKCuratorManager(conf);
  this.zkManager.start();

  this.tokenFetcher = new ZKDelegationTokenFetcher(
      conf, this.zkManager, this.routerDTSecretManager);

  super.serviceInit(conf);
}

@Override
protected void serviceStart() throws Exception {
LOG.info("Starting Router ClientRMService");
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
UserGroupInformation.setConfiguration(conf);

this.tokenFetcher.start();
this.routerDTSecretManager.startThreads();

this.listenerEndpoint =
conf.getSocketAddr(YarnConfiguration.ROUTER_BIND_HOST,
YarnConfiguration.ROUTER_CLIENTRM_ADDRESS,
Expand All @@ -161,9 +188,7 @@ protected void serviceStart() throws Exception {
int maxCacheSize =
conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
this.userPipelineMap = Collections.synchronizedMap(
new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
maxCacheSize, true));
this.userPipelineMap = Collections.synchronizedMap(new LRUCacheHashMap<>(maxCacheSize, true));

Configuration serverConf = new Configuration(conf);

Expand All @@ -172,7 +197,7 @@ protected void serviceStart() throws Exception {
YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT);

this.server = rpc.getServer(ApplicationClientProtocol.class, this,
listenerEndpoint, serverConf, null, numWorkerThreads);
listenerEndpoint, serverConf, routerDTSecretManager, numWorkerThreads);

// Enable service authorization?
if (conf.getBoolean(
Expand All @@ -181,8 +206,7 @@ protected void serviceStart() throws Exception {
}

this.server.start();
LOG.info("Router ClientRMService listening on address: "
+ this.server.getListenerAddress());
LOG.info("Router ClientRMService listening on address: {}.", this.server.getListenerAddress());
super.serviceStart();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.security;

import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;

import java.io.IOException;

/**
 * Base class for components that push delegation token state into a
 * {@link RouterDelegationTokenSecretManager}.
 *
 * <p>Subclasses implement {@link #start()} and use the protected helpers to
 * forward tokens and master keys to the secret manager supplied at
 * construction time.
 */
public abstract class DelegationTokenFetcher {

  /** Secret manager that receives every update; set once in the constructor. */
  private final RouterDelegationTokenSecretManager secretManager;

  /**
   * Creates a fetcher bound to the given secret manager.
   *
   * @param secretManager the secret manager that all helper methods delegate to
   */
  public DelegationTokenFetcher(RouterDelegationTokenSecretManager secretManager) {
    this.secretManager = secretManager;
  }

  /**
   * Starts the fetcher. What "starting" involves is defined by the subclass.
   *
   * @throws Exception if the fetcher cannot be started
   */
  public abstract void start() throws Exception;

  /**
   * Forwards a token to the secret manager via
   * {@code addPersistedDelegationToken}.
   *
   * @param identifier the token identifier to register
   * @param renewDate the renew date to store with the token
   * @throws IOException propagated from the secret manager
   */
  protected void updateToken(RMDelegationTokenIdentifier identifier, long renewDate)
      throws IOException {
    secretManager.addPersistedDelegationToken(identifier, renewDate);
  }

  /**
   * Cancels a token on the secret manager via {@code cancelToken}.
   *
   * @param token the token to cancel
   * @param user the user name passed to the secret manager as the canceller
   * @throws IOException propagated from the secret manager
   */
  protected void removeToken(Token<RMDelegationTokenIdentifier> token, String user)
      throws IOException {
    secretManager.cancelToken(token, user);
  }

  /**
   * Forwards a master key to the secret manager via {@code addKey}.
   *
   * @param key the delegation key to register
   * @throws IOException propagated from the secret manager
   */
  protected void updateMasterKey(DelegationKey key) throws IOException {
    secretManager.addKey(key);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.security;

import org.apache.hadoop.security.token.delegation.DelegationKey;

/**
 * A {@link DelegationKey} tagged with the id of the cluster it is associated
 * with.
 *
 * <p>NOTE(review): the cluster id is held only in this object; this class
 * does not override any serialization or equality method of
 * {@code DelegationKey}, so confirm whether the id is expected to survive
 * those operations.
 */
public class RouterDelegationKey extends DelegationKey {

  /** Id of the associated cluster; set once in the constructor. */
  private final String clusterId;

  /**
   * Creates a key that copies the key id, expiry date and encoded key of
   * {@code key} and records {@code subClusterId} alongside them.
   *
   * @param subClusterId the cluster id to associate with the key
   * @param key the delegation key whose contents are copied
   */
  public RouterDelegationKey(String subClusterId, DelegationKey key) {
    super(key.getKeyId(), key.getExpiryDate(), key.getEncodedKey());
    this.clusterId = subClusterId;
  }

  /**
   * Returns the cluster id given at construction time.
   *
   * @return the cluster id
   */
  public String getClusterId() {
    return clusterId;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.security;

import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Delegation token secret manager used by the Router for
 * {@link RMDelegationTokenIdentifier} tokens.
 *
 * <p>Keys and tokens are written directly into the {@code allKeys} and
 * {@code currentTokens} collections inherited from
 * {@link AbstractDelegationTokenSecretManager}.
 */
public class RouterDelegationTokenSecretManager
    extends AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> {

  private static final Logger LOG =
      LoggerFactory.getLogger(RouterDelegationTokenSecretManager.class);

  /**
   * Creates the secret manager; all four values are passed unchanged to the
   * parent constructor.
   *
   * @param delegationKeyUpdateInterval the delegation key update interval
   * @param delegationTokenMaxLifetime the delegation token max lifetime
   * @param delegationTokenRenewInterval the delegation token renew interval
   * @param delegationTokenRemoverScanInterval the token remover scan interval
   */
  public RouterDelegationTokenSecretManager(long delegationKeyUpdateInterval,
      long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
      long delegationTokenRemoverScanInterval) {
    super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
        delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
  }

  /**
   * Creates an empty identifier.
   *
   * @return a new {@link RMDelegationTokenIdentifier}
   */
  @Override
  public RMDelegationTokenIdentifier createIdentifier() {
    return new RMDelegationTokenIdentifier();
  }

  /**
   * Stores {@code key} in {@code allKeys} under its key id, replacing any
   * key already stored under that id.
   *
   * <p>NOTE(review): this does not call the parent implementation, so any
   * checks or bookkeeping the parent performs are bypassed; confirm that is
   * intended.
   *
   * @param key the delegation key to store
   * @throws IOException declared by the overridden method; not thrown here
   */
  @Override
  public synchronized void addKey(DelegationKey key) throws IOException {
    final int keyId = key.getKeyId();
    this.allKeys.put(keyId, key);
  }

  /**
   * Delegates to the parent implementation without adding any behaviour.
   *
   * @param key the master key being removed
   */
  @Override
  protected void removeStoredMasterKey(DelegationKey key) {
    super.removeStoredMasterKey(key);
  }

  /**
   * Adds (or overwrites) a token in {@code currentTokens}.
   *
   * <p>The master key named by the identifier is looked up in
   * {@code allKeys}. If it is missing, a warning is logged and nothing is
   * stored. Otherwise the password is recomputed from the identifier bytes
   * and that key, the sequence number is raised if the identifier's is
   * higher than the current one, and the token information is stored.
   *
   * @param identifier the identifier of the token to store
   * @param renewDate the renew date to record for the token
   * @throws IOException declared by the overridden method
   */
  @Override
  public synchronized void addPersistedDelegationToken(
      RMDelegationTokenIdentifier identifier, long renewDate) throws IOException {
    final DelegationKey masterKey = this.allKeys.get(identifier.getMasterKeyId());
    if (masterKey == null) {
      LOG.warn("No KEY found for persisted identifier ({}).", identifier);
      return;
    }

    final byte[] tokenPassword =
        createPassword(identifier.getBytes(), masterKey.getKey());

    // Only ever move the sequence number forward.
    final int sequenceNumber = identifier.getSequenceNumber();
    if (sequenceNumber > this.getDelegationTokenSeqNum()) {
      this.setDelegationTokenSeqNum(sequenceNumber);
    }

    final DelegationTokenInformation tokenInfo = new DelegationTokenInformation(
        renewDate, tokenPassword, this.getTrackingIdIfEnabled(identifier));
    this.currentTokens.put(identifier, tokenInfo);
  }
}
Loading
0