package com.hortonworks.smm.kafka.services.connect;

import com.hortonworks.smm.kafka.common.config.KafkaConnectConfig;
import com.hortonworks.smm.kafka.common.errors.KafkaConnectConfigValidationException;
import com.hortonworks.smm.kafka.services.connect.core.Connector;
import com.hortonworks.smm.kafka.services.connect.core.ConnectorOperation;
import com.hortonworks.smm.kafka.services.connect.core.ConnectorTopics;
import com.hortonworks.smm.kafka.services.connect.core.Task;
import com.hortonworks.smm.kafka.services.connect.core.TaskOperation;
import com.hortonworks.smm.kafka.services.connect.dtos.SamplePluginConfig;
import com.hortonworks.smm.kafka.services.connect.plugins.SamplePluginConfigGenerator;
import com.hortonworks.smm.kafka.services.connect.rest.ConnectorConfigAndStatus;
import com.hortonworks.smm.kafka.services.connect.rest.ConnectorPermissions;
import com.hortonworks.smm.kafka.services.connect.rest.ConnectorPluginDTO;
import com.hortonworks.smm.kafka.services.connect.rest.ConnectorTopicsResponse;
import com.hortonworks.smm.kafka.services.connect.rest.TaskConfigResponse;
import com.hortonworks.smm.kafka.services.connect.rest.TaskStatusResponse;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.ws.rs.core.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Service-layer facade over {@link KafkaConnectClient}.
 *
 * <p>Most methods delegate directly to the client; the remaining ones assemble {@link Connector} and
 * {@link Task} views from several client lookups, or validate connector configuration documents
 * before they are handed to the client.
 *
 * <p>When the service is constructed without a {@link KafkaConnectConfig}, no client is retained and
 * {@link #isConfigured()} returns {@code false}. Every other method dereferences the client without a
 * null check, so callers are expected to consult {@link #isConfigured()} first.
 */
@Singleton
public class KafkaConnectService {
    private static final Logger log = LoggerFactory.getLogger(KafkaConnectService.class);

    /** Configuration supplied at construction time; {@code null} when Kafka Connect is not configured. */
    private final KafkaConnectConfig kafkaConnectConfig;

    /** Client all operations delegate to; {@code null} when Kafka Connect is not configured. */
    private final KafkaConnectClient client;

    /**
     * Creates the service.
     *
     * @param kafkaConnectConfig Kafka Connect configuration, or {@code null} if Kafka Connect is not
     *     configured; in that case an info message is logged and the client is not retained
     * @param kafkaConnectClient client to delegate to; ignored when {@code kafkaConnectConfig} is
     *     {@code null}
     */
    @Inject
    public KafkaConnectService(@Nullable KafkaConnectConfig kafkaConnectConfig, KafkaConnectClient kafkaConnectClient) {
        if (kafkaConnectConfig == null) {
            log.info("Kafka Connect service is not configured");
            this.kafkaConnectConfig = null;
            this.client = null;
        } else {
            this.kafkaConnectConfig = kafkaConnectConfig;
            this.client = kafkaConnectClient;
        }
    }

    /**
     * Lists all connectors known to the client, each enriched with its tasks and topics.
     *
     * @param principal caller identity forwarded to the client; may be {@code null}
     * @return one {@link Connector} per entry returned by the client
     */
    public List<Connector> getConnectors(@Nullable Principal principal) {
        // A parallel stream is used deliberately: building each connector performs additional client
        // lookups (topics and task configs) that are independent of each other.
        // NOTE(review): this runs on the common ForkJoinPool; confirm that is acceptable if those
        // lookups block on remote calls.
        return this.client.getConnectorConfigsAndStatuses(principal)
                .entrySet()
                .parallelStream()
                .map(entry -> gatherConnector(entry.getKey(), entry.getValue(), principal))
                .collect(Collectors.toList());
    }

    /**
     * Builds the {@link Connector} view for a single connector.
     *
     * @param connectorName name of the connector
     * @param configAndStatus config and status of the connector as returned by the client
     * @param principal caller identity forwarded to the client; may be {@code null}
     * @return the assembled connector; its topics are {@code null} when the client returns no topics
     *     entry for {@code connectorName}
     */
    private Connector gatherConnector(
            String connectorName, ConnectorConfigAndStatus configAndStatus, @Nullable Principal principal) {
        ConnectorTopicsResponse topicsResponse =
                this.client.getConnectorTopics(connectorName, principal).get(connectorName);
        ConnectorTopics topics = topicsResponse == null
                ? null
                : new ConnectorTopics(topicsResponse.getConnector(), topicsResponse.getTopics());

        return Connector.builder()
                .name(connectorName)
                .workerId(configAndStatus.getStatus().getConnector().getWorkerId())
                .type(configAndStatus.getInfo().getType())
                .state(configAndStatus.getStatus().getConnector().getState())
                .trace(configAndStatus.getStatus().getConnector().getTrace())
                // Defensive copy so the built connector does not share the response's config map.
                .config(new HashMap<>(configAndStatus.getInfo().getConfig()))
                .tasks(buildAllTasks(connectorName, configAndStatus, principal))
                .topics(topics)
                .build();
    }

    /**
     * Joins the task configs fetched from the client with the task statuses of the given connector.
     *
     * <p>Task configs without a matching status (by task id) are omitted from the result.
     *
     * @param connectorName name of the connector
     * @param configAndStatus config and status of the connector, providing the task statuses
     * @param principal caller identity forwarded to the client; may be {@code null}
     * @return one {@link Task} per task config that has a status
     */
    private List<Task> buildAllTasks(
            String connectorName, ConnectorConfigAndStatus configAndStatus, @Nullable Principal principal) {
        List<TaskConfigResponse> taskConfigs = this.client.getTaskConfigs(connectorName, principal);
        Map<Integer, TaskStatusResponse> statusByTaskId = configAndStatus.getStatus().getTasks().stream()
                .collect(Collectors.toMap(TaskStatusResponse::getId, Function.identity()));

        return taskConfigs.stream()
                .filter(taskConfig -> statusByTaskId.containsKey(taskConfig.getId().getTask()))
                .map(taskConfig -> buildSingleTask(taskConfig, statusByTaskId.get(taskConfig.getId().getTask())))
                .collect(Collectors.toList());
    }

    /**
     * Applies a lifecycle operation to a connector.
     *
     * @param connectorName name of the connector
     * @param connectorOperation operation to perform
     * @param principal caller identity forwarded to the client; may be {@code null}
     * @throws UnsupportedOperationException if the operation is not PAUSE, RESTART or RESUME
     */
    public void connectorOperation(
            String connectorName, ConnectorOperation connectorOperation, @Nullable Principal principal) {
        switch (connectorOperation) {
            case PAUSE:
                this.client.pauseConnector(connectorName, principal);
                break;
            case RESTART:
                this.client.restartConnector(connectorName, principal);
                break;
            case RESUME:
                this.client.resumeConnector(connectorName, principal);
                break;
            default:
                throw new UnsupportedOperationException(
                        String.format("No support for ConnectorOperation %s", connectorOperation));
        }
    }

    /**
     * Applies an operation to a single task of a connector. Only {@link TaskOperation#RESTART} is
     * supported.
     *
     * @param connectorName name of the connector owning the task
     * @param taskId id of the task
     * @param taskOperation operation to perform
     * @param principal caller identity forwarded to the client; may be {@code null}
     * @throws UnsupportedOperationException if the operation is anything other than RESTART
     */
    public void taskOperation(
            String connectorName, int taskId, TaskOperation taskOperation, @Nullable Principal principal) {
        if (taskOperation != TaskOperation.RESTART) {
            throw new UnsupportedOperationException(String.format("No support for TaskOperation %s", taskOperation));
        }
        this.client.restartTask(connectorName, taskId, principal);
    }

    /**
     * @return {@code true} only if a client was retained at construction time and that client reports
     *     itself as configured
     */
    public boolean isConfigured() {
        return this.client != null && this.client.isConfigured();
    }

    /**
     * Returns the connector plugins reported by the client.
     *
     * @param principal caller identity forwarded to the client; may be {@code null}
     */
    public Collection<ConnectorPluginDTO> getConnectorPlugins(@Nullable Principal principal) {
        return this.client.getConnectorPlugins(principal);
    }

    /**
     * Forwards a connector config validation request to the client and returns its raw response.
     *
     * @param pluginName plugin identifier passed through to the client
     * @param configs configuration document to validate
     * @param principal caller identity forwarded to the client; may be {@code null}
     * @deprecated see {@link #validateConnectorConfig(Map, Principal)} for the non-proxying variant
     */
    @Deprecated
    public Response proxyValidateConnectorConfig(
            String pluginName, Map<String, ?> configs, @Nullable Principal principal) {
        return this.client.proxyValidateConnectorConfig(pluginName, configs, principal);
    }

    /**
     * Validates a connector configuration through the client and collects the reported errors.
     *
     * @param configs connector configuration; must contain the {@code NifiFlowService.CONNECTOR_CLASS}
     *     key and must not contain {@code null} values
     * @param principal caller identity forwarded to the client; may be {@code null}
     * @return validation errors grouped by config name; configs without errors are not included
     * @throws KafkaConnectConfigValidationException if {@code configs} is {@code null}, lacks the
     *     connector class key, or contains a {@code null} value
     */
    public Map<String, List<String>> validateConnectorConfig(Map<String, String> configs, @Nullable Principal principal) {
        if (configs == null || !configs.containsKey(NifiFlowService.CONNECTOR_CLASS)) {
            throw new KafkaConnectConfigValidationException(
                    String.format("The %s field must be present in configs", NifiFlowService.CONNECTOR_CLASS));
        }
        for (Map.Entry<String, String> entry : configs.entrySet()) {
            if (entry.getValue() == null) {
                throw new KafkaConnectConfigValidationException(
                        String.format("Please use non null value for key: %s", entry.getKey()));
            }
        }

        return this.client.validateConnectorConfig(configs.get(NifiFlowService.CONNECTOR_CLASS), configs, principal)
                .getConfigs()
                .stream()
                .map(configInfo -> configInfo.getConfigValue())
                .filter(configValue -> !configValue.getErrors().isEmpty())
                .collect(Collectors.groupingBy(
                        configValue -> configValue.getName(),
                        Collectors.mapping(configValue -> configValue.getErrors(), concatenatingLists())));
    }

    /**
     * Collector that concatenates the incoming collections of strings into a single mutable list.
     * It has no characteristics, so encounter order is preserved.
     */
    private static Collector<Collection<String>, List<String>, List<String>> concatenatingLists() {
        return Collector.of(
                ArrayList::new,
                List::addAll,
                (left, right) -> {
                    left.addAll(right);
                    return left;
                });
    }

    /**
     * Deletes a connector via the client.
     *
     * @param connectorName name of the connector
     * @param principal caller identity forwarded to the client; may be {@code null}
     */
    public void deleteConnector(String connectorName, @Nullable Principal principal) {
        this.client.deleteConnector(connectorName, principal);
    }

    /**
     * Passes a connector configuration to the client.
     *
     * @param connectorName name of the connector
     * @param configs configuration document
     * @param principal caller identity forwarded to the client; may be {@code null}
     */
    public void configureConnector(String connectorName, Map<String, ?> configs, @Nullable Principal principal) {
        this.client.configureConnector(connectorName, configs, principal);
    }

    /**
     * Generates a sample configuration for a connector plugin.
     *
     * @param pluginClassName plugin class name; only the part after the last {@code '.'} is used
     * @param principal caller identity forwarded to the client; may be {@code null}
     * @return the generated sample config, or empty if no plugin reported by the client has a class
     *     name ending in {@code "." + shortName}
     * @throws IllegalStateException if generating the sample configuration fails for any reason
     */
    public Optional<SamplePluginConfig> getSamplePluginConfig(String pluginClassName, @Nullable Principal principal) {
        String shortClassName = computeShortClassName(pluginClassName);
        if (!isExistingPlugin(shortClassName, principal)) {
            return Optional.empty();
        }
        try {
            return Optional.of(SamplePluginConfigGenerator.generate(
                    this.kafkaConnectConfig.getSamplePluginConfigsPath(), shortClassName));
        } catch (Exception e) {
            throw new IllegalStateException("Failed to generate sample plugin configuration", e);
        }
    }

    /** Returns the substring after the last {@code '.'}, or the whole input if it contains none. */
    private String computeShortClassName(String className) {
        return className.substring(className.lastIndexOf(".") + 1);
    }

    /**
     * Returns the connector permissions reported by the client.
     *
     * @param principal caller identity forwarded to the client; may be {@code null}
     */
    public ConnectorPermissions getConnectorPermissions(@Nullable Principal principal) {
        return this.client.getConnectorPermissions(principal);
    }

    /**
     * Validates a loosely typed configuration document and converts its values to strings.
     *
     * @param config configuration document whose values must be String, Number or Boolean
     * @return the document as transformed by {@code ConnectorConfigTransformer.transformValuesToString}
     * @throws KafkaConnectConfigValidationException if any value is of another type (including
     *     {@code null})
     */
    public Map<String, String> computeConfig(Map<String, Object> config) {
        validateConfig(config);
        return ConnectorConfigTransformer.transformValuesToString(config);
    }

    /**
     * Rejects the document if any value is not a String, Number or Boolean.
     *
     * @throws KafkaConnectConfigValidationException naming the first offending key and value found
     */
    private void validateConfig(Map<String, Object> config) {
        for (Map.Entry<String, Object> entry : config.entrySet()) {
            Object value = entry.getValue();
            boolean supported = value instanceof String || value instanceof Number || value instanceof Boolean;
            if (!supported) {
                throw new KafkaConnectConfigValidationException(String.format(
                        "PUT document value for %s must be a String, Number or Boolean. Found %s.",
                        entry.getKey(), value));
            }
        }
    }

    /**
     * Checks whether the client reports a plugin whose class name ends with {@code "." + shortName}.
     *
     * @param shortClassName simple (unqualified) class name of the plugin
     * @param principal caller identity forwarded to the client; may be {@code null}
     */
    private boolean isExistingPlugin(String shortClassName, @Nullable Principal principal) {
        String suffix = "." + shortClassName;
        return this.client.getConnectorPlugins(principal).stream()
                .map(ConnectorPluginDTO::getConnectorPluginClass)
                .anyMatch(pluginClass -> pluginClass.endsWith(suffix));
    }

    /** Combines one task's config response with its status response into a {@link Task}. */
    private Task buildSingleTask(TaskConfigResponse taskConfig, TaskStatusResponse taskStatus) {
        return Task.builder()
                .id(taskConfig.getId().getTask())
                .workerId(taskStatus.getWorkerId())
                .state(taskStatus.getState())
                .config(taskConfig.getConfig())
                .trace(taskStatus.getTrace())
                .build();
    }
}
