package com.cloudera.cdx.extractor.pushextractor;

import com.cloudera.cdx.client.CdxExporter;
import com.cloudera.cdx.extractor.model.ServiceType;
import com.cloudera.cdx.extractor.model.SparkLineageGraphShim;
import com.cloudera.cdx.extractor.model.graph.SparkInput;
import com.cloudera.cdx.extractor.model.graph.SparkLineageGraph;
import com.cloudera.cdx.extractor.model.graph.SparkOutput;
import com.cloudera.cdx.extractor.util.TableWriter;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.Collection;
import java.util.Iterator;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/cdx/extractor/pushextractor/SparkPushExtractor.class */
public class SparkPushExtractor extends AbstractPushExtractor<SparkLineageGraphShim, SparkLineageGraph> {
    private static final Logger LOG = LoggerFactory.getLogger(SparkPushExtractor.class);
    private final QueryPushExtractorContext context;

    public SparkPushExtractor(QueryPushExtractorContext queryPushExtractorContext, CdxExporter<SparkLineageGraph> cdxExporter, TableWriter tableWriter) {
        super(queryPushExtractorContext, cdxExporter, tableWriter);
        this.context = queryPushExtractorContext;
    }

    @Override // com.cloudera.cdx.extractor.pushextractor.PushExtractor
    public boolean extract(Collection<SparkLineageGraphShim> collection) {
        long j = this.maxFinishTime;
        Iterator<SparkLineageGraphShim> it = collection.iterator();
        while (it.hasNext()) {
            SparkLineageGraph sparkLineageGraph = it.next().getSparkLineageGraph();
            processGraph(sparkLineageGraph);
            publish(sparkLineageGraph);
            writeTables(sparkLineageGraph);
            j = Math.max(j, sparkLineageGraph.getStartTimestamp().getMillis() + sparkLineageGraph.getDuration());
        }
        this.tableWriter.rollover();
        updateTimes(j, Instant.now().getMillis());
        return true;
    }

    private void writeTables(SparkLineageGraph sparkLineageGraph) {
        for (SparkInput sparkInput : sparkLineageGraph.getInputs()) {
            if (sparkInput.getDataSourceType().equals(ServiceType.HIVE.name())) {
                String[] split = sparkInput.getResource().split("\\.");
                this.tableWriter.addTable(split[0], split[1]);
            }
        }
        for (SparkOutput sparkOutput : sparkLineageGraph.getOutputs()) {
            if (sparkOutput.getDataSourceType().equals(ServiceType.HIVE.name())) {
                String[] split2 = sparkOutput.getResource().split("\\.");
                this.tableWriter.addTable(split2[0], split2[1]);
            }
        }
    }

    private void publish(SparkLineageGraph sparkLineageGraph) {
        try {
            LOG.debug("Sending {}", this.context.getObjectMapper().writeValueAsString(sparkLineageGraph));
        } catch (JsonProcessingException e) {
            LOG.error("Error Sending {}", sparkLineageGraph.toString());
        }
        this.cdxExporter.send(sparkLineageGraph);
    }

    private void processGraph(SparkLineageGraph sparkLineageGraph) {
        QueryIdGenerator queryIdGenerator = this.context.getQueryIdGenerator();
        String cdxId = this.context.getService().getCdxId();
        String generateSparkIds = queryIdGenerator.generateSparkIds(sparkLineageGraph.getApplicationId(), cdxId);
        sparkLineageGraph.setCdxId(generateSparkIds);
        sparkLineageGraph.setOperationId(generateSparkIds);
        sparkLineageGraph.setOperationExecutionId(queryIdGenerator.generateSparkIds(sparkLineageGraph.getYarnApplicationId() + "-Exec", cdxId));
        sparkLineageGraph.setSourceId(cdxId);
        sparkLineageGraph.setExtractionTime(Instant.now());
    }
}
