package cn.cerc.db.queue;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;

/* loaded from: input_file:cn/cerc/db/queue/QueueConsumer.class */
public class QueueConsumer implements AutoCloseable {
    private static final ClientServiceProvider provider = ClientServiceProvider.loadService();
    private String topic;
    private String tag = "*";
    SimpleConsumer consumer;

    public QueueConsumer(String str, String str2) throws ClientException {
        ClientConfiguration build = ClientConfiguration.newBuilder().setEndpoints(QueueServer.getRmqEndpoint()).setCredentialProvider(new StaticSessionCredentialsProvider(QueueServer.getRmqAccessKeyId(), QueueServer.getRmqAccessSecret())).build();
        this.consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(build).setConsumerGroup("main").setAwaitDuration(Duration.ofSeconds(1L)).setSubscriptionExpressions(Collections.singletonMap(str, new FilterExpression(str2, FilterExpressionType.TAG))).build();
    }

    public String getTopic() {
        return this.topic;
    }

    public String getTag() {
        return this.tag;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        try {
            if (this.consumer != null) {
                this.consumer.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public int recevie(QueueProcesser queueProcesser) throws ClientException {
        List<MessageView> receive = this.consumer.receive(16, Duration.ofMinutes(10L));
        for (MessageView messageView : receive) {
            try {
                if (queueProcesser.processMessage(StandardCharsets.UTF_8.decode(messageView.getBody()).toString())) {
                    this.consumer.ack(messageView);
                }
            } catch (Exception e) {
            }
        }
        return receive.size();
    }

    public MessageView recevie() throws ClientException {
        Iterator it = this.consumer.receive(1, Duration.ofMinutes(10L)).iterator();
        if (it.hasNext()) {
            return (MessageView) it.next();
        }
        return null;
    }

    public void ack(MessageView messageView) throws ClientException {
        this.consumer.ack(messageView);
    }

    public static void main(String[] strArr) throws ClientException {
        QueueProcesser queueProcesser = str -> {
            System.out.println("消息内容: " + str);
            return true;
        };
        QueueConsumer queueConsumer = new QueueConsumer("TopicTestMQ", "fpl");
        try {
            System.out.println(String.format("有读到 %s 条消息", Integer.valueOf(queueConsumer.recevie(queueProcesser))));
            queueConsumer.close();
        } catch (Throwable th) {
            try {
                queueConsumer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }
}
