LCOV - code coverage report

Current view
top level - src/_impl/native - _channel_impl.dart
Test
lcov.info
Date
2025-11-17
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
Hit Total Coverage
Lines 68 75 90.7%
Functions 0 0 -
Branches 0 0 -
Each row represents a line of source code
Line Branch Hits Source code
1part of '_channel.dart';
2
3/// [Channel] implementation for the Native world.
4final class _VmChannel implements Channel {
510 _VmChannel._(this._sendPort, this.logger, this.exceptionManager);
6
7 /// [vm.SendPort] to communicate with the [vm.Isolate] if the channel is owned by
8 /// the worker owner. Otherwise, [vm.SendPort] to return values to the client.
9 final vm.SendPort _sendPort;
10
11 PlatformThread? _thread;
12 final _activeConnections = <ForwardStreamController<WorkerResponse>>[];
13
14 @override
15 final ExceptionManager exceptionManager;
16
17 @override
18 final Logger? logger;
19
20 final _closed = Completer<void>();
21
221 @override
232 Future<void> get closed => _closed.future;
24
25 /// [Channel] serialization in the Native world returns the [vm.SendPort].
263 @override
273 PlatformChannel serialize() => _sendPort;
28
29 /// [Channel] sharing in the Native world returns a [_VmForwardChannel].
303 @override
313 Channel share() => _VmForwardChannel._(
3212 _sendPort, vm.ReceivePort(), logger, exceptionManager);
33
3410 void _postRequest(WorkerRequest req, {bool force = false}) {
3520 if (_closed.isCompleted && !force) {
360 throw SquadronErrorImpl.create('Channel is closed');
37 }
38 try {
3912 req.cancelToken?.ensureStarted();
4030 _sendPort.send(req.wrapInPlace());
41 } catch (ex, st) {
421 logger?.e(() => 'Failed to post request $req: $ex');
432 throw SquadronErrorImpl.create('Failed to post request: $ex', st);
44 }
45 }
46
47 /// Sends a termination [WorkerRequest] to the [vm.Isolate].
4810 @override
49 FutureOr<void> close() {
5020 if (!_closed.isCompleted) {
5120 _postRequest(WorkerRequest.stop());
5220 _closed.complete();
53 }
5420 return _closed.future;
55 }
56
57 /// Sends a cancel-stream [WorkerRequest] for [streamId] to the [vm.Isolate].
584 @override
59 FutureOr<void> cancelStream(int streamId) {
608 _postRequest(WorkerRequest.cancelStream(streamId), force: true);
61 }
62
63 /// Sends a cancel token [WorkerRequest] to the [vm.Isolate].
642 @override
65 FutureOr<void> cancelToken(SquadronCancelationToken? token) {
66 if (token != null) {
674 _postRequest(WorkerRequest.cancel(token), force: true);
68 }
69 }
70
7110 void _enter(ForwardStreamController<WorkerResponse> controller) {
7220 _activeConnections.add(controller);
73 }
74
7510 void _leave(ForwardStreamController<WorkerResponse> controller) {
7620 _activeConnections.remove(controller);
7710 controller.close();
78 }
79
8010 Stream<dynamic> _getResponseStream(
81 vm.ReceivePort port,
82 WorkerRequest req,
83 void Function(WorkerRequest) post, {
84 required bool streaming,
85 }) {
8610 final command = req.command;
87
88 // send the request, return a stream of responses
8910 Stream<WorkerResponse> $sendRequest() {
90 late final ForwardStreamController<WorkerResponse> controller;
91
9210 void $forwardMessage(WorkerResponse msg) {
9320 if (!controller.isClosed) controller.add(msg);
94 }
95
961 void $forwardError(Object error, [StackTrace? st]) {
971 if (!controller.isClosed) {
982 controller.addError(SquadronException.from(error, st, command));
99 }
100 }
101
1020 void $done() {
1030 _leave(controller);
104 }
105
10620 controller = ForwardStreamController(onListen: () {
107 // do nothing if the controller is closed already
10810 if (controller.isClosed) return;
109
110 // bind the controller
11130 controller.attachSubscription(port.cast<WorkerResponse>().listen(
112 $forwardMessage,
113 onError: $forwardError,
114 onDone: $done,
115 cancelOnError: false,
116 ));
117
118 // send the request
119 try {
12010 _enter(controller);
12110 post(req);
122 } catch (ex, st) {
1231 $forwardError(ex, st);
1241 _leave(controller);
125 }
12610 }, onCancel: () {
12710 _leave(controller);
128 });
12910 return controller.stream;
130 }
131
132 // return a stream of decoded responses
13310 final res = ResultStream(this, req, $sendRequest, streaming);
13450 res.done.whenComplete(() => port.close()).ignore();
13510 return res.stream;
136 }
137
138 /// Creates a [vm.ReceivePort] and a [WorkerRequest] and sends it to the
139 /// [vm.Isolate]. This method expects a single value from the [vm.Isolate].
14010 @override
141 Future<dynamic> sendRequest(
142 int command,
143 List args, {
144 SquadronCancelationToken? token,
145 bool inspectRequest = false,
146 bool inspectResponse = false,
147 }) {
14810 final completer = Completer();
149 late final StreamSubscription sub;
150
15110 void $success(dynamic data) async {
15210 await sub.cancel();
15320 if (!completer.isCompleted) completer.complete(data);
154 }
155
1564 void $fail(Object ex, [StackTrace? st]) async {
1574 await sub.cancel();
1588 if (!completer.isCompleted) completer.completeError(ex, st);
159 }
160
1610 void $done() async {
1620 await sub.cancel();
1630 if (!completer.isCompleted) {
1640 $fail(WorkerException('No response from worker', null, command));
165 }
166 }
167
16810 final receiver = vm.ReceivePort();
16910 final req = WorkerRequest.userCommand(
17010 receiver.sendPort, command, args, token, inspectResponse);
17120 sub = _getResponseStream(receiver, req, _postRequest, streaming: false)
17210 .listen($success, onError: $fail, onDone: $done);
17310 return completer.future;
174 }
175
176 /// Creates a [vm.ReceivePort] and a [WorkerRequest] and sends it to the
177 /// [vm.Isolate]. This method expects a stream of values from the [vm.Isolate].
178 /// The [vm.Isolate] must send a [WorkerResponse.endOfStream] to close the
179 /// [Stream].
1805 @override
181 Stream<dynamic> sendStreamingRequest(
182 int command,
183 List args, {
184 SquadronCancelationToken? token,
185 bool inspectRequest = false,
186 bool inspectResponse = false,
187 }) {
1885 final receiver = vm.ReceivePort();
1895 final req = WorkerRequest.userCommand(
1905 receiver.sendPort, command, args, token, inspectResponse);
19110 return _getResponseStream(receiver, req, _postRequest, streaming: true);
192 }
193}
Choose Features