LCOV - code coverage report

Current view
top level - src/pool - _worker_stream_task.dart
Test
lcov.info
Date
2025-03-26
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
HitTotalCoverage
Lines2929100.0%
Functions00-
Branches00-
Each row represents a line of source code
LineBranchHitsSource code
1import 'dart:async';
2
3import '../_impl/xplat/_forward_stream_controller.dart';
4import '../exceptions/squadron_exception.dart';
5import '../stats/perf_counter.dart';
6import '../worker/worker.dart';
7import '_worker_task.dart';
8import 'stream_task.dart';
9
10final class WorkerStreamTask<T, W extends Worker> extends WorkerTask<T, W>
11 implements StreamTask<T> {
12 /// Creates a new [StreamTask].
138 WorkerStreamTask(this._producer, PerfCounter? counter) : super(counter) {
1412 _controller = ForwardStreamController<T>(onListen: () async {
15 try {
168 final worker = await _worker.future;
174 if (canceledException != null || worker == null) {
18 // the task was canceled
192 throw canceledException!;
20 } else {
21 // otherwise, forward data from the worker
2220 _controller.attachSubscription(_producer(worker).listen(
234 _onData,
244 onError: _onError,
258 onDone: _controller.close,
26 cancelOnError: false,
27 ));
28 }
29 } catch (ex, st) {
304 _closeWithError(SquadronException.from(ex, st));
31 }
32 });
33 }
34
35 final Stream<T> Function(W worker) _producer;
36 final _worker = Completer<W?>();
37
38 late final ForwardStreamController<T> _controller;
39
404 @override
418 Stream<T> get stream => _controller.stream;
42
432 void _closeWithError(SquadronException ex) {
444 _controller.addError(ex);
454 _controller.close();
46 }
47
4812 void _onData(T data) => _controller.add(data);
49
5012 void _onError(ex, st) => _controller.addError(SquadronException.from(ex, st));
51
522 @override
53 void cancel([String? message]) {
542 super.cancel(message);
554 if (!_worker.isCompleted) {
56 // task canceled before it got scheduled
574 _worker.complete(null);
58 }
594 if (_controller.subscription != null) {
60 // task canceled after a listener subscribed to the stream
612 _closeWithError(canceledException!);
62 }
63 }
64
654 @override
66 Future<bool> execute(W worker) {
674 if (canceledException == null) {
68 // run with worker
698 _worker.complete(worker);
70 }
718 return _controller.done
7212 .then((_) => canceledException == null)
734 .catchError((_) => false);
74 }
75}
Choose Features