feat: Fix chunk upload in Swift (box/box-codegen#555) (#215)
box-sdk-build authored Sep 5, 2024
1 parent 3daadf7 commit 93ff568
Showing 11 changed files with 288 additions and 45 deletions.
2 changes: 1 addition & 1 deletion .codegen.json
@@ -1 +1 @@
{ "engineHash": "6cf3f9e", "specHash": "57614c2", "version": "0.3.1" }
{ "engineHash": "2994c4a", "specHash": "739d87b", "version": "0.3.1" }
24 changes: 24 additions & 0 deletions BoxSdkGen.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

127 changes: 127 additions & 0 deletions Sources/Internal/MemoryInputStream.swift
@@ -0,0 +1,127 @@
import Foundation

/// A custom `InputStream` implementation that reads data from an in-memory `Data` object.
class MemoryInputStream: InputStream {
    private var data: Data
    private var position: Int = 0
    private var _streamStatus: Stream.Status
    private var _streamError: Error?
    private var _delegate: StreamDelegate?

    /// Initializes a new `MemoryInputStream` with the provided data.
    /// - Parameter data: The `Data` object containing the data to be read.
    override init(data: Data) {
        self.data = data
        self._streamStatus = .notOpen
        super.init(data: Data())
    }

    /// Reads up to a specified number of bytes into the provided buffer.
    /// - Parameters:
    ///   - buffer: A buffer to which the data will be read.
    ///   - len: The maximum number of bytes to read.
    /// - Returns: The number of bytes actually read, or `-1` if an error occurred or the stream is not open.
    override func read(_ buffer: UnsafeMutablePointer<UInt8>, maxLength len: Int) -> Int {
        guard _streamStatus == .open else {
            return -1 // Stream must be open to read
        }

        // If we've reached the end of the buffer, mark the stream as atEnd
        if position >= data.count {
            _streamStatus = .atEnd
            return 0
        }

        // Calculate the number of bytes to read
        let bytesToRead = min(len, data.count - position)

        // Copy data to the buffer
        let range = position..<position + bytesToRead
        data.copyBytes(to: buffer, from: range)

        // Update the position
        position += bytesToRead

        if position >= data.count {
            _streamStatus = .atEnd
        }

        return bytesToRead
    }

    /// Indicates whether there are bytes available to read.
    /// - Returns: `true` if there are bytes available and the stream is open; otherwise, `false`.
    override var hasBytesAvailable: Bool {
        return position < data.count && _streamStatus == .open
    }

    /// Closes the stream, marking the end of the data.
    override func close() {
        position = data.count
        _streamStatus = .closed
    }

    /// Resets the stream to its initial state.
    /// The position is reset to the beginning of the data and the stream status is set to not open.
    func reset() {
        position = 0
        _streamStatus = .notOpen
    }

    /// Opens the stream for reading.
    /// The position is reset to the beginning of the data and the stream status is set to open.
    override func open() {
        position = 0
        _streamStatus = .open
    }

    /// The current status of the stream.
    override var streamStatus: Stream.Status {
        return _streamStatus
    }

    /// The error encountered by the stream, if any.
    override var streamError: Error? {
        return _streamError
    }

    /// The delegate of the stream.
    override var delegate: StreamDelegate? {
        get {
            return _delegate
        }
        set {
            _delegate = newValue
        }
    }

    /// Schedules the stream in a run loop. This method does nothing in this implementation.
    /// - Parameters:
    ///   - runLoop: The run loop in which to schedule the stream.
    ///   - mode: The run loop mode in which to schedule the stream.
    override func schedule(in _: RunLoop, forMode _: RunLoop.Mode) {}

    /// Removes the stream from a run loop. This method does nothing in this implementation.
    /// - Parameters:
    ///   - runLoop: The run loop from which to remove the stream.
    ///   - mode: The run loop mode from which to remove the stream.
    override func remove(from _: RunLoop, forMode _: RunLoop.Mode) {}

    #if os(iOS) || os(macOS)
    /// Returns the value of a specified property key. This method always returns `nil` in this implementation.
    /// - Parameter key: The property key.
    /// - Returns: `nil`.
    override func property(forKey _: Stream.PropertyKey) -> Any? {
        return nil
    }

    /// Sets the value of a specified property key. This method always returns `false` in this implementation.
    /// - Parameters:
    ///   - property: The property value to set.
    ///   - key: The property key.
    /// - Returns: `false`.
    override func setProperty(_: Any?, forKey _: Stream.PropertyKey) -> Bool {
        return false
    }
    #endif
}
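
As a usage sketch only (not part of this diff; `MemoryInputStream` is internal to the SDK module, and the payload and buffer size below are illustrative), the class behaves like a rewindable in-memory stream: drain it with `read(_:maxLength:)`, then `reset()` and `open()` it again to replay the same bytes, which is exactly what a retried upload needs.

let payload = Data("hello chunked upload".utf8)
let stream = MemoryInputStream(data: payload)

stream.open()
var buffer = [UInt8](repeating: 0, count: 8) // deliberately small so several reads are needed
var received = Data()
while stream.hasBytesAvailable {
    let count = stream.read(&buffer, maxLength: buffer.count)
    guard count > 0 else { break }
    received.append(contentsOf: buffer[0..<count])
}
stream.close() // received now equals payload

// Rewind and re-open to replay the same bytes, e.g. for a retried request.
stream.reset()
stream.open()
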
85 changes: 85 additions & 0 deletions Sources/Internal/StreamSequence.swift
@@ -0,0 +1,85 @@
import Foundation

/// A `Sequence` that reads data from an `InputStream` in chunks.
/// Conforms to the `Sequence` protocol and provides an iterator that yields chunks of data from the stream.
public struct StreamSequence: Sequence {
    /// The type of elements that this sequence yields.
    public typealias Element = InputStream

    /// The `InputStream` to read data from.
    private let inputStream: InputStream

    /// The size of each chunk to read from the `InputStream`.
    private let chunkSize: Int

    /// Initializes a `StreamSequence` with the given `InputStream` and chunk size.
    ///
    /// - Parameters:
    ///   - inputStream: The `InputStream` to read data from.
    ///   - chunkSize: The size of each chunk to read from the `InputStream`.
    init(inputStream: InputStream, chunkSize: Int) {
        self.inputStream = inputStream
        self.chunkSize = chunkSize
    }

    /// Creates and returns an iterator for this sequence.
    ///
    /// - Returns: A `StreamIterator` that reads data from the `InputStream` in chunks.
    public func makeIterator() -> StreamIterator {
        return StreamIterator(inputStream: inputStream, chunkSize: chunkSize)
    }
}

/// An iterator that reads data from an `InputStream` in chunks.
/// Conforms to the `IteratorProtocol` and yields chunks of data as `InputStream` objects.
public struct StreamIterator: IteratorProtocol {
    /// The type of elements that this iterator yields.
    public typealias Element = InputStream

    /// The `InputStream` to read data from.
    private let inputStream: InputStream

    /// The size of each chunk to read from the `InputStream`.
    private let chunkSize: Int

    /// A buffer to hold data read from the `InputStream`.
    private var buffer: [UInt8]

    /// A flag indicating whether there is more data to read from the `InputStream`.
    private var hasMoreData: Bool

    /// Initializes a `StreamIterator` with the given `InputStream` and chunk size.
    ///
    /// - Parameters:
    ///   - inputStream: The `InputStream` to read data from.
    ///   - chunkSize: The size of each chunk to read from the `InputStream`.
    init(inputStream: InputStream, chunkSize: Int) {
        self.inputStream = inputStream
        self.chunkSize = chunkSize
        self.buffer = [UInt8](repeating: 0, count: chunkSize)
        self.hasMoreData = true

        // Open the input stream for reading.
        inputStream.open()
    }

    /// Reads the next chunk of data from the `InputStream` and returns it as an `InputStream`.
    ///
    /// - Returns: An `InputStream` containing the next chunk of data, or `nil` if no more data is available.
    public mutating func next() -> InputStream? {
        guard hasMoreData else { return nil }

        // Read data into the buffer.
        let bytesRead = inputStream.read(&buffer, maxLength: chunkSize)

        if bytesRead > 0 {
            // Create and return an `InputStream` containing the read data.
            return MemoryInputStream(data: Data(bytes: buffer, count: bytesRead))
        } else {
            // No more data to read.
            hasMoreData = false
            inputStream.close()
            return nil
        }
    }
}
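
A brief consumption sketch (illustrative sizes, usable only inside the SDK module since the initializer is internal): each iteration yields one chunk as a `MemoryInputStream`; the source stream is opened when the iterator is created and closed once it is exhausted.

let source = InputStream(data: Data(repeating: 0xAB, count: 10_000))
let chunks = StreamSequence(inputStream: source, chunkSize: 4_096)

var chunkCount = 0
for chunk in chunks {
    // `chunk` is a MemoryInputStream holding at most 4_096 bytes of the source.
    chunkCount += 1
}
// chunkCount == 3 here: 4_096 + 4_096 + 1_808 bytes.
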
42 changes: 9 additions & 33 deletions Sources/Internal/Utils.swift
@@ -289,53 +289,29 @@ public enum Utils {
}


/// Iterates over a stream and yields chunks of it
/// Creates a StreamSequence from a given InputStream.
///
/// - Parameters:
/// - stream: InputStream to iterate over
/// - chunkSize: Size of chunk
/// - fileSize: Size of the file
/// - Returns: The asynchronous sequence AsyncStream
public static func iterateChunks(stream: InputStream, chunkSize: Int64, fileSize: Int64) -> AsyncStream<InputStream> {
return AsyncStream<InputStream> { continuation in
_Concurrency.Task {
stream.open()

let bufferSize = Int(chunkSize)
var buffer = [UInt8](repeating: 0, count: bufferSize)

defer {
stream.close()
continuation.finish()
}

while stream.hasBytesAvailable {
let read = stream.read(&buffer, maxLength: buffer.count)
if read < 0, let error = stream.streamError {
throw error
} else if read == 0 {
return
}

continuation.yield(InputStream(data:Data(buffer.prefix(read))))
}
}
}
}
/// - Returns: The StreamSequence
public static func iterateChunks(stream: InputStream, chunkSize: Int64, fileSize: Int64) -> StreamSequence {
return StreamSequence(inputStream: stream, chunkSize: Int(chunkSize))
}

/// Asynchronously reduces the elements of an `AsyncStream` using a specified reducer function and initial value.
/// Asynchronously reduces the elements of a `Sequence` using a specified reducer function and initial value.
///
/// - Parameters:
/// - iterator: The `AsyncStream` providing elements to be reduced.
/// - iterator: The `Sequence` providing elements to be reduced.
/// - reducer: A closure that combines an accumulated value (`U`) with each element of the stream (`T`) asynchronously.
/// - initialValue: The initial value to start the reduction.
/// - Returns: The result of combining all elements of the stream using the provided reducer function.
/// - Throws: Any error thrown by the `reducer` closure during the reduction process.
public static func reduceIterator<T,U>(iterator: AsyncStream<T>, reducer: (U, T) async throws -> U, initialValue: U) async throws -> U
{
public static func reduceIterator<T,U,S>(iterator: S, reducer: @escaping (U, T) async throws -> U, initialValue: U) async throws -> U where S: Sequence, S.Element == T {
var result = initialValue

for await item in iterator {
for item in iterator {
result = try await reducer(result, item)
}

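
As a rough sketch of how the two helpers compose after this change (illustrative sizes, run inside the SDK module from an async throwing context; the reducer below merely counts bytes, whereas the SDK's real reducer uploads each part):

let fileSize: Int64 = 10_000
let partSize: Int64 = 4_096
let file = InputStream(data: Data(count: Int(fileSize))) // stand-in for a real file stream

let chunks: StreamSequence = Utils.iterateChunks(stream: file, chunkSize: partSize, fileSize: fileSize)

let totalBytes: Int64 = try await Utils.reduceIterator(
    iterator: chunks,
    reducer: { (acc: Int64, chunk: InputStream) async throws -> Int64 in
        // readByteStream drains the chunk into a Data value.
        let data = Utils.readByteStream(byteStream: chunk)
        return acc + Int64(data.count)
    },
    initialValue: Int64(0)
)
// totalBytes == fileSize once every chunk has been consumed.
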
@@ -266,7 +266,7 @@ public class ChunkedUploadsManager {
assert(partSize * Int64(totalParts) >= fileSize)
assert(uploadSession.numPartsProcessed == 0)
let fileHash: Hash = Hash(algorithm: HashName.sha1)
let chunksIterator: AsyncStream<InputStream> = Utils.iterateChunks(stream: file, chunkSize: partSize, fileSize: fileSize)
let chunksIterator: StreamSequence = Utils.iterateChunks(stream: file, chunkSize: partSize, fileSize: fileSize)
let results: PartAccumulator = try await Utils.reduceIterator(iterator: chunksIterator, reducer: self.reducer, initialValue: PartAccumulator(lastIndex: -1, parts: [], fileSize: fileSize, uploadPartUrl: uploadPartUrl, fileHash: fileHash))
let parts: [UploadPart] = results.parts
let processedSessionParts: UploadParts = try await self.getFileUploadSessionPartsByUrl(url: listPartsUrl)
@@ -15,7 +15,7 @@ public class FolderClassificationsManager {
///
/// This API can also be called by including the enterprise ID in the
/// URL explicitly, for example
/// `/folders/:id//enterprise_12345/securityClassification-6VMVochwUWo`.
/// `/folders/:id/enterprise_12345/securityClassification-6VMVochwUWo`.
///
/// - Parameters:
/// - folderId: The unique identifier that represent a folder.
@@ -43,7 +43,7 @@
///
/// This API can also be called by including the enterprise ID in the
/// URL explicitly, for example
/// `/folders/:id//enterprise_12345/securityClassification-6VMVochwUWo`.
/// `/folders/:id/enterprise_12345/securityClassification-6VMVochwUWo`.
///
/// - Parameters:
/// - folderId: The unique identifier that represent a folder.
@@ -99,7 +99,7 @@
///
/// This API can also be called by including the enterprise ID in the
/// URL explicitly, for example
/// `/folders/:id//enterprise_12345/securityClassification-6VMVochwUWo`.
/// `/folders/:id/enterprise_12345/securityClassification-6VMVochwUWo`.
///
/// - Parameters:
/// - folderId: The unique identifier that represent a folder.
25 changes: 23 additions & 2 deletions Sources/Networking/FetchOptions.swift
@@ -17,7 +17,7 @@ public enum HTTPMethod: String, ExpressibleByStringLiteral, CaseIterable {
case options = "OPTIONS"
}

/// Represents paremeters used for the request.
/// Represents parameters used for the request.
public class FetchOptions {

/// The HTTP request URL
@@ -69,7 +69,7 @@ public class FetchOptions {
/// - downloadDestinationURL: The URL on disk where the data will be saved
/// - contentType: Content type of the request.
/// - responseFormat: Expected format of the response: 'json', 'binary'
/// - auth: The authentication sesson management used in the request.
/// - auth: The authentication session management used in the request.
/// - networkSession: The URLSession holder along with the network configuration parameters
public init(
url: String,
@@ -98,4 +98,25 @@
self.auth = auth
self.networkSession = networkSession
}

/// Creates a new `FetchOptions` object with an updated file stream.
///
/// - Parameter fileStream: The new input stream for file uploads.
/// - Returns: A new `FetchOptions` instance with the updated file stream.
func withFileStream(fileStream: InputStream) -> FetchOptions {
return FetchOptions(
url: url,
method: method,
params: params,
headers: headers,
data: data,
fileStream: fileStream,
multipartData: multipartData,
downloadDestinationURL: downloadDestinationURL,
contentType: contentType,
responseFormat: responseFormat,
auth: auth,
networkSession: networkSession
)
}
}
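
A small sketch of how this helper is meant to be used (inside the SDK module; `makeReplayable` is a hypothetical name, not an SDK function): buffer an arbitrary upload stream into a `MemoryInputStream` once, then carry it on a copy of the options so every retry can rewind the same body.

func makeReplayable(_ options: FetchOptions) -> FetchOptions {
    // Already buffered, or no body to replay: return the options unchanged.
    guard let fileStream = options.fileStream, !(fileStream is MemoryInputStream) else {
        return options
    }
    // Drain the original stream once and keep the bytes in memory.
    let buffered = MemoryInputStream(data: Utils.readByteStream(byteStream: fileStream))
    return options.withFileStream(fileStream: buffered)
}
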
10 changes: 10 additions & 0 deletions Sources/Networking/NetworkClient.swift
@@ -28,6 +28,12 @@ public class NetworkClient {
/// - Returns: Response of the request in the form of FetchResponse object.
/// - Throws: An error if the request fails for any reason.
public func fetch(options: FetchOptions) async throws -> FetchResponse {
var options = options
if let fileStream = options.fileStream, !(fileStream is MemoryInputStream) {
let memoryInputStream = MemoryInputStream(data: Utils.readByteStream(byteStream: fileStream))
options = options.withFileStream(fileStream: memoryInputStream)
}

return try await fetch(
options: options,
networkSession: options.networkSession ?? NetworkSession(),
@@ -53,6 +59,10 @@
networkSession: networkSession
)

if let fileStream = options.fileStream, let memoryInputStream = fileStream as? MemoryInputStream, attempt > 1 {
memoryInputStream.reset()
}

if let downloadDestinationURL = options.downloadDestinationURL {
let (downloadUrl, urlResponse) = try await sendDownloadRequest(urlRequest, downloadDestinationURL: downloadDestinationURL, networkSession: networkSession)
let conversation = FetchConversation(options: options, urlRequest: urlRequest, urlResponse: urlResponse as! HTTPURLResponse, responseType: .url(downloadUrl))
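
To show why the reset matters, here is an illustrative retry wrapper (a hypothetical helper written inside the SDK module against the public `fetch(options:)`; the SDK performs the equivalent bookkeeping internally via its `attempt` counter): the body is buffered once, and before every retry it is rewound so the request resends the same bytes instead of reading an exhausted stream.

func sendWithRetry(client: NetworkClient, options: FetchOptions, maxAttempts: Int) async throws -> FetchResponse {
    // Buffer the body once so it can be replayed (fetch(options:) does the same internally).
    var options = options
    if let fileStream = options.fileStream, !(fileStream is MemoryInputStream) {
        options = options.withFileStream(fileStream: MemoryInputStream(data: Utils.readByteStream(byteStream: fileStream)))
    }
    var attempt = 1
    while true {
        if attempt > 1, let memoryStream = options.fileStream as? MemoryInputStream {
            memoryStream.reset() // rewind the buffered body before replaying the request
        }
        do {
            return try await client.fetch(options: options)
        } catch {
            guard attempt < maxAttempts else { throw error }
            attempt += 1
        }
    }
}
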