diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/FixArchivePrinter.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/FixArchivePrinter.java index 0cb367ade9..372dffa7ee 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/FixArchivePrinter.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/FixArchivePrinter.java @@ -30,6 +30,7 @@ import uk.co.real_logic.artio.messages.FixPProtocolType; import uk.co.real_logic.artio.messages.MessageStatus; +import java.io.PrintStream; import java.util.function.Predicate; import static java.lang.Long.parseLong; @@ -50,7 +51,7 @@ public final class FixArchivePrinter { public static void main(final String[] args) { - new FixArchivePrinter().scan(args); + new FixArchivePrinter(System.out, System.err).scan(args); } private FixPProtocolType fixPProtocolType = FixPProtocolType.ILINK_3; @@ -63,10 +64,19 @@ public static void main(final String[] args) private FixMessagePredicate predicate = FixMessagePredicates.alwaysTrue(); private boolean follow = false; private boolean fixp = false; + private char delimiter = Character.MIN_VALUE; private Class fixDictionaryType = null; private Predicate headerPredicate = null; + private final PrintStream out; + private final PrintStream err; - private void scan(final String[] args) + public FixArchivePrinter(final PrintStream out, final PrintStream err) + { + this.out = out; + this.err = err; + } + + public void scan(final String[] args) { parseArgs(args); validateArgs(); @@ -192,6 +202,10 @@ private void parseArgs(final String[] args) case "log-file-dir": logFileDir = optionValue; break; + + case "delimiter": + delimiter = optionValue.charAt(0); + break; } } } @@ -212,17 +226,17 @@ private void validateArgs() requiredArgument(aeronChannel, "aeron-channel"); } - private static void requiredArgument(final int eqIndex) + private void requiredArgument(final int eqIndex) { if (eqIndex == -1) { - System.err.println("--ilink, --help and --follow are the only options that don't take a value"); + err.println("--ilink, --help and --follow are the only options that don't take a value"); printHelp(); System.exit(-1); } } - private static void scanArchive( + private void scanArchive( final String aeronDirectoryName, final String aeronChannel, final IntHashSet queryStreamIds, @@ -252,31 +266,31 @@ private static void scanArchive( try (FixArchiveScanner scanner = new FixArchiveScanner(configuration)) { - System.out.println("Starting Scan ... "); + out.println("Starting Scan ... "); scanner.scan( aeronChannel, queryStreamIds, - filterBy(FixArchivePrinter::print, predicate), + filterBy(this::print, predicate), new LazyFixPMessagePrinter(DEFAULT_INBOUND_LIBRARY_STREAM, fixPProtocolType), follow, archiveScannerStreamId); } } - private static void requiredArgument(final String argument, final String description) + private void requiredArgument(final String argument, final String description) { if (argument == null) { - System.err.printf("Missing required --%s argument%n", description); + err.printf("Missing required --%s argument%n", description); printHelp(); System.exit(-1); } } - private static void printHelp() + private void printHelp() { - System.out.println("FixArchivePrinter Options"); - System.out.println("All options are specified in the form: --optionName=optionValue"); + out.println("FixArchivePrinter Options"); + out.println("All options are specified in the form: --optionName=optionValue"); printOption( "aeron-dir-name", @@ -366,11 +380,15 @@ private static void printHelp() "Specifies a logFileDir option, this should be the same as provided to your EngineConfiguration." + " This can be used to optimize scans that are time based", false); + printOption( + "delimiter", + "Specifies the character to replace the binary delimiter with", + false); } - private static void printOption(final String name, final String description, final boolean required) + private void printOption(final String name, final String description, final boolean required) { - System.out.printf(" --%-20s [%s] - %s%n", name, required ? "required" : "optional", description); + out.printf(" --%-20s [%s] - %s%n", name, required ? "required" : "optional", description); } private static Predicate safeAnd(final Predicate left, final Predicate right) @@ -378,7 +396,7 @@ private static Predicate safeAnd(final Predicate left, final Predicate return left == null ? right : left.and(right); } - private static void print( + private void print( final FixMessageDecoder message, final DirectBuffer buffer, final int offset, @@ -387,8 +405,8 @@ private static void print( { final MessageStatus status = message.status(); final long timestamp = message.timestamp(); - final String body = message.body(); - System.out.printf("%1$20s: %2$s (%3$s)%n", timestamp, body, status); + final String body = + delimiter != Character.MIN_VALUE ? message.body().replace('\u0001', delimiter) : message.body(); + out.printf("%1$20s: %2$s (%3$s)%n", timestamp, body, status); } - } diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/ArchivePrinterIntegrationTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/ArchivePrinterIntegrationTest.java new file mode 100644 index 0000000000..b5f80fc50d --- /dev/null +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/ArchivePrinterIntegrationTest.java @@ -0,0 +1,112 @@ +/* + * Copyright 2015-2024 Real Logic Limited, Adaptive Financial Consulting Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package uk.co.real_logic.artio.system_tests; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static uk.co.real_logic.artio.TestFixtures.launchMediaDriver; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.ACCEPTOR_LOGS; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.acceptingLibraryConfig; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.connect; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.delete; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.launchInitiatingEngine; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.newInitiatingLibrary; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; + +import org.junit.Before; +import org.junit.Test; + +import io.aeron.Aeron; +import uk.co.real_logic.artio.engine.EngineConfiguration; +import uk.co.real_logic.artio.engine.logger.FixArchivePrinter; +import uk.co.real_logic.artio.library.LibraryConfiguration; + +public class ArchivePrinterIntegrationTest extends AbstractGatewayToGatewaySystemTest +{ + private final FakeConnectHandler fakeConnectHandler = new FakeConnectHandler(); + + @Before + public void launch() + { + delete(ACCEPTOR_LOGS); + + mediaDriver = launchMediaDriver(); + + launchAcceptingEngine(); + initiatingEngine = launchInitiatingEngine(libraryAeronPort, nanoClock); + + final LibraryConfiguration acceptingLibraryConfig = acceptingLibraryConfig(acceptingHandler, nanoClock); + acceptingLibraryConfig.libraryConnectHandler(fakeConnectHandler); + acceptingLibrary = connect(acceptingLibraryConfig); + initiatingLibrary = newInitiatingLibrary(libraryAeronPort, initiatingHandler, nanoClock); + testSystem = new TestSystem(acceptingLibrary, initiatingLibrary); + + connectSessions(); + } + + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void shouldUseDefaultDelimiter() + { + setupAndExchangeMessages(); + + final EngineConfiguration configuration = acceptingEngine.configuration(); + final Aeron.Context context = configuration.aeronContext(); + final ByteArrayOutputStream outputBytes = new ByteArrayOutputStream(); + final FixArchivePrinter fixArchivePrinter = new FixArchivePrinter(new PrintStream(outputBytes), System.err); + final String[] args = new String[] { + "--aeron-channel=" + configuration.libraryAeronChannel(), + "--log-file-dir=" + configuration.logFileDir(), + "--aeron-dir-name=" + context.aeronDirectory().getAbsolutePath(), + "--query-stream-id=" + configuration.outboundLibraryStream() + }; + + fixArchivePrinter.scan(args); + + assertThat(outputBytes.toString(), containsString("\u0001112=hi")); + } + + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void canUseSpecifiedCharAsDelimiter() + { + setupAndExchangeMessages(); + + final EngineConfiguration configuration = acceptingEngine.configuration(); + final Aeron.Context context = configuration.aeronContext(); + final ByteArrayOutputStream outputBytes = new ByteArrayOutputStream(); + final FixArchivePrinter fixArchivePrinter = new FixArchivePrinter(new PrintStream(outputBytes), System.err); + final String[] args = new String[] { + "--delimiter=|", + "--aeron-channel=" + configuration.libraryAeronChannel(), + "--log-file-dir=" + configuration.logFileDir(), + "--aeron-dir-name=" + context.aeronDirectory().getAbsolutePath(), + "--query-stream-id=" + configuration.outboundLibraryStream() + }; + + fixArchivePrinter.scan(args); + + assertThat(outputBytes.toString(), containsString("|112=hi")); + } + + private void setupAndExchangeMessages() + { + messagesCanBeExchanged(); + + assertInitiatingSequenceIndexIs(0); + } +}