1 | | | part of '_channel.dart'; |
2 | | |
|
/// [Channel] implementation for the Native world.
///
/// Requests are posted through a [vm.SendPort]. Once [close] has posted the
/// termination request, the channel is flagged as closed: cancelation calls
/// become no-ops and posting any further request throws.
final class _VmChannel implements Channel {
  _VmChannel._(this._sendPort, this.logger, this.exceptionManager);

  /// [SendPort] to communicate with the [Isolate] if the channel is owned by
  /// the worker owner. Otherwise, [SendPort] to return values to the client.
  final vm.SendPort _sendPort;

  @override
  final ExceptionManager exceptionManager;

  @override
  final Logger? logger;

  /// Set to `true` by [close] after the termination request has been posted.
  bool _closed = false;

  /// [Channel] serialization in Native world returns the [SendPort].
  @override
  PlatformChannel serialize() => _sendPort;

  /// [Channel] sharing in Native world returns the same instance.
  @override
  Channel share() => this;

  /// Posts [req] through [_sendPort].
  ///
  /// Throws an error created by [SquadronErrorExt.create] if the channel is
  /// closed, or if starting the request's cancelation token or sending the
  /// request fails. In the latter case the failure is logged first and the
  /// original stack trace is passed to the new error.
  void _postRequest(WorkerRequest req) {
    if (_closed) {
      throw SquadronErrorExt.create('Channel is closed');
    }
    try {
      req.cancelToken?.ensureStarted();
      _sendPort.send(req.wrapInPlace());
    } catch (ex, st) {
      logger?.e(() => 'Failed to post request $req: $ex');
      throw SquadronErrorExt.create('Failed to post request: $ex', st);
    }
  }

  /// Sends a termination [WorkerRequest] to the [vm.Isolate].
  ///
  /// Does nothing if the channel is already closed. The channel is only
  /// flagged as closed after the termination request was posted successfully.
  @override
  FutureOr<void> close() {
    if (!_closed) {
      _postRequest(WorkerRequest.stop());
      _closed = true;
    }
  }

  /// Sends a close stream [WorkerRequest] to the [vm.Isolate].
  ///
  /// Does nothing if the channel is closed.
  @override
  FutureOr<void> cancelStream(int streamId) {
    if (!_closed) {
      _postRequest(WorkerRequest.cancelStream(streamId));
    }
  }

  /// Sends a cancel token [WorkerRequest] to the [vm.Isolate].
  ///
  /// Does nothing if [token] is `null` or if the channel is closed.
  @override
  FutureOr<void> cancelToken(SquadronCancelationToken? token) {
    if (token != null && !_closed) {
      _postRequest(WorkerRequest.cancel(token));
    }
  }

  /// Builds the stream of responses for [req].
  ///
  /// Responses are read from [port]. The request is only posted (via [post])
  /// when the underlying stream is listened to, and only after the
  /// subscription to [port] has been attached. Errors raised by [port] or by
  /// [post] are converted with [SquadronException.from] and forwarded as
  /// stream errors. [streaming] is handed over to [ResultStream] unchanged.
  Stream<dynamic> _getResponseStream(
    vm.ReceivePort port,
    WorkerRequest req,
    void Function(WorkerRequest) post, {
    required bool streaming,
  }) {
    final command = req.command;

    // send the request, return a stream of responses
    Stream<WorkerResponse> $sendRequest() {
      late final ForwardStreamController<WorkerResponse> controller;

      void $forwardMessage(WorkerResponse msg) {
        if (!controller.isClosed) controller.add(msg);
      }

      void $forwardError(Object error, StackTrace? st) {
        if (!controller.isClosed) {
          controller.addError(SquadronException.from(error, st, command));
        }
      }

      controller = ForwardStreamController(onListen: () {
        // do nothing if the controller is closed already
        if (controller.isClosed) return;

        // bind the controller; this happens before the request is posted
        controller.attachSubscription(port.cast<WorkerResponse>().listen(
              $forwardMessage,
              onError: $forwardError,
              onDone: controller.close,
              cancelOnError: false,
            ));

        // send the request; a failure to post is reported through the stream
        // and terminates it
        try {
          post(req);
        } catch (ex, st) {
          $forwardError(ex, st);
          controller.close();
        }
      });
      return controller.stream;
    }

    // return a stream of decoded responses
    return ResultStream(this, req, $sendRequest, streaming).stream;
  }

  /// creates a [ReceivePort] and a [WorkerRequest] and sends it to the
  /// [Isolate]. This method expects a single value from the [Isolate]
  ///
  /// The returned future completes with the first value received, or with the
  /// first error received. If the response stream ends before any value or
  /// error was received, it completes with a [WorkerException] ('No response
  /// from worker'). In every case the subscription to the response stream is
  /// canceled before the future is completed.
  ///
  /// [inspectRequest] is not read by this implementation; [inspectResponse]
  /// is passed to [WorkerRequest.userCommand].
  @override
  Future<dynamic> sendRequest(
    int command,
    List args, {
    SquadronCancelationToken? token,
    bool inspectRequest = false,
    bool inspectResponse = false,
  }) {
    final completer = Completer();
    late final StreamSubscription sub;

    // NOTE: in the callbacks below, the completer is settled in a `finally`
    // clause so that an error raised while canceling the subscription cannot
    // leave the returned future pending forever. Such an error still
    // propagates as an uncaught asynchronous error.

    void $success(dynamic data) async {
      try {
        await sub.cancel();
      } finally {
        if (!completer.isCompleted) completer.complete(data);
      }
    }

    void $fail(Object ex, [StackTrace? st]) async {
      try {
        await sub.cancel();
      } finally {
        if (!completer.isCompleted) completer.completeError(ex, st);
      }
    }

    void $done() async {
      try {
        await sub.cancel();
      } finally {
        // the stream ended without providing a value or an error
        if (!completer.isCompleted) {
          $fail(WorkerException('No response from worker', null, command));
        }
      }
    }

    final receiver = vm.ReceivePort();
    final req = WorkerRequest.userCommand(
        receiver.sendPort, command, args, token, inspectResponse);
    sub = _getResponseStream(receiver, req, _postRequest, streaming: false)
        .listen($success, onError: $fail, onDone: $done);
    return completer.future;
  }

  /// Creates a [ReceivePort] and a [WorkerRequest] and sends it to the
  /// [Isolate]. This method expects a stream of values from the [Isolate].
  /// The [Isolate] must send a [WorkerResponse.endOfStream] to close the
  /// [Stream].
  ///
  /// [inspectRequest] is not read by this implementation; [inspectResponse]
  /// is passed to [WorkerRequest.userCommand].
  @override
  Stream<dynamic> sendStreamingRequest(
    int command,
    List args, {
    SquadronCancelationToken? token,
    bool inspectRequest = false,
    bool inspectResponse = false,
  }) {
    final receiver = vm.ReceivePort();
    final req = WorkerRequest.userCommand(
        receiver.sendPort, command, args, token, inspectResponse);
    return _getResponseStream(receiver, req, _postRequest, streaming: true);
  }
}