Summary

Class:CBAM.NATS.Implementation.ClientProtocol
Assembly:CBAM.NATS.Implementation
File(s):/repo-dir/contents/Source/Code/CBAM.NATS.Implementation/Protocol.Client.cs
Covered lines:292
Uncovered lines:46
Coverable lines:338
Total lines:835
Line coverage:86.3%
Branch coverage:87.7%

Coverage History

Metrics

MethodCyclomatic complexity NPath complexity Sequence coverage Branch coverage
.ctor(...)801%1%
.ctor()101%0%
.ctor()101%0%
.ctor()101%0%
.ctor(...)401%0.5%
.ctor(...)101%0%
.ctor(...)201%0.5%
WriteSubscribe(...)101%0%
WriteSubscribe()1700.853%1%
EnumerationEnded(...)400.5%0.75%
WriteUnsubscribe()901%1%
PerformUnsubscribe()701%1%
WritePublish()2001%1%
WritePong()1200%0%
PerformReadNext()2100.944%1%
RequestAsync()1801%1%
PerformRead()3800.709%1%
SetPreReadLength(...)200.818%0.5%
DeserializeInfoMessage(...)101%0%
SerializeConnectMessage(...)101%0%
InitializeNewConnection()301%1%

File(s)

/repo-dir/contents/Source/Code/CBAM.NATS.Implementation/Protocol.Client.cs

#LineLine coverage
 1/*
 2 * Copyright 2018 Stanislav Muhametsin. All rights Reserved.
 3 *
 4 * Licensed  under the  Apache License,  Version 2.0  (the "License");
 5 * you may not use  this file  except in  compliance with the License.
 6 * You may obtain a copy of the License at
 7 *
 8 *   http://www.apache.org/licenses/LICENSE-2.0
 9 *
 10 * Unless required by applicable law or agreed to in writing, software
 11 * distributed  under the  License is distributed on an "AS IS" BASIS,
 12 * WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or
 13 * implied.
 14 *
 15 * See the License for the specific language governing permissions and
 16 * limitations under the License.
 17 */
 18using Newtonsoft.Json;
 19using Newtonsoft.Json.Linq;
 20using System;
 21using System.Collections.Concurrent;
 22using System.Collections.Generic;
 23using System.IO;
 24using System.Linq;
 25using System.Text;
 26using System.Threading;
 27using System.Threading.Tasks;
 28using UtilPack;
 29
 30namespace CBAM.NATS.Implementation
 31{
 32   using TStoredState = Queue<NATSMessageImpl>;
 33
 34
 35   internal static class ClientProtocolConsts
 36   {
 37
 38      public const Byte CR = 0x0D;
 39      public const Byte LF = 0x0A;
 40      public const Byte SPACE = 0x20;
 41      public const Byte TAB = 0x09;
 42      public static readonly Byte[] CRLF = new Byte[] { CR, LF };
 43      public static readonly Byte[] LF_ARRAY = new Byte[] { CRLF[1] };
 44      public static readonly Byte[] PONG = new Byte[] { 0x50, 0x4F, 0x4E, 0x47, 0x0D, LF };
 45
 46      public static readonly Byte[] SUB_PREFIX = new Byte[] { 0x53, 0x55, 0x42, SPACE };
 47      public static readonly Byte[] PUB_PREFIX = new Byte[] { 0x50, 0x55, 0x42, SPACE };
 48      public static readonly Byte[] UNSUB_PREFIX = new Byte[] { 0x55, 0x4E, 0x53, 0x55, 0x42, SPACE };
 49      public static readonly Byte[] CONNECT_PREFIX = new Byte[] { 0x43, 0x4F, 0x4E, 0x4E, 0x45, 0x43, 0x54, SPACE };
 50
 51      public const Int32 READ_COUNT = 0x10000;
 52
 53      public const Int32 UPPERCASE_MASK_FULL = 0x5F5F5F5F;
 54      public const Int32 INFO_INT = 0x494E464F;
 55
 56
 57      public static class Info
 58      {
 59         public const String SERVER_ID = "server_id";
 60         public const String VERSION = "version";
 61         public const String VERSION_GO = "go";
 62         public const String HOST = "host";
 63         public const String PORT = "port";
 64         public const String AUTH_REQUIRED = "auth_required";
 65         public const String SSL_REQUIRED = "ssl_required";
 66         public const String MAX_PAYLOAD = "max_payload";
 67         public const String CONNECT_URLS = "connect_urls";
 68      }
 69
 70      public static class Connect
 71      {
 72         public const String VERBOSE = "verbose";
 73         public const String PEDANTIC = "pedantic";
 74         public const String SSL_REQUIRED = "ssl_required";
 75         public const String AUTH_TOKEN = "auth_token";
 76         public const String USER = "user";
 77         public const String PASSWORD = "pass";
 78         public const String NAME = "name";
 79         public const String LANGAUGE = "lang";
 80         public const String VERSION = "version";
 81         public const String PROTOCOL = "protocol";
 82      }
 83
 84   }
 85
 86
 87   internal sealed class ServerInformation
 88   {
 89      [JsonProperty( ClientProtocolConsts.Info.SERVER_ID )]
 90      public String ServerID { get; set; }
 91
 92      [JsonProperty( ClientProtocolConsts.Info.VERSION )]
 93      public String ServerVersion { get; set; }
 94
 95      [JsonProperty( ClientProtocolConsts.Info.VERSION_GO )]
 96      public String GoVersion { get; set; }
 97
 98      [JsonProperty( ClientProtocolConsts.Info.HOST )]
 99      public String Host { get; set; }
 100
 101      [JsonProperty( ClientProtocolConsts.Info.PORT )]
 102      public Int32 Port { get; set; }
 103
 104      [JsonProperty( ClientProtocolConsts.Info.AUTH_REQUIRED )]
 105      public Boolean AuthenticationRequired { get; set; }
 106
 107      [JsonProperty( ClientProtocolConsts.Info.SSL_REQUIRED )]
 108      public Boolean SSLRequired { get; set; }
 109
 110      [JsonProperty( ClientProtocolConsts.Info.MAX_PAYLOAD )]
 111      public Int32 MaxPayload { get; set; }
 112
 113      [JsonProperty( ClientProtocolConsts.Info.CONNECT_URLS )]
 114      public String[] ConnectionURLs { get; set; }
 115   }
 116
 117   internal sealed class ClientInformation
 118   {
 119      [JsonProperty( ClientProtocolConsts.Connect.VERBOSE )]
 120      public Boolean IsVerbose { get; set; }
 121
 122      [JsonProperty( ClientProtocolConsts.Connect.PEDANTIC )]
 123      public Boolean IsPedantic { get; set; }
 124
 125      [JsonProperty( ClientProtocolConsts.Connect.SSL_REQUIRED )]
 126      public Boolean SSLRequired { get; set; }
 127
 128      [JsonProperty( ClientProtocolConsts.Connect.AUTH_TOKEN, NullValueHandling = NullValueHandling.Ignore )]
 129      public String AuthenticationToken { get; set; }
 130
 131      [JsonProperty( ClientProtocolConsts.Connect.USER, NullValueHandling = NullValueHandling.Ignore )]
 132      public String Username { get; set; }
 133
 134      [JsonProperty( ClientProtocolConsts.Connect.PASSWORD, NullValueHandling = NullValueHandling.Ignore )]
 135      public String Password { get; set; }
 136
 137      [JsonProperty( ClientProtocolConsts.Connect.NAME, NullValueHandling = NullValueHandling.Ignore )]
 138      public String ClientName { get; set; }
 139
 140      [JsonProperty( ClientProtocolConsts.Connect.LANGAUGE, NullValueHandling = NullValueHandling.Ignore )]
 141      public String ClientLanguage { get; set; }
 142
 143      [JsonProperty( ClientProtocolConsts.Connect.VERSION, NullValueHandling = NullValueHandling.Ignore )]
 144      public String ClientVersion { get; set; }
 145
 146      [JsonProperty( ClientProtocolConsts.Connect.PROTOCOL, DefaultValueHandling = DefaultValueHandling.Ignore )]
 147      public Int32 ProtocolVersion { get; set; }
 148
 149   }
 150
 151
 152   internal sealed class ClientProtocol : NATSConnectionObservability
 153   {
 154      private sealed class SubscriptionState
 155      {
 3156         public SubscriptionState(
 3157            String subject,
 3158            Int64 subscriptionID,
 3159            Boolean isGlobal
 3160            )
 161         {
 3162            this.SubscriptionID = subscriptionID;
 3163            this.IsGlobal = isGlobal;
 3164            this.MessageQueue = isGlobal ? null : new Queue<NATSMessageImpl>();
 3165            this.CachedMessage = isGlobal || subject.IndexOf( "*" ) >= 0 ? null : new NATSMessageImpl( subject, subscrip
 3166            this.DataBuffer = isGlobal ? null : new ResizableArray<Byte>();
 3167            this.ByteArrayPool = new LocklessInstancePoolForClassesNoHeapAllocations<InstanceHolder<ResizableArray<Byte>
 3168            this.RentedByteArrays = new List<InstanceHolder<ResizableArray<Byte>>>();
 3169         }
 170
 0171         public Int64 SubscriptionID { get; }
 172
 9173         public Queue<NATSMessageImpl> MessageQueue { get; }
 174
 0175         public NATSMessageImpl CachedMessage { get; }
 176
 0177         public ResizableArray<Byte> DataBuffer { get; }
 178
 3179         public Boolean IsGlobal { get; }
 180
 5181         public LocklessInstancePoolForClassesNoHeapAllocations<InstanceHolder<ResizableArray<Byte>>> ByteArrayPool { ge
 182
 7183         public List<InstanceHolder<ResizableArray<Byte>>> RentedByteArrays { get; }
 184
 185      }
 186
 187      public abstract class IOState
 188      {
 189
 8190         public IOState()
 191         {
 8192            this.Lock = new AsyncLock();
 8193            this.Buffer = new ResizableArray<Byte>( 0x100 );
 8194         }
 195
 26196         public ResizableArray<Byte> Buffer { get; }
 197
 11198         public AsyncLock Lock { get; }
 199      }
 200
 201      public sealed class WriteState : IOState
 202      {
 203         public WriteState(
 4204            ) : base()
 205         {
 4206         }
 207      }
 208
 209      public sealed class ReadState : IOState
 210      {
 211         public ReadState(
 4212            ) : base()
 213         {
 4214            this.MessageSpaceIndices = new Int32[3];
 4215            this.BufferAdvanceState = new BufferAdvanceState();
 4216         }
 217
 14218         public BufferAdvanceState BufferAdvanceState { get; }
 219
 3220         public Int32[] MessageSpaceIndices { get; }
 221      }
 222
 223      public sealed class ClientProtocolIOState
 224      {
 225
 4226         public ClientProtocolIOState(
 4227            Stream stream,
 4228            BinaryStringPool stringPool,
 4229            IEncodingInfo encoding,
 4230            WriteState writeState,
 4231            ReadState readState
 4232            )
 233         {
 4234            this.Stream = ArgumentValidator.ValidateNotNull( nameof( stream ), stream );
 4235            this.StringPool = ArgumentValidator.ValidateNotNull( nameof( stringPool ), stringPool );
 4236            this.Encoding = ArgumentValidator.ValidateNotNull( nameof( encoding ), encoding );
 4237            this.WriteState = writeState ?? new WriteState();
 4238            this.ReadState = readState ?? new ReadState();
 4239         }
 240
 11241         public WriteState WriteState { get; }
 242
 6243         public ReadState ReadState { get; }
 244
 24245         public Stream Stream { get; }
 246
 3247         public BinaryStringPool StringPool { get; }
 248
 16249         public IEncodingInfo Encoding { get; }
 250      }
 251
 252      public sealed class GlobalSubscriptionEventArgs
 253      {
 1254         public GlobalSubscriptionEventArgs( NATSMessageImpl message )
 255         {
 1256            this.Message = ArgumentValidator.ValidateNotNull( nameof( message ), message );
 1257         }
 2258         public NATSMessageImpl Message { get; }
 259      }
 260
 261
 262      private readonly ClientProtocolIOState _state;
 263      private readonly ServerInformation _serverParameters;
 264      private readonly Byte[] _globalSubscriptionNameBytes;
 265      private readonly ConcurrentDictionary<Int64, SubscriptionState> _subscriptionStates;
 266      private readonly AsyncLazy<Int64> _globalSubscriptionID;
 267
 268      private Int64 _currentID;
 269      private Int64 _globalSubscriptionSuffix;
 270
 4271      public ClientProtocol(
 4272        ClientProtocolIOState state,
 4273        ServerInformation serverParameters,
 4274        String globalSubscriptionName = null
 4275      )
 276      {
 4277         this._state = ArgumentValidator.ValidateNotNull( nameof( state ), state );
 4278         this._subscriptionStates = new ConcurrentDictionary<Int64, SubscriptionState>();
 4279         this._serverParameters = ArgumentValidator.ValidateNotNull( nameof( serverParameters ), serverParameters );
 4280         this._currentID = 0;
 4281         this.GlobalSubscriptionPrefix = ( String.IsNullOrEmpty( globalSubscriptionName ) ? Guid.NewGuid().ToString( "N"
 4282         this._globalSubscriptionNameBytes = state.Encoding.Encoding.GetBytes( this.GlobalSubscriptionPrefix );
 5283         this._globalSubscriptionID = new AsyncLazy<Int64>( async () => ( await this.WriteSubscribe( this.GlobalSubscrip
 4284      }
 285
 286
 0287      public Boolean CanBeReturnedToPool => this._subscriptionStates.Count <= 0 || this._subscriptionStates.Values.All( 
 288
 4289      public Stream Stream => this._state.Stream;
 290
 6291      public String GlobalSubscriptionPrefix { get; }
 292
 293
 294      public event GenericEventHandler<GlobalSubscriptionEventArgs> GlobalSubscriptionMessageReceived;
 295      public event GenericEventHandler<AfterSubscriptionSentArgs> AfterSubscriptionSent;
 296      public event GenericEventHandler<AfterPublishSentArgs> AfterPublishSent;
 297
 298      public Task<(Int64, TStoredState)> WriteSubscribe(
 299         String subject,
 300         String queue,
 301         Int64 autoUnsub
 2302         ) => this.WriteSubscribe( subject, queue, autoUnsub, false );
 303
 304      private async Task<(Int64, TStoredState)> WriteSubscribe(
 305         String subject,
 306         String queue,
 307         Int64 autoUnsub,
 308         Boolean isGlobal
 309         )
 310      {
 3311         var state = this._state;
 3312         var wState = state.WriteState;
 3313         var id = Interlocked.Increment( ref this._currentID );
 314
 3315         using ( await wState.Lock.LockAsync() )
 316         {
 3317            var buffer = wState.Buffer;
 3318            var encoding = state.Encoding;
 3319            var idx = 0;
 320            // Write 'SUB <subject> [queue group ]<sid>\r\n'
 3321            var idSize = encoding.GetTextualIntegerRepresentationSize( id );
 3322            if ( !String.IsNullOrEmpty( queue ) && queue.ContainsNonASCIICharacters( NATSStatementInformationImpl.IsInva
 323            {
 0324               throw new InvalidOperationException( "Invalid queue name: " + queue );
 325            }
 3326            var msgSize = 7 + subject.Length + ( String.IsNullOrEmpty( queue ) ? 0 : ( queue.Length + 1 ) ) + idSize;
 3327            var array = wState.Buffer.SetCapacityAndReturnArray( msgSize );
 3328            array
 3329               .WriteASCIIString( ref idx, ClientProtocolConsts.SUB_PREFIX )
 3330               .WriteASCIIString( ref idx, subject, false )
 3331               .WriteASCIIString( ref idx, ClientProtocolConsts.SPACE );
 332
 3333            if ( !String.IsNullOrEmpty( queue ) )
 334            {
 0335               array
 0336                  .WriteASCIIString( ref idx, queue, false )
 0337                  .WriteASCIIString( ref idx, ClientProtocolConsts.SPACE );
 338            }
 339
 3340            encoding.WriteIntegerTextual( array, ref idx, id, idSize );
 3341            array.WriteASCIIString( ref idx, ClientProtocolConsts.CRLF );
 342            System.Diagnostics.Debug.Assert( idx == msgSize );
 343
 3344            await state.Stream.WriteAsync( array, 0, msgSize, default );
 3345            if ( !isGlobal )
 346            {
 2347               this.AfterSubscriptionSent?.InvokeAllEventHandlers( new AfterSubscriptionSentArgs( subject, queue, autoUn
 348            }
 3349            if ( autoUnsub > 0 )
 350            {
 1351               await this.PerformUnsubscribe( id, autoUnsub );
 352            }
 3353            await state.Stream.FlushAsync( default );
 354
 3355         }
 356
 3357         var retVal = new SubscriptionState( subject, id, isGlobal );
 3358         if ( !this._subscriptionStates.TryAdd( id, retVal ) )
 359         {
 0360            throw new Exception( "This should not be possible." );
 361         }
 362
 3363         return (id, retVal.MessageQueue);
 3364      }
 365
 366      public Task EnumerationEnded(
 367         Int64 id,
 368         Boolean hasAutoUnSub,
 369         Int64 currentAutoUnsub
 370
 371         )
 372      {
 373         Task retVal;
 2374         if ( hasAutoUnSub && currentAutoUnsub < 0 )
 375         {
 0376            this._subscriptionStates.TryRemove( id, out var ignored );
 0377            retVal = TaskUtils.CompletedTask;
 0378         }
 379         else
 380         {
 2381            retVal = this.WriteUnsubscribe( id, 0 );
 382         }
 383
 2384         return retVal;
 385      }
 386
 387      public async Task WriteUnsubscribe(
 388         Int64 id,
 389         Int64 autoUnsubscribe
 390         )
 391      {
 2392         if ( autoUnsubscribe <= 0 )
 393         {
 2394            this._subscriptionStates.TryRemove( id, out var ignored );
 395         }
 396
 2397         var state = this._state;
 2398         var wState = state.WriteState;
 2399         using ( await wState.Lock.LockAsync() )
 400         {
 2401            await this.PerformUnsubscribe( id, autoUnsubscribe );
 2402            await state.Stream.FlushAsync( default );
 2403         }
 404
 2405      }
 406
 407      private async Task PerformUnsubscribe(
 408         Int64 id,
 409         Int64 autoUnsubscribe
 410         )
 411      {
 3412         var state = this._state;
 3413         var wState = state.WriteState;
 3414         var encoding = state.Encoding;
 3415         var idSize = encoding.GetTextualIntegerRepresentationSize( id );
 3416         var autoSize = autoUnsubscribe > 0 ? encoding.GetTextualIntegerRepresentationSize( autoUnsubscribe ) : 0;
 3417         var msgSize = 8 + idSize + ( autoSize > 0 ? ( autoSize + 1 ) : 0 );
 3418         var array = wState.Buffer.SetCapacityAndReturnArray( msgSize );
 3419         var idx = 0;
 3420         array.WriteASCIIString( ref idx, ClientProtocolConsts.UNSUB_PREFIX );
 3421         encoding.WriteIntegerTextual( array, ref idx, id, idSize );
 3422         if ( autoSize > 0 )
 423         {
 1424            array.WriteASCIIString( ref idx, ClientProtocolConsts.SPACE );
 1425            encoding.WriteIntegerTextual( array, ref idx, autoUnsubscribe, autoSize );
 426         }
 3427         array.WriteASCIIString( ref idx, ClientProtocolConsts.CRLF );
 428         System.Diagnostics.Debug.Assert( idx == msgSize );
 3429         await state.Stream.WriteAsync( array, 0, msgSize, default );
 3430      }
 431
 432      public async Task WritePublish(
 433         IEnumerable<NATSPublishData> datas
 434         )
 435      {
 3436         var state = this._state;
 3437         var wState = state.WriteState;
 3438         using ( await wState.Lock.LockAsync() )
 439         {
 3440            var buffer = wState.Buffer;
 3441            var encoding = state.Encoding;
 442
 12443            foreach ( var pData in datas )
 444            {
 3445               var subject = pData.Subject;
 3446               if ( !String.IsNullOrEmpty( subject ) )
 447               {
 3448                  var count = pData.Count;
 3449                  var reply = pData.ReplySubject;
 450
 3451                  var dataMsgSize = encoding.GetTextualIntegerRepresentationSize( count );
 3452                  var msgSize = 9 + subject.Length + ( String.IsNullOrEmpty( reply ) ? 0 : ( reply.Length + 1 ) ) + data
 3453                  var array = wState.Buffer.SetCapacityAndReturnArray( msgSize );
 454
 3455                  var idx = 0;
 3456                  array
 3457                     .WriteASCIIString( ref idx, ClientProtocolConsts.PUB_PREFIX )
 3458                     .WriteASCIIString( ref idx, subject, false )
 3459                     .WriteASCIIString( ref idx, ClientProtocolConsts.SPACE );
 3460                  if ( !String.IsNullOrEmpty( reply ) )
 461                  {
 1462                     array
 1463                        .WriteASCIIString( ref idx, reply, false )
 1464                        .WriteASCIIString( ref idx, ClientProtocolConsts.SPACE );
 465                  }
 3466                  encoding.WriteIntegerTextual( array, ref idx, count, dataMsgSize );
 3467                  array.WriteASCIIString( ref idx, ClientProtocolConsts.CRLF );
 468
 3469                  if ( count > 0 )
 470                  {
 3471                     Array.Copy( pData.Data, pData.Offset, array, idx, count );
 3472                     idx += count;
 473                  }
 474
 3475                  array.WriteASCIIString( ref idx, ClientProtocolConsts.CRLF );
 476
 477                  System.Diagnostics.Debug.Assert( idx == msgSize );
 3478                  await state.Stream.WriteAsync( array, 0, msgSize, default );
 479
 3480                  this.AfterPublishSent?.InvokeAllEventHandlers( new AfterPublishSentArgs( subject, reply ), throwExcept
 3481               }
 3482            }
 483
 3484            await state.Stream.FlushAsync( default );
 3485         }
 486
 3487      }
 488
 489      private async Task WritePong()
 490      {
 0491         var state = this._state;
 0492         using ( await state.WriteState.Lock.LockAsync() )
 493         {
 0494            await state.Stream.WriteAsync( ClientProtocolConsts.PONG, 0, 6, default );
 0495            await state.Stream.FlushAsync( default );
 0496         }
 0497      }
 498
 499      public async ValueTask<Boolean> PerformReadNext(
 500         Int64 subscriptionID
 501         )
 502      {
 2503         if ( this._subscriptionStates.TryGetValue( subscriptionID, out var subState ) )
 504         {
 2505            var pool = subState.ByteArrayPool;
 4506            foreach ( var rentedByteArray in subState.RentedByteArrays )
 507            {
 0508               pool.ReturnInstance( rentedByteArray );
 509            }
 510
 2511            subState.RentedByteArrays.Clear();
 2512            var queue = subState.MessageQueue;
 4513            while ( queue.Count <= 0 )
 514            {
 2515               var rLock = this._state.ReadState.Lock;
 2516               if ( queue.Count <= 0 )
 517               {
 2518                  var lockScope = await rLock.TryLockAsync( TimeSpan.FromMilliseconds( 100 ) );
 2519                  if ( lockScope.HasValue )
 520                  {
 2521                     using ( lockScope.Value )
 522                     {
 2523                        if ( queue.Count <= 0 )
 524                        {
 2525                           await this.PerformRead();
 526                        }
 2527                     }
 528                  }
 529               }
 530            }
 2531         }
 532
 2533         return ( subState?.MessageQueue?.Count ?? 0 ) > 0;
 534
 2535      }
 536
 537      public async Task<NATSMessage> RequestAsync( String subject, Byte[] data, Int32 offset, Int32 count )
 538      {
 1539         await this._globalSubscriptionID;
 540
 1541         var replyTo = this.GlobalSubscriptionPrefix + Interlocked.Increment( ref this._globalSubscriptionSuffix );
 1542         NATSMessage receivedMessage = null;
 543         void HandleGlobalSubEvent( GlobalSubscriptionEventArgs args )
 544         {
 1545            if ( String.Equals( args.Message.Subject, replyTo ) )
 546            {
 1547               Interlocked.Exchange( ref receivedMessage, args.Message );
 548            }
 1549         };
 550
 1551         this.GlobalSubscriptionMessageReceived += HandleGlobalSubEvent;
 2552         using ( new UsingHelper( () => this.GlobalSubscriptionMessageReceived -= HandleGlobalSubEvent ) )
 553         {
 1554            await this.WritePublish(
 1555               new NATSPublishData( subject, data, offset, count, replyTo ).Singleton()
 1556               );
 1557            var rLock = this._state.ReadState.Lock;
 558            do
 559            {
 1560               var lockScope = await rLock.TryLockAsync( TimeSpan.FromMilliseconds( 100 ) );
 1561               if ( lockScope.HasValue )
 562               {
 1563                  using ( lockScope )
 564                  {
 1565                     if ( receivedMessage == null )
 566                     {
 1567                        await this.PerformRead();
 568                     }
 1569                  }
 570               }
 1571            } while ( receivedMessage == null );
 572
 1573         }
 574
 1575         return receivedMessage;
 1576      }
 577
 578
 579      private async Task PerformRead()
 580      {
 3581         var states = this._subscriptionStates;
 582         //const Int32 MIN_MESSAGE_SIZE = 5;
 583
 3584         var state = this._state;
 3585         var rState = state.ReadState;
 3586         var stream = state.Stream;
 3587         var buffer = rState.Buffer;
 3588         var encodingInfo = state.Encoding;
 3589         var stringPool = state.StringPool;
 590
 3591         var advanceState = rState.BufferAdvanceState;
 592
 3593         await stream.ReadUntilMaybeAsync( buffer, advanceState, ClientProtocolConsts.CRLF, ClientProtocolConsts.READ_CO
 3594         var crIdx = advanceState.BufferOffset;
 6595         while ( crIdx >= 0 )
 596         {
 3597            var array = buffer.Array;
 598            // First byte will be integer's uppermost byte, second byte second uppermost, etc
 3599            var idx = 0;
 3600            var msgHeader = array.ReadInt32BEFromBytes( ref idx );
 3601            var additionalByte = array[idx++];
 602            // At the end of the following switch statement, advanceState.BufferOffset should point to the CR byte of th
 603            // Examine first byte
 604            // x & 0x5F is to make ASCII lowercase letters (a-z) into uppercase letters (A-Z)
 3605            switch ( msgHeader & 0xFF000000 )
 606            {
 607               case 0x2B000000: // 2B = '+'
 0608                  if ( ( msgHeader & 0x005F5FFF ) == 0x004F4B0D && additionalByte == 0x0A ) // +OK\r\n
 609                  {
 610                     // OK -message -> ignore
 611                  }
 612                  else
 613                  {
 0614                     throw new Exception( "Protocol error" );
 615                  }
 616                  break;
 617               case 0x2D000000: // 2D = '-'
 0618                  if ( ( msgHeader & 0x005F5F5F ) == 0x00455252 && ( additionalByte == ClientProtocolConsts.SPACE || add
 619                  {
 620                     // -ERR<space/tab> -message, read textual error message
 621                     // TODO close connection on errors that are fatal
 0622                     throw new Exception( "Protocol error: " + stringPool.GetString( buffer.Array, idx, advanceState.Buf
 623                  }
 624                  else
 625                  {
 0626                     throw new Exception( "Protocol error" );
 627                  }
 628               default:
 3629                  if ( ( msgHeader & 0x5F5F5F00 ) == 0x4D534700 ) // MSG
 630                  {
 3631                     additionalByte = (Byte) ( msgHeader & 0x000000FF );
 3632                     if ( additionalByte == ClientProtocolConsts.SPACE || additionalByte == ClientProtocolConsts.TAB )
 633                     {
 634                        // Read the rest of the header
 3635                        array = buffer.Array;
 636
 637                        // Count spaces/tabs (TODO make this treat multiple consecutive spaces as one)
 3638                        var spacesSeen = 0;
 3639                        var spaceIndices = rState.MessageSpaceIndices;
 3640                        Array.Clear( spaceIndices, 0, spaceIndices.Length );
 3641                        var end = advanceState.BufferOffset;
 214642                        for ( var i = idx; i < end; ++i )
 643                        {
 104644                           if ( array[i] == ClientProtocolConsts.SPACE || array[i] == ClientProtocolConsts.TAB )
 645                           {
 7646                              var oldValue = spaceIndices[spacesSeen];
 7647                              spaceIndices[spacesSeen] = i;
 7648                              if ( oldValue < i - 1 )
 649                              {
 7650                                 ++spacesSeen;
 651                              }
 652                           }
 653                        }
 3654                        if ( spacesSeen < 2 || spacesSeen > 3 )
 655                        {
 0656                           throw new Exception( "Protocol error" );
 657                        }
 658
 659                        // MSG <subject> <sid> [reply-to] <#bytes>\r\n[payload]\r\n
 3660                        var subjStart = idx - 1;
 3661                        var subjLen = spaceIndices[0] - subjStart;
 3662                        idx = spaceIndices[0] + 1;
 3663                        var subID = encodingInfo.ParseInt64Textual( array, ref idx, (spaceIndices[1] - spaceIndices[0] -
 3664                        var replyTo = spacesSeen > 2 ?
 3665                           stringPool.GetString( array, spaceIndices[1] + 1, spaceIndices[2] - spaceIndices[1] - 1 ) :
 3666                           null;
 3667                        idx = spaceIndices[spacesSeen - 1] + 1;
 3668                        var payloadSize = encodingInfo.ParseInt32Textual( array, ref idx, (end - idx, true) );
 3669                        if ( payloadSize < 0 )
 670                        {
 0671                           throw new Exception( "Protocol error" );
 672                        }
 3673                        var readFromStreamCount = end + 2 + payloadSize + 2 - advanceState.BufferTotal;
 3674                        if ( readFromStreamCount > 0 )
 675                        {
 0676                           await stream.ReadSpecificAmountAsync( buffer, advanceState.BufferTotal, readFromStreamCount, 
 0677                           advanceState.ReadMore( readFromStreamCount );
 678                        }
 3679                        if ( states.TryGetValue( subID, out var subState ) )
 680                        {
 681                           NATSMessageImpl readMessage;
 3682                           if ( replyTo == null && payloadSize <= 0 && subState.CachedMessage != null )
 683                           {
 684                              // Use cached instance when no reply, no data, and same subject
 0685                              readMessage = subState.CachedMessage;
 0686                           }
 687                           else
 688                           {
 689                              // TODO we can pool & rent NATSMessageImpl instances too, for when there is no reply and s
 690                              // Rent byte array instance and make the message use it
 3691                              var byteArrayInstance = subState.ByteArrayPool.TakeInstance() ?? new InstanceHolder<Resiza
 3692                              subState.RentedByteArrays.Add( byteArrayInstance );
 3693                              var messageData = byteArrayInstance.Instance.SetCapacityAndReturnArray( payloadSize );
 3694                              Array.Copy( buffer.Array, end + 2, messageData, 0, payloadSize );
 3695                              readMessage = new NATSMessageImpl( stringPool.GetString( array, subjStart, subjLen ), subI
 696                           }
 697
 698
 3699                           if ( subState.IsGlobal )
 700                           {
 1701                              this.GlobalSubscriptionMessageReceived?.Invoke( new GlobalSubscriptionEventArgs( readMessa
 1702                           }
 703                           else
 704                           {
 2705                              subState.MessageQueue.Enqueue( readMessage );
 706                           }
 707                        }
 708
 3709                        advanceState.Advance( payloadSize + 2 );
 3710                     }
 711                     else
 712                     {
 0713                        throw new Exception( "Protocol error" );
 714                     }
 715                  }
 716                  else
 717                  {
 0718                     var additionalByte2 = array[idx++];
 0719                     switch ( msgHeader & ClientProtocolConsts.UPPERCASE_MASK_FULL )
 720                     {
 721                        case 0x50494E47: // PING
 0722                           if ( additionalByte == ClientProtocolConsts.CR && additionalByte2 == ClientProtocolConsts.LF 
 723                           {
 724                              // Send back PONG ( we purposefully don't 'await' for this task)
 725#pragma warning disable 4014
 0726                              this.WritePong();
 727#pragma warning restore 4014
 0728                           }
 729                           else
 730                           {
 0731                              throw new Exception( "Protocol error" );
 732                           }
 733                           break;
 734                        case 0x504F4E47: // PONG
 735                                         // Currently, unused, since client never sends pings.
 0736                           if ( additionalByte == ClientProtocolConsts.CR && additionalByte2 == ClientProtocolConsts.LF 
 737                           {
 738                           }
 739                           else
 740                           {
 0741                              throw new Exception( "Protocol error" );
 742                           }
 743                           break;
 744                        case ClientProtocolConsts.INFO_INT: // INFO
 0745                           if ( additionalByte == ClientProtocolConsts.SPACE || additionalByte == ClientProtocolConsts.T
 746                           {
 0747                              var info = DeserializeInfoMessage( buffer.Array, idx, advanceState.BufferOffset - idx, enc
 748                              // TODO implement dynamic handling of INFO message
 0749                           }
 750                           else
 751                           {
 0752                              throw new Exception( "Protocol error" );
 753                           }
 754                           break;
 755                        default:
 0756                           throw new Exception( "Protocol error" );
 757                     }
 758                  }
 759                  break;
 760            }
 761
 762            // Remember to shift remaining data to the beginning of byte array
 763            // TODO optimize: we don't need to do this on every loop.
 3764            SetPreReadLength( rState );
 3765            crIdx = array.IndexOfArray( 0, advanceState.BufferTotal, ClientProtocolConsts.CRLF );
 3766            advanceState.Advance( crIdx < 0 ? advanceState.BufferTotal : crIdx );
 3767         }
 3768      }
 769
 770      internal static void SetPreReadLength( ReadState rState )
 771      {
 7772         var aState = rState.BufferAdvanceState;
 7773         var end = aState.BufferOffset;
 7774         var preReadLength = aState.BufferTotal;
 775         // Messages end with CRLF
 7776         end += 2;
 7777         var remainingData = preReadLength - end;
 7778         if ( remainingData > 0 )
 779         {
 0780            var array = rState.Buffer.Array;
 0781            Array.Copy( array, end, array, 0, remainingData );
 782         }
 7783         aState.Reset();
 7784         aState.ReadMore( remainingData );
 785
 7786      }
 787
 788      internal static ServerInformation DeserializeInfoMessage( Byte[] array, Int32 offset, Int32 count, Encoding encodi
 789      {
 4790         using ( var mStream = new MemoryStream( array, offset, count ) )
 4791         using ( var reader = new StreamReader( mStream, encoding ) )
 4792         using ( var jReader = new JsonTextReader( reader ) )
 793         {
 794            //   return (JObject) JToken.Load( jReader );
 4795            return JsonSerializer.CreateDefault().Deserialize<ServerInformation>( jReader );
 796         }
 4797      }
 798
 799      internal static Byte[] SerializeConnectMessage( ClientInformation clientInfo, Encoding encoding )
 800      {
 4801         using ( var mStream = new MemoryStream( 0x100 ) )
 802         {
 4803            using ( var writer = new StreamWriter( mStream, encoding ) )
 804            {
 4805               JsonSerializer.CreateDefault().Serialize( writer, clientInfo );
 4806               writer.Flush();
 4807               return mStream.ToArray();
 808            }
 809         }
 4810      }
 811
 812      internal static async Task InitializeNewConnection(
 813         ClientInformation clientInfo,
 814         Encoding encoding,
 815         WriteState wState,
 816         Stream stream,
 817         CancellationToken token
 818         )
 819      {
 4820         var connectBytes = SerializeConnectMessage( clientInfo, encoding );
 821
 4822         var msgLength = 10 + connectBytes.Length;
 4823         var array = wState.Buffer.SetCapacityAndReturnArray( msgLength );
 4824         var idx = 0;
 4825         array.WriteASCIIString( ref idx, ClientProtocolConsts.CONNECT_PREFIX );
 4826         connectBytes.CopyTo( array, idx );
 4827         idx += connectBytes.Length;
 4828         array.WriteASCIIString( ref idx, ClientProtocolConsts.CRLF );
 829
 4830         await stream.WriteAsync( array, 0, msgLength, token );
 4831         await stream.FlushAsync( token );
 4832      }
 833   }
 834}
 835