package cn.cerc.db.queue;

import cn.cerc.db.core.ClassData;
import cn.cerc.db.core.IHandle;
import cn.cerc.db.core.ServerConfig;
import cn.cerc.db.redis.Redis;
import cn.cerc.db.zk.ZkConfig;
import java.time.Duration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;

/* loaded from: input_file:cn/cerc/db/queue/AbstractQueue.class */
public abstract class AbstractQueue implements OnStringMessage, Watcher {
    private static final Logger log = LoggerFactory.getLogger(AbstractQueue.class);
    private static ZkConfig config;
    private QueueServiceEnum service;
    private boolean initTopic;
    private long delayTime = 0;
    private String industry;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: cn.cerc.db.queue.AbstractQueue$1, reason: invalid class name */
    /* loaded from: input_file:cn/cerc/db/queue/AbstractQueue$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$cn$cerc$db$queue$QueueServiceEnum = new int[QueueServiceEnum.values().length];

        static {
            try {
                $SwitchMap$cn$cerc$db$queue$QueueServiceEnum[QueueServiceEnum.Redis.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$cn$cerc$db$queue$QueueServiceEnum[QueueServiceEnum.AliyunMNS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$cn$cerc$db$queue$QueueServiceEnum[QueueServiceEnum.RocketMQ.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public AbstractQueue() {
        setService(ServerConfig.getQueueService());
        setIndustry(ServerConfig.getAppIndustry());
    }

    public String getTopic() {
        return getClass().getSimpleName();
    }

    public final String getTag() {
        return String.format("%s-%s", ServerConfig.getAppVersion(), getIndustry());
    }

    public final String getId() {
        return getTopic() + "-" + getTag();
    }

    public String getIndustry() {
        return this.industry;
    }

    public void setIndustry(String str) {
        this.industry = str;
    }

    protected void setIndustryByCorpNo(IHandle iHandle, String str) {
        throw new RuntimeException("从数据库取得相应的产业代码");
    }

    protected void setDelayTime(long j) {
        this.delayTime = j;
    }

    public final long getDelayTime() {
        return this.delayTime;
    }

    public void startService(QueueConsumer queueConsumer) {
        try {
            ZkConfig zkConfig = new ZkConfig(String.format("/app/%s", ServerConfig.getAppName()));
            String path = zkConfig.path("status");
            if (zkConfig.client().exists(path, this) == null) {
                zkConfig.setValue("status", "running");
                if (zkConfig.client().exists(path, this) == null) {
                    log.warn("配置有误，无法启动消息队列");
                    return;
                }
            }
            config().setTempNode(getClass().getSimpleName(), "running");
            log.info("注册消息服务：{} from {}", getId(), this.service.name());
            if (this.service == QueueServiceEnum.RocketMQ) {
                initTopic();
                queueConsumer.addConsumer(getTopic(), getTag(), this);
            }
        } catch (KeeperException | InterruptedException e) {
            log.error(e.getMessage());
            e.printStackTrace();
        }
    }

    public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getType() == Watcher.Event.EventType.DataWatchRemoved) {
            log.info("此主机运行状态被移除");
            stopService();
        }
    }

    public void stopService() {
        log.info("{} 关闭了消息推送服务", getTopic());
        config().delete(getClass().getSimpleName());
    }

    private ZkConfig config() {
        if (config == null) {
            config = new ZkConfig(String.format("/app/%s/task", ServerConfig.getAppName()));
        }
        return config;
    }

    protected String push(String str) {
        switch (AnonymousClass1.$SwitchMap$cn$cerc$db$queue$QueueServiceEnum[this.service.ordinal()]) {
            case ClassData.PUBLIC /* 1 */:
                Redis redis = new Redis();
                try {
                    redis.lpush(getId(), str);
                    redis.close();
                    return "push redis ok";
                } catch (Throwable th) {
                    try {
                        redis.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            case ClassData.PRIVATE /* 2 */:
                return MnsServer.getQueue(getId()).push(str);
            case 3:
                initTopic();
                try {
                    String append = new QueueProducer(getTopic(), getTag()).append(str, Duration.ofSeconds(getDelayTime()));
                    log.info("发送消息成功  {} {} {}", new Object[]{getTopic(), getTag(), append});
                    return append;
                } catch (ClientException e) {
                    log.error(e.getMessage());
                    e.printStackTrace();
                    return null;
                }
            default:
                return null;
        }
    }

    private void initTopic() {
        if (this.initTopic) {
            return;
        }
        QueueServer.createTopic(getTopic(), getDelayTime() > 0);
        this.initTopic = true;
    }

    protected void receiveMessage() {
        switch (AnonymousClass1.$SwitchMap$cn$cerc$db$queue$QueueServiceEnum[this.service.ordinal()]) {
            case ClassData.PUBLIC /* 1 */:
                Redis redis = new Redis();
                try {
                    String rpop = redis.rpop(getId());
                    if (rpop != null) {
                        consume(rpop);
                    }
                    redis.close();
                    return;
                } catch (Throwable th) {
                    try {
                        redis.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            case ClassData.PRIVATE /* 2 */:
                MnsServer.getQueue(getId()).pop(100, this);
                return;
            default:
                log.error("{} 不支持消息拉取模式:", this.service.name());
                return;
        }
    }

    protected QueueServiceEnum getService() {
        return this.service;
    }

    public void setService(QueueServiceEnum queueServiceEnum) {
        this.service = queueServiceEnum;
    }

    @Scheduled(initialDelay = 30000, fixedRate = 3000)
    public void defaultCheck() {
        if (ServerConfig.enableTaskService()) {
            switch (AnonymousClass1.$SwitchMap$cn$cerc$db$queue$QueueServiceEnum[getService().ordinal()]) {
                case ClassData.PUBLIC /* 1 */:
                case ClassData.PRIVATE /* 2 */:
                    receiveMessage();
                    return;
                default:
                    return;
            }
        }
    }
}
