Skip to content

Commit

Permalink
Some more updates
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberdelia committed Aug 9, 2020
1 parent c0f12ae commit ea871f8
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 63 deletions.
7 changes: 3 additions & 4 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
`java-library`
`maven-publish`

id("org.jmailen.kotlinter") version "2.3.2"
id("org.jmailen.kotlinter") version "2.4.1"
id("org.jetbrains.dokka") version "0.10.1"
}

Expand All @@ -19,17 +19,16 @@ repositories {
dependencies {
val kotlinVersion = "1.3.72"
implementation(kotlin("stdlib-jdk8", kotlinVersion))
implementation(kotlin("reflect", kotlinVersion))

implementation(kotlin("test", kotlinVersion))
implementation(kotlin("test-junit", kotlinVersion))

val coroutineVersion = "1.3.7"
val coroutineVersion = "1.3.8"
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutineVersion")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:$coroutineVersion")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:$coroutineVersion")

val awsVersion = "2.13.23"
val awsVersion = "[2.13,2.14["
implementation("software.amazon.awssdk:s3:$awsVersion")
testImplementation("software.amazon.awssdk:sts:$awsVersion")

Expand Down
3 changes: 2 additions & 1 deletion src/main/kotlin/com/lapanthere/signals/S3InputStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public class S3InputStream(
.bucket(bucket)
.key(key)
.range("bytes=$begin-$end")
.build(), AsyncResponseTransformer.toBytes()
.build(),
AsyncResponseTransformer.toBytes()
).await().asInputStream()
}
}.toMutableList()
Expand Down
35 changes: 19 additions & 16 deletions src/main/kotlin/com/lapanthere/signals/S3OutputStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,26 @@ public class S3OutputStream(
semaphore.acquire()
partSize.next()
val part = Part(uploadID, parts.size + 1, digest, buffer)
parts.add(scope.async(CoroutineName("part-${part.partNumber}")) {
val response = s3.uploadPart(
UploadPartRequest.builder()
.bucket(bucket)
.key(key)
.partNumber(part.partNumber)
.uploadId(uploadID)
.contentMD5(part.contentMD5)
.contentLength(part.buffer.size.toLong())
.build(), AsyncRequestBody.fromBytes(part.buffer)
).await()
if (response.eTag != part.eTag) {
throw IOException("mismatching checksum: ${response.eTag} != ${part.eTag}")
parts.add(
scope.async(CoroutineName("part-${part.partNumber}")) {
val response = s3.uploadPart(
UploadPartRequest.builder()
.bucket(bucket)
.key(key)
.partNumber(part.partNumber)
.uploadId(uploadID)
.contentMD5(part.contentMD5)
.contentLength(part.buffer.size.toLong())
.build(),
AsyncRequestBody.fromBytes(part.buffer)
).await()
if (response.eTag != part.eTag) {
throw IOException("mismatching checksum: ${response.eTag} != ${part.eTag}")
}
semaphore.release()
part.toCompletedPart()
}
semaphore.release()
part.toCompletedPart()
})
)
buffer.reset()
}

Expand Down
15 changes: 10 additions & 5 deletions src/test/kotlin/com/lapanthere/signals/S3InputStreamTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class S3InputStreamTest {
.bucket(bucket)
.key(key)
.range("bytes=0-5242879")
.build(), any<ByteArrayAsyncResponseTransformer<GetObjectResponse>>()
.build(),
any<ByteArrayAsyncResponseTransformer<GetObjectResponse>>()
)
} returns CompletableFuture.completedFuture(
ResponseBytes.fromByteArray(GetObjectResponse.builder().build(), ByteArray(32))
Expand All @@ -49,7 +50,8 @@ class S3InputStreamTest {
.bucket(bucket)
.key(key)
.range("bytes=5242880-10489185")
.build(), any<ByteArrayAsyncResponseTransformer<GetObjectResponse>>()
.build(),
any<ByteArrayAsyncResponseTransformer<GetObjectResponse>>()
)
} returns CompletableFuture.completedFuture(
ResponseBytes.fromByteArray(GetObjectResponse.builder().build(), ByteArray(32))
Expand Down Expand Up @@ -77,7 +79,8 @@ class S3InputStreamTest {
.bucket(bucket)
.key(key)
.range("bytes=0-5242879")
.build(), any<ByteArrayAsyncResponseTransformer<GetObjectResponse>>()
.build(),
any<ByteArrayAsyncResponseTransformer<GetObjectResponse>>()
)
}
verify(exactly = 1) {
Expand All @@ -86,7 +89,8 @@ class S3InputStreamTest {
.bucket(bucket)
.key(key)
.range("bytes=5242880-10489185")
.build(), any<ByteArrayAsyncResponseTransformer<GetObjectResponse>>()
.build(),
any<ByteArrayAsyncResponseTransformer<GetObjectResponse>>()
)
}
}
Expand All @@ -99,7 +103,8 @@ class S3InputStreamTest {
.bucket(bucket)
.key(key)
.range("bytes=0-5242879")
.build(), any<ByteArrayAsyncResponseTransformer<GetObjectResponse>>()
.build(),
any<ByteArrayAsyncResponseTransformer<GetObjectResponse>>()
)
} throws SdkClientException.create("read timeout")

Expand Down
92 changes: 55 additions & 37 deletions src/test/kotlin/com/lapanthere/signals/S3OutputStreamTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class S3OutputStreamTest {
.contentLength(32)
.contentMD5("cLyPS3KoaSFGi/joRB3OUQ==")
.partNumber(1)
.build(), any<AsyncRequestBody>()
.build(),
any<AsyncRequestBody>()
)
} returns CompletableFuture.completedFuture(
UploadPartResponse.builder()
Expand Down Expand Up @@ -103,29 +104,40 @@ class S3OutputStreamTest {
}

verify {
s3.createMultipartUpload(CreateMultipartUploadRequest.builder()
.bucket(bucket)
.key(key)
.build())
s3.uploadPart(UploadPartRequest.builder()
.bucket(bucket)
.key(key)
.contentLength(32)
.contentMD5("cLyPS3KoaSFGi/joRB3OUQ==")
.partNumber(1)
.uploadId(uploadID)
.build(), any<AsyncRequestBody>())
s3.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
.bucket(bucket)
.key(key)
.uploadId(uploadID)
.multipartUpload(CompletedMultipartUpload.builder()
.parts(CompletedPart.builder()
.eTag("70bc8f4b72a86921468bf8e8441dce51")
.partNumber(1)
.build())
.build())
.build())
s3.createMultipartUpload(
CreateMultipartUploadRequest.builder()
.bucket(bucket)
.key(key)
.build()
)
s3.uploadPart(
UploadPartRequest.builder()
.bucket(bucket)
.key(key)
.contentLength(32)
.contentMD5("cLyPS3KoaSFGi/joRB3OUQ==")
.partNumber(1)
.uploadId(uploadID)
.build(),
any<AsyncRequestBody>()
)
s3.completeMultipartUpload(
CompleteMultipartUploadRequest.builder()
.bucket(bucket)
.key(key)
.uploadId(uploadID)
.multipartUpload(
CompletedMultipartUpload.builder()
.parts(
CompletedPart.builder()
.eTag("70bc8f4b72a86921468bf8e8441dce51")
.partNumber(1)
.build()
)
.build()
)
.build()
)
}

verify(exactly = 0) {
Expand All @@ -152,7 +164,8 @@ class S3OutputStreamTest {
.contentLength(32)
.contentMD5("cLyPS3KoaSFGi/joRB3OUQ==")
.partNumber(1)
.build(), any<AsyncRequestBody>()
.build(),
any<AsyncRequestBody>()
)
} throws SdkClientException.create("write timeout")

Expand All @@ -165,18 +178,23 @@ class S3OutputStreamTest {
}

verify {
s3.createMultipartUpload(CreateMultipartUploadRequest.builder()
.bucket(bucket)
.key(key)
.build())
s3.uploadPart(UploadPartRequest.builder()
.bucket(bucket)
.key(key)
.contentLength(32)
.contentMD5("cLyPS3KoaSFGi/joRB3OUQ==")
.partNumber(1)
.uploadId(uploadID)
.build(), any<AsyncRequestBody>())
s3.createMultipartUpload(
CreateMultipartUploadRequest.builder()
.bucket(bucket)
.key(key)
.build()
)
s3.uploadPart(
UploadPartRequest.builder()
.bucket(bucket)
.key(key)
.contentLength(32)
.contentMD5("cLyPS3KoaSFGi/joRB3OUQ==")
.partNumber(1)
.uploadId(uploadID)
.build(),
any<AsyncRequestBody>()
)
}

verify(exactly = 1) {
Expand Down

0 comments on commit ea871f8

Please sign in to comment.