/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.cluster.coordination;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.IncompatibleClusterStateVersionException;
import org.elasticsearch.cluster.coordination.ApplyCommitRequest;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.Join;
import org.elasticsearch.cluster.coordination.PublishRequest;
import org.elasticsearch.cluster.coordination.PublishResponse;
import org.elasticsearch.cluster.coordination.PublishWithJoinResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.discovery.zen.PublishClusterStateAction;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.transport.BytesTransportRequest;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

/**
 * Transport-level plumbing for cluster state publication.
 *
 * <p>On construction this class registers request handlers for the publish and commit actions
 * (both the coordination action names and the legacy zen action names). For outgoing traffic,
 * {@link #newPublicationContext(ClusterChangedEvent)} pre-serializes the new cluster state (in full
 * and/or as a diff, once per node version) and returns a {@link PublicationContext} that sends
 * those bytes and the subsequent commit to individual nodes.
 *
 * <p>Wire format of a publish request, as produced by {@link #serializeFullClusterState} and
 * {@link #serializeDiffClusterState}: a compressed stream whose first value is a boolean
 * ({@code true} = full cluster state follows, {@code false} = diff follows).
 */
public class PublicationTransportHandler {
    private static final Logger logger = LogManager.getLogger(PublicationTransportHandler.class);

    public static final String PUBLISH_STATE_ACTION_NAME = "internal:cluster/coordination/publish_state";
    public static final String COMMIT_STATE_ACTION_NAME = "internal:cluster/coordination/commit_state";

    /** Publish action name used when talking to a node for which {@link Coordinator#isZen1Node} is true. */
    private static final String ZEN1_PUBLISH_SEND_ACTION_NAME = "internal:discovery/zen/publish/send";
    /** Commit action name used when talking to a node for which {@link Coordinator#isZen1Node} is true. */
    private static final String ZEN1_PUBLISH_COMMIT_ACTION_NAME = "internal:discovery/zen/publish/commit";
    /** Name of the executor on which requests and responses of this handler are processed. */
    private static final String GENERIC_EXECUTOR = "generic";

    private final TransportService transportService;
    private final NamedWriteableRegistry namedWriteableRegistry;
    private final Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest;

    /** Last cluster state successfully accepted from an incoming publish request; base for applying diffs. */
    private final AtomicReference<ClusterState> lastSeenClusterState = new AtomicReference<>();
    /** The publish request currently being sent to the local node, or {@code null} if there is none. */
    private final AtomicReference<PublishRequest> currentPublishRequestToSelf = new AtomicReference<>();

    private final AtomicLong fullClusterStateReceivedCount = new AtomicLong();
    private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong();
    private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong();

    private final TransportRequestOptions stateRequestOptions =
        TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build();

    /**
     * Creates the handler and registers the four incoming request handlers on {@code transportService}.
     *
     * @param transportService       transport used to register handlers and to send requests
     * @param namedWriteableRegistry registry used when deserializing incoming cluster states and diffs
     * @param handlePublishRequest   callback invoked with each accepted incoming publish request
     * @param handleApplyCommit      callback invoked with each incoming commit; the supplied listener
     *                               sends the transport response back to the requester
     */
    public PublicationTransportHandler(TransportService transportService,
                                       NamedWriteableRegistry namedWriteableRegistry,
                                       Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
                                       BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit) {
        this.transportService = transportService;
        this.namedWriteableRegistry = namedWriteableRegistry;
        this.handlePublishRequest = handlePublishRequest;

        transportService.registerRequestHandler(PUBLISH_STATE_ACTION_NAME, BytesTransportRequest::new, GENERIC_EXECUTOR,
            false, false, (request, channel, task) -> channel.sendResponse(handleIncomingPublishRequest(request)));

        // Legacy zen publish action: the state is processed the same way, but the reply is an empty response.
        transportService.registerRequestHandler(ZEN1_PUBLISH_SEND_ACTION_NAME, BytesTransportRequest::new, GENERIC_EXECUTOR,
            false, false, (request, channel, task) -> {
                handleIncomingPublishRequest(request);
                channel.sendResponse(TransportResponse.Empty.INSTANCE);
            });

        transportService.registerRequestHandler(COMMIT_STATE_ACTION_NAME, GENERIC_EXECUTOR, false, false,
            ApplyCommitRequest::new,
            (request, channel, task) -> handleApplyCommit.accept(request, transportCommitCallback(channel)));

        // Legacy zen commit action: the request only carries a state UUID, so the commit is rebuilt from the
        // last seen cluster state, which must have that UUID.
        transportService.registerRequestHandler(ZEN1_PUBLISH_COMMIT_ACTION_NAME,
            PublishClusterStateAction.CommitClusterStateRequest::new, GENERIC_EXECUTOR, false, false,
            (request, channel, task) -> {
                final ClusterState stateToCommit = Optional.ofNullable(lastSeenClusterState.get())
                    .filter(cs -> cs.stateUUID().equals(request.stateUUID))
                    .orElseThrow(() -> new IllegalStateException(
                        "can't resolve cluster state with uuid [" + request.stateUUID + "] to commit"));
                final ApplyCommitRequest applyCommitRequest = new ApplyCommitRequest(
                    stateToCommit.getNodes().getMasterNode(), stateToCommit.term(), stateToCommit.version());
                handleApplyCommit.accept(applyCommitRequest, transportCommitCallback(channel));
            });
    }

    /**
     * Builds a listener that relays the outcome of applying a commit back over {@code channel}:
     * an empty response on success, the exception on failure. A failure to send is only logged.
     */
    private ActionListener<Void> transportCommitCallback(final TransportChannel channel) {
        return new ActionListener<Void>() {

            @Override
            public void onResponse(Void aVoid) {
                try {
                    channel.sendResponse(TransportResponse.Empty.INSTANCE);
                } catch (IOException e) {
                    logger.debug("failed to send response on commit", e);
                }
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    channel.sendResponse(e);
                } catch (IOException ie) {
                    // keep the send failure attached to the original failure before logging it
                    e.addSuppressed(ie);
                    logger.debug("failed to send response on commit", e);
                }
            }
        };
    }

    /**
     * @return a snapshot of the counters of received full states, incompatible diffs and compatible diffs
     */
    public PublishClusterStateStats stats() {
        return new PublishClusterStateStats(
            fullClusterStateReceivedCount.get(),
            incompatibleClusterStateDiffReceivedCount.get(),
            compatibleClusterStateDiffReceivedCount.get());
    }

    /**
     * Prepares the publication of the state carried by {@code clusterChangedEvent}.
     *
     * <p>The full state and/or the diff against the previous state are serialized eagerly, once per
     * distinct node version, before the context is returned.
     *
     * @param clusterChangedEvent event holding the new state and the previous state
     * @return a context through which publish and commit requests can be sent to individual nodes
     * @throws ElasticsearchException if serializing the state or the diff fails
     */
    public PublicationContext newPublicationContext(final ClusterChangedEvent clusterChangedEvent) {
        final ClusterState newState = clusterChangedEvent.state();
        final ClusterState previousState = clusterChangedEvent.previousState();
        final DiscoveryNodes nodes = newState.nodes();
        final boolean sendFullVersion = previousState.getBlocks().disableStatePersistence();
        // NOTE(review): these plain HashMaps are also touched from transport response handlers
        // (full-state resend after an incompatible diff) — confirm whether concurrent access is possible.
        final Map<Version, BytesReference> serializedStates = new HashMap<>();
        final Map<Version, BytesReference> serializedDiffs = new HashMap<>();
        buildDiffAndSerializeStates(newState, previousState, nodes, sendFullVersion, serializedStates, serializedDiffs);

        return new PublicationContext() {

            @Override
            public void sendPublishRequest(DiscoveryNode destination, final PublishRequest publishRequest,
                                           final ActionListener<PublishWithJoinResponse> originalListener) {
                assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us";
                final ActionListener<PublishWithJoinResponse> responseActionListener;
                if (destination.equals(nodes.getLocalNode())) {
                    // Remember the request object so that acceptState can hand the very same instance to the
                    // publish-request callback when the state arrives back over the transport.
                    final PublishRequest previousRequest = currentPublishRequestToSelf.getAndSet(publishRequest);
                    // an in-flight request to self may only be replaced by one from a higher term
                    assert previousRequest == null
                        || previousRequest.getAcceptedState().term() < publishRequest.getAcceptedState().term();
                    responseActionListener = new ActionListener<PublishWithJoinResponse>() {

                        @Override
                        public void onResponse(PublishWithJoinResponse publishWithJoinResponse) {
                            // only clear the slot if it still holds our own request
                            currentPublishRequestToSelf.compareAndSet(publishRequest, null);
                            originalListener.onResponse(publishWithJoinResponse);
                        }

                        @Override
                        public void onFailure(Exception e) {
                            // only clear the slot if it still holds our own request
                            currentPublishRequestToSelf.compareAndSet(publishRequest, null);
                            originalListener.onFailure(e);
                        }
                    };
                } else {
                    responseActionListener = originalListener;
                }

                if (sendFullVersion || !previousState.nodes().nodeExists(destination)) {
                    logger.trace("sending full cluster state version {} to {}", newState.version(), destination);
                    sendFullClusterState(newState, serializedStates, destination, responseActionListener);
                } else {
                    logger.trace("sending cluster state diff for version {} to {}", newState.version(), destination);
                    sendClusterStateDiff(newState, serializedDiffs, serializedStates, destination, responseActionListener);
                }
            }

            @Override
            public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
                                        final ActionListener<TransportResponse.Empty> responseActionListener) {
                final String actionName;
                final TransportRequest transportRequest;
                if (Coordinator.isZen1Node(destination)) {
                    // Zen1 nodes are sent the legacy commit request, which identifies the state by UUID
                    actionName = ZEN1_PUBLISH_COMMIT_ACTION_NAME;
                    transportRequest = new PublishClusterStateAction.CommitClusterStateRequest(newState.stateUUID());
                } else {
                    actionName = COMMIT_STATE_ACTION_NAME;
                    transportRequest = applyCommitRequest;
                }
                transportService.sendRequest(destination, actionName, transportRequest, stateRequestOptions,
                    new TransportResponseHandler<TransportResponse.Empty>() {

                        @Override
                        public TransportResponse.Empty read(StreamInput in) {
                            return TransportResponse.Empty.INSTANCE;
                        }

                        @Override
                        public void handleResponse(TransportResponse.Empty response) {
                            responseActionListener.onResponse(response);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            responseActionListener.onFailure(exp);
                        }

                        @Override
                        public String executor() {
                            return GENERIC_EXECUTOR;
                        }
                    });
            }
        };
    }

    /**
     * Sends already-serialized cluster state bytes (full state or diff) to {@code node}.
     *
     * <p>If a diff was sent and the node answers with an {@link IncompatibleClusterStateVersionException},
     * the full state is sent instead. Any other failure, including an exception thrown while sending,
     * is reported to {@code responseActionListener}.
     *
     * @param clusterState           the state being published
     * @param bytes                  the serialized payload for the node's version
     * @param node                   the destination node
     * @param responseActionListener receives the node's response or the failure
     * @param sendDiffs              whether {@code bytes} is a diff (enables the full-state fallback)
     * @param serializedStates       cache of serialized full states, used by the fallback
     */
    private void sendClusterStateToNode(ClusterState clusterState, BytesReference bytes, DiscoveryNode node,
                                        final ActionListener<PublishWithJoinResponse> responseActionListener,
                                        boolean sendDiffs, Map<Version, BytesReference> serializedStates) {
        try {
            final BytesTransportRequest request = new BytesTransportRequest(bytes, node.getVersion());
            final Consumer<TransportException> transportExceptionHandler = exp -> {
                if (sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
                    logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage());
                    sendFullClusterState(clusterState, serializedStates, node, responseActionListener);
                } else {
                    logger.debug(() -> new ParameterizedMessage("failed to send cluster state to {}", node), exp);
                    responseActionListener.onFailure(exp);
                }
            };
            final TransportResponseHandler<PublishWithJoinResponse> publishWithJoinResponseHandler =
                new TransportResponseHandler<PublishWithJoinResponse>() {

                    @Override
                    public PublishWithJoinResponse read(StreamInput in) throws IOException {
                        return new PublishWithJoinResponse(in);
                    }

                    @Override
                    public void handleResponse(PublishWithJoinResponse response) {
                        responseActionListener.onResponse(response);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        transportExceptionHandler.accept(exp);
                    }

                    @Override
                    public String executor() {
                        return GENERIC_EXECUTOR;
                    }
                };

            final String actionName;
            // Wildcard type: the two branches produce handlers for different response types.
            final TransportResponseHandler<?> transportResponseHandler;
            if (Coordinator.isZen1Node(node)) {
                actionName = ZEN1_PUBLISH_SEND_ACTION_NAME;
                // The legacy action replies with an empty response, so a publish response and a join
                // are synthesized locally from the published state's term and version.
                transportResponseHandler = publishWithJoinResponseHandler.wrap(
                    empty -> new PublishWithJoinResponse(
                        new PublishResponse(clusterState.term(), clusterState.version()),
                        Optional.of(new Join(node, transportService.getLocalNode(), clusterState.term(),
                            clusterState.term(), clusterState.version()))),
                    in -> TransportResponse.Empty.INSTANCE);
            } else {
                actionName = PUBLISH_STATE_ACTION_NAME;
                transportResponseHandler = publishWithJoinResponseHandler;
            }
            transportService.sendRequest(node, actionName, request, stateRequestOptions, transportResponseHandler);
        } catch (Exception e) {
            logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e);
            responseActionListener.onFailure(e);
        }
    }

    /**
     * Fills {@code serializedStates} and {@code serializedDiffs} with one entry per distinct node version.
     *
     * <p>A node gets the full state if {@code sendFullVersion} is set or if it was not part of
     * {@code previousState}; otherwise it gets the diff, which is computed lazily and at most once.
     *
     * @throws ElasticsearchException wrapping the {@link IOException} if serialization fails
     */
    private static void buildDiffAndSerializeStates(ClusterState clusterState, ClusterState previousState,
                                                    DiscoveryNodes discoveryNodes, boolean sendFullVersion,
                                                    Map<Version, BytesReference> serializedStates,
                                                    Map<Version, BytesReference> serializedDiffs) {
        Diff<ClusterState> diff = null;
        for (DiscoveryNode node : discoveryNodes) {
            final Version nodeVersion = node.getVersion();
            try {
                if (sendFullVersion || !previousState.nodes().nodeExists(node)) {
                    if (!serializedStates.containsKey(nodeVersion)) {
                        serializedStates.put(nodeVersion, serializeFullClusterState(clusterState, nodeVersion));
                    }
                } else {
                    if (diff == null) {
                        diff = clusterState.diff(previousState);
                    }
                    if (!serializedDiffs.containsKey(nodeVersion)) {
                        serializedDiffs.put(nodeVersion, serializeDiffClusterState(diff, nodeVersion));
                    }
                }
            } catch (IOException e) {
                throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, node);
            }
        }
    }

    /**
     * Sends the full cluster state to {@code node}, serializing (and caching) it first if no serialized
     * form exists yet for the node's version. A serialization failure is reported to the listener.
     */
    private void sendFullClusterState(ClusterState clusterState, Map<Version, BytesReference> serializedStates,
                                      DiscoveryNode node, ActionListener<PublishWithJoinResponse> responseActionListener) {
        BytesReference bytes = serializedStates.get(node.getVersion());
        if (bytes == null) {
            try {
                bytes = serializeFullClusterState(clusterState, node.getVersion());
                serializedStates.put(node.getVersion(), bytes);
            } catch (Exception e) {
                logger.warn(() -> new ParameterizedMessage(
                    "failed to serialize cluster state before publishing it to node {}", node), e);
                responseActionListener.onFailure(e);
                return;
            }
        }
        sendClusterStateToNode(clusterState, bytes, node, responseActionListener, false, serializedStates);
    }

    /**
     * Sends the pre-serialized diff for the node's version to {@code node}. The diff is expected to
     * be present in {@code serializedDiffs}; {@code serializedStates} is passed on for the full-state fallback.
     */
    private void sendClusterStateDiff(ClusterState clusterState, Map<Version, BytesReference> serializedDiffs,
                                      Map<Version, BytesReference> serializedStates, DiscoveryNode node,
                                      ActionListener<PublishWithJoinResponse> responseActionListener) {
        final BytesReference bytes = serializedDiffs.get(node.getVersion());
        assert bytes != null : "failed to find serialized diff for node " + node + " of version [" + node.getVersion() + "]";
        sendClusterStateToNode(clusterState, bytes, node, responseActionListener, true, serializedStates);
    }

    /**
     * Serializes a full cluster state into a compressed buffer, prefixed with the boolean {@code true}.
     *
     * @param clusterState the state to serialize
     * @param nodeVersion  the stream version to serialize with
     * @return the serialized bytes
     * @throws IOException if writing to the stream fails
     */
    public static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
        final BytesStreamOutput bStream = new BytesStreamOutput();
        try (StreamOutput stream = CompressorFactory.COMPRESSOR.streamOutput(bStream)) {
            stream.setVersion(nodeVersion);
            stream.writeBoolean(true); // marker: full state follows
            clusterState.writeTo(stream);
        }
        return bStream.bytes();
    }

    /**
     * Serializes a cluster state diff into a compressed buffer, prefixed with the boolean {@code false}.
     *
     * @param diff        the diff to serialize
     * @param nodeVersion the stream version to serialize with
     * @return the serialized bytes
     * @throws IOException if writing to the stream fails
     */
    public static BytesReference serializeDiffClusterState(Diff<?> diff, Version nodeVersion) throws IOException {
        final BytesStreamOutput bStream = new BytesStreamOutput();
        try (StreamOutput stream = CompressorFactory.COMPRESSOR.streamOutput(bStream)) {
            stream.setVersion(nodeVersion);
            stream.writeBoolean(false); // marker: diff follows
            diff.writeTo(stream);
        }
        return bStream.bytes();
    }

    /**
     * Deserializes an incoming publish request (full state or diff), passes the resulting state to
     * {@link #acceptState} and records it as the last seen state.
     *
     * <p>The input stream is closed on every exit path, whether the method returns or throws.
     *
     * @param request the raw request bytes together with the sender's stream version
     * @return the response produced by the publish-request callback
     * @throws IncompatibleClusterStateVersionException if a diff arrives while no local state is known,
     *                                                  or if reading/applying the diff raises it
     * @throws IOException if reading from the stream fails
     */
    private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
        final Compressor compressor = CompressorFactory.compressor(request.bytes());
        StreamInput in = request.bytes().streamInput();
        try {
            if (compressor != null) {
                in = compressor.streamInput(in);
            }
            in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
            in.setVersion(request.version());

            // Leading boolean: true means a full cluster state follows, false means a diff follows.
            if (in.readBoolean()) {
                final ClusterState incomingState;
                try {
                    incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
                } catch (Exception e) {
                    logger.warn("unexpected error while deserializing an incoming cluster state", e);
                    throw e;
                }
                fullClusterStateReceivedCount.incrementAndGet();
                logger.debug("received full cluster state version [{}] with size [{}]",
                    incomingState.version(), request.bytes().length());
                final PublishWithJoinResponse response = acceptState(incomingState);
                lastSeenClusterState.set(incomingState);
                return response;
            }

            final ClusterState lastSeen = lastSeenClusterState.get();
            if (lastSeen == null) {
                // Without a base state the diff cannot be applied; the sender reacts to this exception
                // type by resending the full state.
                logger.debug("received diff for but don't have any local cluster state - requesting full state");
                incompatibleClusterStateDiffReceivedCount.incrementAndGet();
                throw new IncompatibleClusterStateVersionException("have no local cluster state");
            }

            final ClusterState incomingState;
            try {
                final Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeen.nodes().getLocalNode());
                incomingState = diff.apply(lastSeen);
            } catch (IncompatibleClusterStateVersionException e) {
                incompatibleClusterStateDiffReceivedCount.incrementAndGet();
                throw e;
            } catch (Exception e) {
                logger.warn("unexpected error while deserializing an incoming cluster state", e);
                throw e;
            }
            compatibleClusterStateDiffReceivedCount.incrementAndGet();
            logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
                incomingState.version(), incomingState.stateUUID(), request.bytes().length());
            final PublishWithJoinResponse response = acceptState(incomingState);
            // only advance if nobody replaced the base state while the diff was being processed
            lastSeenClusterState.compareAndSet(lastSeen, incomingState);
            return response;
        } finally {
            IOUtils.close(in);
        }
    }

    /**
     * Hands {@code incomingState} to the publish-request callback.
     *
     * <p>If the local node is the master of the incoming state, the state must correspond (by state UUID)
     * to the publish request currently being sent to self, and that original request object is passed on
     * instead of a new one.
     *
     * @throws IllegalStateException if the local node is the master of the incoming state but no matching
     *                               publish request to self is in flight
     */
    private PublishWithJoinResponse acceptState(ClusterState incomingState) {
        if (transportService.getLocalNode().equals(incomingState.nodes().getMasterNode())) {
            final PublishRequest publishRequest = currentPublishRequestToSelf.get();
            if (publishRequest == null || !publishRequest.getAcceptedState().stateUUID().equals(incomingState.stateUUID())) {
                throw new IllegalStateException("publication to self failed for " + publishRequest);
            }
            return handlePublishRequest.apply(publishRequest);
        }
        return handlePublishRequest.apply(new PublishRequest(incomingState));
    }

    /**
     * Per-publication sender returned by {@link #newPublicationContext(ClusterChangedEvent)}.
     */
    public interface PublicationContext {
        /**
         * Sends the publication's cluster state (full or diff) to {@code destination}.
         *
         * @param destination            the node to publish to
         * @param publishRequest         the publish request; its accepted state must be the publication's state
         * @param responseActionListener receives the node's response or the failure
         */
        void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
                                ActionListener<PublishWithJoinResponse> responseActionListener);

        /**
         * Sends the commit for the publication to {@code destination}.
         *
         * @param destination            the node to send the commit to
         * @param applyCommitRequest     the commit request
         * @param responseActionListener receives the empty acknowledgement or the failure
         */
        void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
                             ActionListener<TransportResponse.Empty> responseActionListener);
    }
}

