/*
 * Decompiled with CFR 0.152.
 */
package org.logstash.config.ir.compiler;

import co.elastic.logstash.api.Input;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.javasupport.JavaObject;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.config.ir.compiler.CommonActions;
import org.logstash.execution.JavaBasePipelineExt;
import org.logstash.execution.queue.QueueWriter;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricKeys;

@JRubyClass(name={"JavaInputDelegator"})
public class JavaInputDelegatorExt
extends RubyObject {
    private static final long serialVersionUID = 1L;
    private AbstractNamespacedMetricExt metric;
    private JavaBasePipelineExt pipeline;
    private Input input;
    private DecoratingQueueWriter decoratingQueueWriter;

    public JavaInputDelegatorExt(Ruby runtime, RubyClass metaClass) {
        super(runtime, metaClass);
    }

    public static JavaInputDelegatorExt create(JavaBasePipelineExt pipeline, AbstractNamespacedMetricExt metric, Input input, Map<String, Object> pluginArgs) {
        JavaInputDelegatorExt instance = new JavaInputDelegatorExt(RubyUtil.RUBY, RubyUtil.JAVA_INPUT_DELEGATOR_CLASS);
        AbstractNamespacedMetricExt scopedMetric = metric.namespace(RubyUtil.RUBY.getCurrentContext(), (IRubyObject)RubyUtil.RUBY.newSymbol(input.getId()));
        scopedMetric.gauge(RubyUtil.RUBY.getCurrentContext(), (IRubyObject)MetricKeys.NAME_KEY, (IRubyObject)RubyUtil.RUBY.newString(input.getName()));
        instance.setMetric(RubyUtil.RUBY.getCurrentContext(), (IRubyObject)scopedMetric);
        instance.input = input;
        instance.pipeline = pipeline;
        instance.initializeQueueWriter(pluginArgs);
        return instance;
    }

    @JRubyMethod(name={"start"})
    public IRubyObject start(ThreadContext context) {
        QueueWriter queueWriter;
        QueueWriter qw = this.pipeline.getQueueWriter(this.input.getId());
        if (this.decoratingQueueWriter != null) {
            this.decoratingQueueWriter.setInnerQueueWriter(qw);
            queueWriter = this.decoratingQueueWriter;
        } else {
            queueWriter = qw;
        }
        Thread t = new Thread(() -> this.input.start(queueWriter::push));
        t.setName(this.pipeline.pipelineId().asJavaString() + "_" + this.input.getName() + "_" + this.input.getId());
        t.start();
        return JavaObject.wrap((Ruby)context.getRuntime(), (Object)t);
    }

    @JRubyMethod(name={"metric="})
    public IRubyObject setMetric(ThreadContext context, IRubyObject metric) {
        this.metric = (AbstractNamespacedMetricExt)metric;
        return this;
    }

    @JRubyMethod(name={"metric"})
    public IRubyObject getMetric(ThreadContext context) {
        return this.metric;
    }

    @JRubyMethod(name={"config_name"}, meta=true)
    public IRubyObject configName(ThreadContext context) {
        return context.getRuntime().newString(this.input.getName());
    }

    @JRubyMethod(name={"id"})
    public IRubyObject getId(ThreadContext context) {
        return context.getRuntime().newString(this.input.getId());
    }

    @JRubyMethod(name={"threadable"})
    public IRubyObject isThreadable(ThreadContext context) {
        return context.fals;
    }

    @JRubyMethod(name={"register"})
    public IRubyObject register(ThreadContext context) {
        return this;
    }

    @JRubyMethod(name={"do_close"})
    public IRubyObject close(ThreadContext context) {
        return this;
    }

    @JRubyMethod(name={"stop?"})
    public IRubyObject isStopping(ThreadContext context) {
        return context.fals;
    }

    @JRubyMethod(name={"do_stop"})
    public IRubyObject doStop(ThreadContext context) {
        try {
            this.input.stop();
            this.input.awaitStop();
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        return this;
    }

    private void initializeQueueWriter(Map<String, Object> pluginArgs) {
        ArrayList<Function<Map<String, Object>, Map<String, Object>>> inputActions = new ArrayList<Function<Map<String, Object>, Map<String, Object>>>();
        for (Map.Entry<String, Object> entry : pluginArgs.entrySet()) {
            Function<Map<String, Object>, Map<String, Object>> inputAction = CommonActions.getInputAction(entry);
            if (inputAction == null) continue;
            inputActions.add(inputAction);
        }
        this.decoratingQueueWriter = inputActions.size() == 0 ? null : new DecoratingQueueWriter(inputActions);
    }

    static class DecoratingQueueWriter
    implements QueueWriter {
        private QueueWriter innerQueueWriter;
        private final List<Function<Map<String, Object>, Map<String, Object>>> inputActions;

        DecoratingQueueWriter(List<Function<Map<String, Object>, Map<String, Object>>> inputActions) {
            this.inputActions = inputActions;
        }

        @Override
        public void push(Map<String, Object> event) {
            for (Function<Map<String, Object>, Map<String, Object>> action : this.inputActions) {
                event = action.apply(event);
            }
            this.innerQueueWriter.push(event);
        }

        private void setInnerQueueWriter(QueueWriter innerQueueWriter) {
            this.innerQueueWriter = innerQueueWriter;
        }
    }
}

