package cn.cerc.db.queue;

import cn.cerc.db.core.ClassData;
import cn.cerc.db.core.Datetime;
import cn.cerc.db.core.ServerConfig;
import cn.cerc.db.core.Utils;
import cn.cerc.db.queue.mns.MnsServer;
import cn.cerc.db.queue.rabbitmq.RabbitQueue;
import cn.cerc.db.queue.sqlmq.SqlmqQueue;
import cn.cerc.db.queue.sqlmq.SqlmqQueueName;
import cn.cerc.db.queue.sqlmq.SqlmqServer;
import cn.cerc.db.redis.Redis;
import cn.cerc.db.zk.ZkConfig;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;

/* loaded from: input_file:cn/cerc/db/queue/AbstractQueue.class */
public abstract class AbstractQueue implements OnStringMessage, Watcher, Runnable {
    private static final Logger log = LoggerFactory.getLogger(AbstractQueue.class);
    private static final int processors = Runtime.getRuntime().availableProcessors();
    public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors * 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue(1024), new ThreadPoolExecutor.CallerRunsPolicy());
    private static ZkConfig config;
    private QueueServiceEnum service;
    private Datetime showTime;
    private String original;
    private String order;
    private String groupCode;
    private int executionSequence;
    private boolean pushMode = false;
    private int delayTime = 60;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: cn.cerc.db.queue.AbstractQueue$1, reason: invalid class name */
    /* loaded from: input_file:cn/cerc/db/queue/AbstractQueue$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$cn$cerc$db$queue$QueueServiceEnum = new int[QueueServiceEnum.values().length];

        static {
            try {
                $SwitchMap$cn$cerc$db$queue$QueueServiceEnum[QueueServiceEnum.Redis.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$cn$cerc$db$queue$QueueServiceEnum[QueueServiceEnum.AliyunMNS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$cn$cerc$db$queue$QueueServiceEnum[QueueServiceEnum.Sqlmq.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$cn$cerc$db$queue$QueueServiceEnum[QueueServiceEnum.RabbitMQ.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public AbstractQueue() {
        setService(ServerConfig.getQueueService());
        setOriginal(ServerConfig.getAppOriginal());
    }

    public String getTopic() {
        return getClass().getSimpleName();
    }

    public final String getTag() {
        return String.format("%s-%s", ServerConfig.getAppVersion(), getOriginal());
    }

    public final String getId() {
        Objects.requireNonNull(getTopic());
        return getTopic() + "-" + getTag();
    }

    public String getOriginal() {
        return this.original;
    }

    protected void setOriginal(String str) {
        Objects.requireNonNull(str);
        this.original = str;
    }

    protected void setDelayTime(int i) {
        this.delayTime = i;
    }

    public final long getDelayTime() {
        return this.delayTime;
    }

    public void setShowTime(Datetime datetime) {
        this.showTime = datetime;
    }

    public final Optional<Datetime> getShowTime() {
        return Optional.ofNullable(this.showTime);
    }

    public void startService() {
        try {
            ZkConfig zkConfig = new ZkConfig(String.format("/app/%s", ServerConfig.getAppName()));
            String path = zkConfig.path("status");
            if (zkConfig.client().exists(path, this) == null) {
                zkConfig.setValue("status", "running");
                if (zkConfig.client().exists(path, this) == null) {
                    log.warn("配置有误，无法启动消息队列");
                    return;
                }
            }
            config().setTempNode(getClass().getSimpleName(), "running");
            log.info("注册消息服务：{} from {}", getId(), getService().name());
        } catch (KeeperException | InterruptedException e) {
            log.error(e.getMessage());
            e.printStackTrace();
        }
    }

    public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getType() == Watcher.Event.EventType.DataWatchRemoved) {
            log.info("此主机运行状态被移除");
            stopService();
        }
    }

    public void stopService() {
        log.info("{} 关闭了消息推送服务", getTopic());
        config().delete(getClass().getSimpleName());
    }

    private ZkConfig config() {
        if (config == null) {
            config = new ZkConfig(String.format("/app/%s/task", ServerConfig.getAppName()));
        }
        return config;
    }

    protected String push(String str) {
        switch (AnonymousClass1.$SwitchMap$cn$cerc$db$queue$QueueServiceEnum[getService().ordinal()]) {
            case 1:
                Redis redis = new Redis();
                try {
                    redis.lpush(getId(), str);
                    redis.close();
                    return "push redis ok";
                } catch (Throwable th) {
                    try {
                        redis.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            case ClassData.PRIVATE /* 2 */:
                return MnsServer.getQueue(getId()).push(str);
            case 3:
                SqlmqQueueName.register(getClass());
                if (this.executionSequence > 1) {
                    setShowTime(new Datetime().inc(Datetime.DateType.Year, 1));
                } else if (!Utils.isEmpty(this.groupCode) && this.executionSequence < 1) {
                    throw new RuntimeException("执行序列号不能小于1");
                }
                SqlmqQueue queue = SqlmqServer.getQueue(getId());
                queue.setDelayTime(this.delayTime);
                queue.setShowTime(this.showTime);
                queue.setService(this.service);
                queue.setQueueClass(getClass().getSimpleName());
                return queue.push(str, this.order, this.groupCode, this.executionSequence);
            case ClassData.PROTECTED /* 4 */:
                RabbitQueue rabbitQueue = new RabbitQueue(getId());
                try {
                    String push = rabbitQueue.push(str);
                    rabbitQueue.close();
                    return push;
                } catch (Throwable th3) {
                    try {
                        rabbitQueue.close();
                    } catch (Throwable th4) {
                        th3.addSuppressed(th4);
                    }
                    throw th3;
                }
            default:
                return null;
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        switch (AnonymousClass1.$SwitchMap$cn$cerc$db$queue$QueueServiceEnum[getService().ordinal()]) {
            case 1:
                Redis redis = new Redis();
                try {
                    String rpop = redis.rpop(getId());
                    if (rpop != null) {
                        consume(rpop, true);
                    }
                    redis.close();
                    return;
                } catch (Throwable th) {
                    try {
                        redis.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            case ClassData.PRIVATE /* 2 */:
                MnsServer.getQueue(getId()).pop(100, this);
                return;
            case 3:
                SqlmqServer.getQueue(getId()).pop(100, this);
                return;
            case ClassData.PROTECTED /* 4 */:
                RabbitQueue rabbitQueue = new RabbitQueue(getId());
                try {
                    rabbitQueue.setMaximum(100);
                    rabbitQueue.pop(this);
                    rabbitQueue.close();
                    return;
                } catch (Throwable th3) {
                    try {
                        rabbitQueue.close();
                    } catch (Throwable th4) {
                        th3.addSuppressed(th4);
                    }
                    throw th3;
                }
            default:
                log.error("{} 不支持消息拉取模式:", getService().name());
                return;
        }
    }

    public final QueueServiceEnum getService() {
        return this.service;
    }

    public final void setService(QueueServiceEnum queueServiceEnum) {
        this.service = queueServiceEnum;
    }

    @Scheduled(initialDelay = 30000, fixedRate = 300)
    public void defaultCheck() {
        if (!isPushMode() && ServerConfig.enableTaskService()) {
            switch (AnonymousClass1.$SwitchMap$cn$cerc$db$queue$QueueServiceEnum[getService().ordinal()]) {
                case 1:
                case ClassData.PRIVATE /* 2 */:
                case ClassData.PROTECTED /* 4 */:
                    log.debug("thread pool add {} job {}", Thread.currentThread(), getClass().getSimpleName());
                    executor.submit(this);
                    return;
                case 3:
                    log.debug("{} sqlmq check job {}", Thread.currentThread(), getClass().getSimpleName());
                    run();
                    return;
                default:
                    return;
            }
        }
    }

    public String getOrder() {
        return this.order;
    }

    public void setOrder(String str) {
        this.order = str;
    }

    public boolean isPushMode() {
        return this.pushMode;
    }

    protected void setPushMode(boolean z) {
        this.pushMode = z;
    }

    protected void pushToSqlmq(String str) {
        if (getService() == QueueServiceEnum.Sqlmq) {
            return;
        }
        SqlmqQueue queue = SqlmqServer.getQueue(getId());
        queue.setService(this.service);
        queue.setDelayTime(this.delayTime);
        queue.setQueueClass(getClass().getSimpleName());
        queue.push(str, this.order);
    }

    public static void close() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5L, TimeUnit.MINUTES)) {
                log.error("队列线程池等待关闭失败");
            }
        } catch (InterruptedException e) {
            log.error("等待线程池关闭超时了 {}", e.getMessage(), e);
        }
        if (executor.isTerminated()) {
            log.info("队列线程池中的任务已全部执行完毕");
            return;
        }
        log.warn("仍有线程任务执行 ->");
        log.warn("当前的核心线程数 {}", Integer.valueOf(executor.getCorePoolSize()));
        log.warn("当前的线程池大小 {}", Integer.valueOf(executor.getPoolSize()));
        log.warn("当前的活动线程数 {}", Integer.valueOf(executor.getActiveCount()));
        log.warn("等待执行的任务数 {}", Integer.valueOf(executor.getQueue().size()));
        log.warn("已完成的任务数量 {}", Long.valueOf(executor.getCompletedTaskCount()));
        log.warn("累计的总任务数量 {}", Long.valueOf(executor.getTaskCount()));
    }

    public String getGroupCode() {
        return this.groupCode;
    }

    public void setGroupCode(String str) {
        this.groupCode = str;
    }

    public int getExecutionSequence() {
        return this.executionSequence;
    }

    public void setExecutionSequence(int i) {
        this.executionSequence = i;
    }
}
