package com.cloudera.nav.spark;

import com.cloudera.nav.core.model.EntityType;
import com.cloudera.nav.core.model.Relation;
import com.cloudera.nav.core.model.SourceType;
import com.cloudera.nav.core.model.relations.DataFlowRelation;
import com.cloudera.nav.hdfs.extractor.HdfsIdGenerator;
import com.cloudera.nav.mapreduce.FileBasedLineageExtractor;
import com.cloudera.nav.mapreduce.MRReporter;
import com.cloudera.nav.mapreduce.SupportLevel;
import com.cloudera.nav.mapreduce.model.Job;
import com.cloudera.nav.mapreduce.model.JobExecution;
import com.cloudera.nav.mapreduce.yarn.App;
import com.cloudera.nav.mapreduce.yarn.InOutEntities;
import com.cloudera.nav.mapreduce.yarn.YarnApplicationExtractor;
import com.cloudera.nav.mapreduce.yarn.YarnExtractorContext;
import com.cloudera.nav.mapreduce.yarn.YarnLogClient;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/nav/spark/SparkYarnApplicationExtractor.class */
/**
 * Extracts lineage for YARN applications whose application type is Spark.
 *
 * <p>Input and output paths are read from the {@link SparkJobContext} returned by the Spark job
 * history client. For whichever side is missing, they are recovered from the YARN application
 * logs instead. Paths accepted by {@link HdfsIdGenerator#generateHDFSPathRef} become unlinked
 * {@link DataFlowRelation}s into and out of the job execution. Column-level lineage is extracted
 * when the job context carries a custom lineage payload.
 */
public class SparkYarnApplicationExtractor implements YarnApplicationExtractor {
    private static final Logger LOG = LoggerFactory.getLogger(SparkYarnApplicationExtractor.class);
    private static final SourceType APP_SOURCE_TYPE = SourceType.SPARK;

    private final SparkJobHistoryClient sparkClient;
    private final YarnExtractorContext context;
    private final Long sourceId;
    private final String extractorRunId;
    private final YarnLogClient logClient;
    private final FileBasedLineageExtractor columnLevelLineageExtractor;

    /**
     * Creates an extractor.
     *
     * @param context extractor context (service name, relation id sequence)
     * @param sparkClient client used to look up the Spark job environment of an application
     * @param logClient client used to recover input/output paths from YARN logs
     * @param columnLevelLineageExtractor extractor for the custom column-level lineage payload
     * @param sourceId source id stamped on the job, the job execution and the created relations
     * @param extractorRunId run id stamped on the job, the job execution and the created relations
     */
    public SparkYarnApplicationExtractor(YarnExtractorContext context,
            SparkJobHistoryClient sparkClient, YarnLogClient logClient,
            FileBasedLineageExtractor columnLevelLineageExtractor, Long sourceId,
            String extractorRunId) {
        this.context = context;
        this.sparkClient = sparkClient;
        this.logClient = logClient;
        this.columnLevelLineageExtractor = columnLevelLineageExtractor;
        this.sourceId = sourceId;
        this.extractorRunId = extractorRunId;
    }

    /**
     * Reports whether this extractor handles the given application.
     *
     * @param app the YARN application; may be null
     * @return {@code SUPPORTED} when {@code app} is non-null and its application type equals
     *     {@code "SPARK"} ignoring case, otherwise {@code SKIP_EXTRACTION}
     */
    public SupportLevel isSupported(App app) {
        if (app != null
                && StringUtils.equalsIgnoreCase(APP_SOURCE_TYPE.name(), app.getApplicationType())) {
            return SupportLevel.SUPPORTED;
        }
        return SupportLevel.SKIP_EXTRACTION;
    }

    /**
     * Extracts data-flow and column-level lineage relations for a Spark application.
     *
     * <p>Side effects: stamps {@code job} and {@code jobExecution} with the Spark source type, this
     * extractor's source id and run id. It also sets the execution's inputs and outputs to the
     * paths that produced a valid HDFS path reference. The reporter's success or failure counter
     * (keyed by service name + {@code "-spark"}) is incremented depending on whether column-level
     * extraction completed without a {@link RuntimeException}.
     *
     * @return the extracted relations. If a {@link JobNotFoundException} is raised, the relations
     *     collected before that point are returned instead.
     */
    public Collection<Relation> extract(App app, MRReporter reporter, Job job,
            JobExecution jobExecution) {
        Collection<Relation> relations = new ArrayList<>();
        try {
            SparkJobContext sparkJobContext = getSparkJobContext(app);
            job.setSourceType(APP_SOURCE_TYPE);
            jobExecution.setSourceType(APP_SOURCE_TYPE);
            job.setSourceId(this.sourceId);
            jobExecution.setSourceId(this.sourceId);
            job.setExtractorRunId(this.extractorRunId);
            jobExecution.setExtractorRunId(this.extractorRunId);

            Set<String> inputs = null;
            Set<String> outputs = null;
            if (sparkJobContext != null) {
                inputs = sparkJobContext.getInputs();
                outputs = sparkJobContext.getOutputs();
            }

            // Paths reported by the Spark job context are flagged as user specified. Paths
            // recovered from the YARN logs (used only for a missing side) are not.
            boolean inputsUserSpecified = true;
            boolean outputsUserSpecified = true;
            if (CollectionUtils.isEmpty(inputs) || CollectionUtils.isEmpty(outputs)) {
                InOutEntities logDeps = this.logClient.getInOutDeps(app);
                if (logDeps != null) {
                    if (CollectionUtils.isEmpty(inputs)
                            && CollectionUtils.isNotEmpty(logDeps.getInputs())) {
                        inputs = logDeps.getInputs();
                        inputsUserSpecified = false;
                    }
                    if (CollectionUtils.isEmpty(outputs)
                            && CollectionUtils.isNotEmpty(logDeps.getOutputs())) {
                        outputs = logDeps.getOutputs();
                        outputsUserSpecified = false;
                    }
                }
            }

            Set<String> validInputs = new HashSet<>();
            Set<String> inputRefs = filterDatasetPaths(inputs, validInputs);
            Set<String> validOutputs = new HashSet<>();
            Set<String> outputRefs = filterDatasetPaths(outputs, validOutputs);
            jobExecution.setInputs(validInputs);
            jobExecution.setOutputs(validOutputs);
            relations.addAll(extractDataFlow(jobExecution.getId(), inputRefs, outputRefs,
                    inputsUserSpecified, outputsUserSpecified));

            // Column-level lineage is best effort: a failure is counted and logged but does not
            // discard the data-flow relations collected above.
            try {
                relations.addAll(extractColumnLevelLineage(jobExecution,
                        sparkJobContext == null ? null : sparkJobContext.getCustomLineage()));
                reporter.incrementJobSuccessfulCount(this.context.getServiceName() + "-spark");
            } catch (RuntimeException e) {
                reporter.incrementJobFailedCount(this.context.getServiceName() + "-spark");
                LOG.warn("Unexpected error while extracting column level lineage for Yarn application {}",
                        app.getId(), e);
            }
            return relations;
        } catch (JobNotFoundException e) {
            LOG.debug("Spark job not found for Yarn application {}", app.getId(), e);
            return relations;
        }
    }

    /**
     * Converts each path to an HDFS path reference, skipping and logging any path that
     * {@link HdfsIdGenerator#generateHDFSPathRef} rejects with an
     * {@link IllegalArgumentException}.
     *
     * @param paths candidate paths; may be null
     * @param validPaths receives every path that was converted successfully
     * @return the generated path references (empty when {@code paths} is null)
     */
    private Set<String> filterDatasetPaths(Set<String> paths, Set<String> validPaths) {
        Set<String> pathRefs = new HashSet<>();
        if (paths == null) {
            return pathRefs;
        }
        for (String path : paths) {
            try {
                pathRefs.add(HdfsIdGenerator.generateHDFSPathRef(path));
                validPaths.add(path);
            } catch (IllegalArgumentException e) {
                LOG.warn("{} was not a valid HDFS path", path, e);
            }
        }
        return pathRefs;
    }

    @VisibleForTesting
    Long getSourceId() {
        return this.sourceId;
    }

    @VisibleForTesting
    String getExtractorRunId() {
        return this.extractorRunId;
    }

    @VisibleForTesting
    SparkJobHistoryClient getSparkClient() {
        return this.sparkClient;
    }

    @VisibleForTesting
    YarnLogClient getLogClient() {
        return this.logClient;
    }

    /**
     * Builds the unlinked HDFS data-flow relations around one job execution. This is at most one
     * relation from the inputs into the execution, and at most one from the execution to the
     * outputs.
     *
     * @param executionId id of the job execution entity
     * @param inputRefs HDFS path references read by the execution; no relation if empty or null
     * @param outputRefs HDFS path references written by the execution; no relation if empty or null
     * @param inputsUserSpecified value passed to {@code isUserSpecified} on the input relation
     * @param outputsUserSpecified value passed to {@code isUserSpecified} on the output relation
     * @return the created relations (0 to 2 elements)
     */
    private Collection<Relation> extractDataFlow(Long executionId, Set<String> inputRefs,
            Set<String> outputRefs, boolean inputsUserSpecified, boolean outputsUserSpecified) {
        Collection<Relation> relations = new ArrayList<>(2);
        if (CollectionUtils.isNotEmpty(inputRefs)) {
            relations.add(DataFlowRelation.builder()
                    .id(this.context.getSequenceGenerator().getNextRelationId())
                    .unlinkedSourceIds(inputRefs)
                    .sourceSourceType(SourceType.HDFS)
                    .targetId(executionId)
                    .targetType(EntityType.OPERATION_EXECUTION)
                    .targetSourceType(APP_SOURCE_TYPE)
                    .targetSourceId(this.sourceId)
                    .isPropagatable(true)
                    .isUnlinked(true)
                    .extractorRunId(this.extractorRunId)
                    .isUserSpecified(inputsUserSpecified)
                    .build());
        }
        if (CollectionUtils.isNotEmpty(outputRefs)) {
            relations.add(DataFlowRelation.builder()
                    .id(this.context.getSequenceGenerator().getNextRelationId())
                    .sourceId(executionId)
                    .sourceType(EntityType.OPERATION_EXECUTION)
                    .sourceSourceType(APP_SOURCE_TYPE)
                    .targetSourceType(SourceType.HDFS)
                    .sourceSourceId(this.sourceId)
                    .unlinkedTargetIds(outputRefs)
                    .isPropagatable(true)
                    .isUnlinked(true)
                    .extractorRunId(this.extractorRunId)
                    .isUserSpecified(outputsUserSpecified)
                    .build());
        }
        return relations;
    }

    /**
     * Delegates column-level lineage extraction to the file-based lineage extractor.
     *
     * @param jobExecution execution whose (already populated) inputs and outputs are passed along
     * @param customLineage the custom lineage payload from the Spark job context; may be null
     * @return the extracted relations. The collection is empty when the payload, the inputs or
     *     the outputs are empty, or when the delegate returns null.
     */
    private Collection<Relation> extractColumnLevelLineage(JobExecution jobExecution,
            String customLineage) {
        Collection<?> inputs = jobExecution.getInputs();
        Collection<?> outputs = jobExecution.getOutputs();
        Collection<Relation> relations = new ArrayList<>();
        if (StringUtils.isEmpty(customLineage) || CollectionUtils.isEmpty(inputs)
                || CollectionUtils.isEmpty(outputs)) {
            return relations;
        }
        Collection<?> lineage = this.columnLevelLineageExtractor.extractColumnLevelLineage(
                this.context, jobExecution, customLineage,
                inputs.toArray(new String[inputs.size()]),
                outputs.toArray(new String[outputs.size()]),
                APP_SOURCE_TYPE);
        if (lineage != null) {
            for (Object relation : lineage) {
                relations.add((Relation) relation);
            }
        }
        return relations;
    }

    /** Looks up the Spark job environment for the application's id. */
    private SparkJobContext getSparkJobContext(App app) {
        return this.sparkClient.getSparkJobEnvironment(app.getId());
    }
}
