LCOV - code coverage report

Current view
top level - src/_impl/native - _channel_impl.dart
Test
lcov.info
Date
2026-02-21
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
Hit Total Coverage
Lines 65 71 91.5%
Functions 0 0 -
Branches 0 0 -
Each row represents a line of source code
Line Branch Hits Source code
1part of '_channel.dart';
2
/// [Channel] implementation for the Native world.
///
/// Requests are posted through a [vm.SendPort]; every user request gets its
/// own [vm.ReceivePort] on which the matching responses are received.
final class _VmChannel implements Channel {
  _VmChannel._(this._sendPort, this.logger, this.exceptionManager);

  /// [vm.SendPort] to communicate with the [vm.Isolate] if the channel is
  /// owned by the worker owner. Otherwise, [vm.SendPort] to return values to
  /// the client.
  final vm.SendPort _sendPort;

  // Not referenced anywhere inside this class.
  // NOTE(review): presumably assigned by other code of the same library —
  // confirm before touching.
  PlatformThread? _thread;

  /// Controllers of the response streams currently in flight. Entries are
  /// added by [_enter] and removed by [_leave].
  final _activeConnections = <ForwardStreamController<WorkerResponse>>[];

  @override
  final ExceptionManager exceptionManager;

  @override
  final Logger? logger;

  /// Completed by [close] once the termination request has been posted.
  final _closed = Completer<void>();

  /// A future that completes when this channel has been closed via [close].
  @override
  Future<void> get closed {
    return _closed.future;
  }

  /// [Channel] serialization in Native world returns the [vm.SendPort].
  @override
  PlatformChannel serialize() {
    return _sendPort;
  }

  /// [Channel] sharing in native world returns a [_VmForwardChannel] built
  /// from this channel's [vm.SendPort] and a fresh [vm.ReceivePort].
  @override
  Channel share() {
    final forwardPort = vm.ReceivePort();
    return _VmForwardChannel._(
        _sendPort, forwardPort, logger, exceptionManager);
  }

  /// Posts [req] on the underlying [vm.SendPort].
  ///
  /// Throws the error built by `SquadronErrorImpl.create` when the channel is
  /// already closed (unless [force] is `true`), or when preparing or sending
  /// the request fails. In the latter case the failure is also reported to
  /// [logger].
  void _postRequest(WorkerRequest req, {bool force = false}) {
    final rejected = !force && _closed.isCompleted;
    if (rejected) {
      throw SquadronErrorImpl.create('Channel is closed');
    }
    try {
      // Preparation steps run before the request crosses the port; any
      // failure here is reported exactly like a failed send.
      req.cancelToken?.ensureStarted();
      req.wrapInPlace();
      _sendPort.send(req);
    } catch (ex, st) {
      logger?.e(() => 'Failed to post request $req: $ex');
      throw SquadronErrorImpl.create('Failed to post request: $ex', st);
    }
  }

  /// Sends a termination [WorkerRequest] to the [vm.Isolate].
  ///
  /// Only the first call posts the request; every call returns [closed].
  @override
  Future<void> close() {
    if (_closed.isCompleted) {
      return _closed.future;
    }
    _postRequest(WorkerRequest.stop());
    _closed.complete();
    return _closed.future;
  }

  /// Sends a close stream [WorkerRequest] to the [vm.Isolate].
  ///
  /// The request is forced, i.e. it is posted even after [close].
  @override
  void cancelStream(int streamId) {
    final cancelation = WorkerRequest.cancelStream(streamId);
    _postRequest(cancelation, force: true);
  }

  /// Sends a cancel token [WorkerRequest] to the [vm.Isolate].
  ///
  /// Does nothing when [token] is `null`. The request is forced, i.e. it is
  /// posted even after [close].
  @override
  void cancelToken(SquadronCancelationToken? token) {
    if (token == null) return;
    _postRequest(WorkerRequest.cancel(token), force: true);
  }

  /// Registers [controller] as an active connection.
  void _enter(ForwardStreamController<WorkerResponse> controller) {
    _activeConnections.add(controller);
  }

  /// Unregisters [controller] from the active connections, then closes it.
  void _leave(ForwardStreamController<WorkerResponse> controller) {
    _activeConnections.remove(controller);
    controller.close();
  }

  /// Builds the stream of results for [req].
  ///
  /// Responses are read from [port]; the request itself is only handed to
  /// [post] when the response stream is first listened to. [port] is closed
  /// once the [ResultStream] created here reports completion through its
  /// `done` future. [streaming] is passed through to [ResultStream].
  Stream<dynamic> _getResponseStream(
    vm.ReceivePort port,
    WorkerRequest req,
    void Function(WorkerRequest) post, {
    required bool streaming,
  }) {
    final cmd = req.command;

    // Sends the request and returns the stream of raw responses.
    Stream<WorkerResponse> $connect() {
      late final ForwardStreamController<WorkerResponse> ctrl;

      // Converts any error into a [SquadronException] tagged with the command
      // before forwarding it to the controller.
      void $reportError(Object error, [StackTrace? st]) {
        ctrl.safeAddError(SquadronException.from(error, st, cmd));
      }

      void $onListen() {
        // Nothing to do if the controller is closed already.
        if (ctrl.isClosed) return;

        // Bind the controller to the port before the request goes out.
        final responses = port.cast<WorkerResponse>();
        ctrl.attachSubscription(responses.listen(
          ctrl.safeAdd,
          onError: $reportError,
          onDone: () => _leave(ctrl),
          cancelOnError: false,
        ));

        // Send the request; a failure is surfaced on the stream and ends the
        // connection.
        try {
          _enter(ctrl);
          post(req);
        } catch (ex, st) {
          $reportError(ex, st);
          _leave(ctrl);
        }
      }

      void $onCancel() {
        _leave(ctrl);
      }

      ctrl = ForwardStreamController(onListen: $onListen, onCancel: $onCancel);
      return ctrl.stream;
    }

    // Return a stream of decoded responses.
    final results = ResultStream(this, req, $connect, streaming);
    results.done.whenComplete(() => port.close()).ignore();
    return results.stream;
  }

  /// Creates a [vm.ReceivePort] and a [WorkerRequest] and sends it to the
  /// [vm.Isolate]. This method expects a single value from the [vm.Isolate].
  ///
  /// The returned future completes with the first value received, or with the
  /// first error received. If the response stream ends before either, it
  /// completes with a [WorkerException] ("No response from worker"). In all
  /// cases the subscription is canceled before the future is completed.
  ///
  /// [inspectRequest] is not used by this implementation.
  @override
  Future<dynamic> sendRequest(
    int command,
    List args, {
    SquadronCancelationToken? token,
    bool inspectRequest = false,
    bool inspectResponse = false,
  }) {
    final result = Completer();
    late final StreamSubscription subscription;

    void $fail(Object ex, [StackTrace? st]) {
      subscription
          .cancel()
          .whenComplete(() => result.safeCompleteError(ex, st));
    }

    void $succeed(dynamic data) {
      subscription.cancel().whenComplete(() => result.safeComplete(data));
    }

    void $finish() {
      subscription.cancel().whenComplete(() {
        if (result.isCompleted) return;
        $fail(WorkerException('No response from worker', null, command));
      });
    }

    final port = vm.ReceivePort();
    final request = WorkerRequest.userCommand(
        port.sendPort, command, args, token, inspectResponse);
    final responses =
        _getResponseStream(port, request, _postRequest, streaming: false);
    subscription = responses.listen($succeed, onError: $fail, onDone: $finish);
    return result.future;
  }

  /// Creates a [vm.ReceivePort] and a [WorkerRequest] and sends it to the
  /// [vm.Isolate]. This method expects a stream of values from the
  /// [vm.Isolate]. The [vm.Isolate] must send a [WorkerResponse.endOfStream]
  /// to close the [Stream].
  ///
  /// [inspectRequest] is not used by this implementation.
  @override
  Stream<dynamic> sendStreamingRequest(
    int command,
    List args, {
    SquadronCancelationToken? token,
    bool inspectRequest = false,
    bool inspectResponse = false,
  }) {
    final port = vm.ReceivePort();
    final request = WorkerRequest.userCommand(
        port.sendPort, command, args, token, inspectResponse);
    return _getResponseStream(port, request, _postRequest, streaming: true);
  }
}
Choose Features