diff --git a/KafkaFlow.Contrib.sln b/KafkaFlow.Contrib.sln index e013acc..6e70cca 100644 --- a/KafkaFlow.Contrib.sln +++ b/KafkaFlow.Contrib.sln @@ -1,18 +1,27 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Contrib.KafkaFlow.ProcessManagers", "src\Contrib.KafkaFlow.ProcessManagers\Contrib.KafkaFlow.ProcessManagers.csproj", "{1DD0DA5D-63CD-4FC3-972E-CD376139810A}" +# Visual Studio Version 17 +VisualStudioVersion = 17.9.34728.123 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Contrib.KafkaFlow.ProcessManagers", "src\Contrib.KafkaFlow.ProcessManagers\Contrib.KafkaFlow.ProcessManagers.csproj", "{1DD0DA5D-63CD-4FC3-972E-CD376139810A}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{CE99781E-E88C-404C-B9AD-D7F713265F31}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{A5E2EC13-2E8B-4EB3-8C0F-D252A04B42F4}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.ProcessManagers.IntegrationTests", "tests\KafkaFlow.ProcessManagers.IntegrationTests\KafkaFlow.ProcessManagers.IntegrationTests.csproj", "{F3DB8E66-BEA2-494F-B908-57C83BDF1501}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.ProcessManagers.IntegrationTests", "tests\KafkaFlow.ProcessManagers.IntegrationTests\KafkaFlow.ProcessManagers.IntegrationTests.csproj", "{F3DB8E66-BEA2-494F-B908-57C83BDF1501}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Contrib.KafkaFlow.Outbox", "src\Contrib.KafkaFlow.Outbox\Contrib.KafkaFlow.Outbox.csproj", "{39CF3703-98C2-4EE5-872B-AEFE85EEAC9A}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Contrib.KafkaFlow.Outbox", "src\Contrib.KafkaFlow.Outbox\Contrib.KafkaFlow.Outbox.csproj", "{39CF3703-98C2-4EE5-872B-AEFE85EEAC9A}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Contrib.KafkaFlow.Outbox.Postgres", "src\Contrib.KafkaFlow.Outbox.Postgres\Contrib.KafkaFlow.Outbox.Postgres.csproj", "{95A1F8B4-98B2-42F2-8CF1-0067F3D6B125}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Contrib.KafkaFlow.Outbox.Postgres", "src\Contrib.KafkaFlow.Outbox.Postgres\Contrib.KafkaFlow.Outbox.Postgres.csproj", "{95A1F8B4-98B2-42F2-8CF1-0067F3D6B125}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Contrib.KafkaFlow.ProcessManagers.Postgres", "src\Contrib.KafkaFlow.ProcessManagers.Postgres\Contrib.KafkaFlow.ProcessManagers.Postgres.csproj", "{6A9E74CC-3E65-4422-8E3E-862C3A999B72}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Contrib.KafkaFlow.ProcessManagers.Postgres", "src\Contrib.KafkaFlow.ProcessManagers.Postgres\Contrib.KafkaFlow.ProcessManagers.Postgres.csproj", "{6A9E74CC-3E65-4422-8E3E-862C3A999B72}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Contrib.KafkaFlow.Outbox.SqlServer", "src\Contrib.KafkaFlow.Outbox.SqlServer\Contrib.KafkaFlow.Outbox.SqlServer.csproj", "{A4CBB313-9DCC-441E-BB44-1473CDF7A986}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Contrib.KafkaFlow.ProcessManagers.SqlServer", "src\Contrib.KafkaFlow.ProcessManagers.SqlServer\Contrib.KafkaFlow.ProcessManagers.SqlServer.csproj", "{3F0C0AD0-CFD8-4722-BF3B-6259D0878680}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Contrib.KafkaFlow.SqlServer", "src\Contrib.KafkaFlow.SqlServer\Contrib.KafkaFlow.SqlServer.csproj", "{6839FA17-DF28-418B-AB5A-2249D44F4CA3}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -40,6 +49,21 @@ Global {6A9E74CC-3E65-4422-8E3E-862C3A999B72}.Debug|Any CPU.Build.0 = Debug|Any CPU {6A9E74CC-3E65-4422-8E3E-862C3A999B72}.Release|Any CPU.ActiveCfg = Release|Any CPU {6A9E74CC-3E65-4422-8E3E-862C3A999B72}.Release|Any CPU.Build.0 = Release|Any CPU + {A4CBB313-9DCC-441E-BB44-1473CDF7A986}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A4CBB313-9DCC-441E-BB44-1473CDF7A986}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A4CBB313-9DCC-441E-BB44-1473CDF7A986}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A4CBB313-9DCC-441E-BB44-1473CDF7A986}.Release|Any CPU.Build.0 = Release|Any CPU + {3F0C0AD0-CFD8-4722-BF3B-6259D0878680}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3F0C0AD0-CFD8-4722-BF3B-6259D0878680}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3F0C0AD0-CFD8-4722-BF3B-6259D0878680}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3F0C0AD0-CFD8-4722-BF3B-6259D0878680}.Release|Any CPU.Build.0 = Release|Any CPU + {6839FA17-DF28-418B-AB5A-2249D44F4CA3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6839FA17-DF28-418B-AB5A-2249D44F4CA3}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6839FA17-DF28-418B-AB5A-2249D44F4CA3}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6839FA17-DF28-418B-AB5A-2249D44F4CA3}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE EndGlobalSection GlobalSection(NestedProjects) = preSolution {1DD0DA5D-63CD-4FC3-972E-CD376139810A} = {CE99781E-E88C-404C-B9AD-D7F713265F31} @@ -47,5 +71,11 @@ Global {39CF3703-98C2-4EE5-872B-AEFE85EEAC9A} = {CE99781E-E88C-404C-B9AD-D7F713265F31} {95A1F8B4-98B2-42F2-8CF1-0067F3D6B125} = {CE99781E-E88C-404C-B9AD-D7F713265F31} {6A9E74CC-3E65-4422-8E3E-862C3A999B72} = {CE99781E-E88C-404C-B9AD-D7F713265F31} + {A4CBB313-9DCC-441E-BB44-1473CDF7A986} = {CE99781E-E88C-404C-B9AD-D7F713265F31} + {3F0C0AD0-CFD8-4722-BF3B-6259D0878680} = {CE99781E-E88C-404C-B9AD-D7F713265F31} + {6839FA17-DF28-418B-AB5A-2249D44F4CA3} = {CE99781E-E88C-404C-B9AD-D7F713265F31} + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {66EE15D8-69C4-4AFB-9EB8-6020B2D8C01A} EndGlobalSection EndGlobal diff --git a/Readme.md b/Readme.md index 603558f..8ebb48e 100644 --- a/Readme.md +++ b/Readme.md @@ -11,6 +11,7 @@ ecosystem. The following backends are implemented: - [Contrib.KafkaFlow.Outbox.Postgres](./src/Contrib.KafkaFlow.Outbox.Postgres) - Postgres SQL backend + - [Contrib.KafkaFlow.Outbox.SqlServer](./src/Contrib.KafkaFlow.Outbox.SqlServer) - SQL Server backend - [Contrib.KafkaFlow.ProcessManagers](./src/Contrib.KafkaFlow.ProcessManagers/Readme.md) @@ -22,6 +23,8 @@ ecosystem. - [Contrib.KafkaFlow.ProcessManagers.Postgres](./src/Contrib.KafkaFlow.ProcessManagers.Postgres) - Postgres SQL backend for storing process' state + - [Contrib.KafkaFlow.ProcessManagers.SqlServer](./src/Contrib.KafkaFlow.ProcessManagers.SqlServer) - + SQL Server backend for storing process' state ## Usage example diff --git a/docker-compose.yaml b/docker-compose.yaml index 6b84381..74f1049 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -43,7 +43,7 @@ services: - KAFKA_SCHEMAREGISTRY_ENABLED=TRUE - KAFKA_SCHEMAREGISTRY_URLS=http://kafka:8081 - db: + postgres: image: public.ecr.aws/docker/library/postgres:14.7-alpine environment: PGDATA: /var/lib/postgresql/data/pgdata @@ -61,3 +61,23 @@ services: - "./local-env/postgres:/docker-entrypoint-initdb.d" - "./src/Contrib.KafkaFlow.Outbox.Postgres/schema:/docker-entrypoint-initdb.d/0001.outbox" - "./src/Contrib.KafkaFlow.ProcessManagers.Postgres/schema:/docker-entrypoint-initdb.d/0002.processes" + + mssql: + image: mcr.microsoft.com/mssql/server:2022-CU12-ubuntu-22.04 + environment: + ACCEPT_EULA: "Y" + MSSQL_SA_PASSWORD: "4YiUmU2YJ8$6eqbSXF8765Ck3" + MSSQL_PID: Developer + healthcheck: + test: /opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P "$${MSSQL_SA_PASSWORD}" -Q "SELECT 1" || exit 1 + interval: 5s + timeout: 5s + retries: 5 + ports: + - "1433:1433/tcp" + restart: unless-stopped + command: ["/bin/bash", "-c", "/docker-entrypoint-initdb.d/initdb.sh"] + volumes: + - "./local-env/mssql:/docker-entrypoint-initdb.d" + - "./src/Contrib.KafkaFlow.Outbox.SqlServer/schema:/docker-entrypoint-initdb.d/0001.outbox" + - "./src/Contrib.KafkaFlow.ProcessManagers.SqlServer/schema:/docker-entrypoint-initdb.d/0002.processes" diff --git a/local-env/mssql/initdb.sh b/local-env/mssql/initdb.sh new file mode 100644 index 0000000..d84fdf7 --- /dev/null +++ b/local-env/mssql/initdb.sh @@ -0,0 +1,48 @@ +#!/usr/bin/env bash + +# Start SQL Server +/opt/mssql/bin/sqlservr & + +# Wait for SQL to start +TRIES=60 +DBSTATUS=1 +ERRCODE=1 +i=0 + +echo "Waiting for SQL Server to start" +while [[ $DBSTATUS -ne 0 ]] && [[ $i -lt $TRIES ]]; do + i=$((i+1)) + DBSTATUS=$(/opt/mssql-tools/bin/sqlcmd -h -1 -t 1 -U sa -P $MSSQL_SA_PASSWORD -Q "SET NOCOUNT ON; Select COALESCE(SUM(state), 0) from sys.databases") || DBSTATUS=1 + if [ $DBSTATUS -ne 0 ]; then + sleep 1s + fi +done + +sleep 5s +if [ $DBSTATUS -ne 0 ]; then + echo "SQL Server took more than $TRIES seconds to start up or one or more databases are not in an ONLINE state" + exit 1 +fi + +mssql=( /opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P "$MSSQL_SA_PASSWORD" -d master -i ) + +find /docker-entrypoint-initdb.d -mindepth 2 -type f | sort | while read f; do + case "$f" in + *.sh) + if [ -x "$f" ]; then + echo "$0: running $f" + "$f" + else + echo "$0: sourcing $f" + . "$f" + fi + ;; + *.sql) echo "$0: running $f"; "${mssql[@]}" "$f"; echo ;; + *.sql.gz) echo "$0: running $f"; gunzip -c "$f" | "${mssql[@]}"; echo ;; + *) echo "$0: ignoring $f" ;; + esac + echo +done + +echo "SQL Server is running" +sleep infinity diff --git a/src/Contrib.KafkaFlow.Outbox.Postgres/Contrib.KafkaFlow.Outbox.Postgres.csproj b/src/Contrib.KafkaFlow.Outbox.Postgres/Contrib.KafkaFlow.Outbox.Postgres.csproj index 6059b89..30f3939 100644 --- a/src/Contrib.KafkaFlow.Outbox.Postgres/Contrib.KafkaFlow.Outbox.Postgres.csproj +++ b/src/Contrib.KafkaFlow.Outbox.Postgres/Contrib.KafkaFlow.Outbox.Postgres.csproj @@ -12,7 +12,7 @@ - + diff --git a/src/Contrib.KafkaFlow.Outbox.SqlServer/ConfigurationBuilderExtensions.cs b/src/Contrib.KafkaFlow.Outbox.SqlServer/ConfigurationBuilderExtensions.cs new file mode 100644 index 0000000..257bdec --- /dev/null +++ b/src/Contrib.KafkaFlow.Outbox.SqlServer/ConfigurationBuilderExtensions.cs @@ -0,0 +1,17 @@ +using KafkaFlow; +using KafkaFlow.SqlServer; +using Microsoft.Extensions.DependencyInjection; + +namespace KafkaFlow.Outbox.SqlServer; + +public static class ConfigurationBuilderExtensions +{ + public static IServiceCollection AddSqlServerOutboxBackend(this IServiceCollection services) => + services.AddSingleton(); + + public static IServiceCollection AddSqlServerOutboxBackend(this IServiceCollection services, string connectionString) + { + services.ConfigureSqlServerBackend(options => options.ConnectionString = connectionString); + return AddSqlServerOutboxBackend(services); + } +} diff --git a/src/Contrib.KafkaFlow.Outbox.SqlServer/Contrib.KafkaFlow.Outbox.SqlServer.csproj b/src/Contrib.KafkaFlow.Outbox.SqlServer/Contrib.KafkaFlow.Outbox.SqlServer.csproj new file mode 100644 index 0000000..c3972e9 --- /dev/null +++ b/src/Contrib.KafkaFlow.Outbox.SqlServer/Contrib.KafkaFlow.Outbox.SqlServer.csproj @@ -0,0 +1,26 @@ + + + + net8.0 + enable + enable + latest + true + latest + + + + + + + + + + + + + + + + + diff --git a/src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxBackend.cs b/src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxBackend.cs new file mode 100644 index 0000000..86e7e06 --- /dev/null +++ b/src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxBackend.cs @@ -0,0 +1,95 @@ +using Confluent.Kafka; +using Dapper; +using KafkaFlow.SqlServer; +using Microsoft.Extensions.Options; +using System.Data.SqlClient; +using System.Text.Json; + +namespace KafkaFlow.Outbox.SqlServer; + +public class SqlServerOutboxBackend : IOutboxBackend +{ + private readonly SqlServerBackendOptions _options; + + public SqlServerOutboxBackend(IOptions options) + => _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); + + public async ValueTask Store(TopicPartition topicPartition, Message message, CancellationToken token = default) + { + var sql = """ + INSERT INTO [outbox].[outbox] ([topic_name], [partition], [message_key], [message_headers], [message_body]) + VALUES (@topic_name, @partition, @message_key, @message_headers, @message_body); + """; + + using var conn = new SqlConnection(_options.ConnectionString); + + var rawHeaders = + message.Headers == null + ? null + : JsonSerializer.Serialize(message.Headers.ToDictionary(x => x.Key, x => x.GetValueBytes())); + + var res = await conn.ExecuteAsync(sql, new + { + topic_name = topicPartition.Topic, + partition = topicPartition.Partition.IsSpecial ? null : (int?)topicPartition.Partition.Value, + message_key = message.Key, + message_headers = rawHeaders, + message_body = message.Value + }).ConfigureAwait(false); + + } + + public async ValueTask Read(int batchSize, CancellationToken token = default) + { + var sql = """ + DELETE FROM [outbox].[outbox] + OUTPUT DELETED.* + WHERE + [sequence_id] IN ( + SELECT TOP (@batch_size) [sequence_id] FROM [outbox].[outbox] + ORDER BY [sequence_id] + ); + """; + using var conn = new SqlConnection(_options.ConnectionString); + var result = await conn.QueryAsync(sql, new { batch_size = batchSize }); + + return result?.Select(ToOutboxRecord).ToArray() ?? Array.Empty(); + } + + private static OutboxRecord ToOutboxRecord(OutboxTableRow row) + { + var partition = row.partition.HasValue ? new Partition(row.partition.Value) : Partition.Any; + var topicPartition = new TopicPartition(row.topic_name, partition); + + var storedHeaders = + row.message_headers == null + ? null + : JsonSerializer.Deserialize>(row.message_headers); + + var headers = + storedHeaders?.Aggregate(new Headers(), (h, x) => + { + h.Add(x.Key, x.Value); + return h; + }); + + var msg = new Message + { + Key = row.message_key!, + Value = row.message_body!, + Headers = headers + }; + + return new OutboxRecord(topicPartition, msg); + } +} + +internal sealed class OutboxTableRow +{ + public long sequence_id { get; set; } + public string topic_name { get; set; } = null!; + public int? partition { get; set; } + public byte[]? message_key { get; set; } + public string? message_headers { get; set; } + public byte[]? message_body { get; set; } +} diff --git a/src/Contrib.KafkaFlow.Outbox.SqlServer/schema/0001.Shema.sql b/src/Contrib.KafkaFlow.Outbox.SqlServer/schema/0001.Shema.sql new file mode 100644 index 0000000..456284a --- /dev/null +++ b/src/Contrib.KafkaFlow.Outbox.SqlServer/schema/0001.Shema.sql @@ -0,0 +1,5 @@ +IF NOT EXISTS(SELECT * FROM sys.schemas WHERE name = 'outbox') +BEGIN + EXEC ('CREATE SCHEMA [outbox] AUTHORIZATION [dbo]') +END +GO diff --git a/src/Contrib.KafkaFlow.Outbox.SqlServer/schema/0002.Table.sql b/src/Contrib.KafkaFlow.Outbox.SqlServer/schema/0002.Table.sql new file mode 100644 index 0000000..306883e --- /dev/null +++ b/src/Contrib.KafkaFlow.Outbox.SqlServer/schema/0002.Table.sql @@ -0,0 +1,18 @@ +/* Table */ +IF OBJECT_ID(N'[outbox].[outbox]', N'U') IS NULL +BEGIN + CREATE TABLE [outbox].[outbox]( + [sequence_id] [bigint] IDENTITY(1,1) NOT NULL, + [topic_name] [nvarchar](255) NOT NULL, + [partition] [int] NULL, + [message_key] [varbinary](max) NULL, + [message_headers] [nvarchar](max) NULL, + [message_body] [varbinary](max) NULL, + [date_added_utc] [datetime2] NOT NULL DEFAULT(SYSUTCDATETIME()), + [rowversion] [int] NOT NULL DEFAULT(1), + CONSTRAINT [PK_outbox] PRIMARY KEY CLUSTERED ([sequence_id] ASC), + CONSTRAINT [CK_headers_not_blank_or_empty] CHECK ((TRIM([message_headers])<>N'')), + CONSTRAINT [CK_topic_name_not_blank_or_empty] CHECK ((TRIM([topic_name])<>N'')) + ) +END +GO diff --git a/src/Contrib.KafkaFlow.Outbox/Contrib.KafkaFlow.Outbox.csproj b/src/Contrib.KafkaFlow.Outbox/Contrib.KafkaFlow.Outbox.csproj index 1835b88..8680311 100644 --- a/src/Contrib.KafkaFlow.Outbox/Contrib.KafkaFlow.Outbox.csproj +++ b/src/Contrib.KafkaFlow.Outbox/Contrib.KafkaFlow.Outbox.csproj @@ -9,10 +9,10 @@ - - + + - + diff --git a/src/Contrib.KafkaFlow.ProcessManagers.Postgres/Contrib.KafkaFlow.ProcessManagers.Postgres.csproj b/src/Contrib.KafkaFlow.ProcessManagers.Postgres/Contrib.KafkaFlow.ProcessManagers.Postgres.csproj index 83e825c..6f33620 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers.Postgres/Contrib.KafkaFlow.ProcessManagers.Postgres.csproj +++ b/src/Contrib.KafkaFlow.ProcessManagers.Postgres/Contrib.KafkaFlow.ProcessManagers.Postgres.csproj @@ -12,7 +12,7 @@ - + diff --git a/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/ConfigurationBuilderExtensions.cs b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/ConfigurationBuilderExtensions.cs new file mode 100644 index 0000000..422027c --- /dev/null +++ b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/ConfigurationBuilderExtensions.cs @@ -0,0 +1,18 @@ +using KafkaFlow; +using KafkaFlow.SqlServer; +using Microsoft.Extensions.DependencyInjection; + +namespace KafkaFlow.ProcessManagers.SqlServer; + +public static class ConfigurationBuilderExtensions +{ + public static IServiceCollection AddSqlServerProcessManagerState(this IServiceCollection services) => + services.AddSingleton(); + + + public static IServiceCollection AddSqlServerProcessManagerState(this IServiceCollection services, string connectionString) + { + services.ConfigureSqlServerBackend(options => options.ConnectionString = connectionString); + return AddSqlServerProcessManagerState(services); + } +} diff --git a/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/Contrib.KafkaFlow.ProcessManagers.SqlServer.csproj b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/Contrib.KafkaFlow.ProcessManagers.SqlServer.csproj new file mode 100644 index 0000000..bf370c7 --- /dev/null +++ b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/Contrib.KafkaFlow.ProcessManagers.SqlServer.csproj @@ -0,0 +1,26 @@ + + + + net8.0 + enable + enable + latest + true + latest + + + + + + + + + + + + + + + + + diff --git a/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/SqlServerProcessManagersStore.cs b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/SqlServerProcessManagersStore.cs new file mode 100644 index 0000000..ccd29cf --- /dev/null +++ b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/SqlServerProcessManagersStore.cs @@ -0,0 +1,101 @@ +using Dapper; +using KafkaFlow.SqlServer; +using Microsoft.Extensions.Options; +using System.Data.SqlClient; +using System.Text.Json; + +namespace KafkaFlow.ProcessManagers.SqlServer; + +public sealed class SqlServerProcessManagersStore : IProcessStateStore +{ + private readonly SqlServerBackendOptions _options; + + public SqlServerProcessManagersStore(IOptions options) + => _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); + + public async ValueTask Persist(Type processType, Guid processId, VersionedState state) + { + var sql = """ + MERGE INTO [process_managers].[processes] as [target] + USING (VALUES (@process_type, @process_id,@process_state)) AS [source] ([process_type], [process_id], [process_state]) + ON [target].[process_type] = @process_type AND [target].[process_id] = @process_id + WHEN MATCHED AND [target].[rowversion] = @Version THEN + UPDATE SET + [process_state] = @process_state, + [date_updated_utc] = SYSUTCDATETIME(), + [rowversion] = [target].[rowversion] + 1 + WHEN NOT MATCHED THEN + INSERT ([process_type], [process_id], [process_state]) + VALUES (@process_type, @process_id,@process_state); + """; + + using var conn = new SqlConnection(_options.ConnectionString); + var result = await conn.ExecuteAsync(sql, new + { + process_type = processType.FullName, + process_id = processId, + process_state = JsonSerializer.Serialize(state.State), + version = state.Version + }); + + if (result == 0) + { + throw new OptimisticConcurrencyException(processType, processId, + $"Concurrency error when persisting state {processType.FullName}"); + } + } + + public async ValueTask Load(Type processType, Guid processId) + { + var sql = """ + SELECT [process_state], [rowversion] as [version] + FROM [process_managers].[processes] + WHERE [process_type] = @process_type AND [process_id] = @process_id; + """; + + using var conn = new SqlConnection(_options.ConnectionString); + var result = await conn.QueryAsync(sql, new + { + process_type = processType.FullName, + process_id = processId + }); + + var firstResult = result?.FirstOrDefault(); + + if (firstResult == null) + { + return VersionedState.Zero; + } + + var decoded = JsonSerializer.Deserialize(firstResult.process_state, processType); + return new VersionedState(firstResult.version, decoded); + } + + public async ValueTask Delete(Type processType, Guid processId, int version) + { + var sql = """ + DELETE FROM [process_managers].[processes] + WHERE [process_type] = @process_type AND [process_id] = @process_id and [rowversion] = @version; + """; + + using var conn = new SqlConnection(_options.ConnectionString); + var result = await conn.ExecuteAsync(sql, new + { + process_type = processType.FullName, + process_id = processId, + version + }); + + if (result == 0) + { + throw new OptimisticConcurrencyException(processType, processId, + $"Concurrency error when persisting state {processType.FullName}"); + } + } + + private sealed class ProcessStateRow + { + public required string process_state { get; set; } + public required int version { get; set; } + } +} diff --git a/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/schema/0001.Schema.sql b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/schema/0001.Schema.sql new file mode 100644 index 0000000..1b59cb6 --- /dev/null +++ b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/schema/0001.Schema.sql @@ -0,0 +1,5 @@ +IF NOT EXISTS(SELECT * FROM sys.schemas WHERE name = 'process_managers') +BEGIN + EXEC ('CREATE SCHEMA [process_managers] AUTHORIZATION [dbo]') +END +GO diff --git a/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/schema/0002.Table.sql b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/schema/0002.Table.sql new file mode 100644 index 0000000..e31f1f9 --- /dev/null +++ b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/schema/0002.Table.sql @@ -0,0 +1,17 @@ +/* Table */ +IF OBJECT_ID(N'[process_managers].[processes]', N'U') IS NULL +BEGIN + CREATE TABLE [process_managers].[processes]( + [sequence_id] [bigint] IDENTITY(1,1) NOT NULL, + [process_type] [nvarchar](255) NOT NULL, + [process_id] [uniqueidentifier] NOT NULL, + [process_state] [nvarchar](max) NOT NULL, + [date_added_utc] [datetime2] NOT NULL DEFAULT(SYSUTCDATETIME()), + [date_updated_utc] [datetime2] NOT NULL DEFAULT(SYSUTCDATETIME()), + [rowversion] [int] NOT NULL DEFAULT(1), + CONSTRAINT [PK_processes] PRIMARY KEY CLUSTERED ([sequence_id] ASC), + CONSTRAINT [CK_process_state_not_blank_or_empty] CHECK ((TRIM([process_state])<>N'')) + ); + + CREATE UNIQUE NONCLUSTERED INDEX [IX_processes] ON [process_managers].[processes] ([process_type] ASC,[process_id] ASC) WITH (FILLFACTOR = 90); +END diff --git a/src/Contrib.KafkaFlow.ProcessManagers/Contrib.KafkaFlow.ProcessManagers.csproj b/src/Contrib.KafkaFlow.ProcessManagers/Contrib.KafkaFlow.ProcessManagers.csproj index 3be5b7c..f7eb111 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers/Contrib.KafkaFlow.ProcessManagers.csproj +++ b/src/Contrib.KafkaFlow.ProcessManagers/Contrib.KafkaFlow.ProcessManagers.csproj @@ -8,9 +8,9 @@ - + - + diff --git a/src/Contrib.KafkaFlow.SqlServer/Contrib.KafkaFlow.SqlServer.csproj b/src/Contrib.KafkaFlow.SqlServer/Contrib.KafkaFlow.SqlServer.csproj new file mode 100644 index 0000000..b5f4601 --- /dev/null +++ b/src/Contrib.KafkaFlow.SqlServer/Contrib.KafkaFlow.SqlServer.csproj @@ -0,0 +1,17 @@ + + + + net8.0 + enable + enable + latest + true + latest + + + + + + + + diff --git a/src/Contrib.KafkaFlow.SqlServer/ServiceCollectionExtensions.cs b/src/Contrib.KafkaFlow.SqlServer/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..2f27593 --- /dev/null +++ b/src/Contrib.KafkaFlow.SqlServer/ServiceCollectionExtensions.cs @@ -0,0 +1,25 @@ +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace KafkaFlow.SqlServer; + +public static class ServiceCollectionExtensions +{ + public static IServiceCollection ConfigureSqlServerBackend(this IServiceCollection services, IConfiguration configuration) + => services.Configure(configuration); + + public static IServiceCollection ConfigureSqlServerBackend(this IServiceCollection services, Action configureOptions) + => services.Configure(configureOptions); + + public static IServiceCollection ConfigureSqlServerBackend(this IServiceCollection services, IConfiguration configuration, Action? configureBinder) + => services.Configure(configuration, configureBinder); + + public static IServiceCollection ConfigureSqlServerBackend(this IServiceCollection services, string? name, IConfiguration configuration) + => services.Configure(name, configuration); + + public static IServiceCollection ConfigureSqlServerBackend(this IServiceCollection services, string? name, Action configureOptions) + => services.Configure(name, configureOptions); + + public static IServiceCollection ConfigureSqlServerBackend(this IServiceCollection services, string? name, IConfiguration configuration, Action? configureBinder) + => services.Configure(name, configuration, configureBinder); +} diff --git a/src/Contrib.KafkaFlow.SqlServer/SqlServerBackendOptions.cs b/src/Contrib.KafkaFlow.SqlServer/SqlServerBackendOptions.cs new file mode 100644 index 0000000..fca319c --- /dev/null +++ b/src/Contrib.KafkaFlow.SqlServer/SqlServerBackendOptions.cs @@ -0,0 +1,6 @@ +namespace KafkaFlow.SqlServer; + +public record SqlServerBackendOptions +{ + public required string ConnectionString { get; set; } +} diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/KafkaFlowFixture.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/PostgresKafkaFlowFixture.cs similarity index 91% rename from tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/KafkaFlowFixture.cs rename to tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/PostgresKafkaFlowFixture.cs index 7c948f9..5242304 100644 --- a/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/KafkaFlowFixture.cs +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/PostgresKafkaFlowFixture.cs @@ -12,7 +12,7 @@ namespace KafkaFlow.ProcessManagers.IntegrationTests.Fixture; -public class KafkaFlowFixture : IDisposable, IAsyncDisposable +public class PostgresKafkaFlowFixture : IDisposable, IAsyncDisposable { public readonly string FixtureId = Guid.NewGuid().ToString(); public string TopicName { get; } @@ -24,10 +24,10 @@ public class KafkaFlowFixture : IDisposable, IAsyncDisposable public IMessageProducer Producer { get; } - public KafkaFlowFixture() + public PostgresKafkaFlowFixture() { _fixtureCancellation = new CancellationTokenSource(); - TopicName = $"messages-{FixtureId}"; + TopicName = $"pg-messages-{FixtureId}"; var services = new ServiceCollection(); @@ -55,7 +55,7 @@ public KafkaFlowFixture() .WithBrokers(new[] { "localhost:9092 " }) .CreateTopicIfNotExists(TopicName, 3, 1) .AddOutboxDispatcher(x => x.WithPartitioner(Partitioner.Murmur2Random)) - .AddProducer(producer => + .AddProducer(producer => producer .WithOutbox() .DefaultTopic(TopicName) @@ -69,14 +69,14 @@ public KafkaFlowFixture() .AddConsumer(consumer => consumer .Topic(TopicName) - .WithGroupId($"group-{FixtureId}") + .WithGroupId($"pg-group-{FixtureId}") .WithBufferSize(100) .WithWorkersCount(1) .WithAutoOffsetReset(AutoOffsetReset.Earliest) .AddMiddlewares(middlewares => middlewares .AddDeserializer() - .AddProcessManagers(pm => pm.AddProcessManagersFromAssemblyOf()) + .AddProcessManagers(pm => pm.AddProcessManagersFromAssemblyOf()) ) ) ) @@ -85,7 +85,7 @@ public KafkaFlowFixture() ProcessStateStore = (LoggingProcessStateStore)ServiceProvider.GetRequiredService(); - Producer = ServiceProvider.GetRequiredService>(); + Producer = ServiceProvider.GetRequiredService>(); var svc = ServiceProvider.GetServices(); diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/SqlServerKafkaFlowFixture.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/SqlServerKafkaFlowFixture.cs new file mode 100644 index 0000000..a841357 --- /dev/null +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/Fixture/SqlServerKafkaFlowFixture.cs @@ -0,0 +1,121 @@ +using Confluent.Kafka; +using KafkaFlow.Consumers; +using KafkaFlow.Outbox; +using KafkaFlow.Outbox.SqlServer; +using KafkaFlow.ProcessManagers.SqlServer; +using KafkaFlow.Serializer; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace KafkaFlow.ProcessManagers.IntegrationTests.Fixture; + +public class SqlServerKafkaFlowFixture : IDisposable, IAsyncDisposable +{ + public readonly string FixtureId = Guid.NewGuid().ToString(); + public string TopicName { get; } + public readonly ServiceProvider ServiceProvider; + private readonly IKafkaBus _kafkaBus; + private readonly CancellationTokenSource _fixtureCancellation; + + public LoggingProcessStateStore ProcessStateStore { get; } + + public IMessageProducer Producer { get; } + + public SqlServerKafkaFlowFixture() + { + _fixtureCancellation = new CancellationTokenSource(); + TopicName = $"mssql-messages-{FixtureId}"; + + var services = new ServiceCollection(); + + var config = new ConfigurationManager() + .SetBasePath(Directory.GetCurrentDirectory()) + .AddJsonFile("appsettings.json", optional: false) + .AddEnvironmentVariables() + .Build(); + + services + .AddSingleton(config) + .AddLogging(log => log.AddConsole().AddDebug()) + .AddSqlServerProcessManagerState(config.GetConnectionString("SqlServerBackend")) + .Decorate() + .AddSqlServerOutboxBackend() + .AddKafka(kafka => + kafka + .UseMicrosoftLog() + .AddCluster(cluster => + cluster + .WithBrokers(new[] { "localhost:9092 " }) + .CreateTopicIfNotExists(TopicName, 3, 1) + .AddOutboxDispatcher(x => x.WithPartitioner(Partitioner.Murmur2Random)) + .AddProducer(producer => + producer + .WithOutbox() + .DefaultTopic(TopicName) + .AddMiddlewares(m => m.AddSerializer())) + .AddProducer(producer => + producer + .WithOutbox() + .DefaultTopic(TopicName) + .AddMiddlewares(m => m.AddSerializer()) + ) + .AddConsumer(consumer => + consumer + .Topic(TopicName) + .WithGroupId($"mssql-group-{FixtureId}") + .WithBufferSize(100) + .WithWorkersCount(1) + .WithAutoOffsetReset(AutoOffsetReset.Earliest) + .AddMiddlewares(middlewares => + middlewares + .AddDeserializer() + .AddProcessManagers(pm => pm.AddProcessManagersFromAssemblyOf()) + ) + ) + ) + ); + ServiceProvider = services.BuildServiceProvider(); + + ProcessStateStore = (LoggingProcessStateStore)ServiceProvider.GetRequiredService(); + + Producer = ServiceProvider.GetRequiredService>(); + + var svc = ServiceProvider.GetServices(); + + foreach (var service in svc) + { + service.StartAsync(_fixtureCancellation.Token); + } + + _kafkaBus = ServiceProvider.CreateKafkaBus(); + _kafkaBus.StartAsync(_fixtureCancellation.Token); + } + + public void Dispose() + { + if (!_disposedAsync) + { + DisposeAsync().ConfigureAwait(false).GetAwaiter().GetResult(); + } + } + + private bool _disposedAsync = false; + + public async ValueTask DisposeAsync() + { + _disposedAsync = true; + + _fixtureCancellation.Cancel(); + _fixtureCancellation.Dispose(); + await _kafkaBus.StopAsync(); + + foreach (var cons in ServiceProvider.GetRequiredService().All) + { + await cons.StopAsync(); + } + + await ServiceProvider.DisposeAsync(); + } +} diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/KafkaFlow.ProcessManagers.IntegrationTests.csproj b/tests/KafkaFlow.ProcessManagers.IntegrationTests/KafkaFlow.ProcessManagers.IntegrationTests.csproj index e11d3d4..4a429eb 100644 --- a/tests/KafkaFlow.ProcessManagers.IntegrationTests/KafkaFlow.ProcessManagers.IntegrationTests.csproj +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/KafkaFlow.ProcessManagers.IntegrationTests.csproj @@ -1,4 +1,4 @@ - + net8.0 @@ -11,23 +11,23 @@ - - - - - + + + + + - + - + runtime; build; native; contentfiles; analyzers; buildtransitive all - + runtime; build; native; contentfiles; analyzers; buildtransitive all @@ -35,9 +35,12 @@ + + + diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/SqlServerProcessManagerStoreTests.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/SqlServerProcessManagerStoreTests.cs new file mode 100644 index 0000000..6dbe813 --- /dev/null +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/SqlServerProcessManagerStoreTests.cs @@ -0,0 +1,41 @@ +using FluentAssertions; +using KafkaFlow.ProcessManagers.SqlServer; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace KafkaFlow.ProcessManagers.IntegrationTests; +public sealed class SqlServerProcessManagerStoreTests +{ + [Fact] + public async Task Should_write_update_and_delete_state() + { + var processId = Guid.NewGuid(); + var state = processId.ToString(); + + var config = + new ConfigurationManager() + .SetBasePath(Directory.GetCurrentDirectory()) + .AddJsonFile("appsettings.json", optional: false) + .AddEnvironmentVariables() + .Build(); + + var services = new ServiceCollection(); + services.AddSqlServerProcessManagerState(config.GetConnectionString("SqlServerBackend")); + var sp = services.BuildServiceProvider(); + + var store = sp.GetRequiredService(); + + var noState = await store.Load(state.GetType(), processId); + noState.Should().BeEquivalentTo(VersionedState.Zero); + + await store.Persist(state.GetType(), processId, new VersionedState(0, state)); + + var hasState = await store.Load(state.GetType(), processId); + hasState.State.Should().NotBeNull(); + hasState.Version.Should().BePositive(); + + await store.Delete(state.GetType(), processId, hasState.Version); + var goneState = await store.Load(state.GetType(), processId); + goneState.Should().BeEquivalentTo(VersionedState.Zero); + } +} diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/UserLifecycleProcessManagerTests.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/PostgresUserLifecycleProcessManagerTests.cs similarity index 81% rename from tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/UserLifecycleProcessManagerTests.cs rename to tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/PostgresUserLifecycleProcessManagerTests.cs index 7ddaa32..2c2e986 100644 --- a/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/UserLifecycleProcessManagerTests.cs +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/PostgresUserLifecycleProcessManagerTests.cs @@ -3,11 +3,11 @@ namespace KafkaFlow.ProcessManagers.IntegrationTests.UserLifeCycle; -public sealed class UserLifecycleProcessManagerTests : IAssemblyFixture +public sealed class PostgresUserLifecycleProcessManagerTests : IAssemblyFixture { - private readonly KafkaFlowFixture _fixture; + private readonly PostgresKafkaFlowFixture _fixture; - public UserLifecycleProcessManagerTests(KafkaFlowFixture fixture) + public PostgresUserLifecycleProcessManagerTests(PostgresKafkaFlowFixture fixture) { _fixture = fixture ?? throw new ArgumentNullException(nameof(fixture)); _fixture.ProcessStateStore.ClearChanges(); diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/SqlServerLifecycleProcessManagerTests.cs b/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/SqlServerLifecycleProcessManagerTests.cs new file mode 100644 index 0000000..2c5210e --- /dev/null +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/UserLifeCycle/SqlServerLifecycleProcessManagerTests.cs @@ -0,0 +1,35 @@ +using FluentAssertions; +using KafkaFlow.ProcessManagers.IntegrationTests.Fixture; + +namespace KafkaFlow.ProcessManagers.IntegrationTests.UserLifeCycle; + +public sealed class SqlServerLifecycleProcessManagerTests : IAssemblyFixture +{ + private readonly SqlServerKafkaFlowFixture _fixture; + + public SqlServerLifecycleProcessManagerTests(SqlServerKafkaFlowFixture fixture) + { + _fixture = fixture ?? throw new ArgumentNullException(nameof(fixture)); + _fixture.ProcessStateStore.ClearChanges(); + } + + [Fact] + public async Task Should_run_user_registration_simulation() + { + var message = new UserRegistered(Guid.NewGuid(), "test@test.com"); + await _fixture.Producer.ProduceAsync(message.UserId.ToString(), message); + + TestUtils.RetryFor(TimeSpan.FromSeconds(30), TimeSpan.FromMicroseconds(100), () => + { + _fixture.ProcessStateStore + .Changes + .Select(x => x.Item1) + .Should().BeEquivalentTo(new[] + { + LoggingProcessStateStore.ActionType.Persisted, + LoggingProcessStateStore.ActionType.Persisted, + LoggingProcessStateStore.ActionType.Deleted + }, x => x.WithStrictOrdering()); + }); + } +} diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/appsettings.json b/tests/KafkaFlow.ProcessManagers.IntegrationTests/appsettings.json index e54fe79..db41e93 100644 --- a/tests/KafkaFlow.ProcessManagers.IntegrationTests/appsettings.json +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/appsettings.json @@ -1,5 +1,6 @@ { "ConnectionStrings": { - "PostgresBackend": "Host=localhost;Username=postgres;Password=postgres;Database=postgres" + "PostgresBackend": "Host=localhost;Username=postgres;Password=postgres;Database=postgres", + "SqlServerBackend": "Server=localhost;User Id=SA;Password=4YiUmU2YJ8$6eqbSXF8765Ck3;Database=kafkaflowtest" } }