package com.hortonworks.smm.kafka.services.management.cache.topic.impl;

import com.hortonworks.smm.kafka.common.config.KafkaManagementConfig;
import com.hortonworks.smm.kafka.common.errors.NotFoundException;
import com.hortonworks.smm.kafka.common.utils.ThreadUtils;
import com.hortonworks.smm.kafka.services.management.cache.broker.BrokerManagementCache;
import com.hortonworks.smm.kafka.services.management.cache.topic.TopicManagementCache;
import com.hortonworks.smm.kafka.services.management.dtos.KafkaResourceConfig;
import com.hortonworks.smm.kafka.services.management.dtos.KafkaResourceConfigEntry;
import com.hortonworks.smm.kafka.services.management.dtos.TopicInfo;
import com.hortonworks.smm.kafka.services.management.dtos.TopicPartitionInfo;
import com.hortonworks.smm.kafka.services.management.helper.AdminClientHelper;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.kafka.clients.admin.AdminClient;

@Singleton
/* loaded from: input_file:com/hortonworks/smm/kafka/services/management/cache/topic/impl/TopicManagementCacheImpl.class */
public class TopicManagementCacheImpl extends TopicManagementCache {
    private static final long DEFAULT_SCHEDULER_SHUTDOWN_TIMEOUT_MS = 300000;
    private static final Long NEW_TOPIC_REFLECT_WAIT_TIME_MS = 60000L;
    private final AtomicReference<Set<String>> allTopicNamesRef;
    private final AtomicReference<Map<String, TopicInfo>> allTopicInfosRef;
    private final AtomicReference<Map<String, Collection<TopicPartitionInfo>>> allTopicPartitionInfosRef;
    private final AtomicReference<Map<String, KafkaResourceConfig>> topicConfigsRef;
    private final AtomicReference<Collection<KafkaResourceConfigEntry>> defaultTopicConfigsRef;
    private final BrokerManagementCache brokerManagementCache;
    private final ScheduledExecutorService scheduler;
    private final ReentrantLock lock;

    /* loaded from: input_file:com/hortonworks/smm/kafka/services/management/cache/topic/impl/TopicManagementCacheImpl$CacheLoader.class */
    private static final class CacheLoader implements Runnable {
        private final TopicManagementCache cache;

        private CacheLoader(TopicManagementCache topicManagementCache) {
            this.cache = topicManagementCache;
        }

        @Override // java.lang.Runnable
        public void run() {
            long currentTimeMillis = System.currentTimeMillis();
            try {
                this.cache.load();
                TopicManagementCacheImpl.LOG.debug("Time taken to load the cache = {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            } catch (Exception e) {
                TopicManagementCacheImpl.LOG.error("Error while loading the cache", e);
            }
        }
    }

    @Inject
    public TopicManagementCacheImpl(AdminClient adminClient, AdminClientHelper adminClientHelper, BrokerManagementCache brokerManagementCache, KafkaManagementConfig kafkaManagementConfig) {
        super(adminClient, adminClientHelper);
        this.allTopicNamesRef = new AtomicReference<>();
        this.allTopicInfosRef = new AtomicReference<>();
        this.allTopicPartitionInfosRef = new AtomicReference<>();
        this.topicConfigsRef = new AtomicReference<>();
        this.defaultTopicConfigsRef = new AtomicReference<>();
        this.lock = new ReentrantLock();
        this.brokerManagementCache = brokerManagementCache;
        this.scheduler = ThreadUtils.createScheduledExecutorService(1, "tmc-scheduler-%d", true);
        CacheLoader cacheLoader = new CacheLoader(this);
        cacheLoader.run();
        Long cacheRefreshIntervalMs = kafkaManagementConfig.cacheRefreshIntervalMs();
        this.scheduler.scheduleWithFixedDelay(cacheLoader, cacheRefreshIntervalMs.longValue(), cacheRefreshIntervalMs.longValue(), TimeUnit.MILLISECONDS);
    }

    @Override // com.hortonworks.smm.kafka.services.management.cache.topic.TopicManagementCache
    public Set<String> allTopicNames() {
        return this.allTopicNamesRef.get();
    }

    @Override // com.hortonworks.smm.kafka.services.management.cache.topic.TopicManagementCache
    public Map<String, TopicInfo> allTopicInfos() {
        return this.allTopicInfosRef.get();
    }

    @Override // com.hortonworks.smm.kafka.services.management.cache.topic.TopicManagementCache
    public Map<String, Collection<TopicPartitionInfo>> allTopicPartitionInfos() {
        return this.allTopicPartitionInfosRef.get();
    }

    @Override // com.hortonworks.smm.kafka.services.management.cache.topic.TopicManagementCache
    public Map<String, KafkaResourceConfig> topicConfigs() {
        return this.topicConfigsRef.get();
    }

    @Override // com.hortonworks.smm.kafka.services.management.cache.topic.TopicManagementCache
    public Collection<KafkaResourceConfigEntry> defaultTopicConfigs() {
        return this.defaultTopicConfigsRef.get();
    }

    @Override // com.hortonworks.smm.kafka.services.management.cache.topic.TopicManagementCache
    public void load() {
        this.lock.lock();
        try {
            Set<String> fetchTopicNames = fetchTopicNames();
            Map<String, TopicInfo> fetchTopicInfos = fetchTopicInfos(fetchTopicNames);
            Map<String, Collection<TopicPartitionInfo>> topicPartitionInfos = toTopicPartitionInfos(fetchTopicInfos.values());
            Map<String, KafkaResourceConfig> fetchTopicConfigs = fetchTopicConfigs(fetchTopicNames);
            Collection<KafkaResourceConfigEntry> fetchDefaultTopicConfigs = fetchDefaultTopicConfigs(this.brokerManagementCache.brokerConfigs().entrySet().iterator().next().getValue());
            this.allTopicNamesRef.set(fetchTopicNames);
            this.allTopicInfosRef.set(fetchTopicInfos);
            this.allTopicPartitionInfosRef.set(topicPartitionInfos);
            this.topicConfigsRef.set(fetchTopicConfigs);
            this.defaultTopicConfigsRef.set(fetchDefaultTopicConfigs);
            this.lock.unlock();
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    @Override // com.hortonworks.smm.kafka.services.management.cache.topic.TopicManagementCache
    public void load(Collection<String> collection, boolean z) {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        this.lock.lock();
        try {
            HashSet hashSet = new HashSet();
            HashMap hashMap = new HashMap(allTopicInfos());
            HashMap hashMap2 = new HashMap(topicConfigs());
            if (z) {
                hashSet.addAll(allTopicNames());
                for (String str : collection) {
                    hashSet.remove(str);
                    hashMap.remove(str);
                    hashMap2.remove(str);
                }
            } else {
                hashSet.addAll(waitTillTopicCreateCompletion(collection, NEW_TOPIC_REFLECT_WAIT_TIME_MS.longValue()));
                hashMap.putAll(fetchTopicInfos(collection));
                hashMap2.putAll(fetchTopicConfigs(collection));
            }
            this.allTopicNamesRef.set(Collections.unmodifiableSet(hashSet));
            this.allTopicInfosRef.set(Collections.unmodifiableMap(hashMap));
            this.allTopicPartitionInfosRef.set(toTopicPartitionInfos(hashMap.values()));
            this.topicConfigsRef.set(Collections.unmodifiableMap(hashMap2));
            this.lock.unlock();
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    @Override // com.hortonworks.smm.kafka.services.management.cache.topic.TopicManagementCache, java.lang.AutoCloseable
    public void close() throws Exception {
        super.close();
        this.scheduler.shutdown();
        this.scheduler.awaitTermination(DEFAULT_SCHEDULER_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }

    private Set<String> waitTillTopicCreateCompletion(Collection<String> collection, long j) {
        long j2 = j;
        Set<String> fetchTopicNames = fetchTopicNames();
        while (true) {
            Set<String> set = fetchTopicNames;
            if (set.containsAll(collection)) {
                return set;
            }
            if (j2 == 0) {
                throw new NotFoundException("Not able to list all the new topics : " + collection + " within " + j + " ms");
            }
            try {
                long min = Math.min(1000L, j2);
                Thread.sleep(min);
                j2 -= min;
            } catch (InterruptedException e) {
            }
            fetchTopicNames = fetchTopicNames();
        }
    }
}
