/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.eventhubs.impl;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventhubs.PartitionReceiveHandler;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.ReceiverOptions;
import com.microsoft.azure.eventhubs.ReceiverRuntimeInformation;
import com.microsoft.azure.eventhubs.impl.AmqpConstants;
import com.microsoft.azure.eventhubs.impl.ClientConstants;
import com.microsoft.azure.eventhubs.impl.ClientEntity;
import com.microsoft.azure.eventhubs.impl.EventDataUtil;
import com.microsoft.azure.eventhubs.impl.EventPositionImpl;
import com.microsoft.azure.eventhubs.impl.MessageReceiver;
import com.microsoft.azure.eventhubs.impl.MessagingFactory;
import com.microsoft.azure.eventhubs.impl.PassByRef;
import com.microsoft.azure.eventhubs.impl.ReceivePump;
import com.microsoft.azure.eventhubs.impl.ReceiverSettingsProvider;
import com.microsoft.azure.eventhubs.impl.StringUtil;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnknownDescribedType;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Package-private {@link PartitionReceiver} implementation that delegates the actual
 * receiving to a {@link MessageReceiver} and also acts as that receiver's
 * {@link ReceiverSettingsProvider} (filter, link properties, desired capabilities).
 *
 * <p>Instances are only handed out by {@link #create}, whose returned future completes after
 * {@code internalReceiver} has been assigned.
 */
final class PartitionReceiverImpl
        extends ClientEntity
        implements ReceiverSettingsProvider, PartitionReceiver {
    private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(PartitionReceiverImpl.class);

    /** Smallest value accepted by {@link #setPrefetchCount(int)}. */
    private static final int MIN_PREFETCH_COUNT = 10;

    private final String partitionId;
    private final MessagingFactory underlyingFactory;
    private final String eventHubName;
    private final String consumerGroupName;
    /** Guards every read and write of {@link #receivePump}. */
    private final Object receiveHandlerLock;
    private final EventPositionImpl eventPosition;
    private final Long epoch;
    private final boolean isEpochReceiver;
    private final ReceiverOptions receiverOptions;
    /** Non-null only when the runtime metric was enabled in the options at construction time. */
    private final ReceiverRuntimeInformation runtimeInformation;
    /** Assigned once the future returned by {@code MessageReceiver.create} completes; null until then. */
    private volatile MessageReceiver internalReceiver;
    private ReceivePump receivePump;

    private PartitionReceiverImpl(MessagingFactory factory, String eventHubName, String consumerGroupName, String partitionId, EventPositionImpl eventPosition, Long epoch, boolean isEpochReceiver, ReceiverOptions receiverOptions, Executor executor) {
        super(null, null, executor);
        this.underlyingFactory = factory;
        this.eventHubName = eventHubName;
        this.consumerGroupName = consumerGroupName;
        this.partitionId = partitionId;
        this.eventPosition = eventPosition;
        this.epoch = epoch;
        this.isEpochReceiver = isEpochReceiver;
        this.receiveHandlerLock = new Object();
        this.receiverOptions = receiverOptions;
        this.runtimeInformation = this.isRuntimeMetricEnabled() ? new ReceiverRuntimeInformation(partitionId) : null;
    }

    /**
     * Builds a receiver and asynchronously creates its underlying {@link MessageReceiver}.
     *
     * @param eventPosition starting position; must be an {@link EventPositionImpl} (it is cast)
     * @param epoch         epoch value; must not be negative
     * @param executor      executor used for the continuation and handed to the new receiver
     * @return a future that yields the receiver once the internal receiver has been created
     * @throws IllegalArgumentException if {@code epoch} is negative or {@code consumerGroupName}
     *                                  is null or whitespace
     */
    static CompletableFuture<PartitionReceiver> create(MessagingFactory factory, String eventHubName, String consumerGroupName, String partitionId, EventPosition eventPosition, long epoch, boolean isEpochReceiver, ReceiverOptions receiverOptions, Executor executor) throws EventHubException {
        if (epoch < 0L) {
            throw new IllegalArgumentException("epoch cannot be a negative value. Please specify a zero or positive long value.");
        }
        if (StringUtil.isNullOrWhiteSpace(consumerGroupName)) {
            throw new IllegalArgumentException("specify valid string for argument - 'consumerGroupName'");
        }
        final PartitionReceiverImpl receiver = new PartitionReceiverImpl(factory, eventHubName, consumerGroupName, partitionId, (EventPositionImpl) eventPosition, epoch, isEpochReceiver, receiverOptions, executor);
        return receiver.createInternalReceiver().thenApplyAsync(ignored -> (PartitionReceiver) receiver, executor);
    }

    /**
     * Creates the underlying {@link MessageReceiver} for this partition's receive path and
     * stores it in {@link #internalReceiver} when the creation future completes.
     */
    private CompletableFuture<Void> createInternalReceiver() {
        final String receivePath = String.format("%s/ConsumerGroups/%s/Partitions/%s", this.eventHubName, this.consumerGroupName, this.partitionId);
        // NOTE(review): 999 is presumably the initial prefetch count - confirm against MessageReceiver.create.
        return MessageReceiver.create(this.underlyingFactory, StringUtil.getRandomString(), receivePath, 999, this)
                .thenAcceptAsync(created -> {
                    this.internalReceiver = created;
                }, this.executor);
    }

    /** Returns the event position this receiver was constructed with. */
    final EventPosition getStartingPosition() {
        return this.eventPosition;
    }

    @Override
    public final String getPartitionId() {
        return this.partitionId;
    }

    @Override
    public final int getPrefetchCount() {
        return this.internalReceiver.getPrefetchCount();
    }

    /**
     * Forwards the prefetch count to the internal receiver.
     *
     * @throws IllegalArgumentException if {@code prefetchCount} is below {@value #MIN_PREFETCH_COUNT}
     */
    @Override
    public final void setPrefetchCount(int prefetchCount) throws EventHubException {
        if (prefetchCount < MIN_PREFETCH_COUNT) {
            // Message text kept as-is for compatibility; note the minimum itself is accepted.
            throw new IllegalArgumentException(String.format(Locale.US, "PrefetchCount has to be above %s", MIN_PREFETCH_COUNT));
        }
        this.internalReceiver.setPrefetchCount(prefetchCount);
    }

    @Override
    public final Duration getReceiveTimeout() {
        return this.internalReceiver.getReceiveTimeout();
    }

    @Override
    public void setReceiveTimeout(Duration value) {
        this.internalReceiver.setReceiveTimeout(value);
    }

    @Override
    public final long getEpoch() {
        return this.epoch;
    }

    /** Returns the runtime information holder, or {@code null} when the metric was not enabled at construction. */
    @Override
    public final ReceiverRuntimeInformation getRuntimeInformation() {
        return this.runtimeInformation;
    }

    /**
     * Receives up to {@code maxEventCount} messages from the internal receiver and converts
     * them to {@link EventData} on this entity's executor.
     */
    @Override
    public CompletableFuture<Iterable<EventData>> receive(int maxEventCount) {
        return this.internalReceiver.receive(maxEventCount).thenApplyAsync(this::toEventData, this.executor);
    }

    /**
     * Converts a received batch to events and, when runtime metrics are tracked, refreshes
     * {@link #runtimeInformation} from the last message of the batch.
     */
    private Iterable<EventData> toEventData(Collection<Message> amqpMessages) {
        // runtimeInformation is fixed at construction while the options are re-read on every call;
        // requiring both avoids dereferencing a null runtimeInformation if the two ever disagree.
        final boolean trackRuntimeInfo = this.runtimeInformation != null && this.isRuntimeMetricEnabled();
        final PassByRef<Message> lastMessageRef = trackRuntimeInfo ? new PassByRef<Message>() : null;

        final Iterable<EventData> events = EventDataUtil.toEventDataCollection(amqpMessages, lastMessageRef);
        if (lastMessageRef != null) {
            this.updateRuntimeInformation(lastMessageRef.get());
        }
        return events;
    }

    /**
     * Copies the last-enqueued sequence number, time and offset from the message's delivery
     * annotations into {@link #runtimeInformation}. Does nothing if the message, its
     * annotations, or the sequence number / enqueued time entries are absent.
     */
    private void updateRuntimeInformation(Message lastMessage) {
        if (lastMessage == null) {
            return;
        }
        final DeliveryAnnotations deliveryAnnotations = lastMessage.getDeliveryAnnotations();
        if (deliveryAnnotations == null || deliveryAnnotations.getValue() == null) {
            return;
        }
        final Map<Symbol, Object> annotations = deliveryAnnotations.getValue();
        final Long lastSequenceNumber = (Long) annotations.get(ClientConstants.LAST_ENQUEUED_SEQUENCE_NUMBER);
        final Date lastEnqueuedTime = (Date) annotations.get(ClientConstants.LAST_ENQUEUED_TIME_UTC);
        if (lastSequenceNumber == null || lastEnqueuedTime == null) {
            // Annotations are present but carry no usable metric values; keep the previous snapshot.
            return;
        }
        this.runtimeInformation.setRuntimeInformation(lastSequenceNumber, lastEnqueuedTime.toInstant(), (String) annotations.get(ClientConstants.LAST_ENQUEUED_OFFSET));
    }

    @Override
    public CompletableFuture<Void> setReceiveHandler(PartitionReceiveHandler receiveHandler) {
        return this.setReceiveHandler(receiveHandler, false);
    }

    /**
     * Registers or removes the receive handler.
     *
     * <p>A {@code null} handler stops a running pump and returns the pump's stop future; if no
     * pump is running an already-completed future is returned. A non-null handler creates a new
     * {@link ReceivePump} and submits it to the executor.
     *
     * @throws IllegalArgumentException if a handler is supplied while a pump is already running
     */
    @Override
    public CompletableFuture<Void> setReceiveHandler(PartitionReceiveHandler receiveHandler, boolean invokeWhenNoEvents) {
        synchronized (this.receiveHandlerLock) {
            final boolean pumpRunning = this.receivePump != null && this.receivePump.isRunning();

            if (receiveHandler == null) {
                return pumpRunning ? this.receivePump.stop() : CompletableFuture.<Void>completedFuture(null);
            }
            if (pumpRunning) {
                throw new IllegalArgumentException("Unexpected value for parameter 'receiveHandler'. PartitionReceiver was already registered with a PartitionReceiveHandler instance. Only 1 instance can be registered.");
            }

            this.receivePump = new ReceivePump(new ReceivePump.IPartitionReceiver() {
                @Override
                public CompletableFuture<Iterable<EventData>> receive(int maxBatchSize) {
                    return PartitionReceiverImpl.this.receive(maxBatchSize);
                }

                @Override
                public String getPartitionId() {
                    return PartitionReceiverImpl.this.getPartitionId();
                }
            }, receiveHandler, invokeWhenNoEvents, this.executor);
            this.executor.execute(this.receivePump);
            return CompletableFuture.completedFuture(null);
        }
    }

    /**
     * Signals a running pump to stop and closes the internal receiver, if one was created.
     * The future returned by the pump's {@code stop()} is not awaited here.
     */
    @Override
    public CompletableFuture<Void> onClose() {
        synchronized (this.receiveHandlerLock) {
            if (this.receivePump != null && this.receivePump.isRunning()) {
                this.receivePump.stop();
            }
        }
        final MessageReceiver receiverToClose = this.internalReceiver;
        if (receiverToClose == null) {
            return CompletableFuture.completedFuture(null);
        }
        return receiverToClose.close();
    }

    /**
     * Builds the filter for the receive link: an offset expression derived from
     * {@code lastReceivedMessage} when one is given, otherwise the expression of the starting
     * event position.
     */
    @Override
    public Map<Symbol, UnknownDescribedType> getFilter(Message lastReceivedMessage) {
        final String expression;
        if (lastReceivedMessage != null) {
            final String lastReceivedOffset = lastReceivedMessage.getMessageAnnotations().getValue().get(AmqpConstants.OFFSET).toString();
            expression = String.format("amqp.annotation.%s >%s '%s'", "x-opt-offset", "", lastReceivedOffset);
        } else {
            expression = this.eventPosition.getExpression();
        }

        if (TRACE_LOGGER.isInfoEnabled()) {
            // internalReceiver is only assigned after creation completes, so it may still be null
            // here; read the volatile field once so the null check and the use see the same value.
            final MessageReceiver currentReceiver = this.internalReceiver;
            final String logReceivePath = currentReceiver == null
                    ? "receiverPath[RECEIVER IS NULL]"
                    : "receiverPath[" + currentReceiver.getReceivePath() + "]";
            TRACE_LOGGER.info(String.format("%s, action[createReceiveLink], %s", logReceivePath, this.eventPosition));
        }

        return Collections.singletonMap(AmqpConstants.STRING_FILTER, new UnknownDescribedType(AmqpConstants.STRING_FILTER, expression));
    }

    /**
     * Returns the link properties: the epoch for epoch receivers and the receiver identifier
     * when the options supply one. Returns {@code null} when neither applies.
     */
    @Override
    public Map<Symbol, Object> getProperties() {
        final String identifier = this.receiverOptions != null ? this.receiverOptions.getIdentifier() : null;
        if (!this.isEpochReceiver && identifier == null) {
            return null;
        }

        final Map<Symbol, Object> properties = new HashMap<>();
        if (this.isEpochReceiver) {
            properties.put(AmqpConstants.EPOCH, this.epoch);
        }
        if (identifier != null) {
            properties.put(AmqpConstants.RECEIVER_IDENTIFIER_NAME, identifier);
        }
        return properties;
    }

    /**
     * Returns the runtime-metric capability symbol when the options currently enable it,
     * otherwise {@code null}.
     */
    @Override
    public Symbol[] getDesiredCapabilities() {
        if (!this.isRuntimeMetricEnabled()) {
            return null;
        }
        return new Symbol[]{AmqpConstants.ENABLE_RECEIVER_RUNTIME_METRIC_NAME};
    }

    /** True when receiver options were supplied and currently report the runtime metric as enabled. */
    private boolean isRuntimeMetricEnabled() {
        return this.receiverOptions != null && this.receiverOptions.getReceiverRuntimeMetricEnabled();
    }
}

