Summary

Class: CBAM.NATS.Implementation.NATSConnectionPoolProvider
Assembly: CBAM.NATS.Implementation
File(s): /repo-dir/contents/Source/Code/CBAM.NATS.Implementation/ConnectionPoolProvider.cs
Covered lines: 87
Uncovered lines: 12
Coverable lines: 99
Total lines: 156
Line coverage: 87.8%
Branch coverage: 0%

Coverage History

Metrics

Method | Cyclomatic complexity | NPath complexity | Sequence coverage | Branch coverage
.cctor() | 1 | 0 | 1% | 0%
.ctor() | 1 | 0 | 0% | 0%
CreateFactory() | 1 | 0 | 0% | 0%
TransformFactoryParameters(...) | 4 | 0 | 0% | 0%

File(s)

/repo-dir/contents/Source/Code/CBAM.NATS.Implementation/ConnectionPoolProvider.cs

# | Line | Line coverage
 1/*
 2 * Copyright 2018 Stanislav Muhametsin. All rights Reserved.
 3 *
 4 * Licensed  under the  Apache License,  Version 2.0  (the "License");
 5 * you may not use  this file  except in  compliance with the License.
 6 * You may obtain a copy of the License at
 7 *
 8 *   http://www.apache.org/licenses/LICENSE-2.0
 9 *
 10 * Unless required by applicable law or agreed to in writing, software
 11 * distributed  under the  License is distributed on an "AS IS" BASIS,
 12 * WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or
 13 * implied.
 14 *
 15 * See the License for the specific language governing permissions and
 16 * limitations under the License.
 17 */
 18using CBAM.Abstractions.Implementation.NetworkStream;
 19using IOUtils.Network.Configuration;
 20using ResourcePooling.Async.Abstractions;
 21using System;
 22using System.Collections.Generic;
 23using System.IO;
 24using System.Text;
 25using System.Threading;
 26using System.Threading.Tasks;
 27using UtilPack;
 28
 29namespace CBAM.NATS.Implementation
 30{
 31   using TIntermediateState = ValueTuple<ClientProtocol.ReadState, Reference<ServerInformation>, CancellationToken, Stre
 32
 33   public sealed class NATSConnectionPoolProvider : AbstractAsyncResourceFactoryProvider<NATSConnection, NATSConnectionC
 34   {
 335      public static AsyncResourceFactory<NATSConnection, NATSConnectionCreationInfo> Factory { get; } = new DefaultAsync
 336         config.NewFactoryParametrizer<NATSConnectionCreationInfo, NATSConnectionCreationInfoData, NATSConnectionConfigu
 337            .BindPublicConnectionType<NATSConnection>()
 338            .CreateStatefulDelegatingConnectionFactory(
 339               Encoding.ASCII.CreateDefaultEncodingInfo(),
 740               ( parameters, encodingInfo, stringPool, stringPoolIsDedicated, socketOrNull, stream, token ) => new TInte
 341               async ( parameters, encodingInfo, stringPool, stringPoolIsDedicated, state ) =>
 342               {
 343                  // First, read the INFO message from server
 744                  var rState = state.Item1;
 745                  var buffer = rState.Buffer;
 746                  var aState = rState.BufferAdvanceState;
 747                  await state.Item4.ReadUntilMaybeAsync( buffer, aState, ClientProtocolConsts.CRLF, ClientProtocolConsts
 748                  var array = buffer.Array;
 749                  var idx = 0;
 750                  var end = aState.BufferOffset;
 751                  if ( end < 7
 752                  || ( array.ReadInt32BEFromBytes( ref idx ) & ClientProtocolConsts.UPPERCASE_MASK_FULL ) != ClientProto
 753                  || ( array[idx] != ClientProtocolConsts.SPACE
 754                     && array[idx] != ClientProtocolConsts.TAB )
 755                  )
 356                  {
 357                     throw new NATSException( "Invalid INFO message at startup." );
 358                  }
 759                  ++idx;
 360
 761                  var serverInfo = ClientProtocol.DeserializeInfoMessage( array, idx, end - idx, NATSAuthenticationConfi
 762                  state.Item2.Value = serverInfo;
 763                  var sslMode = parameters.CreationData.Connection?.ConnectionSSLMode ?? ConnectionSSLMode.NotRequired;
 764                  var serverNeedsSSL = serverInfo.SSLRequired;
 365
 766                  if ( serverNeedsSSL && sslMode == ConnectionSSLMode.NotRequired )
 367                  {
 368                     throw new NATSException( "Server requires SSL, but client does not." );
 369                  }
 770                  else if ( !serverNeedsSSL && sslMode == ConnectionSSLMode.Required )
 371                  {
 372                     throw new NATSException( "Client requires SSL, but server does not." );
 373                  }
 774                  else if ( serverInfo.AuthenticationRequired )
 375                  {
 376                     throw new NotImplementedException();
 377                  }
 378
 379                  // We should not receive anything else except info message at start, but let's just make sure we leave
 780                  ClientProtocol.SetPreReadLength( rState );
 381
 782                  return serverNeedsSSL;
 783               },
 384               () => new NATSException( "Server accepted SSL request, but the creation parameters did not have callback 
 385               () => new NATSException( "Server does not support SSL." ),
 386               () => new NATSException( "SSL stream creation callback returned null." ),
 387               () => new NATSException( "Authentication callback given by SSL stream creation callback was null." ),
 388               inner => new NATSException( "Unable to start SSL client.", inner ),
 389               async ( parameters, encodingInfo, stringPool, stringPoolIsDedicated, stream, socketOrNull, token, state )
 390               {
 791                  var paramData = parameters.CreationData;
 792                  var initConfig = paramData.Initialization;
 793                  var protoConfig = initConfig?.Protocol ?? new NATSProtocolConfiguration();
 794                  var authConfig = initConfig?.Authentication;
 795                  var wState = new ClientProtocol.WriteState();
 796                  var serverInfo = state.Item2.Value;
 797                  await ClientProtocol.InitializeNewConnection( new ClientInformation()
 798                  {
 799                     IsVerbose = protoConfig.Verbose,
 7100                     IsPedantic = protoConfig.Pedantic,
 7101                     SSLRequired = serverInfo.SSLRequired,
 7102                     AuthenticationToken = authConfig?.AuthenticationToken,
 7103                     Username = authConfig?.Username,
 7104                     Password = authConfig?.Password,
 7105                     ClientName = protoConfig.ClientName,
 7106                     ClientLanguage = protoConfig.ClientLanguage,
 7107                     ClientVersion = protoConfig.ClientVersion
 7108                  }, NATSAuthenticationConfiguration.PasswordByteEncoding, wState, stream, token );
 3109
 7110                  return new ClientProtocolPoolInfo( new ClientProtocol( new ClientProtocol.ClientProtocolIOState(
 7111                     new DuplexBufferedAsyncStream( stream, Math.Max( NATSProtocolConfiguration.DEFAULT_BUFFER_SIZE, pro
 7112                     stringPool,
 7113                     encodingInfo,
 7114                     wState,
 7115                     state.Item1
 7116                     ), serverInfo ) );
 7117               },
 7118               protocol => new ValueTask<NATSConnectionImpl>( new NATSConnectionImpl( NATSConnectionVendorFunctionalityI
 7119               ( protocol, connection ) => new CBAM.Abstractions.Implementation.StatelessConnectionAcquireInfo<NATSConne
 3120               ( functionality, connection, token, error ) => functionality.Protocol?.Stream
 3121               ) );
 122
 123
 124      public NATSConnectionPoolProvider()
 0125         : base( typeof( NATSConnectionCreationInfoData ) )
 126      {
 0127      }
 128
 129      protected override AsyncResourceFactory<NATSConnection, NATSConnectionCreationInfo> CreateFactory()
 130      {
 0131         return Factory;
 132      }
 133
 134      protected override NATSConnectionCreationInfo TransformFactoryParameters( Object creationParameters )
 135      {
 0136         ArgumentValidator.ValidateNotNull( nameof( creationParameters ), creationParameters );
 137
 138         NATSConnectionCreationInfo retVal;
 0139         if ( creationParameters is NATSConnectionCreationInfoData creationData )
 140         {
 0141            retVal = new NATSConnectionCreationInfo( creationData );
 142
 0143         }
 0144         else if ( creationParameters is NATSConnectionCreationInfo creationInfo )
 145         {
 0146            retVal = creationInfo;
 0147         }
 148         else
 149         {
 0150            throw new ArgumentException( $"The {nameof( creationParameters )} must be instance of {typeof( NATSConnectio
 151         }
 152
 0153         return retVal;
 154      }
 155   }
 156}