Skip to content

Commit

Permalink
add: FtpClientEx UploadFile
Browse files Browse the repository at this point in the history
  • Loading branch information
Ilya committed Oct 20, 2024
1 parent 10b2e8d commit bef2b72
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 18 deletions.
58 changes: 57 additions & 1 deletion src/Asv.Mavlink.Test/Microservices/Ftp/FtpMicroservice.Test.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
using System;
using System.Buffers;
using System.IO;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reactive.Concurrency;
using System.Threading;
using System.Threading.Tasks;
using DynamicData;
using Xunit;
using Xunit.Abstractions;

Expand Down Expand Up @@ -115,7 +119,7 @@ public async Task CreateFile_WithProperInput_Success(string filePath, byte sessi
server.CreateFile = (path, cancellationToken) => Task.FromResult(session);

var result = await client.CreateFile(filePath);
Assert.Equal((byte)0, result.ReadSession());
Assert.Equal(session, result.ReadSession());
Assert.Equal(FtpOpcode.Ack, result.ReadOpcode());
}

Expand Down Expand Up @@ -375,4 +379,56 @@ await client.BurstReadFile(originRequest, packet =>
originStream.Close();
}
}

[Fact]
public async Task Client_Call_UploadFile_And_Server_Catch_It()
{
SetUpMicroservice(out var client, out var server, packet => true, packet => true);

const string testFilePath = "mftp://upload/test.txt";
var testData = new byte[500];
new Random().NextBytes(testData);
using var streamToUpload = new MemoryStream(testData);

const byte expectedSession = 1;
var expectedFileSize = (uint)testData.Length;

var openFileWriteCalled = 0;
var writeFileCalled = 0;
var terminateSessionCalled = 0;

server.OpenFileWrite = (path, cancel) =>
{
openFileWriteCalled++;
Assert.Equal(testFilePath, path);
return Task.FromResult(new WriteHandle(expectedSession, expectedFileSize));
};

var receivedData = new byte[testData.Length];
server.WriteFile = (request, data, cancel) =>
{
writeFileCalled++;
Assert.Equal(expectedSession, request.Session);
var expectedDataLength = request.Take;
_output.WriteLine($"Write exceeds buffer size. Written={request.Skip + data.Length}. To be write={receivedData.Length}");
data[..expectedDataLength].CopyTo(receivedData.AsMemory((int)request.Skip, expectedDataLength));
return Task.CompletedTask;
};

server.TerminateSession = (session, cancel) =>
{
terminateSessionCalled++;
Assert.Equal(expectedSession, session);
return Task.CompletedTask;
};

var ftpClientEx = new FtpClientEx(client);
await ftpClientEx.UploadFile(testFilePath, streamToUpload);

Assert.Equal(1, openFileWriteCalled);
Assert.True(writeFileCalled > 0);
Assert.Equal(1, terminateSessionCalled);
Assert.Equal(testData.Length, receivedData.Length);
Assert.Equal(testData.ToArray(), receivedData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public FtpServerExTests(ITestOutputHelper output)
_output = output;
}

private MockFileSystem SetUpFileSystem(string root)
public MockFileSystem SetUpFileSystem(string root)
{
var mockFileCfg = new MockFileSystemOptions
{
Expand All @@ -35,8 +35,8 @@ private MockFileSystem SetUpFileSystem(string root)

return fileSystem;
}
private void SetUpServer(out IFtpServer server)

public void SetUpServer(out IFtpServer server)
{
var link = new VirtualMavlinkConnection(_ => true, _ => true);
var clientId = new MavlinkClientIdentity
Expand Down
39 changes: 31 additions & 8 deletions src/Asv.Mavlink/Microservices/Ftp/Client/Ex/FtpClientEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Asv.Mavlink.V2.Common;
using DynamicData;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
Expand Down Expand Up @@ -211,8 +208,6 @@ public async Task DownloadFile(string filePath, IBufferWriter<byte> bufferToSave
}
}



public async Task DownloadFile(string filePath,Stream streamToSave, IProgress<double>? progress = null, CancellationToken cancel = default)
{
progress ??= new Progress<double>();
Expand All @@ -237,7 +232,6 @@ public async Task DownloadFile(string filePath,Stream streamToSave, IProgress<do
await streamToSave.WriteAsync(mem, cancel).ConfigureAwait(false);
skip += result.ReadCount;
progress.Report((double)skip / file.Size);

}
catch (FtpNackEndOfFileException e)
{
Expand All @@ -250,6 +244,35 @@ public async Task DownloadFile(string filePath,Stream streamToSave, IProgress<do
ArrayPool<byte>.Shared.Return(buffer);
await Base.TerminateSession(file.Session, cancel).ConfigureAwait(false);
}

}
}

public async Task UploadFile(string filePath, Stream streamToUpload, IProgress<double>? progress = null, CancellationToken cancel = default)
{
progress ??= new Progress<double>();
var file = await Base.OpenFileWrite(filePath, cancel).ConfigureAwait(false);
var totalWritten = 0L;
var buffer = ArrayPool<byte>.Shared.Rent(MavlinkFtpHelper.MaxDataSize);

try
{
while (true)
{
var bytesRead = await streamToUpload.ReadAsync(buffer.AsMemory(0, MavlinkFtpHelper.MaxDataSize), cancel).ConfigureAwait(false);
if (bytesRead == 0) break;

var request = new WriteRequest(file.Session, (uint)totalWritten, (byte)bytesRead);
var memory = new Memory<byte>(buffer, 0, bytesRead);

await Base.WriteFile(request, memory, cancel).ConfigureAwait(false);

totalWritten += bytesRead;
progress.Report((double)totalWritten / streamToUpload.Length);
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
await Base.TerminateSession(file.Session, cancel).ConfigureAwait(false);
}
}
}
7 changes: 4 additions & 3 deletions src/Asv.Mavlink/Microservices/Ftp/Client/FtpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ public async Task<FileTransferProtocolPacket> WriteFile(WriteRequest request, Me
_logger.ZLogInformation($"{LogSend} {FtpOpcode.WriteFile:G} {request.ToString()}");
var result = await InternalFtpCall(FtpOpcode.WriteFile, p =>
{
p.WriteData(buffer.Span[..request.Take]);
p.WriteSize(request.Take);
p.WriteOffset(request.Skip);
p.WriteSession(request.Session); // Byte 2
p.WriteSize(request.Take); // Byte 4
p.WriteOffset(request.Skip); // Bytes 8-11
p.WriteData(buffer.Span[..request.Take]); // Bytes 12+
}, cancellationToken).ConfigureAwait(false);
_logger
.ZLogInformation($"{LogRecv} {FtpOpcode.WriteFile:G} size: {request.Take}, offset: {request.Skip}, session: {request.Session}");
Expand Down
2 changes: 0 additions & 2 deletions src/Asv.Mavlink/Microservices/Ftp/Server/FtpServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,6 @@ private Task InternalFtpReply(FileTransferProtocolPacket req, FtpOpcode replyOpC
p.Payload.TargetComponent = req.ComponentId;
p.Payload.TargetSystem = req.SystemId;
p.Payload.TargetNetwork = req.Payload.TargetNetwork;
var session = req.ReadSession();
p.WriteSession(session);
var originSeq = p.ReadSequenceNumber();
p.WriteSequenceNumber((ushort)((originSeq + 1) % ushort.MaxValue));
p.WriteOpcode(replyOpCode);
Expand Down
2 changes: 1 addition & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<ProductVersion>3.10.4-dev.5</ProductVersion>
<ProductVersion>3.10.4-dev.7</ProductVersion>
<ProductPrevVersion>3.10.3</ProductPrevVersion>

<AsvCommonVersion>3.0.0-dev.4</AsvCommonVersion>
Expand Down

0 comments on commit bef2b72

Please sign in to comment.