package org.apache.crunch.lib;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;
import javassist.util.proxy.MethodFilter;
import javassist.util.proxy.MethodHandler;
import javassist.util.proxy.ProxyFactory;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.util.ReflectionUtils;

/* loaded from: input_file:lib/crunch-core-0.11.0.7.1.7.0-551.jar:org/apache/crunch/lib/Mapreduce.class */
public class Mapreduce {
    /**
     * Committer whose callbacks all do nothing and which never requests a task commit.
     * It is handed to the context constructors below, which take an {@link OutputCommitter}
     * argument.
     */
    private static final OutputCommitter NO_OP_OUTPUT_COMMITTER = new OutputCommitter() {
        /** No job-level setup is performed. */
        @Override
        public void setupJob(JobContext job) throws IOException {
            // intentionally empty
        }

        /** No task-level setup is performed. */
        @Override
        public void setupTask(TaskAttemptContext attempt) throws IOException {
            // intentionally empty
        }

        /** Always reports that there is nothing to commit. */
        @Override
        public boolean needsTaskCommit(TaskAttemptContext attempt) throws IOException {
            return false;
        }

        /** Nothing to commit. */
        @Override
        public void commitTask(TaskAttemptContext attempt) throws IOException {
            // intentionally empty
        }

        /** Nothing to roll back. */
        @Override
        public void abortTask(TaskAttemptContext attempt) throws IOException {
            // intentionally empty
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/crunch-core-0.11.0.7.1.7.0-551.jar:org/apache/crunch/lib/Mapreduce$CtxtMethodHandler.class */
    /**
     * Javassist method handler that stands behind a proxied Hadoop context.
     *
     * <p>Calls to a method named {@code write} are turned into an emit of a {@link Pair} on the
     * currently installed {@link Emitter}; every other call is forwarded reflectively to the
     * wrapped {@link TaskInputOutputContext}.
     */
    public static class CtxtMethodHandler implements MethodHandler {
        /** Filter that routes every method of the proxied class through the handler. */
        public static final MethodFilter FILTER = new MethodFilter() {
            @Override
            public boolean isHandled(Method method) {
                return true;
            }
        };

        /** Context that receives every call other than {@code write}. */
        private final TaskInputOutputContext ctxt;

        /** Destination for intercepted {@code write} calls; replaced through {@link #set(Emitter)}. */
        private Emitter emitter;

        /**
         * @param taskInputOutputContext the context that non-{@code write} calls are delegated to
         */
        public CtxtMethodHandler(TaskInputOutputContext taskInputOutputContext) {
            this.ctxt = taskInputOutputContext;
        }

        /**
         * Installs the emitter that subsequent {@code write} calls are redirected to.
         *
         * @param emitter the emitter to use from now on
         */
        public void set(Emitter emitter) {
            this.emitter = emitter;
        }

        /**
         * Handles a call made on the proxied context.
         *
         * @param obj the proxy instance the call was made on (unused)
         * @param method the method that was called on the proxy
         * @param method2 the second method Javassist passes to the handler (unused)
         * @param objArr the call arguments
         * @return {@code null} for {@code write}; otherwise whatever the wrapped context returned
         * @throws IllegalStateException if {@code write} is called before an emitter was installed
         * @throws Throwable whatever the wrapped context's method threw
         */
        @Override
        @SuppressWarnings("unchecked") // the emitter is held as a raw type, so emit(...) is unchecked
        public Object invoke(Object obj, Method method, Method method2, Object[] objArr) throws Throwable {
            if ("write".equals(method.getName())) {
                if (this.emitter == null) {
                    // Fail with a message instead of a bare NullPointerException.
                    throw new IllegalStateException("write() was called on the context before an Emitter was set");
                }
                this.emitter.emit(Pair.of(objArr[0], objArr[1]));
                return null;
            }
            try {
                return method.invoke(this.ctxt, objArr);
            } catch (InvocationTargetException e) {
                // Surface the exception the context method actually threw, not the reflection wrapper.
                Throwable cause = e.getCause();
                throw cause != null ? cause : e;
            }
        }
    }

    /* loaded from: input_file:lib/crunch-core-0.11.0.7.1.7.0-551.jar:org/apache/crunch/lib/Mapreduce$MapperFn.class */
    /**
     * {@link DoFn} that runs a Hadoop {@link Mapper} class over the input pairs.
     *
     * <p>The mapper is instantiated reflectively on the first {@link #initialize()} call and is
     * given a Javassist proxy of {@code Mapper.Context} whose calls are handled by a
     * {@link CtxtMethodHandler}. The {@code setup}, {@code map} and {@code cleanup} callbacks are
     * invoked through reflection.
     */
    private static class MapperFn<K1, V1, K2 extends Writable, V2 extends Writable> extends DoFn<Pair<K1, V1>, Pair<K2, V2>> {
        private final Class<? extends Mapper<K1, V1, K2, V2>> mapperClass;
        private transient Mapper<K1, V1, K2, V2> instance;
        private transient Mapper.Context context;
        private transient CtxtMethodHandler handler;
        private transient Method setupMethod;
        private transient Method mapMethod;
        private transient Method cleanupMethod;

        /**
         * @param cls the mapper class to run; must not be {@code null}
         */
        public MapperFn(Class<? extends Mapper<K1, V1, K2, V2>> cls) {
            this.mapperClass = Preconditions.checkNotNull(cls);
        }

        /**
         * Creates the mapper, its callbacks and its proxied context on first use, then invokes the
         * mapper's {@code setup} callback if one was found.
         *
         * @throws CrunchRuntimeException if no {@code map} callback exists, if the proxied context
         *     cannot be created, or if {@code setup} fails
         */
        @Override
        public void initialize() {
            if (this.instance == null) {
                Mapper<K1, V1, K2, V2> mapper = ReflectionUtils.newInstance(this.mapperClass, getConfiguration());
                try {
                    resolveCallbacks();
                    if (this.mapMethod == null) {
                        throw new CrunchRuntimeException("No map method for class: " + this.mapperClass);
                    }
                    this.handler = new CtxtMethodHandler(getContext());
                    this.context = createContext(mapper, this.handler);
                } catch (CrunchRuntimeException e) {
                    // Already the right type: do not bury its message under a second wrapper.
                    throw e;
                } catch (Exception e) {
                    throw new CrunchRuntimeException(e);
                }
                // Assigned last so that a failed attempt is retried instead of leaving a null context.
                this.instance = mapper;
            }
            if (this.setupMethod != null) {
                try {
                    this.setupMethod.invoke(this.instance, this.context);
                } catch (Exception e2) {
                    throw new CrunchRuntimeException(e2);
                }
            }
        }

        /**
         * Locates the {@code setup}, {@code map} and {@code cleanup} callbacks.
         *
         * <p>The search starts at the configured class and walks up to {@link Mapper} itself, so
         * callbacks inherited from an intermediate base class are found too; the most derived
         * declaration wins. A method only qualifies if its last parameter is {@code Mapper.Context}
         * and its parameter count fits, which keeps unrelated overloads out.
         */
        private void resolveCallbacks() {
            for (Class<?> type = this.mapperClass; type != null && Mapper.class.isAssignableFrom(type); type = type.getSuperclass()) {
                for (Method method : type.getDeclaredMethods()) {
                    Class<?>[] params = method.getParameterTypes();
                    if (params.length == 0 || params[params.length - 1] != Mapper.Context.class) {
                        continue;
                    }
                    String name = method.getName();
                    if (params.length == 1 && this.setupMethod == null && "setup".equals(name)) {
                        method.setAccessible(true);
                        this.setupMethod = method;
                    } else if (params.length == 1 && this.cleanupMethod == null && "cleanup".equals(name)) {
                        method.setAccessible(true);
                        this.cleanupMethod = method;
                    } else if (params.length == 3 && this.mapMethod == null && "map".equals(name)) {
                        method.setAccessible(true);
                        this.mapMethod = method;
                    }
                }
            }
        }

        /**
         * Builds the Javassist proxy of {@code Mapper.Context} whose calls go to {@code methodHandler}.
         *
         * @param mapper the mapper instance passed as first constructor argument
         * @param methodHandler the handler receiving every call made on the proxy
         * @return the proxied context
         * @throws Exception if a class lookup or the proxy creation fails
         */
        private Mapper.Context createContext(Mapper<K1, V1, K2, V2> mapper, CtxtMethodHandler methodHandler) throws Exception {
            ProxyFactory proxyFactory = new ProxyFactory();
            proxyFactory.setSuperclass(Mapper.Context.class);
            proxyFactory.setFilter(CtxtMethodHandler.FILTER);
            Class<?>[] ctorTypes;
            Object[] ctorArgs;
            if (Modifier.isAbstract(Mapper.Context.class.getModifiers())) {
                // Abstract Context: only the enclosing mapper is passed to the constructor.
                ctorTypes = new Class<?>[] {Mapper.class};
                ctorArgs = new Object[] {mapper};
            } else {
                // Concrete Context: use the eight-argument constructor.
                // NOTE(review): StatusReporter is looked up by name, presumably for cross-version
                // compatibility; confirm.
                ctorTypes = new Class<?>[] {Mapper.class, Configuration.class, TaskAttemptID.class, RecordReader.class, RecordWriter.class, OutputCommitter.class, Class.forName("org.apache.hadoop.mapreduce.StatusReporter"), InputSplit.class};
                ctorArgs = new Object[] {mapper, getConfiguration(), getTaskAttemptID(), null, null, Mapreduce.NO_OP_OUTPUT_COMMITTER, null, null};
            }
            return (Mapper.Context) proxyFactory.create(ctorTypes, ctorArgs, methodHandler);
        }

        /**
         * Feeds one input pair to the mapper's {@code map} callback; anything the mapper writes to
         * its context is emitted on {@code emitter}.
         *
         * @param pair the key/value pair to map
         * @param emitter receives the mapper's output
         * @throws CrunchRuntimeException if the reflective call fails
         */
        @Override
        public void process(Pair<K1, V1> pair, Emitter<Pair<K2, V2>> emitter) {
            this.handler.set(emitter);
            try {
                this.mapMethod.invoke(this.instance, pair.first(), pair.second(), this.context);
            } catch (Exception e) {
                throw new CrunchRuntimeException(e);
            }
        }

        /**
         * Invokes the mapper's {@code cleanup} callback, if one was found, with {@code emitter}
         * installed as the output destination.
         *
         * @param emitter receives anything written during cleanup
         * @throws CrunchRuntimeException if the reflective call fails
         */
        @Override
        public void cleanup(Emitter<Pair<K2, V2>> emitter) {
            if (this.cleanupMethod != null) {
                this.handler.set(emitter);
                try {
                    this.cleanupMethod.invoke(this.instance, this.context);
                } catch (Exception e) {
                    throw new CrunchRuntimeException(e);
                }
            }
        }
    }

    /* loaded from: input_file:lib/crunch-core-0.11.0.7.1.7.0-551.jar:org/apache/crunch/lib/Mapreduce$ReducerFn.class */
    /**
     * {@link DoFn} that runs a Hadoop {@link Reducer} class over grouped input.
     *
     * <p>The reducer is instantiated reflectively on the first {@link #initialize()} call and is
     * given a Javassist proxy of {@code Reducer.Context} whose calls are handled by a
     * {@link CtxtMethodHandler}. The {@code setup}, {@code reduce} and {@code cleanup} callbacks
     * are invoked through reflection.
     */
    private static class ReducerFn<K1, V1, K2 extends Writable, V2 extends Writable> extends DoFn<Pair<K1, Iterable<V1>>, Pair<K2, V2>> {
        private final Class<? extends Reducer<K1, V1, K2, V2>> reducerClass;
        private transient Reducer<K1, V1, K2, V2> instance;
        private transient CtxtMethodHandler handler;
        private transient Reducer.Context context;
        private transient Method setupMethod;
        private transient Method reduceMethod;
        private transient Method cleanupMethod;

        /**
         * @param cls the reducer class to run; must not be {@code null}
         */
        public ReducerFn(Class<? extends Reducer<K1, V1, K2, V2>> cls) {
            this.reducerClass = Preconditions.checkNotNull(cls);
        }

        /**
         * Creates the reducer, its callbacks and its proxied context on first use, then invokes the
         * reducer's {@code setup} callback if one was found.
         *
         * @throws CrunchRuntimeException if no {@code reduce} callback exists, if the proxied
         *     context cannot be created, or if {@code setup} fails
         */
        @Override
        public void initialize() {
            if (this.instance == null) {
                Reducer<K1, V1, K2, V2> reducer = ReflectionUtils.newInstance(this.reducerClass, getConfiguration());
                try {
                    resolveCallbacks();
                    if (this.reduceMethod == null) {
                        throw new CrunchRuntimeException("No reduce method for class: " + this.reducerClass);
                    }
                    this.handler = new CtxtMethodHandler(getContext());
                    this.context = createContext(reducer, this.handler);
                } catch (CrunchRuntimeException e) {
                    // Already the right type: do not bury its message under a second wrapper.
                    throw e;
                } catch (Exception e) {
                    throw new CrunchRuntimeException(e);
                }
                // Assigned last so that a failed attempt is retried instead of leaving a null context.
                this.instance = reducer;
            }
            if (this.setupMethod != null) {
                try {
                    this.setupMethod.invoke(this.instance, this.context);
                } catch (Exception e2) {
                    throw new CrunchRuntimeException(e2);
                }
            }
        }

        /**
         * Locates the {@code setup}, {@code reduce} and {@code cleanup} callbacks.
         *
         * <p>The search starts at the configured class and walks up to {@link Reducer} itself, so
         * callbacks inherited from an intermediate base class are found too; the most derived
         * declaration wins. A method only qualifies if its last parameter is
         * {@code Reducer.Context} and its parameter count fits, which keeps unrelated overloads out.
         */
        private void resolveCallbacks() {
            for (Class<?> type = this.reducerClass; type != null && Reducer.class.isAssignableFrom(type); type = type.getSuperclass()) {
                for (Method method : type.getDeclaredMethods()) {
                    Class<?>[] params = method.getParameterTypes();
                    if (params.length == 0 || params[params.length - 1] != Reducer.Context.class) {
                        continue;
                    }
                    String name = method.getName();
                    if (params.length == 1 && this.setupMethod == null && "setup".equals(name)) {
                        method.setAccessible(true);
                        this.setupMethod = method;
                    } else if (params.length == 1 && this.cleanupMethod == null && "cleanup".equals(name)) {
                        method.setAccessible(true);
                        this.cleanupMethod = method;
                    } else if (params.length == 3 && this.reduceMethod == null && "reduce".equals(name)) {
                        method.setAccessible(true);
                        this.reduceMethod = method;
                    }
                }
            }
        }

        /**
         * Builds the Javassist proxy of {@code Reducer.Context} whose calls go to {@code methodHandler}.
         *
         * @param reducer the reducer instance passed as first constructor argument
         * @param methodHandler the handler receiving every call made on the proxy
         * @return the proxied context
         * @throws Exception if a class lookup or the proxy creation fails
         */
        private Reducer.Context createContext(Reducer<K1, V1, K2, V2> reducer, CtxtMethodHandler methodHandler) throws Exception {
            ProxyFactory proxyFactory = new ProxyFactory();
            proxyFactory.setSuperclass(Reducer.Context.class);
            proxyFactory.setFilter(CtxtMethodHandler.FILTER);
            Class<?>[] ctorTypes;
            Object[] ctorArgs;
            if (Modifier.isAbstract(Reducer.Context.class.getModifiers())) {
                // Abstract Context: only the enclosing reducer is passed to the constructor.
                ctorTypes = new Class<?>[] {Reducer.class};
                ctorArgs = new Object[] {reducer};
            } else {
                // Concrete Context: use the twelve-argument constructor, which takes a
                // RawKeyValueIterator. A stub is supplied that answers true to next() and null to
                // everything else.
                // NOTE(review): presumably the constructor calls next() on it; confirm.
                Class<?> iteratorType = Class.forName("org.apache.hadoop.mapred.RawKeyValueIterator");
                Object iteratorStub = Proxy.newProxyInstance(iteratorType.getClassLoader(), new Class<?>[] {iteratorType}, new InvocationHandler() {
                    @Override
                    public Object invoke(Object obj, Method method2, Object[] objArr2) throws Throwable {
                        if ("next".equals(method2.getName())) {
                            return Boolean.TRUE;
                        }
                        return null;
                    }
                });
                ctorTypes = new Class<?>[] {Reducer.class, Configuration.class, TaskAttemptID.class, iteratorType, Counter.class, Counter.class, RecordWriter.class, OutputCommitter.class, Class.forName("org.apache.hadoop.mapreduce.StatusReporter"), RawComparator.class, Class.class, Class.class};
                ctorArgs = new Object[] {reducer, getConfiguration(), getTaskAttemptID(), iteratorStub, null, null, null, Mapreduce.NO_OP_OUTPUT_COMMITTER, null, null, NullWritable.class, NullWritable.class};
            }
            return (Reducer.Context) proxyFactory.create(ctorTypes, ctorArgs, methodHandler);
        }

        /**
         * Feeds one key and its values to the reducer's {@code reduce} callback; anything the
         * reducer writes to its context is emitted on {@code emitter}.
         *
         * @param pair the key together with its grouped values
         * @param emitter receives the reducer's output
         * @throws CrunchRuntimeException if the reflective call fails
         */
        @Override
        public void process(Pair<K1, Iterable<V1>> pair, Emitter<Pair<K2, V2>> emitter) {
            this.handler.set(emitter);
            try {
                this.reduceMethod.invoke(this.instance, pair.first(), pair.second(), this.context);
            } catch (Exception e) {
                throw new CrunchRuntimeException(e);
            }
        }

        /**
         * Invokes the reducer's {@code cleanup} callback, if one was found, with {@code emitter}
         * installed as the output destination.
         *
         * @param emitter receives anything written during cleanup
         * @throws CrunchRuntimeException if the reflective call fails
         */
        @Override
        public void cleanup(Emitter<Pair<K2, V2>> emitter) {
            if (this.cleanupMethod != null) {
                this.handler.set(emitter);
                try {
                    this.cleanupMethod.invoke(this.instance, this.context);
                } catch (Exception e) {
                    throw new CrunchRuntimeException(e);
                }
            }
        }
    }

    /**
     * Applies a Hadoop {@link Mapper} class to every entry of a table.
     *
     * @param pTable the input table
     * @param cls the mapper class to run over the input
     * @param cls2 the class of the output keys
     * @param cls3 the class of the output values
     * @return a table holding whatever the mapper wrote to its context
     */
    public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> map(PTable<K1, V1> pTable, Class<? extends Mapper<K1, V1, K2, V2>> cls, Class<K2> cls2, Class<V2> cls3) {
        // Explicit type arguments replace a cast to a type variable that is declared nowhere.
        return pTable.parallelDo(new MapperFn<K1, V1, K2, V2>(cls), tableOf(cls2, cls3));
    }

    /**
     * Applies a Hadoop {@link Reducer} class to every group of a grouped table.
     *
     * @param pGroupedTable the grouped input table
     * @param cls the reducer class to run over the input
     * @param cls2 the class of the output keys
     * @param cls3 the class of the output values
     * @return a table holding whatever the reducer wrote to its context
     */
    public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> reduce(PGroupedTable<K1, V1> pGroupedTable, Class<? extends Reducer<K1, V1, K2, V2>> cls, Class<K2> cls2, Class<V2> cls3) {
        // Explicit type arguments replace a cast to a type variable that is declared nowhere.
        return pGroupedTable.parallelDo(new ReducerFn<K1, V1, K2, V2>(cls), tableOf(cls2, cls3));
    }

    /**
     * Builds the table type for the given key and value classes by passing each class through
     * {@code Writables.writables} and combining the results with {@code Writables.tableOf}.
     *
     * @param cls the key class
     * @param cls2 the value class
     * @return the table type for the key/value pair
     */
    private static <K extends Writable, V extends Writable> PTableType<K, V> tableOf(Class<K> cls, Class<V> cls2) {
        return Writables.tableOf(Writables.writables(cls), Writables.writables(cls2));
    }
}
