package com.hortonworks.smm.kafka.alerts.scheduler;

import com.hortonworks.smm.kafka.alerts.bundle.PolicyComponentBundleRegistry;
import com.hortonworks.smm.kafka.alerts.dto.AlertPolicy;
import com.hortonworks.smm.kafka.alerts.policy.PolicyExecutable;
import com.hortonworks.smm.kafka.alerts.policy.PolicyExecutor;
import com.hortonworks.smm.kafka.alerts.processor.AlertListener;
import com.hortonworks.smm.kafka.alerts.processor.AlertProcessor;
import com.hortonworks.smm.kafka.alerts.processor.DefaultAlertProcessor;
import com.hortonworks.smm.kafka.alerts.publish.AlertNotificationReceiverTask;
import com.hortonworks.smm.kafka.alerts.publish.AlertPublisher;
import com.hortonworks.smm.kafka.alerts.publish.message.AlertMessageComposer;
import com.hortonworks.smm.kafka.common.config.AlertConfig;
import java.util.HashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:com/hortonworks/smm/kafka/alerts/scheduler/AlertScheduler.class */
public class AlertScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(AlertScheduler.class);
    private static final Long DEFAULT_SCHEDULER_SHUTDOWN_TIMEOUT_MS = 180000L;
    private final ScheduledThreadPoolExecutor threadPool;
    private final HashMap<Long, AlertPolicyExecutionRunner> alertPolicyIdToAlertPolicyRunner = new HashMap<>();
    private final PolicyComponentBundleRegistry policyComponentBundleRegistry;
    private final AlertProcessor alertProcessor;

    @Inject
    public AlertScheduler(AlertConfig alertConfig, PolicyComponentBundleRegistry policyComponentBundleRegistry, AlertPublisher alertPublisher, AlertNotificationReceiverTask alertNotificationReceiverTask, DefaultAlertProcessor defaultAlertProcessor) {
        this.threadPool = new ScheduledThreadPoolExecutor(alertConfig.executorThreadCount().intValue());
        this.threadPool.setRemoveOnCancelPolicy(true);
        this.policyComponentBundleRegistry = policyComponentBundleRegistry;
        this.alertProcessor = defaultAlertProcessor;
        this.alertProcessor.addAlertListener(alertPublisher);
        this.alertProcessor.addAlertListener(alertNotificationReceiverTask);
    }

    public synchronized void registerOrUpdate(AlertPolicy alertPolicy, PolicyExecutable policyExecutable) {
        PolicyExecutor policyExecutor = this.policyComponentBundleRegistry.policyComponentBundle(alertPolicy.version()).policyExecutor();
        AlertMessageComposer alertMessageComposer = this.policyComponentBundleRegistry.policyComponentBundle(alertPolicy.version()).alertMessageComposer();
        if (this.alertPolicyIdToAlertPolicyRunner.containsKey(alertPolicy.id())) {
            this.alertPolicyIdToAlertPolicyRunner.get(alertPolicy.id()).updatePolicy(alertPolicy, policyExecutable, policyExecutor, alertMessageComposer);
            LOG.debug("Successfully updated alert policy with id : {}", alertPolicy.id());
        } else {
            AlertPolicyExecutionRunner alertPolicyExecutionRunner = new AlertPolicyExecutionRunner(alertPolicy, policyExecutable, policyExecutor, alertMessageComposer, this.alertProcessor, this.threadPool);
            this.alertPolicyIdToAlertPolicyRunner.put(alertPolicy.id(), alertPolicyExecutionRunner);
            alertPolicyExecutionRunner.start();
            LOG.debug("Successfully registered alert policy with id : {}", alertPolicy.id());
        }
    }

    public synchronized void unregister(Long l) {
        if (!this.alertPolicyIdToAlertPolicyRunner.containsKey(l)) {
            LOG.debug("Alert policy id : {} has not been registered to unregister", l);
            return;
        }
        this.alertPolicyIdToAlertPolicyRunner.get(l).cancel();
        this.alertPolicyIdToAlertPolicyRunner.remove(l);
        LOG.debug("Successfully unregistered alert policy with id : {}", l);
    }

    public void addAlertListener(AlertListener alertListener) {
        this.alertProcessor.addAlertListener(alertListener);
    }

    public void close() throws Exception {
        try {
            this.threadPool.shutdown();
            if (!this.threadPool.awaitTermination(DEFAULT_SCHEDULER_SHUTDOWN_TIMEOUT_MS.longValue(), TimeUnit.MILLISECONDS)) {
                this.threadPool.shutdownNow();
            }
        } finally {
            this.alertProcessor.close();
        }
    }
}
