From fa16d6c2c3d281f91e9a8d087f2ad1e465df2145 Mon Sep 17 00:00:00 2001 From: RobIII Date: Fri, 26 Apr 2024 16:29:35 +0200 Subject: [PATCH 1/3] Cleanup & refactor --- .../ConfigurationBuilderExtensions.cs | 4 +- .../Contrib.KafkaFlow.Outbox.Postgres.csproj | 2 +- .../PostgresOutboxBackend.cs | 100 ------------ .../PostgresOutboxRepository.cs | 50 ++++++ .../schema/0002.Table.sql | 2 +- .../ConfigurationBuilderExtensions.cs | 4 +- .../Contrib.KafkaFlow.Outbox.SqlServer.csproj | 1 - .../SqlServerOutboxBackend.cs | 94 ------------ .../SqlServerOutboxRepository.cs | 50 ++++++ .../ConfigurationExtensions.cs | 2 - .../Contrib.KafkaFlow.Outbox.csproj | 1 + .../IOutboxBackend.cs | 2 - .../IOutboxDispatcher.cs | 3 + .../IOutboxProducerConfigurationBuilder.cs | 61 ++++++++ .../IOutboxRepository.cs | 7 + src/Contrib.KafkaFlow.Outbox/OutboxBackend.cs | 61 ++++++++ .../OutboxDispatcherService.cs | 2 +- .../OutboxProviderConfigurationBuilder.cs | 58 ------- src/Contrib.KafkaFlow.Outbox/OutboxRecord.cs | 5 + .../OutboxTableRow.cs | 7 + .../ConfigurationBuilderExtensions.cs | 5 +- ....KafkaFlow.ProcessManagers.Postgres.csproj | 2 +- .../PostgresProcessManagersStore.cs | 94 ------------ .../PostgresProcessStateRepository.cs | 63 ++++++++ .../ConfigurationBuilderExtensions.cs | 5 +- ...KafkaFlow.ProcessManagers.SqlServer.csproj | 1 - .../SqlServerProcessManagersStore.cs | 55 ++----- .../Contrib.KafkaFlow.ProcessManagers.csproj | 2 +- .../HandlerTypeMapping.cs | 8 + .../IProcessManager.cs | 6 + .../IProcessStateRepository.cs | 8 + .../IProcessStateStore.cs | 18 --- .../OptimisticConcurrencyException.cs | 19 +++ .../ProcessManager.cs | 32 ++-- .../ProcessManagerConfiguration.cs | 29 ---- .../ProcessManagerMiddleware.cs | 1 - .../ProcessManagersStore.cs | 43 ++++++ .../ProcessStateTableRow.cs | 3 + .../TransactionMode.cs | 22 +++ .../Fixture/IKafkaFlowFixture.cs | 9 ++ .../Fixture/KafkaFlowFixture.cs | 143 ++++++++++++++++++ .../Fixture/PostgresKafkaFlowFixture.cs | 119 +-------------- 
.../Fixture/SqlServerKafkaFlowFixture.cs | 123 +-------------- ...ow.ProcessManagers.IntegrationTests.csproj | 2 +- .../PostgresProcessManagerStoreTests.cs | 35 +---- .../ProcessManagerStoreTests.cs | 44 ++++++ .../SqlServerProcessManagerStoreTests.cs | 39 +---- ...ostgresUserLifecycleProcessManagerTests.cs | 33 +--- .../SqlServerLifecycleProcessManagerTests.cs | 35 +---- .../UserLifecycleProcessManagerTests.cs | 36 +++++ 50 files changed, 717 insertions(+), 833 deletions(-) delete mode 100644 src/Contrib.KafkaFlow.Outbox.Postgres/PostgresOutboxBackend.cs create mode 100644 src/Contrib.KafkaFlow.Outbox.Postgres/PostgresOutboxRepository.cs delete mode 100644 src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxBackend.cs create mode 100644 src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxRepository.cs create mode 100644 src/Contrib.KafkaFlow.Outbox/IOutboxDispatcher.cs create mode 100644 src/Contrib.KafkaFlow.Outbox/IOutboxProducerConfigurationBuilder.cs create mode 100644 src/Contrib.KafkaFlow.Outbox/IOutboxRepository.cs create mode 100644 src/Contrib.KafkaFlow.Outbox/OutboxBackend.cs create mode 100644 src/Contrib.KafkaFlow.Outbox/OutboxRecord.cs create mode 100644 src/Contrib.KafkaFlow.Outbox/OutboxTableRow.cs delete mode 100644 src/Contrib.KafkaFlow.ProcessManagers.Postgres/PostgresProcessManagersStore.cs create mode 100644 src/Contrib.KafkaFlow.ProcessManagers.Postgres/PostgresProcessStateRepository.cs create mode 100644 src/Contrib.KafkaFlow.ProcessManagers/HandlerTypeMapping.cs create mode 100644 src/Contrib.KafkaFlow.ProcessManagers/IProcessManager.cs create mode 100644 src/Contrib.KafkaFlow.ProcessManagers/IProcessStateRepository.cs create mode 100644 src/Contrib.KafkaFlow.ProcessManagers/OptimisticConcurrencyException.cs create mode 100644 src/Contrib.KafkaFlow.ProcessManagers/ProcessManagersStore.cs create mode 100644 src/Contrib.KafkaFlow.ProcessManagers/ProcessStateTableRow.cs create mode 100644 
src/Contrib.KafkaFlow.ProcessManagers/TransactionMode.cs create mode 100644 tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/IKafkaFlowFixture.cs create mode 100644 tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/KafkaFlowFixture.cs create mode 100644 tests/KafkaFlow.ProcessManagers.IntegrationTests/ProcessManagerStoreTests.cs create mode 100644 tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/UserLifecycleProcessManagerTests.cs diff --git a/src/Contrib.KafkaFlow.Outbox.Postgres/ConfigurationBuilderExtensions.cs b/src/Contrib.KafkaFlow.Outbox.Postgres/ConfigurationBuilderExtensions.cs index b87c29c..c3436b7 100644 --- a/src/Contrib.KafkaFlow.Outbox.Postgres/ConfigurationBuilderExtensions.cs +++ b/src/Contrib.KafkaFlow.Outbox.Postgres/ConfigurationBuilderExtensions.cs @@ -5,5 +5,7 @@ namespace KafkaFlow.Outbox.Postgres; public static class ConfigurationBuilderExtensions { public static IServiceCollection AddPostgresOutboxBackend(this IServiceCollection services) => - services.AddSingleton(); + services + .AddSingleton() + .AddSingleton(); } diff --git a/src/Contrib.KafkaFlow.Outbox.Postgres/Contrib.KafkaFlow.Outbox.Postgres.csproj b/src/Contrib.KafkaFlow.Outbox.Postgres/Contrib.KafkaFlow.Outbox.Postgres.csproj index 7535b86..001931c 100644 --- a/src/Contrib.KafkaFlow.Outbox.Postgres/Contrib.KafkaFlow.Outbox.Postgres.csproj +++ b/src/Contrib.KafkaFlow.Outbox.Postgres/Contrib.KafkaFlow.Outbox.Postgres.csproj @@ -1,4 +1,4 @@ - + netstandard2.0 diff --git a/src/Contrib.KafkaFlow.Outbox.Postgres/PostgresOutboxBackend.cs b/src/Contrib.KafkaFlow.Outbox.Postgres/PostgresOutboxBackend.cs deleted file mode 100644 index 7b76f15..0000000 --- a/src/Contrib.KafkaFlow.Outbox.Postgres/PostgresOutboxBackend.cs +++ /dev/null @@ -1,100 +0,0 @@ -using Confluent.Kafka; -using Dapper; -using Npgsql; -using System.Text.Json; - -namespace KafkaFlow.Outbox.Postgres; - -public class PostgresOutboxBackend(NpgsqlDataSource connectionPool) : IOutboxBackend -{ - 
private readonly NpgsqlDataSource _connectionPool = connectionPool; - - public async ValueTask Store(TopicPartition topicPartition, Message message, CancellationToken token = default) - { - var sql = """ - INSERT INTO outbox.outbox(topic_name, partition, message_key, message_headers, message_body) - VALUES (@topic_name, @partition, @message_key, @message_headers, @message_body) - """; - - await using var conn = _connectionPool.CreateConnection(); - - var rawHeaders = - message.Headers == null - ? null - : JsonSerializer.Serialize(message.Headers.ToDictionary(x => x.Key, x => x.GetValueBytes())); - - var res = await conn.ExecuteAsync(sql, new - { - topic_name = topicPartition.Topic, - partition = topicPartition.Partition.IsSpecial ? null : (int?)topicPartition.Partition.Value, - message_key = message.Key, - message_headers = rawHeaders, - message_body = message.Value - }).ConfigureAwait(false); - - } - - public async ValueTask Read(int batchSize, CancellationToken token = default) - { - var sql = """ - DELETE FROM outbox.outbox - WHERE - sequence_id = ANY(ARRAY( - SELECT sequence_id FROM outbox.outbox - ORDER BY sequence_id - LIMIT @batch_size - FOR UPDATE - )) - RETURNING - sequence_id, - topic_name, - partition, - message_key, - message_headers, - message_body - """; - await using var conn = _connectionPool.CreateConnection(); - var result = await conn.QueryAsync(sql, new { batch_size = batchSize }); - - return result?.Select(ToOutboxRecord).ToArray() ?? []; - } - - private static OutboxRecord ToOutboxRecord(OutboxTableRow row) - { - var partition = row.partition.HasValue ? new Partition(row.partition.Value) : Partition.Any; - var topicPartition = new TopicPartition(row.topic_name, partition); - - var storedHeaders = - row.message_headers == null - ? 
null - : JsonSerializer.Deserialize>(row.message_headers); - - var headers = - storedHeaders?.Aggregate(new Headers(), (h, x) => - { - h.Add(x.Key, x.Value); - return h; - }); - - var msg = new Message - { - Key = row.message_key!, - Value = row.message_body!, - Headers = headers - }; - - return new OutboxRecord(topicPartition, msg); - } -} - -internal sealed class OutboxTableRow -{ -#pragma warning disable IDE1006 // Naming Styles - public long sequence_id { get; set; } - public string topic_name { get; set; } = null!; - public int? partition { get; set; } - public byte[]? message_key { get; set; } - public string? message_headers { get; set; } - public byte[]? message_body { get; set; } -#pragma warning restore IDE1006 // Naming Styles -} diff --git a/src/Contrib.KafkaFlow.Outbox.Postgres/PostgresOutboxRepository.cs b/src/Contrib.KafkaFlow.Outbox.Postgres/PostgresOutboxRepository.cs new file mode 100644 index 0000000..2fd1f78 --- /dev/null +++ b/src/Contrib.KafkaFlow.Outbox.Postgres/PostgresOutboxRepository.cs @@ -0,0 +1,50 @@ +using Dapper; +using Npgsql; + +namespace KafkaFlow.Outbox.Postgres; + +public class PostgresOutboxRepository(NpgsqlDataSource connectionPool) : IOutboxRepository +{ + private readonly NpgsqlDataSource _connectionPool = connectionPool; + + public async ValueTask Store(OutboxTableRow outboxTableRow, CancellationToken token = default) + { + var sql = """ + INSERT INTO outbox.outbox(topic_name, partition, message_key, message_headers, message_body) + VALUES (@topic_name, @partition, @message_key, @message_headers, @message_body) + """; + + await using var conn = _connectionPool.CreateConnection(); + await conn.ExecuteAsync(sql, new + { + topic_name = outboxTableRow.TopicName, + partition = outboxTableRow.Partition, + message_key = outboxTableRow.MessageKey, + message_headers = outboxTableRow.MessageHeaders, + message_body = outboxTableRow.MessageBody + }).ConfigureAwait(false); + } + + public async Task> Read(int batchSize, CancellationToken 
token = default) + { + var sql = """ + DELETE FROM outbox.outbox + WHERE + sequence_id = ANY(ARRAY( + SELECT sequence_id FROM outbox.outbox + ORDER BY sequence_id + LIMIT @batch_size + FOR UPDATE + )) + RETURNING + sequence_id as "SequenceId", + topic_name as "TopicName", + partition as "Partition", + message_key as "MessageKey", + message_headers as "MessageHeaders", + message_body as "MessageBody" + """; + await using var conn = _connectionPool.CreateConnection(); + return await conn.QueryAsync(sql, new { batch_size = batchSize }).ConfigureAwait(false); + } +} diff --git a/src/Contrib.KafkaFlow.Outbox.Postgres/schema/0002.Table.sql b/src/Contrib.KafkaFlow.Outbox.Postgres/schema/0002.Table.sql index dc72b7a..a050402 100644 --- a/src/Contrib.KafkaFlow.Outbox.Postgres/schema/0002.Table.sql +++ b/src/Contrib.KafkaFlow.Outbox.Postgres/schema/0002.Table.sql @@ -1,7 +1,7 @@ /* Table */ CREATE TABLE "outbox"."outbox" ( - sequence_id SERIAL NOT NULL, + sequence_id SERIAL NOT NULL, topic_name VARCHAR(255) NOT NULL, partition INT NULL, message_key BYTEA NULL, diff --git a/src/Contrib.KafkaFlow.Outbox.SqlServer/ConfigurationBuilderExtensions.cs b/src/Contrib.KafkaFlow.Outbox.SqlServer/ConfigurationBuilderExtensions.cs index 257bdec..78dd1ed 100644 --- a/src/Contrib.KafkaFlow.Outbox.SqlServer/ConfigurationBuilderExtensions.cs +++ b/src/Contrib.KafkaFlow.Outbox.SqlServer/ConfigurationBuilderExtensions.cs @@ -7,7 +7,9 @@ namespace KafkaFlow.Outbox.SqlServer; public static class ConfigurationBuilderExtensions { public static IServiceCollection AddSqlServerOutboxBackend(this IServiceCollection services) => - services.AddSingleton(); + services + .AddSingleton() + .AddSingleton(); public static IServiceCollection AddSqlServerOutboxBackend(this IServiceCollection services, string connectionString) { diff --git a/src/Contrib.KafkaFlow.Outbox.SqlServer/Contrib.KafkaFlow.Outbox.SqlServer.csproj b/src/Contrib.KafkaFlow.Outbox.SqlServer/Contrib.KafkaFlow.Outbox.SqlServer.csproj index 
83daef0..545b604 100644 --- a/src/Contrib.KafkaFlow.Outbox.SqlServer/Contrib.KafkaFlow.Outbox.SqlServer.csproj +++ b/src/Contrib.KafkaFlow.Outbox.SqlServer/Contrib.KafkaFlow.Outbox.SqlServer.csproj @@ -17,7 +17,6 @@ - diff --git a/src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxBackend.cs b/src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxBackend.cs deleted file mode 100644 index 76126ec..0000000 --- a/src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxBackend.cs +++ /dev/null @@ -1,94 +0,0 @@ -using Confluent.Kafka; -using Dapper; -using KafkaFlow.SqlServer; -using Microsoft.Extensions.Options; -using System.Data.SqlClient; -using System.Text.Json; - -namespace KafkaFlow.Outbox.SqlServer; - -public class SqlServerOutboxBackend(IOptions options) : IOutboxBackend -{ - private readonly SqlServerBackendOptions _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); - - public async ValueTask Store(TopicPartition topicPartition, Message message, CancellationToken token = default) - { - var sql = """ - INSERT INTO [outbox].[outbox] ([topic_name], [partition], [message_key], [message_headers], [message_body]) - VALUES (@topic_name, @partition, @message_key, @message_headers, @message_body); - """; - - using var conn = new SqlConnection(_options.ConnectionString); - - var rawHeaders = - message.Headers == null - ? null - : JsonSerializer.Serialize(message.Headers.ToDictionary(x => x.Key, x => x.GetValueBytes())); - - var res = await conn.ExecuteAsync(sql, new - { - topic_name = topicPartition.Topic, - partition = topicPartition.Partition.IsSpecial ? 
null : (int?)topicPartition.Partition.Value, - message_key = message.Key, - message_headers = rawHeaders, - message_body = message.Value - }).ConfigureAwait(false); - - } - - public async ValueTask Read(int batchSize, CancellationToken token = default) - { - var sql = """ - DELETE FROM [outbox].[outbox] - OUTPUT DELETED.* - WHERE - [sequence_id] IN ( - SELECT TOP (@batch_size) [sequence_id] FROM [outbox].[outbox] - ORDER BY [sequence_id] - ); - """; - using var conn = new SqlConnection(_options.ConnectionString); - var result = await conn.QueryAsync(sql, new { batch_size = batchSize }); - - return result?.Select(ToOutboxRecord).ToArray() ?? []; - } - - private static OutboxRecord ToOutboxRecord(OutboxTableRow row) - { - var partition = row.partition.HasValue ? new Partition(row.partition.Value) : Partition.Any; - var topicPartition = new TopicPartition(row.topic_name, partition); - - var storedHeaders = - row.message_headers == null - ? null - : JsonSerializer.Deserialize>(row.message_headers); - - var headers = - storedHeaders?.Aggregate(new Headers(), (h, x) => - { - h.Add(x.Key, x.Value); - return h; - }); - - var msg = new Message - { - Key = row.message_key!, - Value = row.message_body!, - Headers = headers - }; - - return new OutboxRecord(topicPartition, msg); - } -} - -internal sealed class OutboxTableRow -{ -#pragma warning disable IDE1006 // Naming Styles - public long sequence_id { get; set; } - public string topic_name { get; set; } = null!; - public int? partition { get; set; } - public byte[]? message_key { get; set; } - public string? message_headers { get; set; } - public byte[]? 
message_body { get; set; } -#pragma warning restore IDE1006 // Naming Styles -} diff --git a/src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxRepository.cs b/src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxRepository.cs new file mode 100644 index 0000000..3275490 --- /dev/null +++ b/src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxRepository.cs @@ -0,0 +1,50 @@ +using Dapper; +using KafkaFlow.SqlServer; +using Microsoft.Extensions.Options; +using System.Data.SqlClient; + +namespace KafkaFlow.Outbox.SqlServer; + +public class SqlServerOutboxRepository(IOptions options) : IOutboxRepository +{ + private readonly SqlServerBackendOptions _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); + + public async ValueTask Store(OutboxTableRow outboxTableRow, CancellationToken token = default) + { + var sql = """ + INSERT INTO [outbox].[outbox] ([topic_name], [partition], [message_key], [message_headers], [message_body]) + VALUES (@topic_name, @partition, @message_key, @message_headers, @message_body); + """; + + using var conn = new SqlConnection(_options.ConnectionString); + await conn.ExecuteAsync(sql, new + { + topic_name = outboxTableRow.TopicName, + partition = outboxTableRow.Partition, + message_key = outboxTableRow.MessageKey, + message_headers = outboxTableRow.MessageHeaders, + message_body = outboxTableRow.MessageBody + }).ConfigureAwait(false); + } + + public async Task> Read(int batchSize, CancellationToken token = default) + { + var sql = """ + DELETE FROM [outbox].[outbox] + OUTPUT DELETED.sequence_id as SequenceId, + DELETED.topic_name as TopicName, + DELETED.partition as Partition, + DELETED.message_key as MessageKey, + DELETED.message_headers as MessageHeaders, + DELETED.message_body as MessageBody + WHERE + [sequence_id] IN ( + SELECT TOP (@batch_size) [sequence_id] FROM [outbox].[outbox] + ORDER BY [sequence_id] + ); + """; + + using var conn = new SqlConnection(_options.ConnectionString); + return await 
conn.QueryAsync(sql, new { batch_size = batchSize }).ConfigureAwait(false); + } +} diff --git a/src/Contrib.KafkaFlow.Outbox/ConfigurationExtensions.cs b/src/Contrib.KafkaFlow.Outbox/ConfigurationExtensions.cs index e17064c..afbda42 100644 --- a/src/Contrib.KafkaFlow.Outbox/ConfigurationExtensions.cs +++ b/src/Contrib.KafkaFlow.Outbox/ConfigurationExtensions.cs @@ -3,8 +3,6 @@ namespace KafkaFlow.Outbox; -public interface IOutboxDispatcher {} - public static class ConfigurationExtensions { public static IServiceCollection AddOutboxBackend(this IServiceCollection services, Func factory) => diff --git a/src/Contrib.KafkaFlow.Outbox/Contrib.KafkaFlow.Outbox.csproj b/src/Contrib.KafkaFlow.Outbox/Contrib.KafkaFlow.Outbox.csproj index f22646c..c23350d 100644 --- a/src/Contrib.KafkaFlow.Outbox/Contrib.KafkaFlow.Outbox.csproj +++ b/src/Contrib.KafkaFlow.Outbox/Contrib.KafkaFlow.Outbox.csproj @@ -20,6 +20,7 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/src/Contrib.KafkaFlow.Outbox/IOutboxBackend.cs b/src/Contrib.KafkaFlow.Outbox/IOutboxBackend.cs index f0e6246..40e0adb 100644 --- a/src/Contrib.KafkaFlow.Outbox/IOutboxBackend.cs +++ b/src/Contrib.KafkaFlow.Outbox/IOutboxBackend.cs @@ -1,8 +1,6 @@ using Confluent.Kafka; namespace KafkaFlow.Outbox; - -public sealed record OutboxRecord(TopicPartition TopicPartition, Message Message); public interface IOutboxBackend { ValueTask Store(TopicPartition topicPartition, Message message, CancellationToken token2 = default); diff --git a/src/Contrib.KafkaFlow.Outbox/IOutboxDispatcher.cs b/src/Contrib.KafkaFlow.Outbox/IOutboxDispatcher.cs new file mode 100644 index 0000000..b86080f --- /dev/null +++ b/src/Contrib.KafkaFlow.Outbox/IOutboxDispatcher.cs @@ -0,0 +1,3 @@ +namespace KafkaFlow.Outbox; + +public interface IOutboxDispatcher {} diff --git a/src/Contrib.KafkaFlow.Outbox/IOutboxProducerConfigurationBuilder.cs b/src/Contrib.KafkaFlow.Outbox/IOutboxProducerConfigurationBuilder.cs new file mode 
100644 index 0000000..566518d --- /dev/null +++ b/src/Contrib.KafkaFlow.Outbox/IOutboxProducerConfigurationBuilder.cs @@ -0,0 +1,61 @@ +using Confluent.Kafka; + +namespace KafkaFlow.Outbox; + +public interface IOutboxProducerConfigurationBuilder +{ + IDependencyConfigurator DependencyConfigurator { get; } + + /// + /// Sets the to be used when producing messages + /// + /// The enum value + /// + IOutboxProducerConfigurationBuilder WithAcks(Acks acks); + + /// + /// Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches to transmit to brokers. + /// A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency. + /// default: 0.5 (500 microseconds) + /// importance: high + /// + /// The time in milliseconds to wait to build the message batch + /// + IOutboxProducerConfigurationBuilder WithLingerMs(double lingerMs); + + /// + /// Adds a handler for the Kafka producer statistics + /// + /// A handler for the statistics + /// + IOutboxProducerConfigurationBuilder WithStatisticsHandler(Action statisticsHandler); + + /// + /// Sets the interval the statistics are emitted + /// + /// The interval in milliseconds + /// + IOutboxProducerConfigurationBuilder WithStatisticsIntervalMs(int statisticsIntervalMs); + + /// + /// Sets the partitioner + /// + /// The partitioner to use + /// + IOutboxProducerConfigurationBuilder WithPartitioner(Partitioner partitioner); + + /// + /// Sets compression configurations in the producer + /// + /// + /// enum to select the compression codec to use for compressing message sets. This is the default value for all topics, may be overridden by the topic configuration property `compression.codec`. + /// default: none + /// importance: medium + /// + /// Compression level parameter for algorithm selected by enum. 
Higher values will result in better compression at the cost of more CPU usage. Usable range is algorithm-dependent: [0-9] for gzip; [0-12] for lz4; only 0 for snappy; -1 = codec-dependent default compression level. + /// default: -1 + /// importance: medium + /// + /// + IOutboxProducerConfigurationBuilder WithCompression(CompressionType compressionType, int? compressionLevel); +} diff --git a/src/Contrib.KafkaFlow.Outbox/IOutboxRepository.cs b/src/Contrib.KafkaFlow.Outbox/IOutboxRepository.cs new file mode 100644 index 0000000..988b2e5 --- /dev/null +++ b/src/Contrib.KafkaFlow.Outbox/IOutboxRepository.cs @@ -0,0 +1,7 @@ +namespace KafkaFlow.Outbox; + +public interface IOutboxRepository +{ + ValueTask Store(OutboxTableRow outboxTableRow, CancellationToken token = default); + Task> Read(int batchSize, CancellationToken token = default); +} diff --git a/src/Contrib.KafkaFlow.Outbox/OutboxBackend.cs b/src/Contrib.KafkaFlow.Outbox/OutboxBackend.cs new file mode 100644 index 0000000..aaf1489 --- /dev/null +++ b/src/Contrib.KafkaFlow.Outbox/OutboxBackend.cs @@ -0,0 +1,61 @@ +using Confluent.Kafka; +using System.Text.Json; + +namespace KafkaFlow.Outbox; + +public class OutboxBackend(IOutboxRepository outboxRepository) : IOutboxBackend +{ + private readonly IOutboxRepository _outboxRepository = outboxRepository; + + public async ValueTask Store(TopicPartition topicPartition, Message message, CancellationToken token = default) + { + var rawHeaders = + message.Headers == null + ? null + : JsonSerializer.Serialize(message.Headers.ToDictionary(x => x.Key, x => x.GetValueBytes())); + + await _outboxRepository.Store( + new OutboxTableRow + ( + topicPartition.Topic, + topicPartition.Partition.IsSpecial ? 
null : topicPartition.Partition.Value, + message.Key, + rawHeaders, + message.Value + ), token + ).ConfigureAwait(false); + } + + public async ValueTask Read(int batchSize, CancellationToken token = default) + { + var result = await _outboxRepository.Read(batchSize, token).ConfigureAwait(false); + return result?.Select(ToOutboxRecord).ToArray() ?? []; + } + + private static OutboxRecord ToOutboxRecord(OutboxTableRow row) + { + var partition = row.Partition.HasValue ? new Partition(row.Partition.Value) : Partition.Any; + var topicPartition = new TopicPartition(row.TopicName, partition); + + var storedHeaders = + row.MessageHeaders == null + ? null + : JsonSerializer.Deserialize>(row.MessageHeaders); + + var headers = + storedHeaders?.Aggregate(new Headers(), (h, x) => + { + h.Add(x.Key, x.Value); + return h; + }); + + var msg = new Message + { + Key = row.MessageKey!, + Value = row.MessageBody!, + Headers = headers + }; + + return new OutboxRecord(topicPartition, msg); + } +} diff --git a/src/Contrib.KafkaFlow.Outbox/OutboxDispatcherService.cs b/src/Contrib.KafkaFlow.Outbox/OutboxDispatcherService.cs index 23208bf..0507532 100644 --- a/src/Contrib.KafkaFlow.Outbox/OutboxDispatcherService.cs +++ b/src/Contrib.KafkaFlow.Outbox/OutboxDispatcherService.cs @@ -26,7 +26,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken); } } - _logger.LogInformation("Outbox dispatcher service has ыещззув"); + _logger.LogInformation("Outbox dispatcher service has stopped"); } private async Task DispatchNextBatchAsync(CancellationToken stoppingToken) diff --git a/src/Contrib.KafkaFlow.Outbox/OutboxProviderConfigurationBuilder.cs b/src/Contrib.KafkaFlow.Outbox/OutboxProviderConfigurationBuilder.cs index 037b603..6e0ca76 100644 --- a/src/Contrib.KafkaFlow.Outbox/OutboxProviderConfigurationBuilder.cs +++ b/src/Contrib.KafkaFlow.Outbox/OutboxProviderConfigurationBuilder.cs @@ -3,64 +3,6 @@ namespace 
KafkaFlow.Outbox; -public interface IOutboxProducerConfigurationBuilder -{ - IDependencyConfigurator DependencyConfigurator { get; } - - /// - /// Sets the to be used when producing messages - /// - /// The enum value - /// - IOutboxProducerConfigurationBuilder WithAcks(Acks acks); - - /// - /// Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches to transmit to brokers. - /// A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency. - /// default: 0.5 (500 microseconds) - /// importance: high - /// - /// The time in milliseconds to wait to build the message batch - /// - IOutboxProducerConfigurationBuilder WithLingerMs(double lingerMs); - - /// - /// Adds a handler for the Kafka producer statistics - /// - /// A handler for the statistics - /// - IOutboxProducerConfigurationBuilder WithStatisticsHandler(Action statisticsHandler); - - /// - /// Sets the interval the statistics are emitted - /// - /// The interval in miliseconds - /// - IOutboxProducerConfigurationBuilder WithStatisticsIntervalMs(int statisticsIntervalMs); - - /// - /// Sets the partitioner - /// - /// The partitioner to use - /// - IOutboxProducerConfigurationBuilder WithPartitioner(Partitioner partitioner); - - /// - /// Sets compression configurations in the producer - /// - /// - /// enum to select the compression codec to use for compressing message sets. This is the default value for all topics, may be overridden by the topic configuration property `compression.codec`. - /// default: none - /// importance: medium - /// - /// Compression level parameter for algorithm selected by enum. Higher values will result in better compression at the cost of more CPU usage. Usable range is algorithm-dependent: [0-9] for gzip; [0-12] for lz4; only 0 for snappy; -1 = codec-dependent default compression level. 
- /// default: -1 - /// importance: medium - /// - /// - IOutboxProducerConfigurationBuilder WithCompression(CompressionType compressionType, int? compressionLevel); -} - internal sealed class OutboxProducerConfigurationBuilder(IProducerConfigurationBuilder builder) : IOutboxProducerConfigurationBuilder { private readonly IProducerConfigurationBuilder _builder = builder ?? throw new ArgumentNullException(nameof(builder)); diff --git a/src/Contrib.KafkaFlow.Outbox/OutboxRecord.cs b/src/Contrib.KafkaFlow.Outbox/OutboxRecord.cs new file mode 100644 index 0000000..27c2662 --- /dev/null +++ b/src/Contrib.KafkaFlow.Outbox/OutboxRecord.cs @@ -0,0 +1,5 @@ +using Confluent.Kafka; + +namespace KafkaFlow.Outbox; + +public sealed record OutboxRecord(TopicPartition TopicPartition, Message Message); diff --git a/src/Contrib.KafkaFlow.Outbox/OutboxTableRow.cs b/src/Contrib.KafkaFlow.Outbox/OutboxTableRow.cs new file mode 100644 index 0000000..3c7f783 --- /dev/null +++ b/src/Contrib.KafkaFlow.Outbox/OutboxTableRow.cs @@ -0,0 +1,7 @@ +namespace KafkaFlow.Outbox; + +public sealed record OutboxTableRow(long SequenceId, string TopicName, int? Partition, byte[]? MessageKey, string? MessageHeaders, byte[]? MessageBody) +{ + public OutboxTableRow(string TopicName, int? Partition, byte[]? MessageKey, string? MessageHeaders, byte[]? 
MessageBody) + : this(0, TopicName, Partition, MessageKey, MessageHeaders, MessageBody) { } +} diff --git a/src/Contrib.KafkaFlow.ProcessManagers.Postgres/ConfigurationBuilderExtensions.cs b/src/Contrib.KafkaFlow.ProcessManagers.Postgres/ConfigurationBuilderExtensions.cs index 7cfb4b6..e43c332 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers.Postgres/ConfigurationBuilderExtensions.cs +++ b/src/Contrib.KafkaFlow.ProcessManagers.Postgres/ConfigurationBuilderExtensions.cs @@ -1,4 +1,3 @@ -using KafkaFlow.ProcessManagers; using Microsoft.Extensions.DependencyInjection; namespace KafkaFlow.ProcessManagers.Postgres; @@ -6,5 +5,7 @@ namespace KafkaFlow.ProcessManagers.Postgres; public static class ConfigurationBuilderExtensions { public static IServiceCollection AddPostgresProcessManagerState(this IServiceCollection services) => - services.AddSingleton(); + services + .AddSingleton() + .AddSingleton(); } diff --git a/src/Contrib.KafkaFlow.ProcessManagers.Postgres/Contrib.KafkaFlow.ProcessManagers.Postgres.csproj b/src/Contrib.KafkaFlow.ProcessManagers.Postgres/Contrib.KafkaFlow.ProcessManagers.Postgres.csproj index 852e25b..9139865 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers.Postgres/Contrib.KafkaFlow.ProcessManagers.Postgres.csproj +++ b/src/Contrib.KafkaFlow.ProcessManagers.Postgres/Contrib.KafkaFlow.ProcessManagers.Postgres.csproj @@ -1,4 +1,4 @@ - + netstandard2.0 diff --git a/src/Contrib.KafkaFlow.ProcessManagers.Postgres/PostgresProcessManagersStore.cs b/src/Contrib.KafkaFlow.ProcessManagers.Postgres/PostgresProcessManagersStore.cs deleted file mode 100644 index e4ab025..0000000 --- a/src/Contrib.KafkaFlow.ProcessManagers.Postgres/PostgresProcessManagersStore.cs +++ /dev/null @@ -1,94 +0,0 @@ -using Dapper; -using Npgsql; -using System.Text.Json; - -namespace KafkaFlow.ProcessManagers.Postgres; - -public sealed class PostgresProcessManagersStore(NpgsqlDataSource connectionPool) : IProcessStateStore -{ - private readonly NpgsqlDataSource _connectionPool = 
connectionPool; - - public async ValueTask Persist(Type processType, Guid processId, VersionedState state) - { - var sql = """ - INSERT INTO process_managers.processes(process_type, process_id, process_state) - VALUES (@process_type, @process_id, @process_state) - ON CONFLICT (process_type, process_id) DO - UPDATE - SET - process_state = EXCLUDED.process_state, - date_updated_utc = (now() AT TIME ZONE 'utc') - WHERE xmin = @version - """; - await using var conn = _connectionPool.CreateConnection(); - var result = await conn.ExecuteAsync(sql, new - { - process_type = processType.FullName, - process_id = processId, - process_state = JsonSerializer.Serialize(state.State), - version = state.Version - }); - - if (result == 0) - { - throw new OptimisticConcurrencyException(processType, processId, - $"Concurrency error when persisting state {processType.FullName}"); - } - } - - public async ValueTask Load(Type processType, Guid processId) - { - var sql = """ - SELECT process_state, xmin as version - FROM process_managers.processes - WHERE process_type = @process_type AND process_id = @process_id - """; - - await using var conn = _connectionPool.CreateConnection(); - var result = await conn.QueryAsync(sql, new - { - process_type = processType.FullName, - process_id = processId - }); - - var firstResult = result?.FirstOrDefault(); - - if (firstResult == null) - { - return VersionedState.Zero; - } - - var decoded = JsonSerializer.Deserialize(firstResult.process_state, processType); - return new VersionedState(firstResult.version, decoded); - } - - public async ValueTask Delete(Type processType, Guid processId, int version) - { - var sql = """ - DELETE FROM process_managers.processes - WHERE process_type = @process_type AND process_id = @process_id and xmin = @version - """; - - await using var conn = _connectionPool.CreateConnection(); - var result = await conn.ExecuteAsync(sql, new - { - process_type = processType.FullName, - process_id = processId, - version - }); - - if 
(result == 0) - { - throw new OptimisticConcurrencyException(processType, processId, - $"Concurrency error when persisting state {processType.FullName}"); - } - } - - private sealed class ProcessStateRow - { -#pragma warning disable IDE1006 // Naming Styles - public string process_state { get; set; } = null!; - public int version { get; set; } -#pragma warning restore IDE1006 // Naming Styles - } -} diff --git a/src/Contrib.KafkaFlow.ProcessManagers.Postgres/PostgresProcessStateRepository.cs b/src/Contrib.KafkaFlow.ProcessManagers.Postgres/PostgresProcessStateRepository.cs new file mode 100644 index 0000000..7c5e286 --- /dev/null +++ b/src/Contrib.KafkaFlow.ProcessManagers.Postgres/PostgresProcessStateRepository.cs @@ -0,0 +1,63 @@ +using Dapper; +using Npgsql; + +namespace KafkaFlow.ProcessManagers.Postgres; + +public sealed class PostgresProcessStateRepository(NpgsqlDataSource connectionPool) : IProcessStateRepository +{ + private readonly NpgsqlDataSource _connectionPool = connectionPool; + + public async ValueTask Persist(Type processType, string processState, Guid processId, VersionedState state) + { + var sql = """ + INSERT INTO process_managers.processes(process_type, process_id, process_state) + VALUES (@process_type, @process_id, @process_state) + ON CONFLICT (process_type, process_id) DO + UPDATE + SET + process_state = EXCLUDED.process_state, + date_updated_utc = (now() AT TIME ZONE 'utc') + WHERE xmin = @version + """; + await using var conn = _connectionPool.CreateConnection(); + return await conn.ExecuteAsync(sql, new + { + process_type = processType.FullName, + process_id = processId, + process_state = processState, + version = state.Version + }).ConfigureAwait(false); + } + + public async ValueTask> Load(Type processType, Guid processId) + { + var sql = """ + SELECT process_state as "ProcessState", xmin as "Version" + FROM process_managers.processes + WHERE process_type = @process_type AND process_id = @process_id + """; + + await using var conn = 
_connectionPool.CreateConnection(); + return (await conn.QueryAsync<(string ProcessState, uint Version)>(sql, new + { + process_type = processType.FullName, + process_id = processId + }).ConfigureAwait(false)).Select(v => new ProcessStateTableRow(v.ProcessState, (int)v.Version)); + } + + public async ValueTask Delete(Type processType, Guid processId, int version) + { + var sql = """ + DELETE FROM process_managers.processes + WHERE process_type = @process_type AND process_id = @process_id and xmin = @version + """; + + await using var conn = _connectionPool.CreateConnection(); + return await conn.ExecuteAsync(sql, new + { + process_type = processType.FullName, + process_id = processId, + version + }).ConfigureAwait(false); + } +} diff --git a/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/ConfigurationBuilderExtensions.cs b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/ConfigurationBuilderExtensions.cs index 422027c..2320a99 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/ConfigurationBuilderExtensions.cs +++ b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/ConfigurationBuilderExtensions.cs @@ -7,8 +7,9 @@ namespace KafkaFlow.ProcessManagers.SqlServer; public static class ConfigurationBuilderExtensions { public static IServiceCollection AddSqlServerProcessManagerState(this IServiceCollection services) => - services.AddSingleton(); - + services + .AddSingleton() + .AddSingleton(); public static IServiceCollection AddSqlServerProcessManagerState(this IServiceCollection services, string connectionString) { diff --git a/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/Contrib.KafkaFlow.ProcessManagers.SqlServer.csproj b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/Contrib.KafkaFlow.ProcessManagers.SqlServer.csproj index d9fcceb..6a224cf 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/Contrib.KafkaFlow.ProcessManagers.SqlServer.csproj +++ b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/Contrib.KafkaFlow.ProcessManagers.SqlServer.csproj 
@@ -21,7 +21,6 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - diff --git a/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/SqlServerProcessManagersStore.cs b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/SqlServerProcessManagersStore.cs index b7519e6..2fc864b 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/SqlServerProcessManagersStore.cs +++ b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/SqlServerProcessManagersStore.cs @@ -2,15 +2,14 @@ using KafkaFlow.SqlServer; using Microsoft.Extensions.Options; using System.Data.SqlClient; -using System.Text.Json; namespace KafkaFlow.ProcessManagers.SqlServer; -public sealed class SqlServerProcessManagersStore(IOptions options) : IProcessStateStore +public sealed class SqlServerProcessStateRepository(IOptions options) : IProcessStateRepository { private readonly SqlServerBackendOptions _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); - public async ValueTask Persist(Type processType, Guid processId, VersionedState state) + public async ValueTask Persist(Type processType, string processState, Guid processId, VersionedState state) { var sql = """ MERGE INTO [process_managers].[processes] as [target] @@ -27,48 +26,32 @@ WHEN NOT MATCHED THEN """; using var conn = new SqlConnection(_options.ConnectionString); - var result = await conn.ExecuteAsync(sql, new + return await conn.ExecuteAsync(sql, new { process_type = processType.FullName, process_id = processId, - process_state = JsonSerializer.Serialize(state.State), + process_state = processState, version = state.Version - }); - - if (result == 0) - { - throw new OptimisticConcurrencyException(processType, processId, - $"Concurrency error when persisting state {processType.FullName}"); - } + }).ConfigureAwait(false); } - public async ValueTask Load(Type processType, Guid processId) + public async ValueTask> Load(Type processType, Guid processId) { var sql = """ - SELECT [process_state], [rowversion] as 
[version] + SELECT [process_state] as [ProcessState], [rowversion] as [Version] FROM [process_managers].[processes] WHERE [process_type] = @process_type AND [process_id] = @process_id; """; using var conn = new SqlConnection(_options.ConnectionString); - var result = await conn.QueryAsync(sql, new + return await conn.QueryAsync(sql, new { process_type = processType.FullName, process_id = processId - }); - - var firstResult = result?.FirstOrDefault(); - - if (firstResult == null) - { - return VersionedState.Zero; - } - - var decoded = JsonSerializer.Deserialize(firstResult.process_state, processType); - return new VersionedState(firstResult.version, decoded); + }).ConfigureAwait(false); } - public async ValueTask Delete(Type processType, Guid processId, int version) + public async ValueTask Delete(Type processType, Guid processId, int version) { var sql = """ DELETE FROM [process_managers].[processes] @@ -76,25 +59,11 @@ DELETE FROM [process_managers].[processes] """; using var conn = new SqlConnection(_options.ConnectionString); - var result = await conn.ExecuteAsync(sql, new + return await conn.ExecuteAsync(sql, new { process_type = processType.FullName, process_id = processId, version - }); - - if (result == 0) - { - throw new OptimisticConcurrencyException(processType, processId, - $"Concurrency error when persisting state {processType.FullName}"); - } - } - - private sealed class ProcessStateRow - { -#pragma warning disable IDE1006 // Naming Styles - public required string process_state { get; set; } - public required int version { get; set; } -#pragma warning restore IDE1006 // Naming Styles + }).ConfigureAwait(false); } } diff --git a/src/Contrib.KafkaFlow.ProcessManagers/Contrib.KafkaFlow.ProcessManagers.csproj b/src/Contrib.KafkaFlow.ProcessManagers/Contrib.KafkaFlow.ProcessManagers.csproj index 0641584..d7f8265 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers/Contrib.KafkaFlow.ProcessManagers.csproj +++ 
b/src/Contrib.KafkaFlow.ProcessManagers/Contrib.KafkaFlow.ProcessManagers.csproj @@ -1,4 +1,4 @@ - + netstandard2.0 diff --git a/src/Contrib.KafkaFlow.ProcessManagers/HandlerTypeMapping.cs b/src/Contrib.KafkaFlow.ProcessManagers/HandlerTypeMapping.cs new file mode 100644 index 0000000..8f375fa --- /dev/null +++ b/src/Contrib.KafkaFlow.ProcessManagers/HandlerTypeMapping.cs @@ -0,0 +1,8 @@ +namespace KafkaFlow.ProcessManagers; + +internal sealed record HandlerTypeMapping(IReadOnlyDictionary> Mapping) +{ + private static readonly IReadOnlyList EmptyList = new List().AsReadOnly(); + public IReadOnlyList GetHandlersTypes(Type messageType) => + Mapping.TryGetValue(messageType, out var handlerType) ? handlerType : EmptyList; +} diff --git a/src/Contrib.KafkaFlow.ProcessManagers/IProcessManager.cs b/src/Contrib.KafkaFlow.ProcessManagers/IProcessManager.cs new file mode 100644 index 0000000..338aeda --- /dev/null +++ b/src/Contrib.KafkaFlow.ProcessManagers/IProcessManager.cs @@ -0,0 +1,6 @@ +namespace KafkaFlow.ProcessManagers; + +public interface IProcessManager +{ + Type StateType { get; } +} diff --git a/src/Contrib.KafkaFlow.ProcessManagers/IProcessStateRepository.cs b/src/Contrib.KafkaFlow.ProcessManagers/IProcessStateRepository.cs new file mode 100644 index 0000000..ef8a847 --- /dev/null +++ b/src/Contrib.KafkaFlow.ProcessManagers/IProcessStateRepository.cs @@ -0,0 +1,8 @@ +namespace KafkaFlow.ProcessManagers; + +public interface IProcessStateRepository +{ + ValueTask Persist(Type processType, string processState, Guid processId, VersionedState state); + ValueTask> Load(Type processType, Guid processId); + ValueTask Delete(Type processType, Guid processId, int version); +} diff --git a/src/Contrib.KafkaFlow.ProcessManagers/IProcessStateStore.cs b/src/Contrib.KafkaFlow.ProcessManagers/IProcessStateStore.cs index d57b9ae..86dda69 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers/IProcessStateStore.cs +++ b/src/Contrib.KafkaFlow.ProcessManagers/IProcessStateStore.cs @@ 
-1,23 +1,5 @@ namespace KafkaFlow.ProcessManagers; -public sealed class OptimisticConcurrencyException : Exception -{ - public Type ProcessType { get; init; } - public Guid ProcessId { get; init; } - - public OptimisticConcurrencyException(Type processType, Guid processId, string? message) : base(message) - { - ProcessType = processType; - ProcessId = processId; - } - - public OptimisticConcurrencyException(Type processType, Guid processId, string? message, Exception? innerException) : base(message, innerException) - { - ProcessType = processType; - ProcessId = processId; - } -} - public interface IProcessStateStore { /// diff --git a/src/Contrib.KafkaFlow.ProcessManagers/OptimisticConcurrencyException.cs b/src/Contrib.KafkaFlow.ProcessManagers/OptimisticConcurrencyException.cs new file mode 100644 index 0000000..683135d --- /dev/null +++ b/src/Contrib.KafkaFlow.ProcessManagers/OptimisticConcurrencyException.cs @@ -0,0 +1,19 @@ +namespace KafkaFlow.ProcessManagers; + +public sealed class OptimisticConcurrencyException : Exception +{ + public Type ProcessType { get; init; } + public Guid ProcessId { get; init; } + + public OptimisticConcurrencyException(Type processType, Guid processId, string? message) : base(message) + { + ProcessType = processType; + ProcessId = processId; + } + + public OptimisticConcurrencyException(Type processType, Guid processId, string? message, Exception? 
innerException) : base(message, innerException) + { + ProcessType = processType; + ProcessId = processId; + } +} diff --git a/src/Contrib.KafkaFlow.ProcessManagers/ProcessManager.cs b/src/Contrib.KafkaFlow.ProcessManagers/ProcessManager.cs index 5f89490..3c326a8 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers/ProcessManager.cs +++ b/src/Contrib.KafkaFlow.ProcessManagers/ProcessManager.cs @@ -1,10 +1,5 @@ namespace KafkaFlow.ProcessManagers; -public interface IProcessManager -{ - Type StateType { get; } -} - public abstract class ProcessManager : IProcessManager where TState : class { public Type StateType => typeof(TState); @@ -12,30 +7,29 @@ public abstract class ProcessManager : IProcessManager where TState : cl protected bool IsStateSet => State != null; - internal void SetState(TState? state) - { - State = state; - } + internal void SetState(TState? state) => State = state; - protected void UpdateState(TState state) - { - State = state; - } + protected void UpdateState(TState state) => State = state; - protected void FinishProcess() - { - State = null; - } + protected void FinishProcess() => State = null; protected async Task WithRequiredStateAsync(Func handler) { - if (!IsStateSet) return; + if (!IsStateSet) + { + return; + } + await handler(State!); } protected void WithRequiredState(Action handler) { - if (!IsStateSet) return; + if (!IsStateSet) + { + return; + } + handler(State!); } } diff --git a/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerConfiguration.cs b/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerConfiguration.cs index c8416dd..293d3f5 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerConfiguration.cs +++ b/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerConfiguration.cs @@ -2,36 +2,7 @@ namespace KafkaFlow.ProcessManagers; - -/// -/// How to run transactions for process managers -/// -public enum TransactionMode -{ - /// - /// Do not run process managers in transactions - /// - Disabled, - - /// - /// Each 
message handler runs in each own transaction scope - /// - ForEachHandler, - - /// - /// All message handlers for a given messages run in one transaction scope - /// - ForAllHandlers -} - internal sealed record ProcessManagerConfiguration( TransactionMode TransactionMode, HandlerTypeMapping TypeMapping, Func BeginTransaction); - -internal sealed record HandlerTypeMapping(IReadOnlyDictionary> Mapping) -{ - private static readonly IReadOnlyList EmptyList = new List().AsReadOnly(); - public IReadOnlyList GetHandlersTypes(Type messageType) => - Mapping.TryGetValue(messageType, out var handlerType) ? handlerType : EmptyList; -} diff --git a/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerMiddleware.cs b/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerMiddleware.cs index ba972ae..2884f76 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerMiddleware.cs +++ b/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerMiddleware.cs @@ -1,4 +1,3 @@ -using System.ComponentModel; using System.Transactions; namespace KafkaFlow.ProcessManagers; diff --git a/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagersStore.cs b/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagersStore.cs new file mode 100644 index 0000000..c5a5073 --- /dev/null +++ b/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagersStore.cs @@ -0,0 +1,43 @@ +using System.Text.Json; + +namespace KafkaFlow.ProcessManagers; + +public sealed class ProcessManagersStore(IProcessStateRepository processStateRepository) : IProcessStateStore +{ + private readonly IProcessStateRepository _processStateRepository = processStateRepository; + + public async ValueTask Persist(Type processType, Guid processId, VersionedState state) + { + var persisted = await _processStateRepository.Persist(processType, JsonSerializer.Serialize(state.State), processId, state).ConfigureAwait(false); + if (persisted == 0) + { + throw new OptimisticConcurrencyException(processType, processId, + $"Concurrency error when persisting state 
{processType.FullName}"); + } + } + + public async ValueTask Load(Type processType, Guid processId) + { + var result = await _processStateRepository.Load(processType, processId).ConfigureAwait(false); + var firstResult = result?.FirstOrDefault(); + + if (firstResult == null) + { + return VersionedState.Zero; + } + + var deserialized = JsonSerializer.Deserialize(firstResult.ProcessState, processType); + return new VersionedState(firstResult.Version, deserialized); + } + + public async ValueTask Delete(Type processType, Guid processId, int version) + { + var result = await _processStateRepository.Delete(processType, processId, version).ConfigureAwait(false); + + if (result == 0) + { + throw new OptimisticConcurrencyException(processType, processId, + $"Concurrency error when persisting state {processType.FullName}"); + } + } +} diff --git a/src/Contrib.KafkaFlow.ProcessManagers/ProcessStateTableRow.cs b/src/Contrib.KafkaFlow.ProcessManagers/ProcessStateTableRow.cs new file mode 100644 index 0000000..20e502e --- /dev/null +++ b/src/Contrib.KafkaFlow.ProcessManagers/ProcessStateTableRow.cs @@ -0,0 +1,3 @@ +namespace KafkaFlow.ProcessManagers; + +public sealed record ProcessStateTableRow(string ProcessState, int Version); diff --git a/src/Contrib.KafkaFlow.ProcessManagers/TransactionMode.cs b/src/Contrib.KafkaFlow.ProcessManagers/TransactionMode.cs new file mode 100644 index 0000000..f3a618b --- /dev/null +++ b/src/Contrib.KafkaFlow.ProcessManagers/TransactionMode.cs @@ -0,0 +1,22 @@ +namespace KafkaFlow.ProcessManagers; + +/// +/// How to run transactions for process managers +/// +public enum TransactionMode +{ + /// + /// Do not run process managers in transactions + /// + Disabled, + + /// + /// Each message handler runs in each own transaction scope + /// + ForEachHandler, + + /// + /// All message handlers for a given messages run in one transaction scope + /// + ForAllHandlers +} diff --git 
a/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/IKafkaFlowFixture.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/IKafkaFlowFixture.cs new file mode 100644 index 0000000..d655d29 --- /dev/null +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/IKafkaFlowFixture.cs @@ -0,0 +1,9 @@ + +namespace KafkaFlow.ProcessManagers.IntegrationTests.Fixture; + +public interface IKafkaFlowFixture +{ + LoggingProcessStateStore ProcessStateStore { get; } + IMessageProducer Producer { get; } + string TopicName { get; } +} \ No newline at end of file diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/KafkaFlowFixture.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/KafkaFlowFixture.cs new file mode 100644 index 0000000..dffff9a --- /dev/null +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/KafkaFlowFixture.cs @@ -0,0 +1,143 @@ +using Confluent.Kafka; +using KafkaFlow.Consumers; +using KafkaFlow.Outbox; +using KafkaFlow.Serializer; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace KafkaFlow.ProcessManagers.IntegrationTests.Fixture; + +public abstract class KafkaFlowFixture( + string prefix, + IEnumerable brokers, + int numberOfPartitions = 3, + short replicationFactor = 1, + Partitioner partitioner = Partitioner.Murmur2Random, + int bufferSize = 100, + int workersCount = 1 + ) : KafkaFlowFixture(prefix, brokers, numberOfPartitions, replicationFactor, partitioner, bufferSize, workersCount) +{ } + +public abstract class KafkaFlowFixture : IDisposable, IAsyncDisposable, IKafkaFlowFixture where TSerializer : class, ISerializer + where TDeserializer : class, IDeserializer +{ + public readonly string FixtureId = Guid.NewGuid().ToString(); + public string TopicName { get; } + public readonly ServiceProvider ServiceProvider; + private readonly IKafkaBus _kafkaBus; + private readonly 
CancellationTokenSource _fixtureCancellation; + + public LoggingProcessStateStore ProcessStateStore { get; } + + public IMessageProducer Producer { get; } + + public abstract void ConfigureFixture(IConfiguration config, IServiceCollection services); + + public KafkaFlowFixture(string prefix, + IEnumerable brokers, + int numberOfPartitions = 3, + short replicationFactor = 1, + Partitioner partitioner = Partitioner.Murmur2Random, + int bufferSize = 100, + int workersCount = 1) + { + _fixtureCancellation = new CancellationTokenSource(); + TopicName = $"{prefix}-messages-{FixtureId}"; + + var services = new ServiceCollection(); + + var config = new ConfigurationManager() + .SetBasePath(Directory.GetCurrentDirectory()) + .AddJsonFile("appsettings.json", optional: false) + .AddEnvironmentVariables() + .Build(); + + ConfigureFixture(config, services); + services + .AddSingleton(config) + .AddLogging(log => log.AddConsole().AddDebug()) + .Decorate() + .AddKafka(kafka => + kafka + .UseMicrosoftLog() + .AddCluster(cluster => + cluster + .WithBrokers(brokers) + .CreateTopicIfNotExists(TopicName, numberOfPartitions, replicationFactor) + .AddOutboxDispatcher(x => x.WithPartitioner(partitioner)) + .AddProducer(producer => + producer + .WithOutbox() + .DefaultTopic(TopicName) + .AddMiddlewares(m => m.AddSerializer())) + .AddProducer(producer => + producer + .WithOutbox() + .DefaultTopic(TopicName) + .AddMiddlewares(m => m.AddSerializer()) + ) + .AddConsumer(consumer => + consumer + .Topic(TopicName) + .WithGroupId($"{prefix}-group-{FixtureId}") + .WithBufferSize(bufferSize) + .WithWorkersCount(workersCount) + .WithAutoOffsetReset(AutoOffsetReset.Earliest) + .AddMiddlewares(middlewares => + middlewares + .AddDeserializer() + .AddProcessManagers(pm => pm.AddProcessManagersFromAssemblyOf()) + ) + ) + ) + ); + + ServiceProvider = services.BuildServiceProvider(); + ProcessStateStore = (LoggingProcessStateStore)ServiceProvider.GetRequiredService(); + Producer = 
ServiceProvider.GetRequiredService>(); + + var svc = ServiceProvider.GetServices(); + foreach (var service in svc) + { + service.StartAsync(_fixtureCancellation.Token).GetAwaiter().GetResult(); + } + + _kafkaBus = ServiceProvider.CreateKafkaBus(); + _kafkaBus.StartAsync(_fixtureCancellation.Token).GetAwaiter().GetResult(); + } + + public void Dispose() + { + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + + public async ValueTask DisposeAsync() + { + await DisposeAsyncCore().ConfigureAwait(false); + + Dispose(disposing: false); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + _fixtureCancellation.Cancel(); + _fixtureCancellation.Dispose(); + } + } + + protected virtual async ValueTask DisposeAsyncCore() + { + await _kafkaBus.StopAsync(); + foreach (var cons in ServiceProvider.GetRequiredService().All) + { + await cons.StopAsync(); + } + await ServiceProvider.DisposeAsync(); + } +} diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/PostgresKafkaFlowFixture.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/PostgresKafkaFlowFixture.cs index 0882a09..8524b12 100644 --- a/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/PostgresKafkaFlowFixture.cs +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/PostgresKafkaFlowFixture.cs @@ -1,133 +1,24 @@ -using Confluent.Kafka; -using KafkaFlow.Consumers; -using KafkaFlow.Outbox; using KafkaFlow.Outbox.Postgres; using KafkaFlow.ProcessManagers.Postgres; -using KafkaFlow.Serializer; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; using Npgsql; namespace KafkaFlow.ProcessManagers.IntegrationTests.Fixture; -public class PostgresKafkaFlowFixture : IDisposable, IAsyncDisposable +public class PostgresKafkaFlowFixture : KafkaFlowFixture { - public readonly string FixtureId = 
Guid.NewGuid().ToString(); - public string TopicName { get; } - public readonly ServiceProvider ServiceProvider; - private readonly IKafkaBus _kafkaBus; - private readonly CancellationTokenSource _fixtureCancellation; - - public LoggingProcessStateStore ProcessStateStore { get; } - - public IMessageProducer Producer { get; } - public PostgresKafkaFlowFixture() - { - _fixtureCancellation = new CancellationTokenSource(); - TopicName = $"pg-messages-{FixtureId}"; - - var services = new ServiceCollection(); - - var config = new ConfigurationManager() - .SetBasePath(Directory.GetCurrentDirectory()) - .AddJsonFile("appsettings.json", optional: false) - .AddEnvironmentVariables() - .Build(); + : base("pg", ["localhost:9092"]) { } + public override void ConfigureFixture(IConfiguration config, IServiceCollection services) + { var connStr = config.GetConnectionString("PostgresBackend"); var pool = new NpgsqlDataSourceBuilder(connStr).Build(); services - .AddSingleton(config) .AddSingleton(pool) - .AddLogging(log => log.AddConsole().AddDebug()) .AddPostgresProcessManagerState() - .Decorate() - .AddPostgresOutboxBackend() - .AddKafka(kafka => - kafka - .UseMicrosoftLog() - .AddCluster(cluster => - cluster - .WithBrokers(["localhost:9092 "]) - .CreateTopicIfNotExists(TopicName, 3, 1) - .AddOutboxDispatcher(x => x.WithPartitioner(Partitioner.Murmur2Random)) - .AddProducer(producer => - producer - .WithOutbox() - .DefaultTopic(TopicName) - .AddMiddlewares(m => m.AddSerializer())) - .AddProducer(producer => - producer - .WithOutbox() - .DefaultTopic(TopicName) - .AddMiddlewares(m => m.AddSerializer()) - ) - .AddConsumer(consumer => - consumer - .Topic(TopicName) - .WithGroupId($"pg-group-{FixtureId}") - .WithBufferSize(100) - .WithWorkersCount(1) - .WithAutoOffsetReset(AutoOffsetReset.Earliest) - .AddMiddlewares(middlewares => - middlewares - .AddDeserializer() - .AddProcessManagers(pm => pm.AddProcessManagersFromAssemblyOf()) - ) - ) - ) - ); - ServiceProvider = 
services.BuildServiceProvider(); - - ProcessStateStore = (LoggingProcessStateStore)ServiceProvider.GetRequiredService(); - - Producer = ServiceProvider.GetRequiredService>(); - - var svc = ServiceProvider.GetServices(); - - foreach (var service in svc) - { - service.StartAsync(_fixtureCancellation.Token); - } - - _kafkaBus = ServiceProvider.CreateKafkaBus(); - _kafkaBus.StartAsync(_fixtureCancellation.Token); - } - - public void Dispose() - { - Dispose(disposing: true); - GC.SuppressFinalize(this); - } - - public async ValueTask DisposeAsync() - { - await DisposeAsyncCore().ConfigureAwait(false); - - Dispose(disposing: false); - GC.SuppressFinalize(this); - } - - protected virtual void Dispose(bool disposing) - { - if (disposing) - { - _fixtureCancellation.Cancel(); - _fixtureCancellation.Dispose(); - } - } - - protected virtual async ValueTask DisposeAsyncCore() - { - await _kafkaBus.StopAsync(); - foreach (var cons in ServiceProvider.GetRequiredService().All) - { - await cons.StopAsync(); - } - await ServiceProvider.DisposeAsync(); + .AddPostgresOutboxBackend(); } } diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/SqlServerKafkaFlowFixture.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/SqlServerKafkaFlowFixture.cs index 6f69b6e..b001355 100644 --- a/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/SqlServerKafkaFlowFixture.cs +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/SqlServerKafkaFlowFixture.cs @@ -1,128 +1,17 @@ -using Confluent.Kafka; -using KafkaFlow.Consumers; -using KafkaFlow.Outbox; -using KafkaFlow.Outbox.SqlServer; +using KafkaFlow.Outbox.SqlServer; using KafkaFlow.ProcessManagers.SqlServer; -using KafkaFlow.Serializer; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; namespace KafkaFlow.ProcessManagers.IntegrationTests.Fixture; -public class SqlServerKafkaFlowFixture : 
IDisposable, IAsyncDisposable +public class SqlServerKafkaFlowFixture : KafkaFlowFixture { - public readonly string FixtureId = Guid.NewGuid().ToString(); - public string TopicName { get; } - public readonly ServiceProvider ServiceProvider; - private readonly IKafkaBus _kafkaBus; - private readonly CancellationTokenSource _fixtureCancellation; - - public LoggingProcessStateStore ProcessStateStore { get; } - - public IMessageProducer Producer { get; } - public SqlServerKafkaFlowFixture() - { - _fixtureCancellation = new CancellationTokenSource(); - TopicName = $"mssql-messages-{FixtureId}"; - - var services = new ServiceCollection(); + : base("mssql", ["localhost:9092"]) { } - var config = new ConfigurationManager() - .SetBasePath(Directory.GetCurrentDirectory()) - .AddJsonFile("appsettings.json", optional: false) - .AddEnvironmentVariables() - .Build(); - - services - .AddSingleton(config) - .AddLogging(log => log.AddConsole().AddDebug()) + public override void ConfigureFixture(IConfiguration config, IServiceCollection services) + => services .AddSqlServerProcessManagerState(config.GetConnectionString("SqlServerBackend")!) 
- .Decorate() - .AddSqlServerOutboxBackend() - .AddKafka(kafka => - kafka - .UseMicrosoftLog() - .AddCluster(cluster => - cluster - .WithBrokers(["localhost:9092 "]) - .CreateTopicIfNotExists(TopicName, 3, 1) - .AddOutboxDispatcher(x => x.WithPartitioner(Partitioner.Murmur2Random)) - .AddProducer(producer => - producer - .WithOutbox() - .DefaultTopic(TopicName) - .AddMiddlewares(m => m.AddSerializer())) - .AddProducer(producer => - producer - .WithOutbox() - .DefaultTopic(TopicName) - .AddMiddlewares(m => m.AddSerializer()) - ) - .AddConsumer(consumer => - consumer - .Topic(TopicName) - .WithGroupId($"mssql-group-{FixtureId}") - .WithBufferSize(100) - .WithWorkersCount(1) - .WithAutoOffsetReset(AutoOffsetReset.Earliest) - .AddMiddlewares(middlewares => - middlewares - .AddDeserializer() - .AddProcessManagers(pm => pm.AddProcessManagersFromAssemblyOf()) - ) - ) - ) - ); - ServiceProvider = services.BuildServiceProvider(); - - ProcessStateStore = (LoggingProcessStateStore)ServiceProvider.GetRequiredService(); - - Producer = ServiceProvider.GetRequiredService>(); - - var svc = ServiceProvider.GetServices(); - - foreach (var service in svc) - { - service.StartAsync(_fixtureCancellation.Token); - } - - _kafkaBus = ServiceProvider.CreateKafkaBus(); - _kafkaBus.StartAsync(_fixtureCancellation.Token); - } - - public void Dispose() - { - Dispose(disposing: true); - GC.SuppressFinalize(this); - } - - public async ValueTask DisposeAsync() - { - await DisposeAsyncCore().ConfigureAwait(false); - - Dispose(disposing: false); - GC.SuppressFinalize(this); - } - - protected virtual void Dispose(bool disposing) - { - if (disposing) - { - _fixtureCancellation.Cancel(); - _fixtureCancellation.Dispose(); - } - } - - protected virtual async ValueTask DisposeAsyncCore() - { - await _kafkaBus.StopAsync(); - foreach (var cons in ServiceProvider.GetRequiredService().All) - { - await cons.StopAsync(); - } - await ServiceProvider.DisposeAsync(); - } + .AddSqlServerOutboxBackend(); } diff 
--git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/KafkaFlow.ProcessManagers.IntegrationTests.csproj b/tests/KafkaFlow.ProcessManagers.IntegrationTests/KafkaFlow.ProcessManagers.IntegrationTests.csproj index a7c835a..c9c8d57 100644 --- a/tests/KafkaFlow.ProcessManagers.IntegrationTests/KafkaFlow.ProcessManagers.IntegrationTests.csproj +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/KafkaFlow.ProcessManagers.IntegrationTests.csproj @@ -1,4 +1,4 @@ - + net8.0 diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/PostgresProcessManagerStoreTests.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/PostgresProcessManagerStoreTests.cs index f6bd730..2771a04 100644 --- a/tests/KafkaFlow.ProcessManagers.IntegrationTests/PostgresProcessManagerStoreTests.cs +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/PostgresProcessManagerStoreTests.cs @@ -1,42 +1,17 @@ -using FluentAssertions; using KafkaFlow.ProcessManagers.Postgres; using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; using Npgsql; namespace KafkaFlow.ProcessManagers.IntegrationTests; -public sealed class PostgresProcessManagerStoreTests +public sealed class PostgresProcessManagerStoreTests : ProcessManagerStoreTests { - [Fact] - public async Task Should_write_update_and_delete_state() + public override void Configure(IConfiguration config, IServiceCollection services) { - var processId = Guid.NewGuid(); - var state = processId.ToString(); - - var config = - new ConfigurationManager() - .SetBasePath(Directory.GetCurrentDirectory()) - .AddJsonFile("appsettings.json", optional: false) - .AddEnvironmentVariables() - .Build(); - var connStr = config.GetConnectionString("PostgresBackend"); - var pool = new NpgsqlDataSourceBuilder(connStr).Build(); - - var store = new PostgresProcessManagersStore(pool); - - var noState = await store.Load(state.GetType(), processId); - noState.Should().BeEquivalentTo(VersionedState.Zero); - - await 
store.Persist(state.GetType(), processId, new VersionedState(0, state)); - - var hasState = await store.Load(state.GetType(), processId); - hasState.State.Should().NotBeNull(); - hasState.Version.Should().BePositive(); - - await store.Delete(state.GetType(), processId, hasState.Version); - var goneState = await store.Load(state.GetType(), processId); - goneState.Should().BeEquivalentTo(VersionedState.Zero); + services.AddSingleton(pool); + services.AddPostgresProcessManagerState(); } } diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/ProcessManagerStoreTests.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/ProcessManagerStoreTests.cs new file mode 100644 index 0000000..98142d2 --- /dev/null +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/ProcessManagerStoreTests.cs @@ -0,0 +1,44 @@ +using FluentAssertions; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace KafkaFlow.ProcessManagers.IntegrationTests; + +public abstract class ProcessManagerStoreTests +{ + public abstract void Configure(IConfiguration config, IServiceCollection services); + + [Fact] + public async Task Should_write_update_and_delete_state() + { + var processId = Guid.NewGuid(); + var state = processId.ToString(); + + var config = + new ConfigurationManager() + .SetBasePath(Directory.GetCurrentDirectory()) + .AddJsonFile("appsettings.json", optional: false) + .AddEnvironmentVariables() + .Build(); + + var services = new ServiceCollection(); + Configure(config, services); + var sp = services.BuildServiceProvider(); + + var store = sp.GetRequiredService(); + + var noState = await store.Load(state.GetType(), processId); + noState.Should().BeEquivalentTo(VersionedState.Zero); + + await store.Persist(state.GetType(), processId, new VersionedState(0, state)); + + var hasState = await store.Load(state.GetType(), processId); + hasState.State.Should().NotBeNull(); + hasState.Version.Should().BePositive(); + + await 
store.Delete(state.GetType(), processId, hasState.Version); + var goneState = await store.Load(state.GetType(), processId); + goneState.Should().BeEquivalentTo(VersionedState.Zero); + } + +} diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/SqlServerProcessManagerStoreTests.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/SqlServerProcessManagerStoreTests.cs index 5be3b76..ba5be00 100644 --- a/tests/KafkaFlow.ProcessManagers.IntegrationTests/SqlServerProcessManagerStoreTests.cs +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/SqlServerProcessManagerStoreTests.cs @@ -1,41 +1,10 @@ -using FluentAssertions; -using KafkaFlow.ProcessManagers.SqlServer; +using KafkaFlow.ProcessManagers.SqlServer; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; namespace KafkaFlow.ProcessManagers.IntegrationTests; -public sealed class SqlServerProcessManagerStoreTests -{ - [Fact] - public async Task Should_write_update_and_delete_state() - { - var processId = Guid.NewGuid(); - var state = processId.ToString(); - - var config = - new ConfigurationManager() - .SetBasePath(Directory.GetCurrentDirectory()) - .AddJsonFile("appsettings.json", optional: false) - .AddEnvironmentVariables() - .Build(); - - var services = new ServiceCollection(); - services.AddSqlServerProcessManagerState(config.GetConnectionString("SqlServerBackend")!); - var sp = services.BuildServiceProvider(); - - var store = sp.GetRequiredService(); - var noState = await store.Load(state.GetType(), processId); - noState.Should().BeEquivalentTo(VersionedState.Zero); - - await store.Persist(state.GetType(), processId, new VersionedState(0, state)); - - var hasState = await store.Load(state.GetType(), processId); - hasState.State.Should().NotBeNull(); - hasState.Version.Should().BePositive(); - - await store.Delete(state.GetType(), processId, hasState.Version); - var goneState = await store.Load(state.GetType(), processId); - 
goneState.Should().BeEquivalentTo(VersionedState.Zero); - } +public sealed class SqlServerProcessManagerStoreTests : ProcessManagerStoreTests +{ + public override void Configure(IConfiguration config, IServiceCollection services) => services.AddSqlServerProcessManagerState(config.GetConnectionString("SqlServerBackend")!); } diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/PostgresUserLifecycleProcessManagerTests.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/PostgresUserLifecycleProcessManagerTests.cs index 2c2e986..2e4a69a 100644 --- a/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/PostgresUserLifecycleProcessManagerTests.cs +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/PostgresUserLifecycleProcessManagerTests.cs @@ -1,35 +1,6 @@ -using FluentAssertions; using KafkaFlow.ProcessManagers.IntegrationTests.Fixture; namespace KafkaFlow.ProcessManagers.IntegrationTests.UserLifeCycle; -public sealed class PostgresUserLifecycleProcessManagerTests : IAssemblyFixture -{ - private readonly PostgresKafkaFlowFixture _fixture; - - public PostgresUserLifecycleProcessManagerTests(PostgresKafkaFlowFixture fixture) - { - _fixture = fixture ?? 
throw new ArgumentNullException(nameof(fixture)); - _fixture.ProcessStateStore.ClearChanges(); - } - - [Fact] - public async Task Should_run_user_registration_simulation() - { - var message = new UserRegistered(Guid.NewGuid(), "test@test.com"); - await _fixture.Producer.ProduceAsync(message.UserId.ToString(), message); - - TestUtils.RetryFor(TimeSpan.FromSeconds(30), TimeSpan.FromMicroseconds(100), () => - { - _fixture.ProcessStateStore - .Changes - .Select(x => x.Item1) - .Should().BeEquivalentTo(new[] - { - LoggingProcessStateStore.ActionType.Persisted, - LoggingProcessStateStore.ActionType.Persisted, - LoggingProcessStateStore.ActionType.Deleted - }, x => x.WithStrictOrdering()); - }); - } -} +public sealed class PostgresUserLifecycleProcessManagerTests(PostgresKafkaFlowFixture fixture) : UserLifecycleProcessManagerTests(fixture) +{ } diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/SqlServerLifecycleProcessManagerTests.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/SqlServerLifecycleProcessManagerTests.cs index 2c5210e..bcd8290 100644 --- a/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/SqlServerLifecycleProcessManagerTests.cs +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/SqlServerLifecycleProcessManagerTests.cs @@ -1,35 +1,6 @@ -using FluentAssertions; -using KafkaFlow.ProcessManagers.IntegrationTests.Fixture; +using KafkaFlow.ProcessManagers.IntegrationTests.Fixture; namespace KafkaFlow.ProcessManagers.IntegrationTests.UserLifeCycle; -public sealed class SqlServerLifecycleProcessManagerTests : IAssemblyFixture -{ - private readonly SqlServerKafkaFlowFixture _fixture; - - public SqlServerLifecycleProcessManagerTests(SqlServerKafkaFlowFixture fixture) - { - _fixture = fixture ?? 
throw new ArgumentNullException(nameof(fixture)); - _fixture.ProcessStateStore.ClearChanges(); - } - - [Fact] - public async Task Should_run_user_registration_simulation() - { - var message = new UserRegistered(Guid.NewGuid(), "test@test.com"); - await _fixture.Producer.ProduceAsync(message.UserId.ToString(), message); - - TestUtils.RetryFor(TimeSpan.FromSeconds(30), TimeSpan.FromMicroseconds(100), () => - { - _fixture.ProcessStateStore - .Changes - .Select(x => x.Item1) - .Should().BeEquivalentTo(new[] - { - LoggingProcessStateStore.ActionType.Persisted, - LoggingProcessStateStore.ActionType.Persisted, - LoggingProcessStateStore.ActionType.Deleted - }, x => x.WithStrictOrdering()); - }); - } -} +public sealed class SqlServerLifecycleProcessManagerTests(SqlServerKafkaFlowFixture fixture) : UserLifecycleProcessManagerTests(fixture) +{ } diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/UserLifecycleProcessManagerTests.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/UserLifecycleProcessManagerTests.cs new file mode 100644 index 0000000..2d5fa05 --- /dev/null +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/UserLifecycleProcessManagerTests.cs @@ -0,0 +1,36 @@ +using FluentAssertions; +using KafkaFlow.ProcessManagers.IntegrationTests.Fixture; + +namespace KafkaFlow.ProcessManagers.IntegrationTests.UserLifeCycle; + +public abstract class UserLifecycleProcessManagerTests : IAssemblyFixture + where T : class, IKafkaFlowFixture, new() +{ + private readonly T _fixture; + + protected UserLifecycleProcessManagerTests(T fixture) + { + _fixture = fixture ?? 
throw new ArgumentNullException(nameof(fixture)); + _fixture.ProcessStateStore.ClearChanges(); + } + + [Fact] + public async Task Should_run_user_registration_simulation() + { + var message = new UserRegistered(Guid.NewGuid(), "test@test.com"); + await _fixture.Producer.ProduceAsync(message.UserId.ToString(), message); + + TestUtils.RetryFor(TimeSpan.FromSeconds(30), TimeSpan.FromMicroseconds(100), () => + { + _fixture.ProcessStateStore + .Changes + .Select(x => x.Item1) + .Should().BeEquivalentTo(new[] + { + LoggingProcessStateStore.ActionType.Persisted, + LoggingProcessStateStore.ActionType.Persisted, + LoggingProcessStateStore.ActionType.Deleted + }, x => x.WithStrictOrdering()); + }); + } +} From 6154475c5bc0c31dd505ef847f1f8a37c4c7739d Mon Sep 17 00:00:00 2001 From: RobIII Date: Fri, 26 Apr 2024 16:36:50 +0200 Subject: [PATCH 2/3] Minor reformatting of queries --- .../SqlServerOutboxRepository.cs | 12 ++++++------ .../PostgresProcessStateRepository.cs | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxRepository.cs b/src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxRepository.cs index 3275490..2ac2d13 100644 --- a/src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxRepository.cs +++ b/src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxRepository.cs @@ -31,12 +31,12 @@ public async Task> Read(int batchSize, CancellationT { var sql = """ DELETE FROM [outbox].[outbox] - OUTPUT DELETED.sequence_id as SequenceId, - DELETED.topic_name as TopicName, - DELETED.partition as Partition, - DELETED.message_key as MessageKey, - DELETED.message_headers as MessageHeaders, - DELETED.message_body as MessageBody + OUTPUT [DELETED].[sequence_id] as [SequenceId], + [DELETED].[topic_name] as [TopicName], + [DELETED].[partition] as [Partition], + [DELETED].[message_key] as [MessageKey], + [DELETED].[message_headers] as [MessageHeaders], + [DELETED].[message_body] as [MessageBody] WHERE [sequence_id] 
IN ( SELECT TOP (@batch_size) [sequence_id] FROM [outbox].[outbox] diff --git a/src/Contrib.KafkaFlow.ProcessManagers.Postgres/PostgresProcessStateRepository.cs b/src/Contrib.KafkaFlow.ProcessManagers.Postgres/PostgresProcessStateRepository.cs index 7c5e286..6507248 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers.Postgres/PostgresProcessStateRepository.cs +++ b/src/Contrib.KafkaFlow.ProcessManagers.Postgres/PostgresProcessStateRepository.cs @@ -15,8 +15,8 @@ INSERT INTO process_managers.processes(process_type, process_id, process_state) ON CONFLICT (process_type, process_id) DO UPDATE SET - process_state = EXCLUDED.process_state, - date_updated_utc = (now() AT TIME ZONE 'utc') + process_state = EXCLUDED.process_state, + date_updated_utc = (now() AT TIME ZONE 'utc') WHERE xmin = @version """; await using var conn = _connectionPool.CreateConnection(); From cd95c5d0d3cfdcda01ffe30f45e066e642182674 Mon Sep 17 00:00:00 2001 From: RobIII Date: Fri, 26 Apr 2024 16:40:38 +0200 Subject: [PATCH 3/3] Fix lost-somewhere spaces --- src/Contrib.KafkaFlow.Outbox.Postgres/schema/0002.Table.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Contrib.KafkaFlow.Outbox.Postgres/schema/0002.Table.sql b/src/Contrib.KafkaFlow.Outbox.Postgres/schema/0002.Table.sql index a050402..dc72b7a 100644 --- a/src/Contrib.KafkaFlow.Outbox.Postgres/schema/0002.Table.sql +++ b/src/Contrib.KafkaFlow.Outbox.Postgres/schema/0002.Table.sql @@ -1,7 +1,7 @@ /* Table */ CREATE TABLE "outbox"."outbox" ( - sequence_id SERIAL NOT NULL, + sequence_id SERIAL NOT NULL, topic_name VARCHAR(255) NOT NULL, partition INT NULL, message_key BYTEA NULL,