From 404b3dfdb501516261fd4f47fb437e3224fb2262 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E8=B4=B5?= Date: Fri, 22 May 2015 15:49:21 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8A=BD=E8=B1=A1=20FailStore=20=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=EF=BC=8C=E5=90=8E=E9=9D=A2=E6=B7=BB=E5=8A=A0=E5=85=B6?= =?UTF-8?q?=E4=BB=96=E5=AE=9E=E7=8E=B0,=20=E5=90=8E=E9=9D=A2=E8=AE=A1?= =?UTF-8?q?=E5=88=92=EF=BC=8C=E4=BC=98=E5=8C=96=E4=BB=BB=E5=8A=A1=E9=98=9F?= =?UTF-8?q?=E5=88=97=E6=A8=A1=E5=9E=8B,=20=E6=8F=90=E4=BA=A4=E5=A4=84?= =?UTF-8?q?=E7=90=86=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/lts/job/client/RetryJobClient.java | 1 + .../job/core/cluster/AbstractClientNode.java | 10 +- .../job/core/cluster/AbstractServerNode.java | 3 +- .../java/com/lts/job/core/cluster/Config.java | 20 +-- .../com/lts/job/core/constant/Constants.java | 3 + .../core/factory/JobNodeConfigFactory.java | 2 +- .../com/lts/job/core/failstore/FailStore.java | 26 ++++ .../core/failstore/FailStoreException.java | 23 ++++ .../job/core/failstore/FailStoreFactory.java | 16 +++ .../failstore/leveldb/LeveldbFailStore.java | 117 ++++++++++++++++++ .../leveldb/LeveldbFailStoreFactory.java | 15 +++ .../lts/job/core/support/LevelDBStore.java | 14 +++ .../lts/job/core/support/RetryScheduler.java | 76 +++++------- ...om.lts.job.core.failstore.FailStoreFactory | 1 + .../lts/job/example/api/JobClientTest.java | 4 +- .../lts/job/example/api/TaskTrackerTest.java | 2 +- .../lts/job/spring/JobClientFactoryBean.java | 8 +- .../job/spring/TaskTrackerFactoryBean.java | 8 +- .../tracker/processor/JobPullProcessor.java | 28 +---- .../job/tracker/support/JobDistributor.java | 26 +++- 20 files changed, 305 insertions(+), 98 deletions(-) create mode 100644 job-core/src/main/java/com/lts/job/core/failstore/FailStore.java create mode 100644 job-core/src/main/java/com/lts/job/core/failstore/FailStoreException.java create mode 100644 job-core/src/main/java/com/lts/job/core/failstore/FailStoreFactory.java create mode 100644 job-core/src/main/java/com/lts/job/core/failstore/leveldb/LeveldbFailStore.java create mode 100644 job-core/src/main/java/com/lts/job/core/failstore/leveldb/LeveldbFailStoreFactory.java create mode 100644 job-core/src/main/resources/META-INF/lts/internal/com.lts.job.core.failstore.FailStoreFactory diff --git a/job-client/src/main/java/com/lts/job/client/RetryJobClient.java b/job-client/src/main/java/com/lts/job/client/RetryJobClient.java index 8e84f9014..d9bb644d0 100644 --- a/job-client/src/main/java/com/lts/job/client/RetryJobClient.java +++ b/job-client/src/main/java/com/lts/job/client/RetryJobClient.java @@ -64,6 +64,7 @@ public Response submitJob(List jobs) throws JobSubmitException { } response.setSuccess(true); response.setCode(ResponseCode.FAILED_AND_SAVE_FILE); + response.setMsg(response.getMsg() + ", but save local fail store and send later !"); } catch (Exception e) { response.setSuccess(false); response.setMsg(e.getMessage()); diff --git a/job-core/src/main/java/com/lts/job/core/cluster/AbstractClientNode.java b/job-core/src/main/java/com/lts/job/core/cluster/AbstractClientNode.java index 3028463d6..9e0864397 100644 --- a/job-core/src/main/java/com/lts/job/core/cluster/AbstractClientNode.java +++ b/job-core/src/main/java/com/lts/job/core/cluster/AbstractClientNode.java @@ -31,9 +31,9 @@ protected void innerStart() { NettyRequestProcessor defaultProcessor = getDefaultProcessor(); if (defaultProcessor != null) { - + int processorSize = config.getParameter(Constants.PROCESSOR_THREAD, Constants.DEFAULT_PROCESSOR_THREAD); remotingClient.registerDefaultProcessor(defaultProcessor, - Executors.newFixedThreadPool(32 + Constants.AVAILABLE_PROCESSOR * 5, + Executors.newFixedThreadPool(processorSize, new NamedThreadFactory(AbstractClientNode.class.getSimpleName()))); } } @@ -59,9 +59,9 @@ public void setNodeGroup(String nodeGroup) { config.setNodeGroup(nodeGroup); } - public void setJobInfoSavePath(String jobInfoSavePath) { - if (StringUtils.isNotEmpty(jobInfoSavePath)) { - config.setJobInfoSavePath(jobInfoSavePath); + public void setFailStorePath(String failStorePath) { + if (StringUtils.isNotEmpty(failStorePath)) { + config.setFailStorePath(failStorePath); } } diff --git a/job-core/src/main/java/com/lts/job/core/cluster/AbstractServerNode.java b/job-core/src/main/java/com/lts/job/core/cluster/AbstractServerNode.java index 3a1e0e4be..7d74b6951 100644 --- a/job-core/src/main/java/com/lts/job/core/cluster/AbstractServerNode.java +++ b/job-core/src/main/java/com/lts/job/core/cluster/AbstractServerNode.java @@ -30,8 +30,9 @@ protected void innerStart() { NettyRequestProcessor defaultProcessor = getDefaultProcessor(); if (defaultProcessor != null) { + int processorSize = config.getParameter(Constants.PROCESSOR_THREAD, Constants.DEFAULT_PROCESSOR_THREAD); remotingServer.registerDefaultProcessor(defaultProcessor, - Executors.newFixedThreadPool(32 + Constants.AVAILABLE_PROCESSOR * 5, new NamedThreadFactory(AbstractServerNode.class.getSimpleName()))); + Executors.newFixedThreadPool(processorSize, new NamedThreadFactory(AbstractServerNode.class.getSimpleName()))); } } diff --git a/job-core/src/main/java/com/lts/job/core/cluster/Config.java b/job-core/src/main/java/com/lts/job/core/cluster/Config.java index 1f3510139..7bcda181a 100644 --- a/job-core/src/main/java/com/lts/job/core/cluster/Config.java +++ b/job-core/src/main/java/com/lts/job/core/cluster/Config.java @@ -31,7 +31,7 @@ public class Config { // 监听端口 private int listenPort; // 任务信息存储路径(譬如TaskTracker反馈任务信息给JobTracker, JobTracker down掉了, 那么存储下来等待JobTracker可用时再发送) - private String jobInfoSavePath; + private String failStorePath; // 集群名字 private String clusterName; // java编译器 @@ -105,16 +105,12 @@ public void setListenPort(int listenPort) { this.listenPort = listenPort; } - public String getJobInfoSavePath() { - return jobInfoSavePath; + public void setFailStorePath(String failStorePath) { + this.failStorePath = failStorePath; } - public void setJobInfoSavePath(String jobInfoSavePath) { - this.jobInfoSavePath = jobInfoSavePath + "/.lts"; - } - - public String getFilePath() { - return jobInfoSavePath + "/" + nodeType + "/" + nodeGroup + "/"; + public String getFailStorePath() { + return failStorePath + "/.lts" + "/" + nodeType + "/" + nodeGroup + "/"; } public boolean isAvailable() { @@ -140,12 +136,14 @@ public String getParameter(String key, String defaultValue) { } return value; } + private Map getNumbers() { if (numbers == null) { // 允许并发重复创建 numbers = new ConcurrentHashMap(); } return numbers; } + public boolean getParameter(String key, boolean defaultValue) { String value = getParameter(key); if (value == null || value.length() == 0) { @@ -153,6 +151,7 @@ public boolean getParameter(String key, boolean defaultValue) { } return Boolean.parseBoolean(value); } + public int getParameter(String key, int defaultValue) { Number n = getNumbers().get(key); if (n != null) { @@ -166,6 +165,7 @@ public int getParameter(String key, int defaultValue) { getNumbers().put(key, i); return i; } + public String[] getParameter(String key, String[] defaultValue) { String value = getParameter(key); if (value == null || value.length() == 0) { @@ -173,6 +173,7 @@ public String[] getParameter(String key, String[] defaultValue) { } return Constants.COMMA_SPLIT_PATTERN.split(value); } + public double getParameter(String key, double defaultValue) { Number n = getNumbers().get(key); if (n != null) { @@ -186,6 +187,7 @@ public double getParameter(String key, double defaultValue) { getNumbers().put(key, d); return d; } + public float getParameter(String key, float defaultValue) { Number n = getNumbers().get(key); if (n != null) { diff --git a/job-core/src/main/java/com/lts/job/core/constant/Constants.java b/job-core/src/main/java/com/lts/job/core/constant/Constants.java index f8280036c..6f67b9320 100644 --- a/job-core/src/main/java/com/lts/job/core/constant/Constants.java +++ b/job-core/src/main/java/com/lts/job/core/constant/Constants.java @@ -70,4 +70,7 @@ public interface Constants { // 客户端提交并发请求size public static final String JOB_SUBMIT_CONCURRENCY_SIZE = "job.submit.concurrency.size"; public static final int DEFAULT_JOB_SUBMIT_CONCURRENCY_SIZE = Constants.AVAILABLE_PROCESSOR * 4; + + public static final String PROCESSOR_THREAD = "job.processor.thread"; + public static final int DEFAULT_PROCESSOR_THREAD = 32 + AVAILABLE_PROCESSOR * 5; } diff --git a/job-core/src/main/java/com/lts/job/core/factory/JobNodeConfigFactory.java b/job-core/src/main/java/com/lts/job/core/factory/JobNodeConfigFactory.java index 5656b54c8..cdc396392 100644 --- a/job-core/src/main/java/com/lts/job/core/factory/JobNodeConfigFactory.java +++ b/job-core/src/main/java/com/lts/job/core/factory/JobNodeConfigFactory.java @@ -17,7 +17,7 @@ public static Config getDefaultConfig() { config.setRegistryAddress("zookeeper://127.0.0.1:2181"); config.setInvokeTimeoutMillis(1000 * 6); config.setListenPort(0); - config.setJobInfoSavePath(Constants.USER_HOME); + config.setFailStorePath(Constants.USER_HOME); config.setClusterName(Constants.DEFAULT_CLUSTER_NAME); return config; } diff --git a/job-core/src/main/java/com/lts/job/core/failstore/FailStore.java b/job-core/src/main/java/com/lts/job/core/failstore/FailStore.java new file mode 100644 index 000000000..f5f97446e --- /dev/null +++ b/job-core/src/main/java/com/lts/job/core/failstore/FailStore.java @@ -0,0 +1,26 @@ +package com.lts.job.core.failstore; + +import com.lts.job.core.domain.KVPair; + +import java.lang.reflect.Type; +import java.util.List; + +/** + * Created by hugui on 5/21/15. + */ +public interface FailStore { + + public void open() throws FailStoreException; + + public void put(String key, Object value) throws FailStoreException; + + public void delete(String key) throws FailStoreException; + + public void delete(List keys) throws FailStoreException; + + public List> fetchTop(int size, Type type) throws FailStoreException; + + public void close() throws FailStoreException; + + public void destroy() throws FailStoreException; +} diff --git a/job-core/src/main/java/com/lts/job/core/failstore/FailStoreException.java b/job-core/src/main/java/com/lts/job/core/failstore/FailStoreException.java new file mode 100644 index 000000000..c96b5b613 --- /dev/null +++ b/job-core/src/main/java/com/lts/job/core/failstore/FailStoreException.java @@ -0,0 +1,23 @@ +package com.lts.job.core.failstore; + +/** + * Created by hugui on 5/21/15. + */ +public class FailStoreException extends Exception { + + public FailStoreException(String message) { + super(message); + } + + public FailStoreException(String message, Throwable cause) { + super(message, cause); + } + + public FailStoreException(Throwable cause) { + super(cause); + } + + protected FailStoreException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/job-core/src/main/java/com/lts/job/core/failstore/FailStoreFactory.java b/job-core/src/main/java/com/lts/job/core/failstore/FailStoreFactory.java new file mode 100644 index 000000000..4054cedd0 --- /dev/null +++ b/job-core/src/main/java/com/lts/job/core/failstore/FailStoreFactory.java @@ -0,0 +1,16 @@ +package com.lts.job.core.failstore; + +import com.lts.job.core.cluster.Config; +import com.lts.job.core.extension.Adaptive; +import com.lts.job.core.extension.SPI; + +/** + * Created by hugui on 5/21/15. + */ +@SPI("leveldb") +public interface FailStoreFactory { + + @Adaptive("job.fail.store") + public FailStore getFailStore(Config config); + +} diff --git a/job-core/src/main/java/com/lts/job/core/failstore/leveldb/LeveldbFailStore.java b/job-core/src/main/java/com/lts/job/core/failstore/leveldb/LeveldbFailStore.java new file mode 100644 index 000000000..3d24350a2 --- /dev/null +++ b/job-core/src/main/java/com/lts/job/core/failstore/leveldb/LeveldbFailStore.java @@ -0,0 +1,117 @@ +package com.lts.job.core.failstore.leveldb; + +import com.lts.job.core.cluster.Config; +import com.lts.job.core.domain.KVPair; +import com.lts.job.core.failstore.FailStore; +import com.lts.job.core.failstore.FailStoreException; +import com.lts.job.core.file.FileAccessor; +import com.lts.job.core.file.FileException; +import com.lts.job.core.file.FileUtils; +import com.lts.job.core.util.JSONUtils; +import org.fusesource.leveldbjni.JniDBFactory; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBIterator; +import org.iq80.leveldb.Options; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; + +/** + * Created by hugui on 5/21/15. + */ +public class LeveldbFailStore implements FailStore { + + // 文件锁 (同一时间只能有一个线程在 检查提交失败的任务) + private FileAccessor dbLock; + /** + * 数据库目录 + */ + private File dbPath; + + private DB db; + + private Options options; + + public LeveldbFailStore(Config config) { + dbPath = FileUtils.createDirIfNotExist(config.getFailStorePath()); + options = new Options(); + try { + dbLock = new FileAccessor(config.getFailStorePath() + "___db.lock"); + dbLock.createIfNotExist(); + } catch (FileException e) { + throw new RuntimeException(e); + } + } + + @Override + public void open() throws FailStoreException { + dbLock.tryLock(); + try { + db = JniDBFactory.factory.open(dbPath, options); + } catch (IOException e) { + throw new FailStoreException(e); + } + } + + @Override + public void put(String key, Object value) throws FailStoreException { + String valueString = JSONUtils.toJSONString(value); + db.put(key.getBytes(), valueString.getBytes()); + } + + @Override + public void delete(String key) throws FailStoreException { + if (key == null) { + return; + } + db.delete(key.getBytes()); + } + + @Override + public void delete(List keys) throws FailStoreException { + if (keys == null || keys.size() == 0) { + return; + } + for (String key : keys) { + delete(key); + } + } + + @Override + public List> fetchTop(int size, Type type) { + List> list = new ArrayList>(size); + DBIterator iterator = db.iterator(); + for (iterator.seekToLast(); iterator.hasPrev(); iterator.prev()) { + String key = new String(iterator.peekPrev().getKey()); + T value = JSONUtils.parse(new String(iterator.peekPrev().getValue()), type); + KVPair pair = new KVPair(key, value); + list.add(pair); + if (list.size() >= size) { + break; + } + } + return list; + } + + @Override + public void close() throws FailStoreException { + try { + db.close(); + dbLock.unlock(); + } catch (IOException e) { + throw new FailStoreException(e); + } + } + + public void destroy() throws FailStoreException { + try { + JniDBFactory.factory.destroy(dbPath, options); + dbLock.delete(); + } catch (IOException e) { + throw new FailStoreException(e); + } + } +} diff --git a/job-core/src/main/java/com/lts/job/core/failstore/leveldb/LeveldbFailStoreFactory.java b/job-core/src/main/java/com/lts/job/core/failstore/leveldb/LeveldbFailStoreFactory.java new file mode 100644 index 000000000..a5c4e473f --- /dev/null +++ b/job-core/src/main/java/com/lts/job/core/failstore/leveldb/LeveldbFailStoreFactory.java @@ -0,0 +1,15 @@ +package com.lts.job.core.failstore.leveldb; + +import com.lts.job.core.cluster.Config; +import com.lts.job.core.failstore.FailStore; +import com.lts.job.core.failstore.FailStoreFactory; + +/** + * Created by hugui on 5/21/15. + */ +public class LeveldbFailStoreFactory implements FailStoreFactory { + @Override + public FailStore getFailStore(Config config) { + return new LeveldbFailStore(config); + } +} diff --git a/job-core/src/main/java/com/lts/job/core/support/LevelDBStore.java b/job-core/src/main/java/com/lts/job/core/support/LevelDBStore.java index 3d76b7a28..8a8615f70 100644 --- a/job-core/src/main/java/com/lts/job/core/support/LevelDBStore.java +++ b/job-core/src/main/java/com/lts/job/core/support/LevelDBStore.java @@ -1,6 +1,8 @@ package com.lts.job.core.support; import com.lts.job.core.domain.KVPair; +import com.lts.job.core.file.FileAccessor; +import com.lts.job.core.file.FileException; import com.lts.job.core.file.FileUtils; import com.lts.job.core.util.JSONUtils; import org.fusesource.leveldbjni.JniDBFactory; @@ -16,10 +18,13 @@ /** * LevelDB 存储 + * * @author Robert HG (254963746@qq.com) on 3/6/15. */ public class LevelDBStore { + // 文件锁 (同一时间只能有一个线程在 检查提交失败的任务) + private FileAccessor dbLock; /** * 数据库目录 */ @@ -32,9 +37,16 @@ public class LevelDBStore { public LevelDBStore(String path) { dbPath = FileUtils.createDirIfNotExist(path); options = new Options(); + try { + dbLock = new FileAccessor(path + "___db.lock"); + dbLock.createIfNotExist(); + } catch (FileException e) { + throw new RuntimeException(e); + } } public void open() throws IOException { + dbLock.tryLock(); db = JniDBFactory.factory.open(dbPath, options); } @@ -91,10 +103,12 @@ public List> getList(int size, Type type) { public void close() throws IOException { db.close(); + dbLock.unlock(); } public void destroy() throws IOException { JniDBFactory.factory.destroy(dbPath, options); + dbLock.delete(); } } diff --git a/job-core/src/main/java/com/lts/job/core/support/RetryScheduler.java b/job-core/src/main/java/com/lts/job/core/support/RetryScheduler.java index fb4ca8ff3..e6d775402 100644 --- a/job-core/src/main/java/com/lts/job/core/support/RetryScheduler.java +++ b/job-core/src/main/java/com/lts/job/core/support/RetryScheduler.java @@ -2,18 +2,20 @@ import com.lts.job.core.Application; import com.lts.job.core.domain.KVPair; -import com.lts.job.core.file.FileAccessor; -import com.lts.job.core.file.FileException; +import com.lts.job.core.extension.ExtensionLoader; +import com.lts.job.core.failstore.FailStore; +import com.lts.job.core.failstore.FailStoreException; +import com.lts.job.core.failstore.FailStoreFactory; import com.lts.job.core.logger.Logger; import com.lts.job.core.logger.LoggerFactory; import com.lts.job.core.util.GenericsUtils; import com.lts.job.core.util.JSONUtils; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** @@ -24,24 +26,20 @@ public abstract class RetryScheduler { private static final Logger LOGGER = LoggerFactory.getLogger(RetryScheduler.class); - private Class clazz = GenericsUtils.getSuperClassGenericType(this.getClass()); + private Class type = GenericsUtils.getSuperClassGenericType(this.getClass()); // 定时检查是否有 师表的反馈任务信息(给客户端的) private ScheduledExecutorService RETRY_EXECUTOR_SERVICE; - private LevelDBStore levelDBStore; - // 文件锁 (同一时间只能有一个线程在 检查提交失败的任务) - private com.lts.job.core.file.FileAccessor dbLock; + private ScheduledFuture scheduledFuture; + + private FailStore failStore; // 批量发送的消息数 private int batchSize = 5; public RetryScheduler(Application application) { - try { - levelDBStore = new LevelDBStore(application.getConfig().getFilePath()); - dbLock = new FileAccessor(application.getConfig().getFilePath() + "___db.lock"); - } catch (FileException e) { - throw new RuntimeException(e); - } + FailStoreFactory failStoreFactory = ExtensionLoader.getExtensionLoader(FailStoreFactory.class).getAdaptiveExtension(); + failStore = failStoreFactory.getFailStore(application.getConfig()); } protected RetryScheduler(Application application, int batchSize) { @@ -50,13 +48,10 @@ protected RetryScheduler(Application application, int batchSize) { } public void start() { - if (levelDBStore != null) { - - dbLock.createIfNotExist(); - + if (RETRY_EXECUTOR_SERVICE == null) { RETRY_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor(); // 这个时间后面再去优化 - RETRY_EXECUTOR_SERVICE.scheduleWithFixedDelay(new CheckRunner(), 30, 30, TimeUnit.SECONDS); + scheduledFuture = RETRY_EXECUTOR_SERVICE.scheduleWithFixedDelay(new CheckRunner(), 30, 30, TimeUnit.SECONDS); } } @@ -64,8 +59,13 @@ public void start() { private int maxSentSize = 20; public void stop() { - RETRY_EXECUTOR_SERVICE.shutdown(); - RETRY_EXECUTOR_SERVICE = null; + try { + scheduledFuture.cancel(true); + RETRY_EXECUTOR_SERVICE.shutdown(); + RETRY_EXECUTOR_SERVICE = null; + } catch (Throwable t) { + LOGGER.error(t.getMessage(), t); + } } /** @@ -80,12 +80,11 @@ public void run() { if (!isRemotingEnable()) { return; } - dbLock.tryLock(); try { - levelDBStore.open(); + failStore.open(); int sentSize = 0; - List> kvPairs = levelDBStore.getList(batchSize, clazz); + List> kvPairs = failStore.fetchTop(batchSize, type); while (kvPairs != null && kvPairs.size() > 0) { List values = new ArrayList(kvPairs.size()); @@ -96,7 +95,7 @@ public void run() { } if (retry(values)) { LOGGER.info("本地任务发送成功, {}", JSONUtils.toJSONString(values)); - levelDBStore.delete(keys); + failStore.delete(keys); } else { break; } @@ -105,15 +104,10 @@ public void run() { // 一次最多提交maxSentSize个, 保证文件所也能被其他线程拿到 break; } - kvPairs = levelDBStore.getList(batchSize, clazz); + kvPairs = failStore.fetchTop(batchSize, type); } } finally { - try { - levelDBStore.close(); - } catch (IOException e) { - LOGGER.error("close leveldb failed", e); - } - dbLock.unlock(); + failStore.close(); } } catch (Throwable e) { LOGGER.error(e.getMessage(), e); @@ -121,22 +115,16 @@ public void run() { } } - public void inSchedule(String key, Object value) { - dbLock.tryLock(); + public void inSchedule(String key, T value) { try { try { - levelDBStore.open(); - } catch (IOException e) { - throw new RuntimeException(e); - } - levelDBStore.put(key, value); - } finally { - try { - levelDBStore.close(); - } catch (IOException e) { - LOGGER.error("close leveldb failed", e); + failStore.open(); + failStore.put(key, value); + } finally { + failStore.close(); } - dbLock.unlock(); + } catch (FailStoreException e) { + LOGGER.error(e.getMessage(), e); } } diff --git a/job-core/src/main/resources/META-INF/lts/internal/com.lts.job.core.failstore.FailStoreFactory b/job-core/src/main/resources/META-INF/lts/internal/com.lts.job.core.failstore.FailStoreFactory new file mode 100644 index 000000000..24257e756 --- /dev/null +++ b/job-core/src/main/resources/META-INF/lts/internal/com.lts.job.core.failstore.FailStoreFactory @@ -0,0 +1 @@ +leveldb=com.lts.job.core.failstore.leveldb.LeveldbFailStoreFactory \ No newline at end of file diff --git a/job-example/src/main/java/com/lts/job/example/api/JobClientTest.java b/job-example/src/main/java/com/lts/job/example/api/JobClientTest.java index 4def1e93a..024c3950d 100644 --- a/job-example/src/main/java/com/lts/job/example/api/JobClientTest.java +++ b/job-example/src/main/java/com/lts/job/example/api/JobClientTest.java @@ -31,7 +31,7 @@ public static void console() throws IOException { jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181"); // jobClient.setRegistryAddress("redis://127.0.0.1:6379"); // 任务重试保存地址,默认用户目录下 -// jobClient.setJobInfoSavePath(Constants.USER_HOME); +// jobClient.setFailStorePath(Constants.USER_HOME); jobClient.setJobFinishedHandler(new JobFinishedHandlerImpl()); jobClient.addMasterChangeListener(new MasterChangeListenerImpl()); // jobClient.setLoadBalance("consistenthash"); @@ -51,7 +51,7 @@ public static void testProtector() { jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181"); // jobClient.setRegistryAddress("redis://127.0.0.1:6379"); // 任务重试保存地址,默认用户目录下 -// jobClient.setJobInfoSavePath(Constants.USER_HOME); +// jobClient.setFailStorePath(Constants.USER_HOME); jobClient.setJobFinishedHandler(new JobFinishedHandlerImpl()); jobClient.addMasterChangeListener(new MasterChangeListenerImpl()); jobClient.addConfig("job.submit.concurrency.size", "3"); diff --git a/job-example/src/main/java/com/lts/job/example/api/TaskTrackerTest.java b/job-example/src/main/java/com/lts/job/example/api/TaskTrackerTest.java index 6ebc1aaf8..18b123448 100644 --- a/job-example/src/main/java/com/lts/job/example/api/TaskTrackerTest.java +++ b/job-example/src/main/java/com/lts/job/example/api/TaskTrackerTest.java @@ -18,7 +18,7 @@ public static void main(String[] args) { taskTracker.setNodeGroup("test_trade_TaskTracker"); taskTracker.setClusterName("test_cluster"); taskTracker.setWorkThreads(20); -// taskTracker.setJobInfoSavePath(Constants.USER_HOME); +// taskTracker.setFailStorePath(Constants.USER_HOME); taskTracker.addMasterChangeListener(new MasterChangeListenerImpl()); // taskTracker.setBizLoggerLevel(Level.INFO); // 业务日志级别 taskTracker.start(); diff --git a/job-extensions/job-ext-spring/src/main/java/com/lts/job/spring/JobClientFactoryBean.java b/job-extensions/job-ext-spring/src/main/java/com/lts/job/spring/JobClientFactoryBean.java index 9b88a86d0..00af55940 100644 --- a/job-extensions/job-ext-spring/src/main/java/com/lts/job/spring/JobClientFactoryBean.java +++ b/job-extensions/job-ext-spring/src/main/java/com/lts/job/spring/JobClientFactoryBean.java @@ -40,7 +40,7 @@ public class JobClientFactoryBean implements FactoryBean, Initializin /** * 提交失败任务存储路径 , 默认用户木邻居 */ - private String jobInfoSavePath; + private String failStorePath; /** * master节点变化监听器 */ @@ -104,7 +104,7 @@ public void afterPropertiesSet() throws Exception { if (StringUtils.hasText(clusterName)) { jobClient.setClusterName(clusterName); } - jobClient.setJobInfoSavePath(jobInfoSavePath); + jobClient.setFailStorePath(failStorePath); jobClient.setNodeGroup(nodeGroup); if (jobFinishedHandler != null) { jobClient.setJobFinishedHandler(jobFinishedHandler); @@ -141,8 +141,8 @@ public void setJobFinishedHandler(JobFinishedHandler jobFinishedHandler) { this.jobFinishedHandler = jobFinishedHandler; } - public void setJobInfoSavePath(String jobInfoSavePath) { - this.jobInfoSavePath = jobInfoSavePath; + public void setFailStorePath(String failStorePath) { + this.failStorePath = failStorePath; } public void setMasterChangeListeners(MasterChangeListener[] masterChangeListeners) { diff --git a/job-extensions/job-ext-spring/src/main/java/com/lts/job/spring/TaskTrackerFactoryBean.java b/job-extensions/job-ext-spring/src/main/java/com/lts/job/spring/TaskTrackerFactoryBean.java index 922bf3f7a..41427a9ed 100644 --- a/job-extensions/job-ext-spring/src/main/java/com/lts/job/spring/TaskTrackerFactoryBean.java +++ b/job-extensions/job-ext-spring/src/main/java/com/lts/job/spring/TaskTrackerFactoryBean.java @@ -41,7 +41,7 @@ public class TaskTrackerFactoryBean implements FactoryBean, Applica /** * 提交失败任务存储路径 , 默认用户木邻居 */ - private String jobInfoSavePath; + private String failStorePath; /** * 工作线程个数 */ @@ -97,7 +97,7 @@ public void afterPropertiesSet() throws Exception { if (StringUtils.hasText(clusterName)) { taskTracker.setClusterName(clusterName); } - taskTracker.setJobInfoSavePath(jobInfoSavePath); + taskTracker.setFailStorePath(failStorePath); taskTracker.setWorkThreads(workThreads); taskTracker.setNodeGroup(nodeGroup); taskTracker.setRegistryAddress(registryAddress); @@ -146,8 +146,8 @@ public void setRegistryAddress(String registryAddress) { this.registryAddress = registryAddress; } - public void setJobInfoSavePath(String jobInfoSavePath) { - this.jobInfoSavePath = jobInfoSavePath; + public void setFailStorePath(String failStorePath) { + this.failStorePath = failStorePath; } public void setMasterChangeListeners(MasterChangeListener[] masterChangeListeners) { diff --git a/job-tracker/src/main/java/com/lts/job/tracker/processor/JobPullProcessor.java b/job-tracker/src/main/java/com/lts/job/tracker/processor/JobPullProcessor.java index b7b5d13f4..06bde4c7f 100644 --- a/job-tracker/src/main/java/com/lts/job/tracker/processor/JobPullProcessor.java +++ b/job-tracker/src/main/java/com/lts/job/tracker/processor/JobPullProcessor.java @@ -1,9 +1,5 @@ package com.lts.job.tracker.processor; -import com.lts.job.core.constant.Constants; -import com.lts.job.core.factory.NamedThreadFactory; -import com.lts.job.core.logger.Logger; -import com.lts.job.core.logger.LoggerFactory; import com.lts.job.core.protocol.JobProtos; import com.lts.job.core.protocol.command.JobPullRequest; import com.lts.job.core.remoting.RemotingServerDelegate; @@ -13,43 +9,27 @@ import com.lts.job.tracker.support.JobDistributor; import io.netty.channel.ChannelHandlerContext; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - /** * @author Robert HG (254963746@qq.com) on 7/24/14. * 处理 TaskTracker的 Job pull 请求 */ public class JobPullProcessor extends AbstractProcessor { - private final static Logger LOGGER = LoggerFactory.getLogger(JobPullProcessor.class.getSimpleName()); - - private final ExecutorService executor; private JobDistributor jobDistributor; public JobPullProcessor(RemotingServerDelegate remotingServer, JobTrackerApplication application) { super(remotingServer, application); - executor = Executors.newFixedThreadPool(Constants.AVAILABLE_PROCESSOR * 5 - , new NamedThreadFactory(JobPullProcessor.class.getSimpleName())); jobDistributor = new JobDistributor(application); } @Override public RemotingCommand processRequest(final ChannelHandlerContext ctx, final RemotingCommand request) throws RemotingCommandException { - final JobPullRequest requestBody = request.getBody(); - - executor.submit(new Runnable() { - @Override - public void run() { - try { - jobDistributor.pushJob(remotingServer, requestBody); - } catch (Exception e) { - LOGGER.error(e.getMessage(), e); - } - } - }); + JobPullRequest requestBody = request.getBody(); + + jobDistributor.distribute(remotingServer, requestBody); + return RemotingCommand.createResponseCommand(JobProtos.ResponseCode.JOB_PULL_SUCCESS.code(), ""); } } diff --git a/job-tracker/src/main/java/com/lts/job/tracker/support/JobDistributor.java b/job-tracker/src/main/java/com/lts/job/tracker/support/JobDistributor.java index 86339fc46..5046c4032 100644 --- a/job-tracker/src/main/java/com/lts/job/tracker/support/JobDistributor.java +++ b/job-tracker/src/main/java/com/lts/job/tracker/support/JobDistributor.java @@ -1,7 +1,9 @@ package com.lts.job.tracker.support; +import com.lts.job.core.constant.Constants; import com.lts.job.core.domain.Job; import com.lts.job.core.exception.RemotingSendException; +import com.lts.job.core.factory.NamedThreadFactory; import com.lts.job.core.logger.Logger; import com.lts.job.core.logger.LoggerFactory; import com.lts.job.core.protocol.JobProtos; @@ -18,6 +20,8 @@ import com.lts.job.tracker.domain.TaskTrackerNode; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * @author Robert HG (254963746@qq.com) on 8/18/14. @@ -27,9 +31,25 @@ public class JobDistributor { private final Logger LOGGER = LoggerFactory.getLogger(JobDistributor.class); private JobTrackerApplication application; + private final ExecutorService executor; public JobDistributor(JobTrackerApplication application) { this.application = application; + executor = Executors.newFixedThreadPool(Constants.AVAILABLE_PROCESSOR * 5 + , new NamedThreadFactory(JobDistributor.class.getSimpleName())); + } + + public void distribute(final RemotingServerDelegate remotingServer, final JobPullRequest request) { + executor.submit(new Runnable() { + @Override + public void run() { + try { + _sendJob(remotingServer, request); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + } + }); } /** @@ -39,7 +59,7 @@ public JobDistributor(JobTrackerApplication application) { * @param remotingServer * @param request */ - public void pushJob(RemotingServerDelegate remotingServer, JobPullRequest request) { + private void _sendJob(RemotingServerDelegate remotingServer, JobPullRequest request) { String nodeGroup = request.getNodeGroup(); String identity = request.getIdentity(); @@ -56,7 +76,7 @@ public void pushJob(RemotingServerDelegate remotingServer, JobPullRequest reques while (availableThreads > 0) { // 推送任务 - int code = pushJob(remotingServer, taskTrackerNode); + int code = sendJob(remotingServer, taskTrackerNode); if (code == NO_JOB) { // 没有可以执行的任务, 直接停止 break; @@ -83,7 +103,7 @@ public void pushJob(RemotingServerDelegate remotingServer, JobPullRequest reques * @param taskTrackerNode * @return */ - private int pushJob(RemotingServerDelegate remotingServer, TaskTrackerNode taskTrackerNode) { + private int sendJob(RemotingServerDelegate remotingServer, TaskTrackerNode taskTrackerNode) { String nodeGroup = taskTrackerNode.getNodeGroup(); String identity = taskTrackerNode.getIdentity();