Skip to content

Commit

Permalink
1.0.3
Browse files Browse the repository at this point in the history
async event worker support
  • Loading branch information
leonchen83 committed Aug 25, 2016
1 parent a54370a commit 972d530
Show file tree
Hide file tree
Showing 12 changed files with 193 additions and 71 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ rdb version 7
<dependency>
<groupId>com.moilioncircle</groupId>
<artifactId>redis-replicator</artifactId>
<version>1.0.2</version>
<version>1.0.3</version>
</dependency>
```

Expand Down Expand Up @@ -41,7 +41,7 @@ rdb version 7
##File

```java
RedisReplicator replicator = new RedisReplicator(new File("dump.rdb"));
RedisReplicator replicator = new RedisReplicator(new File("dump.rdb"), Configuration.defaultSetting());
replicator.addRdbFilter(new RdbFilter() {
@Override
public boolean accept(KeyValuePair<?> kv) {
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

<groupId>com.moilioncircle</groupId>
<artifactId>redis-replicator</artifactId>
<version>1.0.2</version>
<version>1.0.3</version>

<name>redis-replicator</name>
<description>Redis Replicator is a redis RDB and Command parser written in java.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

Expand All @@ -32,11 +33,13 @@
*/
public abstract class AbstractReplicator implements Replicator {
protected RedisInputStream inputStream;
protected BlockingQueue<Object> eventQueue;
protected final ConcurrentHashMap<CommandName, CommandParser<? extends Command>> commands = new ConcurrentHashMap<>();
protected final List<CommandFilter> filters = new CopyOnWriteArrayList<>();
protected final List<CommandListener> listeners = new CopyOnWriteArrayList<>();
protected final List<RdbFilter> rdbFilters = new CopyOnWriteArrayList<>();
protected final List<RdbListener> rdbListeners = new CopyOnWriteArrayList<>();
protected final EventHandlerWorker worker = new EventHandlerWorker(this);

@Override
public void doCommandHandler(Command command) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public static Configuration defaultSetting() {
*/
private String masterRunId = "?";

/**
* blocking queue size
*/
private int eventQueueSize = 1000;

/**
* psync offset
*/
Expand Down Expand Up @@ -179,4 +184,13 @@ public Configuration addOffset(long offset) {
this.offset.addAndGet(offset);
return this;
}

public int getEventQueueSize() {
return eventQueueSize;
}

public Configuration setEventQueueSize(int eventQueueSize) {
this.eventQueueSize = eventQueueSize;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2016 leon chen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.moilioncircle.redis.replicator;

import com.moilioncircle.redis.replicator.cmd.Command;
import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Created by leon on 8/25/16.
*/
class EventHandlerWorker extends Thread implements Closeable {
private static final Log logger = LogFactory.getLog(EventHandlerWorker.class);

private final AbstractReplicator replicator;
private AtomicBoolean isClosed = new AtomicBoolean(false);

public EventHandlerWorker(AbstractReplicator replicator) {
this.replicator = replicator;
setDaemon(true);
setName("event-handler-worker");
}

@Override
public void run() {
while (!isClosed.get()) {
try {
Object object = replicator.eventQueue.take();
if (object instanceof KeyValuePair<?>) {
KeyValuePair<?> kv = (KeyValuePair<?>) object;
if (!replicator.doRdbFilter(kv)) continue;
replicator.doRdbHandler(kv);
} else if (object instanceof Command) {
Command command = (Command) object;
if (!replicator.doCommandFilter(command)) continue;
replicator.doCommandHandler(command);
} else {
throw new AssertionError(object);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Throwable e) {
exceptionHandler(e);
}
}
}

@Override
public void close() throws IOException {
isClosed.compareAndSet(false, true);
}

public boolean isClosed() {
return isClosed.get();
}

protected void exceptionHandler(Throwable e) {
logger.error(e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,32 @@
import com.moilioncircle.redis.replicator.rdb.RdbParser;

import java.io.*;
import java.util.concurrent.ArrayBlockingQueue;

/**
* Created by leon on 8/13/16.
*/
public class RedisFileReplicator extends AbstractReplicator {
class RedisFileReplicator extends AbstractReplicator {

public RedisFileReplicator(File file) throws FileNotFoundException {
this.inputStream = new RedisInputStream(new FileInputStream(file));
public RedisFileReplicator(File file, Configuration configuration) throws FileNotFoundException {
this(new RedisInputStream(new FileInputStream(file)), configuration);
}

public RedisFileReplicator(InputStream in) {
this.inputStream = new RedisInputStream(in);
public RedisFileReplicator(InputStream in, Configuration configuration) {
this.inputStream = new RedisInputStream(in, configuration.getBufferSize());
this.eventQueue = new ArrayBlockingQueue<>(configuration.getEventQueueSize());
}

@Override
public void open() throws IOException {
RdbParser parser = new RdbParser(inputStream, this);
worker.start();
RdbParser parser = new RdbParser(inputStream, this, this.eventQueue);
parser.parse();
}

@Override
public void close() throws IOException {
inputStream.close();
if (worker != null && !worker.isClosed()) worker.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
public class RedisReplicator implements Replicator {
private final Replicator replicator;

public RedisReplicator(File file) throws FileNotFoundException {
replicator = new RedisFileReplicator(file);
public RedisReplicator(File file, Configuration configuration) throws FileNotFoundException {
replicator = new RedisFileReplicator(file, configuration);
}

public RedisReplicator(InputStream in) {
replicator = new RedisFileReplicator(in);
public RedisReplicator(InputStream in, Configuration configuration) {
replicator = new RedisFileReplicator(in, configuration);
}

public RedisReplicator(String host, int port, Configuration configuration) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.moilioncircle.redis.replicator.Constants.DOLLAR;
Expand All @@ -39,7 +41,7 @@
/**
* Created by leon on 8/9/16.
*/
public class RedisSocketReplicator extends AbstractReplicator {
class RedisSocketReplicator extends AbstractReplicator {

private static final Log logger = LogFactory.getLog(RedisSocketReplicator.class);

Expand All @@ -57,6 +59,7 @@ public RedisSocketReplicator(String host, int port, Configuration configuration)
this.host = host;
this.port = port;
this.configuration = configuration;
this.eventQueue = new ArrayBlockingQueue<>(configuration.getEventQueueSize());
buildInCommandParserRegister();
}

Expand All @@ -67,6 +70,7 @@ public RedisSocketReplicator(String host, int port, Configuration configuration)
*/
@Override
public void open() throws IOException {
worker.start();
for (int i = 0; i < configuration.getRetries(); i++) {
try {

Expand Down Expand Up @@ -126,22 +130,21 @@ public void handle(long len) {
CommandParser<? extends Command> operations = commands.get(cmdName);
Command parsedCommand = operations.parse(cmdName, params);

//do command filter
if (!doCommandFilter(parsedCommand)) continue;

//do command handler
doCommandHandler(parsedCommand);
//submit event
this.eventQueue.put(parsedCommand);
} else {
if (logger.isInfoEnabled()) logger.info("Redis reply:" + obj);
}
}
//connected = false
break;
} catch (SocketException | SocketTimeoutException e) {
} catch (SocketException | SocketTimeoutException | InterruptedException e) {
logger.error(e);
//when close socket manual
if (!connected.get()) {
break;
}
//connect refused
//connect timeout
//read timeout
//connect abort
Expand All @@ -150,13 +153,15 @@ public void handle(long len) {
logger.info("retry connect to redis.");
}
}
//
if (worker != null && !worker.isClosed()) worker.close();
}

private SyncMode trySync(final String reply) throws IOException {
logger.info(reply);
if (reply.startsWith("FULLRESYNC")) {
//sync dump
parseDump(this);
parseDump(this, this.eventQueue);
//after parsed dump file,cache master run id and offset so that next psync.
String[] ary = reply.split(" ");
configuration.setMasterRunId(ary[1]);
Expand All @@ -169,12 +174,12 @@ private SyncMode trySync(final String reply) throws IOException {
//server don't support psync
logger.info("SYNC");
send("SYNC".getBytes());
parseDump(this);
parseDump(this, this.eventQueue);
return SyncMode.SYNC;
}
}

private void parseDump(final Replicator replicator) throws IOException {
private void parseDump(final AbstractReplicator replicator, final BlockingQueue<Object> eventQueue) throws IOException {
//sync dump
String reply = (String) replyParser.parse(new BulkReplyHandler() {
@Override
Expand All @@ -184,7 +189,7 @@ public String handle(long len, RedisInputStream in) throws IOException {
logger.info("Discard " + len + " bytes");
in.skip(len);
} else {
RdbParser parser = new RdbParser(in, replicator);
RdbParser parser = new RdbParser(in, replicator, eventQueue);
parser.parse();
}
return "OK";
Expand Down Expand Up @@ -291,6 +296,10 @@ private void connect() throws IOException {
replyParser = new ReplyParser(inputStream);
}

private void close0() throws IOException {

}

@Override
public void close() throws IOException {
if (!connected.compareAndSet(true, false)) return;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.moilioncircle.redis.replicator.rdb;

import com.moilioncircle.redis.replicator.Replicator;
import com.moilioncircle.redis.replicator.AbstractReplicator;
import com.moilioncircle.redis.replicator.io.RedisInputStream;
import com.moilioncircle.redis.replicator.util.Lzf;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;

import static com.moilioncircle.redis.replicator.Constants.*;

Expand All @@ -18,14 +19,17 @@ public abstract class AbstractRdbParser {

protected final RedisInputStream in;

protected final Replicator replicator;
protected final AbstractReplicator replicator;

public AbstractRdbParser(RedisInputStream in, Replicator replicator) {
protected final BlockingQueue<Object> eventQueue;

public AbstractRdbParser(RedisInputStream in, AbstractReplicator replicator, BlockingQueue<Object> eventQueue) {
this.in = in;
this.replicator = replicator;
this.eventQueue = eventQueue;
}

protected long rdbLoad() throws IOException {
protected long rdbLoad() throws IOException, InterruptedException {
throw new UnsupportedOperationException("rdbLoad()");
}

Expand Down
Loading

0 comments on commit 972d530

Please sign in to comment.