package com.hortonworks.smm.kafka.services.management;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.hortonworks.smm.kafka.common.config.KafkaConsumerConfig;
import com.hortonworks.smm.kafka.common.utils.AdminClientUtil;
import com.hortonworks.smm.kafka.services.extension.KafkaAdminServiceTest;
import com.hortonworks.smm.kafka.services.management.dtos.BrokerNode;
import com.hortonworks.smm.kafka.services.management.dtos.KafkaResourceConfig;
import com.hortonworks.smm.kafka.services.management.dtos.KafkaResourceConfigEntry;
import com.hortonworks.smm.kafka.services.management.dtos.NewPartitions;
import com.hortonworks.smm.kafka.services.management.dtos.NewTopicPartitions;
import com.hortonworks.smm.kafka.services.management.dtos.NewTopics;
import com.hortonworks.smm.kafka.services.management.dtos.TopicPartitionInfo;
import com.hortonworks.smm.kafka.services.management.dtos.TopicPartitionInfos;
import com.hortonworks.smm.kafka.services.management.dtos.TopicSummary;
import com.hortonworks.smm.kafka.services.management.util.ManagementUtil;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DisplayName("Topic management service tests")
@KafkaAdminServiceTest(numBrokerNodes = 3)
/* loaded from: input_file:com/hortonworks/smm/kafka/services/management/TopicManagementServiceTest.class */
public class TopicManagementServiceTest {
    private static final Logger LOG = LoggerFactory.getLogger(TopicManagementServiceTest.class);
    private static final int TIME_OUT_MS = 120000;

    /**
     * Creates seven topics ("topic-1" .. "topic-7") through the admin client, refreshes the
     * service cache for them, and checks that the service reports every created topic name and
     * every topic info derived from the admin client's own description of those topics.
     *
     * @param adminClient admin client used to create, describe and delete the topics
     * @param topicManagementService service under test
     * @throws Exception if describing the topics fails or the wait is interrupted
     */
    @DisplayName("Test fetch topic and topic info")
    @TestTemplate
    public void testTopicApis(AdminClient adminClient, TopicManagementService topicManagementService) throws Exception {
        List<String> topicNames = IntStream.range(1, 8)
                .mapToObj(index -> "topic-" + index)
                .collect(Collectors.toList());
        List<NewTopic> newTopics = topicNames.stream()
                .map(topicName -> new NewTopic(topicName, 3, (short) 2))
                .collect(Collectors.toList());
        AdminClientUtil.createTopics(adminClient, newTopics, 120000L);
        try {
            topicManagementService.refreshCache(topicNames, false);
            Collection<?> expectedTopicInfos = ManagementUtil.toTopicInfos(adminClient.describeTopics(topicNames).all().get());
            // Copy into a hash set so the containment check does not depend on the returned collection type.
            Set<String> cachedTopicNames = new HashSet<>(topicManagementService.allTopicNames());
            Assertions.assertTrue(cachedTopicNames.containsAll(topicNames));
            Assertions.assertTrue(Sets.newHashSet(topicManagementService.topicInfos(topicNames)).containsAll(expectedTopicInfos));
        } finally {
            // Always remove the topics so a failed assertion does not leak them into later tests.
            AdminClientUtil.deleteTopics(adminClient, topicNames, 120000L);
        }
    }

    /**
     * Verifies that, after a synchronous cache refresh, the broker service reports the same set
     * of brokers as the cluster description returned by the admin client.
     *
     * @param adminClient admin client used to describe the cluster
     * @param brokerManagementService service under test
     * @throws Exception if describing the cluster fails or the wait is interrupted
     */
    @DisplayName("Test fetch broker nodes")
    @TestTemplate
    public void testBrokerApis(AdminClient adminClient, BrokerManagementService brokerManagementService) throws Exception {
        brokerManagementService.syncCacheRefresh();
        Set<BrokerNode> reportedBrokers = new HashSet<>(brokerManagementService.allBrokers());
        Set<BrokerNode> clusterBrokers = adminClient.describeCluster().nodes().get().stream()
                .map(clusterNode -> BrokerNode.from(clusterNode))
                .collect(Collectors.toSet());
        Assertions.assertEquals(clusterBrokers, reportedBrokers);
    }

    /**
     * Round-trips a {@code TopicPartitionInfos} instance through Jackson and checks that the
     * deserialized object equals the one that was serialized.
     *
     * @throws Exception if JSON serialization or deserialization fails
     */
    @DisplayName("Test ser/des TopicPartitionInfos pojo")
    @TestTemplate
    public void testTopicPartitionInfoPojos() throws Exception {
        // Two partitions (1 and 2) of a single topic, each led by the same node and with no replicas/ISR.
        Collection<TopicPartitionInfo> partitionInfos = new ArrayList<>();
        for (int partition = 1; partition <= 2; partition++) {
            org.apache.kafka.common.TopicPartitionInfo kafkaPartitionInfo = new org.apache.kafka.common.TopicPartitionInfo(
                    partition, new Node(1, "foo", 6111), Collections.emptyList(), Collections.emptyList());
            partitionInfos.add(TopicPartitionInfo.from(kafkaPartitionInfo));
        }
        Map<String, Collection<TopicPartitionInfo>> partitionInfosByTopic = new HashMap<>();
        partitionInfosByTopic.put("topic", partitionInfos);

        TopicPartitionInfos original = new TopicPartitionInfos(partitionInfosByTopic);
        LOG.info("topicPartitionInfos [{}] ", original);

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(original);
        LOG.info("resultStr [{}] ", json);

        TopicPartitionInfos roundTripped = mapper.readValue(json, TopicPartitionInfos.class);
        LOG.info("resultObj [{}]", roundTripped);
        Assertions.assertEquals(original, roundTripped);
    }

    /**
     * Checks the {@code BrokerNode} conversions: a pooled node converts back to an equal
     * {@code Node}, while {@code null} and {@code Node.noNode()} both map to
     * {@code BrokerNode.NO_NODE}.
     */
    @DisplayName("Test brokers with null or NO_NODE and other values")
    @TestTemplate
    public void testTopicLeaderWithNullOrNoNode() {
        // nextInt(bound) is already within [0, bound), so the value can be used as an id directly.
        int brokerId = new Random().nextInt(10000);
        String host = "host-" + brokerId;
        String rack = "rack-" + brokerId;
        Node kafkaNode = new Node(brokerId, host, brokerId + 1, rack);

        BrokerNode.refreshPool(Collections.singleton(kafkaNode), kafkaNode);
        Node convertedBack = BrokerNode.from(kafkaNode).toNode();
        Assertions.assertEquals(kafkaNode, convertedBack);

        Assertions.assertNotNull(BrokerNode.NO_NODE);
        Assertions.assertEquals(BrokerNode.NO_NODE, BrokerNode.from((Node) null));
        Assertions.assertEquals(BrokerNode.NO_NODE, BrokerNode.from(Node.noNode()));
    }

    /**
     * Creates the topic "test", checks the summary the service computes for it, then produces
     * and consumes one record (committing the consumer offset) and checks that the summary of
     * "__consumer_offsets" is flagged as internal.
     *
     * @param adminClient admin client used to create and delete the topic
     * @param kafkaConsumerConfig supplies the "bootstrap.servers" value for the test clients
     * @param topicManagementService service under test
     * @throws Exception if sending the record fails or the wait is interrupted
     */
    @DisplayName("Test topic summary")
    @TestTemplate
    public void testTopicSummary(AdminClient adminClient, KafkaConsumerConfig kafkaConsumerConfig, TopicManagementService topicManagementService) throws Exception {
        String topic = "test";
        AdminClientUtil.createTopic(adminClient, new NewTopic(topic, 3, (short) 2), 120000L);
        try {
            topicManagementService.refreshCache(Collections.singleton(topic), false);
            Assertions.assertEquals(new TopicSummary((short) 2, 3, 3, 100.0f, 0.0f, false), topicManagementService.getTopicSummary(topicManagementService.topicInfo(topic)));

            Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", kafkaConsumerConfig.getConfig().get("bootstrap.servers"));
            producerProps.put("key.serializer", StringSerializer.class);
            producerProps.put("value.serializer", StringSerializer.class);
            // Close the producer as soon as the record is acknowledged so its threads and sockets are released.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                producer.send(new ProducerRecord<>(topic, "ads", "asd")).get();
            }

            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", kafkaConsumerConfig.getConfig().get("bootstrap.servers"));
            consumerProps.put("key.deserializer", StringDeserializer.class);
            consumerProps.put("value.deserializer", StringDeserializer.class);
            consumerProps.put("group.id", "test-group");
            consumerProps.setProperty("auto.offset.reset", "earliest");
            consumerProps.setProperty("enable.auto.commit", "true");
            // The polled records are not inspected; the poll and commit are only here to write a group offset.
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                consumer.subscribe(Collections.singleton(topic));
                consumer.poll(Duration.ofMinutes(1L));
                consumer.commitSync();
            }

            topicManagementService.syncCacheRefresh();
            Assertions.assertTrue(topicManagementService.getTopicSummary(topicManagementService.topicInfo("__consumer_offsets")).isInternal());
        } finally {
            // The topic name is fixed, so leaving it behind after a failure would break the next run.
            AdminClientUtil.deleteTopic(adminClient, topic, 120000L);
        }
    }

    /**
     * Creates three topics through the service with random partition counts, replication
     * factors and hour-based retention settings, then compares what the admin client describes
     * (partition count, replica count) and what the resource-config service returns against the
     * requested values.
     *
     * @param adminClient admin client used to describe and delete the topics
     * @param topicManagementService service under test
     * @param resourceConfigsService used to read back the topic configs
     * @param testInfo supplies the test method name used as the topic-name prefix
     * @throws ExecutionException if describing the topics fails
     * @throws InterruptedException if the wait for the description is interrupted
     */
    @DisplayName("Test create topic")
    @TestTemplate
    public void testCreateTopic(AdminClient adminClient, TopicManagementService topicManagementService, ResourceConfigsService resourceConfigsService, TestInfo testInfo) throws ExecutionException, InterruptedException {
        String methodName = testInfo.getTestMethod().get().getName();
        List<String> topicNames = new ArrayList<>();
        Map<String, String> expectedHoursByTopic = new HashMap<>();
        List<com.hortonworks.smm.kafka.services.management.dtos.NewTopic> requestedTopics = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            String topicName = methodName + i;
            int numPartitions = ThreadLocalRandom.current().nextInt(1, 3);
            short replicationFactor = (short) ThreadLocalRandom.current().nextInt(1, 3);
            String retentionHours = String.valueOf(ThreadLocalRandom.current().nextInt(0, 10000));
            Map<String, String> configs = new HashMap<>();
            configs.put("cleanup.policy", "delete,compact");
            configs.put("retention.hours", retentionHours);
            configs.put("delete.retention.hours", retentionHours);
            expectedHoursByTopic.put(topicName, retentionHours);
            topicNames.add(topicName);
            requestedTopics.add(new com.hortonworks.smm.kafka.services.management.dtos.NewTopic(topicName, numPartitions, replicationFactor, configs));
        }
        topicManagementService.createTopics(new NewTopics(requestedTopics));
        try {
            topicManagementService.refreshCache(topicNames, false);
            Map<String, TopicDescription> descriptions = adminClient.describeTopics(topicNames).all().get();
            List<String> hourConfigNames = Arrays.asList("retention.hours", "delete.retention.hours");
            for (com.hortonworks.smm.kafka.services.management.dtos.NewTopic requested : requestedTopics) {
                String topicName = requested.name();
                TopicDescription description = descriptions.get(topicName);
                Assertions.assertNotNull(description);
                Assertions.assertEquals(requested.numPartitions(), description.partitions().size());
                Assertions.assertEquals(requested.replicationFactor(), description.partitions().get(0).replicas().size());
                KafkaResourceConfig topicConfig = resourceConfigsService.topicConfigs(Collections.singleton(topicName)).iterator().next();
                String expectedHours = expectedHoursByTopic.get(topicName);
                // NOTE(review): this only asserts on entries the service actually returns under the
                // hour-based names; if none are returned the check passes vacuously — confirm that
                // topicConfigs exposes "retention.hours" / "delete.retention.hours".
                for (KafkaResourceConfigEntry entry : topicConfig.resourceConfigs()) {
                    if (hourConfigNames.contains(entry.name())) {
                        Assertions.assertEquals(expectedHours, entry.value());
                    }
                }
            }
        } finally {
            // Always remove the topics so a failed assertion does not leak them into later tests.
            AdminClientUtil.deleteTopics(adminClient, topicNames, 120000L);
        }
    }

    /**
     * Creates a 3-partition topic, asks the service to grow it to 5 partitions with explicit
     * replica assignments for the two new partitions, and checks the resulting partition count
     * and the number of partitions led by each broker id.
     *
     * @param adminClient admin client used to create, describe and delete the topic
     * @param topicManagementService service under test
     * @param testInfo supplies the test method name used as the topic name
     * @throws ExecutionException if describing the topic fails
     * @throws InterruptedException if the wait for the description is interrupted
     */
    @DisplayName("Test create partitions")
    @TestTemplate
    public void testCreatePartition(AdminClient adminClient, TopicManagementService topicManagementService, TestInfo testInfo) throws ExecutionException, InterruptedException {
        String topicName = testInfo.getTestMethod().get().getName();
        AdminClientUtil.createTopic(adminClient, new NewTopic(topicName, 3, (short) 2), 120000L);
        try {
            // One replica list per new partition.
            List<List<Integer>> newAssignments = new ArrayList<>();
            newAssignments.add(Lists.newArrayList(0, 1));
            newAssignments.add(Lists.newArrayList(2, 0));
            Map<String, NewPartitions> partitionsByTopic = new HashMap<>();
            partitionsByTopic.put(topicName, new NewPartitions(5, newAssignments));
            topicManagementService.createTopicPartitions(new NewTopicPartitions(partitionsByTopic));

            TopicDescription description = adminClient.describeTopics(Collections.singleton(topicName)).all().get().values().iterator().next();
            Map<Integer, Integer> partitionsPerLeader = new HashMap<>();
            for (org.apache.kafka.common.TopicPartitionInfo partition : description.partitions()) {
                partitionsPerLeader.merge(partition.leader().id(), 1, Integer::sum);
            }

            Assertions.assertEquals(5, description.partitions().size());
            // Compare whole maps: a missing leader id then fails as an assertion showing both maps,
            // not as a NullPointerException from unboxing a missing entry.
            Map<Integer, Integer> expectedPerLeader = new HashMap<>();
            expectedPerLeader.put(0, 2);
            expectedPerLeader.put(1, 1);
            expectedPerLeader.put(2, 2);
            Assertions.assertEquals(expectedPerLeader, partitionsPerLeader);
        } finally {
            // Always remove the topic so a failed assertion does not leak it into later tests.
            AdminClientUtil.deleteTopic(adminClient, topicName, 120000L);
        }
    }

    /**
     * Creates a topic through the admin client, deletes it through the service, and checks via
     * the admin client's topic listing that it was present before and absent after the delete.
     *
     * @param adminClient admin client used to create and list topics
     * @param topicManagementService service under test
     * @param testInfo supplies the test method name used as the topic name
     * @throws ExecutionException if listing the topics fails
     * @throws InterruptedException if the wait for the listing is interrupted
     */
    @DisplayName("Test delete topic")
    @TestTemplate
    public void testDeleteTopic(AdminClient adminClient, TopicManagementService topicManagementService, TestInfo testInfo) throws ExecutionException, InterruptedException {
        String topicName = testInfo.getTestMethod().get().getName();
        short replicationFactor = 2;
        NewTopic topic = new NewTopic(topicName, 3, replicationFactor);
        AdminClientUtil.createTopic(adminClient, topic, 120000L);

        Set<String> namesBeforeDelete = adminClient.listTopics().names().get();
        Assertions.assertTrue(namesBeforeDelete.contains(topicName));

        topicManagementService.deleteTopics(Collections.singletonList(topicName));

        Set<String> namesAfterDelete = adminClient.listTopics().names().get();
        Assertions.assertFalse(namesAfterDelete.contains(topicName));
    }

    /**
     * Checks that a topic created or deleted behind the service's back only becomes visible
     * (or disappears) in the service after {@code refreshCache} is called for that topic, and
     * that looking up the info of the removed topic throws.
     *
     * @param adminClient admin client used to create and delete the topic
     * @param topicManagementService service under test
     * @param testInfo supplies the test method name used as the topic name
     */
    @DisplayName("Test partial cache refresh")
    @TestTemplate
    public void testPartialCacheRefresh(AdminClient adminClient, TopicManagementService topicManagementService, TestInfo testInfo) {
        String topicName = testInfo.getTestMethod().get().getName();
        Assertions.assertFalse(topicManagementService.allTopicNames().contains(topicName));

        // Created directly on the cluster: the service must not see it until its cache is refreshed.
        AdminClientUtil.createTopic(adminClient, new NewTopic(topicName, 1, (short) 1), 120000L);
        Assertions.assertFalse(topicManagementService.allTopicNames().contains(topicName));
        topicManagementService.refreshCache(Collections.singleton(topicName), false);
        Assertions.assertTrue(topicManagementService.allTopicNames().contains(topicName));
        Assertions.assertNotNull(topicManagementService.topicInfo(topicName));

        AdminClientUtil.deleteTopic(adminClient, topicName, 120000L);
        topicManagementService.refreshCache(Collections.singleton(topicName), true);
        Assertions.assertFalse(topicManagementService.allTopicNames().contains(topicName));
        // Looking up a topic that is no longer cached is expected to throw rather than return a value.
        Assertions.assertThrows(RuntimeException.class, () -> topicManagementService.topicInfo(topicName), "Should throw an exception");
    }

    /**
     * Checks that the DTO-to-Kafka conversions in {@code ManagementUtil} return a non-null
     * result for null, empty and negative-valued 'create' DTOs.
     */
    @DisplayName("Test boundary cases for 'create' dtos to kafka entities transformation")
    @TestTemplate
    public void testCreateDTOs() {
        // Topic collections: null and empty.
        NewTopics nullTopics = new NewTopics((Collection) null);
        Assertions.assertNotNull(ManagementUtil.toKafkaNewTopics(nullTopics));
        NewTopics emptyTopics = new NewTopics(new ArrayList<>());
        Assertions.assertNotNull(ManagementUtil.toKafkaNewTopics(emptyTopics));

        // Single topic with a null name, negative sizes and null configs.
        com.hortonworks.smm.kafka.services.management.dtos.NewTopic degenerateTopic =
                new com.hortonworks.smm.kafka.services.management.dtos.NewTopic((String) null, -1, (short) -1, (Map) null);
        Assertions.assertNotNull(ManagementUtil.toKafkaNewTopic(degenerateTopic));

        // Per-topic partition maps: null and empty.
        NewTopicPartitions nullTopicPartitions = new NewTopicPartitions((Map) null);
        Assertions.assertNotNull(ManagementUtil.toKafkaNewTopicPartitions(nullTopicPartitions));
        NewTopicPartitions emptyTopicPartitions = new NewTopicPartitions(new HashMap<>());
        Assertions.assertNotNull(ManagementUtil.toKafkaNewTopicPartitions(emptyTopicPartitions));

        // Partition requests with a negative count and empty or null assignments.
        NewPartitions emptyAssignments = new NewPartitions(-1, new ArrayList<>());
        Assertions.assertNotNull(ManagementUtil.toKafkaNewPartitions(emptyAssignments));
        NewPartitions nullAssignments = new NewPartitions(-1, (List) null);
        Assertions.assertNotNull(ManagementUtil.toKafkaNewPartitions(nullAssignments));
    }

    /**
     * Checks that converting a DTO topic to a Kafka {@code NewTopic} keeps an unrelated config
     * key and exposes the hour-based retention settings under the millisecond-based keys.
     */
    @DisplayName("Test the topic original config conversion")
    @TestTemplate
    public void testTopicOriginalConfigConversion() {
        Map<String, String> requestedConfigs = new HashMap<>();
        requestedConfigs.put("hello", "world");
        requestedConfigs.put("retention.hours", String.valueOf(5L));
        requestedConfigs.put("delete.retention.hours", String.valueOf(48L));
        com.hortonworks.smm.kafka.services.management.dtos.NewTopic dtoTopic =
                new com.hortonworks.smm.kafka.services.management.dtos.NewTopic("name", 1, (short) 1, requestedConfigs);

        NewTopic kafkaTopic = ManagementUtil.toKafkaNewTopic(dtoTopic);
        Map<String, String> kafkaConfigs = kafkaTopic.configs();

        Assertions.assertEquals(3, kafkaConfigs.size());
        Assertions.assertTrue(kafkaConfigs.containsKey("hello"));
        // 5 h = 18,000,000 ms and 48 h = 172,800,000 ms.
        Assertions.assertEquals(String.valueOf(18000000L), kafkaConfigs.get("retention.ms"));
        Assertions.assertEquals(String.valueOf(172800000L), kafkaConfigs.get("delete.retention.ms"));
    }
}
