diff --git a/pom.xml b/pom.xml index facca52..e8af6e1 100644 --- a/pom.xml +++ b/pom.xml @@ -4,13 +4,13 @@ name.maxdeliso teflon - 1.0.7 + 1.0.8 jar teflon https://maxdeliso.name - 14 + 15 UTF-8 ${java.version} ${java.version} @@ -40,7 +40,7 @@ org.apache.maven.plugins maven-compiler-plugin - 3.8.0 + 3.8.1 ${java.version} ${java.version} diff --git a/src/main/java/name/maxdeliso/teflon/Main.java b/src/main/java/name/maxdeliso/teflon/Main.java index 7dab068..aaf1dfd 100644 --- a/src/main/java/name/maxdeliso/teflon/Main.java +++ b/src/main/java/name/maxdeliso/teflon/Main.java @@ -2,20 +2,22 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import java.net.Inet6Address; import java.net.InetAddress; import java.net.NetworkInterface; -import java.net.SocketException; import java.net.UnknownHostException; -import java.util.Collections; import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedTransferQueue; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.TransferQueue; import javax.swing.JFrame; import javax.swing.JOptionPane; -import name.maxdeliso.teflon.ctx.RunContext; import name.maxdeliso.teflon.data.JsonMessageMarshaller; import name.maxdeliso.teflon.data.Message; import name.maxdeliso.teflon.data.MessageMarshaller; +import name.maxdeliso.teflon.net.InterfaceChooser; import name.maxdeliso.teflon.net.NetSelector; import name.maxdeliso.teflon.ui.MainFrame; import org.apache.logging.log4j.LogManager; @@ -26,76 +28,94 @@ class Main { private static final Gson GSON = new GsonBuilder().create(); - private static final MessageMarshaller MM = new JsonMessageMarshaller(GSON); + private static final MessageMarshaller MESSAGE_MARSHALLER = new JsonMessageMarshaller(GSON); - private static final String MCAST_ADDR = "FF02:0:0:0:0:0:0:77"; + private static final String MULTICAST_BIND_ADDRESS = "FF02:0:0:0:0:0:0:77"; private static final int UDP_PORT = 1337; private static final int BUFFER_LENGTH = 4096; - private static final AtomicReference mainFrameRef = new AtomicReference<>(); + private static final TransferQueue TRANSFER_QUEUE = new LinkedTransferQueue<>(); + + private static final UUID UUID = java.util.UUID.randomUUID(); + + private static final CompletableFuture MAIN_FRAME_F = + CompletableFuture + .supplyAsync(() -> new MainFrame(UUID, TRANSFER_QUEUE::add)) + .thenApply(mainFrame -> { + mainFrame.setVisible(true); + return mainFrame; + }); + + private static final CompletableFuture> BIND_F = + CompletableFuture + .supplyAsync(() -> + { + try { + var bindAddress = Inet6Address.getByName(MULTICAST_BIND_ADDRESS); + return Optional.ofNullable(bindAddress); + } catch (UnknownHostException uke) { + return Optional.empty(); + } + } + ); + + private static final CompletableFuture> INTERFACE_OPT_F = + CompletableFuture + .supplyAsync(InterfaceChooser::new) + .thenApply(interfaceChooser -> interfaceChooser.queryInterfaces().stream().findFirst()); + private static final CompletableFuture MAIN = + BIND_F.thenCompose(inetAddressOpt -> INTERFACE_OPT_F.thenCompose(networkInterfaceOpt -> { + var inetAddress = inetAddressOpt + .orElseThrow( + () -> new IllegalStateException("failed to bind")); + + var networkInterface = networkInterfaceOpt + .orElseThrow( + () -> new IllegalStateException("failed to locate viable network interface")); + + return MAIN_FRAME_F + .thenCompose(mainFrame -> networkLoop(mainFrame, inetAddress, networkInterface)); + })); + + private static CompletableFuture networkLoop( + MainFrame mainFrame, + InetAddress bindAddress, + NetworkInterface networkInterface) { + return CompletableFuture.supplyAsync(() -> new NetSelector(UDP_PORT, BUFFER_LENGTH, + (_address, rxBytes) -> MESSAGE_MARSHALLER + .bufferToMessage(rxBytes) + .ifPresent(mainFrame::queueMessageDisplay), + bindAddress, + networkInterface, + () -> Optional + .ofNullable(TRANSFER_QUEUE.poll()) + .map(MESSAGE_MARSHALLER::messageToBuffer) + .orElse(null) + ) + ).thenCompose(NetSelector::selectLoop); + } public static void main(String[] args) { try { - var firstViableInterface = Collections - .list(NetworkInterface.getNetworkInterfaces()) - .stream() - .filter(ni -> { - try { - return ni.isUp() && - ni.supportsMulticast() && - !ni.isLoopback() && - !ni.isPointToPoint() && - !ni.isVirtual(); - } catch (SocketException exc) { - LOG.error("failed to interrogate prospective network interface", exc); - return false; - } - }) - .findFirst() - .orElseThrow(); - - var transferQueue = new LinkedTransferQueue(); - var rc = new RunContext(); - var bindAddr = InetAddress.getByName(MCAST_ADDR); - - LOG.info("attempting bind to {} on interface {}", bindAddr, firstViableInterface); - - mainFrameRef.set(new MainFrame(rc, transferQueue::add)); - - var netSelector = new NetSelector(UDP_PORT, BUFFER_LENGTH, - (_addr, rxBytes) -> MM - .bufferToMessage(rxBytes) - .ifPresent(msg -> mainFrameRef.get().queueMessageDisplay(msg)), - bindAddr, - firstViableInterface, - () -> Optional - .ofNullable(transferQueue.poll()) - .map(MM::messageToBuffer) - .orElse(null) - ); - - mainFrameRef.get().setVisible(true); - netSelector.selectLoop(); - mainFrameRef.get().dispose(); - } catch (SocketException | UnknownHostException exc) { - var dialogFrame = new JFrame(); + LOG.info("starting up with UUID {}", UUID); + MAIN.get(); + } catch (InterruptedException ie) { + LOG.error("synchronization error", ie); + System.exit(1); + } catch (ExecutionException ee) { + LOG.error("error", ee.getCause()); + var dialogFrame = new JFrame(); JOptionPane.showMessageDialog( dialogFrame, - exc.getMessage(), - "Network Error", + ee.getCause().getMessage() + + ". Please consult the log files for more information.", + "Error", JOptionPane.ERROR_MESSAGE); - - LOG.error("network related failure", exc); dialogFrame.dispose(); - } finally { - var mainFrame = mainFrameRef.get(); - - if(mainFrame != null) { - mainFrame.dispose(); - } + System.exit(2); } } } diff --git a/src/main/java/name/maxdeliso/teflon/ctx/RunContext.java b/src/main/java/name/maxdeliso/teflon/ctx/RunContext.java deleted file mode 100644 index 841cbbf..0000000 --- a/src/main/java/name/maxdeliso/teflon/ctx/RunContext.java +++ /dev/null @@ -1,15 +0,0 @@ -package name.maxdeliso.teflon.ctx; - -import java.util.UUID; - -public class RunContext { - private final String localHostUUID; - - public RunContext() { - localHostUUID = UUID.randomUUID().toString(); - } - - public String getLocalHostUUID() { - return localHostUUID; - } -} diff --git a/src/main/java/name/maxdeliso/teflon/net/InterfaceChooser.java b/src/main/java/name/maxdeliso/teflon/net/InterfaceChooser.java new file mode 100644 index 0000000..d172698 --- /dev/null +++ b/src/main/java/name/maxdeliso/teflon/net/InterfaceChooser.java @@ -0,0 +1,35 @@ +package name.maxdeliso.teflon.net; + +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class InterfaceChooser { + private List queryAvailableInterfaces() { + try { + return Collections.list(NetworkInterface.getNetworkInterfaces()); + } catch (SocketException se) { + return new ArrayList<>(); + } + } + + public List queryInterfaces() { + return queryAvailableInterfaces() + .stream() + .filter(ni -> { + try { + return ni.isUp() && + ni.supportsMulticast() && + !ni.isLoopback() && + !ni.isPointToPoint() && + !ni.isVirtual(); + } catch (SocketException exc) { + return false; + } + }) + .collect(Collectors.toList()); + } +} diff --git a/src/main/java/name/maxdeliso/teflon/net/NetSelector.java b/src/main/java/name/maxdeliso/teflon/net/NetSelector.java index ad5c5d3..dc7b95a 100644 --- a/src/main/java/name/maxdeliso/teflon/net/NetSelector.java +++ b/src/main/java/name/maxdeliso/teflon/net/NetSelector.java @@ -5,7 +5,6 @@ import java.net.InetSocketAddress; import java.net.NetworkInterface; import java.net.SocketAddress; -import java.net.SocketException; import java.net.StandardProtocolFamily; import java.net.StandardSocketOptions; import java.nio.ByteBuffer; @@ -14,6 +13,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -21,10 +21,10 @@ import org.apache.logging.log4j.Logger; /** - * This class contains the main event selectLoop which checks in memory queues, and performs UDP sending/receiving. + * This class contains the main event selectLoop which checks in memory queues, and performs UDP + * sending/receiving. */ public class NetSelector { - private static final Logger LOG = LogManager.getLogger(NetSelector.class); private final ByteBuffer incomingBuffer; @@ -54,75 +54,85 @@ public NetSelector(final int udpPort, } /** - * Main event processing selectLoop. This function busies the calling thread with the task of continual sending - * and receiving as data arrives. + * Main event processing selectLoop. This function busies the calling thread with the task of + * continual sending and receiving as data arrives. */ - public void selectLoop() { - try (final DatagramChannel datagramChannel = setupDatagramChannel(); - final Selector datagramChanSelector = Selector.open()) { - - datagramChannel.register(datagramChanSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); - - while (alive.get() && membershipKey.isValid()) { - datagramChanSelector.select(0); - final var selectionKeySet = datagramChanSelector.selectedKeys(); - - for (final SelectionKey key : selectionKeySet) { - if (key.isReadable()) { - incomingBuffer.clear(); - var sender = datagramChannel.receive(incomingBuffer); - var receivedBytes = new byte[incomingBuffer.position()]; - incomingBuffer.rewind(); - incomingBuffer.get(receivedBytes); - - // NOTE: heavy compute in incoming consumer will add latency - incomingByteBufferConsumer.accept(sender, receivedBytes); - } - - if (key.isWritable()) { - boolean writeSucceeded = Optional.ofNullable(outgoingMessageSupplier.get()) - .map(bufferToSend -> { - try { - final var bufferLength = bufferToSend.array().length; - final var sentBytes = datagramChannel.send(bufferToSend, multicastSendSocketAddress); - LOG.debug("sent {} of {} bytes over the wire", sentBytes, bufferLength); - return bufferLength == sentBytes; - } catch (IOException exc) { - LOG.error("i/o exception while attempting to send", exc); - return false; + public CompletableFuture selectLoop() { + return setupDatagramChannel() + .thenApply(datagramChannelOpt -> { + try (final Selector datagramChanSelector = Selector.open()) { + var datagramChannel = datagramChannelOpt.orElseThrow( + () -> new IllegalStateException("failed to setup a datagram channel") + ); + datagramChannel.register(datagramChanSelector, + SelectionKey.OP_READ | SelectionKey.OP_WRITE); + + while (alive.get() && membershipKey.isValid()) { + datagramChanSelector.select(0); + final var selectionKeySet = datagramChanSelector.selectedKeys(); + + for (final SelectionKey key : selectionKeySet) { + if (key.isReadable()) { + incomingBuffer.clear(); + var sender = datagramChannel.receive(incomingBuffer); + var receivedBytes = new byte[incomingBuffer.position()]; + incomingBuffer.rewind(); + incomingBuffer.get(receivedBytes); + + // NOTE: heavy compute in incoming consumer will add latency + incomingByteBufferConsumer.accept(sender, receivedBytes); + } + + if (key.isWritable()) { + boolean writeSucceededCompletely = + Optional.ofNullable(outgoingMessageSupplier.get()) + .filter(byteBuffer -> byteBuffer.array().length > 0) + .map(bufferToSend -> { + try { + final var bufferLength = bufferToSend.array().length; + final var sentBytes = + datagramChannel.send(bufferToSend, multicastSendSocketAddress); + LOG.debug("sent {} of {} bytes over the wire", + sentBytes, + bufferLength); + return bufferLength == sentBytes; + } catch (IOException exc) { + LOG.error("i/o exception while attempting to send", exc); + return false; + } + }) + .orElse(true); + + if (!writeSucceededCompletely) { + LOG.warn("write failed at least partially"); } - }) - .orElse(false); - - if (!writeSucceeded) { - LOG.warn("write failed"); + } + } } + } catch (IOException exc) { + LOG.error("unexpected exception in main event selectLoop", exc); } - } - } - } catch (IOException exc) { - LOG.error("unexpected exception in main event selectLoop", exc); - } + return null; + }); } - private DatagramChannel setupDatagramChannel() throws IOException { - final var channel = DatagramChannel.open(StandardProtocolFamily.INET6); - - try { - channel.setOption(StandardSocketOptions.IP_MULTICAST_IF, multicastInterface); - } catch (SocketException se) { - LOG.warn("failed to set IP_MULTICAST_IF socket option", se); - } - - channel.configureBlocking(false); - - final var udpSocket = channel.socket(); - udpSocket.setReuseAddress(true); - udpSocket.setBroadcast(true); - udpSocket.bind(multicastListenSocketAddress); - this.membershipKey = channel.join(multicastGroupAddress, multicastInterface); - - LOG.debug("joined to group {}", this.membershipKey.group()); - return channel; + private CompletableFuture> setupDatagramChannel() { + return CompletableFuture.supplyAsync(() -> { + try { + final DatagramChannel channel = DatagramChannel.open(StandardProtocolFamily.INET6); + final var udpSocket = channel.socket(); + channel.setOption(StandardSocketOptions.IP_MULTICAST_IF, multicastInterface); + channel.configureBlocking(false); + udpSocket.setReuseAddress(true); + udpSocket.setBroadcast(true); + udpSocket.bind(multicastListenSocketAddress); + this.membershipKey = channel.join(multicastGroupAddress, multicastInterface); + LOG.debug("joined to group {}", this.membershipKey.group()); + return Optional.of(channel); + } catch (IOException ioe) { + LOG.error("failed to setup a datagram channel", ioe); + return Optional.empty(); + } + }); } } diff --git a/src/main/java/name/maxdeliso/teflon/ui/MainFrame.java b/src/main/java/name/maxdeliso/teflon/ui/MainFrame.java index 4af085a..984fbb1 100644 --- a/src/main/java/name/maxdeliso/teflon/ui/MainFrame.java +++ b/src/main/java/name/maxdeliso/teflon/ui/MainFrame.java @@ -5,17 +5,15 @@ import java.awt.event.KeyEvent; import java.text.DateFormat; import java.util.Date; +import java.util.UUID; import java.util.function.Consumer; -import java.util.function.Supplier; import javax.swing.JFrame; import javax.swing.JPanel; import javax.swing.JScrollPane; import javax.swing.JTextArea; import javax.swing.JTextField; import javax.swing.SwingUtilities; -import name.maxdeliso.teflon.ctx.RunContext; import name.maxdeliso.teflon.data.Message; -import name.maxdeliso.teflon.net.NetSelector; /** * A JFrame, which will present a UI through the native windowing system on supported OSes. @@ -36,13 +34,12 @@ public class MainFrame extends JFrame { private final JTextField inputTextField = new JTextField(); private final DateFormat dateFormat = DateFormat.getInstance(); - private final RunContext runContext; + private final UUID uuid; private final Consumer messageConsumer; - private Supplier netSelectorSupplier; - public MainFrame(final RunContext runContext, + public MainFrame(final UUID uuid, final Consumer messageConsumer) { - this.runContext = runContext; + this.uuid = uuid; this.messageConsumer = messageConsumer; buildFrame(); } @@ -61,7 +58,7 @@ private void buildFrame() { @Override public void keyReleased(final KeyEvent ke) { if (ke.getKeyCode() == KeyEvent.VK_ENTER) { - var outgoing = new Message(runContext.getLocalHostUUID(), inputTextField.getText()); + var outgoing = new Message(uuid.toString(), inputTextField.getText()); messageConsumer.accept(outgoing); inputTextField.setText(""); } diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml new file mode 100644 index 0000000..81ed829 --- /dev/null +++ b/src/main/resources/log4j2.xml @@ -0,0 +1,20 @@ + + + + + + %d %p %c{1.} [%t] %m%n + + + + + + + + + + + + + \ No newline at end of file