|  |  | 1 |  | /* | 
|  |  | 2 |  |  * Copyright 2018 Stanislav Muhametsin. All rights Reserved. | 
|  |  | 3 |  |  * | 
|  |  | 4 |  |  * Licensed  under the  Apache License,  Version 2.0  (the "License"); | 
|  |  | 5 |  |  * you may not use  this file  except in  compliance with the License. | 
|  |  | 6 |  |  * You may obtain a copy of the License at | 
|  |  | 7 |  |  * | 
|  |  | 8 |  |  *   http://www.apache.org/licenses/LICENSE-2.0 | 
|  |  | 9 |  |  * | 
|  |  | 10 |  |  * Unless required by applicable law or agreed to in writing, software | 
|  |  | 11 |  |  * distributed  under the  License is distributed on an "AS IS" BASIS, | 
|  |  | 12 |  |  * WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or | 
|  |  | 13 |  |  * implied. | 
|  |  | 14 |  |  * | 
|  |  | 15 |  |  * See the License for the specific language governing permissions and | 
|  |  | 16 |  |  * limitations under the License. | 
|  |  | 17 |  |  */ | 
|  |  | 18 |  | using AsyncEnumeration.Abstractions; | 
|  |  | 19 |  | using AsyncEnumeration.Implementation.Enumerable; | 
|  |  | 20 |  | using AsyncEnumeration.Observability; | 
|  |  | 21 |  | using CBAM.Abstractions; | 
|  |  | 22 |  | using CBAM.Abstractions.Implementation; | 
|  |  | 23 |  | using CBAM.NATS; | 
|  |  | 24 |  | using Newtonsoft.Json; | 
|  |  | 25 |  | using Newtonsoft.Json.Linq; | 
|  |  | 26 |  | using System; | 
|  |  | 27 |  | using System.Collections.Concurrent; | 
|  |  | 28 |  | using System.Collections.Generic; | 
|  |  | 29 |  | using System.IO; | 
|  |  | 30 |  | using System.Net.Sockets; | 
|  |  | 31 |  | using System.Text; | 
|  |  | 32 |  | using System.Threading; | 
|  |  | 33 |  | using System.Threading.Tasks; | 
|  |  | 34 |  | using UtilPack; | 
|  |  | 35 |  |  | 
|  |  | 36 |  | using TDataProducerResult = System.Threading.Tasks.ValueTask<System.Collections.Generic.IEnumerable<CBAM.NATS.NATSPublis | 
|  |  | 37 |  |  | 
|  |  | 38 |  | namespace CBAM.NATS | 
|  |  | 39 |  | { | 
|  |  | 40 |  |    using TDataProducer = Func<TDataProducerResult>; | 
|  |  | 41 |  |  | 
|  |  | 42 |  |    namespace Implementation | 
|  |  | 43 |  |    { | 
|  |  | 44 |  |  | 
|  |  | 45 |  |       using TDataProducerFactory = Func<TDataProducer>; | 
|  |  | 46 |  |  | 
|  |  | 47 |  |       internal sealed class NATSConnectionImpl : NATSConnection | 
|  |  | 48 |  |       { | 
|  |  | 49 |  |  | 
|  |  | 50 |  |          public sealed class NATSSubscribeConnectionImpl : ConnectionImpl<NATSSubscribeStatement, NATSSubscribeStatement | 
|  |  | 51 |  |          { | 
|  |  | 52 |  |             public NATSSubscribeConnectionImpl( | 
|  |  | 53 |  |                NATSSubscribeConnectionFunctionality functionality | 
|  | 4 | 54 |  |                ) : base( functionality ) | 
|  |  | 55 |  |             { /* Nothing to set up here: the functionality object is handed straight to the base constructor. */ | 
|  | 4 | 56 |  |             } | 
|  |  | 57 |  |          } | 
|  |  | 58 |  |  | 
|  |  | 59 |  |          public sealed class NATSPublishConnectionImpl : ConnectionImpl<NATSPublishStatement, NATSPublishStatementInform | 
|  |  | 60 |  |          { | 
|  |  | 61 |  |             public NATSPublishConnectionImpl( | 
|  |  | 62 |  |                NATSPublishConnectionFunctionality functionality | 
|  | 4 | 63 |  |                ) : base( functionality ) | 
|  |  | 64 |  |             { /* Nothing to set up here: the functionality object is handed straight to the base constructor. */ | 
|  | 4 | 65 |  |             } | 
|  |  | 66 |  |          } | 
|  |  | 67 |  |  | 
|  |  | 68 |  |          //private readonly Socket _socket; | 
|  |  | 69 |  |          private readonly ClientProtocol _protocol; | 
|  |  | 70 |  |          private readonly NATSSubscribeConnectionImpl _subscribe; | 
|  |  | 71 |  |          private readonly NATSPublishConnectionImpl _publish; | 
|  |  | 72 |  |  | 
|  | 4 | 73 |  |          public NATSConnectionImpl( | 
|  | 4 | 74 |  |             //Socket socket, | 
|  | 4 | 75 |  |             NATSConnectionVendorFunctionalityImpl vendorFunctionality, | 
|  | 4 | 76 |  |             ClientProtocol protocol | 
|  | 4 | 77 |  |             ) | 
|  |  | 78 |  |          { | 
|  | 4 | 79 |  |             this.VendorFunctionality = ArgumentValidator.ValidateNotNull( nameof( vendorFunctionality ), vendorFunctiona | 
|  |  | 80 |  |             //this._socket = ArgumentValidator.ValidateNotNull( nameof( socket ), socket ); | 
|  | 4 | 81 |  |             this._protocol = ArgumentValidator.ValidateNotNull( nameof( protocol ), protocol ); | 
|  | 4 | 82 |  |             this._subscribe = new NATSSubscribeConnectionImpl( new NATSSubscribeConnectionFunctionality( vendorFunctiona | 
|  | 4 | 83 |  |             this._publish = new NATSPublishConnectionImpl( new NATSPublishConnectionFunctionality( vendorFunctionality,  | 
|  | 4 | 84 |  |          } | 
|  |  | 85 |  |  | 
|  | 4 | 86 |  |          public NATSConnectionVendorFunctionality VendorFunctionality { get; } | 
|  |  | 87 |  |  | 
|  | 0 | 88 |  |          public Boolean DisableEnumerableObservability { get; set; } | 
|  |  | 89 |  |  | 
|  |  | 90 |  |          event GenericEventHandler<EnumerationStartedEventArgs<NATSSubscribeStatementInformation>> AsyncEnumerationObser | 
|  |  | 91 |  |          { | 
|  |  | 92 |  |             add | 
|  |  | 93 |  |             { | 
|  | 0 | 94 |  |                this._subscribe.BeforeEnumerationStart += value; | 
|  | 0 | 95 |  |             } | 
|  |  | 96 |  |             remove | 
|  |  | 97 |  |             { /* Detach from the same event the 'add' accessor attaches to. This previously targeted BeforeEnumerationEnd, which left the handler subscribed to BeforeEnumerationStart. */ | 
|  | 0 | 98 |  |                this._subscribe.BeforeEnumerationStart -= value; | 
|  | 0 | 99 |  |             } | 
|  |  | 100 |  |          } | 
|  |  | 101 |  |  | 
|  |  | 102 |  |          event GenericEventHandler<EnumerationStartedEventArgs> AsyncEnumerationObservation<NATSMessage>.BeforeEnumerati | 
|  |  | 103 |  |          { | 
|  |  | 104 |  |             add | 
|  |  | 105 |  |             { | 
|  | 0 | 106 |  |                this._subscribe.BeforeEnumerationStart += value; | 
|  | 0 | 107 |  |             } | 
|  |  | 108 |  |             remove | 
|  |  | 109 |  |             { | 
|  | 0 | 110 |  |                this._subscribe.BeforeEnumerationStart -= value; | 
|  | 0 | 111 |  |             } | 
|  |  | 112 |  |          } | 
|  |  | 113 |  |  | 
|  |  | 114 |  |          event GenericEventHandler<EnumerationStartedEventArgs<NATSSubscribeStatementInformation>> AsyncEnumerationObser | 
|  |  | 115 |  |          { | 
|  |  | 116 |  |             add | 
|  |  | 117 |  |             { | 
|  | 0 | 118 |  |                this._subscribe.AfterEnumerationStart += value; | 
|  | 0 | 119 |  |             } | 
|  |  | 120 |  |             remove | 
|  |  | 121 |  |             { | 
|  | 0 | 122 |  |                this._subscribe.AfterEnumerationStart -= value; | 
|  | 0 | 123 |  |             } | 
|  |  | 124 |  |          } | 
|  |  | 125 |  |  | 
|  |  | 126 |  |          event GenericEventHandler<EnumerationStartedEventArgs> AsyncEnumerationObservation<NATSMessage>.AfterEnumeratio | 
|  |  | 127 |  |          { | 
|  |  | 128 |  |             add | 
|  |  | 129 |  |             { | 
|  | 0 | 130 |  |                this._subscribe.AfterEnumerationStart += value; | 
|  | 0 | 131 |  |             } | 
|  |  | 132 |  |  | 
|  |  | 133 |  |             remove | 
|  |  | 134 |  |             { | 
|  | 0 | 135 |  |                this._subscribe.AfterEnumerationStart -= value; | 
|  | 0 | 136 |  |             } | 
|  |  | 137 |  |          } | 
|  |  | 138 |  |  | 
|  |  | 139 |  |          event GenericEventHandler<EnumerationItemEventArgs<NATSMessage, NATSSubscribeStatementInformation>> AsyncEnumer | 
|  |  | 140 |  |          { | 
|  |  | 141 |  |             add | 
|  |  | 142 |  |             { | 
|  | 0 | 143 |  |                this._subscribe.AfterEnumerationItemEncountered += value; | 
|  | 0 | 144 |  |             } | 
|  |  | 145 |  |             remove | 
|  |  | 146 |  |             { | 
|  | 0 | 147 |  |                this._subscribe.AfterEnumerationItemEncountered -= value; | 
|  | 0 | 148 |  |             } | 
|  |  | 149 |  |          } | 
|  |  | 150 |  |  | 
|  |  | 151 |  |          event GenericEventHandler<EnumerationItemEventArgs<NATSMessage>> AsyncEnumerationObservation<NATSMessage>.After | 
|  |  | 152 |  |          { | 
|  |  | 153 |  |             add | 
|  |  | 154 |  |             { | 
|  | 0 | 155 |  |                this._subscribe.AfterEnumerationItemEncountered += value; | 
|  | 0 | 156 |  |             } | 
|  |  | 157 |  |             remove | 
|  |  | 158 |  |             { | 
|  | 0 | 159 |  |                this._subscribe.AfterEnumerationItemEncountered -= value; | 
|  | 0 | 160 |  |             } | 
|  |  | 161 |  |          } | 
|  |  | 162 |  |  | 
|  |  | 163 |  |          event GenericEventHandler<EnumerationEndedEventArgs<NATSSubscribeStatementInformation>> AsyncEnumerationObserva | 
|  |  | 164 |  |          { | 
|  |  | 165 |  |             add | 
|  |  | 166 |  |             { | 
|  | 0 | 167 |  |                this._subscribe.BeforeEnumerationEnd += value; | 
|  | 0 | 168 |  |             } | 
|  |  | 169 |  |             remove | 
|  |  | 170 |  |             { | 
|  | 0 | 171 |  |                this._subscribe.BeforeEnumerationEnd -= value; | 
|  | 0 | 172 |  |             } | 
|  |  | 173 |  |          } | 
|  |  | 174 |  |  | 
|  |  | 175 |  |          event GenericEventHandler<EnumerationEndedEventArgs> AsyncEnumerationObservation<NATSMessage>.BeforeEnumeration | 
|  |  | 176 |  |          { | 
|  |  | 177 |  |             add | 
|  |  | 178 |  |             { | 
|  | 0 | 179 |  |                this._subscribe.BeforeEnumerationEnd += value; | 
|  | 0 | 180 |  |             } | 
|  |  | 181 |  |             remove | 
|  |  | 182 |  |             { | 
|  | 0 | 183 |  |                this._subscribe.BeforeEnumerationEnd -= value; | 
|  | 0 | 184 |  |             } | 
|  |  | 185 |  |          } | 
|  |  | 186 |  |  | 
|  |  | 187 |  |          event GenericEventHandler<EnumerationEndedEventArgs<NATSSubscribeStatementInformation>> AsyncEnumerationObserva | 
|  |  | 188 |  |          { | 
|  |  | 189 |  |             add | 
|  |  | 190 |  |             { | 
|  | 0 | 191 |  |                this._subscribe.AfterEnumerationEnd += value; | 
|  | 0 | 192 |  |             } | 
|  |  | 193 |  |             remove | 
|  |  | 194 |  |             { | 
|  | 0 | 195 |  |                this._subscribe.AfterEnumerationEnd -= value; | 
|  | 0 | 196 |  |             } | 
|  |  | 197 |  |          } | 
|  |  | 198 |  |  | 
|  |  | 199 |  |          event GenericEventHandler<EnumerationEndedEventArgs> AsyncEnumerationObservation<NATSMessage>.AfterEnumerationE | 
|  |  | 200 |  |          { | 
|  |  | 201 |  |             add | 
|  |  | 202 |  |             { | 
|  | 0 | 203 |  |                this._subscribe.AfterEnumerationEnd += value; | 
|  | 0 | 204 |  |             } | 
|  |  | 205 |  |             remove | 
|  |  | 206 |  |             { | 
|  | 0 | 207 |  |                this._subscribe.AfterEnumerationEnd -= value; | 
|  | 0 | 208 |  |             } | 
|  |  | 209 |  |          } | 
|  |  | 210 |  |  | 
|  |  | 211 |  |  | 
|  |  | 212 |  |  | 
|  |  | 213 |  |  | 
|  |  | 214 |  |          event GenericEventHandler<EnumerationStartedEventArgs<NATSPublishStatementInformation>> AsyncEnumerationObserva | 
|  |  | 215 |  |          { | 
|  |  | 216 |  |             add | 
|  |  | 217 |  |             { | 
|  | 0 | 218 |  |                this._publish.BeforeEnumerationStart += value; | 
|  | 0 | 219 |  |             } | 
|  |  | 220 |  |             remove | 
|  |  | 221 |  |             { | 
|  | 0 | 222 |  |                this._publish.BeforeEnumerationStart -= value; | 
|  | 0 | 223 |  |             } | 
|  |  | 224 |  |          } | 
|  |  | 225 |  |  | 
|  |  | 226 |  |          event GenericEventHandler<EnumerationStartedEventArgs> AsyncEnumerationObservation<NATSPublishCompleted>.Before | 
|  |  | 227 |  |          { | 
|  |  | 228 |  |             add | 
|  |  | 229 |  |             { | 
|  | 0 | 230 |  |                this._publish.BeforeEnumerationStart += value; | 
|  | 0 | 231 |  |             } | 
|  |  | 232 |  |             remove | 
|  |  | 233 |  |             { | 
|  | 0 | 234 |  |                this._publish.BeforeEnumerationStart -= value; | 
|  | 0 | 235 |  |             } | 
|  |  | 236 |  |          } | 
|  |  | 237 |  |  | 
|  |  | 238 |  |          event GenericEventHandler<EnumerationStartedEventArgs<NATSPublishStatementInformation>> AsyncEnumerationObserva | 
|  |  | 239 |  |          { | 
|  |  | 240 |  |             add | 
|  |  | 241 |  |             { | 
|  | 0 | 242 |  |                this._publish.AfterEnumerationStart += value; | 
|  | 0 | 243 |  |             } | 
|  |  | 244 |  |             remove | 
|  |  | 245 |  |             { | 
|  | 0 | 246 |  |                this._publish.AfterEnumerationStart -= value; | 
|  | 0 | 247 |  |             } | 
|  |  | 248 |  |          } | 
|  |  | 249 |  |  | 
|  |  | 250 |  |          event GenericEventHandler<EnumerationStartedEventArgs> AsyncEnumerationObservation<NATSPublishCompleted>.AfterE | 
|  |  | 251 |  |          { | 
|  |  | 252 |  |             add | 
|  |  | 253 |  |             { | 
|  | 0 | 254 |  |                this._publish.AfterEnumerationStart += value; | 
|  | 0 | 255 |  |             } | 
|  |  | 256 |  |             remove | 
|  |  | 257 |  |             { | 
|  | 0 | 258 |  |                this._publish.AfterEnumerationStart -= value; | 
|  | 0 | 259 |  |             } | 
|  |  | 260 |  |          } | 
|  |  | 261 |  |  | 
|  |  | 262 |  |          event GenericEventHandler<EnumerationItemEventArgs<NATSPublishCompleted, NATSPublishStatementInformation>> Asyn | 
|  |  | 263 |  |          { | 
|  |  | 264 |  |             add | 
|  |  | 265 |  |             { | 
|  | 0 | 266 |  |                this._publish.AfterEnumerationItemEncountered += value; | 
|  | 0 | 267 |  |             } | 
|  |  | 268 |  |             remove | 
|  |  | 269 |  |             { | 
|  | 0 | 270 |  |                this._publish.AfterEnumerationItemEncountered -= value; | 
|  | 0 | 271 |  |             } | 
|  |  | 272 |  |          } | 
|  |  | 273 |  |  | 
|  |  | 274 |  |          event GenericEventHandler<EnumerationItemEventArgs<NATSPublishCompleted>> AsyncEnumerationObservation<NATSPubli | 
|  |  | 275 |  |          { | 
|  |  | 276 |  |             add | 
|  |  | 277 |  |             { | 
|  | 0 | 278 |  |                this._publish.AfterEnumerationItemEncountered += value; | 
|  | 0 | 279 |  |             } | 
|  |  | 280 |  |             remove | 
|  |  | 281 |  |             { | 
|  | 0 | 282 |  |                this._publish.AfterEnumerationItemEncountered -= value; | 
|  | 0 | 283 |  |             } | 
|  |  | 284 |  |          } | 
|  |  | 285 |  |  | 
|  |  | 286 |  |          event GenericEventHandler<EnumerationEndedEventArgs<NATSPublishStatementInformation>> AsyncEnumerationObservati | 
|  |  | 287 |  |          { | 
|  |  | 288 |  |             add | 
|  |  | 289 |  |             { | 
|  | 0 | 290 |  |                this._publish.BeforeEnumerationEnd += value; | 
|  | 0 | 291 |  |             } | 
|  |  | 292 |  |             remove | 
|  |  | 293 |  |             { | 
|  | 0 | 294 |  |                this._publish.BeforeEnumerationEnd -= value; | 
|  | 0 | 295 |  |             } | 
|  |  | 296 |  |          } | 
|  |  | 297 |  |  | 
|  |  | 298 |  |          event GenericEventHandler<EnumerationEndedEventArgs> AsyncEnumerationObservation<NATSPublishCompleted>.BeforeEn | 
|  |  | 299 |  |          { | 
|  |  | 300 |  |             add | 
|  |  | 301 |  |             { | 
|  | 0 | 302 |  |                this._publish.BeforeEnumerationEnd += value; | 
|  | 0 | 303 |  |             } | 
|  |  | 304 |  |             remove | 
|  |  | 305 |  |             { | 
|  | 0 | 306 |  |                this._publish.BeforeEnumerationEnd -= value; | 
|  | 0 | 307 |  |             } | 
|  |  | 308 |  |          } | 
|  |  | 309 |  |  | 
|  |  | 310 |  |          event GenericEventHandler<EnumerationEndedEventArgs<NATSPublishStatementInformation>> AsyncEnumerationObservati | 
|  |  | 311 |  |          { | 
|  |  | 312 |  |             add | 
|  |  | 313 |  |             { | 
|  | 0 | 314 |  |                this._publish.AfterEnumerationEnd += value; | 
|  | 0 | 315 |  |             } | 
|  |  | 316 |  |             remove | 
|  |  | 317 |  |             { | 
|  | 0 | 318 |  |                this._publish.AfterEnumerationEnd -= value; | 
|  | 0 | 319 |  |             } | 
|  |  | 320 |  |          } | 
|  |  | 321 |  |  | 
|  |  | 322 |  |          event GenericEventHandler<EnumerationEndedEventArgs> AsyncEnumerationObservation<NATSPublishCompleted>.AfterEnu | 
|  |  | 323 |  |          { | 
|  |  | 324 |  |             add | 
|  |  | 325 |  |             { | 
|  | 0 | 326 |  |                this._publish.AfterEnumerationEnd += value; | 
|  | 0 | 327 |  |             } | 
|  |  | 328 |  |             remove | 
|  |  | 329 |  |             { | 
|  | 0 | 330 |  |                this._publish.AfterEnumerationEnd -= value; | 
|  | 0 | 331 |  |             } | 
|  |  | 332 |  |          } | 
|  |  | 333 |  |  | 
|  |  | 334 |  |          public IAsyncEnumerable<NATSMessage> PrepareStatementForExecution( NATSSubscribeStatementInformation statement  | 
|  |  | 335 |  |          { | 
|  | 2 | 336 |  |             return this._subscribe.PrepareStatementForExecution( statement ); | 
|  |  | 337 |  |          } | 
|  |  | 338 |  |  | 
|  |  | 339 |  |          public IAsyncEnumerable<NATSPublishCompleted> PrepareStatementForExecution( NATSPublishStatementInformation sta | 
|  |  | 340 |  |          { | 
|  | 2 | 341 |  |             return this._publish.PrepareStatementForExecution( statement ); | 
|  |  | 342 |  |          } | 
|  |  | 343 |  |  | 
|  |  | 344 |  |          public async Task<NATSMessage> RequestAsync( String subject, Byte[] data, Int32 offset, Int32 count ) | 
|  |  | 345 |  |          { /* Passes all four arguments unchanged to the protocol object's RequestAsync and returns the awaited NATSMessage. No validation is done at this level. */ | 
|  | 1 | 346 |  |             return await this._protocol.RequestAsync( subject, data, offset, count ); | 
|  | 1 | 347 |  |          } | 
|  |  | 348 |  |  | 
|  |  | 349 |  |          public event GenericEventHandler<AfterSubscriptionSentArgs> AfterSubscriptionSent | 
|  |  | 350 |  |          { /* No handler list is kept on this class: add and remove forward the handler to the protocol object's AfterSubscriptionSent event. */ | 
|  |  | 351 |  |             add | 
|  |  | 352 |  |             { | 
|  | 0 | 353 |  |                this._protocol.AfterSubscriptionSent += value; | 
|  | 0 | 354 |  |             } | 
|  |  | 355 |  |             remove | 
|  |  | 356 |  |             { | 
|  | 0 | 357 |  |                this._protocol.AfterSubscriptionSent -= value; | 
|  | 0 | 358 |  |             } | 
|  |  | 359 |  |          } | 
|  |  | 360 |  |  | 
|  |  | 361 |  |          public event GenericEventHandler<AfterPublishSentArgs> AfterPublishSent | 
|  |  | 362 |  |          { /* No handler list is kept on this class: add and remove forward the handler to the protocol object's AfterPublishSent event. */ | 
|  |  | 363 |  |             add | 
|  |  | 364 |  |             { | 
|  | 0 | 365 |  |                this._protocol.AfterPublishSent += value; | 
|  | 0 | 366 |  |             } | 
|  |  | 367 |  |             remove | 
|  |  | 368 |  |             { | 
|  | 0 | 369 |  |                this._protocol.AfterPublishSent -= value; | 
|  | 0 | 370 |  |             } | 
|  |  | 371 |  |          } | 
|  |  | 372 |  |  | 
|  |  | 373 |  |          //         internal CancellationTokenSource UsageStarted() | 
|  |  | 374 |  |          //         { | 
|  |  | 375 |  |          //            var retVal = new CancellationTokenSource(); | 
|  |  | 376 |  |          //#pragma warning disable 4014 | 
|  |  | 377 |  |          //            //this.RunReader( retVal.Token ); | 
|  |  | 378 |  |          //#pragma warning restore 4014 | 
|  |  | 379 |  |          //            return retVal; | 
|  |  | 380 |  |          //         } | 
|  |  | 381 |  |  | 
|  |  | 382 |  |          //internal void UsageEnded( CancellationTokenSource cancellationTokenSource ) | 
|  |  | 383 |  |          //{ | 
|  |  | 384 |  |          //   cancellationTokenSource.Cancel(); | 
|  |  | 385 |  |          //} | 
|  |  | 386 |  |  | 
|  |  | 387 |  |          //         private async Task RunReader( CancellationToken token ) | 
|  |  | 388 |  |          //         { | 
|  |  | 389 |  |          //            var socket = this._socket; | 
|  |  | 390 |  |          //            var seenIDs = new HashSet<Int64>(); | 
|  |  | 391 |  |          //            while ( !token.IsCancellationRequested ) | 
|  |  | 392 |  |          //            { | 
|  |  | 393 |  |          //               if ( socket.Available > 0 || socket.Poll( 1, SelectMode.SelectRead ) || socket.Available > 0 ) | 
|  |  | 394 |  |          //               { | 
|  |  | 395 |  |          //                  await this._protocol.PerformRead( seenIDs ); | 
|  |  | 396 |  |          //               } | 
|  |  | 397 |  |          //               else | 
|  |  | 398 |  |          //               { | 
|  |  | 399 |  |          //                  await | 
|  |  | 400 |  |          //#if NET40 | 
|  |  | 401 |  |          //                     TaskEx | 
|  |  | 402 |  |          //#else | 
|  |  | 403 |  |          //                     Task | 
|  |  | 404 |  |          //#endif | 
|  |  | 405 |  |          //                     .Delay( 50 ); | 
|  |  | 406 |  |  | 
|  |  | 407 |  |          //               } | 
|  |  | 408 |  |          //            } | 
|  |  | 409 |  |          //         } | 
|  |  | 410 |  |  | 
|  |  | 411 |  |       } | 
|  |  | 412 |  |  | 
|  |  | 413 |  |       internal sealed class NATSSubscribeConnectionFunctionality : DefaultConnectionFunctionality<NATSSubscribeStatement | 
|  |  | 414 |  |       { | 
|  |  | 415 |  |  | 
|  |  | 416 |  |          private readonly ClientProtocol _protocol; | 
|  |  | 417 |  |  | 
|  |  | 418 |  |          public NATSSubscribeConnectionFunctionality( | 
|  |  | 419 |  |             NATSConnectionVendorFunctionalityImpl vendor, | 
|  |  | 420 |  |             ClientProtocol protocol | 
|  |  | 421 |  |             ) : base( vendor ) | 
|  |  | 422 |  |          { /* vendor goes to the base constructor; protocol is checked with ArgumentValidator.ValidateNotNull and stored in _protocol, which CreateEnumerable reads later. */ | 
|  |  | 423 |  |             this._protocol = ArgumentValidator.ValidateNotNull( nameof( protocol ), protocol ); | 
|  |  | 424 |  |          } | 
|  |  | 425 |  |  | 
|  |  | 426 |  |          protected override IAsyncEnumerable<NATSMessage> CreateEnumerable( NATSSubscribeStatementInformation metadata ) | 
|  |  | 427 |  |          { | 
|  |  | 428 |  |             var protocol = this._protocol; | 
|  |  | 429 |  |             var originalAutoUnsub = metadata.AutoUnsubscribeAfter; | 
|  |  | 430 |  |             var hasAutoUnSub = originalAutoUnsub > 0; | 
|  |  | 431 |  |             var subj = metadata.Subject; | 
|  |  | 432 |  |             var queue = metadata.Queue; | 
|  |  | 433 |  |             var dynamicUnsubscribe = metadata.DynamicUnsubscription; | 
|  |  | 434 |  |  | 
|  |  | 435 |  |             return AsyncEnumerationFactory.CreateStatefulWrappingEnumerable( () => | 
|  |  | 436 |  |             { | 
|  |  | 437 |  |                Queue<NATSMessageImpl> messageQueue = null; | 
|  |  | 438 |  |                Int64 id = -1; | 
|  |  | 439 |  |                var currentAutoUnsub = originalAutoUnsub; | 
|  |  | 440 |  |                var dynamicallyUnsubscribed = false; | 
|  |  | 441 |  |                return AsyncEnumerationFactory.CreateWrappingStartInfo( | 
|  |  | 442 |  |                   async () => | 
|  |  | 443 |  |                   { | 
|  |  | 444 |  |                      if ( messageQueue == null ) | 
|  |  | 445 |  |                      { | 
|  |  | 446 |  |                         (id, messageQueue) = await protocol.WriteSubscribe( subj, queue, currentAutoUnsub ); | 
|  |  | 447 |  |                      } | 
|  |  | 448 |  |                      return !dynamicallyUnsubscribed && ( !hasAutoUnSub || currentAutoUnsub > 0 ? await protocol.Perform | 
|  |  | 449 |  |                   }, | 
|  |  | 450 |  |                   ( out Boolean success ) => | 
|  |  | 451 |  |                   { | 
|  |  | 452 |  |                      success = ( !hasAutoUnSub || currentAutoUnsub > 0 ) && messageQueue.Count > 0; | 
|  |  | 453 |  |                      if ( hasAutoUnSub && success ) | 
|  |  | 454 |  |                      { | 
|  |  | 455 |  |                         --currentAutoUnsub; | 
|  |  | 456 |  |                      } | 
|  |  | 457 |  |  | 
|  |  | 458 |  |                      var retVal = success ? messageQueue.Dequeue() : default; | 
|  |  | 459 |  |                      if ( success && ( dynamicUnsubscribe?.Invoke( retVal ) ?? false ) ) | 
|  |  | 460 |  |                      { | 
|  |  | 461 |  |                         success = false; | 
|  |  | 462 |  |                         dynamicallyUnsubscribed = true; | 
|  |  | 463 |  |                         retVal = default; | 
|  |  | 464 |  |                      } | 
|  |  | 465 |  |  | 
|  |  | 466 |  |                      return retVal; | 
|  |  | 467 |  |                   }, | 
|  |  | 468 |  |                   () => protocol.EnumerationEnded( id, hasAutoUnSub, currentAutoUnsub ) | 
|  |  | 469 |  |                   ); | 
|  |  | 470 |  |  | 
|  |  | 471 |  |             }, | 
|  |  | 472 |  |             AsyncEnumeration.Implementation.Provider.DefaultAsyncProvider.Instance | 
|  |  | 473 |  |             ); | 
|  |  | 474 |  |          } | 
|  |  | 475 |  |  | 
|  |  | 476 |  |          protected override NATSSubscribeStatementInformation GetInformationFromStatement( NATSSubscribeStatement statem | 
|  |  | 477 |  |          { | 
|  |  | 478 |  |             return (NATSSubscribeStatementInformation) statement?.NATSStatementInformation; | 
|  |  | 479 |  |          } | 
|  |  | 480 |  |  | 
|  |  | 481 |  |          protected override void ValidateStatementOrThrow( NATSSubscribeStatementInformation statement ) | 
|  |  | 482 |  |          { /* The only check made here is passing the statement information through ArgumentValidator.ValidateNotNull; its fields are not inspected. */ | 
|  |  | 483 |  |             ArgumentValidator.ValidateNotNull( nameof( statement ), statement ); | 
|  |  | 484 |  |          } | 
|  |  | 485 |  |  | 
|  |  | 486 |  |       } | 
|  |  | 487 |  |  | 
|  |  | 488 |  |       internal sealed class NATSPublishConnectionFunctionality : DefaultConnectionFunctionality<NATSPublishStatement, NA | 
|  |  | 489 |  |       { | 
|  |  | 490 |  |          private readonly ClientProtocol _protocol; | 
|  |  | 491 |  |  | 
|  |  | 492 |  |          public NATSPublishConnectionFunctionality( | 
|  |  | 493 |  |             NATSConnectionVendorFunctionalityImpl vendor, | 
|  |  | 494 |  |             ClientProtocol protocol | 
|  |  | 495 |  |             ) : base( vendor ) | 
|  |  | 496 |  |          { /* vendor goes to the base constructor; protocol is checked with ArgumentValidator.ValidateNotNull and stored in _protocol, which CreateEnumerable reads later. */ | 
|  |  | 497 |  |             this._protocol = ArgumentValidator.ValidateNotNull( nameof( protocol ), protocol ); | 
|  |  | 498 |  |          } | 
|  |  | 499 |  |  | 
|  |  | 500 |  |  | 
|  |  | 501 |  |  | 
|  |  | 502 |  |          protected override IAsyncEnumerable<NATSPublishCompleted> CreateEnumerable( NATSPublishStatementInformation met | 
|  |  | 503 |  |          { | 
|  |  | 504 |  |             const Int32 INITIAL = 0; | 
|  |  | 505 |  |             const Int32 STARTED = 1; | 
|  |  | 506 |  |             const Int32 PENDING_NEXT = 2; | 
|  |  | 507 |  |             var protocol = this._protocol; | 
|  |  | 508 |  |             var dpFactory = metadata.DataProducerFactory; | 
|  |  | 509 |  |  | 
|  |  | 510 |  |             return AsyncEnumerationFactory.CreateStatefulWrappingEnumerable( () => | 
|  |  | 511 |  |             { | 
|  |  | 512 |  |                var state = INITIAL; | 
|  |  | 513 |  |                TDataProducer dp = null; | 
|  |  | 514 |  |                return AsyncEnumerationFactory.CreateWrappingStartInfo( | 
|  |  | 515 |  |                   async () => | 
|  |  | 516 |  |                   { | 
|  |  | 517 |  |                      if ( state == INITIAL ) | 
|  |  | 518 |  |                      { | 
|  |  | 519 |  |                         dp = dpFactory?.Invoke(); | 
|  |  | 520 |  |                      } | 
|  |  | 521 |  |                      Interlocked.Exchange( ref state, STARTED ); | 
|  |  | 522 |  |                      var datas = dp == null ? default : await dp(); | 
|  |  | 523 |  |                      if ( datas != null ) | 
|  |  | 524 |  |                      { | 
|  |  | 525 |  |                         await protocol.WritePublish( datas ); | 
|  |  | 526 |  |                      } | 
|  |  | 527 |  |                      return datas != null; | 
|  |  | 528 |  |                   }, | 
|  |  | 529 |  |                   ( out Boolean success ) => | 
|  |  | 530 |  |                   { | 
|  |  | 531 |  |                      success = state == STARTED; | 
|  |  | 532 |  |                      if ( success ) | 
|  |  | 533 |  |                      { | 
|  |  | 534 |  |                         Interlocked.Exchange( ref state, PENDING_NEXT ); | 
|  |  | 535 |  |                      } | 
|  |  | 536 |  |                      return default( NATSPublishCompleted ); | 
|  |  | 537 |  |                   }, | 
|  |  | 538 |  |                   null | 
|  |  | 539 |  |                   ); | 
|  |  | 540 |  |             }, | 
|  |  | 541 |  |             AsyncEnumeration.Implementation.Provider.DefaultAsyncProvider.Instance | 
|  |  | 542 |  |             ); | 
|  |  | 543 |  |          } | 
|  |  | 544 |  |  | 
|  |  | 545 |  |          protected override NATSPublishStatementInformation GetInformationFromStatement( NATSPublishStatement statement  | 
|  |  | 546 |  |          { | 
|  |  | 547 |  |             return statement.NATSStatementInformation; | 
|  |  | 548 |  |          } | 
|  |  | 549 |  |  | 
|  |  | 550 |  |          protected override void ValidateStatementOrThrow( NATSPublishStatementInformation statement ) | 
|  |  | 551 |  |          { /* The only check made here is passing the statement information through ArgumentValidator.ValidateNotNull; its fields are not inspected. */ | 
|  |  | 552 |  |             ArgumentValidator.ValidateNotNull( nameof( statement ), statement ); | 
|  |  | 553 |  |          } | 
|  |  | 554 |  |       } | 
|  |  | 555 |  |  | 
|  |  | 556 |  |       internal sealed class NATSConnectionVendorFunctionalityImpl : NATSConnectionVendorFunctionality | 
|  |  | 557 |  |       { | 
|  |  | 558 |  |          public static NATSConnectionVendorFunctionalityImpl Instance { get; } = new NATSConnectionVendorFunctionalityIm | 
|  |  | 559 |  |  | 
|  |  | 560 |  |          private NATSConnectionVendorFunctionalityImpl() | 
|  |  | 561 |  |          { /* Private and empty: no state is initialized, and code outside this class cannot call it. */ | 
|  |  | 562 |  |  | 
|  |  | 563 |  |          } | 
|  |  | 564 |  |  | 
|  |  | 565 |  |  | 
|  |  | 566 |  |          NATSSubscribeStatement ConnectionVendorFunctionality<NATSSubscribeStatement, String>.CreateStatementBuilder( St | 
|  |  | 567 |  |          { | 
|  |  | 568 |  |             var q = new Reference<String>(); | 
|  |  | 569 |  |             var a = new Reference<Int64>(); | 
|  |  | 570 |  |             var d = new Reference<Func<NATSMessage, Boolean>>(); | 
|  |  | 571 |  |             return new NATSSubscribeStatementImpl( | 
|  |  | 572 |  |                new NATSSubscribeStatementInformationImpl( subject, q, a, d ), | 
|  |  | 573 |  |                q, | 
|  |  | 574 |  |                a, | 
|  |  | 575 |  |                d | 
|  |  | 576 |  |                ); | 
|  |  | 577 |  |          } | 
|  |  | 578 |  |  | 
|  |  | 579 |  |          NATSPublishStatement ConnectionVendorFunctionality<NATSPublishStatement, TDataProducerFactory>.CreateStatementB | 
|  |  | 580 |  |          { | 
|  |  | 581 |  |             var dp = new Reference<TDataProducerFactory>() | 
|  |  | 582 |  |             { | 
|  |  | 583 |  |                Value = dataProducerFactory | 
|  |  | 584 |  |             }; | 
|  |  | 585 |  |  | 
|  |  | 586 |  |             return new NATSPublishStatementImpl( | 
|  |  | 587 |  |                new NATSPublishStatementInformationImpl( dp ), | 
|  |  | 588 |  |                dp | 
|  |  | 589 |  |                ); | 
|  |  | 590 |  |          } | 
|  |  | 591 |  |       } | 
|  |  | 592 |  |    } | 
|  |  | 593 |  |  | 
|  |  | 594 |  | } |