LCOV - code coverage report

Current view
top level - src/_impl/native - _channel_impl.dart
Test
lcov.info
Date
2025-03-26
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
          | Hit | Total | Coverage
Lines     | 63  | 70    | 90.0%
Functions | 0   | 0     | -
Branches  | 0   | 0     | -
Each row represents a line of source code
Line | Branch | Hits | Source code
1part of '_channel.dart';
2
3/// [Channel] implementation for the Native world.
4final class _VmChannel implements Channel {
510 _VmChannel._(this._sendPort, this.logger, this.exceptionManager);
6
7 /// [SendPort] to communicate with the [Isolate] if the channel is owned by
8 /// the worker owner. Otherwise, [SendPort] to return values to the client.
9 final vm.SendPort _sendPort;
10
11 PlatformThread? _thread;
12 final _activeConnections = <ForwardStreamController<WorkerResponse>>[];
13
14 @override
15 final ExceptionManager exceptionManager;
16
17 @override
18 final Logger? logger;
19
20 bool _closed = false;
21
22 /// [Channel] serialization in Native world returns the [SendPort].
233 @override
243 PlatformChannel serialize() => _sendPort;
25
26 /// [Channel] sharing in Native world returns a [_VmForwardChannel].
273 @override
283 Channel share() => _VmForwardChannel._(
2912 _sendPort, vm.ReceivePort(), logger, exceptionManager);
30
3110 void _postRequest(WorkerRequest req, {bool force = false}) {
3210 if (_closed && !force) {
330 throw SquadronErrorImpl.create('Channel is closed');
34 }
35 try {
3612 req.cancelToken?.ensureStarted();
3730 _sendPort.send(req.wrapInPlace());
38 } catch (ex, st) {
391 logger?.e(() => 'Failed to post request $req: $ex');
402 throw SquadronErrorImpl.create('Failed to post request: $ex', st);
41 }
42 }
43
44 /// Sends a termination [WorkerRequest] to the [vm.Isolate].
4510 @override
46 FutureOr<void> close() {
4710 if (!_closed) {
4820 _postRequest(WorkerRequest.stop());
4910 _closed = true;
50 }
51 }
52
53 /// Sends a close stream [WorkerRequest] to the [vm.Isolate].
543 @override
55 FutureOr<void> cancelStream(int streamId) {
566 _postRequest(WorkerRequest.cancelStream(streamId), force: true);
57 }
58
59 /// Sends a cancel token [WorkerRequest] to the [vm.Isolate].
602 @override
61 FutureOr<void> cancelToken(SquadronCancelationToken? token) {
62 if (token != null) {
634 _postRequest(WorkerRequest.cancel(token), force: true);
64 }
65 }
66
6710 void _enter(ForwardStreamController<WorkerResponse> controller) {
6820 _activeConnections.add(controller);
69 }
70
711 void _leave(ForwardStreamController<WorkerResponse> controller) {
722 _activeConnections.remove(controller);
731 controller.close();
74 }
75
7610 Stream<dynamic> _getResponseStream(
77 vm.ReceivePort port,
78 WorkerRequest req,
79 void Function(WorkerRequest) post, {
80 required bool streaming,
81 }) {
8210 final command = req.command;
83
84 // send the request, return a stream of responses
8510 Stream<WorkerResponse> $sendRequest() {
86 late final ForwardStreamController<WorkerResponse> controller;
87
8810 void $forwardMessage(WorkerResponse msg) {
8920 if (!controller.isClosed) controller.add(msg);
90 }
91
921 void $forwardError(Object error, StackTrace? st) {
931 if (!controller.isClosed) {
942 controller.addError(SquadronException.from(error, st, command));
95 }
96 }
97
980 void $done() {
990 _leave(controller);
100 }
101
10220 controller = ForwardStreamController(onListen: () {
103 // do nothing if the controller is closed already
10410 if (controller.isClosed) return;
105
106 // bind the controller
10730 controller.attachSubscription(port.cast<WorkerResponse>().listen(
108 $forwardMessage,
109 onError: $forwardError,
110 onDone: $done,
111 cancelOnError: false,
112 ));
113
114 // send the request
115 try {
11610 _enter(controller);
11710 post(req);
118 } catch (ex, st) {
1191 $forwardError(ex, st);
1201 _leave(controller);
121 }
122 });
12310 return controller.stream;
124 }
125
126 // return a stream of decoded responses
12710 final res = ResultStream(this, req, $sendRequest, streaming);
12850 res.done.whenComplete(() => port.close()).ignore();
12910 return res.stream;
130 }
131
132 /// Creates a [ReceivePort] and a [WorkerRequest] and sends it to the
133 /// [Isolate]. This method expects a single value from the [Isolate].
13410 @override
135 Future<dynamic> sendRequest(
136 int command,
137 List args, {
138 SquadronCancelationToken? token,
139 bool inspectRequest = false,
140 bool inspectResponse = false,
141 }) {
14210 final completer = Completer();
143 late final StreamSubscription sub;
144
14510 void $success(dynamic data) async {
14610 await sub.cancel();
14720 if (!completer.isCompleted) completer.complete(data);
148 }
149
1504 void $fail(Object ex, [StackTrace? st]) async {
1514 await sub.cancel();
1528 if (!completer.isCompleted) completer.completeError(ex, st);
153 }
154
1550 void $done() async {
1560 await sub.cancel();
1570 if (!completer.isCompleted) {
1580 $fail(WorkerException('No response from worker', null, command));
159 }
160 }
161
16210 final receiver = vm.ReceivePort();
16310 final req = WorkerRequest.userCommand(
16410 receiver.sendPort, command, args, token, inspectResponse);
16520 sub = _getResponseStream(receiver, req, _postRequest, streaming: false)
16610 .listen($success, onError: $fail, onDone: $done);
16710 return completer.future;
168 }
169
170 /// Creates a [ReceivePort] and a [WorkerRequest] and sends it to the
171 /// [Isolate]. This method expects a stream of values from the [Isolate].
172 /// The [Isolate] must send a [WorkerResponse.endOfStream] to close the
173 /// [Stream].
1745 @override
175 Stream<dynamic> sendStreamingRequest(
176 int command,
177 List args, {
178 SquadronCancelationToken? token,
179 bool inspectRequest = false,
180 bool inspectResponse = false,
181 }) {
1825 final receiver = vm.ReceivePort();
1835 final req = WorkerRequest.userCommand(
1845 receiver.sendPort, command, args, token, inspectResponse);
18510 return _getResponseStream(receiver, req, _postRequest, streaming: true);
186 }
187}
Choose Features