Skip to content
This repository was archived by the owner on Aug 15, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,67 +1,54 @@
BEGIN TRANSACTION AppendStream;

DECLARE @streamIdInternal AS INT;
DECLARE @latestStreamVersion AS INT;
DECLARE @latestStreamPosition AS BIGINT;
DECLARE @latestStreamPosition AS BIGINT;

SELECT @streamIdInternal = dbo.Streams.IdInternal,
@latestStreamVersion = dbo.Streams.[Version]
FROM dbo.Streams
WHERE dbo.Streams.Id = @streamId;
SELECT @streamIdInternal = dbo.Streams.IdInternal, @latestStreamVersion = dbo.Streams.[Version]
FROM dbo.Streams WITH (UPDLOCK, ROWLOCK)
WHERE dbo.Streams.Id = @streamId;

IF @streamIdInternal IS NULL
IF @streamIdInternal IS NULL
BEGIN
ROLLBACK TRANSACTION AppendStream;
RAISERROR('WrongExpectedVersion', 16, 1);
RETURN;
END

IF @latestStreamVersion != @expectedStreamVersion
IF @latestStreamVersion != @expectedStreamVersion
BEGIN
ROLLBACK TRANSACTION AppendStream;
RAISERROR('WrongExpectedVersion', 16, 2);
RETURN;
END

INSERT INTO dbo.Messages (StreamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata)
SELECT @streamIdInternal,
StreamVersion + @latestStreamVersion + 1,
Id,
Created,
[Type],
JsonData,
JsonMetadata
FROM @newMessages
ORDER BY StreamVersion;
INSERT INTO dbo.Messages
(StreamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata)
SELECT @streamIdInternal, StreamVersion + @latestStreamVersion + 1, Id, Created, [Type], JsonData, JsonMetadata
FROM @newMessages
ORDER BY StreamVersion;

SELECT TOP(1)
@latestStreamVersion = dbo.Messages.StreamVersion,
@latestStreamPosition = dbo.Messages.Position
FROM dbo.Messages
WHERE dbo.Messages.StreamIDInternal = @streamIdInternal
ORDER BY dbo.Messages.Position DESC
SET @latestStreamPosition = SCOPE_IDENTITY()

UPDATE dbo.Streams
SELECT @latestStreamVersion = MAX(StreamVersion) + @latestStreamVersion + 1
FROM @newMessages

UPDATE dbo.Streams
SET dbo.Streams.[Version] = @latestStreamVersion,
dbo.Streams.[Position] = @latestStreamPosition
WHERE dbo.Streams.IdInternal = @streamIdInternal
WHERE dbo.Streams.IdInternal = @streamIdInternal

COMMIT TRANSACTION AppendStream;

/* Select CurrentVersion, CurrentPosition */

SELECT currentVersion = @latestStreamVersion, currentPosition = @latestStreamPosition
SELECT currentVersion = @latestStreamVersion, currentPosition = @latestStreamPosition

/* Select Metadata */
DECLARE @metadataStreamId as NVARCHAR(42)
DECLARE @metadataStreamIdInternal as INT
SET @metadataStreamId = '$$' + @streamId

SELECT @metadataStreamIdInternal = dbo.Streams.IdInternal
FROM dbo.Streams
WHERE dbo.Streams.Id = @metadataStreamId;

SELECT TOP(1)
dbo.Messages.JsonData
FROM dbo.Messages
WHERE dbo.Messages.StreamIdInternal = @metadataStreamIdInternal
ORDER BY dbo.Messages.Position DESC;
SELECT dbo.Messages.JsonData
FROM dbo.Messages
WHERE dbo.Messages.Position = (
SELECT dbo.Streams.Position
FROM dbo.Streams
WHERE dbo.Streams.Id = '$$' + @streamId
)
Original file line number Diff line number Diff line change
@@ -1,64 +1,63 @@
DECLARE @streamIdInternal AS INT;

BEGIN TRANSACTION CreateStreamIfNotExists;
IF NOT EXISTS (SELECT * FROM dbo.Streams WITH (UPDLOCK, ROWLOCK, HOLDLOCK)
WHERE dbo.Streams.Id = @streamId)
INSERT INTO dbo.Streams (Id, IdOriginal) VALUES (@streamId, @streamIdOriginal);

IF NOT EXISTS (
SELECT *
FROM dbo.Streams WITH (UPDLOCK, ROWLOCK, HOLDLOCK)
WHERE dbo.Streams.Id = @streamId
)
INSERT INTO dbo.Streams (Id, IdOriginal)
VALUES (@streamId, @streamIdOriginal);

COMMIT TRANSACTION CreateStreamIfNotExists;

BEGIN TRANSACTION AppendStream;
DECLARE @latestStreamVersion AS INT;
DECLARE @latestStreamPosition AS BIGINT;

SELECT @streamIdInternal = dbo.Streams.IdInternal,
@latestStreamVersion = dbo.Streams.[Version]
FROM dbo.Streams WITH (UPDLOCK, ROWLOCK)
WHERE dbo.Streams.Id = @streamId;

INSERT INTO dbo.Messages (StreamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata)
SELECT @streamIdInternal,
StreamVersion + @latestStreamVersion + 1,
Id,
Created,
[Type],
JsonData,
JsonMetadata
FROM @newMessages
ORDER BY StreamVersion

SELECT TOP(1)
@latestStreamVersion = dbo.Messages.StreamVersion,
@latestStreamPosition = dbo.Messages.Position
FROM dbo.Messages
WHERE dbo.Messages.StreamIDInternal = @streamIdInternal
ORDER BY dbo.Messages.Position DESC

IF @latestStreamPosition IS NULL
SET @latestStreamPosition = -1

UPDATE dbo.Streams
SET dbo.Streams.[Version] = @latestStreamVersion,
dbo.Streams.[Position] = @latestStreamPosition
WHERE dbo.Streams.IdInternal = @streamIdInternal
DECLARE @latestStreamVersion AS INT;
DECLARE @latestStreamPosition AS BIGINT;

SELECT @streamIdInternal = dbo.Streams.IdInternal, @latestStreamVersion = dbo.Streams.[Version], @latestStreamPosition = dbo.Streams.[Position]
FROM dbo.Streams WITH (UPDLOCK, ROWLOCK)
WHERE dbo.Streams.Id = @streamId;

IF @hasMessages = 1
BEGIN
INSERT INTO dbo.Messages
(StreamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata)
SELECT @streamIdInternal, StreamVersion + @latestStreamVersion + 1, Id, Created, [Type], JsonData, JsonMetadata
FROM @newMessages
ORDER BY StreamVersion

SET @latestStreamPosition = SCOPE_IDENTITY()

SELECT @latestStreamVersion = MAX(StreamVersion) + @latestStreamVersion + 1
FROM @newMessages
SET @latestStreamVersion = @latestStreamVersion
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line can safely be removed - PR inbound.


UPDATE dbo.Streams
SET dbo.Streams.[Version] = @latestStreamVersion,
dbo.Streams.[Position] = @latestStreamPosition
WHERE dbo.Streams.IdInternal = @streamIdInternal
END
ELSE
BEGIN
SET @latestStreamPosition = ISNULL(@latestStreamPosition, -1)
SET @latestStreamVersion = ISNULL(@latestStreamVersion, -1)
END

COMMIT TRANSACTION AppendStream;

/* Select CurrentVersion, CurrentPosition */

SELECT currentVersion = @latestStreamVersion, currentPosition = @latestStreamPosition
SELECT currentVersion = @latestStreamVersion, currentPosition = @latestStreamPosition

/* Select Metadata */
DECLARE @metadataStreamId as NVARCHAR(42)
DECLARE @metadataStreamIdInternal as INT
SET @metadataStreamId = '$$' + @streamId

SELECT @metadataStreamIdInternal = dbo.Streams.IdInternal
FROM dbo.Streams
WHERE dbo.Streams.Id = @metadataStreamId;

SELECT TOP(1)
dbo.Messages.JsonData
FROM dbo.Messages
WHERE dbo.Messages.StreamIdInternal = @metadataStreamIdInternal
ORDER BY dbo.Messages.Position DESC;
SELECT dbo.Messages.JsonData
FROM dbo.Messages
WHERE dbo.Messages.Position = (
SELECT dbo.Streams.Position
FROM dbo.Streams
WHERE dbo.Streams.Id = '$$' + @streamId
)
Original file line number Diff line number Diff line change
@@ -1,58 +1,56 @@
BEGIN TRANSACTION CreateStream;

DECLARE @streamIdInternal AS INT;
DECLARE @latestStreamVersion AS INT;
DECLARE @latestStreamPosition AS BIGINT;
DECLARE @latestStreamPosition AS BIGINT;

BEGIN
INSERT INTO dbo.Streams (Id, IdOriginal) VALUES (@streamId, @streamIdOriginal);
SELECT @streamIdInternal = SCOPE_IDENTITY();

INSERT INTO dbo.Messages (StreamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata)
SELECT @streamIdInternal,
StreamVersion,
Id,
Created,
[Type],
JsonData,
JsonMetadata
FROM @newMessages
ORDER BY StreamVersion;

SELECT TOP(1)
@latestStreamVersion = dbo.Messages.StreamVersion,
@latestStreamPosition = dbo.Messages.Position
FROM dbo.Messages
WHERE dbo.Messages.StreamIDInternal = @streamIdInternal
ORDER BY dbo.Messages.Position DESC

IF @latestStreamVersion IS NULL
SET @latestStreamVersion = -1

IF @latestStreamPosition IS NULL
SET @latestStreamPosition = -1

UPDATE dbo.Streams
SET dbo.Streams.[Version] = @latestStreamVersion,
dbo.Streams.[Position] = @latestStreamPosition
WHERE dbo.Streams.IdInternal = @streamIdInternal

INSERT INTO dbo.Streams
(Id, IdOriginal)
VALUES
(@streamId, @streamIdOriginal);

SET @streamIdInternal = SCOPE_IDENTITY();

IF @hasMessages = 1
BEGIN
INSERT INTO dbo.Messages
(StreamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata)
SELECT @streamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata
FROM @newMessages
ORDER BY StreamVersion;

SET @latestStreamPosition = SCOPE_IDENTITY()

SELECT @latestStreamVersion = MAX(StreamVersion)
FROM @newMessages

UPDATE dbo.Streams
SET dbo.Streams.[Version] = @latestStreamVersion,
dbo.Streams.[Position] = @latestStreamPosition
WHERE dbo.Streams.IdInternal = @streamIdInternal
END
ELSE
BEGIN
SET @latestStreamPosition = -1
SET @latestStreamVersion = -1
END

END;

COMMIT TRANSACTION CreateStream;

/* Select CurrentVersion, CurrentPosition */

SELECT currentVersion = @latestStreamVersion, currentPosition = @latestStreamPosition
SELECT currentVersion = @latestStreamVersion, currentPosition = @latestStreamPosition

/* Select Metadata */
DECLARE @metadataStreamId as NVARCHAR(42)
DECLARE @metadataStreamIdInternal as INT
SET @metadataStreamId = '$$' + @streamId

SELECT @metadataStreamIdInternal = dbo.Streams.IdInternal
FROM dbo.Streams
WHERE dbo.Streams.Id = @metadataStreamId;

SELECT TOP(1)
dbo.Messages.JsonData
FROM dbo.Messages
WHERE dbo.Messages.StreamIdInternal = @metadataStreamIdInternal
ORDER BY dbo.Messages.Position DESC;

SELECT dbo.Messages.JsonData
FROM dbo.Messages
WHERE dbo.Messages.Position = (
SELECT dbo.Streams.Position
FROM dbo.Streams
WHERE dbo.Streams.Id = '$$' + @streamId
)
4 changes: 4 additions & 0 deletions src/SqlStreamStore.MsSql/MsSqlStreamStore.AppendStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,14 @@ private async Task<MsSqlAppendResult> AppendToStreamExpectedVersionAny(
var sqlDataRecords = CreateSqlDataRecords(messages);
var eventsParam = CreateNewMessagesSqlParameter(sqlDataRecords);
command.Parameters.Add(eventsParam);
command.Parameters.AddWithValue("hasMessages", true);
}
else
{
// Must use a null value for the table-valued param if there are no records
var eventsParam = CreateNewMessagesSqlParameter(null);
command.Parameters.Add(eventsParam);
command.Parameters.AddWithValue("hasMessages", false);
}

try
Expand Down Expand Up @@ -241,12 +243,14 @@ private async Task<MsSqlAppendResult> AppendToStreamExpectedVersionNoStream(
var sqlDataRecords = CreateSqlDataRecords(messages);
var eventsParam = CreateNewMessagesSqlParameter(sqlDataRecords);
command.Parameters.Add(eventsParam);
command.Parameters.AddWithValue("hasMessages", true);
}
else
{
// Must use a null value for the table-valued param if there are no records
var eventsParam = CreateNewMessagesSqlParameter(null);
command.Parameters.Add(eventsParam);
command.Parameters.AddWithValue("hasMessages", false);
}

try
Expand Down