package de.gwdg.cdstar.ext.activemq;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import de.gwdg.cdstar.Utils;
import de.gwdg.cdstar.event.ChangeEvent;
import de.gwdg.cdstar.runtime.Config;
import de.gwdg.cdstar.runtime.ConfigException;
import de.gwdg.cdstar.runtime.RuntimeContext;
import de.gwdg.cdstar.runtime.filter.AbstractEventFilter;
import de.gwdg.cdstar.runtime.listener.RuntimeListener;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/gwdg/cdstar/ext/activemq/JmsEventFilterFactory.class */
public class JmsEventFilterFactory implements RuntimeListener {
    static final ObjectMapper mapper = new ObjectMapper();
    private static final Logger log;
    public static final String PROP_BROKER = "broker";
    public static final String PROP_TOPIC = "topic";
    public static final String PROP_QUEUE = "queue";
    public static final String PROP_QSIZE = "qsize";
    private Thread sendThread;
    private BlockingQueue<ChangeEvent> sendQueue;
    private ConnectionFactory factory;
    private final Config conf;
    private Connection conn;
    private int sendQueueSize;
    private String broker;
    Set<String> topics = new HashSet();
    Set<String> queues = new HashSet();
    private volatile boolean closed = false;

    public JmsEventFilterFactory(Config config) throws Exception {
        this.conf = config;
    }

    @Override // de.gwdg.cdstar.runtime.listener.RuntimeListener
    public void onInit(RuntimeContext runtimeContext) throws JMSException, ConfigException {
        this.broker = this.conf.get(PROP_BROKER);
        this.factory = new ActiveMQConnectionFactory(this.broker);
        this.conf.setDefault(PROP_QSIZE, "-1");
        this.sendQueueSize = this.conf.getInt(PROP_QSIZE);
        if (this.sendQueueSize > 0) {
            this.sendQueue = new ArrayBlockingQueue(this.sendQueueSize);
        } else {
            this.sendQueue = new LinkedBlockingQueue();
        }
        if (this.conf.hasKey(PROP_TOPIC)) {
            List<String> list = this.conf.getList(PROP_TOPIC);
            Set<String> set = this.topics;
            Objects.requireNonNull(set);
            list.forEach((v1) -> {
                r1.add(v1);
            });
        }
        if (this.conf.hasKey(PROP_QUEUE)) {
            List<String> list2 = this.conf.getList(PROP_QUEUE);
            Set<String> set2 = this.queues;
            Objects.requireNonNull(set2);
            list2.forEach((v1) -> {
                r1.add(v1);
            });
        }
        if (this.queues.isEmpty() && this.topics.isEmpty()) {
            log.warn("No topics or queues configured. Events will not be sent.");
        } else {
            this.sendThread = new Thread(this::sendLoop);
            runtimeContext.register(new AbstractEventFilter() { // from class: de.gwdg.cdstar.ext.activemq.JmsEventFilterFactory.1
                @Override // de.gwdg.cdstar.runtime.filter.AbstractEventFilter
                public void triggerEvent(ChangeEvent changeEvent) throws Exception {
                    JmsEventFilterFactory.this.triggerEvent(changeEvent);
                }
            });
        }
    }

    @Override // de.gwdg.cdstar.runtime.listener.RuntimeListener
    public void onStartup(RuntimeContext runtimeContext) throws Exception {
        this.sendThread.start();
    }

    @Override // de.gwdg.cdstar.runtime.listener.RuntimeListener
    public void onShutdown(RuntimeContext runtimeContext) {
        try {
            this.closed = true;
            this.sendThread.join(1000L);
            Connection connection = this.conn;
            Objects.requireNonNull(connection);
            Utils.closeQuietly(connection::close);
        } catch (InterruptedException e) {
            this.sendThread.interrupt();
        }
    }

    static String toJson(ChangeEvent changeEvent) {
        try {
            return mapper.writeValueAsString(changeEvent);
        } catch (IOException e) {
            throw Utils.wtf(e);
        }
    }

    public void triggerEvent(ChangeEvent changeEvent) {
        if (this.closed) {
            log.warn("JMS send queue closed. Unable to deliver event: {}", toJson(changeEvent));
        } else if (!this.sendQueue.offer(changeEvent)) {
            log.warn("JMS send queue overflow. Unable to deliver event: {}", toJson(changeEvent));
        } else if (log.isDebugEnabled()) {
            log.debug("Event queued: {}/{} (qsize={}/{})", changeEvent.getVault(), changeEvent.getArchive(), Integer.valueOf(this.sendQueue.size()), Integer.valueOf(this.sendQueueSize));
        }
    }

    void sendLoop() {
        ChangeEvent changeEvent = null;
        try {
            log.debug("Connecting to {}", this.broker);
            this.conn = this.factory.createConnection();
            this.conn.start();
            log.debug("Connected: {}", this.conn);
            Session createSession = this.conn.createSession(false, 1);
            ArrayList arrayList = new ArrayList();
            Iterator<String> it = this.topics.iterator();
            while (it.hasNext()) {
                arrayList.add(createSession.createProducer(createSession.createTopic(it.next())));
            }
            Iterator<String> it2 = this.queues.iterator();
            while (it2.hasNext()) {
                arrayList.add(createSession.createProducer(createSession.createQueue(it2.next())));
            }
            Iterator it3 = arrayList.iterator();
            while (it3.hasNext()) {
                ((MessageProducer) it3.next()).setDeliveryMode(2);
            }
            while (!this.closed) {
                while (true) {
                    ChangeEvent poll = this.sendQueue.poll(1000L, TimeUnit.MILLISECONDS);
                    changeEvent = poll;
                    if (poll != null) {
                        if (log.isTraceEnabled()) {
                            log.trace("Event about to be sent: {}/{}", changeEvent.getVault(), changeEvent.getArchive());
                        }
                        TextMessage createTextMessage = createSession.createTextMessage(toJson(changeEvent));
                        createTextMessage.setStringProperty("vault", changeEvent.getVault());
                        Iterator it4 = arrayList.iterator();
                        while (it4.hasNext()) {
                            ((MessageProducer) it4.next()).send(createTextMessage);
                        }
                        if (log.isDebugEnabled()) {
                            log.debug("Event sent: {}/{}", changeEvent.getVault(), changeEvent.getArchive());
                        }
                    }
                }
            }
        } catch (Exception e) {
            if (changeEvent != null) {
                log.warn("JSM error while delivering event: {}", toJson(changeEvent), e);
            }
        }
        while (!this.sendQueue.isEmpty()) {
            log.warn("JSM send queue closed. Unable to deliver event: {}", toJson(this.sendQueue.poll()));
        }
    }

    Connection createConnection() throws JMSException {
        return this.factory.createConnection();
    }

    static {
        mapper.disable(SerializationFeature.INDENT_OUTPUT);
        log = LoggerFactory.getLogger((Class<?>) JmsEventFilterFactory.class);
    }
}
