package com.cloudera.nav.lineage.export;

import com.cloudera.nav.lineage.api.LineageNode;
import com.cloudera.nav.persistence.relational.dao.LineageExportCacheDAO;
import com.cloudera.nav.server.NavOptions;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

/* loaded from: input_file:com/cloudera/nav/lineage/export/StreamingLineageWriter.class */
/**
 * {@link LineageWriter} that streams lineage nodes to an {@link OutputStream} as a single JSON
 * array.
 *
 * <p>Nodes passed to {@link #writeLineage(Collection)} are buffered and written in batches. Before
 * a batch is written, its node ids are handed to the {@link LineageExportCacheDAO}; nodes whose ids
 * come back from {@code addEntityIds} are skipped, as are repeated ids within the same batch.
 *
 * <p>Expected call order: {@link #begin()}, any number of {@link #writeLineage(Collection)} calls,
 * {@link #end()}, then {@link #close()}. {@link IOException}s raised while writing are rethrown
 * unchecked via {@link Throwables#propagate(Throwable)}.
 */
public class StreamingLineageWriter implements LineageWriter {
    private final OutputStream stream;
    /** Created by {@link #begin()}; {@code null} until then. */
    private JsonGenerator jg;
    private final int batchSize;
    private final LineageExportCacheDAO cache;
    private final ObjectMapper objectMapper;
    /** Nodes accumulated since the last flush. */
    private Collection<LineageNode> batch;

    /**
     * Creates a writer over the given stream.
     *
     * @param outputStream destination for the JSON output and for error text; must not be null
     * @param navOptions supplies the batch size via {@code getLineageExportWriterBatchSize()}
     * @param lineageExportCacheDAO cache used to decide which node ids to skip; must not be null
     * @param objectMapper mapper whose factory creates the JSON generator; must not be null
     * @throws NullPointerException if any argument is null
     * @throws IllegalStateException if the configured batch size is not positive
     */
    public StreamingLineageWriter(OutputStream outputStream, NavOptions navOptions, LineageExportCacheDAO lineageExportCacheDAO, ObjectMapper objectMapper) {
        Preconditions.checkNotNull(outputStream);
        Preconditions.checkNotNull(lineageExportCacheDAO);
        Preconditions.checkNotNull(navOptions);
        // Fail fast here instead of with an NPE later in begin().
        Preconditions.checkNotNull(objectMapper);
        this.stream = outputStream;
        this.objectMapper = objectMapper;
        this.batchSize = navOptions.getLineageExportWriterBatchSize();
        Preconditions.checkState(this.batchSize > 0, "Lineage export writer batch size must be positive.");
        this.cache = lineageExportCacheDAO;
        resetBatch();
    }

    /**
     * Clears the cached entity ids, creates the JSON generator and opens the top-level array.
     */
    @Override
    public void begin() {
        this.cache.clearEntityIds();
        try {
            this.jg = this.objectMapper.getFactory().createJsonGenerator(this.stream);
            this.jg.writeStartArray();
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Writes any buffered nodes, closes the top-level array and flushes the generator.
     *
     * @throws IllegalStateException if {@link #begin()} has not been called
     */
    @Override
    public void end() {
        flush();
        try {
            this.jg.writeEndArray();
            this.jg.flush();
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Writes the throwable's {@code toString()} surrounded by newlines, followed by its stack trace
     * elements joined with newlines, directly to the output stream as UTF-8.
     *
     * @param th the error to report
     */
    @Override
    public void writeError(Throwable th) {
        writeErrorMessage("\n" + th.toString() + "\n" + Joiner.on("\n").join(th.getStackTrace()));
    }

    /**
     * Writes the given text directly to the output stream as UTF-8.
     *
     * <p>If the JSON generator exists it is flushed first, so that JSON it has buffered reaches the
     * stream before the error text rather than after it.
     *
     * @param str the text to write
     */
    @Override
    public void writeErrorMessage(String str) {
        try {
            if (this.jg != null) {
                this.jg.flush();
            }
            this.stream.write(str.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /** Replaces the current batch with a fresh, empty list sized for one batch. */
    private void resetBatch() {
        this.batch = Lists.newArrayListWithExpectedSize(this.batchSize);
    }

    /**
     * Buffers the given nodes and writes the buffer out once it holds at least {@code batchSize}
     * nodes.
     *
     * @param collection nodes to add to the current batch
     * @throws IllegalStateException if a flush is triggered before {@link #begin()} was called
     */
    @Override
    public void writeLineage(Collection<LineageNode> collection) {
        this.batch.addAll(collection);
        if (this.batch.size() >= this.batchSize) {
            flush();
        }
    }

    /**
     * Writes the buffered nodes to the generator and starts a new batch.
     *
     * <p>The batch's ids are passed to {@code cache.addEntityIds}; nodes whose id is in the returned
     * set are not written (presumably these are ids the cache had already recorded; confirm against
     * the DAO). An id that occurs more than once in the batch is written only once, so the output
     * does not depend on where batch boundaries fall.
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // id type and DAO signature are not visible here
    private void flush() {
        Preconditions.checkState(this.jg != null, "begin() must be called before writing.");
        if (this.batch.isEmpty()) {
            // Nothing buffered: skip the cache call entirely.
            return;
        }
        HashSet batchIds = Sets.newHashSetWithExpectedSize(this.batch.size());
        for (LineageNode node : this.batch) {
            batchIds.add(node.getId());
        }
        Set skippedIds = this.cache.addEntityIds(batchIds);
        Set<Object> writtenIds = Sets.newHashSetWithExpectedSize(this.batch.size());
        try {
            for (LineageNode node : this.batch) {
                Object id = node.getId();
                // writtenIds.add returns false for an id already emitted from this batch.
                if (!skippedIds.contains(id) && writtenIds.add(id)) {
                    this.jg.writeObject(node);
                }
            }
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
        resetBatch();
    }

    /**
     * Clears the cached entity ids and closes the JSON generator if one was created.
     *
     * <p>The generator is closed even when clearing the cache throws, and calling this without a
     * prior {@link #begin()} is safe.
     *
     * @throws IOException if closing the generator fails
     */
    @Override
    public void close() throws IOException {
        try {
            this.cache.clearEntityIds();
        } finally {
            if (this.jg != null) {
                this.jg.close();
            }
        }
    }
}
