package de.gwdg.cdstar.plugins.push;

import com.fasterxml.jackson.core.JsonProcessingException;
import de.gwdg.cdstar.Utils;
import de.gwdg.cdstar.event.ChangeEvent;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.SplittableRandom;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.ByteArrayEntity;

/* loaded from: input_file:de/gwdg/cdstar/plugins/push/PushConsumer.class */
public class PushConsumer implements Runnable {
    final PushEventFilter parent;
    final String target;
    private final ScheduledExecutorService pool;
    private ChangeEvent currentEvent;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final SplittableRandom random = new SplittableRandom();
    private int errors = 0;
    private int dropped = 0;
    private final Deque<ChangeEvent> queue = new ArrayDeque();

    public PushConsumer(PushEventFilter pushEventFilter, String str) {
        this.parent = pushEventFilter;
        this.target = str;
        this.pool = pushEventFilter.pool;
    }

    public synchronized void add(ChangeEvent changeEvent) {
        if (this.parent.isStopped()) {
            this.parent.logFailed(this, changeEvent);
            return;
        }
        boolean isEmpty = this.queue.isEmpty();
        if (this.queue.size() > this.parent.maxQueue) {
            this.dropped++;
            this.parent.logFailed(this, changeEvent);
        }
        this.queue.add(changeEvent);
        if (isEmpty) {
            this.pool.execute(this);
        }
    }

    @Override // java.lang.Runnable
    public synchronized void run() {
        if (this.parent.isStopped()) {
            return;
        }
        this.currentEvent = this.queue.peekFirst();
        if (this.currentEvent == null) {
            return;
        }
        try {
            byte[] writeValueAsBytes = PushEventFilter.om.writeValueAsBytes(this.currentEvent);
            HttpPost httpPost = new HttpPost(this.target);
            this.parent.additionalHeaders.forEach((str, str2) -> {
                httpPost.setHeader(str, str2);
            });
            httpPost.setHeader("Content-Type", "application/json; charset=UTF-8");
            if (this.errors > 0) {
                httpPost.setHeader("Push-Retry", Integer.toString(this.errors));
            }
            httpPost.setHeader("Push-Queue", Utils.join(" ", new String[]{Integer.toString(this.queue.size() - 1), Integer.toString(this.parent.maxQueue), Integer.toString(this.dropped)}));
            httpPost.setEntity(new ByteArrayEntity(writeValueAsBytes));
            this.parent.asyncHttpClient.execute(httpPost, new FutureCallback<HttpResponse>() { // from class: de.gwdg.cdstar.plugins.push.PushConsumer.1
                public void completed(HttpResponse httpResponse) {
                    int statusCode = httpResponse.getStatusLine().getStatusCode();
                    if (statusCode == 200 || statusCode == 202 || statusCode == 204) {
                        PushConsumer.this.onSuccess(PushConsumer.this.currentEvent);
                        return;
                    }
                    if (statusCode != 503) {
                        PushConsumer.this.onError(new RuntimeException("Wrong status code: " + statusCode));
                        return;
                    }
                    int i = PushConsumer.this.parent.defaultCooldown;
                    if (httpResponse.getFirstHeader("Retry-After") != null) {
                        try {
                            i = 1000 * Math.max(1, Integer.parseInt(httpResponse.getFirstHeader("Retry-After").getValue()));
                        } catch (NumberFormatException e) {
                            PushEventFilter.log.warn("Push consumer returned invalid Retry-After header: {}", PushConsumer.this.target);
                        }
                    }
                    PushConsumer.this.onCooldownRequested(i);
                }

                public void failed(Exception exc) {
                    PushConsumer.this.onError(exc);
                }

                public void cancelled() {
                    PushConsumer.this.onError(new CancellationException());
                }
            });
        } catch (JsonProcessingException e) {
            throw Utils.wtf(e);
        }
    }

    synchronized void onSuccess(ChangeEvent changeEvent) {
        if (this.parent.isStopped()) {
            return;
        }
        if (changeEvent != this.queue.poll()) {
            Utils.wtf();
        }
        PushEventFilter.log.debug("Event [{}] delivered to {}", changeEvent.getTx(), this.target);
        if (this.errors > this.parent.maxRetry) {
            PushEventFilter.log.info("Consumer {} responsive again.", this.target);
        }
        this.errors = 0;
        if (this.queue.isEmpty()) {
            return;
        }
        this.pool.execute(this);
    }

    synchronized void onCooldownRequested(int i) {
        if (this.parent.isStopped()) {
            return;
        }
        this.errors = 0;
        this.pool.schedule(this, i, TimeUnit.MILLISECONDS);
    }

    synchronized void onError(Throwable th) {
        if (this.parent.isStopped()) {
            return;
        }
        this.errors++;
        int i = this.parent.defaultDelay;
        if (this.errors > this.parent.maxRetry) {
            if ((this.errors - 1) % this.parent.maxRetry == 0) {
                PushEventFilter.log.debug("Consumer {} unresponsive. Last error: {}", this.target, th.toString());
            }
            i = this.parent.defaultCooldown;
        }
        this.pool.schedule(this, i + this.random.nextInt(i / 10), TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void stop() {
        if (!$assertionsDisabled && !this.parent.isStopped()) {
            throw new AssertionError();
        }
        while (!this.queue.isEmpty()) {
            this.parent.logFailed(this, this.queue.pop());
        }
    }

    static {
        $assertionsDisabled = !PushConsumer.class.desiredAssertionStatus();
    }
}
