refa: download video (#1737)

* opt: save pb danmaku

* refa: download video

* opt: replaceAll

* fix: wait delete

* opt: remove completer

* fix: index.json

* tweaks

Signed-off-by: bggRGjQaUbCoE <githubaccount56556@proton.me>

---------

Co-authored-by: bggRGjQaUbCoE <githubaccount56556@proton.me>
This commit is contained in:
My-Responsitories
2025-11-12 19:12:17 +08:00
committed by GitHub
parent 37b1228552
commit 407b31c5c1
16 changed files with 345 additions and 410 deletions

View File

@@ -1,4 +1,4 @@
import 'dart:async' show Completer, StreamSubscription;
import 'dart:async';
import 'dart:io';
import 'package:PiliPlus/http/init.dart';
@@ -9,186 +9,97 @@ import 'package:dio/dio.dart';
class DownloadManager {
final String url;
final String path;
final Function({required int progress, required int total}) onTaskRunning;
final Function() onTaskComplete;
final Function({
required int progress,
required int total,
required Object error,
})
onTaskError;
final void Function(int, int)? onReceiveProgress;
final void Function([Object? error]) onDone;
bool _closed = false;
DownloadStatus _status = DownloadStatus.wait;
DownloadStatus _status = DownloadStatus.downloading;
DownloadStatus get status => _status;
CancelToken? _cancelToken;
Completer? _completer;
final _cancelToken = CancelToken();
late Future<void> task;
DownloadManager({
required this.url,
required this.path,
required this.onTaskRunning,
required this.onTaskComplete,
required this.onTaskError,
});
void _complete() {
if (_completer?.isCompleted == false) {
_completer?.complete();
}
required this.onReceiveProgress,
required this.onDone,
}) {
task = _start();
}
Future<void> start() async {
_completer = Completer();
_cancelToken = CancelToken();
_status = DownloadStatus.downloading;
Future<void> _start() async {
int received;
final file = File(path);
// If the file already exists, the method fails.
if (!file.existsSync()) {
if (file.existsSync()) {
received = await file.length();
} else {
file.createSync(recursive: true);
received = 0;
}
final int downloadedSize = await file.length();
// Shouldn't call file.writeAsBytesSync(list, flush: flush),
// because it can write all bytes by once. Consider that the file is
// a very big size (up to 1 Gigabytes), it will be expensive in memory.
RandomAccessFile raf = file.openSync(
mode: downloadedSize == 0 ? FileMode.write : FileMode.append,
final sink = file.openWrite(
mode: received == 0 ? FileMode.writeOnly : FileMode.writeOnlyAppend,
);
Future<void>? asyncWrite;
Future<void> closeAndDelete({bool delete = false}) async {
if (!_closed) {
_closed = true;
await asyncWrite;
await raf.close().catchError((_) => raf);
Future<void> onError(Object e, {bool delete = false}) async {
try {
await sink.close();
} catch (_) {}
if (_status == DownloadStatus.downloading) {
_status = DownloadStatus.failDownload;
if (delete && file.existsSync()) {
await file.delete().catchError((_) => file);
await file.tryDel();
}
}
onDone(e);
}
final Response<ResponseBody> response;
Response<ResponseBody> response;
try {
response = await Request.dio.get<ResponseBody>(
url.http2https,
options: Options(
headers: {'range': 'bytes=$downloadedSize-'},
headers: {'range': 'bytes=$received-'},
responseType: ResponseType.stream,
validateStatus: (status) {
return status == 416 ||
(status != null && status >= 200 && status < 300);
},
validateStatus: (status) =>
status != null &&
(status == 416 || (status >= 200 && status < 300)),
),
cancelToken: _cancelToken,
);
} on DioException catch (e) {
final isFailed = e.response?.statusCode != 416;
if (isFailed) {
_status = DownloadStatus.failDownload;
onTaskError(progress: 0, total: 0, error: e);
} else {
_status = DownloadStatus.completed;
onTaskComplete();
}
closeAndDelete(delete: isFailed).whenComplete(_complete);
await onError(e, delete: true);
return;
}
final data = response.data!;
final contentLength = data.contentLength + received;
int received = downloadedSize;
// Stream<Uint8List>
final stream = response.data!.stream;
final total =
int.parse(response.headers.value(Headers.contentLengthHeader) ?? '0') +
downloadedSize;
if (downloadedSize == 0) {
onTaskRunning(progress: 0, total: total);
if (received == 0) {
onReceiveProgress?.call(0, contentLength);
}
late StreamSubscription subscription;
subscription = stream.listen(
(data) {
subscription.pause();
// Write file asynchronously
asyncWrite = raf
.writeFrom(data)
.then((result) async {
// Notify progress
received += data.length;
onTaskRunning(progress: received, total: total);
raf = result;
if (_cancelToken != null && !_cancelToken!.isCancelled) {
subscription.resume();
}
})
.catchError((Object e) async {
try {
await subscription.cancel().catchError((_) {});
_closed = true;
await raf.close().catchError((_) => raf);
if (file.existsSync()) {
await file.delete().catchError((_) => file);
}
} catch (e) {
_status = DownloadStatus.failDownload;
onTaskError(progress: received, total: total, error: e);
} finally {
_complete();
}
});
},
onDone: () async {
try {
await asyncWrite;
_closed = true;
await raf.close().catchError((_) => raf);
_status = DownloadStatus.completed;
onTaskComplete();
} catch (e) {
_status = DownloadStatus.failDownload;
onTaskError(progress: received, total: total, error: e);
} finally {
_complete();
}
},
onError: (e) async {
try {
await closeAndDelete(delete: true);
} catch (e) {
_cancel();
_status = DownloadStatus.failDownload;
onTaskError(progress: received, total: total, error: e);
} finally {
_complete();
}
},
cancelOnError: true,
);
_cancelToken?.whenCancel.then((_) async {
await subscription.cancel();
await closeAndDelete();
_complete();
});
}
Future<void>? _cancel() {
if (_cancelToken != null) {
_cancelToken?.cancel();
_cancelToken = null;
try {
await for (final chunk in data.stream) {
sink.add(chunk);
received += chunk.length;
onReceiveProgress?.call(received, contentLength);
}
await sink.close();
_status = DownloadStatus.completed;
onDone();
} catch (e) {
await onError(e);
return;
}
return _completer?.future;
}
Future<void>? cancel({required bool isDelete}) {
Future<void> cancel({required bool isDelete}) {
if (!isDelete && _status == DownloadStatus.downloading) {
_status = DownloadStatus.pause;
}
return _cancel();
if (!_cancelToken.isCancelled) {
_cancelToken.cancel();
}
return task;
}
}

View File

@@ -1,7 +1,8 @@
import 'dart:async';
import 'dart:convert' show jsonDecode, utf8;
import 'dart:io' show Directory, File, FileSystemEntity;
import 'dart:convert' show jsonDecode, jsonEncode;
import 'dart:io' show Directory, File;
import 'package:PiliPlus/grpc/dm.dart';
import 'package:PiliPlus/http/download.dart';
import 'package:PiliPlus/http/init.dart';
import 'package:PiliPlus/models/common/video/video_quality.dart';
@@ -12,14 +13,13 @@ import 'package:PiliPlus/models_new/pgc/pgc_info_model/result.dart';
import 'package:PiliPlus/models_new/video/video_detail/data.dart';
import 'package:PiliPlus/models_new/video/video_detail/episode.dart' as ugc;
import 'package:PiliPlus/models_new/video/video_detail/page.dart';
import 'package:PiliPlus/pages/danmaku/controller.dart';
import 'package:PiliPlus/services/download/download_manager.dart';
import 'package:PiliPlus/utils/extension.dart';
import 'package:PiliPlus/utils/id_utils.dart';
import 'package:PiliPlus/utils/path_utils.dart';
import 'package:PiliPlus/utils/utils.dart';
import 'package:archive/archive.dart' show Inflate;
import 'package:dio/dio.dart' show Options, ResponseType;
import 'package:flutter/foundation.dart';
import 'package:flutter_cache_manager/flutter_cache_manager.dart';
import 'package:flutter_smart_dialog/flutter_smart_dialog.dart';
import 'package:get/get.dart';
import 'package:path/path.dart' as path;
@@ -30,12 +30,10 @@ import 'package:synchronized/synchronized.dart';
class DownloadService extends GetxService {
static const _entryFile = 'entry.json';
static const _indexFile = 'index.json';
static const _danmakuFile = 'danmaku.xml';
static const _coverFile = 'cover.jpg';
final _lock = Lock();
final downloaFlag = RxnInt();
final flagNotifier = <void Function()>{};
final waitDownloadQueue = <BiliDownloadEntryInfo>[];
final downloadList = RxList<BiliDownloadEntryInfo>();
@@ -51,41 +49,40 @@ class DownloadService extends GetxService {
DownloadManager? _downloadManager;
DownloadManager? _audioDownloadManager;
Completer? _completer;
Future<void>? get waitForInitialization => _completer?.future;
late Future<void> waitForInitialization;
@override
void onInit() {
super.onInit();
readDownloadList();
initDownloadList();
}
Future<void> readDownloadList() async {
_completer = Completer();
void initDownloadList() {
waitForInitialization = _readDownloadList();
}
Future<void> _readDownloadList() async {
final downloadDir = Directory(await _getDownloadPath());
final list = <BiliDownloadEntryInfo>[];
for (final dir in downloadDir.listSync()) {
await for (final dir in downloadDir.list()) {
if (dir is Directory) {
list.addAll(await _readDownloadDirectory(dir));
}
}
downloadList.value = list
..sort((a, b) => b.timeUpdateStamp.compareTo(a.timeUpdateStamp));
if (!_completer!.isCompleted) {
_completer!.complete();
}
}
Future<List<BiliDownloadEntryInfo>> _readDownloadDirectory(
FileSystemEntity pageDir,
Directory pageDir,
) async {
final result = <BiliDownloadEntryInfo>[];
if (!pageDir.existsSync() || pageDir is! Directory) {
if (!pageDir.existsSync()) {
return result;
}
for (final entryDir in pageDir.listSync()) {
await for (final entryDir in pageDir.list()) {
if (entryDir is Directory) {
final entryFile = File(path.join(entryDir.path, _entryFile));
if (entryFile.existsSync()) {
@@ -96,7 +93,7 @@ class DownloadService extends GetxService {
..entryDirPath = entryDir.path;
result.add(entry);
if (!entry.isCompleted) {
waitDownloadQueue.add(entry);
waitDownloadQueue.add(entry..status = DownloadStatus.wait);
}
} catch (_) {
if (kDebugMode) rethrow;
@@ -141,7 +138,7 @@ class DownloadService extends GetxService {
downloadedBytes: 0,
title: videoDetail?.title ?? videoArc!.title!,
typeTag: videoQuality.code.toString(),
cover: videoDetail?.pic ?? videoArc!.cover!,
cover: (videoDetail?.pic ?? videoArc!.cover!).http2https,
preferedVideoQuality: videoQuality.code,
qualityPithyDescription: videoQuality.desc,
guessedTotalBytes: 0,
@@ -234,14 +231,13 @@ class DownloadService extends GetxService {
Future<void> _createDownload(BiliDownloadEntryInfo entry) async {
final entryDir = await _getDownloadEntryDir(entry);
final entryJsonFile = File(path.join(entryDir.path, _entryFile));
final entryJsonStr = Utils.jsonEncoder.convert(entry.toJson());
await entryJsonFile.writeAsBytes(utf8.encode(entryJsonStr));
await entryJsonFile.writeAsString(jsonEncode(entry.toJson()));
entry
..pageDirPath = entryDir.parent.path
..entryDirPath = entryDir.path
..status = DownloadStatus.wait;
downloadList.insert(0, entry);
downloaFlag.refresh();
flagNotifier.refresh();
final currStatus = curDownload.value?.status?.index;
if (currStatus == null || currStatus > 3) {
startDownload(entry);
@@ -269,7 +265,7 @@ class DownloadService extends GetxService {
return pageDir;
}
Future<String> _getDownloadPath() async {
static Future<String> _getDownloadPath() async {
final dir = Directory(downloadPath);
if (!dir.existsSync()) {
await dir.create(recursive: true);
@@ -301,26 +297,38 @@ class DownloadService extends GetxService {
if (cid == null) {
return false;
}
final danmakuXMLFile = File(path.join(entry.entryDirPath, _danmakuFile));
if (isUpdate || !danmakuXMLFile.existsSync()) {
final danmakuFile = File(
path.join(entry.entryDirPath, PathUtils.danmakuName),
);
if (isUpdate || !danmakuFile.existsSync()) {
try {
if (!isUpdate) {
_updateCurStatus(DownloadStatus.getDanmaku);
}
final res = await Request().get(
'https://comment.bilibili.com/$cid.xml',
options: Options(responseType: ResponseType.bytes),
);
final xmlBytes = Inflate((res.data as Uint8List)).getBytes();
await danmakuXMLFile.writeAsBytes(xmlBytes);
final seg = (entry.totalTimeMilli / PlDanmakuController.segmentLength)
.ceil();
final res = await Future.wait([
for (var i = 1; i <= seg; i++)
DmGrpc.dmSegMobile(cid: cid, segmentIndex: i),
]);
final danmaku = res.removeAt(0).data;
for (var i in res) {
if (!i.isSuccess) {
throw i.toString();
}
danmaku.elems.addAll(i.data.elems);
}
res.clear();
await danmakuFile.writeAsBytes(danmaku.writeToBuffer());
return true;
} catch (e) {
if (!isUpdate) {
_updateCurStatus(DownloadStatus.failDanmaku);
}
if (kDebugMode) {
SmartDialog.showToast(e.toString());
}
if (kDebugMode) SmartDialog.showToast(e.toString());
return false;
}
}
@@ -331,13 +339,22 @@ class DownloadService extends GetxService {
required BiliDownloadEntryInfo entry,
}) async {
try {
await Request.dio.download(
entry.cover.http2https,
path.join(entry.entryDirPath, _coverFile),
);
final filePath = path.join(entry.entryDirPath, PathUtils.coverName);
if (File(filePath).existsSync()) {
return true;
}
final file = (await DefaultCacheManager().getFileFromCache(
entry.cover,
))?.file;
if (file != null) {
await file.copy(filePath);
} else {
await Request.dio.download(entry.cover, filePath);
}
return true;
} catch (_) {}
return false;
} catch (_) {
return false;
}
}
Future<void> _startDownload(BiliDownloadEntryInfo entry) async {
@@ -357,18 +374,16 @@ class DownloadService extends GetxService {
await videoDir.create(recursive: true);
}
final res = await Future.wait([
downloadDanmaku(entry: entry),
_downloadCover(entry: entry),
]);
final coverTask = _downloadCover(entry: entry);
if (!res.first) {
if (!await downloadDanmaku(entry: entry)) {
return;
}
final mediaJsonFile = File(path.join(videoDir.path, _indexFile));
final mediaJsonStr = Utils.jsonEncoder.convert(mediaFileInfo.toJson());
await mediaJsonFile.writeAsString(mediaJsonStr);
await Future.wait([
mediaJsonFile.writeAsString(jsonEncode(mediaFileInfo.toJson())),
coverTask,
]);
if (curDownload.value?.cid != entry.cid) {
return;
@@ -380,28 +395,25 @@ class DownloadService extends GetxService {
_downloadManager = DownloadManager(
url: first.url,
path: path.join(videoDir.path, PathUtils.videoNameType1),
onTaskRunning: _onTaskRunning,
onTaskComplete: _onTaskComplete,
onTaskError: _onTaskError,
)..start();
onReceiveProgress: _onReceive,
onDone: _onDone,
);
break;
case Type2 mediaFileInfo:
_downloadManager = DownloadManager(
url: mediaFileInfo.video.first.baseUrl,
path: path.join(videoDir.path, PathUtils.videoNameType2),
onTaskRunning: _onTaskRunning,
onTaskComplete: _onTaskComplete,
onTaskError: _onTaskError,
)..start();
onReceiveProgress: _onReceive,
onDone: _onDone,
);
final audio = mediaFileInfo.audio;
if (audio != null && audio.isNotEmpty) {
_audioDownloadManager = DownloadManager(
url: audio.first.baseUrl,
path: path.join(videoDir.path, PathUtils.audioNameType2),
onTaskRunning: _onAudioTaskRunning,
onTaskComplete: _onAudioTaskComplete,
onTaskError: _onAudioTaskError,
)..start();
onReceiveProgress: null,
onDone: _onAudioDone,
);
}
late final first = mediaFileInfo.video.first;
entry.pageData
@@ -425,17 +437,14 @@ class DownloadService extends GetxService {
Future<void> _updateBiliDownloadEntryJson(BiliDownloadEntryInfo entry) async {
final entryJsonFile = File(path.join(entry.entryDirPath, _entryFile));
final entryJsonStr = Utils.jsonEncoder.convert(entry.toJson());
await entryJsonFile.writeAsString(entryJsonStr);
await entryJsonFile.writeAsString(jsonEncode(entry.toJson()));
}
void _onTaskRunning({required int progress, required int total}) {
if (progress == 0 && total != 0) {
if (curDownload.value case final curEntryInfo?) {
_updateBiliDownloadEntryJson(curEntryInfo..totalBytes = total);
}
}
void _onReceive(int progress, int total) {
if (curDownload.value case final entry?) {
if (progress == 0 && total != 0) {
_updateBiliDownloadEntryJson(entry..totalBytes = total);
}
entry
..downloadedBytes = progress
..status = DownloadStatus.downloading;
@@ -443,55 +452,38 @@ class DownloadService extends GetxService {
}
}
void _onTaskComplete() {
final audioStatus = _audioDownloadManager?.status;
final status = switch (audioStatus) {
void _onDone([Object? error]) {
final status = switch (_audioDownloadManager?.status) {
DownloadStatus.downloading => DownloadStatus.audioDownloading,
DownloadStatus.failDownload => DownloadStatus.failDownloadAudio,
null => DownloadStatus.completed,
_ => audioStatus,
_ => _downloadManager?.status ?? DownloadStatus.pause,
};
_updateCurStatus(status);
if (status == DownloadStatus.completed) {
_completeDownload();
} else {
if (curDownload.value case final curEntryInfo?) {
_updateBiliDownloadEntryJson(
curEntryInfo..downloadedBytes = curEntryInfo.totalBytes,
);
if (curDownload.value case final curEntryInfo?) {
if (error == null) {
curEntryInfo.downloadedBytes = curEntryInfo.totalBytes;
}
if (status == DownloadStatus.completed) {
_completeDownload();
} else {
_updateBiliDownloadEntryJson(curEntryInfo);
}
}
}
void _onTaskError({
required int progress,
required int total,
required Object error,
}) {
_updateCurStatus(DownloadStatus.failDownload);
if (curDownload.value case final curEntryInfo?) {
curEntryInfo
..totalBytes = total
..downloadedBytes = progress;
_updateBiliDownloadEntryJson(curEntryInfo);
}
}
void _onAudioTaskRunning({required int progress, required int total}) {}
void _onAudioTaskComplete() {
void _onAudioDone([Object? error]) {
if (_downloadManager?.status == DownloadStatus.completed) {
_completeDownload();
}
}
void _onAudioTaskError({
required int progress,
required int total,
required Object error,
}) {
if (_downloadManager?.status == DownloadStatus.completed) {
_updateCurStatus(DownloadStatus.failDownloadAudio);
if (error == null) {
_completeDownload();
} else {
final status = _audioDownloadManager?.status ?? DownloadStatus.pause;
_updateCurStatus(
status == DownloadStatus.failDownload
? DownloadStatus.failDownloadAudio
: status,
);
}
}
}
@@ -504,7 +496,7 @@ class DownloadService extends GetxService {
..downloadedBytes = entry.totalBytes
..isCompleted = true;
await _updateBiliDownloadEntryJson(entry);
downloaFlag.refresh();
flagNotifier.refresh();
curDownload.value = null;
_downloadManager = null;
_audioDownloadManager = null;
@@ -514,7 +506,7 @@ class DownloadService extends GetxService {
void _nextDownload() {
if (waitDownloadQueue.isNotEmpty) {
final next = waitDownloadQueue.removeAt(0);
if (downloadList.contains(next)) {
if (!next.isCompleted && downloadList.contains(next)) {
startDownload(next);
} else {
_nextDownload();
@@ -530,7 +522,7 @@ class DownloadService extends GetxService {
}
final downloadDir = Directory(entry.pageDirPath);
if (downloadDir.existsSync()) {
if (downloadDir.listSync().length <= 1) {
if (!await downloadDir.lengthGte(2)) {
await downloadDir.tryDel(recursive: true);
} else {
final entryDir = Directory(entry.entryDirPath);
@@ -541,13 +533,13 @@ class DownloadService extends GetxService {
}
downloadList.remove(entry);
waitDownloadQueue.remove(entry);
downloaFlag.refresh();
flagNotifier.refresh();
}
Future<void> deletePage({required String pageDirPath}) async {
await Directory(pageDirPath).tryDel(recursive: true);
downloadList.removeWhere((e) => e.pageDirPath == pageDirPath);
downloaFlag.refresh();
flagNotifier.refresh();
}
Future<void> cancelDownload({
@@ -574,3 +566,11 @@ class DownloadService extends GetxService {
}
}
}
extension on Set<void Function()> {
void refresh() {
for (var i in this) {
i();
}
}
}