Skip to content

Commit

Permalink
Merge pull request #10 from qq254963746/develop
Browse files Browse the repository at this point in the history
添加 mysql JobLogger 实现
  • Loading branch information
qq254963746 committed May 21, 2015
2 parents bff5325 + 94b4881 commit 0a5eab2
Show file tree
Hide file tree
Showing 17 changed files with 243 additions and 29 deletions.
13 changes: 10 additions & 3 deletions job-core/src/main/java/com/lts/job/core/file/FileUtils.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.lts.job.core.file;

import java.io.File;
import java.io.IOException;
import java.io.*;

/**
* @author Robert HG ([email protected]) on 3/6/15.
Expand Down Expand Up @@ -32,5 +31,13 @@ public static File createDirIfNotExist(String path) {
return file;
}


public static String read(InputStream is) throws IOException {
BufferedReader br = new BufferedReader(new InputStreamReader(is));
StringBuilder createTableSql = new StringBuilder();
String data = null;
while ((data = br.readLine()) != null) {
createTableSql.append(data);
}
return createTableSql.toString();
}
}
15 changes: 15 additions & 0 deletions job-core/src/main/java/com/lts/job/core/util/JSONUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
public class JSONUtils {

public static <T> T parse(String json, Type type) {
if (StringUtils.isEmpty(json)) {
return null;
}
return (T) JSONObject.parseObject(json, type);
}

Expand All @@ -24,18 +27,30 @@ public static <T> T parse(String json, TypeReference<T> type) {
}

public static String toJSONString(Object obj) {
if (obj == null) {
return null;
}
return JSONObject.toJSONString(obj);
}

public static JSONObject toJSON(Object obj) {
if (obj == null) {
return null;
}
return (JSONObject) JSONObject.toJSON(obj);
}

public static JSONArray parseArray(String obj) {
if (obj == null) {
return null;
}
return JSON.parseArray(obj);
}

public static JSONObject parseObject(String obj) {
if (obj == null) {
return null;
}
return JSON.parseObject(obj);
}

Expand Down
5 changes: 5 additions & 0 deletions job-example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
<artifactId>job-logger-console</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.lts</groupId>
<artifactId>job-logger-mysql</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.lts</groupId>
<artifactId>job-queue-mongo</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static void console() throws IOException {
// jobClient.setJobInfoSavePath(Constants.USER_HOME);
jobClient.setJobFinishedHandler(new JobFinishedHandlerImpl());
jobClient.addMasterChangeListener(new MasterChangeListenerImpl());
jobClient.setLoadBalance("consistenthash");
// jobClient.setLoadBalance("consistenthash");
jobClient.start();

JobClientTest jobClientTest = new JobClientTest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void run() {
}

/**
* 使用mongo做任务队列
* 使用mysql做任务队列
*/
public static void testMysqlQueue() {
final JobTracker jobTracker = new JobTracker();
Expand All @@ -65,8 +65,8 @@ public static void testMysqlQueue() {

jobTracker.addMasterChangeListener(new MasterChangeListenerImpl());

// 设置业务日志记录
// jobTracker.addConfig("job.logger", "mongo");
// 设置业务日志记录 mysql
jobTracker.addConfig("job.logger", "mysql");
// 任务队列用mysql
jobTracker.addConfig("job.queue", "mysql");
// mysql 配置
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.lts.job.biz.logger;

/**
* @author Robert HG ([email protected]) on 5/21/15.
*/
public class JobLogException extends RuntimeException {

public JobLogException() {
super();
}

public JobLogException(String message) {
super(message);
}

public JobLogException(String message, Throwable cause) {
super(message, cause);
}

public JobLogException(Throwable cause) {
super(cause);
}

protected JobLogException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.lts.job.biz.logger;

import com.lts.job.biz.logger.domain.BizLogPo;
import com.lts.job.biz.logger.domain.JobLogPo;
import com.lts.job.biz.logger.domain.LogType;

/**
* @author Robert HG ([email protected]) on 5/21/15.
*/
public class JobLogUtils {

public static JobLogPo bizConvert(BizLogPo bizLogPo) {
if (bizLogPo == null) {
return null;
}
JobLogPo jobLogPo = new JobLogPo();
jobLogPo.setTimestamp(bizLogPo.getTimestamp());
jobLogPo.setTaskTrackerNodeGroup(bizLogPo.getTaskTrackerNodeGroup());
jobLogPo.setTaskTrackerIdentity(bizLogPo.getTaskTrackerIdentity());
jobLogPo.setJobId(bizLogPo.getJobId());
jobLogPo.setMsg(bizLogPo.getMsg());
jobLogPo.setLevel(bizLogPo.getLevel());
jobLogPo.setLogType(LogType.BIZ);
return jobLogPo;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public enum LogType {

FINISHED, // 任务执行完成
RESEND, // TaskTracker 重新发送的任务执行结果
FIXED_DEAD // 修复死掉的任务
FIXED_DEAD, // 修复死掉的任务
BIZ // 业务日志
;
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.lts.job.biz.logger.mongo;


import com.lts.job.biz.logger.JobLogUtils;
import com.lts.job.biz.logger.JobLogger;
import com.lts.job.biz.logger.domain.BizLogPo;
import com.lts.job.biz.logger.domain.JobLogPo;
import com.lts.job.biz.logger.domain.LogType;
import com.lts.job.core.cluster.Config;
import com.lts.job.store.mongo.AbstractMongoRepository;

Expand All @@ -23,13 +25,7 @@ public void log(JobLogPo jobLogPo) {

@Override
public void log(BizLogPo bizLogPo) {
JobLogPo jobLogPo = new JobLogPo();
jobLogPo.setTimestamp(bizLogPo.getTimestamp());
jobLogPo.setTaskTrackerNodeGroup(bizLogPo.getTaskTrackerNodeGroup());
jobLogPo.setTaskTrackerIdentity(bizLogPo.getTaskTrackerIdentity());
jobLogPo.setJobId(bizLogPo.getJobId());
jobLogPo.setMsg(bizLogPo.getMsg());
ds.save(jobLogPo);
ds.save(JobLogUtils.bizConvert(bizLogPo));
}

}
30 changes: 30 additions & 0 deletions job-logger/job-logger-mysql/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>job-logger</artifactId>
<groupId>com.lts</groupId>
<version>1.4.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>job-logger-mysql</artifactId>

<dependencies>
<dependency>
<groupId>com.lts</groupId>
<artifactId>job-logger-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.lts.job.biz.logger.mysql;

import com.lts.job.biz.logger.JobLogException;
import com.lts.job.biz.logger.JobLogUtils;
import com.lts.job.biz.logger.JobLogger;
import com.lts.job.biz.logger.domain.BizLogPo;
import com.lts.job.biz.logger.domain.JobLogPo;
import com.lts.job.core.cluster.Config;
import com.lts.job.core.file.FileUtils;
import com.lts.job.core.util.JSONUtils;
import com.lts.job.store.jdbc.JdbcRepository;

import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;

/**
* @author Robert HG ([email protected]) on 5/21/15.
*/
public class MysqlJobLogger extends JdbcRepository implements JobLogger {

public MysqlJobLogger(Config config) {
super(config);
doCreateTable();
}

@Override
public void log(JobLogPo jobLogPo) {
if(jobLogPo == null){
return ;
}
String sql = "INSERT INTO `lts_job_log_po` (`timestamp`, `log_type`, `success`, `msg`" +
", `code`, `task_tracker_identity`, `level`, `task_id`, `job_id`" +
", `priority`, `submit_node_group`, `task_tracker_node_group`, `ext_params`, `needFeedback`" +
", `cron_expression`, `trigger_time`)" +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

try {
getSqlTemplate().update(sql,
jobLogPo.getTimestamp(),
jobLogPo.getLogType().name(),
jobLogPo.isSuccess(),
jobLogPo.getMsg(),
jobLogPo.getCode(),
jobLogPo.getTaskTrackerIdentity(),
jobLogPo.getLevel().name(),
jobLogPo.getTaskId(),
jobLogPo.getJobId(),
jobLogPo.getPriority(),
jobLogPo.getSubmitNodeGroup(),
jobLogPo.getTaskTrackerNodeGroup(),
JSONUtils.toJSONString(jobLogPo.getExtParams()),
jobLogPo.isNeedFeedback(),
jobLogPo.getCronExpression(),
jobLogPo.getTriggerTime()
);
} catch (SQLException e) {
throw new JobLogException(e.getMessage(), e);
}
}

@Override
public void log(BizLogPo bizLogPo) {
log(JobLogUtils.bizConvert(bizLogPo));
}

private void doCreateTable() {
// 创建表
try {
InputStream is = this.getClass().getClassLoader().getResourceAsStream("sql/lts_job_log_po.sql");
getSqlTemplate().update(FileUtils.read(is));
} catch (SQLException e) {
throw new RuntimeException("create table error!", e);
} catch (IOException e) {
throw new RuntimeException("create table error!", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.lts.job.biz.logger.mysql;

import com.lts.job.biz.logger.JobLogger;
import com.lts.job.biz.logger.JobLoggerFactory;
import com.lts.job.core.cluster.Config;

/**
* @author Robert HG ([email protected]) on 5/21/15.
*/
public class MysqlJobLoggerFactory implements JobLoggerFactory {
@Override
public JobLogger getJobLogger(Config config) {
return new MysqlJobLogger(config);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mysql=com.lts.job.biz.logger.mysql.MysqlJobLoggerFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
CREATE TABLE IF NOT EXISTS `lts_job_log_po` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`timestamp` bigint(20) DEFAULT NULL COMMENT '日志记录时间',
`log_type` varchar(32) DEFAULT NULL COMMENT '日志类型',
`success` tinyint(11) DEFAULT NULL COMMENT '成功与否',
`msg` text COMMENT '消息',
`code` varchar(32) DEFAULT NULL COMMENT '消息编码',
`task_tracker_identity` varchar(64) DEFAULT NULL COMMENT '执行节点唯一标识',
`level` varchar(32) DEFAULT NULL COMMENT '日志记录级别',
`task_id` varchar(64) DEFAULT NULL COMMENT '客户端ID',
`job_id` varchar(64) DEFAULT '' COMMENT '服务端生成ID',
`priority` int(11) DEFAULT NULL COMMENT '优先级',
`submit_node_group` varchar(64) DEFAULT NULL COMMENT '提交节点group',
`task_tracker_node_group` varchar(64) DEFAULT NULL COMMENT '执行节点group',
`ext_params` text COMMENT '额外参数',
`needFeedback` tinyint(4) DEFAULT NULL COMMENT '是否需要反馈',
`cron_expression` varchar(32) DEFAULT NULL COMMENT 'cron表达式',
`trigger_time` bigint(20) DEFAULT NULL COMMENT '触发时间',
PRIMARY KEY (`id`),
KEY `timestamp` (`timestamp`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
1 change: 1 addition & 0 deletions job-logger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<module>job-logger-console</module>
<module>job-logger-mongo</module>
<module>job-logger-api</module>
<module>job-logger-mysql</module>
</modules>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.alibaba.fastjson.TypeReference;
import com.lts.job.core.cluster.Config;
import com.lts.job.core.domain.JobResult;
import com.lts.job.core.file.FileUtils;
import com.lts.job.core.util.CollectionUtils;
import com.lts.job.core.util.JSONUtils;
import com.lts.job.queue.JobFeedbackQueue;
Expand Down Expand Up @@ -34,13 +35,7 @@ private void doCreateTable() {
// 创建表
try {
InputStream is = this.getClass().getClassLoader().getResourceAsStream("sql/lts_job_feedback_po.sql");
BufferedReader br = new BufferedReader(new InputStreamReader(is));
StringBuilder createTableSql = new StringBuilder();
String data = null;
while ((data = br.readLine()) != null) {
createTableSql.append(data);
}
getSqlTemplate().update(createTableSql.toString());
getSqlTemplate().update(FileUtils.read(is));
} catch (SQLException e) {
throw new RuntimeException("create table error!", e);
} catch (IOException e) {
Expand Down
Loading

0 comments on commit 0a5eab2

Please sign in to comment.