package com.cloudera.cdx.client.impl.bulk;

import com.cloudera.cdx.client.CdxExporter;
import com.cloudera.cdx.client.ExporterConfig;
import com.cloudera.cdx.client.impl.bulk.store.LocalFileSystemRecordStore;
import com.cloudera.cdx.client.impl.bulk.store.RecordStore;
import com.cloudera.cdx.client.impl.bulk.store.SimpleFileRecordStore;
import com.cloudera.cdx.client.impl.bulk.upload.DatabusUploader;
import com.cloudera.cdx.client.impl.bulk.upload.FileUploader;
import com.cloudera.cdx.client.impl.bulk.upload.LocalFileSystemUploader;
import com.cloudera.cdx.client.impl.bulk.upload.S3Uploader;
import com.cloudera.cdx.extractor.model.DefaultStreams;
import com.cloudera.cmf.cdhclient.util.ThrottlingLogger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.io.IOUtils;
import org.joda.time.Duration;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/cdx/client/impl/bulk/CdxBulkExporter.class */
/**
 * {@link CdxExporter} that hands every record to a {@link RecordStore} and triggers
 * {@link RecordStore#export()} periodically from a single daemon scheduler thread.
 *
 * <p>The {@link FileUploader}s enabled in the {@link ExporterConfig} (Databus, S3, local file
 * system) are registered as subscribers of the store at construction time.
 *
 * @param <T> type of the records accepted by {@link #send(Object)}
 */
public class CdxBulkExporter<T> implements CdxExporter<T> {
    /**
     * Locks handed to the record stores, keyed by {@code "<stream>-<serviceName>"}. The map is
     * shared by every exporter instance, so all access is guarded by synchronizing on the map.
     */
    private static final Map<String, ReentrantLock> storeLocksMap = new HashMap<String, ReentrantLock>();
    public static final DateTimeFormatter FORMATTER = DateTimeFormat.forPattern("yyyyMMddHHmmssSSS").withZoneUTC();
    private static final Logger LOG = LoggerFactory.getLogger(CdxBulkExporter.class);
    /** Logger used for errors that may repeat on every export period; throttled to 15 minutes. */
    private static final Logger THROTTLED_LOG = new ThrottlingLogger(LOG, Duration.standardMinutes(15));
    /** Databus stream name used for a CDX stream when the configuration does not name one. */
    private static final Map<String, String> DEFAULT_DBUS_STREAM = ImmutableMap.<String, String>builder()
            .put(DefaultStreams.OOZIE_WORKFLOWS, "OozieWorkflow")
            .put(DefaultStreams.HIVE_QUERY_AUDITS, "HiveAudit")
            .put(DefaultStreams.IMPALA_QUERY_PROFILE, "ImpalaQueryProfile")
            .put(DefaultStreams.YARN_JHIST, "MrJhist")
            .put(DefaultStreams.SPARK_EVENT_LOG, "SparkEventLog")
            .put(DefaultStreams.SPARK2_EVENT_LOG, "SparkEventLog")
            .put(DefaultStreams.YARN_APPS, "YarnApp")
            .put("Sdx-details", "SdxDetails")
            .put(DefaultStreams.HIVE_TEZ_APPS, "TezHistoryProtobuf")
            .put(DefaultStreams.HIVE_APPS, "HiveHistoryProtobuf")
            .put(DefaultStreams.HIVE_LLAP_APPS, "LlapHistoryProtobuf")
            .put("cluster-metrics", "ClusterMetrics")
            .build();
    private final String streamName;
    private final RecordStore<T> store;
    private ScheduledExecutorService executor;
    /** Set while a scheduled export is running so that overlapping runs are skipped. */
    private final AtomicBoolean exporting = new AtomicBoolean(false);

    /** Selects which {@link RecordStore} implementation backs the exporter. */
    public enum RecordStoreType {
        LOCAL_SERIALIZING,
        SIMPLE_FILE
    }

    /**
     * Creates the record store for the stream, subscribes the configured uploaders to it and
     * starts the periodic export.
     *
     * <p>If the uploaders cannot be created the error is logged, printed to {@code System.err}
     * and the JVM is terminated with exit status {@code -1}.
     *
     * @param str            name of the CDX stream this exporter serves
     * @param exporterConfig exporter configuration; its export period is interpreted in seconds
     */
    public CdxBulkExporter(String str, ExporterConfig exporterConfig) {
        this.streamName = str;
        this.store = createStore(str, exporterConfig);
        try {
            for (FileUploader uploader : createExporters(str, exporterConfig)) {
                this.store.addSubscriber(uploader);
            }
        } catch (Exception e) {
            THROTTLED_LOG.error("Failed to create exporters", e);
            System.err.println(e);
            System.exit(-1);
        }
        scheduleExport(exporterConfig.getExportPeriod());
    }

    /**
     * Starts a single daemon thread that calls {@link RecordStore#export()} at a fixed rate.
     *
     * @param j initial delay and period between exports, in seconds
     */
    private void scheduleExport(long j) {
        this.executor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat(CdxBulkExporter.class.getSimpleName())
                .build());
        this.executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                if (!exporting.compareAndSet(false, true)) {
                    // A previous export is still in progress; skip this tick.
                    return;
                }
                try {
                    store.export();
                } catch (Exception e) {
                    // Deliberately not rethrown: scheduleAtFixedRate suppresses every subsequent
                    // execution once a run throws, which would silently stop exporting after
                    // the first failure.
                    THROTTLED_LOG.error("Export failed for stream: {}", streamName, e);
                } finally {
                    exporting.set(false);
                }
            }
        }, j, j, TimeUnit.SECONDS);
    }

    /**
     * Builds the uploaders enabled in the configuration.
     *
     * <p>The Databus stream is taken from the property
     * {@code export.dbus.stream.<lower-cased stream name>}; when that is absent the default
     * mapping is used, prefixed with {@code export.dbus.stream.prefix} if that property is set.
     * When no Databus stream can be determined the Databus uploader is skipped.
     *
     * @param str            name of the CDX stream
     * @param exporterConfig exporter configuration
     * @return the uploaders to subscribe to the record store; possibly empty
     */
    private Collection<FileUploader> createExporters(String str, ExporterConfig exporterConfig) {
        ImmutableList.Builder<FileUploader> uploaders = ImmutableList.builder();
        if (exporterConfig.isDatabusUploadEnabled()) {
            // Locale.ROOT keeps the property key independent of the JVM's default locale.
            String dbusStream = exporterConfig.getProperty("export.dbus.stream." + str.toLowerCase(Locale.ROOT));
            if (dbusStream == null) {
                dbusStream = DEFAULT_DBUS_STREAM.get(str);
                String prefix = exporterConfig.getProperty("export.dbus.stream.prefix");
                if (prefix != null && dbusStream != null) {
                    dbusStream = prefix + dbusStream;
                }
            }
            if (dbusStream == null) {
                LOG.info("Skipping DatabusUploader for stream {}", str);
            } else {
                LOG.info("Stream {} configured to upload to Databus stream {}", str, dbusStream);
                uploaders.add(new DatabusUploader(str, dbusStream, exporterConfig, str));
            }
        }
        if (exporterConfig.isS3UploadEnabled()) {
            uploaders.add(newS3Uploader(str, exporterConfig));
        }
        if (exporterConfig.isLocalUploadEnabled()) {
            uploaders.add(new LocalFileSystemUploader(str, exporterConfig));
        }
        return uploaders.build();
    }

    /**
     * Creates the S3 uploader; kept as a separate package-private method so tests can replace it.
     *
     * @param str            name of the CDX stream
     * @param exporterConfig exporter configuration
     * @return a new {@link S3Uploader}
     */
    @VisibleForTesting
    FileUploader newS3Uploader(String str, ExporterConfig exporterConfig) {
        return new S3Uploader(str, exporterConfig);
    }

    /**
     * Creates the record store selected by {@link ExporterConfig#getStoreType()}, passing it the
     * lock registered for the {@code "<stream>-<serviceName>"} key (created on first use).
     *
     * @param str            name of the CDX stream
     * @param exporterConfig exporter configuration
     * @return the record store backing this exporter
     * @throws NullPointerException     if the configured store type is {@code null}
     * @throws IllegalArgumentException if the store type is not supported
     */
    private RecordStore<T> createStore(String str, ExporterConfig exporterConfig) {
        RecordStoreType recordStoreType = (RecordStoreType) Preconditions.checkNotNull(exporterConfig.getStoreType());
        String lockKey = str + "-" + exporterConfig.getServiceName();
        ReentrantLock storeLock;
        // Get-or-create must be atomic: exporters may be constructed from different threads and
        // must end up with the same lock instance for the same key.
        synchronized (storeLocksMap) {
            storeLock = storeLocksMap.get(lockKey);
            if (storeLock == null) {
                storeLock = new ReentrantLock();
                storeLocksMap.put(lockKey, storeLock);
            }
        }
        switch (recordStoreType) {
            case LOCAL_SERIALIZING:
                return new LocalFileSystemRecordStore(str, exporterConfig, storeLock);
            case SIMPLE_FILE:
                return new SimpleFileRecordStore(str, exporterConfig, storeLock);
            default:
                throw new IllegalArgumentException("Unsupported record store type: " + recordStoreType);
        }
    }

    /**
     * Persists the record in the store; it is uploaded by a later export.
     *
     * @param t record to persist
     */
    @Override // com.cloudera.cdx.client.CdxExporter
    public void send(T t) {
        this.store.persist(t);
    }

    /**
     * Stops the periodic export, closes the store and runs one final export.
     *
     * <p>The timeout arguments are currently not used: the scheduler is shut down without
     * waiting for an export that may still be in progress.
     *
     * @param j        timeout value (unused)
     * @param timeUnit unit of the timeout (unused)
     */
    @Override // com.cloudera.cdx.client.CdxExporter
    public void close(long j, TimeUnit timeUnit) {
        this.executor.shutdown();
        // NOTE(review): the store is closed before the final flush(); presumably closing
        // finalizes pending data so export() can pick it up -- confirm against RecordStore.
        IOUtils.closeQuietly(this.store);
        flush();
    }

    /**
     * Runs an export immediately on the calling thread. Failures are logged and not propagated.
     */
    @Override // com.cloudera.cdx.client.CdxExporter
    public void flush() {
        if (this.store == null) {
            return;
        }
        try {
            this.store.export();
        } catch (Exception e) {
            LOG.error("Metadata export failed", e);
        }
    }

    /**
     * @return the name of the CDX stream this exporter was created for
     */
    @Override // com.cloudera.cdx.client.CdxExporter
    public String getStreamName() {
        return this.streamName;
    }
}
