| 1 | | | import 'dart:async'; |
| 2 | | | import 'dart:convert'; |
| 3 | | | import 'dart:isolate' as vm; |
| 4 | | | |
| 5 | | | import 'package:logger/web.dart'; |
| 6 | | | import 'package:meta/meta.dart'; |
| 7 | | | |
| 8 | | | import '../../channel.dart'; |
| 9 | | | import '../../exceptions/exception_manager.dart'; |
| 10 | | | import '../../exceptions/squadron_error.dart'; |
| 11 | | | import '../../exceptions/squadron_exception.dart'; |
| 12 | | | import '../../exceptions/task_terminated_exception.dart'; |
| 13 | | | import '../../exceptions/worker_exception.dart'; |
| 14 | | | import '../../tokens/_squadron_cancelation_token.dart'; |
| 15 | | | import '../../worker/worker_request.dart'; |
| 16 | | | import '../../worker/worker_response.dart'; |
| 17 | | | import '../xplat/_disconnected_channel.dart'; |
| 18 | | | import '../xplat/_forward_stream_controller.dart'; |
| 19 | | | import '../xplat/_result_stream.dart'; |
| 20 | | | import '_typedefs.dart'; |
| 21 | | | |
| 22 | | | part '_channel_forward.dart'; |
| 23 | | | part '_channel_impl.dart'; |
| 24 | | | |
| 25 | | | // Stub implementations. |
| 26 | | | |
/// Starts an [Isolate] using the [entryPoint] and sends a start
/// [WorkerRequest] with [startArguments]. The future completes after the
/// worker [Isolate]'s main program has provided the [SendPort] via
/// [WorkerChannel.connect].
///
/// Three receive ports are opened for the lifetime of the isolate: one for
/// the connection handshake, one for the isolate's exit notification and one
/// for its uncaught errors. They are closed when the exit notification is
/// received, or immediately if the isolate could not be spawned.
///
/// If [hook] is provided, it is invoked with the spawned isolate once the
/// connection has been established and before the channel is returned.
///
/// Errors reported before the connection is established complete the
/// returned future with a [SquadronException]. Errors thrown while spawning
/// the isolate or by [hook] are rethrown as-is. When the connection or the
/// [hook] fails, the spawned isolate is killed.
Future<Channel> openChannel(
  EntryPoint entryPoint,
  ExceptionManager exceptionManager,
  Logger? logger,
  List startArguments,
  PlatformThreadHook? hook,
) async {
  final completer = Completer<_VmChannel>();
  Channel? channel;

  // Both helpers are no-ops once the outcome of the connection is known, so
  // late messages cannot complete the completer twice.
  void $failure(Object error, [StackTrace? stackTrace]) {
    if (!completer.isCompleted) {
      completer.completeError(SquadronException.from(error, stackTrace));
    }
  }

  void $success(_VmChannel channel) {
    if (!completer.isCompleted) {
      completer.complete(channel);
    }
  }

  final receiver = vm.ReceivePort();
  final exitPort = vm.ReceivePort();
  final errorPort = vm.ReceivePort();

  // Releases the ports owned by this function.
  void $closePorts() {
    receiver.close();
    errorPort.close();
    exitPort.close();
  }

  exitPort.listen((message) {
    // If the isolate exits before connecting, the connection has failed.
    $failure(SquadronErrorImpl.create('Connection to worker failed'));
    logger?.t('Isolate terminated with message $message.');
    channel?.close();
    $closePorts();
  });

  errorPort.listen((message) {
    // `message[0]` is used as the error description and `message[1]` as the
    // stack trace.
    SquadronException? error;
    try {
      final data = jsonDecode(message[0]);
      if (data is List) {
        error = exceptionManager.deserialize(data.cast<String>());
      }
    } catch (_) {
      // not a String representing a SquadronException
    }

    // Fall back to a generic exception built from the raw message.
    error ??= WorkerException(
      message[0],
      SquadronException.loadStackTrace(message[1]),
    );

    logger?.d(() => 'Unhandled error from Isolate: ${error?.message}.');
    $failure(error);
  });

  final disconnected = DisconnectedChannel(exceptionManager, logger);

  receiver.listen((message) {
    final response = WorkerResponseImpl.from(message);
    // Nothing more to do when unwrapping reports `false`.
    if (!response.unwrapInPlace(disconnected)) {
      return;
    }

    final error = response.error;
    if (error != null) {
      logger?.e(() => 'Connection to Isolate failed: $error');
      $failure(error);
    } else if (response.endOfStream) {
      logger?.w('Disconnecting from Isolate');
      channel?.close();
    } else if (!completer.isCompleted) {
      // First successful response: its result is used to build the channel.
      logger?.t('Connected to Isolate');
      final platformChannel =
          _VmChannel._(response.result, logger, exceptionManager);
      channel = platformChannel;
      $success(platformChannel);
    } else {
      logger?.e(() => 'Unexpected response: $response');
    }
  });

  final startRequest = WorkerRequest.start(receiver.sendPort, startArguments);
  startRequest.wrapInPlace();

  final vm.Isolate isolate;
  try {
    isolate = await vm.Isolate.spawn(
      entryPoint,
      startRequest,
      errorsAreFatal: false,
      onExit: exitPort.sendPort,
      onError: errorPort.sendPort,
    );
  } catch (ex) {
    // No isolate was started, so the exit listener will never run and the
    // ports must be closed here.
    logger?.e(() => 'Failed to spawn Isolate: $ex');
    $closePorts();
    rethrow;
  }

  try {
    final connected = await completer.future;
    connected._thread = isolate;
    if (hook != null) {
      await hook.call(isolate);
    }
    logger?.t('Created Isolate');
    return connected;
  } catch (ex) {
    logger?.e(() => 'Connection to Isolate failed: $ex');
    // Killing the isolate triggers the exit listener, which cleans up.
    isolate.kill(priority: vm.Isolate.beforeNextEvent);
    rethrow;
  }
}
| 136 | | | |
/// Returns the number of active connections tracked by [channel].
///
/// Channels that are not a [_VmChannel] always report `0`.
@internal
int getActiveConnections(Channel channel) {
  if (channel is! _VmChannel) {
    return 0;
  }
  return channel._activeConnections.length;
}
| 140 | | | |
/// Kills the isolate attached to [channel] and fails its active connections.
///
/// The isolate, if any, is killed with [vm.Isolate.immediate] priority. Each
/// active connection then has its subscription canceled, receives [ex] as an
/// error, and is closed. Channels that are not a [_VmChannel] are ignored.
@internal
void terminateChannel(Channel channel, TaskTerminatedException ex) {
  if (channel is! _VmChannel) {
    return;
  }

  channel._thread?.kill(priority: vm.Isolate.immediate);

  for (var connection in channel._activeConnections) {
    connection.subscription?.cancel();
    connection
      ..addError(ex)
      ..close();
  }
}
| 153 | | | |
/// Builds a [_VmChannel] around the platform channel in [channelInfo].
///
/// Returns `null` when [channelInfo] is `null`. When no [exceptionManager] is
/// supplied, a new [ExceptionManager] is created for the channel. The
/// optional [logger] is passed through unchanged.
Channel? deserialize(
  PlatformChannel? channelInfo, [
  Logger? logger,
  ExceptionManager? exceptionManager,
]) {
  if (channelInfo == null) {
    return null;
  }
  final manager = exceptionManager ?? ExceptionManager();
  return _VmChannel._(channelInfo, logger, manager);
}