From 543c79bdd95f9ad4988f1201ec2a8e59d8e4045e Mon Sep 17 00:00:00 2001 From: hugui <254963746@qq.com> Date: Tue, 20 Oct 2015 07:36:59 +0800 Subject: [PATCH] 1.6.1 --- .../java/com/lts/queue/AbstractPreLoader.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/lts-queue/lts-queue-api/src/main/java/com/lts/queue/AbstractPreLoader.java b/lts-queue/lts-queue-api/src/main/java/com/lts/queue/AbstractPreLoader.java index 0780873b5..c3196ae52 100644 --- a/lts-queue/lts-queue-api/src/main/java/com/lts/queue/AbstractPreLoader.java +++ b/lts-queue/lts-queue-api/src/main/java/com/lts/queue/AbstractPreLoader.java @@ -27,8 +27,8 @@ public abstract class AbstractPreLoader implements PreLoader { // 当前节点的序号 private int curSequence = 0; private int totalNodes = 1; - // 没个节点的步长 - protected int step = 500; + // 每个节点的步长 + protected int loadSize = 500; // 预取阀值 private double factor = 0.5; @@ -42,15 +42,19 @@ public abstract class AbstractPreLoader implements PreLoader { public AbstractPreLoader(final Application application) { if (start.compareAndSet(false, true)) { + + loadSize = application.getConfig().getParameter("job.preloader.size", 500); + factor = application.getConfig().getParameter("job.preloader.factor", 0.5); + scheduledFuture = LOAD_EXECUTOR_SERVICE.scheduleWithFixedDelay(new Runnable() { @Override public void run() { for (String loadTaskTrackerNodeGroup : LOAD_SIGNAL) { JobPriorityBlockingQueue queue = JOB_MAP.get(loadTaskTrackerNodeGroup); - if (queue.size() / step < factor) { + if (queue.size() / loadSize < factor) { // load - List loads = load(loadTaskTrackerNodeGroup, curSequence * step); + List loads = load(loadTaskTrackerNodeGroup, curSequence * (loadSize - queue.size())); // 加入到内存中 if (CollectionUtils.isNotEmpty(loads)) { for (JobPo load : loads) { @@ -134,14 +138,14 @@ public JobPo take(String taskTrackerNodeGroup, String taskTrackerIdentity) { private JobPo get(String taskTrackerNodeGroup) { JobPriorityBlockingQueue queue = JOB_MAP.get(taskTrackerNodeGroup); if (queue == null) { - queue = new JobPriorityBlockingQueue(step); + queue = new JobPriorityBlockingQueue(loadSize); JobPriorityBlockingQueue oldQueue = JOB_MAP.putIfAbsent(taskTrackerNodeGroup, queue); if (oldQueue != null) { queue = oldQueue; } } - if (queue.size() / step < factor) { + if (queue.size() / loadSize < factor) { // 触发加载的请求 if (!LOAD_SIGNAL.contains(taskTrackerNodeGroup)) { LOAD_SIGNAL.add(taskTrackerNodeGroup);