diff --git a/fs2-aws-s3/src/main/scala/fs2/aws/s3/S3.scala b/fs2-aws-s3/src/main/scala/fs2/aws/s3/S3.scala
index 5af07eba..0af599f0 100644
--- a/fs2-aws-s3/src/main/scala/fs2/aws/s3/S3.scala
+++ b/fs2-aws-s3/src/main/scala/fs2/aws/s3/S3.scala
@@ -1,7 +1,7 @@
 package fs2.aws.s3
 
-import cats.effect.*
-import cats.implicits.*
+import cats.effect.Concurrent
+import cats.syntax.all.*
 import cats.{Applicative, ApplicativeThrow, ~>}
 import eu.timepit.refined.auto.*
 import fs2.aws.s3.S3.MultipartETagValidation
@@ -13,8 +13,6 @@ import software.amazon.awssdk.core.async.{AsyncRequestBody, AsyncResponseTransfo
 import software.amazon.awssdk.services.s3.model.*
 
 import java.security.MessageDigest
-import scala.collection.immutable.ArraySeq
-import scala.collection.immutable.ArraySeq.unsafeWrapArray
 import scala.jdk.CollectionConverters.*
 import scala.util.control.NoStackTrace
 
@@ -94,7 +92,7 @@ object S3 {
   }
 
   /** It creates an instance of the purely functional S3 API. */
-  def create[F[_]: Async](s3: S3AsyncClientOp[F]): S3[F] =
+  def create[F[_]: Concurrent](s3: S3AsyncClientOp[F]): S3[F] =
     new S3[F] {
 
       /** Deletes a file in a single request.
@@ -194,12 +192,10 @@ object S3 {
           ): Pipe[F, List[PartProcessingOutcome], (Option[ETag], Option[Checksum], Option[PartId])] =
             _.evalMap[F, (Option[ETag], Option[Checksum], Option[PartId])] {
               case Nil =>
-                cancelUpload(uploadId) *>
-                  Async[F]
-                    .ifM(Async[F].pure(uploadEmptyFiles))(
-                      uploadEmptyFile.map(eTag => (Option(eTag.eTag()), Option.empty[Checksum], Option.empty[PartId])),
-                      Async[F].pure((Option.empty[ETag], Option.empty[Checksum], Option.empty[PartId]))
-                    )
+                cancelUpload(uploadId) *> (
+                  if (uploadEmptyFiles) uploadEmptyFile.map(eTag => Option(eTag.eTag))
+                  else Applicative[F].pure(Option.empty[ETag])
+                ).map((_, Option.empty[Checksum], Option.empty[PartId]))
               case tags =>
                 val parts = tags.map { case PartProcessingOutcome(t, i, _) =>
                   CompletedPart.builder().partNumber(i).eTag(t).build()
                 }
@@ -272,7 +268,7 @@ object S3 {
             .through(uploadPart(uploadId))
             .fold[List[PartProcessingOutcome]](List.empty)(_ :+ _)
             .through(completeUpload(uploadId))
-            .handleErrorWith(ex => fs2.Stream.eval(cancelUpload(uploadId) >> Sync[F].raiseError(ex)))
+            .handleErrorWith(ex => fs2.Stream.eval(cancelUpload(uploadId) >> ApplicativeThrow[F].raiseError(ex)))
           _ <- fs2.Stream.eval(validateETag(eTag, maxPartNum, checksum))
         } yield eTag
 
@@ -294,7 +290,7 @@ object S3 {
                 AsyncResponseTransformer.toBytes[GetObjectResponse]
               )
             )
-            .flatMap(r => fs2.Stream.chunk(Chunk(ArraySeq.unsafeWrapArray(r.asByteArray)*)))
+            .flatMap(r => fs2.Stream.chunk(Chunk.array(r.asByteArrayUnsafe)))
 
       /** Reads a file in multiple parts of the specified @partSize per request. Suitable for big files.
        *
@@ -328,23 +324,11 @@ object S3 {
             .last
             .flatMap {
               case Some(resp) =>
-                Pull.eval {
-                  Async[F].blocking {
-                    val bs = resp.asByteArray()
-                    val len = bs.length
-                    if (len < 0) None else Some(Chunk(unsafeWrapArray(bs)*))
-                  }
-                }
+                val chunk = Chunk.array(resp.asByteArrayUnsafe)
+                Pull.output(chunk) >> (if (chunk.size < chunkSizeBytes) Pull.done else go(offset + chunk.size))
              case None =>
-                Pull.eval(none.pure[F])
-            }
-            .flatMap {
-              case Some(o) =>
-                if (o.size < chunkSizeBytes) Pull.output(o)
-                else Pull.output(o) >> go(offset + o.size)
-              case None => Pull.done
+                Pull.done
             }
-
         go(0).stream
       }
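
Note on the `readFileMultipart` hunk: the old version wrapped each ranged `GetObject` response in `Async[F].blocking`, threaded an `Option[Chunk]` through a second `flatMap`, and guarded on `len < 0` (dead code, since an array length is never negative). The new version outputs the chunk directly and recurses only while full-size chunks come back, which is part of what lets the whole API relax `Async` to `Concurrent`. Below is a minimal, self-contained sketch of that loop; `fetchRange` and `RangedReadSketch` are hypothetical stand-ins for the ranged S3 `getObject` call, not the library's API:

```scala
import cats.effect.{IO, IOApp}
import fs2.{Chunk, Pull, Stream}

object RangedReadSketch extends IOApp.Simple {

  // Hypothetical stand-in for a ranged S3 GET:
  // returns up to `size` bytes starting at `offset`.
  def fetchRange(data: Array[Byte])(offset: Int, size: Int): IO[Array[Byte]] =
    IO(data.slice(offset, offset + size))

  // The loop shape from the diff: emit each fetched chunk immediately,
  // and recurse only while a full-size chunk comes back — a short chunk
  // signals the end of the object.
  def readAll(data: Array[Byte], chunkSizeBytes: Int): Stream[IO, Byte] = {
    def go(offset: Int): Pull[IO, Byte, Unit] =
      Pull.eval(fetchRange(data)(offset, chunkSizeBytes)).flatMap { bytes =>
        val chunk = Chunk.array(bytes)
        Pull.output(chunk) >>
          (if (chunk.size < chunkSizeBytes) Pull.done else go(offset + chunk.size))
      }
    go(0).stream
  }

  def run: IO[Unit] =
    readAll("hello, multipart world".getBytes, chunkSizeBytes = 8)
      .chunkN(8)
      .map(c => new String(c.toArray))
      .evalMap(s => IO.println(s))
      .compile
      .drain
}
```

Since the fetch is a plain effect rather than a blocking call, `Pull.eval` plus `Pull.output` is all that's needed, and the recursion terminates on the first short chunk without the old `Option` round-trip.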