package com.cloudera.cdx.client.impl.bulk.upload;

import com.cloudera.altus.authentication.credentials.BasicAltusCredentials;
import com.cloudera.altus.authentication.credentials.StaticAltusCredentialsProvider;
import com.cloudera.altus.client.AltusClientConfigurationBuilder;
import com.cloudera.altus.dbus.api.DbusClient;
import com.cloudera.altus.dbus.api.DbusClientBuilder;
import com.cloudera.altus.dbus.model.PutRecordRequest;
import com.cloudera.altus.dbus.model.PutRecordResponse;
import com.cloudera.altus.dbus.model.Record;
import com.cloudera.cdx.client.AltusCredentials;
import com.cloudera.cdx.client.ExporterConfig;
import com.cloudera.cdx.extractor.model.TelemetryPublisherCountersMap;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;
import java.util.TimeZone;
import org.apache.commons.lang.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.FileEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/cdx/client/impl/bulk/upload/DatabusUploader.class */
/**
 * {@link FileUploader} that ships a local file through the Databus API.
 *
 * <p>An upload is a two-step exchange: a {@code putRecord} call on a {@link DbusClient} registers the
 * record (stream name, partition key, payload size and headers) and returns an upload URL, after which
 * the file content is sent to that URL with an HTTP {@code PUT}.
 */
public class DatabusUploader implements FileUploader {
    private static final Logger LOG = LoggerFactory.getLogger(DatabusUploader.class);
    private static final String TZ = TimeZone.getDefault().getID();
    private static final TelemetryPublisherCountersMap tpCounters = TelemetryPublisherCountersMap.getInstance();

    /** HTTP status that triggers the manual redirect handling in {@link #putPayload(URI, File)}. */
    private static final int HTTP_TEMPORARY_REDIRECT = 307;
    /** Upper bound on the number of consecutive redirects followed for one payload upload. */
    private static final int MAX_REDIRECTS = 10;
    /** Number of retries handed to the HTTP client's request retry handler. */
    private static final int HTTP_RETRY_COUNT = 5;
    /** HttpClient parameter name under which the default proxy host is registered. */
    private static final String DEFAULT_PROXY_PARAM = "http.route.default-proxy";

    private final String dbusStreamName;
    private final String configuredDbusStreamName;
    private final ExporterConfig config;
    private final String cdxStream;
    private final DefaultHttpClient httpClient = new DefaultHttpClient();

    /**
     * Creates an uploader and configures its HTTP client (timeouts, retry handler, optional proxy).
     *
     * <p>All arguments are validated before the HTTP client is touched.
     *
     * @param str            stream name used as the key into the telemetry counters map; must not be null
     * @param str2           stream name written into each record sent to Databus; must not be null
     * @param exporterConfig exporter configuration; must provide a Databus URL and Altus credentials
     * @param str3           CDX stream name, sent as the {@code cdx-stream} header and returned by
     *                       {@link #getStreamName()}; must not be null
     * @throws NullPointerException     if any argument is null
     * @throws IllegalArgumentException if the Databus URL or the Altus credentials are absent
     */
    public DatabusUploader(String str, String str2, ExporterConfig exporterConfig, String str3) {
        this.dbusStreamName = Preconditions.checkNotNull(str);
        this.configuredDbusStreamName = Preconditions.checkNotNull(str2);
        this.config = Preconditions.checkNotNull(exporterConfig);
        this.cdxStream = Preconditions.checkNotNull(str3);
        Preconditions.checkArgument(exporterConfig.getDatabusUrl().isPresent());
        Preconditions.checkArgument(exporterConfig.getAltusCredentials().isPresent());

        HttpParams params = this.httpClient.getParams();
        LOG.debug("Setting the DatabusUploader Connection Timeout to {}", exporterConfig.getDatabusConnectionTimeout());
        LOG.debug("Setting the DatabusUploader Socket Timeout to {}", exporterConfig.getDatabusSocketTimeout());
        HttpConnectionParams.setConnectionTimeout(params, (int) exporterConfig.getDatabusConnectionTimeout());
        HttpConnectionParams.setSoTimeout(params, (int) exporterConfig.getDatabusSocketTimeout());
        this.httpClient.setParams(params);
        this.httpClient.setHttpRequestRetryHandler(new StandardHttpRequestRetryHandler(HTTP_RETRY_COUNT, false));

        if (exporterConfig.isProxySupportEnabled()) {
            configureProxy(exporterConfig);
        }
    }

    /**
     * Routes the HTTP client through the configured proxy, registering proxy credentials only when
     * both a user and a password are configured.
     */
    private void configureProxy(ExporterConfig exporterConfig) {
        String proxyServer = exporterConfig.getProxyServer();
        int proxyPort = exporterConfig.getProxyPort();
        if (exporterConfig.getProxyUser() != null && exporterConfig.getProxyPassword() != null) {
            BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(
                    new AuthScope(proxyServer, proxyPort),
                    new UsernamePasswordCredentials(exporterConfig.getProxyUser(), exporterConfig.getProxyPassword()));
            this.httpClient.setCredentialsProvider(credentialsProvider);
        }
        this.httpClient.getParams().setParameter(DEFAULT_PROXY_PARAM, new HttpHost(proxyServer, proxyPort));
    }

    /**
     * Builds a Databus API client from the given configuration.
     *
     * <p>The client uses the configured Databus URL as its endpoint, the configured Altus access key and
     * private key as static credentials, and the configured proxy when proxy support is enabled.
     *
     * @param exporterConfig configuration supplying endpoint, credentials and proxy settings
     * @return a newly built client
     */
    @VisibleForTesting
    DbusClient getApiClient(ExporterConfig exporterConfig) {
        AltusClientConfigurationBuilder clientConfig =
                AltusClientConfigurationBuilder.defaultBuilder().withClientApplicationName("DatabusUploader");
        if (exporterConfig.isProxySupportEnabled()) {
            String proxyUri = String.format("%s://%s:%d",
                    exporterConfig.getProxyScheme(), exporterConfig.getProxyServer(), exporterConfig.getProxyPort());
            clientConfig.withProxyUri(proxyUri)
                    .withProxyUsername(exporterConfig.getProxyUser())
                    .withProxyPassword(exporterConfig.getProxyPassword());
        }
        AltusCredentials altusCredentials = (AltusCredentials) exporterConfig.getAltusCredentials().get();
        BasicAltusCredentials credentials =
                new BasicAltusCredentials(altusCredentials.getAccessKeyId(), altusCredentials.getPrivateKey());
        return DbusClientBuilder.defaultBuilder()
                .withClientConfiguration(clientConfig.build())
                .withCredentials(new StaticAltusCredentialsProvider(credentials))
                .withEndPoint((String) exporterConfig.getDatabusUrl().get())
                .build();
    }

    /**
     * Uploads the file at the given path.
     *
     * <p>The file's existence is checked twice, once before the record is registered and once before the
     * payload is sent; if it is missing at either point a warning is logged and the method returns
     * without uploading.
     *
     * @param str path of the file to upload
     * @throws RuntimeException if no cluster UUID is configured, or wrapping an {@link IOException}
     *                          raised while sending the payload
     */
    @Override // com.cloudera.cdx.client.impl.bulk.upload.FileUploader
    public void export(String str) {
        File file = new File(str);
        DbusClient apiClient = getApiClient(this.config);
        PutRecordRequest uploadRequest = createUploadRequest(str);
        LOG.debug("Headers: {}", uploadRequest.getRecord().getHeaders());
        if (!file.exists()) {
            LOG.warn("File {} does not exist any more. Possible that the file is pushed already. Will not create upload request.", str);
            return;
        }

        PutRecordResponse response = apiClient.putRecord(uploadRequest);
        LOG.debug("Record ID : {}", response.getRecord().getRecordId());
        LOG.debug("Status : {}", response.getRecord().getStatus());
        LOG.debug("Upload URL : {}", response.getRecord().getUploadUrl());
        LOG.debug("Record size : {}", uploadRequest.getRecord().getPayloadSize());

        try {
            URI uploadUri = URI.create(response.getRecord().getUploadUrl());
            if (!file.exists()) {
                LOG.warn("File {} does not exist any more. Possible that the file is pushed already. Will not upload the payload.", str);
                return;
            }
            int statusCode = putPayload(uploadUri, file);
            if (statusCode < 200 || statusCode >= 300) {
                // Surface failed uploads; previously the status was only visible at debug level.
                LOG.warn("Upload of {} returned unexpected HTTP status {}", str, statusCode);
            }
            // NOTE(review): the counters are incremented regardless of the upload status, as before.
            // Confirm whether failed uploads should count towards the exported data size.
            long payloadSize = uploadRequest.getRecord().getPayloadSize().longValue();
            tpCounters.get(this.dbusStreamName).incrementExportedDataSize(payloadSize);
            tpCounters.getDataSizeBean().incrementExportedDataSize(payloadSize);
            LOG.debug("Status code : {}", statusCode);
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Sends the file to {@code uri} with an HTTP {@code PUT}, following 307 redirects manually.
     *
     * <p>On a 307 the {@code Location} value replaces the host of the current URI and the request is
     * re-sent. Every response is fully released before the next request is issued, so the redirected
     * request never runs while the previous connection is still held. At most {@link #MAX_REDIRECTS}
     * redirects are followed, which bounds the work done when the server answers with a redirect loop.
     *
     * @param uri  upload URL
     * @param file file whose content is sent as the request entity
     * @return status code of the last response received; this is 307 when the redirect could not be
     *         resolved or the redirect limit was reached
     * @throws IOException if executing a request fails
     */
    private int putPayload(URI uri, File file) throws IOException {
        URI target = uri;
        int followed = 0;
        while (true) {
            HttpPut request = new HttpPut(target);
            request.setEntity(new FileEntity(file));

            int statusCode;
            URI redirectTarget = null;
            CloseableHttpResponse response = this.httpClient.execute(request);
            try {
                statusCode = response.getStatusLine().getStatusCode();
                if (statusCode == HTTP_TEMPORARY_REDIRECT) {
                    redirectTarget = resolveRedirect(target, response);
                }
            } finally {
                releaseQuietly(response);
            }

            if (redirectTarget == null) {
                return statusCode;
            }
            if (followed >= MAX_REDIRECTS) {
                LOG.error("Giving up on upload after following {} redirects.", followed);
                return statusCode;
            }
            followed++;
            target = redirectTarget;
        }
    }

    /**
     * Derives the URI to retry against from a redirect response.
     *
     * <p>The first {@code Location} header value, stripped of a leading {@code http://} or
     * {@code https://}, is used as the host; every other component is taken from {@code current}.
     *
     * @return the redirect target, or {@code null} (after logging an error) when the response has no
     *         {@code Location} header or the resulting URI is malformed
     */
    private static URI resolveRedirect(URI current, CloseableHttpResponse response) {
        Header[] locations = response.getHeaders("Location");
        if (locations.length == 0) {
            LOG.error("Received HTTP redirect with empty Location header.");
            return null;
        }
        String host = locations[0].getValue();
        if (host.startsWith("https://")) {
            host = host.substring("https://".length());
        } else if (host.startsWith("http://")) {
            host = host.substring("http://".length());
        }
        LOG.warn("Received redirect to {}", host);
        try {
            return new URI(current.getScheme(), current.getUserInfo(), host, current.getPort(),
                    current.getPath(), current.getQuery(), current.getFragment());
        } catch (URISyntaxException e) {
            LOG.error("Could not create redirect URL: {}", e.getMessage(), e);
            return null;
        }
    }

    /** Drains and closes a response without letting cleanup failures mask the upload result. */
    private static void releaseQuietly(CloseableHttpResponse response) {
        EntityUtils.consumeQuietly(response.getEntity());
        try {
            response.close();
        } catch (IOException ignored) {
            // Best effort: the status line has already been read, so a close failure is not fatal.
            LOG.debug("Ignoring failure while closing upload response", ignored);
        }
    }

    /**
     * Builds the {@code putRecord} request describing the file at {@code path}.
     *
     * <p>The record carries the configured stream name, the CM cluster UUID as partition key, the
     * file length as payload size, and the headers added by {@link #addHeaders(Record)}.
     *
     * @throws RuntimeException if no cluster UUID is configured, or wrapping a
     *                          {@link JsonProcessingException} raised while adding headers
     */
    private PutRecordRequest createUploadRequest(String path) {
        String clusterUuid = this.config.getCmClusterUuid();
        Record record = new Record();
        record.setStreamName(this.configuredDbusStreamName);
        if (StringUtils.isEmpty(clusterUuid)) {
            throw new RuntimeException("No cluster UUID, unable to upload.");
        }
        record.setPartitionKey(clusterUuid);
        record.setPayloadSize(Paths.get(path).toFile().length());
        try {
            addHeaders(record);
        } catch (JsonProcessingException e) {
            throw Throwables.propagate(e);
        }
        PutRecordRequest request = new PutRecordRequest();
        request.setRecord(record);
        return request;
    }

    /**
     * Appends the standard headers, the optional cluster/service headers and finally the configured
     * Databus header properties to the record's header list.
     */
    private void addHeaders(Record record) throws JsonProcessingException {
        List<com.cloudera.altus.dbus.model.Header> headers = record.getHeaders();
        headers.add(header("format", this.config.getFileFormat().name()));
        headers.add(header("compression", this.config.getCompressionType().name()));
        headers.add(header("cm-id", this.config.getCmId()));
        headers.add(header("cdx-stream", this.cdxStream));
        headers.add(header("cluster-type", this.config.getClusterType()));
        headers.add(header("time-zone", TZ));
        // NOTE(review): the guard tests getClusterId() but the value sent is getCmClusterUuid();
        // kept as found - confirm this pairing is intentional.
        if (StringUtils.isNotEmpty(this.config.getClusterId())) {
            headers.add(header("cluster-id", this.config.getCmClusterUuid()));
        }
        // NOTE(review): "Service-id" is capitalised unlike the other header names; kept byte-for-byte
        // because the receiving side may depend on it.
        if (StringUtils.isNotEmpty(this.config.getServiceId())) {
            headers.add(header("Service-id", this.config.getServiceId()));
        }
        if (StringUtils.isNotEmpty(this.config.getServiceType())) {
            headers.add(header("service-type", this.config.getServiceType()));
        }
        if (this.config.getVersion() != null) {
            headers.add(header("cdh-version", this.config.getVersion().toString()));
        }
        if (StringUtils.isNotEmpty(this.config.getClusterDisplayName())) {
            headers.add(header("cluster-display-name", this.config.getClusterDisplayName()));
        }
        addDatabusHeaders(headers);
    }

    /** Appends one header per entry of the configured Databus header properties. */
    private void addDatabusHeaders(List<com.cloudera.altus.dbus.model.Header> list) throws JsonProcessingException {
        Properties headerProperties = this.config.getDatabusHeaderProperties();
        Enumeration<?> names = headerProperties.propertyNames();
        while (names.hasMoreElements()) {
            String name = (String) names.nextElement();
            LOG.debug("Processing Databus Header: [{}]", name);
            list.add(header(name, headerProperties.getProperty(name)));
        }
    }

    /**
     * @return the CDX stream name this uploader was constructed with
     */
    @Override // com.cloudera.cdx.client.impl.bulk.upload.FileUploader
    public String getStreamName() {
        return this.cdxStream;
    }

    /**
     * @return whatever the shared data-size counter bean reports for its exported-size threshold check
     */
    @Override // com.cloudera.cdx.client.impl.bulk.upload.FileUploader
    public boolean isDataExportedSizeThresholdLimitReached() {
        return tpCounters.getDataSizeBean().isDataExportedSizeThresholdLimitReached();
    }

    /** Creates a Databus record header with the given name and value. */
    private static com.cloudera.altus.dbus.model.Header header(String name, String value) {
        com.cloudera.altus.dbus.model.Header header = new com.cloudera.altus.dbus.model.Header();
        header.setName(name);
        header.setValue(value);
        return header;
    }
}
