| | 1 | | /* |
| | 2 | | * Copyright 2018 Stanislav Muhametsin. All rights Reserved. |
| | 3 | | * |
| | 4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
| | 5 | | * you may not use this file except in compliance with the License. |
| | 6 | | * You may obtain a copy of the License at |
| | 7 | | * |
| | 8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
| | 9 | | * |
| | 10 | | * Unless required by applicable law or agreed to in writing, software |
| | 11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
| | 12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| | 13 | | * implied. |
| | 14 | | * |
| | 15 | | * See the License for the specific language governing permissions and |
| | 16 | | * limitations under the License. |
| | 17 | | */ |
| | 18 | | using CBAM.Abstractions.Implementation.NetworkStream; |
| | 19 | | using IOUtils.Network.Configuration; |
| | 20 | | using ResourcePooling.Async.Abstractions; |
| | 21 | | using System; |
| | 22 | | using System.Collections.Generic; |
| | 23 | | using System.IO; |
| | 24 | | using System.Text; |
| | 25 | | using System.Threading; |
| | 26 | | using System.Threading.Tasks; |
| | 27 | | using UtilPack; |
| | 28 | |
|
| | 29 | | namespace CBAM.NATS.Implementation |
| | 30 | | { |
| | 31 | | using TIntermediateState = ValueTuple<ClientProtocol.ReadState, Reference<ServerInformation>, CancellationToken, Stre |
| | 32 | |
|
| | 33 | | public sealed class NATSConnectionPoolProvider : AbstractAsyncResourceFactoryProvider<NATSConnection, NATSConnectionC |
| | 34 | | { |
| 3 | 35 | | public static AsyncResourceFactory<NATSConnection, NATSConnectionCreationInfo> Factory { get; } = new DefaultAsync |
| 3 | 36 | | config.NewFactoryParametrizer<NATSConnectionCreationInfo, NATSConnectionCreationInfoData, NATSConnectionConfigu |
| 3 | 37 | | .BindPublicConnectionType<NATSConnection>() |
| 3 | 38 | | .CreateStatefulDelegatingConnectionFactory( |
| 3 | 39 | | Encoding.ASCII.CreateDefaultEncodingInfo(), |
| 7 | 40 | | ( parameters, encodingInfo, stringPool, stringPoolIsDedicated, socketOrNull, stream, token ) => new TInte |
| 3 | 41 | | async ( parameters, encodingInfo, stringPool, stringPoolIsDedicated, state ) => |
| 3 | 42 | | { |
| 3 | 43 | | // First, read the INFO message from server |
| 7 | 44 | | var rState = state.Item1; |
| 7 | 45 | | var buffer = rState.Buffer; |
| 7 | 46 | | var aState = rState.BufferAdvanceState; |
| 7 | 47 | | await state.Item4.ReadUntilMaybeAsync( buffer, aState, ClientProtocolConsts.CRLF, ClientProtocolConsts |
| 7 | 48 | | var array = buffer.Array; |
| 7 | 49 | | var idx = 0; |
| 7 | 50 | | var end = aState.BufferOffset; |
| 7 | 51 | | if ( end < 7 |
| 7 | 52 | | || ( array.ReadInt32BEFromBytes( ref idx ) & ClientProtocolConsts.UPPERCASE_MASK_FULL ) != ClientProto |
| 7 | 53 | | || ( array[idx] != ClientProtocolConsts.SPACE |
| 7 | 54 | | && array[idx] != ClientProtocolConsts.TAB ) |
| 7 | 55 | | ) |
| 3 | 56 | | { |
| 3 | 57 | | throw new NATSException( "Invalid INFO message at startup." ); |
| 3 | 58 | | } |
| 7 | 59 | | ++idx; |
| 3 | 60 | |
|
| 7 | 61 | | var serverInfo = ClientProtocol.DeserializeInfoMessage( array, idx, end - idx, NATSAuthenticationConfi |
| 7 | 62 | | state.Item2.Value = serverInfo; |
| 7 | 63 | | var sslMode = parameters.CreationData.Connection?.ConnectionSSLMode ?? ConnectionSSLMode.NotRequired; |
| 7 | 64 | | var serverNeedsSSL = serverInfo.SSLRequired; |
| 3 | 65 | |
|
| 7 | 66 | | if ( serverNeedsSSL && sslMode == ConnectionSSLMode.NotRequired ) |
| 3 | 67 | | { |
| 3 | 68 | | throw new NATSException( "Server requires SSL, but client does not." ); |
| 3 | 69 | | } |
| 7 | 70 | | else if ( !serverNeedsSSL && sslMode == ConnectionSSLMode.Required ) |
| 3 | 71 | | { |
| 3 | 72 | | throw new NATSException( "Client requires SSL, but server does not." ); |
| 3 | 73 | | } |
| 7 | 74 | | else if ( serverInfo.AuthenticationRequired ) |
| 3 | 75 | | { |
| 3 | 76 | | throw new NotImplementedException(); |
| 3 | 77 | | } |
| 3 | 78 | |
|
| 3 | 79 | | // We should not receive anything else except info message at start, but let's just make sure we leave |
| 7 | 80 | | ClientProtocol.SetPreReadLength( rState ); |
| 3 | 81 | |
|
| 7 | 82 | | return serverNeedsSSL; |
| 7 | 83 | | }, |
| 3 | 84 | | () => new NATSException( "Server accepted SSL request, but the creation parameters did not have callback |
| 3 | 85 | | () => new NATSException( "Server does not support SSL." ), |
| 3 | 86 | | () => new NATSException( "SSL stream creation callback returned null." ), |
| 3 | 87 | | () => new NATSException( "Authentication callback given by SSL stream creation callback was null." ), |
| 3 | 88 | | inner => new NATSException( "Unable to start SSL client.", inner ), |
| 3 | 89 | | async ( parameters, encodingInfo, stringPool, stringPoolIsDedicated, stream, socketOrNull, token, state ) |
| 3 | 90 | | { |
| 7 | 91 | | var paramData = parameters.CreationData; |
| 7 | 92 | | var initConfig = paramData.Initialization; |
| 7 | 93 | | var protoConfig = initConfig?.Protocol ?? new NATSProtocolConfiguration(); |
| 7 | 94 | | var authConfig = initConfig?.Authentication; |
| 7 | 95 | | var wState = new ClientProtocol.WriteState(); |
| 7 | 96 | | var serverInfo = state.Item2.Value; |
| 7 | 97 | | await ClientProtocol.InitializeNewConnection( new ClientInformation() |
| 7 | 98 | | { |
| 7 | 99 | | IsVerbose = protoConfig.Verbose, |
| 7 | 100 | | IsPedantic = protoConfig.Pedantic, |
| 7 | 101 | | SSLRequired = serverInfo.SSLRequired, |
| 7 | 102 | | AuthenticationToken = authConfig?.AuthenticationToken, |
| 7 | 103 | | Username = authConfig?.Username, |
| 7 | 104 | | Password = authConfig?.Password, |
| 7 | 105 | | ClientName = protoConfig.ClientName, |
| 7 | 106 | | ClientLanguage = protoConfig.ClientLanguage, |
| 7 | 107 | | ClientVersion = protoConfig.ClientVersion |
| 7 | 108 | | }, NATSAuthenticationConfiguration.PasswordByteEncoding, wState, stream, token ); |
| 3 | 109 | |
|
| 7 | 110 | | return new ClientProtocolPoolInfo( new ClientProtocol( new ClientProtocol.ClientProtocolIOState( |
| 7 | 111 | | new DuplexBufferedAsyncStream( stream, Math.Max( NATSProtocolConfiguration.DEFAULT_BUFFER_SIZE, pro |
| 7 | 112 | | stringPool, |
| 7 | 113 | | encodingInfo, |
| 7 | 114 | | wState, |
| 7 | 115 | | state.Item1 |
| 7 | 116 | | ), serverInfo ) ); |
| 7 | 117 | | }, |
| 7 | 118 | | protocol => new ValueTask<NATSConnectionImpl>( new NATSConnectionImpl( NATSConnectionVendorFunctionalityI |
| 7 | 119 | | ( protocol, connection ) => new CBAM.Abstractions.Implementation.StatelessConnectionAcquireInfo<NATSConne |
| 3 | 120 | | ( functionality, connection, token, error ) => functionality.Protocol?.Stream |
| 3 | 121 | | ) ); |
| | 122 | |
|
| | 123 | |
|
| | 124 | | public NATSConnectionPoolProvider() |
| 0 | 125 | | : base( typeof( NATSConnectionCreationInfoData ) ) |
| | 126 | | { |
| 0 | 127 | | } |
| | 128 | |
|
| | 129 | | protected override AsyncResourceFactory<NATSConnection, NATSConnectionCreationInfo> CreateFactory() |
| | 130 | | { |
| 0 | 131 | | return Factory; |
| | 132 | | } |
| | 133 | |
|
| | 134 | | protected override NATSConnectionCreationInfo TransformFactoryParameters( Object creationParameters ) |
| | 135 | | { |
| 0 | 136 | | ArgumentValidator.ValidateNotNull( nameof( creationParameters ), creationParameters ); |
| | 137 | |
|
| | 138 | | NATSConnectionCreationInfo retVal; |
| 0 | 139 | | if ( creationParameters is NATSConnectionCreationInfoData creationData ) |
| | 140 | | { |
| 0 | 141 | | retVal = new NATSConnectionCreationInfo( creationData ); |
| | 142 | |
|
| 0 | 143 | | } |
| 0 | 144 | | else if ( creationParameters is NATSConnectionCreationInfo creationInfo ) |
| | 145 | | { |
| 0 | 146 | | retVal = creationInfo; |
| 0 | 147 | | } |
| | 148 | | else |
| | 149 | | { |
| 0 | 150 | | throw new ArgumentException( $"The {nameof( creationParameters )} must be instance of {typeof( NATSConnectio |
| | 151 | | } |
| | 152 | |
|
| 0 | 153 | | return retVal; |
| | 154 | | } |
| | 155 | | } |
| | 156 | | } |