Skip to content

Commit

Permalink
Merge pull request #12 from qq254963746/develop
Browse files Browse the repository at this point in the history
抽象 FailStore 接口,后面添加其他实现, 后面计划,优化任务队列模型, 提交处理性能
  • Loading branch information
qq254963746 committed May 22, 2015
2 parents 599998d + 404b3df commit 0a91431
Show file tree
Hide file tree
Showing 20 changed files with 305 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public Response submitJob(List<Job> jobs) throws JobSubmitException {
}
response.setSuccess(true);
response.setCode(ResponseCode.FAILED_AND_SAVE_FILE);
response.setMsg(response.getMsg() + ", but save local fail store and send later !");
} catch (Exception e) {
response.setSuccess(false);
response.setMsg(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ protected void innerStart() {

NettyRequestProcessor defaultProcessor = getDefaultProcessor();
if (defaultProcessor != null) {

int processorSize = config.getParameter(Constants.PROCESSOR_THREAD, Constants.DEFAULT_PROCESSOR_THREAD);
remotingClient.registerDefaultProcessor(defaultProcessor,
Executors.newFixedThreadPool(32 + Constants.AVAILABLE_PROCESSOR * 5,
Executors.newFixedThreadPool(processorSize,
new NamedThreadFactory(AbstractClientNode.class.getSimpleName())));
}
}
Expand All @@ -59,9 +59,9 @@ public void setNodeGroup(String nodeGroup) {
config.setNodeGroup(nodeGroup);
}

public void setJobInfoSavePath(String jobInfoSavePath) {
if (StringUtils.isNotEmpty(jobInfoSavePath)) {
config.setJobInfoSavePath(jobInfoSavePath);
public void setFailStorePath(String failStorePath) {
if (StringUtils.isNotEmpty(failStorePath)) {
config.setFailStorePath(failStorePath);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ protected void innerStart() {

NettyRequestProcessor defaultProcessor = getDefaultProcessor();
if (defaultProcessor != null) {
int processorSize = config.getParameter(Constants.PROCESSOR_THREAD, Constants.DEFAULT_PROCESSOR_THREAD);
remotingServer.registerDefaultProcessor(defaultProcessor,
Executors.newFixedThreadPool(32 + Constants.AVAILABLE_PROCESSOR * 5, new NamedThreadFactory(AbstractServerNode.class.getSimpleName())));
Executors.newFixedThreadPool(processorSize, new NamedThreadFactory(AbstractServerNode.class.getSimpleName())));
}
}

Expand Down
20 changes: 11 additions & 9 deletions job-core/src/main/java/com/lts/job/core/cluster/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class Config {
// 监听端口
private int listenPort;
// 任务信息存储路径(譬如TaskTracker反馈任务信息给JobTracker, JobTracker down掉了, 那么存储下来等待JobTracker可用时再发送)
private String jobInfoSavePath;
private String failStorePath;
// 集群名字
private String clusterName;
// java编译器
Expand Down Expand Up @@ -105,16 +105,12 @@ public void setListenPort(int listenPort) {
this.listenPort = listenPort;
}

public String getJobInfoSavePath() {
return jobInfoSavePath;
public void setFailStorePath(String failStorePath) {
this.failStorePath = failStorePath;
}

public void setJobInfoSavePath(String jobInfoSavePath) {
this.jobInfoSavePath = jobInfoSavePath + "/.lts";
}

public String getFilePath() {
return jobInfoSavePath + "/" + nodeType + "/" + nodeGroup + "/";
public String getFailStorePath() {
return failStorePath + "/.lts" + "/" + nodeType + "/" + nodeGroup + "/";
}

public boolean isAvailable() {
Expand All @@ -140,19 +136,22 @@ public String getParameter(String key, String defaultValue) {
}
return value;
}

private Map<String, Number> getNumbers() {
if (numbers == null) { // 允许并发重复创建
numbers = new ConcurrentHashMap<String, Number>();
}
return numbers;
}

public boolean getParameter(String key, boolean defaultValue) {
String value = getParameter(key);
if (value == null || value.length() == 0) {
return defaultValue;
}
return Boolean.parseBoolean(value);
}

public int getParameter(String key, int defaultValue) {
Number n = getNumbers().get(key);
if (n != null) {
Expand All @@ -166,13 +165,15 @@ public int getParameter(String key, int defaultValue) {
getNumbers().put(key, i);
return i;
}

public String[] getParameter(String key, String[] defaultValue) {
String value = getParameter(key);
if (value == null || value.length() == 0) {
return defaultValue;
}
return Constants.COMMA_SPLIT_PATTERN.split(value);
}

public double getParameter(String key, double defaultValue) {
Number n = getNumbers().get(key);
if (n != null) {
Expand All @@ -186,6 +187,7 @@ public double getParameter(String key, double defaultValue) {
getNumbers().put(key, d);
return d;
}

public float getParameter(String key, float defaultValue) {
Number n = getNumbers().get(key);
if (n != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,7 @@ public interface Constants {
// 客户端提交并发请求size
public static final String JOB_SUBMIT_CONCURRENCY_SIZE = "job.submit.concurrency.size";
public static final int DEFAULT_JOB_SUBMIT_CONCURRENCY_SIZE = Constants.AVAILABLE_PROCESSOR * 4;

public static final String PROCESSOR_THREAD = "job.processor.thread";
public static final int DEFAULT_PROCESSOR_THREAD = 32 + AVAILABLE_PROCESSOR * 5;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public static Config getDefaultConfig() {
config.setRegistryAddress("zookeeper://127.0.0.1:2181");
config.setInvokeTimeoutMillis(1000 * 6);
config.setListenPort(0);
config.setJobInfoSavePath(Constants.USER_HOME);
config.setFailStorePath(Constants.USER_HOME);
config.setClusterName(Constants.DEFAULT_CLUSTER_NAME);
return config;
}
Expand Down
26 changes: 26 additions & 0 deletions job-core/src/main/java/com/lts/job/core/failstore/FailStore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.lts.job.core.failstore;

import com.lts.job.core.domain.KVPair;

import java.lang.reflect.Type;
import java.util.List;

/**
* Created by hugui on 5/21/15.
*/
public interface FailStore {

public void open() throws FailStoreException;

public void put(String key, Object value) throws FailStoreException;

public void delete(String key) throws FailStoreException;

public void delete(List<String> keys) throws FailStoreException;

public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStoreException;

public void close() throws FailStoreException;

public void destroy() throws FailStoreException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.lts.job.core.failstore;

/**
* Created by hugui on 5/21/15.
*/
public class FailStoreException extends Exception {

public FailStoreException(String message) {
super(message);
}

public FailStoreException(String message, Throwable cause) {
super(message, cause);
}

public FailStoreException(Throwable cause) {
super(cause);
}

protected FailStoreException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.lts.job.core.failstore;

import com.lts.job.core.cluster.Config;
import com.lts.job.core.extension.Adaptive;
import com.lts.job.core.extension.SPI;

/**
* Created by hugui on 5/21/15.
*/
@SPI("leveldb")
public interface FailStoreFactory {

@Adaptive("job.fail.store")
public FailStore getFailStore(Config config);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package com.lts.job.core.failstore.leveldb;

import com.lts.job.core.cluster.Config;
import com.lts.job.core.domain.KVPair;
import com.lts.job.core.failstore.FailStore;
import com.lts.job.core.failstore.FailStoreException;
import com.lts.job.core.file.FileAccessor;
import com.lts.job.core.file.FileException;
import com.lts.job.core.file.FileUtils;
import com.lts.job.core.util.JSONUtils;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

/**
* Created by hugui on 5/21/15.
*/
public class LeveldbFailStore implements FailStore {

// 文件锁 (同一时间只能有一个线程在 检查提交失败的任务)
private FileAccessor dbLock;
/**
* 数据库目录
*/
private File dbPath;

private DB db;

private Options options;

public LeveldbFailStore(Config config) {
dbPath = FileUtils.createDirIfNotExist(config.getFailStorePath());
options = new Options();
try {
dbLock = new FileAccessor(config.getFailStorePath() + "___db.lock");
dbLock.createIfNotExist();
} catch (FileException e) {
throw new RuntimeException(e);
}
}

@Override
public void open() throws FailStoreException {
dbLock.tryLock();
try {
db = JniDBFactory.factory.open(dbPath, options);
} catch (IOException e) {
throw new FailStoreException(e);
}
}

@Override
public void put(String key, Object value) throws FailStoreException {
String valueString = JSONUtils.toJSONString(value);
db.put(key.getBytes(), valueString.getBytes());
}

@Override
public void delete(String key) throws FailStoreException {
if (key == null) {
return;
}
db.delete(key.getBytes());
}

@Override
public void delete(List<String> keys) throws FailStoreException {
if (keys == null || keys.size() == 0) {
return;
}
for (String key : keys) {
delete(key);
}
}

@Override
public <T> List<KVPair<String, T>> fetchTop(int size, Type type) {
List<KVPair<String, T>> list = new ArrayList<KVPair<String, T>>(size);
DBIterator iterator = db.iterator();
for (iterator.seekToLast(); iterator.hasPrev(); iterator.prev()) {
String key = new String(iterator.peekPrev().getKey());
T value = JSONUtils.parse(new String(iterator.peekPrev().getValue()), type);
KVPair<String, T> pair = new KVPair<String, T>(key, value);
list.add(pair);
if (list.size() >= size) {
break;
}
}
return list;
}

@Override
public void close() throws FailStoreException {
try {
db.close();
dbLock.unlock();
} catch (IOException e) {
throw new FailStoreException(e);
}
}

public void destroy() throws FailStoreException {
try {
JniDBFactory.factory.destroy(dbPath, options);
dbLock.delete();
} catch (IOException e) {
throw new FailStoreException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.lts.job.core.failstore.leveldb;

import com.lts.job.core.cluster.Config;
import com.lts.job.core.failstore.FailStore;
import com.lts.job.core.failstore.FailStoreFactory;

/**
* Created by hugui on 5/21/15.
*/
public class LeveldbFailStoreFactory implements FailStoreFactory {
@Override
public FailStore getFailStore(Config config) {
return new LeveldbFailStore(config);
}
}
14 changes: 14 additions & 0 deletions job-core/src/main/java/com/lts/job/core/support/LevelDBStore.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.lts.job.core.support;

import com.lts.job.core.domain.KVPair;
import com.lts.job.core.file.FileAccessor;
import com.lts.job.core.file.FileException;
import com.lts.job.core.file.FileUtils;
import com.lts.job.core.util.JSONUtils;
import org.fusesource.leveldbjni.JniDBFactory;
Expand All @@ -16,10 +18,13 @@

/**
* LevelDB 存储
*
* @author Robert HG ([email protected]) on 3/6/15.
*/
public class LevelDBStore {

// 文件锁 (同一时间只能有一个线程在 检查提交失败的任务)
private FileAccessor dbLock;
/**
* 数据库目录
*/
Expand All @@ -32,9 +37,16 @@ public class LevelDBStore {
public LevelDBStore(String path) {
dbPath = FileUtils.createDirIfNotExist(path);
options = new Options();
try {
dbLock = new FileAccessor(path + "___db.lock");
dbLock.createIfNotExist();
} catch (FileException e) {
throw new RuntimeException(e);
}
}

public void open() throws IOException {
dbLock.tryLock();
db = JniDBFactory.factory.open(dbPath, options);
}

Expand Down Expand Up @@ -91,10 +103,12 @@ public <T> List<KVPair<String, T>> getList(int size, Type type) {

public void close() throws IOException {
db.close();
dbLock.unlock();
}

public void destroy() throws IOException {
JniDBFactory.factory.destroy(dbPath, options);
dbLock.delete();
}

}
Loading

0 comments on commit 0a91431

Please sign in to comment.