LCOV - code coverage report

Current view: top level - src/_impl/native - _channel_impl.dart
Test: lcov.info
Date: 2024-11-13
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
          | Hit | Total | Coverage
Lines     | 56  | 61    | 91.8%
Functions | 0   | 0     | -
Branches  | 0   | 0     | -
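Each row represents a line of source code. In the rows below, the source line number is immediately followed by that line's hit count (when the line is instrumented), then the source text; no branch data was recorded.
Line | Branch | Hits | Source code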
1part of '_channel.dart';
2
3/// [Channel] implementation for the Native world.
4final class _VmChannel implements Channel {
59 _VmChannel._(this._sendPort, this.logger, this.exceptionManager);
6
7 /// [SendPort] to communicate with the [Isolate] if the channel is owned by
8 /// the worker owner. Otherwise, [SendPort] to return values to the client.
9 final vm.SendPort _sendPort;
10
11 @override
12 final ExceptionManager exceptionManager;
13
14 @override
15 final Logger? logger;
16
17 bool _closed = false;
18
19 /// [Channel] serialization in Native world returns the [SendPort].
203 @override
213 PlatformChannel serialize() => _sendPort;
22
23 /// [Channel] sharing in Native world returns the same instance.
243 @override
25 Channel share() => this;
26
279 void _postRequest(WorkerRequest req) {
289 if (_closed) {
290 throw SquadronErrorExt.create('Channel is closed');
30 }
31 try {
3211 req.cancelToken?.ensureStarted();
3327 _sendPort.send(req.wrapInPlace());
34 } catch (ex, st) {
351 logger?.e(() => 'Failed to post request $req: $ex');
362 throw SquadronErrorExt.create('Failed to post request: $ex', st);
37 }
38 }
39
40 /// Sends a termination [WorkerRequest] to the [vm.Isolate].
419 @override
42 FutureOr<void> close() {
439 if (!_closed) {
4418 _postRequest(WorkerRequest.stop());
459 _closed = true;
46 }
47 }
48
49 /// Sends a close stream [WorkerRequest] to the [vm.Isolate].
503 @override
51 FutureOr<void> cancelStream(int streamId) {
523 if (!_closed) {
536 _postRequest(WorkerRequest.cancelStream(streamId));
54 }
55 }
56
57 /// Sends a cancel token [WorkerRequest] to the [vm.Isolate].
582 @override
59 FutureOr<void> cancelToken(SquadronCancelationToken? token) {
602 if (token != null && !_closed) {
614 _postRequest(WorkerRequest.cancel(token));
62 }
63 }
64
659 Stream<dynamic> _getResponseStream(
66 vm.ReceivePort port,
67 WorkerRequest req,
68 void Function(WorkerRequest) post, {
69 required bool streaming,
70 }) {
719 final command = req.command;
72
73 // send the request, return a stream of responses
749 Stream<WorkerResponse> $sendRequest() {
75 late final ForwardStreamController<WorkerResponse> controller;
76
779 void $forwardMessage(WorkerResponse msg) {
7818 if (!controller.isClosed) controller.add(msg);
79 }
80
811 void $forwardError(Object error, StackTrace? st) {
821 if (!controller.isClosed) {
832 controller.addError(SquadronException.from(error, st, command));
84 }
85 }
86
8718 controller = ForwardStreamController(onListen: () {
88 // do nothing if the controller is closed already
899 if (controller.isClosed) return;
90
91 // bind the controller
9227 controller.attachSubscription(port.cast<WorkerResponse>().listen(
93 $forwardMessage,
94 onError: $forwardError,
959 onDone: controller.close,
96 cancelOnError: false,
97 ));
98
99 // send the request
100 try {
1019 post(req);
102 } catch (ex, st) {
1031 $forwardError(ex, st);
1041 controller.close();
105 }
106 });
1079 return controller.stream;
108 }
109
110 // return a stream of decoded responses
11118 return ResultStream(this, req, $sendRequest, streaming).stream;
112 }
113
114 /// creates a [ReceivePort] and a [WorkerRequest] and sends it to the
115 /// [Isolate]. This method expects a single value from the [Isolate]
1168 @override
117 Future<dynamic> sendRequest(
118 int command,
119 List args, {
120 SquadronCancelationToken? token,
121 bool inspectRequest = false,
122 bool inspectResponse = false,
123 }) {
1248 final completer = Completer();
125 late final StreamSubscription sub;
126
1278 void $success(dynamic data) async {
1288 await sub.cancel();
12916 if (!completer.isCompleted) completer.complete(data);
130 }
131
1323 void $fail(Object ex, [StackTrace? st]) async {
1333 await sub.cancel();
1346 if (!completer.isCompleted) completer.completeError(ex, st);
135 }
136
1370 void $done() async {
1380 await sub.cancel();
1390 if (!completer.isCompleted) {
1400 $fail(WorkerException('No response from worker', null, command));
141 }
142 }
143
1448 final receiver = vm.ReceivePort();
1458 final req = WorkerRequest.userCommand(
1468 receiver.sendPort, command, args, token, inspectResponse);
14716 sub = _getResponseStream(receiver, req, _postRequest, streaming: false)
1488 .listen($success, onError: $fail, onDone: $done);
1498 return completer.future;
150 }
151
152 /// Creates a [ReceivePort] and a [WorkerRequest] and sends it to the
153 /// [Isolate]. This method expects a stream of values from the [Isolate].
154 /// The [Isolate] must send a [WorkerResponse.endOfStream] to close the
155 /// [Stream].
1565 @override
157 Stream<dynamic> sendStreamingRequest(
158 int command,
159 List args, {
160 SquadronCancelationToken? token,
161 bool inspectRequest = false,
162 bool inspectResponse = false,
163 }) {
1645 final receiver = vm.ReceivePort();
1655 final req = WorkerRequest.userCommand(
1665 receiver.sendPort, command, args, token, inspectResponse);
16710 return _getResponseStream(receiver, req, _postRequest, streaming: true);
168 }
169}
Choose Features