LCOV - code coverage report

Current view
top level - src/_impl/xplat - _result_stream.dart
Test
lcov.info
Date
2025-03-26
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
HitTotalCoverage
Lines385470.4%
Functions00-
Branches00-
Each row represents a line of source code
LineBranchHitsSource code
1import 'dart:async';
2
3import '../../channel.dart';
4import '../../exceptions/squadron_error.dart';
5import '../../exceptions/squadron_exception.dart';
6import '../../worker/worker_request.dart';
7import '../../worker/worker_response.dart';
8import '_forward_stream_controller.dart';
9
10class ResultStream {
1110 ResultStream(
12 Channel channel,
13 WorkerRequest req,
14 Stream<WorkerResponse> Function() sendRequest,
15 bool streaming,
16 ) {
175 final streamIdCompleter = streaming ? Completer<int?>() : null;
1820 final command = req.command, token = req.cancelToken;
19
205 void $decodeStreamOfResponses(WorkerResponse res) {
215 if (!res.unwrapInPlace(channel)) return;
22
235 final hasStreamId = streamIdCompleter!.isCompleted;
245 if (res.endOfStream) {
25 // handle endofStream
26 if (!hasStreamId) {
270 streamIdCompleter.complete(null);
280 channel.logger
290 ?.e('Invalid state: unexpected endOfStream for command $command');
300 _controller.addError(SquadronErrorImpl.create(
31 'Invalid state: unexpected endOfStream',
32 null,
33 command,
34 ));
35 }
368 _controller.close();
37 return;
38 }
39
405 final error = res.error;
41 if (error == null && !hasStreamId) {
42 // the first result from a streaming operation is the stream ID
4315 streamIdCompleter.complete((res.result as num).toInt());
44 } else if (error != null) {
456 _controller.addError(error);
46 if (!hasStreamId) {
47 // if any error comes before the stream ID, somethind bad happened
480 streamIdCompleter.complete(null);
490 _controller.close();
50 return;
51 }
52 } else {
53 try {
5415 _controller.add(res.result);
55 } catch (ex, st) {
560 _controller.addError(SquadronException.from(ex, st, command));
57 }
58 }
59
602 final canceled = token?.exception;
61 if (canceled != null) {
620 _controller.addError(canceled);
630 _controller.close();
64 }
65 }
66
6710 void $decodeSingleResponse(WorkerResponse res) {
6810 if (!res.unwrapInPlace(channel)) return;
69
7010 final error = res.error;
71 if (error != null) {
726 _controller.addError(error);
73 } else {
74 try {
7530 _controller.add(res.result);
76 } catch (ex, st) {
770 _controller.addError(SquadronException.from(ex, st, command));
78 }
79 }
80
8120 _controller.close();
82 }
83
843 Future<int?> $getStreamId(StreamSubscription sub) async {
85 streamIdCompleter as Completer<int?>;
86 var count = 0;
874 if (sub.isPaused && !streamIdCompleter.isCompleted) {
88 // if the subscription was paused and the streamId is not available,
89 // resume to have the streamId eventually come through.
900 while (sub.isPaused) {
910 count++;
920 sub.resume();
93 }
94 }
95 // wait for the streamId...
963 final streamId = await streamIdCompleter.future;
97 // restore subscription pause
983 while (count > 0) {
990 count--;
1000 sub.pause();
101 }
102 return streamId;
103 }
104
10510 Future<void> $onCancel() async {
10620 final sub = _controller.subscription;
107 if (streamIdCompleter != null && sub != null) {
108 // this is a streaming operation and the subscription is active, so
109 // we need to inform the worker that the stream has been canceled
1103 final streamId = await $getStreamId(sub);
111 if (streamId != null) {
1123 channel.cancelStream(streamId);
113 }
1143 await sub.cancel();
115 }
116 }
117
1182 void $closeWithError(Object error, StackTrace? st) {
1196 _controller.addError(SquadronException.from(error, st, command));
1204 _controller.close();
121 }
122
12310 void $onListen() {
124 try {
125 // do not send the request if the token is already canceled
1262 token?.throwIfCanceled();
127 // send the request and start decoding responses
12840 _controller.attachSubscription(sendRequest().listen(
129 streaming ? $decodeStreamOfResponses : $decodeSingleResponse,
130 onError: $closeWithError,
13120 onDone: _controller.close,
132 cancelOnError: false,
133 ));
134 } catch (ex, st) {
1350 $closeWithError(ex, st);
136 }
137 }
138
13920 _controller = ForwardStreamController<dynamic>(
140 onListen: $onListen,
141 onCancel: $onCancel,
142 );
143 }
144
145 late final ForwardStreamController<dynamic> _controller;
146
14730 Stream<dynamic> get stream => _controller.stream;
148
14930 Future<void> get done => _controller.done;
150}
Choose Features