package org.springframework.amqp.rabbit.listener;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.utility.Utility;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;

/* loaded from: input_file:org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.class */
public class BlockingQueueConsumer {
    // Commons-logging logger shared by this class and its InternalConsumer.
    private static Log logger = LogFactory.getLog(BlockingQueueConsumer.class);
    // Local hand-off buffer: filled by InternalConsumer.handleDelivery, drained by nextMessage.
    private final BlockingQueue<Delivery> queue;
    // Shutdown signal recorded by InternalConsumer.handleShutdownSignal; null until one is received.
    private volatile ShutdownSignalException shutdown;
    // Names of the queues that start() passively declares and then consumes from.
    private final String[] queues;
    // Value passed to Channel.basicQos in start() when the acknowledge mode is not auto-ack.
    private final int prefetchCount;
    // Passed to the channel lookup in start() and to consumer cancellation in stop();
    // also gates the rollback/commit handling further down.
    private final boolean transactional;
    // Channel obtained in start() and closed in stop().
    private Channel channel;
    // Callback object registered with basicConsume; created in start(), set to null in stop().
    private InternalConsumer consumer;
    // Set to true by stop(); read by InternalConsumer.handleDelivery.
    private final AtomicBoolean cancelled;
    // Set to true by InternalConsumer.handleCancel; read by nextMessage(long) and stop().
    private final AtomicBoolean cancelReceived;
    private final AcknowledgeMode acknowledgeMode;
    private final ConnectionFactory connectionFactory;
    // Converts the raw AMQP properties and envelope into MessageProperties in handle().
    private final MessagePropertiesConverter messagePropertiesConverter;
    // This consumer is added in start() and released in handleCancelOk() or when start() fails.
    private final ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter;
    // Delivery tags of messages returned by handle(); cleared after commit, rollback, stop and shutdown.
    private Set<Long> deliveryTags;
    // Initial "requeue" flag used by rollbackOnExceptionIfNecessary when rejecting messages.
    private final boolean defaultRequeuRejected;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/amqp/rabbit/listener/BlockingQueueConsumer$Delivery.class */
    /**
     * Immutable holder for one raw broker delivery: the envelope, the AMQP
     * properties and the message body exactly as they were passed in.
     */
    public static class Delivery {
        private final Envelope envelope;
        private final AMQP.BasicProperties properties;
        private final byte[] body;

        /**
         * @param envelope the delivery envelope
         * @param basicProperties the AMQP properties of the message
         * @param bArr the message body; the array is stored as-is, not copied
         */
        public Delivery(Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
            this.body = bArr;
            this.properties = basicProperties;
            this.envelope = envelope;
        }

        /** @return the envelope given at construction */
        public Envelope getEnvelope() {
            return envelope;
        }

        /** @return the AMQP properties given at construction */
        public AMQP.BasicProperties getProperties() {
            return properties;
        }

        /** @return the same body array that was given at construction */
        public byte[] getBody() {
            return body;
        }
    }

    /* loaded from: input_file:org/springframework/amqp/rabbit/listener/BlockingQueueConsumer$InternalConsumer.class */
    /**
     * Channel callback handler that forwards broker events to the enclosing
     * {@link BlockingQueueConsumer}: deliveries go into its local queue, while
     * shutdown and cancel notifications update its state flags.
     */
    private class InternalConsumer extends DefaultConsumer {
        public InternalConsumer(Channel channel) {
            super(channel);
        }

        /** Records the shutdown signal on the owner and discards its tracked delivery tags. */
        public void handleShutdownSignal(String consumerTag, ShutdownSignalException signal) {
            final BlockingQueueConsumer owner = BlockingQueueConsumer.this;
            if (BlockingQueueConsumer.logger.isDebugEnabled()) {
                BlockingQueueConsumer.logger.debug("Received shutdown signal for consumer tag=" + consumerTag, signal);
            }
            owner.shutdown = signal;
            owner.deliveryTags.clear();
        }

        /** Flags on the owner that a cancel notification was received. */
        public void handleCancel(String consumerTag) throws IOException {
            if (BlockingQueueConsumer.logger.isWarnEnabled()) {
                BlockingQueueConsumer.logger.warn("Cancel received");
            }
            BlockingQueueConsumer.this.cancelReceived.set(true);
        }

        /** Releases the owner from the active-object counter once the cancel is confirmed. */
        public void handleCancelOk(String consumerTag) {
            final BlockingQueueConsumer owner = BlockingQueueConsumer.this;
            if (BlockingQueueConsumer.logger.isDebugEnabled()) {
                BlockingQueueConsumer.logger.debug("Received cancellation notice for " + owner);
            }
            owner.activeObjectCounter.release(owner);
        }

        /**
         * Wraps the delivery and puts it on the owner's local queue. The delivery is
         * ignored when the owner has been cancelled and its acknowledge mode allows
         * transactions.
         */
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            final BlockingQueueConsumer owner = BlockingQueueConsumer.this;
            if (owner.cancelled.get()) {
                if (owner.acknowledgeMode.isTransactionAllowed()) {
                    return;
                }
            }
            if (BlockingQueueConsumer.logger.isDebugEnabled()) {
                BlockingQueueConsumer.logger.debug("Storing delivery for " + owner);
            }
            final Delivery delivery = new Delivery(envelope, properties, body);
            try {
                owner.queue.put(delivery);
            } catch (InterruptedException interrupted) {
                // Restore the interrupt flag for the calling thread; the delivery is not queued.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Creates a consumer whose default requeue-on-reject flag is {@code true};
     * all other arguments are forwarded unchanged to the full constructor.
     *
     * @param connectionFactory factory used to obtain the channel in {@code start()}
     * @param messagePropertiesConverter converter for incoming AMQP properties
     * @param activeObjectCounter counter this consumer registers itself with
     * @param acknowledgeMode acknowledge mode to consume with
     * @param z stored as the {@code transactional} flag
     * @param i stored as the prefetch count
     * @param strArr names of the queues to consume from
     */
    public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean z, int i, String... strArr) {
        this(
                connectionFactory,
                messagePropertiesConverter,
                activeObjectCounter,
                acknowledgeMode,
                z,
                i,
                true,
                strArr);
    }

    /**
     * Creates a consumer. No broker interaction happens here; the channel is
     * obtained later in {@code start()}.
     *
     * @param connectionFactory factory used to obtain the channel in {@code start()}
     * @param messagePropertiesConverter converter for incoming AMQP properties
     * @param activeObjectCounter counter this consumer registers itself with
     * @param acknowledgeMode acknowledge mode to consume with
     * @param z stored as the {@code transactional} flag
     * @param i stored as the prefetch count
     * @param z2 stored as the default requeue flag used when rejecting messages
     * @param strArr names of the queues to consume from
     */
    public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean z, int i, boolean z2, String... strArr) {
        // Internal state that does not depend on the arguments.
        this.queue = new LinkedBlockingQueue<Delivery>();
        this.deliveryTags = new LinkedHashSet<Long>();
        this.cancelled = new AtomicBoolean(false);
        this.cancelReceived = new AtomicBoolean(false);

        // Collaborators.
        this.connectionFactory = connectionFactory;
        this.messagePropertiesConverter = messagePropertiesConverter;
        this.activeObjectCounter = activeObjectCounter;

        // Consumption settings.
        this.acknowledgeMode = acknowledgeMode;
        this.transactional = z;
        this.prefetchCount = i;
        this.defaultRequeuRejected = z2;
        this.queues = strArr;
    }

    /**
     * @return the channel obtained by {@code start()}, or {@code null} if
     *         {@code start()} has not assigned one yet
     */
    public Channel getChannel() {
        return channel;
    }

    /**
     * Returns the consumer tag of the underlying channel consumer.
     *
     * @return the consumer tag, or {@code null} when there is no underlying
     *         consumer, i.e. before {@code start()} has created it or after
     *         {@code stop()} has discarded it
     */
    public String getConsumerTag() {
        // Read the field once: stop() resets it to null, and dereferencing it
        // directly used to fail with a NullPointerException in that state.
        InternalConsumer current = this.consumer;
        return current != null ? current.getConsumerTag() : null;
    }

    /**
     * Throws the recorded shutdown signal, with its stack trace adjusted by
     * {@code Utility.fixStackTrace}, if one has been received; otherwise does nothing.
     */
    private void checkShutdown() {
        if (this.shutdown == null) {
            return;
        }
        throw Utility.fixStackTrace(this.shutdown);
    }

    /**
     * Converts a raw delivery into a {@link Message} and records its delivery tag.
     *
     * @param delivery the delivery taken from the local queue, may be {@code null}
     * @return the converted message, or {@code null} when {@code delivery} is
     *         {@code null} and no shutdown signal has been recorded
     * @throws ShutdownSignalException the recorded signal, when {@code delivery}
     *         is {@code null} and a shutdown has been received
     */
    private Message handle(Delivery delivery) throws InterruptedException {
        if (delivery == null) {
            if (this.shutdown != null) {
                throw this.shutdown;
            }
            return null;
        }

        byte[] payload = delivery.getBody();
        MessageProperties props = this.messagePropertiesConverter.toMessageProperties(
                delivery.getProperties(), delivery.getEnvelope(), "UTF-8");
        props.setMessageCount(0);

        Message converted = new Message(payload, props);
        if (logger.isDebugEnabled()) {
            logger.debug("Received message: " + converted);
        }

        // Remember the tag so the message can later be acked or rejected.
        this.deliveryTags.add(props.getDeliveryTag());
        return converted;
    }

    /**
     * Takes the next delivery from the local queue, waiting for as long as it
     * takes for one to arrive, and converts it to a {@link Message}.
     *
     * @return the next message
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public Message nextMessage() throws InterruptedException, ShutdownSignalException {
        // Guarded like the other log statements in this class, so the message
        // (which includes toString()) is only built when trace logging is on.
        if (logger.isTraceEnabled()) {
            logger.trace("Retrieving delivery for " + this);
        }
        return handle(this.queue.take());
    }

    /**
     * Waits up to the given time for the next delivery and converts it to a
     * {@link Message}.
     *
     * @param j maximum time to wait, in milliseconds
     * @return the next message, or {@code null} if none arrived in time
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws ShutdownSignalException if a shutdown signal has been recorded
     * @throws ConsumerCancelledException if nothing arrived and a cancel
     *         notification has been received
     */
    public Message nextMessage(long j) throws InterruptedException, ShutdownSignalException {
        if (logger.isDebugEnabled()) {
            logger.debug("Retrieving delivery for " + this);
        }
        checkShutdown();

        Delivery polled = this.queue.poll(j, TimeUnit.MILLISECONDS);
        Message message = handle(polled);
        if (message != null) {
            return message;
        }
        if (this.cancelReceived.get()) {
            throw new ConsumerCancelledException();
        }
        return null;
    }

    /**
     * Obtains a channel, verifies that every configured queue exists and starts
     * consuming from each of them.
     *
     * <p>The QoS/passive-declare step is retried after an {@link IOException} as
     * long as retries remain and the channel is still open, pausing five seconds
     * between attempts. When it finally fails, this consumer is released from
     * the active-object counter before the exception is thrown.
     *
     * @throws FatalListenerStartupException if the queues cannot be prepared
     * @throws AmqpException if registering the consumer on a queue fails
     */
    public void start() throws AmqpException {
        if (logger.isDebugEnabled()) {
            logger.debug("Starting consumer " + this);
        }
        this.channel = ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory, this.transactional).getChannel();
        this.consumer = new InternalConsumer(this.channel);
        this.deliveryTags.clear();
        this.activeObjectCounter.add(this);

        int passiveDeclareTries = 3;
        do {
            try {
                if (!this.acknowledgeMode.isAutoAck()) {
                    // Prefetch is set before basicConsume is called below.
                    this.channel.basicQos(this.prefetchCount);
                }
                for (String queueName : this.queues) {
                    this.channel.queueDeclarePassive(queueName);
                }
                // Success: force the loop condition to fail.
                passiveDeclareTries = 0;
            } catch (IOException e) {
                if (passiveDeclareTries <= 0 || !this.channel.isOpen()) {
                    this.activeObjectCounter.release(this);
                    throw new FatalListenerStartupException("Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.", e);
                }
                // NOTE(review): the logged count is one lower than the number of
                // attempts that actually remain; message kept unchanged.
                if (logger.isWarnEnabled()) {
                    logger.warn("Reconnect failed; retries left=" + (passiveDeclareTries - 1), e);
                }
                // Back off before the next attempt. This must not depend on the
                // log level: previously the pause only happened when WARN logging
                // was enabled, so retries ran back to back otherwise.
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        } while (passiveDeclareTries-- > 0);

        for (String queueName : this.queues) {
            try {
                this.channel.basicConsume(queueName, this.acknowledgeMode.isAutoAck(), this.consumer);
                if (logger.isDebugEnabled()) {
                    logger.debug("Started on queue '" + queueName + "': " + this);
                }
            } catch (IOException e) {
                throw RabbitUtils.convertRabbitAccessException(e);
            }
        }
    }

    /**
     * Marks this consumer as cancelled, cancels the broker-side consumer when
     * there is one and no cancel notification was received, then closes the
     * channel and drops the tracked delivery tags and the consumer reference.
     */
    public void stop() {
        this.cancelled.set(true);

        boolean cancelOnBroker = this.consumer != null
                && this.consumer.getChannel() != null
                && this.consumer.getConsumerTag() != null
                && !this.cancelReceived.get();
        if (cancelOnBroker) {
            try {
                RabbitUtils.closeMessageConsumer(this.consumer.getChannel(), this.consumer.getConsumerTag(), this.transactional);
            } catch (Exception e) {
                // Best effort: a failed cancel must not prevent closing the channel.
                if (logger.isDebugEnabled()) {
                    logger.debug("Error closing consumer", e);
                }
            }
        }

        if (logger.isDebugEnabled()) {
            logger.debug("Closing Rabbit Channel: " + this.channel);
        }
        RabbitUtils.setPhysicalCloseRequired(true);
        RabbitUtils.closeChannel(this.channel);

        this.deliveryTags.clear();
        this.consumer = null;
    }

    /**
     * Describes this consumer by consumer tag ({@code null} when there is no
     * underlying consumer), channel, acknowledge mode and local queue size.
     */
    @Override
    public String toString() {
        String tag = null;
        if (this.consumer != null) {
            tag = this.consumer.getConsumerTag();
        }
        StringBuilder text = new StringBuilder();
        text.append("Consumer: tag=[").append(tag);
        text.append("], channel=").append(this.channel);
        text.append(", acknowledgeMode=").append(this.acknowledgeMode);
        text.append(" local queue size=").append(this.queue.size());
        return text.toString();
    }

    /**
     * Reacts to a failure by rolling back and/or rejecting the tracked messages.
     *
     * <p>If this consumer is transactional, the channel is rolled back. If the
     * acknowledge mode is neither auto-ack nor manual, every tracked delivery
     * tag is rejected; the requeue flag starts as the configured default and is
     * switched off when an {@link AmqpRejectAndDontRequeueException} appears in
     * the cause chain of {@code th}. The tracked tags are always cleared.
     *
     * @param th the exception that triggered the rollback
     * @throws Exception any failure raised while rolling back or rejecting; it
     *         is rethrown after {@code th} has been logged
     */
    public void rollbackOnExceptionIfNecessary(Throwable th) throws Exception {
        final boolean rejectRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual();
        try {
            if (this.transactional) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Initiating transaction rollback on application exception: " + th);
                }
                RabbitUtils.rollbackIfNecessary(this.channel);
            }
            if (rejectRequired) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Rejecting messages");
                }
                // Walk the cause chain until a "don't requeue" marker is found.
                boolean requeue = this.defaultRequeuRejected;
                Throwable cause = th;
                while (requeue && cause != null) {
                    if (cause instanceof AmqpRejectAndDontRequeueException) {
                        requeue = false;
                    }
                    cause = cause.getCause();
                }
                for (Long deliveryTag : this.deliveryTags) {
                    this.channel.basicReject(deliveryTag, requeue);
                }
                if (this.transactional) {
                    RabbitUtils.commitIfNecessary(this.channel);
                }
            }
        } catch (Exception e) {
            // The rollback failure replaces the application exception, so log the latter.
            logger.error("Application exception overridden by rollback exception", th);
            throw e;
        } finally {
            this.deliveryTags.clear();
        }
    }

    /**
     * Acknowledges the tracked messages and optionally commits the channel.
     *
     * <p>Nothing happens when no delivery tags are tracked. Otherwise, if the
     * acknowledge mode is neither auto-ack nor manual, the tags are either
     * registered through {@code ConnectionFactoryUtils.registerDeliveryTag}
     * (transactional consumer and {@code z} is {@code false}) or acknowledged
     * with a single multiple-ack on the last tracked tag. The tracked tags are
     * always cleared afterwards.
     *
     * @param z when {@code true}, the channel is committed via
     *        {@code RabbitUtils.commitIfNecessary} after acknowledging
     * @return {@code false} if there were no tracked delivery tags, {@code true} otherwise
     * @throws IOException if acknowledging fails
     */
    public boolean commitIfNecessary(boolean z) throws IOException {
        if (this.deliveryTags.isEmpty()) {
            return false;
        }
        try {
            boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual();
            if (ackRequired) {
                if (this.transactional && !z) {
                    for (Long deliveryTag : this.deliveryTags) {
                        ConnectionFactoryUtils.registerDeliveryTag(this.connectionFactory, this.channel, deliveryTag);
                    }
                } else if (!this.deliveryTags.isEmpty()) {
                    // Ack everything up to and including the most recently tracked tag.
                    ArrayList<Long> snapshot = new ArrayList<Long>(this.deliveryTags);
                    long lastTag = snapshot.get(this.deliveryTags.size() - 1);
                    this.channel.basicAck(lastTag, true);
                }
            }
            if (z) {
                RabbitUtils.commitIfNecessary(this.channel);
            }
            return true;
        } finally {
            this.deliveryTags.clear();
        }
    }
}
