package de.gwdg.cdstar.ext.elastic;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.gwdg.cdstar.Promise;
import de.gwdg.cdstar.Utils;
import de.gwdg.cdstar.client.CDStarRestClient;
import de.gwdg.cdstar.event.ChangeEvent;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import journal.io.api.ClosedJournalException;
import journal.io.api.CompactedDataFileException;
import journal.io.api.Journal;
import journal.io.api.JournalBuilder;
import journal.io.api.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/gwdg/cdstar/ext/elastic/ElasticIngest.class */
/**
 * Collects {@link ChangeEvent}s, de-duplicates them per {@code vault:archive} key and runs one
 * {@link ElasticIngestJob} per remaining event on a small worker pool.
 *
 * <p>Events accepted through {@link #submit(ChangeEvent)} are written to an on-disk journal before
 * they are queued and removed from it once their task finishes or is canceled. Entries of failed
 * tasks are left in the journal; {@link #start()} re-submits whatever it finds there.
 *
 * <p>Lifecycle: construct, optionally {@link #setIndexMask(String)}, {@link #start()}, then
 * {@link #submit(ChangeEvent)} / {@link #forceUpdate(String, String)}, finally {@link #stop()}.
 */
public class ElasticIngest {
    private static final ObjectMapper om = new ObjectMapper();
    public static final Logger log = LoggerFactory.getLogger(ElasticIngest.class);
    private static final String MASK_PLACEHOLDER = "{}";

    private final ElasticSearchClient es;
    private final CDStarRestClient cdstar;
    private final Journal journal;
    private final Map<String, ElasticMapping> mappings = new ConcurrentHashMap<>();

    // Both flags are written by the thread calling start()/stop() (and by the ingest thread) and
    // read by arbitrary submitter threads, hence volatile.
    private volatile boolean stopped;
    private volatile boolean started;

    private Thread batchThread;
    private String indexMask;

    ExecutorService pool = new ThreadPoolExecutor(0, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));

    /** Tasks waiting for the next batch, keyed by {@link IngestTask#getKey()}. Doubles as the monitor guarding itself. */
    Map<String, IngestTask> nextBatch = new HashMap<>();

    /** A queued change event together with the promise that is settled once it has been processed. */
    public class IngestTask {
        final ChangeEvent event;
        final Promise<IngestTask> promise = Promise.empty();
        long startTime = System.currentTimeMillis();

        IngestTask(ChangeEvent changeEvent) {
            this.event = changeEvent;
        }

        /** @return {@code "<vault>:<archive>"} of the underlying event; used to merge tasks for the same item. */
        public String getKey() {
            return this.event.getVault() + ":" + this.event.getArchive();
        }

        @Override
        public String toString() {
            return getKey();
        }
    }

    /**
     * Opens (creating it if necessary) the journal directory and prepares the ingest. No thread is
     * started until {@link #start()} is called.
     *
     * @param path directory holding the task journal; created if missing
     * @param cDStarRestClient client used by the ingest jobs; a copy is stored, not this instance
     * @param elasticSearchClient client used for index management and by the ingest jobs
     * @throws IOException if the directory cannot be created or the journal cannot be opened
     */
    public ElasticIngest(Path path, CDStarRestClient cDStarRestClient, ElasticSearchClient elasticSearchClient) throws IOException {
        // NOTE(review): "m2081clone" looks like a decompiler-mangled name for clone() — confirm against CDStarRestClient.
        this.cdstar = cDStarRestClient.m2081clone();
        this.es = elasticSearchClient;
        Files.createDirectories(path);
        this.journal = JournalBuilder.of(path.toFile()).open();
        setIndexMask(MASK_PLACEHOLDER);
    }

    /**
     * Sets the pattern used to derive an index name from a vault name.
     *
     * @param str mask containing the placeholder <code>{}</code> exactly once and no comma
     * @throws IllegalArgumentException if the placeholder is missing or repeated, or the mask contains {@code ','}
     */
    public void setIndexMask(String str) {
        int first = str.indexOf(MASK_PLACEHOLDER);
        if (first < 0 || first != str.lastIndexOf(MASK_PLACEHOLDER)) {
            throw new IllegalArgumentException("Mask must contain '{}' exactly once");
        }
        if (str.contains(",")) {
            throw new IllegalArgumentException("Mask must not contain ','");
        }
        this.indexMask = str;
    }

    /** Substitutes the vault name for the placeholder in the configured index mask. */
    private String indexNameFromVault(String vault) {
        return this.indexMask.replace(MASK_PLACEHOLDER, vault);
    }

    /**
     * Starts the ingest thread and re-submits every event still present in the journal. Each
     * recovered event is journaled again by the re-submission before its old entry is deleted.
     *
     * @throws IOException if reading from or writing to the journal fails
     * @throws Exception declared for compatibility with existing callers
     */
    public void start() throws ClosedJournalException, CompactedDataFileException, IOException, Exception {
        log.debug("Starting task handling thread...");
        this.batchThread = new Thread(this::ingestLoop, "ElasticIngest");
        this.batchThread.start();
        log.info("Reading unfinished tasks from journal...");
        for (Location location : this.journal.undo()) {
            ChangeEvent recovered = decodeEvent(this.journal.read(location, Journal.ReadType.SYNC));
            submitInternal(recovered, true);
            this.journal.delete(location);
            this.journal.sync();
        }
        this.started = true;
    }

    /**
     * Signals the ingest thread to stop, waits for it to exit, then shuts down the worker pool and
     * closes the journal. Tasks still queued for the next batch are not executed.
     *
     * @throws IllegalStateException if {@link #start()} has not completed
     */
    public void stop() {
        if (!this.started) {
            throw new IllegalStateException("Ingest not started.");
        }
        this.stopped = true;
        // Wake the ingest thread if it is idling in wait() so it sees the flag immediately.
        synchronized (this.nextBatch) {
            this.nextBatch.notifyAll();
        }
        try {
            this.batchThread.join();
        } catch (InterruptedException e) {
            log.warn("Shutdown interrupted", e);
            // Preserve the interrupt for the caller; cleanup below still runs.
            Thread.currentThread().interrupt();
        } finally {
            Utils.closeQuietly(this.pool::shutdown);
            Utils.closeQuietly(this.journal::close);
        }
    }

    /**
     * Journals the event and queues it for the next batch.
     *
     * @param changeEvent event to ingest
     * @return promise settled when the task finishes, fails or is canceled
     * @throws IOException if the journal write fails
     * @throws IllegalStateException if the ingest is not started or already stopped
     */
    public Promise<IngestTask> submit(ChangeEvent changeEvent) throws IOException {
        requireRunning();
        return submitInternal(changeEvent, true);
    }

    /**
     * Queues a synthetic change event (timestamped "now") for the given archive. Unlike
     * {@link #submit(ChangeEvent)} the event is not written to the journal.
     *
     * @param str vault name
     * @param str2 archive id
     * @return promise settled when the task finishes, fails or is canceled
     * @throws IOException declared for symmetry with {@link #submit(ChangeEvent)}
     * @throws IllegalStateException if the ingest is not started or already stopped
     */
    public Promise<IngestTask> forceUpdate(String str, String str2) throws IOException {
        requireRunning();
        ChangeEvent changeEvent = new ChangeEvent();
        changeEvent.setVault(str);
        changeEvent.setArchive(str2);
        changeEvent.setTs(System.currentTimeMillis());
        return submitInternal(changeEvent, false);
    }

    /** Rejects calls made before {@link #start()} completed or after the ingest stopped. */
    private void requireRunning() {
        if (!this.started) {
            throw new IllegalStateException("Ingest not ready. Call start() first.");
        }
        if (this.stopped) {
            throw new IllegalStateException("Ingest stopped.");
        }
    }

    /**
     * Optionally journals the event, queues a task for it and wires up journal cleanup.
     *
     * @param changeEvent event to queue
     * @param persist whether to write the event to the journal first
     */
    private Promise<IngestTask> submitInternal(ChangeEvent changeEvent, boolean persist) throws IOException {
        final Location entry = persist ? this.journal.write(encodeEvent(changeEvent), Journal.WriteType.SYNC) : null;
        final IngestTask task = new IngestTask(changeEvent);
        log.debug("[{}] Task accepted", task.getKey());
        addTask(task);
        task.promise.then((result, error) -> {
            try {
                if (error == null) {
                    log.debug("[{}] Task finished.", task);
                    if (entry != null) {
                        this.journal.delete(entry);
                    }
                } else if (error instanceof CancellationException) {
                    log.info("[{}] Task canceled.", task, error);
                    if (entry != null) {
                        this.journal.delete(entry);
                    }
                } else {
                    // The journal entry is deliberately kept for failed tasks.
                    log.error("[{}] Task failed.", task, error);
                }
            } catch (IOException e) {
                log.warn("[{}] Failed to remove task from journal.", task, e);
            }
        });
        return task.promise;
    }

    /**
     * Adds a task to the pending batch. If a task with the same key is already queued, the one with
     * the strictly greater event timestamp wins (the incoming task wins ties) and the loser's
     * promise is canceled.
     */
    private void addTask(IngestTask ingestTask) {
        synchronized (this.nextBatch) {
            // Map.merge passes (value already in the map, value being merged in).
            this.nextBatch.merge(ingestTask.getKey(), ingestTask, (queued, incoming) -> {
                if (queued.event.getTs() > incoming.event.getTs()) {
                    log.debug("[{}] Skipping. Newer task for same item already queued.", ingestTask.getKey());
                    incoming.promise.cancel();
                    return queued;
                }
                log.debug("[{}] Replacing old task for same item in queue.", ingestTask.getKey());
                queued.promise.cancel();
                return incoming;
            });
            this.nextBatch.notifyAll();
        }
    }

    /**
     * Body of the ingest thread: repeatedly takes a snapshot of the pending tasks and runs it as one
     * batch, idling up to one second at a time while nothing is queued. Always leaves
     * {@code stopped} set on exit, whatever the reason.
     */
    private void ingestLoop() {
        try {
            while (!this.stopped) {
                List<IngestTask> batch;
                synchronized (this.nextBatch) {
                    if (this.nextBatch.isEmpty()) {
                        this.nextBatch.wait(1000L);
                        continue;
                    }
                    batch = new ArrayList<>(this.nextBatch.values());
                    this.nextBatch.clear();
                }
                // Run outside the monitor so submitters can keep queueing while the batch executes.
                runSingleBatch(batch);
            }
        } catch (InterruptedException e) {
            log.info("Ingest loop interrupted");
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            log.error("Ingest loop stopped with Exception", e);
        } finally {
            this.stopped = true;
            log.debug("Ingest thread stopped");
        }
    }

    /**
     * Hands every task of the batch to the worker pool and blocks until all of their promises are
     * settled. A task the pool refuses to accept is failed individually instead of aborting the
     * whole batch.
     *
     * @throws InterruptedException if interrupted while waiting for the batch to complete
     */
    private void runSingleBatch(List<IngestTask> list) throws InterruptedException {
        log.debug("Starting batch with {} tasks", list.size());
        CountDownLatch pending = new CountDownLatch(list.size());
        for (IngestTask task : list) {
            task.promise.then((result, error) -> pending.countDown());
            try {
                this.pool.submit(() -> runSingleTask(task));
            } catch (RejectedExecutionException e) {
                // Queue full or pool shut down: settle the promise so the latch still reaches zero.
                task.promise.tryReject(e);
            }
        }
        pending.await();
    }

    /**
     * Ensures the target index exists, runs the ingest job for one task and settles the task's
     * promise with the outcome. The job, if one was created, is always closed.
     */
    private void runSingleTask(IngestTask ingestTask) {
        ElasticIngestJob job = null;
        try {
            String vault = ingestTask.event.getVault();
            ensureIndex(vault);
            job = new ElasticIngestJob(this.cdstar, this.es, indexNameFromVault(vault), ingestTask.event);
            job.sync();
            ingestTask.promise.tryResolve(ingestTask);
        } catch (Exception e) {
            ingestTask.promise.tryReject(e);
        } finally {
            Utils.closeQuietly(job);
        }
    }

    /** Returns the cached {@link ElasticMapping} for the vault's index, creating it on first use. */
    private ElasticMapping mappingForVault(String vault) {
        return this.mappings.computeIfAbsent(indexNameFromVault(vault), index -> new ElasticMapping(this.es, index));
    }

    private void ensureIndex(String vault) throws IOException {
        mappingForVault(vault).ensureIndex();
    }

    /**
     * Drops the index belonging to the given vault, if it exists.
     *
     * @param str vault name
     * @throws IOException if the underlying index operation fails
     */
    public void dropIndex(String str) throws IOException {
        mappingForVault(str).dropIndexIfExist();
    }

    /** Serializes an event to JSON bytes for the journal. */
    private byte[] encodeEvent(ChangeEvent changeEvent) throws JsonProcessingException {
        return om.writeValueAsBytes(changeEvent);
    }

    /** Parses an event from JSON bytes read from the journal. */
    private ChangeEvent decodeEvent(byte[] data) throws IOException {
        return om.readValue(data, ChangeEvent.class);
    }
}
