package com.cloudera.nav.cdx.extractor;

import com.cloudera.cdx.client.CdxImporter;
import com.cloudera.cdx.extractor.model.Entity;
import com.cloudera.cdx.extractor.model.MetadataElement;
import com.cloudera.cdx.extractor.model.graph.LineageGraph;
import com.cloudera.cdx.extractor.model.graph.SparkLineageGraph;
import com.cloudera.nav.core.model.Source;
import com.cloudera.nav.core.model.SourceType;
import com.cloudera.nav.extract.AbstractServiceTask;
import com.cloudera.nav.extract.UtilityIdGenerator;
import com.cloudera.nav.hdfs.extractor.HdfsIdGenerator;
import com.cloudera.nav.hive.extractor.HiveIdGenerator;
import com.cloudera.nav.hive.hivequeryextractor.CDXHiveQueryExtractorContext;
import com.cloudera.nav.hive.hivequeryextractor.HiveQueryIdGenerator;
import com.cloudera.nav.hive.hivequeryextractor.extractor.CDXHiveQueryExtractor;
import com.cloudera.nav.idgenerator.SequenceGenerator;
import com.cloudera.nav.impala.ImpalaExtractorContext;
import com.cloudera.nav.impala.ImpalaIdGenerator;
import com.cloudera.nav.impala.extractor.ImpalaQueryExtractor;
import com.cloudera.nav.persist.CDXTransaction;
import com.cloudera.nav.persist.CDXTransactionFactory;
import com.cloudera.nav.persist.SourceManager;
import com.cloudera.nav.persist.Transaction;
import com.cloudera.nav.pushextractor.PushExtractorDao;
import com.cloudera.nav.pushextractor.spark.CDXSparkExtractor;
import com.cloudera.nav.pushextractor.spark.CDXSparkLineageGraph;
import com.cloudera.nav.pushextractor.spark.CDXSparkPushExtractorContext;
import com.cloudera.nav.server.NavOptions;
import com.cloudera.nav.utils.NavUtils;
import com.cloudera.nav.utils.SourceNotFoundException;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import java.io.Closeable;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/nav/cdx/extractor/CDXQueryRunnable.class */
/**
 * Service task that drains lineage entities from a {@link CdxImporter} and feeds each one to the
 * matching extractor (Impala query, Hive query or Spark) inside a single {@link CDXTransaction}.
 *
 * <p>Each invocation of {@link #run()} polls at most {@code getMaxPollSize()} entities, waiting up
 * to {@code getMaxPollTimeout()} for each, and commits once polling stops. Any exception raised
 * while processing is logged and the transaction is rolled back; the transaction is always closed.
 */
public class CDXQueryRunnable extends AbstractServiceTask {
    private static final Logger LOG = LoggerFactory.getLogger(CDXQueryRunnable.class);
    private final CdxImporter cdxImporter;
    /** Extractor run id already assigned to each source; populated lazily by this task. */
    private final Map<Source, String> sourceToExtractorRunId;
    private final CDXTransactionFactory tf;
    private final NavOptions options;
    private final SourceManager sourceManager;
    private final SequenceGenerator sequenceGenerator;
    private final ImpalaIdGenerator impalaIdGenerator;
    private final HiveIdGenerator hiveIdGenerator;
    private final HiveQueryIdGenerator hiveQueryIdGenerator;
    private final HdfsIdGenerator hdfsIdGenerator;

    /**
     * Creates a task with Hive and Hive-query id generators; the Impala and HDFS id generators
     * are left {@code null}.
     */
    public CDXQueryRunnable(String str, CdxImporter cdxImporter, CDXTransactionFactory cDXTransactionFactory, NavOptions navOptions, SourceManager sourceManager, SequenceGenerator sequenceGenerator, HiveIdGenerator hiveIdGenerator, HiveQueryIdGenerator hiveQueryIdGenerator, Map<Source, String> map) {
        this(str, cdxImporter, cDXTransactionFactory, navOptions, sourceManager, sequenceGenerator, null, hiveIdGenerator, hiveQueryIdGenerator, null, map);
    }

    /**
     * Creates a task with Impala and Hive-query id generators; the Hive and HDFS id generators
     * are left {@code null}.
     */
    public CDXQueryRunnable(String str, CdxImporter cdxImporter, CDXTransactionFactory cDXTransactionFactory, NavOptions navOptions, SourceManager sourceManager, SequenceGenerator sequenceGenerator, ImpalaIdGenerator impalaIdGenerator, HiveQueryIdGenerator hiveQueryIdGenerator, Map<Source, String> map) {
        this(str, cdxImporter, cDXTransactionFactory, navOptions, sourceManager, sequenceGenerator, impalaIdGenerator, null, hiveQueryIdGenerator, null, map);
    }

    /**
     * Creates a task with Hive and HDFS id generators; the Impala and Hive-query id generators
     * are left {@code null}.
     */
    public CDXQueryRunnable(String str, CdxImporter cdxImporter, CDXTransactionFactory cDXTransactionFactory, NavOptions navOptions, SourceManager sourceManager, SequenceGenerator sequenceGenerator, HiveIdGenerator hiveIdGenerator, HdfsIdGenerator hdfsIdGenerator, Map<Source, String> map) {
        this(str, cdxImporter, cDXTransactionFactory, navOptions, sourceManager, sequenceGenerator, null, hiveIdGenerator, null, hdfsIdGenerator, map);
    }

    /**
     * Creates a fully specified task.
     *
     * @param str value passed as both arguments to the {@link AbstractServiceTask} constructor
     * @param cdxImporter importer that is polled for lineage entities
     * @param cDXTransactionFactory factory for the transaction that wraps one {@link #run()}
     * @param navOptions options providing the CDX poll size and poll timeout
     * @param sourceManager used to resolve sources, sibling sources and templates
     * @param sequenceGenerator handed to every extractor context
     * @param impalaIdGenerator handed to the Impala extractor context; may be {@code null}
     * @param hiveIdGenerator handed to the extractor contexts; may be {@code null}
     * @param hiveQueryIdGenerator handed to the Hive query extractor context; may be {@code null}
     * @param hdfsIdGenerator handed to the Spark extractor context; may be {@code null}
     * @param map extractor run ids keyed by source; this task adds entries for sources it has
     *     not seen before
     */
    public CDXQueryRunnable(String str, CdxImporter cdxImporter, CDXTransactionFactory cDXTransactionFactory, NavOptions navOptions, SourceManager sourceManager, SequenceGenerator sequenceGenerator, ImpalaIdGenerator impalaIdGenerator, HiveIdGenerator hiveIdGenerator, HiveQueryIdGenerator hiveQueryIdGenerator, HdfsIdGenerator hdfsIdGenerator, Map<Source, String> map) {
        super(str, str);
        this.cdxImporter = cdxImporter;
        this.sourceToExtractorRunId = map;
        this.tf = cDXTransactionFactory;
        this.options = navOptions;
        this.sourceManager = sourceManager;
        this.sequenceGenerator = sequenceGenerator;
        this.impalaIdGenerator = impalaIdGenerator;
        this.hiveIdGenerator = hiveIdGenerator;
        this.hiveQueryIdGenerator = hiveQueryIdGenerator;
        this.hdfsIdGenerator = hdfsIdGenerator;
    }

    /** Returns the maximum number of entities polled per {@link #run()}. */
    private int pollSize() {
        return this.options.getCDXOptions().getMaxPollSize();
    }

    /** Returns the timeout passed to each {@code CdxImporter.poll} call. */
    private int pollTimeout() {
        return this.options.getCDXOptions().getMaxPollTimeout();
    }

    /**
     * Returns the extractor run id recorded for {@code source}, generating and recording a new
     * one when the source has no entry yet.
     */
    private String getExtractorRunId(Source source) {
        if (this.sourceToExtractorRunId.containsKey(source)) {
            return this.sourceToExtractorRunId.get(source);
        }
        String extractorRunId = UtilityIdGenerator.getNextExtractorRunId(source);
        this.sourceToExtractorRunId.put(source, extractorRunId);
        return extractorRunId;
    }

    /**
     * Resolves the source an element belongs to via its source id.
     *
     * @throws SourceNotFoundException if the source manager returns no source for that id
     */
    private Source getSource(MetadataElement element) {
        Source source = this.sourceManager.getTransientSourceByIdentity(element.getSourceId());
        if (source == null) {
            throw new SourceNotFoundException(element.getSourceId());
        }
        return source;
    }

    /**
     * Returns the single sibling of {@code source} with the given type, or absent when there is
     * none. {@code Iterables.getOnlyElement} rejects more than one sibling with an
     * {@link IllegalArgumentException}.
     */
    private Optional<Source> findOnlySibling(Source source, SourceType type) {
        Object sibling = Iterables.getOnlyElement(this.sourceManager.getSiblingSourcesByType(source, type), (Object) null);
        return Optional.fromNullable((Source) sibling);
    }

    /**
     * Resolves the template of the HIVE source associated with {@code source}: the source itself
     * when it is of type HIVE, otherwise its only HIVE sibling.
     *
     * @return the template source; always present
     * @throws SourceNotFoundException if there is no HIVE source or it has no template
     */
    private Optional<Source> getHMSSource(Source source) {
        Optional<Source> hiveSource = source.getSourceType() == SourceType.HIVE
                ? Optional.of(source)
                : findOnlySibling(source, SourceType.HIVE);
        if (!hiveSource.isPresent()) {
            throw new SourceNotFoundException("HIVE source");
        }
        Optional<Source> template = this.sourceManager.getTemplate(hiveSource.get());
        if (!template.isPresent()) {
            throw new SourceNotFoundException("HMS source");
        }
        return template;
    }

    /**
     * Polls the importer and extracts every returned entity within one transaction.
     *
     * <p>Polling stops when the poll-size limit is reached or a poll returns nothing. Exceptions
     * are logged and cause a rollback instead of propagating; the transaction is closed in all
     * cases.
     */
    public final void run() {
        // Declared outside the try so the failure paths can roll back and close the real
        // transaction rather than a null placeholder.
        CDXTransaction transaction = null;
        try {
            transaction = this.tf.createTransaction();
            PushExtractorDao pushExtractorDao = new PushExtractorDao(transaction);
            transaction.begin();
            for (int polled = 0; polled < pollSize(); polled++) {
                Optional<?> next = this.cdxImporter.poll(pollTimeout());
                if (!next.isPresent()) {
                    break;
                }
                Entity entity = (Entity) next.get();
                if (entity instanceof LineageGraph) {
                    extractQueryLineage((LineageGraph) entity, pushExtractorDao);
                } else if (entity instanceof SparkLineageGraph) {
                    extractSparkLineage((SparkLineageGraph) entity, pushExtractorDao);
                } else {
                    LOG.error("Unknown entity type returned by cdx importer for source polling. Skipping: {}", entity.getClass().getName());
                }
            }
            transaction.commit();
        } catch (Exception e) {
            LOG.error("Error extracting CDX queries.", e);
            if (transaction != null) {
                NavUtils.rollbackQuietly(transaction);
            }
        } finally {
            // closeQuietly tolerates null, which covers a failed createTransaction().
            IOUtils.closeQuietly(transaction);
        }
    }

    /**
     * Runs the Impala query extractor when the graph's source is of type IMPALA, and the Hive
     * query extractor for every other source type.
     *
     * @throws Exception if source resolution or extraction fails; handled by {@link #run()}
     */
    private void extractQueryLineage(LineageGraph graph, PushExtractorDao pushExtractorDao) throws Exception {
        Source source = getSource(graph);
        String extractorRunId = getExtractorRunId(source);
        Source hmsSource = getHMSSource(source).get();
        if (source.getSourceType() == SourceType.IMPALA) {
            ImpalaExtractorContext context = new ImpalaExtractorContext(this.impalaIdGenerator, this.hiveIdGenerator, this.sequenceGenerator, source, hmsSource, this.options, extractorRunId);
            new ImpalaQueryExtractor(context, pushExtractorDao, graph).extract();
        } else {
            CDXHiveQueryExtractorContext context = new CDXHiveQueryExtractorContext(this.hiveQueryIdGenerator, this.hiveIdGenerator, this.sequenceGenerator, source, hmsSource, this.options, extractorRunId, this.sourceManager);
            new CDXHiveQueryExtractor(context, pushExtractorDao, graph, extractorRunId).extract();
        }
    }

    /**
     * Runs the Spark extractor for a Spark lineage graph. The HMS and HDFS sources are resolved
     * first, so a missing one fails the run even when the graph's source is not of type SPARK;
     * graphs from non-SPARK sources are otherwise skipped.
     *
     * @throws Exception if source resolution or extraction fails; handled by {@link #run()}
     */
    private void extractSparkLineage(SparkLineageGraph graph, PushExtractorDao pushExtractorDao) throws Exception {
        Source source = getSource(graph);
        String extractorRunId = getExtractorRunId(source);
        Source hmsSource = getHMSSource(source).get();
        Optional<Source> hdfsSource = findOnlySibling(source, SourceType.HDFS);
        if (!hdfsSource.isPresent()) {
            throw new SourceNotFoundException("HDFS source");
        }
        if (source.getSourceType() == SourceType.SPARK) {
            CDXSparkPushExtractorContext context = new CDXSparkPushExtractorContext(this.hiveIdGenerator, this.hdfsIdGenerator, this.sequenceGenerator, this.sourceManager, source, this.options, extractorRunId, source.getClusterName(), hmsSource.getIdentity(), hdfsSource.get().getIdentity());
            new CDXSparkExtractor(context).extract(new CDXSparkLineageGraph(graph), pushExtractorDao);
        }
    }
}
