package com.cloudera.nav.s3.extractor;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.model.Bucket;
import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import com.cloudera.nav.extract.ExtractorStateStore;
import com.cloudera.nav.persist.ElementManager;
import com.cloudera.nav.persist.ElementManagerFactory;
import com.cloudera.nav.persist.RelationManagerFactory;
import com.cloudera.nav.persist.Transaction;
import com.cloudera.nav.s3.ApiLimitReachedException;
import com.cloudera.nav.s3.AwsRegionUtils;
import com.cloudera.nav.s3.S3ExtractorContext;
import com.cloudera.nav.s3.S3FiltersUtils;
import com.cloudera.nav.s3.S3Utils;
import com.cloudera.nav.s3.model.CustomQueue;
import com.cloudera.nav.s3.model.S3Bucket;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/nav/s3/extractor/S3ExtractorRunnable.class */
class S3ExtractorRunnable implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(S3ExtractorRunnable.class);
    private final int maxThreads;
    private final ElementManagerFactory emf;
    private final RelationManagerFactory rmf;
    private final S3ExtractorContext context;
    private final ExtractorStateStore stateStore;
    private AwsApiClientFactory apiClientFactory = null;
    private final String jsonStateFileName = S3Utils.getS3StateJsonFileName();

    /* JADX INFO: Access modifiers changed from: package-private */
    public S3ExtractorRunnable(ElementManagerFactory elementManagerFactory, RelationManagerFactory relationManagerFactory, ExtractorStateStore extractorStateStore, S3ExtractorContext s3ExtractorContext) {
        this.emf = elementManagerFactory;
        this.rmf = relationManagerFactory;
        this.context = s3ExtractorContext;
        this.maxThreads = s3ExtractorContext.getOptions().getS3Options().getS3ExtractorMaxThreads();
        this.stateStore = extractorStateStore;
    }

    @Override // java.lang.Runnable
    public void run() {
        ExecutorService createExtractorExecutor = createExtractorExecutor();
        try {
            try {
                CompletionService<Boolean> createCompletionService = createCompletionService(createExtractorExecutor);
                S3ExtractorState loadState = loadState();
                AtomicLong atomicLong = new AtomicLong(loadState.getApiCount());
                this.apiClientFactory = createApiClientFactory(atomicLong);
                NavS3ApiClient orCreateS3Client = this.apiClientFactory.getOrCreateS3Client();
                Collection<Bucket> filterBuckets = S3FiltersUtils.filterBuckets(orCreateS3Client.listBuckets(), this.context);
                LOG.debug("Current Buckets are " + filterBuckets.toString());
                markDeletedBucketsInSolr(filterBuckets, loadState);
                if (this.context.getOptions().getS3Options().isS3IncrementalExtractorEnabled()) {
                    if (this.context.getOptions().getS3Options().isAutoSetupEnabled()) {
                        createQueueIfNeeded(filterBuckets, loadState, orCreateS3Client);
                    } else {
                        handleCustomQueues(loadState);
                    }
                }
                List<Callable<Boolean>> tasks = createTaskFactory(loadState, orCreateS3Client).getTasks(filterBuckets);
                Iterator<Callable<Boolean>> it = tasks.iterator();
                while (it.hasNext()) {
                    createCompletionService.submit(it.next());
                }
                for (int i = 0; i < tasks.size(); i++) {
                    createCompletionService.take().get();
                }
                commitTransaction();
                saveState(atomicLong, loadState);
                if (createExtractorExecutor != null) {
                    createExtractorExecutor.shutdown();
                }
            } catch (ApiLimitReachedException e) {
                LOG.error("Api limit reached.", e);
                Throwables.propagate(e);
                if (createExtractorExecutor != null) {
                    createExtractorExecutor.shutdown();
                }
            } catch (Exception e2) {
                LOG.error("Error happened during s3 extraction.", e2);
                Throwables.propagate(e2);
                if (createExtractorExecutor != null) {
                    createExtractorExecutor.shutdown();
                }
            }
        } catch (Throwable th) {
            if (createExtractorExecutor != null) {
                createExtractorExecutor.shutdown();
            }
            throw th;
        }
    }

    @VisibleForTesting
    ExecutorService createExtractorExecutor() {
        return Executors.newFixedThreadPool(this.maxThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("S3ExtractorRunnable-%d").build());
    }

    @VisibleForTesting
    CompletionService<Boolean> createCompletionService(ExecutorService executorService) {
        return new ExecutorCompletionService(executorService);
    }

    @VisibleForTesting
    S3ExtractorTaskFactory createTaskFactory(S3ExtractorState s3ExtractorState, NavS3ApiClient navS3ApiClient) {
        return new S3ExtractorTaskFactory(this.context, s3ExtractorState, this.emf, this.rmf, this.apiClientFactory, navS3ApiClient);
    }

    @VisibleForTesting
    AwsApiClientFactory createApiClientFactory(AtomicLong atomicLong) {
        return new AwsApiClientFactory(new BasicAWSCredentials(this.context.getOptions().getS3Options().getAwsAccessKey(), this.context.getOptions().getS3Options().getAwsSecretKey()), atomicLong, this.context.getOptions().getS3Options());
    }

    @VisibleForTesting
    void markDeletedBucketsInSolr(Collection<Bucket> collection, S3ExtractorState s3ExtractorState) {
        ElementManager createElementManager = this.emf.createElementManager();
        Throwable th = null;
        try {
            createElementManager.begin(true);
            Set<S3Bucket> fetchDeletedBuckets = this.context.getDao().fetchDeletedBuckets(collection, this.context.getSource(), createElementManager);
            this.context.getDao().markBucketsDeletedRecursively(fetchDeletedBuckets, createElementManager);
            s3ExtractorState.removeBucketStates(fetchDeletedBuckets);
            createElementManager.softCommit();
            if (createElementManager != null) {
                if (0 == 0) {
                    createElementManager.close();
                    return;
                }
                try {
                    createElementManager.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createElementManager != null) {
                if (0 != 0) {
                    try {
                        createElementManager.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createElementManager.close();
                }
            }
            throw th3;
        }
    }

    void commitTransaction() {
        Transaction transaction = new Transaction(this.emf.createElementManager(), this.rmf.createRelationManager());
        Throwable th = null;
        try {
            transaction.begin();
            transaction.commit(true);
            if (transaction != null) {
                if (0 == 0) {
                    transaction.close();
                    return;
                }
                try {
                    transaction.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (transaction != null) {
                if (0 != 0) {
                    try {
                        transaction.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    transaction.close();
                }
            }
            throw th3;
        }
    }

    private void createQueueIfNeeded(Collection<Bucket> collection, S3ExtractorState s3ExtractorState, NavS3ApiClient navS3ApiClient) {
        for (String str : getRegionsWithoutQueue(collection, s3ExtractorState, navS3ApiClient)) {
            s3ExtractorState.addIncrementalState(str, new SqsManager(this.apiClientFactory.getOrCreateSqsClient(str), this.context).findOrCreateQueue());
        }
    }

    @VisibleForTesting
    void handleCustomQueues(S3ExtractorState s3ExtractorState) {
        String customQueuesJson = this.context.getOptions().getS3Options().getCustomQueuesJson();
        try {
            List list = (List) this.context.getMapper().readValue(customQueuesJson, new TypeReference<List<CustomQueue>>() { // from class: com.cloudera.nav.s3.extractor.S3ExtractorRunnable.1
            });
            if (isQueueStateSame(list, s3ExtractorState)) {
                return;
            }
            updateQueueState(list, s3ExtractorState);
        } catch (IOException e) {
            LOG.error("Invalid custom queues json: {}", customQueuesJson);
            Throwables.propagate(e);
        }
    }

    private void updateQueueState(Collection<CustomQueue> collection, S3ExtractorState s3ExtractorState) {
        s3ExtractorState.resetIncrementalExtractorState();
        for (CustomQueue customQueue : collection) {
            Optional<SqsQueue> queueByQueueUrl = new SqsManager(createSqsClient(customQueue.region), this.context).getQueueByQueueUrl(customQueue.queueUrl);
            if (!queueByQueueUrl.isPresent()) {
                LOG.error("Unable to find queue: {}", customQueue.queueUrl);
                throw new QueueDoesNotExistException(customQueue.queueUrl);
            }
            s3ExtractorState.addIncrementalState(customQueue.region, (SqsQueue) queueByQueueUrl.get());
            s3ExtractorState.setBucketStatesStaleByRegion(customQueue.region);
        }
    }

    @VisibleForTesting
    NavSqsApiClient createSqsClient(String str) {
        return this.apiClientFactory.getOrCreateSqsClient(str);
    }

    @VisibleForTesting
    Set<String> getRegionsWithoutQueue(Collection<Bucket> collection, S3ExtractorState s3ExtractorState, NavS3ApiClient navS3ApiClient) {
        LinkedHashSet newLinkedHashSet = Sets.newLinkedHashSet();
        Iterator<Bucket> it = collection.iterator();
        while (it.hasNext()) {
            String bucketRegion = getBucketRegion(it.next().getName(), s3ExtractorState, navS3ApiClient);
            if (!s3ExtractorState.doesQueueExistInRegion(bucketRegion)) {
                newLinkedHashSet.add(bucketRegion);
            }
        }
        return newLinkedHashSet;
    }

    @VisibleForTesting
    String getBucketRegion(String str, S3ExtractorState s3ExtractorState, NavS3ApiClient navS3ApiClient) {
        return s3ExtractorState.doesBucketExist(str) ? s3ExtractorState.getBucketStateByName(str).region : AwsRegionUtils.getBucketRegionViaS3Api(str, navS3ApiClient);
    }

    @VisibleForTesting
    S3ExtractorState loadState() {
        S3ExtractorState s3ExtractorState = (S3ExtractorState) this.stateStore.load(S3ExtractorState.class, new String[]{this.jsonStateFileName});
        if (s3ExtractorState == null) {
            return new S3ExtractorState();
        }
        s3ExtractorState.resetCurrentExtractionRunStates();
        s3ExtractorState.resetApiCountIfNeeded();
        return s3ExtractorState;
    }

    private void saveState(AtomicLong atomicLong, S3ExtractorState s3ExtractorState) {
        s3ExtractorState.setApiCount(atomicLong.get());
        this.stateStore.save(s3ExtractorState, new String[]{this.jsonStateFileName});
    }

    @VisibleForTesting
    boolean isQueueStateSame(Collection<CustomQueue> collection, S3ExtractorState s3ExtractorState) {
        HashSet newHashSet = Sets.newHashSet();
        for (CustomQueue customQueue : collection) {
            if (!s3ExtractorState.containsQueue(customQueue.queueUrl, customQueue.region)) {
                return false;
            }
            newHashSet.add(customQueue.region);
        }
        return Sets.difference(s3ExtractorState.getAllQueueRegions(), newHashSet).isEmpty();
    }

    @VisibleForTesting
    void setDeletedBuckets(S3ExtractorState s3ExtractorState) {
        Collection<String> successfulBucketExtractionBuckets = s3ExtractorState.getSuccessfulBucketExtractionBuckets();
        ElementManager createElementManager = this.emf.createElementManager();
        Throwable th = null;
        try {
            try {
                this.context.getDao().markDeletedObjects(successfulBucketExtractionBuckets, this.context.getExtractorRunId(), createElementManager);
                if (createElementManager != null) {
                    if (0 == 0) {
                        createElementManager.close();
                        return;
                    }
                    try {
                        createElementManager.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createElementManager != null) {
                if (th != null) {
                    try {
                        createElementManager.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createElementManager.close();
                }
            }
            throw th4;
        }
    }
}
