Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix chunk upload with ChunkedStreamReader to cancel underlying stream #920

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 46 additions & 29 deletions data/lib/src/repository/flow/flow_uploader_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import 'dart:async';
import 'dart:developer' as developer;
import 'dart:io';

import 'package:async/async.dart';
import 'package:dartz/dartz.dart';
import 'package:data/src/network/config/endpoint.dart';
import 'package:data/src/network/dio_client.dart';
Expand All @@ -43,7 +44,6 @@ import 'package:data/src/network/model/query/query_parameter.dart';
import 'package:data/src/repository/flow/flow_response.dart';
import 'package:dio/dio.dart';
import 'package:domain/domain.dart';
import 'package:domain/src/model/async_task/async_task.dart';

class FlowUploaderImpl extends FlowUploader {
final DioClient _dioClient;
Expand Down Expand Up @@ -102,7 +102,7 @@ class FlowUploaderImpl extends FlowUploader {

@override
Future<Flow> uploadChunk(
File file,
ChunkedStreamReader<int> chunkedStreamReaderFile,
int chunkNumber,
int chunkSize,
int currentChunkSize,
Expand All @@ -116,30 +116,51 @@ class FlowUploaderImpl extends FlowUploader {
{String? sharedSpaceId,
String? parentNodeId}
) async {
final _fileSize = file.lengthSync();
developer.log('uploadChunk(): chunk: $chunkNumber - currentChunkSize: $currentChunkSize', name: 'FlowUploaderImpl');

final formData = generateFormData(file, chunkNumber, chunkSize, currentChunkSize,
startByte, endByte, _fileSize, flowIdentifier, flowFile, totalChunk, uploadedByte,
onSendController, sharedSpaceId: sharedSpaceId, parentNodeId: parentNodeId);

final response = await _dioClient.post(
Endpoint.flow.generateEndpointPath(),
data: formData,
options: Options(
headers: _getRangeHeadersForChunkUpload(startByte, endByte, _fileSize)
),
onSendProgress: (progress, total) {
onSendController.add(Right(UploadingFlowUploadState(flowFile, uploadedByte + progress, flowFile.fileInfo.fileSize)));
}
);
final flowResponse = FlowResponse.fromJson(response);
developer.log('uploadChunk(): ${flowResponse.toString()}', name: 'FlowUploaderImpl');
return flowResponse.toFlow();
try {
final _fileSize = flowFile.fileInfo.fileSize;
developer.log(
'uploadChunk(): chunk: $chunkNumber - currentChunkSize: $currentChunkSize',
name: 'FlowUploaderImpl');

final formData = generateFormData(
chunkedStreamReaderFile,
chunkNumber,
chunkSize,
currentChunkSize,
startByte,
endByte,
_fileSize,
flowIdentifier,
flowFile,
totalChunk,
uploadedByte,
onSendController,
sharedSpaceId: sharedSpaceId,
parentNodeId: parentNodeId);

final response = await _dioClient.post(
Endpoint.flow.generateEndpointPath(),
data: formData,
options: Options(
headers: _getRangeHeadersForChunkUpload(startByte, endByte, _fileSize)
),
onSendProgress: (progress, total) {
onSendController.add(Right(UploadingFlowUploadState(
flowFile, uploadedByte + progress,
flowFile.fileInfo.fileSize)));
}
);
final flowResponse = FlowResponse.fromJson(response);
developer.log('uploadChunk(): ${flowResponse.toString()}', name: 'FlowUploaderImpl');
return flowResponse.toFlow();
} catch (e) {
developer.log('uploadChunk(): exception ${e}', name: 'FlowUploaderImpl');
rethrow;
}
}

FormData generateFormData(
File file,
ChunkedStreamReader<int> chunkedStreamReaderFile,
int chunkNumber,
int chunkSize,
int currentChunkSize,
Expand All @@ -163,7 +184,7 @@ class FlowUploaderImpl extends FlowUploader {
IDENTIFIER: flowIdentifier,
FILENAME: flowFile.fileInfo.fileName,
RELATIVE_PATH: flowFile.fileInfo.fileName,
FILE: MultipartFile(file.openRead(startByte, endByte), endByte - startByte),
FILE: MultipartFile(chunkedStreamReaderFile.readStream(currentChunkSize), currentChunkSize),
ASYNC_TASK: true,
};

Expand Down Expand Up @@ -192,10 +213,6 @@ class FlowUploaderImpl extends FlowUploader {
);

developer.log('getFlowTask(): 1', name: 'FlowUploaderImpl');
final dmm = AsyncTaskResponse.fromJson(asyncTaskJson);
developer.log('getFlowTask(): 2 - $dmm', name: 'FlowUploaderImpl');
final chot = dmm.toAsyncTask();
developer.log('getFlowTask(): 3 - $chot', name: 'FlowUploaderImpl');
return chot;
return AsyncTaskResponse.fromJson(asyncTaskJson).toAsyncTask();
}
}
12 changes: 11 additions & 1 deletion domain/lib/src/model/flow/flow_chunk.dart
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
*/

import 'dart:async';
import 'dart:developer' as developer;
import 'dart:io';
import 'dart:math';

import 'package:async/async.dart';
import 'package:dartz/dartz.dart';
import 'package:domain/domain.dart';
import 'package:domain/src/model/flow/flow_chunk_upload_state.dart';
Expand All @@ -58,6 +60,7 @@ class FlowChunk extends Equatable {
FlowChunkUploadState status = FlowChunkUploadState.pending;

bool error = false;
ChunkedStreamReader<int>? _chunkedStreamReader;

final FlowUploader _flowUploader;

Expand All @@ -75,8 +78,9 @@ class FlowChunk extends Equatable {

Future<Flow> upload(int uploadedByte, StreamController<Either<Failure, Success>> onSendController) async {
status = FlowChunkUploadState.uploading;
_chunkedStreamReader = ChunkedStreamReader(file.openRead(startByte, endByte));
return _flowUploader.uploadChunk(
file,
_chunkedStreamReader!,
offset + 1,
chunkSize,
currentChunkSize,
Expand Down Expand Up @@ -108,11 +112,17 @@ class FlowChunk extends Equatable {
if (isSuccess) {
status = FlowChunkUploadState.success;
} else {
developer.log('test(): update status error for offset $offset', name: 'FlowChunk');
status = FlowChunkUploadState.error;
}
return isSuccess;
}

void completed() {
developer.log('completed()', name: 'FlowChunk');
_chunkedStreamReader?.cancel();
}

@override
List<Object?> get props => [offset, flowFile, identifier];
}
28 changes: 15 additions & 13 deletions domain/lib/src/model/flow/flow_file.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,9 @@ import 'dart:io';

import 'package:dartz/dartz.dart';
import 'package:domain/domain.dart';
import 'package:domain/src/model/async_task/async_task.dart';
import 'package:domain/src/model/async_task/async_task_exception.dart';
import 'package:domain/src/model/async_task/async_task_status.dart';
import 'package:domain/src/model/flow/flow_chunk.dart';
import 'package:domain/src/model/flow/flow_chunk_upload_state.dart';
import 'package:domain/src/repository/flow/flow_uploader.dart';
import 'package:domain/src/usecases/upload_file/flow_upload_state.dart';
import 'package:equatable/equatable.dart';
import 'package:retry/retry.dart';

Expand Down Expand Up @@ -110,21 +106,26 @@ class FlowFile extends Equatable {
}

void _updateEvent(Either<Failure, Success> flowUploadState) {
developer.log('_updateEvent(): $flowUploadState', name: 'FlowFile');
_progressStateController.add(flowUploadState);
}

void _handleCompleted() {
if (chunks.every((chunk) => chunk.status == FlowChunkUploadState.success)) {
final asyncTaskId = _getProcessingTaskId();
if (asyncTaskId != null) {
_handleProcessing(asyncTaskId);
try {
if (chunks.every((chunk) => chunk.status == FlowChunkUploadState.success)) {
final asyncTaskId = _getProcessingTaskId();
if (asyncTaskId != null) {
_handleProcessing(asyncTaskId);
} else {
_handleSuccess();
}
} else {
_handleSuccess();
developer.log('handleCompleted(): error', name: 'FlowFile');
_updateEvent(Left(ErrorFlowUploadState(this)));
_progressStateController.close();
}
} else {
developer.log('handleCompleted(): error', name: 'FlowFile');
_updateEvent(Left(ErrorFlowUploadState(this)));
_progressStateController.close();
} catch (e) {
developer.log('_handleCompleted(): exception $e', name: 'FlowFile');
}
}

Expand Down Expand Up @@ -177,6 +178,7 @@ class FlowFile extends Equatable {
if (success) {
_updateProgress(chunk);
}
chunk.completed();
}
}

Expand Down
9 changes: 6 additions & 3 deletions domain/lib/src/repository/flow/flow_uploader.dart
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,19 @@
*/

import 'dart:async';
import 'dart:io';

import 'package:async/async.dart';
import 'package:dartz/dartz.dart';
import 'package:domain/domain.dart';
import 'package:domain/src/model/async_task/async_task.dart';
import 'package:domain/src/model/file_info.dart';
import 'package:domain/src/model/flow/flow.dart';
import 'package:domain/src/model/flow/flow_file.dart';
import 'package:domain/src/state/failure.dart';
import 'package:domain/src/state/success.dart';

abstract class FlowUploader {
Future<Flow> uploadChunk(
File file,
ChunkedStreamReader<int> chunkedStreamReaderFile,
int chunkNumber,
int chunkSize,
int currentChunkSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ class UploadShareFileManager {
_progressStateStreamGroup.stream.listen((flowUploadState) {
flowUploadState.fold(
(failure) {
developer.log('_handleFlowProgressState(): [Failure] $failure', name: 'UploadShareFileManager');
if (failure is ErrorFlowUploadState) {
_uploadingStateFiles.updateElementByUploadTaskId(
failure.flowFile.uploadTaskId,
Expand All @@ -154,8 +155,6 @@ class UploadShareFileManager {
},
(success) {
if (success is UploadingFlowUploadState) {
developer.log('_handleFlowProgressState(): uploading: ${success.progress}', name: 'UploadShareFileManager');

_uploadingStateFiles.updateElementByUploadTaskId(
success.flowFile.uploadTaskId,
(currentState) => (currentState?.uploadStatus.completed ?? false)
Expand All @@ -177,7 +176,7 @@ class UploadShareFileManager {
}

void _handleUploadFileSucceedWithResource(SuccessWithResourceFlowUploadState successWithResourceFlowUploadState) {
developer.log('_handleUploadFileSucceed()', name: 'UploadShareFileManager');
developer.log('_handleUploadFileSucceedWithResource()', name: 'UploadShareFileManager');
final fileState = _uploadingStateFiles.getElementByUploadTaskId(successWithResourceFlowUploadState.flowFile.uploadTaskId);
if (fileState != null) {
_fileHelper.deleteFile(fileState.file);
Expand Down Expand Up @@ -227,6 +226,7 @@ class UploadShareFileManager {
}

void _handleFlowUploadFileFailure(ErrorFlowUploadState errorFlowUploadState) {
developer.log('_handleFlowUploadFileFailure(): $errorFlowUploadState', name: 'UploadShareFileManager');
final fileState = _uploadingStateFiles.getElementByUploadTaskId(errorFlowUploadState.flowFile.uploadTaskId);
if (fileState != null) {
_fileHelper.deleteFile(fileState.file);
Expand Down
15 changes: 8 additions & 7 deletions lib/presentation/util/helper/file_helper.dart
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,20 @@
// the Additional Terms applicable to LinShare software.
//

import 'dart:developer' as developer;
import 'dart:io';

import 'package:domain/domain.dart';
import 'dart:developer' as developer;

class FileHelper {
void deleteFile(FileInfo fileInfo) async {
if (fileInfo != null) {
final file = File(fileInfo.filePath + fileInfo.fileName);
try {
await file.delete();
} catch (exception) {
print('error when delete file: ${fileInfo.fileName} ' + exception.toString());
}
developer.log('deleteFile(): $fileInfo', name: 'FileHelper');
final file = File(fileInfo.filePath + fileInfo.fileName);
try {
await file.delete();
} catch (exception) {
developer.log('deleteFile(): error when delete file: ${fileInfo.fileName}' + exception.toString(), name: 'FileHelper');
}
}
}