package com.hortonworks.smm.kafka.services.notification;

import com.google.common.collect.Maps;
import com.hortonworks.smm.kafka.common.config.ConfigField;
import com.hortonworks.smm.kafka.common.utils.ThreadUtils;
import com.hortonworks.smm.kafka.notification.NotifierWithRateLimiter;
import com.hortonworks.smm.kafka.notification.api.AlertNotificationContext;
import com.hortonworks.smm.kafka.notification.api.Notifier;
import com.hortonworks.smm.kafka.notification.api.NotifierConfigTemplate;
import com.hortonworks.smm.kafka.notification.api.NotifierConfiguration;
import com.hortonworks.smm.kafka.notification.api.NotifierContext;
import com.hortonworks.smm.kafka.notification.api.NotifierProvider;
import com.hortonworks.smm.kafka.notification.api.NotifierProvidersConfig;
import com.hortonworks.smm.kafka.services.core.NotifierStorable;
import com.hortonworks.smm.storage.StorageManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hortonworks/smm/kafka/services/notification/DefaultNotificationService.class */
/**
 * Default {@link NotificationService} implementation.
 *
 * <p>Notifier providers are instantiated reflectively from the class names listed in the
 * {@link NotifierProvidersConfig}; notifier configurations are persisted through the
 * {@link StorageManager} and the live {@link Notifier} instances are cached in memory, keyed by
 * notifier id.
 *
 * <p>{@link #start()} must be called before notifiers are created or loaded, because it creates
 * the executor that is handed to rate-limited notifiers.
 */
public class DefaultNotificationService implements NotificationService {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultNotificationService.class);

    /** Maximum time {@link #close()} waits for the notifier task executor to drain. */
    private static final long EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS = 300L;

    private final NotifierProvidersConfig notifierProvidersConfig;
    private final StorageManager storageManager;
    private ExecutorService notifierTasksExecutor;

    /** Registered providers keyed by {@link NotifierProvider#id()}. */
    private final ConcurrentMap<String, NotifierProvider> notifierProviders = new ConcurrentHashMap<>();

    /** Live notifiers keyed by {@link Notifier#id()}. */
    private final ConcurrentMap<Long, Notifier> notifiers = new ConcurrentHashMap<>();

    /**
     * Configurations of the currently loaded notifiers. Wrapped in a synchronized collection
     * because it is mutated alongside the concurrent maps above and read by {@link #allNotifiers()}.
     */
    private final Collection<NotifierConfiguration> notifierConfigurations =
            Collections.synchronizedCollection(new ArrayList<>());

    /**
     * @param notifierProvidersConfig provider class names and executor sizing
     * @param storageManager storage used to persist notifier configurations
     */
    public DefaultNotificationService(NotifierProvidersConfig notifierProvidersConfig, StorageManager storageManager) {
        this.notifierProvidersConfig = notifierProvidersConfig;
        this.storageManager = storageManager;
    }

    /**
     * Creates the notifier task executor, registers every configured provider class and loads all
     * notifier configurations found in storage.
     *
     * @throws IllegalStateException if two configured providers report the same id
     * @throws IllegalArgumentException if a stored notifier references an unregistered provider
     * @throws RuntimeException if a provider class cannot be loaded or instantiated
     */
    public void start() {
        this.notifierTasksExecutor = ThreadUtils.createFixedPoolExecutorService(
                this.notifierProvidersConfig.executorThreadPoolSize().intValue(), "notifier-task-%d", true);
        for (Object providerClassName : this.notifierProvidersConfig.providerClasses()) {
            registerNotifierProvider((String) providerClassName);
        }
        for (Object storable : this.storageManager.list(NotifierStorable.NAME_SPACE)) {
            loadNotifier(((NotifierStorable) storable).toNotifierConfiguration());
        }
    }

    /**
     * Returns, per registered provider id, the config fields declared by that provider's
     * notifier config template.
     *
     * <p>The result is produced by Guava's {@code Maps.transformEntries}, i.e. it is a view over
     * the registered providers whose values are computed on access.
     *
     * @return map from provider id to the list of config fields of its template
     */
    @Override
    public Map<String, List<ConfigField>> notifierConfigTemplateFields() {
        return Maps.transformEntries(this.notifierProviders, (providerId, provider) -> {
            List<?> entries = provider.notifierConfigTemplate().entries();
            List<ConfigField> fields = new ArrayList<>(entries.size());
            for (Object entry : entries) {
                fields.add(((NotifierConfigTemplate.Entry) entry).configField());
            }
            return fields;
        });
    }

    /**
     * Instantiates the provider with the given class name and registers it under its id.
     *
     * @param str fully qualified class name of the {@link NotifierProvider} implementation
     * @throws IllegalStateException if a provider with the same id is already registered
     * @throws RuntimeException if the class cannot be loaded or instantiated
     */
    @Override
    public void registerNotifierProvider(String str) {
        NotifierProvider provider = loadAndInitNotifierProvider(str);
        String providerId = provider.id();
        if (this.notifierProviders.putIfAbsent(providerId, provider) != null) {
            throw new IllegalStateException("Notifier with id " + providerId + " is already registered.");
        }
    }

    /**
     * @return the providers configuration this service was constructed with
     */
    @Override
    public NotifierProvidersConfig allNotifierProviders() {
        return this.notifierProvidersConfig;
    }

    /**
     * Returns the configurations of all currently loaded notifiers.
     *
     * @return an unmodifiable snapshot; later create/update/delete calls are not reflected in it,
     *         so callers can iterate it without racing against concurrent modifications
     */
    @Override
    public Collection<NotifierConfiguration> allNotifiers() {
        // Copy under the collection's own lock: handing out a live view of the backing list
        // would expose callers to ConcurrentModificationException while iterating.
        synchronized (this.notifierConfigurations) {
            return Collections.unmodifiableCollection(new ArrayList<>(this.notifierConfigurations));
        }
    }

    /**
     * Validates the configuration against its provider, creates and initialises the notifier,
     * and replaces any previously loaded notifier with the same id.
     *
     * @param notifierConfiguration configuration of the notifier to load
     * @return the loaded notifier (wrapped in a {@link NotifierWithRateLimiter} when the
     *         configuration carries a rate limiter config)
     * @throws IllegalArgumentException if the referenced provider is not registered
     */
    private Notifier loadNotifier(NotifierConfiguration notifierConfiguration) {
        String providerId = notifierConfiguration.notifierProviderId();
        NotifierProvider provider = this.notifierProviders.get(providerId);
        if (provider == null) {
            throw new IllegalArgumentException("NotifierProvider for the given provider id: " + providerId + " is not registered.");
        }
        provider.validateNotifier(notifierConfiguration);

        Notifier notifier = provider.createNotifier();
        notifier.init(new NotifierContext(notifierConfiguration, provider.notifierConfigTemplate()));
        if (notifierConfiguration.rateLimiterConfig() != null) {
            notifier = new NotifierWithRateLimiter(notifier, this.notifierTasksExecutor);
        }

        Long notifierId = notifier.id();
        // Drop (and close) a previous instance with the same id, if any; absence is not an error here.
        mayBeRemoveNotifier(notifierId, false);
        this.notifiers.put(notifierId, notifier);
        this.notifierConfigurations.add(notifierConfiguration);
        return notifier;
    }

    /**
     * Persists a new notifier configuration under a freshly allocated id and loads the notifier.
     *
     * <p>NOTE(review): the configuration is written to storage before {@code loadNotifier}
     * validates it, so a configuration rejected by the provider stays persisted and would be
     * picked up again by {@link #start()} — confirm whether that is intended.
     *
     * @param notifierConfiguration configuration without an id
     * @return the loaded notifier
     * @throws IllegalArgumentException if the configuration already has an id, or its provider is
     *         not registered
     */
    @Override
    public Notifier createNotifier(NotifierConfiguration notifierConfiguration) {
        if (notifierConfiguration.id() != null) {
            throw new IllegalArgumentException("Create operation is not allowed with notifierConfiguration containing id as non null");
        }
        NotifierStorable storable = NotifierStorable.fromConfig(notifierConfiguration);
        storable.setId(this.storageManager.nextId(NotifierStorable.NAME_SPACE));
        this.storageManager.add(storable);
        return loadNotifier(NotifierConfiguration.with(storable.getId(), notifierConfiguration));
    }

    /**
     * Looks up a loaded notifier.
     *
     * @param l notifier id, must not be {@code null}
     * @return the notifier, or {@code null} if none is loaded under that id
     * @throws NullPointerException if {@code l} is {@code null}
     */
    @Override
    public Notifier getNotifier(Long l) {
        Objects.requireNonNull(l, "notifierId can not be null");
        return this.notifiers.get(l);
    }

    /**
     * Persists the given configuration (add-or-update) and reloads the notifier, replacing any
     * instance currently loaded under the same id.
     *
     * <p>NOTE(review): as with {@link #createNotifier}, storage is written before validation.
     *
     * @param notifierConfiguration configuration carrying a non-null id
     * @return the reloaded notifier
     * @throws NullPointerException if the configuration has no id
     * @throws IllegalArgumentException if the referenced provider is not registered
     */
    @Override
    public Notifier updateNotifier(NotifierConfiguration notifierConfiguration) {
        Objects.requireNonNull(notifierConfiguration.id(), "notifierId can not be null in the given notifierConfiguration");
        this.storageManager.addOrUpdate(NotifierStorable.fromConfig(notifierConfiguration));
        return loadNotifier(notifierConfiguration);
    }

    /**
     * Removes the notifier from storage and unloads (and closes) the in-memory instance.
     *
     * <p>NOTE(review): the storage entry is removed before the in-memory lookup, so the
     * {@link IllegalArgumentException} below is thrown after storage has already been modified.
     *
     * @param l notifier id, must not be {@code null}
     * @throws NullPointerException if {@code l} is {@code null}
     * @throws IllegalArgumentException if no notifier is loaded under that id
     */
    @Override
    public void deleteNotifier(Long l) {
        // Reject null before touching storage instead of after.
        Objects.requireNonNull(l, "notifierId must not be null");
        this.storageManager.remove(NotifierStorable.storableKey(l));
        mayBeRemoveNotifier(l, true);
    }

    /**
     * Unloads the notifier with the given id, if present, and closes it. Failures while closing
     * are logged and otherwise ignored.
     *
     * @param l notifier id, must not be {@code null}
     * @param z when {@code true}, a missing notifier is reported as an error
     * @throws IllegalArgumentException if {@code z} is {@code true} and no such notifier is loaded
     */
    private void mayBeRemoveNotifier(Long l, boolean z) {
        Objects.requireNonNull(l, "notifierId must not be null");
        Notifier removed = this.notifiers.remove(l);
        if (removed == null) {
            if (z) {
                throw new IllegalArgumentException("Notifier with id: " + l + " does not exist.");
            }
            return;
        }
        this.notifierConfigurations.remove(removed.config());
        try {
            removed.close();
        } catch (Exception e) {
            // Best effort: the notifier is already unregistered, so only report the failure.
            LOG.warn("Error occurred while closing notifier with id: {}", l, e);
        }
    }

    /**
     * Loads the named class through the thread context class loader and instantiates it via its
     * no-argument constructor.
     *
     * @param str fully qualified class name of a {@link NotifierProvider} implementation
     * @return the new provider instance
     * @throws RuntimeException wrapping any failure to load, instantiate or cast the class
     */
    private NotifierProvider loadAndInitNotifierProvider(String str) {
        try {
            Class<?> providerClass = Thread.currentThread().getContextClassLoader().loadClass(str);
            return (NotifierProvider) providerClass.getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            throw new RuntimeException("Failed to load notifier provider class: " + str, e);
        }
    }

    /**
     * Dispatches the alert to every notifier referenced by the context, in iteration order.
     *
     * @param alertNotificationContext alert payload and the ids of the notifiers to invoke
     * @throws IllegalStateException if a referenced notifier is not loaded; notifiers later in
     *         the iteration are not invoked in that case
     */
    @Override
    public void notify(AlertNotificationContext alertNotificationContext) {
        for (Long notifierId : alertNotificationContext.notifierInstanceIds()) {
            Notifier notifier = this.notifiers.get(notifierId);
            if (notifier == null) {
                throw new IllegalStateException("No notifier exists with id: " + notifierId);
            }
            notifier.notify(alertNotificationContext);
        }
    }

    /**
     * Shuts down the notifier task executor (waiting up to
     * {@value #EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS} seconds for queued tasks) and then closes every
     * loaded notifier. Failures while closing individual notifiers are logged and ignored.
     *
     * <p>If the calling thread is interrupted while waiting, the notifiers are still closed and
     * the thread's interrupt status is restored before returning.
     */
    @Override
    public void close() {
        boolean interrupted = false;
        try {
            if (this.notifierTasksExecutor != null) {
                this.notifierTasksExecutor.shutdown();
                try {
                    if (!this.notifierTasksExecutor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                        LOG.warn("Notifier task executor did not terminate within {} seconds", EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS);
                    }
                } catch (InterruptedException e) {
                    // Remember the interrupt; it is re-asserted once the notifiers are closed so
                    // that their close() calls are not cut short by a pending interrupt flag.
                    interrupted = true;
                }
            }
            for (Notifier notifier : this.notifiers.values()) {
                try {
                    notifier.close();
                } catch (Exception e) {
                    LOG.debug("Error occurred while closing notifier", e);
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
