package com.cloudera.cmon.pipeline;

import com.cloudera.cmon.RateMonitor;
import com.cloudera.enterprise.EnterpriseService;
import com.cloudera.enterprise.EnterpriseServiceException;
import com.google.common.base.Preconditions;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:com/cloudera/cmon/pipeline/Pipeline.class */
public class Pipeline<T> extends EnterpriseService {
    final String name;
    final LinkedList<PipelineStage<T>> stages;
    private PipelineStage<T> lastStage;
    private RateMonitor rateMonitor;
    private final AtomicLong numEvents;

    public Pipeline(String str) {
        super(str);
        this.stages = new LinkedList<>();
        this.rateMonitor = null;
        this.numEvents = new AtomicLong(0L);
        this.name = str;
        setupRateMonitor();
    }

    public void addStage(PipelineStage<T> pipelineStage) {
        if (this.lastStage != null) {
            this.lastStage.setNextStage(pipelineStage);
        }
        this.lastStage = pipelineStage;
        this.stages.add(pipelineStage);
        addDependency(pipelineStage);
    }

    public void addStageToFront(PipelineStage<T> pipelineStage) {
        Preconditions.checkNotNull(pipelineStage);
        PipelineStage<T> first = this.stages.getFirst();
        this.stages.offerFirst(pipelineStage);
        if (first != null) {
            pipelineStage.setNextStage(first);
        }
    }

    private void setupRateMonitor() {
        this.rateMonitor = new RateMonitor("RateMonitor-" + this.name) { // from class: com.cloudera.cmon.pipeline.Pipeline.1
            @Override // com.cloudera.cmon.RateMonitor
            protected long getEventCount() {
                return Pipeline.this.numEvents.get();
            }
        };
    }

    @Override // com.cloudera.enterprise.EnterpriseService
    public void startService() throws EnterpriseServiceException {
        this.rateMonitor.start();
    }

    @Override // com.cloudera.enterprise.EnterpriseService
    public void stopService() throws EnterpriseServiceException {
        this.rateMonitor.stop();
    }

    @Override // com.cloudera.enterprise.EnterpriseService
    public void reportState(PrintWriter printWriter) {
        printWriter.println("<b>Pipeline stage summary:</b>");
        printWriter.println("<table><tr>");
        cell(printWriter, "name");
        cell(printWriter, "dropped");
        cell(printWriter, "skipped");
        cell(printWriter, "forwarded");
        cell(printWriter, "processed");
        cell(printWriter, "size");
        printWriter.println("</tr>");
        for (PipelineStage<T> pipelineStage : getStages()) {
            printWriter.println("<tr>");
            cell(printWriter, pipelineStage.getName());
            cell(printWriter, Long.valueOf(pipelineStage.getEventsDropped()));
            cell(printWriter, Long.valueOf(pipelineStage.getEventsSkipped()));
            cell(printWriter, Long.valueOf(pipelineStage.getEventsForwarded()));
            cell(printWriter, Long.valueOf(pipelineStage.getEventsProcessed()));
            cell(printWriter, Integer.valueOf(pipelineStage.getInputQueueSize()));
            printWriter.println("</tr>");
        }
        printWriter.println("</table>");
        printWriter.println("A total of " + this.numEvents.get() + " messages received.");
        printWriter.format("Current rate: %.2f msg/sec. Max rate: %.2f msg/sec.", Double.valueOf(this.rateMonitor.getRate()), Double.valueOf(this.rateMonitor.getMaxRate()));
    }

    private void cell(PrintWriter printWriter, Object obj) {
        printWriter.append("<td>");
        printWriter.append((CharSequence) obj.toString());
        printWriter.append("</td>");
    }

    public List<PipelineStage<T>> getStages() {
        return Collections.unmodifiableList(this.stages);
    }

    public boolean receiveEvent(T t) {
        Preconditions.checkState(this.stages.size() > 0, "Pipeline not initialised");
        this.numEvents.incrementAndGet();
        return this.stages.get(0).enqueue(t);
    }

    public void receiveEventNoThreading(T t) throws ItemRejectedException {
        Preconditions.checkState(this.stages.size() > 0, "Pipeline not initialised");
        this.stages.get(0).manualProcess(t);
    }

    public void stopAndFlush() throws ItemRejectedException {
        int i;
        Preconditions.checkState(this.stages.size() > 0, "Pipeline not initialised");
        Iterator<PipelineStage<T>> it = this.stages.iterator();
        while (it.hasNext()) {
            it.next().stopService();
        }
        do {
            i = 0;
            Iterator<PipelineStage<T>> it2 = this.stages.iterator();
            while (it2.hasNext()) {
                i += it2.next().manualProcess();
            }
        } while (i > 0);
    }
}
