/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.protocol.amqp.proton;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpJmsSelectorFilter;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpNoLocalFilter;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.jboss.logging.Logger;

public class ProtonServerSenderContext
extends ProtonInitializable
implements ProtonDeliveryHandler {
    private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);
    private static final Symbol COPY = Symbol.valueOf((String)"copy");
    private static final Symbol TOPIC = Symbol.valueOf((String)"topic");
    private static final Symbol QUEUE = Symbol.valueOf((String)"queue");
    private static final Symbol SHARED = Symbol.valueOf((String)"shared");
    private static final Symbol GLOBAL = Symbol.valueOf((String)"global");
    private final ConnectionFlushIOCallback connectionFlusher = new ConnectionFlushIOCallback();
    private Consumer brokerConsumer;
    private ReadyListener onflowControlReady;
    protected final AMQPSessionContext protonSession;
    protected final Sender sender;
    protected final AMQPConnectionContext connection;
    protected boolean closed = false;
    protected final AMQPSessionCallback sessionSPI;
    private boolean multicast;
    private RoutingType defaultRoutingType;
    private RoutingType routingTypeToUse = this.defaultRoutingType = RoutingType.ANYCAST;
    private boolean shared = false;
    private boolean global = false;
    private boolean isVolatile = false;
    private boolean preSettle;
    private SimpleString tempQueueName;
    private final AtomicBoolean draining = new AtomicBoolean(false);
    private int credits = 0;
    private AtomicInteger pending = new AtomicInteger(0);
    private final Object creditsLock = new Object();
    private final java.util.function.Consumer<? super MessageReference> executeDelivery;

    public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) {
        this.connection = connection;
        this.sender = sender;
        this.protonSession = protonSession;
        this.sessionSPI = server;
        this.executeDelivery = this::executeDelivery;
    }

    public Object getBrokerConsumer() {
        return this.brokerConsumer;
    }

    @Override
    public void onFlow(int currentCredits, boolean drain) {
        this.connection.requireInHandler();
        this.setupCredit();
        ServerConsumerImpl serverConsumer = (ServerConsumerImpl)this.brokerConsumer;
        if (drain) {
            if (this.draining.compareAndSet(false, true)) {
                final ProtonServerSenderContext plugSender = (ProtonServerSenderContext)serverConsumer.getProtocolContext();
                serverConsumer.forceDelivery(1L, new Runnable(){

                    @Override
                    public void run() {
                        try {
                            ProtonServerSenderContext.this.connection.runNow(() -> {
                                plugSender.reportDrained();
                                ProtonServerSenderContext.this.setupCredit();
                            });
                        }
                        finally {
                            ProtonServerSenderContext.this.draining.set(false);
                        }
                    }
                });
            }
        } else {
            serverConsumer.receiveCredits(-1);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean hasCredits() {
        if (!this.connection.flowControl(this.onflowControlReady)) {
            return false;
        }
        Object object = this.creditsLock;
        synchronized (object) {
            return this.credits > 0 && this.sender.getLocalState() != EndpointState.CLOSED;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setupCredit() {
        Object object = this.creditsLock;
        synchronized (object) {
            this.credits = this.sender.getCredit() - this.pending.get();
            if (this.credits < 0) {
                this.credits = 0;
            }
        }
    }

    public Sender getSender() {
        return this.sender;
    }

    public void start() throws ActiveMQAMQPException {
        this.sessionSPI.start();
        try {
            if (this.brokerConsumer != null) {
                this.sessionSPI.startSender(this.brokerConsumer);
            }
        }
        catch (Exception e) {
            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorStartingConsumer(e.getMessage());
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public void initialise() throws Exception {
        Map.Entry<Symbol, DescribedType> filter;
        super.initialise();
        Source source = (Source)this.sender.getRemoteSource();
        SimpleString queue = null;
        String selector = null;
        HashMap<Symbol, DescribedType> supportedFilters = new HashMap<Symbol, DescribedType>();
        this.sender.setSenderSettleMode(this.sender.getRemoteSenderSettleMode());
        this.sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
        if (source != null && (filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS)) != null) {
            selector = filter.getValue().getDescribed().toString();
            try {
                SelectorParser.parse((String)selector);
            }
            catch (FilterException e) {
                throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION);
            }
            supportedFilters.put(filter.getKey(), filter.getValue());
        }
        if (source == null) {
            String clientId = this.getClientId();
            String pubId = this.sender.getName();
            this.global = ProtonServerSenderContext.hasRemoteDesiredCapability((Link)this.sender, GLOBAL);
            queue = ProtonServerSenderContext.createQueueName(this.connection.isUseCoreSubscriptionNaming(), clientId, pubId, true, this.global, false);
            QueueQueryResult result = this.sessionSPI.queueQuery(queue, RoutingType.MULTICAST, false);
            this.multicast = true;
            this.routingTypeToUse = RoutingType.MULTICAST;
            if (!result.isExists()) throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + this.sender.getName());
            source = new Source();
            source.setAddress(queue.toString());
            source.setDurable(TerminusDurability.UNSETTLED_STATE);
            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
            source.setDistributionMode(COPY);
            source.setCapabilities(new Symbol[]{TOPIC});
            SimpleString filterString = result.getFilterString();
            if (filterString != null) {
                selector = filterString.toString();
                boolean noLocal = false;
                String remoteContainerId = this.sender.getSession().getConnection().getRemoteContainer();
                String noLocalFilter = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
                if (selector.endsWith(noLocalFilter)) {
                    if (selector.length() > noLocalFilter.length()) {
                        noLocalFilter = " AND " + noLocalFilter;
                        selector = selector.substring(0, selector.length() - noLocalFilter.length());
                    } else {
                        selector = null;
                    }
                    noLocal = true;
                }
                if (noLocal) {
                    supportedFilters.put(AmqpSupport.NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
                }
                if (selector != null && !selector.trim().isEmpty()) {
                    supportedFilters.put(AmqpSupport.JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(selector));
                }
            }
            this.sender.setSource((org.apache.qpid.proton.amqp.transport.Source)source);
        } else if (source.getDynamic()) {
            this.tempQueueName = queue = SimpleString.toSimpleString((String)UUID.randomUUID().toString());
            try {
                this.sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST);
            }
            catch (Exception e) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
            }
            source.setAddress(queue.toString());
        } else {
            SimpleString matchingAnycastQueue;
            Set routingTypes;
            AddressQueryResult addressQueryResult;
            boolean clientDefined;
            SimpleString addressToUse;
            SimpleString queueNameToUse = null;
            this.shared = ProtonServerSenderContext.hasCapabilities(SHARED, source);
            this.global = ProtonServerSenderContext.hasCapabilities(GLOBAL, source);
            if (CompositeAddress.isFullyQualified((String)source.getAddress())) {
                addressToUse = SimpleString.toSimpleString((String)CompositeAddress.extractAddressName((String)source.getAddress()));
                queueNameToUse = SimpleString.toSimpleString((String)CompositeAddress.extractQueueName((String)source.getAddress()));
            } else {
                addressToUse = new SimpleString(source.getAddress());
            }
            boolean bl = clientDefined = ProtonServerSenderContext.hasCapabilities(TOPIC, source) || ProtonServerSenderContext.hasCapabilities(QUEUE, source);
            if (clientDefined) {
                this.multicast = ProtonServerSenderContext.hasCapabilities(TOPIC, source);
                addressQueryResult = null;
                try {
                    addressQueryResult = this.sessionSPI.addressQuery(addressToUse, this.multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
                }
                catch (ActiveMQSecurityException e) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
                }
                catch (ActiveMQAMQPException e) {
                    throw e;
                }
                catch (Exception e) {
                    throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
                }
                if (!addressQueryResult.isExists()) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
                }
                routingTypes = addressQueryResult.getRoutingTypes();
                if (this.multicast && !routingTypes.contains(RoutingType.MULTICAST)) {
                    throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for topic support");
                }
                if (!this.multicast && !routingTypes.contains(RoutingType.ANYCAST) && queueNameToUse == null) {
                    throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for queue support");
                }
            } else {
                addressQueryResult = null;
                try {
                    addressQueryResult = this.sessionSPI.addressQuery(addressToUse, this.defaultRoutingType, true);
                }
                catch (ActiveMQSecurityException e) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
                }
                catch (ActiveMQAMQPException e) {
                    throw e;
                }
                catch (Exception e) {
                    throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
                }
                if (!addressQueryResult.isExists()) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
                }
                routingTypes = addressQueryResult.getRoutingTypes();
                this.multicast = routingTypes.contains(RoutingType.MULTICAST) && routingTypes.size() == 1;
            }
            RoutingType routingType = this.routingTypeToUse = this.multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST;
            if (this.multicast) {
                Map.Entry<Symbol, DescribedType> filter2 = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
                if (filter2 != null) {
                    String remoteContainerId = this.sender.getSession().getConnection().getRemoteContainer();
                    String noLocalFilter = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
                    selector = selector != null ? selector + " AND " + noLocalFilter : noLocalFilter;
                    supportedFilters.put(filter2.getKey(), filter2.getValue());
                }
                queue = this.getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST);
                SimpleString simpleStringSelector = SimpleString.toSimpleString((String)selector);
                if (queue != null) {
                    this.multicast = false;
                } else if (TerminusDurability.UNSETTLED_STATE.equals((Object)source.getDurable()) || TerminusDurability.CONFIGURATION.equals((Object)source.getDurable())) {
                    String clientId = this.getClientId();
                    String pubId = this.sender.getName();
                    queue = ProtonServerSenderContext.createQueueName(this.connection.isUseCoreSubscriptionNaming(), clientId, pubId, this.shared, this.global, false);
                    QueueQueryResult result = this.sessionSPI.queueQuery(queue, this.routingTypeToUse, false);
                    if (result.isExists()) {
                        if (!Objects.equals(result.getFilterString(), simpleStringSelector) || this.sender.getSource() != null && !this.sender.getSource().getAddress().equals(result.getAddress().toString())) {
                            if (result.getConsumerCount() != 0) throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
                            this.sessionSPI.deleteQueue(queue);
                            this.sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                        }
                    } else if (this.shared) {
                        this.sessionSPI.createSharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                    } else {
                        this.sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                    }
                } else {
                    this.isVolatile = true;
                    if (this.shared && this.sender.getName() != null) {
                        queue = ProtonServerSenderContext.createQueueName(this.connection.isUseCoreSubscriptionNaming(), this.getClientId(), this.sender.getName(), this.shared, this.global, this.isVolatile);
                        QueueQueryResult result = this.sessionSPI.queueQuery(queue, this.routingTypeToUse, false);
                        if (!(result.isExists() && Objects.equals(result.getAddress(), addressToUse) && Objects.equals(result.getFilterString(), simpleStringSelector))) {
                            this.sessionSPI.createSharedVolatileQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                        }
                    } else {
                        this.tempQueueName = queue = SimpleString.toSimpleString((String)UUID.randomUUID().toString());
                        try {
                            this.sessionSPI.createTemporaryQueue(addressToUse, queue, RoutingType.MULTICAST, simpleStringSelector);
                        }
                        catch (Exception e) {
                            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
                        }
                    }
                }
            } else if (queueNameToUse != null) {
                this.routingTypeToUse = null;
                matchingAnycastQueue = this.getMatchingQueue(queueNameToUse, addressToUse, null);
                if (matchingAnycastQueue == null) throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
                queue = matchingAnycastQueue;
            } else {
                matchingAnycastQueue = this.sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST);
                queue = matchingAnycastQueue != null ? matchingAnycastQueue : addressToUse;
            }
            if (queue == null) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
            }
            try {
                if (!this.sessionSPI.queueQuery(queue, this.routingTypeToUse, !this.multicast).isExists()) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
                }
            }
            catch (ActiveMQAMQPNotFoundException e) {
                throw e;
            }
            catch (Exception e) {
                throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
            }
        }
        this.preSettle = this.sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
        source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
        boolean browseOnly = !this.multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
        try {
            this.brokerConsumer = (Consumer)this.sessionSPI.createSender(this, queue, this.multicast ? null : selector, browseOnly);
            this.onflowControlReady = () -> ((Consumer)this.brokerConsumer).promptDelivery();
            return;
        }
        catch (ActiveMQAMQPResourceLimitExceededException e1) {
            throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
        }
        catch (ActiveMQSecurityException e) {
            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
        }
        catch (Exception e) {
            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
        }
    }

    private SimpleString getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception {
        if (queueName != null) {
            QueueQueryResult result = this.sessionSPI.queueQuery(queueName, routingType, false);
            if (!result.isExists()) {
                throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist");
            }
            if (!result.getAddress().equals((Object)address)) {
                throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist for address '" + address + "'");
            }
            return this.sessionSPI.getMatchingQueue(address, queueName, routingType);
        }
        return null;
    }

    protected String getClientId() {
        return this.connection.getRemoteContainer();
    }

    @Override
    public void close(ErrorCondition condition) throws ActiveMQAMQPException {
        this.closed = true;
        if (condition != null) {
            this.sender.setCondition(condition);
        }
        this.protonSession.removeSender(this.sender);
        this.connection.runLater(() -> {
            this.sender.close();
            try {
                this.sessionSPI.closeSender(this.brokerConsumer);
            }
            catch (Exception e) {
                log.warn((Object)e.getMessage(), (Throwable)e);
            }
            this.sender.close();
            this.connection.flush();
        });
    }

    @Override
    public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
        try {
            this.closed = true;
            this.sessionSPI.closeSender(this.brokerConsumer);
            if (remoteLinkClose) {
                Source source = (Source)this.sender.getSource();
                if (source != null && source.getAddress() != null && this.multicast) {
                    SimpleString queueName = SimpleString.toSimpleString((String)source.getAddress());
                    QueueQueryResult result = this.sessionSPI.queueQuery(queueName, this.routingTypeToUse, false);
                    if (result.isExists() && source.getDynamic()) {
                        this.sessionSPI.deleteQueue(queueName);
                    } else if (source.getDurable() == TerminusDurability.NONE && this.tempQueueName != null && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
                        this.sessionSPI.removeTemporaryQueue(this.tempQueueName);
                    } else {
                        SimpleString queue;
                        String clientId = this.getClientId();
                        String pubId = this.sender.getName();
                        if (pubId.contains("|")) {
                            pubId = pubId.split("\\|")[0];
                        }
                        if ((result = this.sessionSPI.queueQuery(queue = ProtonServerSenderContext.createQueueName(this.connection.isUseCoreSubscriptionNaming(), clientId, pubId, this.shared, this.global, this.isVolatile), this.multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false)).isExists() && !this.isVolatile && result.getConsumerCount() == 0) {
                            this.sessionSPI.deleteQueue(queue);
                        }
                    }
                } else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
                    try {
                        this.sessionSPI.removeTemporaryQueue(SimpleString.toSimpleString((String)source.getAddress()));
                    }
                    catch (Exception exception) {}
                }
            }
        }
        catch (Exception e) {
            log.warn((Object)e.getMessage(), (Throwable)e);
            throw new ActiveMQAMQPInternalErrorException(e.getMessage());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
        if (this.closed) {
            return;
        }
        OperationContext oldContext = this.sessionSPI.recoverContext();
        try {
            Message message = ((MessageReference)delivery.getContext()).getMessage();
            DeliveryState remoteState = delivery.getRemoteState();
            if (remoteState != null && remoteState.getType() == DeliveryState.DeliveryStateType.Accepted) {
                if (!delivery.isSettled()) {
                    try {
                        this.sessionSPI.ack(null, this.brokerConsumer, message);
                    }
                    catch (Exception e) {
                        log.warn((Object)e.toString(), (Throwable)e);
                        throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
                    }
                    delivery.settle();
                }
            } else {
                this.handleExtendedDeliveryOutcomes(message, delivery, remoteState);
            }
            if (!this.preSettle) {
                this.protonSession.replaceTag(delivery.getTag());
            }
        }
        finally {
            this.sessionSPI.afterIO(this.connectionFlusher);
            this.sessionSPI.resetContext(oldContext);
        }
    }

    private boolean handleExtendedDeliveryOutcomes(Message message, Delivery delivery, DeliveryState remoteState) throws ActiveMQAMQPException {
        boolean settleImmediate = true;
        boolean handled = true;
        if (remoteState == null) {
            log.debug((Object)("Received null disposition for delivery update: " + remoteState));
            return true;
        }
        switch (remoteState.getType()) {
            case Transactional: {
                TransactionalState txState = (TransactionalState)remoteState;
                ProtonTransactionImpl tx = (ProtonTransactionImpl)this.sessionSPI.getTransaction(txState.getTxnId(), false);
                if (txState.getOutcome() == null) break;
                settleImmediate = false;
                Outcome outcome = txState.getOutcome();
                if (!(outcome instanceof Accepted)) break;
                if (!delivery.remotelySettled()) {
                    TransactionalState txAccepted = new TransactionalState();
                    txAccepted.setOutcome((Outcome)Accepted.getInstance());
                    txAccepted.setTxnId(txState.getTxnId());
                    delivery.disposition((DeliveryState)txAccepted);
                }
                try {
                    this.sessionSPI.ack((Transaction)tx, this.brokerConsumer, message);
                    tx.addDelivery(delivery, this);
                    break;
                }
                catch (Exception e) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
                }
            }
            case Released: {
                try {
                    this.sessionSPI.cancel(this.brokerConsumer, message, false);
                    break;
                }
                catch (Exception e) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
                }
            }
            case Rejected: {
                try {
                    this.sessionSPI.reject(this.brokerConsumer, message);
                    break;
                }
                catch (Exception e) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
                }
            }
            case Modified: {
                try {
                    Modified modification = (Modified)remoteState;
                    if (Boolean.TRUE.equals(modification.getUndeliverableHere())) {
                        message.rejectConsumer(this.brokerConsumer.sequentialID());
                    }
                    if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
                        this.sessionSPI.cancel(this.brokerConsumer, message, true);
                        break;
                    }
                    this.sessionSPI.cancel(this.brokerConsumer, message, false);
                    break;
                }
                catch (Exception e) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
                }
            }
            default: {
                log.debug((Object)("Received null or unknown disposition for delivery update: " + remoteState));
                handled = false;
            }
        }
        if (settleImmediate) {
            delivery.settle();
        }
        return handled;
    }

    public void settle(Delivery delivery) {
        this.connection.requireInHandler();
        delivery.settle();
    }

    public synchronized void checkState() {
        this.sessionSPI.resumeDelivery(this.brokerConsumer);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int deliverMessage(MessageReference messageReference, ServerConsumer consumer) throws Exception {
        if (this.closed) {
            return 0;
        }
        Object object = this.creditsLock;
        synchronized (object) {
            if (this.sender.getLocalState() == EndpointState.CLOSED) {
                return 0;
            }
            this.pending.incrementAndGet();
            --this.credits;
        }
        if (messageReference instanceof Runnable && consumer.allowReferenceCallback()) {
            messageReference.onDelivery(this.executeDelivery);
            this.connection.runNow((Runnable)messageReference);
        } else {
            this.connection.runNow(() -> this.executeDelivery(messageReference));
        }
        return 1;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void executeDelivery(MessageReference messageReference) {
        try {
            if (this.sender.getLocalState() == EndpointState.CLOSED) {
                log.debug((Object)("Not delivering message " + messageReference + " as the sender is closed and credits were available, if you see too many of these it means clients are issuing credits and closing the connection with pending credits a lot of times"));
                return;
            }
            AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage());
            this.sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection)this.sessionSPI.getTransportConnection().getProtocolConnection());
            ReadableBuffer sendBuffer = message.getSendBuffer(messageReference.getDeliveryCount());
            byte[] tag = this.preSettle ? new byte[]{} : this.protonSession.getTag();
            boolean releaseRequired = sendBuffer instanceof NettyReadable;
            Delivery delivery = this.sender.delivery(tag, 0, tag.length);
            delivery.setMessageFormat((int)message.getMessageFormat());
            delivery.setContext((Object)messageReference);
            try {
                if (releaseRequired) {
                    this.sender.send(sendBuffer);
                    releaseRequired = false;
                    ((NettyReadable)sendBuffer).getByteBuf().release();
                } else {
                    this.sender.sendNoCopy(sendBuffer);
                }
                if (this.preSettle) {
                    try {
                        this.sessionSPI.ack(null, this.brokerConsumer, messageReference.getMessage());
                    }
                    catch (Exception e) {
                        log.debug((Object)e.getMessage(), (Throwable)e);
                    }
                    delivery.settle();
                } else {
                    this.sender.advance();
                }
                this.connection.flush();
            }
            finally {
                Object object = this.creditsLock;
                synchronized (object) {
                    this.pending.decrementAndGet();
                }
                if (releaseRequired) {
                    ((NettyReadable)sendBuffer).getByteBuf().release();
                }
            }
        }
        catch (Exception e) {
            log.warn((Object)e.getMessage(), (Throwable)e);
            this.brokerConsumer.errorProcessing((Throwable)e, messageReference);
        }
    }

    private static boolean hasCapabilities(Symbol symbol, Source source) {
        if (source != null && source.getCapabilities() != null) {
            for (Symbol cap : source.getCapabilities()) {
                if (!symbol.equals(cap)) continue;
                return true;
            }
        }
        return false;
    }

    private static boolean hasRemoteDesiredCapability(Link link, Symbol capability) {
        Symbol[] remoteDesiredCapabilities = link.getRemoteDesiredCapabilities();
        if (remoteDesiredCapabilities != null) {
            for (Symbol cap : remoteDesiredCapabilities) {
                if (!capability.equals(cap)) continue;
                return true;
            }
        }
        return false;
    }

    private static SimpleString createQueueName(boolean useCoreSubscriptionNaming, String clientId, String pubId, boolean shared, boolean global, boolean isVolatile) {
        String queue;
        if (useCoreSubscriptionNaming) {
            boolean durable = !isVolatile;
            String subscriptionName = pubId.contains("|") ? pubId.split("\\|")[0] : pubId;
            String clientID = clientId == null || clientId.isEmpty() || global ? null : clientId;
            return ActiveMQDestination.createQueueNameForSubscription((boolean)durable, (String)clientID, (String)subscriptionName);
        }
        String string = queue = clientId == null || clientId.isEmpty() || global ? pubId : clientId + "." + pubId;
        if (shared) {
            if (queue.contains("|")) {
                queue = queue.split("\\|")[0];
            }
            if (isVolatile) {
                queue = queue + ":shared-volatile";
            }
            if (global) {
                queue = queue + ":global";
            }
        }
        return SimpleString.toSimpleString((String)queue);
    }

    public void reportDrained() {
        this.connection.requireInHandler();
        this.sender.drained();
        this.connection.flush();
    }

    private final class ConnectionFlushIOCallback
    implements IOCallback {
        private ConnectionFlushIOCallback() {
        }

        public void done() {
            ProtonServerSenderContext.this.connection.flush();
        }

        public void onError(int errorCode, String errorMessage) {
            ProtonServerSenderContext.this.connection.flush();
        }
    }
}

