package cn.cerc.db.queue;

import cn.cerc.db.core.Utils;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.ClientException;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.model.PagingListResult;
import com.aliyun.mns.model.QueueMeta;
import com.aliyun.rocketmq20220801.Client;
import com.aliyun.rocketmq20220801.models.CreateTopicRequest;
import com.aliyun.rocketmq20220801.models.ListTopicsRequest;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.rocketmq.client.java.message.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/cerc/db/queue/AbstractQueue.class */
public abstract class AbstractQueue implements QueueImpl {
    private static final Logger log = LoggerFactory.getLogger(AbstractQueue.class);
    private static List<String> created = new ArrayList();
    private CloudQueue cloudQueue;
    protected RmqQueue rmqQueue;

    @Override // cn.cerc.db.queue.QueueImpl
    public abstract String getQueueId();

    @Override // cn.cerc.db.queue.QueueImpl
    public CloudQueue getQueue() {
        if (this.cloudQueue == null) {
            this.cloudQueue = createQueue(this, getQueueId());
        }
        return this.cloudQueue;
    }

    public AbstractQueue() {
        try {
            getRmqQueue();
        } catch (Exception e) {
            log.error(String.format("队列 %s 初始化失败", getQueueId()), e);
        }
    }

    @Override // cn.cerc.db.queue.QueueImpl
    public RmqQueue getRmqQueue() throws Exception {
        if (this.rmqQueue == null) {
            this.rmqQueue = createRmqQueue(this, getQueueId());
        }
        return this.rmqQueue;
    }

    private static synchronized CloudQueue createQueue(AbstractQueue abstractQueue, String str) {
        MNSClient mNSClient = QueueServer.getMNSClient();
        if (created.contains(str)) {
            log.debug("直接返回消息队列 {}", str);
            return mNSClient.getQueueRef(str);
        }
        PagingListResult listQueue = mNSClient.listQueue(str, Utils.EMPTY, 100);
        if (listQueue != null) {
            Iterator it = listQueue.getResult().iterator();
            while (it.hasNext()) {
                if (((QueueMeta) it.next()).getQueueName().equals(str)) {
                    created.add(str);
                    log.debug("查找并返回消息队列 {}", str);
                    return mNSClient.getQueueRef(str);
                }
            }
        }
        QueueMeta queueMeta = new QueueMeta();
        queueMeta.setQueueName(str);
        abstractQueue.onCreateQueue(queueMeta);
        created.add(str);
        log.debug("创建新的消息队列 {}", str);
        return mNSClient.createQueue(queueMeta);
    }

    private static synchronized RmqQueue createRmqQueue(AbstractQueue abstractQueue, String str) throws Exception {
        Client rmqClient = QueueServer.getRmqClient();
        if (created.contains(str)) {
            log.debug("直接返回消息队列 {}", str);
            return new RmqQueue(str);
        }
        ListTopicsRequest listTopicsRequest = new ListTopicsRequest();
        try {
            listTopicsRequest.setPageNumber(1);
            listTopicsRequest.setPageSize(100);
            if (rmqClient.listTopics(QueueServer.getRmqInstanceId(), listTopicsRequest).getBody().getData().getList().stream().anyMatch(listTopicsResponseBodyDataList -> {
                return str.equals(listTopicsResponseBodyDataList.getTopicName());
            })) {
                return new RmqQueue(str);
            }
            CreateTopicRequest createTopicRequest = new CreateTopicRequest();
            createTopicRequest.setMessageType(MessageType.NORMAL.name());
            if (rmqClient.createTopic(QueueServer.getRmqInstanceId(), str, createTopicRequest).getBody().getSuccess().booleanValue()) {
                return new RmqQueue(str);
            }
            return null;
        } catch (Exception e) {
            throw e;
        }
    }

    protected void onCreateQueue(QueueMeta queueMeta) {
        queueMeta.setPollingWaitSeconds(0);
        queueMeta.setMaxMessageSize(65356L);
        queueMeta.setMessageRetentionPeriod(72000L);
        queueMeta.setVisibilityTimeout(180L);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String getMessageBody(Message message) {
        return message.getMessageBody();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Message popMessage() {
        try {
            Message popMessage = getQueue().popMessage();
            if (popMessage == null) {
                return null;
            }
            log.debug("messageBody：{}", popMessage.getMessageBodyAsString());
            log.debug("messageId：{}", popMessage.getMessageId());
            log.debug("receiptHandle：{}", popMessage.getReceiptHandle());
            log.debug(popMessage.getMessageBody());
            return popMessage;
        } catch (ClientException e) {
            if (e.getMessage().indexOf("返回结果无效，无法解析。") > -1) {
                return null;
            }
            System.out.println("执行异常：" + e.getMessage());
            return null;
        }
    }
}
