Skip to content

Commit

Permalink
Merge pull request #131 from qq254963746/develop
Browse files Browse the repository at this point in the history
1.6.1
  • Loading branch information
qq254963746 committed Oct 19, 2015
2 parents f18269d + 543c79b commit 9b3dc6b
Showing 1 changed file with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public abstract class AbstractPreLoader implements PreLoader {
// 当前节点的序号
private int curSequence = 0;
private int totalNodes = 1;
// 没个节点的步长
protected int step = 500;
// 每个节点的步长
protected int loadSize = 500;
// 预取阀值
private double factor = 0.5;

Expand All @@ -42,15 +42,19 @@ public abstract class AbstractPreLoader implements PreLoader {

public AbstractPreLoader(final Application application) {
if (start.compareAndSet(false, true)) {

loadSize = application.getConfig().getParameter("job.preloader.size", 500);
factor = application.getConfig().getParameter("job.preloader.factor", 0.5);

scheduledFuture = LOAD_EXECUTOR_SERVICE.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {

for (String loadTaskTrackerNodeGroup : LOAD_SIGNAL) {
JobPriorityBlockingQueue queue = JOB_MAP.get(loadTaskTrackerNodeGroup);
if (queue.size() / step < factor) {
if (queue.size() / loadSize < factor) {
// load
List<JobPo> loads = load(loadTaskTrackerNodeGroup, curSequence * step);
List<JobPo> loads = load(loadTaskTrackerNodeGroup, curSequence * (loadSize - queue.size()));
// 加入到内存中
if (CollectionUtils.isNotEmpty(loads)) {
for (JobPo load : loads) {
Expand Down Expand Up @@ -134,14 +138,14 @@ public JobPo take(String taskTrackerNodeGroup, String taskTrackerIdentity) {
private JobPo get(String taskTrackerNodeGroup) {
JobPriorityBlockingQueue queue = JOB_MAP.get(taskTrackerNodeGroup);
if (queue == null) {
queue = new JobPriorityBlockingQueue(step);
queue = new JobPriorityBlockingQueue(loadSize);
JobPriorityBlockingQueue oldQueue = JOB_MAP.putIfAbsent(taskTrackerNodeGroup, queue);
if (oldQueue != null) {
queue = oldQueue;
}
}

if (queue.size() / step < factor) {
if (queue.size() / loadSize < factor) {
// 触发加载的请求
if (!LOAD_SIGNAL.contains(taskTrackerNodeGroup)) {
LOAD_SIGNAL.add(taskTrackerNodeGroup);
Expand Down

0 comments on commit 9b3dc6b

Please sign in to comment.