8000 Added validations for the cluster parameter in API by rdrck47 · Pull Request #1100 · tchiotludo/akhq · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Added validations for the cluster parameter in API #1100

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions src/main/java/org/akhq/controllers/AclsController.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package org.akhq.controllers;

import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.security.annotation.Secured;
import io.swagger.v3.oas.annotations.Operation;
import org.akhq.configs.Role;
import org.akhq.models.AccessControl;
import org.akhq.modules.KafkaModule;
import org.akhq.repositories.AccessControlListRepository;
import org.apache.kafka.common.resource.ResourceType;

Expand All @@ -24,11 +27,18 @@ public class AclsController extends AbstractController {
public AclsController(AccessControlListRepository aclRepository) {
this.aclRepository = aclRepository;
}
@Inject
private KafkaModule kafkaModule;

@Operation(tags = {"acls"}, summary = "List all acls")
@Get
public List<AccessControl> list(HttpRequest<?> request, String cluster, Optional<String> search) throws ExecutionException, InterruptedException {
return aclRepository.findAll(cluster, search);
if(kafkaModule.clusterExists(cluster)) {
return aclRepository.findAll(cluster, search);
} else {
HttpResponse.status(HttpStatus.NOT_FOUND);
return null;
}
}

@Operation(tags = {"acls"}, summary = "Get acls for a principal")
Expand All @@ -38,6 +48,11 @@ public AccessControl principal(
String principal,
Optional<ResourceType> resourceType
) throws ExecutionException, InterruptedException {
return aclRepository.findByPrincipal(cluster, principal, resourceType);
if(kafkaModule.clusterExists(cluster)) {
return aclRepository.findByPrincipal(cluster, principal, resourceType);
} else {
HttpResponse.status(HttpStatus.NOT_FOUND);
return null;
}
}
}
14 changes: 12 additions & 2 deletions src/main/java/org/akhq/controllers/AkhqController.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
Expand All @@ -18,6 +19,7 @@
import lombok.NoArgsConstructor;
import org.akhq.configs.*;
import org.akhq.modules.HasAnyPermission;
import org.akhq.modules.KafkaModule;
import org.akhq.utils.VersionProvider;

import jakarta.inject.Inject;
Expand Down Expand Up @@ -49,6 +51,9 @@ public class AkhqController extends AbstractController {
@Inject
private VersionProvider versionProvider;

@Inject
private KafkaModule kafkaModule;

@HasAnyPermission()
@Get("api/cluster")
@Operation(tags = {"AKHQ"}, summary = "Get all cluster for current instance")
Expand Down Expand Up @@ -167,10 +172,15 @@ public HttpResponse<?> rapidoc() {
@Get("api/{cluster}/ui-options")
@Operation(tags = {"AKHQ"}, summary = "Get ui options for cluster")
public Connection.UiOptions options(String cluster) {
return this.connections.stream().filter(conn -> cluster.equals(conn.getName()))
.map(conn -> conn.mergeOptions(this.uIOptions) )
if(kafkaModule.clusterExists(cluster)) {
return this.connections.stream().filter(conn -> cluster.equals(conn.getName()))
.map(conn -> conn.mergeOptions(this.uIOptions))
.findAny()
.orElseThrow(() -> new RuntimeException("No cluster found"));
} else {
HttpResponse.status(HttpStatus.NOT_FOUND);
return null;
}
}

@AllArgsConstructor
Expand Down
118 changes: 91 additions & 27 deletions src/main/java/org/akhq/controllers/ConnectController.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.micronaut.context.annotation.Value;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Delete;
import io.micronaut.http.annotation.Get;
Expand All @@ -13,6 +14,7 @@
import org.akhq.configs.Role;
import org.akhq.models.ConnectDefinition;
import org.akhq.models.ConnectPlugin;
import org.akhq.modules.KafkaModule;
import org.akhq.repositories.ConnectRepository;
import org.akhq.utils.Pagination;
import org.akhq.utils.ResultPagedList;
Expand All @@ -38,35 +40,52 @@ public class ConnectController extends AbstractController {
public ConnectController(ConnectRepository connectRepository) {
this.connectRepository = connectRepository;
}
@Inject
private KafkaModule kafkaModule;

@Get
@Operation(tags = {"connect"}, summary = "List all connect definitions")
public ResultPagedList<ConnectDefinition> list(
HttpRequest<?> request, String cluster, String connectId, Optional<String> search, Optional<Integer> page)
throws IOException, RestClientException, ExecutionException, InterruptedException
{
URIBuilder uri = URIBuilder.fromURI(request.getUri());
Pagination pagination = new Pagination(pageSize, uri, page.orElse(1));

return ResultPagedList.of(this.connectRepository.getPaginatedDefinitions(cluster, connectId, pagination, search));
if(kafkaModule.clusterExists(cluster)) {
URIBuilder uri = URIBuilder.fromURI(request.getUri());
Pagination pagination = new Pagination(pageSize, uri, page.orElse(1));

return ResultPagedList.of(this.connectRepository.getPaginatedDefinitions(cluster, connectId, pagination, search));
} else {
HttpResponse.status(HttpStatus.NOT_FOUND);
return null;
}
}

@Get("/plugins")
@Operation(tags = {"connect"}, summary = "List all connect plugins")
public List<ConnectPlugin> pluginsList(String cluster, String connectId) {
return this.connectReposito 8000 ry.getPlugins(cluster, connectId);
if(kafkaModule.clusterExists(cluster)) {
return this.connectRepository.getPlugins(cluster, connectId);
} else {
HttpResponse.status(HttpStatus.NOT_FOUND);
return null;
}
}

@Get("/plugins/{type}")
@Operation(tags = {"connect"}, summary = "Retrieve a connect plugin")
public ConnectPlugin plugins(String cluster, String connectId, String type) {
List<ConnectPlugin> plugins = this.connectRepository.getPlugins(cluster, connectId);

return plugins
.stream()
.filter(connectPlugin -> connectPlugin.getClassName().equals(type))
.findAny()
.orElseThrow();
if(kafkaModule.clusterExists(cluster)) {
List<ConnectPlugin> plugins = this.connectRepository.getPlugins(cluster, connectId);

return plugins
.stream()
.filter(connectPlugin -> connectPlugin.getClassName().equals(type))
.findAny()
.orElseThrow();
} else {
HttpResponse.status(HttpStatus.NOT_FOUND);
return null;
}
}

@Secured(Role.ROLE_CONNECT_INSERT)
Expand All @@ -78,34 +97,58 @@ public ConnectDefinition create(
String name,
Map<String, String> configs
) {
return this.connectRepository.create(cluster, connectId, name, configs);
if(kafkaModule.clusterExists(cluster)) {
return this.connectRepository.create(cluster, connectId, name, configs);
} else {
HttpResponse.status(HttpStatus.NOT_FOUND);
return null;
}
}

@Secured(Role.ROLE_CONNECT_DELETE)
@Delete("/{name}")
@Operation(tags = {"connect"}, summary = "Delete a connect definition")
public HttpResponse<?> delete(String cluster, String connectId, String name) {
this.connectRepository.delete(cluster, connectId, name);
if(kafkaModule.clusterExists(cluster)) {
this.connectRepository.delete(cluster, connectId, name);

return HttpResponse.noContent();
return HttpResponse.noContent();
} else {
return HttpResponse.status(HttpStatus.NOT_FOUND);
}
}

@Get("/{name}")
@Operation(tags = {"connect"}, summary = "Retrieve a connect definition")
public ConnectDefinition home(HttpRequest<?> request, String cluster, String connectId, String name) {
return this.connectRepository.getDefinition(cluster, connectId, name);
if(kafkaModule.clusterExists(cluster)) {
return this.connectRepository.getDefinition(cluster, connectId, name);
} else {
HttpResponse.status(HttpStatus.NOT_FOUND);
return null;
}
}

@Get("/{name}/tasks")
@Operation(tags = {"connect"}, summary = "Retrieve a connect task")
public List<ConnectDefinition.TaskDefinition> tasks(HttpRequest<?> request, String cluster, String connectId, String name) {
return this.connectRepository.getDefinition(cluster, connectId, name).getTasks();
if(kafkaModule.clusterExists(cluster)) {
return this.connectRepository.getDefinition(cluster, connectId, name).getTasks();
} else {
HttpResponse.status(HttpStatus.NOT_FOUND);
return null;
}
}

@Get("/{name}/configs")
@Operation(tags = {"connect"}, summary = "Retrieve a connect config")
public Map<String, String> configs(HttpRequest<?> request, String cluster, String connectId, String name) {
return this.connectRepository.getDefinition(cluster, connectId, name).getConfigs();
if(kafkaModule.clusterExists(cluster)) {
return this.connectRepository.getDefinition(cluster, connectId, name).getConfigs();
} else {
HttpResponse.status(HttpStatus.NOT_FOUND);
return null;
}
}

@Secured(Role.ROLE_CONNECT_UPDATE)
Expand All @@ -117,42 +160,63 @@ public ConnectDefinition update(
String name,
Map<String, String> configs
) {
return this.connectRepository.update(cluster, connectId, name, configs);
if(kafkaModule.clusterExists(cluster)) {
return this.connectRepository.update(cluster, connectId, name, configs);
} else {
HttpResponse.status(HttpStatus.NOT_FOUND);
return null;
}
}

@Secured(Role.ROLE_CONNECT_STATE_UPDATE)
@Get("/{name}/restart")
@Operation(tags = {"connect"}, summary = "Restart a connect definition")
public HttpResponse<?> definitionRestart(String cluster, String connectId, String name) {
this.connectRepository.restart(cluster, connectId, name);
if(kafkaModule.clusterExists(cluster)) {
this.connectRepository.restart(cluster, connectId, name);

return HttpResponse.noContent();
return HttpResponse.noContent();
} else {
return HttpResponse.status(HttpStatus.NOT_FOUND);
}
}

@Secured(Role.ROLE_CONNECT_STATE_UPDATE)
@Get("/{name}/pause")
@Operation(tags = {"connect"}, summary = "Pause a connect definition")
public HttpResponse<?> definitionPause(String cluster, String connectId, String name) {
this.connectRepository.pause(cluster, connectId, name);
if(kafkaModule.clusterExists(cluster)) {
this.connectRepository.pause(cluster, connectId, name);

return HttpResponse.noContent();
return HttpResponse.noContent();
} else {
return HttpResponse.status(HttpStatus.NOT_FOUND);
}
}

@Secured(Role.ROLE_CONNECT_STATE_UPDATE)
@Get("/{name}/resume")
@Operation(tags = {"connect"}, summary = "Resume a connect definition")
public HttpResponse<?> definitionResume(String cluster, String connectId, String name) {
this.connectRepository.resume(cluster, connectId, name);
if(kafkaModule.clusterExists(cluster)) {
this.connectRepository.resume(cluster, connectId, name);

return HttpResponse.noContent();
return HttpResponse.noContent();
} else {
return HttpResponse.status(HttpStatus.NOT_FOUND);
}
}

@Secured(Role.ROLE_CONNECT_STATE_UPDATE)
@Get("/{name}/tasks/{taskId}/restart")
@Operation(tags = {"connect"}, summary = "Restart a connect task")
public HttpResponse<?> taskRestart(HttpRequest<?> request, String cluster, String connectId, String name, int taskId) {
this.connectRepository.restartTask(cluster, connectId, name, taskId);
if(kafkaModule.clusterExists(cluster)) {
this.connectRepository.restartTask(cluster, connectId, name, taskId);

return HttpResponse.noContent();
return HttpResponse.noContent();
} else {
return HttpResponse.status(HttpStatus.NOT_FOUND);
}
}
}
Loading
0