package cn.cerc.db.queue;

import cn.cerc.db.core.ServerConfig;
import com.aliyun.rocketmq20220801.Client;
import com.aliyun.rocketmq20220801.models.CreateConsumerGroupRequest;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * Registers string message handlers per topic/tag and delivers RocketMQ
 * messages to them, either through a push consumer ({@link #startPush()})
 * or by polling with a simple consumer ({@link #startPull()}).
 *
 * <p>Handler and subscription registries are static, so they are shared by
 * every instance of this class (the {@link #getInstance()} singleton as well
 * as any instance created by the container for {@code @Component}).
 *
 * <p>NOTE(review): the registries are plain {@link HashMap}s and are read from
 * the scheduler and consumer callbacks; confirm that all registrations happen
 * before consumption starts.
 */
@Component
public class QueueConsumer implements AutoCloseable, OnMessageRecevie {
    private static final Logger log = LoggerFactory.getLogger(QueueConsumer.class);
    /** Handlers keyed by {@code topic + "-" + tag}. */
    private static final Map<String, OnStringMessage> items1 = new HashMap<>();
    /** Tag filter expressions keyed by topic; passed to the consumer builders. */
    private static final Map<String, FilterExpression> items2 = new HashMap<>();
    private static final QueueConsumer INSTANCE = new QueueConsumer();

    private PushConsumer pushConsumer;
    private SimpleConsumer pullConsumer;

    /**
     * @return the shared instance created when this class is loaded
     */
    public static QueueConsumer getInstance() {
        return INSTANCE;
    }

    /**
     * Closes the push and pull consumers if they were started. A failure to
     * close is logged and leaves the corresponding reference in place.
     */
    @Override
    public void close() {
        if (this.pushConsumer != null) {
            try {
                this.pushConsumer.close();
                this.pushConsumer = null;
            } catch (IOException e) {
                log.error(e.getMessage(), e);
            }
        }
        if (this.pullConsumer != null) {
            try {
                this.pullConsumer.close();
                this.pullConsumer = null;
            } catch (IOException e) {
                log.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Registers a handler for messages of the given topic and tag.
     *
     * <p>NOTE(review): the subscription map is keyed by topic only, so
     * registering a second tag for the same topic replaces the earlier filter
     * expression while both handlers stay registered — confirm this is intended.
     *
     * @param str             topic name
     * @param str2            tag used as the TAG filter expression
     * @param onStringMessage handler invoked with the decoded message body
     */
    public void addConsumer(String str, String str2, OnStringMessage onStringMessage) {
        String key = str + "-" + str2;
        log.debug("register consumer: {}", key);
        items1.put(key, onStringMessage);
        items2.put(str, new FilterExpression(str2, FilterExpressionType.TAG));
    }

    /**
     * Decodes the message body as UTF-8 and hands it to the handler registered
     * for the message's topic and tag.
     *
     * @param messageView the received message
     * @return the handler's result; {@code true} when no handler is registered,
     *         so the unhandled message is reported as consumed
     */
    @Override
    public boolean consume(MessageView messageView) {
        // A message without a tag yields the key "<topic>-null".
        String key = messageView.getTopic() + "-" + messageView.getTag().orElse(null);
        log.info("收到一条消息：{}", key);
        String data = StandardCharsets.UTF_8.decode(messageView.getBody()).toString();
        OnStringMessage handler = items1.get(key);
        if (handler != null) {
            return handler.consume(data);
        }
        // Both the key and the payload are logged so the dropped message can be traced.
        log.error("未注册消息对象{}, data:{}", key, data);
        return true;
    }

    /**
     * Scheduled entry point: once at least one handler is registered, logs the
     * subscription count and makes sure the push consumer is running.
     */
    @Scheduled(initialDelay = 60000, fixedRate = 5000)
    public void startService() {
        if (!items1.isEmpty()) {
            log.info("成功注册的推送消息数量：{}", items2.size());
            startPush();
        }
    }

    /**
     * Starts the push consumer if it is not already running and at least one
     * handler is registered. The consumer group is created first when it does
     * not exist. Any failure is logged and leaves the consumer unstarted, so a
     * later call can try again.
     */
    public void startPush() {
        if (this.pushConsumer != null || items1.isEmpty()) {
            return;
        }
        Client client = QueueServer.getClient();
        String groupId = getGroupId();
        try {
            if (!ensureConsumerGroup(client, groupId)) {
                log.error("创建消费组 {} 失败", groupId);
                return;
            }
            ClientConfiguration config = QueueServer.getConfig();
            PushConsumerBuilder builder = ClientServiceProvider.loadService().newPushConsumerBuilder();
            builder.setConsumerGroup(groupId);
            builder.setClientConfiguration(config);
            builder.setSubscriptionExpressions(items2);
            builder.setConsumptionThreadCount(5);
            builder.setMessageListener(
                    messageView -> consume(messageView) ? ConsumeResult.SUCCESS : ConsumeResult.FAILURE);
            this.pushConsumer = builder.build();
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Makes sure the consumer group exists, creating it when the lookup
     * returns no data.
     *
     * @return {@code false} only when a creation request was sent and did not
     *         report success
     */
    private boolean ensureConsumerGroup(Client client, String groupId) throws Exception {
        if (client.getConsumerGroup(QueueServer.getInstanceId(), groupId).getBody().getData() != null) {
            return true;
        }
        CreateConsumerGroupRequest.CreateConsumerGroupRequestConsumeRetryPolicy retryPolicy =
                new CreateConsumerGroupRequest.CreateConsumerGroupRequestConsumeRetryPolicy();
        retryPolicy.setMaxRetryTimes(16);
        retryPolicy.setRetryPolicy("FixedRetryPolicy");

        CreateConsumerGroupRequest request = new CreateConsumerGroupRequest();
        request.setDeliveryOrderType("Concurrently");
        request.setConsumeRetryPolicy(retryPolicy);
        // Null-safe: a missing success flag is treated as a failed creation.
        return Boolean.TRUE.equals(
                client.createConsumerGroup(QueueServer.getInstanceId(), groupId, request).getBody().getSuccess());
    }

    /**
     * Creates the simple (pull) consumer on first use, then receives one batch
     * of up to 100 messages and dispatches each through
     * {@link #consume(MessageView)}. Only messages whose handler returns
     * {@code true} are acknowledged.
     *
     * @return the number of messages received, or 0 when the consumer could not
     *         be created or the receive/ack call failed
     */
    public int startPull() {
        if (this.pullConsumer == null) {
            try {
                this.pullConsumer = QueueServer.loadService()
                        .newSimpleConsumerBuilder()
                        .setClientConfiguration(QueueServer.getConfig())
                        .setConsumerGroup(getGroupId())
                        .setAwaitDuration(Duration.ofSeconds(5L))
                        .setSubscriptionExpressions(items2)
                        .build();
            } catch (ClientException e) {
                log.error(e.getMessage(), e);
                return 0;
            }
        }
        try {
            List<MessageView> messages = this.pullConsumer.receive(100, Duration.ofSeconds(10L));
            for (MessageView messageView : messages) {
                if (consume(messageView)) {
                    this.pullConsumer.ack(messageView);
                }
            }
            return messages.size();
        } catch (ClientException e) {
            log.error(e.getMessage(), e);
            return 0;
        }
    }

    /**
     * @return the consumer group id, built as {@code product-industry-version}
     *         from {@link ServerConfig}
     */
    public String getGroupId() {
        return String.format("%s-%s-%s", ServerConfig.getAppProduct(), ServerConfig.getAppIndustry(), ServerConfig.getAppVersion());
    }
}
