package com.hortonworks.smm.kafka.services.management.cache.broker.impl;

import com.hortonworks.smm.kafka.common.config.KafkaManagementConfig;
import com.hortonworks.smm.kafka.common.utils.ThreadUtils;
import com.hortonworks.smm.kafka.services.management.cache.broker.BrokerManagementCache;
import com.hortonworks.smm.kafka.services.management.dtos.BrokerNode;
import com.hortonworks.smm.kafka.services.management.dtos.KafkaClusterInfo;
import com.hortonworks.smm.kafka.services.management.dtos.KafkaResourceConfig;
import com.hortonworks.smm.kafka.services.management.helper.AdminClientHelper;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;

/**
 * {@link BrokerManagementCache} implementation that keeps the most recently fetched cluster
 * description, per-broker log-dir information and broker configs in memory.
 *
 * <p>The constructor performs one synchronous load and then schedules further refreshes with a
 * fixed delay of {@link KafkaManagementConfig#cacheRefreshIntervalMs()} milliseconds.
 *
 * <p>The accessors read {@code volatile} fields without taking the lock, so they never block on a
 * refresh in progress. Two consequences follow from that:
 * <ul>
 *   <li>an accessor returns {@code null} until the first load has succeeded (a failed load is only
 *       logged);</li>
 *   <li>the three snapshots are published one after another, so a reader calling several accessors
 *       while a refresh is publishing may see values from two different refreshes.</li>
 * </ul>
 */
@Singleton
public class BrokerManagementCacheImpl extends BrokerManagementCache {
    /** Maximum time {@link #close()} waits for an in-flight refresh to finish (5 minutes). */
    private static final long DEFAULT_SCHEDULER_SHUTDOWN_TIMEOUT_MS = 300000;

    // Last successfully loaded snapshots; written only by load() while holding the lock.
    private volatile KafkaClusterInfo clusterInfo;
    private volatile Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> allBrokerIdToLogDirInfo;
    private volatile Map<Integer, KafkaResourceConfig> brokerConfigs;

    /** Executor that runs the periodic {@link CacheLoader}. */
    private final ScheduledExecutorService scheduler;
    /** Serializes {@link #load()} so concurrent refreshes cannot interleave their writes. */
    private final ReentrantLock lock;

    /**
     * Task that reloads the cache, logging how long the load took.
     *
     * <p>Any {@link Exception} is caught and logged rather than propagated: a task that throws out
     * of {@code scheduleWithFixedDelay} has all of its subsequent executions suppressed.
     */
    private static final class CacheLoader implements Runnable {
        private final BrokerManagementCache cache;

        private CacheLoader(BrokerManagementCache brokerManagementCache) {
            this.cache = brokerManagementCache;
        }

        @Override
        public void run() {
            // nanoTime is monotonic, so the measured duration cannot be skewed by wall-clock changes.
            long startNanos = System.nanoTime();
            try {
                this.cache.load();
                long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                BrokerManagementCacheImpl.LOG.debug("Time taken to load the cache = {} ms", Long.valueOf(elapsedMs));
            } catch (Exception e) {
                BrokerManagementCacheImpl.LOG.error("Error while loading the cache", e);
            }
        }
    }

    /**
     * Creates the cache, loads it once on the calling thread and schedules periodic refreshes.
     *
     * <p>NOTE(review): {@code this} is handed to the loader before construction completes and
     * {@link #load()} is overridable; a subclass overriding {@code load()} would be invoked before
     * its own fields are initialized.
     *
     * @param adminClient           passed through to the superclass
     * @param adminClientHelper     passed through to the superclass
     * @param kafkaManagementConfig supplies the refresh interval in milliseconds, used as both the
     *                              initial delay and the delay between refreshes
     */
    @Inject
    public BrokerManagementCacheImpl(AdminClient adminClient, AdminClientHelper adminClientHelper, KafkaManagementConfig kafkaManagementConfig) {
        super(adminClient, adminClientHelper);
        this.lock = new ReentrantLock();
        this.scheduler = ThreadUtils.createScheduledExecutorService(1, "bmc-scheduler-%d", true);
        CacheLoader cacheLoader = new CacheLoader(this);
        // Initial load on the calling thread; failures are logged by the loader, not thrown.
        cacheLoader.run();
        long refreshIntervalMs = kafkaManagementConfig.cacheRefreshIntervalMs().longValue();
        this.scheduler.scheduleWithFixedDelay(cacheLoader, refreshIntervalMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * @return the cluster description from the last successful load, or {@code null} if no load
     *         has succeeded yet
     */
    @Override
    public KafkaClusterInfo kafkaClusterInfo() {
        return this.clusterInfo;
    }

    /**
     * @return log-dir information keyed by broker id from the last successful load, or
     *         {@code null} if no load has succeeded yet
     */
    @Override
    public Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> allBrokerIdToLogDirInfo() {
        return this.allBrokerIdToLogDirInfo;
    }

    /**
     * @return broker configs keyed by broker id from the last successful load, or {@code null} if
     *         no load has succeeded yet
     */
    @Override
    public Map<Integer, KafkaResourceConfig> brokerConfigs() {
        return this.brokerConfigs;
    }

    /**
     * Fetches a fresh cluster description, log-dir information and broker configs, then publishes
     * them.
     *
     * <p>All three fetches complete before any field is written, so a failure in any fetch leaves
     * the previously cached values untouched.
     */
    @Override
    public void load() {
        this.lock.lock();
        try {
            KafkaClusterInfo freshClusterInfo = fetchDescribeCluster();
            Collection<BrokerNode> brokerNodes = freshClusterInfo.brokerNodes();
            Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> freshLogDirs = fetchAllBrokersIdToDescribeLogDirs(brokerNodes);
            Map<Integer, KafkaResourceConfig> freshConfigs = fetchBrokerConfigs(brokerNodes);

            this.clusterInfo = freshClusterInfo;
            this.allBrokerIdToLogDirInfo = freshLogDirs;
            this.brokerConfigs = freshConfigs;
            LOG.trace("Refreshed BrokerManagementCache! kafkaClusterInfo: {}, allBrokerIdToLogDirInfo: {}, brokerConfigs: {}", new Object[]{freshClusterInfo, freshLogDirs, freshConfigs});
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Closes the superclass resources and stops the refresh scheduler.
     *
     * <p>The scheduler is stopped even when {@code super.close()} throws, so a failing close
     * cannot leave the refresh thread running.
     *
     * @throws Exception whatever {@code super.close()} throws, or {@link InterruptedException} if
     *                   the calling thread is interrupted while waiting for the scheduler
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            stopScheduler();
        }
    }

    /** Shuts the scheduler down, waiting a bounded time for an in-flight refresh before cancelling it. */
    private void stopScheduler() throws InterruptedException {
        this.scheduler.shutdown();
        try {
            if (!this.scheduler.awaitTermination(DEFAULT_SCHEDULER_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                LOG.warn("Cache refresh scheduler did not terminate within {} ms; cancelling running tasks", Long.valueOf(DEFAULT_SCHEDULER_SHUTDOWN_TIMEOUT_MS));
                this.scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Cancel the refresh and keep the interrupt visible: close() declares the broad
            // Exception type, so callers may catch this without checking for interruption.
            this.scheduler.shutdownNow();
            Thread.currentThread().interrupt();
            throw e;
        }
    }
}
