package com.hortonworks.smm.kafka.services.management;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.hortonworks.smm.kafka.common.config.AlertConfig;
import com.hortonworks.smm.kafka.common.config.KafkaAdminClientConfig;
import com.hortonworks.smm.kafka.common.utils.ThreadUtils;
import com.hortonworks.smm.kafka.common.utils.TopicUtils;
import com.hortonworks.smm.kafka.notification.api.AlertNotificationContext;
import com.hortonworks.smm.kafka.notification.api.AlertNotifications;
import com.hortonworks.smm.kafka.notification.api.DefaultNotificationContextWrapper;
import com.hortonworks.smm.kafka.services.Service;
import com.hortonworks.smm.kafka.services.message.TopicMessageService;
import com.hortonworks.smm.kafka.services.notification.storable.UserNotificationStatus;
import com.hortonworks.smm.storage.StorageManager;
import com.hortonworks.smm.storage.exception.StorageException;
import com.hortonworks.smm.storage.search.SearchQuery;
import com.hortonworks.smm.storage.search.WhereClause;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:com/hortonworks/smm/kafka/services/management/NotificationQueryService.class */
public class NotificationQueryService implements Service {
    // Number of Kafka consumers created for the pool by initConsumerPool.
    private static final int CONSUMER_POOL_SIZE = 5;
    // Consumers leased for each query and handed back afterwards.
    private final Pool consumerPool;
    // Timeout passed to KafkaConsumer.poll; read from the consumer config, defaulting to "1000".
    private final long pollTimeoutInMs;
    // Not final: can be swapped through setUserNotificationStatusStorageManager (test hook).
    private UserNotificationStatusStorageManager storageManager;
    // Runs the storage manager's run() with a fixed one-day delay (scheduled in the constructor).
    private final ScheduledExecutorService executors;
    // Name of the alert notifications topic, taken from AlertConfig.
    private final String alertsTopic;
    public static final Logger LOG = LoggerFactory.getLogger(NotificationQueryService.class);
    // Shared mapper used to deserialize notification payloads into AlertNotificationContext.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hortonworks/smm/kafka/services/management/NotificationQueryService$Pool.class */
    /**
     * Fixed-size pool of {@link KafkaConsumer} instances backed by a bounded blocking queue.
     *
     * <p>A consumer is taken out with {@code lease} and must be handed back with {@code release};
     * while leased it is not visible to other threads.
     */
    public static class Pool {
        private final BlockingQueue<KafkaConsumer<byte[], byte[]>> consumers;
        private final int size;

        /**
         * Creates a pool holding exactly the given consumers.
         *
         * @param collection the consumers to pool; its size becomes the pool capacity
         */
        Pool(Collection<KafkaConsumer<byte[], byte[]>> collection) {
            this.size = collection.size();
            this.consumers = new ArrayBlockingQueue<>(this.size, false, collection);
        }

        /**
         * Takes a consumer from the pool, waiting up to the given time.
         *
         * @return a consumer, or {@code null} if none became available in time
         * @throws InterruptedException if interrupted while waiting
         */
        KafkaConsumer<byte[], byte[]> lease(long j, TimeUnit timeUnit) throws InterruptedException {
            return this.consumers.poll(j, timeUnit);
        }

        /**
         * Takes a consumer from the pool, waiting as long as necessary.
         *
         * @throws InterruptedException if interrupted while waiting
         */
        KafkaConsumer<byte[], byte[]> lease() throws InterruptedException {
            return this.consumers.take();
        }

        /**
         * Returns a previously leased consumer to the pool.
         *
         * <p>Uses the non-blocking, non-interruptible {@code offer}: the interruptible {@code put}
         * fails immediately when the calling thread is already interrupted, which would drop the
         * consumer from the pool for good. The {@code throws} clause is kept only so existing
         * callers that catch {@link InterruptedException} keep compiling; it is never thrown.
         *
         * @param kafkaConsumer the consumer to hand back
         */
        void release(KafkaConsumer<byte[], byte[]> kafkaConsumer) throws InterruptedException {
            if (!this.consumers.offer(kafkaConsumer)) {
                // Can only happen if more consumers are released than were leased.
                NotificationQueryService.LOG.warn("Consumer pool is already full, ignoring the released consumer");
            }
        }

        /**
         * Closes the pooled consumers, waiting up to 10 seconds for each one that is still leased.
         * If the calling thread is interrupted, the remaining idle consumers are closed without
         * waiting and the interrupt status is restored.
         */
        void close() {
            boolean interrupted = false;
            for (int i = 0; i < this.size; i++) {
                try {
                    KafkaConsumer<byte[], byte[]> lease = lease(10L, TimeUnit.SECONDS);
                    if (lease != null) {
                        lease.close();
                    }
                } catch (InterruptedException e) {
                    interrupted = true;
                    break;
                } catch (Exception e) {
                    NotificationQueryService.LOG.error("Error while closing consumer", e);
                }
            }
            if (interrupted) {
                // Stop blocking, but still close whatever is idle in the queue right now.
                KafkaConsumer<byte[], byte[]> idle;
                while ((idle = this.consumers.poll()) != null) {
                    try {
                        idle.close();
                    } catch (Exception e) {
                        NotificationQueryService.LOG.error("Error while closing consumer", e);
                    }
                }
                Thread.currentThread().interrupt();
            }
        }
    }

    /* loaded from: input_file:com/hortonworks/smm/kafka/services/management/NotificationQueryService$UserNotificationStatusStorageManager.class */
    /**
     * Storage operations for per-user notification status records. Extends {@link Runnable};
     * the enclosing service schedules {@code run()} periodically.
     */
    public interface UserNotificationStatusStorageManager extends Runnable {
        /**
         * Stores a status record for the given values.
         *
         * @param str the user name
         * @param num the topic partition of the notification
         * @param l   the offset of the notification
         */
        void writeUserNotificationStatus(String str, Integer num, Long l);

        /**
         * Removes the status record for the given values.
         *
         * @param str the user name
         * @param num the topic partition of the notification
         * @param l   the offset of the notification
         */
        void deleteUserNotificationStatus(String str, Integer num, Long l);

        /**
         * Reads the stored status records of a user.
         *
         * @param str the user name
         * @return stored offsets grouped by topic partition
         */
        Map<Integer, Set<Long>> readUserNotificationStatus(String str);
    }

    /* loaded from: input_file:com/hortonworks/smm/kafka/services/management/NotificationQueryService$UserNotificationStatusStorageManagerImpl.class */
    /**
     * {@link UserNotificationStatusStorageManager} backed by a {@link StorageManager}.
     *
     * <p>{@link #run()} purges status records whose offset is below the current beginning offset
     * of their partition in the alerts topic.
     */
    private static final class UserNotificationStatusStorageManagerImpl implements UserNotificationStatusStorageManager {
        private final NotificationQueryService queryService;
        private final StorageManager storageManager;

        private UserNotificationStatusStorageManagerImpl(NotificationQueryService notificationQueryService, StorageManager storageManager) {
            this.queryService = notificationQueryService;
            this.storageManager = storageManager;
        }

        /**
         * Removes, partition by partition, every stored status whose offset is lower than the
         * partition's beginning offset. Never throws: a failure is logged so that an exception
         * cannot stop later scheduled runs.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                NotificationQueryService.LOG.info("Clearing the expired UserNotificationStatus");
                for (Map.Entry<TopicPartition, Long> entry : this.queryService.getBeginningOffsets().entrySet()) {
                    int partition = entry.getKey().partition();
                    Collection<UserNotificationStatus> search = this.storageManager.search(SearchQuery.searchFrom(UserNotificationStatus.NAMESPACE).where(WhereClause.begin().eq(UserNotificationStatus.TOPIC_PARTITION, Integer.valueOf(partition)).and().lt(UserNotificationStatus.OFFSET, entry.getValue()).combine()));
                    if (search == null) {
                        // Nothing found for this partition; the remaining partitions still need cleaning.
                        continue;
                    }
                    int removed = 0;
                    for (UserNotificationStatus userNotificationStatus : search) {
                        try {
                            this.storageManager.remove(userNotificationStatus.getStorableKey());
                            removed++;
                        } catch (StorageException e) {
                            NotificationQueryService.LOG.warn("Error while removing the notification status : {}", userNotificationStatus, e);
                        }
                    }
                    // Report only the records that were actually removed.
                    NotificationQueryService.LOG.info("Removed {} expired UserNotificationStatus", Integer.valueOf(removed));
                }
            } catch (Exception e2) {
                NotificationQueryService.LOG.error("Error while clearing the expired user notification status", e2);
            }
        }

        /**
         * Adds or updates the status record built from the given values.
         *
         * @throws StorageException if the storage layer fails; the original failure is the cause
         */
        @Override // com.hortonworks.smm.kafka.services.management.NotificationQueryService.UserNotificationStatusStorageManager
        public void writeUserNotificationStatus(String str, Integer num, Long l) {
            UserNotificationStatus userNotificationStatus = new UserNotificationStatus(str, num, l);
            try {
                this.storageManager.addOrUpdate(userNotificationStatus);
            } catch (StorageException e) {
                NotificationQueryService.LOG.error("Error while adding {}", userNotificationStatus, e);
                throw new StorageException("Error while adding " + userNotificationStatus, e);
            }
        }

        /**
         * Removes the status record identified by the given values.
         *
         * @throws StorageException if the storage layer fails; the original failure is the cause
         */
        @Override // com.hortonworks.smm.kafka.services.management.NotificationQueryService.UserNotificationStatusStorageManager
        public void deleteUserNotificationStatus(String str, Integer num, Long l) {
            UserNotificationStatus userNotificationStatus = new UserNotificationStatus(str, num, l);
            try {
                this.storageManager.remove(userNotificationStatus.getStorableKey());
            } catch (StorageException e) {
                NotificationQueryService.LOG.error("Error while deleting {}", userNotificationStatus, e);
                throw new StorageException("Error while deleting " + userNotificationStatus, e);
            }
        }

        /**
         * Reads all status records of a user and groups their offsets by topic partition.
         *
         * @param str the user name to search for
         * @return offsets keyed by topic partition; empty when nothing is stored
         * @throws StorageException if the storage layer fails; the original failure is the cause
         */
        @Override // com.hortonworks.smm.kafka.services.management.NotificationQueryService.UserNotificationStatusStorageManager
        public Map<Integer, Set<Long>> readUserNotificationStatus(String str) {
            Map<Integer, Set<Long>> offsetsByPartition = new HashMap<>();
            try {
                Collection<UserNotificationStatus> search = this.storageManager.search(SearchQuery.searchFrom(UserNotificationStatus.NAMESPACE).where(WhereClause.begin().eq(UserNotificationStatus.USER_NAME, str).combine()));
                if (search != null) {
                    for (UserNotificationStatus userNotificationStatus : search) {
                        offsetsByPartition.computeIfAbsent(userNotificationStatus.getTopicPartition(), partition -> new HashSet<>())
                                .add(userNotificationStatus.getOffset());
                    }
                }
                return offsetsByPartition;
            } catch (StorageException e) {
                NotificationQueryService.LOG.error("Error while reading status for user : {}", str, e);
                throw new StorageException("Error while reading user notification status for user: " + str, e);
            }
        }
    }

    /**
     * Creates the service: makes sure the alerts topic exists, builds the consumer pool and
     * schedules the daily cleanup of expired user notification status records.
     *
     * @param kafkaAdminClientConfig admin client configuration used to create the alerts topic if needed
     * @param alertConfig            supplies the consumer configuration and the alerts topic name; must not be null
     * @param storageManager         storage used for user notification status records
     * @throws NullPointerException if {@code alertConfig} is null
     */
    @Inject
    public NotificationQueryService(KafkaAdminClientConfig kafkaAdminClientConfig, AlertConfig alertConfig, StorageManager storageManager) {
        Objects.requireNonNull(alertConfig, "alertConfig can not be null");
        Map<String, String> kafkaConsumerConfig = alertConfig.kafkaConsumerConfig();
        this.pollTimeoutInMs = Long.parseLong(kafkaConsumerConfig.getOrDefault(TopicMessageService.POLLING_TIMEOUT_INTERVAL_PROPERTY, "1000"));
        this.alertsTopic = alertConfig.kafkaAlertNotificationsTopic();
        TopicUtils.createTopicIfNeeded(kafkaAdminClientConfig, this.alertsTopic);
        this.consumerPool = initConsumerPool(kafkaConsumerConfig);
        // The storage manager needs this service to look up beginning offsets. Its only constructor
        // takes (service, storage); everything the cleaner reads (consumerPool, alertsTopic) is
        // already initialised at this point.
        this.storageManager = new UserNotificationStatusStorageManagerImpl(this, storageManager);
        this.executors = ThreadUtils.createScheduledExecutorService(1, "notification-status-cleaner-%d", true);
        // The instance scheduled here stays scheduled even if the field is replaced later.
        this.executors.scheduleWithFixedDelay(this.storageManager, 0L, 1L, TimeUnit.DAYS);
    }

    /**
     * Replaces the storage manager used by the mark, unmark and read operations of this service.
     * The cleanup task scheduled in the constructor keeps using the instance it was given.
     *
     * @param userNotificationStatusStorageManager the storage manager to use from now on
     */
    @VisibleForTesting
    public void setUserNotificationStatusStorageManager(UserNotificationStatusStorageManager userNotificationStatusStorageManager) {
        this.storageManager = userNotificationStatusStorageManager;
    }

    /**
     * Builds the pool of {@code CONSUMER_POOL_SIZE} consumers.
     *
     * <p>If creating any consumer fails, the consumers created so far are closed before the
     * failure is rethrown, so a partially built pool does not leak.
     *
     * @param map base consumer configuration shared by all pooled consumers
     * @return the populated pool
     */
    private Pool initConsumerPool(Map<String, String> map) {
        List<KafkaConsumer<byte[], byte[]>> created = new ArrayList<>(CONSUMER_POOL_SIZE);
        try {
            for (int i = 0; i < CONSUMER_POOL_SIZE; i++) {
                created.add(createConsumer(map, i));
            }
            return new Pool(created);
        } catch (RuntimeException e) {
            for (KafkaConsumer<byte[], byte[]> consumer : created) {
                try {
                    consumer.close();
                } catch (RuntimeException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            }
            throw e;
        }
    }

    /**
     * Creates one consumer assigned to partition 0 of the alerts topic.
     *
     * <p>Both key and value use {@link ByteArrayDeserializer}, matching the declared
     * {@code KafkaConsumer<byte[], byte[]>} type. Keys are never read by this service, and a
     * byte-array deserializer cannot fail a poll because of an unexpected key size.
     *
     * @param map base consumer configuration; copied, not modified
     * @param i   index of the consumer in the pool, appended to the client id and group id
     * @return the assigned consumer
     */
    private KafkaConsumer<byte[], byte[]> createConsumer(Map<String, String> map, int i) {
        Map<String, Object> consumerProps = new HashMap<>(map);
        consumerProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());
        String str = "__smm_notifications_query_group" + i;
        consumerProps.put("client.id", str);
        consumerProps.put("group.id", str);
        consumerProps.put("enable.auto.commit", "false");
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerProps);
        try {
            kafkaConsumer.assign(Collections.singletonList(new TopicPartition(this.alertsTopic, 0)));
        } catch (RuntimeException e) {
            // Do not leak the client if it cannot be assigned.
            kafkaConsumer.close();
            throw e;
        }
        return kafkaConsumer;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Looks up the beginning offset of every partition of the alerts topic.
     *
     * <p>The consumer leased for the lookup is always returned to the pool, including when the
     * lookup throws.
     *
     * @return beginning offsets keyed by partition; an empty map when the topic has no partitions
     *         or when the thread is interrupted while waiting for a consumer (the interrupt
     *         status is restored in that case)
     */
    public Map<TopicPartition, Long> getBeginningOffsets() {
        KafkaConsumer<byte[], byte[]> kafkaConsumer = null;
        try {
            kafkaConsumer = this.consumerPool.lease();
            List<PartitionInfo> partitionsFor = kafkaConsumer.partitionsFor(this.alertsTopic);
            if (partitionsFor == null || partitionsFor.isEmpty()) {
                return Collections.emptyMap();
            }
            List<TopicPartition> topicPartitions = new ArrayList<>(partitionsFor.size());
            for (PartitionInfo partitionInfo : partitionsFor) {
                topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            }
            return kafkaConsumer.beginningOffsets(topicPartitions);
        } catch (InterruptedException e) {
            LOG.error("Error while fetching beginning offsets", e);
            Thread.currentThread().interrupt();
            return Collections.emptyMap();
        } finally {
            if (kafkaConsumer != null) {
                // Clear a pending interrupt while handing the consumer back so an interruptible
                // release cannot drop it from the pool; the status is restored right after.
                boolean interrupted = Thread.interrupted();
                try {
                    this.consumerPool.release(kafkaConsumer);
                } catch (InterruptedException e) {
                    interrupted = true;
                    LOG.warn("Interrupted while returning the consumer to the pool", e);
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    /**
     * Reads up to {@code i} alert notifications from partition 0 of the alerts topic and flags
     * the ones the given user has marked.
     *
     * <p>The consumer leased for the read is always returned to the pool, including when the
     * read throws.
     *
     * @param str user name whose marked notifications are looked up
     * @param j   offset to start reading from; {@code -1} reads the latest {@code i} notifications,
     *            {@code -2} reads from the earliest available offset; values above the end offset
     *            are lowered to the end offset
     * @param i   maximum number of notifications to return; must be greater than zero
     * @return the notifications keyed by offset in read order, together with the offset following
     *         the last notification read and the computed offset of the preceding page. If the
     *         thread is interrupted while waiting for a consumer, the result is empty with
     *         offsets {@code j} and {@code -1}, and the interrupt status is restored.
     * @throws IllegalArgumentException if {@code i <= 0} or {@code j < -2}
     */
    public AlertNotifications alertNotifications(String str, long j, int i) {
        Preconditions.checkArgument(i > 0, "limit value should be greater than zero");
        Preconditions.checkArgument(j >= -2, "start offset should be greater than  or equal to -2 (-2 is to fetch earliest notifications)");
        // Left raw on purpose: the map type expected by AlertNotifications is not visible in this file.
        LinkedHashMap notifications = new LinkedHashMap();
        long nextOffset = j;
        long previousOffset = -1;
        KafkaConsumer<byte[], byte[]> consumer = null;
        try {
            consumer = this.consumerPool.lease();
            TopicPartition topicPartition = new TopicPartition(this.alertsTopic, 0);
            List<TopicPartition> partitions = Collections.singletonList(topicPartition);
            Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            long beginOffset = beginningOffsets.get(topicPartition).longValue();
            long endOffset = endOffsets.get(topicPartition).longValue();
            if (endOffset == beginOffset) {
                // The partition holds no records.
                return new AlertNotifications(notifications, Long.valueOf(endOffset), Long.valueOf(endOffset));
            }

            long available = endOffset - beginOffset;
            if (j == -1) {
                // Latest page: start i records before the end, or at the beginning if fewer exist.
                consumer.seek(topicPartition, available >= i ? endOffset - i : beginOffset);
                // 2L keeps the multiplication in long arithmetic.
                previousOffset = available >= 2L * i ? endOffset - 2L * i : beginOffset;
            } else if (j == -2) {
                consumer.seek(topicPartition, beginOffset);
                previousOffset = beginOffset;
            } else {
                // NOTE(review): a start offset below beginOffset is passed to seek unchanged, so the
                // outcome depends on the consumer's auto.offset.reset setting. Confirm this is intended.
                long startOffset = Math.min(j, endOffset);
                consumer.seek(topicPartition, startOffset);
                previousOffset = startOffset - beginOffset >= i ? startOffset - i : beginOffset;
            }

            Set<Long> markedOffsets = markedAlertNotifications(str).getOrDefault(Integer.valueOf(topicPartition.partition()), Collections.emptySet());
            int collected = 0;
            while (collected < i) {
                ConsumerRecords<byte[], byte[]> polled = consumer.poll(Duration.ofMillis(this.pollTimeoutInMs));
                if (polled.count() == 0) {
                    break;
                }
                for (ConsumerRecord<byte[], byte[]> consumerRecord : polled) {
                    try {
                        byte[] payload = consumerRecord.value();
                        AlertNotificationContext context = OBJECT_MAPPER.readValue(new String(payload, StandardCharsets.UTF_8), AlertNotificationContext.class);
                        boolean marked = markedOffsets.contains(Long.valueOf(consumerRecord.offset()));
                        notifications.put(Long.valueOf(consumerRecord.offset()), new DefaultNotificationContextWrapper(context, marked));
                        nextOffset = consumerRecord.offset() + 1;
                        collected++;
                    } catch (Exception e) {
                        // A record that cannot be read is skipped; it does not count towards the limit.
                        LOG.error("Error occurred for notification with offset [{}]", Long.valueOf(consumerRecord.offset()), e);
                    }
                    if (collected == i) {
                        break;
                    }
                }
            }
        } catch (InterruptedException e) {
            LOG.error("Error while fetching notifications", e);
            Thread.currentThread().interrupt();
        } finally {
            if (consumer != null) {
                // Clear a pending interrupt while handing the consumer back so an interruptible
                // release cannot drop it from the pool; the status is restored right after.
                boolean interrupted = Thread.interrupted();
                try {
                    this.consumerPool.release(consumer);
                } catch (InterruptedException e) {
                    interrupted = true;
                    LOG.warn("Interrupted while returning the consumer to the pool", e);
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
        return new AlertNotifications(notifications, Long.valueOf(nextOffset), Long.valueOf(previousOffset));
    }

    /**
     * Stores a status record for each given notification offset, for the given user and
     * partition 0. Offsets are written one at a time in list order.
     *
     * @param str  the user name
     * @param list offsets of the notifications to mark
     */
    public void markAlertNotifications(String str, List<Long> list) {
        for (Long offset : list) {
            this.storageManager.writeUserNotificationStatus(str, 0, offset);
        }
    }

    /**
     * Removes the status record of each given notification offset, for the given user and
     * partition 0. Offsets are removed one at a time in list order.
     *
     * @param str  the user name
     * @param list offsets of the notifications to unmark
     */
    public void unmarkAlertNotifications(String str, List<Long> list) {
        for (Long offset : list) {
            this.storageManager.deleteUserNotificationStatus(str, 0, offset);
        }
    }

    /**
     * Fetches the offsets stored for the given user from the storage manager.
     *
     * @param str the user name
     * @return stored offsets grouped by topic partition
     */
    private Map<Integer, Set<Long>> markedAlertNotifications(String str) {
        Map<Integer, Set<Long>> markedOffsetsByPartition = this.storageManager.readUserNotificationStatus(str);
        return markedOffsetsByPartition;
    }

    /**
     * Stops the scheduled status cleaner and then closes the pooled consumers.
     *
     * <p>The executor is shut down first so the cleaner cannot start, or stay blocked, waiting
     * for a consumer from a pool that is being closed.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.executors.shutdownNow();
        this.consumerPool.close();
    }
}
