package de.gwdg.cdstar.ext.rabbitmq;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import de.gwdg.cdstar.Utils;
import de.gwdg.cdstar.event.ChangeEvent;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/gwdg/cdstar/ext/rabbitmq/RabbitMQConnector.class */
class RabbitMQConnector implements Closeable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) RabbitMQConnector.class);
    private static final ObjectMapper mapper = new ObjectMapper();
    private Connection connection;
    private Channel channel;
    private final String exchangeName;
    private BlockingQueue<ChangeEvent> sendQueue;
    private Thread sendThread;
    private final String exchangeType;
    private final ConnectionFactory factory;
    private volatile boolean closed = false;
    SortedMap<Long, ChangeEvent> inFlight = new TreeMap();
    private int sendQueueSize = 1024;
    private final Duration reconnectCooldown = Duration.ofSeconds(10);

    public RabbitMQConnector(URI uri, String str, String str2) {
        this.exchangeName = str;
        this.exchangeType = str2;
        setSendQueueSize(this.sendQueueSize);
        this.factory = new ConnectionFactory();
        try {
            this.factory.setUri(uri);
        } catch (URISyntaxException | KeyManagementException | NoSuchAlgorithmException e) {
            throw new IllegalArgumentException("Invalid broker URL: " + uri, e);
        }
    }

    public synchronized void setSendQueueSize(int i) {
        this.sendQueueSize = i;
        BlockingQueue<ChangeEvent> blockingQueue = this.sendQueue;
        this.sendQueue = i > 0 ? new ArrayBlockingQueue<>(i) : new LinkedBlockingQueue<>();
        if (blockingQueue != null) {
            blockingQueue.drainTo(this.sendQueue);
        }
    }

    public synchronized void start() throws IOException, TimeoutException {
        if (this.sendThread != null) {
            throw new IllegalStateException("Already started");
        }
        connect();
        this.sendThread = new Thread(null, this::sendLoop, "rabbitmq-send");
        this.sendThread.start();
    }

    private synchronized void connect() throws IOException, TimeoutException {
        if (this.connection != null && this.connection.isOpen()) {
            Utils.closeQuietly(this.connection);
            synchronized (this.inFlight) {
                this.inFlight.values().removeIf(changeEvent -> {
                    enqueueOrDrop(changeEvent, "reconnect");
                    return true;
                });
            }
        }
        this.connection = this.factory.newConnection();
        this.channel = this.connection.createChannel();
        try {
            if (this.exchangeType == null) {
                this.channel.exchangeDeclarePassive(this.exchangeName);
            } else {
                this.channel.exchangeDeclare(this.exchangeName, this.exchangeType, true);
            }
            this.channel.confirmSelect();
            this.channel.addConfirmListener(this::handleAck, this::handleNack);
            this.connection.addBlockedListener(this::handleBlocked, this::handleUnblocked);
        } catch (IOException e) {
            log.error(this.exchangeType == null ? "Exchange does not exist: {}" : "Failed to declare exchange: {}", (Throwable) e);
            throw e;
        }
    }

    private void sendLoop() {
        while (true) {
            try {
                ChangeEvent poll = this.sendQueue.poll(1000L, TimeUnit.MILLISECONDS);
                if (poll != null) {
                    sendOne(poll);
                } else if (this.closed) {
                    return;
                }
            } catch (InterruptedException e) {
                close();
                return;
            }
        }
    }

    private void sendOne(ChangeEvent changeEvent) {
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties();
        byte[] json = toJson(changeEvent);
        String str = changeEvent.getVault() + ":" + changeEvent.getArchive();
        while (true) {
            try {
                long nextPublishSeqNo = this.channel.getNextPublishSeqNo();
                this.channel.basicPublish(this.exchangeName, str, basicProperties, json);
                synchronized (this.inFlight) {
                    this.inFlight.put(Long.valueOf(nextPublishSeqNo), changeEvent);
                }
                return;
            } catch (IOException e) {
                if (this.closed) {
                    drop(changeEvent, e);
                    return;
                } else {
                    log.warn("Send failed (will reconnect)", (Throwable) e);
                    reconnect();
                }
            }
        }
    }

    private void reconnect() {
        do {
            try {
                connect();
            } catch (Exception e) {
                log.debug("Reconnect failed", (Throwable) e);
            }
            if (this.closed) {
                return;
            }
        } while (Utils.sleepInterruptable(this.reconnectCooldown.toMillis()));
    }

    public boolean enqueueOrDrop(ChangeEvent changeEvent, String str) {
        if (!this.closed && this.sendQueue.offer(changeEvent)) {
            return true;
        }
        drop(changeEvent, new RuntimeException("Send buffer overflow (" + str + ")"));
        return false;
    }

    public boolean enqueueOrDrop(ChangeEvent changeEvent) {
        return enqueueOrDrop(changeEvent, "submit");
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        try {
            this.sendThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Utils.closeQuietly(this.channel);
        Utils.closeQuietly(this.connection);
        this.inFlight.values().forEach(changeEvent -> {
            drop(changeEvent, new RuntimeException("shutdown"));
        });
        this.inFlight.clear();
    }

    void handleBlocked(String str) throws IOException {
        log.warn("Connection blocked");
    }

    void handleUnblocked() throws IOException {
        log.info("Connection unblocked");
    }

    void handleNack(long j, boolean z) throws IOException {
        synchronized (this.inFlight) {
            if (z) {
                SortedMap<Long, ChangeEvent> headMap = this.inFlight.headMap(Long.valueOf(j + 1));
                headMap.values().forEach(changeEvent -> {
                    drop(changeEvent, new IOException("NACK"));
                });
                headMap.clear();
            } else {
                ChangeEvent remove = this.inFlight.remove(Long.valueOf(j));
                if (remove != null) {
                    drop(remove, new IOException("NACK"));
                }
            }
        }
    }

    void handleAck(long j, boolean z) throws IOException {
        synchronized (this.inFlight) {
            if (z) {
                this.inFlight.headMap(Long.valueOf(j + 1)).clear();
            } else {
                this.inFlight.remove(Long.valueOf(j));
            }
        }
    }

    static byte[] toJson(ChangeEvent changeEvent) {
        try {
            return mapper.writeValueAsBytes(changeEvent);
        } catch (IOException e) {
            throw Utils.wtf(e);
        }
    }

    private void drop(ChangeEvent changeEvent, Exception exc) {
        log.warn("Event dropped: {}", new String(toJson(changeEvent), StandardCharsets.UTF_8), exc);
    }

    static {
        mapper.disable(SerializationFeature.INDENT_OUTPUT);
    }
}
