package de.gwdg.cdstar.ext.elastic;

import com.fasterxml.jackson.databind.ObjectMapper;
import de.gwdg.cdstar.FailableConsumer;
import de.gwdg.cdstar.Utils;
import de.gwdg.cdstar.event.ChangeEvent;
import java.io.IOException;
import java.util.Objects;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/gwdg/cdstar/ext/elastic/ActiveMQListener.class */
/**
 * Receives {@link ChangeEvent} messages from an ActiveMQ topic through a durable
 * subscription and hands them to a caller-supplied consumer.
 *
 * <p>A single JMS connection is created lazily on first use and shared by
 * {@link #handleOne(FailableConsumer)} and {@link #unsubscribe()}. Once {@link #close()}
 * has been called the instance stays closed; the flag is never reset.
 */
public class ActiveMQListener {
    /** JMS client id set on the broker connection. */
    public static final String CLIENT_IDENTITY = "esingest";
    /** Name of the durable subscription created (and removed) by this class. */
    public static final String CLIENT_NAME = "esingest";
    /** How long a single {@code receive} call blocks before the closed flag is re-checked. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 1000L;
    private static final Logger log = LoggerFactory.getLogger(ActiveMQListener.class);
    /** Mapper used to decode message text into {@link ChangeEvent} instances. */
    public static ObjectMapper om = new ObjectMapper();
    // volatile: read by close() without holding the monitor that connect() writes it under.
    private volatile Connection connection;
    private final String topicName;
    private final String target;
    private final ActiveMQConnectionFactory connectionFactory;
    // volatile: polled by the receive loop in handleOne() and set by close(), which may be
    // invoked from a different thread; without it the loop is not guaranteed to see the update.
    private volatile boolean closed;
    private TopicSubscriber consumer;

    /**
     * Creates a listener for the given broker and topic. No connection is opened here.
     *
     * @param str  broker URL passed to the ActiveMQ connection factory; a warning is logged
     *             if it does not start with {@code failover:}
     * @param str2 name of the topic to subscribe to
     */
    public ActiveMQListener(String str, String str2) {
        this.target = str;
        this.topicName = str2;
        if (!str.startsWith("failover:")) {
            log.warn("Broker url does not use the 'failover' meta-transport. The process will NOT reconnect automatically if the connection is lost.");
        }
        this.connectionFactory = new ActiveMQConnectionFactory(str);
    }

    /**
     * Removes the durable subscription named {@link #CLIENT_NAME} from the broker.
     *
     * <p>On failure the listener is closed before the error is reported.
     *
     * @throws IOException wrapping the underlying {@link JMSException}
     */
    public void unsubscribe() throws IOException {
        try {
            Session session = connect().createSession(false, Session.AUTO_ACKNOWLEDGE);
            try {
                session.unsubscribe(CLIENT_NAME);
            } finally {
                // The session is only needed for this one call; release it either way.
                Utils.closeQuietly(session::close);
            }
            log.info("Succesfully unsubscribed from {} (client: {}:{})", this.target, CLIENT_IDENTITY, CLIENT_NAME);
        } catch (JMSException e) {
            close();
            throw new IOException(e);
        }
    }

    /**
     * Returns the shared connection, creating and starting it on first use.
     *
     * @return the started connection with client id {@link #CLIENT_IDENTITY}
     * @throws JMSException if the connection cannot be created, configured or started
     */
    private synchronized Connection connect() throws JMSException {
        if (this.connection != null) {
            return this.connection;
        }
        this.connection = this.connectionFactory.createConnection();
        this.connection.setClientID(CLIENT_IDENTITY);
        this.connection.start();
        return this.connection;
    }

    /**
     * Subscribes to the topic if necessary, then receives and dispatches messages until
     * {@link #close()} is called or an exception is thrown.
     *
     * <p>Note that, despite its name, this method does not return after a single message:
     * it keeps polling until the listener is closed. Text messages are decoded into
     * {@link ChangeEvent}s and passed to the consumer; a message is acknowledged only after
     * the consumer returned normally. Other message types are logged and skipped.
     *
     * @param failableConsumer callback invoked for every decoded event
     * @throws IOException if subscribing to the topic fails
     * @throws Exception   anything thrown while receiving, decoding or by the consumer itself
     */
    public void handleOne(FailableConsumer<ChangeEvent> failableConsumer) throws Exception {
        if (this.consumer == null) {
            this.consumer = subscribe();
        }
        while (!this.closed) {
            Message message = this.consumer.receive(RECEIVE_TIMEOUT_MILLIS);
            if (message == null) {
                // Timed out without a message; loop around to re-check the closed flag.
                continue;
            }
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                log.debug("Message recieved: {}", message);
                failableConsumer.accept(om.readValue(text, ChangeEvent.class));
                // Client-acknowledge mode: per the JMS API this acknowledges every message
                // consumed by the session so far, including previously skipped non-text ones.
                textMessage.acknowledge();
            } else {
                log.warn("Ignored non-text message: {}", message);
            }
        }
    }

    /**
     * Opens a client-acknowledged session and creates the durable subscriber on it.
     *
     * @return the subscriber bound to the configured topic
     * @throws IOException if the session or the subscriber cannot be created
     */
    private TopicSubscriber subscribe() throws IOException {
        Session session;
        try {
            session = connect().createSession(false, Session.CLIENT_ACKNOWLEDGE);
        } catch (JMSException e) {
            throw new IOException("Failed to subscribe to topic", e);
        }
        try {
            TopicSubscriber subscriber =
                    session.createDurableSubscriber(session.createTopic(this.topicName), CLIENT_NAME);
            log.info("Succesfully subscribed to {} (topic: {}, client: {}:{})", this.target, this.topicName, CLIENT_IDENTITY, CLIENT_NAME);
            return subscriber;
        } catch (JMSException e) {
            // Do not leave a half-initialised session behind on the shared connection.
            Utils.closeQuietly(session::close);
            throw new IOException("Failed to subscribe to topic", e);
        }
    }

    /**
     * Marks the listener as closed, which ends the loop in {@link #handleOne(FailableConsumer)},
     * and closes the connection if one was opened.
     */
    public void close() {
        this.closed = true;
        Connection current = this.connection;
        if (current != null) {
            Utils.closeQuietly(current::close);
        }
    }
}
