package com.cloudera.nav.hdfs.extractor;

import com.cloudera.cmf.cdhclient.CdhContext;
import com.cloudera.cmf.cdhclient.CdhHadoopObjectFactory;
import com.cloudera.cmf.cdhclient.HadoopConfiguration;
import com.cloudera.cmf.cdhclient.common.hdfs.FileStatus;
import com.cloudera.cmf.cdhclient.common.hdfs.FileSystem;
import com.cloudera.cmf.cdhclient.common.security.UserGroupInformation;
import com.cloudera.nav.VersionData;
import com.cloudera.nav.core.model.Source;
import com.cloudera.nav.extract.EntityFilters;
import com.cloudera.nav.extract.Util;
import com.cloudera.nav.hdfs.HdfsCSVGeneratorContext;
import com.cloudera.nav.hdfs.HdfsExtractorUtils;
import com.cloudera.nav.server.NavOptions;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Stack;
import java.util.function.Predicate;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/nav/hdfs/extractor/HDFSFIlePublisherShim.class */
public class HDFSFIlePublisherShim implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(HDFSFIlePublisherShim.class);
    private final String superuser;
    private final String hdfsName;
    private final String nameService;
    private final HadoopConfiguration config;
    private final NavOptions options;
    private final EntityFilters filters;
    private final VersionString cdhFullVersion;
    private final Source source;

    public HDFSFIlePublisherShim(String str, String str2, String str3, Source source, EntityFilters entityFilters, HadoopConfiguration hadoopConfiguration, NavOptions navOptions, String str4) {
        this.superuser = str;
        this.hdfsName = str2;
        this.nameService = str3;
        this.source = source;
        this.config = hadoopConfiguration;
        this.options = navOptions;
        this.filters = entityFilters;
        this.cdhFullVersion = VersionString.of(str4);
    }

    @Override // java.lang.Runnable
    public void run() {
        LOG.info("Small Files will be uploaded to the service {}", this.hdfsName);
        CdhHadoopObjectFactory hadoopFactory = Util.getCdhContext(this.options).getHadoopFactory();
        String str = this.nameService != null ? this.hdfsName + this.nameService : this.hdfsName;
        try {
            UserGroupInformation ugi = HdfsExtractorUtils.getUgi(hadoopFactory, this.config, this.superuser);
            HdfsCSVGeneratorContext hdfsCSVGeneratorContext = new HdfsCSVGeneratorContext(this.source, this.filters, this.options, this.cdhFullVersion);
            ugi.reloginFromKeytab();
            LOG.debug("Starting initial bulk extraction for hdfsService {}, nameService {}", str, this.nameService);
            uploadFiles(hdfsCSVGeneratorContext, ugi);
        } catch (Exception e) {
            LOG.error("Internal Error while extracting ", e);
        }
    }

    void uploadFiles(HdfsCSVGeneratorContext hdfsCSVGeneratorContext, UserGroupInformation userGroupInformation) throws InterruptedException, IOException {
        try {
            Preconditions.checkNotNull(userGroupInformation);
            userGroupInformation.doAs(new PrivilegedExceptionAction<Void>() { // from class: com.cloudera.nav.hdfs.extractor.HDFSFIlePublisherShim.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.security.PrivilegedExceptionAction
                public Void run() throws Exception {
                    FileSystem fileSystem = CdhContext.getCurrentContext().getHadoopFactory().getFileSystem(HDFSFIlePublisherShim.this.config.asStringMap());
                    try {
                        String hDFSTargetPath = getHDFSTargetPath();
                        createSmallfilesRootDirectoryIfDoesNotExist(fileSystem, hDFSTargetPath);
                        deleteOldTempFiles(fileSystem, hDFSTargetPath);
                        long currentTimeMillis = System.currentTimeMillis();
                        HDFSFIlePublisherShim.LOG.info("Uploading the files with time stamp {}", Long.valueOf(currentTimeMillis));
                        uploadFilesToHDFS(fileSystem, hDFSTargetPath, getSourcePath(), currentTimeMillis);
                        HDFSFIlePublisherShim.LOG.info("Small Files upload completed successfully with time stamp {}.", Long.valueOf(currentTimeMillis));
                        return null;
                    } catch (Throwable th) {
                        HDFSFIlePublisherShim.LOG.error("Error encountered in running the file publisher", th);
                        throw Throwables.propagate(th);
                    }
                }

                private void deleteOldTempFiles(FileSystem fileSystem, String str) {
                    HDFSFIlePublisherShim.LOG.info("Deleting old temporary files on HDFS at {}.", str);
                    try {
                        for (FileStatus fileStatus : fileSystem.listStatus(str)) {
                            if (fileStatus.getPath().toString().endsWith(HdfsCSVGeneratorShim.TEMP_FILE_SUFFIX)) {
                                fileSystem.delete(fileStatus.getPath().toString(), false);
                            }
                        }
                    } catch (IOException e) {
                        HDFSFIlePublisherShim.LOG.error("Error encountered in deleting temporary files on HDFS at {}.", str, e);
                    }
                }

                private void createSmallfilesRootDirectoryIfDoesNotExist(FileSystem fileSystem, String str) throws IOException, FileNotFoundException {
                    if (fileSystem.exists(str)) {
                        return;
                    }
                    fileSystem.mkdirs(str);
                }

                protected File getSourcePath() {
                    return HDFSFIlePublisherShim.this.options.getSmallFilesCSVFilesRoot();
                }

                protected String getHDFSTargetPath() {
                    return HDFSFIlePublisherShim.this.options.getHdfsTargetRootPathForSmallFiles();
                }

                private void uploadFilesToHDFS(FileSystem fileSystem, String str, File file, long j) throws IOException {
                    Path path = file.toPath();
                    Stack stack = new Stack();
                    stack.add(path);
                    while (!stack.isEmpty()) {
                        for (Path path2 : Files.list((Path) stack.pop()).filter(new Predicate<Path>() { // from class: com.cloudera.nav.hdfs.extractor.HDFSFIlePublisherShim.1.1
                            @Override // java.util.function.Predicate
                            public boolean test(Path path3) {
                                return path3.toString().endsWith(HdfsCSVGeneratorShim.ZIP_EXTENSION);
                            }
                        })) {
                            if (Files.isDirectory(path2, new LinkOption[0])) {
                                stack.push(path2);
                                createDirectoryInHdfs(fileSystem, str, path2, file);
                            } else {
                                uploadFileToHDFS(fileSystem, str, path2, file, j);
                            }
                        }
                    }
                }

                /* JADX WARN: Finally extract failed */
                private void uploadFileToHDFS(FileSystem fileSystem, String str, Path path, File file, long j) throws IOException {
                    ZipOutputStream zipOutputStream;
                    Throwable th;
                    WritableByteChannel newChannel;
                    Throwable th2;
                    String hdfsPath = getHdfsPath(str, file, path);
                    String str2 = hdfsPath + HdfsCSVGeneratorShim.TEMP_FILE_SUFFIX;
                    try {
                        zipOutputStream = new ZipOutputStream(fileSystem.create(str2, true));
                        th = null;
                        try {
                            newChannel = Channels.newChannel(zipOutputStream);
                            th2 = null;
                        } catch (Throwable th3) {
                            if (zipOutputStream != null) {
                                if (0 != 0) {
                                    try {
                                        zipOutputStream.close();
                                    } catch (Throwable th4) {
                                        th.addSuppressed(th4);
                                    }
                                } else {
                                    zipOutputStream.close();
                                }
                            }
                            throw th3;
                        }
                    } catch (Throwable th5) {
                        HDFSFIlePublisherShim.LOG.error("Error encountered in uploading the file {}. Will attempt to delete this temporary file.", str2, th5);
                        if (fileSystem.exists(str2)) {
                            if (fileSystem.delete(str2, false)) {
                                HDFSFIlePublisherShim.LOG.trace("Deleted the incompletely loaded file {}", str2);
                            } else {
                                HDFSFIlePublisherShim.LOG.warn("Error encountered in deleting the incompletely uploaded file {}. Will be deleted in next iteration.", str2);
                            }
                        }
                    }
                    try {
                        try {
                            List<String> addDataFile = addDataFile(zipOutputStream, newChannel, path);
                            HDFSFIlePublisherShim.LOG.debug("added file {} to {} successfully.", path.toString(), str2);
                            addUpLoadInformationFile(zipOutputStream, j, newChannel, addDataFile);
                            HDFSFIlePublisherShim.LOG.debug("added upload information file {} successfully.", str2);
                            if (newChannel != null) {
                                if (0 != 0) {
                                    try {
                                        newChannel.close();
                                    } catch (Throwable th6) {
                                        th2.addSuppressed(th6);
                                    }
                                } else {
                                    newChannel.close();
                                }
                            }
                            if (zipOutputStream != null) {
                                if (0 != 0) {
                                    try {
                                        zipOutputStream.close();
                                    } catch (Throwable th7) {
                                        th.addSuppressed(th7);
                                    }
                                } else {
                                    zipOutputStream.close();
                                }
                            }
                            if (!fileSystem.rename(str2, hdfsPath)) {
                                HDFSFIlePublisherShim.LOG.error("Unknown Error encountered in renaming the file from {} to {} ", str2, hdfsPath);
                            }
                            HDFSFIlePublisherShim.LOG.debug("renamed file {} to {} successfully.", str2.toString(), hdfsPath);
                        } finally {
                        }
                    } catch (Throwable th8) {
                        if (newChannel != null) {
                            if (th2 != null) {
                                try {
                                    newChannel.close();
                                } catch (Throwable th9) {
                                    th2.addSuppressed(th9);
                                }
                            } else {
                                newChannel.close();
                            }
                        }
                        throw th8;
                    }
                }

                private void addUpLoadInformationFile(ZipOutputStream zipOutputStream, long j, WritableByteChannel writableByteChannel, List<String> list) throws IOException {
                    zipOutputStream.putNextEntry(new ZipEntry("uploadDetails.info"));
                    StringBuilder sb = new StringBuilder();
                    sb.append("navVersion=").append(VersionData.getVersion()).append(System.lineSeparator());
                    sb.append("uploadTime=").append(j).append(System.lineSeparator());
                    sb.append("totalFiles=").append(list.size()).append(System.lineSeparator());
                    int i = 0;
                    Iterator<String> it = list.iterator();
                    while (it.hasNext()) {
                        int i2 = i;
                        i++;
                        sb.append("file.").append(i2).append("=").append(it.next()).append(System.lineSeparator());
                    }
                    ByteBuffer wrap = ByteBuffer.wrap(sb.toString().getBytes(StandardCharsets.UTF_8));
                    while (wrap.hasRemaining()) {
                        writableByteChannel.write(wrap);
                    }
                }

                private List<String> addDataFile(ZipOutputStream zipOutputStream, WritableByteChannel writableByteChannel, Path path) throws IOException {
                    ArrayList newArrayList = Lists.newArrayList();
                    ZipInputStream zipInputStream = new ZipInputStream(new FileInputStream(path.toFile()));
                    Throwable th = null;
                    try {
                        try {
                            for (ZipEntry nextEntry = zipInputStream.getNextEntry(); nextEntry != null; nextEntry = zipInputStream.getNextEntry()) {
                                HDFSFIlePublisherShim.LOG.debug("Adding zip entry {} to the file.", nextEntry.getName());
                                ReadableByteChannel newChannel = Channels.newChannel(zipInputStream);
                                zipOutputStream.putNextEntry(nextEntry);
                                fastChannelCopy(newChannel, writableByteChannel);
                                newArrayList.add(nextEntry.getName());
                            }
                            if (zipInputStream != null) {
                                if (0 != 0) {
                                    try {
                                        zipInputStream.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    zipInputStream.close();
                                }
                            }
                            return newArrayList;
                        } finally {
                        }
                    } catch (Throwable th3) {
                        if (zipInputStream != null) {
                            if (th != null) {
                                try {
                                    zipInputStream.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                zipInputStream.close();
                            }
                        }
                        throw th3;
                    }
                }

                private void fastChannelCopy(ReadableByteChannel readableByteChannel, WritableByteChannel writableByteChannel) throws IOException {
                    ByteBuffer allocateDirect = ByteBuffer.allocateDirect(16384);
                    while (readableByteChannel.read(allocateDirect) != -1) {
                        allocateDirect.flip();
                        writableByteChannel.write(allocateDirect);
                        allocateDirect.compact();
                    }
                    allocateDirect.flip();
                    while (allocateDirect.hasRemaining()) {
                        writableByteChannel.write(allocateDirect);
                    }
                }

                private void createDirectoryInHdfs(FileSystem fileSystem, String str, Path path, File file) throws IOException {
                    String hdfsPath = getHdfsPath(str, file, path);
                    HDFSFIlePublisherShim.LOG.info("Creating directory {}.", hdfsPath);
                    fileSystem.mkdirs(hdfsPath);
                }

                private String getHdfsPath(String str, File file, Path path) {
                    return String.format("%s/%d_%s", str, Long.valueOf(System.currentTimeMillis()), path.toFile().getAbsolutePath().substring(file.getAbsolutePath().length() + 1));
                }
            });
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    CdhHadoopObjectFactory getHadoopObjectFactory(NavOptions navOptions) {
        return Util.getCdhContext(navOptions).getHadoopFactory();
    }
}
