package de.gwdg.cdstar.ext.activemq;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import de.gwdg.cdstar.Utils;
import de.gwdg.cdstar.event.ChangeEvent;
import de.gwdg.cdstar.runtime.Config;
import de.gwdg.cdstar.runtime.Plugin;
import de.gwdg.cdstar.runtime.RuntimeContext;
import de.gwdg.cdstar.runtime.filter.AbstractEventFilter;
import de.gwdg.cdstar.runtime.listener.RuntimeListener;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runtime plugin that hosts an embedded ActiveMQ broker and forwards every
 * {@link ChangeEvent} it receives to the configured JMS topics and queues as a
 * JSON text message.
 *
 * <p>Configuration keys read by this class:
 * <ul>
 *   <li>{@code name} - broker name (defaults to {@code cdstar-embedded}).</li>
 *   <li>{@code buffer} - send buffer size (defaults to {@code 0}): a positive value
 *       creates a bounded buffer of that capacity, a negative value an unbounded
 *       buffer, and zero a direct hand-off to the send thread.</li>
 *   <li>{@code topic} - topic names to publish to (defaults to {@code cdstar}).</li>
 *   <li>{@code queue} - queue names to publish to (optional).</li>
 *   <li>{@code transport.*} - named transport connector URIs; if none are given, a
 *       single connector for {@value #DEFAULT_TRANSPORT} is added.</li>
 * </ul>
 *
 * <p>Events are handed over through a {@link BlockingQueue} and transmitted by a
 * dedicated thread named {@code amq-send-loop}.
 *
 * @deprecated flagged as deprecated in the source; no replacement is named here.
 */
@Plugin(name = {"amq-embedded"})
@Deprecated
public class ActiveMQBroker implements RuntimeListener {
    static ObjectMapper mapper = new ObjectMapper();
    private static final String DEFAULT_TRANSPORT = "auto+nio://localhost:5671";
    /** How long the send loop waits for an event before re-checking the closed flag. */
    private static final long POLL_TIMEOUT_MS = 1000L;
    /** Upper bound on how long shutdown waits for the send loop to terminate. */
    private static final long SHUTDOWN_WAIT_MS = 2 * POLL_TIMEOUT_MS;
    private static final Logger log = LoggerFactory.getLogger(ActiveMQBroker.class);

    private final BrokerService broker;
    private final Config cfg;
    private final int bufferSize;
    private final BlockingQueue<ChangeEvent> sendQueue;
    private final List<String> topics = new ArrayList<>();
    private final List<String> queues = new ArrayList<>();
    private ActiveMQConnectionFactory cf;
    private Connection conn;
    private volatile Thread sendThread;
    private volatile boolean closed = false;

    static {
        mapper.disable(SerializationFeature.INDENT_OUTPUT);
    }

    /**
     * Creates the (not yet started) broker and the send buffer from the given configuration.
     *
     * @param config plugin configuration; defaults for {@code buffer} and {@code topic}
     *               are installed on it
     * @throws Exception declared for configuration or broker setup failures
     */
    public ActiveMQBroker(Config config) throws Exception {
        this.cfg = config;
        this.cfg.setDefault("buffer", "0");
        this.cfg.setDefault("topic", "cdstar");
        this.broker = new BrokerService();
        this.broker.setBrokerName(this.cfg.get("name", "cdstar-embedded"));
        this.bufferSize = this.cfg.getInt("buffer");
        if (this.bufferSize > 0) {
            // Bounded buffer: producers block once the capacity is exhausted.
            this.sendQueue = new ArrayBlockingQueue<>(this.bufferSize);
        } else if (this.bufferSize < 0) {
            // Unbounded buffer: producers never block.
            this.sendQueue = new LinkedBlockingQueue<>();
        } else {
            // No buffer: each producer waits until the send thread takes the event.
            this.sendQueue = new SynchronousQueue<>();
        }
        if (this.cfg.hasKey("topic")) {
            this.topics.addAll(this.cfg.getList("topic"));
        }
        if (this.cfg.hasKey("queue")) {
            this.queues.addAll(this.cfg.getList("queue"));
        }
    }

    /**
     * Adds a transport connector to the broker.
     *
     * @param str  connector name; an empty string leaves the connector's own name untouched
     * @param str2 connector bind URI
     * @throws Exception if the broker rejects the connector
     */
    private void addTransport(String str, String str2) throws Exception {
        TransportConnector connector = this.broker.addConnector(str2);
        if (!str.isEmpty()) {
            connector.setName(str);
        }
        log.info("Added transport: {}", connector);
    }

    /**
     * Configures transports, starts the broker and the send thread, and registers an
     * event filter that forwards every change event to {@link #sendEvent(ChangeEvent)}.
     *
     * @param runtimeContext runtime used to resolve the data directory and register the filter
     * @throws Exception if the broker cannot be configured or started
     */
    public void onInit(RuntimeContext runtimeContext) throws Exception {
        this.broker.setDataDirectory(runtimeContext.getServiceDir("activemq").toString());
        if (this.cfg.with("transport").isEmpty()) {
            addTransport(MulticastDiscoveryAgent.DEFAULT_HOST_STR, DEFAULT_TRANSPORT);
        } else {
            for (Map.Entry<?, ?> entry : this.cfg.with("transport").entrySet()) {
                addTransport((String) entry.getKey(), (String) entry.getValue());
            }
        }
        this.broker.start();
        // In-VM connection to the broker started above; never creates a second broker.
        this.cf = new ActiveMQConnectionFactory(
                "vm://" + this.broker.getBrokerName() + "?create=false&waitForStart=10000");
        Thread thread = new Thread(this::sendLoop);
        thread.setName("amq-send-loop");
        this.sendThread = thread;
        thread.start();
        log.info("sendThread {}", thread);
        runtimeContext.register(new AbstractEventFilter() {
            public void triggerEvent(ChangeEvent changeEvent) throws Exception {
                ActiveMQBroker.this.sendEvent(changeEvent);
            }
        });
    }

    /**
     * Signals the send loop to stop, waits a bounded time for it to finish, then stops
     * the broker.
     *
     * @param runtimeContext unused
     * @throws RuntimeException wrapping any failure reported while stopping the broker
     */
    public void onShutdown(RuntimeContext runtimeContext) {
        this.closed = true;
        // Let the send loop close its connection before the broker goes away.
        awaitSendThread();
        try {
            this.broker.stop();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Waits up to {@link #SHUTDOWN_WAIT_MS} for the send thread to terminate. */
    private void awaitSendThread() {
        Thread thread = this.sendThread;
        if (thread == null || thread == Thread.currentThread()) {
            return;
        }
        try {
            thread.join(SHUTDOWN_WAIT_MS);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and continue with the shutdown.
            Thread.currentThread().interrupt();
        }
        if (thread.isAlive()) {
            log.warn("Send thread {} did not terminate within {} ms", thread, SHUTDOWN_WAIT_MS);
        }
    }

    /**
     * Returns the embedded broker instance.
     *
     * @return the broker created by the constructor
     */
    public BrokerService getBroker() {
        return this.broker;
    }

    /**
     * Hands an event over to the send thread. Depending on the configured buffer this
     * call may block until the send thread has capacity for the event.
     *
     * @param changeEvent event to transmit
     * @throws InterruptedException if interrupted while waiting for queue capacity
     */
    protected void sendEvent(ChangeEvent changeEvent) throws InterruptedException {
        log.debug("Send event: {}", changeEvent);
        this.sendQueue.put(changeEvent);
    }

    /**
     * Serializes an event to its JSON representation using the shared mapper.
     *
     * @param changeEvent event to serialize
     * @return JSON text
     */
    static String toJson(ChangeEvent changeEvent) {
        try {
            return mapper.writeValueAsString(changeEvent);
        } catch (IOException e) {
            throw Utils.wtf(e);
        }
    }

    /**
     * Body of the send thread: opens a JMS connection, creates one producer per
     * configured topic and queue, and transmits queued events until the plugin is
     * closed or an error occurs. Events that could not be delivered are logged.
     */
    void sendLoop() {
        log.trace("sendLoop");
        // The event currently being transmitted; null whenever nothing is in flight.
        ChangeEvent pending = null;
        try {
            this.conn = this.cf.createConnection();
            Session session = this.conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            List<MessageProducer> producers = new ArrayList<>();
            for (String topic : this.topics) {
                producers.add(session.createProducer(session.createTopic(topic)));
            }
            for (String queue : this.queues) {
                producers.add(session.createProducer(session.createQueue(queue)));
            }
            log.trace("sendLoop {}", Boolean.valueOf(this.closed));
            while (!this.closed) {
                log.trace("loop");
                // Bounded wait so the closed flag is re-checked at least once per timeout.
                pending = this.sendQueue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                if (pending == null) {
                    continue;
                }
                TextMessage message = session.createTextMessage(toJson(pending));
                message.setStringProperty("vault", pending.getVault());
                message.setJMSDeliveryMode(2); // 2 == javax.jms.DeliveryMode.PERSISTENT
                for (MessageProducer producer : producers) {
                    producer.send(message);
                }
                log.debug("Transmitted event: {}", pending);
                // Delivered: must not be reported as undeliverable when the loop ends.
                pending = null;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("JMS send loop interrupted", e);
        } catch (Exception e) {
            log.error("JSM error", e);
        } finally {
            closeConnection();
        }
        if (pending != null) {
            log.warn("JSM send queue closed. Unable to deliver event: {}", toJson(pending));
        }
        // poll() returns null once the queue is drained, so null never reaches toJson.
        for (ChangeEvent lost = this.sendQueue.poll(); lost != null; lost = this.sendQueue.poll()) {
            log.warn("JSM send queue closed. Unable to deliver event: {}", toJson(lost));
        }
    }

    /** Closes the JMS connection opened by the send loop, if any; failures are logged only. */
    private void closeConnection() {
        Connection connection = this.conn;
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception e) {
            log.warn("Failed to close JMS connection", e);
        }
    }
}
