LCOV - code coverage report

Current view
top level - src/_impl/xplat - _result_stream.dart
Test
lcov.info
Date
2026-03-04
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
Hit Total Coverage
Lines 38 54 70.4%
Functions 0 0 -
Branches 0 0 -
Each row represents a line of source code
Line Branch Hits Source code
1import 'dart:async';
2
3import '../../channel.dart';
4import '../../exceptions/squadron_error.dart';
5import '../../exceptions/squadron_exception.dart';
6import '../../worker/worker_message.dart';
7import '../../worker/worker_request.dart';
8import '../../worker/worker_response.dart';
9import '_forward_stream_controller.dart';
10
11class ResultStream {
1210 ResultStream(
13 Channel channel,
14 WorkerRequest req,
15 Stream<WorkerResponse> Function() sendRequest,
16 bool streaming,
17 ) {
185 final streamIdCompleter = streaming ? Completer<StreamId?>() : null;
1920 final command = req.command, token = req.cancelToken;
20
215 void $decodeStreamOfResponses(WorkerResponse res) {
225 if (!res.unwrapInPlace(channel)) return;
23
245 final hasStreamId = streamIdCompleter!.isCompleted;
255 if (res.endOfStream) {
26 // handle endOfStream
27 if (!hasStreamId) {
280 streamIdCompleter.complete(null);
290 channel.logger
300 ?.e('Invalid state: unexpected endOfStream for command $command');
310 _controller.safeAddError(SquadronErrorImpl.create(
32 'Invalid state: unexpected endOfStream',
33 null,
34 command,
35 ));
36 }
378 _controller.close();
38 return;
39 }
40
415 final error = res.error;
42 if (error == null && !hasStreamId) {
43 // the first result from a streaming operation is the stream ID
4415 streamIdCompleter.complete(StreamId.from(res.result));
45 } else if (error != null) {
466 _controller.safeAddError(error);
47 if (!hasStreamId) {
48 // if any error comes before the stream ID, something bad happened
490 streamIdCompleter.complete(null);
500 _controller.close();
51 return;
52 }
53 } else {
54 try {
5515 _controller.safeAdd(res.result);
56 } catch (ex, st) {
570 _controller.safeAddError(SquadronException.from(ex, st, command));
58 }
59 }
60
612 final canceled = token?.exception;
62 if (canceled != null) {
630 _controller.safeAddError(canceled);
640 _controller.close();
65 }
66 }
67
6810 void $decodeSingleResponse(WorkerResponse res) {
6910 if (!res.unwrapInPlace(channel)) return;
70
7110 final error = res.error;
72 if (error != null) {
736 _controller.safeAddError(error);
74 } else {
75 try {
7630 _controller.safeAdd(res.result);
77 } catch (ex, st) {
780 _controller.safeAddError(SquadronException.from(ex, st, command));
79 }
80 }
81
8220 _controller.close();
83 }
84
854 Future<StreamId?> $getStreamId(StreamSubscription sub) {
86 streamIdCompleter as Completer<StreamId?>;
87 var count = 0;
885 if (sub.isPaused && !streamIdCompleter.isCompleted) {
89 // if the subscription was paused and the streamId is not available,
90 // resume to have the streamId eventually come through.
910 while (sub.isPaused) {
920 count++;
930 sub.resume();
94 }
95 }
96 // wait for the streamId...
9712 return streamIdCompleter.future.then((streamId) {
984 while (count > 0) {
990 count--;
1000 sub.pause();
101 }
102 return streamId;
103 });
104 // the callback above re-applies the pauses undone while waiting for the streamId
105 }
106
10710 Future<void> $onCancel() async {
10820 final sub = _controller.subscription;
109 if (streamIdCompleter != null && sub != null) {
110 // this is a streaming operation and the subscription is active, so
111 // we need to inform the worker that the stream has been canceled
1124 final streamId = await $getStreamId(sub);
113 if (streamId != null) {
1144 channel.cancelStream(streamId);
115 }
1164 await sub.cancel();
117 }
118 }
119
1203 void $closeWithError(Object error, [StackTrace? st]) {
1219 _controller.safeAddError(SquadronException.from(error, st, command));
1226 _controller.close();
123 }
124
12510 void $onListen() {
126 try {
127 // do not send the request if the token is already canceled
1282 token?.throwIfCanceled();
129 // send the request and start decoding responses
13040 _controller.attachSubscription(sendRequest().listen(
131 streaming ? $decodeStreamOfResponses : $decodeSingleResponse,
132 onError: $closeWithError,
13320 onDone: _controller.close,
134 cancelOnError: false,
135 ));
136 } catch (ex, st) {
1370 $closeWithError(ex, st);
138 }
139 }
140
14120 _controller = ForwardStreamController<dynamic>(
142 onListen: $onListen,
143 onCancel: $onCancel,
144 );
145 }
146
147 late final ForwardStreamController<dynamic> _controller;
148
14930 Stream<dynamic> get stream => _controller.stream;
150
15130 Future<void> get done => _controller.done;
152}
Choose Features