diff --git a/src/SqlStreamStore.MsSql/MsSqlScripts/AppendStreamExpectedVersion.sql b/src/SqlStreamStore.MsSql/MsSqlScripts/AppendStreamExpectedVersion.sql index 60963f798..b9c72b097 100644 --- a/src/SqlStreamStore.MsSql/MsSqlScripts/AppendStreamExpectedVersion.sql +++ b/src/SqlStreamStore.MsSql/MsSqlScripts/AppendStreamExpectedVersion.sql @@ -1,67 +1,54 @@ BEGIN TRANSACTION AppendStream; + DECLARE @streamIdInternal AS INT; DECLARE @latestStreamVersion AS INT; - DECLARE @latestStreamPosition AS BIGINT; + DECLARE @latestStreamPosition AS BIGINT; - SELECT @streamIdInternal = dbo.Streams.IdInternal, - @latestStreamVersion = dbo.Streams.[Version] - FROM dbo.Streams - WHERE dbo.Streams.Id = @streamId; + SELECT @streamIdInternal = dbo.Streams.IdInternal, @latestStreamVersion = dbo.Streams.[Version] + FROM dbo.Streams WITH (UPDLOCK, ROWLOCK) + WHERE dbo.Streams.Id = @streamId; - IF @streamIdInternal IS NULL + IF @streamIdInternal IS NULL BEGIN ROLLBACK TRANSACTION AppendStream; RAISERROR('WrongExpectedVersion', 16, 1); RETURN; END - - IF @latestStreamVersion != @expectedStreamVersion + IF @latestStreamVersion != @expectedStreamVersion BEGIN ROLLBACK TRANSACTION AppendStream; RAISERROR('WrongExpectedVersion', 16, 2); RETURN; END -INSERT INTO dbo.Messages (StreamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata) - SELECT @streamIdInternal, - StreamVersion + @latestStreamVersion + 1, - Id, - Created, - [Type], - JsonData, - JsonMetadata - FROM @newMessages - ORDER BY StreamVersion; + INSERT INTO dbo.Messages + (StreamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata) + SELECT @streamIdInternal, StreamVersion + @latestStreamVersion + 1, Id, Created, [Type], JsonData, JsonMetadata + FROM @newMessages + ORDER BY StreamVersion; - SELECT TOP(1) - @latestStreamVersion = dbo.Messages.StreamVersion, - @latestStreamPosition = dbo.Messages.Position - FROM dbo.Messages - WHERE dbo.Messages.StreamIDInternal = @streamIdInternal - ORDER BY dbo.Messages.Position DESC + SET @latestStreamPosition = SCOPE_IDENTITY() - UPDATE dbo.Streams + SELECT @latestStreamVersion = MAX(StreamVersion) + @latestStreamVersion + 1 + FROM @newMessages + + UPDATE dbo.Streams SET dbo.Streams.[Version] = @latestStreamVersion, dbo.Streams.[Position] = @latestStreamPosition - WHERE dbo.Streams.IdInternal = @streamIdInternal + WHERE dbo.Streams.IdInternal = @streamIdInternal COMMIT TRANSACTION AppendStream; /* Select CurrentVersion, CurrentPosition */ - SELECT currentVersion = @latestStreamVersion, currentPosition = @latestStreamPosition +SELECT currentVersion = @latestStreamVersion, currentPosition = @latestStreamPosition /* Select Metadata */ - DECLARE @metadataStreamId as NVARCHAR(42) - DECLARE @metadataStreamIdInternal as INT - SET @metadataStreamId = '$$' + @streamId - - SELECT @metadataStreamIdInternal = dbo.Streams.IdInternal - FROM dbo.Streams - WHERE dbo.Streams.Id = @metadataStreamId; - SELECT TOP(1) - dbo.Messages.JsonData - FROM dbo.Messages - WHERE dbo.Messages.StreamIdInternal = @metadataStreamIdInternal - ORDER BY dbo.Messages.Position DESC; +SELECT dbo.Messages.JsonData +FROM dbo.Messages +WHERE dbo.Messages.Position = ( + SELECT dbo.Streams.Position + FROM dbo.Streams + WHERE dbo.Streams.Id = '$$' + @streamId +) diff --git a/src/SqlStreamStore.MsSql/MsSqlScripts/AppendStreamExpectedVersionAny.sql b/src/SqlStreamStore.MsSql/MsSqlScripts/AppendStreamExpectedVersionAny.sql index 21f21bff8..9972cc31f 100644 --- a/src/SqlStreamStore.MsSql/MsSqlScripts/AppendStreamExpectedVersionAny.sql +++ b/src/SqlStreamStore.MsSql/MsSqlScripts/AppendStreamExpectedVersionAny.sql @@ -1,64 +1,63 @@ DECLARE @streamIdInternal AS INT; BEGIN TRANSACTION CreateStreamIfNotExists; - IF NOT EXISTS (SELECT * FROM dbo.Streams WITH (UPDLOCK, ROWLOCK, HOLDLOCK) - WHERE dbo.Streams.Id = @streamId) - INSERT INTO dbo.Streams (Id, IdOriginal) VALUES (@streamId, @streamIdOriginal); + + IF NOT EXISTS ( + SELECT * + FROM dbo.Streams WITH (UPDLOCK, ROWLOCK, HOLDLOCK) + WHERE dbo.Streams.Id = @streamId + ) + INSERT INTO dbo.Streams (Id, IdOriginal) + VALUES (@streamId, @streamIdOriginal); COMMIT TRANSACTION CreateStreamIfNotExists; BEGIN TRANSACTION AppendStream; - DECLARE @latestStreamVersion AS INT; - DECLARE @latestStreamPosition AS BIGINT; - - SELECT @streamIdInternal = dbo.Streams.IdInternal, - @latestStreamVersion = dbo.Streams.[Version] - FROM dbo.Streams WITH (UPDLOCK, ROWLOCK) - WHERE dbo.Streams.Id = @streamId; - -INSERT INTO dbo.Messages (StreamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata) - SELECT @streamIdInternal, - StreamVersion + @latestStreamVersion + 1, - Id, - Created, - [Type], - JsonData, - JsonMetadata - FROM @newMessages - ORDER BY StreamVersion - SELECT TOP(1) - @latestStreamVersion = dbo.Messages.StreamVersion, - @latestStreamPosition = dbo.Messages.Position - FROM dbo.Messages - WHERE dbo.Messages.StreamIDInternal = @streamIdInternal - ORDER BY dbo.Messages.Position DESC - - IF @latestStreamPosition IS NULL - SET @latestStreamPosition = -1 - - UPDATE dbo.Streams - SET dbo.Streams.[Version] = @latestStreamVersion, - dbo.Streams.[Position] = @latestStreamPosition - WHERE dbo.Streams.IdInternal = @streamIdInternal + DECLARE @latestStreamVersion AS INT; + DECLARE @latestStreamPosition AS BIGINT; + + SELECT @streamIdInternal = dbo.Streams.IdInternal, @latestStreamVersion = dbo.Streams.[Version], @latestStreamPosition = dbo.Streams.[Position] + FROM dbo.Streams WITH (UPDLOCK, ROWLOCK) + WHERE dbo.Streams.Id = @streamId; + + IF @hasMessages = 1 + BEGIN + INSERT INTO dbo.Messages + (StreamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata) + SELECT @streamIdInternal, StreamVersion + @latestStreamVersion + 1, Id, Created, [Type], JsonData, JsonMetadata + FROM @newMessages + ORDER BY StreamVersion + + SET @latestStreamPosition = SCOPE_IDENTITY() + + SELECT @latestStreamVersion = MAX(StreamVersion) + @latestStreamVersion + 1 + FROM @newMessages + SET @latestStreamVersion = @latestStreamVersion + + UPDATE dbo.Streams + SET dbo.Streams.[Version] = @latestStreamVersion, + dbo.Streams.[Position] = @latestStreamPosition + WHERE dbo.Streams.IdInternal = @streamIdInternal + END + ELSE + BEGIN + SET @latestStreamPosition = ISNULL(@latestStreamPosition, -1) + SET @latestStreamVersion = ISNULL(@latestStreamVersion, -1) + END COMMIT TRANSACTION AppendStream; /* Select CurrentVersion, CurrentPosition */ - SELECT currentVersion = @latestStreamVersion, currentPosition = @latestStreamPosition +SELECT currentVersion = @latestStreamVersion, currentPosition = @latestStreamPosition /* Select Metadata */ - DECLARE @metadataStreamId as NVARCHAR(42) - DECLARE @metadataStreamIdInternal as INT - SET @metadataStreamId = '$$' + @streamId - - SELECT @metadataStreamIdInternal = dbo.Streams.IdInternal - FROM dbo.Streams - WHERE dbo.Streams.Id = @metadataStreamId; - SELECT TOP(1) - dbo.Messages.JsonData - FROM dbo.Messages - WHERE dbo.Messages.StreamIdInternal = @metadataStreamIdInternal - ORDER BY dbo.Messages.Position DESC; +SELECT dbo.Messages.JsonData +FROM dbo.Messages +WHERE dbo.Messages.Position = ( + SELECT dbo.Streams.Position + FROM dbo.Streams + WHERE dbo.Streams.Id = '$$' + @streamId +) \ No newline at end of file diff --git a/src/SqlStreamStore.MsSql/MsSqlScripts/AppendStreamExpectedVersionNoStream.sql b/src/SqlStreamStore.MsSql/MsSqlScripts/AppendStreamExpectedVersionNoStream.sql index eedd73581..577e2aa86 100644 --- a/src/SqlStreamStore.MsSql/MsSqlScripts/AppendStreamExpectedVersionNoStream.sql +++ b/src/SqlStreamStore.MsSql/MsSqlScripts/AppendStreamExpectedVersionNoStream.sql @@ -1,58 +1,56 @@ BEGIN TRANSACTION CreateStream; + DECLARE @streamIdInternal AS INT; DECLARE @latestStreamVersion AS INT; - DECLARE @latestStreamPosition AS BIGINT; + DECLARE @latestStreamPosition AS BIGINT; BEGIN - INSERT INTO dbo.Streams (Id, IdOriginal) VALUES (@streamId, @streamIdOriginal); - SELECT @streamIdInternal = SCOPE_IDENTITY(); - - INSERT INTO dbo.Messages (StreamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata) - SELECT @streamIdInternal, - StreamVersion, - Id, - Created, - [Type], - JsonData, - JsonMetadata - FROM @newMessages - ORDER BY StreamVersion; - - SELECT TOP(1) - @latestStreamVersion = dbo.Messages.StreamVersion, - @latestStreamPosition = dbo.Messages.Position - FROM dbo.Messages - WHERE dbo.Messages.StreamIDInternal = @streamIdInternal - ORDER BY dbo.Messages.Position DESC - - IF @latestStreamVersion IS NULL - SET @latestStreamVersion = -1 - - IF @latestStreamPosition IS NULL - SET @latestStreamPosition = -1 - - UPDATE dbo.Streams - SET dbo.Streams.[Version] = @latestStreamVersion, - dbo.Streams.[Position] = @latestStreamPosition - WHERE dbo.Streams.IdInternal = @streamIdInternal + + INSERT INTO dbo.Streams + (Id, IdOriginal) + VALUES + (@streamId, @streamIdOriginal); + + SET @streamIdInternal = SCOPE_IDENTITY(); + + IF @hasMessages = 1 + BEGIN + INSERT INTO dbo.Messages + (StreamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata) + SELECT @streamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata + FROM @newMessages + ORDER BY StreamVersion; + + SET @latestStreamPosition = SCOPE_IDENTITY() + + SELECT @latestStreamVersion = MAX(StreamVersion) + FROM @newMessages + + UPDATE dbo.Streams + SET dbo.Streams.[Version] = @latestStreamVersion, + dbo.Streams.[Position] = @latestStreamPosition + WHERE dbo.Streams.IdInternal = @streamIdInternal + END + ELSE + BEGIN + SET @latestStreamPosition = -1 + SET @latestStreamVersion = -1 + END + END; + COMMIT TRANSACTION CreateStream; /* Select CurrentVersion, CurrentPosition */ - SELECT currentVersion = @latestStreamVersion, currentPosition = @latestStreamPosition +SELECT currentVersion = @latestStreamVersion, currentPosition = @latestStreamPosition /* Select Metadata */ - DECLARE @metadataStreamId as NVARCHAR(42) - DECLARE @metadataStreamIdInternal as INT - SET @metadataStreamId = '$$' + @streamId - - SELECT @metadataStreamIdInternal = dbo.Streams.IdInternal - FROM dbo.Streams - WHERE dbo.Streams.Id = @metadataStreamId; - - SELECT TOP(1) - dbo.Messages.JsonData - FROM dbo.Messages - WHERE dbo.Messages.StreamIdInternal = @metadataStreamIdInternal - ORDER BY dbo.Messages.Position DESC; + +SELECT dbo.Messages.JsonData +FROM dbo.Messages +WHERE dbo.Messages.Position = ( + SELECT dbo.Streams.Position + FROM dbo.Streams + WHERE dbo.Streams.Id = '$$' + @streamId +) \ No newline at end of file diff --git a/src/SqlStreamStore.MsSql/MsSqlStreamStore.AppendStream.cs b/src/SqlStreamStore.MsSql/MsSqlStreamStore.AppendStream.cs index d389d00cd..0c47dd7cb 100644 --- a/src/SqlStreamStore.MsSql/MsSqlStreamStore.AppendStream.cs +++ b/src/SqlStreamStore.MsSql/MsSqlStreamStore.AppendStream.cs @@ -139,12 +139,14 @@ private async Task AppendToStreamExpectedVersionAny( var sqlDataRecords = CreateSqlDataRecords(messages); var eventsParam = CreateNewMessagesSqlParameter(sqlDataRecords); command.Parameters.Add(eventsParam); + command.Parameters.AddWithValue("hasMessages", true); } else { // Must use a null value for the table-valued param if there are no records var eventsParam = CreateNewMessagesSqlParameter(null); command.Parameters.Add(eventsParam); + command.Parameters.AddWithValue("hasMessages", false); } try @@ -241,12 +243,14 @@ private async Task AppendToStreamExpectedVersionNoStream( var sqlDataRecords = CreateSqlDataRecords(messages); var eventsParam = CreateNewMessagesSqlParameter(sqlDataRecords); command.Parameters.Add(eventsParam); + command.Parameters.AddWithValue("hasMessages", true); } else { // Must use a null value for the table-valued param if there are no records var eventsParam = CreateNewMessagesSqlParameter(null); command.Parameters.Add(eventsParam); + command.Parameters.AddWithValue("hasMessages", false); } try