Skip to content

Commit

Permalink
Return replies (async)
Browse files Browse the repository at this point in the history
Issues:
    In test, last GET operation is not populating reply.
    But if we add a PING, not only we get the reply of GET but also PING.
  • Loading branch information
sazzad16 committed Dec 6, 2023
1 parent 004846e commit 06feae3
Show file tree
Hide file tree
Showing 10 changed files with 292 additions and 99 deletions.
148 changes: 148 additions & 0 deletions src/main/java/redis/clients/jedis/asyncio/CommandArguments.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package redis.clients.jedis.asyncio;

import redis.clients.jedis.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;

import redis.clients.jedis.args.Rawable;
import redis.clients.jedis.args.RawableFactory;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.params.IParams;
import redis.clients.jedis.search.RediSearchUtil;

public class CommandArguments implements Iterable<Rawable> {

private final ArrayList<Rawable> args;

// private boolean blocking;

private CommandArguments() {
throw new InstantiationError();
}

public CommandArguments(ProtocolCommand command) {
args = new ArrayList<>();
args.add(command);
}

public ProtocolCommand getCommand() {
return (ProtocolCommand) args.get(0);
}

public CommandArguments add(Object arg) {
if (arg == null) {
throw new IllegalArgumentException("null is not a valid argument.");
} else if (arg instanceof Rawable) {
args.add((Rawable) arg);
} else if (arg instanceof byte[]) {
args.add(RawableFactory.from((byte[]) arg));
} else if (arg instanceof Integer) {
args.add(RawableFactory.from((Integer) arg));
} else if (arg instanceof Double) {
args.add(RawableFactory.from((Double) arg));
} else if (arg instanceof Boolean) {
args.add(RawableFactory.from((Boolean) arg ? 1 : 0));
} else if (arg instanceof float[]) {
args.add(RawableFactory.from(RediSearchUtil.toByteArray((float[]) arg)));
} else if (arg instanceof String) {
args.add(RawableFactory.from((String) arg));
} else if (arg instanceof GeoCoordinate) {
GeoCoordinate geo = (GeoCoordinate) arg;
args.add(RawableFactory.from(geo.getLongitude() + "," + geo.getLatitude()));
} else {
args.add(RawableFactory.from(String.valueOf(arg)));
}
return this;
}

public CommandArguments addObjects(Object... args) {
for (Object arg : args) {
add(arg);
}
return this;
}

public CommandArguments addObjects(Collection args) {
args.forEach(arg -> add(arg));
return this;
}

public CommandArguments key(Object key) {
if (key instanceof Rawable) {
Rawable raw = (Rawable) key;
// processKey(raw.getRaw());
args.add(raw);
} else if (key instanceof byte[]) {
byte[] raw = (byte[]) key;
// processKey(raw);
args.add(RawableFactory.from(raw));
} else if (key instanceof String) {
String raw = (String) key;
// processKey(raw);
args.add(RawableFactory.from(raw));
} else {
throw new IllegalArgumentException("\"" + key.toString() + "\" is not a valid argument.");
}
return this;
}

public final CommandArguments keys(Object... keys) {
for (Object key : keys) {
key(key);
}
return this;
}

public final CommandArguments keys(Collection keys) {
keys.forEach(key -> key(key));
return this;
}

// public final CommandArguments addParams(IParams params) {
// params.addParams(this);
// return this;
// }
//
// protected CommandArguments processKey(byte[] key) {
// // do nothing
// return this;
// }
//
// protected final CommandArguments processKeys(byte[]... keys) {
// for (byte[] key : keys) {
// processKey(key);
// }
// return this;
// }
//
// protected CommandArguments processKey(String key) {
// // do nothing
// return this;
// }
//
// protected final CommandArguments processKeys(String... keys) {
// for (String key : keys) {
// processKey(key);
// }
// return this;
// }

public int size() {
return args.size();
}

@Override
public Iterator<Rawable> iterator() {
return args.iterator();
}
//
// public boolean isBlocking() {
// return blocking;
// }
//
// public CommandArguments blocking() {
// this.blocking = true;
// return this;
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf buf) throws
try {
buf.markWriterIndex();

if (msg instanceof List) {
if (msg instanceof CommandObject) {
write(buf, ((CommandObject<?>) msg).getArguments());
} else if (msg instanceof List) {
write(buf, (List<String>) msg);
} else if (msg instanceof String) {
write(buf, Arrays.asList(((String) msg).split("\\s+")));
Expand All @@ -31,14 +33,14 @@ protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf buf) throws
}
}

private void write(ByteBuf buf, List<String> msg) throws Exception {
private void write(ByteBuf buf, List<String> args) throws Exception {

buf.writeByte('*');
buf.writeBytes(Protocol.toByteArray(msg.size()));
buf.writeBytes(Protocol.toByteArray(args.size()));
buf.writeByte('\r');
buf.writeByte('\n');

msg.forEach(arg -> {
args.forEach(arg -> {
buf.writeByte('$');
buf.writeBytes(Protocol.toByteArray(arg.length()));
buf.writeByte('\r');
Expand All @@ -48,4 +50,23 @@ private void write(ByteBuf buf, List<String> msg) throws Exception {
buf.writeByte('\n');
});
}

private void write(ByteBuf buf, CommandArguments args) throws Exception {

buf.writeByte('*');
buf.writeBytes(Protocol.toByteArray(args.size()));
buf.writeByte('\r');
buf.writeByte('\n');

args.forEach(arg -> {
byte[] raw = arg.getRaw();
buf.writeByte('$');
buf.writeBytes(Protocol.toByteArray(raw.length));
buf.writeByte('\r');
buf.writeByte('\n');
buf.writeBytes(raw);
buf.writeByte('\r');
buf.writeByte('\n');
});
}
}
23 changes: 23 additions & 0 deletions src/main/java/redis/clients/jedis/asyncio/CommandObject.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package redis.clients.jedis.asyncio;

import java.util.concurrent.CompletableFuture;
import redis.clients.jedis.asyncio.replies.CommandReply;

public class CommandObject<T> extends CompletableFuture<T> {

private final CommandArguments arguments;
private final CommandReply<T> reply;

public CommandObject(CommandArguments arguments, CommandReply<T> reply) {
this.arguments = arguments;
this.reply = reply;
}

public CommandArguments getArguments() {
return arguments;
}

public CommandReply<T> getReply() {
return reply;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
public class CommandResponseHandler extends ChannelDuplexHandler {

protected final ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(8192 * 8);
// protected volatile Deque<ChannelPromise> promiseList = new ArrayDeque<>(512);
protected volatile Deque<CommandObject<?>> commands = new ArrayDeque<>(512);

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// promiseList.add(promise);
commands.add((CommandObject<?>) msg);
ctx.write(msg, promise);
}

Expand All @@ -45,24 +45,28 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) {
ByteBuffer bytes;

byte type = buffer.readByte();
System.out.println((char) type);
// System.out.println((char) type);

CommandObject object = commands.peek();

switch (type) {
// TODO: skip push
//
case PLUS_BYTE:
bytes = readLine(buffer);
object.getReply().parse(bytes);
break;
case DOLLAR_BYTE:
end = findLineEnd(buffer);
length = (int) readLong(buffer, buffer.readerIndex(), end);
bytes = readBytes(buffer, length);
object.getReply().parse(bytes);
break;
default:
throw new JedisConnectionException("Unknown reply: " + (char) type);
}
// promiseList.remove();
System.out.println(StandardCharsets.US_ASCII.decode(bytes).toString());
commands.remove();
// System.out.println(StandardCharsets.US_ASCII.decode(bytes).toString());
}

private int findLineEnd(ByteBuf buffer) {
Expand Down
20 changes: 12 additions & 8 deletions src/main/java/redis/clients/jedis/asyncio/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,17 @@ public void close() throws IOException {
}
}

public void executeCommand(String line) throws InterruptedException {
ChannelFuture writeFuture = channel.writeAndFlush(line);
writeFuture.addListener((ChannelFuture future) -> {
if (!future.isSuccess()) {
logger.error("Write failed", future.cause());
}
});
writeFuture.sync();
public <T> CommandObject<T> executeCommand(CommandObject<T> command) throws InterruptedException {
try {
ChannelFuture writeFuture = channel.writeAndFlush(command);
writeFuture.addListener((ChannelFuture future) -> {
if (!future.isSuccess()) {
logger.error("Write failed", future.cause());
}
});
writeFuture.sync();
} finally {
return command;
}
}
}
77 changes: 0 additions & 77 deletions src/main/java/redis/clients/jedis/asyncio/RedisClientHandler.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package redis.clients.jedis.asyncio.replies;

import java.nio.ByteBuffer;

public class CommandReply<T> {

private T reply = null;

public CommandReply() {
}

public T get() {
return reply;
}

public void setReply(T reply) {
this.reply = reply;
}

public void parse(ByteBuffer bytes) {
// override
}
}
Loading

0 comments on commit 06feae3

Please sign in to comment.