package com.hortonworks.smm.kafka.services.connect;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.hortonworks.registries.auth.KerberosLogin;
import com.hortonworks.registries.auth.Login;
import com.hortonworks.registries.auth.NOOPLogin;
import com.hortonworks.registries.auth.util.JaasConfiguration;
import com.hortonworks.smm.kafka.common.config.KafkaConnectConfig;
import com.hortonworks.smm.kafka.common.errors.KafkaConnectConfigValidationException;
import com.hortonworks.smm.kafka.services.SSLReverseProxy;
import com.hortonworks.smm.kafka.services.Service;
import com.hortonworks.smm.kafka.services.connect.rest.ConfigInfos;
import com.hortonworks.smm.kafka.services.connect.rest.ConnectorConfigAndStatus;
import com.hortonworks.smm.kafka.services.connect.rest.ConnectorPermissions;
import com.hortonworks.smm.kafka.services.connect.rest.ConnectorPluginDTO;
import com.hortonworks.smm.kafka.services.connect.rest.ConnectorTopicsResponse;
import com.hortonworks.smm.kafka.services.connect.rest.KafkaConnectErrorResponse;
import com.hortonworks.smm.kafka.services.connect.rest.TaskConfigResponse;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.security.auth.login.LoginException;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:com/hortonworks/smm/kafka/services/connect/KafkaConnectClient.class */
public class KafkaConnectClient implements Service {
    public static final String SASL_JAAS_CONFIG = "sasl.jaas.config";
    public static final String DEFAULT_JAAS_CONFIG = "java.security.auth.login.config";
    public static final String JAAS_CONFIG_ENTRY = "KafkaConnectClient";
    public static final String RESUME_CONNECTOR = "connectors/{connector}/resume";
    public static final String RESTART_CONNECTOR = "connectors/{connector}/restart";
    public static final String PAUSE_CONNECTOR = "connectors/{connector}/pause";
    public static final String TASKS = "connectors/{connector}/tasks";
    public static final String CONNECTOR_TOPICS = "connectors/{connector}/topics";
    public static final String CONNECTOR = "connectors/{connector}";
    public static final String CONNECTORS = "connectors";
    public static final String VALIDATE_CONFIG = "connector-plugins/{connector}/config/validate";
    public static final String CONNECTOR_CONFIG = "connectors/{connector}/config";
    public static final String RESTART_TASK = "connectors/{connector}/tasks/{task}/restart";
    public static final String CONNECTOR_PLUGINS = "connector-plugins";
    public static final String CONNECTOR_PERMISSIONS = "connector-permissions";
    public static final String TASK_TEMPLATE = "task";
    public static final String CONNECTOR_TEMPLATE = "connector";
    private String clientRoot;

    @VisibleForTesting
    Login login;

    @VisibleForTesting
    Client webClient;
    private static final Logger log = LoggerFactory.getLogger(KafkaConnectClient.class);
    private static final Pattern KERBEROS_TO_LOCAL = Pattern.compile("(.*?)[/@].*");

    @Inject
    public KafkaConnectClient(@Nullable KafkaConnectConfig kafkaConnectConfig) {
        if (kafkaConnectConfig == null) {
            log.info("Kafka Connect Client is not configured");
            return;
        }
        initializeSecurityContext(kafkaConnectConfig);
        this.webClient = SSLReverseProxy.createClient(kafkaConnectConfig.properties());
        this.clientRoot = String.format("%s://%s:%d/%s", kafkaConnectConfig.protocol(), kafkaConnectConfig.host(), kafkaConnectConfig.port(), Objects.toString(kafkaConnectConfig.prefix(), ""));
    }

    public boolean isConfigured() {
        return this.webClient != null;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.webClient != null) {
            this.webClient.close();
        }
        if (this.login != null) {
            try {
                this.login.close();
            } catch (Exception e) {
                log.warn("Exception thrown while closing the kerberos login.", e);
            }
        }
    }

    public void configureConnector(String str, Map<String, ?> map, @Nullable Principal principal) {
        okOrThrow(putRequest(CONNECTOR_CONFIG, str, map, principal));
    }

    protected void initializeSecurityContext(KafkaConnectConfig kafkaConnectConfig) {
        this.login = tryJaasLogin(kafkaConnectConfig).orElse(new NOOPLogin());
    }

    private Optional<Login> tryJaasLogin(KafkaConnectConfig kafkaConnectConfig) {
        String str = (String) kafkaConnectConfig.properties().get(SASL_JAAS_CONFIG);
        if (str != null) {
            KerberosLogin kerberosLogin = kerberosLogin();
            try {
                kerberosLogin.configure(new HashMap(), JAAS_CONFIG_ENTRY, new JaasConfiguration(JAAS_CONFIG_ENTRY, str));
                kerberosLogin.login();
                return Optional.of(kerberosLogin);
            } catch (LoginException e) {
                log.error("Failed to initialize the dynamic JAAS config: " + str, e);
            } catch (Exception e2) {
                log.error("Failed to parse the dynamic JAAS config: " + str, e2);
            }
        }
        log.info("Dynamic JAAS login unsuccessful, trying static JAAS login.");
        String property = System.getProperty(DEFAULT_JAAS_CONFIG);
        if (property == null || property.trim().isEmpty()) {
            log.warn("System property for jaas config file is not defined. This is okay if Connect is not running in secured mode");
        } else {
            KerberosLogin kerberosLogin2 = kerberosLogin();
            kerberosLogin2.configure(new HashMap(), JAAS_CONFIG_ENTRY);
            try {
                kerberosLogin2.login();
                return Optional.of(kerberosLogin2);
            } catch (LoginException e3) {
                log.error("Could not login using jaas config section KafkaConnectClient", e3);
            }
        }
        return Optional.empty();
    }

    public Response proxyValidateConnectorConfig(String str, Map<String, ?> map, @Nullable Principal principal) {
        return okOrThrow(putRequest(VALIDATE_CONFIG, str, map, principal));
    }

    public ConfigInfos validateConnectorConfig(String str, Map<String, String> map, @Nullable Principal principal) {
        Response putRequest = putRequest(VALIDATE_CONFIG, str, map, principal);
        if (putRequest.getStatusInfo().getFamily() == Response.Status.Family.SUCCESSFUL) {
            return (ConfigInfos) putRequest.readEntity(ConfigInfos.class);
        }
        throw new KafkaConnectConfigValidationException(putRequest.getStatus(), ((KafkaConnectErrorResponse) putRequest.readEntity(KafkaConnectErrorResponse.class)).getMessage());
    }

    public Collection<ConnectorPluginDTO> getConnectorPlugins(@Nullable Principal principal) {
        return (Collection) doAsLoggedIn(() -> {
            return (Collection) target(CONNECTOR_PLUGINS, principal).request().get(new GenericType<Collection<ConnectorPluginDTO>>() { // from class: com.hortonworks.smm.kafka.services.connect.KafkaConnectClient.1
            });
        });
    }

    public void deleteConnector(String str, @Nullable Principal principal) {
        okOrThrow((Response) doAsLoggedIn(() -> {
            return target(CONNECTOR, principal).resolveTemplate("connector", str).request().delete();
        }));
    }

    public Map<String, ConnectorConfigAndStatus> getConnectorConfigsAndStatuses(@Nullable Principal principal) {
        return (Map) doAsLoggedIn(() -> {
            return (Map) target("connectors?expand=status&expand=info", principal).request().get(new GenericType<Map<String, ConnectorConfigAndStatus>>() { // from class: com.hortonworks.smm.kafka.services.connect.KafkaConnectClient.2
            });
        });
    }

    private Response putRequest(String str, String str2, Map<String, ?> map, @Nullable Principal principal) {
        Entity json = Entity.json(map);
        return (Response) doAsLoggedIn(() -> {
            return target(str, principal).resolveTemplate("connector", str2).request().header("Content-Type", "application/json").put(json);
        });
    }

    public Map<String, ConnectorTopicsResponse> getConnectorTopics(String str, @Nullable Principal principal) {
        return (Map) doAsLoggedIn(() -> {
            try {
                return (Map) target(CONNECTOR_TOPICS, principal).resolveTemplate("connector", str).request().get(new GenericType<Map<String, ConnectorTopicsResponse>>() { // from class: com.hortonworks.smm.kafka.services.connect.KafkaConnectClient.3
                });
            } catch (WebApplicationException e) {
                if (e.getResponse().getStatus() != 403) {
                    throw e;
                }
                log.info("Connect active topic tracking is disabled, cannot list active topics of connector {}.", str);
                return Collections.emptyMap();
            }
        });
    }

    public List<TaskConfigResponse> getTaskConfigs(String str, @Nullable Principal principal) {
        return (List) doAsLoggedIn(() -> {
            return (List) target(TASKS, principal).resolveTemplate("connector", str).request().get(new GenericType<List<TaskConfigResponse>>() { // from class: com.hortonworks.smm.kafka.services.connect.KafkaConnectClient.4
            });
        });
    }

    public void pauseConnector(String str, @Nullable Principal principal) {
        Entity json = Entity.json("");
        okOrThrow((Response) doAsLoggedIn(() -> {
            return target(PAUSE_CONNECTOR, principal).resolveTemplate("connector", str).request().put(json);
        }));
    }

    public void restartConnector(String str, @Nullable Principal principal) {
        okOrThrow((Response) doAsLoggedIn(() -> {
            return target(RESTART_CONNECTOR, principal).resolveTemplate("connector", str).request().post((Entity) null);
        }));
    }

    public void resumeConnector(String str, @Nullable Principal principal) {
        Entity json = Entity.json("");
        okOrThrow((Response) doAsLoggedIn(() -> {
            return target(RESUME_CONNECTOR, principal).resolveTemplate("connector", str).request().put(json);
        }));
    }

    public void restartTask(String str, int i, @Nullable Principal principal) {
        okOrThrow((Response) doAsLoggedIn(() -> {
            return target(RESTART_TASK, principal).resolveTemplate("connector", str).resolveTemplate("task", Integer.valueOf(i)).request().post((Entity) null);
        }));
    }

    public ConnectorPermissions getConnectorPermissions(@Nullable Principal principal) {
        return (ConnectorPermissions) doAsLoggedIn(() -> {
            return (ConnectorPermissions) target(CONNECTOR_PERMISSIONS, principal).request().get(ConnectorPermissions.class);
        });
    }

    private Response okOrThrow(Response response) {
        if (response.getStatusInfo().getFamily() != Response.Status.Family.SUCCESSFUL) {
            throw new WebApplicationException("Received error response from internal Connect REST API.", response);
        }
        return response;
    }

    private <T> T doAsLoggedIn(PrivilegedAction<T> privilegedAction) {
        try {
            return (T) this.login.doAction(privilegedAction);
        } catch (LoginException e) {
            throw new ProcessingException("An error occurred during Kerberos login", e);
        }
    }

    private WebTarget target(String str, @Nullable Principal principal) {
        Preconditions.checkState(isConfigured(), "Connect Service not configured.");
        if (null != principal) {
            str = str + (str.indexOf(63) == -1 ? '?' : '&') + "doAs=" + KERBEROS_TO_LOCAL.matcher(principal.getName()).replaceFirst("$1");
        }
        log.info("Building request for endpoint: {}", str);
        return this.webClient.target(this.clientRoot + str);
    }

    @VisibleForTesting
    KerberosLogin kerberosLogin() {
        return new KerberosLogin();
    }
}
