LCOV - code coverage report

Current view
top level - src/_impl/web - _channel_impl.dart
Test
lcov.info
Date
2024-11-13
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
HitTotalCoverage
Lines10512584.0%
Functions00-
Branches00-
Each row represents a line of source code
LineBranchHitsSource code
1part of '_channel.dart';
2
3/// [Channel] implementation for the JavaScript world.
4final class _WebChannel implements Channel {
59 _WebChannel._(this._sendPort, this.logger, this.exceptionManager);
6
7 /// [web.MessagePort] to communicate with the [web.Worker] if the channel is owned by the worker owner. Otherwise,
8 /// [web.MessagePort] to return values to the client.
9 final web.MessagePort _sendPort;
10
11 @override
12 final ExceptionManager exceptionManager;
13
14 @override
15 final Logger? logger;
16
17 bool _closed = false;
18
19 /// [Channel] serialization in JavaScript world returns the [web.MessagePort].
20 @override
21 PlatformChannel serialize() => _sendPort;
22
23 /// [Channel] sharing in JavaScript world returns a [_WebForwardChannel].
24 @override
252 Channel share() => _WebForwardChannel._(
263 _sendPort, web.MessageChannel(), logger, exceptionManager);
27
289 void _postRequest(WorkerRequest req) {
299 if (_closed) {
300 throw SquadronErrorExt.create('Channel is closed');
31 }
321 try {
332 req.cancelToken?.ensureStarted();
349 final data = req.wrapInPlace();
35 final msg = data.jsify();
369 final transfer = Transferables.get([req.channelInfo]);
379 if (transfer == null || transfer.isEmpty) {
389 _sendPort.postMessage(msg);
39 } else {
408 final jsTransfer = transfer.jsify() as JSArray;
41 _sendPort.postMessage(msg, jsTransfer);
42 }
431 } catch (ex, st) {
443 logger?.e(() => 'Failed to post request $req: $ex');
451 throw SquadronErrorExt.create('Failed to post request: $ex', st);
46 }
479 }
48
492 void _inspectAndPostRequest(WorkerRequest req) {
502 if (_closed) {
510 throw SquadronErrorExt.create('Channel is closed');
52 }
530 req.cancelToken?.ensureStarted();
54 req.cancelToken?.throwIfCanceled();
551 try {
562 final data = req.wrapInPlace();
57 final msg = data.jsify();
582 final transfer = Transferables.get(data);
592 if (transfer == null || transfer.isEmpty) {
602 _sendPort.postMessage(msg);
61 } else {
622 final jsTransfer = transfer.jsify() as JSArray;
63 _sendPort.postMessage(msg, jsTransfer);
64 }
651 } catch (ex, st) {
66 logger?.e(() => 'Failed to post request $req: $ex');
671 throw SquadronErrorExt.create('Failed to post request: $ex', st);
68 }
692 }
70
71 /// Sends a termination [WorkerRequest] to the [web.Worker].
72 @override
739 FutureOr<void> close() {
749 if (!_closed) {
759 _postRequest(WorkerRequest.stop());
769 _closed = true;
77 }
789 }
79
80 /// Sends a close stream [WorkerRequest] to the [web.Worker].
81 @override
82 FutureOr<void> cancelStream(int streamId) {
833 if (!_closed) {
843 _postRequest(WorkerRequest.cancelStream(streamId));
85 }
86 }
87
88 /// Sends a cancel token [WorkerRequest] to the [web.Worker].
89 @override
902 FutureOr<void> cancelToken(SquadronCancelationToken? token) {
912 if (token != null && !_closed) {
922 _postRequest(WorkerRequest.cancel(token));
93 }
942 }
95
969 Stream _getResponseStream(
97 web.MessageChannel com,
98 WorkerRequest req,
99 void Function(WorkerRequest) post, {
100 required bool streaming,
101 }) {
1029 final command = req.command;
103
104 // return a stream of responses
1059 Stream<WorkerResponse> $sendRequest() {
1069 StreamController<WorkerResponse>? controller;
107
10810 void $forwardMessage(WorkerResponse msg) => controller?.add(msg);
109
1109 void $forwardError(Object error, StackTrace? st) =>
1112 controller?.addError(SquadronException.from(error, st, command));
112
113 final buffer = EventBuffer($forwardMessage, $forwardError);
114
1159 Future<void> $close() async {
1169 com.port1.close();
117 com.port2.close();
1189 final future = controller?.close();
1199 controller = null;
1209 await future;
1219 }
122
1239 controller = StreamController<WorkerResponse>(
1249 onListen: () {
125 // do nothing if the controller is closed already
1269 if (controller == null) return;
127
128 // bind the controller
1299 com.port1.onmessageerror = (web.ErrorEvent e) {
1300 final ex = SquadronException.from(getError(e), null, command);
1310 final handler = buffer.isActive ? buffer.addError : $forwardError;
1320 handler(ex, null);
1339 }.toJS;
134
13510 com.port1.onmessage = (web.MessageEvent e) {
13610 final res = WorkerResponseExt.from(getMessageEventData(e)!);
13710 final handler = buffer.isActive ? buffer.add : $forwardMessage;
13810 handler(res);
13910 }.toJS;
140
141 // send the request
1422 try {
1439 post(req);
1442 } catch (ex, st) {
145 if (buffer.isActive) {
1460 buffer.addError(ex, st);
1472 buffer.onDeactivate = $close;
148 } else {
1492 $forwardError(ex, st);
1502 $close();
151 }
152 }
1539 },
1549 onPause: buffer.activate,
1559 onResume: buffer.deactivate,
156 onCancel: $close,
157 );
158
1599 return controller!.stream;
1609 }
161
162 // return a stream of decoded responses
1639 return ResultStream(this, req, $sendRequest, streaming).stream;
1649 }
165
166 /// Creates a [web.MessageChannel] and a [WorkerRequest] and sends it to the [web.Worker]. This method expects a
167 /// single value from the [web.Worker].
168 @override
1698 Future<dynamic> sendRequest(
170 int command,
171 List args, {
172 SquadronCancelationToken? token,
173 bool inspectRequest = false,
174 bool inspectResponse = false,
175 }) {
176 final completer = Completer();
1778 late final StreamSubscription sub;
178
1798 void $success(dynamic data) async {
1808 await sub.cancel();
1818 if (!completer.isCompleted) completer.complete(data);
1828 }
183
1848 void $fail(Object ex, [StackTrace? st]) async {
1854 await sub.cancel();
1864 if (!completer.isCompleted) completer.completeError(ex, st);
1874 }
188
1898 void $done() async {
1900 await sub.cancel();
1910 if (!completer.isCompleted) {
1920 $fail(WorkerException('No response from worker', null, command));
193 }
1940 }
195
196 final com = web.MessageChannel();
197 final req = WorkerRequest.userCommand(
198 com.port2, command, args, token, inspectResponse);
1998 final post = inspectRequest ? _inspectAndPostRequest : _postRequest;
2008 sub = _getResponseStream(com, req, post, streaming: false)
2018 .listen($success, onError: $fail, onDone: $done);
2028 return completer.future;
2038 }
204
205 /// Creates a [web.MessageChannel] and a [WorkerRequest] and sends it to the [web.Worker]. This method expects a
206 /// stream of values from the [web.Worker]. The [web.Worker] must send a [WorkerResponse.endOfStream] to close
207 /// the [Stream].
208 @override
2095 Stream<dynamic> sendStreamingRequest(
210 int command,
211 List args, {
212 SquadronCancelationToken? token,
213 bool inspectRequest = false,
214 bool inspectResponse = false,
215 }) {
216 final com = web.MessageChannel();
217 final req = WorkerRequest.userCommand(
218 com.port2, command, args, token, inspectResponse);
2195 final post = inspectRequest ? _inspectAndPostRequest : _postRequest;
2205 return _getResponseStream(com, req, post, streaming: true);
2215 }
222}
223
224/// [Channel] used to communicate between [web.Worker]s. Creates a
225/// [web.MessageChannel] to receive commands on
226/// [web.MessageChannel.port2] and forwards them to the worker's [web.MessagePort] via [web.MessageChannel.port1].
227final class _WebForwardChannel extends _WebChannel {
228 /// [_remote] is the worker's [web.MessagePort], [_com] is the intermediate
229 /// message channel
2303 _WebForwardChannel._(this._remote, this._com, Logger? logger,
231 ExceptionManager exceptionManager)
232 : super._(_com.port2, logger, exceptionManager) {
2333 _com.port1.onmessage = _forward.toJS;
234 }
235
236 /// [web.MessagePort] to the worker.
237 final web.MessagePort _remote;
238
239 /// [web.MessageChannel] used for forwarding messages.
240 final web.MessageChannel _com;
241
242 /// Forwards [web.MessageEvent.data] to the worker.
2433 void _forward(web.MessageEvent e) {
2443 if (_closed) {
2450 throw SquadronErrorExt.create('Channel is closed');
246 }
2470 try {
2483 final data = getMessageEventData(e) as List;
2493 final transfer = Transferables.get(data);
2503 if (transfer == null || transfer.isEmpty) {
2513 _remote.postMessage(e.data);
252 } else {
2533 final jsTransfer = transfer.jsify() as JSArray;
254 _remote.postMessage(e.data, jsTransfer);
255 }
2560 } catch (ex, st) {
2571 logger?.e(() => 'Failed to post request $e: $ex');
2580 throw SquadronErrorExt.create('Failed to post request: $ex', st);
259 }
2603 }
261
262 /// Closes this [Channel], effectively stopping message forwarding.
263 @override
2640 void close() {
2650 if (!_closed) {
2660 _com.port1.close();
2670 _closed = true;
268 }
2690 }
270}
Choose Features