
A Look at PowerJob Log Reporting and Storage

This article examines how PowerJob reports and stores job instance logs.

OmsLoggerFactory.build

tech/powerjob/worker/log/OmsLoggerFactory.java

public class OmsLoggerFactory {

    public static OmsLogger build(Long instanceId, String logConfig, WorkerRuntime workerRuntime) {
        LogConfig cfg;
        if (StringUtils.isEmpty(logConfig)) {
            cfg = new LogConfig();
        } else {
            try {
                cfg = JsonUtils.parseObject(logConfig, LogConfig.class);
            } catch (Exception ignore) {
                cfg = new LogConfig();
            }
        }

        switch (LogType.of(cfg.getType())) {
            case LOCAL:
                return new OmsLocalLogger(cfg);
            case STDOUT:
                return new OmsStdOutLogger(cfg);
            case NULL:
                return new OmsNullLogger();
            case LOCAL_AND_ONLINE:
                return new OmsServerAndLocalLogger(cfg, instanceId, workerRuntime.getOmsLogHandler());
            default:
                return new OmsServerLogger(cfg, instanceId, workerRuntime.getOmsLogHandler());
        }
    }
}

By default logConfig is null, so cfg is a fresh new LogConfig(), and the logger built is an OmsServerLogger.

OmsServerLogger

tech/powerjob/worker/log/impl/OmsServerLogger.java

public class OmsServerLogger extends AbstractOmsLogger {

    private final long instanceId;
    private final OmsLogHandler omsLogHandler;

    public OmsServerLogger(LogConfig logConfig, long instanceId, OmsLogHandler omsLogHandler) {
        super(logConfig);
        this.instanceId = instanceId;
        this.omsLogHandler = omsLogHandler;
    }

    @Override
    public void debug0(String messagePattern, Object... args) {
        process(LogLevel.DEBUG, messagePattern, args);
    }

    @Override
    public void info0(String messagePattern, Object... args) {
        process(LogLevel.INFO, messagePattern, args);
    }

    @Override
    public void warn0(String messagePattern, Object... args) {
        process(LogLevel.WARN, messagePattern, args);
    }

    @Override
    public void error0(String messagePattern, Object... args) {
        process(LogLevel.ERROR, messagePattern, args);
    }

    private void process(LogLevel level, String messagePattern, Object... args) {
        String logContent = genLogContent(messagePattern, args);
        omsLogHandler.submitLog(instanceId, level, logContent);
    }
}

OmsServerLogger's process method delegates to OmsLogHandler's submitLog method.

submitLog

tech/powerjob/worker/background/OmsLogHandler.java

@Slf4j
public class OmsLogHandler {

    private final String workerAddress;
    private final Transporter transporter;
    private final ServerDiscoveryService serverDiscoveryService;

    // processing thread, must be started via a thread pool
    public final Runnable logSubmitter = new LogSubmitter();
    // report lock: only one thread needs to report at a time
    private final Lock reportLock = new ReentrantLock();
    // producer-consumer pattern, logs are uploaded asynchronously
    private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue(10240);

    // number of entries carried per report
    private static final int BATCH_SIZE = 20;
    // local accumulation threshold
    private static final int REPORT_SIZE = 1024;

    public OmsLogHandler(String workerAddress, Transporter transporter, ServerDiscoveryService serverDiscoveryService) {
        this.workerAddress = workerAddress;
        this.transporter = transporter;
        this.serverDiscoveryService = serverDiscoveryService;
    }

    /**
     * Submit a log entry
     * @param instanceId task instance ID
     * @param logContent log content
     */
    public void submitLog(long instanceId, LogLevel logLevel, String logContent) {
        if (logQueue.size() > REPORT_SIZE) {
            // a thread's lifecycle cannot repeat: a finished Thread object cannot be started again,
            // so a new one is created each time
            new Thread(logSubmitter).start();
        }
        InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logLevel.getV(), logContent);
        boolean offerRet = logQueue.offer(tuple);
        if (!offerRet) {
            log.warn("[OmsLogHandler] [{}] submit log failed, maybe your log speed is too fast!", instanceId);
        }
    }

    //......
}

OmsLogHandler's submitLog first checks whether logQueue holds more than REPORT_SIZE (1024) entries; if so, it spins up a new thread running logSubmitter to drain the backlog. Either way, the entry is then offered to logQueue, with a warning logged if the queue is full and the offer fails.
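The queue-plus-threshold behavior can be sketched with a minimal, self-contained example. The class and constants below are illustrative (REPORT_SIZE is scaled down from PowerJob's 1024), and the drain trigger is reduced to a flag rather than an actual thread start:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Minimal sketch of the submitLog pattern: offer to a bounded queue,
// and trigger an extra drain once a local accumulation threshold is exceeded.
public class BoundedLogQueue {
    static final int CAPACITY = 8;
    static final int REPORT_SIZE = 4; // local accumulation threshold (1024 in PowerJob)

    final BlockingQueue<String> queue = new ArrayBlockingQueue<>(CAPACITY);
    volatile boolean drainTriggered = false;

    public boolean submit(String logLine) {
        if (queue.size() > REPORT_SIZE) {
            // In PowerJob this starts a new Thread running the LogSubmitter;
            // a Thread object cannot be restarted, hence a fresh one each time.
            drainTriggered = true;
        }
        return queue.offer(logLine); // non-blocking: false means the log is dropped
    }
}
```

The key design point is that `offer` never blocks the business thread; when the producer outruns the reporter, logs are dropped rather than slowing the job down.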

LogSubmitter

tech/powerjob/worker/background/OmsLogHandler.java

    private class LogSubmitter implements Runnable {

        @Override
        public void run() {
            boolean lockResult = reportLock.tryLock();
            if (!lockResult) {
                return;
            }

            try {
                final String currentServerAddress = serverDiscoveryService.getCurrentServerAddress();
                // no server currently available
                if (StringUtils.isEmpty(currentServerAddress)) {
                    if (!logQueue.isEmpty()) {
                        logQueue.clear();
                        log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded all logs.");
                    }
                    return;
                }

                List<InstanceLogContent> logs = Lists.newLinkedList();

                while (!logQueue.isEmpty()) {
                    try {
                        InstanceLogContent logContent = logQueue.poll(100, TimeUnit.MILLISECONDS);
                        logs.add(logContent);

                        if (logs.size() >= BATCH_SIZE) {
                            WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, Lists.newLinkedList(logs));
                            // unreliable request; web logs do not need to be perfectly delivered
                            TransportUtils.reportLogs(req, currentServerAddress, transporter);
                            logs.clear();
                        }
                    } catch (Exception ignore) {
                        break;
                    }
                }

                if (!logs.isEmpty()) {
                    WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, logs);
                    TransportUtils.reportLogs(req, currentServerAddress, transporter);
                }
            } finally {
                reportLock.unlock();
            }
        }
    }

LogSubmitter keeps polling entries from logQueue, and whenever logs reaches BATCH_SIZE (20) entries it reports them to the server via TransportUtils.reportLogs; any trailing partial batch is flushed at the end.
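Stripped of locking and transport, the batching loop reduces to the following sketch (the class name and the `reported` sink are illustrative; the real code sends each batch over the network):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Sketch of the LogSubmitter drain loop: poll until the queue is empty,
// flushing every BATCH_SIZE entries, then flush the trailing partial batch.
public class BatchDrainer {
    static final int BATCH_SIZE = 20;

    /** Drains the queue into batches; returns the number of batches "sent". */
    public static int drain(Queue<String> queue, List<List<String>> reported) {
        List<String> batch = new ArrayList<>();
        int batches = 0;
        while (!queue.isEmpty()) {
            batch.add(queue.poll());
            if (batch.size() >= BATCH_SIZE) {
                reported.add(new ArrayList<>(batch)); // stands in for TransportUtils.reportLogs
                batch.clear();
                batches++;
            }
        }
        if (!batch.isEmpty()) { // trailing partial batch
            reported.add(new ArrayList<>(batch));
            batches++;
        }
        return batches;
    }
}
```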

reportLogs

tech/powerjob/worker/common/utils/TransportUtils.java

    public static void reportLogs(WorkerLogReportReq req, String address, Transporter transporter) {
        final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_REPORT_LOG, address);
        transporter.tell(url, req);
    }

reportLogs sends the request to the server's S4W_HANDLER_REPORT_LOG handler.

processWorkerLogReport

tech/powerjob/server/core/handler/AbWorkerRequestHandler.java

    @Override
    @Handler(path = S4W_HANDLER_REPORT_LOG, processType = ProcessType.NO_BLOCKING)
    public void processWorkerLogReport(WorkerLogReportReq req) {

        WorkerLogReportEvent event = new WorkerLogReportEvent()
                .setWorkerAddress(req.getWorkerAddress())
                .setLogNum(req.getInstanceLogContents().size());
        try {
            processWorkerLogReport0(req, event);
            event.setStatus(WorkerLogReportEvent.Status.SUCCESS);
        } catch (RejectedExecutionException re) {
            event.setStatus(WorkerLogReportEvent.Status.REJECTED);
        } catch (Throwable t) {
            event.setStatus(WorkerLogReportEvent.Status.EXCEPTION);
            log.warn("[WorkerRequestHandler] process worker report failed!", t);
        } finally {
            monitorService.monitor(event);
        }
    }

On the server side, processWorkerLogReport receives the WorkerLogReportReq and delegates to processWorkerLogReport0, recording the outcome (SUCCESS, REJECTED, or EXCEPTION) as a monitoring event.
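The status-tracking shape of this handler is a reusable pattern: classify how the body exits, and always emit the monitoring event in finally. A stripped-down sketch (class and enum names here are illustrative, not PowerJob's):

```java
import java.util.concurrent.RejectedExecutionException;

// Sketch of the processWorkerLogReport pattern: run the handler body and
// record SUCCESS / REJECTED / EXCEPTION depending on how it exits.
public class ReportStatusDemo {
    enum Status { SUCCESS, REJECTED, EXCEPTION }

    public static Status handle(Runnable body) {
        Status status;
        try {
            body.run();
            status = Status.SUCCESS;
        } catch (RejectedExecutionException re) {
            status = Status.REJECTED; // downstream thread pool is saturated
        } catch (Throwable t) {
            status = Status.EXCEPTION; // any other failure
        }
        // in PowerJob, monitorService.monitor(event) happens in a finally block here
        return status;
    }
}
```

Distinguishing RejectedExecutionException from other failures lets the monitoring side tell "the async pool is overloaded" apart from genuine bugs.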

WorkerRequestHandlerImpl

tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java

    protected void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event) {
        // this shouldn't be a bottleneck... just some checks plus a Map#get
        instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());
    }

WorkerRequestHandlerImpl's processWorkerLogReport0 simply calls instanceLogService.submitLogs.

submitLogs

tech/powerjob/server/core/instance/InstanceLogService.java

    @Async(value = PJThreadPool.LOCAL_DB_POOL)
    public void submitLogs(String workerAddress, List<InstanceLogContent> logs) {

        List<LocalInstanceLogDO> logList = logs.stream().map(x -> {
            instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis());

            LocalInstanceLogDO y = new LocalInstanceLogDO();
            BeanUtils.copyProperties(x, y);
            y.setWorkerAddress(workerAddress);
            return y;
        }).collect(Collectors.toList());

        try {
            CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList));
        } catch (Exception e) {
            log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e);
        }
    }

InstanceLogService's submitLogs is an async method (running on PJThreadPool.LOCAL_DB_POOL); it converts each InstanceLogContent into a LocalInstanceLogDO, then persists the batch with localInstanceLogRepository.saveAll, retrying on failure.
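CommonUtils.executeWithRetry0 wraps the save in retries. Conceptually it behaves like the following illustrative helper (the retry count, lack of backoff, and exact semantics here are assumptions for the sketch, not PowerJob's actual implementation):

```java
import java.util.concurrent.Callable;

// Illustrative retry helper: run a task up to maxTimes, rethrowing the last
// failure if every attempt fails.
public class RetryUtil {
    public static <T> T executeWithRetry(Callable<T> task, int maxTimes) throws Exception {
        Exception last = null;
        for (int i = 0; i < maxTimes; i++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }
}
```

If all retries are exhausted, submitLogs just logs a warning and drops the batch, consistent with the "logs are best-effort" stance on the worker side.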

LocalJpaConfig

tech/powerjob/server/persistence/config/LocalJpaConfig.java

@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
        // repository package
        basePackages = LocalJpaConfig.LOCAL_PACKAGES,
        // entity manager bean name
        entityManagerFactoryRef = "localEntityManagerFactory",
        // transaction manager bean name
        transactionManagerRef = "localTransactionManager"
)
public class LocalJpaConfig {

    public static final String LOCAL_PACKAGES = "tech.powerjob.server.persistence.local";

    private static Map<String, Object> genDatasourceProperties() {
        JpaProperties jpaProperties = new JpaProperties();
        jpaProperties.setOpenInView(false);
        jpaProperties.setShowSql(false);
        HibernateProperties hibernateProperties = new HibernateProperties();
        // drop all data on every startup (after a restart the instance has been failed over
        // to another server, so the old log data is no longer useful)
        hibernateProperties.setDdlAuto("create");
        return hibernateProperties.determineHibernateProperties(jpaProperties.getProperties(), new HibernateSettings());
    }

    @Bean(name = "localEntityManagerFactory")
    public LocalContainerEntityManagerFactoryBean initLocalEntityManagerFactory(@Qualifier("omsLocalDatasource") DataSource omsLocalDatasource, EntityManagerFactoryBuilder builder) {
        return builder
                .dataSource(omsLocalDatasource)
                .properties(genDatasourceProperties())
                .packages(LOCAL_PACKAGES)
                .persistenceUnit("localPersistenceUnit")
                .build();
    }

    @Bean(name = "localTransactionManager")
    public PlatformTransactionManager initLocalTransactionManager(@Qualifier("localEntityManagerFactory") LocalContainerEntityManagerFactoryBean localContainerEntityManagerFactoryBean) {
        return new JpaTransactionManager(Objects.requireNonNull(localContainerEntityManagerFactoryBean.getObject()));
    }

    @Bean(name = "localTransactionTemplate")
    public TransactionTemplate initTransactionTemplate(@Qualifier("localTransactionManager") PlatformTransactionManager ptm) {
        TransactionTemplate tt = new TransactionTemplate(ptm);
        // set the isolation level
        tt.setIsolationLevel(TransactionDefinition.ISOLATION_DEFAULT);
        return tt;
    }
}

LocalJpaConfig wires the DAOs under tech.powerjob.server.persistence.local to the omsLocalDatasource data source, with ddl-auto set to create so local log data is wiped on every restart.

MultiDatasourceConfig

tech/powerjob/server/persistence/config/MultiDatasourceConfig.java

@Configuration
public class MultiDatasourceConfig {

    private static final String H2_DRIVER_CLASS_NAME = "org.h2.Driver";
    private static final String H2_JDBC_URL_PATTERN = "jdbc:h2:file:%spowerjob_server_db";
    private static final int H2_MIN_SIZE = 4;
    private static final int H2_MAX_ACTIVE_SIZE = 10;

    @Primary
    @Bean("omsRemoteDatasource")
    @ConfigurationProperties(prefix = "spring.datasource.core")
    public DataSource initOmsCoreDatasource() {
        return DataSourceBuilder.create().build();
    }

    @Bean("omsLocalDatasource")
    public DataSource initOmsLocalDatasource() {
        String h2Path = OmsFileUtils.genH2WorkPath();
        HikariConfig config = new HikariConfig();
        config.setDriverClassName(H2_DRIVER_CLASS_NAME);
        config.setJdbcUrl(String.format(H2_JDBC_URL_PATTERN, h2Path));
        config.setAutoCommit(true);
        // minimum number of idle connections in the pool
        config.setMinimumIdle(H2_MIN_SIZE);
        // maximum number of connections in the pool
        config.setMaximumPoolSize(H2_MAX_ACTIVE_SIZE);
        // delete the file on JVM shutdown
        try {
            FileUtils.forceDeleteOnExit(new File(h2Path));
        } catch (Exception ignore) {
        }
        return new HikariDataSource(config);
    }
}

MultiDatasourceConfig defines two data sources: a remote one for core data (e.g. MySQL) and a local file-based H2 data source whose files are deleted on JVM exit.

processFinishedInstance

tech/powerjob/server/core/instance/InstanceManager.java

    public void processFinishedInstance(Long instanceId, Long wfInstanceId, InstanceStatus status, String result) {

        log.info("[Instance-{}] process finished, final status is {}.", instanceId, status.name());

        // report log data
        HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> instanceLogService.sync(instanceId), 60, TimeUnit.SECONDS);

        // special handling for workflows
        if (wfInstanceId != null) {
            // a manual stop inside a workflow is also treated as a failure (in theory this should not happen)
            workflowInstanceManager.move(wfInstanceId, instanceId, status, result);
        }

        // alerting
        if (status == InstanceStatus.FAILED) {
            alert(instanceId, result);
        }

        // proactively evict the cache to reduce memory usage
        instanceMetadataService.invalidateJobInfo(instanceId);
    }

When a task instance finishes, InstanceManager's processFinishedInstance schedules instanceLogService.sync(instanceId) to run 60 seconds later.
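The delayed trigger can be sketched with a plain ScheduledExecutorService standing in for PowerJob's HashedWheelTimer (the 60-second delay is shortened here so the sketch runs quickly, and the sync call is replaced by a latch):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of "run sync(instanceId) after a delay" using the JDK scheduler.
public class DelayedSync {
    public static CountDownLatch scheduleSync(long instanceId, long delayMillis) {
        CountDownLatch done = new CountDownLatch(1);
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.schedule(() -> {
            // stands in for instanceLogService.sync(instanceId)
            System.out.println("sync instance " + instanceId);
            done.countDown();
            timer.shutdown();
        }, delayMillis, TimeUnit.MILLISECONDS);
        return done;
    }
}
```

PowerJob uses an "inaccurate" hashed wheel timer instead because the 60s delay tolerates drift and a wheel timer handles many pending timeouts cheaply.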

sync

tech/powerjob/server/core/instance/InstanceLogService.java

    @Async(PJThreadPool.BACKGROUND_POOL)
    public void sync(Long instanceId) {

        Stopwatch sw = Stopwatch.createStarted();
        try {
            // persist to a local file first
            File stableLogFile = genStableLogFile(instanceId);
            // push the file to MongoDB
            FileLocation dfsFL = new FileLocation().setBucket(Constants.LOG_BUCKET).setName(genMongoFileName(instanceId));
            try {
                dFsService.store(new StoreRequest().setLocalFile(stableLogFile).setFileLocation(dfsFL));
                log.info("[InstanceLog-{}] push local instanceLogs to mongoDB succeed, using: {}.", instanceId, sw.stop());
            } catch (Exception e) {
                log.warn("[InstanceLog-{}] push local instanceLogs to mongoDB failed.", instanceId, e);
            }
        } catch (Exception e) {
            log.warn("[InstanceLog-{}] sync local instanceLogs failed.", instanceId, e);
        }

        // delete the local database records
        try {
            instanceId2LastReportTime.remove(instanceId);
            CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId));
            log.info("[InstanceLog-{}] delete local instanceLog successfully.", instanceId);
        } catch (Exception e) {
            log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e);
        }
    }

InstanceLogService's sync method first persists the logs to a local log file on the server via genStableLogFile, then pushes that file to dFsService, which has oss, gridfs, minio, and mysql implementations (which one is active depends on the server configuration; with mysql the file is stored in the powerjob_files table). Finally it clears the instance's records from the LOCAL_INSTANCE_LOG table in H2 via localInstanceLogRepository.deleteByInstanceId.

genStableLogFile

    private File genStableLogFile(long instanceId) {
        String path = genLogFilePath(instanceId, true);
        int lockId = ("stFileLock-" + instanceId).hashCode();
        try {
            segmentLock.lockInterruptibleSafe(lockId);

            return localTransactionTemplate.execute(status -> {

                File f = new File(path);
                if (f.exists()) {
                    return f;
                }

                try {
                    // create the parent directory (the file itself is created automatically when the stream is opened)
                    FileUtils.forceMkdirParent(f);

                    // data exists locally, persist from the local database (the SYNC case)
                    if (instanceId2LastReportTime.containsKey(instanceId)) {
                        try (Stream<LocalInstanceLogDO> allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) {
                            stream2File(allLogStream, f);
                        }
                    } else {
                        FileLocation dfl = new FileLocation().setBucket(Constants.LOG_BUCKET).setName(genMongoFileName(instanceId));
                        Optional<FileMeta> dflMetaOpt = dFsService.fetchFileMeta(dfl);
                        if (!dflMetaOpt.isPresent()) {
                            OmsFileUtils.string2File("SYSTEM: There is no online log for this job instance.", f);
                            return f;
                        }
                        dFsService.download(new DownloadRequest().setTarget(f).setFileLocation(dfl));
                    }
                    return f;
                } catch (Exception e) {
                    CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(f));
                    throw new RuntimeException(e);
                }
            });
        } finally {
            segmentLock.unlock(lockId);
        }
    }

    private static String genLogFilePath(long instanceId, boolean stable) {
        if (stable) {
            return OmsFileUtils.genLogDirPath() + String.format("%d-stable.log", instanceId);
        } else {
            return OmsFileUtils.genLogDirPath() + String.format("%d-temporary.log", instanceId);
        }
    }

genStableLogFile first checks whether this server already has a log file for the instance (~/powerjob/server/online_log/%d-stable.log) and returns it directly if so. Otherwise, if instanceId2LastReportTime still contains the instance, it pulls the logs from localInstanceLogRepository and writes them to the file; if not, it fetches the file metadata via dFsService.fetchFileMeta and downloads the file locally before returning it.
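Note how the per-instance lock is keyed: rather than locking on a shared monitor, genStableLogFile derives an int lock id from a string hash of the instanceId, so two concurrent syncs of the same instance contend while different instances usually do not. A tiny sketch of that key derivation (the helper class is illustrative):

```java
// Sketch of the lock-key derivation used by genStableLogFile: the same
// instanceId always maps to the same lock id, which a segment lock can
// then use to pick a lock stripe.
public class LockIdDemo {
    public static int lockId(long instanceId) {
        return ("stFileLock-" + instanceId).hashCode();
    }
}
```

Hash collisions between different instanceIds are possible but harmless: two instances sharing a stripe just serialize against each other occasionally.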

Related table schemas

LOCAL_INSTANCE_LOG

CREATE TABLE PUBLIC.LOCAL_INSTANCE_LOG (
    ID BIGINT NOT NULL AUTO_INCREMENT,
    INSTANCE_ID BIGINT,
    LOG_CONTENT CHARACTER VARYING,
    LOG_LEVEL INTEGER,
    LOG_TIME BIGINT,
    WORKER_ADDRESS CHARACTER VARYING(255),
    CONSTRAINT CONSTRAINT_8 PRIMARY KEY (ID)
);
CREATE INDEX IDXPJ6CD8W5EAW8QBKMD84I8KYS7 ON PUBLIC.LOCAL_INSTANCE_LOG (INSTANCE_ID);
CREATE UNIQUE INDEX PRIMARY_KEY_8 ON PUBLIC.LOCAL_INSTANCE_LOG (ID);

powerjob_files

CREATE TABLE `powerjob_files` (
    `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
    `bucket` varchar(255) COLLATE utf8mb4_general_ci NOT NULL COMMENT 'bucket',
    `name` varchar(255) COLLATE utf8mb4_general_ci NOT NULL COMMENT 'file name',
    `version` varchar(255) COLLATE utf8mb4_general_ci NOT NULL COMMENT 'version',
    `meta` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'metadata',
    `length` bigint NOT NULL COMMENT 'length',
    `status` int NOT NULL COMMENT 'status',
    `data` longblob NOT NULL COMMENT 'file content',
    `extra` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'extra info',
    `gmt_create` datetime NOT NULL COMMENT 'create time',
    `gmt_modified` datetime DEFAULT NULL COMMENT 'update time',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

Summary

  • On the worker side, OmsServerLogger's process method calls OmsLogHandler's submitLog, which first checks whether logQueue exceeds REPORT_SIZE (1024) and, if so, starts a new logSubmitter thread; the entry is then offered to logQueue. LogSubmitter keeps polling logQueue and reports batches of BATCH_SIZE (20) entries to the server via TransportUtils.reportLogs.
  • On the server side, AbWorkerRequestHandler's processWorkerLogReport receives the WorkerLogReportReq and calls processWorkerLogReport0, which invokes instanceLogService.submitLogs. InstanceLogService's submitLogs is an async method that converts each InstanceLogContent into a LocalInstanceLogDO and saves them with localInstanceLogRepository.saveAll. The server has two data sources, a remote one (e.g. MySQL) and a local H2, and the local instance logs go to H2's LOCAL_INSTANCE_LOG table.
  • When a task instance finishes, the server's InstanceManager.processFinishedInstance schedules instanceLogService.sync(instanceId) with a 60-second delay. sync first persists the logs to a local file on the server via genStableLogFile, then pushes that file to dFsService (which has oss, gridfs, minio, and mysql implementations, selected by the server configuration; with mysql the file is stored in the powerjob_files table), and finally clears the instance's records from H2's LOCAL_INSTANCE_LOG table via localInstanceLogRepository.deleteByInstanceId.
