LCOV - code coverage report

Current view
top level - src/_impl/xplat - _result_stream.dart
Test
lcov.info
Date
2026-02-21
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
            Hit  Total  Coverage
Lines        38     54     70.4%
Functions     0      0         -
Branches      0      0         -
Each row represents a line of source code
Line | Branch | Hits | Source code
1import 'dart:async';
2
3import '../../channel.dart';
4import '../../exceptions/squadron_error.dart';
5import '../../exceptions/squadron_exception.dart';
6import '../../worker/worker_request.dart';
7import '../../worker/worker_response.dart';
8import '_forward_stream_controller.dart';
9
/// Exposes the responses of a single worker request as a Dart stream.
///
/// The request is only sent when the stream gets its first listener (see
/// `$onListen` in the constructor). Responses are decoded either as a single
/// value (non-streaming) or as a sequence whose first item is a stream ID
/// (streaming), and are forwarded through a [ForwardStreamController].
10class ResultStream {
/// Builds the controller and the closures that drive it.
///
/// * [channel] is used to unwrap responses, to log, and to cancel a remote
///   stream by ID.
/// * [req] supplies the command and the optional cancellation token.
/// * [sendRequest] is invoked on first listen and returns the raw responses.
/// * [streaming] selects the streaming vs. single-response decoder.
1110 ResultStream(
12 Channel channel,
13 WorkerRequest req,
14 Stream<WorkerResponse> Function() sendRequest,
15 bool streaming,
16 ) {
     // Only streaming operations have a stream ID to wait for; the completer
     // is null otherwise.
175 final streamIdCompleter = streaming ? Completer<int?>() : null;
1820 final command = req.command, token = req.cancelToken;
19
     // Decoder for streaming operations: the first successful response carries
     // the stream ID, subsequent ones carry data, and `endOfStream` terminates.
205 void $decodeStreamOfResponses(WorkerResponse res) {
     // Nothing to do when unwrapInPlace() returns false.
     // NOTE(review): the exact meaning of a false return is defined in
     // WorkerResponse, not visible here.
215 if (!res.unwrapInPlace(channel)) return;
22
235 final hasStreamId = streamIdCompleter!.isCompleted;
245 if (res.endOfStream) {
25 // handle endOfStream
26 if (!hasStreamId) {
     // End of stream before any stream ID: unblock waiters with null, log,
     // and surface an error to the listener.
270 streamIdCompleter.complete(null);
280 channel.logger
290 ?.e('Invalid state: unexpected endOfStream for command $command');
300 _controller.safeAddError(SquadronErrorImpl.create(
31 'Invalid state: unexpected endOfStream',
32 null,
33 command,
34 ));
35 }
368 _controller.close();
37 return;
38 }
39
405 final error = res.error;
41 if (error == null && !hasStreamId) {
42 // the first result from a streaming operation is the stream ID
     // NOTE(review): the `as num` cast is outside any try/catch; a non-numeric
     // first result would throw from this callback - confirm that is intended.
4315 streamIdCompleter.complete((res.result as num).toInt());
44 } else if (error != null) {
456 _controller.safeAddError(error);
46 if (!hasStreamId) {
47 // if any error comes before the stream ID, something bad happened
480 streamIdCompleter.complete(null);
490 _controller.close();
50 return;
51 }
52 } else {
53 try {
5415 _controller.safeAdd(res.result);
55 } catch (ex, st) {
     // Anything thrown while forwarding the value is wrapped and reported as
     // a stream error instead of escaping the callback.
560 _controller.safeAddError(SquadronException.from(ex, st, command));
57 }
58 }
59
     // After handling the response, stop if the token reports a cancellation.
602 final canceled = token?.exception;
61 if (canceled != null) {
620 _controller.safeAddError(canceled);
630 _controller.close();
64 }
65 }
66
     // Decoder for non-streaming operations: forwards the single error or
     // result, then closes the controller.
6710 void $decodeSingleResponse(WorkerResponse res) {
6810 if (!res.unwrapInPlace(channel)) return;
69
7010 final error = res.error;
71 if (error != null) {
726 _controller.safeAddError(error);
73 } else {
74 try {
7530 _controller.safeAdd(res.result);
76 } catch (ex, st) {
770 _controller.safeAddError(SquadronException.from(ex, st, command));
78 }
79 }
80
8120 _controller.close();
82 }
83
     // Returns a future for the stream ID (null if none was received),
     // temporarily resuming [sub] if it is paused so the ID can arrive.
844 Future<int?> $getStreamId(StreamSubscription sub) {
     // Cast statement used to treat the nullable completer as non-null below;
     // it throws if this is called for a non-streaming operation.
85 streamIdCompleter as Completer<int?>;
     // Number of resume() calls made here, to be undone with as many pause()
     // calls once the stream ID is known.
86 var count = 0;
875 if (sub.isPaused && !streamIdCompleter.isCompleted) {
88 // if the subscription was paused and the streamId is not available,
89 // resume to have the streamId eventually come through.
900 while (sub.isPaused) {
910 count++;
920 sub.resume();
93 }
94 }
95 // wait for the streamId...
9612 return streamIdCompleter.future.then((streamId) {
     // ...then restore the pauses that were lifted above.
974 while (count > 0) {
980 count--;
990 sub.pause();
100 }
101 return streamId;
102 });
103 // (the pause state is restored inside the then() callback above)
104 }
105
     // Cancellation handler installed on the controller.
10610 Future<void> $onCancel() async {
10720 final sub = _controller.subscription;
108 if (streamIdCompleter != null && sub != null) {
109 // this is a streaming operation and the subscription is active, so
110 // we need to inform the worker that the stream has been canceled
1114 final streamId = await $getStreamId(sub);
112 if (streamId != null) {
1134 channel.cancelStream(streamId);
114 }
1154 await sub.cancel();
116 }
     // NOTE(review): for non-streaming operations the subscription is not
     // cancelled here - presumably ForwardStreamController takes care of it;
     // confirm.
117 }
118
     // Reports [error] (wrapped via SquadronException.from) and closes.
1193 void $closeWithError(Object error, [StackTrace? st]) {
1209 _controller.safeAddError(SquadronException.from(error, st, command));
1216 _controller.close();
122 }
123
     // Listen handler installed on the controller: sends the request and wires
     // the matching decoder to the response stream.
12410 void $onListen() {
125 try {
126 // do not send the request if the token is already canceled
1272 token?.throwIfCanceled();
128 // send the request and start decoding responses
12940 _controller.attachSubscription(sendRequest().listen(
130 streaming ? $decodeStreamOfResponses : $decodeSingleResponse,
131 onError: $closeWithError,
13220 onDone: _controller.close,
133 cancelOnError: false,
134 ));
135 } catch (ex, st) {
     // Covers both a canceled token and a synchronous failure of
     // sendRequest()/listen().
1360 $closeWithError(ex, st);
137 }
138 }
139
14020 _controller = ForwardStreamController<dynamic>(
141 onListen: $onListen,
142 onCancel: $onCancel,
143 );
144 }
145
/// Controller backing [stream]; assigned once at the end of the constructor.
146 late final ForwardStreamController<dynamic> _controller;
147
/// The decoded results (and errors) of the request.
14830 Stream<dynamic> get stream => _controller.stream;
149
/// Forwards the controller's `done` future.
15030 Future<void> get done => _controller.done;
151}
Choose Features