package com.cloudera.cmon.firehose;

import com.cloudera.cmon.UnknownMetricException;
import com.cloudera.cmon.domain.FhMetric;
import com.cloudera.cmon.firehose.event.AvroActivityUpdate;
import com.cloudera.cmon.firehose.event.FirehosePipelineMessage;
import com.cloudera.cmon.firehose.event.MetricValue;
import com.cloudera.cmon.pipeline.ItemRejectedException;
import com.cloudera.cmon.pipeline.PipelineStage;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/cmon/firehose/DebugRecordingReceiver.class */
/**
 * Pipeline stage that optionally records every message passing through it to an
 * Avro data file, so that the stream can later be replayed with
 * {@link #readEventsFromFile(FirehosePipeline, String)}.
 *
 * <p>Recording is enabled only when the configuration supplies a debug recording
 * filename. Recording is best-effort: I/O failures are logged and never prevent
 * the message from being passed on unchanged.
 */
public class DebugRecordingReceiver extends PipelineStage.ItemReceiver<FhMessage> {
    private static final Logger LOG = LoggerFactory.getLogger(DebugRecordingReceiver.class);

    /** True when a debug recording filename was configured. */
    private final boolean enabled;

    /**
     * Open Avro writer, or {@code null} when recording is disabled or the
     * recording file could not be opened. Accessed under {@code synchronized (this)}
     * in {@link #receiveItem(FhMessage)}.
     */
    private DataFileWriter<FirehosePipelineMessage> writer;

    /**
     * Creates the receiver and, if a debug recording filename is configured,
     * opens (or creates) the recording file.
     *
     * @param cMONConfiguration configuration providing the debug recording filename;
     *                          a {@code null} filename disables recording
     */
    public DebugRecordingReceiver(CMONConfiguration cMONConfiguration) {
        String debugRecordingFilename = cMONConfiguration.getDebugRecordingFilename();
        this.enabled = debugRecordingFilename != null;
        if (this.enabled) {
            this.writer = openWriter(debugRecordingFilename);
        }
    }

    /**
     * Opens an Avro data file writer on the given path, appending when the file
     * already exists and creating it otherwise.
     *
     * @return the open writer, or {@code null} if the file could not be opened
     */
    private static DataFileWriter<FirehosePipelineMessage> openWriter(String filename) {
        DataFileWriter<FirehosePipelineMessage> opened = new DataFileWriter<FirehosePipelineMessage>(
                new SpecificDatumWriter<FirehosePipelineMessage>(FirehosePipelineMessage.SCHEMA$));
        opened.setCodec(CodecFactory.deflateCodec(5));
        File file = new File(filename);
        try {
            if (file.exists()) {
                opened.appendTo(file);
            } else {
                opened.create(FirehosePipelineMessage.SCHEMA$, file);
                // Flush right away so the file header is written out.
                opened.flush();
            }
            return opened;
        } catch (IOException e) {
            LOG.warn("Could not open " + debugRecordingFilename(filename), e);
            // Returning null makes receiveItem() skip recording instead of
            // repeatedly appending to a writer that was never opened.
            return null;
        }
    }

    /** Identity helper kept so the log message text stays exactly "Could not open <filename>". */
    private static String debugRecordingFilename(String filename) {
        return filename;
    }

    /**
     * Records the message to the debug file (when recording is active) and
     * returns it unchanged.
     *
     * @param fhMessage the message flowing through the pipeline
     * @return the same {@code fhMessage} instance
     * @throws ItemRejectedException declared for the stage contract; not thrown by this implementation
     */
    public FhMessage receiveItem(FhMessage fhMessage) throws ItemRejectedException {
        if (!this.enabled) {
            return fhMessage;
        }
        synchronized (this) {
            if (this.writer == null) {
                return fhMessage;
            }
            try {
                FirehosePipelineMessage record = new FirehosePipelineMessage();
                record.setAgentMsg(fhMessage.getAgentMsg());
                record.setActivityUpdate(toAvro(fhMessage.getActivityUpdate()));
                // The receive timestamp is stored in seconds; fromAvro() multiplies by 1000 again.
                record.setTsReceived(Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(fhMessage.tsReceived)));
                this.writer.append(record);
                // Flush after every record so the file is usable even after an abrupt exit.
                this.writer.flush();
            } catch (IOException e) {
                // Recording is best-effort: never fail the pipeline because of it.
                LOG.warn("Could not append to debug log.", e);
            }
        }
        return fhMessage;
    }

    /**
     * Converts an {@link ActivityUpdate} into its Avro representation.
     *
     * @param activityUpdate the update to convert, may be {@code null}
     * @return the Avro record, or {@code null} if the input was {@code null}
     */
    private AvroActivityUpdate toAvro(ActivityUpdate activityUpdate) {
        if (activityUpdate == null) {
            return null;
        }
        AvroActivityUpdate avroActivityUpdate = new AvroActivityUpdate();
        avroActivityUpdate.setActivityId(activityUpdate.getActivityId());
        avroActivityUpdate.setMetrics(Lists.newArrayList());
        avroActivityUpdate.setServiceName(activityUpdate.getServiceName());
        Iterator<FhMetric> metrics = activityUpdate.getMetrics().iterator();
        while (metrics.hasNext()) {
            avroActivityUpdate.getMetrics().add(metrics.next().toMetricValue());
        }
        return avroActivityUpdate;
    }

    /**
     * Rebuilds an {@link ActivityUpdate} from its Avro representation.
     *
     * @param j                  timestamp in seconds; it is multiplied by 1000 to obtain milliseconds
     * @param avroActivityUpdate the Avro record to convert, may be {@code null}
     * @return the reconstructed update, or {@code null} if the input was {@code null}
     * @throws RuntimeException wrapping {@link UnknownMetricException} if a recorded metric is unknown
     */
    public static ActivityUpdate fromAvro(long j, AvroActivityUpdate avroActivityUpdate) {
        if (avroActivityUpdate == null) {
            return null;
        }
        long timestampMillis = j * 1000;
        String activityId = avroActivityUpdate.getActivityId().toString();
        ActivityUpdate activityUpdate = new ActivityUpdate(
                avroActivityUpdate.getServiceName().toString(), activityId, new Instant(timestampMillis));
        for (Object metricValue : avroActivityUpdate.getMetrics()) {
            try {
                activityUpdate.add(new FhMetric(activityId, (MetricValue) metricValue, new Date(timestampMillis)));
            } catch (UnknownMetricException e) {
                throw new RuntimeException(e);
            }
        }
        return activityUpdate;
    }

    /**
     * @return {@code true} if a debug recording filename was configured. Note that this
     *         stays {@code true} even when the recording file could not be opened.
     */
    public boolean isEnabled() {
        return this.enabled;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Replays every message stored in a recording file into the given pipeline.
     *
     * @param firehosePipeline pipeline that receives each replayed message
     * @param str              path of the Avro recording file
     * @return the number of messages read from the file
     * @throws IOException if the file cannot be opened or read
     */
    public static int readEventsFromFile(FirehosePipeline firehosePipeline, String str) throws IOException {
        int count = 0;
        // try-with-resources guarantees the file handle is released, even when replay fails.
        try (DataFileReader<FirehosePipelineMessage> reader = new DataFileReader<FirehosePipelineMessage>(
                new File(str), new SpecificDatumReader<FirehosePipelineMessage>(FirehosePipelineMessage.SCHEMA$))) {
            for (FirehosePipelineMessage message : reader) {
                count++;
                firehosePipeline.receiveEvent(new FhMessage(message));
            }
        }
        return count;
    }
}
