package cn.cerc.db.queue;

import cn.cerc.db.core.ConfigReader;
import cn.cerc.db.redis.RedisClient;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:cn/cerc/db/queue/JobManager.class */
public class JobManager implements MessageListener, ApplicationRunner {
    public static final String Task_Service = "task.service";
    public Set<JobRunnable> jobs = new HashSet();
    public JobState jobState = JobState.Pause;
    private ConfigReader config;
    private RedisClient redis;

    @Autowired
    Environment environment;
    private ServerRegister serverRegister;
    private static final Logger log = LoggerFactory.getLogger(JobManager.class);
    private static final ServerList list = new ServerList();

    @PostConstruct
    public void init() {
        int property = this.config.getProperty(Task_Service, 0);
        if (property < 0 || property >= JobState.values().length) {
            return;
        }
        this.jobState = JobState.values()[property];
    }

    public void run(ApplicationArguments applicationArguments) throws Exception {
        list.init(this.environment, this.config);
        this.redis.subscribe(this, ServerNode.JobId);
    }

    @Resource
    public void setRedisClient(RedisClient redisClient) {
        this.redis = redisClient;
    }

    public void onMessage(Message message, byte[] bArr) {
        String obj = message.toString();
        if (obj == null) {
            return;
        }
        if (obj.startsWith("shutdown ")) {
            String[] split = obj.split(" ");
            if (!list.localhost().equals(split[1])) {
                list.hosts().remove(split[1]);
                return;
            } else {
                setJobState(JobState.Close);
                showdown();
                return;
            }
        }
        list.register(message.toString(), null);
        if (JobState.Close != this.jobState) {
            String group = list.group();
            String version = list.version();
            if (list.getHosts().stream().filter(serverNode -> {
                return serverNode.getGroup().equals(group) && serverNode.getVersion().compareTo(version) > 0 && serverNode.isPublish();
            }).findAny().isPresent()) {
                setJobState(JobState.Pause);
            } else {
                setJobState(JobState.Play);
            }
        }
    }

    private void showdown() {
        int i = 0;
        while (i < 3600) {
            i++;
            Optional<JobRunnable> findAny = this.jobs.stream().filter(jobRunnable -> {
                return jobRunnable.isWorking();
            }).findAny();
            if (findAny.isEmpty()) {
                break;
            }
            if (i == 3600) {
                log.error("无法等待 {} 关闭", findAny.get().getClass().getName());
            }
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.serverRegister.shutdown(ServerNode.JobId);
    }

    private void setJobState(JobState jobState) {
        if (this.jobState == jobState) {
            return;
        }
        this.jobState = jobState;
        for (JobRunnable jobRunnable : this.jobs) {
            try {
                if (!jobRunnable.notice(jobState)) {
                    log.warn("{} 不能配合执行指令：{}", jobRunnable.getClass().getSimpleName(), jobState.name());
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
        log.info("发送关闭通知：" + jobState.name() + "，共计通知对象数量：" + this.jobs.size());
    }

    public JobState register(JobRunnable jobRunnable) {
        this.jobs.add(jobRunnable);
        return this.jobState;
    }

    public void unregister(JobRunnable jobRunnable) {
        this.jobs.remove(jobRunnable);
    }

    public boolean lock(JobRunnable jobRunnable, int i) {
        if (this.jobState != JobState.Play) {
            return false;
        }
        String str = list.group() + "-" + jobRunnable.getClass().getName();
        if (!this.redis.setnx(str, this.redis.get(str))) {
            return false;
        }
        this.redis.expire(str, i);
        return true;
    }

    public void unlock(JobRunnable jobRunnable) {
        this.redis.del(list.group() + "-" + jobRunnable.getClass().getName());
    }

    public JobState jobState() {
        return this.jobState;
    }

    public boolean disabled() {
        return this.jobState != JobState.Play;
    }

    @Resource
    public void setServerRegister(ServerRegister serverRegister) {
        this.serverRegister = serverRegister;
    }

    @Resource
    public void setConfigReader(ConfigReader configReader) {
        this.config = configReader;
    }
}
