package com.hortonworks.smm.kafka.services.message;

import com.google.common.collect.Lists;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer;
import com.hortonworks.smm.kafka.common.config.KafkaConsumerConfig;
import com.hortonworks.smm.kafka.common.errors.NotFoundException;
import com.hortonworks.smm.kafka.common.utils.ValidationUtils;
import com.hortonworks.smm.kafka.services.Service;
import com.hortonworks.smm.kafka.services.message.dtos.DeserializerInfo;
import com.hortonworks.smm.kafka.services.message.dtos.PartitionOffsetInfo;
import com.hortonworks.smm.kafka.services.message.dtos.SerdesMappings;
import com.hortonworks.smm.kafka.services.message.dtos.TopicContent;
import com.hortonworks.smm.kafka.services.message.dtos.TopicOffsetInfo;
import com.hortonworks.smm.kafka.services.message.dtos.TopicRecord;
import com.hortonworks.smm.kafka.services.message.util.TopicMessageServiceUtil;
import com.hortonworks.smm.kafka.services.schema.SchemaRegistryService;
import com.hortonworks.smm.kafka.services.schema.dtos.TopicSchemaMapping;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.UUID;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.Validate;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.FloatDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.ShortDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Service that inspects Kafka topics: it reports per-partition offsets and reads a bounded
 * slice of records from a single partition.
 *
 * <p>Every public operation creates its own {@link KafkaConsumer} with a random client id and
 * closes it before returning, so no consumer is kept between calls.
 */
@Singleton
public class TopicMessageService implements Service {
    /** Name of the poll timeout property; it is not referenced inside this class. */
    public static final String POLLING_TIMEOUT_INTERVAL_PROPERTY = "poll.timeout.ms";

    /** Default upper bound, in milliseconds, on the time spent polling for topic content. */
    public static final long DEFAULT_RESPONSE_TIME_OUT_IN_MS = 30000L;

    private static final String CONSUMER_ID_FORMAT = "_streams_messaging_manager_consumer_%s";
    private static final String DEFAULT_DESERIALIZER_CLASS = StringDeserializer.class.getName();
    private static final Logger LOG = LoggerFactory.getLogger(TopicMessageService.class);

    /** Deserializers offered to callers through {@link #getSerdesMapping()}. */
    private static final SerdesMappings SERDES_MAPPING = new SerdesMappings(Lists.newArrayList(
            new DeserializerInfo("String", StringDeserializer.class.getName(), "A deserializer for String type"),
            new DeserializerInfo("Short", ShortDeserializer.class.getName(), "A deserializer for Short type"),
            new DeserializerInfo("Integer", IntegerDeserializer.class.getName(), "A deserializer for Integer type"),
            new DeserializerInfo("Long", LongDeserializer.class.getName(), "A deserializer for Long type"),
            new DeserializerInfo("Float", FloatDeserializer.class.getName(), "A deserializer for Float type"),
            new DeserializerInfo("Double", DoubleDeserializer.class.getName(), "A deserializer for Double type"),
            new DeserializerInfo("Bytes", BytesDeserializer.class.getName(), "A deserializer for Bytes type"),
            new DeserializerInfo("ByteArray", ByteArrayDeserializer.class.getName(), "A deserializer for ByteArray type"),
            new DeserializerInfo("ByteBuffer", ByteBufferDeserializer.class.getName(), "A deserializer for ByteBuffer type"),
            new DeserializerInfo("Avro", KafkaAvroDeserializer.class.getName(), "A deserializer for KafkaAvro type")));

    private final KafkaConsumerConfig config;
    private final SchemaRegistryService schemaRegistryService;
    private final long pollingIntervalInMs;
    private final Duration pollingIntervalDuration;

    /**
     * Creates the service.
     *
     * @param kafkaConsumerConfig base consumer configuration; also supplies the poll timeout
     * @param schemaRegistryService used to look up the key/value schema names of a topic
     */
    @Inject
    public TopicMessageService(KafkaConsumerConfig kafkaConsumerConfig, SchemaRegistryService schemaRegistryService) {
        this.config = kafkaConsumerConfig;
        this.schemaRegistryService = schemaRegistryService;
        this.pollingIntervalInMs = kafkaConsumerConfig.getPollTimeOutMs();
        this.pollingIntervalDuration = Duration.of(this.pollingIntervalInMs, ChronoUnit.MILLIS);
    }

    /**
     * Returns the beginning and end offsets of every partition of a topic.
     *
     * @param topicName topic to inspect
     * @return one {@link PartitionOffsetInfo} per partition reported by the consumer
     * @throws NotFoundException if the consumer reports no partitions for the topic
     */
    public TopicOffsetInfo getOffset(String topicName) {
        Map<String, Object> consumerConfig =
                generateConsumerConfig(this.config, null, DEFAULT_DESERIALIZER_CLASS, DEFAULT_DESERIALIZER_CLASS);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig)) {
            List<PartitionInfo> partitions = consumer.partitionsFor(topicName);
            if (CollectionUtils.isEmpty(partitions)) {
                throw new NotFoundException("Topic [" + topicName + "] doesn't exists");
            }

            List<TopicPartition> topicPartitions = new ArrayList<>(partitions.size());
            for (PartitionInfo partitionInfo : partitions) {
                topicPartitions.add(new TopicPartition(topicName, partitionInfo.partition()));
            }

            Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
            LOG.info("startOffset[{}], endOffset [{}]", beginningOffsets, endOffsets);

            List<PartitionOffsetInfo> partitionOffsets = new ArrayList<>(beginningOffsets.size());
            for (Map.Entry<TopicPartition, Long> entry : beginningOffsets.entrySet()) {
                TopicPartition topicPartition = entry.getKey();
                long partitionStartOffset = entry.getValue().longValue();
                long partitionEndOffset = endOffsets.get(topicPartition).longValue();
                partitionOffsets.add(
                        new PartitionOffsetInfo(topicPartition.partition(), partitionStartOffset, partitionEndOffset));
            }
            return new TopicOffsetInfo(partitionOffsets);
        }
    }

    /**
     * Reads records from one partition using {@link #DEFAULT_RESPONSE_TIME_OUT_IN_MS} as the
     * response timeout. See the ten-argument overload for the meaning of each parameter.
     */
    public TopicContent getTopicContent(String topicName, int partition, Long startOffset, Long endOffset,
                                        Long numOfRecords, Integer keyTruncateLength, Integer valueTruncateLength,
                                        String keyDeserializerClass, String valueDeserializerClass) {
        return getTopicContent(topicName, partition, startOffset, endOffset, numOfRecords, keyTruncateLength,
                valueTruncateLength, keyDeserializerClass, valueDeserializerClass, DEFAULT_RESPONSE_TIME_OUT_IN_MS);
    }

    /**
     * Reads records from one partition of a topic, starting at {@code startOffset}.
     *
     * <p>Reading stops at the first of: the requested range or count being satisfied, the log
     * end offset (captured once before reading) being reached, a poll returning no records, or
     * the response timeout elapsing. The timeout is only checked between polls.
     *
     * @param topicName topic to read from; must not be null
     * @param partition partition of the topic to read from
     * @param startOffset first offset to read; must not be null and must be non-negative
     * @param endOffset exclusive upper bound of the offsets to return, or null
     * @param numOfRecords number of records to return, or null. The countdown only stops on
     *        reaching exactly zero, so a value of 0 does not limit the read
     * @param keyTruncateLength maximum length of a String key; null or 0 disables truncation
     * @param valueTruncateLength maximum length of a String value; null or 0 disables truncation
     * @param keyDeserializerClass class name used for the consumer's key deserializer
     * @param valueDeserializerClass class name used for the consumer's value deserializer
     * @param responseTimeoutMs time budget in milliseconds for the polling loop
     * @return the records read, keyed by offset in ascending order, with the topic's schema names
     * @throws NotFoundException if the topic has no such partition
     * @throws IllegalStateException if startOffset or endOffset is beyond the log end offset
     */
    public TopicContent getTopicContent(String topicName, int partition, Long startOffset, Long endOffset,
                                        Long numOfRecords, Integer keyTruncateLength, Integer valueTruncateLength,
                                        String keyDeserializerClass, String valueDeserializerClass,
                                        long responseTimeoutMs) {
        validateInputs(topicName, startOffset, endOffset, numOfRecords, keyTruncateLength, valueTruncateLength);
        try (KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(
                generateConsumerConfig(this.config, numOfRecords, keyDeserializerClass, valueDeserializerClass))) {
            checkTopicPartition(topicName, partition, consumer);

            // An empty range needs no reads at all.
            if (startOffset.equals(endOffset)) {
                return buildTopicContent(this.schemaRegistryService.getTopicSchemaMapping(topicName),
                        Collections.<Long, TopicRecord>emptyMap());
            }

            TopicPartition topicPartition = new TopicPartition(topicName, partition);
            List<TopicPartition> assignment = Collections.singletonList(topicPartition);
            consumer.assign(assignment);
            Long logEndOffset = consumer.endOffsets(assignment).get(topicPartition);
            LOG.info("logEndOffset [{}], endOffSet[{}], startOffset [{}], numOfRecords [{}]",
                    logEndOffset, endOffset, startOffset, numOfRecords);

            Validate.validState(startOffset.longValue() <= logEndOffset.longValue(),
                    "Given startOffset %d is more than log end offset %d", startOffset, logEndOffset);
            if (endOffset != null) {
                Validate.validState(endOffset.longValue() <= logEndOffset.longValue(),
                        "Given endOffset %d is more than log end offset %d", endOffset, logEndOffset);
            }

            consumer.seek(topicPartition, startOffset.longValue());
            Map<Long, TopicRecord> recordsByOffset = readRecords(consumer, endOffset, numOfRecords,
                    logEndOffset.longValue(), keyTruncateLength, valueTruncateLength, responseTimeoutMs);
            return buildTopicContent(this.schemaRegistryService.getTopicSchemaMapping(topicName), recordsByOffset);
        }
    }

    /**
     * Polls an already assigned and positioned consumer and collects records by offset until
     * the range/count is satisfied, the log end is reached, a poll is empty, or time runs out.
     */
    private Map<Long, TopicRecord> readRecords(KafkaConsumer<Object, Object> consumer, Long endOffset,
                                               Long numOfRecords, long logEndOffset, Integer keyTruncateLength,
                                               Integer valueTruncateLength, long responseTimeoutMs) {
        Map<Long, TopicRecord> recordsByOffset = new TreeMap<>();
        long deadline = System.currentTimeMillis() + responseTimeoutMs;
        // Local countdown so the caller-supplied value is never reassigned.
        long remainingRecords = numOfRecords == null ? 0L : numOfRecords.longValue();

        do {
            ConsumerRecords<Object, Object> batch = consumer.poll(this.pollingIntervalDuration);
            if (batch.isEmpty()) {
                // An empty poll ends the read even if the time budget is not used up.
                break;
            }
            for (ConsumerRecord<Object, Object> consumerRecord : batch) {
                TopicRecord topicRecord = extractTopicRecord(keyTruncateLength, valueTruncateLength, consumerRecord);
                long offset = consumerRecord.offset();

                if (endOffset != null) {
                    if (offset >= endOffset.longValue() || offset + 1 > logEndOffset) {
                        return recordsByOffset;
                    }
                    recordsByOffset.put(Long.valueOf(offset), topicRecord);
                    // Stop right after the last wanted record instead of fetching one more.
                    if (offset + 1 == endOffset.longValue() || offset + 1 == logEndOffset) {
                        return recordsByOffset;
                    }
                }
                if (numOfRecords != null) {
                    recordsByOffset.put(Long.valueOf(offset), topicRecord);
                    remainingRecords--;
                    if (remainingRecords == 0 || offset + 1 >= logEndOffset) {
                        return recordsByOffset;
                    }
                }
            }
        } while (deadline - System.currentTimeMillis() > 0);
        return recordsByOffset;
    }

    /**
     * Converts a consumer record into a {@link TopicRecord}, truncating String keys/values that
     * exceed the given lengths and recording their original lengths when truncated.
     */
    private TopicRecord extractTopicRecord(Integer keyTruncateLength, Integer valueTruncateLength,
                                           ConsumerRecord<Object, Object> consumerRecord) {
        Object key = TopicMessageServiceUtil.serialize(consumerRecord.key());
        Object value = TopicMessageServiceUtil.serialize(consumerRecord.value());
        // The original lengths stay null unless the corresponding field is actually truncated.
        Integer originalKeyLength = null;
        Integer originalValueLength = null;

        if (shouldBeTruncated(key, keyTruncateLength)) {
            String keyText = (String) key;
            originalKeyLength = Integer.valueOf(keyText.length());
            key = keyText.substring(0, keyTruncateLength.intValue());
        }
        if (shouldBeTruncated(value, valueTruncateLength)) {
            String valueText = (String) value;
            originalValueLength = Integer.valueOf(valueText.length());
            value = valueText.substring(0, valueTruncateLength.intValue());
        }
        return new TopicRecord(key, value, originalKeyLength, originalValueLength,
                consumerRecord.timestampType(), consumerRecord.timestamp());
    }

    /** Returns true only for a String longer than a non-null, non-zero {@code maxLength}. */
    private boolean shouldBeTruncated(Object candidate, Integer maxLength) {
        if (maxLength == null || maxLength.intValue() == 0) {
            return false;
        }
        // instanceof is false for null, so no separate null check is needed.
        return candidate instanceof String && ((String) candidate).length() > maxLength.intValue();
    }

    /**
     * Validates the arguments of {@code getTopicContent}.
     *
     * @throws NullPointerException if topicName or startOffset is null
     * @throws IllegalArgumentException if an offset, count or truncate length is negative, or
     *         endOffset is smaller than startOffset
     */
    private void validateInputs(String topicName, Long startOffset, Long endOffset, Long numOfRecords,
                                Integer keyTruncateLength, Integer valueTruncateLength) {
        Objects.requireNonNull(topicName, "topicName can not be null");
        ValidationUtils.onlyOneElementIsNotNull(
                "Only one element is allowed to be provided among: \"numOfRecords\", \"endOffset\", !",
                new Object[]{numOfRecords, endOffset});
        // Fail with a descriptive message instead of an anonymous unboxing NPE.
        Objects.requireNonNull(startOffset, "startOffset can not be null");
        Validate.inclusiveBetween(0L, Long.MAX_VALUE, startOffset.longValue());
        if (endOffset != null) {
            Validate.inclusiveBetween(0L, Long.MAX_VALUE, endOffset.longValue());
            Validate.isTrue(endOffset.longValue() >= startOffset.longValue(),
                    "\"startOffset\" should be less than or equal to \"endOffset\"", new Object[0]);
        }
        if (numOfRecords != null) {
            Validate.inclusiveBetween(0L, Long.MAX_VALUE, numOfRecords.longValue());
        }
        if (keyTruncateLength != null) {
            Validate.inclusiveBetween(0L, (long) Integer.MAX_VALUE, keyTruncateLength.longValue());
        }
        if (valueTruncateLength != null) {
            Validate.inclusiveBetween(0L, (long) Integer.MAX_VALUE, valueTruncateLength.longValue());
        }
    }

    /** Returns the fixed list of deserializers this service advertises. */
    public SerdesMappings getSerdesMapping() {
        return SERDES_MAPPING;
    }

    /**
     * Verifies that the topic has the given partition.
     *
     * @throws NotFoundException if no partition with that id is reported for the topic
     */
    private void checkTopicPartition(String topicName, int partition, KafkaConsumer<Object, Object> consumer) {
        List<PartitionInfo> partitions = consumer.partitionsFor(topicName);
        if (partitions != null) {
            for (PartitionInfo partitionInfo : partitions) {
                if (partitionInfo.partition() == partition) {
                    return;
                }
            }
        }
        throw new NotFoundException(
                "Given topic [" + topicName + "] partition [" + partition + "] does not exist");
    }

    /** Combines the topic's key/value schema names with the records read. */
    private TopicContent buildTopicContent(TopicSchemaMapping topicSchemaMapping, Map<Long, TopicRecord> records) {
        return new TopicContent(topicSchemaMapping.getKeySchemaName(), topicSchemaMapping.getValueSchemaName(), records);
    }

    /**
     * Builds the configuration for a one-off consumer: a copy of the base configuration with a
     * unique client id, the requested deserializers and, for a non-zero record count,
     * {@code max.poll.records}.
     */
    private Map<String, Object> generateConsumerConfig(KafkaConsumerConfig kafkaConsumerConfig, Long numOfRecords,
                                                       String keyDeserializerClass, String valueDeserializerClass) {
        // NOTE(review): the declared type of getConfig() is not visible from this class, so the
        // copy stays raw; switch to the diamond form once its element types are confirmed.
        @SuppressWarnings({"unchecked", "rawtypes"})
        Map<String, Object> consumerConfig = new HashMap(kafkaConsumerConfig.getConfig());
        consumerConfig.put("client.id", String.format(CONSUMER_ID_FORMAT, UUID.randomUUID()));
        consumerConfig.put("key.deserializer", keyDeserializerClass);
        consumerConfig.put("value.deserializer", valueDeserializerClass);
        if (numOfRecords != null && numOfRecords.longValue() != 0) {
            // Clamp rather than narrow: a plain int cast wraps for counts above Integer.MAX_VALUE.
            int maxPollRecords = (int) Math.min(numOfRecords.longValue(), (long) Integer.MAX_VALUE);
            consumerConfig.put("max.poll.records", Integer.valueOf(maxPollRecords));
        }
        return consumerConfig;
    }

    /** Nothing to release: consumers are created and closed within each operation. */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
    }
}
