Skip to content

Commit

Permalink
SepWriter: Add NewRow overloads with CancellationToken (#227)
Browse files Browse the repository at this point in the history
  • Loading branch information
nietras authored Jan 23, 2025
1 parent 2ec75d9 commit 5708114
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 25 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2089,6 +2089,7 @@ namespace nietras.SeparatedValues
{
public static void CopyTo(this nietras.SeparatedValues.SepReader.Row readerRow, nietras.SeparatedValues.SepWriter.Row writerRow) { }
public static nietras.SeparatedValues.SepWriter.Row NewRow(this nietras.SeparatedValues.SepWriter writer, nietras.SeparatedValues.SepReader.Row rowToCopy) { }
public static nietras.SeparatedValues.SepWriter.Row NewRow(this nietras.SeparatedValues.SepWriter writer, nietras.SeparatedValues.SepReader.Row rowToCopy, System.Threading.CancellationToken cancellationToken) { }
}
public readonly struct SepSpec : System.IEquatable<nietras.SeparatedValues.SepSpec>
{
Expand Down Expand Up @@ -2127,6 +2128,7 @@ namespace nietras.SeparatedValues
public void Flush() { }
public System.Threading.Tasks.Task FlushAsync(System.Threading.CancellationToken cancellationToken = default) { }
public nietras.SeparatedValues.SepWriter.Row NewRow() { }
public nietras.SeparatedValues.SepWriter.Row NewRow(System.Threading.CancellationToken cancellationToken) { }
public override string ToString() { }
[System.Obsolete(("Types with embedded references are not supported in this version of your compiler" +
"."), true)]
Expand Down
71 changes: 60 additions & 11 deletions src/Sep.Test/SepReaderWriterTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace nietras.SeparatedValues.Test;

[TestClass]
public class SepReaderWriterTest
{
readonly Dictionary<string, Action<string>> _assertCopyColumns = new() {
{ nameof(AssertCopyColumnsManual), AssertCopyColumnsManual },
{ nameof(AssertCopyColumnsNewRow), AssertCopyColumnsNewRow },
readonly Dictionary<string, Func<string, ValueTask>> _assertCopyColumns = new() {
{ nameof(AssertCopyColumnsManualSyncAsync), AssertCopyColumnsManualSyncAsync },
{ nameof(AssertCopyColumnsNewRowSyncAsync), AssertCopyColumnsNewRowSyncAsync },
};

// Header only copied if any other rows, this is due to how API is designed
Expand All @@ -27,11 +29,11 @@ public class SepReaderWriterTest
123;456
789;012
")]
public void SepReaderWriterTest_CopyColumnsIfAnyRows(string text) =>
AssertCopyColumns(text);
public async ValueTask SepReaderWriterTest_CopyColumnsIfAnyRows(string text) =>
await AssertCopyColumnsSyncAsync(text);

[TestMethod]
public void SepReaderWriterTest_CopyColumnsIfAnyRows_Long()
public async ValueTask SepReaderWriterTest_CopyColumnsIfAnyRows_Long()
{
#if SEPREADERTRACE // Don't run really long with tracing enabled 😅
var lengths = new[] { 32, 64, 128, 512, 1024 };
Expand All @@ -51,7 +53,7 @@ public void SepReaderWriterTest_CopyColumnsIfAnyRows_Long()
++i;
}
var text = sb.ToString();
AssertCopyColumns(text);
await AssertCopyColumnsSyncAsync(text);
}
}

Expand Down Expand Up @@ -129,16 +131,22 @@ public void SepReaderWriterTest_CopySingleEmptyColumn(int rowCountWithHeader)
}
}

void AssertCopyColumns(string text)
async ValueTask AssertCopyColumnsSyncAsync(string text)
{
ArgumentNullException.ThrowIfNull(text);
foreach (var assertCopyColumns in _assertCopyColumns)
{
assertCopyColumns.Value(text);
await assertCopyColumns.Value(text);
}
}

static void AssertCopyColumnsManual(string text)
static async ValueTask AssertCopyColumnsManualSyncAsync(string text)
{
AssertCopyColumnsNewRowSync(text);
await AssertCopyColumnsNewRowAsync(text);
}

static void AssertCopyColumnsManualSync(string text)
{
// Act
using var reader = Sep.Reader().FromText(text);
Expand All @@ -156,7 +164,32 @@ static void AssertCopyColumnsManual(string text)
AreEqual(text, actual);
}

static void AssertCopyColumnsNewRow(string text)
static async ValueTask AssertCopyColumnsManualAsync(string text)
{
// Act
using var reader = await Sep.Reader().FromTextAsync(text);
await using var writer = reader.Spec.Writer().ToText();
var colNames = reader.Header.ColNames.ToArray();
await foreach (var readRow in reader)
{
var readCols = readRow[colNames];

await using var writeRow = writer.NewRow();
writeRow[colNames].Set(readCols);
}
await writer.FlushAsync();
// Assert
var actual = writer.ToString();
AreEqual(text, actual);
}

static async ValueTask AssertCopyColumnsNewRowSyncAsync(string text)
{
AssertCopyColumnsNewRowSync(text);
await AssertCopyColumnsNewRowAsync(text);
}

static void AssertCopyColumnsNewRowSync(string text)
{
// Act
using var reader = Sep.Reader().FromText(text);
Expand All @@ -170,6 +203,22 @@ static void AssertCopyColumnsNewRow(string text)
AreEqual(text, actual);
}

static async ValueTask AssertCopyColumnsNewRowAsync(string text)
{
// Act
using var reader = await Sep.Reader().FromTextAsync(text);
await using var writer = reader.Spec.Writer().ToText();
var cts = new CancellationTokenSource();
await foreach (var readRow in reader)
{
await using var writeRow = writer.NewRow(readRow, cts.Token);
}
await writer.FlushAsync(cts.Token);
// Assert
var actual = writer.ToString();
AreEqual(text, actual);
}

static void AreEqual(string text, string actual)
{
if (text != actual)
Expand Down
4 changes: 3 additions & 1 deletion src/Sep.Test/SepWriterColTest.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;

Expand Down Expand Up @@ -221,7 +222,8 @@ static async ValueTask Run(SepWriter.ColAction action, string? expectedColValue
{
await using var writer = createWriter();
{
await using var row = writer.NewRow();
var cts = new CancellationTokenSource();
await using var row = writer.NewRow(cts.Token);
action(row[ColName]);
}
AssertCol(expectedColValue, writer);
Expand Down
11 changes: 11 additions & 0 deletions src/Sep.Test/SepWriterOptionsTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,15 @@ public void SepWriterOptionsTest_Defaults()
Assert.IsFalse(sut.Escape);
Assert.IsFalse(sut.AsyncContinueOnCapturedContext);
}

[TestMethod]
public void SepWriterOptionsTest_Override()
{
// Assert any not otherwise tested
var sut = new SepWriterOptions()
{
AsyncContinueOnCapturedContext = true,
};
Assert.IsTrue(sut.AsyncContinueOnCapturedContext);
}
}
17 changes: 16 additions & 1 deletion src/Sep.Test/SepWriterTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;

Expand Down Expand Up @@ -124,11 +125,25 @@ public async ValueTask SepWriterTest_EndRowWithoutNewRow_Throws()
{
await using var writer = CreateWriter();
var e = await Assert.ThrowsExceptionAsync<InvalidOperationException>(
async () => await writer.EndRowAsync(default));
async () => await writer.EndRowAsync());
Assert.AreEqual(expected, e.Message);
}
}

[TestMethod]
public void SepWriterTest_NewRowWithCancellationToken_Dispose_Throws()
{
var expected = "'NewRow()' called with 'CancellationToken', if async use was intented, " +
"be sure to dispose this asynchronously with 'await' like " +
"'await using var row = writer.NewRow(cancellationToken);'";

using var writer = Sep.New(';').Writer().ToText();
var cts = new CancellationTokenSource();
var e = Assert.ThrowsException<InvalidOperationException>(
() => { using (writer.NewRow(cts.Token)) { } });
Assert.AreEqual(expected, e.Message);
}

[TestMethod]
public void SepWriterTest_ColWrittenNotDefinedInFirstRow_Throws()
{
Expand Down
10 changes: 10 additions & 0 deletions src/Sep/Internals/SepThrow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using static nietras.SeparatedValues.SepWriter;

namespace nietras.SeparatedValues;
Expand Down Expand Up @@ -81,6 +82,15 @@ internal static void InvalidOperationException_WriterDoesNotHaveActiveRow()
$"I.e. prefer 'using var row = writer.NewRow();'");
}

[DoesNotReturn]
internal static void InvalidOperationException_SyncEndRowCalledWhenValidCancellationToken()
{
throw new InvalidOperationException(
$"'{nameof(SepWriter.NewRow)}()' called with '{nameof(CancellationToken)}'" +
$", if async use was intented, be sure to dispose this asynchronously with 'await' like " +
$"'await using var row = writer.NewRow(cancellationToken);'");
}

[DoesNotReturn]
internal static void InvalidOperationException_NotAllExpectedColsSet(List<ColImpl> cols, string[] colNamesHeader)
{
Expand Down
9 changes: 9 additions & 0 deletions src/Sep/SepReaderWriterExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Diagnostics.Contracts;
using System.Threading;

namespace nietras.SeparatedValues;

Expand All @@ -12,6 +13,14 @@ public static SepWriter.Row NewRow(this SepWriter writer, SepReader.Row rowToCop
return row;
}

public static SepWriter.Row NewRow(this SepWriter writer, SepReader.Row rowToCopy, CancellationToken cancellationToken)
{
Contract.Assume(writer is not null);
var row = writer.NewRow(cancellationToken);
rowToCopy.CopyTo(row);
return row;
}

public static void CopyTo(this SepReader.Row readerRow, SepWriter.Row writerRow)
{
var colNames = readerRow._state._header.ColNames;
Expand Down
11 changes: 8 additions & 3 deletions src/Sep/SepWriter.IO.Async.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//#define SYNC
using System;
using System.Runtime.CompilerServices;
#if !SYNC
using System.Threading;
#if !SYNC
using System.Threading.Tasks;
#endif

Expand All @@ -20,11 +20,15 @@ public Task FlushAsync(CancellationToken cancellationToken = default) =>
#if SYNC
internal void EndRow()
#else
internal async ValueTask EndRowAsync(CancellationToken cancellationToken)
internal async ValueTask EndRowAsync()
#endif
{
if (!_newRowActive) { SepThrow.InvalidOperationException_WriterDoesNotHaveActiveRow(); }

#if SYNC
if (_newRowCancellationToken != CancellationToken.None) { SepThrow.InvalidOperationException_SyncEndRowCalledWhenValidCancellationToken(); }
#else
var cancellationToken = _newRowCancellationToken;
#endif
var cols = _cols;

// Header
Expand Down Expand Up @@ -107,6 +111,7 @@ await _writer.WriteLineAsync()
_headerOrFirstRowColCount = cols.Count;
}
_newRowActive = false;
_newRowCancellationToken = CancellationToken.None;
}

#if SYNC
Expand Down
11 changes: 8 additions & 3 deletions src/Sep/SepWriter.IO.Sync.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#define SYNC
using System;
using System.Runtime.CompilerServices;
#if !SYNC
using System.Threading;
#if !SYNC
using System.Threading.Tasks;
#endif

Expand All @@ -20,11 +20,15 @@ public Task FlushAsync(CancellationToken cancellationToken = default) =>
#if SYNC
internal void EndRow()
#else
internal async ValueTask EndRowAsync(CancellationToken cancellationToken)
internal async ValueTask EndRowAsync()
#endif
{
if (!_newRowActive) { SepThrow.InvalidOperationException_WriterDoesNotHaveActiveRow(); }

#if SYNC
if (_newRowCancellationToken != CancellationToken.None) { SepThrow.InvalidOperationException_SyncEndRowCalledWhenValidCancellationToken(); }
#else
var cancellationToken = _newRowCancellationToken;
#endif
var cols = _cols;

// Header
Expand Down Expand Up @@ -107,6 +111,7 @@ await _writer.WriteLineAsync()
_headerOrFirstRowColCount = cols.Count;
}
_newRowActive = false;
_newRowCancellationToken = CancellationToken.None;
}

#if SYNC
Expand Down
2 changes: 1 addition & 1 deletion src/Sep/SepWriter.Row.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public ValueTask DisposeAsync()
{
if (_writer is not null)
{
return _writer.EndRowAsync(default);
return _writer.EndRowAsync();
}
_writer = null;
return ValueTask.CompletedTask;
Expand Down
26 changes: 21 additions & 5 deletions src/Sep/SepWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Globalization;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace nietras.SeparatedValues;
Expand Down Expand Up @@ -35,6 +36,7 @@ public sealed partial class SepWriter : IDisposable
internal bool _headerWrittenOrSkipped = false;
internal int _headerOrFirstRowColCount = -1;
bool _newRowActive = false;
CancellationToken _newRowCancellationToken = CancellationToken.None;
int _cacheIndex = 0;

internal SepWriter(SepWriterOptions options, TextWriter writer, ISepTextWriterDisposer textWriterDisposer)
Expand All @@ -56,11 +58,14 @@ internal SepWriter(SepWriterOptions options, TextWriter writer, ISepTextWriterDi

public Row NewRow()
{
if (_newRowActive) { SepThrow.InvalidOperationException_WriterAlreadyHasActiveRow(); }
_newRowActive = true;
_cacheIndex = 0;
_arrayPool.Reset();
foreach (var col in _cols) { col.Clear(); }
PrepareNewRow();
return new(this);
}

public Row NewRow(CancellationToken cancellationToken)
{
PrepareNewRow();
_newRowCancellationToken = cancellationToken;
return new(this);
}

Expand All @@ -74,6 +79,17 @@ public override string ToString()
return null;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
void PrepareNewRow()
{
if (_newRowActive) { SepThrow.InvalidOperationException_WriterAlreadyHasActiveRow(); }
_newRowActive = true;
A.Assert(_newRowCancellationToken == CancellationToken.None);
_cacheIndex = 0;
_arrayPool.Reset();
foreach (var col in _cols) { col.Clear(); }
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
static uint ContainsSpecialCharacters(ReadOnlySpan<char> span, char separator)
{
Expand Down

0 comments on commit 5708114

Please sign in to comment.