package com.hortonworks.smm.kafka.services.clients;

import com.hortonworks.smm.kafka.common.config.KafkaConsumerConfig;
import com.hortonworks.smm.kafka.common.logging.ThrottlingLogger;
import com.hortonworks.smm.kafka.services.clients.dtos.ConsumerGroupInfo;
import com.hortonworks.smm.kafka.services.clients.dtos.PartitionAssignment;
import com.hortonworks.smm.kafka.services.management.helper.AdminClientHelper;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import kafka.common.OffsetAndMetadata;
import kafka.coordinator.group.BaseKey;
import kafka.coordinator.group.GroupMetadata;
import kafka.coordinator.group.GroupMetadataKey;
import kafka.coordinator.group.GroupMetadataManager;
import kafka.coordinator.group.GroupSummary;
import kafka.coordinator.group.GroupTopicPartition;
import kafka.coordinator.group.MemberSummary;
import kafka.coordinator.group.OffsetKey;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;

/* loaded from: input_file:com/hortonworks/smm/kafka/services/clients/ConsumerGroupFetcherTask.class */
/**
 * Long-running task that reads the {@code __consumer_offsets} topic and keeps an in-memory view of
 * consumer groups, their committed offsets and their lag against the partitions' log end offsets (LEOs).
 *
 * <p>{@link #run()} loops until {@link #close()} is called. Each iteration optionally refreshes the LEOs,
 * then either assigns/rewinds the offsets topic (first successful iteration) or polls it and applies the
 * received group-metadata and offset-commit records. Every iteration bumps an internal cycle counter that
 * {@link #refreshLEOAndwaitTillNextRefreshEnd()} uses to wait for a completed refresh.
 *
 * <p>NOTE(review): {@code run()} is presumably executed on a dedicated thread while the other public
 * methods are called from elsewhere; the non-concurrent state ({@code groups}, {@code seekToBeginning},
 * the per-group offset maps) is only touched from {@code run()} and the constructor.
 */
class ConsumerGroupFetcherTask implements Runnable, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerGroupFetcherTask.class);
    private static final Logger THROTTLED_LOG = new ThrottlingLogger(LOG, Duration.ofMinutes(1));

    private static final String CONSUMER_OFFSETS_TOPIC = "__consumer_offsets";
    private static final String CONSUMER_DESCRIPTION = "kafka consumer for fetching consumer-groups";
    /** Placeholder used for member id / client id / host when a partition has no known owner. */
    private static final String UNKNOWN_MEMBER = "-";
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(5000L);
    private static final Duration ADMIN_CLIENT_CLOSE_TIMEOUT = Duration.ofMinutes(1L);

    private final Supplier<Set<TopicPartition>> topicPartitionsSupplier;
    private final AdminClient adminClient;
    private final AdminClientHelper adminClientHelper;
    private final long inactiveGroupTimeoutMs;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Latest committed offset per group and partition, as read from the offsets topic. */
    private final Map<String, Map<TopicPartition, OffsetWithMetadata>> commitOffsets = new ConcurrentHashMap<>();
    /** The published view, keyed by group id. */
    private final Map<String, ConsumerGroupInfo> consumerGroupInfos = new ConcurrentHashMap<>();
    /** Latest group metadata per group id, as read from the offsets topic. */
    private final Map<String, GroupMetadata> groups = new HashMap<>();
    /** Guards {@link #refreshLEOs} and {@link #cycle}; also the monitor waiters block on. */
    private final Object lockObj = new Object();

    private KafkaConsumer<ByteBuffer, ByteBuffer> kafkaConsumer;
    private long commitOffsetRefreshMs;
    private Map<TopicPartition, Long> logEndOffsets = new ConcurrentHashMap<>();
    private boolean refreshLEOs;
    private boolean seekToBeginning = true;
    private long cycle = Long.MIN_VALUE;

    /** Immutable pair of a committed offset and the timestamp of that commit. */
    public static final class OffsetWithMetadata {
        /** Used when a group has no recorded commit for a partition: offset 0, timestamp -1. */
        static final OffsetWithMetadata DEFAULT = new OffsetWithMetadata(0, -1);

        private final long offset;
        private final long commitTimestamp;

        private OffsetWithMetadata(long offset, long commitTimestamp) {
            this.offset = offset;
            this.commitTimestamp = commitTimestamp;
        }

        @Override
        public String toString() {
            return "OffsetWithMetadata {offset=" + this.offset + ", commitTimestamp=" + this.commitTimestamp + '}';
        }
    }

    /**
     * Creates the task, builds its Kafka consumer and performs an initial LEO fetch.
     *
     * @param kafkaConsumerConfig base consumer configuration; deserializers, client/group id, offset reset
     *        and auto-commit settings are overridden by this class
     * @param adminClient admin client used to list consumer groups; closed by {@link #close()}
     * @param adminClientHelper helper used to resolve admin-client futures
     * @param inactiveGroupTimeoutMs a group is considered active if any partition was committed within
     *        this many milliseconds
     * @param topicPartitionsSupplier supplies the partitions whose log end offsets are tracked
     */
    public ConsumerGroupFetcherTask(KafkaConsumerConfig kafkaConsumerConfig, AdminClient adminClient, AdminClientHelper adminClientHelper, long inactiveGroupTimeoutMs, Supplier<Set<TopicPartition>> topicPartitionsSupplier) {
        this.topicPartitionsSupplier = topicPartitionsSupplier;
        this.inactiveGroupTimeoutMs = inactiveGroupTimeoutMs;
        this.adminClient = adminClient;
        this.adminClientHelper = adminClientHelper;
        createKafkaConsumer(kafkaConsumerConfig);
    }

    /** Builds the byte-buffer consumer from the supplied config plus fixed overrides, then fetches LEOs once. */
    private void createKafkaConsumer(KafkaConsumerConfig kafkaConsumerConfig) {
        // Copy entry by entry so the result is a properly typed Map<String, Object> regardless of the
        // generic signature of getConfig().
        Map<?, ?> baseConfig = kafkaConsumerConfig.getConfig();
        Map<String, Object> consumerProps = new HashMap<>();
        for (Map.Entry<?, ?> entry : baseConfig.entrySet()) {
            consumerProps.put(String.valueOf(entry.getKey()), entry.getValue());
        }

        Object refreshIntervalMs = consumerProps.getOrDefault(
                ConsumerGroupsService.COMMIT_OFFSETS_REFRESH_INTERVAL_MS,
                Integer.valueOf(ConsumerGroupsService.DEFAULT_COMMIT_OFFSETS_REFRESH_INTERVAL_MS));
        this.commitOffsetRefreshMs = Long.parseLong(refreshIntervalMs.toString());

        consumerProps.put("key.deserializer", ByteBufferDeserializer.class.getName());
        consumerProps.put("value.deserializer", ByteBufferDeserializer.class.getName());
        consumerProps.put("client.id", ConsumerGroupsService.CG_FETCHER_GROUP_ID_PREFIX);
        consumerProps.put("group.id", ConsumerGroupsService.CG_FETCHER_GROUP_ID_PREFIX);
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("enable.auto.commit", "false");

        this.kafkaConsumer = new KafkaConsumer<>(consumerProps);
        refreshLEOsForAllPartitions();
    }

    /** Marks one loop iteration as finished and wakes every thread waiting for a refresh. */
    private void incrementCycle() {
        synchronized (this.lockObj) {
            this.cycle++;
            this.lockObj.notifyAll();
        }
    }

    /**
     * Requests an LEO refresh and blocks until the fetch loop has completed two further iterations, so that
     * at least one full iteration started after the request.
     *
     * @throws IllegalStateException if the task is closed while (or before) waiting
     * @throws RuntimeException if the calling thread is interrupted; the interrupt flag is restored and
     *         the {@link InterruptedException} is attached as the cause
     */
    public void refreshLEOAndwaitTillNextRefreshEnd() {
        synchronized (this.lockObj) {
            this.refreshLEOs = true;
            long cycleAtRequest = this.cycle;
            while (cycleAtRequest + 2 > this.cycle) {
                if (this.closed.get()) {
                    throw new IllegalStateException("ConsumerGroupFetcherTask is closed, therefore waiting for the next refresh is impossible!");
                }
                try {
                    this.lockObj.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("InterruptedException is thrown while trying to forceRefresh ConsumerGroupFetcherTask", e);
                }
            }
        }
    }

    /**
     * Fetch loop. Runs until {@link #close()} is called, then closes the consumer and releases waiters.
     *
     * <p>Failures inside one iteration are logged (throttled) and the loop continues; a
     * {@link WakeupException} is treated as the shutdown signal and simply re-checks the closed flag.
     * The cycle counter is advanced exactly once per iteration, whatever its outcome.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            long lastLeoRefreshMs = System.currentTimeMillis();
            while (!this.closed.get()) {
                long iterationStartMs = System.currentTimeMillis();
                try {
                    if (consumeRefreshRequest() || iterationStartMs - lastLeoRefreshMs > this.commitOffsetRefreshMs) {
                        refreshLEOsForAllPartitions();
                        updateConsumerGroupsWithLEOs();
                        lastLeoRefreshMs = System.currentTimeMillis();
                    }
                    if (this.seekToBeginning) {
                        assignOffsetsTopicAndRewind();
                    } else {
                        pollAndApplyUpdates(iterationStartMs);
                    }
                } catch (WakeupException ignored) {
                    // close() wakes the consumer; the loop condition decides whether to stop.
                } catch (Exception e) {
                    THROTTLED_LOG.warn("Encountered error while fetching and build consumer groups", e);
                } finally {
                    incrementCycle();
                }
            }
        } finally {
            Utils.closeQuietly(this.kafkaConsumer, CONSUMER_DESCRIPTION);
            // The consumer is gone, so no further refresh can ever complete: flag the task as closed
            // before notifying, otherwise waiters would wake up and go straight back to waiting.
            this.closed.set(true);
            synchronized (this.lockObj) {
                this.refreshLEOs = false;
                this.lockObj.notifyAll();
            }
        }
    }

    /** Atomically reads and clears the pending forced-refresh flag. */
    private boolean consumeRefreshRequest() {
        synchronized (this.lockObj) {
            boolean requested = this.refreshLEOs;
            this.refreshLEOs = false;
            return requested;
        }
    }

    /** Assigns every partition of the offsets topic and rewinds to its start; retried while none are known. */
    private void assignOffsetsTopicAndRewind() {
        Collection<TopicPartition> offsetsPartitions = consumerOffsetTopicPartitions();
        if (offsetsPartitions.isEmpty()) {
            LOG.warn("Received empty partition list for consumer offsets topic. Retrying again..");
            return;
        }
        this.kafkaConsumer.assign(offsetsPartitions);
        this.kafkaConsumer.seekToBeginning(offsetsPartitions);
        this.seekToBeginning = false;
        LOG.info("Seeking to the beginning of the consumer offsets topic, partition assignment : {}", this.kafkaConsumer.assignment());
    }

    /** Polls the offsets topic once and folds the received records into the in-memory state. */
    private void pollAndApplyUpdates(long iterationStartMs) {
        ConsumerRecords<ByteBuffer, ByteBuffer> records = this.kafkaConsumer.poll(POLL_TIMEOUT);
        Set<String> updatedGroups = new HashSet<>();
        Set<String> removedGroups = new HashSet<>();
        for (ConsumerRecord<ByteBuffer, ByteBuffer> record : records) {
            try {
                BaseKey messageKey = GroupMetadataManager.readMessageKey(record.key());
                ByteBuffer value = record.value();
                if (messageKey instanceof GroupMetadataKey) {
                    handleGroupMetadataKey(updatedGroups, removedGroups, (GroupMetadataKey) messageKey, value);
                } else if (messageKey instanceof OffsetKey) {
                    handleOffsetKey((OffsetKey) messageKey, value);
                }
            } catch (Exception e) {
                // One malformed record must not discard the rest of the batch.
                LOG.error("Error while extracting ConsumerGroupId! consumerRecord: {}", record, e);
            }
        }
        for (String groupId : removedGroups) {
            this.commitOffsets.remove(groupId);
            this.consumerGroupInfos.remove(groupId);
        }
        if (!updatedGroups.isEmpty()) {
            this.consumerGroupInfos.putAll(buildConsumerGroups(updatedGroups));
        }
        if (!updatedGroups.isEmpty() || !removedGroups.isEmpty()) {
            LOG.debug("## received consumer group updates added: [{}], removed: [{}], total: [{}] took time [{}] ms",
                    updatedGroups, removedGroups, this.consumerGroupInfos.size(), System.currentTimeMillis() - iterationStartMs);
        }
    }

    /** Returns the partitions of the offsets topic, or an empty list when the consumer reports none. */
    private Collection<TopicPartition> consumerOffsetTopicPartitions() {
        List<PartitionInfo> partitionInfos = this.kafkaConsumer.partitionsFor(CONSUMER_OFFSETS_TOPIC);
        if (partitionInfos == null) {
            // NOTE(review): assumes some client versions return null for an unknown topic - confirm.
            return Collections.emptyList();
        }
        return partitionInfos.stream()
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toList());
    }

    /**
     * Re-computes offsets, lag and the active flag of every known group against the current LEOs, after
     * registering any simple consumer groups reported by the admin client.
     */
    private void updateConsumerGroupsWithLEOs() {
        registerSimpleConsumerGroups();
        for (ConsumerGroupInfo groupInfo : this.consumerGroupInfos.values()) {
            Map<TopicPartition, OffsetWithMetadata> committed =
                    this.commitOffsets.getOrDefault(groupInfo.id(), Collections.emptyMap());
            Map<String, Map<Integer, PartitionAssignment>> assignments = groupInfo.topicPartitionAssignments();
            if (assignments == null || assignments.isEmpty()) {
                // No member assignments known: derive ownerless entries from the committed offsets.
                Map<String, Map<Integer, PartitionAssignment>> derived = new HashMap<>();
                for (TopicPartition topicPartition : committed.keySet()) {
                    addToPartitionAssignments(derived, committed, topicPartition, UNKNOWN_MEMBER, UNKNOWN_MEMBER, UNKNOWN_MEMBER);
                }
                groupInfo.setTopicPartitionAssignments(derived);
            } else {
                assignments.forEach((topic, byPartition) -> byPartition.forEach((partition, assignment) -> {
                    TopicPartition topicPartition = new TopicPartition(topic, partition.intValue());
                    long logEndOffset = this.logEndOffsets.getOrDefault(topicPartition, 0L);
                    OffsetWithMetadata commit = committed.getOrDefault(topicPartition, OffsetWithMetadata.DEFAULT);
                    assignment.updateOffsets(logEndOffset, commit.offset, lag(logEndOffset, commit.offset), commit.commitTimestamp);
                }));
            }
            groupInfo.setActive(isActive(groupInfo.topicPartitionAssignments(), this.inactiveGroupTimeoutMs));
        }
    }

    /** Adds an "Empty" entry for every simple consumer group the admin client lists that is not yet known. */
    private void registerSimpleConsumerGroups() {
        try {
            @SuppressWarnings("unchecked") // the future passed in resolves to Collection<ConsumerGroupListing>
            Collection<ConsumerGroupListing> listings = (Collection<ConsumerGroupListing>)
                    this.adminClientHelper.resultFromFuture(this.adminClient.listConsumerGroups().all());
            for (ConsumerGroupListing listing : listings) {
                String groupId = listing.groupId();
                if (isValidGroup(groupId) && listing.isSimpleConsumerGroup()) {
                    this.consumerGroupInfos.computeIfAbsent(groupId, id -> new ConsumerGroupInfo(id, "Empty"));
                }
            }
        } catch (RuntimeException e) {
            // Best effort: groups seen on the offsets topic are still updated by the caller.
            LOG.error("Error while reading the simple consumer groups", e);
        }
    }

    /**
     * Builds {@link ConsumerGroupInfo}s for the given group ids from the stored group metadata. Dead
     * groups and groups not using the consumer protocol are skipped; a failure for one group is logged
     * and does not affect the others.
     */
    private Map<String, ConsumerGroupInfo> buildConsumerGroups(Collection<String> collection) {
        Map<String, ConsumerGroupInfo> built = new HashMap<>();
        for (String groupId : collection) {
            try {
                GroupMetadata metadata = this.groups.get(groupId);
                GroupSummary summary = metadata.summary();
                String state = summary.state();
                LOG.trace("Consumer group: {}, state: {}", groupId, state);
                boolean consumerProtocol = metadata.isConsumerGroup();
                if (InvalidGroupStatus.GROUP_DEAD.state().equals(state) || !consumerProtocol) {
                    LOG.debug("Ignoring Consumer group: {} with state: {} is the protocol consumerGroup: {}", groupId, state, consumerProtocol);
                    continue;
                }
                Map<String, Map<Integer, PartitionAssignment>> assignments = buildPartitionAssignments(groupId, summary);
                built.put(groupId, new ConsumerGroupInfo(groupId, state, assignments, isActive(assignments, this.inactiveGroupTimeoutMs)));
            } catch (Exception e) {
                LOG.error("Error while building ConsumerGroupInfos! groupName: {}, GroupMetadata: {}", groupId, this.groups.get(groupId), e);
            }
        }
        return built;
    }

    /**
     * Tells whether any partition of the group was committed less than {@code inactiveTimeoutMs} ago.
     *
     * @param topicPartitionAssignments assignments by topic and partition; must not be null
     * @param inactiveTimeoutMs inactivity threshold in milliseconds
     * @return {@code true} if at least one commit timestamp is within the threshold, {@code false}
     *         otherwise (including when there are no assignments)
     */
    static boolean isActive(Map<String, Map<Integer, PartitionAssignment>> topicPartitionAssignments, long inactiveTimeoutMs) {
        long nowMs = System.currentTimeMillis();
        for (Map<Integer, PartitionAssignment> byPartition : topicPartitionAssignments.values()) {
            for (PartitionAssignment assignment : byPartition.values()) {
                // Written as a difference so that a very large timeout cannot overflow the comparison.
                if (nowMs - assignment.commitTimestamp().longValue() < inactiveTimeoutMs) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Fetches the end offsets of all supplied partitions. A {@link WakeupException} is propagated so the
     * fetch loop can react to shutdown; any other failure is logged and the previous LEOs are kept.
     */
    private void refreshLEOsForAllPartitions() {
        LOG.debug("Refreshing LEOs for all partitions");
        try {
            long startMs = System.currentTimeMillis();
            Set<TopicPartition> partitions = this.topicPartitionsSupplier.get();
            this.logEndOffsets = this.kafkaConsumer.endOffsets(partitions);
            LOG.info("LEOs of all partitions [size: {}] fetched successfully in {} ms", partitions.size(), System.currentTimeMillis() - startMs);
        } catch (WakeupException e) {
            throw e;
        } catch (Exception e) {
            LOG.warn("Error occurred while getting topics and offsets", e);
        }
    }

    /**
     * Builds the per-topic, per-partition assignments of a group: one entry per partition owned by a
     * member, plus ownerless entries for partitions that only have a committed offset.
     */
    private Map<String, Map<Integer, PartitionAssignment>> buildPartitionAssignments(String groupId, GroupSummary groupSummary) {
        Map<String, Map<Integer, PartitionAssignment>> assignments = new HashMap<>();
        Map<TopicPartition, OffsetWithMetadata> committed = this.commitOffsets.getOrDefault(groupId, Collections.emptyMap());
        Set<TopicPartition> ownedPartitions = new HashSet<>();

        for (MemberSummary member : JavaConverters.asJavaCollection(groupSummary.members())) {
            String memberId = member.memberId();
            String clientId = member.clientId();
            String clientHost = member.clientHost();
            try {
                List<TopicPartition> partitions =
                        ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment())).partitions();
                for (TopicPartition topicPartition : partitions) {
                    addToPartitionAssignments(assignments, committed, topicPartition, memberId, clientId, clientHost);
                    ownedPartitions.add(topicPartition);
                }
            } catch (Exception e) {
                LOG.error("Error while parsing Consumer! groupName: {}, ConsumerGroupSummary: {}, MemberSummary: {}", groupId, groupSummary, member, e);
            }
        }

        for (TopicPartition topicPartition : committed.keySet()) {
            if (!ownedPartitions.contains(topicPartition)) {
                addToPartitionAssignments(assignments, committed, topicPartition, UNKNOWN_MEMBER, UNKNOWN_MEMBER, UNKNOWN_MEMBER);
            }
        }
        return assignments;
    }

    /** Stores (or replaces) the assignment entry of one partition, computed from the current LEO and commit. */
    private void addToPartitionAssignments(Map<String, Map<Integer, PartitionAssignment>> assignments, Map<TopicPartition, OffsetWithMetadata> committed, TopicPartition topicPartition, String memberId, String clientId, String clientHost) {
        long logEndOffset = this.logEndOffsets.getOrDefault(topicPartition, 0L);
        OffsetWithMetadata commit = committed.getOrDefault(topicPartition, OffsetWithMetadata.DEFAULT);
        assignments.computeIfAbsent(topicPartition.topic(), topic -> new HashMap<>())
                .put(topicPartition.partition(), new PartitionAssignment(lag(logEndOffset, commit.offset), commit.offset, logEndOffset, memberId, clientId, clientHost, commit.commitTimestamp));
    }

    /** Lag of a partition; reported as 0 when the log end offset is 0 (i.e. not known or empty). */
    private static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset == 0 ? 0L : logEndOffset - committedOffset;
    }

    /**
     * Applies a group-metadata record: a null decoded value removes the group, otherwise the metadata of
     * a validly named group is stored and the group is marked as updated.
     */
    private void handleGroupMetadataKey(Set<String> updatedGroups, Set<String> removedGroups, GroupMetadataKey groupMetadataKey, ByteBuffer value) {
        String groupId = groupMetadataKey.key();
        GroupMetadata metadata = GroupMetadataManager.readGroupMessageValue(groupId, value, Time.SYSTEM);
        LOG.debug("Received group metadata [{}], [{}]", groupId, metadata);
        if (metadata == null) {
            LOG.debug("Removing group [{}]", groupId);
            this.groups.remove(groupId);
            removedGroups.add(groupId);
            updatedGroups.remove(groupId);
        } else if (isValidGroup(groupId)) {
            this.groups.put(groupId, metadata);
            updatedGroups.add(groupId);
        }
    }

    /** A group id is valid when it is neither null nor empty. */
    private boolean isValidGroup(String groupId) {
        return groupId != null && !groupId.isEmpty();
    }

    /** Applies an offset-commit record: a null value deletes the stored commit, otherwise it replaces it. */
    private void handleOffsetKey(OffsetKey offsetKey, ByteBuffer value) {
        GroupTopicPartition groupTopicPartition = offsetKey.key();
        if (value == null) {
            Map<TopicPartition, OffsetWithMetadata> groupOffsets = this.commitOffsets.get(groupTopicPartition.group());
            if (groupOffsets != null) {
                groupOffsets.remove(groupTopicPartition.topicPartition());
            }
            return;
        }
        OffsetAndMetadata decoded = GroupMetadataManager.readOffsetMessageValue(value);
        if (decoded != null) {
            this.commitOffsets.computeIfAbsent(groupTopicPartition.group(), group -> new HashMap<>())
                    .put(groupTopicPartition.topicPartition(), new OffsetWithMetadata(decoded.offset(), decoded.commitTimestamp()));
        }
    }

    /** Returns a read-only live view of the consumer groups known so far, keyed by group id. */
    public Map<String, ConsumerGroupInfo> consumerGroupInfos() {
        return Collections.unmodifiableMap(this.consumerGroupInfos);
    }

    /**
     * Signals the fetch loop to stop and closes the admin client. The consumer itself is closed by
     * {@link #run()} once the loop exits.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.closed.set(true);
        try {
            // Wake the consumer first so a blocked poll() returns immediately instead of waiting behind
            // the (potentially minute-long) admin client shutdown.
            this.kafkaConsumer.wakeup();
            // Release waiters even if run() was never started or has already exited.
            synchronized (this.lockObj) {
                this.lockObj.notifyAll();
            }
        } finally {
            this.adminClient.close(ADMIN_CLIENT_CLOSE_TIMEOUT);
        }
        LOG.info("Closed the ConsumerGroup fetcher task");
    }
}
