package com.hortonworks.smm.kafka.alerts.attribute.registry.impl;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.hortonworks.smm.kafka.ResourceType;
import com.hortonworks.smm.kafka.alerts.attribute.ResourceAttributeTags;
import com.hortonworks.smm.kafka.alerts.attribute.ResourceAttributeValue;
import com.hortonworks.smm.kafka.alerts.attribute.ResourceTag;
import com.hortonworks.smm.kafka.alerts.attribute.registry.AttributeRegistry;
import com.hortonworks.smm.kafka.alerts.util.type.Type;
import com.hortonworks.smm.kafka.services.common.errors.InvalidConfigException;
import com.hortonworks.smm.kafka.services.metric.TimeSpan;
import com.hortonworks.smm.kafka.services.replication.StreamsReplicationManagerService;
import com.hortonworks.smm.kafka.services.replication.core.ReplicationStatus;
import com.hortonworks.smm.kafka.services.replication.core.TopicReplicationStatsEntry;
import com.hortonworks.smm.kafka.services.replication.dtos.ClusterReplicationStatsEntryDTO;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hortonworks/smm/kafka/alerts/attribute/registry/impl/ClusterReplicationAttributeRegistry.class */
/**
 * Attribute registry for the {@link ResourceType#CLUSTER_REPLICATION} resource type.
 *
 * <p>Registers four attribute templates whose values are read from the
 * {@link StreamsReplicationManagerService} over the last 15 minutes:
 * {@link #REPLICATION_LATENCY}, {@link #REPLICATION_THROUGHPUT}, {@link #CHECKPOINT_LATENCY}
 * (15 minute averages, cluster level or topic level) and {@link #REPLICATION_STATUS}
 * (cluster level only).
 *
 * <p>A replication flow can be addressed with two mutually exclusive tag sets:
 * <ul>
 *   <li>{@code cluster} (optionally with {@code topic}) - see {@link #getV1ReplicationTagNames()};</li>
 *   <li>{@code sourceCluster} and {@code targetCluster} (optionally with {@code topicNameOnTarget})
 *       - see {@link #getV2ReplicationTagNames()}.</li>
 * </ul>
 * Mixing the two sets, or supplying only one of {@code sourceCluster}/{@code targetCluster},
 * is rejected with an {@link IllegalStateException}.
 */
public final class ClusterReplicationAttributeRegistry extends AttributeRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(ClusterReplicationAttributeRegistry.class);
    private static final String ERROR_MSG = "Error while trying to query SRM API for metrics!";

    /** Length, in minutes, of the window (ending "now") that every SRM query covers. */
    private static final long METRIC_WINDOW_MINUTES = 15L;

    public static final String REPLICATION_LATENCY = "REPLICATION_LATENCY";
    public static final String REPLICATION_THROUGHPUT = "REPLICATION_THROUGHPUT";
    public static final String CHECKPOINT_LATENCY = "CHECKPOINT_LATENCY";
    public static final String REPLICATION_STATUS = "REPLICATION_STATUS";

    public static final String CLUSTER_TAG_NAME = "cluster";
    public static final String CLUSTER_TOPIC_TAG_NAME = "topic";
    public static final String TARGET_CLUSTER_TAG_NAME = "targetCluster";
    public static final String SOURCE_CLUSTER_TAG_NAME = "sourceCluster";
    public static final String TOPIC_NAME_ON_TARGET_TAG_NAME = "topicNameOnTarget";

    // Tag hierarchy: root -> cluster -> topic, and root -> targetCluster -> sourceCluster -> topicNameOnTarget.
    // The declaration order matters because each tag references its parent.
    // NOTE(review): the meaning of the trailing boolean is defined by ResourceTag - confirm there.
    private static final ResourceTag rootTag = ROOT_TAG;
    private static final ResourceTag clusterTag =
            new ResourceTag(ResourceType.CLUSTER, CLUSTER_TAG_NAME, rootTag, false);
    private static final ResourceTag clusterTopicTag =
            new ResourceTag(ResourceType.TOPIC, CLUSTER_TOPIC_TAG_NAME, clusterTag, false);
    private static final ResourceTag targetClusterTag =
            new ResourceTag(ResourceType.CLUSTER, TARGET_CLUSTER_TAG_NAME, rootTag, false);
    private static final ResourceTag sourceClusterTag =
            new ResourceTag(ResourceType.CLUSTER, SOURCE_CLUSTER_TAG_NAME, targetClusterTag, false);
    private static final ResourceTag topicNameOnTargetTag =
            new ResourceTag(ResourceType.TOPIC, TOPIC_NAME_ON_TARGET_TAG_NAME, sourceClusterTag, false);

    private final StreamsReplicationManagerService streamsReplicationManagerService;
    private final List<ResourceTag> resourceTags = new ArrayList<>();

    /**
     * Creates the registry and registers all cluster replication attribute templates.
     *
     * @param streamsReplicationManagerService service used to query replication statistics
     */
    @Inject
    public ClusterReplicationAttributeRegistry(StreamsReplicationManagerService streamsReplicationManagerService) {
        this.streamsReplicationManagerService = streamsReplicationManagerService;
        this.resourceTags.add(clusterTag);
        this.resourceTags.add(clusterTopicTag);
        this.resourceTags.add(targetClusterTag);
        this.resourceTags.add(sourceClusterTag);
        this.resourceTags.add(topicNameOnTargetTag);

        // Each template receives its own freshly created tag collections.
        createAttributeTemplate(REPLICATION_LATENCY, "15 min average replication latency in milliseconds",
                Type.DOUBLE, metricTags(), metricTagNames(), replicationLatencyFunction());
        createAttributeTemplate(REPLICATION_THROUGHPUT, "15 min average replication throughput in bytes per second",
                Type.DOUBLE, metricTags(), metricTagNames(), throughputBpsFunction());
        createAttributeTemplate(CHECKPOINT_LATENCY, "15 min average checkpoint latency in milliseconds",
                Type.DOUBLE, metricTags(), metricTagNames(), checkpointLatencyFunction());
        createAttributeTemplate(REPLICATION_STATUS, "Replication status of a replication pipeline",
                Type.ENUM,
                Lists.newArrayList(clusterTag, sourceClusterTag, targetClusterTag),
                Sets.newHashSet(clusterTag.name(), sourceClusterTag.name(), targetClusterTag.name()),
                new HashSet<>(Arrays.asList(ReplicationStatus.values())),
                replicationStatusFunction());
    }

    @Override // com.hortonworks.smm.kafka.alerts.attribute.registry.AttributeRegistry
    public ResourceType resourceType() {
        return ResourceType.CLUSTER_REPLICATION;
    }

    /**
     * Returns the tags known to this registry.
     *
     * <p>The returned list is the registry's own backing list, not a copy.
     */
    @Override // com.hortonworks.smm.kafka.alerts.attribute.registry.AttributeRegistry
    public List<ResourceTag> resourceTags() {
        return this.resourceTags;
    }

    /** Returns the tag names of the {@code cluster}/{@code topic} addressing scheme. */
    public static List<String> getV1ReplicationTagNames() {
        return ImmutableList.of(CLUSTER_TAG_NAME, CLUSTER_TOPIC_TAG_NAME);
    }

    /** Returns the tag names of the {@code targetCluster}/{@code sourceCluster}/{@code topicNameOnTarget} scheme. */
    public static List<String> getV2ReplicationTagNames() {
        return ImmutableList.of(TARGET_CLUSTER_TAG_NAME, SOURCE_CLUSTER_TAG_NAME, TOPIC_NAME_ON_TARGET_TAG_NAME);
    }

    public static ResourceTag getTargetClusterTag() {
        return targetClusterTag;
    }

    public static ResourceTag getSourceClusterTag() {
        return sourceClusterTag;
    }

    public static ResourceTag getTopicNameOnTargetTag() {
        return topicNameOnTargetTag;
    }

    /** Builds a new mutable list of the tags accepted by the three numeric metric attributes. */
    private static List<ResourceTag> metricTags() {
        return Lists.newArrayList(clusterTag, clusterTopicTag, sourceClusterTag, targetClusterTag, topicNameOnTargetTag);
    }

    /** Builds a new mutable set with the names of the tags returned by {@link #metricTags()}. */
    private static java.util.Set<String> metricTagNames() {
        return Sets.newHashSet(clusterTag.name(), clusterTopicTag.name(), sourceClusterTag.name(),
                targetClusterTag.name(), topicNameOnTargetTag.name());
    }

    /** Reads the value of the named tag, or {@code null} when the tag map has no mapping for it. */
    private static String tagValue(ResourceAttributeTags resourceAttributeTags, String tagName) {
        return resourceAttributeTags.tagsAsMap().get(tagName);
    }

    private Function<ResourceAttributeTags, ResourceAttributeValue> replicationLatencyFunction() {
        return averageMetricFunction(
                "Querying replication latency at cluster level! cluster: {}, sourceCluster: {}, targetCluster: {}",
                "Querying replication latency for clusterTopic: {}, targetClusterTopic: {}, cluster: {}, sourceCluster: {}, targetCluster: {}",
                stats -> Double.valueOf(stats.replicationLatencyMsMetric().avg()),
                stats -> Double.valueOf(stats.replicationLatencyMs().avg()));
    }

    private Function<ResourceAttributeTags, ResourceAttributeValue> throughputBpsFunction() {
        return averageMetricFunction(
                "Querying throughput bps at cluster level! cluster: {}, sourceCluster: {}, targetCluster: {}",
                "Querying throughput for clusterTopic: {}, targetClusterTopic: {}, cluster: {}, sourceCluster: {}, targetCluster: {}",
                stats -> Double.valueOf(stats.throughputBpsMetric().avg()),
                stats -> Double.valueOf(stats.throughputBps().avg()));
    }

    private Function<ResourceAttributeTags, ResourceAttributeValue> checkpointLatencyFunction() {
        return averageMetricFunction(
                "Querying checkpoint latency at cluster level! cluster: {}, sourceCluster: {}, targetCluster: {}",
                "Querying checkpoint latency for clusterTopic: {}, targetClusterTopic: {}, cluster: {}, sourceCluster: {}, targetCluster: {}",
                stats -> Double.valueOf(stats.checkpointLatencyMsMetric().avg()),
                stats -> Double.valueOf(stats.checkpointLatencyMs().avg()));
    }

    /**
     * Shared implementation of the three numeric metric attributes.
     *
     * <p>The returned function validates the tags, then reads either the cluster level statistics
     * (when neither topic tag is present) or the topic level statistics, and wraps the extracted
     * average in a {@link ResourceAttributeValue}.
     *
     * @param clusterLevelLogFormat debug log format used for cluster level queries
     *                              (arguments: cluster, sourceCluster, targetCluster)
     * @param topicLevelLogFormat   debug log format used for topic level queries
     *                              (arguments: topic, topicNameOnTarget, cluster, sourceCluster, targetCluster)
     * @param clusterLevelAverage   extracts the average from cluster level statistics
     * @param topicLevelAverage     extracts the average from topic level statistics
     * @return the attribute value function; it throws {@link IllegalStateException} for an invalid
     *         tag combination and {@link RuntimeException} when the lookup itself fails
     */
    private Function<ResourceAttributeTags, ResourceAttributeValue> averageMetricFunction(
            String clusterLevelLogFormat,
            String topicLevelLogFormat,
            Function<ClusterReplicationStatsEntryDTO, Double> clusterLevelAverage,
            Function<TopicReplicationStatsEntry, Double> topicLevelAverage) {
        return resourceAttributeTags -> {
            String cluster = tagValue(resourceAttributeTags, CLUSTER_TAG_NAME);
            String clusterTopic = tagValue(resourceAttributeTags, CLUSTER_TOPIC_TAG_NAME);
            String sourceCluster = tagValue(resourceAttributeTags, SOURCE_CLUSTER_TAG_NAME);
            String targetCluster = tagValue(resourceAttributeTags, TARGET_CLUSTER_TAG_NAME);
            String topicNameOnTarget = tagValue(resourceAttributeTags, TOPIC_NAME_ON_TARGET_TAG_NAME);
            // Validation deliberately happens outside the try block so that tag errors surface
            // as IllegalStateException instead of being wrapped as an SRM query failure.
            boolean colocated = isClusterUsedInsteadOfSourceAndTargetAndVerifyTags(
                    cluster, sourceCluster, targetCluster, clusterTopic, topicNameOnTarget);
            try {
                Double average;
                if (clusterTopic == null && topicNameOnTarget == null) {
                    LOG.debug(clusterLevelLogFormat, cluster, sourceCluster, targetCluster);
                    ClusterReplicationStatsEntryDTO clusterStats = colocated
                            ? getMetricFromReplicationStatsForColocatedCluster(cluster)
                            : getMetricFromReplicationStats(sourceCluster, targetCluster);
                    average = clusterLevelAverage.apply(clusterStats);
                } else {
                    LOG.debug(topicLevelLogFormat, clusterTopic, topicNameOnTarget, cluster, sourceCluster, targetCluster);
                    // NOTE(review): the validation does not reject "cluster" + "topicNameOnTarget" (or
                    // "sourceCluster"/"targetCluster" + "topic"); in that case the topic name used for the
                    // lookup is null and the lookup fails with the "Could not find" error below.
                    TopicReplicationStatsEntry topicStats = colocated
                            ? getTopicReplicationStatsEntryForColocatedCluster(cluster, clusterTopic)
                            : getTopicReplicationStatsEntry(sourceCluster, targetCluster, topicNameOnTarget);
                    average = topicLevelAverage.apply(topicStats);
                }
                return new ResourceAttributeValue(average);
            } catch (Exception e) {
                // Log with the throwable so the stack trace is kept even when the message is null.
                LOG.error(ERROR_MSG, e);
                throw new RuntimeException(ERROR_MSG, e);
            }
        };
    }

    /**
     * Builds the function behind {@link #REPLICATION_STATUS}: the status of the matching
     * cluster level replication flow, rendered with {@code toString()}.
     */
    private Function<ResourceAttributeTags, ResourceAttributeValue> replicationStatusFunction() {
        return resourceAttributeTags -> {
            String cluster = tagValue(resourceAttributeTags, CLUSTER_TAG_NAME);
            String sourceCluster = tagValue(resourceAttributeTags, SOURCE_CLUSTER_TAG_NAME);
            String targetCluster = tagValue(resourceAttributeTags, TARGET_CLUSTER_TAG_NAME);
            boolean colocated = isClusterUsedInsteadOfSourceAndTargetAndVerifyTags(cluster, sourceCluster, targetCluster);
            try {
                LOG.debug("Querying replication status at cluster level! cluster: {}, sourceCluster: {}, targetCluster: {}",
                        cluster, sourceCluster, targetCluster);
                ClusterReplicationStatsEntryDTO clusterStats = colocated
                        ? getMetricFromReplicationStatsForColocatedCluster(cluster)
                        : getMetricFromReplicationStats(sourceCluster, targetCluster);
                return new ResourceAttributeValue(clusterStats.status().toString());
            } catch (Exception e) {
                LOG.error(ERROR_MSG, e);
                throw new RuntimeException(ERROR_MSG, e);
            }
        };
    }

    /**
     * Looks up the topic level statistics addressed by the {@code cluster} and {@code topic} tags.
     *
     * @throws RuntimeException when the SRM response has no entry for the topic
     */
    private TopicReplicationStatsEntry getTopicReplicationStatsEntryForColocatedCluster(String cluster, String topic)
            throws InvalidConfigException {
        TopicReplicationStatsEntry topicStats = (TopicReplicationStatsEntry) this.streamsReplicationManagerService
                .topicReplicationStatsWithoutTimelineMetricsForColocatedCluster(cluster, getTimeSpanForFetchingTimeSpan())
                .get(topic);
        if (topicStats == null) {
            throw new RuntimeException("Could not find topic replication stats in SRM API call response, where cluster is: "
                    + cluster + ", and topic is: " + topic + '!');
        }
        return topicStats;
    }

    /**
     * Looks up the topic level statistics addressed by the {@code sourceCluster},
     * {@code targetCluster} and {@code topicNameOnTarget} tags.
     *
     * @throws RuntimeException when the SRM response has no entry for the topic
     */
    private TopicReplicationStatsEntry getTopicReplicationStatsEntry(String sourceCluster, String targetCluster,
            String topicNameOnTarget) throws InvalidConfigException {
        TopicReplicationStatsEntry topicStats = (TopicReplicationStatsEntry) this.streamsReplicationManagerService
                .topicReplicationStatsWithoutTimeLineMetrics(sourceCluster, targetCluster, getTimeSpanForFetchingTimeSpan())
                .get(topicNameOnTarget);
        if (topicStats == null) {
            throw new RuntimeException("Could not find topic replication stats in SRM API call response, where source is: "
                    + sourceCluster + ", and target is: " + targetCluster + ", and topic is: " + topicNameOnTarget + '!');
        }
        return topicStats;
    }

    /**
     * Returns the first cluster level statistics entry whose source and target match the arguments.
     *
     * @throws RuntimeException when no such replication flow is present in the SRM response
     */
    private ClusterReplicationStatsEntryDTO getMetricFromReplicationStats(String sourceCluster, String targetCluster)
            throws InvalidConfigException {
        return this.streamsReplicationManagerService
                .clusterReplicationStats(getTimeSpanForFetchingTimeSpan())
                .clusterReplicationStatsEntryMap()
                .values()
                .stream()
                // The map's declared value type is not visible from this class, hence the explicit cast.
                .map(ClusterReplicationStatsEntryDTO.class::cast)
                .filter(stats -> stats.source().equals(sourceCluster) && stats.target().equals(targetCluster))
                .findFirst()
                .orElseThrow(() -> new RuntimeException(
                        "Could not find a replicationFlow in SRM API call response, where source is: "
                                + sourceCluster + ", and target is: " + targetCluster + '!'));
    }

    /**
     * Returns the first colocated-cluster statistics entry whose source matches {@code cluster}.
     *
     * @throws RuntimeException when no such replication flow is present in the SRM response
     */
    private ClusterReplicationStatsEntryDTO getMetricFromReplicationStatsForColocatedCluster(String cluster)
            throws InvalidConfigException {
        return this.streamsReplicationManagerService
                .clusterReplicationStatsForColocatedCluster(getTimeSpanForFetchingTimeSpan())
                .clusterReplicationStatsEntryMap()
                .values()
                .stream()
                // The map's declared value type is not visible from this class, hence the explicit cast.
                .map(ClusterReplicationStatsEntryDTO.class::cast)
                .filter(stats -> stats.source().equals(cluster))
                .findFirst()
                .orElseThrow(() -> new RuntimeException(
                        "Could not find a replicationFlow in SRM API call response, where source is: " + cluster + '!'));
    }

    /**
     * Validates the topic tags and then the cluster tags.
     *
     * @return {@code true} when the {@code cluster} tag is used, {@code false} when
     *         {@code sourceCluster} and {@code targetCluster} are used
     * @throws IllegalStateException when both topic tags are set, or when the cluster tags are invalid
     */
    private boolean isClusterUsedInsteadOfSourceAndTargetAndVerifyTags(String cluster, String sourceCluster,
            String targetCluster, String clusterTopic, String topicNameOnTarget) {
        if (clusterTopic != null && topicNameOnTarget != null) {
            throw new IllegalStateException("\"clusterTopicName\" and \"targetClusterTopicName\" cannot be non-null at the same time!");
        }
        return isClusterUsedInsteadOfSourceAndTargetAndVerifyTags(cluster, sourceCluster, targetCluster);
    }

    /**
     * Validates that exactly one addressing scheme is used: either {@code cluster} alone, or
     * {@code sourceCluster} together with {@code targetCluster}.
     *
     * @return {@code true} when the {@code cluster} tag is used, {@code false} otherwise
     * @throws IllegalStateException when all three tags are null, when {@code cluster} is combined with
     *         either of the other two, or when only one of {@code sourceCluster}/{@code targetCluster} is set
     */
    private boolean isClusterUsedInsteadOfSourceAndTargetAndVerifyTags(String cluster, String sourceCluster,
            String targetCluster) {
        if (cluster == null && sourceCluster == null && targetCluster == null) {
            throw new IllegalStateException("All of \"cluster\" and \"sourceCluster\" and \"targetCluster\" cannot be null!");
        }
        if (cluster != null && (sourceCluster != null || targetCluster != null)) {
            throw new IllegalStateException("If \"cluster\" tag is not null \"sourceCluster\" and \"targetCluster\" must be null!");
        }
        // Source and target must be given together: exactly one of them being null is an error.
        if ((sourceCluster == null) != (targetCluster == null)) {
            throw new IllegalStateException("In case either \"sourceCluster\" or \"targetCluster\" is not null the other must not be null either!");
        }
        return cluster != null;
    }

    /** Returns the time span covering the last {@link #METRIC_WINDOW_MINUTES} minutes, ending now. */
    private TimeSpan getTimeSpanForFetchingTimeSpan() {
        Instant now = Instant.now();
        Instant windowStart = now.minus(METRIC_WINDOW_MINUTES, ChronoUnit.MINUTES);
        return new TimeSpan(Long.valueOf(windowStart.toEpochMilli()), Long.valueOf(now.toEpochMilli()));
    }
}
