package com.cloudera.nav.extract;

import com.cloudera.cdx.client.CdxImporter;
import com.cloudera.cdx.extractor.model.Cluster;
import com.cloudera.cdx.extractor.model.Entity;
import com.cloudera.cdx.extractor.model.Service;
import com.cloudera.nav.core.model.ClusterInstance;
import com.cloudera.nav.core.model.ClusterTemplate;
import com.cloudera.nav.core.model.Relation;
import com.cloudera.nav.core.model.Source;
import com.cloudera.nav.core.model.SourceType;
import com.cloudera.nav.persist.CDXTransaction;
import com.cloudera.nav.persist.CDXTransactionFactory;
import com.cloudera.nav.persist.ClusterManager;
import com.cloudera.nav.persist.ElementManagerFactory;
import com.cloudera.nav.persist.RelationManagerFactory;
import com.cloudera.nav.persist.SourceManager;
import com.cloudera.nav.persist.Transaction;
import com.cloudera.nav.server.NavOptions;
import com.cloudera.nav.utils.CommonUtils;
import com.cloudera.nav.utils.ExtractorUtils;
import com.cloudera.nav.utils.NavUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/nav/extract/CDXSourcePoller.class */
/**
 * Service task that drains entities from a {@link CdxImporter} and, for every
 * {@link Cluster} received, registers the cluster's supported services as
 * sources attached to that cluster.
 *
 * <p>All work of a single {@link #run()} invocation happens inside one
 * {@link CDXTransaction}; the transaction is rolled back if an exception is
 * raised and is always closed before {@code run()} returns.
 */
public class CDXSourcePoller extends AbstractServiceTask {
    private static final Logger LOG = LoggerFactory.getLogger(CDXSourcePoller.class);

    /** Supplies the CDX poll limits (see {@link #pollSize()} and {@link #pollTimeout()}). */
    private final NavOptions options;
    /** Creates sources and the relations that link them to clusters and templates. */
    private final SourceManager srcMgr;
    /** Queue-like provider of the entities to process. */
    private final CdxImporter cdxImporter;
    /** Factory for the transaction used by each {@link #run()} invocation. */
    private final CDXTransactionFactory tf;
    /** Resolves cluster instances and cluster templates. */
    private final ClusterManager clusterManager;
    /** Run identifier stamped on every cluster and relation created by this poller. */
    private final String extractionRunId;
    /** Source id handed to {@link ClusterManager} when a cluster is looked up or created. */
    private final Long clusterSourceId;

    /**
     * Creates a poller.
     *
     * @param str                    value forwarded twice to the superclass constructor
     *                               (presumably the task name/identifier -- TODO confirm
     *                               against {@code AbstractServiceTask})
     * @param cdxImporter            importer to poll for entities; also given to the
     *                               transaction factory
     * @param sourceManager          manager used to create sources and their relations
     * @param elementManagerFactory  passed to the transaction factory
     * @param relationManagerFactory passed to the transaction factory
     * @param navOptions             options providing the CDX poll size and timeout
     * @param clusterManager         manager used to resolve clusters and templates
     * @param l                      cluster source id used when resolving clusters
     * @param str2                   extraction run id recorded on created entities
     */
    public CDXSourcePoller(String str, CdxImporter cdxImporter, SourceManager sourceManager, ElementManagerFactory elementManagerFactory, RelationManagerFactory relationManagerFactory, NavOptions navOptions, ClusterManager clusterManager, Long l, String str2) {
        super(str, str);
        this.options = navOptions;
        this.cdxImporter = cdxImporter;
        this.srcMgr = sourceManager;
        this.clusterManager = clusterManager;
        this.tf = new CDXTransactionFactory(elementManagerFactory, relationManagerFactory, cdxImporter);
        this.clusterSourceId = l;
        this.extractionRunId = str2;
    }

    /**
     * Ensures a source exists for {@code service} and queues any missing
     * relations for it on {@code transaction}.
     *
     * <p>Two relations are considered: the parent/child relation between the
     * cluster instance and the source, and the relation between the cluster
     * template and the source template.
     *
     * @param service         service to register as a source
     * @param clusterInstance cluster instance that owns the service
     * @param cluster         extracted cluster, used to look up its template
     * @param transaction     transaction that receives the new relations
     * @return {@code true} if at least one relation was added to the transaction
     */
    @VisibleForTesting
    public boolean addSource(Service service, ClusterInstance clusterInstance, Cluster cluster, Transaction transaction) {
        boolean relationAdded = false;

        // NOTE(review): this uses sparkServiceTypeConversion while isSupportedSource()
        // validates with serviceTypeConversion. If the two ever map a type differently,
        // SourceType.valueOf would throw here -- confirm the difference is intentional.
        Source source = this.srcMgr.createIfAbsentTransientSource(
                service,
                clusterInstance.getOriginalName(),
                SourceType.valueOf(ExtractorUtils.sparkServiceTypeConversion(service.getType())));

        boolean alreadyLinked = this.srcMgr.isRelationPresent(
                clusterInstance.getId(),
                source.getId(),
                Relation.RelationshipType.PARENT_CHILD,
                transaction.getRm());
        if (!alreadyLinked) {
            transaction.add(this.srcMgr.createSourceToClusterRelation(clusterInstance, source, this.extractionRunId), false);
            relationAdded = true;
        }

        // The template may be absent; orNull() then hands null to the source manager.
        ClusterTemplate template = (ClusterTemplate) this.clusterManager.fetchClusterTemplate(cluster).orNull();
        Optional<?> templateRelation = this.srcMgr.createClusterToSourceTemplateRelation(
                service, template, this.extractionRunId, transaction.getRm());
        if (templateRelation.isPresent()) {
            transaction.add((Relation) templateRelation.get(), false);
            relationAdded = true;
        }
        return relationAdded;
    }

    /**
     * Tells whether a service type maps to a known {@link SourceType}.
     *
     * @param str raw service type as reported by the extractor
     * @return result of checking the converted type against the {@link SourceType} enum
     */
    private boolean isSupportedSource(String str) {
        return CommonUtils.isValidEnum(SourceType.class, ExtractorUtils.serviceTypeConversion(str));
    }

    /** @return the configured maximum number of poll attempts per {@link #run()} */
    private int pollSize() {
        return this.options.getCDXOptions().getMaxPollSize();
    }

    /** @return the configured timeout passed to each {@link CdxImporter#poll} call */
    private int pollTimeout() {
        return this.options.getCDXOptions().getMaxPollTimeout();
    }

    /**
     * Polls the importer up to {@link #pollSize()} times, stopping early as soon
     * as a poll returns nothing.
     *
     * <p>After each polled entity the transaction is committed with
     * {@code commit(true)} when something was added for that entity and with
     * {@code softCommit()} otherwise. Any {@link Exception} is logged and the
     * transaction rolled back rather than propagated; the transaction is closed
     * in every case.
     */
    public final void run() {
        CDXTransaction tx = null;
        try {
            tx = this.tf.createTransaction();
            tx.begin();
            // pollSize() is deliberately re-read on every pass, as in the original loop.
            for (int polled = 0; polled < pollSize(); polled++) {
                Optional<?> next = this.cdxImporter.poll(pollTimeout());
                if (!next.isPresent()) {
                    break;
                }
                Entity entity = (Entity) next.get();
                boolean changed = false;
                if (entity instanceof Cluster) {
                    changed = registerClusterSources((Cluster) entity, tx);
                } else {
                    LOG.info("Unknown entity type returned by cdx importer for source polling. Skipping: {}", entity.getClass().getName());
                }
                if (changed) {
                    tx.commit(true);
                } else {
                    tx.softCommit();
                }
            }
        } catch (Exception e) {
            LOG.error("Error extracting CDX sources.", e);
            NavUtils.rollbackQuietly(tx);
        } finally {
            IOUtils.closeQuietly(tx);
        }
    }

    /**
     * Resolves the cluster instance for {@code cluster} and registers each of its
     * supported services through {@link #addSource}.
     *
     * @param cluster polled cluster whose services should be registered
     * @param tx      transaction that receives the new relations
     * @return {@code true} if any service caused a relation to be added
     */
    private boolean registerClusterSources(Cluster cluster, CDXTransaction tx) {
        ClusterInstance instance = this.clusterManager.findOrCreateTransientCluster(cluster, this.clusterSourceId, this.extractionRunId);
        boolean changed = false;
        for (Service service : cluster.getServices()) {
            if (!isSupportedSource(service.getType())) {
                continue;
            }
            // Always call addSource for supported services, even once 'changed' is set.
            if (addSource(service, instance, cluster, tx)) {
                changed = true;
            }
        }
        return changed;
    }
}
