Skip to content

Commit

Permalink
Adds a StreamInterceptor interface to allow users to plug in custom i…
Browse files Browse the repository at this point in the history
…nterceptors for formats like Zstd.
  • Loading branch information
tgregg committed Dec 5, 2024
1 parent fbb3fbf commit 4436c60
Show file tree
Hide file tree
Showing 7 changed files with 328 additions and 63 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ dependencies {
testImplementation("org.hamcrest:hamcrest:2.2")
testImplementation("pl.pragmatists:JUnitParams:1.1.1")
testImplementation("com.google.code.tempus-fugit:tempus-fugit:1.1")
testImplementation("com.github.luben:zstd-jni:1.5.6-5")
}

group = "com.amazon.ion"
Expand Down
46 changes: 46 additions & 0 deletions src/main/java/com/amazon/ion/GZIPStreamInterceptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package com.amazon.ion;

import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

/**
 * A {@link StreamInterceptor} that recognizes GZIP data by its two leading header bytes and
 * wraps matching streams in a {@link GZIPInputStream}.
 */
public enum GZIPStreamInterceptor implements StreamInterceptor {

    INSTANCE;

    /** The leading bytes used to identify GZIP data. */
    private static final byte[] GZIP_HEADER = {0x1F, (byte) 0x8B};

    /**
     * @return the constant name "GZIP".
     */
    @Override
    public String formatName() {
        return "GZIP";
    }

    /**
     * @return the number of bytes in the GZIP header checked by this interceptor.
     */
    @Override
    public int headerLength() {
        return GZIP_HEADER.length;
    }

    /**
     * Compares the bytes starting at 'offset' against the GZIP header.
     * @param candidate the candidate byte sequence; null never matches.
     * @param offset the index of the first byte to compare.
     * @param length the number of bytes available starting at 'offset'; fewer than the header length never matches.
     * @return true if every header byte matches; otherwise, false.
     */
    @Override
    public boolean matchesHeader(byte[] candidate, int offset, int length) {
        if (candidate != null && length >= GZIP_HEADER.length) {
            int position = offset;
            for (byte expected : GZIP_HEADER) {
                if (candidate[position++] != expected) {
                    return false;
                }
            }
            return true;
        }
        return false;
    }

    /**
     * @param interceptedStream the stream containing GZIP data.
     * @return a new {@link GZIPInputStream} over the given stream.
     * @throws IOException if thrown by the {@link GZIPInputStream} constructor.
     */
    @Override
    public InputStream newInputStream(InputStream interceptedStream) throws IOException {
        return new GZIPInputStream(interceptedStream);
    }
}
48 changes: 48 additions & 0 deletions src/main/java/com/amazon/ion/StreamInterceptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package com.amazon.ion;

import com.amazon.ion.system.IonReaderBuilder;

import java.io.IOException;
import java.io.InputStream;

/**
* An interceptor to be consulted by the {@link com.amazon.ion.system.IonReaderBuilder} when creating an
* {@link IonReader} over a user-provided stream. This allows users to configure a sequence of interceptors
* to allow transformation of the stream's raw bytes into valid text or binary Ion.
* <p>
* An interceptor identifies its format by inspecting the leading bytes of the data via
* {@link #matchesHeader(byte[], int, int)}. When the header matches, {@link #newInputStream(InputStream)}
* supplies the stream from which the Ion data is then read.
*
* @see com.amazon.ion.system.IonReaderBuilder#addStreamInterceptor(StreamInterceptor)
* @see com.amazon.ion.system.IonSystemBuilder#withReaderBuilder(IonReaderBuilder)
*/
public interface StreamInterceptor {

/**
* The name of the format the interceptor recognizes (for example, "GZIP").
* @return a constant String.
*/
String formatName();

/**
* The length of the byte header that identifies streams in this format. When reading from an
* InputStream, the reader builder uses the largest header length among its configured interceptors
* to size the buffer of leading bytes that it offers to {@link #matchesHeader(byte[], int, int)}.
* @return the length in bytes.
*/
int headerLength();

/**
* Determines whether the given candidate byte sequence matches this format.
* <p>
* The number of available bytes is not guaranteed to equal {@link #headerLength()}: it may be smaller
* (for example, when the data ends before a full header is available) or larger (for example, when an
* entire byte array is offered). Implementations should treat too few bytes as a non-match.
* @param candidate the candidate byte sequence.
* @param offset the offset into the candidate bytes to begin matching.
* @param length the number of bytes (beginning at 'offset') in the candidate byte sequence.
* @return true if the candidate byte sequence matches; otherwise, false.
*/
boolean matchesHeader(byte[] candidate, int offset, int length);

/**
* Creates a new InputStream that transforms the bytes in the given InputStream into valid text or binary Ion.
* This is called only after {@link #matchesHeader(byte[], int, int)} has returned true for the data; the
* given stream still begins with the header bytes that were examined.
* @param interceptedStream the stream containing bytes in this format.
* @return a new InputStream.
* @throws IOException if thrown when constructing the new InputStream.
*/
InputStream newInputStream(InputStream interceptedStream) throws IOException;
}
65 changes: 32 additions & 33 deletions src/main/java/com/amazon/ion/impl/_Private_IonReaderBuilder.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package com.amazon.ion.impl;

import com.amazon.ion.IonCatalog;
import com.amazon.ion.IonException;
import com.amazon.ion.IonReader;
import com.amazon.ion.IonTextReader;
import com.amazon.ion.IonValue;
import com.amazon.ion.StreamInterceptor;
import com.amazon.ion.system.IonReaderBuilder;
import com.amazon.ion.util.IonStreamUtils;

Expand All @@ -16,7 +16,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.util.zip.GZIPInputStream;

import static com.amazon.ion.impl.LocalSymbolTable.DEFAULT_LST_FACTORY;
import static com.amazon.ion.impl._Private_IonReaderFactory.makeReader;
Expand Down Expand Up @@ -200,16 +199,18 @@ static IonReader buildReader(
IonReaderFromBytesFactoryBinary binary,
IonReaderFromBytesFactoryText text
) {
if (IonStreamUtils.isGzip(ionData, offset, length)) {
try {
return buildReader(
builder,
new GZIPInputStream(new ByteArrayInputStream(ionData, offset, length)),
_Private_IonReaderFactory::makeReaderBinary,
_Private_IonReaderFactory::makeReaderText
);
} catch (IOException e) {
throw new IonException(e);
for (StreamInterceptor streamInterceptor : builder.getStreamInterceptors()) {
if (streamInterceptor.matchesHeader(ionData, offset, length)) {
try {
return buildReader(
builder,
streamInterceptor.newInputStream(new ByteArrayInputStream(ionData, offset, length)),
_Private_IonReaderFactory::makeReaderBinary,
_Private_IonReaderFactory::makeReaderText
);
} catch (IOException e) {
throw new IonException(e);
}
}
}
if (IonStreamUtils.isIonBinary(ionData, offset, length)) {
Expand Down Expand Up @@ -247,15 +248,6 @@ private static boolean startsWithIvm(byte[] buffer, int length) {
return true;
}

/** The two leading bytes checked to recognize GZIP data. */
static final byte[] GZIP_HEADER = {0x1F, (byte) 0x8B};

/**
 * Reports whether the start of the buffer equals {@link #GZIP_HEADER}.
 * @param buffer the bytes to examine, starting at index 0.
 * @param length the number of valid bytes in the buffer.
 * @return true if at least a full header is available and both header bytes match; otherwise, false.
 */
private static boolean startsWithGzipHeader(byte[] buffer, int length) {
    if (length < GZIP_HEADER.length) {
        // Not enough bytes to hold the header.
        return false;
    }
    boolean firstByteMatches = buffer[0] == GZIP_HEADER[0];
    return firstByteMatches && buffer[1] == GZIP_HEADER[1];
}

@FunctionalInterface
interface IonReaderFromInputStreamFactoryText {
IonReader makeReader(IonCatalog catalog, InputStream source, _Private_LocalSymbolTableFactory lstFactory);
Expand All @@ -275,11 +267,15 @@ static IonReader buildReader(
if (source == null) {
throw new NullPointerException("Cannot build a reader from a null InputStream.");
}
int maxHeaderLength = Math.max(
_Private_IonConstants.BINARY_VERSION_MARKER_SIZE,
builder.getStreamInterceptors().stream().mapToInt(StreamInterceptor::headerLength).max().orElse(0)
);
// Note: this can create a lot of layers of InputStream wrappers. For example, if this method is called
// from build(byte[]) and the bytes contain GZIP, the chain will be SequenceInputStream(ByteArrayInputStream,
// GZIPInputStream -> PushbackInputStream -> ByteArrayInputStream). If this creates a drag on efficiency,
// alternatives should be evaluated.
byte[] possibleIVM = new byte[_Private_IonConstants.BINARY_VERSION_MARKER_SIZE];
byte[] possibleIVM = new byte[maxHeaderLength];
InputStream ionData = source;
int bytesRead;
try {
Expand All @@ -296,19 +292,22 @@ static IonReader buildReader(
// stream will always be empty (in which case it doesn't matter whether a text or binary reader is used)
// or it's a binary stream (in which case the correct reader was created) or it's a growing text stream
// (which has always been unsupported).
if (startsWithGzipHeader(possibleIVM, bytesRead)) {
try {
ionData = new GZIPInputStream(
new TwoElementSequenceInputStream(new ByteArrayInputStream(possibleIVM, 0, bytesRead), ionData)
);
for (StreamInterceptor streamInterceptor : builder.getStreamInterceptors()) {
if (streamInterceptor.matchesHeader(possibleIVM, 0, bytesRead)) {
try {
bytesRead = ionData.read(possibleIVM);
} catch (EOFException e) {
// Only a GZIP header was available, so this may be a binary Ion stream.
bytesRead = 0;
ionData = streamInterceptor.newInputStream(
new TwoElementSequenceInputStream(new ByteArrayInputStream(possibleIVM, 0, bytesRead), ionData)
);
try {
bytesRead = ionData.read(possibleIVM);
} catch (EOFException e) {
// Only a compression format header was available, so this may be a binary Ion stream.
bytesRead = 0;
}
} catch (IOException e) {
throw new IonException(e);
}
} catch (IOException e) {
throw new IonException(e);
break;
}
}
if (startsWithIvm(possibleIVM, bytesRead)) {
Expand Down
48 changes: 33 additions & 15 deletions src/main/java/com/amazon/ion/system/IonReaderBuilder.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,8 @@
/*
* Copyright 2007-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package com.amazon.ion.system;

import com.amazon.ion.GZIPStreamInterceptor;
import com.amazon.ion.IonBufferConfiguration;
import com.amazon.ion.IonCatalog;
import com.amazon.ion.IonException;
Expand All @@ -23,11 +11,15 @@
import com.amazon.ion.IonSystem;
import com.amazon.ion.IonTextReader;
import com.amazon.ion.IonValue;
import com.amazon.ion.StreamInterceptor;
import com.amazon.ion.impl._Private_IonReaderBuilder;

import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Build a new {@link IonReader} from the given {@link IonCatalog} and data
Expand All @@ -45,6 +37,7 @@ public abstract class IonReaderBuilder
private IonCatalog catalog = null;
private boolean isIncrementalReadingEnabled = false;
private IonBufferConfiguration bufferConfiguration = IonBufferConfiguration.DEFAULT;
private List<StreamInterceptor> streamInterceptors = new ArrayList<>(Collections.singletonList(GZIPStreamInterceptor.INSTANCE));

protected IonReaderBuilder()
{
Expand All @@ -55,6 +48,7 @@ protected IonReaderBuilder(IonReaderBuilder that)
this.catalog = that.catalog;
this.isIncrementalReadingEnabled = that.isIncrementalReadingEnabled;
this.bufferConfiguration = that.bufferConfiguration;
this.streamInterceptors = new ArrayList<>(that.streamInterceptors);
}

/**
Expand Down Expand Up @@ -263,6 +257,30 @@ public IonBufferConfiguration getBufferConfiguration() {
return bufferConfiguration;
}

/**
 * Adds a {@link StreamInterceptor} to the end of the list that the builder will apply
 * in order to each stream before creating {@link IonReader} instances over that stream.
 * {@link GZIPStreamInterceptor} is always consulted first, and need not be added.
 *
 * @param streamInterceptor the stream interceptor to add; must not be null.
 *
 * @return this builder instance, if mutable;
 * otherwise a mutable copy of this builder.
 *
 * @throws NullPointerException if {@code streamInterceptor} is null.
 */
public IonReaderBuilder addStreamInterceptor(StreamInterceptor streamInterceptor) {
    if (streamInterceptor == null) {
        // Fail at configuration time. A null entry would otherwise only surface as an unexplained
        // NullPointerException later, when a reader is built and the interceptor list is iterated.
        throw new NullPointerException("Cannot add a null StreamInterceptor.");
    }
    IonReaderBuilder b = mutable();
    b.streamInterceptors.add(streamInterceptor);
    return b;
}

/**
 * Retrieves the stream interceptors configured on this builder, in the order in which they are held.
 *
 * @see #addStreamInterceptor(StreamInterceptor)
 * @return an unmodifiable view of the stream interceptors currently configured.
 */
public List<StreamInterceptor> getStreamInterceptors() {
    // Wrap rather than copy, so callers cannot modify the builder's list through the returned reference.
    List<StreamInterceptor> readOnlyView = Collections.unmodifiableList(streamInterceptors);
    return readOnlyView;
}

/**
* Based on the builder's configuration properties, creates a new IonReader
* instance over the given block of Ion data, detecting whether it's text or
Expand Down
30 changes: 15 additions & 15 deletions src/test/java/com/amazon/ion/system/IonReaderBuilderTest.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,5 @@
/*
* Copyright 2007-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package com.amazon.ion.system;

import static com.amazon.ion.TestUtils.gzippedBytes;
Expand All @@ -26,18 +13,21 @@
import static org.junit.Assert.fail;

import com.amazon.ion.BitUtils;
import com.amazon.ion.GZIPStreamInterceptor;
import com.amazon.ion.IonBufferConfiguration;
import com.amazon.ion.IonCatalog;
import com.amazon.ion.IonException;
import com.amazon.ion.IonReader;
import com.amazon.ion.IonType;
import com.amazon.ion.IonWriter;
import com.amazon.ion.StreamInterceptor;
import com.amazon.ion.impl.ResizingPipedInputStream;
import com.amazon.ion.impl._Private_IonBinaryWriterBuilder;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.zip.GZIPOutputStream;

import com.amazon.ion.impl._Private_IonConstants;
Expand Down Expand Up @@ -234,4 +224,14 @@ public void incompleteIvmFailsCleanly(boolean isIncremental) throws Exception {
reader.close();
}

@Test
public void gzipInterceptorEnabledByDefault() {
    // A standard builder with no customization should carry exactly one interceptor: GZIP.
    List<StreamInterceptor> defaults = IonReaderBuilder.standard().getStreamInterceptors();
    assertEquals(1, defaults.size());
    StreamInterceptor onlyInterceptor = defaults.get(0);
    assertEquals(GZIPStreamInterceptor.INSTANCE.formatName(), onlyInterceptor.formatName());
    // The list returned from IonReaderBuilder.getStreamInterceptors() is unmodifiable.
    assertThrows(UnsupportedOperationException.class, () -> defaults.add(GZIPStreamInterceptor.INSTANCE));
}

}
Loading

0 comments on commit 4436c60

Please sign in to comment.