diff --git a/lib/pages/live_room/controller.dart b/lib/pages/live_room/controller.dart index f05c7cb19..4265870eb 100644 --- a/lib/pages/live_room/controller.dart +++ b/lib/pages/live_room/controller.dart @@ -259,6 +259,8 @@ class LiveRoomController extends GetxController { cancelLiveTimer(); savedDanmaku?.clear(); savedDanmaku = null; + msgStream?.close(); + msgStream = null; scrollController ..removeListener(listener) ..dispose(); diff --git a/lib/pages/live_room/view.dart b/lib/pages/live_room/view.dart index 271854827..1b307521d 100644 --- a/lib/pages/live_room/view.dart +++ b/lib/pages/live_room/view.dart @@ -85,9 +85,6 @@ class _LiveRoomPageState extends State WidgetsBinding.instance.removeObserver(this); ScreenBrightness.instance.resetApplicationScreenBrightness(); PlPlayerController.setPlayCallBack(null); - _liveRoomController - ..msgStream?.close() - ..msgStream = null; plPlayerController ..removeStatusLister(playerListener) ..dispose(); diff --git a/lib/tcp/live.dart b/lib/tcp/live.dart index 5475b1ea2..3578a4925 100644 --- a/lib/tcp/live.dart +++ b/lib/tcp/live.dart @@ -5,7 +5,9 @@ import 'dart:typed_data'; import 'package:PiliPlus/services/loggeer.dart'; import 'package:brotli/brotli.dart'; +import 'package:flutter/foundation.dart' show kDebugMode; import 'package:flutter_smart_dialog/flutter_smart_dialog.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; class PackageHeader { int totalSize; @@ -154,7 +156,7 @@ class LiveMessageStream { String streamToken; int roomId, uid; List servers; - List eventListeners = []; + final List _eventListeners = []; LiveMessageStream({ required this.streamToken, required this.roomId, @@ -162,9 +164,10 @@ class LiveMessageStream { required this.servers, }); - WebSocket? socket; + bool _active = true; + WebSocketChannel? _channel; StreamSubscription? _socketSubscription; - bool heartBeat = true; + Timer? _timer; PiliLogger logger = getLogger(); final String logTag = "LiveStreamService"; @@ -190,20 +193,27 @@ class LiveMessageStream { // final marshaledData = authPackage.marshal(); // logger.d(marshaledData); try { - Future getSocket() async { + Future getSocket() async { for (final server in servers) { try { - return WebSocket.connect(server); + final channel = WebSocketChannel.connect(Uri.parse(server)); + await channel.ready; + return channel; } catch (_) {} } throw Exception("all servers connect failed"); } - socket = await getSocket(); + _channel = await getSocket(); + if (!_active) { + if (kDebugMode) logger.i("$logTag init inactive $hashCode"); + close(); + return; + } // logger // ..d('$logTag ===> TCP连接建立') // ..d('$logTag ===> 发送认证包'); - _socketSubscription = socket?.listen( + _socketSubscription = _channel?.stream.listen( (data) { PackageHeader? header = PackageHeader.fromBytesData(data); if (header != null) { @@ -230,15 +240,15 @@ class LiveMessageStream { } _processingData(decompressedData); } catch (e) { - logger.i(e); + if (kDebugMode) logger.i(e); } } }, ); - socket?.add(authPackage.marshal()); + _channel?.sink.add(authPackage.marshal()); } catch (e) { SmartDialog.showToast("弹幕地址链接失败"); - // logger.i('$logTag ===> TCP连接失败: $e'); + if (kDebugMode) logger.i('$logTag ===> TCP连接失败: $e'); } } @@ -251,7 +261,7 @@ class LiveMessageStream { var msgBody = utf8.decode( data.sublist(subHeader.headerSize, subHeader.totalSize), ); - for (var f in eventListeners) { + for (var f in _eventListeners) { f(jsonDecode(msgBody)); } if (subHeader.totalSize < data.length) { @@ -259,16 +269,24 @@ class LiveMessageStream { } } } catch (e) { - logger.i('ParseHeader错误: $e'); + if (kDebugMode) logger.i('ParseHeader错误: $e'); } } Future _heartBeat() async { - logger.i("$logTag 直播间信息流认证成功"); + if (!_active) { + if (kDebugMode) logger.i("$logTag init heartBeat inactive $hashCode"); + return; + } + if (kDebugMode) logger.i("$logTag 直播间信息流认证成功 $hashCode"); int heartBeatCount = 1; - while (heartBeat) { - await Future.delayed(const Duration(seconds: 30)); - //发送心跳包 + _timer ??= Timer.periodic(const Duration(seconds: 30), (timer) { + if (!_active) { + if (kDebugMode) logger.i("$logTag heartBeat inactive $hashCode"); + timer.cancel(); + return; + } + if (kDebugMode) logger.i("$logTag heartBeat $hashCode"); var package = HeartbeatPackage( header: PackageHeader( totalSize: 0, @@ -279,20 +297,24 @@ class LiveMessageStream { ), ); try { - socket?.add(package.marshal()); - } catch (_) {} + _channel?.sink.add(package.marshal()); + } catch (_) { + timer.cancel(); + } heartBeatCount++; - } + }); } void addEventListener(void Function(dynamic) func) { - eventListeners.add(func); + _eventListeners.add(func); } void close() { - heartBeat = false; - eventListeners.clear(); + _active = false; + if (kDebugMode) logger.i("$logTag close $hashCode"); + _timer?.cancel(); + _eventListeners.clear(); _socketSubscription?.cancel(); - socket?.close(); + _channel?.sink.close(); } } diff --git a/pubspec.lock b/pubspec.lock index 20a8a69ab..af5697c61 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -2048,7 +2048,7 @@ packages: source: hosted version: "1.0.1" web_socket_channel: - dependency: transitive + dependency: "direct main" description: name: web_socket_channel sha256: d645757fb0f4773d602444000a8131ff5d48c9e47adfe9772652dd1a4f2d45c8 diff --git a/pubspec.yaml b/pubspec.yaml index 1c87c1a21..894be61b9 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -200,6 +200,7 @@ dependencies: url: https://github.com/bggRGjQaUbCoE/flutter_sortable_wrap.git ref: master crclib: ^3.0.0 + web_socket_channel: ^3.0.3 vector_math: any fixnum: any