part of '_channel.dart';

/// [Channel] implementation for the Native world.
///
/// Requests are wrapped and sent through a [SendPort]; each request carries
/// its own [ReceivePort] on which the responses are received.
final class _VmChannel implements Channel {
  _VmChannel._(this._sendPort, this.logger, this.exceptionManager);

  /// [SendPort] to communicate with the [Isolate] if the channel is owned by
  /// the worker owner. Otherwise, [SendPort] to return values to the client.
  final vm.SendPort _sendPort;

  // Not assigned anywhere in this class.
  // NOTE(review): presumably set by the code that starts the worker -- confirm.
  PlatformThread? _thread;

  /// Controllers of the response streams for which a request has been posted
  /// (see [_enter]) and that have not been released yet (see [_leave]).
  final _activeConnections = <ForwardStreamController<WorkerResponse>>[];

  @override
  final ExceptionManager exceptionManager;

  @override
  final Logger? logger;

  /// Set to `true` once the termination request has been posted by [close].
  bool _closed = false;

  /// [Channel] serialization in Native world returns the [SendPort].
  @override
  PlatformChannel serialize() => _sendPort;

  /// [Channel] sharing in Native world returns a [_VmForwardChannel].
  ///
  /// The forward channel is built with this channel's [SendPort] and a fresh
  /// [ReceivePort].
  @override
  Channel share() => _VmForwardChannel._(
      _sendPort, vm.ReceivePort(), logger, exceptionManager);

  /// Wraps [req] and sends it through [_sendPort].
  ///
  /// Throws a Squadron error if the channel is closed, unless [force] is
  /// `true`. Any failure while starting the request's cancelation token or
  /// while sending is logged and rethrown as a Squadron error.
  void _postRequest(WorkerRequest req, {bool force = false}) {
    if (_closed && !force) {
      throw SquadronErrorImpl.create('Channel is closed');
    }
    try {
      req.cancelToken?.ensureStarted();
      _sendPort.send(req.wrapInPlace());
    } catch (ex, st) {
      logger?.e(() => 'Failed to post request $req: $ex');
      throw SquadronErrorImpl.create('Failed to post request: $ex', st);
    }
  }

  /// Sends a termination [WorkerRequest] to the [vm.Isolate].
  ///
  /// Does nothing if the channel is already closed. The channel is only
  /// marked as closed after the termination request was posted, so if posting
  /// throws, the channel remains open.
  @override
  FutureOr<void> close() {
    if (!_closed) {
      _postRequest(WorkerRequest.stop());
      _closed = true;
    }
  }

  /// Sends a close stream [WorkerRequest] to the [vm.Isolate].
  ///
  /// The request is forced, i.e. it is posted even if the channel is closed.
  @override
  FutureOr<void> cancelStream(int streamId) {
    _postRequest(WorkerRequest.cancelStream(streamId), force: true);
  }

  /// Sends a cancel token [WorkerRequest] to the [vm.Isolate].
  ///
  /// Does nothing if [token] is `null`. The request is forced, i.e. it is
  /// posted even if the channel is closed.
  @override
  FutureOr<void> cancelToken(SquadronCancelationToken? token) {
    if (token != null) {
      _postRequest(WorkerRequest.cancel(token), force: true);
    }
  }

  /// Registers [controller] as an active connection.
  void _enter(ForwardStreamController<WorkerResponse> controller) {
    _activeConnections.add(controller);
  }

  /// Unregisters [controller] from the active connections and closes it.
  void _leave(ForwardStreamController<WorkerResponse> controller) {
    _activeConnections.remove(controller);
    controller.close();
  }

  /// Builds the stream of responses for [req].
  ///
  /// Nothing is sent until the returned stream is listened to: at that point
  /// the messages received on [port] are forwarded to the stream and [req] is
  /// handed over to [post]. [port] is closed when the underlying
  /// [ResultStream] is done. [streaming] is passed as is to [ResultStream].
  Stream<dynamic> _getResponseStream(
    vm.ReceivePort port,
    WorkerRequest req,
    void Function(WorkerRequest) post, {
    required bool streaming,
  }) {
    final command = req.command;

    // send the request, return a stream of responses
    Stream<WorkerResponse> $sendRequest() {
      late final ForwardStreamController<WorkerResponse> controller;

      void $forwardMessage(WorkerResponse msg) {
        if (!controller.isClosed) controller.add(msg);
      }

      void $forwardError(Object error, StackTrace? st) {
        if (!controller.isClosed) {
          controller.addError(SquadronException.from(error, st, command));
        }
      }

      void $done() {
        _leave(controller);
      }

      controller = ForwardStreamController(onListen: () {
        // do nothing if the controller is closed already
        if (controller.isClosed) return;

        // bind the controller: the port is listened to before the request is
        // posted
        controller.attachSubscription(port.cast<WorkerResponse>().listen(
              $forwardMessage,
              onError: $forwardError,
              onDone: $done,
              cancelOnError: false,
            ));

        // send the request; if posting fails, report the error on the stream
        // and release the connection
        try {
          _enter(controller);
          post(req);
        } catch (ex, st) {
          $forwardError(ex, st);
          _leave(controller);
        }
      });
      return controller.stream;
    }

    // return a stream of decoded responses
    final res = ResultStream(this, req, $sendRequest, streaming);
    res.done.whenComplete(() => port.close()).ignore();
    return res.stream;
  }

  /// Creates a [ReceivePort] and a [WorkerRequest] and sends it to the
  /// [Isolate]. This method expects a single value from the [Isolate].
  ///
  /// The returned future completes with the first value received, or with the
  /// first error received. If the response stream ends before any value or
  /// error was received, the future completes with a [WorkerException]. In
  /// all cases the subscription to the response stream is canceled before the
  /// future completes.
  ///
  /// [inspectRequest] is not used by this implementation.
  @override
  Future<dynamic> sendRequest(
    int command,
    List args, {
    SquadronCancelationToken? token,
    bool inspectRequest = false,
    bool inspectResponse = false,
  }) {
    final completer = Completer();
    late final StreamSubscription sub;

    void $success(dynamic data) async {
      await sub.cancel();
      if (!completer.isCompleted) completer.complete(data);
    }

    void $fail(Object ex, [StackTrace? st]) async {
      await sub.cancel();
      if (!completer.isCompleted) completer.completeError(ex, st);
    }

    // $fail already cancels the subscription and leaves a completed completer
    // untouched, so the end of the stream simply delegates to it.
    void $done() =>
        $fail(WorkerException('No response from worker', null, command));

    final receiver = vm.ReceivePort();
    final req = WorkerRequest.userCommand(
        receiver.sendPort, command, args, token, inspectResponse);
    sub = _getResponseStream(receiver, req, _postRequest, streaming: false)
        .listen($success, onError: $fail, onDone: $done);
    return completer.future;
  }

  /// Creates a [ReceivePort] and a [WorkerRequest] and sends it to the
  /// [Isolate]. This method expects a stream of values from the [Isolate].
  /// The [Isolate] must send a [WorkerResponse.endOfStream] to close the
  /// [Stream].
  ///
  /// [inspectRequest] is not used by this implementation.
  @override
  Stream<dynamic> sendStreamingRequest(
    int command,
    List args, {
    SquadronCancelationToken? token,
    bool inspectRequest = false,
    bool inspectResponse = false,
  }) {
    final receiver = vm.ReceivePort();
    final req = WorkerRequest.userCommand(
        receiver.sendPort, command, args, token, inspectResponse);
    return _getResponseStream(receiver, req, _postRequest, streaming: true);
  }
}