Skip to content

Commit

Permalink
Merge pull request #21 from qq254963746/develop
Browse files Browse the repository at this point in the history
采用CAS 来减小 mysql  queue 情况下 dead lock 的概率
  • Loading branch information
qq254963746 committed Jun 3, 2015
2 parents 69d701e + b551fd4 commit 13d927b
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ public class JobTrackerTest {
public static void main(String[] args) {

// 1. 使用mongo做任务队列
testMongoQueue();
// testMongoQueue();
// 2. 使用mysql做任务队列
// testMysqlQueue();
testMysqlQueue();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public void run(Job job) throws Throwable {
bizLogger.info("测试,业务日志啊啊啊啊啊");

try {
Thread.sleep(1000L);
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@
import com.lts.job.queue.domain.JobPo;
import com.lts.job.queue.exception.JobQueueException;
import com.lts.job.queue.mysql.support.ResultSetHandlerHolder;
import com.lts.job.store.jdbc.SqlExecutor;

import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

Expand Down Expand Up @@ -62,41 +60,83 @@ public boolean add(JobPo jobPo) {

@Override
public JobPo take(final String taskTrackerNodeGroup, final String taskTrackerIdentity) {
return getSqlTemplate().executeInTransaction(new SqlExecutor<JobPo>() {
@Override
public JobPo run(Connection conn) throws SQLException {
Long now = System.currentTimeMillis();
// select for update
String selectForUpdateSql = "SELECT *" +
" FROM `{tableName}` " +
" WHERE is_running = ? " +
" AND `trigger_time` < ? " +
" ORDER BY `trigger_time` ASC, `priority` ASC, `gmt_created` ASC " +
" LIMIT 0, 1 FOR UPDATE";
Object[] selectParams = new Object[]{false, now};
JobPo jobPo = getSqlTemplate().query(conn, getRealSql(selectForUpdateSql, taskTrackerNodeGroup),
ResultSetHandlerHolder.JOB_PO_RESULT_SET_HANDLER, selectParams
);
if (jobPo != null) {
String updateSql = "UPDATE `{tableName}` SET " +
"`is_running` = ?, " +
"`task_tracker_identity` = ?, " +
"`gmt_modified` = ?," +
"`prev_exe_time` = ? " +
" WHERE job_id = ?";
Object[] params = new Object[]{
true, taskTrackerIdentity, now, now, jobPo.getJobId()
};
getSqlTemplate().update(conn, getRealSql(updateSql, taskTrackerNodeGroup), params);
/**
* 这里从SELECT FOR UPDATE 优化为 CAS 乐观锁
*/
Long now = DateUtils.currentTimeMillis();
String selectSql = "SELECT *" +
" FROM `{tableName}` " +
" WHERE is_running = ? " +
" AND `trigger_time` < ? " +
" ORDER BY `trigger_time` ASC, `priority` ASC, `gmt_created` ASC " +
" LIMIT 0, 1";
Object[] selectParams = new Object[]{false, now};

jobPo.setIsRunning(true);
jobPo.setTaskTrackerIdentity(taskTrackerIdentity);
jobPo.setGmtModified(now);
jobPo.setPrevExeTime(now);
}
try {
JobPo jobPo = getSqlTemplate().query(getRealSql(selectSql, taskTrackerNodeGroup),
ResultSetHandlerHolder.JOB_PO_RESULT_SET_HANDLER, selectParams);
if (jobPo == null) {
return null;
}

String updateSql = "UPDATE `{tableName}` SET " +
"`is_running` = ?, " +
"`task_tracker_identity` = ?, " +
"`gmt_modified` = ?," +
"`prev_exe_time` = ? " +
" WHERE job_id = ? AND is_running = ?";
Object[] params = new Object[]{
true, taskTrackerIdentity, now, now, jobPo.getJobId(), false
};
// 返回影响的行数
int affectedRow = getSqlTemplate().update(getRealSql(updateSql, taskTrackerNodeGroup), params);
if (affectedRow == 0) {
return take(taskTrackerNodeGroup, taskTrackerIdentity);
} else {
jobPo.setIsRunning(true);
jobPo.setTaskTrackerIdentity(taskTrackerIdentity);
jobPo.setGmtModified(now);
jobPo.setPrevExeTime(now);
return jobPo;
}
});
} catch (SQLException e) {
throw new JobQueueException(e);
}
// return getSqlTemplate().executeInTransaction(new SqlExecutor<JobPo>() {
// @Override
// public JobPo run(Connection conn) throws SQLException {
// Long now = System.currentTimeMillis();
// // select for update
// String selectForUpdateSql = "SELECT *" +
// " FROM `{tableName}` " +
// " WHERE is_running = ? " +
// " AND `trigger_time` < ? " +
// " ORDER BY `trigger_time` ASC, `priority` ASC, `gmt_created` ASC " +
// " LIMIT 0, 1 FOR UPDATE";
// Object[] selectParams = new Object[]{false, now};
// JobPo jobPo = getSqlTemplate().query(conn, getRealSql(selectForUpdateSql, taskTrackerNodeGroup),
// ResultSetHandlerHolder.JOB_PO_RESULT_SET_HANDLER, selectParams
// );
// if (jobPo != null) {
// String updateSql = "UPDATE `{tableName}` SET " +
// "`is_running` = ?, " +
// "`task_tracker_identity` = ?, " +
// "`gmt_modified` = ?," +
// "`prev_exe_time` = ? " +
// " WHERE job_id = ?";
// Object[] params = new Object[]{
// true, taskTrackerIdentity, now, now, jobPo.getJobId()
// };
// getSqlTemplate().update(conn, getRealSql(updateSql, taskTrackerNodeGroup), params);
//
// jobPo.setIsRunning(true);
// jobPo.setTaskTrackerIdentity(taskTrackerIdentity);
// jobPo.setGmtModified(now);
// jobPo.setPrevExeTime(now);
// }
// return jobPo;
// }
// });
}

@Override
Expand Down

0 comments on commit 13d927b

Please sign in to comment.