package com.cloudera.enterprise.distcp.mapred;

import com.cloudera.enterprise.distcp.DistCpConstants;
import com.cloudera.enterprise.distcp.DistCpOptionSwitch;
import com.cloudera.enterprise.distcp.DistCpOptions;
import com.cloudera.enterprise.distcp.HdfsCopyListing;
import com.cloudera.enterprise.distcp.acllib.DistCpFileStatus;
import com.cloudera.enterprise.distcp.avro.CopyStatus;
import com.cloudera.enterprise.distcp.avro.FileCopyStatus;
import com.cloudera.enterprise.distcp.avro.InputPath;
import com.cloudera.enterprise.distcp.avro.PathType;
import com.cloudera.enterprise.distcp.mapred.CopyMapper;
import com.cloudera.enterprise.distcp.util.CdhAclXAttrsUtils;
import com.cloudera.enterprise.distcp.util.DistCpUtils;
import com.cloudera.enterprise.distcp.util.FsCache;
import com.cloudera.enterprise.distcp.util.SecurityUtils;
import com.cloudera.enterprise.distcp.util.VersionChecker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.EnumSet;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

/* loaded from: input_file:com/cloudera/enterprise/distcp/mapred/CopyCommitter.class */
public class CopyCommitter extends FileOutputCommitter {
    /** Logger shared by the committer and its nested translator classes. */
    private static final Log LOG = LogFactory.getLog(CopyCommitter.class);

    /** File name, relative to the results path, under which the translated source listing is written. */
    @VisibleForTesting
    static final String RESULT_LISTING_NAME = "source.listing";

    /** File name, relative to the results path, under which the per-file copy statuses are written. */
    @VisibleForTesting
    static final String STATUS_FILE_NAME = "job.status";
    /** Task attempt context received by the constructor; used for status text, progress reports and counters. */
    private final TaskAttemptContext taskAttemptContext;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/cloudera/enterprise/distcp/mapred/CopyCommitter$InputTranslator.class */
    /**
     * Translator that turns the file-status entries of a copy-listing sequence file
     * into Avro {@link InputPath} records.
     */
    public static class InputTranslator implements Translator<InputPath> {
        /** Status object handed to (and replaced by) {@code DistCpUtils.getNextFileStatusObject} on every read. */
        private FileStatus status;
        /** Reusable key buffer passed to the listing reader helper. */
        private final Text relPath = new Text();
        private final DistCpFileStatus distCpFileStatus = new DistCpFileStatus();

        /**
         * @param configuration job configuration used to create the initial status object
         * @throws IOException if the status object cannot be created
         */
        public InputTranslator(Configuration configuration) throws IOException {
            this.status = DistCpUtils.newFileStatus(configuration);
        }

        /** @return the Avro schema of {@link InputPath} */
        @Override
        public Schema getSchema() {
            return InputPath.SCHEMA$;
        }

        /**
         * Reads the next listing entry and converts it.
         *
         * @param reader sequence-file reader positioned inside the listing
         * @return the translated entry, or {@code null} when the helper reports no further entry
         * @throws IOException if reading fails
         */
        @Override
        public InputPath next(SequenceFile.Reader reader) throws IOException {
            this.status = DistCpUtils.getNextFileStatusObject(this.status, reader, this.relPath);
            if (this.status == null) {
                return null;
            }

            // Symlink takes precedence over directory, which takes precedence over plain file.
            final PathType entryType;
            if (this.status.isSymlink()) {
                entryType = PathType.SYMLINK;
            } else if (this.status.isDirectory()) {
                entryType = PathType.DIRECTORY;
            } else {
                entryType = PathType.FILE;
            }

            // NOTE(review): m26build looks like the decompiler's renamed builder "build" method - confirm.
            return InputPath.newBuilder()
                    .setSourcePath(this.status.getPath().toString())
                    .setType(entryType)
                    .setLength(this.status.getLen())
                    .m26build();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/cloudera/enterprise/distcp/mapred/CopyCommitter$StatusTranslator.class */
    public static class StatusTranslator implements Translator<FileCopyStatus> {
        private final Text path;
        private final Text message;

        private StatusTranslator() {
            this.path = new Text();
            this.message = new Text();
        }

        @Override // com.cloudera.enterprise.distcp.mapred.CopyCommitter.Translator
        public Schema getSchema() {
            return FileCopyStatus.SCHEMA$;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // com.cloudera.enterprise.distcp.mapred.CopyCommitter.Translator
        public FileCopyStatus next(SequenceFile.Reader reader) throws IOException {
            while (reader.next(this.path, this.message)) {
                if (!CopyMapper.PROGRESS_MARKER.equals(this.path)) {
                    CopyStatus copyStatus = CopyStatus.ERROR;
                    String text = this.message.toString();
                    if (text.startsWith("DELETED:")) {
                        text = text.substring("DELETED:".length());
                        copyStatus = CopyStatus.DELETED;
                    } else if (text.startsWith("SKIP:")) {
                        text = text.substring("SKIP:".length());
                        copyStatus = CopyStatus.SKIPPED;
                    } else if (text.startsWith("FAIL:")) {
                        text = text.substring("FAIL:".length());
                    } else {
                        CopyCommitter.LOG.warn("Unknown file copy status in output: " + text);
                    }
                    String trim = text.trim();
                    if (trim.isEmpty()) {
                        trim = null;
                    }
                    return FileCopyStatus.newBuilder().setPath(this.path.toString()).setStatus(copyStatus).setMessage(trim).m24build();
                }
            }
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/cloudera/enterprise/distcp/mapred/CopyCommitter$Translator.class */
    /**
     * Converts entries read from a {@link SequenceFile.Reader} into records of type {@code OUT}
     * that {@code translateSequence} appends to an Avro data file.
     *
     * @param <OUT> type of the records produced
     */
    public interface Translator<OUT> {
        /** @return the Avro schema describing the records returned by {@link #next} */
        Schema getSchema();

        /**
         * Produces the next record from the reader.
         *
         * @param reader sequence-file reader to consume from
         * @return the next record, or {@code null} to signal that translation is finished
         *         ({@code translateSequence} stops at the first {@code null})
         * @throws IOException if reading fails
         */
        OUT next(SequenceFile.Reader reader) throws IOException;
    }

    /**
     * Creates the committer, forwarding both arguments to the {@link FileOutputCommitter}
     * constructor and keeping the task attempt context for later status/progress updates.
     *
     * @param path output path handed to the parent committer
     * @param taskAttemptContext context of the task attempt running this committer
     * @throws IOException if the parent constructor fails
     */
    public CopyCommitter(Path path, TaskAttemptContext taskAttemptContext) throws IOException {
        super(path, taskAttemptContext);
        this.taskAttemptContext = taskAttemptContext;
    }

    /**
     * Commits the job.
     *
     * <p>After the parent committer has committed, and unless the dry-run flag is set, this
     * removes leftover attempt temp files, preserves directory attributes when a preserve-status
     * string or the raw-xattrs flag is configured, and performs the atomic move when atomic copy
     * is enabled.
     *
     * <p>Whether or not those steps succeed, the results are then saved (when a log path is
     * configured) and the meta folder is removed. The meta-folder cleanup runs even when saving
     * the results fails. If the commit itself failed, that failure is the one propagated; an
     * {@link IOException} from the save/cleanup phase is attached to it as a suppressed
     * exception instead of replacing it.
     *
     * @param jobContext context of the job being committed
     * @throws IOException if the commit fails, or if the commit succeeded but saving results failed
     */
    public void commitJob(JobContext jobContext) throws IOException {
        Configuration configuration = jobContext.getConfiguration();
        super.commitJob(jobContext);
        SecurityUtils.setCredPassword(jobContext.getCredentials(), configuration);

        // Remembers the primary failure so the finally block does not mask it.
        Throwable commitFailure = null;
        try {
            if (!configuration.getBoolean(DistCpConstants.CONF_LABEL_DRY_RUN, false)) {
                cleanupTempFiles(jobContext);
                String preserveStatus = configuration.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
                boolean preserveRawXattrs = configuration.getBoolean(DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false);
                if ((preserveStatus != null && !preserveStatus.isEmpty()) || preserveRawXattrs) {
                    preserveFileAttributesForDirectories(configuration);
                }
                if (configuration.getBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false)) {
                    commitData(configuration);
                }
                this.taskAttemptContext.setStatus("Commit Successful");
            }
        } catch (IOException e) {
            LOG.debug("Failed to commit job: " + e.getMessage(), e);
            commitFailure = e;
            throw e;
        } catch (RuntimeException | Error e) {
            commitFailure = e;
            throw e;
        } finally {
            try {
                try {
                    String logPath = configuration.get(DistCpConstants.CONF_LABEL_LOG_PATH);
                    if (logPath != null) {
                        saveResults(jobContext, new Path(logPath), configuration);
                    }
                } finally {
                    // Always remove the meta folder, even when saving the results failed.
                    cleanup(configuration);
                }
            } catch (IOException e) {
                LOG.debug("Failed to save results and cleanup: " + e.getMessage(), e);
                if (commitFailure == null) {
                    throw e;
                }
                // Keep the commit failure as the primary exception; record this one alongside it.
                commitFailure.addSuppressed(e);
            }
        }
    }

    /**
     * Aborts the job through the parent committer and then, whether or not that succeeded,
     * removes leftover attempt temp files and the meta folder.
     *
     * @param jobContext context of the job being aborted
     * @param state final state reported for the job
     * @throws IOException if the parent committer's abort fails
     */
    public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
        try {
            super.abortJob(jobContext, state);
        } finally {
            cleanupTempFiles(jobContext);
            Configuration jobConf = jobContext.getConfiguration();
            cleanup(jobConf);
        }
    }

    /**
     * Best-effort removal of this job's attempt temp files from the target work path and from
     * its parent directory. Any failure is logged as a warning and otherwise ignored.
     *
     * @param jobContext context supplying the configuration and the job id
     */
    private void cleanupTempFiles(JobContext jobContext) {
        try {
            Configuration jobConf = jobContext.getConfiguration();
            Path workPath = new Path(jobConf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
            FileSystem workFs = FsCache.get(workPath, jobConf);
            String jobId = jobContext.getJobID().toString();

            // The work path first, then its parent (which may be null; the helper tolerates that).
            Path[] candidateDirs = {workPath, workPath.getParent()};
            for (Path dir : candidateDirs) {
                deleteAttemptTempFiles(dir, workFs, jobId);
            }
        } catch (Throwable th) {
            LOG.warn("Unable to cleanup temp files", th);
        }
    }

    /**
     * Deletes (non-recursively) every entry directly under {@code path} whose name starts with
     * {@code .distcp.tmp.} followed by the job id with "job" replaced by "attempt".
     *
     * @param path directory to search; {@code null} makes this a no-op
     * @param fileSystem file system used for globbing and deletion
     * @param str job id string
     * @throws IOException if globbing or deletion fails
     */
    private void deleteAttemptTempFiles(Path path, FileSystem fileSystem, String str) throws IOException {
        if (path == null) {
            return;
        }

        String attemptPrefix = ".distcp.tmp." + str.replace("job", "attempt");
        FileStatus[] leftovers = fileSystem.globStatus(new Path(path, attemptPrefix + "*"));
        if (leftovers == null) {
            return;
        }

        for (FileStatus leftover : leftovers) {
            Path leftoverPath = leftover.getPath();
            LOG.info("Cleaning up " + leftoverPath);
            fileSystem.delete(leftoverPath, false);
        }
    }

    /**
     * Recursively deletes the configured meta folder. An {@link IOException} while doing so is
     * logged as an error and not propagated.
     *
     * @param configuration job configuration holding the meta-folder location
     */
    private void cleanup(Configuration configuration) {
        final Path metaFolder = new Path(configuration.get(DistCpConstants.CONF_LABEL_META_FOLDER));
        try {
            final FileSystem metaFs = FsCache.get(metaFolder, configuration);
            LOG.info("Cleaning up temporary work folder: " + metaFolder);
            metaFs.delete(metaFolder, true);
        } catch (IOException ioe) {
            LOG.error("Exception encountered ", ioe);
        }
    }

    /**
     * Walks the listing sequence file at {@code path} and, for every directory entry other than
     * the target root itself, re-applies the recorded attributes to the matching directory under
     * the target work path.
     *
     * <p>For cloud targets the attributes are applied through
     * {@code DistCpUtils.preserveDirAttributesOnCloud}; a {@link FileNotFoundException} from that
     * call is ignored when the ignore-missing-files flag is set. For other targets
     * {@code DistCpUtils.preserve} is used. Progress and a percentage status are reported after
     * each directory.
     *
     * <p>The listing reader is opened only after every other fallible setup step and is always
     * closed in the {@code finally} block, so a failure during setup cannot leak it.
     *
     * @param path location of the listing sequence file
     * @param configuration job configuration
     * @throws IOException if reading the listing or applying attributes fails
     */
    private void preserveFileAttributesForDirectories(Path path, Configuration configuration) throws IOException {
        EnumSet<DistCpOptions.FileAttribute> attributes = DistCpUtils.unpackAttributes(configuration.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS));
        boolean rawXattrsFlag = configuration.getBoolean(DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false);
        boolean aclsFlag = configuration.getBoolean(DistCpConstants.CONF_LABEL_ACLS, false);
        boolean skipAclErrFlag = configuration.getBoolean(DistCpConstants.CONF_LABEL_SKIP_ACL_ERR, false);
        boolean ignoreMissingFiles = configuration.getBoolean(DistCpConstants.CONF_LABEL_IGNORE_MISSING_FILES, false);
        FileSystem listingFs = FsCache.get(path, configuration);
        long listingLength = listingFs.getFileStatus(path).getLen();
        Path targetRoot = new Path(configuration.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
        boolean isCloudTarget = DistCpUtils.isCloudTarget(configuration);
        ObjectMapper objectMapper = new ObjectMapper();
        long preservedCount = 0;

        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(configuration, new SequenceFile.Reader.Option[]{SequenceFile.Reader.file(path)});
            // Declared with its concrete type: toFileStatusObject/getPermissionExtension are called on it below.
            DistCpFileStatus distCpStatus = new DistCpFileStatus();
            FileStatus dirStatus = DistCpUtils.newFileStatus(configuration);
            Text relPath = new Text();
            // The listing may store either DistCpFileStatus or plain file-status values.
            boolean valuesAreDistCpStatus = reader.getValueClass() == DistCpFileStatus.class;

            while (valuesAreDistCpStatus ? reader.next(relPath, distCpStatus) : reader.next(relPath, dirStatus)) {
                FsPermission permissionExtension = null;
                if (valuesAreDistCpStatus) {
                    dirStatus = distCpStatus.toFileStatusObject(VersionChecker.isContextCdhPre60());
                    permissionExtension = CdhAclXAttrsUtils.getPermissionExtension(distCpStatus.getPermissionExtension());
                }
                if (!dirStatus.isDirectory()) {
                    continue;
                }

                Path targetDir = new Path(targetRoot.toString() + "/" + relPath);
                if (targetRoot.equals(targetDir)) {
                    // The target root itself is not touched.
                    continue;
                }

                FileSystem targetFs = FsCache.get(targetDir, configuration);
                if (isCloudTarget) {
                    try {
                        DistCpUtils.preserveDirAttributesOnCloud(dirStatus, targetFs, targetDir, configuration, objectMapper);
                    } catch (FileNotFoundException e) {
                        if (!ignoreMissingFiles) {
                            throw e;
                        }
                    }
                } else {
                    DistCpUtils.preserve(targetFs, targetDir, dirStatus, permissionExtension, attributes, aclsFlag, skipAclErrFlag, rawXattrsFlag);
                }
                preservedCount++;
                this.taskAttemptContext.progress();
                this.taskAttemptContext.setStatus("Preserving status on directory entries. [" + ((reader.getPosition() * 100) / listingLength) + "%]");
            }
            LOG.info("Preserved status on " + preservedCount + " dir entries on target");
        } finally {
            IOUtils.closeStream(reader);
        }
    }

    /**
     * Preserves directory attributes using the parent-directory listing file, but only when the
     * rebase-sources flag is set; otherwise does nothing.
     *
     * @param configuration job configuration
     * @throws IOException if preserving attributes fails
     */
    private void preserveFileAttributesForDirectories(Configuration configuration) throws IOException {
        boolean rebaseSources = configuration.getBoolean(DistCpConstants.CONF_LABEL_REBASE_SOURCES, false);
        if (!rebaseSources) {
            return;
        }

        LOG.info("About to preserve attributes: " + configuration.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS));
        Path parentDirListing = new Path(configuration.get(DistCpConstants.CONF_LABEL_PARENT_DIR_LISTING_FILE_PATH));
        preserveFileAttributesForDirectories(parentDirListing, configuration);
    }

    /**
     * Performs the atomic commit by renaming the target work path to the target final path.
     *
     * <p>Fails up front when both paths already exist. If the rename reports failure, the commit
     * is still treated as successful when the final path exists and the work path no longer does.
     *
     * @param configuration job configuration holding both paths
     * @throws IOException if the final path pre-exists alongside the work path, or the move cannot be confirmed
     */
    private void commitData(Configuration configuration) throws IOException {
        Path workDir = new Path(configuration.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
        Path finalDir = new Path(configuration.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
        FileSystem targetFs = FsCache.get(workDir, configuration);
        LOG.info("Atomic commit enabled. Moving " + workDir + " to " + finalDir);

        if (targetFs.exists(finalDir) && targetFs.exists(workDir)) {
            LOG.error("Pre-existing final-path found at: " + finalDir);
            throw new IOException("Target-path can't be committed to because it exists at " + finalDir + ". Copied data is in temp-dir: " + workDir + ". ");
        }

        if (!targetFs.rename(workDir, finalDir)) {
            LOG.warn("Rename failed. Perhaps data already moved. Verifying...");
            boolean alreadyMoved = targetFs.exists(finalDir) && !targetFs.exists(workDir);
            if (!alreadyMoved) {
                LOG.error("Unable to commit data to " + finalDir);
                throw new IOException("Atomic commit failed. Temporary data in " + workDir + ", Unable to move to " + finalDir);
            }
        }

        String successMessage = "Data committed successfully to " + finalDir;
        LOG.info(successMessage);
        this.taskAttemptContext.setStatus(successMessage);
    }

    /**
     * Writes the result files (source listing, then job status) under {@code path}.
     *
     * @param jobContext context of the job whose results are saved
     * @param path directory receiving the result files
     * @param configuration job configuration
     * @throws IOException if either file cannot be written
     */
    private void saveResults(JobContext jobContext, Path path, Configuration configuration) throws IOException {
        final FileSystem resultsFs = FsCache.get(path, configuration);
        createListingFile(path, configuration, resultsFs);
        createStatusFile(jobContext, path, configuration, resultsFs);
    }

    /**
     * Creates the {@code source.listing} Avro file under {@code path} and, when the configured
     * copy-listing file exists, fills it with one {@link InputPath} record per listing entry.
     * When the listing file does not exist, the Avro file is still created but holds no records.
     *
     * <p>The Avro writer and the underlying output stream are closed on every exit path,
     * including failures while creating the writer or translating the listing.
     *
     * @param path directory receiving the listing file
     * @param configuration job configuration holding the copy-listing location
     * @param fileSystem file system on which the file is created and the listing is looked up
     * @throws IOException if the file cannot be created or written
     */
    private void createListingFile(Path path, Configuration configuration, FileSystem fileSystem) throws IOException {
        Path listingPath = new Path(configuration.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
        FSDataOutputStream out = fileSystem.create(new Path(path, RESULT_LISTING_NAME));
        DataFileWriter<InputPath> writer = null;
        try {
            InputTranslator translator = new InputTranslator(configuration);
            Schema schema = translator.getSchema();
            writer = new DataFileWriter<InputPath>(new SpecificDatumWriter<InputPath>(schema)).create(schema, out);
            if (fileSystem.exists(listingPath)) {
                translateSequence(configuration, listingPath, translator, writer, null);
            } else {
                LOG.debug("Not translating non-existent file: " + listingPath.toString());
            }
        } finally {
            // closeStream ignores null and swallows close errors. The stream is closed explicitly
            // as well so it is released even when the writer was never created or failed to close it.
            IOUtils.closeStream(writer);
            IOUtils.closeStream(out);
        }
    }

    /**
     * Creates the {@code job.status} Avro file under {@code path} and fills it with
     * {@link FileCopyStatus} records translated from two sources: the job's {@code part-r-*}
     * output, and the listing-errors file. The listing-errors file is looked up next to the
     * diff file when the copy-listing-on-source switch is set, otherwise under {@code path}.
     *
     * <p>The input locations are resolved before the output file is created, and the Avro writer
     * and the underlying output stream are closed on every exit path, so a failure at any step
     * does not leave an open stream behind.
     *
     * @param jobContext context used to resolve the job output path
     * @param path directory receiving the status file
     * @param configuration job configuration
     * @param fileSystem file system on which the file is created and the inputs are globbed
     * @throws IOException if the file cannot be created or written
     */
    private void createStatusFile(JobContext jobContext, Path path, Configuration configuration, FileSystem fileSystem) throws IOException {
        Path outputPath = CopyOutputFormat.getOutputPath(jobContext);
        Path listingErrorsDir = configuration.getBoolean(DistCpOptionSwitch.COPY_LISTING_ON_SOURCE.getConfigLabel(), false) ? new Path(configuration.get(DistCpConstants.CONF_LABEL_DIFF_FILE_PATH)).getParent() : path;
        StatusTranslator statusTranslator = new StatusTranslator();
        Schema schema = statusTranslator.getSchema();

        FSDataOutputStream out = fileSystem.create(new Path(path, STATUS_FILE_NAME));
        // Left as a raw type so it can be passed to writeToStatusFile's declared parameter type.
        DataFileWriter dataFileWriter = null;
        try {
            dataFileWriter = new DataFileWriter(new SpecificDatumWriter(schema)).create(schema, out);
            writeToStatusFile(configuration, fileSystem, outputPath, "part-r-*", null, "reducer output", statusTranslator, dataFileWriter);
            writeToStatusFile(configuration, fileSystem, listingErrorsDir, HdfsCopyListing.PART_R_LISTING_ERRORS, this.taskAttemptContext, "source path error", statusTranslator, dataFileWriter);
        } finally {
            // closeStream ignores null and swallows close errors. The stream is closed explicitly
            // as well so it is released even when the writer was never created or failed to close it.
            IOUtils.closeStream(dataFileWriter);
            IOUtils.closeStream(out);
        }
    }

    /**
     * Translates the first file matching {@code str} under {@code path} into the status writer.
     * When nothing matches, a warning is logged and nothing is written; when more than one file
     * matches, only the first is translated and a warning is logged about the rest.
     *
     * @param configuration job configuration used to open the matched file
     * @param fileSystem file system used for globbing
     * @param path directory to search
     * @param str glob pattern, relative to {@code path}
     * @param taskAttemptContext passed through to {@code translateSequence}, which increments the
     *        FAIL counter once per translated record when this is non-null; may be {@code null}
     * @param str2 human-readable description of the input, used in log messages
     * @param statusTranslator translator producing the {@link FileCopyStatus} records
     * @param dataFileWriter Avro writer receiving the {@link FileCopyStatus} records
     * @throws IOException if globbing, reading or writing fails
     */
    private static void writeToStatusFile(Configuration configuration, FileSystem fileSystem, Path path, String str, TaskAttemptContext taskAttemptContext, String str2, StatusTranslator statusTranslator, DataFileWriter<FileCopyStatus> dataFileWriter) throws IOException {
        FileStatus[] matches = fileSystem.globStatus(new Path(path, str));
        if (matches == null || matches.length == 0) {
            LOG.warn("Could not find " + str2 + " files.");
            return;
        }
        translateSequence(configuration, matches[0].getPath(), statusTranslator, dataFileWriter, taskAttemptContext);
        if (matches.length > 1) {
            LOG.warn("Unexpected number of " + str2 + " files, ignoring leftovers.");
        }
    }

    /**
     * Reads the sequence file at {@code path} through {@code translator}, appending every
     * produced record to {@code dataFileWriter} until the translator returns {@code null}, then
     * flushes the writer. The reader is closed on every exit path.
     *
     * @param configuration configuration used to open the sequence file
     * @param path sequence file to read
     * @param translator converts reader entries into records
     * @param dataFileWriter Avro writer receiving the records; flushed but not closed here
     * @param taskAttemptContext when non-null, its FAIL counter is incremented once per record
     * @param <T> record type produced by the translator
     * @throws IOException if reading or writing fails
     */
    private static <T> void translateSequence(Configuration configuration, Path path, Translator<T> translator, DataFileWriter dataFileWriter, TaskAttemptContext taskAttemptContext) throws IOException {
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(configuration, new SequenceFile.Reader.Option[]{SequenceFile.Reader.file(path)});
            for (T record = translator.next(reader); record != null; record = translator.next(reader)) {
                dataFileWriter.append(record);
                if (taskAttemptContext != null) {
                    taskAttemptContext.getCounter(CopyMapper.Counter.FAIL).increment(1L);
                }
            }
            dataFileWriter.flush();
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}
