Summary

Class:CBAM.SQL.PostgreSQL.Implementation.E_TODO
Assembly:CBAM.SQL.PostgreSQL.Implementation
File(s):/repo-dir/contents/Source/Code/CBAM.SQL.PostgreSQL.Implementation/Protocol.cs
Covered lines:1
Uncovered lines:0
Coverable lines:1
Total lines:1353
Line coverage:100%
Branch coverage:50%

Coverage History

Metrics

MethodCyclomatic complexity NPath complexity Sequence coverage Branch coverage
GetResultForceSynchronous(...)201%0.5%

File(s)

/repo-dir/contents/Source/Code/CBAM.SQL.PostgreSQL.Implementation/Protocol.cs

#LineLine coverage
 1/*
 2 * Copyright 2017 Stanislav Muhametsin. All rights Reserved.
 3 *
 4 * Licensed  under the  Apache License,  Version 2.0  (the "License");
 5 * you may not use  this file  except in  compliance with the License.
 6 * You may obtain a copy of the License at
 7 *
 8 *   http://www.apache.org/licenses/LICENSE-2.0
 9 *
 10 * Unless required by applicable law or agreed to in writing, software
 11 * distributed  under the  License is distributed on an "AS IS" BASIS,
 12 * WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or
 13 * implied.
 14 *
 15 * See the License for the specific language governing permissions and
 16 * limitations under the License.
 17 */
 18using CBAM.Abstractions;
 19using CBAM.Abstractions.Implementation;
 20using CBAM.SQL.Implementation;
 21using CBAM.SQL.PostgreSQL;
 22using CBAM.SQL.PostgreSQL.Implementation;
 23using System;
 24using System.Collections.Generic;
 25using System.IO;
 26using System.Linq;
 27using System.Net;
 28using System.Text;
 29using System.Threading;
 30using System.Threading.Tasks;
 31using UtilPack;
 32using MessageIOArgs = System.ValueTuple<CBAM.SQL.PostgreSQL.BackendABIHelper, System.IO.Stream, System.Threading.Cancell
 33using TBoundTypeInfo = System.ValueTuple<System.Type, CBAM.SQL.PostgreSQL.PgSQLTypeFunctionality, CBAM.SQL.PostgreSQL.Pg
 34using FluentCryptography.SASL;
 35using FluentCryptography.SASL.SCRAM;
 36using AsyncEnumeration.Implementation.Enumerable;
 37using AsyncEnumeration.Implementation.Provider;
 38
 39#if !NETSTANDARD1_0
 40using System.Net.Sockets;
 41#endif
 42
 43namespace CBAM.SQL.PostgreSQL.Implementation
 44{
 45   using TSASLAuthState = System.ValueTuple<SASLMechanism, SASLCredentialsSCRAMForClient, ResizableArray<Byte>, IEncodin
 46   using TStatementExecutionSimpleTaskParameter = System.ValueTuple<SQLStatementExecutionResult, Func<ValueTask<(Boolean
 47
 48   internal sealed partial class PostgreSQLProtocol : SQLConnectionFunctionalitySU<PgSQLConnectionVendorFunctionality>
 49   {
 50
 51
 52      private Int32 _lastSeenTransactionStatus;
 53      private readonly IDictionary<String, String> _serverParameters;
 54      //private Int32 _standardConformingStrings;
 55      private readonly Version _serverVersion;
 56
 57      public PostgreSQLProtocol(
 58         PgSQLConnectionVendorFunctionality vendorFunctionality,
 59         Boolean disableBinaryProtocolSend,
 60         Boolean disableBinaryProtocolReceive,
 61         BackendABIHelper messageIOArgs,
 62         Stream stream,
 63         ResizableArray<Byte> buffer,
 64         IDictionary<String, String> serverParameters,
 65         TransactionStatus status,
 66         Int32 backendPID
 67#if !NETSTANDARD1_0
 68         , Socket socket
 69#endif
 70         ) : base( vendorFunctionality, DefaultAsyncProvider.Instance )
 71      {
 72         this.DisableBinaryProtocolSend = disableBinaryProtocolSend;
 73         this.DisableBinaryProtocolReceive = disableBinaryProtocolReceive;
 74         this.MessageIOArgs = ArgumentValidator.ValidateNotNull( nameof( messageIOArgs ), messageIOArgs );
 75         this.Stream = ArgumentValidator.ValidateNotNull( nameof( stream ), stream );
 76#if !NETSTANDARD1_0
 77         this.Socket = socket;
 78#endif
 79         this.Buffer = buffer ?? new ResizableArray<Byte>( 8, exponentialResize: true );
 80         this.DataRowColumnSizes = new ResizableArray<ResettableTransformable<Int32?, Int32>>( exponentialResize: false 
 81         this._serverParameters = ArgumentValidator.ValidateNotNull( nameof( serverParameters ), serverParameters );
 82         this.ServerParameters = new System.Collections.ObjectModel.ReadOnlyDictionary<String, String>( serverParameters
 83         this.TypeRegistry = new TypeRegistryImpl( vendorFunctionality, sql => this.PrepareStatementForExecution( vendor
 84
 85         if ( serverParameters.TryGetValue( "server_version", out var serverVersionString ) )
 86         {
 87            // Parse server version
 88            var i = 0;
 89            var version = serverVersionString.Trim();
 90            while ( i < version.Length && ( Char.IsDigit( version[i] ) || version[i] == '.' ) )
 91            {
 92               ++i;
 93            }
 94            this._serverVersion = new Version( version.Substring( 0, i ) );
 95
 96         }
 97
 98         // Min supported version is 8.4.
 99         var serverVersion = this._serverVersion;
 100         if ( serverVersion != null && ( serverVersion.Major < 8 || ( serverVersion.Major == 8 && serverVersion.Minor < 
 101         {
 102            throw new PgSQLException( "Unsupported server version: " + serverVersion + "." );
 103         }
 104         this.LastSeenTransactionStatus = status;
 105         this.BackendProcessID = backendPID;
 106         this.EnqueuedNotifications = new Queue<NotificationEventArgs>();
 107      }
 108
 109      public TypeRegistryImpl TypeRegistry { get; }
 110
 111      public Int32 BackendProcessID { get; }
 112
 113      public IReadOnlyDictionary<String, String> ServerParameters { get; }
 114
 115      protected override ReservedForStatement CreateReservationObject( SQLStatementBuilderInformation stmt )
 116      {
 117         return new PgReservedForStatement(
 118#if DEBUG
 119            stmt,
 120#endif
 121            stmt.IsSimple(),
 122            stmt.HasBatchParameters() ? "cbam_statement" : null
 123            );
 124      }
 125
 126      protected override void ValidateStatementOrThrow( SQLStatementBuilderInformation statement )
 127      {
 128         ArgumentValidator.ValidateNotNull( nameof( statement ), statement );
 129         if ( statement.BatchParameterCount > 1 )
 130         {
 131            // Verify that all columns have same typeIDs
 132            var first = statement
 133               .GetParametersEnumerable( 0 )
 134               .Select( param => this.TypeRegistry.TryGetTypeInfo( param.ParameterCILType ).DatabaseData.TypeID )
 135               .ToArray();
 136            var max = statement.BatchParameterCount;
 137            for ( var i = 1; i < max; ++i )
 138            {
 139               var j = 0;
 140               foreach ( var param in statement.GetParametersEnumerable( i ) )
 141               {
 142                  if ( first[j] != this.TypeRegistry.TryGetTypeInfo( param.ParameterCILType ).DatabaseData.TypeID )
 143                  {
 144                     throw new PgSQLException( "When using batch parameters, columns must have same type IDs for all bat
 145                  }
 146                  ++j;
 147               }
 148            }
 149         }
 150      }
 151
 152      private static (Int32[] ParameterIndices, TypeFunctionalityInformation[] TypeInfos, Int32[] TypeIDs) GetVariablesF
 153         SQLStatementBuilderInformation stmt,
 154         TypeRegistry typeRegistry,
 155         Func<SQLStatementBuilderInformation, Int32, StatementParameter> paramExtractor
 156         )
 157      {
 158         var pCount = stmt.SQLParameterCount;
 159         TypeFunctionalityInformation[] typeInfos;
 160         Int32[] typeIDs;
 161         if ( pCount > 0 )
 162         {
 163            typeInfos = new TypeFunctionalityInformation[pCount];
 164            typeIDs = new Int32[pCount];
 165            for ( var i = 0; i < pCount; ++i )
 166            {
 167               var param = paramExtractor( stmt, i );
 168               var typeInfo = typeRegistry.TryGetTypeInfo( param.ParameterCILType );
 169               typeInfos[i] = typeInfo;
 170               typeIDs[i] = typeInfo?.DatabaseData?.TypeID ?? 0;
 171            }
 172         }
 173         else
 174         {
 175            typeInfos = Empty<TypeFunctionalityInformation>.Array;
 176            typeIDs = Empty<Int32>.Array;
 177         }
 178
 179         return (( (PgSQLStatementBuilderInformation) stmt ).ParameterIndices, typeInfos, typeIDs);
 180      }
 181
 182      private MessageIOArgs GetIOArgs( ResizableArray<Byte> bufferToUse = null, CancellationToken? tokenToUse = null )
 183      {
 184         return (this.MessageIOArgs, this.Stream, tokenToUse ?? this.CurrentCancellationToken, bufferToUse ?? this.Buffe
 185      }
 186
 187      protected override async ValueTask<TStatementExecutionSimpleTaskParameter> ExecuteStatementAsBatch(
 188         SQLStatementBuilderInformation statement,
 189         ReservedForStatement reservedState
 190         )
 191      {
 192         // TODO somehow make statement name and chunk size parametrizable
 193         (var parameterIndices, var typeInfos, var typeIDs) = GetVariablesForExtendedQuerySequence( statement, this.Type
 194         var ioArgs = this.GetIOArgs();
 195         var stmtName = ( (PgReservedForStatement) reservedState ).StatementName;
 196         var chunkSize = 1000;
 197
 198         // Send a parse message with statement name
 199         await new ParseMessage( statement.SQL, parameterIndices, typeIDs, stmtName ).SendMessageAsync( ioArgs, true );
 200
 201         // Now send describe message
 202         await new DescribeMessage( true, stmtName ).SendMessageAsync( ioArgs, true );
 203
 204         // And then Flush message for backend to send responses
 205         await FrontEndMessageWithNoContent.FLUSH.SendMessageAsync( ioArgs, false );
 206
 207         // Receive first batch of messages
 208         BackendMessageObject msg = null;
 209         SQLStatementExecutionResult current = null;
 210         List<PgSQLError> notices = new List<PgSQLError>();
 211         var sendBatch = true;
 212         while ( msg == null )
 213         {
 214            msg = ( await this.ReadMessagesUntilMeaningful( notices ) ).Item1;
 215            switch ( msg )
 216            {
 217               case MessageWithNoContents nc:
 218                  switch ( nc.Code )
 219                  {
 220                     case BackendMessageCode.ParseComplete:
 221                        // Continue reading messages
 222                        msg = null;
 223                        break;
 224                     case BackendMessageCode.EmptyQueryResponse:
 225                        // The statement does not produce any data, we are done
 226                        sendBatch = false;
 227                        break;
 228                     case BackendMessageCode.NoData:
 229                        // Do nothing, thus causing batch messages to be sent
 230                        break;
 231                     default:
 232                        throw new PgSQLException( "Unrecognized response at this point: " + msg.Code );
 233                  }
 234                  break;
 235               case RowDescription rd:
 236                  // This happens when e.g. doing SELECT schema.function(x, y, z) -> can return NULLs or rows, we don't 
 237                  break; // throw new PgSQLException( "Batch statements may only be used for non-query statements." );
 238               case ParameterDescription pd:
 239                  if ( !ArrayEqualityComparer<Int32>.ArrayEquality( pd.ObjectIDs, typeIDs ) )
 240                  {
 241                     throw new PgSQLException( "Backend required certain amount of parameters, but either they were not 
 242                  }
 243                  // Continue to RowDescription/NoData message
 244                  msg = null;
 245                  break;
 246               default:
 247                  throw new PgSQLException( "Unrecognized response at this point: " + msg.Code );
 248            }
 249         }
 250
 251         if ( sendBatch )
 252         {
 253            var batchCount = statement.BatchParameterCount;
 254            var affectedRowsArray = new Int32[batchCount];
 255            // Send and receive messages asynchronously
 256            var commandTag = new String[1];
 257            await
 258#if NET40
 259               TaskEx
 260#else
 261               Task
 262#endif
 263               .WhenAll(
 264               this.SendMessagesForBatch( statement, typeInfos, stmtName, ioArgs, chunkSize, batchCount ),
 265               this.ReceiveMessagesForBatch( notices, affectedRowsArray, commandTag )
 266               );
 267            current = new BatchCommandExecutionResultImpl(
 268               commandTag[0],
 269               new Lazy<SQLException[]>( () => notices?.Select( n => new PgSQLException( n ) )?.ToArray() ),
 270               affectedRowsArray
 271               );
 272         }
 273
 274         return (current, null);
 275      }
 276
 277      private async Task SendMessagesForBatch(
 278         SQLStatementBuilderInformation statement,
 279         TypeFunctionalityInformation[] typeInfos,
 280         String statementName,
 281         MessageIOArgs ioArgs,
 282         Int32 chunkSize,
 283         Int32 batchCount
 284         )
 285      {
 286         var singleRowParamCount = statement.SQLParameterCount;
 287         Int32 max;
 288         var execMessage = new ExecuteMessage();
 289         for ( var i = 0; i < batchCount; i = max )
 290         {
 291            max = Math.Min( batchCount, i + chunkSize );
 292            for ( var j = i; j < max; ++j )
 293            {
 294               // Send Bind and Execute messages
 295               // TODO reuse BindMessage -> add Reset method.
 296               await new BindMessage(
 297                  statement.GetParametersEnumerable( j ),
 298                  singleRowParamCount,
 299                  typeInfos,
 300                  this.DisableBinaryProtocolSend,
 301                  this.DisableBinaryProtocolReceive,
 302                  statementName: statementName
 303                  ).SendMessageAsync( ioArgs, true );
 304               await execMessage.SendMessageAsync( ioArgs, true );
 305            }
 306
 307            // Now send flush message for backend to start sending results back
 308            await FrontEndMessageWithNoContent.FLUSH.SendMessageAsync( ioArgs, false );
 309         }
 310      }
 311
 312      private async Task ReceiveMessagesForBatch(
 313         List<PgSQLError> notices,
 314         Int32[] affectedRows,
 315         String[] commandTag // This is fugly, but other option is to make both ReceiveMessagesForBatch and SendMessages
 316         )
 317      {
 318         // We must allocate new buffer, since the reading will be done concurrently while the writing still performs
 319         // Furthermore, if some error is occurred during sending task, the backend will send error response right away.
 320         var buffer = new ResizableArray<Byte>( initialSize: 8, exponentialResize: true );
 321
 322         for ( var i = 0; i < affectedRows.Length; ++i )
 323         {
 324            var msg = ( await this.ReadMessagesUntilMeaningful( notices, bufferToUse: buffer ) ).Item1;
 325            if ( msg is MessageWithNoContents nc && msg.Code == BackendMessageCode.BindComplete )
 326            {
 327               // Bind was successul - now read result of execute message
 328               msg = null;
 329               while ( msg == null )
 330               {
 331                  Int32 remaining;
 332                  (msg, remaining) = await this.ReadMessagesUntilMeaningful( notices, bufferToUse: buffer );
 333                  switch ( msg )
 334                  {
 335                     case CommandComplete cc:
 336                        Interlocked.Exchange( ref affectedRows[i], cc.AffectedRows ?? 0 );
 337                        if ( commandTag[0] == null )
 338                        {
 339                           Interlocked.Exchange( ref commandTag[0], cc.CommandTag );
 340                        }
 341                        break;
 342                     case DataRowObject dr:
 343                        // Skip thru data
 344                        await this.Stream.ReadSpecificAmountAsync( buffer.SetCapacityAndReturnArray( remaining ), 0, rem
 345                        // And read more
 346                        msg = null;
 347                        break;
 348                     default:
 349                        throw new PgSQLException( "Unrecognized response at this point: " + msg.Code );
 350                  }
 351               }
 352            }
 353            else
 354            {
 355               throw new PgSQLException( "Unrecognized response at this point: " + msg.Code );
 356            }
 357         }
 358      }
 359
 360      protected override async ValueTask<TStatementExecutionSimpleTaskParameter> ExecuteStatementAsPrepared(
 361         SQLStatementBuilderInformation statement,
 362         ReservedForStatement reservedState
 363         )
 364      {
 365         (var parameterIndices, var typeInfos, var typeIDs) = GetVariablesForExtendedQuerySequence( statement, this.Type
 366         var ioArgs = this.GetIOArgs();
 367
 368         // First, send the parse message
 369         await new ParseMessage( statement.SQL, parameterIndices, typeIDs ).SendMessageAsync( ioArgs, true );
 370
 371         // Then send bind message
 372         var bindMsg = new BindMessage( statement.GetParametersEnumerable(), parameterIndices.Length, typeInfos, this.Di
 373         await bindMsg.SendMessageAsync( ioArgs, true );
 374
 375         // Then send describe message
 376         await new DescribeMessage( false ).SendMessageAsync( ioArgs, true );
 377
 378         // Then execute message
 379         await new ExecuteMessage().SendMessageAsync( ioArgs, true );
 380
 381         // Then flush in order to receive response
 382         await FrontEndMessageWithNoContent.FLUSH.SendMessageAsync( ioArgs, false );
 383
 384         // Start receiving messages
 385         BackendMessageObject msg = null;
 386         SQLStatementExecutionResult current = null;
 387         Func<ValueTask<(Boolean, SQLStatementExecutionResult)>> moveNext = null;
 388         RowDescription seenRD = null;
 389         List<PgSQLError> notices = new List<PgSQLError>();
 390         while ( msg == null )
 391         {
 392            msg = ( await this.ReadMessagesUntilMeaningful( notices ) ).Item1;
 393            switch ( msg )
 394            {
 395               case MessageWithNoContents nc:
 396                  switch ( nc.Code )
 397                  {
 398                     case BackendMessageCode.ParseComplete:
 399                     case BackendMessageCode.BindComplete:
 400                     case BackendMessageCode.NoData:
 401                        // Continue reading messages
 402                        msg = null;
 403                        break;
 404                     case BackendMessageCode.EmptyQueryResponse:
 405                        // The statement does not produce any data, we are done
 406                        break;
 407                     default:
 408                        throw new PgSQLException( "Unrecognized response at this point: " + msg.Code );
 409                  }
 410                  break;
 411               case RowDescription rd:
 412                  // 0..* DataRowObjects incoming...
 413                  seenRD = rd;
 414                  msg = null;
 415                  break;
 416               case DataRowObject dr:
 417                  var streamArray = new PgSQLDataRowColumn[seenRD.Fields.Length];
 418                  var mdArray = new PgSQLDataColumnMetaDataImpl[streamArray.Length];
 419                  PgSQLDataRowColumn prevCol = null;
 420                  for ( var i = 0; i < streamArray.Length; ++i )
 421                  {
 422                     var curField = seenRD.Fields[i];
 423                     var curMD = new PgSQLDataColumnMetaDataImpl( this, curField.DataFormat, curField.dataTypeID, this.T
 424                     var curStream = new PgSQLDataRowColumn( curMD, i, prevCol, this, reservedState, curField );
 425                     prevCol = curStream;
 426                     streamArray[i] = curStream;
 427                     curStream.Reset( dr );
 428                     mdArray[i] = curMD;
 429                  }
 430                  var warningsLazy = LazyFactory.NewReadOnlyResettableLazy<SQLException[]>( () => notices?.Select( n => 
 431                  var dataRowCurrent = new SQLDataRowImpl(
 432                        new PgSQLDataRowMetaDataImpl( mdArray ),
 433                        streamArray,
 434                        warningsLazy
 435                        );
 436                  current = dataRowCurrent;
 437                  moveNext = async () => await this.MoveNextAsync( reservedState, streamArray, notices, dataRowCurrent, 
 438                  break;
 439               case CommandComplete cc:
 440                  if ( seenRD == null )
 441                  {
 442                     current = new SingleCommandExecutionResultImpl(
 443                        cc.CommandTag,
 444                        new Lazy<SQLException[]>( () => notices?.Select( n => new PgSQLException( n ) )?.ToArray() ),
 445                        cc.AffectedRows ?? 0
 446                        );
 447                  }
 448                  break;
 449               default:
 450                  throw new PgSQLException( "Unrecognized response at this point: " + msg.Code );
 451            }
 452         }
 453
 454         return (current, moveNext);
 455      }
 456
 457      protected override async ValueTask<TStatementExecutionSimpleTaskParameter> ExecuteStatementAsSimple(
 458         SQLStatementBuilderInformation stmt,
 459         ReservedForStatement reservedState
 460         )
 461      {
 462         // Send Query message
 463         await new QueryMessage( stmt.SQL ).SendMessageAsync( this.GetIOArgs() );
 464
 465         // Then wait for appropriate response
 466         List<PgSQLError> notices = new List<PgSQLError>();
 467         Func<ValueTask<(Boolean, SQLStatementExecutionResult)>> drMoveNext = null;
 468
 469         // We have to always set moveNext, since we might be executing arbitrary amount of SQL statements in simple Sta
 470         Func<ValueTask<(Boolean, SQLStatementExecutionResult)>> moveNext = async () =>
 471         {
 472            SQLStatementExecutionResult current = null;
 473            if ( drMoveNext != null )
 474            {
 475               // We are iterating over some query result, check that first.
 476               var drNext = await drMoveNext();
 477               if ( drNext.Item1 )
 478               {
 479                  current = drNext.Item2;
 480               }
 481               else
 482               {
 483                  drMoveNext = null;
 484               }
 485            }
 486
 487            if ( current == null )
 488            {
 489               BackendMessageObject msg = null;
 490               RowDescription seenRD = null;
 491               while ( msg == null )
 492               {
 493                  msg = ( await this.ReadMessagesUntilMeaningful( notices ) ).Item1;
 494
 495                  switch ( msg )
 496                  {
 497                     case CommandComplete cc:
 498                        if ( seenRD == null )
 499                        {
 500                           current = new SingleCommandExecutionResultImpl(
 501                              cc.CommandTag,
 502                              new Lazy<SQLException[]>( () => notices?.Select( n => new PgSQLException( n ) )?.ToArray()
 503                              cc.AffectedRows ?? 0
 504                              );
 505                        }
 506                        else
 507                        {
 508                           // RowDescription followed immediately by CommandComplete -> treat as empty query
 509                           // Read more
 510                           msg = null;
 511                        }
 512                        seenRD = null;
 513                        break;
 514                     case RowDescription rd:
 515                        seenRD = rd;
 516                        // Read more (DataRow or CommandComplete)
 517                        msg = null;
 518                        break;
 519                     case DataRowObject dr:
 520                        // First DataRowObject
 521                        var streamArray = new PgSQLDataRowColumn[seenRD.Fields.Length];
 522                        var mdArray = new PgSQLDataColumnMetaDataImpl[streamArray.Length];
 523                        PgSQLDataRowColumn prevCol = null;
 524                        for ( var i = 0; i < streamArray.Length; ++i )
 525                        {
 526                           var curField = seenRD.Fields[i];
 527                           var curMD = new PgSQLDataColumnMetaDataImpl( this, curField.DataFormat, curField.dataTypeID, 
 528                           var curStream = new PgSQLDataRowColumn( curMD, i, prevCol, this, reservedState, curField );
 529                           prevCol = curStream;
 530                           streamArray[i] = curStream;
 531                           curStream.Reset( dr );
 532                           mdArray[i] = curMD;
 533                        }
 534                        var warningsLazy = LazyFactory.NewReadOnlyResettableLazy<SQLException[]>( () => notices?.Select(
 535                        var dataRowCurrent = new SQLDataRowImpl(
 536                              new PgSQLDataRowMetaDataImpl( mdArray ),
 537                              streamArray,
 538                              warningsLazy
 539                              );
 540                        current = dataRowCurrent;
 541                        drMoveNext = async () => await this.MoveNextAsync( reservedState, streamArray, notices, dataRowC
 542                        break;
 543                     case ReadyForQuery rfq:
 544                        ( (PgReservedForStatement) reservedState ).RFQSeen();
 545                        break;
 546                     default:
 547                        if ( !ReferenceEquals( MessageWithNoContents.EMPTY_QUERY, msg ) )
 548                        {
 549                           throw new PgSQLException( "Unrecognized response at this point: " + msg.Code );
 550                        }
 551                        // Read more
 552                        msg = null;
 553                        break;
 554                  }
 555               }
 556            }
 557
 558            return (current != null, current);
 559         };
 560
 561         var firstResult = await moveNext();
 562
 563         return (firstResult.Item1 ? firstResult.Item2 : null, moveNext);
 564
 565      }
 566
 567      private async Task<(Boolean, SQLStatementExecutionResult)> MoveNextAsync(
 568         ReservedForStatement reservationObject,
 569         PgSQLDataRowColumn[] streams,
 570         List<PgSQLError> notices,
 571         SQLDataRowImpl dataRow,
 572         ReadOnlyResettableLazy<SQLException[]> warningsLazy
 573         )
 574      {
 575         return await this.UseStreamWithinStatementAsync( reservationObject, async () =>
 576         {
 577            // Force read of all columns
 578            foreach ( var colStream in streams )
 579            {
 580               await colStream.SkipBytesAsync( this.Buffer.Array );
 581            }
 582
 583            notices.Clear();
 584            var msg = ( await this.ReadMessagesUntilMeaningful( notices ) ).Item1;
 585            var dr = msg as DataRowObject;
 586            foreach ( var stream in streams )
 587            {
 588               stream.Reset( dr );
 589            }
 590
 591            var retVal = dr != null;
 592            warningsLazy.Reset();
 593            return (Success: retVal, Item: dataRow);
 594         } );
 595      }
 596
 597
 598      public TransactionStatus LastSeenTransactionStatus
 599      {
 600         get
 601         {
 602            return (TransactionStatus) this._lastSeenTransactionStatus;
 603         }
 604         private set
 605         {
 606            Interlocked.Exchange( ref this._lastSeenTransactionStatus, (Int32) value );
 607         }
 608      }
 609
 610      //public Boolean StandardConformingStrings
 611      //{
 612      //   get
 613      //   {
 614      //      return Convert.ToBoolean( this._standardConformingStrings );
 615      //   }
 616      //   set
 617      //   {
 618      //      Interlocked.Exchange( ref this._standardConformingStrings, Convert.ToInt32( value ) );
 619      //   }
 620      //}
 621
 622      protected override async Task PerformDisposeStatementAsync(
 623         ReservedForStatement reservationObject
 624         )
 625      {
 626         var ioArgs = this.GetIOArgs();
 627         var pgReserved = (PgReservedForStatement) reservationObject;
 628         if ( !String.IsNullOrEmpty( pgReserved.StatementName ) )
 629         {
 630            // Need to close our named statement
 631            await new CloseMessage( true, pgReserved.StatementName ).SendMessageAsync( ioArgs, true );
 632         }
 633
 634         // Simple statement already received RFQ in its MoveNext method
 635         if ( !pgReserved.IsSimple )
 636         {
 637            // Need to send SYNC
 638            await FrontEndMessageWithNoContent.SYNC.SendMessageAsync( ioArgs );
 639
 640         }
 641
 642         // TODO The new moveNextEnded parameter could tell that instead of RFQEncountered property, investigate that
 643         if ( !pgReserved.RFQEncountered )
 644         {
 645            // Then wait for RFQ
 646            // This happens for non-simple statements, or simple statements which cause exception when iterated over.
 647            BackendMessageObject msg;
 648            Int32 remaining;
 649            while ( ( (msg, remaining) = ( await this.ReadMessagesUntilMeaningful( null, dontThrowExceptions: true ) ) )
 650            {
 651               if ( remaining > 0 )
 652               {
 653                  ioArgs.Item4.CurrentMaxCapacity = remaining;
 654                  await ioArgs.Item2.ReadSpecificAmountAsync( ioArgs.Item4.Array, 0, remaining, ioArgs.Item3 );
 655               }
 656            }
 657         }
 658      }
 659
 660      public BackendABIHelper MessageIOArgs { get; }
 661
 662      public ResizableArray<Byte> Buffer { get; }
 663
 664      public Stream Stream { get; }
 665
 666#if !NETSTANDARD1_0
 667      public Socket Socket { get; }
 668#endif
 669
 670      public ResizableArray<ResettableTransformable<Int32?, Int32>> DataRowColumnSizes { get; }
 671
 672      public Boolean DisableBinaryProtocolSend { get; }
 673      public Boolean DisableBinaryProtocolReceive { get; }
 674
 675      public Queue<NotificationEventArgs> EnqueuedNotifications { get; }
 676
 677      internal async ValueTask<Object> ConvertFromBytes(
 678         Int32 typeID,
 679         DataFormat dataFormat,
 680         EitherOr<ReservedForStatement, Stream> stream,
 681         Int32 byteCount
 682         )
 683      {
 684         var actualStream = stream.IsFirst ? this.Stream : stream.Second;
 685         var typeInfo = this.TypeRegistry.TryGetTypeInfo( typeID );
 686         if ( typeInfo != null )
 687         {
 688            var limitedStream = StreamFactory.CreateLimitedReader(
 689                  actualStream,
 690                  byteCount,
 691                  this.CurrentCancellationToken,
 692                  this.Buffer
 693                  );
 694
 695            try
 696            {
 697               return await typeInfo.Functionality.ReadBackendValueAsync(
 698                  dataFormat,
 699                  typeInfo.DatabaseData,
 700                  this.MessageIOArgs,
 701                  limitedStream
 702                  );
 703            }
 704            finally
 705            {
 706               try
 707               {
 708                  await limitedStream.SkipThroughRemainingBytes();
 709               }
 710               catch
 711               {
 712                  // Ignore this one.
 713               }
 714
 715            }
 716
 717         }
 718         else if ( dataFormat == DataFormat.Text )
 719         {
 720            // Initial type load, or unknown type and format is textual
 721            await actualStream.ReadSpecificAmountAsync( this.Buffer, 0, byteCount, this.CurrentCancellationToken );
 722            return this.MessageIOArgs.GetStringWithPool( this.Buffer.Array, 0, byteCount );
 723         }
 724         else
 725         {
 726            // Unknown type, and data format is binary.
 727            throw new PgSQLException( $"The type ID {typeID} is not known." );
 728         }
 729      }
 730
 731      internal async ValueTask<(BackendMessageObject, Int32)> ReadMessagesUntilMeaningful(
 732         List<PgSQLError> notices,
 733         Func<Boolean> checkReadForNextMessage = null,
 734         ResizableArray<Byte> bufferToUse = null,
 735         Boolean dontThrowExceptions = false
 736      )
 737      {
 738         Boolean encounteredMeaningful;
 739         var ioArgs = this.GetIOArgs( bufferToUse );
 740         BackendMessageObject msg;
 741         Int32 remaining;
 742         do
 743         {
 744            (msg, remaining) = await BackendMessageObject.ReadBackendMessageAsync( ioArgs, this.DataRowColumnSizes );
 745            switch ( msg )
 746            {
 747               case PgSQLErrorObject errorObject:
 748                  encounteredMeaningful = false;
 749                  if ( errorObject.Code == BackendMessageCode.NoticeResponse )
 750                  {
 751                     if ( notices != null )
 752                     {
 753                        notices.Add( ( (PgSQLErrorObject) msg ).Error );
 754                     }
 755                  }
 756                  else if ( !dontThrowExceptions )
 757                  {
 758                     throw new PgSQLException( ( (PgSQLErrorObject) msg ).Error );
 759                  }
 760                  break;
 761               case NotificationMessage notification:
 762                  this.EnqueuedNotifications.Enqueue( notification.Args );
 763                  encounteredMeaningful = false;
 764                  break;
 765               case ParameterStatus ps:
 766                  this._serverParameters[ps.Name] = ps.Value;
 767                  encounteredMeaningful = false;
 768                  break;
 769               default:
 770                  {
 771                     if ( msg is ReadyForQuery rfq )
 772                     {
 773                        this.LastSeenTransactionStatus = rfq.Status;
 774                     }
 775                     encounteredMeaningful = true;
 776                     break;
 777                  }
 778
 779            }
 780         } while ( !encounteredMeaningful && ( checkReadForNextMessage?.Invoke() ?? true ) );
 781
 782         return (msg, remaining);
 783      }
 784
 785      public async Task PerformClose( CancellationToken token )
 786      {
 787         // Send termination message
 788         // Don't use this.CurrentCancellationToken, since one-time pool has already reset the token.
 789         // Furthermore, we might come here from other entrypoints than connection pool's UseConnection (e.g. when dispo
 790         await FrontEndMessageWithNoContent.TERMINATION.SendMessageAsync( this.GetIOArgs( tokenToUse: token ) );
 791      }
 792
 793#if !NETSTANDARD1_0
 794      private Boolean SocketHasDataPending()
 795      {
 796         var socket = this.Socket;
 797         return socket.Available > 0 || socket.Poll( 1, SelectMode.SelectRead ) || socket.Available > 0;
 798      }
 799#endif
 800
 801      public async ValueTask<NotificationEventArgs[]> CheckNotificationsAsync()
 802      {
 803         // TODO this could be optimized a little, if we notice EnqueuedNotifications.Count > 0, then just don't read fr
 804         NotificationEventArgs[] args = null;
 805
 806         NotificationEventArgs[] GetEnqueuedNotifications()
 807         {
 808            var enqueued = this.EnqueuedNotifications.ToArray();
 809            this.EnqueuedNotifications.Clear();
 810            return enqueued;
 811         }
 812
 813#if !NETSTANDARD1_0
 814         var socket = this.Socket;
 815         if ( socket == null )
 816         {
 817#endif
 818            // Just do "SELECT 1"; to get any notifications
 819            var enumerable = this.PrepareStatementForExecution( this.VendorFunctionality.CreateStatementBuilder( "SELECT
 820               .AsObservable();
 821            // Use GetEnqueuedNotifications while we are still inside statement reservation region, by registering to Be
 822            enumerable.BeforeEnumerationEnd += ( eArgs ) => args = GetEnqueuedNotifications();
 823            await enumerable.EnumerateAsync();
 824#if !NETSTANDARD1_0
 825         }
 826         else
 827         {
 828            // First, check from the socket that we have any data pending
 829
 830            var hasDataPending = this.SocketHasDataPending();
 831            if ( hasDataPending || this.EnqueuedNotifications.Count > 0 )
 832            {
 833               // There is pending data
 834               // We always must use UseStreamOutsideStatementAsync method, since modifying this.EnqueuedNotifications o
 835               await this.UseStreamOutsideStatementAsync( async () =>
 836               {
 837                  // If we call "ReadMessagesUntilMeaningful" with no socket data pending, we will never break free of l
 838                  if ( hasDataPending )
 839                  {
 840                     await this.ReadMessagesUntilMeaningful(
 841                        null,
 842                        this.SocketHasDataPending
 843                        );
 844                  }
 845                  args = GetEnqueuedNotifications();
 846                  return false;
 847               } );
 848            }
 849         }
 850#endif
 851
 852         return args ?? Empty<NotificationEventArgs>.Array;
 853
 854      }
 855
 856
 857      public IAsyncEnumerable<NotificationEventArgs> ListenToNotificationsAsync()
 858      {
 859#if !NETSTANDARD1_0
 860         if ( this.Socket == null )
 861         {
 862#else
 863         throw new NotSupportedException( "No socket available for this method." );
 864#endif
 865#if !NETSTANDARD1_0
 866         }
 867
 868         var enqueued = this.EnqueuedNotifications;
 869         Boolean KeepReadingMore()
 870         {
 871            return enqueued.Count <= 0 || ( enqueued.Count <= 1000 && this.SocketHasDataPending() );
 872         }
 873
 874         async Task PerformReadForNotifications()
 875         {
 876            if ( enqueued.Count <= 0 )
 877            {
 878               await this.ReadMessagesUntilMeaningful( null, KeepReadingMore );
 879            }
 880         }
 881
 882         return AsyncEnumerationFactory.CreateStatefulWrappingEnumerable( () =>
 883         {
 884            PgReservedForStatement reservation = null;
 885            return AsyncEnumerationFactory.CreateWrappingStartInfo(
 886               async () =>
 887               {
 888                  if ( reservation == null )
 889                  {
 890                     reservation = new PgReservedForStatement(
 891#if DEBUG
 892                        null,
 893#endif
 894                        true,
 895                        null
 896                        );
 897                     reservation.RFQSeen();
 898                     await this.UseStreamOutsideStatementAsync( reservation, PerformReadForNotifications, false, true );
 899                  }
 900                  else
 901                  {
 902                     await this.UseStreamWithinStatementAsync( reservation, PerformReadForNotifications, true );
 903                  }
 904
 905                  return enqueued.Count > 0;
 906               },
 907               ( out Boolean success ) =>
 908               {
 909                  success = enqueued.Count > 0;
 910                  return success ? enqueued.Dequeue() : default;
 911               },
 912               () =>
 913               {
 914                  return this.DisposeStatementAsync( reservation );
 915               }
 916               );
 917         }, this.AsyncProvider );
 918#endif
 919      }
 920
 921      public static async Task<(PostgreSQLProtocol Protocol, List<PgSQLError> notices)> PerformStartup(
 922         PgSQLConnectionVendorFunctionality vendorFunctionality,
 923         PgSQLConnectionCreationInfo creationInfo,
 924         CancellationToken token,
 925         Stream stream,
 926         BackendABIHelper abiHelper,
 927         ResizableArray<Byte> buffer
 928#if !NETSTANDARD1_0
 929         , Socket socket
 930#endif
 931         )
 932      {
 933         var initData = creationInfo?.CreationData?.Initialization ?? throw new PgSQLException( "Please specify initiali
 934         var startupInfo = await DoConnectionInitialization(
 935            creationInfo,
 936            (abiHelper, stream, token, buffer)
 937            );
 938         var protoConfig = initData?.Protocol;
 939         var retVal = (
 940            new PostgreSQLProtocol(
 941               vendorFunctionality,
 942               protoConfig?.DisableBinaryProtocolSend ?? false,
 943               protoConfig?.DisableBinaryProtocolReceive ?? false,
 944               abiHelper,
 945               stream,
 946               buffer,
 947               startupInfo.ServerParameters,
 948               startupInfo.TransactionStatus,
 949               startupInfo.backendProcessID ?? 0
 950#if !NETSTANDARD1_0
 951               , socket
 952#endif
 953            ),
 954            startupInfo.Notices ?? new List<PgSQLError>()
 955            );
 956
 957         await retVal.Item1.ReadTypesFromServer( protoConfig?.ForceTypeIDLoad ?? false, token );
 958
 959         return retVal;
 960      }
 961
 962      internal const String SERVER_PARAMETER_DATABASE = "database";
 963
 964      private static async Task<(IDictionary<String, String> ServerParameters, Int32? backendProcessID, Int32? backendKe
 965         PgSQLConnectionCreationInfo creationInfo,
 966         MessageIOArgs ioArgs
 967         )
 968      {
 969         var dbConfig = creationInfo?.CreationData?.Initialization?.Database ?? throw new ArgumentException( "Please spe
 970         var authConfig = creationInfo?.CreationData?.Initialization?.Authentication ?? throw new ArgumentException( "Pl
 971
 972         var encoding = ioArgs.Item1.Encoding.Encoding;
 973         var username = authConfig.Username ?? throw new ArgumentException( "Please specify username in authentication c
 974         var parameters = new Dictionary<String, String>()
 975         {
 976            { SERVER_PARAMETER_DATABASE, dbConfig.Name ?? throw new ArgumentException("Please specify database name in d
 977            { "user",username },
 978            { "DateStyle", "ISO" },
 979            { "client_encoding", encoding.WebName  },
 980            { "extra_float_digits", "2" },
 981            { "lc_monetary", "C" }
 982         };
 983         var sp = dbConfig.SearchPath;
 984         if ( !String.IsNullOrEmpty( sp ) )
 985         {
 986            parameters.Add( "search_path", sp );
 987         }
 988
 989         await new StartupMessage( 3 << 16, parameters ).SendMessageAsync( ioArgs );
 990
 991         BackendMessageObject msg;
 992         List<PgSQLError> notices = null;
 993         Int32? backendProcessID = null;
 994         Int32? backendKeyData = null;
 995         TransactionStatus tStatus = 0;
 996         Object saslState = null;
 997         try
 998         {
 999            do
 1000            {
 1001               Int32 ignored;
 1002               (msg, ignored) = await BackendMessageObject.ReadBackendMessageAsync( ioArgs, null );
 1003               switch ( msg )
 1004               {
 1005                  case ParameterStatus ps:
 1006                     parameters[ps.Name] = ps.Value;
 1007                     break;
 1008                  case AuthenticationResponse auth:
 1009                     var newSaslState = await ProcessAuth(
 1010                        creationInfo,
 1011                        username,
 1012                        ioArgs,
 1013                        auth,
 1014                        saslState
 1015                        );
 1016                     if ( newSaslState != null )
 1017                     {
 1018                        saslState = newSaslState;
 1019                     }
 1020                     break;
 1021                  case PgSQLErrorObject error:
 1022                     if ( error.Code == BackendMessageCode.NoticeResponse )
 1023                     {
 1024                        if ( notices == null )
 1025                        {
 1026                           notices = new List<PgSQLError>();
 1027                        }
 1028                        notices.Add( error.Error );
 1029                     }
 1030                     else
 1031                     {
 1032                        throw new PgSQLException( error.Error );
 1033                     }
 1034                     break;
 1035                  case BackendKeyData key:
 1036                     backendProcessID = key.ProcessID;
 1037                     backendKeyData = key.Key;
 1038                     break;
 1039                  case ReadyForQuery rfq:
 1040                     tStatus = rfq.Status;
 1041                     break;
 1042               }
 1043            } while ( msg.Code != BackendMessageCode.ReadyForQuery );
 1044         }
 1045         finally
 1046         {
 1047            DisposeSASLState( saslState );
 1048         }
 1049         return (parameters, backendProcessID, backendKeyData, notices, tStatus);
 1050      }
 1051
 1052      private static async Task<Object> ProcessAuth(
 1053         PgSQLConnectionCreationInfo creationInfo,
 1054         String username,
 1055         MessageIOArgs ioArgs,
 1056         AuthenticationResponse msg,
 1057         Object saslState
 1058         )
 1059      {
 1060         var authType = msg.RequestType;
 1061         var initData = creationInfo.CreationData.Initialization.Database;
 1062         switch ( authType )
 1063         {
 1064            case AuthenticationResponse.AuthenticationRequestType.AuthenticationClearTextPassword:
 1065               await new PasswordMessage( GetPasswordBytes( creationInfo, ioArgs ) ).SendMessageAsync( ioArgs );
 1066               break;
 1067            case AuthenticationResponse.AuthenticationRequestType.AuthenticationMD5Password:
 1068               await HandleMD5Authentication( ioArgs, msg, username, GetPasswordBytes( creationInfo, ioArgs ) ).SendMess
 1069               break;
 1070            case AuthenticationResponse.AuthenticationRequestType.AuthenticationOk:
 1071               // Nothing to do
 1072               break;
 1073            case AuthenticationResponse.AuthenticationRequestType.AuthenticationSASL:
 1074               var saslResult = ( HandleSASLAuthentication_Start( creationInfo, ioArgs, username, msg ) );
 1075               saslState = saslResult.Item2;
 1076               await ( saslResult.Item1 ?? throw new PgSQLException( "Authentication failed." ) ).SendMessageAsync( ioAr
 1077               break;
 1078            case AuthenticationResponse.AuthenticationRequestType.AuthenticationSASLContinue:
 1079               await ( HandleSASLAuthentication_Continue( ioArgs, msg, saslState ) ?? throw new PgSQLException( "Authent
 1080               break;
 1081            case AuthenticationResponse.AuthenticationRequestType.AuthenticationSASLFinal:
 1082               HandleSASLAuthentication_Final( creationInfo, ioArgs, msg, saslState );
 1083               break;
 1084            default:
 1085               throw new PgSQLException( $"Authentication kind {authType} is not support." );
 1086         }
 1087
 1088         return saslState;
 1089      }
 1090
 1091      private static Byte[] GetPasswordBytes(
 1092         PgSQLConnectionCreationInfo creationInfo,
 1093         MessageIOArgs ioArgs
 1094         )
 1095      {
 1096         var authConfig = creationInfo.CreationData.Initialization.Authentication;
 1097         var encoding = ioArgs.Item1.Encoding.Encoding;
 1098         return ( String.Equals( PgSQLAuthenticationConfiguration.PasswordByteEncoding.WebName, encoding.WebName ) ?
 1099            authConfig.PasswordBytes :
 1100            encoding.GetBytes( authConfig.Password ) ) ?? throw new PgSQLException( "Backend requested password, but it 
 1101      }
 1102
 1103      // Having this in separate method also won't force load of UtilPack.Cryptography assemblies if other than MD5/SASL
 1104      private static PasswordMessage HandleMD5Authentication(
 1105         MessageIOArgs ioArgs,
 1106         AuthenticationResponse msg,
 1107         String username,
 1108         Byte[] pw
 1109         )
 1110      {
 1111         var buffer = ioArgs.Item4;
 1112         var helper = ioArgs.Item1;
 1113
 1114         if ( pw == null )
 1115         {
 1116            throw new PgSQLException( "Backend requested password, but it was not supplied." );
 1117         }
 1118         using ( var md5 = new FluentCryptography.Digest.MD5() )
 1119         {
 1120            // Extract server salt before using args.Buffer
 1121
 1122            var serverSalt = buffer.Array.CreateArrayCopy( msg.AdditionalDataInfo.offset, msg.AdditionalDataInfo.count )
 1123
 1124            // Hash password with username as salt
 1125            var prehashLength = helper.Encoding.Encoding.GetByteCount( username ) + pw.Length;
 1126            buffer.CurrentMaxCapacity = prehashLength;
 1127            var idx = 0;
 1128            pw.CopyTo( buffer.Array, ref idx, 0, pw.Length );
 1129            helper.Encoding.Encoding.GetBytes( username, 0, username.Length, buffer.Array, pw.Length );
 1130            var hash = md5.ComputeDigest( buffer.Array, 0, prehashLength );
 1131
 1132            // Write hash as hexadecimal string
 1133            buffer.CurrentMaxCapacity = hash.Length * 2 * helper.Encoding.BytesPerASCIICharacter;
 1134            idx = 0;
 1135            foreach ( var hashByte in hash )
 1136            {
 1137               helper.Encoding.WriteHexDecimal( buffer.Array, ref idx, hashByte );
 1138            }
 1139
 1140            // Hash result again with server-provided salt
 1141            buffer.CurrentMaxCapacity += serverSalt.Length;
 1142            var dummy = 0;
 1143            serverSalt.CopyTo( buffer.Array, ref dummy, idx, serverSalt.Length );
 1144            hash = md5.ComputeDigest( buffer.Array, 0, idx + serverSalt.Length );
 1145
 1146            // Send back string "md5" followed by hexadecimal hash value
 1147            buffer.CurrentMaxCapacity = 3 * helper.Encoding.BytesPerASCIICharacter + hash.Length * 2 * helper.Encoding.B
 1148            idx = 0;
 1149            var array = buffer.Array;
 1150            helper.Encoding
 1151               .WriteASCIIByte( array, ref idx, (Byte) 'm' )
 1152               .WriteASCIIByte( array, ref idx, (Byte) 'd' )
 1153               .WriteASCIIByte( array, ref idx, (Byte) '5' );
 1154            foreach ( var hashByte in hash )
 1155            {
 1156               helper.Encoding.WriteHexDecimal( array, ref idx, hashByte );
 1157            }
 1158
 1159            var retValArray = new Byte[idx + 1]; // Remember string-terminating zero
 1160            dummy = 0;
 1161            array.CopyTo( retValArray, ref dummy, 0, idx );
 1162            return new PasswordMessage( retValArray );
 1163         }
 1164
 1165
 1166      }
 1167
 1168      private static (PasswordMessage, Object) HandleSASLAuthentication_Start(
 1169         PgSQLConnectionCreationInfo creationInfo,
 1170         MessageIOArgs ioArgs,
 1171         String username,
 1172         AuthenticationResponse msg
 1173         )
 1174      {
 1175         var idx = msg.AdditionalDataInfo.offset;
 1176         var count = msg.AdditionalDataInfo.count;
 1177         var buffer = ioArgs.Item4;
 1178         while ( count > 0 && buffer.Array[idx + count - 1] == 0 )
 1179         {
 1180            --count;
 1181         }
 1182         var protocolEncoding = ioArgs.Item1.Encoding;
 1183         var authSchemes = protocolEncoding.Encoding.GetString( buffer.Array, idx, count );
 1184
 1185         var mechanismInfo = creationInfo.CreateSASLMechanism?.Invoke( authSchemes ) ?? throw new PgSQLException( "Faile
 1186         var mechanism = mechanismInfo.Item1 ?? throw new PgSQLException( "Failed to provide SASL mechanism." );
 1187         var mechanismName = mechanismInfo.Item2 ?? throw new PgSQLException( "Failed to provide SASL mechanism name." )
 1188         var authConfig = creationInfo.CreationData.Initialization.Authentication;
 1189         var pwDigest = authConfig.PasswordDigest;
 1190         var credentials = pwDigest.IsNullOrEmpty() ?
 1191            new SASLCredentialsSCRAMForClient( username, authConfig.Password ) :
 1192            new SASLCredentialsSCRAMForClient( username, pwDigest );
 1193         var writeBuffer = new ResizableArray<Byte>();
 1194         var saslEncoding = new UTF8Encoding( false, true ).CreateDefaultEncodingInfo();
 1195         var challengeResult = mechanism.ChallengeAsync( credentials.CreateChallengeArguments(
 1196            Empty<Byte>.Array,
 1197            -1,
 1198            -1,
 1199            writeBuffer,
 1200            0,
 1201            saslEncoding
 1202            ) ).GetResultForceSynchronous();
 1203
 1204         (PasswordMessage, Object) retVal;
 1205         if ( !challengeResult.IsFirst || challengeResult.First.Item2 != SASLChallengeResult.MoreToCome )
 1206         {
 1207            retVal = default;
 1208         }
 1209         else
 1210         {
 1211            // SASL initial response is: null-terminated string for mechanism name, length of initial response, and init
 1212            var bytesWritten = challengeResult.First.Item1;
 1213            var pwArray = new Byte[
 1214               protocolEncoding.Encoding.GetByteCount( mechanismName ) + protocolEncoding.BytesPerASCIICharacter
 1215               + sizeof( Int32 )
 1216               + bytesWritten
 1217               ];
 1218            idx = protocolEncoding.Encoding.GetBytes( mechanismName, 0, mechanismName.Length, pwArray, 0 ) + 1;
 1219            pwArray.WritePgInt32( ref idx, bytesWritten );
 1220            var dummy = 0;
 1221            writeBuffer.Array.CopyTo( pwArray, ref dummy, idx, bytesWritten );
 1222
 1223            retVal = (
 1224               new PasswordMessage( pwArray ),
 1225               new TSASLAuthState( mechanism, credentials, writeBuffer, saslEncoding )
 1226               );
 1227         }
 1228
 1229         return retVal;
 1230
 1231      }
 1232
 1233      private static PasswordMessage HandleSASLAuthentication_Continue(
 1234         MessageIOArgs ioArgs,
 1235         AuthenticationResponse msg,
 1236         Object state
 1237         )
 1238      {
 1239         var challengeResult = HandleSASLAuthentication_ContinueOrFinal( ioArgs, msg, state );
 1240         PasswordMessage retVal;
 1241         if ( challengeResult.IsSecond || challengeResult.First.Item2 != SASLChallengeResult.MoreToCome )
 1242         {
 1243            retVal = default;
 1244         }
 1245         else
 1246         {
 1247            // Responses are password messages with whole SASL message as content
 1248            retVal = new PasswordMessage( ( (TSASLAuthState) state ).Item3.Array.CreateArrayCopy( 0, challengeResult.Fir
 1249         }
 1250
 1251         return retVal;
 1252      }
 1253
 1254      private static void HandleSASLAuthentication_Final(
 1255         PgSQLConnectionCreationInfo creationInfo,
 1256         MessageIOArgs ioArgs,
 1257         AuthenticationResponse msg,
 1258         Object state
 1259         )
 1260      {
 1261         var challengeResult = HandleSASLAuthentication_ContinueOrFinal( ioArgs, msg, state );
 1262         if ( challengeResult.IsSecond || challengeResult.First.Item2 != SASLChallengeResult.Completed )
 1263         {
 1264            throw new PgSQLException( "Authentication failed." );
 1265         }
 1266         else
 1267         {
 1268            try
 1269            {
 1270               creationInfo.OnSASLSCRAMSuccess?.Invoke( ( (TSASLAuthState) state ).Item2.PasswordDigest );
 1271            }
 1272            catch
 1273            {
 1274               // Ignore...
 1275            }
 1276
 1277            DisposeSASLState( state );
 1278         }
 1279      }
 1280
 1281      private static EitherOr<(Int32, SASLChallengeResult), Int32> HandleSASLAuthentication_ContinueOrFinal(
 1282         MessageIOArgs ioArgs,
 1283         AuthenticationResponse msg,
 1284         Object state
 1285         )
 1286      {
 1287         var idx = msg.AdditionalDataInfo.offset;
 1288         var count = msg.AdditionalDataInfo.count;
 1289         var buffer = ioArgs.Item4;
 1290
 1291         var saslState = (TSASLAuthState) state;
 1292         return saslState.Item1.ChallengeAsync( saslState.Item2.CreateChallengeArguments(
 1293            buffer.Array,
 1294            idx,
 1295            count,
 1296            saslState.Item3,
 1297            0,
 1298            saslState.Item4
 1299            ) ).GetResultForceSynchronous();
 1300      }
 1301
 1302      private static void DisposeSASLState( Object state )
 1303      {
 1304         if ( state is TSASLAuthState saslState )
 1305         {
 1306            saslState.Item1?.DisposeSafely();
 1307            saslState.Item3?.Array?.Clear();
 1308         }
 1309      }
 1310
 1311      internal class PgReservedForStatement : ReservedForStatement
 1312      {
 1313         private Int32 _rfqEncountered;
 1314
 1315         public PgReservedForStatement(
 1316#if DEBUG
 1317         Object statement,
 1318#endif
 1319         Boolean isSimple,
 1320            String statementName
 1321            )
 1322#if DEBUG
 1323         : base( statement )
 1324#endif
 1325         {
 1326            this.IsSimple = isSimple;
 1327            this.StatementName = statementName;
 1328            this._rfqEncountered = Convert.ToInt32( false );
 1329         }
 1330
 1331         public Boolean IsSimple { get; }
 1332
 1333         public String StatementName { get; }
 1334
 1335         public Boolean RFQEncountered => Convert.ToBoolean( this._rfqEncountered );
 1336
 1337         public void RFQSeen()
 1338         {
 1339            Interlocked.Exchange( ref this._rfqEncountered, Convert.ToInt32( true ) );
 1340         }
 1341      }
 1342
 1343   }
 1344
 1345   // TODO move to utilpack
 1346   internal static class E_TODO
 1347   {
 1348      public static T GetResultForceSynchronous<T>( this ValueTask<T> task )
 1349      {
 61350         return task.IsCompleted ? task.Result : throw new InvalidOperationException( "ValueTask is not completed when i
 1351      }
 1352   }
 1353}