@@ -2,70 +2,110 @@ import 'dart:async';
22import 'package:netcoresync_moor/netcoresync_moor.dart' ;
33import 'package:web_socket_channel/web_socket_channel.dart' ;
44import 'package:web_socket_channel/status.dart' as socket_channel_status;
5- import 'package:uuid/uuid.dart' ;
65import 'package:enum_to_string/enum_to_string.dart' ;
76import 'netcoresync_exceptions.dart' ;
8- import 'data_access.dart' ;
97import 'sync_messages.dart' ;
108
119class SyncHandler {
12- final DataAccess dataAccess ;
10+ final String url ;
1311
14- SyncHandler (this .dataAccess);
12+ late WebSocketChannel _channel;
13+ late StreamSubscription _subscription;
14+ final Map <String , Completer <BasePayload >> _requests = {};
1515
16- Future synchronize ({
17- required String url,
18- Map <String , dynamic > customInfo = const {},
19- }) async {
20- if (dataAccess.syncIdInfo == null ) {
21- throw NetCoreSyncSyncIdInfoNotSetException ();
16+ bool _connected = false ;
17+ Completer _completerDone = Completer ();
18+
19+ SyncHandler ({
20+ required this .url,
21+ });
22+
23+ void _throwIfNotConnected () {
24+ if (! _connected) {
25+ throw NetCoreSyncException ("WebSocket is not connected yet" );
26+ }
27+ }
28+
29+ void connect () {
30+ if (_connected) {
31+ throw NetCoreSyncException ("WebSocket is already connected" );
2232 }
23- if (dataAccess.inTransaction ()) {
24- throw NetCoreSyncMustNotInsideTransactionException ();
33+
34+ _channel = WebSocketChannel .connect (Uri .parse (url));
35+ _connected = true ;
36+ _completerDone = Completer ();
37+ _log ("Client Opened" );
38+ _subscription = _channel.stream.listen (
39+ (message) => _onData (message),
40+ onDone: () async => await _onDone (),
41+ );
42+ }
43+
44+ Future <void > _onDone () async {
45+ _log ("Client Closed" );
46+ _connected = false ;
47+ await _subscription.cancel ();
48+ _requests.clear ();
49+ if (! _completerDone.isCompleted) {
50+ _completerDone.complete ();
2551 }
52+ }
53+
54+ Future <void > disconnect () async {
55+ _throwIfNotConnected ();
56+ await _channel.sink.close (socket_channel_status.goingAway);
57+ await _completerDone.future;
58+ }
2659
27- var channel = WebSocketChannel .connect (Uri .parse (url));
60+ Future <EchoResponsePayload > echo (
61+ {EchoRequestPayload payload =
62+ const EchoRequestPayload (message: "This is an echo message" )}) async {
63+ _throwIfNotConnected ();
64+ final request = RequestMessage (
65+ basePayload: payload,
66+ );
67+ final completer = Completer <EchoResponsePayload >();
68+ _requests[request.id] = completer;
69+ _channel.sink.add (SyncMessages .compress (request));
70+ return completer.future;
71+ }
2872
29- String echoMessage = Uuid ().v4 ();
73+ Future <HandshakeResponsePayload > handshake (
74+ {required HandshakeRequestPayload payload}) async {
75+ _throwIfNotConnected ();
3076 final request = RequestMessage (
31- schemaVersion: dataAccess.database.schemaVersion,
32- syncIdInfo: dataAccess.syncIdInfo! ,
33- basePayload: EchoRequestPayload (message: echoMessage),
77+ basePayload: payload,
3478 );
35- channel.sink.add (SyncMessages .compress (request));
36-
37- StreamSubscription sub = channel.stream.listen ((message) async {
38- ResponseMessage response = SyncMessages .decompress (message);
39-
40- if (EnumToString .fromString (PayloadActions .values, response.action) ==
41- PayloadActions .echoResponse) {
42- EchoResponsePayload payload =
43- EchoResponsePayload .fromJson (response.payload);
44- if (payload.message != echoMessage) {
45- throw NetCoreSyncException (
46- "Response echoMessage: ${payload .message } is "
47- "different than Request echoMessage: $echoMessage " );
48- }
49-
50- final request = RequestMessage (
51- schemaVersion: dataAccess.database.schemaVersion,
52- syncIdInfo: dataAccess.syncIdInfo! ,
53- basePayload: HandshakeRequestPayload (),
54- );
55- channel.sink.add (SyncMessages .compress (request));
56- }
57-
58- if (EnumToString .fromString (PayloadActions .values, response.action) ==
59- PayloadActions .handshakeResponse) {
60- HandshakeResponsePayload payload =
61- HandshakeResponsePayload .fromJson (response.payload);
62- print (payload.orderedClassNames);
63- channel.sink.close (socket_channel_status.goingAway);
64- }
65- });
66-
67- await Future .wait ([
68- sub.asFuture (),
69- ]);
79+ final completer = Completer <HandshakeResponsePayload >();
80+ _requests[request.id] = completer;
81+ _channel.sink.add (SyncMessages .compress (request));
82+ return completer.future;
83+ }
84+
85+ void _onData (dynamic message) {
86+ final response = SyncMessages .decompress (message);
87+
88+ BasePayload responsePayload;
89+
90+ if (EnumToString .fromString (PayloadActions .values, response.action) ==
91+ PayloadActions .echoResponse) {
92+ responsePayload = EchoResponsePayload .fromJson (response.payload);
93+ } else if (EnumToString .fromString (
94+ PayloadActions .values, response.action) ==
95+ PayloadActions .handshakeResponse) {
96+ responsePayload = HandshakeResponsePayload .fromJson (response.payload);
97+ } else {
98+ throw NetCoreSyncException (
99+ "Unexpected response action: ${response .action }" );
100+ }
101+
102+ if (_requests.containsKey (response.id)) {
103+ _requests[response.id]! .complete (responsePayload);
104+ _requests.remove (response.id);
105+ }
106+ }
107+
108+ void _log (Object ? object) {
109+ print (object);
70110 }
71111}
0 commit comments