package com.hortonworks.smm.kafka.alerts.scheduler;

import com.hortonworks.smm.kafka.alerts.dto.AlertPolicy;
import com.hortonworks.smm.kafka.alerts.policy.PolicyExecutable;
import com.hortonworks.smm.kafka.alerts.policy.PolicyExecutionResult;
import com.hortonworks.smm.kafka.alerts.policy.PolicyExecutor;
import com.hortonworks.smm.kafka.alerts.processor.AlertProcessor;
import com.hortonworks.smm.kafka.alerts.publish.message.AlertMessageComposer;
import com.hortonworks.smm.kafka.notification.api.AlertNotificationContext;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hortonworks/smm/kafka/alerts/scheduler/AlertPolicyExecutionRunner.class */
/**
 * Repeatedly runs one alert policy on a {@link ScheduledExecutorService}.
 *
 * <p>Each run executes the policy through its {@code PolicyExecutor}, forwards the outcome to the
 * {@link AlertProcessor} (either {@code processAlert} or {@code clearAlert}) and then schedules
 * the following run, unless the runner has been cancelled or the policy has been replaced through
 * {@link #updatePolicy}.
 *
 * <p>Thread-safety: all mutable state is guarded by this runner's monitor. The monitor is held
 * only while that state is read or written; the policy itself is executed without it, so
 * {@link #cancel()} and {@link #updatePolicy} never wait for a running policy. As a consequence,
 * a still-pending run of a superseded bundle is not serialised against a run of its replacement.
 */
public final class AlertPolicyExecutionRunner {
    private static final Logger LOG = LoggerFactory.getLogger(AlertPolicyExecutionRunner.class);
    private final ScheduledExecutorService scheduledExecutorService;
    private final AlertProcessor alertProcessor;

    /** Bundle that new runs are scheduled for; replaced as a whole by {@link #updatePolicy}. */
    @GuardedBy("this")
    private AlertPolicyBundle alertPolicyBundle;

    /** Set once by {@link #cancel()}; never reset. */
    @GuardedBy("this")
    private boolean isCancelled = false;

    /** Wall-clock time (epoch millis) at which the most recently scheduled run is due. */
    @GuardedBy("this")
    private long lastScheduledTime = Long.MAX_VALUE;

    /** Handle of the most recently scheduled run, or {@code null} before the first scheduling. */
    @GuardedBy("this")
    private Future<?> lastScheduleFuture;

    /**
     * Immutable set of collaborators needed to run one version of a policy, plus a flag that marks
     * the version as superseded.
     */
    public static final class AlertPolicyBundle {
        private final AlertPolicy alertPolicy;
        private final PolicyExecutable policyExecutable;
        private final PolicyExecutor policyExecutor;
        private final AlertMessageComposer alertMessageComposer;

        /** Written and checked under the owning runner's monitor; set by {@code updatePolicy}. */
        private boolean cancelled;

        private AlertPolicyBundle(AlertPolicy alertPolicy, PolicyExecutable policyExecutable, PolicyExecutor policyExecutor, AlertMessageComposer alertMessageComposer) {
            this.alertPolicy = alertPolicy;
            this.policyExecutable = policyExecutable;
            this.policyExecutor = policyExecutor;
            this.alertMessageComposer = alertMessageComposer;
            this.cancelled = false;
        }

        @Override
        public String toString() {
            return "AlertPolicyBundle{alertPolicy=" + this.alertPolicy + ", policyExecutable=" + this.policyExecutable + ", policyExecutor=" + this.policyExecutor + ", alertMessageComposer=" + this.alertMessageComposer + ", cancelled=" + this.cancelled + '}';
        }
    }

    /**
     * Creates a runner for the given policy. Nothing is scheduled until {@link #start()} is called.
     *
     * @param alertPolicy the policy to run
     * @param policyExecutable the executable form handed to {@code policyExecutor}
     * @param policyExecutor executes {@code policyExecutable} and yields the result
     * @param alertMessageComposer builds the notification context when an alert must be raised
     * @param alertProcessor receives the composed alert, or the request to clear it
     * @param scheduledExecutorService executor on which every run is scheduled
     */
    public AlertPolicyExecutionRunner(AlertPolicy alertPolicy, PolicyExecutable policyExecutable, PolicyExecutor policyExecutor, AlertMessageComposer alertMessageComposer, AlertProcessor alertProcessor, ScheduledExecutorService scheduledExecutorService) {
        this.scheduledExecutorService = scheduledExecutorService;
        this.alertProcessor = alertProcessor;
        this.alertPolicyBundle = new AlertPolicyBundle(alertPolicy, policyExecutable, policyExecutor, alertMessageComposer);
    }

    /**
     * Performs one run of the given bundle and schedules the next one.
     *
     * <p>The run is skipped entirely when the runner has already been cancelled. Any
     * {@link Exception} raised while executing the policy or processing its outcome is logged and
     * does not prevent the next run from being scheduled.
     */
    private void executePolicy(AlertPolicyBundle alertPolicyBundle) {
        final AlertPolicy alertPolicy = alertPolicyBundle.alertPolicy;

        // Hold the monitor only for the flag check: the policy must not run under the lock,
        // otherwise cancel()/updatePolicy() would block for the duration of the run.
        synchronized (this) {
            if (this.isCancelled) {
                LOG.info("Execution of alert policy : {} has been successfully cancelled pre-execution", alertPolicy.name());
                return;
            }
        }

        boolean alertTriggered = false;
        try {
            LOG.debug("Executing alert for alert policy : {}", alertPolicy);
            PolicyExecutionResult result = alertPolicyBundle.policyExecutor.executePolicy(alertPolicyBundle.policyExecutable);
            LOG.debug("Result of executing alert policy : {} is : {}", alertPolicy, result);
            if (result.shouldTriggerAlert()) {
                LOG.debug("Composing alert message body for alert policy : {}", alertPolicy);
                AlertNotificationContext notificationContext = alertPolicyBundle.alertMessageComposer.compose(alertPolicy, result);
                LOG.debug("Publishing for alert policy: {} the following message \n{}", alertPolicy, notificationContext);
                this.alertProcessor.processAlert(notificationContext);
                // Only flagged once the alert has actually been handed over.
                alertTriggered = true;
            } else {
                LOG.debug("No need to trigger alert policy: {}", alertPolicy);
                this.alertProcessor.clearAlert(alertPolicy.id().longValue());
            }
        } catch (Exception e) {
            LOG.error("Error while executing the alert policy: {}", alertPolicy, e);
        } finally {
            // Runs on success, on a logged Exception and on a propagating Error alike.
            scheduleNextRun(alertPolicyBundle, alertTriggered);
        }
    }

    /**
     * Schedules the follow-up run of {@code bundle} unless the runner was cancelled or the bundle
     * was superseded while the policy was running.
     *
     * @param bundle the bundle that has just been run
     * @param alertTriggered {@code true} selects the policy's execution delay, {@code false} its
     *     execution interval, as the delay before the next run
     */
    private void scheduleNextRun(AlertPolicyBundle bundle, boolean alertTriggered) {
        final AlertPolicy alertPolicy = bundle.alertPolicy;
        synchronized (this) {
            if (this.isCancelled) {
                LOG.info("Execution of alert policy : {} has been successfully cancelled", alertPolicy.name());
                return;
            }
            if (bundle.cancelled) {
                LOG.debug("Execution of alert policy instance : {} has been successfully cancelled due to update", alertPolicy.name());
                return;
            }
            long delayMillis = (alertTriggered ? alertPolicy.executionDelayInMillis() : alertPolicy.executionIntervalInMillis()).longValue();
            LOG.info("Alert policy: {}, Next run is scheduled with delay of [{}] secs", alertPolicy.name(), Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(delayMillis)));
            try {
                scheduleWithDelay(bundle, delayMillis);
            } catch (RuntimeException e) {
                // Without this the failure would only surface through a Future nobody inspects.
                LOG.error("Failed to schedule the next run of alert policy: {}", alertPolicy.name(), e);
                throw e;
            }
        }
    }

    /** Schedules the first run of the current bundle after one execution interval. */
    public synchronized void start() {
        scheduleWithDelay(this.alertPolicyBundle, this.alertPolicyBundle.alertPolicy.executionIntervalInMillis().longValue());
    }

    /**
     * Cancels the runner: marks it cancelled and cancels the pending run, if any, without
     * interrupting a run that is already in progress.
     */
    public synchronized void cancel() {
        this.isCancelled = true;
        if (this.lastScheduleFuture != null) {
            this.lastScheduleFuture.cancel(false);
        }
    }

    /**
     * Replaces the policy run by this runner and schedules the replacement.
     *
     * <p>The current bundle is flagged as superseded so that it no longer reschedules itself. Its
     * pending run is cancelled only when it is due later than one new execution interval from
     * now; otherwise that pending run is left in place.
     *
     * @param alertPolicy the new policy
     * @param policyExecutable the executable form of the new policy
     * @param policyExecutor executor for the new policy
     * @param alertMessageComposer message composer for the new policy
     */
    public synchronized void updatePolicy(AlertPolicy alertPolicy, PolicyExecutable policyExecutable, PolicyExecutor policyExecutor, AlertMessageComposer alertMessageComposer) {
        this.alertPolicyBundle.cancelled = true;
        if (this.lastScheduleFuture != null) {
            long newFirstRunTime = System.currentTimeMillis() + alertPolicy.executionIntervalInMillis().longValue();
            if (this.lastScheduledTime > newFirstRunTime) {
                this.lastScheduleFuture.cancel(false);
            }
        }
        this.alertPolicyBundle = new AlertPolicyBundle(alertPolicy, policyExecutable, policyExecutor, alertMessageComposer);
        start();
    }

    /**
     * Schedules one run of {@code alertPolicyBundle} after {@code j} milliseconds and records the
     * due time and the resulting future. Callers must hold this runner's monitor.
     */
    @GuardedBy("this")
    private void scheduleWithDelay(AlertPolicyBundle alertPolicyBundle, long j) {
        this.lastScheduledTime = System.currentTimeMillis() + j;
        this.lastScheduleFuture = this.scheduledExecutorService.schedule(() -> {
            executePolicy(alertPolicyBundle);
        }, j, TimeUnit.MILLISECONDS);
    }

    @Override
    public String toString() {
        // Snapshot the guarded fields under the monitor, then format without holding it.
        final AlertPolicyBundle bundleSnapshot;
        final boolean cancelledSnapshot;
        synchronized (this) {
            bundleSnapshot = this.alertPolicyBundle;
            cancelledSnapshot = this.isCancelled;
        }
        return "AlertPolicyExecutionRunner{alertPolicyBundle=" + bundleSnapshot + ", alertProcessor=" + this.alertProcessor + ", isCancelled=" + cancelledSnapshot + '}';
    }
}
