LCOV - code coverage report

Current view
top level - src/_impl/xplat - _result_stream.dart
Test
lcov.info
Date
2024-11-13
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
Hit | Total | Coverage
Lines: 59 | 82 | 72.0%
Functions: 0 | 0 | -
Branches: 0 | 0 | -
Each row represents a line of source code
Line | Branch | Hits | Source code
1import 'dart:async';
2
3import '../../channel.dart';
4import '../../exceptions/squadron_error.dart';
5import '../../exceptions/squadron_exception.dart';
6import '../../worker/worker_request.dart';
7import '../../worker/worker_response.dart';
8import '_forward_stream_controller.dart';
9
10class ResultStream {
1118 ResultStream(
12 Channel channel,
13 WorkerRequest req,
14 Stream<WorkerResponse> Function() sendRequest,
15 bool streaming,
16 ) {
175 final streamIdCompleter = streaming ? Completer<int?>() : null;
1818 final command = req.command, token = req.cancelToken;
19
2015 void $decodeStreamOfResponses(WorkerResponse res) {
2111 if (!res.unwrapInPlace(channel)) return;
22
2311 final hasStreamId = streamIdCompleter!.isCompleted;
245 if (res.endOfStream) {
25 // handle endOfStream: close the stream (and report an error if it arrives before the stream ID)
264 if (!hasStreamId) {
270 streamIdCompleter.complete(null);
280 channel.logger?.e('Invalid state: endOfStream before streamId');
290 _controller.addError(
300 SquadronErrorExt.create('Invalid state: unexpected endOfStream'));
31 }
3212 _controller.close();
334 return;
34 }
35
365 final error = res.error;
376 if (error == null && !hasStreamId) {
38 // the first result from a streaming operation is the stream ID
3920 streamIdCompleter.complete((res.result as num).toInt());
406 } else if (error != null) {
4110 _controller.addError(error);
424 if (!hasStreamId) {
43 // if any error comes before the stream ID, something bad happened: release the stream ID waiter and close
440 streamIdCompleter.complete(null);
450 _controller.close();
460 return;
47 }
48 } else {
490 try {
5021 _controller.add(res.result);
510 } catch (ex, st) {
520 _controller.addError(SquadronException.from(ex, st, command));
53 }
54 }
55
568 final canceled = token?.exception;
576 if (canceled != null) {
580 _controller.addError(canceled);
590 _controller.close();
60 }
616 }
62
6317 void $decodeSingleResponse(WorkerResponse res) {
6416 if (!res.unwrapInPlace(channel)) return;
65
668 final error = res.error;
678 if (error != null) {
6810 _controller.addError(error);
69 } else {
700 try {
7132 _controller.add(res.result);
720 } catch (ex, st) {
730 _controller.addError(SquadronException.from(ex, st, command));
74 }
75 }
76
7724 _controller.close();
788 }
79
8012 Future<int?> $getStreamId(StreamSubscription sub) async {
813 streamIdCompleter as Completer<int?>;
823 var count = 0;
837 if (sub.isPaused && !streamIdCompleter.isCompleted) {
84 // if the subscription was paused and the streamId is not available,
85 // resume to have the streamId eventually come through.
860 while (sub.isPaused) {
870 count++;
880 sub.resume();
89 }
90 }
91 // wait for the streamId...
926 final streamId = await streamIdCompleter.future;
93 // restore the subscription's pause state: one pause() for each resume() issued above
943 while (count > 0) {
950 count--;
960 sub.pause();
97 }
983 return streamId;
993 }
100
10118 Future<void> $onCancel() async {
10227 final sub = _controller.subscription;
1039 if (streamIdCompleter != null && sub != null) {
104 // this is a streaming operation and the subscription is active, so
105 // we need to inform the worker that the stream has been canceled
1066 final streamId = await $getStreamId(sub);
107 if (streamId != null) {
1086 channel.cancelStream(streamId);
109 }
1106 await sub.cancel();
111 }
1129 }
113
11410 void $closeWithError(Object error, StackTrace? st) {
1155 _controller.addError(SquadronException.from(error, st, command));
1164 _controller.close();
1172 }
118
11918 void $onListen() {
1209 try {
121 // do not send the request if the token is already canceled
12211 token?.throwIfCanceled();
123 // send the request and start decoding responses
12445 _controller.attachSubscription(sendRequest().listen(
1259 streaming ? $decodeStreamOfResponses : $decodeSingleResponse,
1269 onError: $closeWithError,
12727 onDone: _controller.close,
128 cancelOnError: false,
129 ));
1300 } catch (ex, st) {
1310 $closeWithError(ex, st);
132 }
1339 }
134
13523 _controller = ForwardStreamController<dynamic>(
136 onListen: $onListen,
137 onCancel: $onCancel,
138 );
1399 }
140
14110 late final ForwardStreamController<dynamic> _controller;
142
14327 Stream<dynamic> get stream => _controller.stream;
144
1450 Future<void> get done => _controller.done;
146}
Choose Features