package com.hortonworks.smm.kafka.services.metric.cm;

import com.cloudera.cmon.firehose.BasicFirehoseClient;
import com.cloudera.cmon.firehose.event.AgentMsg;
import com.cloudera.cmon.firehose.event.EntityTypeUpdate;
import com.cloudera.cmon.firehose.event.EntityTypeUpdateEntry;
import com.cloudera.cmon.firehose.event.MetricValue;
import com.cloudera.cmon.firehose.event.RoleUpdate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.hortonworks.smm.kafka.common.config.CmServerClientConfig;
import com.hortonworks.smm.kafka.common.config.KafkaMetricsConfig;
import com.hortonworks.smm.kafka.services.management.cm.KafkaClusterInfoService;
import com.hortonworks.smm.kafka.services.metric.MetricDescriptor;
import com.hortonworks.smm.kafka.services.metric.MetricsEmitter;
import com.hortonworks.smm.kafka.services.metric.MetricsStoreApiClient;
import com.hortonworks.smm.kafka.services.metric.cm.pojo.RoleConfigResponse;
import com.hortonworks.smm.kafka.services.metric.cm.pojo.RolesResponse;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.inject.Singleton;
import org.apache.kafka.common.Configurable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/**
 * {@link MetricsEmitter} that pushes consumer-group metrics to a Cloudera Manager Service Monitor
 * (SMON) through a {@link BasicFirehoseClient}.
 *
 * <p>The SMON host and port are looked up through the {@link MetricsStoreApiClient}. The lookup is
 * attempted once in the constructor and again whenever an emission attempt fails.
 *
 * <p>The connection state ({@code smonHost}, {@code smonPort}, {@code firehoseClient}) is written
 * while holding the write lock of {@code readWriteLock}; {@code firehoseClient} is used for
 * sending while holding the read lock.
 */
@Singleton
public class CMMetricsEmitter implements MetricsEmitter, Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(CMMetricsEmitter.class);

    private static final String CM_METRICS_EMIT_CONSUMER_METRICS_TIMEOUT = "cm.metrics.emit.consumer.metrics.timeout";
    private static final String CM_METRICS_SERVICE_NAME_PROPERTY = "cm.metrics.service.name";
    private static final Integer DEFAULT_CONSUMER_EMISSION_TIMEOUT = 10000;

    /** Maps an incoming metric name to the metric name that is sent to the Service Monitor. */
    private static final Map<String, String> CONSUMER_METRIC_NAME = ImmutableMap.of(
            "stats(kafka_consumer_group_lag_rate, counter)", "kafka_consumer_group_lag_count",
            "stats(kafka_consumer_group_committed_offset_rate, counter)", "kafka_consumer_group_committed_offset_count");

    /** Maps an incoming metric name to the entity type it is reported under. */
    private static final Map<String, String> CONSUMER_METRIC_CATEGORY = ImmutableMap.of(
            "stats(kafka_consumer_group_lag_rate, counter)", "KAFKA_CONSUMER_GROUP",
            "stats(kafka_consumer_group_committed_offset_rate, counter)", "KAFKA_CONSUMER_GROUP");

    private final ReadWriteLock readWriteLock;
    private final MetricsStoreApiClient metricsStoreApiClient;
    private final Integer consumerMetricsEmissionTimeout;
    private BasicFirehoseClient firehoseClient;
    private String smonHost;
    private Integer smonPort;
    private String clusterName;
    private String serviceName;
    private String roleName;

    /**
     * Identifies the entity a group of metric values belongs to. Used as a map key, hence the
     * value-based {@link #equals(Object)} and {@link #hashCode()}.
     */
    public static class ConsumerMetricAttributes {
        private final String clusterName;
        private final String serviceName;
        private final String kafkaConsumerGroup;

        ConsumerMetricAttributes(String str, String str2, String str3) {
            this.clusterName = str;
            this.serviceName = str2;
            this.kafkaConsumerGroup = str3;
        }

        /**
         * Returns the three attributes as an immutable map keyed by {@code clusterName},
         * {@code serviceName} and {@code kafkaConsumerGroup}.
         */
        public Map<String, String> asAttributeMap() {
            return ImmutableMap.of(
                    "clusterName", this.clusterName,
                    "serviceName", this.serviceName,
                    "kafkaConsumerGroup", this.kafkaConsumerGroup);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ConsumerMetricAttributes other = (ConsumerMetricAttributes) obj;
            // Objects.equals keeps equals null-safe, matching the null-safe Objects.hash below.
            return Objects.equals(this.clusterName, other.clusterName)
                    && Objects.equals(this.serviceName, other.serviceName)
                    && Objects.equals(this.kafkaConsumerGroup, other.kafkaConsumerGroup);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.clusterName, this.serviceName, this.kafkaConsumerGroup);
        }
    }

    /**
     * Creates an emitter that builds its own {@link BasicFirehoseClient} from the discovered SMON
     * host and port.
     */
    public CMMetricsEmitter(MetricsStoreApiClient metricsStoreApiClient, KafkaMetricsConfig kafkaMetricsConfig, KafkaClusterInfoService kafkaClusterInfoService) {
        this(metricsStoreApiClient, kafkaMetricsConfig, kafkaClusterInfoService, null);
    }

    /**
     * Creates an emitter; when {@code basicFirehoseClient} is non-null it is used instead of a
     * newly constructed client.
     *
     * <p>A failure of the initial SMON lookup is logged and not propagated; in that case
     * {@code firehoseClient} stays unset and the lookup is retried by
     * {@link #emitMetrics(Map, BasicFirehoseClient)}.
     */
    @VisibleForTesting
    public CMMetricsEmitter(MetricsStoreApiClient metricsStoreApiClient, KafkaMetricsConfig kafkaMetricsConfig, KafkaClusterInfoService kafkaClusterInfoService, BasicFirehoseClient basicFirehoseClient) {
        this.readWriteLock = new ReentrantReadWriteLock();
        this.metricsStoreApiClient = metricsStoreApiClient;
        this.serviceName = kafkaClusterInfoService.getServiceName();
        this.clusterName = kafkaClusterInfoService.getClusterName();
        this.roleName = kafkaClusterInfoService.getRoleName();
        this.consumerMetricsEmissionTimeout = (Integer) kafkaMetricsConfig.getConfig().getOrDefault(CM_METRICS_EMIT_CONSUMER_METRICS_TIMEOUT, DEFAULT_CONSUMER_EMISSION_TIMEOUT);
        try {
            configureSmonProperties(basicFirehoseClient);
        } catch (Exception e) {
            LOG.error("Error while trying to fetch SMON host and port!", e);
        }
    }

    /** No-op: all configuration is taken from the constructor arguments. */
    @Override
    public void configure(Map<String, ?> map) {
    }

    /**
     * Verifies that the given config contains the {@code cm.metrics.service.name} property.
     *
     * @throws IllegalArgumentException if the config is null, empty, or lacks the property
     */
    private void checkMetricsConfig(CmServerClientConfig cmServerClientConfig) {
        if (cmServerClientConfig == null
                || CollectionUtils.isEmpty(cmServerClientConfig.getConfig())
                || !cmServerClientConfig.getConfig().containsKey(CM_METRICS_SERVICE_NAME_PROPERTY)) {
            throw new IllegalArgumentException("KafkaMetricsConfig's config needs to contain the following property: " + CM_METRICS_SERVICE_NAME_PROPERTY);
        }
    }

    /**
     * Looks up the SERVICEMONITOR role, stores its host and port, and (re)creates the firehose
     * client. Runs under the write lock.
     *
     * @param basicFirehoseClient client to use instead of constructing one; may be null
     * @throws RuntimeException wrapping any failure of the lookup or the client construction
     */
    private void configureSmonProperties(BasicFirehoseClient basicFirehoseClient) {
        Lock writeLock = this.readWriteLock.writeLock();
        writeLock.lock();
        try {
            RolesResponse rolesResponse = (RolesResponse) this.metricsStoreApiClient.apiGet("/cm/service/roles", ImmutableMap.of("contentType", "application/json"), RolesResponse.class);
            if (rolesResponse == null) {
                throw new RuntimeException("Null response received while trying to fetch SMON host from cm's path: \"/cm/service/roles\"");
            }
            String smonRoleName = null;
            String smonHostname = null;
            for (RolesResponse.Role role : rolesResponse.getItems()) {
                // Only the first SERVICEMONITOR role is considered.
                if ("SERVICEMONITOR".equals(role.getType())) {
                    smonRoleName = role.getName();
                    smonHostname = role.getHostRef().getHostname();
                    break;
                }
            }
            if (smonRoleName == null || smonHostname == null) {
                throw new RuntimeException("Could not determine either the roleName or the host of ServiceMonitor from API response: \"/cm/service/roles\"");
            }
            this.smonHost = smonHostname;
            configureSmonPort(smonRoleName);
            configureFireHoseClient(basicFirehoseClient);
        } catch (Exception e) {
            throw new RuntimeException("Error while trying to fetch Metrics frmon Smon!", e);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Sets {@code firehoseClient} to the supplied client, or to a new one built from the stored
     * SMON host, port and emission timeout when none is supplied.
     */
    private void configureFireHoseClient(BasicFirehoseClient basicFirehoseClient) {
        // NOTE(review): a previously assigned client is replaced without being released here;
        // confirm whether BasicFirehoseClient holds resources that need closing.
        if (basicFirehoseClient != null) {
            this.firehoseClient = basicFirehoseClient;
            return;
        }
        try {
            this.firehoseClient = new BasicFirehoseClient(this.smonHost, this.smonPort.intValue(), this.consumerMetricsEmissionTimeout.intValue());
        } catch (IOException e) {
            throw new RuntimeException("Error while initalizing BasicFirehoseClient!", e);
        }
    }

    /**
     * Reads the {@code firehose_listen_port} setting of the given role and stores it in
     * {@code smonPort}, falling back to the setting's default value when no value is set.
     *
     * @param str name of the Service Monitor role
     * @throws RuntimeException if the port cannot be determined; the original failure is attached
     *     as the cause
     */
    private void configureSmonPort(String str) {
        String configPath = "/cm/service/roles/" + str + "/config";
        try {
            RoleConfigResponse response = (RoleConfigResponse) this.metricsStoreApiClient.apiGet(configPath, ImmutableMap.of("contentType", "application/json", "view", "FULL"), RoleConfigResponse.class);
            if (response == null) {
                throw new RuntimeException("Null response received while trying to fetch SMON port from CM API path: \"" + configPath + "\"");
            }
            Integer port = null;
            for (RoleConfigResponse.RoleConfig roleConfig : response.getItems()) {
                if ("firehose_listen_port".equals(roleConfig.getName())) {
                    port = roleConfig.getValue() != null ? Integer.valueOf(roleConfig.getValue()) : Integer.valueOf(roleConfig.getDefaultValue());
                }
            }
            if (port == null) {
                throw new RuntimeException("Could not determine the port of ServiceMonitor from API response: \"/cm/service/roles/" + str + "\"/config");
            }
            this.smonPort = port;
        } catch (RuntimeException e) {
            // Keep the cause so the actual reason (missing setting, unparsable value, ...) is not lost.
            throw new RuntimeException("Error while trying to fetch SMON port, from CM API path: \"/cm/service/roles/" + str + "/config\"", e);
        }
    }

    /** Emits the given metrics, building the firehose client internally if it has to be recreated. */
    @Override // com.hortonworks.smm.kafka.services.metric.MetricsEmitter
    public boolean emitMetrics(Map<MetricDescriptor, Long> map) throws Exception {
        return emitMetrics(map, null);
    }

    /**
     * Emits the given metrics. If the first attempt fails, the SMON connection is reconfigured
     * and the emission is attempted exactly once more; a failure of the reconfiguration or of the
     * second attempt propagates to the caller.
     *
     * @param map metric values keyed by descriptor
     * @param basicFirehoseClient client to install on reconfiguration; may be null
     * @return {@code true} when the metrics were sent or there was nothing to send
     */
    @VisibleForTesting
    public boolean emitMetrics(Map<MetricDescriptor, Long> map, BasicFirehoseClient basicFirehoseClient) throws Exception {
        try {
            return pushMetricsIntoCm(map, this.clusterName, this.serviceName, this.roleName);
        } catch (Exception e) {
            LOG.warn("Emitting metrics failed, reconfiguring the SMON connection and retrying once", e);
            configureSmonProperties(basicFirehoseClient);
            return pushMetricsIntoCm(map, this.clusterName, this.serviceName, this.roleName);
        }
    }

    /**
     * Converts the recognised consumer metrics into a single {@link AgentMsg} and sends it through
     * the firehose client while holding the read lock.
     *
     * <p>The read lock is acquired exactly once and always released in {@code finally}. A thread
     * that still held a read lock after a failed send would block forever in
     * {@link #configureSmonProperties(BasicFirehoseClient)}, because the write lock cannot be
     * acquired by a thread holding the read lock.
     *
     * @return {@code true} when the message was sent or no recognised metric was present
     * @throws IllegalStateException if no firehose client is available
     * @throws IOException if sending fails
     */
    private boolean pushMetricsIntoCm(Map<MetricDescriptor, Long> map, String cluster, String service, String role) throws IOException {
        Lock readLock = this.readWriteLock.readLock();
        readLock.lock();
        try {
            if (this.firehoseClient == null) {
                throw new IllegalStateException("There was an error while intializing FirehoseClient therefore metric emission is impossible!");
            }
            List<EntityTypeUpdate> entityTypeUpdates = toEntityTypeUpdates(groupConsumerMetrics(map, cluster, service));
            if (entityTypeUpdates.isEmpty()) {
                return true;
            }
            long tsSecs = System.currentTimeMillis() / 1000;
            RoleUpdate roleUpdate = RoleUpdate.newBuilder()
                    .setEntityTypeUpdates(entityTypeUpdates)
                    .setServiceType("KAFKA")
                    .setServiceName(service)
                    .setRolename(role)
                    .setRoletype("KAFKA_BROKER")
                    .setMetrics(Collections.emptyList())
                    .setDirUpdates(Collections.emptyList())
                    .setFlumeUpdates(Collections.emptyList())
                    .setTsSecs(tsSecs)
                    .build();
            AgentMsg agentMsg = AgentMsg.newBuilder()
                    .setRoleMetrics(Collections.singletonList(roleUpdate))
                    .setTsSecs(tsSecs)
                    .setHostname("dummy")
                    .setHostId("dummy")
                    .setActivityUpdates(Collections.emptyList())
                    .setAttemptMetrics(Collections.emptyList())
                    .setServiceUpdates(Collections.emptyList())
                    .build();
            this.firehoseClient.send(agentMsg);
            return true;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Groups the recognised metrics by entity type and then by entity attributes; metrics whose
     * name is not listed in both lookup tables are skipped.
     */
    private static Map<String, Map<ConsumerMetricAttributes, List<MetricValue>>> groupConsumerMetrics(Map<MetricDescriptor, Long> map, String cluster, String service) {
        Map<String, Map<ConsumerMetricAttributes, List<MetricValue>>> grouped = new HashMap<>();
        for (Map.Entry<MetricDescriptor, Long> entry : map.entrySet()) {
            MetricDescriptor descriptor = entry.getKey();
            String sourceName = descriptor.metricName().getName();
            String category = CONSUMER_METRIC_CATEGORY.get(sourceName);
            String targetName = CONSUMER_METRIC_NAME.get(sourceName);
            if (category == null || targetName == null) {
                continue;
            }
            ConsumerMetricAttributes attributes = new ConsumerMetricAttributes(cluster, service, descriptor.queryTags().get("group"));
            grouped.computeIfAbsent(category, unused -> new HashMap<>())
                    .computeIfAbsent(attributes, unused -> new LinkedList<>())
                    .add(MetricValue.newBuilder().setName(targetName).setValue(entry.getValue()).build());
        }
        return grouped;
    }

    /** Builds one {@link EntityTypeUpdate} per entity type, with one entry per attribute set. */
    private static List<EntityTypeUpdate> toEntityTypeUpdates(Map<String, Map<ConsumerMetricAttributes, List<MetricValue>>> grouped) {
        List<EntityTypeUpdate> updates = new LinkedList<>();
        for (Map.Entry<String, Map<ConsumerMetricAttributes, List<MetricValue>>> byCategory : grouped.entrySet()) {
            List<EntityTypeUpdateEntry> entries = new LinkedList<>();
            for (Map.Entry<ConsumerMetricAttributes, List<MetricValue>> byAttributes : byCategory.getValue().entrySet()) {
                entries.add(EntityTypeUpdateEntry.newBuilder()
                        .setAttributes(byAttributes.getKey().asAttributeMap())
                        .setMetrics(byAttributes.getValue())
                        .build());
            }
            updates.add(EntityTypeUpdate.newBuilder()
                    .setCreateNewEntities(true)
                    .setDeleteOldEntities(false)
                    .setEntityType(byCategory.getKey())
                    .setEntities(entries)
                    .build());
        }
        return updates;
    }
}
