package com.hortonworks.smm.kafka.services.lineage;

import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.google.common.collect.ImmutableMap;
import com.hortonworks.smm.kafka.common.config.KafkaAdminClientConfig;
import com.hortonworks.smm.kafka.common.config.KafkaMetricsApiClientConfig;
import com.hortonworks.smm.kafka.common.entities.KafkaTopic;
import com.hortonworks.smm.kafka.common.entities.KafkaTopicPartition;
import com.hortonworks.smm.kafka.common.utils.HashMaps;
import com.hortonworks.smm.kafka.services.common.errors.InvalidConfigException;
import com.hortonworks.smm.kafka.services.management.TopicManagementService;
import com.hortonworks.smm.kafka.services.management.dtos.BrokerNode;
import com.hortonworks.smm.kafka.services.management.dtos.TopicInfo;
import com.hortonworks.smm.kafka.services.management.dtos.TopicPartitionInfo;
import com.hortonworks.smm.kafka.services.management.helper.AdminClientHelper;
import io.dropwizard.testing.FixtureHelpers;
import io.dropwizard.util.Sets;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockserver.integration.ClientAndServer;
import org.mockserver.model.Header;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;

/* loaded from: input_file:com/hortonworks/smm/kafka/services/lineage/ProducerLineageFetcherTest.class */
class ProducerLineageFetcherTest {
    private static final int PORT = 7002;
    private static final String TOPIC = "topic1";
    private static final Integer PARTITION = 0;
    private static ClientAndServer mockServer;
    private static KafkaMetricsApiClientConfig kafkaMetricsApiClientConfig;
    private static AdminClientHelper adminClientHelper;
    private static KafkaMetricsApiClient kafkaMetricsApiClient;
    private static TopicManagementService topicManagementService;

    ProducerLineageFetcherTest() {
    }

    @BeforeAll
    static void setup() {
        mockServer = new ClientAndServer(new Integer[]{Integer.valueOf(PORT)});
        kafkaMetricsApiClientConfig = new KafkaMetricsApiClientConfig(Collections.singletonList("localhost"), "http", Integer.valueOf(PORT), ImmutableMap.of());
        adminClientHelper = new AdminClientHelper(new KafkaAdminClientConfig("localhost:9999", HashMaps.create("request.timeout.ms", 1000)));
        kafkaMetricsApiClient = new KafkaMetricsApiClient(kafkaMetricsApiClientConfig);
        topicManagementService = (TopicManagementService) Mockito.mock(TopicManagementService.class);
    }

    @AfterAll
    static void finish() {
        mockServer.close();
    }

    @Test
    void testLineageFetcherNotConfigured() {
        KafkaMetricsApiClient kafkaMetricsApiClient2 = new KafkaMetricsApiClient((KafkaMetricsApiClientConfig) null);
        Assert.assertThrows(InvalidConfigException.class, () -> {
            kafkaMetricsApiClient2.getLineageForTopic(TOPIC, "localhost");
        });
    }

    @Test
    void testKafkaDoesNotHostTopicPartition() throws Exception {
        Mockito.when(topicManagementService.topicInfo((String) ArgumentMatchers.eq(TOPIC))).thenReturn(new TopicInfo(TOPIC, false, Collections.emptyList()));
        ProducerLineageService producerLineageService = new ProducerLineageService(topicManagementService, kafkaMetricsApiClient, kafkaMetricsApiClientConfig);
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            producerLineageService.connectedProducers(new KafkaTopic(TOPIC));
        });
        producerLineageService.close();
    }

    @Test
    void testGetLineageForTopicPartition() throws Exception {
        mockServer.when(HttpRequest.request().withPath("/api/producer-details").withQueryStringParameter("topic", new String[]{TOPIC})).respond(HttpResponse.response().withHeaders(new Header[]{new Header("Content-Type", new String[]{"application/json; charset=utf-8"}), new Header("vary", new String[]{"Accept-Encoding"})}).withBody(FixtureHelpers.fixture("mockServerResponses/producerLineage.json")));
        setupProperTopicManagementServiceResponse();
        ProducerLineageService producerLineageService = new ProducerLineageService(topicManagementService, kafkaMetricsApiClient, kafkaMetricsApiClientConfig);
        MatcherAssert.assertThat(producerLineageService.connectedProducers(new KafkaTopicPartition(new KafkaTopic(TOPIC), PARTITION.intValue())).stream().map((v0) -> {
            return v0.getClientId();
        }).collect(Collectors.toSet()), CoreMatchers.is(Sets.of("qwe", "qwe1")));
        producerLineageService.close();
    }

    @Test
    void testGetLineageForTopic() throws Exception {
        mockServer.when(HttpRequest.request().withPath("/api/producer-details").withQueryStringParameter("topic", new String[]{TOPIC})).respond(HttpResponse.response().withHeaders(new Header[]{new Header("Content-Type", new String[]{"application/json; charset=utf-8"}), new Header("vary", new String[]{"Accept-Encoding"})}).withBody(FixtureHelpers.fixture("mockServerResponses/producerLineage.json")));
        setupProperTopicManagementServiceResponse();
        ProducerLineageService producerLineageService = new ProducerLineageService(topicManagementService, kafkaMetricsApiClient, kafkaMetricsApiClientConfig);
        MatcherAssert.assertThat((Set) producerLineageService.connectedProducers(new KafkaTopic(TOPIC)).getTopicPartitionsToProducers().values().stream().flatMap((v0) -> {
            return v0.stream();
        }).map((v0) -> {
            return v0.getClientId();
        }).collect(Collectors.toSet()), CoreMatchers.is(Sets.of("qwe", "qwe1", "qwe2", "qwe3", "qwe4")));
        producerLineageService.connectedProducers(new KafkaTopic(TOPIC));
        producerLineageService.connectedProducers(new KafkaTopic(TOPIC));
        CacheStats cacheStats = producerLineageService.getCacheStats();
        Assert.assertEquals(2L, cacheStats.hitCount());
        Assert.assertEquals(1L, cacheStats.missCount());
        producerLineageService.close();
    }

    private void setupProperTopicManagementServiceResponse() {
        Mockito.when(topicManagementService.topicInfo((String) ArgumentMatchers.eq(TOPIC))).thenReturn(new TopicInfo(TOPIC, false, Collections.singletonList(new TopicPartitionInfo(PARTITION.intValue(), new BrokerNode(0, "localhost", 1, "rack"), Collections.emptyList(), Collections.emptyList()))));
    }
}
