package com.cloudera.nav.lineage.export;

import com.cloudera.nav.lineage.api.LineageNode;
import com.cloudera.nav.server.NavOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/nav/lineage/export/ExportTaskRunner.class */
public class ExportTaskRunner implements Runnable {
    static final Collection<LineageNode> END_OF_DATA = Collections.emptyList();
    private static Logger LOG = LoggerFactory.getLogger(ExportTaskRunner.class);
    private final ExecutorService executorService = Executors.newFixedThreadPool(2, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("lineage-export-task-runner-%d").build());
    private final BlockingQueue<Collection<LineageNode>> sharedQueue;
    private final ExportRequest request;
    private final LineageWriter writer;
    private final LineageCalculator calculator;
    private volatile Throwable subtaskError;

    public ExportTaskRunner(ExportRequest exportRequest, LineageWriter lineageWriter, LineageCalculator lineageCalculator, NavOptions navOptions) {
        this.request = exportRequest;
        this.writer = lineageWriter;
        this.calculator = lineageCalculator;
        this.sharedQueue = createSharedQueue(navOptions.getLineageExportMaxQueueSize());
    }

    private BlockingQueue<Collection<LineageNode>> createSharedQueue(int i) {
        return Queues.newArrayBlockingQueue(i);
    }

    @VisibleForTesting
    BlockingQueue<Collection<LineageNode>> getSharedQueue() {
        return this.sharedQueue;
    }

    @Override // java.lang.Runnable
    public void run() {
        Runnable wrapRunner = wrapRunner(createCalculatorRunner());
        Future<?> submit = this.executorService.submit(wrapRunner(createSerializerRunner()));
        this.executorService.submit(wrapRunner);
        Throwable th = null;
        try {
            try {
                submit.get();
                if (this.subtaskError != null) {
                    th = this.subtaskError;
                }
                IOUtils.closeQuietly(this.calculator);
                IOUtils.closeQuietly(this.writer);
                sendError(th);
            } catch (InterruptedException e) {
                th = new RuntimeException("Lineage export aborted");
                if (this.subtaskError != null) {
                    th = this.subtaskError;
                }
                IOUtils.closeQuietly(this.calculator);
                IOUtils.closeQuietly(this.writer);
                sendError(th);
            } catch (ExecutionException e2) {
                Throwable th2 = e2;
                if (this.subtaskError != null) {
                    th2 = this.subtaskError;
                }
                IOUtils.closeQuietly(this.calculator);
                IOUtils.closeQuietly(this.writer);
                sendError(th2);
            }
        } catch (Throwable th3) {
            if (this.subtaskError != null) {
                th = this.subtaskError;
            }
            IOUtils.closeQuietly(this.calculator);
            IOUtils.closeQuietly(this.writer);
            sendError(th);
            throw th3;
        }
    }

    private void sendError(Throwable th) {
        if (th != null) {
            this.writer.writeError(th);
        }
    }

    private Runnable wrapRunner(final Runnable runnable) {
        return new Runnable() { // from class: com.cloudera.nav.lineage.export.ExportTaskRunner.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    runnable.run();
                } catch (Exception e) {
                    ExportTaskRunner.this.subtaskError = e;
                    ExportTaskRunner.this.executorService.shutdownNow();
                }
            }
        };
    }

    @VisibleForTesting
    Runnable createCalculatorRunner() {
        return new Runnable() { // from class: com.cloudera.nav.lineage.export.ExportTaskRunner.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    for (Collection<LineageNode> collection : ExportTaskRunner.this.calculator.calculateLineage(ExportTaskRunner.this.request)) {
                        if (!collection.isEmpty()) {
                            ExportTaskRunner.this.getSharedQueue().put(collection);
                        }
                    }
                    ExportTaskRunner.this.getSharedQueue().put(ExportTaskRunner.END_OF_DATA);
                } catch (InterruptedException e) {
                    ExportTaskRunner.LOG.debug("Lineage calculator interrupted", e);
                }
            }
        };
    }

    @VisibleForTesting
    Runnable createSerializerRunner() {
        return new Runnable() { // from class: com.cloudera.nav.lineage.export.ExportTaskRunner.3
            @Override // java.lang.Runnable
            public void run() {
                try {
                    ExportTaskRunner.this.writer.begin();
                    while (true) {
                        Collection<LineageNode> take = ExportTaskRunner.this.getSharedQueue().take();
                        if (take == ExportTaskRunner.END_OF_DATA) {
                            ExportTaskRunner.this.writer.end();
                            return;
                        }
                        ExportTaskRunner.this.writer.writeLineage(take);
                    }
                } catch (InterruptedException e) {
                    ExportTaskRunner.LOG.debug("Serializer interrupted", e);
                }
            }
        };
    }
}
