package com.cloudera.cmf.service.hdfs;

import com.cloudera.cmf.cdhclient.CdhContext;
import com.cloudera.cmf.cdhclient.common.hdfs.FSDataInputStream;
import com.cloudera.cmf.cdhclient.common.hdfs.FileStatus;
import com.cloudera.cmf.cdhclient.common.hdfs.FileSystem;
import com.cloudera.cmf.cdhclient.common.hdfs.Path;
import com.cloudera.cmf.model.DbCommand;
import com.cloudera.cmf.model.DbRole;
import com.cloudera.cmf.model.DbService;
import com.cloudera.cmf.persist.CmfEntityManager;
import com.cloudera.cmf.service.GenericServiceCdhClient;
import com.cloudera.cmf.service.ReplicationUtils;
import com.cloudera.cmf.service.ServiceDataProvider;
import com.cloudera.cmf.service.config.ParamParseException;
import com.cloudera.cmf.service.hive.HiveCmdWork;
import com.cloudera.cmf.service.hive.HiveReplicationCommand;
import com.cloudera.cmf.service.scm.ScmParamTrackerStore;
import com.cloudera.enterprise.JsonUtil2;
import com.cloudera.enterprise.ThrottlingLogger;
import com.cloudera.enterprise.distcp.avro.FileCopyStatus;
import com.cloudera.enterprise.distcp.avro.InputPath;
import com.cloudera.server.common.CsvStreamGenerator;
import com.cloudera.server.common.ReplicationReportGenerator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.lang.StringUtils;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/cmf/service/hdfs/DistCpLogFetcher.class */
/**
 * Reads the artifacts a DistCp (or Hive replication) command leaves on HDFS: the Avro source
 * listing, the Avro per-file copy status, and the plain-text performance logs.
 *
 * <p>All HDFS access is executed through {@link GenericServiceCdhClient#runTask}; when a test
 * {@link FileSystem} is injected it is used directly instead of opening a new one.
 */
public class DistCpLogFetcher {
    private static final String HIVE_PERF_LOG_FILE_NAME = "perfLogHiveImport.log";
    private static final String HDFS_PERF_LOG_FILE_PATTERN = "perfData(.*)log";
    private static final String NO_PERF_PATH_MESSAGE =
            "No performance statistics path available yet: please try again later.";
    private static final String NO_PERF_DATA_MESSAGE =
            "No performance statistics available yet: please try again later.";
    // NOTE(review): left non-final as in the original, presumably so tests can substitute it;
    // confirm nothing reassigns it before making it final.
    private static Logger LOG = LoggerFactory.getLogger(DistCpLogFetcher.class);
    private static final Logger THROTTLED_LOG = new ThrottlingLogger(LOG, Duration.standardMinutes(15));
    /** Upper bound on the number of records a single listing/status read returns. */
    private static final int MAX_LIMIT = 1000;
    private final GenericServiceCdhClient hdfsClient;
    private final String listingPath;
    private final String statusPath;
    private final String hdfsPerfLogPath;
    private final String hivePerfLogPath;
    private final DbCommand command;
    private String user;
    private final FileSystem testFs;

    /**
     * Reads a window of Avro records of type {@code T} from one HDFS file: skips the first
     * {@code offset} records and returns at most {@code limit} of those that follow.
     */
    public class HdfsLogFetcher<T> implements Callable<List<T>> {
        private final String path;
        private final Class<T> klass;
        private final long offset;
        private final int limit;

        private HdfsLogFetcher(String path, Class<T> klass, long offset, int limit) {
            this.path = path;
            this.klass = klass;
            this.offset = offset;
            this.limit = limit;
        }

        @Override
        public List<T> call() throws IOException {
            return testFs == null ? doTask() : readContents(testFs);
        }

        /** Opens the service file system, reads the window, and always closes the file system. */
        private List<T> doTask() throws IOException {
            LOG.trace("Fetching HDFS logs.");
            FileSystem fileSystem = openHdfsFileSystem();
            try {
                List<T> contents = readContents(fileSystem);
                LOG.trace("Fetched [" + contents.size() + "] entries of HDFS logs.");
                return contents;
            } finally {
                if (fileSystem != null) {
                    fileSystem.close();
                }
            }
        }

        private List<T> readContents(FileSystem fileSystem) throws IOException {
            LOG.trace("Reading HDFS contents on path [" + path + "], offset [" + offset + "] and limit [" + limit + "].");
            InputStream inputStream = openInputStream(fileSystem, path);
            List<T> entries = Lists.newArrayList();
            try {
                try (DataFileStream<T> records = new DataFileStream<T>(inputStream, new SpecificDatumReader<T>(klass))) {
                    LOG.trace("Skipping [" + offset + "] entries.");
                    for (long skipped = 0; skipped < offset && records.hasNext(); skipped++) {
                        records.next();
                    }
                    LOG.trace("Skipped [" + offset + "] entries.");
                    int read = 0;
                    while (read < limit && records.hasNext()) {
                        entries.add(records.next());
                        read++;
                    }
                    LOG.trace("Read [" + read + "] HDFS entries.");
                }
            } finally {
                // Closes the raw stream even when the Avro reader could not be constructed.
                inputStream.close();
            }
            LOG.trace("HDFS contents read. Total [" + entries.size() + "] entries.");
            return entries;
        }
    }

    /**
     * Streams up to {@code limit} Avro records of type {@code T} from one HDFS file into a CSV
     * writer. Always returns an empty list; the output goes to the supplied writer.
     */
    public class HdfsLogFetcherCsvWriter<T> implements Callable<List<T>> {
        private final String path;
        private final Class<T> klass;
        private final CsvStreamGenerator csv;
        private final ReplicationReportGenerator.CsvWriter csvWriter;
        private final BufferedWriter bufferedWriter;
        private final long limit;

        private HdfsLogFetcherCsvWriter(String path, Class<T> klass, ReplicationReportGenerator.CsvWriter csvWriter,
                CsvStreamGenerator csv, BufferedWriter bufferedWriter, long limit) {
            this.path = path;
            this.klass = klass;
            this.csv = csv;
            this.bufferedWriter = bufferedWriter;
            this.csvWriter = csvWriter;
            this.limit = limit;
        }

        @Override
        public List<T> call() throws IOException {
            return testFs == null ? initFsReadAndWriteCsv() : readContentsWriteCsv(testFs);
        }

        /** Opens the service file system, writes the CSV rows, and always closes the file system. */
        private List<T> initFsReadAndWriteCsv() throws IOException {
            LOG.trace("Fetching HDFS logs and writing to Csv");
            FileSystem fileSystem = openHdfsFileSystem();
            try {
                Preconditions.checkNotNull(fileSystem, "fs is Null");
                readContentsWriteCsv(fileSystem);
                return Lists.newArrayList();
            } finally {
                if (fileSystem != null) {
                    fileSystem.close();
                }
            }
        }

        private List<T> readContentsWriteCsv(FileSystem fileSystem) throws IOException {
            InputStream inputStream = openInputStream(fileSystem, path);
            try {
                try (DataFileStream<T> records = new DataFileStream<T>(inputStream, new SpecificDatumReader<T>(klass))) {
                    for (long written = 0; written < limit && records.hasNext(); written++) {
                        // processRow fills the generator's current row, which is then emitted.
                        csvWriter.processRow(records.next(), csv.currentRow());
                        csv.writeLine(bufferedWriter);
                    }
                    bufferedWriter.flush();
                }
            } finally {
                inputStream.close();
            }
            return Lists.newArrayList();
        }
    }

    /**
     * Collects lines from every file directly under {@code path} whose name matches
     * {@code fileNameOrPattern} (a regular expression). FULL_PERF returns every line;
     * SUMMARY_PERF returns only the last line of each file.
     */
    public class PerfDataFetcher implements Callable<List<String>> {
        private final String path;
        private final PerfReportType perfReportType;
        private final String fileNameOrPattern;

        private PerfDataFetcher(String path, PerfReportType perfReportType, String fileNameOrPattern) {
            this.path = path;
            this.perfReportType = perfReportType;
            this.fileNameOrPattern = fileNameOrPattern;
        }

        @Override
        public List<String> call() throws IOException, IllegalArgumentException {
            return testFs == null ? doTask() : readPerfData(testFs);
        }

        /** Opens the service file system, reads the perf data, and always closes the file system. */
        private List<String> doTask() throws IOException, IllegalArgumentException {
            FileSystem fileSystem = openHdfsFileSystem();
            try {
                return readPerfData(fileSystem);
            } finally {
                if (fileSystem != null) {
                    fileSystem.close();
                }
            }
        }

        private List<String> readPerfData(FileSystem fileSystem) throws IOException {
            List<String> lines = Lists.newArrayList();
            if (path == null) {
                // The command recorded no log path (the constructor tolerates this case).
                lines.add(NO_PERF_PATH_MESSAGE);
                return lines;
            }
            Pattern fileNamePattern = Pattern.compile(fileNameOrPattern);
            try {
                for (String logFile : findPerfLogFiles(fileSystem, fileNamePattern)) {
                    readPerfLogFile(fileSystem, logFile, lines);
                }
            } catch (FileNotFoundException e) {
                lines.add(NO_PERF_PATH_MESSAGE);
                return lines;
            }
            if (lines.isEmpty() && command.isActive()) {
                lines.add(NO_PERF_DATA_MESSAGE);
            }
            return lines;
        }

        /** Returns the full paths of the entries under {@link #path} whose names match the pattern. */
        private List<String> findPerfLogFiles(FileSystem fileSystem, Pattern fileNamePattern) throws IOException {
            List<String> matches = Lists.newArrayList();
            for (FileStatus fileStatus : fileSystem.listStatus(path)) {
                Path filePath = fileStatus.getPath();
                LOG.debug("readContents: read file path: " + filePath.toString());
                String name = filePath.getName();
                LOG.debug("readContents: read file name: " + name);
                if (fileNamePattern.matcher(name).matches()) {
                    LOG.debug("readContents: file name: " + name + " is a performance log");
                    matches.add(filePath.toString());
                } else {
                    LOG.debug("readContents: file name: " + name + " is NOT performance log");
                }
            }
            return matches;
        }

        /** Appends the lines of one perf log to {@code lines} according to {@link #perfReportType}. */
        private void readPerfLogFile(FileSystem fileSystem, String logFile, List<String> lines) throws IOException {
            FSDataInputStream stream = fileSystem.open(logFile);
            if (stream == null) {
                LOG.warn("readContents: could not open file: " + logFile);
                return;
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(stream.getInputStream(), StandardCharsets.UTF_8))) {
                String lastLine = null;
                for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                    if (perfReportType == PerfReportType.SUMMARY_PERF) {
                        lastLine = line;
                    } else if (perfReportType == PerfReportType.FULL_PERF) {
                        lines.add(line);
                    } else {
                        throw new IllegalArgumentException("Unsupported performance report type: " + perfReportType);
                    }
                }
                if (lastLine != null) {
                    lines.add(lastLine);
                }
            }
        }
    }

    /** Selects how much of each performance log {@link #readPerfData} returns. */
    public enum PerfReportType {
        FULL_PERF,
        SUMMARY_PERF
    }

    /**
     * Creates a fetcher for the given DistCp or Hive replication command.
     *
     * @throws IllegalArgumentException if the command is not a DistCp/Hive replication command or
     *     no HDFS service can be located
     * @throws IllegalStateException if the command has no result data
     */
    public DistCpLogFetcher(ServiceDataProvider serviceDataProvider, DbCommand dbCommand) {
        this(serviceDataProvider, dbCommand, null);
    }

    @VisibleForTesting
    DistCpLogFetcher(ServiceDataProvider serviceDataProvider, DbCommand dbCommand, FileSystem fileSystem) {
        this.user = null;
        this.command = dbCommand;
        String commandName = dbCommand.getName();
        boolean isHiveReplication = HiveReplicationCommand.COMMAND_NAME.equals(commandName);
        Preconditions.checkArgument(DistCpCommand.COMMAND_NAME.equals(commandName) || isHiveReplication,
                "Not a distcp command.");
        Map<?, ?> resultData = loadResultData(dbCommand);

        String logPath;
        DbService service;
        if (isHiveReplication) {
            // Hive replication nests the DistCp result under the DATA work entry.
            Map<?, ?> dataWork = (Map<?, ?>) resultData.get(HiveCmdWork.Type.DATA.name());
            logPath = dataWork != null ? (String) dataWork.get("logPath") : null;
            service = ReplicationUtils.findHdfsService(CmfEntityManager.currentCmfEntityManager(), dbCommand.getService());
        } else {
            logPath = (String) resultData.get("logPath");
            service = dbCommand.getService();
        }
        LOG.trace("DistCp logPath is [" + logPath + "] for command with id [" + dbCommand.getId() + "].");
        if (logPath == null) {
            THROTTLED_LOG.info("DistCp logPath is not set for command with id: " + dbCommand.getId());
        }
        Preconditions.checkArgument(service != null, "Failed to locate HDFS service.");
        try {
            this.user = HdfsParams.HDFS_PROCESS_USER_NAME.extractFromStringMap(service.getServiceConfigsMap(), service.getServiceVersion());
        } catch (ParamParseException e) {
            LOG.warn("Unable to determine user, continue as we may not need it");
        }
        LOG.trace("DistCp user is [" + this.user + "].");
        this.listingPath = logPath != null ? logPath + "/source.listing" : null;
        this.statusPath = logPath != null ? logPath + "/job.status" : null;
        this.hdfsPerfLogPath = logPath;
        LOG.info("HDFS Perf path=" + this.hdfsPerfLogPath);
        this.hivePerfLogPath = ReplicationUtils.buildHiveLogPath(dbCommand);
        LOG.info("Hive Perf path=" + this.hivePerfLogPath);
        this.testFs = fileSystem;
        this.hdfsClient = createHdfsClient(serviceDataProvider, service);
    }

    /**
     * Parses the command's JSON result, read either from its result-data file or from the inline
     * bytes. The file stream is closed before returning.
     */
    private static Map<?, ?> loadResultData(DbCommand dbCommand) {
        String resultDataPath = dbCommand.getResultDataPath();
        if (StringUtils.isEmpty(resultDataPath)) {
            byte[] resultData = dbCommand.getResultData();
            Preconditions.checkState(resultData != null, "Command has no result data.");
            return (Map<?, ?>) JsonUtil2.valueFromBytes(HashMap.class, resultData);
        }
        try (InputStream in = new FileInputStream(resultDataPath)) {
            return (Map<?, ?>) JsonUtil2.valueFromStream(HashMap.class, in);
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Builds an HDFS client for {@code dbService}. Uses the custom Kerberos principal/keytab when
     * the service requires credentials and both are configured; otherwise uses the role type
     * chosen by {@link ReplicationUtils#chooseDfsSourceRole}.
     *
     * @throws IllegalStateException if the client cannot be created
     */
    public static GenericServiceCdhClient createHdfsClient(ServiceDataProvider serviceDataProvider, DbService dbService) {
        ScmParamTrackerStore scmParamTrackerStore = serviceDataProvider.getScmParamTrackerStore();
        try {
            if (serviceDataProvider.getServiceHandlerRegistry().get(dbService).requiresCredentials(CmfEntityManager.currentCmfEntityManager(), dbService)) {
                byte[] customKerberosKeytab = ReplicationUtils.getCustomKerberosKeytab(scmParamTrackerStore);
                String customPrincipalName = ReplicationUtils.getCustomPrincipalName(scmParamTrackerStore);
                if (customKerberosKeytab != null && customPrincipalName != null) {
                    LOG.trace("Creating secure HDFS client. principalName=[" + customPrincipalName + "]");
                    return new GenericServiceCdhClient(serviceDataProvider, dbService, customPrincipalName, customKerberosKeytab);
                }
            }
            LOG.trace("Creating unsecure HDFS client.");
            DbRole sourceRole = (DbRole) Preconditions.checkNotNull(
                    ReplicationUtils.chooseDfsSourceRole(dbService, serviceDataProvider.getServiceHandlerRegistry()));
            return new GenericServiceCdhClient(serviceDataProvider, dbService, sourceRole.getRoleType());
        } catch (IOException e) {
            THROTTLED_LOG.error("Unable to create HDFS client: ", e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Reads a page of the DistCp source listing.
     *
     * @param offset number of records to skip; must be non-negative
     * @param limit maximum number of records; must be positive and is capped at {@link #MAX_LIMIT}
     * @return the records, or an empty list if no log path is known or the read was interrupted
     * @throws IllegalStateException if the command is still active
     */
    public List<InputPath> readListing(long offset, int limit) throws IOException {
        Preconditions.checkState(!this.command.isActive(), "Command is still active.");
        LOG.debug("Reading DistCp listing with offset [" + offset + "] and limit [" + limit + "].");
        return this.listingPath != null ? readContents(this.listingPath, InputPath.class, offset, limit) : new ArrayList<InputPath>();
    }

    /**
     * Writes up to {@code limit} source-listing records as CSV rows. Writes nothing if no log
     * path is known.
     *
     * @throws IllegalStateException if the command is still active
     */
    public void writeListingCsv(ReplicationReportGenerator.CsvWriter csvWriter, CsvStreamGenerator csvStreamGenerator, BufferedWriter bufferedWriter, long limit) throws IOException {
        Preconditions.checkState(!this.command.isActive(), "Command is still active.");
        writeCsv(this.listingPath, InputPath.class, csvWriter, csvStreamGenerator, bufferedWriter, limit);
    }

    /**
     * Reads a page of the DistCp per-file copy status.
     *
     * @param offset number of records to skip; must be non-negative
     * @param limit maximum number of records; must be positive and is capped at {@link #MAX_LIMIT}
     * @return the records, or an empty list if no log path is known or the read was interrupted
     * @throws IllegalStateException if the command is still active
     */
    public List<FileCopyStatus> readStatus(long offset, int limit) throws IOException {
        Preconditions.checkState(!this.command.isActive(), "Command is still active.");
        LOG.debug("Reading DistCp status with offset [" + offset + "] and limit [" + limit + "].");
        return this.statusPath != null ? readContents(this.statusPath, FileCopyStatus.class, offset, limit) : new ArrayList<FileCopyStatus>();
    }

    /**
     * Writes up to {@code limit} copy-status records as CSV rows. Writes nothing if no log path is
     * known.
     *
     * @throws IllegalStateException if the command is still active
     */
    public void writeStatusCsv(ReplicationReportGenerator.CsvWriter csvWriter, CsvStreamGenerator csvStreamGenerator, BufferedWriter bufferedWriter, long limit) throws IOException {
        Preconditions.checkState(!this.command.isActive(), "Command is still active.");
        writeCsv(this.statusPath, FileCopyStatus.class, csvWriter, csvStreamGenerator, bufferedWriter, limit);
    }

    private <T> List<T> readContents(String path, Class<T> cls, long offset, int limit) throws IOException {
        Preconditions.checkArgument(offset >= 0, "Offset must be non-negative.");
        Preconditions.checkArgument(limit > 0, "Limit must be greater than 0.");
        int cappedLimit = Math.min(MAX_LIMIT, limit);
        LOG.debug("Reading DistCp contents with path [" + path + "], offset [" + offset + "] and limit [" + cappedLimit + "].");
        try {
            @SuppressWarnings("unchecked") // runTask hands back the fetcher's List<T>.
            List<T> list = (List<T>) this.hdfsClient.runTask(new HdfsLogFetcher<T>(path, cls, offset, cappedLimit)).get();
            LOG.debug("DistCp contents of size [" + list.size() + "] read successfully.");
            return list;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("readContents interrupted: ", e);
            return ImmutableList.of();
        } catch (ExecutionException e) {
            throw unwrapExecutionFailure(e);
        }
    }

    private <T> void writeCsv(String path, Class<T> cls, ReplicationReportGenerator.CsvWriter csvWriter, CsvStreamGenerator csvStreamGenerator, BufferedWriter bufferedWriter, long limit) throws IOException {
        if (path == null) {
            // Mirrors readListing/readStatus, which return nothing when no log path is known.
            LOG.debug("DistCp log path is not set; skipping CSV export.");
            return;
        }
        try {
            this.hdfsClient.runTask(new HdfsLogFetcherCsvWriter<T>(path, cls, csvWriter, csvStreamGenerator, bufferedWriter, limit)).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("writeCsv interrupted: ", e);
        } catch (ExecutionException e) {
            throw unwrapExecutionFailure(e);
        }
    }

    /**
     * Reads the HDFS DistCp performance logs.
     *
     * @return the log lines (or a single "not available yet" message), or an empty list if the
     *     read was interrupted
     */
    public List<String> readPerfData(PerfReportType perfReportType) throws IOException {
        LOG.debug("Reading DistCp performance data with hdfsPerfLogPath [" + this.hdfsPerfLogPath + "], reportType [" + perfReportType + "].");
        try {
            @SuppressWarnings("unchecked") // runTask hands back the fetcher's List<String>.
            List<String> list = (List<String>) this.hdfsClient.runTask(new PerfDataFetcher(this.hdfsPerfLogPath, perfReportType, HDFS_PERF_LOG_FILE_PATTERN)).get();
            LOG.debug("DistCp performance data of size [" + list.size() + "] read successfully.");
            return list;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("readHDFSPerf interrupted: ", e);
            return ImmutableList.of();
        } catch (ExecutionException e) {
            throw unwrapExecutionFailure(e);
        }
    }

    /**
     * Reads every line of the Hive import performance log.
     *
     * @return the log lines (or a single "not available yet" message), or an empty list if the
     *     read was interrupted
     */
    public List<String> readHivePerfData() throws IOException {
        LOG.debug("Reading Hive performance data with hivePerfLogPath [" + this.hivePerfLogPath + "]");
        try {
            @SuppressWarnings("unchecked") // runTask hands back the fetcher's List<String>.
            List<String> list = (List<String>) this.hdfsClient.runTask(new PerfDataFetcher(this.hivePerfLogPath, PerfReportType.FULL_PERF, HIVE_PERF_LOG_FILE_NAME)).get();
            LOG.debug("Hive performance data of size [" + list.size() + "] read successfully.");
            return list;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("readHivePerf interrupted: ", e);
            return ImmutableList.of();
        } catch (ExecutionException e) {
            throw unwrapExecutionFailure(e);
        }
    }

    /** Opens a file system for the HDFS service using the client's configuration. */
    private FileSystem openHdfsFileSystem() throws IOException {
        return CdhContext.getCurrentContext().getHadoopFactory().getFileSystem(this.hdfsClient.getConfig().asStringMap());
    }

    /** Opens {@code path} for reading, failing with an IOException if the file system returns null. */
    private static InputStream openInputStream(FileSystem fileSystem, String path) throws IOException {
        FSDataInputStream stream = fileSystem.open(path);
        if (stream == null) {
            throw new IOException("Could not open path: " + path);
        }
        return stream.getInputStream();
    }

    /**
     * Rethrows the cause of a failed task: IOExceptions as-is, anything else propagated unchecked.
     * Declared to return RuntimeException so callers can write {@code throw unwrapExecutionFailure(e)}.
     */
    private static RuntimeException unwrapExecutionFailure(ExecutionException e) throws IOException {
        Throwable cause = e.getCause();
        if (cause instanceof IOException) {
            throw (IOException) cause;
        }
        throw Throwables.propagate(cause != null ? cause : e);
    }
}
