package com.hortonworks.smm.kafka.services.lineage;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.hortonworks.smm.kafka.common.config.KafkaMetricsApiClientConfig;
import com.hortonworks.smm.kafka.common.entities.KafkaProducer;
import com.hortonworks.smm.kafka.common.entities.KafkaTopic;
import com.hortonworks.smm.kafka.common.entities.KafkaTopicPartition;
import com.hortonworks.smm.kafka.common.entities.TopicPartitionsToProducers;
import com.hortonworks.smm.kafka.services.common.errors.InvalidConfigException;
import com.hortonworks.smm.kafka.services.common.errors.InvalidKafkaMetricsApiResponse;
import com.hortonworks.smm.kafka.services.management.TopicManagementService;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/**
 * Looks up the producers connected to a Kafka topic or to one of its partitions.
 *
 * <p>Lineage is fetched with {@link KafkaMetricsApiClient#getLineageForTopic} once per
 * (partition leader host, topic name) pair and kept in a Caffeine {@link AsyncLoadingCache}.
 * The cache expiration and maximum size are read from the properties of the injected
 * {@link KafkaMetricsApiClientConfig}, falling back to that class's defaults.
 */
@Singleton
public class ProducerLineageService implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerLineageService.class);

    /** Property key holding the cache entry lifetime in milliseconds. */
    private static final String CACHE_EXPIRATION_MILLIS_KEY = "cache_expiration_millis";
    /** Property key holding the maximum number of cache entries. */
    private static final String CACHE_MAXIMUM_SIZE_KEY = "cache_maximum_size";

    private final TopicManagementService topicManagementService;
    private final KafkaMetricsApiClient kafkaMetricsApiClient;
    private final AsyncLoadingCache<PartitionLeaderHostAndTopicName, TopicLineagesResponse> cache;

    /**
     * Cache key: the host of a partition leader together with the topic name.
     *
     * <p>Decompiler note: this type was originally declared {@code private}.
     */
    public static final class PartitionLeaderHostAndTopicName {
        private final String partitionLeaderHost;
        private final String topicName;

        public PartitionLeaderHostAndTopicName(String str, String str2) {
            this.partitionLeaderHost = str;
            this.topicName = str2;
        }

        public String getPartitionLeaderHost() {
            return this.partitionLeaderHost;
        }

        public String getTopicName() {
            return this.topicName;
        }

        /** Two keys are equal when both host and topic name are equal (nulls compare equal). */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof PartitionLeaderHostAndTopicName)) {
                return false;
            }
            PartitionLeaderHostAndTopicName other = (PartitionLeaderHostAndTopicName) obj;
            return Objects.equals(this.partitionLeaderHost, other.partitionLeaderHost)
                    && Objects.equals(this.topicName, other.topicName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.partitionLeaderHost, this.topicName);
        }

        @Override
        public String toString() {
            return "ProducerLineageService.PartitionLeaderHostAndTopicName(partitionLeaderHost="
                    + this.partitionLeaderHost + ", topicName=" + this.topicName + ")";
        }
    }

    /**
     * Creates the service and its lineage cache.
     *
     * @param topicManagementService used to resolve the partition leaders of a topic
     * @param kafkaMetricsApiClient client the cache loader calls to fetch lineage
     * @param kafkaMetricsApiClientConfig optional configuration; when {@code null} the default
     *     cache expiration and size are used
     */
    @Inject
    public ProducerLineageService(TopicManagementService topicManagementService, KafkaMetricsApiClient kafkaMetricsApiClient, @Nullable KafkaMetricsApiClientConfig kafkaMetricsApiClientConfig) {
        this.topicManagementService = topicManagementService;
        this.kafkaMetricsApiClient = kafkaMetricsApiClient;
        Map<String, Object> properties = kafkaMetricsApiClientConfig != null
                ? kafkaMetricsApiClientConfig.getProperties()
                : ImmutableMap.of();
        this.cache = buildCache(properties);
    }

    /** Builds the stats-recording async cache whose loader queries the metrics API client. */
    private AsyncLoadingCache<PartitionLeaderHostAndTopicName, TopicLineagesResponse> buildCache(Map<String, Object> properties) {
        int expirationMillis = intProperty(properties, CACHE_EXPIRATION_MILLIS_KEY,
                KafkaMetricsApiClientConfig.CACHE_EXPIRATION_MILLIS_DEFAULT.intValue());
        int maximumSize = intProperty(properties, CACHE_MAXIMUM_SIZE_KEY,
                KafkaMetricsApiClientConfig.CACHE_MAXIMUM_SIZE_DEFAULT.intValue());
        LOG.info("Initializing Lineage Cache with expiration: {} ms, max. entries: {}!", expirationMillis, maximumSize);
        return Caffeine.newBuilder()
                .expireAfterWrite(Duration.ofMillis(expirationMillis))
                .maximumSize(maximumSize)
                .recordStats()
                .buildAsync(key -> this.kafkaMetricsApiClient.getLineageForTopic(key.getTopicName(), key.getPartitionLeaderHost()));
    }

    /**
     * Reads an integer property, returning {@code defaultValue} when the map is {@code null},
     * the key is absent, or the key maps to {@code null}.
     *
     * @throws NumberFormatException if the configured value's string form is not an integer
     */
    private static int intProperty(Map<String, Object> properties, String key, int defaultValue) {
        Object configured = properties == null ? null : properties.get(key);
        return configured == null ? defaultValue : Integer.parseInt(configured.toString());
    }

    /**
     * Returns the producers connected to a single topic partition.
     *
     * @param kafkaTopicPartition the partition to look up
     * @return the producers recorded for the partition, or an empty set when the lineage
     *     response has no entry for it
     * @throws InvalidConfigException if the cache loader failed with this exception
     * @throws InvalidKafkaMetricsApiResponse if the cache loader failed with this exception
     * @throws IllegalArgumentException if the partition is not present in the topic's metadata
     * @throws RuntimeException if the thread is interrupted or the loader failed otherwise
     */
    public Set<KafkaProducer> connectedProducers(KafkaTopicPartition kafkaTopicPartition) throws InvalidConfigException, InvalidKafkaMetricsApiResponse {
        PartitionLeaderHostAndTopicName key = new PartitionLeaderHostAndTopicName(
                fetchTopicPartitionLeaderHost(kafkaTopicPartition),
                kafkaTopicPartition.getTopic().getName());
        try {
            return this.cache.get(key).get()
                    .getPartitionToProducers()
                    .getOrDefault(kafkaTopicPartition, Collections.emptySet());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw asCallerException(e.getCause());
        }
    }

    /**
     * Returns the producers connected to every partition of a topic.
     *
     * <p>One cache lookup is issued per distinct partition leader host; the per-host responses
     * are merged into a single partition-to-producers map.
     *
     * @param kafkaTopic the topic to look up
     * @return the merged partition-to-producers mapping
     * @throws InvalidConfigException if a cache loader failed with this exception
     * @throws InvalidKafkaMetricsApiResponse if a cache loader failed with this exception
     * @throws IllegalArgumentException if no partition leader host is found for the topic
     * @throws RuntimeException if the thread is interrupted or a loader failed otherwise
     */
    public TopicPartitionsToProducers connectedProducers(KafkaTopic kafkaTopic) throws InvalidConfigException, InvalidKafkaMetricsApiResponse {
        Map<KafkaTopicPartition, Set<KafkaProducer>> producersByPartition = new ConcurrentHashMap<>();
        String topicName = kafkaTopic.getName();
        // Kick off all lookups first so they can load concurrently, then wait for all of them.
        List<CompletableFuture<TopicLineagesResponse>> lookups = fetchTopicPartitionLeaderHosts(kafkaTopic).stream()
                .map(host -> this.cache.get(new PartitionLeaderHostAndTopicName(host, topicName)))
                .collect(Collectors.toList());
        try {
            CompletableFuture.allOf(lookups.toArray(new CompletableFuture<?>[0])).get();
            for (CompletableFuture<TopicLineagesResponse> lookup : lookups) {
                lookup.get().getPartitionToProducers().forEach((partition, producers) ->
                        producersByPartition.computeIfAbsent(partition, ignored -> new HashSet<>()).addAll(producers));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw asCallerException(e.getCause());
        }
        return new TopicPartitionsToProducers(producersByPartition);
    }

    /** Exposes the statistics recorded by the lineage cache. */
    @VisibleForTesting
    public CacheStats getCacheStats() {
        return this.cache.synchronous().stats();
    }

    /**
     * Collects the distinct leader hosts of all partitions of the topic.
     *
     * @throws IllegalArgumentException if the resulting set is empty
     */
    private Set<String> fetchTopicPartitionLeaderHosts(KafkaTopic kafkaTopic) {
        // NOTE(review): leader() is dereferenced without a null check - confirm it cannot be
        // null (e.g. for a partition that currently has no leader).
        Set<String> leaderHosts = this.topicManagementService.topicInfo(kafkaTopic.getName()).partitions().stream()
                .map(partitionInfo -> partitionInfo.leader().host())
                .collect(Collectors.toSet());
        if (leaderHosts.isEmpty()) {
            throw new IllegalArgumentException(String.format("Could not find a single partition for Topic: %s", kafkaTopic.getName()));
        }
        return leaderHosts;
    }

    /**
     * Finds the leader host of the requested partition.
     *
     * @throws IllegalArgumentException if the topic metadata contains no such partition
     */
    private String fetchTopicPartitionLeaderHost(KafkaTopicPartition kafkaTopicPartition) {
        String topicName = kafkaTopicPartition.getTopic().getName();
        int requestedPartition = kafkaTopicPartition.getPartition();
        // NOTE(review): leader() is dereferenced without a null check - confirm it cannot be null.
        Optional<String> leaderHost = this.topicManagementService.topicInfo(topicName).partitions().stream()
                .filter(partitionInfo -> partitionInfo.partition() == requestedPartition)
                .map(partitionInfo -> partitionInfo.leader().host())
                .findFirst();
        // Arguments are passed in the order the message names them: partition first, then topic.
        return leaderHost.orElseThrow(() -> new IllegalArgumentException(String.format(
                "The requested partition: %s in topic: %s is not present!", requestedPartition, topicName)));
    }

    /**
     * Converts the cause of a failed cache load into the exception the caller should see.
     *
     * <p>The two declared checked exceptions are rethrown as-is so their cause and stack trace
     * survive; anything else is returned wrapped in a {@link RuntimeException} that keeps the
     * original as its cause. Returning (rather than throwing) the unchecked case lets call
     * sites write {@code throw asCallerException(...)} so the compiler sees the path end.
     */
    private static RuntimeException asCallerException(Throwable cause) throws InvalidConfigException, InvalidKafkaMetricsApiResponse {
        if (cause instanceof InvalidConfigException) {
            throw (InvalidConfigException) cause;
        }
        if (cause instanceof InvalidKafkaMetricsApiResponse) {
            throw (InvalidKafkaMetricsApiResponse) cause;
        }
        String message = cause == null ? null : cause.getMessage();
        return new RuntimeException(message, cause);
    }

    /** Runs the cache's pending maintenance work. */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.cache.synchronous().cleanUp();
    }
}
