1 | | | part of '_channel.dart'; |
2 | | |
|
3 | | | /// [Channel] implementation for the JavaScript world. |
4 | | | final class _WebChannel implements Channel { |
5 | | 9 | _WebChannel._(this._sendPort, this.logger, this.exceptionManager); |
6 | | |
|
7 | | | /// [web.MessagePort] to communicate with the [web.Worker] if the channel is owned by the worker owner. Otherwise, |
8 | | | /// [web.MessagePort] to return values to the client. |
9 | | | final web.MessagePort _sendPort; |
10 | | |
|
11 | | | @override |
12 | | | final ExceptionManager exceptionManager; |
13 | | |
|
14 | | | @override |
15 | | | final Logger? logger; |
16 | | |
|
17 | | | bool _closed = false; |
18 | | |
|
19 | | | /// [Channel] serialization in JavaScript world returns the [web.MessagePort]. |
20 | | | @override |
21 | | | PlatformChannel serialize() => _sendPort; |
22 | | |
|
23 | | | /// [Channel] sharing in JavaScript world returns a [_WebForwardChannel]. |
24 | | | @override |
25 | | 2 | Channel share() => _WebForwardChannel._( |
26 | | 3 | _sendPort, web.MessageChannel(), logger, exceptionManager); |
27 | | |
|
28 | | 9 | void _postRequest(WorkerRequest req) { |
29 | | 9 | if (_closed) { |
30 | | 0 | throw SquadronErrorExt.create('Channel is closed'); |
31 | | | } |
32 | | 1 | try { |
33 | | 2 | req.cancelToken?.ensureStarted(); |
34 | | 9 | final data = req.wrapInPlace(); |
35 | | | final msg = data.jsify(); |
36 | | 9 | final transfer = Transferables.get([req.channelInfo]); |
37 | | 9 | if (transfer == null || transfer.isEmpty) { |
38 | | 9 | _sendPort.postMessage(msg); |
39 | | | } else { |
40 | | 8 | final jsTransfer = transfer.jsify() as JSArray; |
41 | | | _sendPort.postMessage(msg, jsTransfer); |
42 | | | } |
43 | | 1 | } catch (ex, st) { |
44 | | 3 | logger?.e(() => 'Failed to post request $req: $ex'); |
45 | | 1 | throw SquadronErrorExt.create('Failed to post request: $ex', st); |
46 | | | } |
47 | | 9 | } |
48 | | |
|
49 | | 2 | void _inspectAndPostRequest(WorkerRequest req) { |
50 | | 2 | if (_closed) { |
51 | | 0 | throw SquadronErrorExt.create('Channel is closed'); |
52 | | | } |
53 | | 0 | req.cancelToken?.ensureStarted(); |
54 | | | req.cancelToken?.throwIfCanceled(); |
55 | | 1 | try { |
56 | | 2 | final data = req.wrapInPlace(); |
57 | | | final msg = data.jsify(); |
58 | | 2 | final transfer = Transferables.get(data); |
59 | | 2 | if (transfer == null || transfer.isEmpty) { |
60 | | 2 | _sendPort.postMessage(msg); |
61 | | | } else { |
62 | | 2 | final jsTransfer = transfer.jsify() as JSArray; |
63 | | | _sendPort.postMessage(msg, jsTransfer); |
64 | | | } |
65 | | 1 | } catch (ex, st) { |
66 | | | logger?.e(() => 'Failed to post request $req: $ex'); |
67 | | 1 | throw SquadronErrorExt.create('Failed to post request: $ex', st); |
68 | | | } |
69 | | 2 | } |
70 | | |
|
71 | | | /// Sends a termination [WorkerRequest] to the [web.Worker]. |
72 | | | @override |
73 | | 9 | FutureOr<void> close() { |
74 | | 9 | if (!_closed) { |
75 | | 9 | _postRequest(WorkerRequest.stop()); |
76 | | 9 | _closed = true; |
77 | | | } |
78 | | 9 | } |
79 | | |
|
80 | | | /// Sends a close stream [WorkerRequest] to the [web.Worker]. |
81 | | | @override |
82 | | | FutureOr<void> cancelStream(int streamId) { |
83 | | 3 | if (!_closed) { |
84 | | 3 | _postRequest(WorkerRequest.cancelStream(streamId)); |
85 | | | } |
86 | | | } |
87 | | |
|
88 | | | /// Sends a cancel token [WorkerRequest] to the [web.Worker]. |
89 | | | @override |
90 | | 2 | FutureOr<void> cancelToken(SquadronCancelationToken? token) { |
91 | | 2 | if (token != null && !_closed) { |
92 | | 2 | _postRequest(WorkerRequest.cancel(token)); |
93 | | | } |
94 | | 2 | } |
95 | | |
|
96 | | 9 | Stream _getResponseStream( |
97 | | | web.MessageChannel com, |
98 | | | WorkerRequest req, |
99 | | | void Function(WorkerRequest) post, { |
100 | | | required bool streaming, |
101 | | | }) { |
102 | | 9 | final command = req.command; |
103 | | |
|
104 | | | // return a stream of responses |
105 | | 9 | Stream<WorkerResponse> $sendRequest() { |
106 | | 9 | StreamController<WorkerResponse>? controller; |
107 | | |
|
108 | | 10 | void $forwardMessage(WorkerResponse msg) => controller?.add(msg); |
109 | | |
|
110 | | 9 | void $forwardError(Object error, StackTrace? st) => |
111 | | 2 | controller?.addError(SquadronException.from(error, st, command)); |
112 | | |
|
113 | | | final buffer = EventBuffer($forwardMessage, $forwardError); |
114 | | |
|
115 | | 9 | Future<void> $close() async { |
116 | | 9 | com.port1.close(); |
117 | | | com.port2.close(); |
118 | | 9 | final future = controller?.close(); |
119 | | 9 | controller = null; |
120 | | 9 | await future; |
121 | | 9 | } |
122 | | |
|
123 | | 9 | controller = StreamController<WorkerResponse>( |
124 | | 9 | onListen: () { |
125 | | | // do nothing if the controller is closed already |
126 | | 9 | if (controller == null) return; |
127 | | |
|
128 | | | // bind the controller |
129 | | 9 | com.port1.onmessageerror = (web.ErrorEvent e) { |
130 | | 0 | final ex = SquadronException.from(getError(e), null, command); |
131 | | 0 | final handler = buffer.isActive ? buffer.addError : $forwardError; |
132 | | 0 | handler(ex, null); |
133 | | 9 | }.toJS; |
134 | | |
|
135 | | 10 | com.port1.onmessage = (web.MessageEvent e) { |
136 | | 10 | final res = WorkerResponseExt.from(getMessageEventData(e)!); |
137 | | 10 | final handler = buffer.isActive ? buffer.add : $forwardMessage; |
138 | | 10 | handler(res); |
139 | | 10 | }.toJS; |
140 | | |
|
141 | | | // send the request |
142 | | 2 | try { |
143 | | 9 | post(req); |
144 | | 2 | } catch (ex, st) { |
145 | | | if (buffer.isActive) { |
146 | | 0 | buffer.addError(ex, st); |
147 | | 2 | buffer.onDeactivate = $close; |
148 | | | } else { |
149 | | 2 | $forwardError(ex, st); |
150 | | 2 | $close(); |
151 | | | } |
152 | | | } |
153 | | 9 | }, |
154 | | 9 | onPause: buffer.activate, |
155 | | 9 | onResume: buffer.deactivate, |
156 | | | onCancel: $close, |
157 | | | ); |
158 | | |
|
159 | | 9 | return controller!.stream; |
160 | | 9 | } |
161 | | |
|
162 | | | // return a stream of decoded responses |
163 | | 9 | return ResultStream(this, req, $sendRequest, streaming).stream; |
164 | | 9 | } |
165 | | |
|
166 | | | /// Creates a [web.MessageChannel] and a [WorkerRequest] and sends it to the [web.Worker]. This method expects a |
167 | | | /// single value from the [web.Worker]. |
168 | | | @override |
169 | | 8 | Future<dynamic> sendRequest( |
170 | | | int command, |
171 | | | List args, { |
172 | | | SquadronCancelationToken? token, |
173 | | | bool inspectRequest = false, |
174 | | | bool inspectResponse = false, |
175 | | | }) { |
176 | | | final completer = Completer(); |
177 | | 8 | late final StreamSubscription sub; |
178 | | |
|
179 | | 8 | void $success(dynamic data) async { |
180 | | 8 | await sub.cancel(); |
181 | | 8 | if (!completer.isCompleted) completer.complete(data); |
182 | | 8 | } |
183 | | |
|
184 | | 8 | void $fail(Object ex, [StackTrace? st]) async { |
185 | | 4 | await sub.cancel(); |
186 | | 4 | if (!completer.isCompleted) completer.completeError(ex, st); |
187 | | 4 | } |
188 | | |
|
189 | | 8 | void $done() async { |
190 | | 0 | await sub.cancel(); |
191 | | 0 | if (!completer.isCompleted) { |
192 | | 0 | $fail(WorkerException('No response from worker', null, command)); |
193 | | | } |
194 | | 0 | } |
195 | | |
|
196 | | | final com = web.MessageChannel(); |
197 | | | final req = WorkerRequest.userCommand( |
198 | | | com.port2, command, args, token, inspectResponse); |
199 | | 8 | final post = inspectRequest ? _inspectAndPostRequest : _postRequest; |
200 | | 8 | sub = _getResponseStream(com, req, post, streaming: false) |
201 | | 8 | .listen($success, onError: $fail, onDone: $done); |
202 | | 8 | return completer.future; |
203 | | 8 | } |
204 | | |
|
205 | | | /// Creates a [web.MessageChannel] and a [WorkerRequest] and sends it to the [web.Worker]. This method expects a |
206 | | | /// stream of values from the [web.Worker]. The [web.Worker] must send a [WorkerResponse.endOfStream] to close |
207 | | | /// the [Stream]. |
208 | | | @override |
209 | | 5 | Stream<dynamic> sendStreamingRequest( |
210 | | | int command, |
211 | | | List args, { |
212 | | | SquadronCancelationToken? token, |
213 | | | bool inspectRequest = false, |
214 | | | bool inspectResponse = false, |
215 | | | }) { |
216 | | | final com = web.MessageChannel(); |
217 | | | final req = WorkerRequest.userCommand( |
218 | | | com.port2, command, args, token, inspectResponse); |
219 | | 5 | final post = inspectRequest ? _inspectAndPostRequest : _postRequest; |
220 | | 5 | return _getResponseStream(com, req, post, streaming: true); |
221 | | 5 | } |
222 | | | } |
223 | | |
|
224 | | | /// [Channel] used to communicate between [web.Worker]s. Creates a |
225 | | | /// [web.MessageChannel] to receive commands on |
226 | | | /// [web.MessageChannel.port2] and forwards them to the worker's [web.MessagePort] via [web.MessageChannel.port1]. |
227 | | | final class _WebForwardChannel extends _WebChannel { |
228 | | | /// [_remote] is the worker's [web.MessagePort], [_com] is the intermediate |
229 | | | /// message channel |
230 | | 3 | _WebForwardChannel._(this._remote, this._com, Logger? logger, |
231 | | | ExceptionManager exceptionManager) |
232 | | | : super._(_com.port2, logger, exceptionManager) { |
233 | | 3 | _com.port1.onmessage = _forward.toJS; |
234 | | | } |
235 | | |
|
236 | | | /// [web.MessagePort] to the worker. |
237 | | | final web.MessagePort _remote; |
238 | | |
|
239 | | | /// [web.MessageChannel] used for forwarding messages. |
240 | | | final web.MessageChannel _com; |
241 | | |
|
242 | | | /// Forwards [web.MessageEvent.data] to the worker. |
243 | | 3 | void _forward(web.MessageEvent e) { |
244 | | 3 | if (_closed) { |
245 | | 0 | throw SquadronErrorExt.create('Channel is closed'); |
246 | | | } |
247 | | 0 | try { |
248 | | 3 | final data = getMessageEventData(e) as List; |
249 | | 3 | final transfer = Transferables.get(data); |
250 | | 3 | if (transfer == null || transfer.isEmpty) { |
251 | | 3 | _remote.postMessage(e.data); |
252 | | | } else { |
253 | | 3 | final jsTransfer = transfer.jsify() as JSArray; |
254 | | | _remote.postMessage(e.data, jsTransfer); |
255 | | | } |
256 | | 0 | } catch (ex, st) { |
257 | | 1 | logger?.e(() => 'Failed to post request $e: $ex'); |
258 | | 0 | throw SquadronErrorExt.create('Failed to post request: $ex', st); |
259 | | | } |
260 | | 3 | } |
261 | | |
|
262 | | | /// Closes this [Channel], effectively stopping message forwarding. |
263 | | | @override |
264 | | 0 | void close() { |
265 | | 0 | if (!_closed) { |
266 | | 0 | _com.port1.close(); |
267 | | 0 | _closed = true; |
268 | | | } |
269 | | 0 | } |
270 | | | } |