From bffb4a996ce0db854403ad762c94e352d2de37c3 Mon Sep 17 00:00:00 2001 From: ungreat Date: Mon, 14 Apr 2025 14:25:51 +0800 Subject: [PATCH 1/7] add schedule terminate service --- .../controller/v2/FlowInstanceController.java | 20 ++ .../web/controller/v2/ScheduleController.java | 17 ++ .../odc-server/src/main/resources/log4j2.xml | 4 + .../odc/config/ScheduleConfiguration.java | 6 +- .../odc/service/flow/FlowInstanceService.java | 77 +++++++- .../flow/model/BatchTerminateFlowResult.java | 39 ++++ .../odc/service/schedule/ScheduleService.java | 182 +++++++++++++++--- .../export/ScheduleExportService.java | 67 +------ .../export/ScheduleTaskImportService.java | 6 +- .../export/model/ScheduleTerminateCmd.java | 28 +++ .../export/model/ScheduleTerminateResult.java | 44 +++++ .../BatchSchedulePermissionValidator.java | 107 ++++++++++ .../task/executor/logger/LogUtils.java | 13 ++ 13 files changed, 510 insertions(+), 100 deletions(-) create mode 100644 server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/BatchTerminateFlowResult.java create mode 100644 server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateCmd.java create mode 100644 server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateResult.java create mode 100644 server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/util/BatchSchedulePermissionValidator.java diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java index 2e1fd46293..32f2df1ba6 100644 --- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java +++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java @@ -16,6 +16,7 @@ package com.oceanbase.odc.server.web.controller.v2; import java.io.IOException; +import java.util.Collection; import java.util.Date; import java.util.List; import java.util.Map; @@ -49,6 +50,7 @@ import com.oceanbase.odc.service.common.util.WebResponseUtils; import com.oceanbase.odc.service.flow.FlowInstanceService; import com.oceanbase.odc.service.flow.FlowTaskInstanceService; +import com.oceanbase.odc.service.flow.model.BatchTerminateFlowResult; import com.oceanbase.odc.service.flow.model.BinaryDataResult; import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq; import com.oceanbase.odc.service.flow.model.FlowInstanceApprovalReq; @@ -251,4 +253,22 @@ public SuccessResponse getPartitionPlan(@PathVariable Long return Responses.ok(this.partitionPlanScheduleService.getPartitionPlanByFlowInstanceId(id)); } + @ApiOperation(value = "cancelFlowInstance", notes = "批量终止流程") + @RequestMapping(value = "/batchCancelFlowInstance", method = RequestMethod.POST) + public SuccessResponse batchCancelFlowInstance(@RequestBody Collection flowInstanceIds) { + return Responses.single(flowInstanceService.startBatchCancelFlowInstance(flowInstanceIds)); + } + + @ApiOperation(value = "getBatchCancelResult", notes = "获取批量终止结果") + @RequestMapping(value = "/getBatchCancelResult", method = RequestMethod.GET) + public SuccessResponse> getBatchCancelResult(String terminateId) { + return Responses.single(flowInstanceService.getBatchCancelResult(terminateId)); + } + + @ApiOperation(value = "getBatchCancelLog", notes = "获取批量终止结果") + @RequestMapping(value = "/getBatchCancelLog", method = RequestMethod.GET) + public SuccessResponse getBatchCancelLog(String terminateId) { + return Responses.single(flowInstanceService.getBatchCancelLog(terminateId)); + } + } diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java index b7112d7996..1f73f06a2d 100644 --- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java +++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java @@ -41,6 +41,8 @@ import com.oceanbase.odc.service.common.util.WebResponseUtils; import com.oceanbase.odc.service.dlm.model.RateLimitConfiguration; import com.oceanbase.odc.service.schedule.ScheduleService; +import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateCmd; +import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateResult; import com.oceanbase.odc.service.schedule.model.ChangeScheduleResp; import com.oceanbase.odc.service.schedule.model.CreateScheduleReq; import com.oceanbase.odc.service.schedule.model.OperationType; @@ -283,4 +285,19 @@ public SuccessResponse updateLimiterConfig(@PathVariable @RequestBody RateLimitConfiguration limiterConfig) { return Responses.single(scheduleService.updateDlmRateLimit(id, limiterConfig)); } + + @RequestMapping(value = "batchTerminateScheduleAndTask", method = RequestMethod.POST) + public SuccessResponse startTerminateScheduleAndTask(@RequestBody ScheduleTerminateCmd cmd) { + return Responses.ok(scheduleService.startTerminateScheduleAndTask(cmd)); + } + + @RequestMapping(value = "getTerminateScheduleResult", method = RequestMethod.GET) + public SuccessResponse> getTerminateScheduleResult(String terminateId) { + return Responses.ok(scheduleService.getTerminateScheduleResult(terminateId)); + } + + @RequestMapping(value = "getTerminateScheduleLog", method = RequestMethod.GET) + public SuccessResponse getTerminateScheduleLog(String terminateId) { + return Responses.ok(scheduleService.getTerminateLog(terminateId)); + } } diff --git a/server/odc-server/src/main/resources/log4j2.xml b/server/odc-server/src/main/resources/log4j2.xml index cc19b190f0..b5bb5c0c3f 100644 --- a/server/odc-server/src/main/resources/log4j2.xml +++ b/server/odc-server/src/main/resources/log4j2.xml @@ -918,6 +918,10 @@ + + + + diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/config/ScheduleConfiguration.java b/server/odc-service/src/main/java/com/oceanbase/odc/config/ScheduleConfiguration.java index 048c3ff01b..0f7f6e599a 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/config/ScheduleConfiguration.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/config/ScheduleConfiguration.java @@ -311,8 +311,8 @@ public ThreadPoolTaskExecutor queryProfileMonitorExecutor() { return executor; } - @Bean(name = "scheduleImportExecutor") - public ThreadPoolTaskExecutor scheduleImportExecutor() { + @Bean(name = "commonAsyncTaskExecutor") + public ThreadPoolTaskExecutor commonAsyncTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); int minPoolSize = Math.max(SystemUtils.availableProcessors(), 4); executor.setCorePoolSize(minPoolSize); @@ -324,7 +324,7 @@ public ThreadPoolTaskExecutor scheduleImportExecutor() { executor.setTaskDecorator(new TraceDecorator<>()); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); executor.initialize(); - log.info("scheduleImportExecutor initialized"); + log.info("commonAsyncTaskExecutor initialized"); return executor; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java index d25efa063b..a33355653e 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java @@ -29,6 +29,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; @@ -46,6 +47,7 @@ import org.flowable.engine.history.HistoricProcessInstanceQuery; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.i18n.LocaleContextHolder; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -54,6 +56,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionTemplate; import org.springframework.validation.annotation.Validated; import com.fasterxml.jackson.core.type.TypeReference; @@ -61,6 +64,7 @@ import com.oceanbase.odc.common.i18n.I18n; import com.oceanbase.odc.common.json.JsonUtils; import com.oceanbase.odc.common.lang.Holder; +import com.oceanbase.odc.common.task.RouteLogCallable; import com.oceanbase.odc.common.util.StringUtils; import com.oceanbase.odc.core.authority.util.SkipAuthorize; import com.oceanbase.odc.core.flow.model.TaskParameters; @@ -94,6 +98,7 @@ import com.oceanbase.odc.metadb.task.TaskEntity; import com.oceanbase.odc.plugin.task.api.datatransfer.model.DataTransferConfig; import com.oceanbase.odc.service.collaboration.environment.EnvironmentService; +import com.oceanbase.odc.service.common.FutureCache; import com.oceanbase.odc.service.common.response.SuccessResponse; import com.oceanbase.odc.service.common.util.SpringContextUtil; import com.oceanbase.odc.service.common.util.SqlUtils; @@ -122,6 +127,7 @@ import com.oceanbase.odc.service.flow.instance.FlowInstanceConfigurer; import com.oceanbase.odc.service.flow.instance.FlowTaskInstance; import com.oceanbase.odc.service.flow.listener.AutoApproveUserTaskListener; +import com.oceanbase.odc.service.flow.model.BatchTerminateFlowResult; import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq; import com.oceanbase.odc.service.flow.model.ExecutionStrategyConfig; import com.oceanbase.odc.service.flow.model.FlowInstanceDetailResp; @@ -148,6 +154,7 @@ import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; import com.oceanbase.odc.service.iam.model.User; import com.oceanbase.odc.service.iam.model.UserResourceRole; +import com.oceanbase.odc.service.iam.util.SecurityContextUtils; import com.oceanbase.odc.service.integration.IntegrationService; import com.oceanbase.odc.service.integration.client.ApprovalClient; import com.oceanbase.odc.service.integration.model.ApprovalProperties; @@ -174,8 +181,11 @@ import com.oceanbase.odc.service.regulation.risklevel.model.RiskLevelDescriber; import com.oceanbase.odc.service.schedule.ScheduleService; import com.oceanbase.odc.service.schedule.model.ScheduleStatus; +import com.oceanbase.odc.service.state.StatefulUuidStateIdGenerator; import com.oceanbase.odc.service.task.TaskService; +import com.oceanbase.odc.service.task.executor.logger.LogUtils; import com.oceanbase.odc.service.task.model.ExecutorInfo; +import com.oceanbase.odc.service.task.service.SpringTransactionManager; import com.oceanbase.tools.loaddump.common.enums.ObjectType; import io.micrometer.core.instrument.Tag; @@ -195,6 +205,11 @@ @SkipAuthorize("flow instance use internal check") public class FlowInstanceService { + private static final long MAX_EXPORT_OBJECT_COUNT = 10000; + private static final String ODC_SITE_URL = "odc.site.url"; + private static final int MAX_APPLY_DATABASE_SIZE = 10; + private final List> dataTransferTaskInitHooks = new ArrayList<>(); + private final List> shadowTableComparingTaskHooks = new ArrayList<>(); @Autowired private FlowInstanceRepository flowInstanceRepository; @Autowired @@ -262,15 +277,20 @@ public class FlowInstanceService { @Autowired private EnvironmentService environmentService; @Autowired + private StatefulUuidStateIdGenerator statefulUuidStateIdGenerator; + @Autowired + private ThreadPoolTaskExecutor commonAsyncTaskExecutor; + @Autowired + private FutureCache futureCache; + @Autowired private FlowPermissionHelper flowPermissionHelper; @Autowired private MeterManager meterManager; + @Autowired + private TransactionTemplate transactionTemplate; - private static final long MAX_EXPORT_OBJECT_COUNT = 10000; - private static final String ODC_SITE_URL = "odc.site.url"; - private static final int MAX_APPLY_DATABASE_SIZE = 10; - private final List> dataTransferTaskInitHooks = new ArrayList<>(); - private final List> shadowTableComparingTaskHooks = new ArrayList<>(); + @Value("${odc.log.directory:./log}") + private String logPath; @PostConstruct public void init() { @@ -558,6 +578,53 @@ public FlowInstanceDetailResp cancelWithoutPermission(@NotNull Long id) { return cancel(flowInstance, true); } + public String startBatchCancelFlowInstance(Collection flowInstanceIds) { + String terminateId = statefulUuidStateIdGenerator.generateCurrentUserIdStateId("BatchFlowTerminate"); + User user = authenticationFacade.currentUser(); + Future> future = commonAsyncTaskExecutor.submit( + new RouteLogCallable>("BatchFlowTerminate", terminateId, "terminate") { + @Override + public List doCall() { + SecurityContextUtils.setCurrentUser(user); + List results = new ArrayList<>(); + for (Long id : flowInstanceIds) { + try { + new SpringTransactionManager(transactionTemplate) + .doInTransactionWithoutResult(() -> cancelWithWritePermission(id, false)); + results.add(BatchTerminateFlowResult.success(id)); + log.info("Terminate flow success, flowInstanceId={}", id); + } catch (Exception e) { + log.info("Terminate flow failed, flowInstanceId={}", id, e); + results.add(BatchTerminateFlowResult.failed(id, e.getMessage())); + } + } + return results; + } + }); + futureCache.put(terminateId, future); + return terminateId; + } + + public List getBatchCancelResult(String terminateId) { + statefulUuidStateIdGenerator.checkCurrentUserId(terminateId); + Future> future = + (Future>) futureCache.get(terminateId); + if (!future.isDone()) { + return Collections.emptyList(); + } + try { + futureCache.invalid(terminateId); + return future.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public String getBatchCancelLog(String terminateId) { + statefulUuidStateIdGenerator.checkCurrentUserId(terminateId); + return LogUtils.getRouteTaskLog(logPath, "BatchFlowTerminate", terminateId, "terminate"); + } + public Map getStatus(Set ids) { Specification specification = Specification.where(FlowInstanceSpecs.idIn(ids)) .and(FlowInstanceSpecs.organizationIdEquals(authenticationFacade.currentOrganizationId())); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/BatchTerminateFlowResult.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/BatchTerminateFlowResult.java new file mode 100644 index 0000000000..da107cd5e3 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/BatchTerminateFlowResult.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.flow.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class BatchTerminateFlowResult { + private Boolean terminateSucceed; + private Long flowInstanceId; + private String failReason; + + public static BatchTerminateFlowResult success(Long id) { + return new BatchTerminateFlowResult(true, id, null); + } + + public static BatchTerminateFlowResult failed(Long id, String failReason) { + return new BatchTerminateFlowResult(false, id, failReason); + + } + +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java index 8fd5af4b38..547c6b3da8 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java @@ -20,6 +20,7 @@ import java.io.File; import java.text.MessageFormat; +import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -28,6 +29,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; @@ -50,6 +52,7 @@ import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.integration.jdbc.lock.JdbcLockRegistry; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.TransactionTemplate; @@ -57,10 +60,12 @@ import com.alibaba.fastjson.JSONObject; import com.oceanbase.odc.common.json.JsonUtils; +import com.oceanbase.odc.common.task.RouteLogCallable; import com.oceanbase.odc.common.util.StringUtils; import com.oceanbase.odc.core.alarm.AlarmUtils; import com.oceanbase.odc.core.authority.util.SkipAuthorize; import com.oceanbase.odc.core.shared.PreConditions; +import com.oceanbase.odc.core.shared.Verify; import com.oceanbase.odc.core.shared.constant.ErrorCodes; import com.oceanbase.odc.core.shared.constant.OrganizationType; import com.oceanbase.odc.core.shared.constant.ResourceRoleName; @@ -79,8 +84,10 @@ import com.oceanbase.odc.metadb.schedule.ScheduleRepository; import com.oceanbase.odc.metadb.schedule.ScheduleTaskEntity; import com.oceanbase.odc.metadb.schedule.ScheduleTaskRepository; +import com.oceanbase.odc.metadb.task.TaskEntity; import com.oceanbase.odc.service.collaboration.project.ProjectService; import com.oceanbase.odc.service.collaboration.project.model.Project; +import com.oceanbase.odc.service.common.FutureCache; import com.oceanbase.odc.service.common.util.SpringContextUtil; import com.oceanbase.odc.service.connection.ConnectionService; import com.oceanbase.odc.service.connection.database.DatabaseService; @@ -91,6 +98,7 @@ import com.oceanbase.odc.service.dlm.model.DataArchiveParameters; import com.oceanbase.odc.service.dlm.model.DataDeleteParameters; import com.oceanbase.odc.service.dlm.model.RateLimitConfiguration; +import com.oceanbase.odc.service.flow.FlowInstanceService; import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq; import com.oceanbase.odc.service.flow.model.FlowInstanceDetailResp; import com.oceanbase.odc.service.iam.OrganizationService; @@ -99,11 +107,15 @@ import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; import com.oceanbase.odc.service.iam.model.Organization; import com.oceanbase.odc.service.iam.model.User; +import com.oceanbase.odc.service.iam.util.SecurityContextUtils; import com.oceanbase.odc.service.objectstorage.ObjectStorageFacade; +import com.oceanbase.odc.service.partitionplan.PartitionPlanScheduleService; import com.oceanbase.odc.service.quartz.QuartzJobServiceProxy; import com.oceanbase.odc.service.quartz.model.MisfireStrategy; import com.oceanbase.odc.service.quartz.util.QuartzCronExpressionUtils; import com.oceanbase.odc.service.regulation.approval.ApprovalFlowConfigSelector; +import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateCmd; +import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateResult; import com.oceanbase.odc.service.schedule.factory.ScheduleResponseMapperFactory; import com.oceanbase.odc.service.schedule.flowtask.AlterScheduleParameters; import com.oceanbase.odc.service.schedule.flowtask.ApprovalFlowClient; @@ -135,10 +147,13 @@ import com.oceanbase.odc.service.schedule.model.TriggerStrategy; import com.oceanbase.odc.service.schedule.model.UpdateScheduleReq; import com.oceanbase.odc.service.schedule.processor.ScheduleChangePreprocessor; +import com.oceanbase.odc.service.schedule.util.BatchSchedulePermissionValidator; import com.oceanbase.odc.service.schedule.util.ScheduleDescriptionGenerator; import com.oceanbase.odc.service.sqlplan.model.SqlPlanParameters; +import com.oceanbase.odc.service.state.StatefulUuidStateIdGenerator; import com.oceanbase.odc.service.task.constants.JobParametersKeyConstants; import com.oceanbase.odc.service.task.exception.JobException; +import com.oceanbase.odc.service.task.executor.logger.LogUtils; import com.oceanbase.odc.service.task.model.OdcTaskLogLevel; import com.oceanbase.odc.service.task.schedule.JobScheduler; @@ -156,11 +171,11 @@ @SkipAuthorize public class ScheduleService { + private final ScheduleMapper scheduleMapper = ScheduleMapper.INSTANCE; @Value("${odc.task.trigger.minimum-interval:600}") private int minInterval; @Autowired private ScheduleRepository scheduleRepository; - @Autowired private ScheduleTaskRepository scheduleTaskRepository; @Autowired @@ -168,71 +183,66 @@ public class ScheduleService { @Autowired @Qualifier("quartzJobServiceProxy") private QuartzJobServiceProxy quartzJobService; - @Autowired private ObjectStorageFacade objectStorageFacade; - @Autowired private FlowInstanceRepository flowInstanceRepository; - + @Autowired + private FlowInstanceService flowInstanceService; + @Autowired + private PartitionPlanScheduleService partitionPlanScheduleService; @Autowired private ScheduleTaskService scheduleTaskService; - @Autowired private ScheduleResponseMapperFactory scheduleResponseMapperFactory; - @Autowired @Lazy private ProjectService projectService; - @Autowired private ProjectPermissionValidator projectPermissionValidator; - @Autowired private ApprovalFlowConfigSelector approvalFlowConfigSelector; - @Autowired private DatabaseService databaseService; - @Autowired private EnvironmentRepository environmentRepository; - @Autowired private ScheduleChangeLogService scheduleChangeLogService; - @Autowired private OrganizationService organizationService; - @Autowired private ScheduleChangePreprocessor preprocessor; - @Autowired private LatestTaskMappingRepository latestTaskMappingRepository; - @Autowired private ConnectionService connectionService; - @Autowired private DlmLimiterService dlmLimiterService; - @Autowired private UserService userService; - @Autowired private ScheduledTaskLoggerService scheduledTaskLoggerService; - @Autowired private JdbcLockRegistry jdbcLockRegistry; - @Autowired private ApprovalFlowClient approvalFlowService; - @Autowired private ScheduleDescriptionGenerator descriptionGenerator; @Autowired - private TransactionTemplate txTemplate; + private StatefulUuidStateIdGenerator statefulUuidStateIdGenerator; + @Autowired + private ThreadPoolTaskExecutor commonAsyncTaskExecutor; + @Autowired + private FutureCache futureCache; - private final ScheduleMapper scheduleMapper = ScheduleMapper.INSTANCE; + @Autowired + private BatchSchedulePermissionValidator batchSchedulePermissionValidator; + + @Value("${odc.log.directory:./log}") + private String logPath; + + @Autowired + private TransactionTemplate txTemplate; @Transactional(rollbackFor = Exception.class) public List dispatchCreateSchedule(CreateFlowInstanceReq createReq) { @@ -566,12 +576,20 @@ public void terminate(ScheduleEntity scheduleConfig) throws SchedulerException { scheduleRepository.updateStatusById(scheduleConfig.getId(), ScheduleStatus.TERMINATED); } - @Transactional(rollbackFor = Exception.class) public void innerTerminate(Long scheduleId) throws SchedulerException { - ScheduleEntity schedule = nullSafeGetById(scheduleId); - JobKey jobKey = QuartzKeyGenerator.generateJobKey(schedule); - quartzJobService.deleteJob(jobKey); - scheduleRepository.updateStatusById(schedule.getId(), ScheduleStatus.TERMINATED); + txTemplate.execute(status -> { + try { + ScheduleEntity schedule = nullSafeGetById(scheduleId); + JobKey jobKey = QuartzKeyGenerator.generateJobKey(schedule); + quartzJobService.deleteJob(jobKey); + scheduleRepository.updateStatusById(schedule.getId(), ScheduleStatus.TERMINATED); + return true; + } catch (Exception e) { + status.setRollbackOnly(); + throw new RuntimeException(e); + } + }); + } /** @@ -697,6 +715,114 @@ public void stopTask(Long scheduleId, Long scheduleTaskId) { } } + public String startTerminateScheduleAndTask(ScheduleTerminateCmd cmd) { + batchSchedulePermissionValidator.checkScheduleIdsPermission(cmd.getScheduleType(), cmd.getIds()); + User user = authenticationFacade.currentUser(); + String terminateId = statefulUuidStateIdGenerator.generateCurrentUserIdStateId("ScheduleTerminate"); + Future> future = commonAsyncTaskExecutor.submit( + new RouteLogCallable>("ScheduleTerminate", terminateId, "terminate") { + @Override + public List doCall() { + SecurityContextUtils.setCurrentUser(user); + return syncTerminateScheduleAndTask(cmd); + } + }); + futureCache.put(terminateId, future); + return terminateId; + } + + public List getTerminateScheduleResult(String terminateId) { + statefulUuidStateIdGenerator.checkCurrentUserId(terminateId); + Future future = futureCache.get(terminateId); + if (!future.isDone()) { + return Collections.emptyList(); + } + try { + futureCache.invalid(terminateId); + return (List) future.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public String getTerminateLog(String terminateId) { + statefulUuidStateIdGenerator.checkCurrentUserId(terminateId); + return LogUtils.getRouteTaskLog(logPath, "ScheduleTerminate", terminateId, "terminate"); + } + + public List syncTerminateScheduleAndTask(ScheduleTerminateCmd cmd) { + log.info("Start to terminate schedule, type={}, scheduleIds={}", cmd.getScheduleType(),cmd.getIds()); + List results = new ArrayList<>(); + if (ScheduleType.PARTITION_PLAN.equals(cmd.getScheduleType())) { + processTerminatePartitionPlan(cmd, results); + return results; + } + List scheduleEntities = scheduleRepository.findByIdIn(cmd.getIds()); + Verify.verify(Objects.equals(scheduleEntities.size(), cmd.getIds().size()), "Invalid schedule Ids"); + for (ScheduleEntity schedule : scheduleEntities) { + try { + Optional latestTaskEntity = + scheduleTaskRepository.getLatestScheduleTaskByJobNameAndJobGroup( + String.valueOf(schedule.getId()), + schedule.getType().name()); + if (!latestTaskEntity.isPresent() || !latestTaskEntity.get().getStatus().isProcessing()) { + innerTerminate(schedule.getId()); + log.info("Schedule task stop success, scheduleId={}", schedule.getId()); + results.add(ScheduleTerminateResult.ofSuccess(schedule.getType(), schedule.getId())); + continue; + } + ScheduleTaskEntity scheduleTaskEntity = latestTaskEntity.get(); + scheduleTaskService.stop(scheduleTaskEntity.getId()); + final int maxRetryTimes = 30; + int retryTimes = 0; + while (retryTimes < maxRetryTimes) { + latestTaskEntity = scheduleTaskRepository.getLatestScheduleTaskByJobNameAndJobGroup( + String.valueOf(schedule.getId()), + schedule.getType().name()); + if (latestTaskEntity.get().getStatus().isTerminated()) { + innerTerminate(schedule.getId()); + results.add( + ScheduleTerminateResult.ofSuccess(schedule.getType(), schedule.getId())); + log.info("Schedule task stop success, scheduleId={}", schedule.getId()); + break; + } + retryTimes++; + Thread.sleep(2000); + } + log.info( + "Wait task 60s, still not terminate, please try again. Schedule task stop Failed, scheduleId={}", + schedule.getId()); + results.add(ScheduleTerminateResult.ofFailed(cmd.getScheduleType(), schedule.getId(), + "Wait task 60s, still not terminate, please try again.")); + } catch (Exception e) { + log.error("Terminate schedule task failed,scheduleId={}", schedule.getId(), e); + results.add(ScheduleTerminateResult.ofFailed(cmd.getScheduleType(), schedule.getId(), + e.getMessage())); + } + } + return results; + } + + + // The partition plan uses Schedule, but it is created through flow, and the front end also displays + // it through flow. + // To ensure that the customer see the same id, the flowInstanceId is used + private void processTerminatePartitionPlan(ScheduleTerminateCmd cmd, List results) { + Map flowInstanceId2TaskEntity = flowInstanceService.getTaskByFlowInstanceIds(cmd.getIds()); + for (Map.Entry entry : flowInstanceId2TaskEntity.entrySet()) { + Long flowInstanceId = entry.getKey(); + TaskEntity taskEntity = entry.getValue(); + try { + partitionPlanScheduleService.disablePartitionPlan(taskEntity.getDatabaseId()); + results.add(ScheduleTerminateResult.ofSuccess(cmd.getScheduleType(), flowInstanceId)); + log.info("PartitionPlan task stop success, flowInstanceId={}", flowInstanceId); + } catch (Exception e) { + results.add(ScheduleTerminateResult.ofFailed(cmd.getScheduleType(), flowInstanceId, e.getMessage())); + log.info("PartitionPlan task stop failed, flowInstanceId={}", flowInstanceId, e); + } + } + } + /** * @param scheduleId the task must belong to a valid schedule,so this param is not be null. * @param scheduleTaskId the task uid. Start a paused or pending task. diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleExportService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleExportService.java index 87cc1f7b33..39923f1ced 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleExportService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleExportService.java @@ -18,7 +18,6 @@ import java.io.File; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.stream.Collectors; @@ -31,10 +30,6 @@ import org.springframework.stereotype.Service; import com.oceanbase.odc.common.task.RouteLogCallable; -import com.oceanbase.odc.core.shared.OrganizationIsolated; -import com.oceanbase.odc.core.shared.constant.OrganizationType; -import com.oceanbase.odc.core.shared.constant.ResourceRoleName; -import com.oceanbase.odc.core.shared.constant.ResourceType; import com.oceanbase.odc.metadb.flow.FlowInstanceEntity; import com.oceanbase.odc.metadb.schedule.ScheduleEntity; import com.oceanbase.odc.metadb.schedule.ScheduleRepository; @@ -48,19 +43,15 @@ import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; import com.oceanbase.odc.service.iam.model.User; import com.oceanbase.odc.service.objectstorage.ObjectStorageFacade; -import com.oceanbase.odc.service.schedule.ScheduleService; import com.oceanbase.odc.service.schedule.export.model.FileExportResponse; import com.oceanbase.odc.service.schedule.export.model.ScheduleExportListView; import com.oceanbase.odc.service.schedule.export.model.ScheduleTaskExportRequest; -import com.oceanbase.odc.service.schedule.model.Schedule; import com.oceanbase.odc.service.schedule.model.ScheduleMapper; import com.oceanbase.odc.service.schedule.model.ScheduleType; +import com.oceanbase.odc.service.schedule.util.BatchSchedulePermissionValidator; import com.oceanbase.odc.service.state.StatefulUuidStateIdGenerator; import com.oceanbase.odc.service.task.executor.logger.LogUtils; -import lombok.AllArgsConstructor; -import lombok.Data; - @Service public class ScheduleExportService { public static final String ASYNC_TASK_BASE_BUCKET = "scheduleexport"; @@ -82,7 +73,7 @@ public class ScheduleExportService { private FlowInstanceService flowInstanceService; @Autowired - private ScheduleService scheduleService; + private BatchSchedulePermissionValidator batchSchedulePermissionValidator; @Autowired private UserService userService; @@ -97,7 +88,7 @@ public class ScheduleExportService { private StatefulUuidStateIdGenerator statefulUuidStateIdGenerator; @Autowired - private ThreadPoolTaskExecutor scheduleImportExecutor; + private ThreadPoolTaskExecutor commonAsyncTaskExecutor; @Autowired private DatabaseService databaseService; @@ -113,10 +104,10 @@ private String getPersonalBucketName() { } public String startExport(ScheduleTaskExportRequest request) { - checkRequestIdsPermission(request); + batchSchedulePermissionValidator.checkScheduleIdsPermission(request.getScheduleType(), request.getIds()); String previewId = statefulUuidStateIdGenerator.generateCurrentUserIdStateId("scheduleExport"); User user = authenticationFacade.currentUser(); - Future future = scheduleImportExecutor.submit( + Future future = commonAsyncTaskExecutor.submit( new ScheduleTaskExportCallable(previewId, request, user, scheduleTaskExporter, getPersonalBucketName(), objectStorageFacade)); futureCache.put(previewId, future); @@ -124,7 +115,7 @@ public String startExport(ScheduleTaskExportRequest request) { } public List getExportListView(ScheduleTaskExportRequest request) { - checkRequestIdsPermission(request); + batchSchedulePermissionValidator.checkScheduleIdsPermission(request.getScheduleType(), request.getIds()); if (request.getScheduleType().equals(ScheduleType.PARTITION_PLAN)) { return getPartitionPlanView(request); } @@ -183,50 +174,4 @@ public String getExportLog(String exportId) { File logFile = new File(filePath); return LogUtils.getLatestLogContent(logFile, 10000L, 1048576L); } - - private void checkRequestIdsPermission(ScheduleTaskExportRequest request) { - Set projectIds; - if (request.getScheduleType().equals(ScheduleType.PARTITION_PLAN)) { - List flowInstanceEntities = flowInstanceService.listByIds(request.getIds()); - projectIds = - flowInstanceEntities.stream().map(FlowInstanceEntity::getProjectId).collect(Collectors.toSet()); - List flowOrganizationIsolateds = flowInstanceEntities.stream().map( - f -> new FlowOrganizationIsolated(f.getOrganizationId(), f.getId())).collect( - Collectors.toList()); - horizontalDataPermissionValidator.checkCurrentOrganization(flowOrganizationIsolateds); - } else { - List scheduleEntities = scheduleRepository.findByIdIn(request.getIds()).stream() - .map(ScheduleMapper.INSTANCE::entityToModel).collect( - Collectors.toList()); - horizontalDataPermissionValidator.checkCurrentOrganization(scheduleEntities); - projectIds = scheduleEntities.stream().map(Schedule::getProjectId).collect(Collectors.toSet()); - } - if (authenticationFacade.currentOrganization().getType().equals(OrganizationType.TEAM)) { - projectPermissionValidator.checkProjectRole(projectIds, ResourceRoleName.all()); - } - } - - - @Data - @AllArgsConstructor - private final static class FlowOrganizationIsolated implements OrganizationIsolated { - - private Long organizationId; - private Long id; - - @Override - public String resourceType() { - return ResourceType.ODC_FLOW_INSTANCE.name(); - } - - @Override - public Long organizationId() { - return organizationId; - } - - @Override - public Long id() { - return id; - } - } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskImportService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskImportService.java index 9fe365601f..82cf077e3a 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskImportService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/ScheduleTaskImportService.java @@ -46,7 +46,7 @@ public class ScheduleTaskImportService { private FutureCache futureCache; @Autowired - private ThreadPoolTaskExecutor scheduleImportExecutor; + private ThreadPoolTaskExecutor commonAsyncTaskExecutor; @Autowired private ScheduleTaskImporter scheduleTaskImporter; @@ -63,7 +63,7 @@ public class ScheduleTaskImportService { public String startPreviewImportTask(ScheduleTaskImportRequest request) { String previewId = statefulUuidStateIdGenerator.generateStateId("scheduleImportReview"); User user = authenticationFacade.currentUser(); - Future> future = scheduleImportExecutor.submit( + Future> future = commonAsyncTaskExecutor.submit( () -> { SecurityContextUtils.setCurrentUser(user); return scheduleTaskImporter.preview(request); @@ -98,7 +98,7 @@ public String startImportTask(ScheduleTaskImportRequest request) { String previewId = statefulUuidStateIdGenerator.generateStateId("scheduleImport"); User user = authenticationFacade.currentUser(); - Future> future = scheduleImportExecutor.submit( + Future> future = commonAsyncTaskExecutor.submit( new ScheduleTaskImportCallable(user, previewId, scheduleTaskImporter, request)); futureCache.put(previewId, future); return previewId; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateCmd.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateCmd.java new file mode 100644 index 0000000000..735d1c5fe3 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateCmd.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.schedule.export.model; + +import java.util.List; + +import com.oceanbase.odc.service.schedule.model.ScheduleType; + +import lombok.Data; + +@Data +public class ScheduleTerminateCmd { + private ScheduleType scheduleType; + private List ids; +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateResult.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateResult.java new file mode 100644 index 0000000000..34b8e84e39 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/export/model/ScheduleTerminateResult.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.schedule.export.model; + +import com.oceanbase.odc.service.schedule.model.ScheduleType; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class ScheduleTerminateResult { + + private Boolean terminateSucceed; + + private ScheduleType scheduleType; + + private Long id; + + private String failReason; + + public static ScheduleTerminateResult ofSuccess(ScheduleType scheduleType, Long id) { + return new ScheduleTerminateResult(true, scheduleType, id, null); + } + + public static ScheduleTerminateResult ofFailed(ScheduleType scheduleType, Long id, String failReason) { + return new ScheduleTerminateResult(false, scheduleType, id, failReason); + } +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/util/BatchSchedulePermissionValidator.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/util/BatchSchedulePermissionValidator.java new file mode 100644 index 0000000000..8cc38c0786 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/util/BatchSchedulePermissionValidator.java @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.schedule.util; + +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.oceanbase.odc.core.shared.OrganizationIsolated; +import com.oceanbase.odc.core.shared.constant.OrganizationType; +import com.oceanbase.odc.core.shared.constant.ResourceRoleName; +import com.oceanbase.odc.core.shared.constant.ResourceType; +import com.oceanbase.odc.metadb.flow.FlowInstanceEntity; +import com.oceanbase.odc.metadb.schedule.ScheduleRepository; +import com.oceanbase.odc.service.flow.FlowInstanceService; +import com.oceanbase.odc.service.iam.HorizontalDataPermissionValidator; +import com.oceanbase.odc.service.iam.ProjectPermissionValidator; +import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; +import com.oceanbase.odc.service.schedule.model.Schedule; +import com.oceanbase.odc.service.schedule.model.ScheduleMapper; +import com.oceanbase.odc.service.schedule.model.ScheduleType; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Component +public class BatchSchedulePermissionValidator { + + @Autowired + private AuthenticationFacade authenticationFacade; + + @Autowired + private ProjectPermissionValidator projectPermissionValidator; + + @Autowired + private FlowInstanceService flowInstanceService; + + @Autowired + private HorizontalDataPermissionValidator horizontalDataPermissionValidator; + + @Autowired + private ScheduleRepository scheduleRepository; + + + public void checkScheduleIdsPermission(ScheduleType scheduleType, Collection ids) { + Set projectIds; + if (scheduleType.equals(ScheduleType.PARTITION_PLAN)) { + List flowInstanceEntities = flowInstanceService.listByIds(ids); + projectIds = + flowInstanceEntities.stream().map(FlowInstanceEntity::getProjectId).collect(Collectors.toSet()); + List flowOrganizationIsolateds = flowInstanceEntities.stream().map( + f -> new FlowOrganizationIsolated(f.getOrganizationId(), f.getId())).collect( + Collectors.toList()); + horizontalDataPermissionValidator.checkCurrentOrganization(flowOrganizationIsolateds); + } else { + List scheduleEntities = scheduleRepository.findByIdIn(ids).stream() + .map(ScheduleMapper.INSTANCE::entityToModel).collect( + Collectors.toList()); + horizontalDataPermissionValidator.checkCurrentOrganization(scheduleEntities); + projectIds = scheduleEntities.stream().map(Schedule::getProjectId).collect(Collectors.toSet()); + } + if (authenticationFacade.currentOrganization().getType().equals(OrganizationType.TEAM)) { + projectPermissionValidator.checkProjectRole(projectIds, ResourceRoleName.all()); + } + } + + + @Data + @AllArgsConstructor + private final static class FlowOrganizationIsolated implements OrganizationIsolated { + + private Long organizationId; + private Long id; + + @Override + public String resourceType() { + return ResourceType.ODC_FLOW_INSTANCE.name(); + } + + @Override + public Long organizationId() { + return organizationId; + } + + @Override + public Long id() { + return id; + } + } +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/logger/LogUtils.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/logger/LogUtils.java index fd6d66a8e8..4c5903dde6 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/logger/LogUtils.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/logger/LogUtils.java @@ -16,6 +16,8 @@ package com.oceanbase.odc.service.task.executor.logger; +import static com.oceanbase.odc.common.task.RouteLogCallable.LOG_PATH_PATTERN; + import java.io.File; import java.nio.charset.StandardCharsets; import java.util.LinkedList; @@ -97,4 +99,15 @@ public static String generateScheduleTaskLogFileName(@NonNull Long scheduleId, L return String.format(SCHEDULE_LOG_FILE_NAME_PATTERN, scheduleId, scheduleTaskId); } + public static String getRouteTaskLog(String logPath, String workspace, String taskId, String fileName) { + File logFile = getLogFileFromCurrentMachine(logPath, workspace, taskId, fileName); + return LogUtils.getLatestLogContent(logFile, 10000L, 1048576L); + } + + private static File getLogFileFromCurrentMachine(String logPath, String workspace, String taskId, String fileName) { + String filePath = String.format(LOG_PATH_PATTERN, logPath, workspace, taskId, fileName); + log.info("GetLogFilePath: {}", filePath); + return new File(filePath); + } + } From 8aede9c06f31dfe9f3613ef54927d878b54b8721 Mon Sep 17 00:00:00 2001 From: ungreat Date: Wed, 16 Apr 2025 11:10:41 +0800 Subject: [PATCH 2/7] fix --- .../odc/service/schedule/ScheduleService.java | 35 ++++++++++--------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java index 547c6b3da8..a31e57d983 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java @@ -156,6 +156,7 @@ import com.oceanbase.odc.service.task.executor.logger.LogUtils; import com.oceanbase.odc.service.task.model.OdcTaskLogLevel; import com.oceanbase.odc.service.task.schedule.JobScheduler; +import com.oceanbase.odc.service.task.service.SpringTransactionManager; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -576,20 +577,12 @@ public void terminate(ScheduleEntity scheduleConfig) throws SchedulerException { scheduleRepository.updateStatusById(scheduleConfig.getId(), ScheduleStatus.TERMINATED); } + @Transactional(rollbackFor = Exception.class) public void innerTerminate(Long scheduleId) throws SchedulerException { - txTemplate.execute(status -> { - try { - ScheduleEntity schedule = nullSafeGetById(scheduleId); - JobKey jobKey = QuartzKeyGenerator.generateJobKey(schedule); - quartzJobService.deleteJob(jobKey); - scheduleRepository.updateStatusById(schedule.getId(), ScheduleStatus.TERMINATED); - return true; - } catch (Exception e) { - status.setRollbackOnly(); - throw new RuntimeException(e); - } - }); - + ScheduleEntity schedule = nullSafeGetById(scheduleId); + JobKey jobKey = QuartzKeyGenerator.generateJobKey(schedule); + quartzJobService.deleteJob(jobKey); + scheduleRepository.updateStatusById(schedule.getId(), ScheduleStatus.TERMINATED); } /** @@ -751,7 +744,7 @@ public String getTerminateLog(String terminateId) { } public List syncTerminateScheduleAndTask(ScheduleTerminateCmd cmd) { - log.info("Start to terminate schedule, type={}, scheduleIds={}", cmd.getScheduleType(),cmd.getIds()); + log.info("Start to terminate schedule, type={}, scheduleIds={}", cmd.getScheduleType(), cmd.getIds()); List results = new ArrayList<>(); if (ScheduleType.PARTITION_PLAN.equals(cmd.getScheduleType())) { processTerminatePartitionPlan(cmd, results); @@ -766,7 +759,7 @@ public List syncTerminateScheduleAndTask(ScheduleTermin String.valueOf(schedule.getId()), schedule.getType().name()); if (!latestTaskEntity.isPresent() || !latestTaskEntity.get().getStatus().isProcessing()) { - innerTerminate(schedule.getId()); + innerTerminateInTx(schedule); log.info("Schedule task stop success, scheduleId={}", schedule.getId()); results.add(ScheduleTerminateResult.ofSuccess(schedule.getType(), schedule.getId())); continue; @@ -780,7 +773,7 @@ public List syncTerminateScheduleAndTask(ScheduleTermin String.valueOf(schedule.getId()), schedule.getType().name()); if (latestTaskEntity.get().getStatus().isTerminated()) { - innerTerminate(schedule.getId()); + innerTerminateInTx(schedule); results.add( ScheduleTerminateResult.ofSuccess(schedule.getType(), schedule.getId())); log.info("Schedule task stop success, scheduleId={}", schedule.getId()); @@ -803,6 +796,16 @@ public List syncTerminateScheduleAndTask(ScheduleTermin return results; } + private void innerTerminateInTx(ScheduleEntity schedule) { + new SpringTransactionManager(txTemplate) + .doInTransactionWithoutResult(() -> { + try { + innerTerminate(schedule.getId()); + } catch (SchedulerException e) { + throw new RuntimeException(e); + } + }); + } // The partition plan uses Schedule, but it is created through flow, and the front end also displays // it through flow. From 98cc8279e6981eaa58f59450d81945709935cbeb Mon Sep 17 00:00:00 2001 From: ungreat Date: Mon, 21 Apr 2025 11:34:51 +0800 Subject: [PATCH 3/7] fix --- .../odc/server/web/controller/v2/FlowInstanceController.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java index 32f2df1ba6..0e0783dfb1 100644 --- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java +++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java @@ -265,7 +265,7 @@ public SuccessResponse> getBatchCancelResult(Stri return Responses.single(flowInstanceService.getBatchCancelResult(terminateId)); } - @ApiOperation(value = "getBatchCancelLog", notes = "获取批量终止结果") + @ApiOperation(value = "getBatchCancelLog", notes = "获取批量终止日志") @RequestMapping(value = "/getBatchCancelLog", method = RequestMethod.GET) public SuccessResponse getBatchCancelLog(String terminateId) { return Responses.single(flowInstanceService.getBatchCancelLog(terminateId)); From c1897fe439d33fc87816caffbd35f9ae3b9f30eb Mon Sep 17 00:00:00 2001 From: ungreat Date: Mon, 21 Apr 2025 14:17:58 +0800 Subject: [PATCH 4/7] review --- .../server/web/controller/v2/FlowInstanceController.java | 6 +++--- .../odc/server/web/controller/v2/ScheduleController.java | 6 +++--- .../java/com/oceanbase/odc/service/common/FutureCache.java | 1 + 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java index 0e0783dfb1..2507d7e99f 100644 --- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java +++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java @@ -254,19 +254,19 @@ public SuccessResponse getPartitionPlan(@PathVariable Long } @ApiOperation(value = "cancelFlowInstance", notes = "批量终止流程") - @RequestMapping(value = "/batchCancelFlowInstance", method = RequestMethod.POST) + @RequestMapping(value = "/asyncCancel", method = RequestMethod.POST) public SuccessResponse batchCancelFlowInstance(@RequestBody Collection flowInstanceIds) { return Responses.single(flowInstanceService.startBatchCancelFlowInstance(flowInstanceIds)); } @ApiOperation(value = "getBatchCancelResult", notes = "获取批量终止结果") - @RequestMapping(value = "/getBatchCancelResult", method = RequestMethod.GET) + @RequestMapping(value = "/asyncResult", method = RequestMethod.GET) public SuccessResponse> getBatchCancelResult(String terminateId) { return Responses.single(flowInstanceService.getBatchCancelResult(terminateId)); } @ApiOperation(value = "getBatchCancelLog", notes = "获取批量终止日志") - @RequestMapping(value = "/getBatchCancelLog", method = RequestMethod.GET) + @RequestMapping(value = "/asyncLog", method = RequestMethod.GET) public SuccessResponse getBatchCancelLog(String terminateId) { return Responses.single(flowInstanceService.getBatchCancelLog(terminateId)); } diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java index 1f73f06a2d..e414d3d2ee 100644 --- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java +++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java @@ -286,17 +286,17 @@ public SuccessResponse updateLimiterConfig(@PathVariable return Responses.single(scheduleService.updateDlmRateLimit(id, limiterConfig)); } - @RequestMapping(value = "batchTerminateScheduleAndTask", method = RequestMethod.POST) + @RequestMapping(value = "/schedules/asyncTerminate", method = RequestMethod.POST) public SuccessResponse startTerminateScheduleAndTask(@RequestBody ScheduleTerminateCmd cmd) { return Responses.ok(scheduleService.startTerminateScheduleAndTask(cmd)); } - @RequestMapping(value = "getTerminateScheduleResult", method = RequestMethod.GET) + @RequestMapping(value = "/schedules/asyncResult", method = RequestMethod.GET) public SuccessResponse> getTerminateScheduleResult(String terminateId) { return Responses.ok(scheduleService.getTerminateScheduleResult(terminateId)); } - @RequestMapping(value = "getTerminateScheduleLog", method = RequestMethod.GET) + @RequestMapping(value = "/schedules/asyncLog", method = RequestMethod.GET) public SuccessResponse getTerminateScheduleLog(String terminateId) { return Responses.ok(scheduleService.getTerminateLog(terminateId)); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/common/FutureCache.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/common/FutureCache.java index 91503c0147..24eab9b127 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/common/FutureCache.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/common/FutureCache.java @@ -32,6 +32,7 @@ public class FutureCache { private final Cache> tempId2Future = Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.MINUTES) + .maximumSize(1000L) .removalListener((String key, Future future, RemovalCause cause) -> { if (future != null) { future.cancel(true); From 32c31a128c1a72bf57707dc1297ec0a4bb53c848 Mon Sep 17 00:00:00 2001 From: ungreat Date: Mon, 21 Apr 2025 14:18:47 +0800 Subject: [PATCH 5/7] review --- .../odc/server/web/controller/v2/FlowInstanceController.java | 4 ++-- .../odc/server/web/controller/v2/ScheduleController.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java index 2507d7e99f..c76d51f0be 100644 --- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java +++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java @@ -260,13 +260,13 @@ public SuccessResponse batchCancelFlowInstance(@RequestBody Collection> getBatchCancelResult(String terminateId) { return Responses.single(flowInstanceService.getBatchCancelResult(terminateId)); } @ApiOperation(value = "getBatchCancelLog", notes = "获取批量终止日志") - @RequestMapping(value = "/asyncLog", method = RequestMethod.GET) + @RequestMapping(value = "/asyncCancelLog", method = RequestMethod.GET) public SuccessResponse getBatchCancelLog(String terminateId) { return Responses.single(flowInstanceService.getBatchCancelLog(terminateId)); } diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java index e414d3d2ee..c2462940a1 100644 --- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java +++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java @@ -291,12 +291,12 @@ public SuccessResponse startTerminateScheduleAndTask(@RequestBody Schedu return Responses.ok(scheduleService.startTerminateScheduleAndTask(cmd)); } - @RequestMapping(value = "/schedules/asyncResult", method = RequestMethod.GET) + @RequestMapping(value = "/schedules/asyncTerminateResult", method = RequestMethod.GET) public SuccessResponse> getTerminateScheduleResult(String terminateId) { return Responses.ok(scheduleService.getTerminateScheduleResult(terminateId)); } - @RequestMapping(value = "/schedules/asyncLog", method = RequestMethod.GET) + @RequestMapping(value = "/schedules/asyncTerminateLog", method = RequestMethod.GET) public SuccessResponse getTerminateScheduleLog(String terminateId) { return Responses.ok(scheduleService.getTerminateLog(terminateId)); } From 3ec9ed94be24a052ed28b5133f8bc3fa51fe8b62 Mon Sep 17 00:00:00 2001 From: ungreat Date: Mon, 21 Apr 2025 15:09:08 +0800 Subject: [PATCH 6/7] review --- .../odc/server/web/controller/v2/FlowInstanceController.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java index c76d51f0be..88af58f9a2 100644 --- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java +++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java @@ -254,7 +254,7 @@ public SuccessResponse getPartitionPlan(@PathVariable Long } @ApiOperation(value = "cancelFlowInstance", notes = "批量终止流程") - @RequestMapping(value = "/asyncCancel", method = RequestMethod.POST) + @RequestMapping(value = "/asyncCancel", method = RequestMethod.POST) public SuccessResponse batchCancelFlowInstance(@RequestBody Collection flowInstanceIds) { return Responses.single(flowInstanceService.startBatchCancelFlowInstance(flowInstanceIds)); } From d400d163c826e5d36aa986cc6166ce1be107f692 Mon Sep 17 00:00:00 2001 From: ungreat Date: Tue, 6 May 2025 14:49:01 +0800 Subject: [PATCH 7/7] fix stateful route --- .../odc/server/web/controller/v2/FlowInstanceController.java | 4 ++++ .../odc/server/web/controller/v2/ScheduleController.java | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java index 88af58f9a2..abe2d688fc 100644 --- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java +++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/FlowInstanceController.java @@ -63,6 +63,8 @@ import com.oceanbase.odc.service.partitionplan.model.PartitionPlanConfig; import com.oceanbase.odc.service.schedule.ScheduleService; import com.oceanbase.odc.service.session.model.SqlExecuteResult; +import com.oceanbase.odc.service.state.model.StateName; +import com.oceanbase.odc.service.state.model.StatefulRoute; import com.oceanbase.odc.service.task.model.OdcTaskLogLevel; import io.swagger.annotations.ApiOperation; @@ -261,12 +263,14 @@ public SuccessResponse batchCancelFlowInstance(@RequestBody Collection> getBatchCancelResult(String terminateId) { return Responses.single(flowInstanceService.getBatchCancelResult(terminateId)); } @ApiOperation(value = "getBatchCancelLog", notes = "获取批量终止日志") @RequestMapping(value = "/asyncCancelLog", method = RequestMethod.GET) + @StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#terminateId") public SuccessResponse getBatchCancelLog(String terminateId) { return Responses.single(flowInstanceService.getBatchCancelLog(terminateId)); } diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java index c2462940a1..209ea7bd48 100644 --- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java +++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ScheduleController.java @@ -59,6 +59,8 @@ import com.oceanbase.odc.service.schedule.model.ScheduleTaskOverview; import com.oceanbase.odc.service.schedule.model.ScheduleType; import com.oceanbase.odc.service.schedule.model.UpdateScheduleReq; +import com.oceanbase.odc.service.state.model.StateName; +import com.oceanbase.odc.service.state.model.StatefulRoute; import com.oceanbase.odc.service.task.executor.logger.LogUtils; import com.oceanbase.odc.service.task.model.OdcTaskLogLevel; @@ -292,11 +294,13 @@ public SuccessResponse startTerminateScheduleAndTask(@RequestBody Schedu } @RequestMapping(value = "/schedules/asyncTerminateResult", method = RequestMethod.GET) + @StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#terminateId") public SuccessResponse> getTerminateScheduleResult(String terminateId) { return Responses.ok(scheduleService.getTerminateScheduleResult(terminateId)); } @RequestMapping(value = "/schedules/asyncTerminateLog", method = RequestMethod.GET) + @StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#terminateId") public SuccessResponse getTerminateScheduleLog(String terminateId) { return Responses.ok(scheduleService.getTerminateLog(terminateId)); }