8000 Filter consumer groups regexp by alozano3 · Pull Request #628 · tchiotludo/akhq · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Filter consumer groups regexp #628

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@
- User groups configuration
- Filter topics with regexp for current groups
- Ldap configuration to match AKHQ groups/roles

- Filter consumer groups with regexp for current groups

## New React UI

Since this is a major rework, the new UI can have some issues, so please [report any issue](https://github.com/tchiotludo/akhq/issues), thanks!
Expand Down Expand Up @@ -409,6 +410,7 @@ Define groups with specific roles for your users
* `roles`: Roles list for the group
* `attributes.topics-filter-regexp`: Regexp to filter topics available for current group
* `attributes.connects-filter-regexp`: Regexp to filter Connect tasks available for current group
* `attributes.consumer-groups-filter-regexp`: Regexp to filter Consumer Groups available for current group


3 defaults group are available :
Expand Down Expand Up @@ -512,6 +514,7 @@ akhq:
# Regexp to filter topic available for group
topics-filter-regexp: "test\\.reader.*"
connects-filter-regexp: "^test.*$"
consumer-groups-filter-regexp: "consumer.*"
topic-writer:
name: topic-writer # Group name
roles:
Expand All @@ -522,6 +525,7 @@ akhq:
attributes:
topics-filter-regexp: "test.*"
connects-filter-regexp: "^test.*$"
consumer-groups-filter-regexp: "consumer.*"
ldap:
groups:
- name: mathematicians
Expand Down
2 changes: 2 additions & 0 deletions application.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ akhq:
topics-filter-regexp: "test.*"
# Regexp to filter connect configs visible for group
connects-filter-regexp: "^test.*$"
# Regexp to filter consumer groups visible for group
consumer-groups-filter-regexp: "consumer.*"
topic-reader: # unique key
name: topic-reader # Other group
roles:
Expand Down
71 changes: 61 additions & 10 deletions src/main/java/org/akhq/repositories/ConsumerGroupRepository.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package org.akhq.repositories;

import io.micronaut.context.ApplicationContext;
import io.micronaut.security.authentication.Authentication;
import io.micronaut.security.utils.SecurityService;
import org.akhq.configs.SecurityProperties;
import org.akhq.models.ConsumerGroup;
import org.akhq.models.Partition;
import org.akhq.modules.AbstractKafkaWrapper;
import org.akhq.modules.KafkaModule;
import org.akhq.utils.PagedList;
import org.akhq.utils.Pagination;
import org.akhq.utils.UserGroupUtils;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.akhq.models.ConsumerGroup;
import org.akhq.models.Partition;
import org.akhq.modules.KafkaModule;
import org.akhq.modules.AbstractKafkaWrapper;
import org.akhq.utils.PagedList;
import org.akhq.utils.Pagination;

import javax.inject.Inject;
import javax.inject.Singleton;
Expand All @@ -28,6 +33,15 @@ public class ConsumerGroupRepository extends AbstractRepository {
@Inject
private KafkaModule kafkaModule;

@Inject
private ApplicationContext applicationContext;

@Inject
private UserGroupUtils userGroupUtils;

@Inject
private SecurityProperties securityProperties;

public PagedList<ConsumerGroup> list(String clusterId, Pagination pagination, Optional<String> search) throws ExecutionException, InterruptedException {
return PagedList.of(all(clusterId, search), pagination, groupsList -> this.findByName(clusterId, groupsList));
}
Expand All @@ -36,7 +50,8 @@ public List<String> all(String clusterId, Optional<String> search) throws Execut
ArrayList<String> list = new ArrayList<>();

for (ConsumerGroupListing item : kafkaWrapper.listConsumerGroups(clusterId)) {
if (isSearchMatch(search, item.groupId())) {
if (isSearchMatch(search, item.groupId()) && isMatchRegex(
getConsumerGroupFilterRegex(), item.groupId())) {
list.add(item.groupId());
}
}
Expand All @@ -47,13 +62,19 @@ public List<String> all(String clusterId, Optional<String> search) throws Execut
}

public ConsumerGroup findByName(String clusterId, String name) throws ExecutionException, InterruptedException {
Optional<ConsumerGroup> consumerGroup = this.findByName(clusterId, Collections.singletonList(name)).stream().findFirst();

Optional<ConsumerGroup> consumerGroup = Optional.empty();
if(isMatchRegex(getConsumerGroupFilterRegex(),name)) {
consumerGroup = this.findByName(clusterId, Collections.singletonList(name)).stream().findFirst();
}
return consumerGroup.orElseThrow(() -> new NoSuchElementException("Consumer Group '" + name + "' doesn't exist"));
}

public List<ConsumerGroup> findByName(String clusterId, List<String> groups) throws ExecutionException, InterruptedException {
Map<String, ConsumerGroupDescription> consumerDescriptions = kafkaWrapper.describeConsumerGroups(clusterId, groups);
Optional<List<String>> consumerGroupRegex = getConsumerGroupFilterRegex();
List<String> filteredConsumerGroups = groups.stream()
.filter(t -> isMatchRegex(consumerGroupRegex, t))
.collect(Collectors.toList());
Map<String, ConsumerGroupDescription> consumerDescriptions = kafkaWrapper.describeConsumerGroups(clusterId, filteredConsumerGroups);

Map<String, Map<TopicPartition, OffsetAndMetadata>> groupGroupsOffsets = consumerDescriptions.keySet().stream()
.map(group -> {
Expand Down Expand Up @@ -139,4 +160,34 @@ public void updateOffsets(String clusterId, String name, Map<org.akhq.models.Top

kafkaWrapper.clearConsumerGroupsOffsets();
}

private Optional<List<String>> getConsumerGroupFilterRegex() {

List<String> consumerGroupFilterRegex = new ArrayList<>();

if (applicationContext.containsBean(SecurityService.class)) {
SecurityService securityService = applicationContext.getBean(SecurityService.class);
Optional<Authentication> authentication = securityService.getAuthentication();
if (authentication.isPresent()) {
Authentication auth = authentication.get();
consumerGroupFilterRegex.addAll(getConsumerGroupFilterRegexFromAttributes(auth.getAttributes()));
}
}
// get consumer group filter regex for default groups
consumerGroupFilterRegex.addAll(getConsumerGroupFilterRegexFromAttributes(
ED4F userGroupUtils.getUserAttributes(Collections.singletonList(securityProperties.getDefaultGroup()))
));

return Optional.of(consumerGroupFilterRegex);
}

@SuppressWarnings("unchecked")
private List<String> getConsumerGroupFilterRegexFromAttributes(Map<String, Object> attributes) {
if (attributes.get("consumerGroupsFilterRegexp") != null) {
if (attributes.get("consumerGroupsFilterRegexp") instanceof List) {
return (List<String>)attributes.get("consumerGroupsFilterRegexp");
}
}
return new ArrayList<>();
}
}
1 change: 1 addition & 0 deletions src/test/java/org/akhq/KafkaTestCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class KafkaTestCluster implements Runnable, Stoppable {
public static final int TOPIC_HIDE_INTERNAL_COUNT = 11;
public static final int TOPIC_HIDE_INTERNAL_STREAM_COUNT = 9;
public static final int TOPIC_HIDE_STREAM_COUNT = 17;
public static final int CONSUMER_GROUP_COUNT = 6;

public static final String CONSUMER_STREAM_TEST = "stream-test-example";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package org.akhq.repositories;

import io.micronaut.context.ApplicationContext;
import io.micronaut.security.authentication.Authentication;
import io.micronaut.security.authentication.DefaultAuthentication;
import io.micronaut.security.utils.DefaultSecurityService;
import io.micronaut.security.utils.SecurityService;
import lombok.extern.slf4j.Slf4j;
import org.akhq.AbstractTest;
import org.akhq.KafkaTestCluster;
import org.akhq.utils.Pagination;
import org.codehaus.httpcache4j.uri.URIBuilder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

import javax.inject.Inject;
import java.util.*;
import java.util.concurrent.ExecutionException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;

@Slf4j
public class ConsumerGroupRepositoryTest extends AbstractTest {

@Inject
@InjectMocks
protected ConsumerGroupRepository consumerGroupRepository;

@Mock
ApplicationContext applicationContext;

@BeforeEach
public void before(){
MockitoAnnotations.initMocks(this);
}

@Test
public void list() throws ExecutionException, InterruptedException {
assertEquals(KafkaTestCluster.CONSUMER_GROUP_COUNT, consumerGroupRepository.list(
KafkaTestCluster.CLUSTER_ID,
new Pagination(100, URIBuilder.empty(), 1),
Optional.empty()
).size());
}

@Test
public void listWithConsumerGroupRegex() throws ExecutionException, InterruptedException {
mockApplicationContext();
assertEquals(5, consumerGroupRepository.list(
KafkaTestCluster.CLUSTER_ID,
new Pagination(100, URIBuilder.empty(), 1),
Optional.empty()
).size());
}

@Test
public void search() throws ExecutionException, InterruptedException {
assertEquals(1, consumerGroupRepository.list(
KafkaTestCluster.CLUSTER_ID,
new Pagination(100, URIBuilder.empty(), 1),
Optional.of("consu 2")
).size());
}

@Test
public void searchWithTopicRegex() throws ExecutionException, InterruptedException {
mockApplicationContext();
assertEquals(0, consumerGroupRepository.list(
KafkaTestCluster.CLUSTER_ID,
new Pagination(100, URIBuilder.empty(), 1),
Optional.of("stream")
).size());
}

@Test
public void findByNameWithTopicRegex() throws ExecutionException, InterruptedException {
mockApplicationContext();
Assertions.assertThrows(NoSuchElementException.class, () -> {
consumerGroupRepository.findByName(KafkaTestCluster.CLUSTER_ID,"cgroup-1");
});

assertEquals(1, consumerGroupRepository.findByName(KafkaTestCluster.CLUSTER_ID, List.of("consumer-6", "cgroup-1")).size());
}

private void mockApplicationContext() {
Authentication auth = new DefaultAuthentication("test", Collections.singletonMap("consumerGroupsFilterRegexp", new ArrayList<>(Arrays.asList("consumer-.*"))));
DefaultSecurityService securityService = Mockito.mock(DefaultSecurityService.class);
when(securityService.getAuthentication()).thenReturn(Optional.of(auth));
when(applicationContext.containsBean(SecurityService.class)).thenReturn(true);
when(applicationContext.getBean(SecurityService.class)).thenReturn(securityService);
}
}
0