package de.gwdg.cdstar.ext.redis;

import com.fasterxml.jackson.core.JsonProcessingException;
import de.gwdg.cdstar.SharedObjectMapper;
import de.gwdg.cdstar.Utils;
import de.gwdg.cdstar.event.ChangeEvent;
import de.gwdg.cdstar.runtime.Config;
import de.gwdg.cdstar.runtime.ConfigException;
import de.gwdg.cdstar.runtime.Plugin;
import de.gwdg.cdstar.runtime.RuntimeContext;
import de.gwdg.cdstar.runtime.filter.AbstractEventFilter;
import de.gwdg.cdstar.runtime.listener.RuntimeListener;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.pool2.impl.BaseObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

@Plugin
/* loaded from: input_file:de/gwdg/cdstar/ext/redis/RedisSink.class */
public class RedisSink implements RuntimeListener {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) RedisSink.class);
    public static final String PROP_URI = "uri";
    public static final String PROP_KEY = "key";
    public static final String PROP_MODE = "mode";
    public static final String PROP_QSIZE = "qsize";
    private final Config conf;
    private final URI redisUri;
    private final byte[] listKey;
    private final JedisPool pool;
    private final Mode mode;
    private final BlockingQueue<ChangeEvent> eventQueue;
    private Thread sendThread;
    private volatile boolean pleaseStop;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: de.gwdg.cdstar.ext.redis.RedisSink$2, reason: invalid class name */
    /* loaded from: input_file:de/gwdg/cdstar/ext/redis/RedisSink$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$de$gwdg$cdstar$ext$redis$RedisSink$Mode = new int[Mode.values().length];

        static {
            try {
                $SwitchMap$de$gwdg$cdstar$ext$redis$RedisSink$Mode[Mode.RPUSH.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$de$gwdg$cdstar$ext$redis$RedisSink$Mode[Mode.LPUSH.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$de$gwdg$cdstar$ext$redis$RedisSink$Mode[Mode.PUBLISH.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:de/gwdg/cdstar/ext/redis/RedisSink$Mode.class */
    public enum Mode {
        RPUSH,
        LPUSH,
        PUBLISH
    }

    public RedisSink(Config config) throws Exception {
        this.conf = config;
        this.conf.setDefault(PROP_URI, "redis://localhost:6379/0");
        this.conf.setDefault(PROP_KEY, "cdstar.events");
        this.conf.setDefault(PROP_MODE, "RPUSH");
        this.conf.setDefault(PROP_QSIZE, "1024");
        this.redisUri = this.conf.getURI(PROP_URI);
        this.listKey = this.conf.get(PROP_KEY).getBytes(StandardCharsets.UTF_8);
        this.mode = Mode.valueOf(this.conf.get(PROP_MODE));
        int i = this.conf.getInt(PROP_QSIZE);
        i = i <= 0 ? Integer.MAX_VALUE : i;
        this.eventQueue = i <= 1024 ? new ArrayBlockingQueue<>(i) : new LinkedBlockingQueue<>(i);
        this.pool = new JedisPool(new JedisPoolConfig(), this.redisUri);
        this.sendThread = new Thread(this::sendLoop, "redis-sink-send");
    }

    public void onInit(RuntimeContext runtimeContext) throws ConfigException {
        runtimeContext.register(new AbstractEventFilter() { // from class: de.gwdg.cdstar.ext.redis.RedisSink.1
            public void triggerEvent(ChangeEvent changeEvent) throws Exception {
                RedisSink.this.enqueueEvent(changeEvent);
            }
        });
    }

    public void onStartup(RuntimeContext runtimeContext) throws Exception {
        this.sendThread.start();
    }

    public void onShutdown(RuntimeContext runtimeContext) {
        this.pleaseStop = true;
        for (int i = 0; i < 10; i++) {
            try {
                if (!this.sendThread.isAlive()) {
                    break;
                }
                log.debug("Waiting for thread {} to terminate.", this.sendThread, Integer.valueOf(this.eventQueue.size()));
                this.sendThread.join(TimeUnit.SECONDS.toMillis(1L));
            } catch (InterruptedException e) {
            } finally {
                Utils.closeQuietly(this.pool);
            }
        }
        if (this.sendThread.isAlive()) {
            log.warn("Thread {} did not terminate in time. Interrupting...", this.sendThread, Integer.valueOf(this.eventQueue.size()));
            this.sendThread.interrupt();
            this.sendThread.join(0L);
        }
        while (true) {
            ChangeEvent poll = this.eventQueue.poll();
            if (poll == null) {
                return;
            } else {
                log.warn("Event dropped (shutdown): vault={} archive={} revision={}", poll.getVault(), poll.getArchive(), poll.getRevision());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void enqueueEvent(ChangeEvent changeEvent) {
        if (this.eventQueue.offer(changeEvent)) {
            return;
        }
        log.warn("Event dropped (queue overflow): vault={} archive={} revision={}", changeEvent.getVault(), changeEvent.getArchive(), changeEvent.getRevision());
    }

    private void sendLoop() {
        while (!Thread.interrupted()) {
            try {
                do {
                    ChangeEvent poll = this.eventQueue.poll(1L, TimeUnit.SECONDS);
                    if (poll != null) {
                        try {
                            sendOne(poll);
                        } catch (Exception e) {
                            log.warn("Event dropped (error): vault={} archive={} revision={}", poll.getVault(), poll.getArchive(), poll.getRevision(), e);
                        }
                    }
                } while (!this.pleaseStop);
                return;
            } catch (InterruptedException e2) {
                return;
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r2v2, types: [byte[], byte[][]] */
    /* JADX WARN: Type inference failed for: r2v4, types: [byte[], byte[][]] */
    private void sendOne(ChangeEvent changeEvent) throws JsonProcessingException {
        byte[] writeValueAsBytes = SharedObjectMapper.json_compact.writeValueAsBytes(changeEvent);
        Jedis resource = this.pool.getResource();
        try {
            switch (AnonymousClass2.$SwitchMap$de$gwdg$cdstar$ext$redis$RedisSink$Mode[this.mode.ordinal()]) {
                case 1:
                    resource.rpush(this.listKey, (byte[][]) new byte[]{writeValueAsBytes});
                    break;
                case 2:
                    resource.lpush(this.listKey, (byte[][]) new byte[]{writeValueAsBytes});
                    break;
                case BaseObjectPoolConfig.DEFAULT_NUM_TESTS_PER_EVICTION_RUN /* 3 */:
                    resource.publish(this.listKey, writeValueAsBytes);
                    break;
            }
            if (resource != 0) {
                resource.close();
            }
        } catch (Throwable th) {
            if (resource != 0) {
                try {
                    resource.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
