package com.hortonworks.smm.kafka.monitoring.interceptors;

import com.hortonworks.smm.kafka.monitoring.entities.MonitoringProducerMetrics;
import com.hortonworks.smm.kafka.monitoring.serdes.MonitoringProducerMetricsSerde;
import com.hortonworks.smm.kafka.monitoring.utils.Constants;
import com.hortonworks.smm.kafka.monitoring.utils.CustomThreadFactory;
import com.hortonworks.smm.kafka.monitoring.utils.TopicUtils;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hortonworks/smm/kafka/monitoring/interceptors/MonitoringProducerInterceptor.class */
public class MonitoringProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
    public static final String DEFAULT_PRODUCER_METRICS_TOPIC = "__smm_producer_metrics";
    public static final String PROPERTY_PRODUCER_METRICS_TOPIC = "smm.producer.metrics.topic";
    private static final int CLOSE_TIME_OUT = 3;
    private String smmProducerMetricsTopic;
    private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(new CustomThreadFactory("smm-producer-metrics-publisher", true));
    private static final Logger LOG = LoggerFactory.getLogger(MonitoringProducerInterceptor.class);
    KafkaProducer<String, MonitoringProducerMetrics> metricsProducer = null;
    Map<String, MonitoringProducerMetrics> metricsBufferByTopicPartition = new HashMap();
    private final ReentrantLock metricsLock = new ReentrantLock();
    private volatile boolean closed = false;
    private ScheduledFuture metricsFuture = service.scheduleAtFixedRate(() -> {
        try {
            pushMetricsToKafka();
        } catch (Exception e) {
            LOG.error("Unexpected error while pushing producer metrics to Kafka.", e);
        }
    }, 30, 30, TimeUnit.SECONDS);

    public MonitoringProducerInterceptor() {
        this.smmProducerMetricsTopic = null;
        this.smmProducerMetricsTopic = System.getProperty(PROPERTY_PRODUCER_METRICS_TOPIC, DEFAULT_PRODUCER_METRICS_TOPIC);
    }

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
        return producerRecord;
    }

    public void onAcknowledgement(RecordMetadata recordMetadata, Exception exc) {
        if (exc != null || this.closed) {
            return;
        }
        if (!recordMetadata.hasTimestamp()) {
            LOG.error("Record does not have a timestamp.");
            return;
        }
        String str = recordMetadata.topic();
        int partition = recordMetadata.partition();
        this.metricsLock.lock();
        try {
            if (recordMetadata.hasTimestamp()) {
                this.metricsBufferByTopicPartition.computeIfAbsent(str + Constants.DELIMITER + partition, str2 -> {
                    return new MonitoringProducerMetrics(str, partition);
                }).updateCount((recordMetadata.timestamp() / 30000) * 30);
            }
        } finally {
            this.metricsLock.unlock();
        }
    }

    public void close() {
        this.closed = true;
        this.metricsFuture.cancel(false);
        pushMetricsToKafka();
        this.metricsProducer.close(Duration.ofSeconds(3L));
    }

    public void configure(Map<String, ?> map) {
        initializeMetricsProducer(map);
    }

    void initializeMetricsProducer(Map<String, ?> map) {
        Map<String, Object> interceptorProducerConfig = InterceptorProducerConfigurationHelper.getInterceptorProducerConfig(map);
        interceptorProducerConfig.put("value.serializer", new MonitoringProducerMetricsSerde().serializer().getClass().getName());
        this.metricsProducer = new KafkaProducer<>(interceptorProducerConfig);
        TopicUtils.createTopicIfNeeded(interceptorProducerConfig, this.smmProducerMetricsTopic);
    }

    private void pushMetricsToKafka() {
        this.metricsLock.lock();
        try {
            Map<String, MonitoringProducerMetrics> map = this.metricsBufferByTopicPartition;
            this.metricsBufferByTopicPartition = new HashMap();
            map.forEach((str, monitoringProducerMetrics) -> {
                try {
                    this.metricsProducer.send(new ProducerRecord(this.smmProducerMetricsTopic, monitoringProducerMetrics.getTopic(), monitoringProducerMetrics));
                } catch (Exception e) {
                    LOG.error("Failed to push the latency metrics to Kafka.", e);
                }
            });
        } finally {
            this.metricsLock.unlock();
        }
    }
}
