YARN-11228. [Federation] Add getAppAttempts, getAppAttempt REST APIs for Router. by slfan1989 · Pull Request #4695 · apache/hadoop · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

YARN-11228. [Federation] Add getAppAttempts, getAppAttempt REST APIs for Router. #4695

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

8000

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8000
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,28 @@ private RouterServerUtil() {
public static final Logger LOG =
LoggerFactory.getLogger(RouterServerUtil.class);

/**
 * Logs a formatted error message and then raises it as a {@link YarnException}.
 *
 * <p>The message is produced with {@link String#format(String, Object...)}.
 * When a cause is supplied it is handed to both the logger and the exception;
 * otherwise only the message is used. This method never returns normally.
 *
 * @param t the throwable raised in the called class, may be {@code null}.
 * @param errMsgFormat the error message format string.
 * @param args arguments referenced by the format specifiers in the format string.
 * @throws YarnException always, carrying the formatted message and, when present, the cause.
 */
@Public
@Unstable
public static void logAndThrowException(Throwable t, String errMsgFormat, Object... args)
    throws YarnException {
  final String message = String.format(errMsgFormat, args);

  // No cause available: log and throw the bare message.
  if (t == null) {
    LOG.error(message);
    throw new YarnException(message);
  }

  LOG.error(message, t);
  throw new YarnException(message, t);
}

/**
* Throws an exception due to an error.
*
Expand Down Expand Up @@ -101,4 +123,26 @@ public static void logAndThrowRunTimeException(String errMsg, Throwable t)
throw new RuntimeException(errMsg);
}
}

/**
 * Logs a formatted error message and then raises it as a {@link RuntimeException}.
 *
 * <p>The message is produced with {@link String#format(String, Object...)}.
 * When a cause is supplied it is handed to both the logger and the exception;
 * otherwise only the message is used. This method never returns normally.
 *
 * @param t the throwable raised in the called class, may be {@code null}.
 * @param errMsgFormat the error message format string.
 * @param args arguments referenced by the format specifiers in the format string.
 * @throws RuntimeException always, carrying the formatted message and, when present, the cause.
 */
@Public
@Unstable
public static void logAndThrowRunTimeException(Throwable t, String errMsgFormat, Object... args)
    throws RuntimeException {
  final String message = String.format(errMsgFormat, args);

  // No cause available: log and throw the bare message.
  if (t == null) {
    LOG.error(message);
    throw new RuntimeException(message);
  }

  LOG.error(message, t);
  throw new RuntimeException(message, t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1365,7 +1365,24 @@ public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,

/**
 * Returns the attempts of an application by forwarding the request to the
 * interceptor of the application's home sub-cluster.
 *
 * @param hsr the servlet request, passed through unchanged.
 * @param appId the application id in string form; must not be null or empty.
 * @return the attempts reported by the home sub-cluster.
 * @throws IllegalArgumentException if {@code appId} is null or empty.
 */
@Override
public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
  if (appId == null || appId.isEmpty()) {
    throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
  }

  try {
    SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);

    DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
        subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
    return interceptor.getAppAttempts(hsr, appId);
  } catch (IllegalArgumentException e) {
    RouterServerUtil.logAndThrowRunTimeException(e,
        "Unable to get the AppAttempt appId: %s.", appId);
  } catch (YarnException e) {
    // Name this operation in the message; it used to say "getContainer".
    RouterServerUtil.logAndThrowRunTimeException("getAppAttempts Failed.", e);
  }

  // Present to satisfy the compiler; the helpers above are expected to throw.
  return null;
}

@Override
Expand All @@ -1377,7 +1394,28 @@ public RMQueueAclInfo checkUserAccessToQueue(String queue, String username,
/**
 * Returns a single application attempt by forwarding the request to the
 * interceptor of the application's home sub-cluster.
 *
 * @param req the servlet request, passed through unchanged.
 * @param res the servlet response, passed through unchanged.
 * @param appId the application id in string form; must not be null or empty.
 * @param appAttemptId the attempt id in string form; must not be null or empty.
 * @return the attempt reported by the home sub-cluster.
 * @throws IllegalArgumentException if {@code appId} or {@code appAttemptId} is null or empty.
 */
@Override
public AppAttemptInfo getAppAttempt(HttpServletRequest req,
    HttpServletResponse res, String appId, String appAttemptId) {

  if (appId == null || appId.isEmpty()) {
    throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
  }
  if (appAttemptId == null || appAttemptId.isEmpty()) {
    throw new IllegalArgumentException("Parameter error, the appAttemptId is empty or null.");
  }

  try {
    SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);

    DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
        subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
    return interceptor.getAppAttempt(req, res, appId, appAttemptId);
  } catch (IllegalArgumentException e) {
    RouterServerUtil.logAndThrowRunTimeException(e,
        "Unable to get the AppAttempt appId: %s, appAttemptId: %s.", appId, appAttemptId);
  } catch (YarnException e) {
    // Name this operation in the message; it used to say "getContainer".
    RouterServerUtil.logAndThrowRunTimeException("getAppAttempt Failed.", e);
  }

  // Present to satisfy the compiler; the helpers above are expected to throw.
  return null;
}

@Override
Expand Down Expand Up @@ -1428,13 +1466,7 @@ public ContainerInfo getContainer(HttpServletRequest req,
}

try {
ApplicationId applicationId = ApplicationId.fromString(appId);
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId);

if (subClusterInfo == null) {
RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " +
applicationId, null);
}
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);

DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
Expand Down Expand Up @@ -1488,12 +1520,7 @@ public Response signalToContainer(String containerId, String command,
ContainerId containerIdObj = ContainerId.fromString(containerId);
ApplicationId applicationId = containerIdObj.getApplicationAttemptId().getApplicationId();

SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId);

if (subClusterInfo == null) {
RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " +
applicationId, null);
}
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId.toString());

DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
Expand Down Expand Up @@ -1561,24 +1588,26 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
/**
* get the HomeSubCluster according to ApplicationId.
*
* @param applicationId applicationId
* @param appId applicationId
* @return HomeSubCluster
* @throws YarnException on failure
*/
private SubClusterInfo getHomeSubClusterInfoByAppId(ApplicationId applicationId) throws YarnException {
private SubClusterInfo getHomeSubClusterInfoByAppId(String appId)
throws YarnException {
SubClusterInfo subClusterInfo = null;
SubClusterId subClusterId = null;
try {
subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId);
ApplicationId applicationId = ApplicationId.fromString(appId);
SubClusterId subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId);
if (subClusterId == null) {
RouterServerUtil.logAndThrowException("Can't get HomeSubCluster by applicationId "
+ applicationId, null);
RouterServerUtil.logAndThrowException(null,
"Can't get HomeSubCluster by applicationId %s", applicationId);
}
subClusterInfo = federationFacade.getSubCluster(subClusterId);
return subClusterInfo;
} catch (YarnException e) {
RouterServerUtil.logAndThrowException("Get HomeSubClusterInfo by applicationId "
+ applicationId + " failed.", e);
RouterServerUtil.logAndThrowException(e,
"Get HomeSubClusterInfo by applicationId %s failed.", appId);
}
return subClusterInfo;
throw new YarnException("Unable to get subCluster by applicationId = " + appId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
Expand All @@ -67,6 +72,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.webapp.NotFoundException;
Expand Down Expand Up @@ -412,4 +419,48 @@ public Response signalToContainer(String containerId, String command,

return Response.status(Status.OK).build();
}

/**
 * Mock implementation: builds a fixed attempt report for a known application.
 *
 * @param req unused by this mock.
 * @param res unused by this mock.
 * @param appId the application id in string form.
 * @param appAttemptId the attempt number, parsed with {@code Integer.parseInt}.
 * @return an AppAttemptInfo wrapping the generated attempt report.
 */
@Override
public AppAttemptInfo getAppAttempt(HttpServletRequest req, HttpServletResponse res,
    String appId, String appAttemptId) {
  if (!isRunning) {
    throw new RuntimeException("RM is stopped");
  }

  final ApplicationId applicationId = ApplicationId.fromString(appId);
  if (!applicationMap.contains(applicationId)) {
    throw new NotFoundException("app with id: " + appId + " not found");
  }

  // Application report whose current attempt id is used for the AM container id.
  final ApplicationAttemptId currentAttemptId =
      ApplicationAttemptId.newInstance(applicationId, Integer.parseInt(appAttemptId));
  final ApplicationReport appReport = ApplicationReport.newInstance(
      applicationId, currentAttemptId,
      "user", "queue", "appname", "host", 124, null,
      YarnApplicationState.RUNNING, "diagnostics", "url", 1, 2, 3, 4,
      FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);

  // Attempt report returned to the caller.
  final ApplicationAttemptId reportedAttemptId =
      ApplicationAttemptId.newInstance(applicationId, Integer.parseInt(appAttemptId));
  final ContainerId amContainerId =
      ContainerId.newContainerId(appReport.getCurrentApplicationAttemptId(), 1);
  final ApplicationAttemptReport attemptReport = ApplicationAttemptReport.newInstance(
      reportedAttemptId, "host", 124, "url", "oUrl", "diagnostics",
      YarnApplicationAttemptState.FINISHED, amContainerId);

  return new AppAttemptInfo(attemptReport);
}

/**
 * Mock implementation: returns two generated attempts (ids 0 and 1) for a
 * known application.
 *
 * @param hsr unused by this mock.
 * @param appId the application id in string form.
 * @return an AppAttemptsInfo holding the two generated attempts.
 */
@Override
public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
  if (!isRunning) {
    throw new RuntimeException("RM is stopped");
  }

  final ApplicationId applicationId = ApplicationId.fromString(appId);
  if (!applicationMap.contains(applicationId)) {
    throw new NotFoundException("app with id: " + appId + " not found");
  }

  final AppAttemptsInfo attempts = new AppAttemptsInfo();
  for (int attemptId = 0; attemptId < 2; attemptId++) {
    attempts.add(TestRouterWebServiceUtil.generateAppAttemptInfo(attemptId));
  }
  return attempts;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
Expand Down Expand Up @@ -717,4 +719,60 @@ public void testGetContainer()
appId.toString(), appAttemptId.toString(), "0");
Assert.assertNotNull(containerInfo);
}

/**
 * Submits an application and verifies that getAppAttempts returns the two
 * attempts generated by the mock sub-cluster interceptor.
 */
@Test
public void testGetAppAttempts()
    throws IOException, InterruptedException, YarnException {
  // Submit application to multiSubCluster
  ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
  ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
  context.setApplicationId(appId.toString());

  Assert.assertNotNull(interceptor.submitApplication(context, null));

  AppAttemptsInfo appAttemptsInfo = interceptor.getAppAttempts(null, appId.toString());
  Assert.assertNotNull(appAttemptsInfo);

  ArrayList<AppAttemptInfo> attemptLists = appAttemptsInfo.getAttempts();
  // Check the list itself; this used to re-assert appAttemptsInfo.
  Assert.assertNotNull(attemptLists);
  Assert.assertEquals(2, attemptLists.size());

  AppAttemptInfo attemptInfo1 = attemptLists.get(0);
  Assert.assertNotNull(attemptInfo1);
  Assert.assertEquals(0, attemptInfo1.getAttemptId());
  Assert.assertEquals("AppAttemptId_0", attemptInfo1.getAppAttemptId());
  Assert.assertEquals("LogLink_0", attemptInfo1.getLogsLink());
  Assert.assertEquals(1659621705L, attemptInfo1.getFinishedTime());

  AppAttemptInfo attemptInfo2 = attemptLists.get(1);
  Assert.assertNotNull(attemptInfo2);
  // The mock generator stubs getAttemptId() to 0 for every attempt.
  Assert.assertEquals(0, attemptInfo2.getAttemptId());
  Assert.assertEquals("AppAttemptId_1", attemptInfo2.getAppAttemptId());
  Assert.assertEquals("LogLink_1", attemptInfo2.getLogsLink());
  Assert.assertEquals(1659621705L, attemptInfo2.getFinishedTime());
}

/**
 * Submits an application and verifies the fields of the single attempt
 * returned by getAppAttempt for attempt "1".
 */
@Test
public void testGetAppAttempt()
    throws IOException, InterruptedException, YarnException {

  // Submit an application through the interceptor before querying its attempt.
  ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
  ApplicationSubmissionContextInfo submissionContext = new ApplicationSubmissionContextInfo();
  submissionContext.setApplicationId(appId.toString());
  Assert.assertNotNull(interceptor.submitApplication(submissionContext, null));

  // Attempt id expected back for attempt number 1 of this application.
  ApplicationAttemptId expectedAttemptId = ApplicationAttemptId.newInstance(appId, 1);

  // Fully qualified: a different AppAttemptInfo class is already imported in this file.
  org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo attemptInfo =
      interceptor.getAppAttempt(null, null, appId.toString(), "1");

  Assert.assertNotNull(attemptInfo);
  Assert.assertEquals(expectedAttemptId.toString(), attemptInfo.getAppAttemptId());
  Assert.assertEquals("url", attemptInfo.getTrackingUrl());
  Assert.assertEquals("oUrl", attemptInfo.getOriginalTrackingUrl());
  Assert.assertEquals(124, attemptInfo.getRpcPort());
  Assert.assertEquals("host", attemptInfo.getHost());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Test class to validate RouterWebServiceUtil methods.
*/
Expand Down Expand Up @@ -579,4 +583,13 @@ private void setUpClusterMetrics(ClusterMetricsInfo metrics, long seed) {
metrics.setActiveNodes(rand.nextInt(1000));
metrics.setShutdownNodes(rand.nextInt(1000));
}

/**
 * Builds a Mockito mock of AppAttemptInfo with canned values for tests.
 *
 * <p>Only the string fields vary with the argument: the app attempt id is
 * {@code "AppAttemptId_" + attemptId} and the logs link is
 * {@code "LogLink_" + attemptId}. The numeric attempt id is stubbed to 0 and
 * the finished time to 1659621705L regardless of the argument.
 *
 * @param attemptId suffix used for the generated app attempt id and logs link.
 * @return the stubbed AppAttemptInfo mock.
 */
public static AppAttemptInfo generateAppAttemptInfo(int attemptId) {
  final AppAttemptInfo mockedInfo = mock(AppAttemptInfo.class);

  // Fixed values, independent of attemptId.
  when(mockedInfo.getAttemptId()).thenReturn(0);
  when(mockedInfo.getFinishedTime()).thenReturn(1659621705L);

  // Values derived from attemptId.
  when(mockedInfo.getAppAttemptId()).thenReturn("AppAttemptId_" + attemptId);
  when(mockedInfo.getLogsLink()).thenReturn("LogLink_" + attemptId);

  return mockedInfo;
}
}
0