package com.hortonworks.smm.kafka.services.metric.cm;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.hortonworks.smm.kafka.common.config.KafkaMetricsConfig;
import com.hortonworks.smm.kafka.services.management.cm.KafkaClusterInfoService;
import com.hortonworks.smm.kafka.services.management.dtos.BrokerNode;
import com.hortonworks.smm.kafka.services.metric.AbstractMetricDescriptorSupplier;
import com.hortonworks.smm.kafka.services.metric.MetricDescriptor;
import com.hortonworks.smm.kafka.services.metric.MetricDescriptorSupplier;
import com.hortonworks.smm.kafka.services.metric.MetricTimeSpanController;
import com.hortonworks.smm.kafka.services.metric.MetricTsToDataSortedMap;
import com.hortonworks.smm.kafka.services.metric.MetricUtils;
import com.hortonworks.smm.kafka.services.metric.MetricsEmitter;
import com.hortonworks.smm.kafka.services.metric.MetricsFetcher;
import com.hortonworks.smm.kafka.services.metric.MetricsService;
import com.hortonworks.smm.kafka.services.metric.MetricsStoreApiClient;
import com.hortonworks.smm.kafka.services.metric.TimeSpan;
import com.hortonworks.smm.kafka.services.metric.cm.pojo.TimeSeriesRequest;
import com.hortonworks.smm.kafka.services.metric.cm.pojo.TimeSeriesResponse;
import com.hortonworks.smm.kafka.services.metric.dtos.AggrTopicMetrics;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hortonworks/smm/kafka/services/metric/cm/CMMetricsFetcher.class */
public class CMMetricsFetcher implements MetricsFetcher {
    /** Message of the {@link IllegalArgumentException} raised when an empty descriptor collection is passed in. */
    private static final String DESCRIPTORS_MESSAGE = "metricDescriptors must not be empty!";
    private final MetricTimeSpanController metricTimeSpanController = new CMMetricTimeSpanController();
    private final MetricsStoreApiClient metricsStoreApiClient;
    private final MetricsEmitter metricsEmitter;
    private final String serviceName;
    /** When {@code true}, every metric descriptor is queried with its own request; set by {@code configure}. */
    private boolean requestMetricsSeparately;
    private static final Logger LOG = LoggerFactory.getLogger(CMMetricsFetcher.class);
    /**
     * Maps the tag names used by the metrics service to the attribute names used in Cloudera Manager
     * time-series queries and responses. Tags without an entry are passed through unchanged when a
     * query is built.
     */
    // The explicit <String, String> witness is required: without it the builder chain is inferred as
    // ImmutableMap<Object, Object>, which is not assignable to Map<String, String>.
    private static final Map<String, String> METRICS_SERVICE_TAGS_TO_CM_ATTRIBUTES = ImmutableMap.<String, String>builder()
            .put("topic", "kafkaTopicName")
            .put("partition", "kafkaPartitionId")
            .put("clientId", "kafkaClientId")
            .put("group", "kafkaConsumerGroup")
            .put("hostname", "hostname")
            .put("logicalPartition", "logicalPartition")
            .put("serviceName", "serviceName")
            .put(AbstractMetricDescriptorSupplier.CONNECTOR_NAME, "kafkaConnectConnectorName")
            .put(AbstractMetricDescriptorSupplier.CONNECTOR_TASK_ID, "kafkaConnectConnectorTaskId")
            .put("roleType", "roleType")
            .build();

    /* loaded from: input_file:com/hortonworks/smm/kafka/services/metric/cm/CMMetricsFetcher$TypedMetricRequest.class */
    /**
     * A Cloudera Manager time-series request bundled with the metric descriptors it was built for and
     * the time span it covers, so that the response can later be matched back to those descriptors.
     */
    public static class TypedMetricRequest {
        private final Collection<MetricDescriptor> metricDescriptors;
        private final TimeSeriesRequest request;
        private final TimeSpan timeSpan;

        /**
         * @param collection        descriptors the request queries for
         * @param timeSeriesRequest the request body sent to the time-series endpoint
         * @param timeSpan          the time span the request covers
         */
        public TypedMetricRequest(Collection<MetricDescriptor> collection, TimeSeriesRequest timeSeriesRequest, TimeSpan timeSpan) {
            this.metricDescriptors = collection;
            this.request = timeSeriesRequest;
            this.timeSpan = timeSpan;
        }

        /** @return the request body sent to the time-series endpoint */
        public TimeSeriesRequest getRequest() {
            return this.request;
        }

        /**
         * Builds a request that selects all given descriptors, restricted by the given query tags,
         * over the start/end of {@code timeSpan}.
         *
         * @param map        query tags (metrics-service tag name to value); may be {@code null}
         * @param collection descriptors whose metric names are selected
         * @param timeSpan   time span supplying the start and end timestamps
         * @return the request together with its descriptors and time span
         */
        public static TypedMetricRequest buildTypedMetricRequest(Map<String, String> map, Collection<MetricDescriptor> collection, TimeSpan timeSpan) {
            TimeSeriesRequest timeSeriesRequest = new TimeSeriesRequest(
                    buildQuery(map, collection),
                    timeSpan.startTimeMs().longValue(),
                    timeSpan.endTimeMs().longValue());
            return new TypedMetricRequest(collection, timeSeriesRequest, timeSpan);
        }

        /**
         * Renders a query of the form {@code select m1, m2 where attr1="v1" and attr2="v2"}.
         *
         * <p>Tag names are translated through {@code METRICS_SERVICE_TAGS_TO_CM_ATTRIBUTES} (unknown
         * tags are used as-is), and tags whose value equals {@link MetricUtils#WILD_CARD} are left out
         * of the {@code where} clause. The clause is omitted entirely when no condition remains.
         *
         * @param map        query tags (metrics-service tag name to value); may be {@code null} or empty
         * @param collection descriptors whose metric names are selected
         * @return the query string
         */
        public static String buildQuery(Map<String, String> map, Collection<MetricDescriptor> collection) {
            String selectedMetrics = collection.stream()
                    .map(descriptor -> descriptor.metricName().getName())
                    .collect(Collectors.joining(", "));
            StringBuilder query = new StringBuilder("select ").append(selectedMetrics);

            List<String> conditions = new ArrayList<>();
            if (map != null) {
                map.forEach((tag, value) -> {
                    if (!MetricUtils.WILD_CARD.equals(value)) {
                        String attribute = CMMetricsFetcher.METRICS_SERVICE_TAGS_TO_CM_ATTRIBUTES.getOrDefault(tag, tag);
                        conditions.add(attribute + "=" + doubleQuote(value));
                    }
                });
            }
            if (!conditions.isEmpty()) {
                query.append(" where ").append(String.join(" and ", conditions));
            }
            return query.toString();
        }

        /** Wraps a non-empty value in double quotes; an empty value is returned unchanged (unquoted). */
        private static String doubleQuote(String str) {
            return str.isEmpty() ? "" : "\"" + str + "\"";
        }
    }

    /**
     * Creates a fetcher that reads metrics through the given store client and emits them through the
     * given emitter.
     *
     * @param metricsStoreApiClient   client used to post time-series queries
     * @param metricsEmitter          delegate used by {@code emitMetrics}
     * @param kafkaClusterInfoService source of the service name; it is read once, here, and kept
     */
    public CMMetricsFetcher(MetricsStoreApiClient metricsStoreApiClient, MetricsEmitter metricsEmitter, KafkaClusterInfoService kafkaClusterInfoService) {
        this.metricsStoreApiClient = metricsStoreApiClient;
        this.metricsEmitter = metricsEmitter;
        this.serviceName = kafkaClusterInfoService.getServiceName();
    }

    /**
     * Fetches the given metrics restricted to one broker's host.
     *
     * <p>The dummy descriptor is dropped, the remaining descriptors are copied, and each copy gets a
     * {@code hostname} query tag set to the broker's host before the query is issued. The descriptors
     * passed in are not modified.
     *
     * @param brokerNode broker whose host is used as the {@code hostname} query tag
     * @param timeSpan   time span to query
     * @param collection metric descriptors to query; must not be empty
     * @return fetched data points keyed by descriptor, then by timestamp
     * @throws NullPointerException     if any argument is {@code null}
     * @throws IllegalArgumentException if {@code collection} is empty
     */
    @Override // com.hortonworks.smm.kafka.services.metric.MetricsFetcher
    public Map<MetricDescriptor, Map<Long, Double>> getBrokerMetrics(@NonNull BrokerNode brokerNode, @NonNull TimeSpan timeSpan, @NonNull Collection<MetricDescriptor> collection) {
        if (brokerNode == null) {
            throw new NullPointerException("brokerNode is marked non-null but is null");
        }
        if (timeSpan == null) {
            throw new NullPointerException("timeSpan is marked non-null but is null");
        }
        if (collection == null) {
            throw new NullPointerException("metricDescriptors is marked non-null but is null");
        }
        Preconditions.checkArgument(!collection.isEmpty(), DESCRIPTORS_MESSAGE);

        // Work on copies so the hostname tag never leaks into the caller's descriptors.
        List<MetricDescriptor> brokerScoped = filterQueryableMetricDescriptors(collection).stream()
                .map(MetricDescriptor::copy)
                .collect(Collectors.toList());
        for (MetricDescriptor descriptor : brokerScoped) {
            descriptor.queryTags().put("hostname", String.valueOf(brokerNode.host()));
        }
        return queryMetrics(brokerScoped, timeSpan);
    }

    /**
     * Fetches the given metrics without adding any host restriction.
     *
     * @param timeSpan   time span to query
     * @param collection metric descriptors to query; must not be empty
     * @return fetched data points keyed by descriptor, then by timestamp
     * @throws NullPointerException     if any argument is {@code null}
     * @throws IllegalArgumentException if {@code collection} is empty
     */
    @Override // com.hortonworks.smm.kafka.services.metric.MetricsFetcher
    public Map<MetricDescriptor, Map<Long, Double>> getClusterMetrics(@NonNull TimeSpan timeSpan, @NonNull Collection<MetricDescriptor> collection) {
        if (timeSpan == null) {
            throw new NullPointerException("timeSpan is marked non-null but is null");
        }
        if (collection == null) {
            throw new NullPointerException("metricDescriptors is marked non-null but is null");
        }
        Preconditions.checkArgument(!collection.isEmpty(), DESCRIPTORS_MESSAGE);

        // The dummy descriptor is a placeholder and is never sent to the metrics store.
        Collection<MetricDescriptor> queryable = filterQueryableMetricDescriptors(collection);
        return queryMetrics(queryable, timeSpan);
    }

    /**
     * Fetches host-level metrics for a broker. After validating its arguments this simply delegates
     * to {@link #getBrokerMetrics}, so the query is restricted by the broker's {@code hostname}.
     *
     * @param brokerNode broker whose host is queried
     * @param timeSpan   time span to query
     * @param collection metric descriptors to query; must not be empty
     * @return fetched data points keyed by descriptor, then by timestamp
     * @throws NullPointerException     if any argument is {@code null}
     * @throws IllegalArgumentException if {@code collection} is empty
     */
    @Override // com.hortonworks.smm.kafka.services.metric.MetricsFetcher
    public Map<MetricDescriptor, Map<Long, Double>> getHostMetrics(@NonNull BrokerNode brokerNode, @NonNull TimeSpan timeSpan, @NonNull Collection<MetricDescriptor> collection) {
        if (brokerNode == null) {
            throw new NullPointerException("brokerNode is marked non-null but is null");
        }
        if (timeSpan == null) {
            throw new NullPointerException("timeSpan is marked non-null but is null");
        }
        if (collection == null) {
            throw new NullPointerException("metricDescriptors is marked non-null but is null");
        }
        Preconditions.checkArgument(!collection.isEmpty(), DESCRIPTORS_MESSAGE);

        final Map<MetricDescriptor, Map<Long, Double>> hostMetrics = getBrokerMetrics(brokerNode, timeSpan, collection);
        return hostMetrics;
    }

    /**
     * Returns a descriptor supplier for this fetcher's service name.
     *
     * @return a new {@code CMMetricDescriptorSupplier} instance on every call
     */
    @Override // com.hortonworks.smm.kafka.services.metric.MetricsFetcher
    public MetricDescriptorSupplier getMetricDescriptorSupplier() {
        return new CMMetricDescriptorSupplier(this.serviceName);
    }

    /**
     * Forwards the given metric values to the configured {@code MetricsEmitter}.
     *
     * @param map metric values keyed by descriptor
     * @return whatever the emitter returns
     * @throws Exception if the emitter throws
     */
    @Override // com.hortonworks.smm.kafka.services.metric.MetricsFetcher
    public boolean emitMetrics(Map<MetricDescriptor, Long> map) throws Exception {
        return this.metricsEmitter.emitMetrics(map);
    }

    /**
     * Reports whether this fetcher supports batch metrics requests.
     *
     * @return always {@code true}
     */
    @Override // com.hortonworks.smm.kafka.services.metric.MetricsFetcher
    public boolean supportsBatchMetricsRequest() {
        return true;
    }

    /**
     * Raises a topic's aggregated messages-in, bytes-in and bytes-out totals to the largest total seen
     * for any shorter time period, so that a longer period never reports less than a shorter one.
     *
     * <p>Nothing happens unless the time span controller reports {@code timeSpan} as cached.
     *
     * @param metricsService   service used to fetch the per-period sums for the topic
     * @param aggrTopicMetrics aggregate to adjust in place
     * @param timeSpan         time span the aggregate was computed for
     * @param str              name of the topic
     */
    @Override // com.hortonworks.smm.kafka.services.metric.MetricsFetcher
    public void adjustTopicOverviewMetrics(MetricsService metricsService, AggrTopicMetrics aggrTopicMetrics, TimeSpan timeSpan, String str) {
        if (!getMetricTimeSpanController().isTimeStampCached(timeSpan)) {
            return;
        }
        final Set<String> topic = Collections.singleton(str);

        // All three totals are computed before any setter runs.
        final Long messagesIn = getMaxSumAmongLowerTimePeriods(
                aggrTopicMetrics.getMessagesIn(), timeSpan, span -> metricsService.getTopicMessagesInSum(span, topic));
        final Long bytesIn = getMaxSumAmongLowerTimePeriods(
                aggrTopicMetrics.getBytesIn(), timeSpan, span -> metricsService.getTopicBytesInSum(span, topic));
        final Long bytesOut = getMaxSumAmongLowerTimePeriods(
                aggrTopicMetrics.getBytesOut(), timeSpan, span -> metricsService.getTopicBytesOutSum(span, topic));

        aggrTopicMetrics.setMessagesIn(messagesIn);
        aggrTopicMetrics.setBytesIn(bytesIn);
        aggrTopicMetrics.setBytesOut(bytesOut);
    }

    /**
     * Returns the larger of {@code l} and the sums fetched for every time period that is declared
     * before {@code timeSpan}'s own period in {@code TimeSpan.TimePeriod}.
     *
     * <p>For each such period the sum is taken from the single entry returned by {@code function};
     * when the result has no entries, or more than one (which is logged as a warning), the period
     * contributes {@code 0}.
     *
     * @param l        the value computed for {@code timeSpan} itself
     * @param timeSpan the time span being adjusted; iteration stops at its period
     * @param function fetches the per-descriptor sums for a given time span
     * @return the maximum of {@code l} and the lower-period sums
     */
    private Long getMaxSumAmongLowerTimePeriods(Long l, TimeSpan timeSpan, Function<TimeSpan, Map<MetricDescriptor, MetricTsToDataSortedMap<Long>>> function) {
        Long max = l;
        for (TimeSpan.TimePeriod period : TimeSpan.TimePeriod.values()) {
            if (period == timeSpan.timePeriod()) {
                // Only periods declared before the requested one are considered.
                break;
            }
            Map<MetricDescriptor, MetricTsToDataSortedMap<Long>> sums = function.apply(new TimeSpan(period));
            int found = sums.size();
            long candidate = 0L;
            if (found == 1) {
                candidate = MetricUtils.extractLongMaxTimestampValue(sums.values().iterator().next());
            } else if (found > 1) {
                LOG.warn("{} metrics found for {} period while comparing with time span {} for metric descriptors {}", found, period, timeSpan, sums.keySet());
            }
            if (candidate > max.longValue()) {
                max = candidate;
            }
        }
        return max;
    }

    /**
     * @return the time span controller created together with this fetcher (a
     *         {@code CMMetricTimeSpanController}); the same instance on every call
     */
    @Override // com.hortonworks.smm.kafka.services.metric.MetricsFetcher
    public MetricTimeSpanController getMetricTimeSpanController() {
        return this.metricTimeSpanController;
    }

    /**
     * Reads this fetcher's settings from the given configuration.
     *
     * <p>Currently only the "request metrics separately" flag is read, via
     * {@code KafkaMetricsConfig.requestMetricsSeparately}; it decides whether {@code queryMetrics}
     * sends one request per descriptor or one request per group of descriptors sharing query tags.
     *
     * @param map configuration properties
     */
    public void configure(Map<String, ?> map) {
        this.requestMetricsSeparately = KafkaMetricsConfig.requestMetricsSeparately(map);
    }

    /**
     * Queries the metrics store for the given descriptors and merges all responses into one map.
     *
     * <p>Depending on {@code requestMetricsSeparately}, either one request per descriptor is sent, or
     * descriptors are grouped by their query tags (via {@code MetricUtils.groupByQueryTags}) and one
     * request per group is sent. Requests are executed on a parallel stream.
     *
     * @param collection descriptors to query
     * @param timeSpan   time span to query
     * @return fetched data points keyed by descriptor, then by timestamp; a {@code ConcurrentHashMap}
     * @throws IllegalArgumentException if a descriptor's {@code topic} query tag lists multiple topics
     */
    private Map<MetricDescriptor, Map<Long, Double>> queryMetrics(Collection<MetricDescriptor> collection, TimeSpan timeSpan) {
        for (MetricDescriptor descriptor : collection) {
            String topic = descriptor.queryTags().get("topic");
            if (topic != null && topic.contains(AbstractMetricDescriptorSupplier.MULTIPLE_TOPICS_SEPARATOR)) {
                throw new IllegalArgumentException("CMMetricsFetcher does not support querying for multiple topics for one MetricDescriptor! Only wildcards are allowed!");
            }
        }

        // Building the request objects is cheap; only the remote calls below run in parallel.
        List<TypedMetricRequest> requests;
        if (this.requestMetricsSeparately) {
            requests = collection.stream()
                    .map(descriptor -> TypedMetricRequest.buildTypedMetricRequest(descriptor.queryTags(), Collections.singleton(descriptor), timeSpan))
                    .collect(Collectors.toList());
        } else {
            requests = MetricUtils.groupByQueryTags(collection).entrySet().stream()
                    .map(group -> TypedMetricRequest.buildTypedMetricRequest(group.getKey(), group.getValue(), timeSpan))
                    .collect(Collectors.toList());
        }

        // Mutable reduction: each worker fills its own map and the partial maps are merged afterwards.
        // (Stream.reduce must not be used here because its identity value may not be mutated.)
        return requests.parallelStream()
                .map(this::getMetricsFromCmApi)
                .collect(ConcurrentHashMap::new, Map::putAll, Map::putAll);
    }

    /**
     * Executes one time-series request against the metrics store and converts the response into
     * data points keyed by metric descriptor.
     *
     * @param typedMetricRequest the request to execute, with the descriptors it was built for
     * @return data points keyed by descriptor, then by timestamp; an empty map when the request is
     *         {@code null}
     */
    private Map<MetricDescriptor, Map<Long, Double>> getMetricsFromCmApi(TypedMetricRequest typedMetricRequest) {
        if (typedMetricRequest == null) {
            return new HashMap();
        }
        // Start of the measurement reported as "time taken" in the debug log below.
        long currentTimeMillis = System.currentTimeMillis();
        TimeSeriesResponse timeSeriesResponse = (TimeSeriesResponse) this.metricsStoreApiClient.apiPost("/timeseries", typedMetricRequest.request, TimeSeriesResponse.class);
        // hashMap: result (descriptor -> data points). hashMap2: bookkeeping shared between the two
        // CMAggregatorUtils calls. NOTE(review): its content is defined by CMAggregatorUtils, not here.
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        // Flatten every item's time series into one stream and process them one by one.
        timeSeriesResponse.getItems().stream().flatMap(item -> {
            return item.getTimeSeries().stream();
        }).forEach(timeSeries -> {
            MetricDescriptor extractMetricDescriptor = extractMetricDescriptor(timeSeries.getMetadata(), typedMetricRequest.metricDescriptors);
            // A null descriptor means the series matched none of the requested metrics; it is skipped.
            if (extractMetricDescriptor != null) {
                Map<Long, Double> extractMetrics = extractMetrics(timeSeries.getData());
                // maybeAggregate sees the result map BEFORE this series is stored in it.
                // NOTE(review): presumably it folds a previously stored series for the same descriptor
                // into extractMetrics - confirm against CMAggregatorUtils.
                CMAggregatorUtils.maybeAggregate(extractMetrics, hashMap, extractMetricDescriptor, hashMap2);
                hashMap.put(extractMetricDescriptor, extractMetrics);
            }
        });
        // NOTE(review): presumably finalizes average-type aggregations - confirm against CMAggregatorUtils.
        CMAggregatorUtils.avgPostHandler(hashMap, hashMap2);
        LOG.debug("Fetched and parsed metrics from CM for timeSpan: {}, params: {}, receivedMetricPoints: {}, time taken: {} ms, queried metricDescriptor(s): {}", new Object[]{typedMetricRequest.timeSpan, typedMetricRequest.request, Integer.valueOf(hashMap.size()), Long.valueOf(System.currentTimeMillis() - currentTimeMillis), typedMetricRequest.metricDescriptors});
        LOG.trace("Parsed metrics for timeSpan. timeSpan: {}, metrics: {}", typedMetricRequest.timeSpan, hashMap);
        return hashMap;
    }

    /**
     * Returns the given descriptors without the dummy placeholder descriptor.
     *
     * <p>The placeholder is recognised by identity ({@code ==}) against
     * {@code CMMetricDescriptorSupplier.DUMMY_METRIC_DESCRIPTOR}, not by equality.
     *
     * @param collection descriptors to filter
     * @return a new list holding every descriptor except the dummy one, in iteration order
     */
    private static Collection<MetricDescriptor> filterQueryableMetricDescriptors(Collection<MetricDescriptor> collection) {
        List<MetricDescriptor> queryable = new ArrayList<>();
        for (MetricDescriptor candidate : collection) {
            if (candidate == CMMetricDescriptorSupplier.DUMMY_METRIC_DESCRIPTOR) {
                continue;
            }
            queryable.add(candidate);
        }
        return queryable;
    }

    /**
     * Converts the data points of one time series into a map from minute-truncated timestamp to value.
     *
     * <p>When several points fall into the same minute, the one that appears last in the list wins.
     *
     * @param list data points of a single time series
     * @return values keyed by {@code getTimestampMsTruncatedToMinutes()}
     */
    private Map<Long, Double> extractMetrics(List<TimeSeriesResponse.Data> list) {
        Map<Long, Double> valuesByMinute = new HashMap<>();
        for (TimeSeriesResponse.Data point : list) {
            // merge (rather than put) keeps the later value and rejects null values.
            valuesByMinute.merge(point.getTimestampMsTruncatedToMinutes(), point.getValue(), (earlier, later) -> later);
        }
        return valuesByMinute;
    }

    /**
     * Builds the descriptor that identifies one returned time series.
     *
     * <p>The series is matched to a requested descriptor by metric name. The result has the same
     * metric name and aggregation function as that descriptor, and its query tags are filled from the
     * series' attributes: for every tag of the metric name that has a CM attribute mapping and is
     * present in the response, the tag is set to the returned attribute value. A {@code category} tag
     * is handled by {@code maybeAddCategoryQueryTag}.
     *
     * @param metadata   metadata of the returned time series
     * @param collection descriptors that were requested
     * @return the descriptor for this series, or {@code null} (after logging an error) when no
     *         requested descriptor has the returned metric name
     */
    private MetricDescriptor extractMetricDescriptor(TimeSeriesResponse.Metadata metadata, Collection<MetricDescriptor> collection) {
        Optional<MetricDescriptor> match = collection.stream()
                .filter(candidate -> candidate.metricName().getName().equals(metadata.getMetricName()))
                .findFirst();
        if (!match.isPresent()) {
            LOG.error("No metricDescriptor matched for metadata of time series data returned. MetricName of metadata: {}", metadata.getMetricName());
            return null;
        }
        MetricDescriptor requested = match.get();
        Map<String, String> attributes = metadata.getAttributes();
        Map<String, String> queryTags = new HashMap<>();
        requested.metricName().getTags().forEach(tag -> {
            // Query tags are keyed by the metrics-service tag name; the value is looked up in the
            // response under the corresponding CM attribute name.
            String cmAttribute = METRICS_SERVICE_TAGS_TO_CM_ATTRIBUTES.get(tag);
            if (cmAttribute != null && attributes.containsKey(cmAttribute)) {
                queryTags.put(tag, attributes.get(cmAttribute));
            }
            if ("category".equals(tag)) {
                maybeAddCategoryQueryTag(attributes, queryTags);
            }
        });
        return MetricDescriptor.newBuilder()
                .withQueryTags(queryTags)
                .withAggregationFunction(requested.aggrFunction())
                .build(requested.metricName());
    }

    /**
     * Copies the {@code category} attribute of a returned time series into the query tags, but only
     * when it is one of the known categories. Unknown or missing categories leave {@code map2}
     * untouched.
     *
     * @param map  attributes of the returned time series
     * @param map2 query tags being built; receives a {@code category} entry for known categories
     */
    private static void maybeAddCategoryQueryTag(Map<String, ?> map, Map<String, String> map2) {
        Object rawCategory = map.get("category");
        if (rawCategory == null) {
            // A series without a category attribute simply gets no category tag.
            return;
        }
        String category = (String) rawCategory;
        switch (category) {
            case "SERVICE":
            case "ROLE":
            case "KAFKA_BROKER_TOPIC":
            case "KAFKA_CONNECT_CONNECTOR_TASK_METRICS":
            case "KAFKA_CONNECT_CONNECTOR_SOURCE_TASK_METRICS":
            case "KAFKA_CONNECT_CONNECTOR_SINK_TASK_METRICS":
            case "KAFKA_CONNECT_CONNECTOR_TASK_ERROR_METRICS":
            case "HOST":
            case "DISK":
                map2.put("category", category);
                return;
            default:
                return;
        }
    }

    /** Does nothing: the method body is empty, so no resources are released here. */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
    }
}
