|  |  | 1 |  | /* | 
|  |  | 2 |  |  * Copyright 2018 Stanislav Muhametsin. All rights Reserved. | 
|  |  | 3 |  |  * | 
|  |  | 4 |  |  * Licensed  under the  Apache License,  Version 2.0  (the "License"); | 
|  |  | 5 |  |  * you may not use  this file  except in  compliance with the License. | 
|  |  | 6 |  |  * You may obtain a copy of the License at | 
|  |  | 7 |  |  * | 
|  |  | 8 |  |  *   http://www.apache.org/licenses/LICENSE-2.0 | 
|  |  | 9 |  |  * | 
|  |  | 10 |  |  * Unless required by applicable law or agreed to in writing, software | 
|  |  | 11 |  |  * distributed  under the  License is distributed on an "AS IS" BASIS, | 
|  |  | 12 |  |  * WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or | 
|  |  | 13 |  |  * implied. | 
|  |  | 14 |  |  * | 
|  |  | 15 |  |  * See the License for the specific language governing permissions and | 
|  |  | 16 |  |  * limitations under the License. | 
|  |  | 17 |  |  */ | 
|  |  | 18 |  | using Newtonsoft.Json; | 
|  |  | 19 |  | using Newtonsoft.Json.Linq; | 
|  |  | 20 |  | using System; | 
|  |  | 21 |  | using System.Collections.Concurrent; | 
|  |  | 22 |  | using System.Collections.Generic; | 
|  |  | 23 |  | using System.IO; | 
|  |  | 24 |  | using System.Linq; | 
|  |  | 25 |  | using System.Text; | 
|  |  | 26 |  | using System.Threading; | 
|  |  | 27 |  | using System.Threading.Tasks; | 
|  |  | 28 |  | using UtilPack; | 
|  |  | 29 |  |  | 
|  |  | 30 |  | namespace CBAM.NATS.Implementation | 
|  |  | 31 |  | { | 
|  |  | 32 |  |    using TStoredState = Queue<NATSMessageImpl>; | 
|  |  | 33 |  |  | 
|  |  | 34 |  |  | 
/// <summary>
/// Byte-level constants and JSON property names used by the client protocol code in this file.
/// </summary>
internal static class ClientProtocolConsts
{
   /// <summary>ASCII carriage return.</summary>
   public const Byte CR = 0x0D;

   /// <summary>ASCII line feed.</summary>
   public const Byte LF = 0x0A;

   /// <summary>ASCII space.</summary>
   public const Byte SPACE = 0x20;

   /// <summary>ASCII horizontal tab.</summary>
   public const Byte TAB = 0x09;

   /// <summary>The two-byte sequence CR LF.</summary>
   public static readonly Byte[] CRLF = new Byte[] { CR, LF };

   /// <summary>A one-element array holding LF.</summary>
   // Built from the LF constant instead of CRLF[1], so it no longer relies on
   // CRLF being initialized earlier in textual order.
   public static readonly Byte[] LF_ARRAY = new Byte[] { LF };

   /// <summary>ASCII "PONG" followed by CR LF (6 bytes).</summary>
   public static readonly Byte[] PONG = new Byte[] { 0x50, 0x4F, 0x4E, 0x47, CR, LF };

   /// <summary>ASCII "SUB" followed by a space.</summary>
   public static readonly Byte[] SUB_PREFIX = new Byte[] { 0x53, 0x55, 0x42, SPACE };

   /// <summary>ASCII "PUB" followed by a space.</summary>
   public static readonly Byte[] PUB_PREFIX = new Byte[] { 0x50, 0x55, 0x42, SPACE };

   /// <summary>ASCII "UNSUB" followed by a space.</summary>
   public static readonly Byte[] UNSUB_PREFIX = new Byte[] { 0x55, 0x4E, 0x53, 0x55, 0x42, SPACE };

   /// <summary>ASCII "CONNECT" followed by a space.</summary>
   public static readonly Byte[] CONNECT_PREFIX = new Byte[] { 0x43, 0x4F, 0x4E, 0x4E, 0x45, 0x43, 0x54, SPACE };

   /// <summary>Read size (65536) passed to the stream read helper.</summary>
   public const Int32 READ_COUNT = 0x10000;

   /// <summary>
   /// Mask which, ANDed with four packed bytes, clears bits 0x20 and 0x80 of each byte,
   /// turning ASCII lower-case letters into upper-case ones.
   /// </summary>
   public const Int32 UPPERCASE_MASK_FULL = 0x5F5F5F5F;

   /// <summary>The ASCII bytes 'I', 'N', 'F', 'O' packed into a big-endian 32-bit integer.</summary>
   // NOTE(review): presumably compared against the masked four-byte message header — confirm at the use site.
   public const Int32 INFO_INT = 0x494E464F;

   /// <summary>JSON property names used by <c>ServerInformation</c>.</summary>
   public static class Info
   {
      public const String SERVER_ID = "server_id";
      public const String VERSION = "version";
      public const String VERSION_GO = "go";
      public const String HOST = "host";
      public const String PORT = "port";
      public const String AUTH_REQUIRED = "auth_required";
      public const String SSL_REQUIRED = "ssl_required";
      public const String MAX_PAYLOAD = "max_payload";
      public const String CONNECT_URLS = "connect_urls";
   }

   /// <summary>JSON property names used by <c>ClientInformation</c>.</summary>
   public static class Connect
   {
      public const String VERBOSE = "verbose";
      public const String PEDANTIC = "pedantic";
      public const String SSL_REQUIRED = "ssl_required";
      public const String AUTH_TOKEN = "auth_token";
      public const String USER = "user";
      public const String PASSWORD = "pass";
      public const String NAME = "name";

      /// <summary>Name of the client language property.</summary>
      public const String LANGUAGE = "lang";

      /// <summary>Misspelt alias of <see cref="LANGUAGE"/>, kept so existing references keep compiling.</summary>
      public const String LANGAUGE = LANGUAGE;

      public const String VERSION = "version";
      public const String PROTOCOL = "protocol";
   }
}
|  |  | 85 |  |  | 
|  |  | 86 |  |  | 
/// <summary>
/// JSON-mapped description of a server. Each property is bound to the JSON name
/// declared in <see cref="ClientProtocolConsts.Info"/>.
/// </summary>
// NOTE(review): presumably the payload of the server's INFO message — confirm at the deserialization site.
internal sealed class ServerInformation
{
   /// <summary>Bound to the "server_id" JSON property.</summary>
   [JsonProperty( ClientProtocolConsts.Info.SERVER_ID )]
   public String ServerID { get; set; }

   /// <summary>Bound to the "version" JSON property.</summary>
   [JsonProperty( ClientProtocolConsts.Info.VERSION )]
   public String ServerVersion { get; set; }

   /// <summary>Bound to the "go" JSON property.</summary>
   [JsonProperty( ClientProtocolConsts.Info.VERSION_GO )]
   public String GoVersion { get; set; }

   /// <summary>Bound to the "host" JSON property.</summary>
   [JsonProperty( ClientProtocolConsts.Info.HOST )]
   public String Host { get; set; }

   /// <summary>Bound to the "port" JSON property.</summary>
   [JsonProperty( ClientProtocolConsts.Info.PORT )]
   public Int32 Port { get; set; }

   /// <summary>Bound to the "auth_required" JSON property.</summary>
   [JsonProperty( ClientProtocolConsts.Info.AUTH_REQUIRED )]
   public Boolean AuthenticationRequired { get; set; }

   /// <summary>Bound to the "ssl_required" JSON property.</summary>
   [JsonProperty( ClientProtocolConsts.Info.SSL_REQUIRED )]
   public Boolean SSLRequired { get; set; }

   /// <summary>Bound to the "max_payload" JSON property.</summary>
   [JsonProperty( ClientProtocolConsts.Info.MAX_PAYLOAD )]
   public Int32 MaxPayload { get; set; }

   /// <summary>Bound to the "connect_urls" JSON property.</summary>
   [JsonProperty( ClientProtocolConsts.Info.CONNECT_URLS )]
   public String[] ConnectionURLs { get; set; }
}
|  |  | 116 |  |  | 
/// <summary>
/// JSON-mapped description of this client. Each property is bound to the JSON name
/// declared in <see cref="ClientProtocolConsts.Connect"/>.
/// </summary>
// NOTE(review): presumably serialized as the payload of the CONNECT message — confirm at the serialization site.
internal sealed class ClientInformation
{
   /// <summary>Bound to the "verbose" JSON property.</summary>
   [JsonProperty( ClientProtocolConsts.Connect.VERBOSE )]
   public Boolean IsVerbose { get; set; }

   /// <summary>Bound to the "pedantic" JSON property.</summary>
   [JsonProperty( ClientProtocolConsts.Connect.PEDANTIC )]
   public Boolean IsPedantic { get; set; }

   /// <summary>Bound to the "ssl_required" JSON property.</summary>
   [JsonProperty( ClientProtocolConsts.Connect.SSL_REQUIRED )]
   public Boolean SSLRequired { get; set; }

   // The string properties below are configured to be ignored when their value is null.

   /// <summary>Bound to the "auth_token" JSON property.</summary>
   [JsonProperty( ClientProtocolConsts.Connect.AUTH_TOKEN, NullValueHandling = NullValueHandling.Ignore )]
   public String AuthenticationToken { get; set; }

   /// <summary>Bound to the "user" JSON property.</summary>
   [JsonProperty( ClientProtocolConsts.Connect.USER, NullValueHandling = NullValueHandling.Ignore )]
   public String Username { get; set; }

   /// <summary>Bound to the "pass" JSON property.</summary>
   [JsonProperty( ClientProtocolConsts.Connect.PASSWORD, NullValueHandling = NullValueHandling.Ignore )]
   public String Password { get; set; }

   /// <summary>Bound to the "name" JSON property.</summary>
   [JsonProperty( ClientProtocolConsts.Connect.NAME, NullValueHandling = NullValueHandling.Ignore )]
   public String ClientName { get; set; }

   /// <summary>Bound to the "lang" JSON property.</summary>
   [JsonProperty( ClientProtocolConsts.Connect.LANGAUGE, NullValueHandling = NullValueHandling.Ignore )]
   public String ClientLanguage { get; set; }

   /// <summary>Bound to the "version" JSON property.</summary>
   [JsonProperty( ClientProtocolConsts.Connect.VERSION, NullValueHandling = NullValueHandling.Ignore )]
   public String ClientVersion { get; set; }

   /// <summary>
   /// Bound to the "protocol" JSON property; configured to be ignored when it holds the default value.
   /// </summary>
   [JsonProperty( ClientProtocolConsts.Connect.PROTOCOL, DefaultValueHandling = DefaultValueHandling.Ignore )]
   public Int32 ProtocolVersion { get; set; }
}
|  |  | 150 |  |  | 
|  |  | 151 |  |  | 
|  |  | 152 |  |    internal sealed class ClientProtocol : NATSConnectionObservability | 
|  |  | 153 |  |    { | 
// Per-subscription bookkeeping held by ClientProtocol, keyed by subscription ID.
//
// The constructor stores the subscription ID and the isGlobal flag.
//  - MessageQueue and DataBuffer are null for a global subscription; otherwise a new,
//    empty queue / buffer is created.
//  - CachedMessage is null when the subscription is global or when the subject contains "*".
//  - ByteArrayPool and RentedByteArrays are created for every subscription, global or not.
//
// NOTE(review): the non-null branch of the CachedMessage initializer, the ByteArrayPool
// initializer and the ByteArrayPool property accessor are cut off in this listing —
// confirm their exact form against the full source before relying on this description.
|  |  | 154 |  |       private sealed class SubscriptionState | 
|  |  | 155 |  |       { | 
|  | 3 | 156 |  |          public SubscriptionState( | 
|  | 3 | 157 |  |             String subject, | 
|  | 3 | 158 |  |             Int64 subscriptionID, | 
|  | 3 | 159 |  |             Boolean isGlobal | 
|  | 3 | 160 |  |             ) | 
|  |  | 161 |  |          { | 
|  | 3 | 162 |  |             this.SubscriptionID = subscriptionID; | 
|  | 3 | 163 |  |             this.IsGlobal = isGlobal; | 
|  | 3 | 164 |  |             this.MessageQueue = isGlobal ? null : new Queue<NATSMessageImpl>(); | 
|  | 3 | 165 |  |             this.CachedMessage = isGlobal || subject.IndexOf( "*" ) >= 0 ? null : new NATSMessageImpl( subject, subscrip | 
|  | 3 | 166 |  |             this.DataBuffer = isGlobal ? null : new ResizableArray<Byte>(); | 
|  | 3 | 167 |  |             this.ByteArrayPool = new LocklessInstancePoolForClassesNoHeapAllocations<InstanceHolder<ResizableArray<Byte> | 
|  | 3 | 168 |  |             this.RentedByteArrays = new List<InstanceHolder<ResizableArray<Byte>>>(); | 
|  | 3 | 169 |  |          } | 
|  |  | 170 |  |  | 
|  | 0 | 171 |  |          public Int64 SubscriptionID { get; } | 
|  |  | 172 |  |  | 
|  | 9 | 173 |  |          public Queue<NATSMessageImpl> MessageQueue { get; } | 
|  |  | 174 |  |  | 
|  | 0 | 175 |  |          public NATSMessageImpl CachedMessage { get; } | 
|  |  | 176 |  |  | 
|  | 0 | 177 |  |          public ResizableArray<Byte> DataBuffer { get; } | 
|  |  | 178 |  |  | 
|  | 3 | 179 |  |          public Boolean IsGlobal { get; } | 
|  |  | 180 |  |  | 
|  | 5 | 181 |  |          public LocklessInstancePoolForClassesNoHeapAllocations<InstanceHolder<ResizableArray<Byte>>> ByteArrayPool { ge | 
|  |  | 182 |  |  | 
|  | 7 | 183 |  |          public List<InstanceHolder<ResizableArray<Byte>>> RentedByteArrays { get; } | 
|  |  | 184 |  |  | 
|  |  | 185 |  |       } | 
|  |  | 186 |  |  | 
/// <summary>
/// Shared base of the read and write states: an <see cref="AsyncLock"/> paired with a
/// resizable byte buffer.
/// </summary>
public abstract class IOState
{
   /// <summary>The lock created for this state.</summary>
   public AsyncLock Lock { get; }

   /// <summary>The byte buffer created for this state.</summary>
   public ResizableArray<Byte> Buffer { get; }

   /// <summary>
   /// Creates the lock first and then the buffer; the buffer is constructed with the argument 256.
   /// </summary>
   public IOState()
   {
      this.Lock = new AsyncLock();
      this.Buffer = new ResizableArray<Byte>( 256 );
   }
}
|  |  | 200 |  |  | 
/// <summary>
/// State used for writing. Adds no members of its own to <see cref="IOState"/>.
/// </summary>
public sealed class WriteState : IOState
{
   /// <summary>Creates the state; all initialization happens in the base constructor.</summary>
   public WriteState()
   {
   }
}
|  |  | 208 |  |  | 
/// <summary>
/// State used for reading: the base lock and buffer, plus a three-element index array
/// and a <see cref="BufferAdvanceState"/>.
/// </summary>
public sealed class ReadState : IOState
{
   /// <summary>Three-element array allocated at construction.</summary>
   public Int32[] MessageSpaceIndices { get; }

   /// <summary>Advance state handed to the stream read helper together with the buffer.</summary>
   public BufferAdvanceState BufferAdvanceState { get; }

   /// <summary>Runs the base constructor, then allocates the index array and the advance state.</summary>
   public ReadState()
   {
      // Kept in the constructor body (not as property initializers) so both run after the base constructor.
      this.MessageSpaceIndices = new Int32[3];
      this.BufferAdvanceState = new BufferAdvanceState();
   }
}
|  |  | 222 |  |  | 
/// <summary>
/// Bundles everything <c>ClientProtocol</c> needs for I/O: the stream, the string pool,
/// the encoding information and the separate write and read states.
/// </summary>
public sealed class ClientProtocolIOState
{
   /// <summary>State used by write operations.</summary>
   public WriteState WriteState { get; }

   /// <summary>State used by read operations.</summary>
   public ReadState ReadState { get; }

   /// <summary>The stream that is read from and written to.</summary>
   public Stream Stream { get; }

   /// <summary>The string pool given at construction.</summary>
   public BinaryStringPool StringPool { get; }

   /// <summary>The encoding information given at construction.</summary>
   public IEncodingInfo Encoding { get; }

   /// <summary>
   /// Creates the bundle. <paramref name="stream"/>, <paramref name="stringPool"/> and
   /// <paramref name="encoding"/> are passed through <c>ArgumentValidator.ValidateNotNull</c>;
   /// a null <paramref name="writeState"/> or <paramref name="readState"/> is replaced by a new instance.
   /// </summary>
   public ClientProtocolIOState(
      Stream stream,
      BinaryStringPool stringPool,
      IEncodingInfo encoding,
      WriteState writeState,
      ReadState readState
      )
   {
      this.Stream = ArgumentValidator.ValidateNotNull( nameof( stream ), stream );
      this.StringPool = ArgumentValidator.ValidateNotNull( nameof( stringPool ), stringPool );
      this.Encoding = ArgumentValidator.ValidateNotNull( nameof( encoding ), encoding );

      if ( writeState == null )
      {
         writeState = new WriteState();
      }
      this.WriteState = writeState;

      if ( readState == null )
      {
         readState = new ReadState();
      }
      this.ReadState = readState;
   }
}
|  |  | 251 |  |  | 
/// <summary>
/// Event payload carrying one message for the global subscription event.
/// </summary>
public sealed class GlobalSubscriptionEventArgs
{
   /// <summary>The message given at construction.</summary>
   public NATSMessageImpl Message { get; }

   /// <summary>
   /// Wraps <paramref name="message"/> after passing it through <c>ArgumentValidator.ValidateNotNull</c>.
   /// </summary>
   public GlobalSubscriptionEventArgs( NATSMessageImpl message )
   {
      var validated = ArgumentValidator.ValidateNotNull( nameof( message ), message );
      this.Message = validated;
   }
}
|  |  | 260 |  |  | 
|  |  | 261 |  |  | 
// Stream, encoding, string pool and read/write states supplied at construction.
private readonly ClientProtocolIOState _state;

// Server description supplied at construction.
private readonly ServerInformation _serverParameters;

// Active subscriptions, keyed by subscription ID.
private readonly ConcurrentDictionary<Int64, SubscriptionState> _subscriptionStates;

// GlobalSubscriptionPrefix encoded with the encoding of the I/O state.
private readonly Byte[] _globalSubscriptionNameBytes;

// Lazily-started global subscription; awaited at the start of RequestAsync.
private readonly AsyncLazy<Int64> _globalSubscriptionID;

// Last subscription ID handed out; advanced with Interlocked.Increment.
private Int64 _currentID;

// Counter appended to GlobalSubscriptionPrefix to form reply subjects; advanced with Interlocked.Increment.
private Int64 _globalSubscriptionSuffix;
|  |  | 270 |  |  | 
// Creates the protocol object over the given I/O state and server description.
//
// Steps, in order:
//  - 'state' and 'serverParameters' are passed through ArgumentValidator.ValidateNotNull and stored.
//  - an empty subscription dictionary is created and the ID counter is reset to 0.
//  - GlobalSubscriptionPrefix is computed; when 'globalSubscriptionName' is null or empty a new
//    GUID formatted with "N" is used as the starting point.
//  - the prefix is encoded to bytes with state.Encoding.Encoding.
//  - the global subscription is wrapped in an AsyncLazy whose factory calls WriteSubscribe.
//
// NOTE(review): the tail of the GlobalSubscriptionPrefix expression and the arguments passed to
// WriteSubscribe in the lazy factory are cut off in this listing — confirm against the full source.
|  | 4 | 271 |  |       public ClientProtocol( | 
|  | 4 | 272 |  |         ClientProtocolIOState state, | 
|  | 4 | 273 |  |         ServerInformation serverParameters, | 
|  | 4 | 274 |  |         String globalSubscriptionName = null | 
|  | 4 | 275 |  |       ) | 
|  |  | 276 |  |       { | 
|  | 4 | 277 |  |          this._state = ArgumentValidator.ValidateNotNull( nameof( state ), state ); | 
|  | 4 | 278 |  |          this._subscriptionStates = new ConcurrentDictionary<Int64, SubscriptionState>(); | 
|  | 4 | 279 |  |          this._serverParameters = ArgumentValidator.ValidateNotNull( nameof( serverParameters ), serverParameters ); | 
|  | 4 | 280 |  |          this._currentID = 0; | 
|  | 4 | 281 |  |          this.GlobalSubscriptionPrefix = ( String.IsNullOrEmpty( globalSubscriptionName ) ? Guid.NewGuid().ToString( "N" | 
|  | 4 | 282 |  |          this._globalSubscriptionNameBytes = state.Encoding.Encoding.GetBytes( this.GlobalSubscriptionPrefix ); | 
|  | 5 | 283 |  |          this._globalSubscriptionID = new AsyncLazy<Int64>( async () => ( await this.WriteSubscribe( this.GlobalSubscrip | 
|  | 4 | 284 |  |       } | 
|  |  | 285 |  |  | 
|  |  | 286 |  |  | 
// True when there are no subscription states, or when every subscription state satisfies a predicate.
// NOTE(review): the predicate passed to All(...) is cut off in this listing — confirm against the full source.
|  | 0 | 287 |  |       public Boolean CanBeReturnedToPool => this._subscriptionStates.Count <= 0 || this._subscriptionStates.Values.All(  | 
|  |  | 288 |  |  | 
/// <summary>The stream of the I/O state this instance was created with.</summary>
public Stream Stream
{
   get
   {
      return this._state.Stream;
   }
}

/// <summary>
/// Subject prefix computed in the constructor; <c>RequestAsync</c> appends a counter to it
/// to build reply subjects.
/// </summary>
public String GlobalSubscriptionPrefix { get; }


/// <summary>Event that <c>RequestAsync</c> listens to while waiting for its reply message.</summary>
public event GenericEventHandler<GlobalSubscriptionEventArgs> GlobalSubscriptionMessageReceived;

/// <summary>Invoked after a non-global SUB message has been written to the stream.</summary>
public event GenericEventHandler<AfterSubscriptionSentArgs> AfterSubscriptionSent;

/// <summary>Invoked after each PUB message has been written to the stream.</summary>
public event GenericEventHandler<AfterPublishSentArgs> AfterPublishSent;
|  |  | 297 |  |  | 
/// <summary>
/// Sends a subscription for <paramref name="subject"/> as a regular (non-global) subscription.
/// </summary>
/// <param name="subject">The subject to subscribe to.</param>
/// <param name="queue">Optional queue group name; may be null or empty.</param>
/// <param name="autoUnsub">When greater than zero, an UNSUB with this limit is sent right after the SUB.</param>
/// <returns>The subscription ID and the message queue created for the subscription.</returns>
public Task<(Int64, TStoredState)> WriteSubscribe(
   String subject,
   String queue,
   Int64 autoUnsub
   )
{
   const Boolean isGlobal = false;
   return this.WriteSubscribe( subject, queue, autoUnsub, isGlobal );
}
|  |  | 303 |  |  | 
// Writes a SUB message for 'subject' and registers the state of the new subscription.
//
// Flow:
//  - a new subscription ID is taken with Interlocked.Increment before the write lock is acquired.
//  - under the write lock the message 'SUB <subject> [queue group ]<sid>\r\n' is assembled in the
//    write buffer. The constant 7 in msgSize is "SUB " (4 bytes) + the space after the subject (1)
//    + the trailing CR LF (2).
//  - a non-empty queue name that fails the ASCII validity check causes InvalidOperationException.
//  - after the message is written, AfterSubscriptionSent is invoked unless the subscription is global.
//  - when autoUnsub > 0 an UNSUB carrying that limit is written too, and the stream is then flushed.
//  - after the lock is released a SubscriptionState is created and added to the dictionary; a
//    duplicate ID raises Exception.
// Returns the ID together with the state's MessageQueue, which is null for a global subscription.
//
// NOTE(review): the tail of the queue-name check and the AfterSubscriptionSent arguments are cut off
// in this listing — confirm against the full source.
// NOTE(review): msgSize is computed from subject.Length / queue.Length (character counts), so it
// assumes one byte per character; only the queue name is visibly validated here.
// NOTE(review): the local 'buffer' is assigned but never read.
// NOTE(review): the subscription state is registered only after the SUB has been flushed — confirm
// that a concurrent reader cannot see a message for this ID before the state exists.
|  |  | 304 |  |       private async Task<(Int64, TStoredState)> WriteSubscribe( | 
|  |  | 305 |  |          String subject, | 
|  |  | 306 |  |          String queue, | 
|  |  | 307 |  |          Int64 autoUnsub, | 
|  |  | 308 |  |          Boolean isGlobal | 
|  |  | 309 |  |          ) | 
|  |  | 310 |  |       { | 
|  | 3 | 311 |  |          var state = this._state; | 
|  | 3 | 312 |  |          var wState = state.WriteState; | 
|  | 3 | 313 |  |          var id = Interlocked.Increment( ref this._currentID ); | 
|  |  | 314 |  |  | 
|  | 3 | 315 |  |          using ( await wState.Lock.LockAsync() ) | 
|  |  | 316 |  |          { | 
|  | 3 | 317 |  |             var buffer = wState.Buffer; | 
|  | 3 | 318 |  |             var encoding = state.Encoding; | 
|  | 3 | 319 |  |             var idx = 0; | 
|  |  | 320 |  |             // Write 'SUB <subject> [queue group ]<sid>\r\n' | 
|  | 3 | 321 |  |             var idSize = encoding.GetTextualIntegerRepresentationSize( id ); | 
|  | 3 | 322 |  |             if ( !String.IsNullOrEmpty( queue ) && queue.ContainsNonASCIICharacters( NATSStatementInformationImpl.IsInva | 
|  |  | 323 |  |             { | 
|  | 0 | 324 |  |                throw new InvalidOperationException( "Invalid queue name: " + queue ); | 
|  |  | 325 |  |             } | 
|  | 3 | 326 |  |             var msgSize = 7 + subject.Length + ( String.IsNullOrEmpty( queue ) ? 0 : ( queue.Length + 1 ) ) + idSize; | 
|  | 3 | 327 |  |             var array = wState.Buffer.SetCapacityAndReturnArray( msgSize ); | 
|  | 3 | 328 |  |             array | 
|  | 3 | 329 |  |                .WriteASCIIString( ref idx, ClientProtocolConsts.SUB_PREFIX ) | 
|  | 3 | 330 |  |                .WriteASCIIString( ref idx, subject, false ) | 
|  | 3 | 331 |  |                .WriteASCIIString( ref idx, ClientProtocolConsts.SPACE ); | 
|  |  | 332 |  |  | 
|  | 3 | 333 |  |             if ( !String.IsNullOrEmpty( queue ) ) | 
|  |  | 334 |  |             { | 
|  | 0 | 335 |  |                array | 
|  | 0 | 336 |  |                   .WriteASCIIString( ref idx, queue, false ) | 
|  | 0 | 337 |  |                   .WriteASCIIString( ref idx, ClientProtocolConsts.SPACE ); | 
|  |  | 338 |  |             } | 
|  |  | 339 |  |  | 
|  | 3 | 340 |  |             encoding.WriteIntegerTextual( array, ref idx, id, idSize ); | 
|  | 3 | 341 |  |             array.WriteASCIIString( ref idx, ClientProtocolConsts.CRLF ); | 
|  |  | 342 |  |             System.Diagnostics.Debug.Assert( idx == msgSize ); | 
|  |  | 343 |  |  | 
|  | 3 | 344 |  |             await state.Stream.WriteAsync( array, 0, msgSize, default ); | 
|  | 3 | 345 |  |             if ( !isGlobal ) | 
|  |  | 346 |  |             { | 
|  | 2 | 347 |  |                this.AfterSubscriptionSent?.InvokeAllEventHandlers( new AfterSubscriptionSentArgs( subject, queue, autoUn | 
|  |  | 348 |  |             } | 
|  | 3 | 349 |  |             if ( autoUnsub > 0 ) | 
|  |  | 350 |  |             { | 
|  | 1 | 351 |  |                await this.PerformUnsubscribe( id, autoUnsub ); | 
|  |  | 352 |  |             } | 
|  | 3 | 353 |  |             await state.Stream.FlushAsync( default ); | 
|  |  | 354 |  |  | 
|  | 3 | 355 |  |          } | 
|  |  | 356 |  |  | 
|  | 3 | 357 |  |          var retVal = new SubscriptionState( subject, id, isGlobal ); | 
|  | 3 | 358 |  |          if ( !this._subscriptionStates.TryAdd( id, retVal ) ) | 
|  |  | 359 |  |          { | 
|  | 0 | 360 |  |             throw new Exception( "This should not be possible." ); | 
|  |  | 361 |  |          } | 
|  |  | 362 |  |  | 
|  | 3 | 363 |  |          return (id, retVal.MessageQueue); | 
|  | 3 | 364 |  |       } | 
|  |  | 365 |  |  | 
/// <summary>
/// Cleans up after a subscription is no longer enumerated.
/// </summary>
/// <param name="id">The subscription ID.</param>
/// <param name="hasAutoUnSub">Whether the subscription used an auto-unsubscribe limit.</param>
/// <param name="currentAutoUnsub">Current auto-unsubscribe counter of the caller.</param>
/// <returns>
/// A completed task when only the local state had to be removed; otherwise the task of
/// <see cref="WriteUnsubscribe"/> called with a limit of zero.
/// </returns>
// NOTE(review): a negative counter presumably means the limit was already reached, so no UNSUB is needed — confirm with the caller.
public Task EnumerationEnded(
   Int64 id,
   Boolean hasAutoUnSub,
   Int64 currentAutoUnsub

   )
{
   if ( !hasAutoUnSub || currentAutoUnsub >= 0 )
   {
      // An explicit UNSUB still has to be sent.
      return this.WriteUnsubscribe( id, 0 );
   }

   // Only the local bookkeeping is dropped; nothing is written to the stream.
   this._subscriptionStates.TryRemove( id, out _ );
   return TaskUtils.CompletedTask;
}
|  |  | 386 |  |  | 
/// <summary>
/// Writes an UNSUB message for the given subscription and flushes the stream, all under the write lock.
/// </summary>
/// <param name="id">The subscription ID.</param>
/// <param name="autoUnsubscribe">
/// When zero or negative, the local subscription state is removed before the message is sent;
/// when positive, it is sent as the limit of the UNSUB message and the state is kept.
/// </param>
public async Task WriteUnsubscribe(
   Int64 id,
   Int64 autoUnsubscribe
   )
{
   var removeStateNow = autoUnsubscribe <= 0;
   if ( removeStateNow )
   {
      this._subscriptionStates.TryRemove( id, out _ );
   }

   var ioState = this._state;
   var writeLock = ioState.WriteState.Lock;
   using ( await writeLock.LockAsync() )
   {
      await this.PerformUnsubscribe( id, autoUnsubscribe );
      await ioState.Stream.FlushAsync( default );
   }
}
|  |  | 406 |  |  | 
/// <summary>
/// Assembles 'UNSUB &lt;sid&gt;[ &lt;limit&gt;]\r\n' in the write buffer and writes it to the stream.
/// Does not take the write lock and does not flush; callers in this file do both.
/// </summary>
/// <param name="id">The subscription ID.</param>
/// <param name="autoUnsubscribe">Limit appended to the message when greater than zero.</param>
private async Task PerformUnsubscribe(
   Int64 id,
   Int64 autoUnsubscribe
   )
{
   var ioState = this._state;
   var writeBuffer = ioState.WriteState.Buffer;
   var textEncoding = ioState.Encoding;

   var idSize = textEncoding.GetTextualIntegerRepresentationSize( id );
   var autoSize = autoUnsubscribe > 0 ? textEncoding.GetTextualIntegerRepresentationSize( autoUnsubscribe ) : 0;
   var hasLimit = autoSize > 0;

   // 8 = "UNSUB " (6 bytes) + trailing CR LF (2 bytes); the limit adds one separating space.
   var msgSize = 8 + idSize;
   if ( hasLimit )
   {
      msgSize += autoSize + 1;
   }

   var bytes = writeBuffer.SetCapacityAndReturnArray( msgSize );
   var position = 0;
   bytes.WriteASCIIString( ref position, ClientProtocolConsts.UNSUB_PREFIX );
   textEncoding.WriteIntegerTextual( bytes, ref position, id, idSize );
   if ( hasLimit )
   {
      bytes.WriteASCIIString( ref position, ClientProtocolConsts.SPACE );
      textEncoding.WriteIntegerTextual( bytes, ref position, autoUnsubscribe, autoSize );
   }
   bytes.WriteASCIIString( ref position, ClientProtocolConsts.CRLF );
   System.Diagnostics.Debug.Assert( position == msgSize );

   await ioState.Stream.WriteAsync( bytes, 0, msgSize, default );
}
|  |  | 431 |  |  | 
// Writes one PUB message per element of 'datas' and flushes the stream once at the end,
// all while holding the write lock.
//
// For every element whose Subject is not null or empty:
//  - the header 'PUB <subject> [reply ]<count>\r\n' is assembled in the write buffer,
//  - 'count' bytes of pData.Data starting at pData.Offset are copied after it when count > 0,
//  - a closing CR LF is appended and the whole buffer is written to the stream,
//  - AfterPublishSent is invoked with the subject and the reply subject.
// Elements with a null or empty Subject are skipped without any error.
//
// NOTE(review): the tail of the msgSize expression and of the AfterPublishSent call are cut off in
// this listing. The constant 9 presumably covers "PUB " (4) + the space after the subject (1) +
// two CR LF pairs (4) — confirm against the full source.
// NOTE(review): sizes are computed from subject.Length / reply.Length (character counts), so one
// byte per character is assumed.
// NOTE(review): the local 'buffer' is assigned but never read.
|  |  | 432 |  |       public async Task WritePublish( | 
|  |  | 433 |  |          IEnumerable<NATSPublishData> datas | 
|  |  | 434 |  |          ) | 
|  |  | 435 |  |       { | 
|  | 3 | 436 |  |          var state = this._state; | 
|  | 3 | 437 |  |          var wState = state.WriteState; | 
|  | 3 | 438 |  |          using ( await wState.Lock.LockAsync() ) | 
|  |  | 439 |  |          { | 
|  | 3 | 440 |  |             var buffer = wState.Buffer; | 
|  | 3 | 441 |  |             var encoding = state.Encoding; | 
|  |  | 442 |  |  | 
|  | 12 | 443 |  |             foreach ( var pData in datas ) | 
|  |  | 444 |  |             { | 
|  | 3 | 445 |  |                var subject = pData.Subject; | 
|  | 3 | 446 |  |                if ( !String.IsNullOrEmpty( subject ) ) | 
|  |  | 447 |  |                { | 
|  | 3 | 448 |  |                   var count = pData.Count; | 
|  | 3 | 449 |  |                   var reply = pData.ReplySubject; | 
|  |  | 450 |  |  | 
|  | 3 | 451 |  |                   var dataMsgSize = encoding.GetTextualIntegerRepresentationSize( count ); | 
|  | 3 | 452 |  |                   var msgSize = 9 + subject.Length + ( String.IsNullOrEmpty( reply ) ? 0 : ( reply.Length + 1 ) ) + data | 
|  | 3 | 453 |  |                   var array = wState.Buffer.SetCapacityAndReturnArray( msgSize ); | 
|  |  | 454 |  |  | 
|  | 3 | 455 |  |                   var idx = 0; | 
|  | 3 | 456 |  |                   array | 
|  | 3 | 457 |  |                      .WriteASCIIString( ref idx, ClientProtocolConsts.PUB_PREFIX ) | 
|  | 3 | 458 |  |                      .WriteASCIIString( ref idx, subject, false ) | 
|  | 3 | 459 |  |                      .WriteASCIIString( ref idx, ClientProtocolConsts.SPACE ); | 
|  | 3 | 460 |  |                   if ( !String.IsNullOrEmpty( reply ) ) | 
|  |  | 461 |  |                   { | 
|  | 1 | 462 |  |                      array | 
|  | 1 | 463 |  |                         .WriteASCIIString( ref idx, reply, false ) | 
|  | 1 | 464 |  |                         .WriteASCIIString( ref idx, ClientProtocolConsts.SPACE ); | 
|  |  | 465 |  |                   } | 
|  | 3 | 466 |  |                   encoding.WriteIntegerTextual( array, ref idx, count, dataMsgSize ); | 
|  | 3 | 467 |  |                   array.WriteASCIIString( ref idx, ClientProtocolConsts.CRLF ); | 
|  |  | 468 |  |  | 
|  | 3 | 469 |  |                   if ( count > 0 ) | 
|  |  | 470 |  |                   { | 
|  | 3 | 471 |  |                      Array.Copy( pData.Data, pData.Offset, array, idx, count ); | 
|  | 3 | 472 |  |                      idx += count; | 
|  |  | 473 |  |                   } | 
|  |  | 474 |  |  | 
|  | 3 | 475 |  |                   array.WriteASCIIString( ref idx, ClientProtocolConsts.CRLF ); | 
|  |  | 476 |  |  | 
|  |  | 477 |  |                   System.Diagnostics.Debug.Assert( idx == msgSize ); | 
|  | 3 | 478 |  |                   await state.Stream.WriteAsync( array, 0, msgSize, default ); | 
|  |  | 479 |  |  | 
|  | 3 | 480 |  |                   this.AfterPublishSent?.InvokeAllEventHandlers( new AfterPublishSentArgs( subject, reply ), throwExcept | 
|  | 3 | 481 |  |                } | 
|  | 3 | 482 |  |             } | 
|  |  | 483 |  |  | 
|  | 3 | 484 |  |             await state.Stream.FlushAsync( default ); | 
|  | 3 | 485 |  |          } | 
|  |  | 486 |  |  | 
|  | 3 | 487 |  |       } | 
|  |  | 488 |  |  | 
/// <summary>
/// Writes the six bytes of <see cref="ClientProtocolConsts.PONG"/> to the stream and flushes it,
/// while holding the write lock.
/// </summary>
private async Task WritePong()
{
   var ioState = this._state;
   var writeLock = ioState.WriteState.Lock;
   using ( await writeLock.LockAsync() )
   {
      var pong = ClientProtocolConsts.PONG;
      // pong.Length is 6: "PONG" plus CR LF.
      await ioState.Stream.WriteAsync( pong, 0, pong.Length, default );
      await ioState.Stream.FlushAsync( default );
   }
}
|  |  | 498 |  |  | 
/// <summary>
/// Waits until the message queue of the given subscription holds at least one message,
/// reading from the stream whenever the read lock can be obtained.
/// </summary>
/// <param name="subscriptionID">The subscription ID.</param>
/// <returns>
/// <c>false</c> when no state exists for <paramref name="subscriptionID"/>; otherwise whether
/// the queue is non-empty once the wait loop has ended.
/// </returns>
/// <remarks>
/// Byte arrays rented for the subscription are returned to its pool first. The loop has no
/// timeout of its own: each round waits at most 100 ms for the read lock and then tries again.
/// </remarks>
public async ValueTask<Boolean> PerformReadNext(
   Int64 subscriptionID
   )
{
   if ( !this._subscriptionStates.TryGetValue( subscriptionID, out var subState ) )
   {
      return false;
   }

   // Hand back everything rented so far before more messages are produced.
   var rented = subState.RentedByteArrays;
   var pool = subState.ByteArrayPool;
   foreach ( var holder in rented )
   {
      pool.ReturnInstance( holder );
   }
   rented.Clear();

   var pending = subState.MessageQueue;
   while ( pending.Count <= 0 )
   {
      var readLock = this._state.ReadState.Lock;
      if ( pending.Count > 0 )
      {
         continue;
      }

      var lockScope = await readLock.TryLockAsync( TimeSpan.FromMilliseconds( 100 ) );
      if ( !lockScope.HasValue )
      {
         // Lock not obtained within the wait; re-check the queue and try again.
         continue;
      }

      using ( lockScope.Value )
      {
         // Whoever held the lock before may already have queued a message for this subscription.
         if ( pending.Count <= 0 )
         {
            await this.PerformRead();
         }
      }
   }

   return pending.Count > 0;
}
|  |  | 536 |  |  | 
/// <summary>
/// Publishes a message with a unique reply subject and waits for a message to arrive on that subject.
/// </summary>
/// <param name="subject">The subject to publish to.</param>
/// <param name="data">The array holding the payload.</param>
/// <param name="offset">Offset of the payload within <paramref name="data"/>.</param>
/// <param name="count">Number of payload bytes.</param>
/// <returns>The first message received whose subject equals the generated reply subject.</returns>
/// <remarks>
/// The reply subject is <see cref="GlobalSubscriptionPrefix"/> followed by an incrementing counter.
/// The method loops until a reply has been seen; no timeout or cancellation is applied here.
/// </remarks>
public async Task<NATSMessage> RequestAsync( String subject, Byte[] data, Int32 offset, Int32 count )
{
   // The lazily-created global subscription is awaited before anything is published.
   await this._globalSubscriptionID;

   var suffix = Interlocked.Increment( ref this._globalSubscriptionSuffix );
   var replyTo = this.GlobalSubscriptionPrefix + suffix;
   NATSMessage receivedMessage = null;

   void OnGlobalMessage( GlobalSubscriptionEventArgs args )
   {
      var candidate = args.Message;
      if ( String.Equals( candidate.Subject, replyTo ) )
      {
         Interlocked.Exchange( ref receivedMessage, candidate );
      }
   }

   // Listen before publishing so a fast reply cannot be missed; the handler is removed on exit.
   this.GlobalSubscriptionMessageReceived += OnGlobalMessage;
   using ( new UsingHelper( () => this.GlobalSubscriptionMessageReceived -= OnGlobalMessage ) )
   {
      var request = new NATSPublishData( subject, data, offset, count, replyTo );
      await this.WritePublish( request.Singleton() );

      var readLock = this._state.ReadState.Lock;
      while ( true )
      {
         var lockScope = await readLock.TryLockAsync( TimeSpan.FromMilliseconds( 100 ) );
         if ( lockScope.HasValue )
         {
            using ( lockScope )
            {
               // Another reader may have delivered the reply while this call waited for the lock.
               if ( receivedMessage == null )
               {
                  await this.PerformRead();
               }
            }
         }

         if ( receivedMessage != null )
         {
            break;
         }
      }
   }

   return receivedMessage;
}
|  |  | 577 |  |  | 
|  |  | 578 |  |  | 
|  |  | 579 |  |       private async Task PerformRead() /* Reads from the stream up to a CRLF and then processes each CRLF-terminated protocol line found in the read buffer. */ | 
|  |  | 580 |  |       { /* Handled lines: +OK (ignored), -ERR (throws), MSG (raised as event or queued), PING (PONG is written back), PONG (ignored), INFO (deserialized, result unused). Anything else throws "Protocol error". */ | 
|  | 3 | 581 |  |          var states = this._subscriptionStates; | 
|  |  | 582 |  |          //const Int32 MIN_MESSAGE_SIZE = 5; | 
|  |  | 583 |  |  | 
|  | 3 | 584 |  |          var state = this._state; | 
|  | 3 | 585 |  |          var rState = state.ReadState; | 
|  | 3 | 586 |  |          var stream = state.Stream; | 
|  | 3 | 587 |  |          var buffer = rState.Buffer; | 
|  | 3 | 588 |  |          var encodingInfo = state.Encoding; | 
|  | 3 | 589 |  |          var stringPool = state.StringPool; | 
|  |  | 590 |  |  | 
|  | 3 | 591 |  |          var advanceState = rState.BufferAdvanceState; | 
|  |  | 592 |  |          /* After the read below, BufferOffset is used as the index of the CR that ends the first line (presumably set by ReadUntilMaybeAsync - TODO confirm). */ | 
|  | 3 | 593 |  |          await stream.ReadUntilMaybeAsync( buffer, advanceState, ClientProtocolConsts.CRLF, ClientProtocolConsts.READ_CO | 
|  | 3 | 594 |  |          var crIdx = advanceState.BufferOffset; | 
|  | 6 | 595 |  |          while ( crIdx >= 0 ) /* one iteration per protocol line; crIdx is recomputed at the bottom of the loop */ | 
|  |  | 596 |  |          { | 
|  | 3 | 597 |  |             var array = buffer.Array; | 
|  |  | 598 |  |             // First byte will be integer's uppermost byte, second byte second uppermost, etc | 
|  | 3 | 599 |  |             var idx = 0; | 
|  | 3 | 600 |  |             var msgHeader = array.ReadInt32BEFromBytes( ref idx ); /* first four bytes of the line packed into one integer so commands can be matched with masks */ | 
|  | 3 | 601 |  |             var additionalByte = array[idx++]; /* the byte following the four header bytes */ | 
|  |  | 602 |  |             // At the end of the following switch statement, advanceState.BufferOffset should point to the CR byte of th | 
|  |  | 603 |  |             // Examine first byte | 
|  |  | 604 |  |             // x & 0x5F is to make ASCII lowercase letters (a-z) into uppercase letters (A-Z) | 
|  | 3 | 605 |  |             switch ( msgHeader & 0xFF000000 ) /* dispatch on the first byte of the line */ | 
|  |  | 606 |  |             { | 
|  |  | 607 |  |                case 0x2B000000: // 2B = '+' | 
|  | 0 | 608 |  |                   if ( ( msgHeader & 0x005F5FFF ) == 0x004F4B0D && additionalByte == 0x0A ) // +OK\r\n | 
|  |  | 609 |  |                   { | 
|  |  | 610 |  |                      // OK -message -> ignore | 
|  |  | 611 |  |                   } | 
|  |  | 612 |  |                   else | 
|  |  | 613 |  |                   { | 
|  | 0 | 614 |  |                      throw new Exception( "Protocol error" ); | 
|  |  | 615 |  |                   } | 
|  |  | 616 |  |                   break; | 
|  |  | 617 |  |                case 0x2D000000: // 2D = '-' | 
|  | 0 | 618 |  |                   if ( ( msgHeader & 0x005F5F5F ) == 0x00455252 && ( additionalByte == ClientProtocolConsts.SPACE || add | 
|  |  | 619 |  |                   { | 
|  |  | 620 |  |                      // -ERR<space/tab> -message, read textual error message | 
|  |  | 621 |  |                      // TODO close connection on errors that are fatal | 
|  | 0 | 622 |  |                      throw new Exception( "Protocol error: " + stringPool.GetString( buffer.Array, idx, advanceState.Buf | 
|  |  | 623 |  |                   } | 
|  |  | 624 |  |                   else | 
|  |  | 625 |  |                   { | 
|  | 0 | 626 |  |                      throw new Exception( "Protocol error" ); | 
|  |  | 627 |  |                   } | 
|  |  | 628 |  |                default: /* any other first byte: MSG, PING, PONG or INFO */ | 
|  | 3 | 629 |  |                   if ( ( msgHeader & 0x5F5F5F00 ) == 0x4D534700 ) // MSG | 
|  |  | 630 |  |                   { | 
|  | 3 | 631 |  |                      additionalByte = (Byte) ( msgHeader & 0x000000FF ); /* fourth byte of the line, which must be the separator after MSG */ | 
|  | 3 | 632 |  |                      if ( additionalByte == ClientProtocolConsts.SPACE || additionalByte == ClientProtocolConsts.TAB ) | 
|  |  | 633 |  |                      { | 
|  |  | 634 |  |                         // Read the rest of the header | 
|  | 3 | 635 |  |                         array = buffer.Array; | 
|  |  | 636 |  |  | 
|  |  | 637 |  |                         // Count spaces/tabs (TODO make this treat multiple consecutive spaces as one) | 
|  | 3 | 638 |  |                         var spacesSeen = 0; | 
|  | 3 | 639 |  |                         var spaceIndices = rState.MessageSpaceIndices; | 
|  | 3 | 640 |  |                         Array.Clear( spaceIndices, 0, spaceIndices.Length ); /* the index array is reused between messages, so zero it first */ | 
|  | 3 | 641 |  |                         var end = advanceState.BufferOffset; /* presumably the index of the CR that ends the MSG header line */ | 
|  | 214 | 642 |  |                         for ( var i = idx; i < end; ++i ) | 
|  |  | 643 |  |                         { | 
|  | 104 | 644 |  |                            if ( array[i] == ClientProtocolConsts.SPACE || array[i] == ClientProtocolConsts.TAB ) | 
|  |  | 645 |  |                            { | 
|  | 7 | 646 |  |                               var oldValue = spaceIndices[spacesSeen]; | 
|  | 7 | 647 |  |                               spaceIndices[spacesSeen] = i; | 
|  | 7 | 648 |  |                               if ( oldValue < i - 1 ) /* NOTE(review): slots were cleared to 0 above, so this looks always true and each separator byte is counted individually - confirm */ | 
|  |  | 649 |  |                               { | 
|  | 7 | 650 |  |                                  ++spacesSeen; | 
|  |  | 651 |  |                               } | 
|  |  | 652 |  |                            } | 
|  |  | 653 |  |                         } | 
|  | 3 | 654 |  |                         if ( spacesSeen < 2 || spacesSeen > 3 ) /* two separators when there is no reply-to, three when there is one */ | 
|  |  | 655 |  |                         { | 
|  | 0 | 656 |  |                            throw new Exception( "Protocol error" ); | 
|  |  | 657 |  |                         } | 
|  |  | 658 |  |  | 
|  |  | 659 |  |                         // MSG <subject> <sid> [reply-to] <#bytes>\r\n[payload]\r\n | 
|  | 3 | 660 |  |                         var subjStart = idx - 1; /* the byte read into additionalByte at idx - 1 is the first subject byte, since the separator is the fourth header byte */ | 
|  | 3 | 661 |  |                         var subjLen = spaceIndices[0] - subjStart; | 
|  | 3 | 662 |  |                         idx = spaceIndices[0] + 1; | 
|  | 3 | 663 |  |                         var subID = encodingInfo.ParseInt64Textual( array, ref idx, (spaceIndices[1] - spaceIndices[0] - | 
|  | 3 | 664 |  |                         var replyTo = spacesSeen > 2 ? | 
|  | 3 | 665 |  |                            stringPool.GetString( array, spaceIndices[1] + 1, spaceIndices[2] - spaceIndices[1] - 1 ) : | 
|  | 3 | 666 |  |                            null; | 
|  | 3 | 667 |  |                         idx = spaceIndices[spacesSeen - 1] + 1; /* the payload size is the text after the last separator */ | 
|  | 3 | 668 |  |                         var payloadSize = encodingInfo.ParseInt32Textual( array, ref idx, (end - idx, true) ); | 
|  | 3 | 669 |  |                         if ( payloadSize < 0 ) | 
|  |  | 670 |  |                         { | 
|  | 0 | 671 |  |                            throw new Exception( "Protocol error" ); | 
|  |  | 672 |  |                         } | 
|  | 3 | 673 |  |                         var readFromStreamCount = end + 2 + payloadSize + 2 - advanceState.BufferTotal; /* header line + CRLF + payload + CRLF, minus the bytes already buffered */ | 
|  | 3 | 674 |  |                         if ( readFromStreamCount > 0 ) | 
|  |  | 675 |  |                         { | 
|  | 0 | 676 |  |                            await stream.ReadSpecificAmountAsync( buffer, advanceState.BufferTotal, readFromStreamCount,  | 
|  | 0 | 677 |  |                            advanceState.ReadMore( readFromStreamCount ); | 
|  |  | 678 |  |                         } | 
|  | 3 | 679 |  |                         if ( states.TryGetValue( subID, out var subState ) ) /* messages for unknown subscription IDs are not delivered; their payload is still skipped below */ | 
|  |  | 680 |  |                         { | 
|  |  | 681 |  |                            NATSMessageImpl readMessage; | 
|  | 3 | 682 |  |                            if ( replyTo == null && payloadSize <= 0 && subState.CachedMessage != null ) | 
|  |  | 683 |  |                            { | 
|  |  | 684 |  |                               // Use cached instance when no reply, no data, and same subject | 
|  | 0 | 685 |  |                               readMessage = subState.CachedMessage; | 
|  | 0 | 686 |  |                            } | 
|  |  | 687 |  |                            else | 
|  |  | 688 |  |                            { | 
|  |  | 689 |  |                               // TODO we can pool & rent NATSMessageImpl instances too, for when there is no reply and s | 
|  |  | 690 |  |                               // Rent byte array instance and make the message use it | 
|  | 3 | 691 |  |                               var byteArrayInstance = subState.ByteArrayPool.TakeInstance() ?? new InstanceHolder<Resiza | 
|  | 3 | 692 |  |                               subState.RentedByteArrays.Add( byteArrayInstance ); | 
|  | 3 | 693 |  |                               var messageData = byteArrayInstance.Instance.SetCapacityAndReturnArray( payloadSize ); | 
|  | 3 | 694 |  |                               Array.Copy( buffer.Array, end + 2, messageData, 0, payloadSize ); /* the payload starts right after the CRLF of the header line */ | 
|  | 3 | 695 |  |                               readMessage = new NATSMessageImpl( stringPool.GetString( array, subjStart, subjLen ), subI | 
|  |  | 696 |  |                            } | 
|  |  | 697 |  |  | 
|  |  | 698 |  |                            /* Messages of a global subscription are raised as an event; all others are queued on their subscription. */ | 
|  | 3 | 699 |  |                            if ( subState.IsGlobal ) | 
|  |  | 700 |  |                            { | 
|  | 1 | 701 |  |                               this.GlobalSubscriptionMessageReceived?.Invoke( new GlobalSubscriptionEventArgs( readMessa | 
|  | 1 | 702 |  |                            } | 
|  |  | 703 |  |                            else | 
|  |  | 704 |  |                            { | 
|  | 2 | 705 |  |                               subState.MessageQueue.Enqueue( readMessage ); | 
|  |  | 706 |  |                            } | 
|  |  | 707 |  |                         } | 
|  |  | 708 |  |  | 
|  | 3 | 709 |  |                         advanceState.Advance( payloadSize + 2 ); /* presumably moves BufferOffset past the header CRLF and the payload, onto the CR that ends the payload */ | 
|  | 3 | 710 |  |                      } | 
|  |  | 711 |  |                      else | 
|  |  | 712 |  |                      { | 
|  | 0 | 713 |  |                         throw new Exception( "Protocol error" ); | 
|  |  | 714 |  |                      } | 
|  |  | 715 |  |                   } | 
|  |  | 716 |  |                   else | 
|  |  | 717 |  |                   { | 
|  | 0 | 718 |  |                      var additionalByte2 = array[idx++]; /* one more byte, so a four-letter command followed by CR LF can be checked */ | 
|  | 0 | 719 |  |                      switch ( msgHeader & ClientProtocolConsts.UPPERCASE_MASK_FULL ) | 
|  |  | 720 |  |                      { | 
|  |  | 721 |  |                         case 0x50494E47: // PING | 
|  | 0 | 722 |  |                            if ( additionalByte == ClientProtocolConsts.CR && additionalByte2 == ClientProtocolConsts.LF  | 
|  |  | 723 |  |                            { | 
|  |  | 724 |  |                               // Send back PONG ( we purposefully don't 'await' for this task) | 
|  |  | 725 |  | #pragma warning disable 4014 | 
|  | 0 | 726 |  |                               this.WritePong(); | 
|  |  | 727 |  | #pragma warning restore 4014 | 
|  | 0 | 728 |  |                            } | 
|  |  | 729 |  |                            else | 
|  |  | 730 |  |                            { | 
|  | 0 | 731 |  |                               throw new Exception( "Protocol error" ); | 
|  |  | 732 |  |                            } | 
|  |  | 733 |  |                            break; | 
|  |  | 734 |  |                         case 0x504F4E47: // PONG | 
|  |  | 735 |  |                                          // Currently unused, since the client never sends pings. | 
|  | 0 | 736 |  |                            if ( additionalByte == ClientProtocolConsts.CR && additionalByte2 == ClientProtocolConsts.LF  | 
|  |  | 737 |  |                            { | 
|  |  | 738 |  |                            } | 
|  |  | 739 |  |                            else | 
|  |  | 740 |  |                            { | 
|  | 0 | 741 |  |                               throw new Exception( "Protocol error" ); | 
|  |  | 742 |  |                            } | 
|  |  | 743 |  |                            break; | 
|  |  | 744 |  |                         case ClientProtocolConsts.INFO_INT: // INFO | 
|  | 0 | 745 |  |                            if ( additionalByte == ClientProtocolConsts.SPACE || additionalByte == ClientProtocolConsts.T | 
|  |  | 746 |  |                            { | 
|  | 0 | 747 |  |                               var info = DeserializeInfoMessage( buffer.Array, idx, advanceState.BufferOffset - idx, enc | 
|  |  | 748 |  |                               // TODO implement dynamic handling of INFO message | 
|  | 0 | 749 |  |                            } | 
|  |  | 750 |  |                            else | 
|  |  | 751 |  |                            { | 
|  | 0 | 752 |  |                               throw new Exception( "Protocol error" ); | 
|  |  | 753 |  |                            } | 
|  |  | 754 |  |                            break; | 
|  |  | 755 |  |                         default: | 
|  | 0 | 756 |  |                            throw new Exception( "Protocol error" ); | 
|  |  | 757 |  |                      } | 
|  |  | 758 |  |                   } | 
|  |  | 759 |  |                   break; | 
|  |  | 760 |  |             } | 
|  |  | 761 |  |  | 
|  |  | 762 |  |             // Remember to shift remaining data to the beginning of byte array | 
|  |  | 763 |  |             // TODO optimize: we don't need to do this on every loop. | 
|  | 3 | 764 |  |             SetPreReadLength( rState ); | 
|  | 3 | 765 |  |             crIdx = array.IndexOfArray( 0, advanceState.BufferTotal, ClientProtocolConsts.CRLF ); /* NOTE(review): array was captured before the optional extra stream read; if that read can replace buffer.Array this reference would be stale - confirm */ | 
|  | 3 | 766 |  |             advanceState.Advance( crIdx < 0 ? advanceState.BufferTotal : crIdx ); /* go to the next CRLF, or to the end of the buffered bytes when no complete line remains (which also ends the loop) */ | 
|  | 3 | 767 |  |          } | 
|  | 3 | 768 |  |       } | 
|  |  | 769 |  |  | 
|  |  | 770 |  |       internal static void SetPreReadLength( ReadState rState ) /* Drops the message ending at BufferOffset (including its CRLF) by copying any bytes after it to the start of the buffer, then resets the advance state and registers the moved bytes as already read. */ | 
|  |  | 771 |  |       { | 
|  | 7 | 772 |  |          var aState = rState.BufferAdvanceState; | 
|  | 7 | 773 |  |          var end = aState.BufferOffset; | 
|  | 7 | 774 |  |          var preReadLength = aState.BufferTotal; | 
|  |  | 775 |  |          // Messages end with CRLF | 
|  | 7 | 776 |  |          end += 2; | 
|  | 7 | 777 |  |          var remainingData = preReadLength - end; /* number of buffered bytes that follow the processed message */ | 
|  | 7 | 778 |  |          if ( remainingData > 0 ) | 
|  |  | 779 |  |          { | 
|  | 0 | 780 |  |             var array = rState.Buffer.Array; | 
|  | 0 | 781 |  |             Array.Copy( array, end, array, 0, remainingData ); /* source and destination are the same array; Array.Copy supports overlapping ranges */ | 
|  |  | 782 |  |          } | 
|  | 7 | 783 |  |          aState.Reset(); | 
|  | 7 | 784 |  |          aState.ReadMore( remainingData ); /* NOTE(review): also called when remainingData is zero or negative - confirm ReadMore accepts that */ | 
|  |  | 785 |  |  | 
|  | 7 | 786 |  |       } | 
|  |  | 787 |  |  | 
|  |  | 788 |  |       internal static ServerInformation DeserializeInfoMessage( Byte[] array, Int32 offset, Int32 count, Encoding encodi | 
|  |  | 789 |  |       { /* Deserializes the JSON text stored in count bytes of array, starting at offset, into a ServerInformation using the default Json.NET serializer; the bytes are decoded with the given encoding. */ | 
|  | 4 | 790 |  |          using ( var mStream = new MemoryStream( array, offset, count ) ) /* wraps the given segment of the array without copying it */ | 
|  | 4 | 791 |  |          using ( var reader = new StreamReader( mStream, encoding ) ) | 
|  | 4 | 792 |  |          using ( var jReader = new JsonTextReader( reader ) ) | 
|  |  | 793 |  |          { | 
|  |  | 794 |  |             //   return (JObject) JToken.Load( jReader ); | 
|  | 4 | 795 |  |             return JsonSerializer.CreateDefault().Deserialize<ServerInformation>( jReader ); | 
|  |  | 796 |  |          } | 
|  | 4 | 797 |  |       } | 
|  |  | 798 |  |  | 
|  |  | 799 |  |       internal static Byte[] SerializeConnectMessage( ClientInformation clientInfo, Encoding encoding ) /* Serializes clientInfo to JSON text with the default Json.NET serializer and returns the text as bytes in the given encoding. */ | 
|  |  | 800 |  |       { | 
|  | 4 | 801 |  |          using ( var mStream = new MemoryStream( 0x100 ) ) /* 0x100 is only the initial capacity; the stream grows as needed */ | 
|  |  | 802 |  |          { | 
|  | 4 | 803 |  |             using ( var writer = new StreamWriter( mStream, encoding ) ) /* NOTE(review): StreamWriter emits the encoding's preamble (BOM) when writing at position 0 of a seekable stream - confirm the encoding passed here has no preamble */ | 
|  |  | 804 |  |             { | 
|  | 4 | 805 |  |                JsonSerializer.CreateDefault().Serialize( writer, clientInfo ); | 
|  | 4 | 806 |  |                writer.Flush(); /* push buffered text into the memory stream before its contents are copied out */ | 
|  | 4 | 807 |  |                return mStream.ToArray(); | 
|  |  | 808 |  |             } | 
|  |  | 809 |  |          } | 
|  | 4 | 810 |  |       } | 
|  |  | 811 |  |  | 
|  |  | 812 |  |       internal static async Task InitializeNewConnection( /* Builds the CONNECT message (prefix, JSON of clientInfo, CRLF) in the write buffer, then writes it to the stream and flushes. */ | 
|  |  | 813 |  |          ClientInformation clientInfo, | 
|  |  | 814 |  |          Encoding encoding, | 
|  |  | 815 |  |          WriteState wState, | 
|  |  | 816 |  |          Stream stream, | 
|  |  | 817 |  |          CancellationToken token | 
|  |  | 818 |  |          ) | 
|  |  | 819 |  |       { | 
|  | 4 | 820 |  |          var connectBytes = SerializeConnectMessage( clientInfo, encoding ); | 
|  |  | 821 |  |          /* NOTE(review): 10 is presumably the length of CONNECT_PREFIX plus the two CRLF bytes - confirm against ClientProtocolConsts. */ | 
|  | 4 | 822 |  |          var msgLength = 10 + connectBytes.Length; | 
|  | 4 | 823 |  |          var array = wState.Buffer.SetCapacityAndReturnArray( msgLength ); /* the message is assembled in the write state's buffer */ | 
|  | 4 | 824 |  |          var idx = 0; | 
|  | 4 | 825 |  |          array.WriteASCIIString( ref idx, ClientProtocolConsts.CONNECT_PREFIX ); | 
|  | 4 | 826 |  |          connectBytes.CopyTo( array, idx ); | 
|  | 4 | 827 |  |          idx += connectBytes.Length; | 
|  | 4 | 828 |  |          array.WriteASCIIString( ref idx, ClientProtocolConsts.CRLF ); | 
|  |  | 829 |  |          /* Only the first msgLength bytes are sent; the array may be larger than the message. */ | 
|  | 4 | 830 |  |          await stream.WriteAsync( array, 0, msgLength, token ); | 
|  | 4 | 831 |  |          await stream.FlushAsync( token ); | 
|  | 4 | 832 |  |       } | 
|  |  | 833 |  |    } | 
|  |  | 834 |  | } | 
|  |  | 835 |  |  |