Summary

Class:CBAM.NATS.Implementation.NATSConnectionVendorFunctionalityImpl
Assembly:CBAM.NATS.Implementation
File(s):/repo-dir/contents/Source/Code/CBAM.NATS.Implementation/Connection.cs
Covered lines:20
Uncovered lines:0
Coverable lines:20
Total lines:594
Line coverage:100%

Coverage History

Metrics

Method | Cyclomatic complexity | NPath complexity | Sequence coverage | Branch coverage
.ctor()101%0%
CBAM.Abstractions.ConnectionVendorFunctionality<CBAM.NATS.NATSSubscribeStatement,System.String>.CreateStatementBuilder(...)101%0%
CBAM.Abstractions.ConnectionVendorFunctionality<CBAM.NATS.NATSPublishStatement,System.Func<System.Func<System.Threading.Tasks.ValueTask<System.Collections.Generic.IEnumerable<CBAM.NATS.NATSPublishData>>>>>.CreateStatementBuilder(...)101%0%

File(s)

/repo-dir/contents/Source/Code/CBAM.NATS.Implementation/Connection.cs

# | Line | Line coverage
 1/*
 2 * Copyright 2018 Stanislav Muhametsin. All rights Reserved.
 3 *
 4 * Licensed  under the  Apache License,  Version 2.0  (the "License");
 5 * you may not use  this file  except in  compliance with the License.
 6 * You may obtain a copy of the License at
 7 *
 8 *   http://www.apache.org/licenses/LICENSE-2.0
 9 *
 10 * Unless required by applicable law or agreed to in writing, software
 11 * distributed  under the  License is distributed on an "AS IS" BASIS,
 12 * WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or
 13 * implied.
 14 *
 15 * See the License for the specific language governing permissions and
 16 * limitations under the License.
 17 */
 18using AsyncEnumeration.Abstractions;
 19using AsyncEnumeration.Implementation.Enumerable;
 20using AsyncEnumeration.Observability;
 21using CBAM.Abstractions;
 22using CBAM.Abstractions.Implementation;
 23using CBAM.NATS;
 24using Newtonsoft.Json;
 25using Newtonsoft.Json.Linq;
 26using System;
 27using System.Collections.Concurrent;
 28using System.Collections.Generic;
 29using System.IO;
 30using System.Net.Sockets;
 31using System.Text;
 32using System.Threading;
 33using System.Threading.Tasks;
 34using UtilPack;
 35
 36using TDataProducerResult = System.Threading.Tasks.ValueTask<System.Collections.Generic.IEnumerable<CBAM.NATS.NATSPublis
 37
 38namespace CBAM.NATS
 39{
 40   using TDataProducer = Func<TDataProducerResult>;
 41
 42   namespace Implementation
 43   {
 44
 45      using TDataProducerFactory = Func<TDataProducer>;
 46
 47      internal sealed class NATSConnectionImpl : NATSConnection
 48      {
 49
 50         public sealed class NATSSubscribeConnectionImpl : ConnectionImpl<NATSSubscribeStatement, NATSSubscribeStatement
 51         {
 52            public NATSSubscribeConnectionImpl(
 53               NATSSubscribeConnectionFunctionality functionality
 54               ) : base( functionality )
 55            {
 56            }
 57         }
 58
 59         public sealed class NATSPublishConnectionImpl : ConnectionImpl<NATSPublishStatement, NATSPublishStatementInform
 60         {
 61            public NATSPublishConnectionImpl(
 62               NATSPublishConnectionFunctionality functionality
 63               ) : base( functionality )
 64            {
 65            }
 66         }
 67
 68         //private readonly Socket _socket;
 69         private readonly ClientProtocol _protocol;
 70         private readonly NATSSubscribeConnectionImpl _subscribe;
 71         private readonly NATSPublishConnectionImpl _publish;
 72
 73         public NATSConnectionImpl(
 74            //Socket socket,
 75            NATSConnectionVendorFunctionalityImpl vendorFunctionality,
 76            ClientProtocol protocol
 77            )
 78         {
 79            this.VendorFunctionality = ArgumentValidator.ValidateNotNull( nameof( vendorFunctionality ), vendorFunctiona
 80            //this._socket = ArgumentValidator.ValidateNotNull( nameof( socket ), socket );
 81            this._protocol = ArgumentValidator.ValidateNotNull( nameof( protocol ), protocol );
 82            this._subscribe = new NATSSubscribeConnectionImpl( new NATSSubscribeConnectionFunctionality( vendorFunctiona
 83            this._publish = new NATSPublishConnectionImpl( new NATSPublishConnectionFunctionality( vendorFunctionality, 
 84         }
 85
 86         public NATSConnectionVendorFunctionality VendorFunctionality { get; }
 87
 88         public Boolean DisableEnumerableObservability { get; set; }
 89
 90         event GenericEventHandler<EnumerationStartedEventArgs<NATSSubscribeStatementInformation>> AsyncEnumerationObser
 91         {
 92            add
 93            {
 94               this._subscribe.BeforeEnumerationStart += value;
 95            }
 96            remove
 97            {
 98               this._subscribe.BeforeEnumerationEnd -= value;
 99            }
 100         }
 101
 102         event GenericEventHandler<EnumerationStartedEventArgs> AsyncEnumerationObservation<NATSMessage>.BeforeEnumerati
 103         {
 104            add
 105            {
 106               this._subscribe.BeforeEnumerationStart += value;
 107            }
 108            remove
 109            {
 110               this._subscribe.BeforeEnumerationStart -= value;
 111            }
 112         }
 113
 114         event GenericEventHandler<EnumerationStartedEventArgs<NATSSubscribeStatementInformation>> AsyncEnumerationObser
 115         {
 116            add
 117            {
 118               this._subscribe.AfterEnumerationStart += value;
 119            }
 120            remove
 121            {
 122               this._subscribe.AfterEnumerationStart -= value;
 123            }
 124         }
 125
 126         event GenericEventHandler<EnumerationStartedEventArgs> AsyncEnumerationObservation<NATSMessage>.AfterEnumeratio
 127         {
 128            add
 129            {
 130               this._subscribe.AfterEnumerationStart += value;
 131            }
 132
 133            remove
 134            {
 135               this._subscribe.AfterEnumerationStart -= value;
 136            }
 137         }
 138
 139         event GenericEventHandler<EnumerationItemEventArgs<NATSMessage, NATSSubscribeStatementInformation>> AsyncEnumer
 140         {
 141            add
 142            {
 143               this._subscribe.AfterEnumerationItemEncountered += value;
 144            }
 145            remove
 146            {
 147               this._subscribe.AfterEnumerationItemEncountered -= value;
 148            }
 149         }
 150
 151         event GenericEventHandler<EnumerationItemEventArgs<NATSMessage>> AsyncEnumerationObservation<NATSMessage>.After
 152         {
 153            add
 154            {
 155               this._subscribe.AfterEnumerationItemEncountered += value;
 156            }
 157            remove
 158            {
 159               this._subscribe.AfterEnumerationItemEncountered -= value;
 160            }
 161         }
 162
 163         event GenericEventHandler<EnumerationEndedEventArgs<NATSSubscribeStatementInformation>> AsyncEnumerationObserva
 164         {
 165            add
 166            {
 167               this._subscribe.BeforeEnumerationEnd += value;
 168            }
 169            remove
 170            {
 171               this._subscribe.BeforeEnumerationEnd -= value;
 172            }
 173         }
 174
 175         event GenericEventHandler<EnumerationEndedEventArgs> AsyncEnumerationObservation<NATSMessage>.BeforeEnumeration
 176         {
 177            add
 178            {
 179               this._subscribe.BeforeEnumerationEnd += value;
 180            }
 181            remove
 182            {
 183               this._subscribe.BeforeEnumerationEnd -= value;
 184            }
 185         }
 186
 187         event GenericEventHandler<EnumerationEndedEventArgs<NATSSubscribeStatementInformation>> AsyncEnumerationObserva
 188         {
 189            add
 190            {
 191               this._subscribe.AfterEnumerationEnd += value;
 192            }
 193            remove
 194            {
 195               this._subscribe.AfterEnumerationEnd -= value;
 196            }
 197         }
 198
 199         event GenericEventHandler<EnumerationEndedEventArgs> AsyncEnumerationObservation<NATSMessage>.AfterEnumerationE
 200         {
 201            add
 202            {
 203               this._subscribe.AfterEnumerationEnd += value;
 204            }
 205            remove
 206            {
 207               this._subscribe.AfterEnumerationEnd -= value;
 208            }
 209         }
 210
 211
 212
 213
 214         event GenericEventHandler<EnumerationStartedEventArgs<NATSPublishStatementInformation>> AsyncEnumerationObserva
 215         {
 216            add
 217            {
 218               this._publish.BeforeEnumerationStart += value;
 219            }
 220            remove
 221            {
 222               this._publish.BeforeEnumerationStart -= value;
 223            }
 224         }
 225
 226         event GenericEventHandler<EnumerationStartedEventArgs> AsyncEnumerationObservation<NATSPublishCompleted>.Before
 227         {
 228            add
 229            {
 230               this._publish.BeforeEnumerationStart += value;
 231            }
 232            remove
 233            {
 234               this._publish.BeforeEnumerationStart -= value;
 235            }
 236         }
 237
 238         event GenericEventHandler<EnumerationStartedEventArgs<NATSPublishStatementInformation>> AsyncEnumerationObserva
 239         {
 240            add
 241            {
 242               this._publish.AfterEnumerationStart += value;
 243            }
 244            remove
 245            {
 246               this._publish.AfterEnumerationStart -= value;
 247            }
 248         }
 249
 250         event GenericEventHandler<EnumerationStartedEventArgs> AsyncEnumerationObservation<NATSPublishCompleted>.AfterE
 251         {
 252            add
 253            {
 254               this._publish.AfterEnumerationStart += value;
 255            }
 256            remove
 257            {
 258               this._publish.AfterEnumerationStart -= value;
 259            }
 260         }
 261
 262         event GenericEventHandler<EnumerationItemEventArgs<NATSPublishCompleted, NATSPublishStatementInformation>> Asyn
 263         {
 264            add
 265            {
 266               this._publish.AfterEnumerationItemEncountered += value;
 267            }
 268            remove
 269            {
 270               this._publish.AfterEnumerationItemEncountered -= value;
 271            }
 272         }
 273
 274         event GenericEventHandler<EnumerationItemEventArgs<NATSPublishCompleted>> AsyncEnumerationObservation<NATSPubli
 275         {
 276            add
 277            {
 278               this._publish.AfterEnumerationItemEncountered += value;
 279            }
 280            remove
 281            {
 282               this._publish.AfterEnumerationItemEncountered -= value;
 283            }
 284         }
 285
 286         event GenericEventHandler<EnumerationEndedEventArgs<NATSPublishStatementInformation>> AsyncEnumerationObservati
 287         {
 288            add
 289            {
 290               this._publish.BeforeEnumerationEnd += value;
 291            }
 292            remove
 293            {
 294               this._publish.BeforeEnumerationEnd -= value;
 295            }
 296         }
 297
 298         event GenericEventHandler<EnumerationEndedEventArgs> AsyncEnumerationObservation<NATSPublishCompleted>.BeforeEn
 299         {
 300            add
 301            {
 302               this._publish.BeforeEnumerationEnd += value;
 303            }
 304            remove
 305            {
 306               this._publish.BeforeEnumerationEnd -= value;
 307            }
 308         }
 309
 310         event GenericEventHandler<EnumerationEndedEventArgs<NATSPublishStatementInformation>> AsyncEnumerationObservati
 311         {
 312            add
 313            {
 314               this._publish.AfterEnumerationEnd += value;
 315            }
 316            remove
 317            {
 318               this._publish.AfterEnumerationEnd -= value;
 319            }
 320         }
 321
 322         event GenericEventHandler<EnumerationEndedEventArgs> AsyncEnumerationObservation<NATSPublishCompleted>.AfterEnu
 323         {
 324            add
 325            {
 326               this._publish.AfterEnumerationEnd += value;
 327            }
 328            remove
 329            {
 330               this._publish.AfterEnumerationEnd -= value;
 331            }
 332         }
 333
 334         public IAsyncEnumerable<NATSMessage> PrepareStatementForExecution( NATSSubscribeStatementInformation statement 
 335         {
 336            return this._subscribe.PrepareStatementForExecution( statement );
 337         }
 338
 339         public IAsyncEnumerable<NATSPublishCompleted> PrepareStatementForExecution( NATSPublishStatementInformation sta
 340         {
 341            return this._publish.PrepareStatementForExecution( statement );
 342         }
 343
 344         public async Task<NATSMessage> RequestAsync( String subject, Byte[] data, Int32 offset, Int32 count ) // Forwards a request to the underlying ClientProtocol and returns its reply.
 345         {
 346            return await this._protocol.RequestAsync( subject, data, offset, count ); // arguments are passed through unchanged; no validation is done at this level
 347         }
 348
 349         public event GenericEventHandler<AfterSubscriptionSentArgs> AfterSubscriptionSent // Pass-through event: handlers are registered on, and removed from, the protocol object's event of the same name.
 350         {
 351            add
 352            {
 353               this._protocol.AfterSubscriptionSent += value;
 354            }
 355            remove
 356            {
 357               this._protocol.AfterSubscriptionSent -= value;
 358            }
 359         }
 360
 361         public event GenericEventHandler<AfterPublishSentArgs> AfterPublishSent // Pass-through event: handlers are registered on, and removed from, the protocol object's event of the same name.
 362         {
 363            add
 364            {
 365               this._protocol.AfterPublishSent += value;
 366            }
 367            remove
 368            {
 369               this._protocol.AfterPublishSent -= value;
 370            }
 371         }
 372
 373         //         internal CancellationTokenSource UsageStarted()
 374         //         {
 375         //            var retVal = new CancellationTokenSource();
 376         //#pragma warning disable 4014
 377         //            //this.RunReader( retVal.Token );
 378         //#pragma warning restore 4014
 379         //            return retVal;
 380         //         }
 381
 382         //internal void UsageEnded( CancellationTokenSource cancellationTokenSource )
 383         //{
 384         //   cancellationTokenSource.Cancel();
 385         //}
 386
 387         //         private async Task RunReader( CancellationToken token )
 388         //         {
 389         //            var socket = this._socket;
 390         //            var seenIDs = new HashSet<Int64>();
 391         //            while ( !token.IsCancellationRequested )
 392         //            {
 393         //               if ( socket.Available > 0 || socket.Poll( 1, SelectMode.SelectRead ) || socket.Available > 0 )
 394         //               {
 395         //                  await this._protocol.PerformRead( seenIDs );
 396         //               }
 397         //               else
 398         //               {
 399         //                  await
 400         //#if NET40
 401         //                     TaskEx
 402         //#else
 403         //                     Task
 404         //#endif
 405         //                     .Delay( 50 );
 406
 407         //               }
 408         //            }
 409         //         }
 410
 411      }
 412
 413      internal sealed class NATSSubscribeConnectionFunctionality : DefaultConnectionFunctionality<NATSSubscribeStatement
 414      {
 415
 416         private readonly ClientProtocol _protocol;
 417
 418         public NATSSubscribeConnectionFunctionality( // Creates the subscribe-side functionality on top of the given protocol.
 419            NATSConnectionVendorFunctionalityImpl vendor,
 420            ClientProtocol protocol
 421            ) : base( vendor )
 422         {
 423            this._protocol = ArgumentValidator.ValidateNotNull( nameof( protocol ), protocol ); // vendor goes to the base class; protocol is stored for use by CreateEnumerable
 424         }
 425
 426         protected override IAsyncEnumerable<NATSMessage> CreateEnumerable( NATSSubscribeStatementInformation metadata ) // Builds the enumerable that yields messages for one subscribe statement.
 427         {
 428            var protocol = this._protocol;
 429            var originalAutoUnsub = metadata.AutoUnsubscribeAfter;
 430            var hasAutoUnSub = originalAutoUnsub > 0; // a non-positive AutoUnsubscribeAfter means no message-count limit is tracked
 431            var subj = metadata.Subject;
 432            var queue = metadata.Queue;
 433            var dynamicUnsubscribe = metadata.DynamicUnsubscription;
 434
 435            return AsyncEnumerationFactory.CreateStatefulWrappingEnumerable( () =>
 436            {
 437               Queue<NATSMessageImpl> messageQueue = null; // stays null until the subscription has been written by the async callback below
 438               Int64 id = -1;
 439               var currentAutoUnsub = originalAutoUnsub; // fresh copy of the limit every time this factory lambda runs
 440               var dynamicallyUnsubscribed = false;
 441               return AsyncEnumerationFactory.CreateWrappingStartInfo(
 442                  async () =>
 443                  {
 444                     if ( messageQueue == null ) // first call only: send the subscription and capture its id and message queue
 445                     {
 446                        (id, messageQueue) = await protocol.WriteSubscribe( subj, queue, currentAutoUnsub );
 447                     }
 448                     return !dynamicallyUnsubscribed && ( !hasAutoUnSub || currentAutoUnsub > 0 ? await protocol.Perform
 449                  },
 450                  ( out Boolean success ) =>
 451                  {
 452                     success = ( !hasAutoUnSub || currentAutoUnsub > 0 ) && messageQueue.Count > 0; // an item is available only while the limit (if any) is not exhausted and a message is queued
 453                     if ( hasAutoUnSub && success )
 454                     {
 455                        --currentAutoUnsub; // each dequeued message consumes one unit of the limit
 456                     }
 457
 458                     var retVal = success ? messageQueue.Dequeue() : default;
 459                     if ( success && ( dynamicUnsubscribe?.Invoke( retVal ) ?? false ) ) // optional callback: returning true drops this message and stops further enumeration
 460                     {
 461                        success = false;
 462                        dynamicallyUnsubscribed = true;
 463                        retVal = default;
 464                     }
 465
 466                     return retVal;
 467                  },
 468                  () => protocol.EnumerationEnded( id, hasAutoUnSub, currentAutoUnsub ) // hands the subscription id and the remaining limit back to the protocol
 469                  );
 470
 471            },
 472            AsyncEnumeration.Implementation.Provider.DefaultAsyncProvider.Instance
 473            );
 474         }
 475
 476         protected override NATSSubscribeStatementInformation GetInformationFromStatement( NATSSubscribeStatement statem
 477         {
 478            return (NATSSubscribeStatementInformation) statement?.NATSStatementInformation;
 479         }
 480
 481         protected override void ValidateStatementOrThrow( NATSSubscribeStatementInformation statement ) // The only validation performed is a null check on the statement information.
 482         {
 483            ArgumentValidator.ValidateNotNull( nameof( statement ), statement ); // NOTE(review): presumably throws when statement is null — confirm against ArgumentValidator
 484         }
 485
 486      }
 487
 488      internal sealed class NATSPublishConnectionFunctionality : DefaultConnectionFunctionality<NATSPublishStatement, NA
 489      {
 490         private readonly ClientProtocol _protocol;
 491
 492         public NATSPublishConnectionFunctionality( // Creates the publish-side functionality on top of the given protocol.
 493            NATSConnectionVendorFunctionalityImpl vendor,
 494            ClientProtocol protocol
 495            ) : base( vendor )
 496         {
 497            this._protocol = ArgumentValidator.ValidateNotNull( nameof( protocol ), protocol ); // vendor goes to the base class; protocol is stored for use by CreateEnumerable
 498         }
 499
 500
 501
 502         protected override IAsyncEnumerable<NATSPublishCompleted> CreateEnumerable( NATSPublishStatementInformation met
 503         {
 504            const Int32 INITIAL = 0; // state before the async callback below has run for the first time
 505            const Int32 STARTED = 1; // set every time the async callback runs
 506            const Int32 PENDING_NEXT = 2; // set after the item callback has reported success once for the current run
 507            var protocol = this._protocol;
 508            var dpFactory = metadata.DataProducerFactory;
 509
 510            return AsyncEnumerationFactory.CreateStatefulWrappingEnumerable( () =>
 511            {
 512               var state = INITIAL;
 513               TDataProducer dp = null;
 514               return AsyncEnumerationFactory.CreateWrappingStartInfo(
 515                  async () =>
 516                  {
 517                     if ( state == INITIAL )
 518                     {
 519                        dp = dpFactory?.Invoke(); // the producer is created once, on the first call; a null factory leaves dp null
 520                     }
 521                     Interlocked.Exchange( ref state, STARTED );
 522                     var datas = dp == null ? default : await dp(); // without a producer there is nothing to publish
 523                     if ( datas != null )
 524                     {
 525                        await protocol.WritePublish( datas );
 526                     }
 527                     return datas != null; // reports whether the producer returned data (which was then written) in this call
 528                  },
 529                  ( out Boolean success ) =>
 530                  {
 531                     success = state == STARTED; // succeeds at most once after each run of the async callback
 532                     if ( success )
 533                     {
 534                        Interlocked.Exchange( ref state, PENDING_NEXT );
 535                     }
 536                     return default( NATSPublishCompleted ); // the returned item is always the default value
 537                  },
 538                  null
 539                  );
 540            },
 541            AsyncEnumeration.Implementation.Provider.DefaultAsyncProvider.Instance
 542            );
 543         }
 544
 545         protected override NATSPublishStatementInformation GetInformationFromStatement( NATSPublishStatement statement 
 546         {
 547            return statement.NATSStatementInformation;
 548         }
 549
 550         protected override void ValidateStatementOrThrow( NATSPublishStatementInformation statement ) // The only validation performed is a null check on the statement information.
 551         {
 552            ArgumentValidator.ValidateNotNull( nameof( statement ), statement ); // NOTE(review): presumably throws when statement is null — confirm against ArgumentValidator
 553         }
 554      }
 555
 556      internal sealed class NATSConnectionVendorFunctionalityImpl : NATSConnectionVendorFunctionality
 557      {
 5558         public static NATSConnectionVendorFunctionalityImpl Instance { get; } = new NATSConnectionVendorFunctionalityIm
 559
 1560         private NATSConnectionVendorFunctionalityImpl() // private: code outside this class obtains the instance through the static Instance property
 561         {
 562
 1563         }
 564
 565
 566         NATSSubscribeStatement ConnectionVendorFunctionality<NATSSubscribeStatement, String>.CreateStatementBuilder( St
 567         {
 2568            var q = new Reference<String>();
 2569            var a = new Reference<Int64>();
 2570            var d = new Reference<Func<NATSMessage, Boolean>>();
 2571            return new NATSSubscribeStatementImpl(
 2572               new NATSSubscribeStatementInformationImpl( subject, q, a, d ),
 2573               q,
 2574               a,
 2575               d
 2576               );
 577         }
 578
 579         NATSPublishStatement ConnectionVendorFunctionality<NATSPublishStatement, TDataProducerFactory>.CreateStatementB
 580         {
 2581            var dp = new Reference<TDataProducerFactory>()
 2582            {
 2583               Value = dataProducerFactory
 2584            };
 585
 2586            return new NATSPublishStatementImpl(
 2587               new NATSPublishStatementInformationImpl( dp ),
 2588               dp
 2589               );
 590         }
 591      }
 592   }
 593
 594}