package com.hortonworks.smm.kafka.services;

import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.smm.kafka.common.config.KafkaConsumerConfig;
import com.hortonworks.smm.kafka.common.errors.NotFoundException;
import com.hortonworks.smm.kafka.common.utils.AdminClientUtil;
import com.hortonworks.smm.kafka.services.config.KafkaProducerConfig;
import com.hortonworks.smm.kafka.services.extension.KafkaAdminServiceTest;
import com.hortonworks.smm.kafka.services.helper.KafkaProducerPayloadGenerator;
import com.hortonworks.smm.kafka.services.message.TopicMessageService;
import com.hortonworks.smm.kafka.services.message.dtos.PartitionOffsetInfo;
import com.hortonworks.smm.kafka.services.message.dtos.TopicContent;
import com.hortonworks.smm.kafka.services.message.dtos.TopicOffsetInfo;
import com.hortonworks.smm.kafka.services.message.dtos.TopicRecord;
import com.hortonworks.smm.kafka.services.message.util.TopicMessageServiceUtil;
import com.hortonworks.smm.kafka.services.schema.SchemaRegistryService;
import com.hortonworks.smm.kafka.services.schema.dtos.TopicSchemaMapping;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestTemplate;

/**
 * Integration tests for {@link TopicMessageService}.
 *
 * <p>Every test receives its collaborators (admin client, producer/consumer configuration, schema
 * registry service) as template parameters and works on a topic named after the test method, so
 * tests do not interfere with each other's topics.
 */
@DisplayName("Topic message service tests")
@KafkaAdminServiceTest(numBrokerNodes = 3)
class TopicMessageServiceTest {
    /** Timeout, in milliseconds, applied to topic creation and deletion. */
    private static final int TIMEOUT_MS = 60000;

    /** Number of partitions of every topic created by these tests. */
    private static final int PARTITIONS = 2;

    /** Replication factor of every topic created by these tests. */
    private static final short REPLICATION_FACTOR = 2;

    /**
     * Produces ten 10-character string records per partition and reads partition 0 back with
     * different trailing integer limits (2, 10 and 30).
     *
     * <p>With a limit of 2 the records are expected to come back cut to {@code "11"} with the
     * original length (10) reported; with limits of 10 and 30 the records must be returned intact
     * and without an original length.
     */
    @DisplayName("Test record truncation")
    @TestTemplate
    void testTruncation(AdminClient adminClient, KafkaProducerConfig kafkaProducerConfig, KafkaConsumerConfig kafkaConsumerConfig, SchemaRegistryService schemaRegistryService, TestInfo testInfo) throws Exception {
        String topicName = testName(testInfo);
        String payload = "1111111111";
        String serializer = StringSerializer.class.getName();
        AdminClientUtil.createTopic(adminClient, new NewTopic(topicName, PARTITIONS, REPLICATION_FACTOR), TIMEOUT_MS);

        try (KafkaProducer<String, String> producer = createKafkaProducer(kafkaProducerConfig, serializer, serializer)) {
            for (int partition = 0; partition < PARTITIONS; partition++) {
                for (int n = 0; n < 10; n++) {
                    // Block on the future so every record is acknowledged before reading back.
                    producer.send(new ProducerRecord<>(topicName, partition, payload, payload)).get();
                }
            }
        }

        KafkaProducerPayloadGenerator generator = new KafkaProducerPayloadGenerator(KafkaProducerPayloadGenerator.Type.STRING);

        // Limit 2 is shorter than the payload: key and value are truncated.
        TopicContent truncated = topicMessageService(kafkaConsumerConfig, schemaRegistryService)
                .getTopicContent(topicName, 0, 0L, (Long) null, 10L, 2, 2, generator.getDeserializerClass(), generator.getDeserializerClass());
        for (Map.Entry<Long, TopicRecord> entry : truncated.getOffsetToRecordMap().entrySet()) {
            TopicRecord record = entry.getValue();
            Assertions.assertEquals("11", record.getKey());
            Assertions.assertEquals(10, record.getOriginalKeyLength());
            Assertions.assertEquals("11", record.getValue());
            Assertions.assertEquals(10, record.getOriginalValueLength());
        }

        // Limit equal to the payload length: nothing is truncated.
        TopicContent exactFit = topicMessageService(kafkaConsumerConfig, schemaRegistryService)
                .getTopicContent(topicName, 0, 0L, (Long) null, 10L, 10, 10, generator.getDeserializerClass(), generator.getDeserializerClass());
        for (Map.Entry<Long, TopicRecord> entry : exactFit.getOffsetToRecordMap().entrySet()) {
            assertRecordsInCaseNotTruncated(payload, entry);
        }

        // Limit larger than the payload length: nothing is truncated.
        TopicContent roomy = topicMessageService(kafkaConsumerConfig, schemaRegistryService)
                .getTopicContent(topicName, 0, 0L, (Long) null, 10L, 30, 30, generator.getDeserializerClass(), generator.getDeserializerClass());
        for (Map.Entry<Long, TopicRecord> entry : roomy.getOffsetToRecordMap().entrySet()) {
            assertRecordsInCaseNotTruncated(payload, entry);
        }

        // Clean up like the other topic-creating tests do.
        AdminClientUtil.deleteTopic(adminClient, topicName, TIMEOUT_MS);
    }

    /**
     * Verifies that {@code getTopicContent} rejects inconsistent arguments: two calls on an empty
     * topic must fail with {@link IllegalArgumentException}, and one call made after a single
     * record per partition has been produced must fail with {@link IllegalStateException}.
     */
    @DisplayName("Test invalid parameters")
    @TestTemplate
    void testInvalidParameters(AdminClient adminClient, KafkaProducerConfig kafkaProducerConfig, KafkaConsumerConfig kafkaConsumerConfig, SchemaRegistryService schemaRegistryService, TestInfo testInfo) throws Exception {
        String topicName = testName(testInfo);
        String serializer = DoubleSerializer.class.getName();
        String deserializer = LongDeserializer.class.getName();
        List<PartitionOffsetInfo> offsets = new ArrayList<>();
        AdminClientUtil.createTopic(adminClient, new NewTopic(topicName, PARTITIONS, REPLICATION_FACTOR), TIMEOUT_MS);
        TopicMessageService service = topicMessageService(kafkaConsumerConfig, schemaRegistryService);

        // Third argument (2) greater than fourth (1) - presumably start offset after end offset.
        Assertions.assertThrows(IllegalArgumentException.class,
                () -> service.getTopicContent(topicName, 0, 2L, 1L, (Long) null, (Integer) null, (Integer) null, deserializer, deserializer));
        // Fourth and fifth arguments both supplied - presumably mutually exclusive.
        Assertions.assertThrows(IllegalArgumentException.class,
                () -> service.getTopicContent(topicName, 0, 1L, 2L, 1L, (Integer) null, (Integer) null, deserializer, deserializer));

        produceMessages(kafkaProducerConfig, serializer, serializer, PARTITIONS, topicName, offsets, 1);

        // Range 3..5 requested while each partition holds a single record.
        Assertions.assertThrows(IllegalStateException.class,
                () -> service.getTopicContent(topicName, 0, 3L, 5L, (Long) null, (Integer) null, (Integer) null, deserializer, deserializer));

        AdminClientUtil.deleteTopic(adminClient, topicName, TIMEOUT_MS);
    }

    /**
     * Produces one record per partition and checks that {@code getOffset} reports exactly the
     * offsets collected while producing.
     */
    @DisplayName("Test get topic offsets")
    @TestTemplate
    void testGetOffset(AdminClient adminClient, KafkaProducerConfig kafkaProducerConfig, KafkaConsumerConfig kafkaConsumerConfig, SchemaRegistryService schemaRegistryService, TestInfo testInfo) throws Exception {
        String topicName = testName(testInfo);
        String serializer = DoubleSerializer.class.getName();
        List<PartitionOffsetInfo> expectedOffsets = new ArrayList<>();
        AdminClientUtil.createTopic(adminClient, new NewTopic(topicName, PARTITIONS, REPLICATION_FACTOR), TIMEOUT_MS);

        produceMessages(kafkaProducerConfig, serializer, serializer, PARTITIONS, topicName, expectedOffsets, 1);

        Assertions.assertEquals(new TopicOffsetInfo(expectedOffsets),
                topicMessageService(kafkaConsumerConfig, schemaRegistryService).getOffset(topicName));
        AdminClientUtil.deleteTopic(adminClient, topicName, TIMEOUT_MS);
    }

    /** Verifies that asking for the offsets of a topic that was never created fails with {@link NotFoundException}. */
    @DisplayName("Test get offset for non-existent topic")
    @TestTemplate
    void testGetOffsetForNonexistentTopic(KafkaConsumerConfig kafkaConsumerConfig, SchemaRegistryService schemaRegistryService, TestInfo testInfo) {
        String topicName = testName(testInfo);
        TopicMessageService service = topicMessageService(kafkaConsumerConfig, schemaRegistryService);

        NotFoundException e = Assertions.assertThrows(NotFoundException.class,
                () -> service.getOffset(topicName),
                "Should have thrown NotFoundException");

        Assertions.assertEquals("Topic [" + topicName + "] doesn't exists", e.getMessage());
    }

    /**
     * For every payload type, produces five records per partition, registers a key/value schema
     * mapping for the topic and checks that the content read back matches what was produced, both
     * when the whole partition is requested and when only the first three records are requested.
     */
    @DisplayName("Test topic content")
    @TestTemplate
    void testTopicContent(AdminClient adminClient, KafkaProducerConfig kafkaProducerConfig, KafkaConsumerConfig kafkaConsumerConfig, SchemaRegistryService schemaRegistryService, TestInfo testInfo) throws Exception {
        for (KafkaProducerPayloadGenerator.Type type : KafkaProducerPayloadGenerator.Type.values()) {
            String topicName = testName(testInfo) + '.' + type.name();
            String keySchema = topicName + "-keySchema";
            String valueSchema = topicName + "-valueSchema";
            // Expected content, indexed by partition number.
            List<TopicContent> expectedByPartition = new ArrayList<>();
            KafkaProducerPayloadGenerator generator = new KafkaProducerPayloadGenerator(type);
            AdminClientUtil.createTopic(adminClient, new NewTopic(topicName, PARTITIONS, REPLICATION_FACTOR), TIMEOUT_MS);

            try (KafkaProducer<Object, Object> producer = createKafkaProducer(kafkaProducerConfig, generator.getSerializerClass(), generator.getSerializerClass())) {
                for (int partition = 0; partition < PARTITIONS; partition++) {
                    TreeMap<Long, TopicRecord> expectedRecords = new TreeMap<>();
                    // One key/value pair is generated per partition and sent five times.
                    Object key = generator.getOne();
                    Object value = generator.getOne();
                    for (int n = 0; n < 5; n++) {
                        RecordMetadata metadata = producer.send(new ProducerRecord<>(topicName, partition, key, value)).get();
                        expectedRecords.put(metadata.offset(), new TopicRecord(TopicMessageServiceUtil.serialize(key), TopicMessageServiceUtil.serialize(value), (Integer) null, (Integer) null, TimestampType.CREATE_TIME, metadata.timestamp()));
                    }
                    expectedByPartition.add(new TopicContent(keySchema, valueSchema, expectedRecords));
                }
            }

            schemaRegistryService.registerTopicSchemaMapping(topicName, new TopicSchemaMapping(keySchema, valueSchema));

            for (int partition = 0; partition < PARTITIONS; partition++) {
                TopicContent expected = expectedByPartition.get(partition);

                // Three ways of asking for everything: range 0..5, and counts of 5 and 10 from offset 0.
                Assertions.assertEquals(expected, topicMessageService(kafkaConsumerConfig, schemaRegistryService)
                        .getTopicContent(topicName, partition, 0L, 5L, (Long) null, (Integer) null, (Integer) null, generator.getDeserializerClass(), generator.getDeserializerClass()));
                Assertions.assertEquals(expected, topicMessageService(kafkaConsumerConfig, schemaRegistryService)
                        .getTopicContent(topicName, partition, 0L, (Long) null, 5L, (Integer) null, (Integer) null, generator.getDeserializerClass(), generator.getDeserializerClass()));
                Assertions.assertEquals(expected, topicMessageService(kafkaConsumerConfig, schemaRegistryService)
                        .getTopicContent(topicName, partition, 0L, (Long) null, 10L, (Integer) null, (Integer) null, generator.getDeserializerClass(), generator.getDeserializerClass()));

                // Asking for three records must return offsets 0, 1 and 2 with the produced content.
                TopicContent firstThree = topicMessageService(kafkaConsumerConfig, schemaRegistryService)
                        .getTopicContent(topicName, partition, 0L, (Long) null, 3L, (Integer) null, (Integer) null, generator.getDeserializerClass(), generator.getDeserializerClass());
                Assertions.assertEquals(3, firstThree.getOffsetToRecordMap().size());
                long expectedOffset = 0L;
                for (Map.Entry<Long, TopicRecord> entry : firstThree.getOffsetToRecordMap().entrySet()) {
                    Assertions.assertEquals(expectedOffset++, entry.getKey().longValue());
                    Assertions.assertEquals(expected.getOffsetToRecordMap().get(entry.getKey()), entry.getValue());
                }
            }
            AdminClientUtil.deleteTopic(adminClient, topicName, TIMEOUT_MS);
        }
    }

    /** Verifies that reading content from a topic that was never created fails with {@link NotFoundException}. */
    @DisplayName("Test get topic content for non-existent topic")
    @TestTemplate
    void testTopicContentForNonexistentTopic(KafkaConsumerConfig kafkaConsumerConfig, SchemaRegistryService schemaRegistryService, TestInfo testInfo) {
        String topicName = testName(testInfo);
        String deserializer = LongDeserializer.class.getName();
        TopicMessageService service = topicMessageService(kafkaConsumerConfig, schemaRegistryService);

        NotFoundException e = Assertions.assertThrows(NotFoundException.class,
                () -> service.getTopicContent(topicName, 0, 0L, 0L, (Long) null, (Integer) null, (Integer) null, deserializer, deserializer, 0L),
                "Should have thrown NotFoundException");

        Assertions.assertEquals("Given topic [" + topicName + "] partition [0] does not exist", e.getMessage());
    }

    /**
     * Verifies that reading content from a partition number that does not exist on an existing
     * topic fails with {@link NotFoundException}.
     */
    @DisplayName("Test get topic content for non-existent partition")
    @TestTemplate
    void testTopicContentForNonexistentPartition(AdminClient adminClient, KafkaConsumerConfig kafkaConsumerConfig, SchemaRegistryService schemaRegistryService, TestInfo testInfo) {
        String topicName = testName(testInfo);
        AdminClientUtil.createTopic(adminClient, new NewTopic(topicName, PARTITIONS, REPLICATION_FACTOR), TIMEOUT_MS);
        String deserializer = LongDeserializer.class.getName();
        TopicMessageService service = topicMessageService(kafkaConsumerConfig, schemaRegistryService);

        // assertThrows makes the test fail when no exception is raised; a bare try/catch would
        // let that case pass silently.
        NotFoundException e = Assertions.assertThrows(NotFoundException.class,
                () -> service.getTopicContent(topicName, 20989, 0L, 0L, (Long) null, (Integer) null, (Integer) null, deserializer, deserializer),
                "Should have thrown NotFoundException");

        Assertions.assertEquals("Given topic [" + topicName + "] partition [20989] does not exist", e.getMessage());
        AdminClientUtil.deleteTopic(adminClient, topicName, TIMEOUT_MS);
    }

    /**
     * Builds a {@link TopicMessageService} whose consumer configuration is a copy of the supplied
     * one with {@code poll.timeout.ms} set to 500 and {@code max.poll.records} set to 1.
     */
    private TopicMessageService topicMessageService(KafkaConsumerConfig kafkaConsumerConfig, SchemaRegistryService schemaRegistryService) {
        Map<String, Object> consumerProperties = copyWithStringKeys(kafkaConsumerConfig.getConfig());
        consumerProperties.put("poll.timeout.ms", 500);
        consumerProperties.put("max.poll.records", 1);
        String bootstrapServers = (String) kafkaConsumerConfig.getConfig().get("bootstrap.servers");
        String schemaRegistryUrl = (String) kafkaConsumerConfig.getConfig().get(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name());
        KafkaConsumerConfig testConfig = new KafkaConsumerConfig(bootstrapServers, schemaRegistryUrl, new HashMap<>(), consumerProperties);
        return new TopicMessageService(testConfig, schemaRegistryService);
    }

    /**
     * Creates a producer from the supplied configuration with the given key and value serializer
     * class names. The caller owns the returned producer and must close it.
     */
    private <T> KafkaProducer<T, T> createKafkaProducer(KafkaProducerConfig kafkaProducerConfig, String keySerializer, String valueSerializer) {
        Map<String, Object> producerProperties = copyWithStringKeys(kafkaProducerConfig.getProperties());
        producerProperties.put("key.serializer", keySerializer);
        producerProperties.put("value.serializer", valueSerializer);
        return new KafkaProducer<>(producerProperties);
    }

    /**
     * Sends {@code recordsPerPartition} random double key/value records to each of the first
     * {@code partitions} partitions of {@code topicName}, appending one
     * {@link PartitionOffsetInfo} to {@code offsets} per acknowledged record.
     *
     * @throws InterruptedException if waiting for a send acknowledgement is interrupted
     * @throws ExecutionException if a send fails
     */
    private void produceMessages(KafkaProducerConfig kafkaProducerConfig, String keySerializer, String valueSerializer, int partitions, String topicName, List<PartitionOffsetInfo> offsets, int recordsPerPartition) throws InterruptedException, ExecutionException {
        try (KafkaProducer<Double, Double> producer = createKafkaProducer(kafkaProducerConfig, keySerializer, valueSerializer)) {
            for (int partition = 0; partition < partitions; partition++) {
                for (int n = 0; n < recordsPerPartition; n++) {
                    producer.send(new ProducerRecord<>(topicName, partition, Math.random(), Math.random())).get();
                    offsets.add(new PartitionOffsetInfo(partition, n, n + 1));
                }
            }
        }
    }

    /** Asserts that a record carries the full {@code expected} key and value and reports no original lengths. */
    private void assertRecordsInCaseNotTruncated(String expected, Map.Entry<Long, TopicRecord> entry) {
        TopicRecord record = entry.getValue();
        Assertions.assertEquals(expected, record.getKey());
        Assertions.assertNull(record.getOriginalKeyLength());
        Assertions.assertEquals(expected, record.getValue());
        Assertions.assertNull(record.getOriginalValueLength());
    }

    /** Returns the name of the running test method, used as the per-test topic name. */
    private static String testName(TestInfo testInfo) {
        return testInfo.getTestMethod()
                .map(Method::getName)
                .orElseThrow(() -> new IllegalStateException("TestInfo carries no test method"));
    }

    /** Returns a mutable copy of {@code source} whose keys are the string form of the original keys. */
    private static Map<String, Object> copyWithStringKeys(Map<?, ?> source) {
        Map<String, Object> copy = new HashMap<>();
        source.forEach((key, value) -> copy.put(String.valueOf(key), value));
        return copy;
    }
}
