package com.cloudera.cmf.event.publish;

import com.cloudera.cmf.event.Event;
import com.cloudera.cmf.event.shaded.com.google.common.annotations.VisibleForTesting;
import com.cloudera.cmf.event.shaded.com.google.common.base.Preconditions;
import com.cloudera.cmf.event.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.cloudera.cmf.event.shaded.org.apache.avro.AvroRemoteException;
import com.cloudera.cmf.event.shaded.org.joda.time.Duration;
import com.cloudera.cmf.event.shaded.org.joda.time.Instant;
import com.cloudera.cmf.event.shaded.org.slf4j.Logger;
import com.cloudera.cmf.event.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.ConnectException;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:com/cloudera/cmf/event/publish/EventStorePublisherWithRetry.class */
public class EventStorePublisherWithRetry implements EventStorePublishAPI, EventStorePublisherWithRetryMXBean {
    public static final int DEFAULT_QUEUE_SIZE = 1000;
    private final ThreadPoolExecutor publishExecutor;
    private final Duration retryInterval;
    private final EventStorePublishAPI publishAPI;
    private final int sleepJitterMs;
    public static final int DEFAULT_JITTER_MS = 500;
    private final int queueSize;
    private final BlockingQueue<Runnable> queue;
    private final SimpleThrottlingLogger failureLogger;
    private final SimpleThrottlingLogger rejectionLogger;
    static final Logger LOG = LoggerFactory.getLogger(EventStorePublisherWithRetry.class);
    private static final Duration DEFAULT_LOG_WINDOW = Duration.standardMinutes(30);
    private final AtomicLong dropCount = new AtomicLong(0);
    private final AtomicLong exceptionCount = new AtomicLong(0);
    private final AtomicLong pushbackCount = new AtomicLong(0);
    private final AtomicLong sentCount = new AtomicLong(0);

    /* loaded from: input_file:com/cloudera/cmf/event/publish/EventStorePublisherWithRetry$PublishEventTask.class */
    private class PublishEventTask implements Runnable {
        private final Event event;

        PublishEventTask(Event event) {
            this.event = event;
        }

        private void handleIOExceptionLogging(String str, IOException iOException) {
            Throwable cause = iOException.getCause();
            EventStorePublisherWithRetry.this.failureLogger.tryLog(str, (cause == null || !(cause instanceof ConnectException)) ? iOException : null);
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean z = false;
            while (!z) {
                try {
                    z = EventStorePublisherWithRetry.this.publishAPI.publishEvent(this.event);
                } catch (Exception e) {
                    String str = "Failed to publish event: " + this.event;
                    EventStorePublisherWithRetry.this.exceptionCount.incrementAndGet();
                    if (e instanceof AvroRemoteException) {
                        Throwable cause = e.getCause();
                        if (cause == null || !(cause instanceof IOException)) {
                            EventStorePublisherWithRetry.this.failureLogger.tryLog(str, e);
                        } else {
                            handleIOExceptionLogging(str, (IOException) cause);
                        }
                    } else if (e instanceof IOException) {
                        handleIOExceptionLogging(str, (IOException) e);
                    } else {
                        EventStorePublisherWithRetry.this.failureLogger.tryLog(str, e);
                    }
                }
                if (z) {
                    EventStorePublisherWithRetry.this.sentCount.incrementAndGet();
                    return;
                }
                EventStorePublisherWithRetry.this.pushbackCount.incrementAndGet();
                EventStorePublisherWithRetry.this.failureLogger.tryLog("Publication rejected by event store, will retry", null);
                try {
                    Thread.sleep(EventStorePublisherWithRetry.this.retryInterval.getMillis() + new Random().nextInt(EventStorePublisherWithRetry.this.sleepJitterMs));
                } catch (InterruptedException e2) {
                    EventStorePublisherWithRetry.LOG.warn("Publish event task was interrupted", (Throwable) e2);
                    Thread.interrupted();
                    return;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @VisibleForTesting
    /* loaded from: input_file:com/cloudera/cmf/event/publish/EventStorePublisherWithRetry$SimpleThrottlingLogger.class */
    public static class SimpleThrottlingLogger {
        final Duration logWindow;
        private Instant lastLogInstant = new Instant(0);
        private long suppressedLogCount = 0;

        public SimpleThrottlingLogger(Duration duration) {
            this.logWindow = duration;
        }

        @VisibleForTesting
        protected long getSuppressedLogCount() {
            long j;
            synchronized (PublishEventTask.class) {
                j = this.suppressedLogCount;
            }
            return j;
        }

        public synchronized void tryLog(String str, Exception exc) {
            Instant instant = new Instant();
            Duration duration = new Duration(instant.getMillis() - this.lastLogInstant.getMillis());
            if (!duration.isLongerThan(this.logWindow)) {
                this.suppressedLogCount++;
                return;
            }
            if (this.lastLogInstant.getMillis() != 0) {
                str = str + " - 1 of " + (this.suppressedLogCount + 1) + " failure(s) in last " + duration.getStandardSeconds() + "s";
            }
            if (exc != null) {
                EventStorePublisherWithRetry.LOG.warn(str, (Throwable) exc);
            } else {
                EventStorePublisherWithRetry.LOG.warn(str);
            }
            this.suppressedLogCount = 0L;
            this.lastLogInstant = instant;
        }
    }

    @VisibleForTesting
    EventStorePublisherWithRetry(EventStorePublishAPI eventStorePublishAPI, long j, int i, RejectedExecutionHandler rejectedExecutionHandler, long j2, int i2) {
        Preconditions.checkNotNull(eventStorePublishAPI);
        Preconditions.checkArgument(j > 0);
        Preconditions.checkArgument(i > 0);
        Preconditions.checkArgument(j2 >= 0);
        Preconditions.checkArgument(i2 > 0);
        Preconditions.checkNotNull(rejectedExecutionHandler);
        this.publishAPI = eventStorePublishAPI;
        this.retryInterval = new Duration(j);
        this.sleepJitterMs = i2;
        this.queueSize = i;
        this.failureLogger = new SimpleThrottlingLogger(Duration.standardSeconds(j2));
        this.rejectionLogger = new SimpleThrottlingLogger(Duration.standardSeconds(j2));
        this.queue = new LinkedBlockingQueue(this.queueSize);
        this.publishExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, this.queue, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("EventStorePublisherWithRetry-%d").build(), rejectedExecutionHandler);
    }

    public static EventStorePublisherWithRetry createWithAvroRpc(String str, int i, long j, int i2, RejectedExecutionHandler rejectedExecutionHandler, long j2, int i3) {
        return new EventStorePublisherWithRetry(new AvroEventStorePublishProxy(str, i), j, i2, rejectedExecutionHandler, j2, i3);
    }

    public static EventStorePublisherWithRetry createWithAvroRpc(String str, int i, long j) {
        return createWithAvroRpc(str, i, j, 1000, new ThreadPoolExecutor.AbortPolicy(), DEFAULT_LOG_WINDOW.toStandardSeconds().getSeconds(), DEFAULT_JITTER_MS);
    }

    @Override // com.cloudera.cmf.event.publish.EventStorePublishAPI
    public boolean publishEvent(Event event) {
        try {
            this.publishExecutor.submit(new PublishEventTask(event));
            return true;
        } catch (RejectedExecutionException e) {
            this.rejectionLogger.tryLog(String.format("Dropped event, queue (%d) over capacity (%d total dropped)", Integer.valueOf(this.queue.size()), Long.valueOf(this.dropCount.incrementAndGet())), null);
            return false;
        }
    }

    private void shutdown() {
        this.publishExecutor.shutdown();
    }

    @Override // com.cloudera.cmf.event.publish.EventStorePublishAPI
    public void closePublishAPI() throws IOException {
        shutdown();
    }

    public void reportState(PrintWriter printWriter) {
        printWriter.println("<p><b>EventStorePublisherWithRetry</b> (" + EventStorePublisherWithRetry.class + ")");
        printWriter.println("<hr/>");
        printWriter.println("<br/>Events successfully sent: " + this.sentCount.get());
        printWriter.println("<br/>Events dropped client-side (due to a full send buffer): " + this.dropCount.get());
        printWriter.println("<br/>Events rejected server-side (told to try again): " + this.pushbackCount.get());
        printWriter.println("<br/>Events rejected server-side due to exception: " + this.exceptionCount.get());
        printWriter.println("<br/>Current queue size: " + this.queue.size() + " (remaining: " + this.queue.remainingCapacity() + ")");
        printWriter.println("</p>");
    }

    @Override // com.cloudera.cmf.event.publish.EventStorePublisherWithRetryMXBean
    public long getSendQueueSize() {
        return this.queue.size();
    }

    @Override // com.cloudera.cmf.event.publish.EventStorePublisherWithRetryMXBean
    public long getSendQueueMaxSize() {
        return this.queueSize;
    }

    @Override // com.cloudera.cmf.event.publish.EventStorePublisherWithRetryMXBean
    public long getDropCount() {
        return this.dropCount.get();
    }

    @Override // com.cloudera.cmf.event.publish.EventStorePublisherWithRetryMXBean
    public long getExceptionCount() {
        return this.exceptionCount.get();
    }

    @Override // com.cloudera.cmf.event.publish.EventStorePublisherWithRetryMXBean
    public long getPushbackCount() {
        return this.pushbackCount.get();
    }

    @Override // com.cloudera.cmf.event.publish.EventStorePublisherWithRetryMXBean
    public long getSentCount() {
        return this.sentCount.get();
    }
}
