package com.hortonworks.smm.kafka.webservice.resources.connect;

import com.codahale.metrics.annotation.Timed;
import com.google.common.collect.ImmutableMap;
import com.hortonworks.smm.kafka.common.errors.NotFoundException;
import com.hortonworks.smm.kafka.services.connect.EnhancedSampleConfigsService;
import com.hortonworks.smm.kafka.services.connect.KafkaConnectService;
import com.hortonworks.smm.kafka.services.connect.NifiFlowService;
import com.hortonworks.smm.kafka.services.connect.core.ConnectorOperation;
import com.hortonworks.smm.kafka.services.connect.core.TaskOperation;
import com.hortonworks.smm.kafka.services.connect.dtos.ConnectorTemplate;
import com.hortonworks.smm.kafka.services.connect.dtos.ConnectorsResponse;
import com.hortonworks.smm.kafka.services.connect.dtos.SamplePluginConfig;
import com.hortonworks.smm.kafka.services.connect.rest.ConnectorPermissions;
import com.hortonworks.smm.kafka.services.connect.rest.ConnectorPluginDTO;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import java.security.Principal;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.validation.Valid;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;

@Api(description = "End point for Kafka Connect operations", tags = {KafkaConnectResource.DESCRIPTION})
@Produces({"application/json"})
@Path("/api/v1/admin/kafka-connect")
/* loaded from: input_file:com/hortonworks/smm/kafka/webservice/resources/connect/KafkaConnectResource.class */
public class KafkaConnectResource {
    private static final String DESCRIPTION = "Kafka Connect operations";
    public static final String ENHANCE_CONFIG_INPUT_DESC = "Provided configuration to expand with the missing parameters.";
    public static final String ENHANCE_CONFIG_API_NOTES = "In case of a non Stateless NiFi connector it returns the configuration from the request expanded with the missing sample configuration for a given connector. In case of a Predefined Stateless NiFi connector it returns the configuration from the request expanded with the sample configuration and the predefined flow parameters. In case of a Non Predefined Stateless NiFi connector a flow.snapshot configuration should be provided, and it returns the configuration from the request expanded with the sample configuration and the flow parameters from the flow.snapshot.";
    private final KafkaConnectService kafkaConnectService;
    private final NifiFlowService nifiFlowService;
    private final EnhancedSampleConfigsService enhancedSampleConfigsService;
    private final KafkaConnectPresenter presenter = new KafkaConnectPresenter();

    @Inject
    public KafkaConnectResource(KafkaConnectService kafkaConnectService, NifiFlowService nifiFlowService, EnhancedSampleConfigsService enhancedSampleConfigsService) {
        this.kafkaConnectService = kafkaConnectService;
        this.nifiFlowService = nifiFlowService;
        this.enhancedSampleConfigsService = enhancedSampleConfigsService;
    }

    @GET
    @Path("/connectors")
    @Timed
    @ApiOperation(value = "Get info for all configured Connectors", response = ConnectorsResponse.class, tags = {DESCRIPTION})
    public ConnectorsResponse connectors(@Context SecurityContext securityContext) {
        return this.presenter.presentConnectorResponses((List) this.kafkaConnectService.getConnectors(securityContext.getUserPrincipal()).stream().map(connector -> {
            return this.presenter.presentConnector(connector, this.nifiFlowService.transformConfigToPresent(connector.getConfig()));
        }).collect(Collectors.toList()));
    }

    @Path("/connectors/{connector}")
    @Timed
    @ApiOperation(value = "Create or reconfigure a Connector", tags = {DESCRIPTION})
    @PUT
    public void configureConnector(@PathParam("connector") @ApiParam("Connector name") String str, Map<String, Object> map, @Context SecurityContext securityContext) {
        this.kafkaConnectService.configureConnector(str, delegateComputeConfig(map), securityContext.getUserPrincipal());
    }

    @Path("/connectors/{connector}")
    @Timed
    @DELETE
    @ApiOperation(value = "Delete a Connector", tags = {DESCRIPTION})
    public void deleteConnector(@PathParam("connector") @ApiParam("Connector name") String str, @Context SecurityContext securityContext) {
        this.kafkaConnectService.deleteConnector(str, securityContext.getUserPrincipal());
    }

    @Path("/connector-plugins/{connector}/config/validate")
    @Timed
    @Deprecated
    @ApiOperation(value = "Validate Connector Configuration", tags = {DESCRIPTION})
    @PUT
    public Response validateConfig(@PathParam("connector") @ApiParam("Connector name") String str, Map<String, ?> map, @Context SecurityContext securityContext) {
        return this.kafkaConnectService.proxyValidateConnectorConfig(str, map, securityContext.getUserPrincipal());
    }

    @Path("/connectors/{connector}/{operation}")
    @Timed
    @ApiOperation(value = "Pause, resume, or restart a Connector", tags = {DESCRIPTION})
    @POST
    public void connectorOperation(@PathParam("connector") @ApiParam("Connector name") String str, @PathParam("operation") @ApiParam("Operation") ConnectorOperation connectorOperation, @Context SecurityContext securityContext) {
        this.kafkaConnectService.connectorOperation(str, connectorOperation, securityContext.getUserPrincipal());
    }

    @Path("/connectors/{connector}/tasks/{task}/{operation}")
    @Timed
    @ApiOperation(value = "Restart a Task", tags = {DESCRIPTION})
    @POST
    public void taskOperation(@PathParam("connector") @ApiParam("Connector name") String str, @PathParam("task") @ApiParam("Task id") int i, @PathParam("operation") @ApiParam("Operation") TaskOperation taskOperation, @Context SecurityContext securityContext) {
        this.kafkaConnectService.taskOperation(str, i, taskOperation, securityContext.getUserPrincipal());
    }

    @GET
    @Path("/connector-plugins")
    @Timed
    @Deprecated
    @ApiOperation(value = "Return a list of connector plugins installed in the Kafka Connect cluster", response = ConnectorPluginDTO.class, responseContainer = "List", tags = {DESCRIPTION})
    public Collection<ConnectorPluginDTO> pluginConnectors(@Context SecurityContext securityContext) {
        return this.kafkaConnectService.getConnectorPlugins(securityContext.getUserPrincipal());
    }

    @GET
    @Path("/connector-plugins/{connector}/config/sample")
    @Timed
    @Deprecated
    @ApiOperation(value = "Return a sample configuration of a connector plugin", response = SamplePluginConfig.class, tags = {DESCRIPTION})
    public SamplePluginConfig pluginConfigSample(@PathParam("connector") @ApiParam("Connector name") String str, @Context SecurityContext securityContext) {
        return (SamplePluginConfig) this.kafkaConnectService.getSamplePluginConfig(str, securityContext.getUserPrincipal()).orElseThrow(() -> {
            return new NotFoundException("Connector does not exist");
        });
    }

    @GET
    @Path("/connector-templates")
    @Timed
    @ApiOperation(value = "Returns a list of connector templates that can be used for creating connectors", response = ConnectorTemplate.class, responseContainer = "List", tags = {DESCRIPTION})
    public Set<ConnectorTemplate> connectorTemplates(@Context SecurityContext securityContext) {
        return this.presenter.present(this.kafkaConnectService.getConnectorPlugins(securityContext.getUserPrincipal()), this.nifiFlowService.sinkFlows(), this.nifiFlowService.sourceFlows());
    }

    @Path("/connector-templates/config/validate")
    @Timed
    @ApiOperation(value = "Validate Template Configuration", tags = {DESCRIPTION})
    @PUT
    public ValidateTemplateConfigResponse validateTemplateConfig(@NotNull @Valid ValidateTemplateConfigRequest validateTemplateConfigRequest, @Context SecurityContext securityContext) {
        return new ValidateTemplateConfigResponse((List) enhanceAndValidateConfig(validateTemplateConfigRequest.getConnectorConfig(), securityContext.getUserPrincipal()).entrySet().stream().flatMap(entry -> {
            return ((List) entry.getValue()).stream();
        }).collect(Collectors.toList()));
    }

    @Path("/connector-templates/config/validate-detailed")
    @Timed
    @ApiOperation(value = "Validate Template Configuration", tags = {DESCRIPTION})
    @PUT
    public ValidateTemplateConfigDetailedResponse validateTemplateConfigDetailed(@NotNull ValidateTemplateConfigRequest validateTemplateConfigRequest, @Context SecurityContext securityContext) {
        if (!validateTemplateConfigRequest.isConnectorClassPresent()) {
            return singleErrorValidationResponse("connector.class", "The connector.class configuration is required.");
        }
        if (!validateTemplateConfigRequest.isSmmConfigsProperlySet()) {
            return singleErrorValidationResponse("meta.smm.predefined.flow.version", "If meta.smm.predefined.flow.name is present, then meta.smm.predefined.flow.version must be present too");
        }
        Map<String, List<String>> enhanceAndValidateConfig = enhanceAndValidateConfig(validateTemplateConfigRequest.getConnectorConfig(), securityContext.getUserPrincipal());
        return new ValidateTemplateConfigDetailedResponse(enhanceAndValidateConfig.values().stream().mapToInt((v0) -> {
            return v0.size();
        }).sum(), (Map) enhanceAndValidateConfig.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return new PropertyValidationResult((List) entry.getValue());
        })));
    }

    @GET
    @Path("/connector-templates/config/sample")
    @Timed
    @ApiOperation(value = "Template Configuration", tags = {DESCRIPTION})
    public SamplePluginConfig handleGetSampleConfigForTemplate(@QueryParam("connectorPluginClass") @NotEmpty String str, @QueryParam("name") String str2, @QueryParam("version") String str3, @Context SecurityContext securityContext) {
        return this.enhancedSampleConfigsService.createEnhancedSample(str, Objects.toString(str2, str.substring(str.lastIndexOf(46) + 1)), str3, Collections.emptyMap(), securityContext.getUserPrincipal());
    }

    @Path("/connector-templates/config/enhance")
    @Timed
    @ApiOperation(value = "Enhance Provided Configuration", notes = ENHANCE_CONFIG_API_NOTES, response = SamplePluginConfig.class, tags = {DESCRIPTION})
    @POST
    public SamplePluginConfig handleEnhanceConfig(@NotNull @ApiParam("Provided configuration to expand with the missing parameters.") SamplePluginConfig samplePluginConfig, @QueryParam("connectorPluginClass") @NotEmpty String str, @Context SecurityContext securityContext) {
        return this.enhancedSampleConfigsService.createEnhancedSample(str, Objects.toString(samplePluginConfig.getConfig().get("meta.smm.predefined.flow.name"), str.substring(str.lastIndexOf(46) + 1)), Objects.toString(samplePluginConfig.getConfig().get("meta.smm.predefined.flow.version"), null), samplePluginConfig.getConfig(), securityContext.getUserPrincipal());
    }

    @GET
    @Path("/connector-permissions")
    @Timed
    @ApiOperation(value = "Connector Permissions", response = ConnectorPermissions.class, tags = {DESCRIPTION})
    public ConnectorPermissions getConnectorPermissions(@Context SecurityContext securityContext) {
        return this.kafkaConnectService.getConnectorPermissions(securityContext.getUserPrincipal());
    }

    private ValidateTemplateConfigDetailedResponse singleErrorValidationResponse(String str, String str2) {
        return new ValidateTemplateConfigDetailedResponse(1, ImmutableMap.of(str, new PropertyValidationResult(Collections.singletonList(str2))));
    }

    private Map<String, List<String>> enhanceAndValidateConfig(Map<String, Object> map, Principal principal) {
        return this.kafkaConnectService.validateConnectorConfig(delegateComputeConfig(map), principal);
    }

    private Map<String, String> delegateComputeConfig(Map<String, Object> map) {
        return this.nifiFlowService.isNiFiConnectorConfiguration(map) ? this.nifiFlowService.computeNiFiConfig(map) : this.kafkaConnectService.computeConfig(map);
    }
}
