package com.hortonworks.smm.kafka.services.replication;

import com.google.common.collect.ImmutableMap;
import com.hortonworks.smm.kafka.common.config.StreamsReplicationManagerConfig;
import com.hortonworks.smm.kafka.common.utils.HashMaps;
import com.hortonworks.smm.kafka.services.common.errors.InvalidConfigException;
import com.hortonworks.smm.kafka.services.metric.TimeSpan;
import com.hortonworks.smm.kafka.services.replication.core.ClusterReplicationStats;
import com.hortonworks.smm.kafka.services.replication.core.ClusterReplicationStatsEntry;
import com.hortonworks.smm.kafka.services.replication.core.ReplicationMetric;
import com.hortonworks.smm.kafka.services.replication.core.ReplicationStatus;
import com.hortonworks.smm.kafka.services.replication.core.TimeWindow;
import com.hortonworks.smm.kafka.services.replication.core.TimelineMetricEntry;
import com.hortonworks.smm.kafka.services.replication.core.TimelineMetrics;
import com.hortonworks.smm.kafka.services.replication.core.TopicReplicationStatsEntry;
import com.hortonworks.smm.kafka.services.replication.dtos.ClusterInfoDTO;
import com.hortonworks.smm.kafka.services.replication.dtos.ClusterReplicationStatsDTO;
import com.hortonworks.smm.kafka.services.replication.dtos.ReplicationMetricValueDTO;
import com.hortonworks.smm.kafka.services.replication.dtos.TopicReplicationStatsDTO;
import com.hortonworks.smm.kafka.services.replication.dtos.TopicReplicationStatsEntryDTO;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.KafkaFuture;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:com/hortonworks/smm/kafka/services/replication/StreamsReplicationManagerServiceTest.class */
public class StreamsReplicationManagerServiceTest {
    private static final String PROTOCOL = "http";
    private static final String HOSTNAME = "localhost";
    private static final int PORT1 = 1234;
    private static final int PORT2 = 1235;
    private static final int QUERY_MINUTES = 10;
    private static final String V2_REPLICATIONS_PATH = "/v2/replications";
    private static final String V2_REPLICATIONS_QUERY_PARAMS = "targets=dummyClusterId&from=1&to=2";
    private static final String MINUTES_30 = "minutes=30";

    @Mock
    private Invocation.Builder mockRequestClusterInfo;

    @Mock
    private WebTarget mockTargetClusterInfo;

    @Mock
    private Invocation.Builder mockRequestGeneric;

    @Mock
    private WebTarget mockTargetGeneric;

    @Mock
    private Client mockClient;

    @Mock
    private AdminClient adminClient;

    @Mock
    private KafkaFuture<String> clusterIdFuture;

    @Mock
    private DescribeClusterResult describeClusterResult;
    private StreamsReplicationManagerService service;

    @Before
    public void setup() throws InterruptedException, ExecutionException, TimeoutException, URISyntaxException {
        StreamsReplicationManagerConfig streamsReplicationManagerConfig = new StreamsReplicationManagerConfig();
        streamsReplicationManagerConfig.setProtocol(PROTOCOL);
        streamsReplicationManagerConfig.setHostPorts("localhost:1234,localhost:1235");
        URI uri = new URI("http://localhost:1234/v2/cluster-info/dummyClusterId");
        Mockito.when(this.mockTargetClusterInfo.request()).thenReturn(this.mockRequestClusterInfo);
        Mockito.when(this.mockClient.target((URI) ArgumentMatchers.eq(uri))).thenReturn(this.mockTargetClusterInfo);
        Mockito.when(this.mockRequestClusterInfo.get((GenericType) ArgumentMatchers.any(GenericType.class))).thenReturn(new ClusterInfoDTO("dummyClusterId", "dummyClusterId"));
        Mockito.when(this.clusterIdFuture.get(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any())).thenReturn("dummyClusterId");
        Mockito.when(this.describeClusterResult.clusterId()).thenReturn(this.clusterIdFuture);
        Mockito.when(this.adminClient.describeCluster((DescribeClusterOptions) ArgumentMatchers.any())).thenReturn(this.describeClusterResult);
        this.service = new StreamsReplicationManagerService(streamsReplicationManagerConfig, this.mockClient, this.adminClient);
    }

    @Test
    public void testTopicReplicationStats() throws URISyntaxException, InvalidConfigException {
        HashMap create = HashMaps.create("whatsupsir?", new TopicReplicationStatsEntry("whatsupsir?", "upstreamTopic", "topic", "sourceCluster", "targetCluster", 1, 0, (ReplicationMetric) null, (ReplicationMetric) null, (ReplicationMetric) null));
        TopicReplicationStatsDTO topicReplicationStatsDTO = new TopicReplicationStatsDTO(ImmutableMap.of("whatsupsir?", new TopicReplicationStatsEntryDTO("whatsupsir?", "upstreamTopic", "topic", "sourceCluster", "targetCluster", 1, 0, (ReplicationMetricValueDTO) null, (ReplicationMetricValueDTO) null, (ReplicationMetricValueDTO) null)), ImmutableMap.of(1L, Double.valueOf(0.0d)), ImmutableMap.of(1L, Double.valueOf(0.0d)), false);
        URI uri = new URI(PROTOCOL, null, HOSTNAME, PORT1, "/v2/remote-topics/sourceCluster/targetCluster", MINUTES_30, null);
        Mockito.when(this.mockTargetGeneric.request()).thenReturn(this.mockRequestGeneric);
        Mockito.when(this.mockClient.target((URI) ArgumentMatchers.eq(uri))).thenReturn(this.mockTargetGeneric);
        Mockito.when(this.mockRequestGeneric.get((GenericType) ArgumentMatchers.any(GenericType.class))).thenReturn(create);
        TimelineMetrics timelineMetrics = new TimelineMetrics();
        TimelineMetricEntry timelineMetricEntry = new TimelineMetricEntry();
        timelineMetricEntry.setAvg(0.0d);
        timelineMetricEntry.setRange(new TimeWindow(0L, 2L, 1L, ""));
        timelineMetrics.setMetrics(Collections.singletonList(timelineMetricEntry));
        URI uri2 = new URI(PROTOCOL, null, HOSTNAME, PORT1, "/v2/cluster-metrics/sourceCluster/targetCluster/replication-latency-ms", MINUTES_30, null);
        Mockito.when(this.mockTargetGeneric.request()).thenReturn(this.mockRequestGeneric);
        Mockito.when(this.mockClient.target((URI) ArgumentMatchers.eq(uri2))).thenReturn(this.mockTargetGeneric);
        Mockito.when(this.mockRequestGeneric.get(TimelineMetrics.class)).thenReturn(timelineMetrics);
        URI uri3 = new URI(PROTOCOL, null, HOSTNAME, PORT1, "/v2/cluster-metrics/sourceCluster/targetCluster/byte-rate", MINUTES_30, null);
        Mockito.when(this.mockTargetGeneric.request()).thenReturn(this.mockRequestGeneric);
        Mockito.when(this.mockClient.target((URI) ArgumentMatchers.eq(uri3))).thenReturn(this.mockTargetGeneric);
        Mockito.when(this.mockRequestGeneric.get(TimelineMetrics.class)).thenReturn(timelineMetrics);
        Assert.assertEquals(topicReplicationStatsDTO, this.service.topicReplicationStats("sourceCluster", "targetCluster", new TimeSpan(TimeSpan.TimePeriod.LAST_THIRTY_MINUTES)));
    }

    @Test
    public void testClusterReplicationStats() throws InvalidConfigException, URISyntaxException {
        ImmutableMap of = ImmutableMap.of("something", new ClusterReplicationStatsEntry("something", "sourceCluster", "targetCluster", ReplicationStatus.ACTIVE, "active because it is active", 1, 1, (ReplicationMetric) null, (ReplicationMetric) null, (ReplicationMetric) null));
        ClusterReplicationStatsDTO from = ClusterReplicationStatsDTO.from(new ClusterReplicationStats(of), "dummyClusterId");
        URI uri = new URI(PROTOCOL, null, HOSTNAME, PORT1, V2_REPLICATIONS_PATH, V2_REPLICATIONS_QUERY_PARAMS, null);
        Mockito.when(this.mockTargetGeneric.request()).thenReturn(this.mockRequestGeneric);
        Mockito.when(this.mockClient.target((URI) ArgumentMatchers.eq(uri))).thenReturn(this.mockTargetGeneric);
        Mockito.when(this.mockRequestGeneric.get((GenericType) ArgumentMatchers.any(GenericType.class))).thenReturn(of);
        Assert.assertEquals(from, this.service.clusterReplicationStatsForColocatedCluster(getTimeSpan()));
        ArgumentCaptor forClass = ArgumentCaptor.forClass(URI.class);
        ((Client) Mockito.verify(this.mockClient, Mockito.times(2))).target((URI) forClass.capture());
        List allValues = forClass.getAllValues();
        Assert.assertEquals(2L, allValues.size());
        Assert.assertEquals(PROTOCOL, ((URI) allValues.get(1)).getScheme());
        Assert.assertEquals(HOSTNAME, ((URI) allValues.get(1)).getHost());
        Assert.assertEquals(1234L, ((URI) allValues.get(1)).getPort());
        Assert.assertEquals(V2_REPLICATIONS_PATH, ((URI) allValues.get(1)).getPath());
        Assert.assertTrue(((URI) allValues.get(1)).getQuery().contains(V2_REPLICATIONS_QUERY_PARAMS));
    }

    @Test
    public void testHostRotation() throws InvalidConfigException, URISyntaxException {
        RuntimeException runtimeException = new RuntimeException();
        URI uri = new URI(PROTOCOL, null, HOSTNAME, PORT1, V2_REPLICATIONS_PATH, V2_REPLICATIONS_QUERY_PARAMS, null);
        Mockito.when(this.mockTargetGeneric.request()).thenReturn(this.mockRequestGeneric);
        Mockito.when(this.mockClient.target((URI) ArgumentMatchers.eq(uri))).thenReturn(this.mockTargetGeneric);
        URI uri2 = new URI(PROTOCOL, null, HOSTNAME, PORT2, V2_REPLICATIONS_PATH, V2_REPLICATIONS_QUERY_PARAMS, null);
        Mockito.when(this.mockTargetGeneric.request()).thenReturn(this.mockRequestGeneric);
        Mockito.when(this.mockClient.target((URI) ArgumentMatchers.eq(uri2))).thenReturn(this.mockTargetGeneric);
        Mockito.when(this.mockRequestGeneric.get((GenericType) ArgumentMatchers.any(GenericType.class))).thenThrow(new Throwable[]{runtimeException});
        try {
            this.service.clusterReplicationStatsForColocatedCluster(getTimeSpan());
        } catch (RuntimeException e) {
            Assert.assertSame(runtimeException, e);
        }
        try {
            this.service.clusterReplicationStatsForColocatedCluster(getTimeSpan());
        } catch (RuntimeException e2) {
            Assert.assertSame(runtimeException, e2);
        }
        ArgumentCaptor forClass = ArgumentCaptor.forClass(URI.class);
        ((Client) Mockito.verify(this.mockClient, Mockito.times(3))).target((URI) forClass.capture());
        List allValues = forClass.getAllValues();
        for (int i = 0; i < allValues.size(); i++) {
            if (i != 0) {
                Assert.assertEquals(PROTOCOL, ((URI) allValues.get(i)).getScheme());
                Assert.assertEquals(HOSTNAME, ((URI) allValues.get(i)).getHost());
                Assert.assertEquals(V2_REPLICATIONS_PATH, ((URI) allValues.get(i)).getPath());
                Assert.assertTrue(((URI) allValues.get(i)).getQuery().contains(V2_REPLICATIONS_QUERY_PARAMS));
            }
        }
        Assert.assertEquals(1234L, ((URI) allValues.get(1)).getPort());
        Assert.assertEquals(1235L, ((URI) allValues.get(2)).getPort());
    }

    private TimeSpan getTimeSpan() {
        return new TimeSpan(1L, 2L);
    }
}
