package com.cloudera.cdx.extractor.oozie;

import com.cloudera.cdx.extractor.AbstractExtractorState;
import com.cloudera.cdx.extractor.ServiceExtractionTask;
import com.cloudera.cdx.extractor.model.TelemetryPublisherCountersMap;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.AuthOozieClient;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/cdx/extractor/oozie/OozieExtractor.class */
/**
 * Extraction task that reads workflow jobs from an Oozie server through an {@link OozieClient}
 * and hands finished workflows to an {@link OozieExtractorDao}.
 *
 * <p>Each {@link #run()} performs three passes, persisting the {@link OozieExtractorState} after
 * each one:
 * <ol>
 *   <li>retry workflows recorded as errors by an earlier pass,</li>
 *   <li>re-check workflows that were previously seen as unfinished (RUNNING or PREP),</li>
 *   <li>page through the server's workflow list until the stored right-bound workflow id is
 *       reached, the list is exhausted, or a short page is returned.</li>
 * </ol>
 *
 * <p>NOTE(review): {@code leftBoundWorkflowId} is not cleared between calls to
 * {@link #extractWorkflows()}, so this class looks like it is meant to be used for a single run
 * per instance - confirm against callers.
 */
public class OozieExtractor implements ServiceExtractionTask {
    private static final Logger LOG = LoggerFactory.getLogger(OozieExtractor.class);
    private static final TelemetryPublisherCountersMap tpCounters = TelemetryPublisherCountersMap.getInstance();

    /** Filter passed to {@link OozieClient#getJobsInfo}; the empty string applies no filter terms. */
    @VisibleForTesting
    static final String OOZIE_CLIENT_FILTER = "";
    private final OozieExtractorContext context;
    private final String oozieClientUrl;
    private final OozieClient oozieClient;

    /** Number of workflows requested per page; always greater than one. */
    @VisibleForTesting
    final int pageSize;
    private OozieExtractorState state;
    private OozieExtractorDao extractorDao;
    /** Id of the last workflow handled by the paged extraction; used to line up overlapping pages. */
    private String leftBoundWorkflowId;
    /** Id of the first workflow fetched by the current paged extraction; becomes the next right bound. */
    private String firstExtractedWorkflowId;

    /**
     * Creates an extractor that talks to {@code <oozieClientUrl>/oozie} with an
     * {@link AuthOozieClient} and uses the page size configured in the context options.
     *
     * @param oozieExtractorContext source of options, state store, exporter and reporter
     */
    public OozieExtractor(OozieExtractorContext oozieExtractorContext) {
        this(oozieExtractorContext, oozieExtractorContext.getOptions().getExtractorOoziePageSize(), new AuthOozieClient(oozieExtractorContext.getOozieClientUrl() + "/oozie"), null);
    }

    /**
     * Creates an extractor with explicit collaborators.
     *
     * @param oozieExtractorContext source of options, state store, exporter and reporter
     * @param i page size; must be greater than one because consecutive pages overlap by one entry
     * @param oozieClient client used for all Oozie requests
     * @param oozieExtractorDao DAO used by the extraction passes; note that {@link #run()}
     *     replaces it with a freshly constructed one
     * @throws IllegalArgumentException if {@code i} is not greater than one
     */
    @VisibleForTesting
    OozieExtractor(OozieExtractorContext oozieExtractorContext, int i, OozieClient oozieClient, OozieExtractorDao oozieExtractorDao) {
        // Validate before touching any collaborator so a bad page size fails fast and clearly.
        Preconditions.checkArgument(i > 1, "Page size must be greater than one.");
        this.context = oozieExtractorContext;
        this.pageSize = i;
        this.oozieClient = oozieClient;
        this.oozieClientUrl = oozieClient.getOozieUrl();
        this.extractorDao = oozieExtractorDao;
    }

    /**
     * Runs the three extraction passes and returns the resulting state.
     *
     * <p>The exporter is closed exactly once, whether the run succeeds or fails.
     *
     * @return the extractor state after the run
     * @throws RuntimeException wrapping any exception raised while extracting
     */
    public AbstractExtractorState run() {
        String streamName = this.context.getCdxExporter().getStreamName();
        try {
            loadState();
            // NOTE(review): this unconditionally replaces a DAO injected through the
            // test constructor - confirm that is intended.
            this.extractorDao = new OozieExtractorDao(this.context, this.oozieClient);
            reExtractErrors();
            saveState();
            reExtractUnfinishedWorkflows();
            saveState();
            extractWorkflows();
            saveState();
            tpCounters.get(streamName).incrementIngestSuccessCount();
            return this.state;
        } catch (Exception e) {
            LOG.error("Error extracting Oozie workflows.", e);
            tpCounters.get(streamName).incrementIngestFailCount();
            throw new RuntimeException(e);
        } finally {
            this.context.getCdxExporter().close(this.context.getOptions().getExporterShutdownSeconds(), TimeUnit.SECONDS);
        }
    }

    /**
     * Loads the persisted state for this Oozie URL, creating an empty one when none exists, and
     * makes sure both the unfinished-id set and the error map are non-null.
     */
    @VisibleForTesting
    void loadState() {
        OozieExtractorState loaded = (OozieExtractorState) this.context.getStateStore().load(OozieExtractorState.class, new String[]{this.oozieClientUrl});
        if (loaded == null) {
            loaded = new OozieExtractorState();
            loaded.setUnfinishedWorkflowIds(Sets.newHashSet());
        } else if (loaded.getUnfinishedWorkflowIds() == null) {
            // Same defensive default as for the error map below; later passes dereference this set.
            loaded.setUnfinishedWorkflowIds(Sets.newHashSet());
        }
        if (loaded.getErrors() == null) {
            loaded.setErrors(Maps.newHashMap());
        }
        this.state = loaded;
    }

    /**
     * Persists the current state, first advancing the right bound to the first workflow fetched
     * by the paged extraction (if any).
     */
    private void saveState() {
        if (this.firstExtractedWorkflowId != null) {
            this.state.setRightBoundWorkflowId(this.firstExtractedWorkflowId);
        }
        this.context.getStateStore().save(this.state, new String[]{this.oozieClientUrl});
    }

    /**
     * Pages through the server's workflow list and extracts every workflow not yet seen.
     *
     * @throws OozieClientException if a page of workflows cannot be fetched
     */
    @VisibleForTesting
    void extractWorkflows() throws OozieClientException {
        this.firstExtractedWorkflowId = null;
        int page = 0;
        boolean done = false;
        while (!done) {
            LOG.debug("Requesting page {} of Oozie workflows", page);
            List<WorkflowJob> workflows = getPageOfWorkflows(page);
            page++;
            LOG.debug("Found {} Oozie workflows", workflows.size());
            done = extractUnextractedWorkflows(workflows);
        }
        LOG.debug("Done extracting Oozie workflows");
    }

    /**
     * Extracts the not-yet-handled workflows of one page.
     *
     * <p>A failure on an individual workflow is logged and recorded in the state's error map; it
     * does not stop the page.
     *
     * @param list one page as returned by {@link #getPageOfWorkflows(int)}
     * @return {@code true} when paging should stop: the page is empty, the stored right-bound id
     *     was reached, or the page is shorter than {@link #pageSize}
     */
    private boolean extractUnextractedWorkflows(List<WorkflowJob> list) {
        if (list.isEmpty()) {
            return true;
        }
        for (int index = getStartIndex(list); index < list.size(); index++) {
            // Capture the id up front: it must stay available for error bookkeeping even when the
            // detailed lookup below throws or yields nothing usable.
            final String workflowId = list.get(index).getId();
            if (workflowId.equals(this.state.getRightBoundWorkflowId())) {
                return true;
            }
            try {
                WorkflowJob detailed = this.oozieClient.getJobInfo(workflowId);
                if (this.firstExtractedWorkflowId == null) {
                    this.firstExtractedWorkflowId = workflowId;
                }
                processWorkflow(detailed);
            } catch (Exception e) {
                LOG.warn("Exception while extracting " + workflowId, e);
                addError(workflowId);
            }
            this.leftBoundWorkflowId = workflowId;
        }
        return list.size() < this.pageSize;
    }

    /**
     * Fetches one page of workflows. Consecutive pages overlap by one entry (page {@code i}
     * starts at offset {@code i * (pageSize - 1) + 1}) so that {@link #getStartIndex(List)} can
     * find the previous page's last workflow in the next page.
     *
     * @param i zero-based page number
     * @return the workflows returned by the client for that window
     * @throws OozieClientException if the client request fails
     */
    private List<WorkflowJob> getPageOfWorkflows(int i) throws OozieClientException {
        int first = (i * (this.pageSize - 1)) + 1;
        int last = (first + this.pageSize) - 1;
        LOG.debug("Getting Oozie workflows {} through {} (inclusive)", first, last);
        return this.oozieClient.getJobsInfo(OOZIE_CLIENT_FILTER, first, this.pageSize);
    }

    /**
     * Determines where in a page extraction should resume.
     *
     * @param list a page of workflows
     * @return {@code 0} when the page is empty or no left bound is set; the index just after the
     *     left-bound workflow when it is present; otherwise {@code list.size()}, which makes the
     *     caller skip the whole page
     */
    @VisibleForTesting
    int getStartIndex(List<WorkflowJob> list) {
        if (list.isEmpty() || this.leftBoundWorkflowId == null) {
            return 0;
        }
        int index = 0;
        for (WorkflowJob workflow : list) {
            index++;
            if (workflow.getId().equals(this.leftBoundWorkflowId)) {
                return index;
            }
        }
        return list.size();
    }

    /**
     * Extracts a finished workflow through the DAO, or remembers an unfinished one so that a
     * later run looks at it again.
     *
     * @param workflowJob detailed job info
     * @throws OozieClientException if the DAO's extraction fails with a client error
     */
    private void processWorkflow(WorkflowJob workflowJob) throws OozieClientException {
        if (!isWorkflowFinished(workflowJob)) {
            LOG.debug("adding unfinished workflow {} to be re-extracted later", workflowJob.getId());
            this.state.getUnfinishedWorkflowIds().add(workflowJob.getId());
            return;
        }
        LOG.debug("Extracting Oozie workflow {}", workflowJob.getId());
        this.extractorDao.extractWorkflow(workflowJob, this.context.getReporter());
        // A workflow that is neither RUNNING nor PREP may still lack an end time. Without this
        // guard the NPE would be raised after the export above, causing the workflow to be
        // recorded as an error and exported again on retry.
        if (workflowJob.getEndTime() != null) {
            this.state.setLastFinishedTime(Math.max(this.state.getLastFinishedTime(), workflowJob.getEndTime().getTime()));
        }
    }

    /** Returns {@code true} for every status other than RUNNING and PREP. */
    private boolean isWorkflowFinished(WorkflowJob workflowJob) {
        WorkflowJob.Status status = workflowJob.getStatus();
        return status != WorkflowJob.Status.RUNNING && status != WorkflowJob.Status.PREP;
    }

    /**
     * Re-checks every workflow id stored as unfinished. Each id is removed from the set first;
     * processing re-adds it if the workflow is still unfinished.
     *
     * @throws OozieClientException declared for callers; individual failures are recorded as errors
     */
    @VisibleForTesting
    void reExtractUnfinishedWorkflows() throws OozieClientException {
        LOG.debug("Extracting previously unfinished Oozie workflows");
        // Iterate over a snapshot because the underlying set is modified inside the loop.
        List<String> snapshot = ImmutableList.copyOf(this.state.getUnfinishedWorkflowIds());
        for (String workflowId : snapshot) {
            this.state.getUnfinishedWorkflowIds().remove(workflowId);
            extractSingleWorkflow(workflowId);
        }
        LOG.debug("Done extracting previously unfinished Oozie workflows");
    }

    /**
     * Fetches and processes one workflow by id.
     *
     * @param str workflow id
     * @return {@code true} if the workflow was processed or no longer exists on the server
     *     (error code E0604); {@code false} if it failed, in which case the failure is logged and
     *     its error count is incremented
     */
    private boolean extractSingleWorkflow(String str) {
        try {
            processWorkflow(this.oozieClient.getJobInfo(str));
            return true;
        } catch (OozieClientException e) {
            if (ErrorCode.E0604.name().equals(e.getErrorCode())) {
                LOG.debug("Skipping unfinished workflow {} because it no longer exists.", str);
                return true;
            }
            LOG.warn("Exception while extracting " + str, e);
            addError(str);
            return false;
        } catch (RuntimeException e) {
            LOG.warn("Exception while extracting " + str, e);
            addError(str);
            return false;
        }
    }

    /** Increments the failure count stored for the given workflow id, starting at one. */
    private void addError(String str) {
        Map<String, Integer> errors = this.state.getErrors();
        Integer previous = errors.get(str);
        errors.put(str, previous == null ? 1 : previous + 1);
    }

    /**
     * Retries every workflow recorded as an error. Successful ones are removed from the error
     * map; the remaining ones are dropped once their failure count exceeds the configured maximum
     * retry count. A negative maximum disables dropping.
     *
     * @throws OozieClientException declared for callers; individual failures are recorded as errors
     */
    @VisibleForTesting
    void reExtractErrors() throws OozieClientException {
        LOG.debug("Extracting error workflows");
        Map<String, Integer> errors = this.state.getErrors();
        // Snapshot the keys: successful retries remove entries and failed ones update counts.
        List<String> snapshot = ImmutableList.copyOf(errors.keySet());
        for (String workflowId : snapshot) {
            if (extractSingleWorkflow(workflowId)) {
                errors.remove(workflowId);
            }
        }
        int maxRetryCount = this.context.getOptions().getExtractorOozieMaxRetryCount();
        if (maxRetryCount < 0) {
            return;
        }
        Iterator<Map.Entry<String, Integer>> remaining = errors.entrySet().iterator();
        while (remaining.hasNext()) {
            Map.Entry<String, Integer> entry = remaining.next();
            if (entry.getValue() > maxRetryCount) {
                LOG.warn("Giving up on Oozie workflow {} after {} failed attempts", entry.getKey(), entry.getValue());
                remaining.remove();
            }
        }
    }

    /** Overrides the left bound used by {@link #getStartIndex(List)}. */
    @VisibleForTesting
    void setLeftBoundWorkflowId(String str) {
        this.leftBoundWorkflowId = str;
    }
}
