Skip to content

Commit

Permalink
Fix chunk upload with ChunkedStreamReader to cancel underlying stream
Browse files Browse the repository at this point in the history
  • Loading branch information
hoangdat committed Feb 1, 2023
1 parent 5d08f44 commit 9bd2285
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 56 deletions.
75 changes: 46 additions & 29 deletions data/lib/src/repository/flow/flow_uploader_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import 'dart:async';
import 'dart:developer' as developer;
import 'dart:io';

import 'package:async/async.dart';
import 'package:dartz/dartz.dart';
import 'package:data/src/network/config/endpoint.dart';
import 'package:data/src/network/dio_client.dart';
Expand All @@ -43,7 +44,6 @@ import 'package:data/src/network/model/query/query_parameter.dart';
import 'package:data/src/repository/flow/flow_response.dart';
import 'package:dio/dio.dart';
import 'package:domain/domain.dart';
import 'package:domain/src/model/async_task/async_task.dart';

class FlowUploaderImpl extends FlowUploader {
final DioClient _dioClient;
Expand Down Expand Up @@ -102,7 +102,7 @@ class FlowUploaderImpl extends FlowUploader {

@override
Future<Flow> uploadChunk(
File file,
ChunkedStreamReader<int> chunkedStreamReaderFile,
int chunkNumber,
int chunkSize,
int currentChunkSize,
Expand All @@ -116,30 +116,51 @@ class FlowUploaderImpl extends FlowUploader {
{String? sharedSpaceId,
String? parentNodeId}
) async {
final _fileSize = file.lengthSync();
developer.log('uploadChunk(): chunk: $chunkNumber - currentChunkSize: $currentChunkSize', name: 'FlowUploaderImpl');

final formData = generateFormData(file, chunkNumber, chunkSize, currentChunkSize,
startByte, endByte, _fileSize, flowIdentifier, flowFile, totalChunk, uploadedByte,
onSendController, sharedSpaceId: sharedSpaceId, parentNodeId: parentNodeId);

final response = await _dioClient.post(
Endpoint.flow.generateEndpointPath(),
data: formData,
options: Options(
headers: _getRangeHeadersForChunkUpload(startByte, endByte, _fileSize)
),
onSendProgress: (progress, total) {
onSendController.add(Right(UploadingFlowUploadState(flowFile, uploadedByte + progress, flowFile.fileInfo.fileSize)));
}
);
final flowResponse = FlowResponse.fromJson(response);
developer.log('uploadChunk(): ${flowResponse.toString()}', name: 'FlowUploaderImpl');
return flowResponse.toFlow();
try {
final _fileSize = flowFile.fileInfo.fileSize;
developer.log(
'uploadChunk(): chunk: $chunkNumber - currentChunkSize: $currentChunkSize',
name: 'FlowUploaderImpl');

final formData = generateFormData(
chunkedStreamReaderFile,
chunkNumber,
chunkSize,
currentChunkSize,
startByte,
endByte,
_fileSize,
flowIdentifier,
flowFile,
totalChunk,
uploadedByte,
onSendController,
sharedSpaceId: sharedSpaceId,
parentNodeId: parentNodeId);

final response = await _dioClient.post(
Endpoint.flow.generateEndpointPath(),
data: formData,
options: Options(
headers: _getRangeHeadersForChunkUpload(startByte, endByte, _fileSize)
),
onSendProgress: (progress, total) {
onSendController.add(Right(UploadingFlowUploadState(
flowFile, uploadedByte + progress,
flowFile.fileInfo.fileSize)));
}
);
final flowResponse = FlowResponse.fromJson(response);
developer.log('uploadChunk(): ${flowResponse.toString()}', name: 'FlowUploaderImpl');
return flowResponse.toFlow();
} catch (e) {
developer.log('uploadChunk(): exception ${e}', name: 'FlowUploaderImpl');
rethrow;
}
}

FormData generateFormData(
File file,
ChunkedStreamReader<int> chunkedStreamReaderFile,
int chunkNumber,
int chunkSize,
int currentChunkSize,
Expand All @@ -163,7 +184,7 @@ class FlowUploaderImpl extends FlowUploader {
IDENTIFIER: flowIdentifier,
FILENAME: flowFile.fileInfo.fileName,
RELATIVE_PATH: flowFile.fileInfo.fileName,
FILE: MultipartFile(file.openRead(startByte, endByte), endByte - startByte),
FILE: MultipartFile(chunkedStreamReaderFile.readStream(currentChunkSize), currentChunkSize),
ASYNC_TASK: true,
};

Expand Down Expand Up @@ -192,10 +213,6 @@ class FlowUploaderImpl extends FlowUploader {
);

developer.log('getFlowTask(): 1', name: 'FlowUploaderImpl');
final dmm = AsyncTaskResponse.fromJson(asyncTaskJson);
developer.log('getFlowTask(): 2 - $dmm', name: 'FlowUploaderImpl');
final chot = dmm.toAsyncTask();
developer.log('getFlowTask(): 3 - $chot', name: 'FlowUploaderImpl');
return chot;
return AsyncTaskResponse.fromJson(asyncTaskJson).toAsyncTask();
}
}
12 changes: 11 additions & 1 deletion domain/lib/src/model/flow/flow_chunk.dart
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
*/

import 'dart:async';
import 'dart:developer' as developer;
import 'dart:io';
import 'dart:math';

import 'package:async/async.dart';
import 'package:dartz/dartz.dart';
import 'package:domain/domain.dart';
import 'package:domain/src/model/flow/flow_chunk_upload_state.dart';
Expand All @@ -58,6 +60,7 @@ class FlowChunk extends Equatable {
FlowChunkUploadState status = FlowChunkUploadState.pending;

bool error = false;
ChunkedStreamReader<int>? _chunkedStreamReader;

final FlowUploader _flowUploader;

Expand All @@ -75,8 +78,9 @@ class FlowChunk extends Equatable {

Future<Flow> upload(int uploadedByte, StreamController<Either<Failure, Success>> onSendController) async {
status = FlowChunkUploadState.uploading;
_chunkedStreamReader = ChunkedStreamReader(file.openRead(startByte, endByte));
return _flowUploader.uploadChunk(
file,
_chunkedStreamReader!,
offset + 1,
chunkSize,
currentChunkSize,
Expand Down Expand Up @@ -108,11 +112,17 @@ class FlowChunk extends Equatable {
if (isSuccess) {
status = FlowChunkUploadState.success;
} else {
developer.log('test(): update status error for offset $offset', name: 'FlowChunk');
status = FlowChunkUploadState.error;
}
return isSuccess;
}

void completed() {
developer.log('completed()', name: 'FlowChunk');
_chunkedStreamReader?.cancel();
}

@override
List<Object?> get props => [offset, flowFile, identifier];
}
28 changes: 15 additions & 13 deletions domain/lib/src/model/flow/flow_file.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,9 @@ import 'dart:io';

import 'package:dartz/dartz.dart';
import 'package:domain/domain.dart';
import 'package:domain/src/model/async_task/async_task.dart';
import 'package:domain/src/model/async_task/async_task_exception.dart';
import 'package:domain/src/model/async_task/async_task_status.dart';
import 'package:domain/src/model/flow/flow_chunk.dart';
import 'package:domain/src/model/flow/flow_chunk_upload_state.dart';
import 'package:domain/src/repository/flow/flow_uploader.dart';
import 'package:domain/src/usecases/upload_file/flow_upload_state.dart';
import 'package:equatable/equatable.dart';
import 'package:retry/retry.dart';

Expand Down Expand Up @@ -110,21 +106,26 @@ class FlowFile extends Equatable {
}

void _updateEvent(Either<Failure, Success> flowUploadState) {
developer.log('_updateEvent(): $flowUploadState', name: 'FlowFile');
_progressStateController.add(flowUploadState);
}

void _handleCompleted() {
if (chunks.every((chunk) => chunk.status == FlowChunkUploadState.success)) {
final asyncTaskId = _getProcessingTaskId();
if (asyncTaskId != null) {
_handleProcessing(asyncTaskId);
try {
if (chunks.every((chunk) => chunk.status == FlowChunkUploadState.success)) {
final asyncTaskId = _getProcessingTaskId();
if (asyncTaskId != null) {
_handleProcessing(asyncTaskId);
} else {
_handleSuccess();
}
} else {
_handleSuccess();
developer.log('handleCompleted(): error', name: 'FlowFile');
_updateEvent(Left(ErrorFlowUploadState(this)));
_progressStateController.close();
}
} else {
developer.log('handleCompleted(): error', name: 'FlowFile');
_updateEvent(Left(ErrorFlowUploadState(this)));
_progressStateController.close();
} catch (e) {
developer.log('_handleCompleted(): exception $e', name: 'FlowFile');
}
}

Expand Down Expand Up @@ -177,6 +178,7 @@ class FlowFile extends Equatable {
if (success) {
_updateProgress(chunk);
}
chunk.completed();
}
}

Expand Down
9 changes: 6 additions & 3 deletions domain/lib/src/repository/flow/flow_uploader.dart
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,19 @@
*/

import 'dart:async';
import 'dart:io';

import 'package:async/async.dart';
import 'package:dartz/dartz.dart';
import 'package:domain/domain.dart';
import 'package:domain/src/model/async_task/async_task.dart';
import 'package:domain/src/model/file_info.dart';
import 'package:domain/src/model/flow/flow.dart';
import 'package:domain/src/model/flow/flow_file.dart';
import 'package:domain/src/state/failure.dart';
import 'package:domain/src/state/success.dart';

abstract class FlowUploader {
Future<Flow> uploadChunk(
File file,
ChunkedStreamReader<int> chunkedStreamReaderFile,
int chunkNumber,
int chunkSize,
int currentChunkSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ class UploadShareFileManager {
_progressStateStreamGroup.stream.listen((flowUploadState) {
flowUploadState.fold(
(failure) {
developer.log('_handleFlowProgressState(): [Failure] $failure', name: 'UploadShareFileManager');
if (failure is ErrorFlowUploadState) {
_uploadingStateFiles.updateElementByUploadTaskId(
failure.flowFile.uploadTaskId,
Expand All @@ -154,8 +155,6 @@ class UploadShareFileManager {
},
(success) {
if (success is UploadingFlowUploadState) {
developer.log('_handleFlowProgressState(): uploading: ${success.progress}', name: 'UploadShareFileManager');

_uploadingStateFiles.updateElementByUploadTaskId(
success.flowFile.uploadTaskId,
(currentState) => (currentState?.uploadStatus.completed ?? false)
Expand All @@ -177,7 +176,7 @@ class UploadShareFileManager {
}

void _handleUploadFileSucceedWithResource(SuccessWithResourceFlowUploadState successWithResourceFlowUploadState) {
developer.log('_handleUploadFileSucceed()', name: 'UploadShareFileManager');
developer.log('_handleUploadFileSucceedWithResource()', name: 'UploadShareFileManager');
final fileState = _uploadingStateFiles.getElementByUploadTaskId(successWithResourceFlowUploadState.flowFile.uploadTaskId);
if (fileState != null) {
_fileHelper.deleteFile(fileState.file);
Expand Down Expand Up @@ -227,6 +226,7 @@ class UploadShareFileManager {
}

void _handleFlowUploadFileFailure(ErrorFlowUploadState errorFlowUploadState) {
developer.log('_handleFlowUploadFileFailure(): $errorFlowUploadState', name: 'UploadShareFileManager');
final fileState = _uploadingStateFiles.getElementByUploadTaskId(errorFlowUploadState.flowFile.uploadTaskId);
if (fileState != null) {
_fileHelper.deleteFile(fileState.file);
Expand Down
15 changes: 8 additions & 7 deletions lib/presentation/util/helper/file_helper.dart
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,20 @@
// the Additional Terms applicable to LinShare software.
//

import 'dart:developer' as developer;
import 'dart:io';

import 'package:domain/domain.dart';
import 'dart:developer' as developer;

class FileHelper {
void deleteFile(FileInfo fileInfo) async {
if (fileInfo != null) {
final file = File(fileInfo.filePath + fileInfo.fileName);
try {
await file.delete();
} catch (exception) {
print('error when delete file: ${fileInfo.fileName} ' + exception.toString());
}
developer.log('deleteFile(): $fileInfo', name: 'FileHelper');
final file = File(fileInfo.filePath + fileInfo.fileName);
try {
await file.delete();
} catch (exception) {
developer.log('deleteFile(): error when delete file: ${fileInfo.fileName}' + exception.toString(), name: 'FileHelper');
}
}
}

0 comments on commit 9bd2285

Please sign in to comment.