package com.cloudera.cmon.pipeline;

import com.cloudera.cmf.license.License;
import com.cloudera.cmon.RateMonitor;
import com.cloudera.enterprise.EnterpriseService;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.PrintWriter;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/cmon/pipeline/PipelineStage.class */
/**
 * One stage of an item-processing pipeline.
 *
 * <p>Items handed to {@link #enqueue(Object)} are wrapped in a task and submitted to a
 * {@link ThreadPoolExecutor} backed by a bounded queue. Each task passes its item to the
 * configured {@link ItemReceiver}; a non-null result is forwarded to the next stage, if one
 * has been set. The stage keeps counters for processed, forwarded, skipped and dropped
 * events and exposes them through {@code PipelineStageMXBean}.
 *
 * <p>A receiver must be installed with {@link #setItemReceiver(ItemReceiver)} before items
 * are processed; the processing paths dereference it without a null check.
 *
 * @param <T> type of the items flowing through the stage
 */
public class PipelineStage<T> extends EnterpriseService implements PipelineStageMXBean {
    protected static final Logger LOG = LoggerFactory.getLogger(PipelineStage.class);

    /** Keep-alive time, in seconds, passed to the worker pool. */
    private static final long WORKER_KEEP_ALIVE_SECONDS = 5L;

    /** Time, in seconds, that {@link #stopService()} waits for the worker pool to terminate. */
    private static final long SHUTDOWN_WAIT_SECONDS = 3L;

    protected final ThreadPoolExecutor executor;
    protected PipelineStage<T> nextStage = null;
    protected String stageName;
    ItemReceiver<T> receiver = null;
    private final AtomicLong eventsProcessed = new AtomicLong(0L);
    private final AtomicLong eventsForwarded = new AtomicLong(0L);
    private final AtomicLong eventsSkipped = new AtomicLong(0L);
    private final AtomicLong eventsDropped = new AtomicLong(0L);
    private final RateMonitor rateMonitor;
    private final int queueCapacity;

    /*
     * Decompiler-reconstructed switch map for OverflowPolicy. handleCongestion() switches on
     * the enum directly and no longer consults this table; the class is retained unchanged
     * only so that any existing reference to it keeps compiling.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.cloudera.cmon.pipeline.PipelineStage$3, reason: invalid class name */
    /* loaded from: input_file:com/cloudera/cmon/pipeline/PipelineStage$3.class */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$com$cloudera$cmon$pipeline$PipelineStage$ItemReceiver$OverflowPolicy = new int[ItemReceiver.OverflowPolicy.values().length];

        static {
            try {
                $SwitchMap$com$cloudera$cmon$pipeline$PipelineStage$ItemReceiver$OverflowPolicy[ItemReceiver.OverflowPolicy.SKIP.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$cloudera$cmon$pipeline$PipelineStage$ItemReceiver$OverflowPolicy[ItemReceiver.OverflowPolicy.QUEUE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /**
     * Callback that does the actual work of a stage.
     *
     * @param <T> type of the items handled
     */
    /* loaded from: input_file:com/cloudera/cmon/pipeline/PipelineStage$ItemReceiver.class */
    public static abstract class ItemReceiver<T> {

        /** What a stage does with a new item while its input queue is more than half full. */
        /* loaded from: input_file:com/cloudera/cmon/pipeline/PipelineStage$ItemReceiver$OverflowPolicy.class */
        public enum OverflowPolicy {
            /** Bypass this stage: the item is handed straight to the next stage, if any. */
            SKIP,
            /** Submit the item to this stage's executor anyway. */
            QUEUE
        }

        /**
         * Processes one item.
         *
         * @param item the item to process
         * @return the item to forward to the next stage, or {@code null} to forward nothing
         * @throws ItemRejectedException if the receiver rejects the item
         */
        public abstract T receiveItem(T item) throws ItemRejectedException;

        /**
         * Chooses the overflow policy for {@code item}. The default is
         * {@link OverflowPolicy#SKIP}.
         *
         * @param item the item about to be enqueued
         * @return the policy to apply to this item
         */
        protected OverflowPolicy overflowPolicy(T item) {
            return OverflowPolicy.SKIP;
        }

        /**
         * Writes receiver-specific state to {@code printWriter}. The default writes nothing.
         *
         * @param printWriter destination for the report
         */
        public void reportState(PrintWriter printWriter) {
        }

        /**
         * Returns the time of the most recent item. The default returns {@code null}.
         *
         * @return the time of the most recent item, or {@code null}
         */
        public Instant getTimeOfRecentItem() {
            return null;
        }

        /** Called when the owning stage is stopped. The default does nothing. */
        public void stop() {
        }
    }

    /**
     * Creates a stage whose pool has a core and maximum size of one thread.
     *
     * @param stageName name of the stage, used for the service name and worker thread names
     * @param queueCapacity capacity of the bounded input queue
     */
    public PipelineStage(String stageName, int queueCapacity) {
        this(stageName, 1, 1, queueCapacity);
    }

    /**
     * Creates a stage with an explicitly sized worker pool.
     *
     * @param stageName name of the stage, used for the service name and worker thread names
     * @param corePoolSize core size of the worker pool
     * @param maximumPoolSize maximum size of the worker pool
     * @param queueCapacity capacity of the bounded input queue
     */
    public PipelineStage(String stageName, int corePoolSize, int maximumPoolSize, int queueCapacity) {
        super("Pipeline Stage " + stageName);
        this.stageName = stageName;
        this.queueCapacity = queueCapacity;
        this.executor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                WORKER_KEEP_ALIVE_SECONDS,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                new ThreadFactoryBuilder().setDaemon(true).setNameFormat(stageName + "-%d").build());
        // The rate monitor samples this stage's processed-event counter.
        this.rateMonitor = new RateMonitor("RateMonitor-" + stageName) { // from class: com.cloudera.cmon.pipeline.PipelineStage.1
            @Override // com.cloudera.cmon.RateMonitor
            protected long getEventCount() {
                return PipelineStage.this.getEventsProcessed();
            }
        };
    }

    public String toString() {
        return "Stage " + this.stageName;
    }

    /**
     * Sets the stage that receives this stage's output.
     *
     * @param pipelineStage the downstream stage, or {@code null} for none
     */
    public void setNextStage(PipelineStage<T> pipelineStage) {
        this.nextStage = pipelineStage;
    }

    /**
     * Offers an item to this stage.
     *
     * <p>If the executor has been shut down, the task is parked in the input queue so that
     * {@link #manualProcess()} can run it later. Otherwise, when the input queue is more than
     * half full, {@link #handleCongestion(Object, Runnable)} decides whether the item is still
     * submitted.
     *
     * @param item the item to process
     * @return {@code true} if the item was queued on this stage; {@code false} if it was
     *     skipped, rejected by the executor, or could not be parked because the queue was full
     */
    public boolean enqueue(final T item) {
        Runnable task = new Runnable() { // from class: com.cloudera.cmon.pipeline.PipelineStage.2
            @Override // java.lang.Runnable
            public void run() {
                PipelineStage.this.driver(item);
            }

            @Override
            public String toString() {
                // Concatenation rather than item.toString() so a null item cannot break
                // callers that print queued tasks.
                return "Queued item: " + item;
            }
        };

        if (this.executor.isShutdown()) {
            LOG.warn("{} stage has shutdown. Item needs manual process.", this.stageName);
            // offer() instead of add(): the queue is bounded and add() throws when it is full.
            if (this.executor.getQueue().offer(task)) {
                return true;
            }
            this.eventsDropped.incrementAndGet();
            return false;
        }

        if (getInputQueueSize() > this.queueCapacity / 2) {
            task = handleCongestion(item, task);
            if (task == null) {
                return false;
            }
        }

        try {
            this.executor.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            this.eventsDropped.incrementAndGet();
            return false;
        }
    }

    /**
     * Decides what to do with an item that arrives while the input queue is congested.
     *
     * @param item the incoming item
     * @param task the task that would process {@code item} on this stage
     * @return {@code task} if it should still be submitted to this stage, or {@code null} if
     *     the item must not be queued here (executor shut down, item skipped, or unknown policy)
     */
    protected Runnable handleCongestion(T item, Runnable task) {
        if (this.executor.isShutdown()) {
            LOG.error("{} is not running", this.executor);
            return null;
        }
        ItemReceiver.OverflowPolicy policy = this.receiver.overflowPolicy(item);
        switch (policy) {
            case SKIP:
                incrementEventsSkipped();
                LOG.debug("Skipping event");
                // A skipped item bypasses this stage and goes straight downstream.
                if (this.nextStage != null) {
                    incrementEventsForwarded();
                    this.nextStage.enqueue(item);
                }
                return null;
            case QUEUE:
                return task;
            default:
                LOG.error("Unknown overflow policy: {}", policy);
                return null;
        }
    }

    /**
     * Installs the receiver that processes this stage's items.
     *
     * @param itemReceiver the receiver to use
     */
    public void setItemReceiver(ItemReceiver<T> itemReceiver) {
        this.receiver = itemReceiver;
    }

    @Override // com.cloudera.cmon.pipeline.PipelineStageMXBean
    public String getStageName() {
        return this.stageName;
    }

    @Override // com.cloudera.cmon.pipeline.PipelineStageMXBean
    public long getEventsProcessed() {
        return this.eventsProcessed.get();
    }

    private void incrementEventsProcessed() {
        this.eventsProcessed.incrementAndGet();
    }

    @Override // com.cloudera.cmon.pipeline.PipelineStageMXBean
    public long getEventsForwarded() {
        return this.eventsForwarded.get();
    }

    private void incrementEventsForwarded() {
        this.eventsForwarded.incrementAndGet();
    }

    @Override // com.cloudera.cmon.pipeline.PipelineStageMXBean
    public long getEventsSkipped() {
        return this.eventsSkipped.get();
    }

    private void incrementEventsSkipped() {
        this.eventsSkipped.incrementAndGet();
    }

    @Override // com.cloudera.cmon.pipeline.PipelineStageMXBean
    public long getEventsDropped() {
        return this.eventsDropped.get();
    }

    @Override // com.cloudera.cmon.pipeline.PipelineStageMXBean
    public int getInputQueueSize() {
        return this.executor.getQueue().size();
    }

    @Override // com.cloudera.cmon.pipeline.PipelineStageMXBean
    public int getInputQueueCapacity() {
        return this.queueCapacity;
    }

    /**
     * Processes one item on a worker thread: passes it to the receiver, counts it as
     * processed, and forwards a non-null result to the next stage. Any exception is logged
     * and recorded via {@code captureException} instead of being propagated.
     *
     * @param item the item to process
     */
    protected void driver(T item) {
        try {
            T result = this.receiver.receiveItem(item);
            incrementEventsProcessed();
            if (result != null && this.nextStage != null) {
                incrementEventsForwarded();
                this.nextStage.enqueue(result);
            }
        } catch (Exception e) {
            LOG.error(this.stageName + " stage encountered error", e);
            captureException(e);
        }
    }

    @Override // com.cloudera.enterprise.EnterpriseService
    public void startService() {
        this.rateMonitor.start();
    }

    /**
     * Stops the rate monitor and the receiver, shuts the executor down, waits briefly for it
     * to terminate, and puts the tasks that never started back into the input queue so that
     * {@link #manualProcess()} can run them.
     */
    @Override // com.cloudera.enterprise.EnterpriseService
    public void stopService() {
        this.rateMonitor.stop();
        // The receiver is optional until setItemReceiver() is called; without this guard a
        // missing receiver would abort the stop before the executor is shut down.
        if (this.receiver != null) {
            this.receiver.stop();
        }
        List<Runnable> pending = this.executor.shutdownNow();
        LOG.info("Stopping {} stage; found {} queued items", this.stageName, pending.size());
        try {
            this.executor.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        if (!this.executor.isTerminated()) {
            LOG.warn("Stage {} did not stop within allowed time", this.stageName);
        }
        this.executor.getQueue().addAll(pending);
    }

    /**
     * Writes an HTML-formatted status report for this stage, followed by the receiver's own
     * report when a receiver is installed.
     *
     * @param printWriter destination for the report
     */
    @Override // com.cloudera.enterprise.EnterpriseService
    public void reportState(PrintWriter printWriter) {
        printWriter.println("Input queue size is " + getInputQueueSize() + " (of " + this.queueCapacity + ").<br />");
        printWriter.println(getEventsProcessed() + " events processed.<br />");
        printWriter.println(getEventsSkipped() + " events skipped.<br />");
        printWriter.println(getEventsForwarded() + " events forwarded.<br />");
        printWriter.println(getEventsDropped() + " events dropped.<br />");
        // Explicit Double.valueOf kept: RateMonitor's return types are not visible from here.
        printWriter.format("Current rate: %.2f msg/sec. Max rate: %.2f msg/sec.<br/>", Double.valueOf(this.rateMonitor.getRate()), Double.valueOf(this.rateMonitor.getMaxRate()));
        printWriter.format("Thread pool size is %d (max %d).<br/>", this.executor.getPoolSize(), this.executor.getMaximumPoolSize());
        reportLastException(printWriter);
        printWriter.println("First element in queue: ");
        printWriter.println(this.executor.getQueue().peek());
        printWriter.println("<div style=\"padding: 20px 20px;\">");
        if (this.receiver != null) {
            this.receiver.reportState(printWriter);
        }
        printWriter.println("</div>");
    }

    /**
     * Returns the receiver's time of the most recent item.
     *
     * @return whatever the receiver reports, possibly {@code null}
     */
    public Instant getTimeOfRecentItem() {
        return this.receiver.getTimeOfRecentItem();
    }

    /**
     * Processes an item synchronously on the calling thread, then pushes a non-null result
     * through the downstream stages the same way.
     *
     * @param item the item to process
     * @throws ItemRejectedException if this stage's receiver, or a downstream one, rejects
     *     the item
     */
    public void manualProcess(T item) throws ItemRejectedException {
        T result = this.receiver.receiveItem(item);
        incrementEventsProcessed();
        if (result != null && this.nextStage != null) {
            incrementEventsForwarded();
            this.nextStage.manualProcess(result);
        }
    }

    /**
     * Runs every task left in the input queue on the calling thread. The executor must
     * already be terminated.
     *
     * @return the number of tasks that were run
     * @throws IllegalStateException if the executor has not terminated
     */
    public int manualProcess() {
        Preconditions.checkState(this.executor.isTerminated(), "Detected race against executor threads");
        int processed = 0;
        for (Runnable task = this.executor.getQueue().poll();
                task != null;
                task = this.executor.getQueue().poll()) {
            task.run();
            processed++;
        }
        return processed;
    }
}
