package com.hortonworks.smm.kafka.services.clients;

import com.google.common.annotations.VisibleForTesting;
import com.hortonworks.smm.kafka.common.config.KafkaAdminClientConfig;
import com.hortonworks.smm.kafka.common.config.KafkaMetricsConfig;
import com.hortonworks.smm.kafka.common.utils.ThreadUtils;
import com.hortonworks.smm.kafka.services.Service;
import com.hortonworks.smm.kafka.services.clients.dtos.ConsumerGroupInfo;
import com.hortonworks.smm.kafka.services.clients.dtos.ConsumerInfo;
import com.hortonworks.smm.kafka.services.clients.dtos.ConsumerOffsetResetParams;
import com.hortonworks.smm.kafka.services.metric.MetricsService;
import com.hortonworks.smm.kafka.services.security.AuthorizationException;
import com.hortonworks.smm.kafka.services.security.SMMAuthorizer;
import com.hortonworks.smm.kafka.services.security.SecurityUtil;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.core.SecurityContext;
import kafka.admin.ConsumerGroupCommand;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.jdk.javaapi.CollectionConverters;

/**
 * Serves consumer group information gathered by a periodically scheduled
 * {@link ConsumerGroupManagementTask} and resets committed consumer group offsets by driving
 * Kafka's {@link ConsumerGroupCommand}.
 *
 * <p>When the service is constructed without a {@link KafkaMetricsConfig}, no management task is
 * created; in that case every accessor that reads consumer group state dereferences a
 * {@code null} task and fails with a {@link NullPointerException}.
 */
@Singleton
public class ConsumerGroupManagementService implements Service {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerGroupManagementService.class);

    /** Maximum time, in milliseconds, that {@link #close()} waits for the scheduler to terminate. */
    private static final long SCHEDULER_TERMINATION_TIMEOUT_MS = 300000L;

    private final ConsumerGroupsService consumerGroupsService;
    private final ScheduledExecutorService scheduler = ThreadUtils.createScheduledExecutorService(1, "consumer-metric-service-task-%d", true);
    // Not final: tests may swap the metrics service through setMetricsService.
    private MetricsService metricsService;
    // Only assigned when a KafkaMetricsConfig is supplied to the constructor; null otherwise.
    private ConsumerGroupManagementTask consumerGroupManagementTask;
    private final KafkaAdminClientConfig kafkaAdminClientConfig;
    private final AdminClient adminClient;
    private final SMMAuthorizer authorizer;

    /**
     * Creates the service and, when {@code kafkaMetricsConfig} is non-null, runs one synchronous
     * refresh and then schedules the management task with a fixed delay equal to
     * {@link KafkaMetricsConfig#getConsumerGroupRefreshIntervalMs()}.
     *
     * @param kafkaMetricsConfig     metrics configuration; when {@code null} no task is created or scheduled
     * @param consumerGroupsService  source of consumer group data; closed by {@link #close()}
     * @param metricsService         metrics service handed to the management task
     * @param adminClient            Kafka admin client used to describe groups and list committed offsets
     * @param kafkaAdminClientConfig admin client configuration (request timeout, bootstrap servers, overrides)
     * @param sMMAuthorizer          authorizer consulted before offsets are reset
     */
    @Inject
    public ConsumerGroupManagementService(KafkaMetricsConfig kafkaMetricsConfig, ConsumerGroupsService consumerGroupsService, MetricsService metricsService, AdminClient adminClient, KafkaAdminClientConfig kafkaAdminClientConfig, SMMAuthorizer sMMAuthorizer) {
        this.metricsService = metricsService;
        this.consumerGroupsService = consumerGroupsService;
        this.adminClient = adminClient;
        this.kafkaAdminClientConfig = kafkaAdminClientConfig;
        this.authorizer = sMMAuthorizer;
        if (kafkaMetricsConfig != null) {
            this.consumerGroupManagementTask = new ConsumerGroupManagementTask(consumerGroupsService, metricsService, kafkaMetricsConfig);
            // Populate the state once up front so callers do not have to wait for the first scheduled run.
            refreshValuesAndEmitMetrics();
            long consumerGroupRefreshIntervalMs = kafkaMetricsConfig.getConsumerGroupRefreshIntervalMs();
            this.scheduler.scheduleWithFixedDelay(this.consumerGroupManagementTask, consumerGroupRefreshIntervalMs, consumerGroupRefreshIntervalMs, TimeUnit.MILLISECONDS);
            LOG.info("Consumer group thread started with cache waitTillRefresh interval : {} ms", consumerGroupRefreshIntervalMs);
        }
    }

    /** Refreshes the underlying consumer groups service and then runs the management task once, synchronously. */
    @VisibleForTesting
    void refreshValuesAndEmitMetrics() {
        this.consumerGroupsService.refreshLEOAndwaitTillNextRefreshEnd();
        this.consumerGroupManagementTask.run();
    }

    /** Refreshes the underlying consumer groups service and then asks the management task to refresh its groups. */
    void refreshConsumerGroups() {
        this.consumerGroupsService.refreshLEOAndwaitTillNextRefreshEnd();
        this.consumerGroupManagementTask.refreshGroups();
    }

    /**
     * Replaces the metrics service returned by {@link #metricsService()}. The already constructed
     * management task keeps the instance it was created with.
     */
    @VisibleForTesting
    void setMetricsService(MetricsService metricsService) {
        this.metricsService = metricsService;
    }

    /** @return the names of all consumer groups known to the management task */
    public Collection<String> consumerGroupNames() {
        return consumerGroups().names();
    }

    /** @return all consumer groups known to the management task */
    public Collection<ConsumerGroupInfo> allConsumerGroups() {
        return consumerGroups().all();
    }

    /** @return the consumer group state currently held by the management task */
    private ConsumerGroups consumerGroups() {
        return this.consumerGroupManagementTask.consumerGroups();
    }

    /**
     * Returns the consumer groups matching the given client state.
     *
     * @param clientState which groups to return: active, inactive or all
     * @return the matching consumer groups
     * @throws IllegalArgumentException if {@code clientState} is not one of the handled values
     */
    public Collection<ConsumerGroupInfo> consumerGroups(ClientState clientState) {
        switch (clientState) {
            case active:
                return activeConsumerGroups();
            case inactive:
                return inactiveConsumerGroups();
            case all:
                return allConsumerGroups();
            default:
                throw new IllegalArgumentException("Given ClientState [" + clientState + "] is not supported");
        }
    }

    /**
     * Returns the consumer groups in the given client state whose topic partition assignments
     * mention at least one of the given names.
     *
     * @param clientState which groups to consider
     * @param strArr      names matched against the keys of each group's topic partition assignments
     * @return the matching consumer groups, as a list
     */
    public Collection<ConsumerGroupInfo> consumerGroups(ClientState clientState, String... strArr) {
        return consumerGroups(clientState).stream()
                .filter(group -> CollectionUtils.containsAny(group.topicPartitionAssignments().keySet(), Arrays.asList(strArr)))
                .collect(Collectors.toList());
    }

    private Collection<ConsumerGroupInfo> activeConsumerGroups() {
        return consumerGroups().activeConsumerGroups();
    }

    private Collection<ConsumerGroupInfo> inactiveConsumerGroups() {
        return consumerGroups().inactiveConsumerGroups();
    }

    /** @return the consumer group looked up by {@code str} in the current consumer group state */
    public ConsumerGroupInfo consumerGroup(String str) {
        return consumerGroups().getGroup(str);
    }

    /** @return the consumer info looked up by {@code str} in the current consumer group state */
    public ConsumerInfo consumerInfo(String str) {
        return consumerGroups().getConsumerInfo(str);
    }

    /**
     * Resets the committed offsets of a consumer group by assembling command line arguments and
     * passing them to {@link #executeResetOffset(List)}.
     *
     * <p>The call is a no-op (apart from an info log) when no topic arguments result from the
     * request, e.g. because the wildcard was requested and the group has no authorized committed
     * topics. After a successful reset the internal consumer group state is refreshed; a failure
     * of that refresh is logged and not propagated.
     *
     * @param str                       the consumer group id
     * @param consumerOffsetResetParams topics/partitions to reset, the reset scenario and its argument
     * @param securityContext           security context used for the authorization checks
     * @throws NotFoundException        if the caller may not describe the group
     * @throws AuthorizationException   if the caller may not read the group or one of the explicitly listed topics
     * @throws IllegalArgumentException if the group is not in the EMPTY or DEAD state, or the
     *                                  scenario argument is not a valid number / timestamp
     */
    public void resetOffsets(String str, ConsumerOffsetResetParams consumerOffsetResetParams, SecurityContext securityContext) {
        // A caller without describe permission must not learn whether the group exists.
        if (!SecurityUtil.authorizeGroupDescribe(this.authorizer, securityContext, str)) {
            throw new NotFoundException(str + " group not found");
        }
        if (!SecurityUtil.authorizeGroupRead(this.authorizer, securityContext, str)) {
            throw new AuthorizationException("The user is not authorized to reset offset for consumer group : " + str);
        }
        checkGroupInactiveState(str);

        List<String> args = new ArrayList<>(buildTopicArgs(str, consumerOffsetResetParams.getTopics(), securityContext));
        // Only topic arguments have been added so far, so an empty list means there is nothing to reset.
        if (args.isEmpty()) {
            LOG.info("Cancelling reset offset operation. No topics to reset for {}.", str);
            return;
        }
        args.add("--bootstrap-server");
        args.add((String) this.kafkaAdminClientConfig.getConfig().get("bootstrap.servers"));
        args.add("--group");
        args.add(str);
        args.add("--reset-offsets");
        appendScenarioArgs(args, consumerOffsetResetParams);
        args.add("--execute");
        executeResetOffset(args);
        try {
            refreshConsumerGroups();
        } catch (Exception e) {
            LOG.error("Error occoured while refreshing SMM's internal ConsumerGroupsState after resetting offsets! Offset reset was still probably successful!", e);
        }
    }

    /**
     * Builds the topic selection arguments: either the authorized committed topics of the group
     * (when the wildcard topic is requested) or one {@code --topic} argument per requested topic.
     */
    private List<String> buildTopicArgs(String groupId, List<ConsumerOffsetResetParams.TopicPartitionsPair> topics, SecurityContext securityContext) {
        boolean wildcardRequested = topics.stream()
                .map(pair -> pair.getTopic())
                .anyMatch(ConsumerOffsetResetParams.TOPIC_WILD_CARD::equals);
        if (wildcardRequested) {
            return getAllAuthorizedCommittedOffsetTopicArgs(groupId, securityContext);
        }
        List<String> topicArgs = new ArrayList<>();
        for (ConsumerOffsetResetParams.TopicPartitionsPair pair : topics) {
            authorizeTopic(pair.getTopic(), securityContext);
            topicArgs.add("--topic");
            topicArgs.add(formatTopicArg(pair));
        }
        return topicArgs;
    }

    /** Formats a topic as {@code topic} or, when partitions are given, as {@code topic:p1,p2,...}. */
    private static String formatTopicArg(ConsumerOffsetResetParams.TopicPartitionsPair pair) {
        if (pair.getPartitions() == null || pair.getPartitions().isEmpty()) {
            return pair.getTopic();
        }
        String partitions = pair.getPartitions().stream()
                .map(Object::toString)
                .collect(Collectors.joining(","));
        return String.format("%s:%s", pair.getTopic(), partitions);
    }

    /**
     * Appends {@code --<scenario name>} and, for the OFFSET and DATETIME scenarios, the validated
     * scenario argument.
     */
    private static void appendScenarioArgs(List<String> args, ConsumerOffsetResetParams params) {
        OffsetResetScenario scenario = params.getScenario();
        args.add("--" + scenario.getName());
        String scenarioArg = params.getScenarioArg();
        if (OffsetResetScenario.OFFSET.equals(scenario)) {
            try {
                // Parsed only for validation; the command receives the original string.
                Long.parseLong(scenarioArg);
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException(String.format("Offset reset scenario argument (%s) is not a number.", scenarioArg), e);
            }
            args.add(scenarioArg);
        } else if (OffsetResetScenario.DATETIME.equals(scenario)) {
            try {
                // Parsed only for validation; the command receives the original string.
                Utils.getDateTime(scenarioArg);
            } catch (ParseException e) {
                throw new IllegalArgumentException(String.format("Offset reset scenario argument (%s) is an invalid timestamp format.", scenarioArg), e);
            }
            args.add(scenarioArg);
        }
    }

    /** @return all consumers known to the current consumer group state */
    public Collection<ConsumerInfo> allConsumerInfo() {
        return consumerGroups().allConsumers();
    }

    /** @return the metrics service currently held by this service */
    public MetricsService metricsService() {
        return this.metricsService;
    }

    /**
     * Stops the scheduler, waits up to {@value #SCHEDULER_TERMINATION_TIMEOUT_MS} ms for it to
     * terminate and closes the consumer groups service. The consumer groups service is closed
     * even if the wait is interrupted.
     *
     * @throws Exception if the wait is interrupted or closing the consumer groups service fails
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.scheduler.shutdownNow();
        try {
            if (!this.scheduler.awaitTermination(SCHEDULER_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                LOG.warn("Consumer group scheduler did not terminate within {} ms", SCHEDULER_TERMINATION_TIMEOUT_MS);
            }
        } finally {
            if (this.consumerGroupsService != null) {
                this.consumerGroupsService.close();
            }
        }
    }

    /**
     * Runs Kafka's consumer group command with the given arguments, passing every admin client
     * configuration entry (stringified) as a configuration override. The command service is
     * closed afterwards so that the resources it opens are released.
     *
     * @param list command line arguments for {@link ConsumerGroupCommand}
     * @throws IllegalStateException    if the command fails with an {@link IllegalStateException}
     * @throws IllegalArgumentException if the failure was caused by an unknown topic/partition or
     *                                  by a partition without an available leader
     */
    @VisibleForTesting
    protected void executeResetOffset(List<String> list) {
        Map<String, String> configOverrides = new HashMap<>();
        for (Map.Entry<?, ?> entry : this.kafkaAdminClientConfig.getConfig().entrySet()) {
            configOverrides.put((String) entry.getKey(), entry.getValue().toString());
        }
        ConsumerGroupCommand.ConsumerGroupService service = null;
        try {
            service = new ConsumerGroupCommand.ConsumerGroupService(
                    new ConsumerGroupCommand.ConsumerGroupCommandOptions(list.toArray(new String[0])),
                    CollectionConverters.asScala(configOverrides));
            service.resetOffsets();
        } catch (IllegalStateException e) {
            throw new IllegalStateException("Unexpected error while resetting offsets", e);
        } catch (Exception e) {
            Throwable cause = e.getCause();
            if (cause instanceof UnknownTopicOrPartitionException) {
                throw new IllegalArgumentException("Unknown topic or partition was provided as argument for resetting offsets.", e);
            }
            if (cause instanceof LeaderNotAvailableException) {
                throw new IllegalArgumentException("There is no currently available leader for the given partition.", e);
            }
            throw e;
        } finally {
            if (service != null) {
                try {
                    service.close();
                } catch (RuntimeException closeError) {
                    // A failed close must not hide the outcome of the reset itself.
                    LOG.warn("Failed to close the consumer group command service", closeError);
                }
            }
        }
    }

    /**
     * Lists the topics for which the given group has committed offsets, waiting at most the
     * configured admin request timeout.
     *
     * @param str the consumer group id
     * @return the distinct topic names of the group's committed offsets
     * @throws RuntimeException if the lookup fails, times out or is interrupted (the interrupt
     *                          flag is restored in the latter case)
     */
    @VisibleForTesting
    protected Set<String> queryCommittedOffsetTopics(String str) {
        long timeoutMs = this.kafkaAdminClientConfig.getRequestTimeoutMs();
        ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions().timeoutMs((int) timeoutMs);
        try {
            return this.adminClient.listConsumerGroupOffsets(str, options)
                    .partitionsToOffsetAndMetadata()
                    .get(timeoutMs, TimeUnit.MILLISECONDS)
                    .keySet()
                    .stream()
                    .map(topicPartition -> topicPartition.topic())
                    .collect(Collectors.toSet());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Error while listing consumer group offsets from Kafka", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new RuntimeException("Error while listing consumer group offsets from Kafka", e);
        }
    }

    /**
     * Describes the given consumer group through the admin client, waiting at most the configured
     * admin request timeout.
     *
     * @param str the consumer group id
     * @return the description of the group
     * @throws RuntimeException if Kafka returns no result, or the lookup fails, times out or is
     *                          interrupted (the interrupt flag is restored in the latter case)
     */
    @VisibleForTesting
    protected ConsumerGroupDescription queryConsumerGroupDescription(String str) {
        long timeoutMs = this.kafkaAdminClientConfig.getRequestTimeoutMs();
        DescribeConsumerGroupsOptions options = new DescribeConsumerGroupsOptions().timeoutMs((int) timeoutMs);
        // Exactly one group is requested, so the first (only) future is the one of interest.
        KafkaFuture<ConsumerGroupDescription> description = this.adminClient
                .describeConsumerGroups(Collections.singletonList(str), options)
                .describedGroups()
                .values()
                .stream()
                .findFirst()
                .orElseThrow(() -> new RuntimeException(String.format("No result while describing consumer group %s from Kafka", str)));
        try {
            return description.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Error while describing consumer group from Kafka", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new RuntimeException("Error while describing consumer group from Kafka", e);
        }
    }

    /**
     * Verifies that the group is in the EMPTY or DEAD state.
     *
     * @throws IllegalArgumentException if the group is in any other state
     */
    private void checkGroupInactiveState(String str) {
        ConsumerGroupState state = queryConsumerGroupDescription(str).state();
        if (state != ConsumerGroupState.EMPTY && state != ConsumerGroupState.DEAD) {
            throw new IllegalArgumentException(String.format("Assignments can only be reset if the group %s is inactive, but the current state is %s.", str, state));
        }
    }

    /**
     * Builds the topic arguments for a wildcard reset: {@code --all-topics} when the caller may
     * read every topic the group has committed offsets for, otherwise one {@code --topic} pair per
     * authorized topic. Returns an empty list when there is no committed or no authorized topic.
     */
    private List<String> getAllAuthorizedCommittedOffsetTopicArgs(String str, SecurityContext securityContext) {
        Set<String> committedTopics = queryCommittedOffsetTopics(str);
        if (committedTopics.isEmpty()) {
            LOG.debug("No committed offset topics exist for group {}", str);
            return Collections.emptyList();
        }
        Set<String> authorizedTopics = committedTopics.stream()
                .filter(topic -> SecurityUtil.authorizeTopicRead(this.authorizer, securityContext, topic))
                .collect(Collectors.toSet());
        if (authorizedTopics.isEmpty()) {
            LOG.debug("No authorized committed offset topics exist for group {}", str);
            return Collections.emptyList();
        }
        List<String> topicArgs = new ArrayList<>();
        // The authorized set is a subset of the committed set, so equal sizes mean every topic is authorized.
        if (committedTopics.size() == authorizedTopics.size()) {
            topicArgs.add("--all-topics");
        } else {
            for (String topic : authorizedTopics) {
                topicArgs.add("--topic");
                topicArgs.add(topic);
            }
        }
        return topicArgs;
    }

    /**
     * Ensures the caller may read the given topic.
     *
     * @throws AuthorizationException if the topic read check fails
     */
    private void authorizeTopic(String str, SecurityContext securityContext) {
        if (!SecurityUtil.authorizeTopicRead(this.authorizer, securityContext, str)) {
            throw new AuthorizationException("The user is not authorized to reset offset on the provided topics.");
        }
    }
}
