package cn.cerc.db.queue;

import cn.cerc.db.core.DataSet;
import cn.cerc.db.core.ServerConfig;
import com.aliyun.rocketmq20220801.Client;
import com.aliyun.rocketmq20220801.models.CreateConsumerGroupRequest;
import com.aliyun.rocketmq20220801.models.GetConsumerGroupResponseBody;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/cerc/db/queue/QueueConsumer.class */
/**
 * Wraps a RocketMQ {@link PushConsumer} subscribed to one topic/tag pair and keeps a
 * process-wide registry so that each pair is only set up once.
 *
 * <p>Instances are obtained through {@link #create(String, String, OnMessageCallback)}.
 * The underlying push consumer is only built when {@code ServerConfig.enableTaskService()}
 * returns {@code true}; otherwise (or when set-up fails) {@link #consumer()} returns
 * {@code null}.
 */
public class QueueConsumer {
    private static final Logger log = LoggerFactory.getLogger(QueueConsumer.class);

    /**
     * Registry of consumers keyed by {@code "<topic>-<tag>"}. A concurrent map is used
     * because {@link #create} may be invoked from several threads at once (see
     * {@link #main}).
     */
    protected static final Map<String, QueueConsumer> consumers = new ConcurrentHashMap<>();

    private final String topic;
    private final String tag;
    private PushConsumer consumer;

    /** Callback invoked with the UTF-8 decoded body of every received message. */
    @FunctionalInterface
    public interface OnMessageCallback {
        /**
         * Handles one message.
         *
         * @param str the message body decoded as UTF-8
         * @return {@code true} to report {@code ConsumeResult.SUCCESS}, {@code false} to
         *         report {@code ConsumeResult.FAILURE}
         */
        boolean consume(String str);
    }

    /**
     * Returns the consumer registered for the given topic and tag, creating and
     * registering it first when none exists yet.
     *
     * <p>The lookup and the registration happen atomically, so concurrent callers asking
     * for the same topic/tag pair receive the same instance. When an instance already
     * exists the supplied callback is ignored.
     *
     * @param str               the topic name
     * @param str2              the tag used as the subscription filter
     * @param onMessageCallback the handler for received messages
     * @return the registered consumer for the pair; never {@code null}
     */
    public static QueueConsumer create(String str, String str2, OnMessageCallback onMessageCallback) {
        String key = String.format("%s-%s", str, str2);
        return consumers.computeIfAbsent(key, ignored -> new QueueConsumer(str, str2, onMessageCallback));
    }

    /**
     * Returns the underlying push consumer.
     *
     * @return the push consumer, or {@code null} when it was not (or could not be) built
     */
    public PushConsumer consumer() {
        return this.consumer;
    }

    /**
     * Closes the underlying push consumer if there is one. An {@link IOException} raised
     * while closing is logged rather than propagated. The instance stays in the registry.
     */
    public void close() {
        if (this.consumer == null) {
            return;
        }
        try {
            this.consumer.close();
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Stores the topic and tag and, when the task service is enabled, makes sure the
     * consumer group {@code G-<topic>-<tag>} is available and builds the push consumer.
     * Any exception raised during that set-up is logged and leaves {@link #consumer()}
     * returning {@code null}.
     */
    private QueueConsumer(String topic, String tag, OnMessageCallback callback) {
        this.topic = topic;
        this.tag = tag;
        if (!ServerConfig.enableTaskService()) {
            return;
        }
        String group = String.format("%s-%s-%s", "G", topic, tag);
        Client client = QueueServer.getClient();
        try {
            if (!ensureConsumerGroup(client, group)) {
                log.error("创建消费组 {} 失败", group);
                return;
            }
            log.info("{}, {}, {} ,{} consumer is creating", topic, tag, group, Thread.currentThread());
            this.consumer = buildPushConsumer(group, callback);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Checks whether the consumer group is reported as {@code RUNNING} and requests its
     * creation otherwise.
     *
     * @return {@code true} when the group is already running or the create request
     *         reported success; {@code false} when the create request did not report
     *         success (a missing success flag counts as failure)
     */
    private static boolean ensureConsumerGroup(Client client, String group) throws Exception {
        GetConsumerGroupResponseBody.GetConsumerGroupResponseBodyData data =
                client.getConsumerGroup(QueueServer.getInstanceId(), group).getBody().getData();
        if (data != null && "RUNNING".equals(data.getStatus())) {
            return true;
        }

        CreateConsumerGroupRequest.CreateConsumerGroupRequestConsumeRetryPolicy retryPolicy =
                new CreateConsumerGroupRequest.CreateConsumerGroupRequestConsumeRetryPolicy();
        retryPolicy.setMaxRetryTimes(16);
        retryPolicy.setRetryPolicy("FixedRetryPolicy");

        CreateConsumerGroupRequest request = new CreateConsumerGroupRequest();
        request.setDeliveryOrderType("Concurrently");
        request.setConsumeRetryPolicy(retryPolicy);

        Boolean success = client.createConsumerGroup(QueueServer.getInstanceId(), group, request)
                .getBody()
                .getSuccess();
        return Boolean.TRUE.equals(success);
    }

    /**
     * Builds a push consumer for the given group that subscribes to this instance's topic,
     * filtered by its tag, and forwards every message body (decoded as UTF-8) to the
     * callback.
     */
    private PushConsumer buildPushConsumer(String group, OnMessageCallback callback) throws Exception {
        ClientConfiguration configuration = ClientConfiguration.newBuilder()
                .setEndpoints(QueueServer.getEndpoint())
                .setCredentialProvider(new StaticSessionCredentialsProvider(
                        QueueServer.getAccessKeyId(), QueueServer.getAccessSecret()))
                .build();
        FilterExpression filter = new FilterExpression(this.tag, FilterExpressionType.TAG);
        return ClientServiceProvider.loadService()
                .newPushConsumerBuilder()
                .setClientConfiguration(configuration)
                .setConsumerGroup(group)
                .setSubscriptionExpressions(Collections.singletonMap(this.topic, filter))
                .setMessageListener(messageView -> {
                    String body = StandardCharsets.UTF_8.decode(messageView.getBody()).toString();
                    return callback.consume(body) ? ConsumeResult.SUCCESS : ConsumeResult.FAILURE;
                })
                .build();
    }

    /** @return the topic this consumer was created for */
    public String getTopic() {
        return this.topic;
    }

    /** @return the tag this consumer was created for */
    public String getTag() {
        return this.tag;
    }

    /**
     * Placeholder that currently performs no work.
     *
     * @return always {@code null}
     */
    public MessageView recevie() {
        return null;
    }

    /**
     * Placeholder that currently performs no work.
     *
     * @param messageView ignored
     */
    public void delete(MessageView messageView) {
    }

    /**
     * Manual smoke test: creates the topic {@code test} and then starts one thread per tag
     * {@code a}..{@code m}, each registering a consumer that prints the tag and the
     * received message.
     */
    public static void main(String[] strArr) throws Exception {
        String[] tags = {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m"};
        QueueServer.createTopic("test", false);
        for (String tag : tags) {
            new Thread(() -> create("test", tag, message -> {
                // NOTE(review): the decompiled source used the same name for both lambda
                // parameters, so the intended print order is assumed to be tag, then message.
                System.out.println(tag + "---" + message);
                return true;
            })).start();
        }
    }
}
