package com.cloudera.nav.s3.extractor;

import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.cloudera.nav.core.model.Entity;
import com.cloudera.nav.persist.ElementManager;
import com.cloudera.nav.persist.RelationManager;
import com.cloudera.nav.persist.Transaction;
import com.cloudera.nav.s3.S3ExtractorContext;
import com.cloudera.nav.s3.S3IdGenerator;
import com.cloudera.nav.s3.model.S3EventNotification;
import com.cloudera.nav.s3.model.S3Object;
import com.cloudera.nav.s3.model.S3ObjectChangeState;
import com.cloudera.nav.s3.model.S3ObjectTransformResult;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/nav/s3/extractor/S3IncrementalExtractor.class */
/**
 * Incrementally extracts S3 object changes for a single region.
 *
 * <p>The extractor:
 * <ol>
 *   <li>drains S3 event notifications from an SQS queue;</li>
 *   <li>folds them into one change state per object;</li>
 *   <li>persists the transformed objects inside a single transaction;</li>
 *   <li>deletes the consumed messages from the queue.</li>
 * </ol>
 */
public class S3IncrementalExtractor extends S3Extractor {
    private static final Logger LOG = LoggerFactory.getLogger(S3IncrementalExtractor.class);

    /** Maximum number of messages per SQS ReceiveMessage / DeleteMessageBatch call. */
    private static final int SQS_MAX_BATCH_SIZE = 10;

    /** SQS message attribute carrying how often a message has been received. */
    private static final String APPROXIMATE_RECEIVE_COUNT = "ApproximateReceiveCount";

    private final int maxReceiveCount;
    private final NavSqsApiClient sqsClient;
    private final SqsQueue queue;
    private final String region;
    private final S3ObjectChangeStateTransformer transformer;
    private final S3ObjectChangeStateHandler handler;
    private final S3EventNotificationFactory eventFactory;
    private final Map<String, S3BucketState> bucketStates;
    /** Buckets in {@link #region} whose bulk extraction is currently running; rebuilt per extract(). */
    private Set<String> bucketExtractionRunningSet;

    /**
     * Creates an extractor bound to one SQS queue and region.
     *
     * <p>The decompiled source marks this constructor as originally package-private.
     *
     * @param context extractor context; also supplies the SQS max receive count
     * @param elementManager element manager backing the extraction transaction
     * @param relationManager relation manager backing the extraction transaction
     * @param sqsClient client used to receive and delete queue messages
     * @param queue queue to drain
     * @param region region this extractor is responsible for
     * @param bucketStates per-bucket extraction state, keyed by bucket name; mutated on success
     * @param transformer turns accumulated change states into S3 objects
     * @param handler folds a single event notification into a change state
     * @param eventFactory parses raw message bodies into event notifications
     */
    public S3IncrementalExtractor(S3ExtractorContext context, ElementManager elementManager,
            RelationManager relationManager, NavSqsApiClient sqsClient, SqsQueue queue, String region,
            Map<String, S3BucketState> bucketStates, S3ObjectChangeStateTransformer transformer,
            S3ObjectChangeStateHandler handler, S3EventNotificationFactory eventFactory) {
        super(context, new Transaction(elementManager, relationManager));
        this.sqsClient = sqsClient;
        this.queue = queue;
        this.region = region;
        this.bucketStates = bucketStates;
        this.transformer = transformer;
        this.handler = handler;
        this.eventFactory = eventFactory;
        this.maxReceiveCount = context.getOptions().getS3Options().getSqsMaxReceiveCount();
    }

    /**
     * Drains the queue batch by batch until a poll returns fewer messages than the configured
     * batch size.
     *
     * <p>Each batch is processed as follows:
     * <ol>
     *   <li>parse and filter the messages;</li>
     *   <li>combine them per object;</li>
     *   <li>persist the results;</li>
     *   <li>delete the consumed messages from the queue.</li>
     * </ol>
     *
     * <p>On success, the transaction is soft-committed. The last successful extraction time is
     * then recorded for every bucket in this region that is not being bulk-extracted. The
     * transaction is always closed.
     *
     * @return {@code true} when the queue was drained; failures propagate as exceptions
     * @throws Exception anything thrown by the transaction, SQS client or persistence layer
     */
    public boolean extract() throws Exception {
        LOG.info("Extraction begin for queue {}", this.queue.queueUrl);
        try {
            this.numObjects = 0L;
            this.extractionStartTime = Instant.now();
            this.transaction.begin();
            this.bucketExtractionRunningSet = generateBucketExtractionRunningSet();
            updateExtractionEndTime();
            boolean queueMayHaveMore;
            do {
                List<Message> messages = pollMessages();
                // A full batch suggests the queue still holds messages; a short one means it is drained.
                queueMayHaveMore = messages.size()
                        >= this.context.getOptions().getS3Options().getS3IncrementalExtractorBatchSize();
                // filterMessages removes messages of buckets under bulk extraction from `messages`,
                // so those are left on the queue rather than deleted below.
                Set<String> retainedMessageIds =
                        processObjectStates(combineChangeMessage(filterMessages(messages)));
                deleteMessages(messages, retainedMessageIds);
                if (this.context.getOptions().getDevOptions().doPerformanceProfiling()) {
                    LOG.debug("Incremental extraction profiling: Processed {} messages from {}. Extracted {} objects in {} ms.",
                            new Object[] {messages.size(), this.queue.queueUrl, this.numObjects, elapsedMillis()});
                }
            } while (queueMayHaveMore);
            this.transaction.softCommit();
            updateIncrementalExtractionFinishedStates();
            return true;
        } finally {
            LOG.info("Extraction finished for queue {}. Extracted {} objects in {} ms.",
                    new Object[] {this.queue.queueUrl, this.numObjects, elapsedMillis()});
            this.transaction.close();
        }
    }

    /** Milliseconds elapsed since {@code extractionStartTime}. */
    private long elapsedMillis() {
        return System.currentTimeMillis() - this.extractionStartTime.getMillis();
    }

    /**
     * Receives messages until the configured batch size is reached or the queue returns nothing.
     *
     * @return the received messages in arrival order. The list is mutable: it is a linked list
     *     because {@link #filterMessages} removes entries from it through its iterator.
     */
    List<Message> pollMessages() {
        ReceiveMessageRequest request = new ReceiveMessageRequest(this.queue.queueUrl)
                .withAttributeNames(APPROXIMATE_RECEIVE_COUNT)
                .withMaxNumberOfMessages(SQS_MAX_BATCH_SIZE);
        long batchSize = this.context.getOptions().getS3Options().getS3IncrementalExtractorBatchSize();
        List<Message> polled = Lists.newLinkedList();
        List<Message> received;
        do {
            received = this.sqsClient.receiveMessage(request).getMessages();
            polled.addAll(received);
        } while (!received.isEmpty() && polled.size() < batchSize);
        return polled;
    }

    /**
     * Parses the polled messages and keeps those that should be applied.
     *
     * <p>Messages are dropped (but left in {@code messages}, so they will be deleted) in these cases:
     * <ul>
     *   <li>they exceed the max receive count;</li>
     *   <li>their event type is unsupported;</li>
     *   <li>they predate the bucket's bulk extraction;</li>
     *   <li>they cannot be handled at all.</li>
     * </ul>
     *
     * <p>Messages of buckets whose bulk extraction is running are removed from {@code messages},
     * so they are neither applied nor deleted.
     *
     * @param messages polled messages; must support {@link Iterator#remove()}
     * @return the notifications to apply, in message order
     */
    List<S3EventNotification> filterMessages(List<Message> messages) {
        List<S3EventNotification> accepted = Lists.newArrayList();
        Iterator<Message> it = messages.iterator();
        while (it.hasNext()) {
            Message message = it.next();
            try {
                if (isOverMaxReceiveCount(message)) {
                    LOG.trace("Message is over the max receive count of {}. The message is skipped and tracked for deletion. {}",
                            this.maxReceiveCount, message.getBody());
                    continue;
                }
                S3EventNotification notification =
                        this.eventFactory.createNotification(message.getMessageId(), message.getBody());
                if (!isSupportedEventType(notification)) {
                    LOG.trace("Message event type is not supported. The message is skipped and tracked for deletion. {}",
                            message.getBody());
                    continue;
                }
                String bucketName = notification.getBucketName();
                if (isChangeEventBeforeBucketExtraction(bucketName, notification.getEventTime())) {
                    LOG.trace("Message happened before bucket extraction and is skipped and tracked for deletion. {}",
                            message.getBody());
                } else if (isBucketExtractionRunning(bucketName)) {
                    LOG.trace("Message skipped because bucket extraction is running for this object. {}",
                            message.getBody());
                    // Keep it on the queue: it must be replayed once bulk extraction finishes.
                    it.remove();
                } else {
                    LOG.trace("Applying message: {}", message.getBody());
                    accepted.add(notification);
                }
            } catch (Exception e) {
                // Best effort: an unprocessable message is dropped (and deleted), but keep the cause.
                LOG.trace("Failed to parse message. Message is skipped and tracked for deletion. {}",
                        message.getBody(), e);
            }
        }
        return accepted;
    }

    /**
     * Folds all notifications for the same object into a single change state.
     *
     * @param notifications notifications to combine, applied in list order
     * @return change states keyed by object identity
     */
    Map<String, S3ObjectChangeState> combineChangeMessage(List<S3EventNotification> notifications) {
        Map<String, S3ObjectChangeState> statesByIdentity = Maps.newHashMap();
        for (S3EventNotification notification : notifications) {
            String identity = S3IdGenerator.generateObjectIdentity(this.context.getSource(),
                    notification.getBucketName(), notification.getObjectKey());
            S3ObjectChangeState state = statesByIdentity.get(identity);
            if (state == null) {
                state = createInitialChangeState(identity, notification);
            }
            statesByIdentity.put(identity, this.handler.handle(state, notification));
        }
        return statesByIdentity;
    }

    /**
     * Builds the starting change state for an object.
     *
     * <p>If the object is already persisted, the state is seeded from the stored object. Otherwise
     * a fresh element id is allocated and the state is marked as a new object.
     */
    private S3ObjectChangeState createInitialChangeState(String identity, S3EventNotification notification) {
        Optional<S3Object> existing =
                this.context.getDao().fetchObjectByIdentity(identity, this.transaction.getEm());
        String sequencer = null;
        Long id;
        Boolean deleted = false;
        boolean implicit = false;
        if (existing.isPresent()) {
            S3Object stored = existing.get();
            sequencer = stored.getSequencer();
            id = stored.getId();
            deleted = stored.isDeleted();
            implicit = stored.isImplicit();
        } else {
            id = Long.valueOf(this.context.getSequenceGenerator().getNextElementId());
        }
        return new S3ObjectChangeState(id, notification.getObjectKey(), notification.getBucketName(),
                notification.awsRegion, sequencer, null, null, !existing.isPresent(), implicit, deleted);
    }

    /**
     * Transforms and persists every change state that has pending create/delete records.
     *
     * <p>Objects rejected by the configured filters are not persisted. Newly created objects are
     * also linked to their bucket.
     *
     * @param states change states keyed by object identity
     * @return ids of messages the transformer asked to keep on the queue (not delete)
     */
    Set<String> processObjectStates(Map<String, S3ObjectChangeState> states) {
        Set<String> retainedMessageIds = Sets.newHashSet();
        for (S3ObjectChangeState state : states.values()) {
            if (isChangeStateUpToDate(state)) {
                continue;
            }
            S3ObjectTransformResult result = this.transformer.transform(state);
            Optional<String> skipMessageId = result.getSkipMessageId();
            if (skipMessageId.isPresent()) {
                retainedMessageIds.add(skipMessageId.get());
            }
            Optional<S3Object> transformed = result.getS3Object();
            if (!transformed.isPresent()
                    || !this.context.getFilters().accept((Entity) transformed.get(), this.context.getSource())) {
                continue;
            }
            S3Object object = transformed.get();
            persistObject(object, false);
            if (state.isNewObject()) {
                Optional<Long> bucketId = this.context.getBucketIdCache().get(state.getBucketName());
                Preconditions.checkState(bucketId.isPresent(),
                        "No cached id for bucket %s", state.getBucketName());
                persistParentChildRelation(object, bucketId.get());
            }
        }
        return retainedMessageIds;
    }

    /**
     * Deletes every message in {@code messages} whose id is not in {@code retainedMessageIds}.
     *
     * <p>Deletion is done in SQS batches. A message received more than once during the poll is
     * deleted once, using its most recent receipt handle. This matters because DeleteMessageBatch
     * rejects batches with duplicate entry ids, and only the latest receipt handle reliably
     * deletes a message.
     */
    private void deleteMessages(List<Message> messages, Set<String> retainedMessageIds) {
        Map<String, String> receiptHandleById = Maps.newLinkedHashMap();
        for (Message message : messages) {
            if (!retainedMessageIds.contains(message.getMessageId())) {
                // Later receipts overwrite earlier ones; messages arrive in receive order.
                receiptHandleById.put(message.getMessageId(), message.getReceiptHandle());
            }
        }
        List<DeleteMessageBatchRequestEntry> entries = Lists.newArrayListWithCapacity(receiptHandleById.size());
        for (Map.Entry<String, String> receipt : receiptHandleById.entrySet()) {
            entries.add(new DeleteMessageBatchRequestEntry()
                    .withId(receipt.getKey())
                    .withReceiptHandle(receipt.getValue()));
        }
        for (List<DeleteMessageBatchRequestEntry> batch : Lists.partition(entries, SQS_MAX_BATCH_SIZE)) {
            this.sqsClient.deleteMessageBatch(new DeleteMessageBatchRequest()
                    .withQueueUrl(this.queue.queueUrl)
                    .withEntries(batch));
        }
    }

    /**
     * Records {@code extractionEndTime} as the last successful extraction time of every bucket in
     * this region whose bulk extraction is not running.
     */
    private void updateIncrementalExtractionFinishedStates() {
        for (S3BucketState state : this.bucketStates.values()) {
            if (!isBucketExtractionRunning(state.bucketName) && this.region.equals(state.region)) {
                state.lastSuccessfulExtractionTime = this.extractionEndTime;
            }
        }
    }

    private boolean isBucketExtractionRunning(String bucketName) {
        return this.bucketExtractionRunningSet.contains(bucketName);
    }

    /**
     * Returns whether an event happened before the bucket's recorded bulk extraction start.
     *
     * @throws IllegalStateException if the bucket has no state entry
     * @throws NullPointerException if the bucket's extraction start time is missing
     */
    private boolean isChangeEventBeforeBucketExtraction(String bucketName, Instant eventTime) {
        S3BucketState state = this.bucketStates.get(bucketName);
        Preconditions.checkState(state != null, "No s3 state found for bucket %s", bucketName);
        Instant bucketExtractionStartTime = state.bucketExtractionStartTime;
        Preconditions.checkNotNull(bucketExtractionStartTime,
                "BulkExtractionTime cannot be null. Malformed s3 states file could cause this.");
        return eventTime.isBefore(bucketExtractionStartTime);
    }

    /** Collects the names of buckets in this region whose bulk extraction is currently running. */
    private Set<String> generateBucketExtractionRunningSet() {
        Set<String> running = Sets.newHashSet();
        for (S3BucketState state : this.bucketStates.values()) {
            if (state.isBucketExtractionRunning && this.region.equals(state.region)) {
                running.add(state.bucketName);
            }
        }
        LOG.debug("Current bucketExtractionRunningSet: {}", running);
        return running;
    }

    @VisibleForTesting
    void setBucketExtractionRunningSet(Set<String> set) {
        this.bucketExtractionRunningSet = set;
    }

    /** Only object creation and removal events are applied. */
    private boolean isSupportedEventType(S3EventNotification notification) {
        String eventName = notification.getEventName();
        return !Strings.isNullOrEmpty(eventName)
                && (eventName.startsWith("ObjectCreated") || eventName.startsWith("ObjectRemoved"));
    }

    /** A change state with neither a create nor a delete record has nothing to apply. */
    private boolean isChangeStateUpToDate(S3ObjectChangeState state) {
        return state.getCreateRecord() == null && state.getDeleteRecord() == null;
    }

    /**
     * Returns whether the message's ApproximateReceiveCount is at or above the configured maximum.
     * A missing attribute counts as not over the limit.
     */
    private boolean isOverMaxReceiveCount(Message message) {
        String receiveCount = message.getAttributes().get(APPROXIMATE_RECEIVE_COUNT);
        return !Strings.isNullOrEmpty(receiveCount) && Integer.parseInt(receiveCount) >= this.maxReceiveCount;
    }
}
