package com.hortonworks.smm.kafka.services.clients;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.hortonworks.smm.kafka.common.config.KafkaAdminClientConfig;
import com.hortonworks.smm.kafka.common.config.KafkaConsumerConfig;
import com.hortonworks.smm.kafka.common.config.KafkaManagementConfig;
import com.hortonworks.smm.kafka.common.config.KafkaMetricsConfig;
import com.hortonworks.smm.kafka.services.clients.dtos.ConsumerGroupInfo;
import com.hortonworks.smm.kafka.services.clients.dtos.ConsumerInfo;
import com.hortonworks.smm.kafka.services.clients.dtos.ConsumerOffsetResetParams;
import com.hortonworks.smm.kafka.services.clients.dtos.ConsumerPartitionInfo;
import com.hortonworks.smm.kafka.services.clients.dtos.ConsumerPartitionOffsetInfo;
import com.hortonworks.smm.kafka.services.clients.dtos.PartitionAssignment;
import com.hortonworks.smm.kafka.services.management.TopicManagementService;
import com.hortonworks.smm.kafka.services.management.cache.ManagementCacheFactory;
import com.hortonworks.smm.kafka.services.management.cache.broker.impl.BrokerManagementCacheImpl;
import com.hortonworks.smm.kafka.services.management.cache.topic.impl.TopicManagementCacheImpl;
import com.hortonworks.smm.kafka.services.management.helper.AdminClientHelper;
import com.hortonworks.smm.kafka.services.metric.MockMetricsService;
import com.hortonworks.smm.kafka.services.security.AuthenticationContext;
import com.hortonworks.smm.kafka.services.security.Permission;
import com.hortonworks.smm.kafka.services.security.ResourceType;
import com.hortonworks.smm.kafka.services.security.SMMAuthorizer;
import com.hortonworks.smm.kafka.services.security.auth.SMMSecurityContext;
import com.hortonworks.smm.kafka.services.security.impl.NoopAuthorizer;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.rules.TestName;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hortonworks/smm/kafka/services/clients/ConsumerGroupManagementServiceTest.class */
public class ConsumerGroupManagementServiceTest {
    /** Logger for this test class. */
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerGroupManagementServiceTest.class);
    /** Embedded Kafka cluster shared by all tests; started in {@code beforeClass()} and stopped in {@code afterClass()}. */
    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    /** Timeout, in milliseconds, that the tests pass to {@code ConsumerGroupExecutor.waitTillMsgCount}. */
    private static final long DEFAULT_WAIT_TIME_MS = 30000;
    /** Two-placeholder format for consumer client ids; presumably filled with group id and consumer index - TODO confirm against fetchClientId. */
    private static final String CLIENT_ID_FORMAT = "%s-clientId-%s";

    /** JUnit rule exposing the running test's method name; tests use it as a prefix for topic and group names. */
    @Rule
    public TestName testName = new TestName();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hortonworks/smm/kafka/services/clients/ConsumerGroupManagementServiceTest$ConsumerGroupExecutor.class */
    /**
     * Test helper that runs a set of {@link ConsumerRunnable}s sharing one consumer group id on a
     * fixed-size thread pool, and stops them again on {@link #close()}.
     */
    public static class ConsumerGroupExecutor implements AutoCloseable {
        private final ExecutorService executor;
        private final String groupId;
        private final List<ConsumerRunnable> consumers = new LinkedList<>();

        /**
         * Creates the group with consumers that use {@code subscribe} (group-managed assignment).
         *
         * @param numThreads   size of the thread pool running the consumers
         * @param numConsumers number of consumers to start
         * @param topic        topic every consumer reads from
         * @param groupId      consumer group id shared by all consumers
         */
        public ConsumerGroupExecutor(int numThreads, int numConsumers, String topic, String groupId) {
            this(numThreads, numConsumers, topic, groupId, true);
        }

        /**
         * Creates the group and immediately submits {@code numConsumers} consumers to the pool.
         *
         * @param numThreads           size of the thread pool running the consumers
         * @param numConsumers         number of consumers to start
         * @param topic                topic every consumer reads from
         * @param groupId              consumer group id shared by all consumers
         * @param autoAssignPartitions {@code true} to subscribe to the topic, {@code false} to
         *                             manually assign all of its partitions
         */
        public ConsumerGroupExecutor(int numThreads, int numConsumers, String topic, String groupId, boolean autoAssignPartitions) {
            this.executor = Executors.newFixedThreadPool(numThreads);
            this.groupId = groupId;
            for (int index = 0; index < numConsumers; index++) {
                String clientId = ConsumerGroupManagementServiceTest.fetchClientId(groupId, index);
                submit(new ConsumerRunnable(topic, groupId, clientId, autoAssignPartitions));
            }
        }

        /** @return the consumer group id shared by the consumers of this executor */
        String groupId() {
            return this.groupId;
        }

        /**
         * Registers the consumer (so it is counted and shut down later) and starts it on the pool.
         *
         * @param consumerRunnable consumer to run
         */
        public void submit(ConsumerRunnable consumerRunnable) {
            this.consumers.add(consumerRunnable);
            this.executor.submit(consumerRunnable);
        }

        /**
         * Polls until the consumers have together received exactly {@code expectedCount} records.
         *
         * @param expectedCount total number of records expected across all consumers
         * @param timeoutMs     maximum time to wait, in milliseconds
         * @throws TimeoutException     if the exact count is not observed within {@code timeoutMs}
         * @throws InterruptedException if the waiting thread is interrupted while sleeping
         */
        public void waitTillMsgCount(int expectedCount, long timeoutMs) throws Exception {
            long startMs = System.currentTimeMillis();
            do {
                int received = this.consumers.stream().mapToInt(ConsumerRunnable::getMsgsCt).sum();
                if (received == expectedCount) {
                    return;
                }
                // Never sleep longer than the overall timeout.
                Thread.sleep(Math.min(100L, timeoutMs));
            } while (System.currentTimeMillis() - startMs <= timeoutMs);
            throw new TimeoutException("Could not receive messages " + expectedCount + " in duration " + timeoutMs + " msecs");
        }

        /**
         * Wakes up every consumer, shuts the pool down and waits up to five seconds for the
         * consumer threads to finish. If the calling thread is interrupted while waiting, the
         * interrupt status is restored instead of being silently discarded.
         */
        @Override // java.lang.AutoCloseable
        public void close() {
            this.consumers.forEach(ConsumerRunnable::shutdown);
            this.executor.shutdown();
            try {
                this.executor.awaitTermination(5000L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller; close() itself cannot throw it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hortonworks/smm/kafka/services/clients/ConsumerGroupManagementServiceTest$ConsumerRunnable.class */
    /**
     * A single Kafka consumer that polls one topic until {@link #shutdown()} is called, committing
     * after every poll and counting the records it has received.
     */
    public static class ConsumerRunnable implements Runnable {
        private final AtomicInteger msgsCt = new AtomicInteger();
        private final KafkaConsumer<String, String> consumer;
        private final String topic;
        private final boolean autoAssignPartitions;

        /**
         * @param topic                topic to consume
         * @param groupId              consumer group id handed to {@code getConsumerConfig}
         * @param clientId             client id handed to {@code getConsumerConfig}
         * @param autoAssignPartitions {@code true} to subscribe, {@code false} to manually assign
         *                             every partition of the topic
         */
        public ConsumerRunnable(String topic, String groupId, String clientId, boolean autoAssignPartitions) {
            this.consumer = new KafkaConsumer<>(ConsumerGroupManagementServiceTest.getConsumerConfig(groupId, clientId).getConfig());
            this.topic = topic;
            this.autoAssignPartitions = autoAssignPartitions;
        }

        /** Same as the four-argument constructor with {@code autoAssignPartitions = true}. */
        public ConsumerRunnable(String topic, String groupId, String clientId) {
            this(topic, groupId, clientId, true);
        }

        /** Attaches the consumer to the topic, either by subscription or by explicit assignment. */
        private void subscribe(String topicName) {
            if (!this.autoAssignPartitions) {
                List<TopicPartition> allPartitions = this.consumer.partitionsFor(topicName).stream()
                        .map(info -> new TopicPartition(topicName, info.partition()))
                        .collect(Collectors.toList());
                this.consumer.assign(allPartitions);
                return;
            }
            this.consumer.subscribe(Collections.singleton(topicName));
        }

        /**
         * Polls and commits in a loop, adding each batch size to the message counter. The loop
         * ends when a {@link WakeupException} is raised; any other failure is propagated. The
         * consumer is closed in both cases.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                subscribe(this.topic);
                for (;;) {
                    ConsumerRecords<String, String> batch = this.consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                    this.consumer.commitSync();
                    this.msgsCt.addAndGet(batch.count());
                }
            } catch (WakeupException stopRequested) {
                // Raised after shutdown() called wakeup(); nothing to do but close below.
            } finally {
                this.consumer.close();
            }
        }

        /** @return number of records received so far */
        public int getMsgsCt() {
            return this.msgsCt.get();
        }

        /** Asks the polling loop to stop by waking up the consumer. */
        public void shutdown() {
            this.consumer.wakeup();
        }
    }

    /** Starts the shared embedded Kafka cluster once before any test in this class runs. */
    @BeforeClass
    public static void beforeClass() throws IOException {
        CLUSTER.start();
    }

    /** Stops the shared embedded Kafka cluster after all tests in this class have run. */
    @AfterClass
    public static void afterClass() {
        CLUSTER.stop();
    }

    /**
     * Produces ten records to a topic created with default settings, consumes them with a single
     * consumer, and verifies the group/consumer views reported by the service: group id, "Stable"
     * state, partition set, committed offset 10, lag 0 and the consumer's client id. After the
     * consumer is closed the group must be reported as "Empty".
     *
     * <p>Both the consumer executor and the service are released in {@code finally}, so a failed
     * assertion does not leave consumer threads running against the shared cluster.
     */
    @Test
    public void testConsumerAPIsWithSinglePartitionAndSingleConsumer() throws Exception {
        String methodName = this.testName.getMethodName();
        String topic = methodName + "-topic";
        CLUSTER.createTopic(topic);
        List<String> messages = IntStream.range(0, 10).mapToObj(n -> "message-" + n).collect(Collectors.toList());
        produceData(topic, messages);
        String groupId = methodName + "-group";
        ConsumerGroupExecutor consumerGroupExecutor = null;
        ConsumerGroupManagementService service = null;
        try {
            consumerGroupExecutor = new ConsumerGroupExecutor(1, 1, topic, groupId);
            service = getConsumerGroupManagementService();
            waitTillStable(service, groupId, 500L);
            consumerGroupExecutor.waitTillMsgCount(10, DEFAULT_WAIT_TIME_MS);
            service.refreshConsumerGroups();
            Assert.assertTrue(service.consumerGroupNames().contains(groupId));

            ConsumerGroupInfo consumerGroup = service.consumerGroup(groupId);
            int partition = 0;
            com.hortonworks.smm.kafka.services.management.dtos.TopicPartition expectedPartition =
                    new com.hortonworks.smm.kafka.services.management.dtos.TopicPartition(topic, partition);
            Assert.assertEquals(groupId, consumerGroup.id());
            Assert.assertEquals("Stable", consumerGroup.state());
            Assert.assertTrue(Collections.singletonList(expectedPartition).containsAll(topicPartitions(consumerGroup)));
            verifyCommittedOffsets(consumerGroup, 10L);
            verifyTotalLag(consumerGroup, 0L);
            verifyUniqueClientIds(consumerGroup, 1);

            String clientId = fetchClientId(groupId, 0);
            ConsumerInfo consumerInfo = service.consumerInfo(clientId);
            Assert.assertEquals(clientId, consumerInfo.clientId());
            ConsumerPartitionInfo consumerPartitionInfo = (ConsumerPartitionInfo) consumerInfo.consumerPartitionInfos().get(0);
            Assert.assertEquals(0L, ((ConsumerPartitionOffsetInfo) ((Map) consumerPartitionInfo.topicPartitionOffsets().get(topic)).get(partition)).lag());
            Assert.assertEquals(10L, ((ConsumerPartitionOffsetInfo) ((Map) consumerPartitionInfo.topicPartitionOffsets().get(topic)).get(partition)).offset());

            // Stop the only consumer; the group must then be reported as Empty.
            consumerGroupExecutor.close();
            // Already closed: clear the reference so the finally block does not close it twice.
            consumerGroupExecutor = null;
            service.refreshConsumerGroups();
            ConsumerGroupInfo groupAfterClose = service.consumerGroup(groupId);
            Assert.assertEquals(groupId, groupAfterClose.id());
            Assert.assertEquals("Empty", groupAfterClose.state());
        } finally {
            closeQuietly(consumerGroupExecutor, service);
        }
    }

    /**
     * Best-effort cleanup: closes each non-null resource in order and deliberately ignores any
     * {@link Exception} thrown by {@code close()} so that the remaining resources are still closed.
     *
     * @param autoCloseableArr resources to close; individual elements may be {@code null}
     */
    private void closeQuietly(AutoCloseable... autoCloseableArr) {
        for (int index = 0; index < autoCloseableArr.length; index++) {
            AutoCloseable resource = autoCloseableArr[index];
            if (resource == null) {
                continue;
            }
            try {
                resource.close();
            } catch (Exception ignored) {
                // Intentionally swallowed: cleanup failures must not mask the test outcome.
            }
        }
    }

    /**
     * Blocks until the given consumer group is reported in state "Stable"; delegates to
     * {@code waitTillState} with {@code j} as the sleep between checks, in milliseconds.
     */
    private void waitTillStable(ConsumerGroupManagementService consumerGroupManagementService, String str, long j) {
        waitTillState(consumerGroupManagementService, str, j, "Stable");
    }

    /**
     * Blocks until the given consumer group is reported in state "Empty"; delegates to
     * {@code waitTillState} with {@code j} as the sleep between checks, in milliseconds.
     */
    private void waitTillEmpty(ConsumerGroupManagementService consumerGroupManagementService, String str, long j) {
        waitTillState(consumerGroupManagementService, str, j, "Empty");
    }

    /**
     * Repeatedly refreshes the service's consumer groups until the given group reports the
     * expected state, sleeping between attempts, for at most 60 seconds.
     *
     * @param service        service to refresh and query
     * @param groupId        consumer group id to look up
     * @param pollIntervalMs sleep between attempts, in milliseconds
     * @param expectedState  state string to wait for (compared with {@code equals})
     * @throws IllegalStateException if the state is not reached within 60 seconds, or if the
     *                               waiting thread is interrupted (interrupt status is restored)
     */
    private void waitTillState(ConsumerGroupManagementService service, String groupId, long pollIntervalMs, String expectedState) {
        long timeoutMs = TimeUnit.SECONDS.toMillis(60L);
        long deadlineMs = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadlineMs) {
            service.refreshConsumerGroups();
            ConsumerGroupInfo consumerGroup = service.consumerGroup(groupId);
            if (consumerGroup != null && expectedState.equals(consumerGroup.state())) {
                return;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Stop waiting instead of polling on for up to a minute after an interrupt.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for Consumer Group : " + groupId + " to become [" + expectedState + "]", e);
            }
        }
        throw new IllegalStateException("Consumer Group : " + groupId + " is not [" + expectedState + "] after " + timeoutMs + " ms");
    }

    /** Builds a {@link ConsumerGroupManagementService} for the embedded cluster using a {@link NoopAuthorizer}. */
    private ConsumerGroupManagementService getConsumerGroupManagementService() {
        return getConsumerGroupManagementService(new NoopAuthorizer());
    }

    /**
     * Wires up a {@link ConsumerGroupManagementService} from freshly created collaborators: an
     * {@link AdminClient}, broker/topic management caches, a {@link MockMetricsService}-based
     * metrics configuration, and a consumer configuration with random group and client ids.
     *
     * @param authorizer authorizer handed to the created service
     * @return the newly created service; the caller is responsible for closing it
     */
    private ConsumerGroupManagementService getConsumerGroupManagementService(SMMAuthorizer authorizer) {
        KafkaAdminClientConfig adminClientConfig = getKafkaAdminClientConfig();
        AdminClient adminClient = AdminClient.create(adminClientConfig.getConfig());
        AdminClientHelper adminClientHelper = new AdminClientHelper(adminClientConfig);
        KafkaManagementConfig managementConfig = getKafkaManagementConfig();

        // Management caches backing the topic management service.
        BrokerManagementCacheImpl brokerCache = new BrokerManagementCacheImpl(adminClient, adminClientHelper, managementConfig);
        TopicManagementCacheImpl topicCache = new TopicManagementCacheImpl(adminClient, adminClientHelper, brokerCache, managementConfig);
        ManagementCacheFactory cacheFactory = new ManagementCacheFactory(brokerCache, topicCache);
        TopicManagementService topicManagementService = new TopicManagementService(cacheFactory);

        KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(
                MockMetricsService.class.getName(), 3000L, 3000L, 1800000L, 1800000L, 20, adminClientConfig.getConfig());
        ConsumerGroupsService consumerGroupsService = new ConsumerGroupsService(
                adminClient,
                adminClientHelper,
                getConsumerConfig(UUID.randomUUID().toString(), UUID.randomUUID().toString()),
                metricsConfig,
                topicManagementService);

        return new ConsumerGroupManagementService(
                metricsConfig, consumerGroupsService, new MockMetricsService(), adminClient, adminClientConfig, authorizer);
    }

    /**
     * Verifies which authorization checks {@code resetOffsets} performs and which CLI arguments it
     * hands to {@code executeResetOffset} (stubbed out here) in three consecutive scenarios:
     * <ol>
     *   <li>wildcard topic, everything authorized: {@code --all-topics} and no {@code --topic};</li>
     *   <li>explicit topic1: {@code --topic} with topic1 and no {@code --all-topics};</li>
     *   <li>wildcard topic with READ on topic1 denied: {@code --topic} with topic2 and no
     *       {@code --all-topics}.</li>
     * </ol>
     * Authorization call counts are cumulative across the scenarios. The spied service is closed
     * when the test ends, whether it passes or fails.
     */
    @Test
    public void testAuthorizationWhenResettingOffsets() throws Exception {
        String methodName = this.testName.getMethodName();
        String groupId = methodName + "-group";
        String topic1 = methodName + "-topic1";
        String topic2 = methodName + "-topic2";
        SMMAuthorizer authorizer = Mockito.spy(new NoopAuthorizer());
        ConsumerGroupManagementService service = Mockito.spy(getConsumerGroupManagementService(authorizer));
        try {
            Mockito.doNothing().when(service).executeResetOffset(ArgumentMatchers.anyList());
            Mockito.doReturn(ImmutableSet.of(topic1, topic2)).when(service).queryCommittedOffsetTopics(ArgumentMatchers.eq(groupId));
            SMMSecurityContext securityContext = new SMMSecurityContext(() -> "test", "http");
            // Safe: the captor only ever records the List<String> passed to executeResetOffset.
            @SuppressWarnings("unchecked")
            ArgumentCaptor<List<String>> argsCaptor = ArgumentCaptor.forClass((Class<List<String>>) (Class<?>) List.class);

            // Scenario 1: wildcard topic, every topic authorized.
            ConsumerOffsetResetParams allTopicsParams = new ConsumerOffsetResetParams(
                    ImmutableList.of(new ConsumerOffsetResetParams.TopicPartitionsPair("*", ImmutableList.of())), OffsetResetScenario.LATEST, "");
            service.resetOffsets(groupId, allTopicsParams, securityContext);
            verifyAuthorizeCalls(authorizer, 1, ResourceType.TOPIC, topic1, Permission.READ);
            verifyAuthorizeCalls(authorizer, 1, ResourceType.TOPIC, topic2, Permission.READ);
            verifyAuthorizeCalls(authorizer, 1, ResourceType.GROUP, groupId, Permission.READ);
            verifyAuthorizeCalls(authorizer, 1, ResourceType.GROUP, groupId, Permission.DESCRIBE);
            Mockito.verify(service, Mockito.times(1)).executeResetOffset(argsCaptor.capture());
            List<String> allTopicsArgs = argsCaptor.getValue();
            Assertions.assertTrue(allTopicsArgs.contains("--all-topics"));
            // Check the flag that is actually emitted in the other scenarios ("--topic").
            Assertions.assertFalse(allTopicsArgs.contains("--topic"));

            // Scenario 2: a single explicit topic.
            ConsumerOffsetResetParams singleTopicParams = new ConsumerOffsetResetParams(
                    ImmutableList.of(new ConsumerOffsetResetParams.TopicPartitionsPair(topic1, ImmutableList.of())), OffsetResetScenario.LATEST, "");
            service.resetOffsets(groupId, singleTopicParams, securityContext);
            verifyAuthorizeCalls(authorizer, 2, ResourceType.TOPIC, topic1, Permission.READ);
            verifyAuthorizeCalls(authorizer, 1, ResourceType.TOPIC, topic2, Permission.READ);
            verifyAuthorizeCalls(authorizer, 2, ResourceType.GROUP, groupId, Permission.READ);
            verifyAuthorizeCalls(authorizer, 2, ResourceType.GROUP, groupId, Permission.DESCRIBE);
            Mockito.verify(service, Mockito.times(2)).executeResetOffset(argsCaptor.capture());
            List<String> singleTopicArgs = argsCaptor.getValue();
            Assertions.assertFalse(singleTopicArgs.contains("--all-topics"));
            Assertions.assertTrue(singleTopicArgs.contains("--topic"));
            Assertions.assertTrue(singleTopicArgs.contains(topic1));

            // Scenario 3: wildcard topic, but READ on topic1 is denied.
            ConsumerOffsetResetParams wildcardParams = new ConsumerOffsetResetParams(
                    ImmutableList.of(new ConsumerOffsetResetParams.TopicPartitionsPair("*", ImmutableList.of())), OffsetResetScenario.LATEST, "");
            Mockito.doReturn(false).when(authorizer).authorize(
                    (AuthenticationContext) ArgumentMatchers.any(),
                    ArgumentMatchers.eq(ResourceType.TOPIC),
                    ArgumentMatchers.eq(topic1),
                    ArgumentMatchers.eq(Permission.READ));
            service.resetOffsets(groupId, wildcardParams, securityContext);
            verifyAuthorizeCalls(authorizer, 3, ResourceType.TOPIC, topic1, Permission.READ);
            verifyAuthorizeCalls(authorizer, 2, ResourceType.TOPIC, topic2, Permission.READ);
            verifyAuthorizeCalls(authorizer, 3, ResourceType.GROUP, groupId, Permission.READ);
            verifyAuthorizeCalls(authorizer, 3, ResourceType.GROUP, groupId, Permission.DESCRIBE);
            Mockito.verify(service, Mockito.times(3)).executeResetOffset(argsCaptor.capture());
            List<String> partiallyAuthorizedArgs = argsCaptor.getValue();
            Assertions.assertFalse(partiallyAuthorizedArgs.contains("--all-topics"));
            Assertions.assertTrue(partiallyAuthorizedArgs.contains("--topic"));
            Assertions.assertTrue(partiallyAuthorizedArgs.contains(topic2));
        } finally {
            closeQuietly(service);
        }
    }

    /**
     * Verifies that {@code authorize} was invoked exactly {@code expectedCalls} times (cumulative
     * since the spy was created) for the given resource and permission, with any authentication
     * context.
     */
    private static void verifyAuthorizeCalls(SMMAuthorizer authorizer, int expectedCalls, ResourceType resourceType,
                                             String resourceName, Permission permission) throws Exception {
        Mockito.verify(authorizer, Mockito.times(expectedCalls)).authorize(
                (AuthenticationContext) ArgumentMatchers.any(),
                ArgumentMatchers.eq(resourceType),
                ArgumentMatchers.eq(resourceName),
                ArgumentMatchers.eq(permission));
    }

    /**
     * Runs the invalid-state reset check once for each of the group states UNKNOWN,
     * COMPLETING_REBALANCE, PREPARING_REBALANCE and STABLE, in that order; the first failure aborts
     * the remaining states.
     */
    @Test
    public void testAllInvalidStateWhenResettingOffsets() throws Exception {
        List<ConsumerGroupState> invalidStates = Arrays.asList(
                ConsumerGroupState.UNKNOWN,
                ConsumerGroupState.COMPLETING_REBALANCE,
                ConsumerGroupState.PREPARING_REBALANCE,
                ConsumerGroupState.STABLE);
        for (ConsumerGroupState invalidState : invalidStates) {
            testInvalidStateWhenResettingOffsets(invalidState);
        }
    }

    /**
     * Asserts that {@code resetOffsets} throws {@link IllegalArgumentException} when the group is
     * reported in the given state. The group description is mocked to return that state, and
     * {@code executeResetOffset} is stubbed so no real reset is attempted.
     *
     * @param consumerGroupState state the mocked group description reports
     */
    private void testInvalidStateWhenResettingOffsets(ConsumerGroupState consumerGroupState) throws Exception {
        String prefix = this.testName.getMethodName() + consumerGroupState.toString();
        String groupId = prefix + "-group";
        String topic = prefix + "-topic";
        CLUSTER.createTopic(topic, 3, 1);
        // Typed as the service (not AutoCloseable) so its methods can be called below.
        ConsumerGroupManagementService service =
                Mockito.spy(getConsumerGroupManagementService(Mockito.spy(new NoopAuthorizer())));
        ConsumerGroupExecutor consumerGroupExecutor = null;
        try {
            Mockito.doNothing().when(service).executeResetOffset(ArgumentMatchers.anyList());
            Mockito.doReturn(ImmutableSet.of(topic)).when(service).queryCommittedOffsetTopics(ArgumentMatchers.eq(groupId));
            ConsumerGroupDescription description = Mockito.mock(ConsumerGroupDescription.class);
            Mockito.doReturn(consumerGroupState).when(description).state();
            Mockito.doReturn(description).when(service).queryConsumerGroupDescription(ArgumentMatchers.eq(groupId));

            List<String> messages = IntStream.range(0, 20).mapToObj(n -> "message-" + n).collect(Collectors.toList());
            produceData(topic, messages);

            consumerGroupExecutor = new ConsumerGroupExecutor(1, 1, topic, groupId, false);
            consumerGroupExecutor.waitTillMsgCount(20, DEFAULT_WAIT_TIME_MS);
            service.refreshConsumerGroups();

            ConsumerOffsetResetParams resetParams = new ConsumerOffsetResetParams(
                    ImmutableList.of(new ConsumerOffsetResetParams.TopicPartitionsPair("*", ImmutableList.of())), OffsetResetScenario.LATEST, "");
            SMMSecurityContext securityContext = new SMMSecurityContext(() -> "test", "http");
            Assertions.assertThrows(IllegalArgumentException.class,
                    () -> service.resetOffsets(groupId, resetParams, securityContext));
        } finally {
            closeQuietly(consumerGroupExecutor, service);
        }
    }

    /**
     * Runs {@code executeResetOffset} with only {@code --bootstrap-server}, {@code --group} and
     * {@code --reset-offsets}, with Kafka's exit procedure replaced by one that throws
     * {@link IllegalStateException}; the test expects that exception to surface.
     *
     * <p>The exit procedure is JVM-global state, so it is reset in {@code finally} to keep it from
     * leaking into other tests, and the service is closed as well.
     */
    @Test(expected = IllegalStateException.class)
    public void testExitPreventionWhenResettingOffsets() {
        Exit.setExitProcedure((statusCode, message) -> {
            throw new IllegalStateException("Kafka CLI tried to exit");
        });
        ConsumerGroupManagementService service = null;
        try {
            service = getConsumerGroupManagementService(Mockito.spy(new NoopAuthorizer()));
            List<String> cliArgs = new ArrayList<>();
            cliArgs.add("--bootstrap-server");
            cliArgs.add((String) getKafkaAdminClientConfig().getConfig().get("bootstrap.servers"));
            cliArgs.add("--group");
            cliArgs.add("foo");
            cliArgs.add("--reset-offsets");
            service.executeResetOffset(cliArgs);
        } finally {
            Exit.resetExitProcedure();
            closeQuietly(service);
        }
    }

    /**
     * Consumes twenty records from a three-partition topic with a consumer that manually assigns
     * its partitions, then checks that the group is listed, covers partitions 0-2, has committed
     * offset 20, zero lag, and passes {@code verifyAssignmentsWithDefaults(group, 3)}.
     */
    @Test
    public void testManuallyAssignedPartitionConsumerGroups() throws Exception {
        String methodName = this.testName.getMethodName();
        String topic = methodName + "-topic";
        CLUSTER.createTopic(topic, 3, 1);
        List<String> messages = IntStream.range(0, 20).mapToObj(n -> "message-" + n).collect(Collectors.toList());
        produceData(topic, messages);
        String groupId = methodName + "-group";
        ConsumerGroupExecutor executor = null;
        ConsumerGroupManagementService service = null;
        try {
            executor = new ConsumerGroupExecutor(1, 1, topic, groupId, false);
            service = getConsumerGroupManagementService();
            executor.waitTillMsgCount(20, DEFAULT_WAIT_TIME_MS);
            service.refreshConsumerGroups();
            Assert.assertTrue(service.consumerGroupNames().contains(groupId));

            ConsumerGroupInfo consumerGroup = service.consumerGroup(groupId);
            List<com.hortonworks.smm.kafka.services.management.dtos.TopicPartition> expectedPartitions = IntStream.range(0, 3)
                    .mapToObj(partition -> new com.hortonworks.smm.kafka.services.management.dtos.TopicPartition(topic, partition))
                    .collect(Collectors.toList());
            Assert.assertTrue(topicPartitions(consumerGroup).containsAll(expectedPartitions));
            verifyCommittedOffsets(consumerGroup, 20L);
            verifyTotalLag(consumerGroup, 0L);
            verifyAssignmentsWithDefaults(consumerGroup, 3);
        } finally {
            // Runs on success and on failure alike.
            closeQuietly(executor, service);
        }
    }

    /**
     * Consumes twenty records from a three-partition topic with two groups (three consumers and
     * two consumers) and checks, for each group: it is listed, covers partitions 0-2, has zero
     * lag, committed offset 20 and the expected number of unique client ids. Finally the number
     * of consumers whose client id is not "-" must be between 1 and 5.
     */
    @Test
    public void testConsumerAPIsWithMultiplePartitionsAndMultipleGroups() throws Exception {
        String methodName = this.testName.getMethodName();
        String topic = methodName + "-topic";
        CLUSTER.createTopic(topic, 3, 1);
        List<String> messages = IntStream.range(0, 20).mapToObj(n -> "message-" + n).collect(Collectors.toList());
        produceData(topic, messages);
        String firstGroupId = methodName + "-group1";
        String secondGroupId = methodName + "-group2";
        ConsumerGroupExecutor firstExecutor = null;
        ConsumerGroupExecutor secondExecutor = null;
        ConsumerGroupManagementService service = null;
        try {
            firstExecutor = new ConsumerGroupExecutor(3, 3, topic, firstGroupId);
            secondExecutor = new ConsumerGroupExecutor(3, 2, topic, secondGroupId);
            service = getConsumerGroupManagementService();
            waitTillStable(service, firstGroupId, 500L);
            waitTillStable(service, secondGroupId, 500L);
            firstExecutor.waitTillMsgCount(20, DEFAULT_WAIT_TIME_MS);
            secondExecutor.waitTillMsgCount(20, DEFAULT_WAIT_TIME_MS);
            service.refreshConsumerGroups();
            Assert.assertTrue(service.consumerGroupNames().containsAll(Arrays.asList(firstGroupId, secondGroupId)));

            ConsumerGroupInfo firstGroup = service.consumerGroup(firstGroupId);
            ConsumerGroupInfo secondGroup = service.consumerGroup(secondGroupId);
            List<com.hortonworks.smm.kafka.services.management.dtos.TopicPartition> expectedPartitions = IntStream.range(0, 3)
                    .mapToObj(partition -> new com.hortonworks.smm.kafka.services.management.dtos.TopicPartition(topic, partition))
                    .collect(Collectors.toList());
            Assert.assertTrue(topicPartitions(firstGroup).containsAll(expectedPartitions));
            Assert.assertTrue(topicPartitions(secondGroup).containsAll(expectedPartitions));
            verifyTotalLag(firstGroup, 0L);
            verifyTotalLag(secondGroup, 0L);
            verifyCommittedOffsets(firstGroup, 20L);
            verifyCommittedOffsets(secondGroup, 20L);
            verifyUniqueClientIds(firstGroup, 3);
            verifyUniqueClientIds(secondGroup, 2);

            long namedConsumers = service.allConsumerInfo().stream()
                    .filter(consumerInfo -> !consumerInfo.clientId().equals("-"))
                    .count();
            Assert.assertTrue(namedConsumers > 0 && namedConsumers <= 5);
        } finally {
            // Runs on success and on failure alike.
            closeQuietly(firstExecutor, secondExecutor, service);
        }
    }

    /**
     * Starts two "stable" groups and three "empty" groups on a three-partition topic, checks that
     * all of them are reported as "Stable", then closes the consumers of the "empty" groups and
     * checks that those groups move to "Empty" while the others stay "Stable".
     *
     * <p>Cleanup is null-safe and covers both sets of executors, so a failure while creating or
     * waiting for the groups is reported as-is and leaves no consumers running.
     */
    @Test
    public void testGroupApisByState() throws Exception {
        String methodName = this.testName.getMethodName();
        String topic = methodName + "-topic";
        CLUSTER.createTopic(topic, 3, 1);
        List<String> messages = IntStream.range(0, 20).mapToObj(n -> "message-" + n).collect(Collectors.toList());
        produceData(topic, messages);
        ConsumerGroupManagementService service = getConsumerGroupManagementService();
        List<ConsumerGroupExecutor> stableExecutors = null;
        List<ConsumerGroupExecutor> emptyExecutors = null;
        try {
            stableExecutors = createConsumerGroupExecutors(topic, 2, methodName + "-stable-group-%s");
            Set<String> stableGroupIds = stableExecutors.stream()
                    .map(ConsumerGroupExecutor::groupId)
                    .collect(Collectors.toSet());
            String emptyGroupFormat = methodName + "-empty-group-%s";
            emptyExecutors = createConsumerGroupExecutors(topic, 3, emptyGroupFormat);
            Set<String> emptyGroupIds = emptyExecutors.stream()
                    .map(ConsumerGroupExecutor::groupId)
                    .collect(Collectors.toSet());

            // Wait for every group, including the "stable" ones, before asserting on state.
            stableGroupIds.forEach(groupId -> waitTillStable(service, groupId, 100L));
            emptyGroupIds.forEach(groupId -> waitTillStable(service, groupId, 100L));
            service.refreshConsumerGroups();
            Set<String> stableBeforeClose = fetchGroupdIds(service.allConsumerGroups(), "Stable");
            Assertions.assertTrue(stableBeforeClose.containsAll(stableGroupIds));
            Assertions.assertTrue(stableBeforeClose.containsAll(emptyGroupIds));

            // Stop the consumers of the groups that are expected to become Empty.
            for (ConsumerGroupExecutor emptyExecutor : emptyExecutors) {
                closeQuietly(emptyExecutor);
            }
            // Already closed: clear the reference so the finally block does not close them twice.
            emptyExecutors = null;
            IntStream.range(0, 3).forEach(index ->
                    waitTillEmpty(service, String.format(emptyGroupFormat, Integer.valueOf(index)), 100L));
            service.refreshConsumerGroups();

            Set<String> stableAfterClose = fetchGroupdIds(service.allConsumerGroups(), "Stable");
            Assertions.assertTrue(stableAfterClose.containsAll(stableGroupIds));
            Assertions.assertFalse(stableAfterClose.stream().anyMatch(emptyGroupIds::contains));
            Set<String> emptyAfterClose = fetchGroupdIds(service.allConsumerGroups(), "Empty");
            Assertions.assertTrue(emptyAfterClose.containsAll(emptyGroupIds));
            Assertions.assertFalse(emptyAfterClose.stream().anyMatch(stableGroupIds::contains));
        } finally {
            closeQuietly(service);
            if (stableExecutors != null) {
                for (ConsumerGroupExecutor stableExecutor : stableExecutors) {
                    closeQuietly(stableExecutor);
                }
            }
            if (emptyExecutors != null) {
                for (ConsumerGroupExecutor emptyExecutor : emptyExecutors) {
                    closeQuietly(emptyExecutor);
                }
            }
        }
    }

    /**
     * Exercises {@code ConsumerGroupFetcherTask.isActive} with a 1000 ms window against three
     * inputs: an empty assignment map, a map whose assignments carry a timestamp taken at the
     * start of the test, and that same map after the test has slept for one second.
     */
    @Test
    public void testIsActiveConsumerGroups() throws Exception {
        long commitTimestamp = System.currentTimeMillis();
        Map<String, Map<Integer, PartitionAssignment>> assignmentsByTopic = new HashMap<>();

        // No assignments at all: the group must be reported as inactive.
        Assertions.assertFalse(ConsumerGroupFetcherTask.isActive(assignmentsByTopic, 1000L));

        // Four topics with ten partitions each, all stamped with the timestamp captured above.
        for (String topic : Arrays.asList("a", "b", "c", "d")) {
            Map<Integer, PartitionAssignment> assignmentsByPartition = new HashMap<>();
            for (int partition = 0; partition < 10; partition++) {
                assignmentsByPartition.put(partition,
                        new PartitionAssignment(0L, 0L, 0L, "", "", "", Long.valueOf(commitTimestamp)));
            }
            assignmentsByTopic.put(topic, assignmentsByPartition);
        }

        Assertions.assertTrue(ConsumerGroupFetcherTask.isActive(assignmentsByTopic, 1000L));

        // Let the 1000 ms window pass before checking again.
        Thread.sleep(1000L);
        Assertions.assertFalse(ConsumerGroupFetcherTask.isActive(assignmentsByTopic, 1000L));
    }

    /**
     * Collects the ids of the consumer groups whose state equals the requested one.
     *
     * @param collection consumer groups to inspect
     * @param str        state to match; compared with {@code state().equals(str)}
     * @return ids of the matching groups, empty when nothing matches
     */
    private Set<String> fetchGroupdIds(Collection<ConsumerGroupInfo> collection, String str) {
        Set<String> matchingIds = new HashSet<>();
        for (ConsumerGroupInfo group : collection) {
            if (group.state().equals(str)) {
                matchingIds.add(group.id());
            }
        }
        return matchingIds;
    }

    /**
     * Creates {@code i} executors, passing each one the literal arguments {@code 1, 1}, the given
     * {@code str}, and a name built by formatting {@code str2} with the executor's index.
     *
     * @param str  third constructor argument, forwarded unchanged (presumably the topic name;
     *             confirm against {@code ConsumerGroupExecutor})
     * @param i    number of executors to create
     * @param str2 format string receiving the zero-based index (presumably yields the group id)
     * @return the executors in creation order
     */
    private List<ConsumerGroupExecutor> createConsumerGroupExecutors(String str, int i, String str2) {
        List<ConsumerGroupExecutor> executors = new ArrayList<>(i);
        for (int index = 0; index < i; index++) {
            String formattedName = String.format(str2, index);
            executors.add(new ConsumerGroupExecutor(1, 1, str, formattedName));
        }
        return executors;
    }

    /**
     * Flattens the group's topic-to-partition assignment map into a set of topic partitions,
     * one per (topic, partition key) pair.
     *
     * @param consumerGroupInfo group whose assignments are flattened
     * @return a new mutable set holding every assigned topic partition
     */
    private Set<com.hortonworks.smm.kafka.services.management.dtos.TopicPartition> topicPartitions(ConsumerGroupInfo consumerGroupInfo) {
        return consumerGroupInfo.topicPartitionAssignments().entrySet().stream()
                .flatMap(topicEntry -> topicEntry.getValue().keySet().stream()
                        .map(partition -> new com.hortonworks.smm.kafka.services.management.dtos.TopicPartition(
                                topicEntry.getKey(), partition.intValue())))
                .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Asserts that the lag summed over every partition assignment of the group equals {@code j}.
     *
     * @param consumerGroupInfo group whose assignments are summed
     * @param j                 expected total lag
     */
    private void verifyTotalLag(ConsumerGroupInfo consumerGroupInfo, long j) {
        long actualTotalLag = consumerGroupInfo.topicPartitionAssignments().values().stream()
                .flatMap(assignmentsByPartition -> assignmentsByPartition.values().stream())
                .mapToLong(assignment -> assignment.lag().longValue())
                .sum();
        Assert.assertEquals(j, actualTotalLag);
    }

    /**
     * Asserts that the offsets summed over every partition assignment of the group equal {@code j}.
     *
     * @param consumerGroupInfo group whose assignments are summed
     * @param j                 expected sum of offsets
     */
    private void verifyCommittedOffsets(ConsumerGroupInfo consumerGroupInfo, long j) {
        long actualOffsetSum = consumerGroupInfo.topicPartitionAssignments().values().stream()
                .flatMap(assignmentsByPartition -> assignmentsByPartition.values().stream())
                .mapToLong(assignment -> assignment.offset().longValue())
                .sum();
        Assert.assertEquals(j, actualOffsetSum);
    }

    /**
     * Asserts how many partition assignments carry the placeholder {@code "-"} for client id,
     * host and consumer instance id all at once.
     *
     * @param consumerGroupInfo group whose assignments are inspected
     * @param i                 expected number of such assignments
     */
    private void verifyAssignmentsWithDefaults(ConsumerGroupInfo consumerGroupInfo, int i) {
        long placeholderAssignments = consumerGroupInfo.topicPartitionAssignments().values().stream()
                .flatMap(assignmentsByPartition -> assignmentsByPartition.values().stream())
                .filter(assignment -> "-".equals(assignment.clientId())
                        && "-".equals(assignment.host())
                        && "-".equals(assignment.consumerInstanceId()))
                .count();
        Assert.assertEquals(i, placeholderAssignments);
    }

    /**
     * Asserts that the group has at least one and at most {@code i} distinct client ids,
     * logging before and after the check.
     *
     * @param consumerGroupInfo group whose client ids are counted
     * @param i                 inclusive upper bound on the number of distinct client ids
     */
    private void verifyUniqueClientIds(ConsumerGroupInfo consumerGroupInfo, int i) {
        LOG.info("###### Verifying unique client ids for consumerGroupInfo [{}] ", consumerGroupInfo);
        int distinctClients = getUniqueClientIds(consumerGroupInfo);
        boolean withinBounds = distinctClients > 0 && distinctClients <= i;
        Assertions.assertTrue(withinBounds);
        LOG.info("Finished verifying unique client ids for consumerGroupInfo [{}], unique clients: [{}] ", consumerGroupInfo, distinctClients);
    }

    /**
     * Counts the distinct client ids found across all partition assignments of the group.
     *
     * @param consumerGroupInfo group whose assignments are scanned
     * @return number of distinct client ids
     */
    private int getUniqueClientIds(ConsumerGroupInfo consumerGroupInfo) {
        long distinctClientIds = consumerGroupInfo.topicPartitionAssignments().values().stream()
                .flatMap(assignmentsByPartition -> assignmentsByPartition.values().stream())
                .map(assignment -> assignment.clientId())
                .distinct()
                .count();
        return (int) distinctClientIds;
    }

    /**
     * Asserts how many of the consumer's partition infos belong to the given group.
     *
     * @param consumerInfo consumer whose partition infos are inspected
     * @param str          group id to match via {@code getGroupId().equals(str)}
     * @param i            expected number of matching partition infos
     */
    private void verifyConsumerPartitionsForGroup(ConsumerInfo consumerInfo, String str, int i) {
        long partitionsInGroup = consumerInfo.consumerPartitionInfos().stream()
                .filter(partitionInfo -> partitionInfo.getGroupId().equals(str))
                .count();
        Assert.assertEquals(i, partitionsInGroup);
    }

    /**
     * Produces 20 messages to a fresh single-partition topic, consumes and commits them, and
     * expects the group to report zero lag, a committed-offset sum of 20 and exactly one client
     * id. It then produces 20 more messages without consuming them and expects, after a refresh,
     * a lag of 20 with the committed-offset sum still at 20.
     */
    @Test
    public void testConsumerAPIsWithOffsetLag() throws Exception {
        String testMethod = this.testName.getMethodName();
        String topic = testMethod + "-topic";
        CLUSTER.createTopic(topic, 1, 1);

        List<String> messages = IntStream.range(0, 20)
                .mapToObj(index -> "message-" + index)
                .collect(Collectors.toList());
        produceData(topic, messages);

        String groupId = testMethod + "-group1";
        KafkaConsumer consumer = null;
        ConsumerGroupManagementService groupService = null;
        try {
            String clientId = "test-clientId-" + UUID.randomUUID();
            consumer = new KafkaConsumer(getConsumerConfig(groupId, clientId).getConfig());
            consumer.subscribe(Collections.singleton(topic));
            // Effectively unbounded timeout: wait until the first batch of records arrives.
            consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
            consumer.commitSync();

            groupService = getConsumerGroupManagementService();
            waitTillStable(groupService, groupId, 500L);

            // Everything produced so far has been consumed and committed.
            ConsumerGroupInfo caughtUp = groupService.consumerGroup(groupId);
            verifyTotalLag(caughtUp, 0L);
            verifyCommittedOffsets(caughtUp, 20L);
            verifyUniqueClientIds(caughtUp, 1);

            // A second batch is produced but never polled, so only the lag should move.
            produceData(topic, messages);
            groupService.refreshConsumerGroups();
            ConsumerGroupInfo lagging = groupService.consumerGroup(groupId);
            verifyTotalLag(lagging, 20L);
            verifyCommittedOffsets(lagging, 20L);
        } finally {
            closeQuietly(consumer, groupService);
        }
    }

    /**
     * Synchronously produces the given values to a topic on the test cluster, using string
     * serializers, {@code acks=all} and no retries.
     *
     * @param str  topic to produce to
     * @param list values to send
     * @throws Exception if producing fails
     */
    private void produceData(String str, List<String> list) throws Exception {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", CLUSTER.bootstrapServers());

        // Keys and values are both plain strings.
        producerProps.put("key.serializer", StringSerializer.class);
        producerProps.put("value.serializer", StringSerializer.class);

        // Delivery settings.
        producerProps.put("acks", "all");
        producerProps.put("retries", 0);

        IntegrationTestUtils.produceValuesSynchronously(str, list, producerProps, CLUSTER.time);
    }

    /**
     * Builds an admin client configuration pointing at the test cluster's bootstrap servers.
     *
     * @return a new admin client configuration
     */
    private KafkaAdminClientConfig getKafkaAdminClientConfig() {
        String bootstrapServers = CLUSTER.bootstrapServers();
        return new KafkaAdminClientConfig(bootstrapServers);
    }

    /**
     * Builds a consumer configuration for the test cluster: string deserializers, offsets reset
     * to {@code earliest}, and the supplied group and client ids. The Kafka client settings are
     * nested under the {@code "properties"} key next to {@code "poll.timeout.ms"}.
     *
     * <p>Decompiler note: this method was declared {@code private} in the original source and
     * was widened to {@code public} by the decompiler.
     *
     * @param str  value for {@code group.id}
     * @param str2 value for {@code client.id}
     * @return a new consumer configuration
     */
    public static KafkaConsumerConfig getConsumerConfig(String str, String str2) {
        Map<String, Object> clientProperties = new HashMap<>();
        clientProperties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        clientProperties.put("group.id", str);
        clientProperties.put("key.deserializer", StringDeserializer.class.getName());
        clientProperties.put("value.deserializer", StringDeserializer.class.getName());
        clientProperties.put("auto.offset.reset", "earliest");
        clientProperties.put("client.id", str2);
        clientProperties.put("smm.commit.offsets.refresh.interval.ms", 100);

        Map<String, Object> consumerSettings = new HashMap<>();
        consumerSettings.put("poll.timeout.ms", Long.valueOf(DEFAULT_WAIT_TIME_MS));
        consumerSettings.put("properties", clientProperties);

        return new KafkaConsumerConfig(CLUSTER.bootstrapServers(), "", new HashMap<>(), consumerSettings);
    }

    /**
     * Formats a client id by applying {@code CLIENT_ID_FORMAT} to the given string and number.
     *
     * <p>Decompiler note: this method was declared {@code private} in the original source and
     * was widened to {@code public} by the decompiler.
     *
     * @param str first format argument
     * @param i   second format argument
     * @return the formatted client id
     */
    public static String fetchClientId(String str, int i) {
        return String.format(CLIENT_ID_FORMAT, str, i);
    }

    /**
     * Builds the management configuration used by these tests.
     *
     * @return a new management configuration constructed with the value {@code 15000}
     */
    private KafkaManagementConfig getKafkaManagementConfig() {
        // NOTE(review): presumably a duration in milliseconds; confirm against KafkaManagementConfig.
        long configValue = 15000L;
        return new KafkaManagementConfig(configValue);
    }
}
