package com.hortonworks.smm.kafka.notification;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.hortonworks.smm.kafka.ResourceType;
import com.hortonworks.smm.kafka.common.config.AlertConfig;
import com.hortonworks.smm.kafka.common.config.KafkaAdminClientConfig;
import com.hortonworks.smm.kafka.notification.api.AlertNotificationContext;
import com.hortonworks.smm.kafka.notification.api.AlertNotifications;
import com.hortonworks.smm.kafka.notification.api.DefaultNotificationContextWrapper;
import com.hortonworks.smm.kafka.notification.api.Resource;
import com.hortonworks.smm.kafka.services.management.NotificationQueryService;
import com.hortonworks.smm.storage.impl.memory.InMemoryStorageManager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:com/hortonworks/smm/kafka/notification/NotificationQueryServiceTest.class */
public class NotificationQueryServiceTest {
    // Embedded Kafka cluster shared by every test in this class; started in beforeClass() and stopped in afterClass().
    // NOTE(review): the constructor argument 1 is presumably the broker count - confirm against EmbeddedKafkaCluster.
    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    // Admin-client configuration handed to each NotificationQueryService; reassigned in before() ahead of every test.
    private static KafkaAdminClientConfig clientConfig;
    // Topic the tests produce alerts to, pass to AlertConfig, and delete again in after().
    private static final String alertsTopic = "__smm_alert_notifications";
    // User name passed to every query/mark/unmark call made by these tests.
    private static final String userName = "anonymous";

    /* loaded from: input_file:com/hortonworks/smm/kafka/notification/NotificationQueryServiceTest$InMemoryUserNotificationStatusStorageManager.class */
    /**
     * Map-backed {@link NotificationQueryService.UserNotificationStatusStorageManager} used by the tests in
     * place of a persistent store.
     *
     * <p>State is kept as {@code user -> partition -> offsets}. The maps are plain {@link HashMap}s and
     * {@link HashSet}s with no synchronization of their own.
     */
    private static final class InMemoryUserNotificationStatusStorageManager implements NotificationQueryService.UserNotificationStatusStorageManager {
        /** Offsets recorded per user and, within a user, per partition. */
        final Map<String, Map<Integer, Set<Long>>> userToPartitionWiseOffsets = new HashMap<>();

        private InMemoryUserNotificationStatusStorageManager() {
        }

        /**
         * Records {@code offset} of {@code partition} for {@code user}, creating the intermediate
         * containers on first use.
         */
        public void writeUserNotificationStatus(String user, Integer partition, Long offset) {
            this.userToPartitionWiseOffsets
                    .computeIfAbsent(user, u -> new HashMap<>())
                    .computeIfAbsent(partition, p -> new HashSet<>())
                    .add(offset);
        }

        /**
         * Removes {@code offset} of {@code partition} for {@code user} if it was recorded.
         *
         * <p>Deleting for a user or partition that was never written is a no-op; it does not insert
         * empty placeholder entries that would later be visible through
         * {@link #readUserNotificationStatus(String)}.
         */
        public void deleteUserNotificationStatus(String user, Integer partition, Long offset) {
            Map<Integer, Set<Long>> partitions = this.userToPartitionWiseOffsets.get(user);
            if (partitions == null) {
                return;
            }
            Set<Long> offsets = partitions.get(partition);
            if (offsets != null) {
                offsets.remove(offset);
            }
        }

        /**
         * Returns the partition-to-offsets map recorded for {@code user}, or an empty map when nothing
         * has been written for that user. The returned map is the live internal map, not a copy.
         */
        public Map<Integer, Set<Long>> readUserNotificationStatus(String user) {
            Map<Integer, Set<Long>> partitions = this.userToPartitionWiseOffsets.get(user);
            if (partitions == null) {
                return Collections.emptyMap();
            }
            return partitions;
        }

        /** Intentionally empty: this in-memory implementation does no work in {@code run()}. */
        public void run() {
        }
    }

    /**
     * Starts the shared embedded Kafka cluster once, before any test in this class runs.
     *
     * @throws IOException declared because starting the cluster may fail with an I/O error
     */
    @BeforeClass
    public static void beforeClass() throws IOException {
        CLUSTER.start();
    }

    /** Stops the shared embedded Kafka cluster once all tests in this class have run. */
    @AfterClass
    public static void afterClass() {
        CLUSTER.stop();
    }

    /**
     * Rebuilds the admin-client configuration from the cluster's current bootstrap servers before
     * each test, storing it in the static {@code clientConfig} field used when creating the service.
     */
    @Before
    public void before() {
        final String bootstrapServers = CLUSTER.bootstrapServers();
        clientConfig = new KafkaAdminClientConfig(bootstrapServers);
    }

    /**
     * Deletes the alerts topic after each test and waits for the deletion, so the next test does not
     * see alerts produced by the previous one.
     *
     * @throws InterruptedException if the wait for topic deletion is interrupted
     */
    @After
    public void after() throws InterruptedException {
        CLUSTER.deleteTopicAndWait(alertsTopic);
    }

    /**
     * Verifies paging of alert notifications: first against a topic with no alerts, then - from ten
     * tasks running on a five-thread pool - against a topic holding 100 alerts.
     *
     * <p>Assertion failures raised inside the pooled tasks are collected and rethrown on the test
     * thread. Without that, a failure would be captured by the discarded {@code Future}, the latch
     * would never reach zero, and the test would only fail after the 120 second timeout with no hint
     * of the real cause.
     *
     * @throws Exception if producing the alerts, waiting for the tasks, or closing the service fails
     */
    @Test
    public void testFetchAlertNotifications() throws Exception {
        final int taskCount = 10;
        final int poolSize = 5;
        final NotificationQueryService notificationQueryService = new NotificationQueryService(clientConfig, getAlertConfig(), new InMemoryStorageManager());
        // Outer try/finally: the service is closed even if one of the early assertions fails.
        try {
            notificationQueryService.setUserNotificationStatusStorageManager(new InMemoryUserNotificationStatusStorageManager());

            // Nothing has been produced yet: every starting offset must give an empty page at offsets 0/0.
            for (long startOffset : new long[] {0L, -1L, -2L, 10L}) {
                AlertNotifications emptyPage = notificationQueryService.alertNotifications(userName, startOffset, 10);
                assertPage("empty topic, start offset " + startOffset, emptyPage, 0, 0L, 0L);
            }

            IntegrationTestUtils.produceValuesSynchronously(alertsTopic, generateAlerts(100), generateProducerProps(), CLUSTER.time);

            final List<Throwable> failures = Collections.synchronizedList(new ArrayList<Throwable>());
            final CountDownLatch remaining = new CountDownLatch(taskCount);
            final Runnable pagingTask = () -> {
                try {
                    assertPagingOverHundredAlerts(notificationQueryService);
                } catch (Throwable failure) {
                    // Includes AssertionError; recorded here and rethrown on the test thread below.
                    failures.add(failure);
                } finally {
                    // Always count down so a failing task does not turn into a 120 s timeout.
                    remaining.countDown();
                }
            };

            final ExecutorService executor = Executors.newFixedThreadPool(poolSize, new ThreadFactoryBuilder().setNameFormat("notification-query-task-%d").setDaemon(true).build());
            try {
                for (int task = 0; task < taskCount; task++) {
                    executor.submit(pagingTask);
                }
                if (!remaining.await(120L, TimeUnit.SECONDS)) {
                    Assert.fail("Tests are not completed");
                }
            } finally {
                executor.shutdownNow();
            }

            if (!failures.isEmpty()) {
                throw new AssertionError(failures.size() + " of " + taskCount + " concurrent query tasks failed; first failure attached as cause", failures.get(0));
            }
        } finally {
            notificationQueryService.close();
        }
    }

    /**
     * Pages through the 100 produced alerts in both directions and checks size, next offset and
     * previous offset of every page. The start offsets -1 and -2 are expected to yield the last and
     * the first page respectively, as the asserted offsets show.
     */
    private static void assertPagingOverHundredAlerts(NotificationQueryService service) {
        AlertNotifications latest = service.alertNotifications(userName, -1L, 10);
        assertPage("latest 10", latest, 10, 100L, 80L);

        AlertNotifications oneBack = service.alertNotifications(userName, latest.previousOffset().longValue(), 10);
        assertPage("one page back from latest", oneBack, 10, 90L, 70L);

        AlertNotifications forwardAgain = service.alertNotifications(userName, oneBack.nextOffset().longValue(), 10);
        assertPage("forward to latest again", forwardAgain, 10, 100L, 80L);

        AlertNotifications pastEnd = service.alertNotifications(userName, forwardAgain.nextOffset().longValue(), 10);
        assertPage("past the end", pastEnd, 0, 100L, 90L);

        AlertNotifications pastEndAgain = service.alertNotifications(userName, pastEnd.nextOffset().longValue(), 10);
        assertPage("past the end, repeated", pastEndAgain, 0, 100L, 90L);

        AlertNotifications latestHundred = service.alertNotifications(userName, -1L, 100);
        assertPage("latest 100", latestHundred, 100, 100L, 0L);

        AlertNotifications earliest = service.alertNotifications(userName, -2L, 10);
        assertPage("earliest 10", earliest, 10, 10L, 0L);

        AlertNotifications secondPage = service.alertNotifications(userName, earliest.nextOffset().longValue(), 10);
        assertPage("second page from earliest", secondPage, 10, 20L, 0L);

        AlertNotifications backToFirst = service.alertNotifications(userName, secondPage.previousOffset().longValue(), 10);
        assertPage("back to first page", backToFirst, 10, 10L, 0L);

        AlertNotifications atEndOffset = service.alertNotifications(userName, 100L, 10);
        assertPage("explicit offset 100", atEndOffset, 0, 100L, 90L);

        AlertNotifications upperHalf = service.alertNotifications(userName, 50L, 50);
        assertPage("offset 50, limit 50", upperHalf, 50, 100L, 0L);

        AlertNotifications lowerHalf = service.alertNotifications(userName, upperHalf.previousOffset().longValue(), 50);
        assertPage("lower half", lowerHalf, 50, 50L, 0L);
    }

    /** Asserts the notification count and both paging offsets of one page, prefixing failures with {@code label}. */
    private static void assertPage(String label, AlertNotifications page, int expectedCount, long expectedNext, long expectedPrevious) {
        Assert.assertEquals(label + ": notification count", expectedCount, page.notifications().size());
        Assert.assertEquals(label + ": next offset", Long.valueOf(expectedNext), page.nextOffset());
        Assert.assertEquals(label + ": previous offset", Long.valueOf(expectedPrevious), page.previousOffset());
    }

    /**
     * Produces 50 alerts, marks the ones at even offsets for the test user, and verifies that a
     * subsequent query reports exactly those offsets as marked.
     *
     * <p>The service is closed in a {@code finally} block so a failing assertion does not leave it open.
     *
     * @throws Exception if producing the alerts or closing the service fails
     */
    @Test
    public void testMarkAlertNotification() throws Exception {
        NotificationQueryService notificationQueryService = new NotificationQueryService(clientConfig, getAlertConfig(), new InMemoryStorageManager());
        try {
            notificationQueryService.setUserNotificationStatusStorageManager(new InMemoryUserNotificationStatusStorageManager());
            IntegrationTestUtils.produceValuesSynchronously(alertsTopic, generateAlerts(50), generateProducerProps(), CLUSTER.time);

            AlertNotifications beforeMarking = notificationQueryService.alertNotifications(userName, 0L, 50);
            Assert.assertEquals(50L, beforeMarking.notifications().size());

            List<Long> evenOffsets = beforeMarking.notifications().keySet().stream()
                    .filter(offset -> offset.longValue() % 2 == 0)
                    .collect(Collectors.toList());
            notificationQueryService.markAlertNotifications(userName, evenOffsets);

            AlertNotifications afterMarking = notificationQueryService.alertNotifications(userName, 0L, 50);
            Assert.assertEquals(50L, afterMarking.notifications().size());

            // The cast is kept from the original; the declared value type of notifications() is not visible here.
            List<Long> markedOffsets = afterMarking.notifications().entrySet().stream()
                    .filter(entry -> ((DefaultNotificationContextWrapper) entry.getValue()).isMarked())
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
            Assert.assertEquals(evenOffsets, markedOffsets);
        } finally {
            notificationQueryService.close();
        }
    }

    /**
     * Produces 50 alerts, marks the ones at even offsets, verifies the marks, then unmarks the same
     * offsets and verifies that all offsets 0..49 are reported as unmarked again.
     *
     * <p>The service is closed in a {@code finally} block so a failing assertion does not leave it open.
     *
     * @throws Exception if producing the alerts or closing the service fails
     */
    @Test
    public void testUnmarkAlertNotifications() throws Exception {
        NotificationQueryService notificationQueryService = new NotificationQueryService(clientConfig, getAlertConfig(), new InMemoryStorageManager());
        try {
            notificationQueryService.setUserNotificationStatusStorageManager(new InMemoryUserNotificationStatusStorageManager());
            IntegrationTestUtils.produceValuesSynchronously(alertsTopic, generateAlerts(50), generateProducerProps(), CLUSTER.time);

            AlertNotifications beforeMarking = notificationQueryService.alertNotifications(userName, 0L, 50);
            Assert.assertEquals(50L, beforeMarking.notifications().size());

            List<Long> evenOffsets = beforeMarking.notifications().keySet().stream()
                    .filter(offset -> offset.longValue() % 2 == 0)
                    .collect(Collectors.toList());
            notificationQueryService.markAlertNotifications(userName, evenOffsets);

            AlertNotifications afterMarking = notificationQueryService.alertNotifications(userName, 0L, 50);
            Assert.assertEquals(50L, afterMarking.notifications().size());
            // The cast is kept from the original; the declared value type of notifications() is not visible here.
            List<Long> markedOffsets = afterMarking.notifications().entrySet().stream()
                    .filter(entry -> ((DefaultNotificationContextWrapper) entry.getValue()).isMarked())
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
            Assert.assertEquals(evenOffsets, markedOffsets);

            // A copy is passed, as in the original, so evenOffsets itself is never handed to unmark.
            notificationQueryService.unmarkAlertNotifications(userName, new ArrayList<Long>(evenOffsets));

            AlertNotifications afterUnmarking = notificationQueryService.alertNotifications(userName, 0L, 50);
            Assert.assertEquals(50L, afterUnmarking.notifications().size());
            List<Long> allOffsets = LongStream.range(0L, 50L).boxed().collect(Collectors.toList());
            List<Long> unmarkedOffsets = afterUnmarking.notifications().entrySet().stream()
                    .filter(entry -> !((DefaultNotificationContextWrapper) entry.getValue()).isMarked())
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
            Assert.assertEquals(allOffsets, unmarkedOffsets);
        } finally {
            notificationQueryService.close();
        }
    }

    /**
     * Builds the producer settings used to write test alerts to the embedded cluster: byte-array key
     * and value serializers, {@code acks=all}, and the cluster's bootstrap servers.
     *
     * @return a fresh {@link Properties} instance on every call
     */
    private Properties generateProducerProps() {
        final Properties producerProps = new Properties();
        // The serializers are stored as Class objects, hence put() rather than setProperty().
        producerProps.put("key.serializer", ByteArraySerializer.class);
        producerProps.put("value.serializer", ByteArraySerializer.class);
        producerProps.put("acks", "all");
        producerProps.put("bootstrap.servers", CLUSTER.bootstrapServers());
        return producerProps;
    }

    /**
     * Builds the {@link AlertConfig} used by every test: it targets the alerts topic and carries the
     * embedded cluster's bootstrap servers plus a 2000 ms poll timeout.
     *
     * <p>The same map instance is passed for both map arguments of the constructor. The remaining
     * positional arguments ({@code 2}, {@code true}, {@code false}) are passed through as-is; their
     * meaning is defined by {@code AlertConfig}, which is not visible from this file.
     */
    private static AlertConfig getAlertConfig() {
        // Left raw on purpose: AlertConfig's declared map parameter types are not visible here.
        HashMap clientSettings = new HashMap();
        clientSettings.put("poll.timeout.ms", "2000");
        clientSettings.put("bootstrap.servers", CLUSTER.bootstrapServers());
        return new AlertConfig(2, alertsTopic, true, false, clientSettings, clientSettings);
    }

    /**
     * Creates {@code i} alert notifications serialized to JSON bytes with Jackson.
     *
     * <p>Alert number {@code n} (counting from 0) carries the payload {@code "payload-n"}, the id
     * {@code n}, the name {@code "alert-name"} and a single BROKER resource named {@code "broker"};
     * the timestamp is taken from {@link System#currentTimeMillis()} when the alert is built.
     *
     * @param i number of alerts to generate
     * @return the serialized alerts in generation order
     * @throws Exception if serialization fails
     */
    private List<byte[]> generateAlerts(int i) throws Exception {
        final List<byte[]> serializedAlerts = new ArrayList<>(i);
        final ObjectMapper jsonMapper = new ObjectMapper();
        for (long alertId = 0; alertId < i; alertId++) {
            final Resource broker = new Resource(ResourceType.BROKER, "broker");
            final AlertNotificationContext alert = new AlertNotificationContext(
                    "payload-" + alertId,
                    Collections.emptyMap(),
                    Long.valueOf(alertId),
                    "alert-name",
                    Lists.newArrayList(new Long[]{1L}),
                    broker,
                    Lists.newArrayList(new Resource[]{broker}),
                    System.currentTimeMillis());
            serializedAlerts.add(jsonMapper.writeValueAsBytes(alert));
        }
        return serializedAlerts;
    }
}
