/*
 * Decompiled with CFR 0.152.
 */
package org.logstash.execution;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jruby.Ruby;
import org.jruby.runtime.ThreadContext;
import org.logstash.RubyUtil;
import org.logstash.config.ir.CompiledPipeline;
import org.logstash.config.ir.compiler.Dataset;
import org.logstash.execution.QueueBatch;
import org.logstash.execution.QueueReadClient;

public final class WorkerLoop
implements Runnable {
    public static final ThreadLocal<ThreadContext> THREAD_CONTEXT = ThreadLocal.withInitial(() -> ((Ruby)RubyUtil.RUBY).getCurrentContext());
    private static final Logger LOGGER = LogManager.getLogger(WorkerLoop.class);
    private final Dataset execution;
    private final QueueReadClient readClient;
    private final AtomicBoolean flushRequested;
    private final AtomicBoolean flushing;
    private final AtomicBoolean shutdownRequested;
    private final LongAdder consumedCounter;
    private final LongAdder filteredCounter;
    private final boolean drainQueue;

    public WorkerLoop(CompiledPipeline pipeline, QueueReadClient readClient, LongAdder filteredCounter, LongAdder consumedCounter, AtomicBoolean flushRequested, AtomicBoolean flushing, AtomicBoolean shutdownRequested, boolean drainQueue) {
        this.consumedCounter = consumedCounter;
        this.filteredCounter = filteredCounter;
        this.execution = pipeline.buildExecution();
        this.drainQueue = drainQueue;
        this.readClient = readClient;
        this.flushRequested = flushRequested;
        this.flushing = flushing;
        this.shutdownRequested = shutdownRequested;
    }

    @Override
    public void run() {
        try {
            QueueBatch batch;
            boolean isShutdown = false;
            do {
                isShutdown = isShutdown || this.shutdownRequested.get();
                batch = this.readClient.readBatch();
                this.consumedCounter.add(batch.filteredSize());
                boolean isFlush = this.flushRequested.compareAndSet(true, false);
                this.readClient.startMetrics(batch);
                this.execution.compute(batch.to_a(), isFlush, false);
                int filteredCount = batch.filteredSize();
                this.filteredCounter.add(filteredCount);
                this.readClient.addOutputMetrics(filteredCount);
                this.readClient.addFilteredMetrics(filteredCount);
                this.readClient.closeBatch(batch);
                if (!isFlush) continue;
                this.flushing.set(false);
            } while (!isShutdown || this.isDraining());
            batch = this.readClient.newBatch();
            this.readClient.startMetrics(batch);
            this.execution.compute(batch.to_a(), true, true);
            this.readClient.closeBatch(batch);
        }
        catch (Exception ex) {
            LOGGER.error("Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.", (Throwable)ex);
            throw new IllegalStateException(ex);
        }
    }

    private boolean isDraining() {
        return this.drainQueue && !this.readClient.isEmpty();
    }
}

