| 1 |  |  | import 'dart:async'; | 
                
                
                    | 2 |  |  |  | 
                
                
                    | 3 |  |  | import '../../channel.dart'; | 
                
                
                    | 4 |  |  | import '../../exceptions/squadron_error.dart'; | 
                
                
                    | 5 |  |  | import '../../exceptions/squadron_exception.dart'; | 
                
                
                    | 6 |  |  | import '../../worker/worker_request.dart'; | 
                
                
                    | 7 |  |  | import '../../worker/worker_response.dart'; | 
                
                
                    | 8 |  |  | import '_forward_stream_controller.dart'; | 
                
                
                    | 9 |  |  |  | 
                
                
                    | 10 |  |  | class ResultStream { | 
                
                
                    | 11 |  | 10 |   ResultStream( | 
                
                
                    | 12 |  |  |     Channel channel, | 
                
                
                    | 13 |  |  |     WorkerRequest req, | 
                
                
                    | 14 |  |  |     Stream<WorkerResponse> Function() sendRequest, | 
                
                
                    | 15 |  |  |     bool streaming, | 
                
                
                    | 16 |  |  |   ) { | 
                
                
                    | 17 |  | 5 |     final streamIdCompleter = streaming ? Completer<int?>() : null; | 
                
                
                    | 18 |  | 20 |     final command = req.command, token = req.cancelToken; | 
                
                
                    | 19 |  |  |  | 
                
                
                    | 20 |  | 5 |     void $decodeStreamOfResponses(WorkerResponse res) { | 
                
                
                    | 21 |  | 5 |       if (!res.unwrapInPlace(channel)) return; | 
                
                
                    | 22 |  |  |  | 
                
                
                    | 23 |  | 5 |       final hasStreamId = streamIdCompleter!.isCompleted; | 
                
                
                    | 24 |  | 5 |       if (res.endOfStream) { | 
                
                
                    | 25 |  |  |         // handle endofStream | 
                
                
                    | 26 |  |  |         if (!hasStreamId) { | 
                
                
                    | 27 |  | 0 |           streamIdCompleter.complete(null); | 
                
                
                    | 28 |  | 0 |           channel.logger | 
                
                
                    | 29 |  | 0 |               ?.e('Invalid state: unexpected endOfStream for command $command'); | 
                
                
                    | 30 |  | 0 |           _controller.addError(SquadronErrorImpl.create( | 
                
                
                    | 31 |  |  |             'Invalid state: unexpected endOfStream', | 
                
                
                    | 32 |  |  |             null, | 
                
                
                    | 33 |  |  |             command, | 
                
                
                    | 34 |  |  |           )); | 
                
                
                    | 35 |  |  |         } | 
                
                
                    | 36 |  | 8 |         _controller.close(); | 
                
                
                    | 37 |  |  |         return; | 
                
                
                    | 38 |  |  |       } | 
                
                
                    | 39 |  |  |  | 
                
                
                    | 40 |  | 5 |       final error = res.error; | 
                
                
                    | 41 |  |  |       if (error == null && !hasStreamId) { | 
                
                
                    | 42 |  |  |         // the first result from a streaming operation is the stream ID | 
                
                
                    | 43 |  | 15 |         streamIdCompleter.complete((res.result as num).toInt()); | 
                
                
                    | 44 |  |  |       } else if (error != null) { | 
                
                
                    | 45 |  | 6 |         _controller.addError(error); | 
                
                
                    | 46 |  |  |         if (!hasStreamId) { | 
                
                
                    | 47 |  |  |           // if any error comes before the stream ID, somethind bad happened | 
                
                
                    | 48 |  | 0 |           streamIdCompleter.complete(null); | 
                
                
                    | 49 |  | 0 |           _controller.close(); | 
                
                
                    | 50 |  |  |           return; | 
                
                
                    | 51 |  |  |         } | 
                
                
                    | 52 |  |  |       } else { | 
                
                
                    | 53 |  |  |         try { | 
                
                
                    | 54 |  | 15 |           _controller.add(res.result); | 
                
                
                    | 55 |  |  |         } catch (ex, st) { | 
                
                
                    | 56 |  | 0 |           _controller.addError(SquadronException.from(ex, st, command)); | 
                
                
                    | 57 |  |  |         } | 
                
                
                    | 58 |  |  |       } | 
                
                
                    | 59 |  |  |  | 
                
                
                    | 60 |  | 2 |       final canceled = token?.exception; | 
                
                
                    | 61 |  |  |       if (canceled != null) { | 
                
                
                    | 62 |  | 0 |         _controller.addError(canceled); | 
                
                
                    | 63 |  | 0 |         _controller.close(); | 
                
                
                    | 64 |  |  |       } | 
                
                
                    | 65 |  |  |     } | 
                
                
                    | 66 |  |  |  | 
                
                
                    | 67 |  | 10 |     void $decodeSingleResponse(WorkerResponse res) { | 
                
                
                    | 68 |  | 10 |       if (!res.unwrapInPlace(channel)) return; | 
                
                
                    | 69 |  |  |  | 
                
                
                    | 70 |  | 10 |       final error = res.error; | 
                
                
                    | 71 |  |  |       if (error != null) { | 
                
                
                    | 72 |  | 6 |         _controller.addError(error); | 
                
                
                    | 73 |  |  |       } else { | 
                
                
                    | 74 |  |  |         try { | 
                
                
                    | 75 |  | 30 |           _controller.add(res.result); | 
                
                
                    | 76 |  |  |         } catch (ex, st) { | 
                
                
                    | 77 |  | 0 |           _controller.addError(SquadronException.from(ex, st, command)); | 
                
                
                    | 78 |  |  |         } | 
                
                
                    | 79 |  |  |       } | 
                
                
                    | 80 |  |  |  | 
                
                
                    | 81 |  | 20 |       _controller.close(); | 
                
                
                    | 82 |  |  |     } | 
                
                
                    | 83 |  |  |  | 
                
                
                    | 84 |  | 4 |     Future<int?> $getStreamId(StreamSubscription sub) async { | 
                
                
                    | 85 |  |  |       streamIdCompleter as Completer<int?>; | 
                
                
                    | 86 |  |  |       var count = 0; | 
                
                
                    | 87 |  | 5 |       if (sub.isPaused && !streamIdCompleter.isCompleted) { | 
                
                
                    | 88 |  |  |         // if the subscription was paused and the streamId is not available, | 
                
                
                    | 89 |  |  |         // resume to have the streamId eventually come through. | 
                
                
                    | 90 |  | 0 |         while (sub.isPaused) { | 
                
                
                    | 91 |  | 0 |           count++; | 
                
                
                    | 92 |  | 0 |           sub.resume(); | 
                
                
                    | 93 |  |  |         } | 
                
                
                    | 94 |  |  |       } | 
                
                
                    | 95 |  |  |       // wait for the streamId... | 
                
                
                    | 96 |  | 4 |       final streamId = await streamIdCompleter.future; | 
                
                
                    | 97 |  |  |       // restore subscription pause | 
                
                
                    | 98 |  | 4 |       while (count > 0) { | 
                
                
                    | 99 |  | 0 |         count--; | 
                
                
                    | 100 |  | 0 |         sub.pause(); | 
                
                
                    | 101 |  |  |       } | 
                
                
                    | 102 |  |  |       return streamId; | 
                
                
                    | 103 |  |  |     } | 
                
                
                    | 104 |  |  |  | 
                
                
                    | 105 |  | 10 |     Future<void> $onCancel() async { | 
                
                
                    | 106 |  | 20 |       final sub = _controller.subscription; | 
                
                
                    | 107 |  |  |       if (streamIdCompleter != null && sub != null) { | 
                
                
                    | 108 |  |  |         // this is a streaming operation and the subscription is active, so | 
                
                
                    | 109 |  |  |         // we need to inform the worker that the stream has been canceled | 
                
                
                    | 110 |  | 4 |         final streamId = await $getStreamId(sub); | 
                
                
                    | 111 |  |  |         if (streamId != null) { | 
                
                
                    | 112 |  | 4 |           channel.cancelStream(streamId); | 
                
                
                    | 113 |  |  |         } | 
                
                
                    | 114 |  | 4 |         await sub.cancel(); | 
                
                
                    | 115 |  |  |       } | 
                
                
                    | 116 |  |  |     } | 
                
                
                    | 117 |  |  |  | 
                
                
                    | 118 |  | 3 |     void $closeWithError(Object error, [StackTrace? st]) { | 
                
                
                    | 119 |  | 9 |       _controller.addError(SquadronException.from(error, st, command)); | 
                
                
                    | 120 |  | 6 |       _controller.close(); | 
                
                
                    | 121 |  |  |     } | 
                
                
                    | 122 |  |  |  | 
                
                
                    | 123 |  | 10 |     void $onListen() { | 
                
                
                    | 124 |  |  |       try { | 
                
                
                    | 125 |  |  |         // do not send the request if the token is already canceled | 
                
                
                    | 126 |  | 2 |         token?.throwIfCanceled(); | 
                
                
                    | 127 |  |  |         // send the request and start decoding responses | 
                
                
                    | 128 |  | 40 |         _controller.attachSubscription(sendRequest().listen( | 
                
                
                    | 129 |  |  |           streaming ? $decodeStreamOfResponses : $decodeSingleResponse, | 
                
                
                    | 130 |  |  |           onError: $closeWithError, | 
                
                
                    | 131 |  | 20 |           onDone: _controller.close, | 
                
                
                    | 132 |  |  |           cancelOnError: false, | 
                
                
                    | 133 |  |  |         )); | 
                
                
                    | 134 |  |  |       } catch (ex, st) { | 
                
                
                    | 135 |  | 0 |         $closeWithError(ex, st); | 
                
                
                    | 136 |  |  |       } | 
                
                
                    | 137 |  |  |     } | 
                
                
                    | 138 |  |  |  | 
                
                
                    | 139 |  | 20 |     _controller = ForwardStreamController<dynamic>( | 
                
                
                    | 140 |  |  |       onListen: $onListen, | 
                
                
                    | 141 |  |  |       onCancel: $onCancel, | 
                
                
                    | 142 |  |  |     ); | 
                
                
                    | 143 |  |  |   } | 
                
                
                    | 144 |  |  |  | 
                
                
                    | 145 |  |  |   late final ForwardStreamController<dynamic> _controller; | 
                
                
                    | 146 |  |  |  | 
                
                
                    | 147 |  | 30 |   Stream<dynamic> get stream => _controller.stream; | 
                
                
                    | 148 |  |  |  | 
                
                
                    | 149 |  | 30 |   Future<void> get done => _controller.done; | 
                
                
                    | 150 |  |  | } |