Skip to content
This repository has been archived by the owner on Nov 26, 2020. It is now read-only.

Commit

Permalink
Add async handler for netty and nio buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
ardikars committed Jan 3, 2019
1 parent 5ed06ec commit 0577d7a
Show file tree
Hide file tree
Showing 19 changed files with 304 additions and 204 deletions.
4 changes: 2 additions & 2 deletions gradle/configure.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ ext {

NAME = 'Jxnet'
GROUP = 'com.ardikars.jxnet'
VERSION = '1.5.3.RC6'
VERSION = '1.5.3.RC7'
DESCRIPTION = 'Jxnet is a java library for capturing and sending network packet.'

NDK_HOME = "${System.env.NDK_HOME}"
Expand All @@ -19,7 +19,7 @@ ext {
MAVEN_LOCAL_REPOSITORY = "${rootDir}/build/repository"
// MAVEN_LOCAL_REPOSITORY = "${System.env.HOME}/.m2/repository"

JAVA_VERSION = '1.8'
JAVA_VERSION = '1.7'
JUNIT_VERSION = '4.12'
MOCKITO_VERSION = '2.13.0'
GRADLE_VERSION = '5.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;

/**
* Abstract jxnet spring boot runner.
* @param <T> type.
* @since 1.5.3
*/
@Order(Integer.MIN_VALUE)
public abstract class AbstractJxnetApplicationRunner<T> implements CommandLineRunner {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright (C) 2015-2018 Jxnet
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package com.ardikars.jxnet.spring.boot.autoconfigure;

import com.ardikars.common.annotation.Incubating;
import com.ardikars.jxnet.PcapPktHdr;
import com.ardikars.jxpacket.common.Packet;

import java.util.concurrent.ExecutionException;

/**
* Handler
* @param <T> arg type.
* @param <V> packet type.
* @author <a href="mailto:[email protected]">Ardika Rommy Sanjaya</a>
* @since 1.5.3
*/
@Incubating
public interface Handler<T, V> {

/**
* Next available packet.
* @param argument user argument.
* @param packet a tuple of {@link PcapPktHdr} and {@link Packet}.
* @throws ExecutionException execution exception.
* @throws InterruptedException interrupted exception.
*/
void next(T argument, V packet) throws ExecutionException, InterruptedException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* Copyright (C) 2015-2018 Jxnet
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package com.ardikars.jxnet.spring.boot.autoconfigure;

import com.ardikars.common.annotation.Incubating;
import com.ardikars.jxnet.DataLinkType;
import com.ardikars.jxnet.context.Context;
import com.ardikars.jxnet.spring.boot.autoconfigure.constant.JxnetObjectName;
import com.ardikars.jxpacket.common.Packet;
import com.ardikars.jxpacket.common.UnknownPacket;
import com.ardikars.jxpacket.core.ethernet.Ethernet;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

/**
*
* @param <T> type.
* @param <V> packet type.
* @author <a href="mailto:[email protected]">Ardika Rommy Sanjaya</a>
*/
@Incubating
public class HandlerConfigurer<T, V> {

@Autowired
@Qualifier(JxnetObjectName.EXECUTOR_SERVICE_BEAN_NAME)
protected ExecutorService executorService;

@Autowired
@Qualifier(JxnetObjectName.CONTEXT_BEAN_NAME)
protected Context context;

@Autowired
@Qualifier(JxnetObjectName.DATALINK_TYPE_BEAN_NAME)
protected DataLinkType dataLinkType;

@Autowired
private Handler<T, V> handler;

/**
* Decode buffer.
* @param bytes direct byte buffer.
* @return returns {@link Packet}.
*/
public Packet decode(ByteBuffer bytes) {
ByteBuf buffer = Unpooled.wrappedBuffer(bytes);
Packet packet;
if (dataLinkType.getValue() == 1) {
packet = Ethernet.newPacket(buffer);
} else {
packet = UnknownPacket.newPacket(buffer);
}
return packet;
}

/**
* Get handler.
* @return returns {@link Handler} implementation.
*/
public Handler<T, V> getHandler() {
return handler;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfigureOrder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,13 @@
import com.ardikars.jxnet.PcapPktHdr;
import com.ardikars.jxpacket.common.Packet;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
* Callback function used for capturing packets.
*
* @author <a href="mailto:[email protected]">Ardika Rommy Sanjaya</a>
* @since 1.4.9
*/
public interface JxpacketAsyncHandler<T> {

/**
* Next available packet.
* @param argument user argument.
* @param packet a tuple of {@link PcapPktHdr} and {@link Packet}.
* @throws ExecutionException execution exception.
* @throws InterruptedException interrupted exception.
*/
void next(T argument, CompletableFuture<Pair<PcapPktHdr, Packet>> packet) throws ExecutionException, InterruptedException;
public interface JxpacketAsyncHandler<T> extends Handler<T, Pair<PcapPktHdr, Packet>> {

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.ardikars.jxnet.PcapPktHdr;
import com.ardikars.jxpacket.common.Packet;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
Expand All @@ -30,15 +29,6 @@
* @author <a href="mailto:[email protected]">Ardika Rommy Sanjaya</a>
* @since 1.4.9
*/
public interface JxpacketHandler<T> {

/**
* Next available packet.
* @param argument user argument.
* @param packet a tuple of {@link PcapPktHdr} and {@link Packet}.
* @throws ExecutionException execution exception.
* @throws InterruptedException interrupted exception.
*/
void next(T argument, Future<Pair<PcapPktHdr, Packet>> packet) throws ExecutionException, InterruptedException;
public interface JxpacketHandler<T> extends Handler<T, Future<Pair<PcapPktHdr, Packet>>> {

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.ardikars.common.tuple.Pair;
import com.ardikars.jxnet.PcapPktHdr;
import io.netty.buffer.ByteBuf;
import java.util.concurrent.ExecutionException;

import java.util.concurrent.Future;

/**
Expand All @@ -29,15 +29,6 @@
* @author <a href="mailto:[email protected]">Ardika Rommy Sanjaya</a>
* @since 1.4.9
*/
public interface NettyBufferHandler<T> {

/**
* Next available packet.
* @param argument user argument.
* @param packet a taple of {@link PcapPktHdr} and {@link ByteBuf}.
* @throws ExecutionException execution exception.
* @throws InterruptedException interrupted exception.
*/
void next(T argument, Future<Pair<PcapPktHdr, ByteBuf>> packet) throws ExecutionException, InterruptedException;
public interface NettyBufferHandler<T> extends Handler<T, Future<Pair<PcapPktHdr, ByteBuf>>> {

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import com.ardikars.common.tuple.Pair;
import com.ardikars.jxnet.PcapPktHdr;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
Expand All @@ -29,15 +29,6 @@
* @author <a href="mailto:[email protected]">Ardika Rommy Sanjaya</a>
* @since 1.4.9
*/
public interface NioBufferHandler<T> {

/**
* Next available packet.
* @param argument user argument.
* @param packet a taple of {@link PcapPktHdr} and {@link ByteBuffer}.
* @throws ExecutionException execution exception.
* @throws InterruptedException interrupted exception.
*/
void next(T argument, Future<Pair<PcapPktHdr, ByteBuffer>> packet) throws ExecutionException, InterruptedException;
public interface NioBufferHandler<T> extends Handler<T, Future<Pair<PcapPktHdr, ByteBuffer>>> {

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ public final class JxnetObjectName {
public static final String JXPACKET_HANDLER_CONFIGURATION_BEAN_NAME = "com.ardikras.jxnet.jxpacketHandlerConfiguration";
public static final String JXPACKET_ASYNC_HANDLER_CONFIGURATION_BEAN_NAME = "com.ardikras.jxnet.jxpacketAsyncHandlerConfiguration";
public static final String NETTY_BUFFER_HANDLER_CONFIGURATION_BEAN_NAME = "com.ardikras.jxnet.nettyBufferHandlerConfiguration";
public static final String NETTY_BUFFER_ASYCN_HANDLER_CONFIGURATION_BEAN_NAME = "com.ardikras.jxnet.nettyBufferAsycHandlerConfiguration";
public static final String NIO_BUFFER_HANDLER_CONFIGURATION_BEAN_NAME = "com.ardikras.jxnet.nioBufferHandlerConfiguration";
public static final String NIO_BUFFER_ASYNC_HANDLER_CONFIGURATION_BEAN_NAME = "com.ardikras.jxnet.nioBufferAsyncHandlerConfiguration";
public static final String PCAP_BUILDER_BEAN_NAME = "com.ardikras.jxnet.pcapBuilder";
public static final String JVM_BEAN_NAME = "com.ardikras.jxnet.jvm";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@
*/
public enum PacketHandlerType {

JXPACKET, JXPACKET_ASYNC, NETTY_BUFFER, NIO_BUFFER
JXPACKET, JXPACKET_ASYNC, NETTY_BUFFER, NETTY_BUFFER_ASYNC, NIO_BUFFER, NIO_BUFFER_ASYNC

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,78 +17,41 @@

package com.ardikars.jxnet.spring.boot.autoconfigure.jxpacket;

import static com.ardikars.jxnet.spring.boot.autoconfigure.constant.JxnetObjectName.DATALINK_TYPE_BEAN_NAME;
import static com.ardikars.jxnet.spring.boot.autoconfigure.constant.JxnetObjectName.EXECUTOR_SERVICE_BEAN_NAME;
import static com.ardikars.jxnet.spring.boot.autoconfigure.constant.JxnetObjectName.JXPACKET_ASYNC_HANDLER_CONFIGURATION_BEAN_NAME;

import com.ardikars.common.logging.Logger;
import com.ardikars.common.logging.LoggerFactory;
import com.ardikars.common.tuple.Pair;
import com.ardikars.common.tuple.Tuple;
import com.ardikars.jxnet.DataLinkType;
import com.ardikars.jxnet.PcapHandler;
import com.ardikars.jxnet.PcapPktHdr;
import com.ardikars.jxnet.spring.boot.autoconfigure.JxpacketAsyncHandler;
import com.ardikars.jxnet.spring.boot.autoconfigure.HandlerConfigurer;
import com.ardikars.jxpacket.common.Packet;
import com.ardikars.jxpacket.common.UnknownPacket;
import com.ardikars.jxpacket.core.ethernet.Ethernet;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Configuration;

@ConditionalOnClass({Packet.class, ByteBuf.class})
@Configuration(JXPACKET_ASYNC_HANDLER_CONFIGURATION_BEAN_NAME)
public class JxpacketAsyncHandlerConfiguration<T> implements PcapHandler<T> {
public class JxpacketAsyncHandlerConfiguration<T> extends HandlerConfigurer<T, Pair<PcapPktHdr, Packet>> implements PcapHandler<T> {

private static final Logger LOGGER = LoggerFactory.getLogger(JxpacketHandlerConfiguration.class);

private final int rawDataLinkType;
private final JxpacketAsyncHandler<T> packetHandler;
private final ExecutorService executorService;

/**
* Jxpacket async handler configuration (completable future).
* @param executorService thread pool.
* @param dataLinkType datalink type.
* @param packetHandler packet handler.
*/
public JxpacketAsyncHandlerConfiguration(@Qualifier(EXECUTOR_SERVICE_BEAN_NAME) ExecutorService executorService,
@Qualifier(DATALINK_TYPE_BEAN_NAME) DataLinkType dataLinkType,
JxpacketAsyncHandler<T> packetHandler) {
this.rawDataLinkType = dataLinkType != null ? dataLinkType.getValue() : 1;
this.packetHandler = packetHandler;
this.executorService = executorService;
}

@Override
public void nextPacket(T user, PcapPktHdr h, ByteBuffer bytes) {
CompletableFuture<Pair<PcapPktHdr, Packet>> packet = CompletableFuture.supplyAsync(new Supplier<Pair<PcapPktHdr, Packet>>() {
public void nextPacket(final T user, final PcapPktHdr h, final ByteBuffer bytes) {
executorService.execute(new Runnable() {
@Override
public Pair<PcapPktHdr, Packet> get() {
ByteBuf buffer = Unpooled.wrappedBuffer(bytes);
Packet packet;
if (rawDataLinkType == 1) {
packet = Ethernet.newPacket(buffer);
} else {
packet = UnknownPacket.newPacket(buffer);
public void run() {
try {
getHandler().next(user, Tuple.of(h, decode(bytes)));
} catch (ExecutionException | InterruptedException e) {
LOGGER.warn(e);
}
return Tuple.of(h, packet);
}
}, executorService);
try {
packetHandler.next(user, packet);
} catch (ExecutionException | InterruptedException e) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn(e.getMessage());
}
}
});
}


}
Loading

0 comments on commit 0577d7a

Please sign in to comment.