Skip to content

Commit

Permalink
Add --delimiter option to FixArchivePrinter (#509)
Browse files Browse the repository at this point in the history
* Add option to replace binary separator with pipe

* replace binary delimiter with any character
  • Loading branch information
Nadia-Adaptive authored Jun 12, 2024
1 parent 61f23b0 commit de96fa2
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import uk.co.real_logic.artio.messages.FixPProtocolType;
import uk.co.real_logic.artio.messages.MessageStatus;

import java.io.PrintStream;
import java.util.function.Predicate;

import static java.lang.Long.parseLong;
Expand All @@ -50,7 +51,7 @@ public final class FixArchivePrinter
{
public static void main(final String[] args)
{
new FixArchivePrinter().scan(args);
new FixArchivePrinter(System.out, System.err).scan(args);
}

private FixPProtocolType fixPProtocolType = FixPProtocolType.ILINK_3;
Expand All @@ -63,10 +64,19 @@ public static void main(final String[] args)
private FixMessagePredicate predicate = FixMessagePredicates.alwaysTrue();
private boolean follow = false;
private boolean fixp = false;
private char delimiter = Character.MIN_VALUE;
private Class<? extends FixDictionary> fixDictionaryType = null;
private Predicate<SessionHeaderDecoder> headerPredicate = null;
private final PrintStream out;
private final PrintStream err;

private void scan(final String[] args)
public FixArchivePrinter(final PrintStream out, final PrintStream err)
{
this.out = out;
this.err = err;
}

public void scan(final String[] args)
{
parseArgs(args);
validateArgs();
Expand Down Expand Up @@ -192,6 +202,10 @@ private void parseArgs(final String[] args)
case "log-file-dir":
logFileDir = optionValue;
break;

case "delimiter":
delimiter = optionValue.charAt(0);
break;
}
}
}
Expand All @@ -212,17 +226,17 @@ private void validateArgs()
requiredArgument(aeronChannel, "aeron-channel");
}

private static void requiredArgument(final int eqIndex)
private void requiredArgument(final int eqIndex)
{
if (eqIndex == -1)
{
System.err.println("--ilink, --help and --follow are the only options that don't take a value");
err.println("--ilink, --help and --follow are the only options that don't take a value");
printHelp();
System.exit(-1);
}
}

private static void scanArchive(
private void scanArchive(
final String aeronDirectoryName,
final String aeronChannel,
final IntHashSet queryStreamIds,
Expand Down Expand Up @@ -252,31 +266,31 @@ private static void scanArchive(

try (FixArchiveScanner scanner = new FixArchiveScanner(configuration))
{
System.out.println("Starting Scan ... ");
out.println("Starting Scan ... ");
scanner.scan(
aeronChannel,
queryStreamIds,
filterBy(FixArchivePrinter::print, predicate),
filterBy(this::print, predicate),
new LazyFixPMessagePrinter(DEFAULT_INBOUND_LIBRARY_STREAM, fixPProtocolType),
follow,
archiveScannerStreamId);
}
}

private static void requiredArgument(final String argument, final String description)
private void requiredArgument(final String argument, final String description)
{
if (argument == null)
{
System.err.printf("Missing required --%s argument%n", description);
err.printf("Missing required --%s argument%n", description);
printHelp();
System.exit(-1);
}
}

private static void printHelp()
private void printHelp()
{
System.out.println("FixArchivePrinter Options");
System.out.println("All options are specified in the form: --optionName=optionValue");
out.println("FixArchivePrinter Options");
out.println("All options are specified in the form: --optionName=optionValue");

printOption(
"aeron-dir-name",
Expand Down Expand Up @@ -366,19 +380,23 @@ private static void printHelp()
"Specifies a logFileDir option, this should be the same as provided to your EngineConfiguration." +
" This can be used to optimize scans that are time based",
false);
printOption(
"delimiter",
"Specifies the character to replace the binary delimiter with",
false);
}

private static void printOption(final String name, final String description, final boolean required)
private void printOption(final String name, final String description, final boolean required)
{
System.out.printf(" --%-20s [%s] - %s%n", name, required ? "required" : "optional", description);
out.printf(" --%-20s [%s] - %s%n", name, required ? "required" : "optional", description);
}

private static <T> Predicate<T> safeAnd(final Predicate<T> left, final Predicate<T> right)
{
return left == null ? right : left.and(right);
}

private static void print(
private void print(
final FixMessageDecoder message,
final DirectBuffer buffer,
final int offset,
Expand All @@ -387,8 +405,8 @@ private static void print(
{
final MessageStatus status = message.status();
final long timestamp = message.timestamp();
final String body = message.body();
System.out.printf("%1$20s: %2$s (%3$s)%n", timestamp, body, status);
final String body =
delimiter != Character.MIN_VALUE ? message.body().replace('\u0001', delimiter) : message.body();
out.printf("%1$20s: %2$s (%3$s)%n", timestamp, body, status);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2015-2024 Real Logic Limited, Adaptive Financial Consulting Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.co.real_logic.artio.system_tests;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static uk.co.real_logic.artio.TestFixtures.launchMediaDriver;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.ACCEPTOR_LOGS;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.acceptingLibraryConfig;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.connect;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.delete;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.launchInitiatingEngine;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.newInitiatingLibrary;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

import org.junit.Before;
import org.junit.Test;

import io.aeron.Aeron;
import uk.co.real_logic.artio.engine.EngineConfiguration;
import uk.co.real_logic.artio.engine.logger.FixArchivePrinter;
import uk.co.real_logic.artio.library.LibraryConfiguration;

public class ArchivePrinterIntegrationTest extends AbstractGatewayToGatewaySystemTest
{
private final FakeConnectHandler fakeConnectHandler = new FakeConnectHandler();

@Before
public void launch()
{
delete(ACCEPTOR_LOGS);

mediaDriver = launchMediaDriver();

launchAcceptingEngine();
initiatingEngine = launchInitiatingEngine(libraryAeronPort, nanoClock);

final LibraryConfiguration acceptingLibraryConfig = acceptingLibraryConfig(acceptingHandler, nanoClock);
acceptingLibraryConfig.libraryConnectHandler(fakeConnectHandler);
acceptingLibrary = connect(acceptingLibraryConfig);
initiatingLibrary = newInitiatingLibrary(libraryAeronPort, initiatingHandler, nanoClock);
testSystem = new TestSystem(acceptingLibrary, initiatingLibrary);

connectSessions();
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void shouldUseDefaultDelimiter()
{
setupAndExchangeMessages();

final EngineConfiguration configuration = acceptingEngine.configuration();
final Aeron.Context context = configuration.aeronContext();
final ByteArrayOutputStream outputBytes = new ByteArrayOutputStream();
final FixArchivePrinter fixArchivePrinter = new FixArchivePrinter(new PrintStream(outputBytes), System.err);
final String[] args = new String[] {
"--aeron-channel=" + configuration.libraryAeronChannel(),
"--log-file-dir=" + configuration.logFileDir(),
"--aeron-dir-name=" + context.aeronDirectory().getAbsolutePath(),
"--query-stream-id=" + configuration.outboundLibraryStream()
};

fixArchivePrinter.scan(args);

assertThat(outputBytes.toString(), containsString("\u0001112=hi"));
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void canUseSpecifiedCharAsDelimiter()
{
setupAndExchangeMessages();

final EngineConfiguration configuration = acceptingEngine.configuration();
final Aeron.Context context = configuration.aeronContext();
final ByteArrayOutputStream outputBytes = new ByteArrayOutputStream();
final FixArchivePrinter fixArchivePrinter = new FixArchivePrinter(new PrintStream(outputBytes), System.err);
final String[] args = new String[] {
"--delimiter=|",
"--aeron-channel=" + configuration.libraryAeronChannel(),
"--log-file-dir=" + configuration.logFileDir(),
"--aeron-dir-name=" + context.aeronDirectory().getAbsolutePath(),
"--query-stream-id=" + configuration.outboundLibraryStream()
};

fixArchivePrinter.scan(args);

assertThat(outputBytes.toString(), containsString("|112=hi"));
}

private void setupAndExchangeMessages()
{
messagesCanBeExchanged();

assertInitiatingSequenceIndexIs(0);
}
}

0 comments on commit de96fa2

Please sign in to comment.