package com.cloudera.cmon.firehose;

import com.cloudera.cmf.protocol.firehose.status.EntityStatus;
import com.cloudera.cmon.MetricEnum;
import com.cloudera.cmon.MetricInfo;
import com.cloudera.cmon.MetricSchema;
import com.cloudera.cmon.MonitoringTypes;
import com.cloudera.cmon.TimeSeriesEntityType;
import com.cloudera.cmon.firehose.event.AgentMessageServiceIPC;
import com.cloudera.cmon.firehose.event.AgentMessages;
import com.cloudera.cmon.firehose.event.AgentMsg;
import com.cloudera.cmon.firehose.event.EntityStatusRecord;
import com.cloudera.cmon.firehose.event.HTableRecord;
import com.cloudera.cmon.firehose.event.HostRecord;
import com.cloudera.cmon.firehose.event.MetricValue;
import com.cloudera.cmon.firehose.event.MetricWriteRecord;
import com.cloudera.cmon.firehose.event.RoleRecord;
import com.cloudera.cmon.firehose.event.ServiceRecord;
import com.cloudera.cmon.firehose.event.TimeSeriesEntityRecord;
import com.cloudera.cmon.firehose.event.WriteImpalaQueriesRequest;
import com.cloudera.cmon.firehose.event.WriteMetricsRequest;
import com.cloudera.cmon.firehose.event.WriteStatusRecordsRequest;
import com.cloudera.cmon.firehose.event.WriteStatusRequest;
import com.cloudera.cmon.kaiser.SubjectRecordId;
import com.cloudera.cmon.tstore.TimeSeriesDataStore;
import com.cloudera.cmon.tstore.TimeSeriesEntityRetriever;
import com.cloudera.cmon.tstore.TimeSeriesMetadataStore;
import com.cloudera.enterprise.ThrottlingLogger;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Histogram;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/cmon/firehose/AgentMessageServiceHandler.class */
/**
 * {@link AgentMessageServiceIPC} implementation backed by a {@link FirehosePipeline}.
 *
 * <p>Agent messages are forwarded to the pipeline as {@link FhMessage} events; status records,
 * metrics, Impala query updates and entity statuses are written to the stores exposed by the
 * pipeline. Counters and histograms registered through {@link Metrics} track message volume,
 * acceptance-window violations and write durations.
 *
 * <p>{@link #sendAgentMessages} and the counter/{@code lastHeard} getters are
 * {@code synchronized}; the {@code write*} methods are not.
 */
public class AgentMessageServiceHandler implements AgentMessageServiceIPC {
    private static final Logger LOG = LoggerFactory.getLogger(AgentMessageServiceHandler.class);
    private static final ThrottlingLogger THROTTLED_LOG = new ThrottlingLogger(LOG, Duration.standardMinutes(30));
    private static final Duration ACCEPTANCE_WINDOW = CMONConfiguration.getSingleton().getAcceptanceTimeWindowDuration();

    /**
     * The only subject-record schema version accepted by {@link #writeStatusRecords}; requests
     * carrying any other version are logged and dropped.
     * NOTE(review): presumably mirrors a schema-version constant defined elsewhere - confirm.
     */
    private static final Long SUPPORTED_SUBJECT_RECORD_SCHEMA_VERSION = 9L;

    /** Wall-clock time (epoch millis) of the last completed {@link #sendAgentMessages} call; 0 if none yet. */
    private long lastHeard = 0;
    private final Counter numAgentMessagesOutsideAcceptanceWindow = Metrics.newCounter(AgentMessageServiceHandler.class, "numAgentMessagesOutsideAcceptanceWindow");
    private final Counter numWriteRequestsOutsideAcceptanceWindow = Metrics.newCounter(AgentMessageServiceHandler.class, "numWriteRequestsOutsideAcceptanceWindow");
    private final Counter numAgentMessageRPCsReceived = Metrics.newCounter(AgentMessageServiceHandler.class, "numAgentMessageRPCsReceived");
    private final Counter numAgentMessagesReceived = Metrics.newCounter(AgentMessageServiceHandler.class, "numOfAgentMessagesReceived");
    private final Histogram writeStatusRecordsDurationMillisHistogram = Metrics.newHistogram(AgentMessageServiceHandler.class, "writeStatusRecordsDurationMillis", true);
    private final Histogram writeMetricsDurationHistogram = Metrics.newHistogram(AgentMessageServiceHandler.class, "writeMetricsDurationMs", true);
    private final Histogram writeImpalaQueriesDurationHistogram = Metrics.newHistogram(AgentMessageServiceHandler.class, "writeImpalaQueriesDurationMs", true);
    final FirehosePipeline processingPipeline;

    /**
     * Creates a handler that feeds the given pipeline.
     *
     * @param firehosePipeline pipeline that receives events and exposes the backing stores
     * @throws NullPointerException if {@code firehosePipeline} is null
     */
    public AgentMessageServiceHandler(FirehosePipeline firehosePipeline) {
        Preconditions.checkNotNull(firehosePipeline, "Pipeline must not be null");
        this.processingPipeline = firehosePipeline;
    }

    /**
     * Forwards every message in the batch to the pipeline and updates the receive counters and
     * {@code lastHeard}.
     *
     * <p>A message whose timestamp lies outside the acceptance window is logged and counted,
     * but it is still forwarded to the pipeline.
     *
     * @param agentMessages batch of agent messages received in one RPC
     */
    public synchronized void sendAgentMessages(AgentMessages agentMessages) {
        for (AgentMsg agentMsg : agentMessages.getAgentMsgs()) {
            // The message carries seconds; Instant expects milliseconds.
            Instant messageTime = new Instant(agentMsg.getTsSecs().longValue() * 1000);
            if (isOutsideAcceptanceWindow(messageTime)) {
                THROTTLED_LOG.warn("Agent message includes timestamp outside acceptance window from " + agentMsg.getHostname() + " at " + messageTime);
                this.numAgentMessagesOutsideAcceptanceWindow.inc(1L);
            }
            this.processingPipeline.receiveEvent(new FhMessage(agentMsg));
            this.numAgentMessagesReceived.inc(1L);
        }
        this.numAgentMessageRPCsReceived.inc(1L);
        this.lastHeard = System.currentTimeMillis();
    }

    /** @return number of individual agent messages forwarded to the pipeline */
    public synchronized long getNumAgentMessagesReceived() {
        return this.numAgentMessagesReceived.count();
    }

    /** @return number of {@link #sendAgentMessages} calls that ran to completion */
    public synchronized long getNumAgentMessagesRPCsReceived() {
        return this.numAgentMessageRPCsReceived.count();
    }

    /** @return number of agent messages whose timestamp was outside the acceptance window */
    public synchronized long getNumAgentMessagesOutsideAcceptanceWindow() {
        return this.numAgentMessagesOutsideAcceptanceWindow.count();
    }

    /** @return number of metric write records rejected for being outside the acceptance window */
    public synchronized long getNumWriteRequestsOutsideAcceptanceWindow() {
        return this.numWriteRequestsOutsideAcceptanceWindow.count();
    }

    /** @return epoch millis of the last completed {@link #sendAgentMessages} call, or 0 if none */
    public synchronized long getLastHeard() {
        return this.lastHeard;
    }

    /**
     * Writes the request's status records to the pipeline's subject record store, stamped with
     * the request timestamp.
     *
     * <p>Requests with an unsupported (or missing) schema version are logged and dropped.
     * Requests with no records return without writing and without updating the duration
     * histogram.
     *
     * @param writeStatusRecordsRequest request to persist
     * @throws NullPointerException if the request is null
     */
    // Raw map types: the record key type and the store's write() signature are declared
    // outside this file, so the element types cannot be spelled out safely here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void writeStatusRecords(WriteStatusRecordsRequest writeStatusRecordsRequest) {
        Preconditions.checkNotNull(writeStatusRecordsRequest);
        Instant start = new Instant();
        // Constant on the left so a missing version is treated as a mismatch instead of an NPE.
        if (!SUPPORTED_SUBJECT_RECORD_SCHEMA_VERSION.equals(writeStatusRecordsRequest.getSubjectRecordSchemaVersion())) {
            THROTTLED_LOG.warn("Got writeStatusRecords request with schema version: " + writeStatusRecordsRequest.getSubjectRecordSchemaVersion());
            return;
        }
        Instant recordTime = new Instant(writeStatusRecordsRequest.getTimestampMillis());
        Map recordsByKey = Maps.newHashMapWithExpectedSize(writeStatusRecordsRequest.getRecords().size());
        for (Map.Entry<?, ?> entry : writeStatusRecordsRequest.getRecords().entrySet()) {
            recordsByKey.put(entry.getKey(), copyRemainingBytes((ByteBuffer) entry.getValue()));
        }
        if (recordsByKey.isEmpty()) {
            return;
        }
        this.processingPipeline.getSubjectRecordStore().write(recordsByKey, recordTime);
        this.writeStatusRecordsDurationMillisHistogram.update(elapsedMillisSince(start));
    }

    /**
     * Resolves each write record's entity, converts its metric values and bulk-writes the
     * resulting entries to the pipeline's time series store.
     *
     * <p>The following are logged and skipped rather than failing the whole request:
     * records outside the acceptance window, records whose time series type is invalid,
     * records whose entity is unknown, metric values that are neither {@code Double} nor
     * {@code Long}, and metric values whose metric cannot be found in the current schema.
     *
     * @param writeMetricsRequest request to persist
     * @throws NullPointerException if the request is null
     * @throws IllegalStateException if an entity record is of an unrecognised record class
     * @throws IllegalArgumentException if a metric value carries neither an id nor a name
     */
    public void writeMetrics(WriteMetricsRequest writeMetricsRequest) {
        Preconditions.checkNotNull(writeMetricsRequest);
        Instant start = new Instant();
        List<TimeSeriesDataStore.WriteEntry<MetricInfo>> writeEntries = Lists.newArrayList();
        for (MetricWriteRecord metricWriteRecord : writeMetricsRequest.getWriteRecords()) {
            Instant recordTime = new Instant(metricWriteRecord.getTimestampMs());
            if (isOutsideAcceptanceWindow(recordTime)) {
                THROTTLED_LOG.warn("Write record (with Firehose client) timestamp outside valid window:" + recordTime);
                this.numWriteRequestsOutsideAcceptanceWindow.inc(1L);
                continue;
            }

            Object entityRecord = metricWriteRecord.getEntityRecord();
            TimeSeriesMetadataStore.TimeSeriesEntity entity;
            if (entityRecord instanceof HostRecord) {
                entity = TimeSeriesEntityRetriever.getHost(this.processingPipeline.getTimeSeriesStore(), ((HostRecord) entityRecord).getHostId());
            } else if (entityRecord instanceof ServiceRecord) {
                entity = TimeSeriesEntityRetriever.getService(this.processingPipeline.getTimeSeriesStore(), ((ServiceRecord) entityRecord).getServiceName());
            } else if (entityRecord instanceof RoleRecord) {
                entity = TimeSeriesEntityRetriever.getRole(this.processingPipeline.getTimeSeriesStore(), ((RoleRecord) entityRecord).getRoleName());
            } else if (entityRecord instanceof HTableRecord) {
                HTableRecord hTableRecord = (HTableRecord) entityRecord;
                entity = TimeSeriesEntityRetriever.getHTable(this.processingPipeline.getTimeSeriesStore(), hTableRecord.getServiceName(), hTableRecord.getTableName());
            } else {
                Preconditions.checkState(entityRecord instanceof TimeSeriesEntityRecord);
                TimeSeriesEntityRecord timeSeriesEntityRecord = (TimeSeriesEntityRecord) entityRecord;
                try {
                    entity = this.processingPipeline.getTimeSeriesStore().lookupTimeSeriesEntity(TimeSeriesEntityType.fromString(timeSeriesEntityRecord.getType()), timeSeriesEntityRecord.getName());
                } catch (IllegalArgumentException e) {
                    THROTTLED_LOG.info("Invalid time series type specified: " + timeSeriesEntityRecord.getType());
                    // Without this the entity would be unassigned below; skip the record.
                    continue;
                }
            }
            if (entity == null) {
                THROTTLED_LOG.info("Unknown entity: " + entityRecord.toString());
                continue;
            }

            addSCMClockOffsetIfNecessary(entity, metricWriteRecord, writeEntries);
            ImmutableMap.Builder<MetricInfo, Double> metrics = ImmutableMap.builder();
            for (MetricValue metricValue : metricWriteRecord.getMetricValues()) {
                Object value = metricValue.getValue();
                double doubleValue;
                if (value instanceof Double) {
                    doubleValue = ((Double) value).doubleValue();
                } else if (value instanceof Long) {
                    doubleValue = ((Long) value).doubleValue();
                } else {
                    THROTTLED_LOG.info("Invalid metric value: \"" + value + "\",must be a double or long");
                    // There is no numeric value to store; skip just this metric value.
                    continue;
                }
                Preconditions.checkArgument(metricValue.getId() != null || metricValue.getName() != null);
                // The numeric id takes precedence; the name is only used when no id is given.
                MetricInfo metricInfo = metricValue.getId() != null
                        ? MetricSchema.getCurrentSchema().getMetricInfo(metricValue.getId().intValue())
                        : MetricSchema.getCurrentSchema().getMetricInfoByName(metricValue.getName());
                if (metricInfo == null) {
                    THROTTLED_LOG.info("Invalid metric id: " + metricValue.getId());
                } else {
                    metrics.put(metricInfo, Double.valueOf(doubleValue));
                }
            }
            writeEntries.add(new TimeSeriesDataStore.WriteEntry<>(entity, recordTime, metrics.build()));
        }
        this.processingPipeline.getTimeSeriesStore().writeBulk(writeEntries);
        this.writeMetricsDurationHistogram.update(elapsedMillisSince(start));
    }

    /**
     * For entities of type {@link MonitoringTypes#CMSERVER_ENTITY_TYPE}, appends one extra write
     * entry holding the {@link MetricEnum#CM_CLOCK_OFFSET_WITH_SMON} metric: the difference, in
     * milliseconds, between the local current time and the record's timestamp. Does nothing
     * for other entity types.
     *
     * @param timeSeriesEntity entity the record belongs to
     * @param metricWriteRecord record whose timestamp is compared with the local clock
     * @param list accumulator that receives the extra entry
     * @throws NullPointerException if any argument is null
     */
    private void addSCMClockOffsetIfNecessary(TimeSeriesMetadataStore.TimeSeriesEntity timeSeriesEntity, MetricWriteRecord metricWriteRecord, List<TimeSeriesDataStore.WriteEntry<MetricInfo>> list) {
        Preconditions.checkNotNull(timeSeriesEntity);
        Preconditions.checkNotNull(metricWriteRecord);
        Preconditions.checkNotNull(list);
        if (timeSeriesEntity.getType().equals(MonitoringTypes.CMSERVER_ENTITY_TYPE)) {
            Instant now = Instant.now();
            ImmutableMap.Builder<MetricInfo, Double> offsetMetric = ImmutableMap.builder();
            offsetMetric.put(MetricSchema.getCurrentSchema().getMetricInfo(MetricEnum.CM_CLOCK_OFFSET_WITH_SMON.getUniqueMetricId()), Double.valueOf(now.getMillis() - metricWriteRecord.getTimestampMs().longValue()));
            list.add(new TimeSeriesDataStore.WriteEntry<>(timeSeriesEntity, now, offsetMetric.build()));
        }
    }

    /**
     * Passes each entry of the request's query-update map to the pipeline's Impala query
     * manager and records the total duration.
     *
     * @param writeImpalaQueriesRequest request carrying the query updates
     * @throws NullPointerException if the request or its query-update map is null
     */
    // Raw List cast: the element type of the update lists is declared outside this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void writeImpalaQueries(WriteImpalaQueriesRequest writeImpalaQueriesRequest) {
        Preconditions.checkNotNull(writeImpalaQueriesRequest);
        Preconditions.checkNotNull(writeImpalaQueriesRequest.getQueryUpdates());
        Instant start = new Instant();
        for (Map.Entry<?, ?> entry : writeImpalaQueriesRequest.getQueryUpdates().entrySet()) {
            String key = (String) entry.getKey();
            this.processingPipeline.getImpalaQueryManager().updateQueries((List) entry.getValue(), key);
        }
        this.writeImpalaQueriesDurationHistogram.update(elapsedMillisSince(start));
    }

    /**
     * Tells whether {@code instant} is further than the configured acceptance window away from
     * the current time, in either direction.
     *
     * @param instant timestamp to check
     * @return {@code true} if the instant is too far in the past or in the future
     * @throws NullPointerException if {@code instant} is null
     */
    public static boolean isOutsideAcceptanceWindow(Instant instant) {
        Preconditions.checkNotNull(instant);
        Instant now = Instant.now();
        boolean tooOld = instant.plus(ACCEPTANCE_WINDOW).isBefore(now);
        return tooOld || instant.minus(ACCEPTANCE_WINDOW).isAfter(now);
    }

    /**
     * Convenience overload that checks an agent message's timestamp (given in seconds).
     *
     * @param agentMsg message to check
     * @return {@code true} if the message timestamp is outside the acceptance window
     * @throws NullPointerException if {@code agentMsg} is null
     */
    public static boolean isOutsideAcceptanceWindow(AgentMsg agentMsg) {
        Preconditions.checkNotNull(agentMsg);
        return isOutsideAcceptanceWindow(new Instant(agentMsg.getTsSecs().longValue() * 1000));
    }

    /**
     * Writes each entity status record to the pipeline's subject record store.
     *
     * <p>Records are handled independently: an invalid time series type, an unknown entity, an
     * unknown metric, or a runtime failure while encoding/writing is logged and the loop moves
     * on to the next record.
     *
     * @param writeStatusRequest request carrying the status records
     * @throws NullPointerException if the request or its record list is null
     */
    public void writeStatus(WriteStatusRequest writeStatusRequest) {
        Preconditions.checkNotNull(writeStatusRequest);
        Preconditions.checkNotNull(writeStatusRequest.getStatusRecords());
        for (EntityStatusRecord entityStatusRecord : writeStatusRequest.getStatusRecords()) {
            TimeSeriesEntityRecord entityRecord = entityStatusRecord.getEntity();
            try {
                TimeSeriesMetadataStore.TimeSeriesEntity entity = this.processingPipeline.getTimeSeriesStore().lookupTimeSeriesEntity(TimeSeriesEntityType.fromString(entityRecord.getType()), entityRecord.getName());
                if (entity == null) {
                    THROTTLED_LOG.info("Unknown time series entity: {}", entityRecord.toString());
                    continue;
                }
                MetricInfo metricInfo = MetricSchema.getCurrentSchema().getMetricInfoByName(entityStatusRecord.getMetric());
                if (metricInfo == null) {
                    THROTTLED_LOG.info("No MetricInfo found for {}", entityStatusRecord.getMetric());
                    continue;
                }
                try {
                    EntityStatus status = EntityStatus.createEntityStatus(entityStatusRecord);
                    String recordId = SubjectRecordId.createForEntityStatus(entity.getId(), metricInfo).toString();
                    this.processingPipeline.getSubjectRecordStore().write(recordId, new Instant(entityStatusRecord.getTimestampMs()), status.encode());
                    THROTTLED_LOG.debug("Successfully wrote status {}", entityStatusRecord.toString());
                } catch (RuntimeException e) {
                    THROTTLED_LOG.warn("Failed to write status {}", entityStatusRecord.toString());
                    // Keep the cause available for diagnosis without adding to the throttled warning.
                    LOG.debug("Failed to write status", e);
                }
            } catch (IllegalArgumentException e) {
                THROTTLED_LOG.info("Invalid time series type specified : {}", entityRecord.getType());
            }
        }
    }

    /**
     * Copies the bytes between the buffer's position and limit into a new array.
     *
     * <p>{@link ByteBuffer#array()} is deliberately avoided: it exposes the entire backing
     * array regardless of position, limit and array offset, and throws for read-only or direct
     * buffers. Reading through a duplicate leaves the caller's buffer position untouched.
     */
    private static byte[] copyRemainingBytes(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    /** Returns the number of milliseconds elapsed between {@code start} and the current time. */
    private static long elapsedMillisSince(Instant start) {
        return new Duration(start, Instant.now()).getMillis();
    }
}
