package com.cloudera.nav.oozie.extractor;

import com.cloudera.cdx.extractor.model.Entity;
import com.cloudera.cdx.extractor.model.OozieWorkflowAction;
import com.cloudera.cdx.extractor.model.OozieWorkflowJob;
import com.cloudera.nav.core.model.Source;
import com.cloudera.nav.extract.UtilityIdGenerator;
import com.cloudera.nav.oozie.CDXOozieExtractorContext;
import com.cloudera.nav.persist.CDXTransaction;
import com.cloudera.nav.persist.CDXTransactionFactory;
import com.cloudera.nav.utils.NavUtils;
import com.cloudera.nav.utils.SourceNotFoundException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/nav/oozie/extractor/CDXOozieExtractor.class */
public class CDXOozieExtractor implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(CDXOozieExtractor.class);
    private static final Set<String> SUPPORTED_ACTION_TYPES = ImmutableSet.of("map-reduce", "pig", "hive", "sqoop", "hive2");
    private final CDXOozieExtractorContext context;
    private final OozieExtractorReporter reporter;
    private final OozieCounters counters;
    private final String counterName;
    private final CDXTransactionFactory tf;
    private CDXOozieExtractorDao extractorDao;

    public CDXOozieExtractor(CDXOozieExtractorContext cDXOozieExtractorContext, String str) {
        this.context = cDXOozieExtractorContext;
        this.counters = cDXOozieExtractorContext.getCounters();
        this.reporter = cDXOozieExtractorContext.getReporter();
        this.counterName = str;
        this.tf = new CDXTransactionFactory(cDXOozieExtractorContext.getEmf(), cDXOozieExtractorContext.getRmf(), cDXOozieExtractorContext.getCDXImporter());
    }

    @Override // java.lang.Runnable
    public void run() {
        CDXTransaction cDXTransaction = null;
        this.reporter.markStart();
        try {
            try {
                cDXTransaction = this.tf.createTransaction();
                cDXTransaction.begin();
                this.extractorDao = new CDXOozieExtractorDao(this.context, cDXTransaction);
                extractWorkflows();
                cDXTransaction.commit();
                this.reporter.markEnd(true);
                IOUtils.closeQuietly(cDXTransaction);
                this.counters.updateCounters(this.counterName, this.reporter);
            } catch (Exception e) {
                LOG.error("Error extracting Oozie workflows.", e);
                NavUtils.rollbackQuietly(cDXTransaction);
                this.reporter.markEnd(false);
                IOUtils.closeQuietly(cDXTransaction);
                this.counters.updateCounters(this.counterName, this.reporter);
            }
        } catch (Throwable th) {
            IOUtils.closeQuietly(cDXTransaction);
            this.counters.updateCounters(this.counterName, this.reporter);
            throw th;
        }
    }

    @VisibleForTesting
    public int pollSize() {
        return this.context.getOptions().getCDXOptions().getMaxPollSize();
    }

    @VisibleForTesting
    public int pollTimeout() {
        return this.context.getOptions().getCDXOptions().getMaxPollTimeout();
    }

    @VisibleForTesting
    public boolean isOozieExtractionEnabled() {
        return this.context.getOptions().getCDXOptions().isOozieEnabled();
    }

    @VisibleForTesting
    public String getExtractionRunId(Source source) {
        String nextExtractorRunId;
        Map<Source, String> sourceToExtractorRunId = this.context.getSourceToExtractorRunId();
        if (sourceToExtractorRunId.containsKey(source)) {
            nextExtractorRunId = sourceToExtractorRunId.get(source);
        } else {
            nextExtractorRunId = UtilityIdGenerator.getNextExtractorRunId(source);
            sourceToExtractorRunId.put(source, nextExtractorRunId);
        }
        return nextExtractorRunId;
    }

    @VisibleForTesting
    public Source getSource(OozieWorkflowJob oozieWorkflowJob) {
        String sourceId = oozieWorkflowJob.getSourceId();
        Source transientSourceByIdentity = this.context.getSourceManager().getTransientSourceByIdentity(sourceId);
        if (transientSourceByIdentity == null) {
            throw new SourceNotFoundException(sourceId);
        }
        return transientSourceByIdentity;
    }

    @VisibleForTesting
    void extractWorkflows() throws IOException {
        int i = 0;
        while (true) {
            int i2 = i;
            i++;
            if (i2 >= pollSize()) {
                return;
            }
            Optional poll = this.context.getCDXImporter().poll(pollTimeout());
            if (!poll.isPresent()) {
                return;
            }
            if (isOozieExtractionEnabled()) {
                Entity entity = (Entity) poll.get();
                if (!(entity instanceof OozieWorkflowJob)) {
                    LOG.info("Unknown entity type returned by cdx importer for oozie polling. Skipping: " + entity.getClass().getName());
                }
                OozieWorkflowJob oozieWorkflowJob = (OozieWorkflowJob) entity;
                LOG.trace("Got message " + oozieWorkflowJob.getJobId());
                String workflowXml = oozieWorkflowJob.getWorkflowXml();
                Source source = getSource(oozieWorkflowJob);
                processWorkflow(oozieWorkflowJob, workflowXml, source, getExtractionRunId(source));
            }
        }
    }

    @VisibleForTesting
    public void processWorkflow(OozieWorkflowJob oozieWorkflowJob, String str, Source source, String str2) {
        if (!hasSupportedAction(oozieWorkflowJob)) {
            LOG.debug("skipping workflow {} because it doesn't have any action types that Navigator is familiar with.", oozieWorkflowJob.getJobId());
        } else {
            LOG.debug("Extracting Oozie workflow {}", oozieWorkflowJob.getJobId());
            this.extractorDao.extractWorkflow(oozieWorkflowJob, str, source, str2, this.context.getReporter());
        }
    }

    @VisibleForTesting
    boolean hasSupportedAction(OozieWorkflowJob oozieWorkflowJob) {
        for (OozieWorkflowAction oozieWorkflowAction : oozieWorkflowJob.getActions()) {
            if (SUPPORTED_ACTION_TYPES.contains(oozieWorkflowAction.getType()) && oozieWorkflowAction.getStatus() == OozieWorkflowAction.Status.OK) {
                return true;
            }
        }
        return false;
    }
}
