feat(schedule): add batch termination by ungreat · Pull Request #4384 · oceanbase/odc · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat(schedule): add batch termination #4384

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.oceanbase.odc.server.web.controller.v2;

import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -49,6 +50,7 @@
import com.oceanbase.odc.service.common.util.WebResponseUtils;
import com.oceanbase.odc.service.flow.FlowInstanceService;
import com.oceanbase.odc.service.flow.FlowTaskInstanceService;
import com.oceanbase.odc.service.flow.model.BatchTerminateFlowResult;
import com.oceanbase.odc.service.flow.model.BinaryDataResult;
import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq;
import com.oceanbase.odc.service.flow.model.FlowInstanceApprovalReq;
Expand All @@ -61,6 +63,8 @@
import com.oceanbase.odc.service.partitionplan.model.PartitionPlanConfig;
import com.oceanbase.odc.service.schedule.ScheduleService;
import com.oceanbase.odc.service.session.model.SqlExecuteResult;
import com.oceanbase.odc.service.state.model.StateName;
import com.oceanbase.odc.service.state.model.StatefulRoute;
import com.oceanbase.odc.service.task.model.OdcTaskLogLevel;

import io.swagger.annotations.ApiOperation;
Expand Down Expand Up @@ -251,4 +255,24 @@ public SuccessResponse<PartitionPlanConfig> getPartitionPlan(@PathVariable Long
return Responses.ok(this.partitionPlanScheduleService.getPartitionPlanByFlowInstanceId(id));
}

/**
 * Kicks off an asynchronous batch cancellation of flow instances.
 *
 * @param flowInstanceIds ids of the flow instances to cancel, taken from the request body
 * @return response wrapping the identifier returned by
 *         {@code flowInstanceService.startBatchCancelFlowInstance}
 */
@ApiOperation(value = "cancelFlowInstance", notes = "批量终止流程")
@RequestMapping(value = "/asyncCancel", method = RequestMethod.POST)
public SuccessResponse<String> batchCancelFlowInstance(@RequestBody Collection<Long> flowInstanceIds) {
    final String terminateId = this.flowInstanceService.startBatchCancelFlowInstance(flowInstanceIds);
    return Responses.single(terminateId);
}

/**
 * Fetches the per-flow results of a previously started batch cancellation.
 *
 * @param terminateId identifier of the batch cancellation; also used as the state id
 *        for stateful routing
 * @return response wrapping whatever {@code flowInstanceService.getBatchCancelResult}
 *         returns for that identifier
 */
@ApiOperation(value = "getBatchCancelResult", notes = "获取批量终止结果")
@RequestMapping(value = "/asyncCancelResult", method = RequestMethod.GET)
@StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#terminateId")
public SuccessResponse<List<BatchTerminateFlowResult>> getBatchCancelResult(String terminateId) {
    final List<BatchTerminateFlowResult> results = this.flowInstanceService.getBatchCancelResult(terminateId);
    return Responses.single(results);
}

/**
 * Fetches the log text of a previously started batch cancellation.
 *
 * @param terminateId identifier of the batch cancellation; also used as the state id
 *        for stateful routing
 * @return response wrapping the log content returned by
 *         {@code flowInstanceService.getBatchCancelLog}
 */
@ApiOperation(value = "getBatchCancelLog", notes = "获取批量终止日志")
@RequestMapping(value = "/asyncCancelLog", method = RequestMethod.GET)
@StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#terminateId")
public SuccessResponse<String> getBatchCancelLog(String terminateId) {
    final String logContent = this.flowInstanceService.getBatchCancelLog(terminateId);
    return Responses.single(logContent);
}

}
10000
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import com.oceanbase.odc.service.common.util.WebResponseUtils;
import com.oceanbase.odc.service.dlm.model.RateLimitConfiguration;
import com.oceanbase.odc.service.schedule.ScheduleService;
import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateCmd;
import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateResult;
import com.oceanbase.odc.service.schedule.model.ChangeScheduleResp;
import com.oceanbase.odc.service.schedule.model.CreateScheduleReq;
import com.oceanbase.odc.service.schedule.model.OperationType;
Expand All @@ -57,6 +59,8 @@
import com.oceanbase.odc.service.schedule.model.ScheduleTaskOverview;
import com.oceanbase.odc.service.schedule.model.ScheduleType;
import com.oceanbase.odc.service.schedule.model.UpdateScheduleReq;
import com.oceanbase.odc.service.state.model.StateName;
import com.oceanbase.odc.service.state.model.StatefulRoute;
import com.oceanbase.odc.service.task.executor.logger.LogUtils;
import com.oceanbase.odc.service.task.model.OdcTaskLogLevel;

Expand Down Expand Up @@ -283,4 +287,21 @@ public SuccessResponse<RateLimitConfiguration> updateLimiterConfig(@PathVariable
@RequestBody RateLimitConfiguration limiterConfig) {
return Responses.single(scheduleService.updateDlmRateLimit(id, limiterConfig));
}

/**
 * Starts an asynchronous termination described by the given command.
 *
 * @param cmd termination command read from the request body
 * @return response wrapping the identifier returned by
 *         {@code scheduleService.startTerminateScheduleAndTask}
 */
@RequestMapping(value = "/schedules/asyncTerminate", method = RequestMethod.POST)
public SuccessResponse<String> startTerminateScheduleAndTask(@RequestBody ScheduleTerminateCmd cmd) {
    final String terminateId = scheduleService.startTerminateScheduleAndTask(cmd);
    return Responses.ok(terminateId);
}

/**
 * Fetches the results of a previously started asynchronous schedule termination.
 *
 * @param terminateId identifier of the termination; also used as the state id for
 *        stateful routing
 * @return response wrapping the list returned by
 *         {@code scheduleService.getTerminateScheduleResult}
 */
@RequestMapping(value = "/schedules/asyncTerminateResult", method = RequestMethod.GET)
@StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#terminateId")
public SuccessResponse<List<ScheduleTerminateResult>> getTerminateScheduleResult(String terminateId) {
    final List<ScheduleTerminateResult> results = scheduleService.getTerminateScheduleResult(terminateId);
    return Responses.ok(results);
}

/**
 * Fetches the log text of a previously started asynchronous schedule termination.
 *
 * @param terminateId identifier of the termination; also used as the state id for
 *        stateful routing
 * @return response wrapping the log content returned by
 *         {@code scheduleService.getTerminateLog}
 */
@RequestMapping(value = "/schedules/asyncTerminateLog", method = RequestMethod.GET)
@StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#terminateId")
public SuccessResponse<String> getTerminateScheduleLog(String terminateId) {
    final String logContent = scheduleService.getTerminateLog(terminateId);
    return Responses.ok(logContent);
}
}
4 changes: 4 additions & 0 deletions server/odc-server/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,10 @@
<AppenderRef ref="RouteLogRunnableAppender"/>
</Logger>

<!-- Attach RouteLogRunnableAppender to the ScheduleService logger at INFO level.
     additivity="true" means events are also passed on to the parent loggers' appenders. -->
<Logger name="com.oceanbase.odc.service.schedule.ScheduleService" level="INFO" additivity="true">
<AppenderRef ref="RouteLogRunnableAppender"/>
</Logger>

<!-- ODC程序日志输出,输出级别 INFO -->
<Logger name="com.oceanbase.odc" level="INFO" additivity="false">
<AppenderRef ref="OdcFileAppender"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ public ThreadPoolTaskExecutor queryProfileMonitorExecutor() {
return executor;
}

@Bean(name = "scheduleImportExecutor")
public ThreadPoolTaskExecutor scheduleImportExecutor() {
@Bean(name = "commonAsyncTaskExecutor")
public ThreadPoolTaskExecutor commonAsyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
int minPoolSize = Math.max(SystemUtils.availableProcessors(), 4);
executor.setCorePoolSize(minPoolSize);
Expand All @@ -324,7 +324,7 @@ public ThreadPoolTaskExecutor scheduleImportExecutor() {
executor.setTaskDecorator(new TraceDecorator<>());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.initialize();
log.info("scheduleImportExecutor initialized");
log.info("commonAsyncTaskExecutor initialized");
return executor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class FutureCache {

private final Cache<String, Future<?>> tempId2Future =
Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.MINUTES)
.maximumSize(1000L)
.removalListener((String key, Future<?> future, RemovalCause cause) -> {
if (future != null) {
future.cancel(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -46,6 +47,7 @@
import org.flowable.engine.history.HistoricProcessInstanceQuery;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.i18n.LocaleContextHolder;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
Expand All @@ -54,13 +56,15 @@
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.validation.annotation.Validated;

import com.fasterxml.jackson.core.type.TypeReference;
import com.oceanbase.odc.common.event.EventPublisher;
import com.oceanbase.odc.common.i18n.I18n;
import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.common.lang.Holder;
import com.oceanbase.odc.common.task.RouteLogCallable;
import com.oceanbase.odc.common.util.StringUtils;
import com.oceanbase.odc.core.authority.util.SkipAuthorize;
import com.oceanbase.odc.core.flow.model.TaskParameters;
Expand Down Expand Up @@ -94,6 +98,7 @@
import com.oceanbase.odc.metadb.task.TaskEntity;
import com.oceanbase.odc.plugin.task.api.datatransfer.model.DataTransferConfig;
import com.oceanbase.odc.service.collaboration.environment.EnvironmentService;
import com.oceanbase.odc.service.common.FutureCache;
import com.oceanbase.odc.service.common.response.SuccessResponse;
import com.oceanbase.odc.service.common.util.SpringContextUtil;
import com.oceanbase.odc.service.common.util.SqlUtils;
Expand Down Expand Up @@ -122,6 +127,7 @@
import com.oceanbase.odc.service.flow.instance.FlowInstanceConfigurer;
import com.oceanbase.odc.service.flow.instance.FlowTaskInstance;
import com.oceanbase.odc.service.flow.listener.AutoApproveUserTaskListener;
import com.oceanbase.odc.service.flow.model.BatchTerminateFlowResult;
import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq;
import com.oceanbase.odc.service.flow.model.ExecutionStrategyConfig;
import com.oceanbase.odc.service.flow.model.FlowInstanceDetailResp;
Expand All @@ -148,6 +154,7 @@
import com.oceanbase.odc.service.iam.auth.AuthenticationFacade;
import com.oceanbase.odc.service.iam.model.User;
import com.oceanbase.odc.service.iam.model.UserResourceRole;
import com.oceanbase.odc.service.iam.util.SecurityContextUtils;
import com.oceanbase.odc.service.integration.IntegrationService;
import com.oceanbase.odc.service.integration.client.ApprovalClient;
import com.oceanbase.odc.service.integration.model.ApprovalProperties;
Expand All @@ -174,8 +181,11 @@
import com.oceanbase.odc.service.regulation.risklevel.model.RiskLevelDescriber;
import com.oceanbase.odc.service.schedule.ScheduleService;
import com.oceanbase.odc.service.schedule.model.ScheduleStatus;
import com.oceanbase.odc.service.state.StatefulUuidStateIdGenerator;
import com.oceanbase.odc.service.task.TaskService;
import com.oceanbase.odc.service.task.executor.logger.LogUtils;
import com.oceanbase.odc.service.task.model.ExecutorInfo;
import com.oceanbase.odc.service.task.service.SpringTransactionManager;
import com.oceanbase.tools.loaddump.common.enums.ObjectType;

import io.micrometer.core.instrument.Tag;
Expand All @@ -195,6 +205,11 @@
@SkipAuthorize("flow instance use internal check")
public class FlowInstanceService {

private static final long MAX_EXPORT_OBJECT_COUNT = 10000;
private static final String ODC_SITE_URL = "odc.site.url";
private static final int MAX_APPLY_DATABASE_SIZE = 10;
private final List<Consumer<DataTransferTaskInitEvent>> dataTransferTaskInitHooks = new ArrayList<>();
private final List<Consumer<ShadowTableComparingUpdateEvent>> shadowTableComparingTaskHooks = new ArrayList<>();
@Autowired
private FlowInstanceRepository flowInstanceRepository;
@Autowired
Expand Down Expand Up @@ -262,15 +277,20 @@ public class FlowInstanceService {
@Autowired
private EnvironmentService environmentService;
@Autowired
private StatefulUuidStateIdGenerator statefulUuidStateIdGenerator;
@Autowired
private ThreadPoolTaskExecutor commonAsyncTaskExecutor;
@Autowired
private FutureCache futureCache;
@Autowired
private FlowPermissionHelper flowPermissionHelper;
@Autowired
private MeterManager meterManager;
@Autowired
private TransactionTemplate transactionTemplate;

private static final long MAX_EXPORT_OBJECT_COUNT = 10000;
private static final String ODC_SITE_URL = "odc.site.url";
private static final int MAX_APPLY_DATABASE_SIZE = 10;
10000 private final List<Consumer<DataTransferTaskInitEvent>> dataTransferTaskInitHooks = new ArrayList<>();
private final List<Consumer<ShadowTableComparingUpdateEvent>> shadowTableComparingTaskHooks = new ArrayList<>();
@Value("${odc.log.directory:./log}")
private String logPath;

@PostConstruct
public void init() {
Expand Down Expand Up @@ -558,6 +578,53 @@ public FlowInstanceDetailResp cancelWithoutPermission(@NotNull Long id) {
return cancel(flowInstance, true);
}

/**
 * Submits a background job that cancels each of the given flow instances one by one and
 * registers the resulting future in {@code futureCache} under a freshly generated id.
 *
 * <p>Each flow is cancelled in its own transaction, so a failure on one id is recorded as
 * a failed result and does not stop the remaining ids from being processed.
 *
 * @param flowInstanceIds ids of the flow instances to cancel
 * @return the generated id under which the job's future was cached
 */
public String startBatchCancelFlowInstance(Collection<Long> flowInstanceIds) {
    final String batchId = statefulUuidStateIdGenerator.generateCurrentUserIdStateId("BatchFlowTerminate");
    // Captured on the request thread and re-applied inside the job on the executor thread.
    final User submitter = authenticationFacade.currentUser();
    RouteLogCallable<List<BatchTerminateFlowResult>> job =
            new RouteLogCallable<List<BatchTerminateFlowResult>>("BatchFlowTerminate", batchId, "terminate") {
                @Override
                public List<BatchTerminateFlowResult> doCall() {
                    // NOTE(review): the user set here is not reset in this block when the job ends;
                    // confirm whether the executor or RouteLogCallable takes care of that.
                    SecurityContextUtils.setCurrentUser(submitter);
                    List<BatchTerminateFlowResult> outcomes = new ArrayList<>();
                    flowInstanceIds.forEach(
                            flowId -> outcomes.add(FlowInstanceService.this.cancelOneForBatch(flowId)));
                    return outcomes;
                }
            };
    Future<List<BatchTerminateFlowResult>> pending = commonAsyncTaskExecutor.submit(job);
    futureCache.put(batchId, pending);
    return batchId;
}

/**
 * Cancels a single flow instance inside its own transaction and converts the outcome,
 * success or any exception, into a {@link BatchTerminateFlowResult}.
 */
private BatchTerminateFlowResult cancelOneForBatch(Long flowInstanceId) {
    try {
        SpringTransactionManager txManager = new SpringTransactionManager(transactionTemplate);
        txManager.doInTransactionWithoutResult(() -> cancelWithWritePermission(flowInstanceId, false));
        BatchTerminateFlowResult succeeded = BatchTerminateFlowResult.success(flowInstanceId);
        log.info("Terminate flow success, flowInstanceId={}", flowInstanceId);
        return succeeded;
    } catch (Exception e) {
        log.info("Terminate flow failed, flowInstanceId={}", flowInstanceId, e);
        return BatchTerminateFlowResult.failed(flowInstanceId, e.getMessage());
    }
}

/**
 * Returns the results of a batch cancellation started by
 * {@code startBatchCancelFlowInstance}.
 *
 * <p>While the background job is still running an empty list is returned. Once the job is
 * done, its cache entry is invalidated and the collected results are returned, so the
 * results can be fetched only once per {@code terminateId}.
 *
 * @param terminateId id returned when the batch cancellation was started
 * @return the per-flow results, or an empty list if the job has not finished yet
 * @throws IllegalStateException if no job is cached for {@code terminateId}, for example
 *         because the results were already fetched or the cache entry was evicted
 * @throws RuntimeException if the job itself failed; the original failure is the cause
 */
public List<BatchTerminateFlowResult> getBatchCancelResult(String terminateId) {
    statefulUuidStateIdGenerator.checkCurrentUserId(terminateId);
    // The future stored under this id is the one submitted by startBatchCancelFlowInstance,
    // so the element type is known even though the cache only exposes Future<?>.
    @SuppressWarnings("unchecked")
    Future<List<BatchTerminateFlowResult>> future =
            (Future<List<BatchTerminateFlowResult>>) futureCache.get(terminateId);
    if (future == null) {
        // Presumably the cache returns null for an unknown, expired or already-invalidated id;
        // fail with a descriptive error instead of a bare NullPointerException.
        throw new IllegalStateException(
                "No batch cancel task found, it may have expired or its result was already fetched, terminateId="
                        + terminateId);
    }
    if (!future.isDone()) {
        return Collections.emptyList();
    }
    try {
        // The future is already done, so a cancel triggered by cache removal has no effect on it.
        futureCache.invalid(terminateId);
        return future.get();
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
        }
        throw new RuntimeException("Failed to get batch cancel result, terminateId=" + terminateId, e);
    }
}

/**
 * Reads the routed task log written for a batch cancellation.
 *
 * @param terminateId id returned when the batch cancellation was started; checked against
 *        the current user before the log is read
 * @return the log content returned by {@code LogUtils.getRouteTaskLog}
 */
public String getBatchCancelLog(String terminateId) {
    statefulUuidStateIdGenerator.checkCurrentUserId(terminateId);
    final String logContent = LogUtils.getRouteTaskLog(logPath, "BatchFlowTerminate", terminateId, "terminate");
    return logContent;
}

public Map<Long, FlowStatus> getStatus(Set<Long> ids) {
Specification<FlowInstanceEntity> specification = Specification.where(FlowInstanceSpecs.idIn(ids))
.and(FlowInstanceSpecs.organizationIdEquals(authenticationFacade.currentOrganizationId()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2023 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.oceanbase.odc.service.flow.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Outcome of terminating a single flow instance as part of a batch terminate request.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class BatchTerminateFlowResult {
    // Field order matters: the all-args constructor takes its parameters in this order.
    /** {@code true} when the flow instance was terminated, {@code false} when it failed. */
    private Boolean terminateSucceed;
    /** Id of the flow instance this result refers to. */
    private Long flowInstanceId;
    /** Failure description; {@code null} for results built by {@link #success(Long)}. */
    private String failReason;

    /**
     * Builds a successful result for the given flow instance.
     *
     * @param id flow instance id
     * @return a result flagged as succeeded with no fail reason
     */
    public static BatchTerminateFlowResult success(Long id) {
        final String noFailReason = null;
        return new BatchTerminateFlowResult(Boolean.TRUE, id, noFailReason);
    }

    /**
     * Builds a failed result for the given flow instance.
     *
     * @param id flow instance id
     * @param failReason description of why the termination failed
     * @return a result flagged as failed carrying the given reason
     */
    public static BatchTerminateFlowResult failed(Long id, String failReason) {
        return new BatchTerminateFlowResult(Boolean.FALSE, id, failReason);
    }

}
Loading
0