From c1a27f8342230e8215c5b9bdc900a9b1ba72e6d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Seyid=20Ya=C4=9Fmur?= Date: Thu, 17 Oct 2019 14:33:47 +0300 Subject: [PATCH 1/5] connection socket changed from Socket to web_socket_channel --- .vscode/launch.json | 14 +++++++++++++ lib/impl/plugin_vm.dart | 45 +++++++++++++++++++++++++---------------- lib/vm.dart | 32 ++++++++++++++++++++++------- test/echo_ws_test.dart | 2 +- test/local_test.dart | 21 +++++++++++++++++++ 5 files changed, 89 insertions(+), 25 deletions(-) create mode 100644 .vscode/launch.json create mode 100644 test/local_test.dart diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..c078e2b --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,14 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Dart", + "program": "bin/main.dart", + "request": "launch", + "type": "dart" + } + ] +} \ No newline at end of file diff --git a/lib/impl/plugin_vm.dart b/lib/impl/plugin_vm.dart index cb8e932..7967758 100644 --- a/lib/impl/plugin_vm.dart +++ b/lib/impl/plugin_vm.dart @@ -4,40 +4,51 @@ library stomp_impl_plugin_vm; import "dart:async"; -import "dart:io"; - -import "plugin.dart" show BytesStompConnector; +import 'package:web_socket_channel/io.dart'; +import "plugin.dart" show StringStompConnector; +import 'package:web_socket_channel/status.dart' as status; /** The implementation on top of [Socket]. */ -class SocketStompConnector extends BytesStompConnector { - final Socket _socket; +class SocketStompConnector extends StringStompConnector { + final IOWebSocketChannel _socket; + StreamSubscription _listen; SocketStompConnector(this._socket) { _init(); } void _init() { - _socket.listen((List data) { - if (data != null && !data.isEmpty) - onBytes(data); - }, onError: (error, stackTrace) { - onError(error, stackTrace); - }, onDone: () { + _listen = _socket.stream.listen((data) { + if (data != null) { + final String sdata = data.toString(); + if (sdata.isNotEmpty) onString(sdata); + } + }); + + _listen.onError((err) => onError(err, null)); + _listen.onDone(() => onClose()); + + _socket.stream.handleError((error) => onError(error, null)); + + _socket.sink.done.then((v) { onClose(); }); } @override Future close() { - _socket.destroy(); + _listen.cancel(); + _socket.sink.close(status.goingAway); return new Future.value(); } + + @override + Future writeStream_(Stream> stream) + => _socket.sink.addStream(stream); @override - void writeBytes_(List bytes) { - _socket.add(bytes); + void writeString_(String string) { + _socket.sink.add(string); + // TODO: implement writeString_ } - @override - Future writeStream_(Stream> stream) - => _socket.addStream(stream); } diff --git a/lib/vm.dart b/lib/vm.dart index 5a6b99c..3dc7eb4 100644 --- a/lib/vm.dart +++ b/lib/vm.dart @@ -4,9 +4,8 @@ library stomp_vm; import "dart:async"; -import "dart:io"; - import "stomp.dart" show StompClient; +import 'package:web_socket_channel/io.dart'; import "impl/plugin_vm.dart" show SocketStompConnector; /** Connects a STOMP server, and instantiates a [StompClient] @@ -37,8 +36,27 @@ Future connect(address, {int port: 61626, void onDisconnect(StompClient client), void onError(StompClient client, String message, String detail, Map headers), void onFault(StompClient client, error, stackTrace)}) -=> Socket.connect(address, port).then((Socket socket) - => StompClient.connect(new SocketStompConnector(socket), - host: host, login: login, passcode: passcode, heartbeat: heartbeat, - onConnect: onConnect, onDisconnect: onDisconnect, - onError: onError, onFault: onFault)); +async => connectWith(await IOWebSocketChannel.connect(address), + host: host, + login: login, + passcode: passcode, + heartbeat: heartbeat, + onConnect: onConnect, + onDisconnect: onDisconnect, + onError: onError, + onFault: onFault); + +Future connectWith(IOWebSocketChannel channel, + {String host, + String login, + String passcode, + List heartbeat, + void onConnect(StompClient client, Map headers), + void onDisconnect(StompClient client), + void onError(StompClient client, String message, String detail, + Map headers), + void onFault(StompClient client, error, stackTrace)})=> + StompClient.connect(new SocketStompConnector(channel), + host: host, login: login, passcode: passcode, heartbeat: heartbeat, + onConnect: onConnect, onDisconnect: onDisconnect, + onError: onError, onFault: onFault); diff --git a/test/echo_ws_test.dart b/test/echo_ws_test.dart index 55c4fd5..ec3af6c 100644 --- a/test/echo_ws_test.dart +++ b/test/echo_ws_test.dart @@ -7,7 +7,7 @@ import "dart:html"; import "dart:async"; import 'package:test/test.dart'; -import 'package:stomp/webSocket.dart' show connect; +import 'package:stomp/websocket.dart' show connect; part "_echo_test.dart"; diff --git a/test/local_test.dart b/test/local_test.dart new file mode 100644 index 0000000..b39e996 --- /dev/null +++ b/test/local_test.dart @@ -0,0 +1,21 @@ +//Copyright (C) 2013 Potix Corporation. All Rights Reserved. +//History: Fri, Aug 09, 2013 11:20:46 AM +// Author: tomyeh +library echo_test; + +import "dart:async"; +import "dart:io"; +import 'package:test/test.dart'; + +import 'package:stomp/vm.dart' show connect; + +part "_echo_test.dart"; + +void main() { + final address = "ws://192.168.1.79:8088/ws"; + testEcho(address) + .catchError((ex) { + print("Unable to connect $address\n" + "Check if the server has been started\n\nCause:\n$ex"); + }, test: (ex) => ex is SocketException); +} From 09e026e1137bacb0ccd72eb7651436dc292f4989 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Seyid=20Ya=C4=9Fmur?= Date: Thu, 17 Oct 2019 16:20:55 +0300 Subject: [PATCH 2/5] customHeader params added to connect function --- lib/src/stomp_impl.dart | 4 ++++ lib/stomp.dart | 3 ++- lib/vm.dart | 6 ++++-- lib/websocket.dart | 4 ++++ test/_echo_test.dart | 9 +++++---- test/echo_vm_test.dart | 2 +- test/echo_ws_test.dart | 2 +- test/local_test.dart | 6 +++++- 8 files changed, 26 insertions(+), 10 deletions(-) diff --git a/lib/src/stomp_impl.dart b/lib/src/stomp_impl.dart index 93149df..abea5f4 100644 --- a/lib/src/stomp_impl.dart +++ b/lib/src/stomp_impl.dart @@ -81,6 +81,7 @@ class _StompClient implements StompClient { static Future connect( StompConnector connector, String host, + Map customHeaders, String login, String passcode, List heartbeat, @@ -105,6 +106,9 @@ class _StompClient implements StompClient { } else { client.heartbeat[0] = client.heartbeat[1] = 0; } + if(customHeaders != null) headers.addAll(customHeaders); + print("headers"); + print(headers); writeSimpleFrame(connector, STOMP, headers); return client._connecting.future; diff --git a/lib/stomp.dart b/lib/stomp.dart index 1e55d93..91722c7 100644 --- a/lib/stomp.dart +++ b/lib/stomp.dart @@ -79,6 +79,7 @@ abstract class StompClient { */ static Future connect(StompConnector connector, {String host, + Map customHeaders, String login, String passcode, List heartbeat, @@ -91,7 +92,7 @@ abstract class StompClient { throw new ArgumentError( "Required: connector. Use stomp_vm's connect() instead."); - return _StompClient.connect(connector, host, login, passcode, heartbeat, + return _StompClient.connect(connector, host, customHeaders,login, passcode, heartbeat, onConnect, onDisconnect, onError, onFault); } diff --git a/lib/vm.dart b/lib/vm.dart index 3dc7eb4..f4aa20d 100644 --- a/lib/vm.dart +++ b/lib/vm.dart @@ -31,13 +31,14 @@ import "impl/plugin_vm.dart" show SocketStompConnector; * * [onFault] -- callback when an exception is received. */ Future connect(address, {int port: 61626, - String host, String login, String passcode, List heartbeat, + String host,Map customHeaders, String login, String passcode, List heartbeat, void onConnect(StompClient client, Map headers), void onDisconnect(StompClient client), void onError(StompClient client, String message, String detail, Map headers), void onFault(StompClient client, error, stackTrace)}) async => connectWith(await IOWebSocketChannel.connect(address), host: host, + customHeaders: customHeaders, login: login, passcode: passcode, heartbeat: heartbeat, @@ -48,6 +49,7 @@ async => connectWith(await IOWebSocketChannel.connect(address), Future connectWith(IOWebSocketChannel channel, {String host, + Map customHeaders, String login, String passcode, List heartbeat, @@ -57,6 +59,6 @@ Future connectWith(IOWebSocketChannel channel, Map headers), void onFault(StompClient client, error, stackTrace)})=> StompClient.connect(new SocketStompConnector(channel), - host: host, login: login, passcode: passcode, heartbeat: heartbeat, + host: host,customHeaders: customHeaders, login: login, passcode: passcode, heartbeat: heartbeat, onConnect: onConnect, onDisconnect: onDisconnect, onError: onError, onFault: onFault); diff --git a/lib/websocket.dart b/lib/websocket.dart index 60c99f0..843af63 100644 --- a/lib/websocket.dart +++ b/lib/websocket.dart @@ -31,6 +31,7 @@ import "impl/plugin.dart" show StringStompConnector; */ Future connect(String url, {String host, + Map customHeaders, String login, String passcode, List heartbeat, @@ -41,6 +42,7 @@ Future connect(String url, void onFault(StompClient client, error, stackTrace)}) => connectWith(new WebSocket(url), host: host, + customHeaders: customHeaders, login: login, passcode: passcode, heartbeat: heartbeat, @@ -56,6 +58,7 @@ Future connect(String url, */ Future connectWith(WebSocket socket, {String host, + Map customHeaders, String login, String passcode, List heartbeat, @@ -67,6 +70,7 @@ Future connectWith(WebSocket socket, _WSStompConnector.startWith(socket).then((_WSStompConnector connector) => StompClient.connect(connector, host: host, + customHeaders: customHeaders, login: login, passcode: passcode, heartbeat: heartbeat, diff --git a/test/_echo_test.dart b/test/_echo_test.dart index 623e879..2475c1e 100644 --- a/test/_echo_test.dart +++ b/test/_echo_test.dart @@ -6,8 +6,8 @@ part of echo_test; /** It is part of both echo_vm_test.dart and echo_ws_test.dart * so we can test it on both VM and browser. */ -Future testEcho(address) -=> connect(address, onDisconnect: (_) { +Future testEcho({address,headers}) +=> connect(address,customHeaders: headers, onDisconnect: (_) { print("Disconnected"); }).then((client) { test("echo test", () { @@ -15,7 +15,7 @@ Future testEcho(address) final List sends = ["1. apple", "2. orange\nand 2nd line", "3. mango"]; final List sendExtraHeader = ["123", "abc:", "xyz"]; final List receives = [], receiveExtraHeader = []; - +/* client.subscribeString("0", destination, (headers, message) { //print("< customHeaders = new LinkedHashMap(); + customHeaders["userid"]="D7t7G8989y3"; + customHeaders["platform"]="mobile"; + testEcho(address: address,headers: customHeaders) .catchError((ex) { print("Unable to connect $address\n" "Check if the server has been started\n\nCause:\n$ex"); From 1fe7ec7b46c05fed6787758e2786bd6a38084a9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Seyid=20Ya=C4=9Fmur?= Date: Thu, 17 Oct 2019 16:22:55 +0300 Subject: [PATCH 3/5] removed unnecessary log --- lib/src/stomp_impl.dart | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/src/stomp_impl.dart b/lib/src/stomp_impl.dart index abea5f4..815ce83 100644 --- a/lib/src/stomp_impl.dart +++ b/lib/src/stomp_impl.dart @@ -107,8 +107,6 @@ class _StompClient implements StompClient { client.heartbeat[0] = client.heartbeat[1] = 0; } if(customHeaders != null) headers.addAll(customHeaders); - print("headers"); - print(headers); writeSimpleFrame(connector, STOMP, headers); return client._connecting.future; From 00a223dc485963dcdfe4f4caebd9b3073b2d0529 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Seyid=20Ya=C4=9Fmur?= Date: Mon, 28 Oct 2019 17:31:21 +0300 Subject: [PATCH 4/5] implemented heartbeat --- lib/src/impl/util_write.dart | 7 +++++-- lib/src/stomp_impl.dart | 4 +++- lib/src/stomp_util.dart | 21 +++++++++++++++++++++ test/_echo_test.dart | 12 ++++++++---- test/echo_vm_test.dart | 1 + test/local_test.dart | 4 ++-- 6 files changed, 40 insertions(+), 9 deletions(-) diff --git a/lib/src/impl/util_write.dart b/lib/src/impl/util_write.dart index 12da1ad..e57bf5f 100644 --- a/lib/src/impl/util_write.dart +++ b/lib/src/impl/util_write.dart @@ -5,7 +5,7 @@ part of stomp_impl_util; //Commands// const String CONNECT = "CONNECT"; -const String STOMP = "STOMP"; +const String STOMP = "CONNECT"; const String CONNECTED = "CONNECTED"; const String DISCONNECT = "DISCONNECT"; const String SEND = "SEND"; @@ -57,7 +57,6 @@ void writeDataFrame(StompConnector connector, String command, Map headers, String string, [List bytes]) { writeHeaders(connector, command, headers, endOfHeaders: false); - if (headers == null || headers[CONTENT_LENGTH] == null) { int len = 0; if (bytes != null) { @@ -78,6 +77,10 @@ void writeDataFrame(StompConnector connector, String command, connector.writeEof(); } +void pongMessage(StompConnector connector) { + writeSimpleFrame(connector,SEND,null); +} + ///Write a frame from the given stream Future writeStreamFrame(StompConnector connector, String command, Map headers, Stream> stream) { diff --git a/lib/src/stomp_impl.dart b/lib/src/stomp_impl.dart index 815ce83..7521644 100644 --- a/lib/src/stomp_impl.dart +++ b/lib/src/stomp_impl.dart @@ -54,7 +54,7 @@ class _StompClient implements StompClient { final _DisconnectCallback _onDisconnect; final _ErrorCallback _onError; final _FaultCallback _onFault; - + DateTime lastMessageDate = new DateTime.now(); /// final Map _subscribers = new HashMap(); @@ -129,9 +129,11 @@ class _StompClient implements StompClient { _connector ..onBytes = (List data) { + lastMessageDate = DateTime.now(); _parser.addBytes(data); } ..onString = (String data) { + lastMessageDate = DateTime.now(); _parser.addString(data); } ..onError = (error, stackTrace) { diff --git a/lib/src/stomp_util.dart b/lib/src/stomp_util.dart index 747780f..101e416 100644 --- a/lib/src/stomp_util.dart +++ b/lib/src/stomp_util.dart @@ -79,6 +79,27 @@ void _handleHeartbeat(_StompClient client, String heartbeat) { sy = int.parse(heartbeat.substring(i + 1)); client.heartbeat[0] = _calcHeartbeat(client.heartbeat[0], sy); client.heartbeat[1] = _calcHeartbeat(client.heartbeat[1], sx); + final int ttlOutgoing = client.heartbeat[0]; + if (ttlOutgoing != 0) { + Timer.periodic(new Duration(milliseconds: ttlOutgoing), (_) { + if (!client.isDisconnected) { + //client.sendString("", ""); + print("pong"); + pongMessage(client._connector); + } + }); + } + final int ttlIncoming = client.heartbeat[1]; + if (ttlIncoming != 0) { + Timer.periodic(new Duration(milliseconds: ttlIncoming), (_) { + int delta = new DateTime.now() + .difference(client.lastMessageDate) + .inMilliseconds; + if (delta > (ttlIncoming * 2)) { + client.disconnect(); + } + }); + } } catch (ex) { // ignore silently } diff --git a/test/_echo_test.dart b/test/_echo_test.dart index 2475c1e..baf0806 100644 --- a/test/_echo_test.dart +++ b/test/_echo_test.dart @@ -6,11 +6,15 @@ part of echo_test; /** It is part of both echo_vm_test.dart and echo_ws_test.dart * so we can test it on both VM and browser. */ -Future testEcho({address,headers}) -=> connect(address,customHeaders: headers, onDisconnect: (_) { - print("Disconnected"); +Future testEcho({address,headers,heartbeat}) +=> connect(address,customHeaders: headers,heartbeat: heartbeat, +onConnect: ( client, Map headers){ +} , onDisconnect: (_) { + print("disconnect"); +},onError: ( client, String message, String detail, Map headers){ + }).then((client) { - test("echo test", () { + test("echo test", () { final String destination = "/foo"; final List sends = ["1. apple", "2. orange\nand 2nd line", "3. mango"]; final List sendExtraHeader = ["123", "abc:", "xyz"]; diff --git a/test/echo_vm_test.dart b/test/echo_vm_test.dart index 505702b..f9e6e55 100644 --- a/test/echo_vm_test.dart +++ b/test/echo_vm_test.dart @@ -5,6 +5,7 @@ library echo_test; import "dart:async"; import "dart:io"; +import 'package:stomp/stomp.dart'; import 'package:test/test.dart'; import 'package:stomp/vm.dart' show connect; diff --git a/test/local_test.dart b/test/local_test.dart index 6d3d2d0..59a3a21 100644 --- a/test/local_test.dart +++ b/test/local_test.dart @@ -13,11 +13,11 @@ import 'package:stomp/vm.dart' show connect; part "_echo_test.dart"; void main() { - final address = "ws://192.168.1.79:8088/ws"; + final address = "ws://192.168.1.110:8088/ws"; Map customHeaders = new LinkedHashMap(); customHeaders["userid"]="D7t7G8989y3"; customHeaders["platform"]="mobile"; - testEcho(address: address,headers: customHeaders) + testEcho(address: address,headers: customHeaders,heartbeat: [10000,10000]) .catchError((ex) { print("Unable to connect $address\n" "Check if the server has been started\n\nCause:\n$ex"); From 40511b692884c838de5838e8dc85d54dbf121604 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Seyid=20Ya=C4=9Fmur?= Date: Wed, 30 Oct 2019 14:15:34 +0300 Subject: [PATCH 5/5] added disconnect function for not coming heartbeat messages --- lib/src/stomp_impl.dart | 2 +- lib/src/stomp_util.dart | 12 +++++++++--- test/local_test.dart | 2 +- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/lib/src/stomp_impl.dart b/lib/src/stomp_impl.dart index 7521644..68ee9b3 100644 --- a/lib/src/stomp_impl.dart +++ b/lib/src/stomp_impl.dart @@ -143,6 +143,7 @@ class _StompClient implements StompClient { _disconnected = true; _subscribers.clear(); _receipts.clear(); + cleanTimers(); if (_onDisconnect != null) _onDisconnect(this); }; } @@ -159,7 +160,6 @@ class _StompClient implements StompClient { Future disconnect({String receipt}) { _checkSend(); _disconnected = true; - Completer completer; Map headers; diff --git a/lib/src/stomp_util.dart b/lib/src/stomp_util.dart index 101e416..67c6380 100644 --- a/lib/src/stomp_util.dart +++ b/lib/src/stomp_util.dart @@ -5,6 +5,7 @@ part of stomp; const int _SUB_BYTES = 0, _SUB_STRING = 1, _SUB_JSON = 2, _SUB_BLOB = 3; +Timer outgoingTimer,incomingTimer; ///The information of a subscriber class _Subscriber { final String id; @@ -81,9 +82,8 @@ void _handleHeartbeat(_StompClient client, String heartbeat) { client.heartbeat[1] = _calcHeartbeat(client.heartbeat[1], sx); final int ttlOutgoing = client.heartbeat[0]; if (ttlOutgoing != 0) { - Timer.periodic(new Duration(milliseconds: ttlOutgoing), (_) { + outgoingTimer = Timer.periodic(new Duration(milliseconds: ttlOutgoing), (_) { if (!client.isDisconnected) { - //client.sendString("", ""); print("pong"); pongMessage(client._connector); } @@ -91,7 +91,7 @@ void _handleHeartbeat(_StompClient client, String heartbeat) { } final int ttlIncoming = client.heartbeat[1]; if (ttlIncoming != 0) { - Timer.periodic(new Duration(milliseconds: ttlIncoming), (_) { + incomingTimer = Timer.periodic(new Duration(milliseconds: ttlIncoming), (_) { int delta = new DateTime.now() .difference(client.lastMessageDate) .inMilliseconds; @@ -105,5 +105,11 @@ void _handleHeartbeat(_StompClient client, String heartbeat) { } } } +cleanTimers(){ + if(outgoingTimer != null && outgoingTimer.isActive) + outgoingTimer.cancel(); + if(incomingTimer != null && incomingTimer.isActive) + incomingTimer.cancel(); +} int _calcHeartbeat(int a, int b) => a == 0 || b == 0 ? 0 : max(a, b); diff --git a/test/local_test.dart b/test/local_test.dart index 59a3a21..f9ee427 100644 --- a/test/local_test.dart +++ b/test/local_test.dart @@ -13,7 +13,7 @@ import 'package:stomp/vm.dart' show connect; part "_echo_test.dart"; void main() { - final address = "ws://192.168.1.110:8088/ws"; + final address = "ws://192.168.1.2:8088/ws"; Map customHeaders = new LinkedHashMap(); customHeaders["userid"]="D7t7G8989y3"; customHeaders["platform"]="mobile";