Skip to content

Commit

Permalink
Merge pull request #17 from akpaevj/develop
Browse files Browse the repository at this point in the history
develop branch has been created
  • Loading branch information
akpaevj authored May 18, 2021
2 parents 389bd6c + e21c9db commit 186e217
Show file tree
Hide file tree
Showing 37 changed files with 610 additions and 609 deletions.
162 changes: 82 additions & 80 deletions OneSTools.EventLog.Exporter.Core/ClickHouse/ClickHouseStorage.cs
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
using ClickHouse.Client.ADO;
using ClickHouse.Client.Copy;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using System;
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using ClickHouse.Client.ADO;
using ClickHouse.Client.Copy;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

namespace OneSTools.EventLog.Exporter.Core.ClickHouse
{
public class ClickHouseStorage : IEventLogStorage
{
private const string TableName = "EventLogItems";
private readonly ILogger<ClickHouseStorage> _logger;
private ClickHouseConnection _connection;
private string _connectionString;
private string _databaseName;
private ClickHouseConnection _connection;

public ClickHouseStorage(string connectionsString, ILogger<ClickHouseStorage> logger = null)
{
Expand All @@ -36,6 +35,81 @@ public ClickHouseStorage(ILogger<ClickHouseStorage> logger, IConfiguration confi
Init();
}

public async Task<EventLogPosition> ReadEventLogPositionAsync(CancellationToken cancellationToken = default)
{
await CreateConnectionAsync(cancellationToken);

var commandText =
$"SELECT TOP 1 FileName, EndPosition, LgfEndPosition, Id FROM {TableName} ORDER BY Id DESC";

await using var cmd = _connection.CreateCommand();
cmd.CommandText = commandText;

await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);

if (await reader.ReadAsync(cancellationToken))
return new EventLogPosition(reader.GetString(0), reader.GetInt64(1), reader.GetInt64(2),
reader.GetInt64(3));
return null;
}

public async Task WriteEventLogDataAsync(List<EventLogItem> entities,
CancellationToken cancellationToken = default)
{
await CreateConnectionAsync(cancellationToken);

using var copy = new ClickHouseBulkCopy(_connection)
{
DestinationTableName = TableName,
BatchSize = entities.Count
};

var data = entities.Select(item => new object[]
{
item.FileName ?? "",
item.EndPosition,
item.LgfEndPosition,
item.Id,
item.DateTime,
item.TransactionStatus ?? "",
item.TransactionDateTime == DateTime.MinValue ? new DateTime(1970, 1, 1) : item.TransactionDateTime,
item.TransactionNumber,
item.UserUuid ?? "",
item.User ?? "",
item.Computer ?? "",
item.Application ?? "",
item.Connection,
item.Event ?? "",
item.Severity ?? "",
item.Comment ?? "",
item.MetadataUuid ?? "",
item.Metadata ?? "",
item.Data ?? "",
item.DataPresentation ?? "",
item.Server ?? "",
item.MainPort,
item.AddPort,
item.Session
}).AsEnumerable();

try
{
await copy.WriteToServerAsync(data, cancellationToken);
}
catch (Exception ex)
{
_logger?.LogError(ex, $"Failed to write data to {_databaseName}");
throw;
}

_logger?.LogDebug($"{entities.Count} items were being written to {_databaseName}");
}

public void Dispose()
{
_connection?.Dispose();
}

private void Init()
{
if (_connectionString == string.Empty)
Expand Down Expand Up @@ -106,77 +180,5 @@ ORDER BY (DateTime, EndPosition)
cmd.CommandText = commandText;
await cmd.ExecuteNonQueryAsync(cancellationToken);
}

public async Task<EventLogPosition> ReadEventLogPositionAsync(CancellationToken cancellationToken = default)
{
await CreateConnectionAsync(cancellationToken);

var commandText = $"SELECT TOP 1 FileName, EndPosition, LgfEndPosition, Id FROM {TableName} ORDER BY Id DESC";

await using var cmd = _connection.CreateCommand();
cmd.CommandText = commandText;

await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);

if (await reader.ReadAsync(cancellationToken))
return new EventLogPosition(reader.GetString(0), reader.GetInt64(1), reader.GetInt64(2), reader.GetInt64(3));
else
return null;
}

public async Task WriteEventLogDataAsync(List<EventLogItem> entities, CancellationToken cancellationToken = default)
{
await CreateConnectionAsync(cancellationToken);

using var copy = new ClickHouseBulkCopy(_connection)
{
DestinationTableName = TableName,
BatchSize = entities.Count
};

var data = entities.Select(item => new object[] {
item.FileName ?? "",
item.EndPosition,
item.LgfEndPosition,
item.Id,
item.DateTime,
item.TransactionStatus ?? "",
item.TransactionDateTime == DateTime.MinValue ? new DateTime(1970, 1, 1) : item.TransactionDateTime,
item.TransactionNumber,
item.UserUuid ?? "",
item.User ?? "",
item.Computer ?? "",
item.Application ?? "",
item.Connection,
item.Event ?? "",
item.Severity ?? "",
item.Comment ?? "",
item.MetadataUuid ?? "",
item.Metadata ?? "",
item.Data ?? "",
item.DataPresentation ?? "",
item.Server ?? "",
item.MainPort,
item.AddPort,
item.Session
}).AsEnumerable();

try
{
await copy.WriteToServerAsync(data, cancellationToken);
}
catch (Exception ex)
{
_logger?.LogError(ex, $"Failed to write data to {_databaseName}");
throw;
}

_logger?.LogDebug($"{entities.Count} items were being written to {_databaseName}");
}

public void Dispose()
{
_connection?.Dispose();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ public enum AuthenticationType
Basic = 1,
ApiKey = 2
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ public override int GetHashCode()
return !(left == right);
}
}
}
}
Loading

0 comments on commit 186e217

Please sign in to comment.