package com.cloudera.cmon.firehose.polling.kafka;

import com.cloudera.cmf.cdhclient.util.HttpConnectionConfigurator;
import com.cloudera.cmf.descriptors.ReadOnlyRoleDescriptor;
import com.cloudera.cmf.descriptors.ReadOnlyScmDescriptorPlus;
import com.cloudera.cmf.descriptors.ReadOnlyServiceDescriptor;
import com.cloudera.cmon.MonitoringTypes;
import com.cloudera.cmon.TimeSeriesEntityType;
import com.cloudera.cmon.firehose.CMONConfiguration;
import com.cloudera.cmon.firehose.polling.AbstractFirehoseCdhWork;
import com.cloudera.cmon.firehose.polling.FirehoseClientConfiguration;
import com.cloudera.cmon.kaiser.TimeSeriesHelper;
import com.cloudera.cmon.tstore.TimeSeriesEntityBuilder;
import com.cloudera.cmon.tstore.TimeSeriesMetadataStore;
import com.cloudera.cmon.tstore.TimeSeriesStore;
import com.cloudera.enterprise.JsonUtil2;
import com.cloudera.enterprise.ThrottlingLogger;
import com.cloudera.enterprise.UrlUtil;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.io.IOUtils;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/cmon/firehose/polling/kafka/KafkaEntitiesInfoFetcher.class */
/**
 * Firehose work item that reads the topic list of a Kafka service from one of
 * its running brokers and reconciles the time-series entities that refer to
 * those topics.
 *
 * <p>{@link #doWork} fetches the topics over HTTP and stores them in
 * {@link #topics}; {@link #postWork} then creates a {@code KAFKA_TOPIC} entity
 * per topic and deletes topic, producer and consumer entities of this service
 * that no longer match a reported topic. If no broker returned a non-empty
 * topic map, {@code postWork} does nothing.
 */
public class KafkaEntitiesInfoFetcher extends AbstractFirehoseCdhWork<Void> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaEntitiesInfoFetcher.class);
    private static final ThrottlingLogger THROTTLED_LOG = new ThrottlingLogger(LOG, Duration.standardMinutes(15));
    private static final String KAFKA_TOPIC_ENTITY_TYPE_NAME = "KAFKA_TOPIC";
    private static final String KAFKA_PRODUCER_ENTITY_TYPE_NAME = "KAFKA_PRODUCER";
    private static final String KAFKA_CONSUMER_ENTITY_TYPE_NAME = "KAFKA_CONSUMER";
    private static final String KAFKA_BROKER_ROLE_TYPE = "KAFKA_BROKER";
    private static final String TOPICS_API_PATH = "/api/topics";
    private static final String TOPIC_NAME_ATTRIBUTE = "kafkaTopicName";
    private static final String INTERNAL_TOPIC_ATTRIBUTE = "kafkaInternalTopic";
    private final ReadOnlyServiceDescriptor service;
    private final ReadOnlyScmDescriptorPlus descriptor;
    private final TimeSeriesStore tstore;
    private final HttpConnectionConfigurator httpConnectionConfigurator;
    /** Topics reported by a broker, keyed by topic name; stays {@code null} until doWork succeeds. */
    private Map<String, KafkaTopicProperties> topics;
    private final Duration timeout;
    private final int maxKafkaBrokersToPoll;

    /**
     * JSON binding for the per-topic value of the broker's topics response.
     * Unknown JSON properties are ignored; only {@code isInternal} is read.
     */
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class KafkaTopicProperties {

        @JsonProperty
        public boolean isInternal;

        private KafkaTopicProperties() {
        }
    }

    /**
     * Creates a fetcher for a single Kafka service.
     *
     * @param readOnlyServiceDescriptor the service to poll; its service type must be {@code "KAFKA"}
     * @param readOnlyScmDescriptorPlus descriptor used to check monitoring state and build broker URLs
     * @param timeSeriesStore store in which entities are created, searched and deleted
     * @param cMONConfiguration supplies the broker HTTP timeout and the broker poll limit
     * @param httpConnectionConfigurator optional configurator passed through to the HTTP read; may be null
     * @throws NullPointerException if any non-nullable argument is null
     * @throws IllegalArgumentException if the service is not of type {@code "KAFKA"}
     */
    public KafkaEntitiesInfoFetcher(ReadOnlyServiceDescriptor readOnlyServiceDescriptor, ReadOnlyScmDescriptorPlus readOnlyScmDescriptorPlus, TimeSeriesStore timeSeriesStore, CMONConfiguration cMONConfiguration, @Nullable HttpConnectionConfigurator httpConnectionConfigurator) {
        Preconditions.checkNotNull(readOnlyServiceDescriptor);
        Preconditions.checkArgument("KAFKA".equals(readOnlyServiceDescriptor.getServiceType()));
        Preconditions.checkNotNull(readOnlyScmDescriptorPlus);
        Preconditions.checkNotNull(timeSeriesStore);
        Preconditions.checkNotNull(cMONConfiguration);
        this.service = readOnlyServiceDescriptor;
        this.descriptor = readOnlyScmDescriptorPlus;
        this.tstore = timeSeriesStore;
        this.httpConnectionConfigurator = httpConnectionConfigurator;
        this.timeout = cMONConfiguration.getKafkaBrokerHttpTimeout();
        this.maxKafkaBrokersToPoll = cMONConfiguration.getMaxKafkaBrokersToPoll();
    }

    /** No preparation is required before polling. */
    @Override // com.cloudera.cmon.firehose.polling.CdhTask.FirehoseCdhWork
    public void preWork(FirehoseClientConfiguration firehoseClientConfiguration) {
    }

    /**
     * Polls running brokers in random order until one returns a non-empty topic
     * map, which is then remembered for {@link #postWork}.
     *
     * <p>Polling stops once the number of brokers that yielded nothing equals
     * the configured maximum. Nothing is polled when Kafka monitoring is
     * disabled for the service.
     *
     * @return always {@code null}
     */
    @Override // com.cloudera.cmon.firehose.polling.CdhTask.FirehoseCdhWork
    public Void doWork(FirehoseClientConfiguration firehoseClientConfiguration, boolean z) throws Exception {
        if (!this.descriptor.isMonitoringEnabledForKafka(this.service)) {
            return null;
        }
        List<ReadOnlyRoleDescriptor> brokers = Lists.newArrayList(ReadOnlyScmDescriptorPlus.getRunningRolesByType(this.service, KAFKA_BROKER_ROLE_TYPE));
        // Randomize so that the same broker is not always the first one asked.
        Collections.shuffle(brokers);
        int unsuccessfulPolls = 0;
        for (ReadOnlyRoleDescriptor broker : brokers) {
            if (unsuccessfulPolls == this.maxKafkaBrokersToPoll) {
                return null;
            }
            Map<String, KafkaTopicProperties> brokerTopics = getTopicsFromBroker(broker);
            if (brokerTopics != null && !brokerTopics.isEmpty()) {
                this.topics = brokerTopics;
                return null;
            }
            unsuccessfulPolls++;
        }
        return null;
    }

    /**
     * Fetches and deserializes the topics response of a single broker.
     *
     * @param readOnlyRoleDescriptor the broker role to query; must not be null
     * @return the topic properties keyed by topic name, or {@code null} if the
     *         broker has no URL endpoint, the URL could not be read, or the
     *         response could not be deserialized (failures are logged, throttled)
     */
    private Map<String, KafkaTopicProperties> getTopicsFromBroker(ReadOnlyRoleDescriptor readOnlyRoleDescriptor) {
        Preconditions.checkNotNull(readOnlyRoleDescriptor);
        URL topicsUrl = this.descriptor.getKafkaBrokerUrlEndPoint(readOnlyRoleDescriptor, TOPICS_API_PATH);
        if (topicsUrl == null) {
            return null;
        }
        // Declared outside the try so the finally block closes the real stream
        // on every exit path, including deserialization failures.
        InputStream inputStream = null;
        try {
            inputStream = getInputStream(topicsUrl.toString());
            if (inputStream == null) {
                THROTTLED_LOG.warn("Could not read URL: " + topicsUrl.toString());
                return null;
            }
            // The cast is only load-bearing if valueFromStream does not already
            // return the TypeReference's type; it is harmless otherwise.
            @SuppressWarnings("unchecked")
            Map<String, KafkaTopicProperties> parsedTopics = (Map<String, KafkaTopicProperties>) JsonUtil2.valueFromStream(new TypeReference<Map<String, KafkaTopicProperties>>() {
            }, inputStream);
            return parsedTopics;
        } catch (JsonUtil2.JsonRuntimeException e) {
            THROTTLED_LOG.warn("Could not deserialize response from: " + topicsUrl.toString(), e);
            return null;
        } catch (Exception e) {
            THROTTLED_LOG.warn("Could not read URL: " + topicsUrl.toString(), e);
            return null;
        } finally {
            IOUtils.closeQuietly(inputStream);
        }
    }

    /**
     * Opens the given URL for reading, using the configured timeout for both
     * timeout arguments of the underlying read. Overridable for tests.
     *
     * @param str the URL to read; must not be null
     * @return the response stream as returned by {@code UrlUtil.readUrlWithTimeouts}
     * @throws Exception if the underlying read fails
     */
    @VisibleForTesting
    protected InputStream getInputStream(String str) throws Exception {
        Preconditions.checkNotNull(str);
        return UrlUtil.readUrlWithTimeouts(str, this.timeout, this.timeout, this.httpConnectionConfigurator, UrlUtil.EMPTY_REQUEST_PROPERTIES, false);
    }

    /**
     * Reconciles time-series entities with the topics fetched by {@link #doWork}.
     *
     * <p>Gets or creates one {@code KAFKA_TOPIC} entity per topic, asks
     * {@link TimeSeriesHelper} to delete the service's other entities in that
     * category, and finally cleans up producer and consumer entities. Does
     * nothing when no topics were fetched.
     */
    @Override // com.cloudera.cmon.firehose.polling.CdhTask.FirehoseCdhWork
    public void postWork(FirehoseClientConfiguration firehoseClientConfiguration) {
        if (this.topics == null) {
            return;
        }
        TimeSeriesEntityType topicType = TimeSeriesEntityType.fromString(KAFKA_TOPIC_ENTITY_TYPE_NAME);
        Preconditions.checkNotNull(topicType);
        String serviceName = this.service.getName();
        Set<String> currentEntityNames = Sets.newHashSet();
        for (Map.Entry<String, KafkaTopicProperties> topic : this.topics.entrySet()) {
            Map<String, String> attributes = Maps.newHashMap();
            attributes.put(MonitoringTypes.SERVICE_NAME_ATTRIBUTE.toString(), serviceName);
            attributes.put(TOPIC_NAME_ATTRIBUTE, topic.getKey());
            attributes.put(INTERNAL_TOPIC_ATTRIBUTE, Boolean.toString(topic.getValue().isInternal));
            currentEntityNames.add(TimeSeriesEntityBuilder.fromAttributes(topicType, attributes, false).getOrCreate(this.tstore).getName());
        }
        new TimeSeriesHelper(this.tstore).deleteOldEntitiesWithinService(serviceName, currentEntityNames, topicType.getCategory());
        cleanupProducerAndConsumerEntities();
    }

    /**
     * Deletes producer and consumer entities of this service whose
     * {@code kafkaTopicName} attribute is not one of the fetched topic names
     * (entities without that attribute are deleted as well).
     *
     * <p>Skipped entirely if either entity type is unknown. Only called from
     * {@link #postWork} after {@link #topics} has been checked for null.
     */
    private void cleanupProducerAndConsumerEntities() {
        TimeSeriesEntityType producerType = TimeSeriesEntityType.fromString(KAFKA_PRODUCER_ENTITY_TYPE_NAME);
        TimeSeriesEntityType consumerType = TimeSeriesEntityType.fromString(KAFKA_CONSUMER_ENTITY_TYPE_NAME);
        if (producerType == null || consumerType == null) {
            return;
        }
        // Copied into a HashSet so that contains(null) is well-defined whatever
        // map implementation the JSON layer produced.
        Set<String> liveTopicNames = Sets.newHashSet(this.topics.keySet());
        for (TimeSeriesMetadataStore.TimeSeriesEntity entity : this.tstore.searchTimeSeriesEntities((List<String>) ImmutableList.of(producerType.getCategory(), consumerType.getCategory()), (Map<String, String>) ImmutableMap.of(MonitoringTypes.SERVICE_NAME_ATTRIBUTE.toString(), this.service.getName()), -1)) {
            if (!liveTopicNames.contains(entity.getAttributes().get(TOPIC_NAME_ATTRIBUTE))) {
                this.tstore.deleteTimeSeriesEntity(entity);
            }
        }
    }

    /**
     * @return always {@code null}; this work item names no user to impersonate
     */
    @Override // com.cloudera.cmon.firehose.polling.CdhTask.FirehoseCdhWork
    public String getUserToImpersonate(FirehoseClientConfiguration firehoseClientConfiguration) {
        return null;
    }
}
