package cn.cerc.db.queue;

import cn.cerc.db.core.ServerConfig;
import cn.cerc.db.zk.ZkConfig;
import java.time.Duration;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/cerc/db/queue/AbstractQueue.class */
public abstract class AbstractQueue implements OnStringMessage, ServletContextListener, Watcher {
    private static final Logger log = LoggerFactory.getLogger(AbstractQueue.class);
    private static QueueConsumer consumer = QueueConsumer.getInstance();
    private static ZkConfig config;
    private long delayTime = 0;

    public AbstractQueue() {
        log.debug("{} is init ", getClass().getSimpleName());
        QueueServer.createTopic(getTopic(), getDelayTime() > 0);
    }

    public abstract String getTopic();

    public String getTag() {
        return QueueConfig.tag;
    }

    public long getDelayTime() {
        return this.delayTime;
    }

    public void contextInitialized(ServletContextEvent servletContextEvent) {
        if (ServerConfig.enableTaskService()) {
            startService();
        } else {
            log.info("当前主机没有开启消息队列服务：{}", getClass().getSimpleName());
        }
    }

    public void contextDestroyed(ServletContextEvent servletContextEvent) {
        stopService();
    }

    public void startService() {
        try {
            ZkConfig zkConfig = new ZkConfig(String.format("/app/%s", ServerConfig.getAppName()));
            String path = zkConfig.path("status");
            if (zkConfig.client().exists(path, this) == null) {
                zkConfig.setValue("status", "running");
                if (zkConfig.client().exists(path, this) == null) {
                    log.warn("配置有误，无法启动消息队列");
                    return;
                }
            }
            config().setTempNode(getClass().getSimpleName(), "running");
            log.info("注册消息推送服务：{}", getTopic());
            consumer.addConsumer(getTopic(), getTag(), this);
        } catch (KeeperException | InterruptedException e) {
            log.error(e.getMessage());
            e.printStackTrace();
        }
    }

    public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getType() == Watcher.Event.EventType.DataWatchRemoved) {
            log.info("此主机运行状态被移除");
            stopService();
        }
    }

    public void stopService() {
        if (consumer == null) {
            return;
        }
        config().delete(getClass().getSimpleName());
        log.info("{} 关闭了消息推送服务", getTopic());
        consumer.close();
        consumer = null;
    }

    private ZkConfig config() {
        if (config == null) {
            config = new ZkConfig(String.format("/app/%s/task", ServerConfig.getAppName()));
        }
        return config;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String sendMessage(String str) {
        try {
            return new QueueProducer(getTopic(), getTag()).append(str, Duration.ofSeconds(this.delayTime));
        } catch (ClientException e) {
            log.error(e.getMessage());
            e.printStackTrace();
            return null;
        }
    }
}
