| 1 | | | part of '_channel.dart'; |
| 2 | | | |
| 3 | | | /// [Channel] implementation for the Native world. |
| 4 | | | final class _VmChannel implements Channel { |
| 5 | | 10 | _VmChannel._(this._sendPort, this.logger, this.exceptionManager); |
| 6 | | | |
| 7 | | | /// [SendPort] to communicate with the [Isolate] if the channel is owned by |
| 8 | | | /// the worker owner. Otherwise, [SendPort] to return values to the client. |
| 9 | | | final vm.SendPort _sendPort; |
| 10 | | | |
| 11 | | | PlatformThread? _thread; |
| 12 | | | final _activeConnections = <ForwardStreamController<WorkerResponse>>[]; |
| 13 | | | |
| 14 | | | @override |
| 15 | | | final ExceptionManager exceptionManager; |
| 16 | | | |
| 17 | | | @override |
| 18 | | | final Logger? logger; |
| 19 | | | |
| 20 | | | bool _closed = false; |
| 21 | | | |
| 22 | | | /// [Channel] serialization in Native world returns the [SendPort]. |
| 23 | | 3 | @override |
| 24 | | 3 | PlatformChannel serialize() => _sendPort; |
| 25 | | | |
| 26 | | | /// [Channel] sharing in JavaScript world returns a [_VmForwardChannel]. |
| 27 | | 3 | @override |
| 28 | | 3 | Channel share() => _VmForwardChannel._( |
| 29 | | 12 | _sendPort, vm.ReceivePort(), logger, exceptionManager); |
| 30 | | | |
| 31 | | 10 | void _postRequest(WorkerRequest req, {bool force = false}) { |
| 32 | | 10 | if (_closed && !force) { |
| 33 | | 0 | throw SquadronErrorImpl.create('Channel is closed'); |
| 34 | | | } |
| 35 | | | try { |
| 36 | | 12 | req.cancelToken?.ensureStarted(); |
| 37 | | 30 | _sendPort.send(req.wrapInPlace()); |
| 38 | | | } catch (ex, st) { |
| 39 | | 1 | logger?.e(() => 'Failed to post request $req: $ex'); |
| 40 | | 2 | throw SquadronErrorImpl.create('Failed to post request: $ex', st); |
| 41 | | | } |
| 42 | | | } |
| 43 | | | |
| 44 | | | /// Sends a termination [WorkerRequest] to the [vm.Isolate]. |
| 45 | | 10 | @override |
| 46 | | | FutureOr<void> close() { |
| 47 | | 10 | if (!_closed) { |
| 48 | | 20 | _postRequest(WorkerRequest.stop()); |
| 49 | | 10 | _closed = true; |
| 50 | | | } |
| 51 | | | } |
| 52 | | | |
| 53 | | | /// Sends a close stream [WorkerRequest] to the [vm.Isolate]. |
| 54 | | 4 | @override |
| 55 | | | FutureOr<void> cancelStream(int streamId) { |
| 56 | | 8 | _postRequest(WorkerRequest.cancelStream(streamId), force: true); |
| 57 | | | } |
| 58 | | | |
| 59 | | | /// Sends a cancel token [WorkerRequest] to the [vm.Isolate]. |
| 60 | | 2 | @override |
| 61 | | | FutureOr<void> cancelToken(SquadronCancelationToken? token) { |
| 62 | | | if (token != null) { |
| 63 | | 4 | _postRequest(WorkerRequest.cancel(token), force: true); |
| 64 | | | } |
| 65 | | | } |
| 66 | | | |
| 67 | | 10 | void _enter(ForwardStreamController<WorkerResponse> controller) { |
| 68 | | 20 | _activeConnections.add(controller); |
| 69 | | | } |
| 70 | | | |
| 71 | | 10 | void _leave(ForwardStreamController<WorkerResponse> controller) { |
| 72 | | 20 | _activeConnections.remove(controller); |
| 73 | | 10 | controller.close(); |
| 74 | | | } |
| 75 | | | |
| 76 | | 10 | Stream<dynamic> _getResponseStream( |
| 77 | | | vm.ReceivePort port, |
| 78 | | | WorkerRequest req, |
| 79 | | | void Function(WorkerRequest) post, { |
| 80 | | | required bool streaming, |
| 81 | | | }) { |
| 82 | | 10 | final command = req.command; |
| 83 | | | |
| 84 | | | // send the request, return a stream of responses |
| 85 | | 10 | Stream<WorkerResponse> $sendRequest() { |
| 86 | | | late final ForwardStreamController<WorkerResponse> controller; |
| 87 | | | |
| 88 | | 10 | void $forwardMessage(WorkerResponse msg) { |
| 89 | | 20 | if (!controller.isClosed) controller.add(msg); |
| 90 | | | } |
| 91 | | | |
| 92 | | 1 | void $forwardError(Object error, [StackTrace? st]) { |
| 93 | | 1 | if (!controller.isClosed) { |
| 94 | | 2 | controller.addError(SquadronException.from(error, st, command)); |
| 95 | | | } |
| 96 | | | } |
| 97 | | | |
| 98 | | 0 | void $done() { |
| 99 | | 0 | _leave(controller); |
| 100 | | | } |
| 101 | | | |
| 102 | | 20 | controller = ForwardStreamController(onListen: () { |
| 103 | | | // do nothing if the controller is closed already |
| 104 | | 10 | if (controller.isClosed) return; |
| 105 | | | |
| 106 | | | // bind the controller |
| 107 | | 30 | controller.attachSubscription(port.cast<WorkerResponse>().listen( |
| 108 | | | $forwardMessage, |
| 109 | | | onError: $forwardError, |
| 110 | | | onDone: $done, |
| 111 | | | cancelOnError: false, |
| 112 | | | )); |
| 113 | | | |
| 114 | | | // send the request |
| 115 | | | try { |
| 116 | | 10 | _enter(controller); |
| 117 | | 10 | post(req); |
| 118 | | | } catch (ex, st) { |
| 119 | | 1 | $forwardError(ex, st); |
| 120 | | 1 | _leave(controller); |
| 121 | | | } |
| 122 | | 10 | }, onCancel: () { |
| 123 | | 10 | _leave(controller); |
| 124 | | | }); |
| 125 | | 10 | return controller.stream; |
| 126 | | | } |
| 127 | | | |
| 128 | | | // return a stream of decoded responses |
| 129 | | 10 | final res = ResultStream(this, req, $sendRequest, streaming); |
| 130 | | 50 | res.done.whenComplete(() => port.close()).ignore(); |
| 131 | | 10 | return res.stream; |
| 132 | | | } |
| 133 | | | |
| 134 | | | /// creates a [ReceivePort] and a [WorkerRequest] and sends it to the |
| 135 | | | /// [Isolate]. This method expects a single value from the [Isolate] |
| 136 | | 10 | @override |
| 137 | | | Future<dynamic> sendRequest( |
| 138 | | | int command, |
| 139 | | | List args, { |
| 140 | | | SquadronCancelationToken? token, |
| 141 | | | bool inspectRequest = false, |
| 142 | | | bool inspectResponse = false, |
| 143 | | | }) { |
| 144 | | 10 | final completer = Completer(); |
| 145 | | | late final StreamSubscription sub; |
| 146 | | | |
| 147 | | 10 | void $success(dynamic data) async { |
| 148 | | 10 | await sub.cancel(); |
| 149 | | 20 | if (!completer.isCompleted) completer.complete(data); |
| 150 | | | } |
| 151 | | | |
| 152 | | 4 | void $fail(Object ex, [StackTrace? st]) async { |
| 153 | | 4 | await sub.cancel(); |
| 154 | | 8 | if (!completer.isCompleted) completer.completeError(ex, st); |
| 155 | | | } |
| 156 | | | |
| 157 | | 0 | void $done() async { |
| 158 | | 0 | await sub.cancel(); |
| 159 | | 0 | if (!completer.isCompleted) { |
| 160 | | 0 | $fail(WorkerException('No response from worker', null, command)); |
| 161 | | | } |
| 162 | | | } |
| 163 | | | |
| 164 | | 10 | final receiver = vm.ReceivePort(); |
| 165 | | 10 | final req = WorkerRequest.userCommand( |
| 166 | | 10 | receiver.sendPort, command, args, token, inspectResponse); |
| 167 | | 20 | sub = _getResponseStream(receiver, req, _postRequest, streaming: false) |
| 168 | | 10 | .listen($success, onError: $fail, onDone: $done); |
| 169 | | 10 | return completer.future; |
| 170 | | | } |
| 171 | | | |
| 172 | | | /// Creates a [ReceivePort] and a [WorkerRequest] and sends it to the |
| 173 | | | /// [Isolate]. This method expects a stream of values from the [Isolate]. |
| 174 | | | /// The [Isolate] must send a [WorkerResponse.endOfStream] to close the |
| 175 | | | /// [Stream]. |
| 176 | | 5 | @override |
| 177 | | | Stream<dynamic> sendStreamingRequest( |
| 178 | | | int command, |
| 179 | | | List args, { |
| 180 | | | SquadronCancelationToken? token, |
| 181 | | | bool inspectRequest = false, |
| 182 | | | bool inspectResponse = false, |
| 183 | | | }) { |
| 184 | | 5 | final receiver = vm.ReceivePort(); |
| 185 | | 5 | final req = WorkerRequest.userCommand( |
| 186 | | 5 | receiver.sendPort, command, args, token, inspectResponse); |
| 187 | | 10 | return _getResponseStream(receiver, req, _postRequest, streaming: true); |
| 188 | | | } |
| 189 | | | } |