package com.hortonworks.smm.kafka.services.replication;

import com.google.common.annotations.VisibleForTesting;
import com.hortonworks.smm.kafka.common.config.StreamsReplicationManagerAuthConfig;
import com.hortonworks.smm.kafka.common.config.StreamsReplicationManagerConfig;
import com.hortonworks.smm.kafka.services.SSLReverseProxy;
import com.hortonworks.smm.kafka.services.Service;
import com.hortonworks.smm.kafka.services.common.errors.InvalidConfigException;
import com.hortonworks.smm.kafka.services.metric.TimeSpan;
import com.hortonworks.smm.kafka.services.replication.core.ClusterReplicationStats;
import com.hortonworks.smm.kafka.services.replication.core.ClusterReplicationStatsEntry;
import com.hortonworks.smm.kafka.services.replication.core.TimelineMetrics;
import com.hortonworks.smm.kafka.services.replication.core.TopicReplicationStats;
import com.hortonworks.smm.kafka.services.replication.core.TopicReplicationStatsEntry;
import com.hortonworks.smm.kafka.services.replication.dtos.ClusterInfoDTO;
import com.hortonworks.smm.kafka.services.replication.dtos.ClusterReplicationStatsDTO;
import com.hortonworks.smm.kafka.services.replication.dtos.TopicReplicationStatsDTO;
import com.hortonworks.smm.kafka.services.replication.dtos.TopicTimelineMetricsDTO;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.core.GenericType;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
import org.glassfish.jersey.uri.internal.JerseyUriBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:com/hortonworks/smm/kafka/services/replication/StreamsReplicationManagerService.class */
public class StreamsReplicationManagerService implements Service {
    public static final Logger LOG = LoggerFactory.getLogger(StreamsReplicationManagerService.class);
    public static final DescribeClusterOptions DESCRIBE_CLUSTER_OPTIONS = new DescribeClusterOptions().timeoutMs(15000);
    public static final String CLUSTER_ID_ERROR_MSG = "Failed to fetch kafkaClusterId from SRMServicce!";
    public static final String SRM_NOT_CONFIGURED_ERROR_MSG = "Streams Replication Manager (SRM) is not configured";
    public static final String REPLICATIONS = "/replications";
    private volatile String colocatedClusterAlias;
    private StreamsReplicationManagerConfig streamsReplicationManagerConfig;
    private List<String> hosts;
    private int hostCount;
    private AtomicInteger activeHostIndex;
    protected Client client;
    protected AdminClient adminClient;

    @Inject
    public StreamsReplicationManagerService(@Nullable StreamsReplicationManagerConfig streamsReplicationManagerConfig, AdminClient adminClient) {
        this(streamsReplicationManagerConfig, streamsReplicationManagerConfig == null ? null : SSLReverseProxy.createClient(streamsReplicationManagerConfig.getProperties()), adminClient);
    }

    @VisibleForTesting
    public StreamsReplicationManagerService(StreamsReplicationManagerConfig streamsReplicationManagerConfig, Client client, AdminClient adminClient) {
        this.colocatedClusterAlias = null;
        if (streamsReplicationManagerConfig == null) {
            LOG.info("Streams Replication Manager is not configured");
            return;
        }
        Objects.requireNonNull(client, "Client must be configured");
        this.streamsReplicationManagerConfig = streamsReplicationManagerConfig;
        this.client = client;
        this.adminClient = adminClient;
        if (streamsReplicationManagerConfig.getHostPorts() == null) {
            this.hosts = Collections.singletonList(streamsReplicationManagerConfig.getHost() + ':' + streamsReplicationManagerConfig.getPort());
        } else {
            this.hosts = (List) Arrays.stream(streamsReplicationManagerConfig.getHostPorts().split(",")).filter(str -> {
                return !str.isEmpty();
            }).collect(Collectors.toList());
        }
        this.hostCount = this.hosts.size();
        if (this.hostCount == 0) {
            throw new IllegalArgumentException("streamsReplicationManagerConfig.host must contain a list of SRM Service URIs");
        }
        this.activeHostIndex = new AtomicInteger(0);
        StreamsReplicationManagerAuthConfig authConfig = streamsReplicationManagerConfig.getAuthConfig();
        if (authConfig != null && authConfig.isUseBasicAuth()) {
            client.register(HttpAuthenticationFeature.basic(authConfig.getUsername(), authConfig.getPassword()));
        }
        try {
            getOrFetchColocatedClusterAlias();
        } catch (Exception e) {
            LOG.warn("Error while initializing colocated Kafka cluster alias!", e);
        }
    }

    public boolean isStreamsReplicationManagerConfigured() {
        return (this.streamsReplicationManagerConfig == null || this.adminClient == null || this.client == null) ? false : true;
    }

    public ClusterReplicationStatsDTO clusterReplicationStatsForColocatedCluster(TimeSpan timeSpan) throws InvalidConfigException {
        validateStreamsReplicationManagerConfig();
        return clusterReplicationStats(getOrFetchColocatedClusterAlias(), timeSpan);
    }

    public ClusterReplicationStatsDTO clusterReplicationStats(TimeSpan timeSpan) throws InvalidConfigException {
        validateStreamsReplicationManagerConfig();
        return clusterReplicationStats(null, timeSpan);
    }

    public TopicReplicationStatsDTO topicReplicationStatsForColocatedCluster(String str, TimeSpan timeSpan) throws InvalidConfigException {
        validateStreamsReplicationManagerConfig();
        return topicReplicationStats(str, getOrFetchColocatedClusterAlias(), timeSpan);
    }

    public Map<String, TopicReplicationStatsEntry> topicReplicationStatsWithoutTimelineMetricsForColocatedCluster(String str, TimeSpan timeSpan) throws InvalidConfigException {
        validateStreamsReplicationManagerConfig();
        return topicReplicationStatsWithoutTimeLineMetrics(str, getOrFetchColocatedClusterAlias(), timeSpan);
    }

    public TopicTimelineMetricsDTO topicTimelineMetricsForColocatedCluster(String str, TimeSpan timeSpan) throws InvalidConfigException {
        validateStreamsReplicationManagerConfig();
        return topicTimelineMetrics(str, timeSpan);
    }

    public TopicTimelineMetricsDTO topicTimelineMetrics(String str, TimeSpan timeSpan) throws InvalidConfigException {
        validateStreamsReplicationManagerConfig();
        String orFetchColocatedClusterAlias = getOrFetchColocatedClusterAlias();
        return TopicTimelineMetricsDTO.from(timelineMetrics(buildURLForTopicTimelineMetricForColocatedCluster(orFetchColocatedClusterAlias, str, "replication-latency-ms"), timeSpan), timelineMetrics(buildURLForTopicTimelineMetricForColocatedCluster(orFetchColocatedClusterAlias, str, "byte-rate"), timeSpan), true);
    }

    public TopicTimelineMetricsDTO topicTimelineMetrics(String str, String str2, String str3, TimeSpan timeSpan) throws InvalidConfigException {
        validateStreamsReplicationManagerConfig();
        return TopicTimelineMetricsDTO.from(timelineMetrics(buildURLForTopicTimelineMetric(str, str2, str3, "replication-latency-ms"), timeSpan), timelineMetrics(buildURLForTopicTimelineMetric(str, str2, str3, "byte-rate"), timeSpan), str2.equals(getOrFetchColocatedClusterAlias()));
    }

    public Map<String, TopicReplicationStatsEntry> topicReplicationStatsWithoutTimeLineMetrics(String str, String str2, TimeSpan timeSpan) throws InvalidConfigException {
        validateStreamsReplicationManagerConfig();
        return (Map) fetchFromSrmService(String.format("/remote-topics/%s/%s", str, str2), timeSpan, new GenericType<HashMap<String, TopicReplicationStatsEntry>>() { // from class: com.hortonworks.smm.kafka.services.replication.StreamsReplicationManagerService.1
        });
    }

    public TopicReplicationStatsDTO topicReplicationStats(String str, String str2, TimeSpan timeSpan) throws InvalidConfigException {
        validateStreamsReplicationManagerConfig();
        return TopicReplicationStatsDTO.from(new TopicReplicationStats(topicReplicationStatsWithoutTimeLineMetrics(str, str2, timeSpan)), timelineMetrics(buildURLForTimelineMetric(str, str2, "byte-rate"), timeSpan), timelineMetrics(buildURLForTimelineMetric(str, str2, "replication-latency-ms"), timeSpan), str2.equals(getOrFetchColocatedClusterAlias()));
    }

    private ClusterReplicationStatsDTO clusterReplicationStats(String str, TimeSpan timeSpan) throws InvalidConfigException {
        return ClusterReplicationStatsDTO.from(new ClusterReplicationStats((Map) fetchFromSrmService(str != null ? "/replications?targets=" + str : REPLICATIONS, timeSpan, new GenericType<HashMap<String, ClusterReplicationStatsEntry>>() { // from class: com.hortonworks.smm.kafka.services.replication.StreamsReplicationManagerService.2
        })), getOrFetchColocatedClusterAlias());
    }

    protected ClusterInfoDTO clusterInfo(String str) throws InvalidConfigException {
        validateStreamsReplicationManagerConfig();
        return (ClusterInfoDTO) fetchFromSrmService(String.format("/cluster-info/%s", str), new GenericType<ClusterInfoDTO>() { // from class: com.hortonworks.smm.kafka.services.replication.StreamsReplicationManagerService.3
        });
    }

    private Map<Long, Double> timelineMetrics(String str, TimeSpan timeSpan) {
        return (Map) ((TimelineMetrics) fetchFromSrmService(str, timeSpan, TimelineMetrics.class)).metrics().stream().collect(Collectors.toMap(timelineMetricEntry -> {
            return Long.valueOf(timelineMetricEntry.range().midpoint());
        }, (v0) -> {
            return v0.avg();
        }));
    }

    private <T> T fetchFromSrmService(String str, TimeSpan timeSpan, Class<T> cls) {
        return (T) fetchFromSrmService(str, timeSpan, builder -> {
            return builder.get(cls);
        });
    }

    protected <T> T fetchFromSrmService(String str, TimeSpan timeSpan, GenericType<T> genericType) {
        return (T) fetchFromSrmService(str, timeSpan, builder -> {
            return builder.get(genericType);
        });
    }

    protected <T> T fetchFromSrmService(String str, GenericType<T> genericType) {
        return (T) fetchFromSrmService(str, builder -> {
            return builder.get(genericType);
        });
    }

    private <T> T fetchFromSrmService(String str, TimeSpan timeSpan, Function<Invocation.Builder, T> function) {
        try {
            return function.apply(createRequest(str, timeSpan));
        } catch (Throwable th) {
            rotateActiveHost();
            throw th;
        }
    }

    private <T> T fetchFromSrmService(String str, Function<Invocation.Builder, T> function) {
        try {
            return function.apply(createRequest(str));
        } catch (Throwable th) {
            rotateActiveHost();
            throw th;
        }
    }

    private Invocation.Builder createRequest(String str, TimeSpan timeSpan) {
        return this.client.target(buildURL(str, timeSpan)).request();
    }

    private Invocation.Builder createRequest(String str) {
        return this.client.target(buildURL(str)).request();
    }

    private String buildURLForTimelineMetric(String str, String str2, String str3) {
        return String.format("/cluster-metrics/%s/%s/%s", str, str2, str3);
    }

    private String buildURLForTopicTimelineMetric(String str, String str2, String str3, String str4) {
        return String.format("/topic-metrics/%s/%s/%s/%s", str, str2, str3, str4);
    }

    private String buildURLForTopicTimelineMetricForColocatedCluster(String str, String str2, String str3) {
        return String.format("/topic-metrics/%s/%s/%s", str, str2, str3);
    }

    private URI buildURL(String str, TimeSpan timeSpan) {
        JerseyUriBuilder sRMUriBuilder = getSRMUriBuilder(str);
        if (timeSpan.timePeriod() != null) {
            sRMUriBuilder.queryParam("minutes", new Object[]{Long.valueOf(timeSpan.timePeriod().minutes())});
        } else {
            sRMUriBuilder.queryParam("from", new Object[]{timeSpan.startTimeMs()});
            sRMUriBuilder.queryParam("to", new Object[]{timeSpan.endTimeMs()});
        }
        return sRMUriBuilder.build(new Object[0]);
    }

    private URI buildURL(String str) {
        return getSRMUriBuilder(str).build(new Object[0]);
    }

    private JerseyUriBuilder getSRMUriBuilder(String str) {
        JerseyUriBuilder jerseyUriBuilder = new JerseyUriBuilder();
        jerseyUriBuilder.uri(String.format("%s://%s/v2%s", this.streamsReplicationManagerConfig.getProtocol(), getActiveHost(), str));
        return jerseyUriBuilder;
    }

    private String getActiveHost() {
        return this.hostCount == 1 ? this.hosts.get(0) : this.hosts.get(this.activeHostIndex.get());
    }

    private void rotateActiveHost() {
        if (this.hostCount > 1) {
            this.activeHostIndex.getAndUpdate(i -> {
                return (i + 1) % this.hostCount;
            });
        }
    }

    private String getOrFetchColocatedClusterAlias() {
        String str = this.colocatedClusterAlias;
        if (StringUtils.isEmpty(str)) {
            try {
                ClusterInfoDTO clusterInfo = clusterInfo((String) this.adminClient.describeCluster(DESCRIBE_CLUSTER_OPTIONS).clusterId().get(15L, TimeUnit.SECONDS));
                if (clusterInfo == null) {
                    throw new RuntimeException("Failed to fetch kafkaClusterId from SRMServicce! Null response received when fetching clusterInfo from SRM API!");
                }
                String clusterAlias = clusterInfo.getClusterAlias();
                this.colocatedClusterAlias = clusterAlias;
                str = clusterAlias;
            } catch (InvalidConfigException e) {
                throw new RuntimeException(SRM_NOT_CONFIGURED_ERROR_MSG, e);
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(CLUSTER_ID_ERROR_MSG, e2);
            } catch (ExecutionException e3) {
                throw new RuntimeException(CLUSTER_ID_ERROR_MSG, e3.getCause());
            } catch (TimeoutException e4) {
                throw new RuntimeException(CLUSTER_ID_ERROR_MSG, e4);
            } catch (ProcessingException e5) {
                throw new RuntimeException(CLUSTER_ID_ERROR_MSG, e5);
            }
        }
        return str;
    }

    private void validateStreamsReplicationManagerConfig() throws InvalidConfigException {
        if (!isStreamsReplicationManagerConfigured()) {
            throw new InvalidConfigException(SRM_NOT_CONFIGURED_ERROR_MSG);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (isStreamsReplicationManagerConfigured()) {
            this.client.close();
        }
        this.adminClient.close();
    }
}
