package cn.cerc.db.queue.rabbitmq;

import cn.cerc.db.core.Curl;
import cn.cerc.db.core.ServerConfig;
import cn.cerc.db.core.Utils;
import cn.cerc.db.log.KnowallLog;
import cn.cerc.db.maintain.MaintainConfig;
import cn.cerc.db.queue.OnStringMessage;
import cn.cerc.db.queue.entity.CheckMQEntity;
import cn.cerc.mis.exception.QueueTimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/cerc/db/queue/rabbitmq/RabbitQueue.class */
public class RabbitQueue implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(RabbitQueue.class);
    private int maximum = 1;
    private Channel channel;
    private final String queueId;

    public RabbitQueue(String str) {
        this.queueId = str;
    }

    private void initChannel() {
        try {
            try {
                Connection connection = RabbitServer.getInstance().getConnection();
                this.channel = connection.createChannel();
                if (this.channel == null) {
                    throw new RuntimeException("rabbitmq channel 创建失败，请立即检查 mq 的服务状态");
                }
                this.channel.addShutdownListener(shutdownSignalException -> {
                    log.debug("{} rabbitmq channel closed", Integer.valueOf(this.channel.getChannelNumber()));
                });
                this.channel.basicQos(this.maximum);
                this.channel.queueDeclare(this.queueId, true, false, false, (Map) null);
                if (connection != null) {
                    RabbitServer.getInstance().releaseConnection(connection);
                }
            } catch (IOException | InterruptedException e) {
                Curl curl = new Curl();
                String property = ServerConfig.getInstance().getProperty("qc.api.rabbitmq.heartbeat.site");
                if (Utils.isEmpty(property)) {
                    log.error("未配置rabbitmq心跳监测地址 qc.api.rabbitmq.heartbeat.site");
                    if (0 != 0) {
                        RabbitServer.getInstance().releaseConnection(null);
                        return;
                    }
                    return;
                }
                String appProduct = ServerConfig.getAppProduct();
                String appVersion = ServerConfig.getAppVersion();
                CheckMQEntity checkMQEntity = new CheckMQEntity();
                checkMQEntity.setProjcet(appProduct);
                checkMQEntity.setVersion(appVersion);
                checkMQEntity.setAlive(false);
                try {
                    curl.doPost(property, checkMQEntity);
                } catch (Exception e2) {
                    log.warn("{} {} MQ连接超时，qc监控MQ接口异常", new Object[]{appProduct, appVersion, e2});
                }
                if (0 != 0) {
                    RabbitServer.getInstance().releaseConnection(null);
                }
            }
        } catch (Throwable th) {
            if (0 != 0) {
                RabbitServer.getInstance().releaseConnection(null);
            }
            throw th;
        }
    }

    public void watch(final OnStringMessage onStringMessage) {
        initChannel();
        if (onStringMessage == null || this.channel == null) {
            return;
        }
        try {
            this.channel.basicConsume(this.queueId, false, new DefaultConsumer(this.channel) { // from class: cn.cerc.db.queue.rabbitmq.RabbitQueue.1
                public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                    String str2 = new String(bArr);
                    long currentTimeMillis = System.currentTimeMillis();
                    try {
                        try {
                        } catch (Exception e) {
                            RabbitQueue.this.channel.basicReject(envelope.getDeliveryTag(), true);
                            RabbitQueue.log.error(String.format("queueId %s, payload %s, message %s", RabbitQueue.this.queueId, str2, e.getMessage()), e);
                            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                            if (currentTimeMillis2 > QueueTimeoutException.Timeout) {
                                QueueTimeoutException queueTimeoutException = new QueueTimeoutException(onStringMessage.getClass(), str2, currentTimeMillis2);
                                RabbitQueue.log.warn(queueTimeoutException.getMessage(), queueTimeoutException);
                            }
                        }
                        if (RabbitQueue.this.channel == null || !RabbitQueue.this.channel.isOpen()) {
                            long currentTimeMillis3 = System.currentTimeMillis() - currentTimeMillis;
                            if (currentTimeMillis3 > QueueTimeoutException.Timeout) {
                                QueueTimeoutException queueTimeoutException2 = new QueueTimeoutException(onStringMessage.getClass(), str2, currentTimeMillis3);
                                RabbitQueue.log.warn(queueTimeoutException2.getMessage(), queueTimeoutException2);
                                return;
                            }
                            return;
                        }
                        synchronized (RabbitQueue.this.channel) {
                            if (RabbitQueue.this.channel == null || !RabbitQueue.this.channel.isOpen()) {
                                long currentTimeMillis4 = System.currentTimeMillis() - currentTimeMillis;
                                if (currentTimeMillis4 > QueueTimeoutException.Timeout) {
                                    QueueTimeoutException queueTimeoutException3 = new QueueTimeoutException(onStringMessage.getClass(), str2, currentTimeMillis4);
                                    RabbitQueue.log.warn(queueTimeoutException3.getMessage(), queueTimeoutException3);
                                    return;
                                }
                                return;
                            }
                            if (onStringMessage.consume(str2, true)) {
                                RabbitQueue.this.channel.basicAck(envelope.getDeliveryTag(), false);
                            } else {
                                RabbitQueue.this.channel.basicReject(envelope.getDeliveryTag(), true);
                            }
                            long currentTimeMillis5 = System.currentTimeMillis() - currentTimeMillis;
                            if (currentTimeMillis5 > QueueTimeoutException.Timeout) {
                                QueueTimeoutException queueTimeoutException4 = new QueueTimeoutException(onStringMessage.getClass(), str2, currentTimeMillis5);
                                RabbitQueue.log.warn(queueTimeoutException4.getMessage(), queueTimeoutException4);
                            }
                            if (MaintainConfig.build().illegalConsume()) {
                                RabbitQueue.log.warn("运维正在检修，异常消费 push 消息，队列编号 {}, 消息内容 {}", RabbitQueue.this.queueId, str2);
                            }
                        }
                    } catch (Throwable th) {
                        long currentTimeMillis6 = System.currentTimeMillis() - currentTimeMillis;
                        if (currentTimeMillis6 > QueueTimeoutException.Timeout) {
                            QueueTimeoutException queueTimeoutException5 = new QueueTimeoutException(onStringMessage.getClass(), str2, currentTimeMillis6);
                            RabbitQueue.log.warn(queueTimeoutException5.getMessage(), queueTimeoutException5);
                        }
                        throw th;
                    }
                }
            });
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

    public void pop(OnStringMessage onStringMessage) throws IOException {
        initChannel();
        for (int i = 0; i < this.maximum; i++) {
            try {
                GetResponse basicGet = this.channel.basicGet(this.queueId, false);
                if (basicGet == null) {
                    return;
                }
                String str = new String(basicGet.getBody());
                Envelope envelope = basicGet.getEnvelope();
                long currentTimeMillis = System.currentTimeMillis();
                try {
                    try {
                        if (onStringMessage.consume(str, true)) {
                            this.channel.basicAck(envelope.getDeliveryTag(), false);
                        } else {
                            this.channel.basicReject(envelope.getDeliveryTag(), true);
                        }
                        if (MaintainConfig.build().illegalConsume()) {
                            log.warn("运维正在检修，异常消费 pull 消息，队列编号 {}, 消息内容 {}", this.queueId, str);
                        }
                        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                        if (currentTimeMillis2 > QueueTimeoutException.Timeout) {
                            QueueTimeoutException queueTimeoutException = new QueueTimeoutException(onStringMessage.getClass(), str, currentTimeMillis2);
                            log.warn(queueTimeoutException.getMessage(), queueTimeoutException);
                        }
                    } catch (Exception e) {
                        this.channel.basicReject(envelope.getDeliveryTag(), true);
                        log.error(String.format("queueId %s, payload %s, message %s", this.queueId, str, e.getMessage()), e);
                        long currentTimeMillis3 = System.currentTimeMillis() - currentTimeMillis;
                        if (currentTimeMillis3 > QueueTimeoutException.Timeout) {
                            QueueTimeoutException queueTimeoutException2 = new QueueTimeoutException(onStringMessage.getClass(), str, currentTimeMillis3);
                            log.warn(queueTimeoutException2.getMessage(), queueTimeoutException2);
                        }
                    }
                } catch (Throwable th) {
                    long currentTimeMillis4 = System.currentTimeMillis() - currentTimeMillis;
                    if (currentTimeMillis4 > QueueTimeoutException.Timeout) {
                        QueueTimeoutException queueTimeoutException3 = new QueueTimeoutException(onStringMessage.getClass(), str, currentTimeMillis4);
                        log.warn(queueTimeoutException3.getMessage(), queueTimeoutException3);
                    }
                    throw th;
                }
            } catch (IOException e2) {
                log.error("queueId {}, message {}", new Object[]{this.queueId, e2.getMessage(), e2});
                return;
            }
        }
    }

    public String push(String str) {
        if (MaintainConfig.build().illegalProduce()) {
            log.warn("运维正在检修，异常生产消息，队列编号 {}, 消息内容 {}", this.queueId, str);
        }
        initChannel();
        try {
            this.channel.confirmSelect();
            this.channel.basicPublish(Utils.EMPTY, this.queueId, MessageProperties.PERSISTENT_TEXT_PLAIN, str.getBytes(StandardCharsets.UTF_8));
            if (this.channel.waitForConfirms()) {
                return "ok";
            }
            log.error("{} 消息发送后确认失败", getClass().getSimpleName(), KnowallLog.of(str));
            return "failure";
        } catch (IOException | InterruptedException e) {
            log.error("{} 消息发送时出现异常 {}", new Object[]{this.queueId, e.getMessage(), KnowallLog.of(e, new String[0]).add("msg", str)});
            return "failure";
        }
    }

    public int getMaximum() {
        return this.maximum;
    }

    public void setMaximum(int i) {
        this.maximum = i;
    }

    public int getMessageCount() {
        int i = 0;
        try {
            i = this.channel.queueDeclare().getMessageCount();
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
        return i;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.channel != null) {
            synchronized (this.channel) {
                try {
                    this.channel.close();
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }
                this.channel = null;
            }
        }
    }
}
