package com.hortonworks.smm.kafka.monitoring.interceptors;

import com.hortonworks.smm.kafka.monitoring.entities.MonitoringConsumerMetrics;
import com.hortonworks.smm.kafka.monitoring.serdes.MonitoringConsumerMetricsSerde;
import com.hortonworks.smm.kafka.monitoring.utils.Constants;
import com.hortonworks.smm.kafka.monitoring.utils.CustomThreadFactory;
import com.hortonworks.smm.kafka.monitoring.utils.TopicUtils;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hortonworks/smm/kafka/monitoring/interceptors/MonitoringConsumerInterceptor.class */
public class MonitoringConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
    public static final String DEFAULT_CONSUMER_METRICS_TOPIC = "__smm_consumer_metrics";
    public static final String PROPERTY_CONSUMER_METRICS_TOPIC = "smm.consumer.metrics.topic";
    private static final int CLOSE_TIME_OUT = 3;
    private String smmConsumerMetricsTopic;
    private static ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new CustomThreadFactory("smm-consumer-metrics-publisher", true));
    private static final Logger LOG = LoggerFactory.getLogger(MonitoringConsumerInterceptor.class);
    private String groupId = null;
    private String clientId = null;
    Map<String, MonitoringConsumerMetrics> metricsBufferByTopicPartition = new HashMap();
    KafkaProducer<String, MonitoringConsumerMetrics> metricsProducer = null;
    private ReentrantLock metricsLock = new ReentrantLock();
    private volatile boolean closed = false;
    private ScheduledFuture metricsFuture = executorService.scheduleAtFixedRate(() -> {
        try {
            pushMetricsToKafka();
        } catch (Exception e) {
            LOG.error("Unexpected error while pushing consumer latency metrics to Kafka.", e);
        }
    }, 30, 30, TimeUnit.SECONDS);

    public MonitoringConsumerInterceptor() {
        this.smmConsumerMetricsTopic = null;
        this.smmConsumerMetricsTopic = System.getProperty(PROPERTY_CONSUMER_METRICS_TOPIC, DEFAULT_CONSUMER_METRICS_TOPIC);
    }

    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> consumerRecords) {
        if (this.closed) {
            return consumerRecords;
        }
        this.metricsLock.lock();
        try {
            consumerRecords.iterator().forEachRemaining(consumerRecord -> {
                if (consumerRecord.timestamp() == -1) {
                    LOG.error("Record has no timestamp.");
                    return;
                }
                String str = consumerRecord.topic();
                Integer valueOf = Integer.valueOf(consumerRecord.partition());
                this.metricsBufferByTopicPartition.computeIfAbsent(str + Constants.DELIMITER + valueOf, str2 -> {
                    return new MonitoringConsumerMetrics(str, valueOf.intValue(), this.groupId, this.clientId);
                }).updateMetric(Long.valueOf((consumerRecord.timestamp() / 30000) * 30), System.currentTimeMillis() - consumerRecord.timestamp());
            });
        } catch (Exception e) {
            LOG.error("Unexpected exception in SMM interceptor. Failed to capture the latency metrics.", e);
        } finally {
            this.metricsLock.unlock();
        }
        return consumerRecords;
    }

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
    }

    public void close() {
        this.closed = true;
        this.metricsFuture.cancel(false);
        pushMetricsToKafka();
        this.metricsProducer.close(Duration.ofSeconds(3L));
    }

    public void configure(Map<String, ?> map) {
        this.groupId = (String) map.get("group.id");
        this.clientId = (String) map.get("client.id");
        initializeMetricsProducer(map);
    }

    void initializeMetricsProducer(Map<String, ?> map) {
        Map<String, Object> interceptorProducerConfig = InterceptorProducerConfigurationHelper.getInterceptorProducerConfig(map);
        interceptorProducerConfig.put("value.serializer", new MonitoringConsumerMetricsSerde().serializer().getClass().getName());
        this.metricsProducer = new KafkaProducer<>(interceptorProducerConfig);
        TopicUtils.createTopicIfNeeded(interceptorProducerConfig, this.smmConsumerMetricsTopic);
    }

    private void pushMetricsToKafka() {
        this.metricsLock.lock();
        try {
            Map<String, MonitoringConsumerMetrics> map = this.metricsBufferByTopicPartition;
            this.metricsBufferByTopicPartition = new HashMap();
            map.forEach((str, monitoringConsumerMetrics) -> {
                try {
                    this.metricsProducer.send(new ProducerRecord(this.smmConsumerMetricsTopic, monitoringConsumerMetrics.getTopic(), monitoringConsumerMetrics));
                } catch (Exception e) {
                    LOG.error("Failed to push the latency metrics to Kafka.", e);
                }
            });
        } finally {
            this.metricsLock.unlock();
        }
    }
}
