package com.hortonworks.smm.kafka.notification;

import com.google.common.util.concurrent.RateLimiter;
import com.hortonworks.smm.kafka.notification.api.AlertNotificationContext;
import com.hortonworks.smm.kafka.notification.api.Notifier;
import com.hortonworks.smm.kafka.notification.api.NotifierConfigTemplate;
import com.hortonworks.smm.kafka.notification.api.NotifierConfiguration;
import com.hortonworks.smm.kafka.notification.api.NotifierContext;
import com.hortonworks.smm.kafka.notification.api.RateLimiterConfig;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hortonworks/smm/kafka/notification/NotifierWithRateLimiter.class */
/**
 * {@link Notifier} decorator that throttles calls to a delegate notifier with a {@link RateLimiter}.
 *
 * <p>When a permit is immediately available and nothing is queued, the notification is forwarded to
 * the delegate on the calling thread. Otherwise it is queued and a single background task, submitted
 * to the supplied {@link ExecutorService}, forwards queued notifications one by one, waiting for a
 * rate-limiter permit before each delivery.
 *
 * <p>Queued notifications are keyed by {@code alertPolicyId()}, so at most one notification per
 * alert policy is pending at any time.
 */
public class NotifierWithRateLimiter implements Notifier {
    private static final Logger LOG = LoggerFactory.getLogger(NotifierWithRateLimiter.class);
    private final RateLimiter rateLimiter;
    private final ExecutorService executorService;
    private final PendingNotifications pendingNotifications;
    private final Notifier delegate;

    /** Guards {@link #future} and {@link #drainTaskActive}, and makes enqueue/poll decisions atomic. */
    private final Object lock = new Object();

    /** Handle of the most recently submitted drain task; {@code null} until the first one is submitted. */
    private Future<?> future;

    /**
     * {@code true} from the moment a drain task is submitted until that task has observed an empty
     * queue (under {@link #lock}) and decided to exit.
     */
    private boolean drainTaskActive;

    /* loaded from: input_file:com/hortonworks/smm/kafka/notification/NotifierWithRateLimiter$PendingNotifications.class */
    /**
     * Insertion-ordered, thread-safe holder of notifications waiting for a rate-limiter permit.
     * Entries are keyed by alert policy id, so adding a notification for a policy that is already
     * queued replaces the queued value (the entry keeps its original position in the order).
     */
    private static class PendingNotifications {
        private final Object lock = new Object();
        private final Map<Long, AlertNotificationContext> alertsWithNotification = new LinkedHashMap<>();

        PendingNotifications() {
        }

        /**
         * Queues the notification under its {@code alertPolicyId()}, replacing any notification
         * already queued for the same policy.
         */
        void add(AlertNotificationContext alertNotificationContext) {
            synchronized (this.lock) {
                this.alertsWithNotification.put(alertNotificationContext.alertPolicyId(), alertNotificationContext);
            }
        }

        /**
         * Removes and returns the first queued notification in iteration (insertion) order.
         *
         * @return the removed notification, or {@code null} when nothing is queued
         */
        AlertNotificationContext pollFirst() {
            synchronized (this.lock) {
                Iterator<AlertNotificationContext> it = this.alertsWithNotification.values().iterator();
                if (!it.hasNext()) {
                    return null;
                }
                AlertNotificationContext first = it.next();
                it.remove();
                return first;
            }
        }

        /** @return {@code true} when no notification is queued */
        boolean isEmpty() {
            synchronized (this.lock) {
                return this.alertsWithNotification.isEmpty();
            }
        }
    }

    /**
     * Wraps {@code notifier} with a rate limiter built from the notifier's own configuration.
     *
     * @param notifier        the notifier that actually delivers notifications
     * @param executorService executor used to run the background task that drains queued notifications
     * @throws NullPointerException if {@code notifier} or its rate limiter configuration is {@code null}
     */
    public NotifierWithRateLimiter(Notifier notifier, ExecutorService executorService) {
        Objects.requireNonNull(notifier, "notifier can not be null");
        RateLimiterConfig rateLimiterConfig = notifier.config().rateLimiterConfig();
        Objects.requireNonNull(rateLimiterConfig, "rateLimiterConfig can not be null");
        this.delegate = notifier;
        this.rateLimiter = RateLimiter.create(rateLimiterConfig.toRateLimitPerSec());
        this.executorService = executorService;
        this.pendingNotifications = new PendingNotifications();
    }

    /** @return the id reported by the delegate notifier */
    @Override // com.hortonworks.smm.kafka.notification.api.Notifier
    public Long id() {
        return this.delegate.id();
    }

    /** @return the configuration reported by the delegate notifier */
    @Override // com.hortonworks.smm.kafka.notification.api.Notifier
    public NotifierConfiguration config() {
        return this.delegate.config();
    }

    /** Initializes the delegate notifier with the given context. */
    @Override // com.hortonworks.smm.kafka.notification.api.Notifier
    public void init(NotifierContext<? extends NotifierConfigTemplate> notifierContext) {
        this.delegate.init(notifierContext);
    }

    /**
     * Delivers the notification immediately when nothing is queued and a rate-limiter permit can be
     * acquired without waiting; otherwise queues it and makes sure a background drain task is running.
     *
     * @param alertNotificationContext the notification to deliver
     */
    @Override // com.hortonworks.smm.kafka.notification.api.Notifier
    public void notify(AlertNotificationContext alertNotificationContext) {
        if (this.pendingNotifications.isEmpty() && this.rateLimiter.tryAcquire()) {
            LOG.debug("Notifying as rate limiter acquisition is available for notification [{}]", alertNotificationContext.id());
            this.delegate.notify(alertNotificationContext);
            return;
        }
        // Enqueue and the "is a drain task still going to see this?" decision happen under the same
        // lock the drain task holds while polling, so a task that is about to exit cannot miss the
        // new entry (no lost wake-up).
        synchronized (this.lock) {
            this.pendingNotifications.add(alertNotificationContext);
            LOG.debug("Added to pending notifications as rate limiter acquisition is not successful for notification [{}]", alertNotificationContext.id());
            // future.isDone() additionally covers a task that terminated abnormally or was cancelled
            // without resetting the flag.
            if (!this.drainTaskActive || this.future.isDone()) {
                LOG.debug("scheduling task here as no running task available");
                // If submit throws (e.g. the executor rejects the task) the flag stays unchanged
                // and the exception propagates to the caller.
                this.future = scheduleTask();
                this.drainTaskActive = true;
            }
        }
    }

    /** Submits the background drain task to the executor and returns its handle. */
    private Future<?> scheduleTask() {
        Runnable drainTask = this::drainPendingNotifications;
        return this.executorService.submit(drainTask);
    }

    /**
     * Body of the background task: forwards queued notifications to the delegate until the queue is
     * empty, waiting for a rate-limiter permit before each delivery. A failure to deliver one
     * notification is logged and does not stop the remaining ones from being delivered.
     */
    private void drainPendingNotifications() {
        while (true) {
            AlertNotificationContext next;
            synchronized (this.lock) {
                next = this.pendingNotifications.pollFirst();
                if (next == null) {
                    // Cleared under the lock so a concurrent notify() either enqueued before this
                    // poll (and was returned above) or will see the flag cleared and reschedule.
                    this.drainTaskActive = false;
                    break;
                }
            }
            this.rateLimiter.acquire();
            try {
                this.delegate.notify(next);
            } catch (RuntimeException e) {
                LOG.error("Failed to deliver pending notification [{}]", next.id(), e);
            }
        }
        LOG.debug("Task is completed as there are no pending notifications");
    }

    /**
     * Closes the delegate notifier. This method only delegates; it does not cancel the background
     * drain task or shut down the executor service.
     *
     * @throws Exception if the delegate fails to close
     */
    @Override // com.hortonworks.smm.kafka.notification.api.Notifier, java.lang.AutoCloseable
    public void close() throws Exception {
        this.delegate.close();
    }
}
