LCOV - code coverage report

Current view
top level - src/pool - _worker_stream_task.dart
Test
lcov.info
Date
2024-11-13
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
HitTotalCoverage
Lines344085.0%
Functions00-
Branches00-
Each row represents a line of source code
LineBranchHitsSource code
1import 'dart:async';
2
3import '../_impl/xplat/_forward_stream_controller.dart';
4import '../exceptions/squadron_exception.dart';
5import '../stats/perf_counter.dart';
6import '../worker/worker.dart';
7import '_worker_task.dart';
8import 'stream_task.dart';
9
10final class WorkerStreamTask<T, W extends Worker> extends WorkerTask<T, W>
11 implements StreamTask<T> {
12 /// Creates a new [StreamTask].
1312 WorkerStreamTask(this._producer, PerfCounter? counter) : super(counter) {
1416 _controller = ForwardStreamController<T>(onListen: () async {
154 try {
168 throwIfCanceled();
178 if (_controller.isClosed) return;
1812 final stream = await _streamer.future;
198 if (_controller.isClosed) {
20 // we might have a problem here: the controller is closed but the worker
21 // has started streaming; cancel the operation
220 stream.listen((_) {}).cancel();
23 } else {
2416 _controller.attachSubscription(stream.listen(
258 _onData,
268 onError: _onError,
2712 onDone: _controller.close,
28 cancelOnError: false,
29 ));
30 }
310 } catch (ex, st) {
320 _closeWithError(SquadronException.from(ex, st));
33 }
344 });
354 }
36
37 final Stream<T> Function(W worker) _producer;
38 final _streamer = Completer<Stream<T>>();
39
405 late final ForwardStreamController<T> _controller;
41
424 @override
438 Stream<T> get stream => _controller.stream;
44
451 void _closeWithError(SquadronException ex) {
462 _controller.addError(ex);
473 _controller.close();
48 }
49
5017 void _onData(T data) => _controller.add(data);
51
5216 void _onError(ex, st) => _controller.addError(SquadronException.from(ex, st));
53
541 @override
551 void cancel([String? message]) {
562 super.cancel(message);
573 _closeWithError(canceledException!);
581 }
59
604 @override
614 Future<bool> execute(W worker) {
624 try {
634 throwIfCanceled();
6412 final stream = _producer(worker);
6512 _streamer.complete(stream);
6620 return _controller.done.then((_) => true);
670 } catch (ex, st) {
680 _closeWithError(SquadronException.from(ex, st));
690 return Future.value(false);
70 }
714 }
72}
Choose Features