Skip to content

Commit a139ab0

Browse files
committed
Converted listener's event-based to completer pattern in a session
1 parent 2b3f394 commit a139ab0

6 files changed

Lines changed: 225 additions & 87 deletions

File tree

NETCoreSyncServer/SyncMessages.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,34 +62,34 @@ internal enum PayloadActions
6262

6363
internal class RequestMessage
6464
{
65+
public string Id { get; set; } = Guid.NewGuid().ToString();
6566
public string Action { get; set; } = null!;
66-
public int SchemaVersion { get; set; }
67-
public SyncIdInfo SyncIdInfo { get; set; } = null!;
6867
public Dictionary<string, object?> Payload { get; set; } = null!;
6968

70-
public static RequestMessage FromPayload<T>(int schemaVersion, SyncIdInfo syncIdInfo, T basePayload) where T : BasePayload
69+
public static RequestMessage FromPayload<T>(string id, T basePayload) where T : BasePayload
7170
{
7271
return new RequestMessage()
7372
{
73+
Id = id,
7474
Action = basePayload.Action,
75-
SchemaVersion = schemaVersion,
76-
SyncIdInfo = syncIdInfo,
7775
Payload = basePayload.ToPayload<T>()
7876
};
7977
}
8078
}
8179

8280
internal class ResponseMessage
8381
{
82+
public string Id { get; set; } = null!;
8483
public string Action { get; set; } = null!;
8584
public bool IsOk { get; set; }
8685
public string? ErrorMessage { get; set; }
8786
public Dictionary<string, object?> Payload { get; set; } = null!;
8887

89-
public static ResponseMessage FromPayload<T>(bool isOk, string? errorMessage, T basePayload) where T : BasePayload
88+
public static ResponseMessage FromPayload<T>(string id, bool isOk, string? errorMessage, T basePayload) where T : BasePayload
9089
{
9190
return new ResponseMessage()
9291
{
92+
Id = id,
9393
Action = basePayload.Action,
9494
IsOk = isOk,
9595
ErrorMessage = errorMessage,
@@ -133,6 +133,9 @@ internal class EchoResponsePayload : BasePayload
133133
internal class HandshakeRequestPayload : BasePayload
134134
{
135135
override public string Action => PayloadActions.handshakeRequest.ToString();
136+
137+
public int SchemaVersion { get; set; }
138+
public SyncIdInfo SyncIdInfo { get; set; } = null!;
136139
}
137140

138141
internal class HandshakeResponsePayload : BasePayload

NETCoreSyncServer/SyncMiddleware.cs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,13 @@ public async Task Invoke(HttpContext httpContext, SyncService syncService)
3535

3636
using (WebSocket webSocket = await httpContext.WebSockets.AcceptWebSocketAsync())
3737
{
38-
3938
await RunAsync(webSocket);
4039
}
4140
}
4241

4342
private async Task RunAsync(WebSocket webSocket)
4443
{
44+
Log("Server Opened");
4545
int bufferSize = netCoreSyncServerOptions.SendReceiveBufferSizeInBytes;
4646
while (true)
4747
{
@@ -58,9 +58,9 @@ private async Task RunAsync(WebSocket webSocket)
5858
}
5959
catch (WebSocketException wse)
6060
{
61-
// The remote party closed the WebSocket connection without completing the close handshake.
62-
if (wse.ErrorCode == 333514224)
61+
if (wse.Message == "The remote party closed the WebSocket connection without completing the close handshake.")
6362
{
63+
Log("Server Forced Closed");
6464
return;
6565
}
6666
throw;
@@ -73,6 +73,7 @@ private async Task RunAsync(WebSocket webSocket)
7373
if (result.CloseStatus.HasValue)
7474
{
7575
await webSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None);
76+
Log("Server Normal Closed");
7677
return;
7778
}
7879
if (result.MessageType == WebSocketMessageType.Binary)
@@ -91,17 +92,18 @@ private async Task RunAsync(WebSocket webSocket)
9192
EchoRequestPayload requestPayload = BasePayload.FromPayload<EchoRequestPayload>(request.Payload);
9293
String echoMessage = requestPayload.Message;
9394
EchoResponsePayload responsePayload = new EchoResponsePayload() { Message = echoMessage };
94-
ResponseMessage response = ResponseMessage.FromPayload<EchoResponsePayload>(true, null, responsePayload);
95+
ResponseMessage response = ResponseMessage.FromPayload<EchoResponsePayload>(request.Id, true, null, responsePayload);
9596
responseBytes = await SyncMessages.Compress(response);
9697
}
9798

9899
if (request != null && request.Action == PayloadActions.handshakeRequest.ToString())
99100
{
101+
HandshakeRequestPayload requestPayload = BasePayload.FromPayload<HandshakeRequestPayload>(request.Payload);
100102
HandshakeResponsePayload responsePayload = new HandshakeResponsePayload()
101103
{
102104
OrderedClassNames = new List<string>() { "A", "B", "C" }
103105
};
104-
ResponseMessage response = ResponseMessage.FromPayload<HandshakeResponsePayload>(true, null, responsePayload);
106+
ResponseMessage response = ResponseMessage.FromPayload<HandshakeResponsePayload>(request.Id, true, null, responsePayload);
105107
responseBytes = await SyncMessages.Compress(response);
106108
}
107109

@@ -121,5 +123,10 @@ private async Task RunAsync(WebSocket webSocket)
121123
}
122124
}
123125
}
126+
127+
void Log(string message)
128+
{
129+
System.Diagnostics.Debug.WriteLine(message);
130+
}
124131
}
125132
}

netcoresync_moor/lib/src/netcoresync_client.dart

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import 'client_insert.dart';
1010
import 'client_update.dart';
1111
import 'client_delete.dart';
1212
import 'sync_handler.dart';
13+
import 'sync_session.dart';
1314

1415
mixin NetCoreSyncClient on GeneratedDatabase {
1516
DataAccess? _dataAccess;
@@ -76,16 +77,27 @@ mixin NetCoreSyncClient on GeneratedDatabase {
7677
return dataAccess.syncIdInfo?.allSyncIds ?? "";
7778
}
7879

79-
Future netCoreSyncSynchronize({
80+
Future<void> netCoreSyncSynchronize({
8081
required String url,
8182
Map<String, dynamic> customInfo = const {},
8283
}) async {
8384
if (!netCoreSyncInitialized) throw NetCoreSyncNotInitializedException();
84-
final syncHandler = SyncHandler(dataAccess);
85-
await syncHandler.synchronize(
86-
url: url,
85+
if (dataAccess.syncIdInfo == null) {
86+
throw NetCoreSyncSyncIdInfoNotSetException();
87+
}
88+
if (dataAccess.inTransaction()) {
89+
throw NetCoreSyncMustNotInsideTransactionException();
90+
}
91+
92+
SyncSession syncSession = SyncSession(
93+
syncHandler: SyncHandler(
94+
url: url,
95+
),
96+
dataAccess: dataAccess,
8797
customInfo: customInfo,
8898
);
99+
100+
await syncSession.synchronize();
89101
}
90102

91103
SyncSimpleSelectStatement<T, R> syncSelect<T extends HasResultSet, R>(

netcoresync_moor/lib/src/sync_handler.dart

Lines changed: 92 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -2,70 +2,110 @@ import 'dart:async';
22
import 'package:netcoresync_moor/netcoresync_moor.dart';
33
import 'package:web_socket_channel/web_socket_channel.dart';
44
import 'package:web_socket_channel/status.dart' as socket_channel_status;
5-
import 'package:uuid/uuid.dart';
65
import 'package:enum_to_string/enum_to_string.dart';
76
import 'netcoresync_exceptions.dart';
8-
import 'data_access.dart';
97
import 'sync_messages.dart';
108

119
class SyncHandler {
12-
final DataAccess dataAccess;
10+
final String url;
1311

14-
SyncHandler(this.dataAccess);
12+
late WebSocketChannel _channel;
13+
late StreamSubscription _subscription;
14+
final Map<String, Completer<BasePayload>> _requests = {};
1515

16-
Future synchronize({
17-
required String url,
18-
Map<String, dynamic> customInfo = const {},
19-
}) async {
20-
if (dataAccess.syncIdInfo == null) {
21-
throw NetCoreSyncSyncIdInfoNotSetException();
16+
bool _connected = false;
17+
Completer _completerDone = Completer();
18+
19+
SyncHandler({
20+
required this.url,
21+
});
22+
23+
void _throwIfNotConnected() {
24+
if (!_connected) {
25+
throw NetCoreSyncException("WebSocket is not connected yet");
26+
}
27+
}
28+
29+
void connect() {
30+
if (_connected) {
31+
throw NetCoreSyncException("WebSocket is already connected");
2232
}
23-
if (dataAccess.inTransaction()) {
24-
throw NetCoreSyncMustNotInsideTransactionException();
33+
34+
_channel = WebSocketChannel.connect(Uri.parse(url));
35+
_connected = true;
36+
_completerDone = Completer();
37+
_log("Client Opened");
38+
_subscription = _channel.stream.listen(
39+
(message) => _onData(message),
40+
onDone: () async => await _onDone(),
41+
);
42+
}
43+
44+
Future<void> _onDone() async {
45+
_log("Client Closed");
46+
_connected = false;
47+
await _subscription.cancel();
48+
_requests.clear();
49+
if (!_completerDone.isCompleted) {
50+
_completerDone.complete();
2551
}
52+
}
53+
54+
Future<void> disconnect() async {
55+
_throwIfNotConnected();
56+
await _channel.sink.close(socket_channel_status.goingAway);
57+
await _completerDone.future;
58+
}
2659

27-
var channel = WebSocketChannel.connect(Uri.parse(url));
60+
Future<EchoResponsePayload> echo(
61+
{EchoRequestPayload payload =
62+
const EchoRequestPayload(message: "This is an echo message")}) async {
63+
_throwIfNotConnected();
64+
final request = RequestMessage(
65+
basePayload: payload,
66+
);
67+
final completer = Completer<EchoResponsePayload>();
68+
_requests[request.id] = completer;
69+
_channel.sink.add(SyncMessages.compress(request));
70+
return completer.future;
71+
}
2872

29-
String echoMessage = Uuid().v4();
73+
Future<HandshakeResponsePayload> handshake(
74+
{required HandshakeRequestPayload payload}) async {
75+
_throwIfNotConnected();
3076
final request = RequestMessage(
31-
schemaVersion: dataAccess.database.schemaVersion,
32-
syncIdInfo: dataAccess.syncIdInfo!,
33-
basePayload: EchoRequestPayload(message: echoMessage),
77+
basePayload: payload,
3478
);
35-
channel.sink.add(SyncMessages.compress(request));
36-
37-
StreamSubscription sub = channel.stream.listen((message) async {
38-
ResponseMessage response = SyncMessages.decompress(message);
39-
40-
if (EnumToString.fromString(PayloadActions.values, response.action) ==
41-
PayloadActions.echoResponse) {
42-
EchoResponsePayload payload =
43-
EchoResponsePayload.fromJson(response.payload);
44-
if (payload.message != echoMessage) {
45-
throw NetCoreSyncException(
46-
"Response echoMessage: ${payload.message} is "
47-
"different than Request echoMessage: $echoMessage");
48-
}
49-
50-
final request = RequestMessage(
51-
schemaVersion: dataAccess.database.schemaVersion,
52-
syncIdInfo: dataAccess.syncIdInfo!,
53-
basePayload: HandshakeRequestPayload(),
54-
);
55-
channel.sink.add(SyncMessages.compress(request));
56-
}
57-
58-
if (EnumToString.fromString(PayloadActions.values, response.action) ==
59-
PayloadActions.handshakeResponse) {
60-
HandshakeResponsePayload payload =
61-
HandshakeResponsePayload.fromJson(response.payload);
62-
print(payload.orderedClassNames);
63-
channel.sink.close(socket_channel_status.goingAway);
64-
}
65-
});
66-
67-
await Future.wait([
68-
sub.asFuture(),
69-
]);
79+
final completer = Completer<HandshakeResponsePayload>();
80+
_requests[request.id] = completer;
81+
_channel.sink.add(SyncMessages.compress(request));
82+
return completer.future;
83+
}
84+
85+
void _onData(dynamic message) {
86+
final response = SyncMessages.decompress(message);
87+
88+
BasePayload responsePayload;
89+
90+
if (EnumToString.fromString(PayloadActions.values, response.action) ==
91+
PayloadActions.echoResponse) {
92+
responsePayload = EchoResponsePayload.fromJson(response.payload);
93+
} else if (EnumToString.fromString(
94+
PayloadActions.values, response.action) ==
95+
PayloadActions.handshakeResponse) {
96+
responsePayload = HandshakeResponsePayload.fromJson(response.payload);
97+
} else {
98+
throw NetCoreSyncException(
99+
"Unexpected response action: ${response.action}");
100+
}
101+
102+
if (_requests.containsKey(response.id)) {
103+
_requests[response.id]!.complete(responsePayload);
104+
_requests.remove(response.id);
105+
}
106+
}
107+
108+
void _log(Object? object) {
109+
print(object);
70110
}
71111
}

0 commit comments

Comments
 (0)