package com.cloudera.cmon.firehose;

import com.cloudera.cmf.PollingScmProxy;
import com.cloudera.cmf.descriptors.ConfigUtilities;
import com.cloudera.cmf.descriptors.ReadOnlyScmDescriptorPlus;
import com.cloudera.cmon.MonitoringTypes;
import com.cloudera.cmon.firehose.event.WriteStatusRecordsRequest;
import com.cloudera.cmon.kaiser.SubjectRecordId;
import com.cloudera.cmon.tstore.leveldb.LDBSubjectRecordStore;
import com.cloudera.enterprise.ThrottlingLogger;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Histogram;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/cmon/firehose/HMONToSMONHostSubjectRecordPublisher.class */
public class HMONToSMONHostSubjectRecordPublisher implements LDBSubjectRecordStore.SubjectRecordsPersistedListener {
    private static Logger LOG = LoggerFactory.getLogger(HMONToSMONHostSubjectRecordPublisher.class);
    private static Logger THROTTLED_LOG = new ThrottlingLogger(LOG, Duration.standardMinutes(30));
    private final PollingScmProxy pollingScmProxy;
    private FirehoseClient smonClient;
    private final Histogram createClientDurationMillisHistogram = Metrics.newHistogram(getClass(), "createClientDurationMillis", true);
    private final Histogram recordSendDurationMillisHistogram = Metrics.newHistogram(getClass(), "recordSendDurationMillis", true);
    private final Counter recordSendErrorCount = Metrics.newCounter(getClass(), "recordSendErrors");

    public HMONToSMONHostSubjectRecordPublisher(PollingScmProxy pollingScmProxy) {
        Preconditions.checkNotNull(pollingScmProxy);
        this.pollingScmProxy = pollingScmProxy;
    }

    @Override // com.cloudera.cmon.tstore.leveldb.LDBSubjectRecordStore.SubjectRecordsPersistedListener
    public void processRecords(Map<String, byte[]> map, Instant instant) {
        ReadOnlyScmDescriptorPlus scmDescriptor;
        Preconditions.checkNotNull(map);
        Preconditions.checkNotNull(instant);
        if (map.isEmpty() || (scmDescriptor = this.pollingScmProxy.getScmDescriptor()) == null) {
            return;
        }
        InetSocketAddress sMONAddress = ConfigUtilities.getSMONAddress(scmDescriptor);
        if (sMONAddress == null) {
            THROTTLED_LOG.warn("Could not get SMON address from descriptor.");
            return;
        }
        try {
            ensureSMONClientExists(sMONAddress);
            HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(map.size());
            for (Map.Entry<String, byte[]> entry : map.entrySet()) {
                if (MonitoringTypes.HOST_SUBJECT_TYPE.equals(SubjectRecordId.createFromSubjectId(entry.getKey()).getSubjectType())) {
                    newHashMapWithExpectedSize.put(entry.getKey(), ByteBuffer.wrap(entry.getValue()));
                }
            }
            if (newHashMapWithExpectedSize.isEmpty()) {
                return;
            }
            Instant instant2 = new Instant();
            try {
                this.smonClient.writeStatusRecords(WriteStatusRecordsRequest.newBuilder().setRecords(newHashMapWithExpectedSize).setTimestampMillis(instant.getMillis()).setSubjectRecordSchemaVersion(9L).build());
            } catch (Exception e) {
                THROTTLED_LOG.warn("Failed to send messages to SMON.", e);
                this.recordSendErrorCount.inc();
            }
            this.recordSendDurationMillisHistogram.update(new Duration(instant2, (ReadableInstant) null).getMillis());
        } catch (Exception e2) {
            THROTTLED_LOG.warn("Failed to get a client connection to SMON at " + sMONAddress, e2);
            this.smonClient = null;
        }
    }

    private void ensureSMONClientExists(InetSocketAddress inetSocketAddress) throws IOException {
        if (this.smonClient != null && this.smonClient.getHost().equals(inetSocketAddress.getHostName()) && this.smonClient.getPort() == inetSocketAddress.getPort()) {
            return;
        }
        synchronized (this) {
            if (this.smonClient != null && this.smonClient.getHost().equals(inetSocketAddress.getHostName()) && this.smonClient.getPort() == inetSocketAddress.getPort()) {
                return;
            }
            Instant instant = new Instant();
            this.smonClient = new FirehoseClient(inetSocketAddress.getHostName(), inetSocketAddress.getPort());
            this.createClientDurationMillisHistogram.update(new Duration(instant, (ReadableInstant) null).getMillis());
        }
    }
}
