package de.gwdg.cdstar.ext.elastic.cli;

import de.gwdg.cdstar.Utils;
import de.gwdg.cdstar.client.CDStarClientConfig;
import de.gwdg.cdstar.client.CDStarRestClient;
import de.gwdg.cdstar.ext.elastic.ActiveMQListener;
import de.gwdg.cdstar.ext.elastic.ElasticIngest;
import de.gwdg.cdstar.ext.elastic.ElasticSearchClient;
import de.gwdg.cdstar.runtime.Config;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;
import picocli.CommandLine;

/**
 * The {@code listen} sub-command: receives messages from an ActiveMQ topic and
 * hands each one to an {@link ElasticIngest}, which is wired to a CDSTAR REST
 * client and an Elasticsearch client.
 */
@CommandLine.Command(name = "listen")
public class ListenCommand extends BaseCommand {

    /**
     * Builds the listener/ingest pipeline from {@code config} and processes
     * messages in an endless loop. This method never returns normally; it only
     * exits by propagating whatever error terminated the setup or the loop.
     *
     * <p>Configuration keys read: {@code journal} (converted to a {@link Path}),
     * {@code broker}, {@code topic}, {@code elastic} (URI), {@code cdstar} (URI)
     * and {@code index} (passed to {@link ElasticIngest#setIndexMask}).
     *
     * @param config configuration supplying the keys listed above
     * @throws Exception any failure raised while creating the components,
     *         starting the ingest, or handling a message; every component that
     *         was already created is released before the error is rethrown
     */
    @Override
    public void run(Config config) throws Exception {
        // Declared up front so that the finally block can release exactly those
        // components that were successfully created, even if setup fails midway.
        ActiveMQListener listener = null;
        ElasticSearchClient elastic = null;
        CDStarRestClient cdstar = null;
        ElasticIngest ingest = null;
        try {
            Path journal = Paths.get(config.get("journal"));
            listener = new ActiveMQListener(config.get("broker"), config.get("topic"));
            elastic = new ElasticSearchClient(config.getURI("elastic"));
            cdstar = new CDStarRestClient(new CDStarClientConfig(config.getURI("cdstar")));
            ingest = new ElasticIngest(journal, cdstar, elastic);
            ingest.setIndexMask(config.get("index"));

            ingest.start();
            // Runs until handleOne (or the submit callback) throws.
            while (true) {
                listener.handleOne(ingest::submit);
            }
        } finally {
            // Same shutdown order as before: stop receiving first, then stop the
            // ingest, then release the clients it depends on. The null guards are
            // required because `x::method` on a null reference throws immediately.
            // NOTE(review): closeQuietly is presumed to suppress errors raised
            // while closing, so the original failure is the one that propagates.
            if (listener != null) {
                Utils.closeQuietly(listener::close);
            }
            if (ingest != null) {
                Utils.closeQuietly(ingest::stop);
            }
            if (cdstar != null) {
                Utils.closeQuietly(cdstar);
            }
            if (elastic != null) {
                Utils.closeQuietly(elastic);
            }
        }
    }
}
