Summary

Class:CBAM.SQL.PostgreSQL.Implementation.DataRowObject
Assembly:CBAM.SQL.PostgreSQL.Implementation
File(s):/repo-dir/contents/Source/Code/CBAM.SQL.PostgreSQL.Implementation/Protocol.Reading.cs
Covered lines:27
Uncovered lines:0
Coverable lines:27
Total lines:648
Line coverage:100%
Branch coverage:80%

Coverage History

Metrics

MethodCyclomatic complexity NPath complexity Sequence coverage Branch coverage
.ctor(...)401%1%
ReadColumnByteCount()101%1%
>c__DisplayClass3_0/<<ReadColumnByteCount()401%0.5%
ReadDataRow()101%1%

File(s)

/repo-dir/contents/Source/Code/CBAM.SQL.PostgreSQL.Implementation/Protocol.Reading.cs

#LineLine coverage
 1/*
 2 * Copyright 2017 Stanislav Muhametsin. All rights Reserved.
 3 *
 4 * Licensed  under the  Apache License,  Version 2.0  (the "License");
 5 * you may not use  this file  except in  compliance with the License.
 6 * You may obtain a copy of the License at
 7 *
 8 *   http://www.apache.org/licenses/LICENSE-2.0
 9 *
 10 * Unless required by applicable law or agreed to in writing, software
 11 * distributed  under the  License is distributed on an "AS IS" BASIS,
 12 * WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or
 13 * implied.
 14 *
 15 * See the License for the specific language governing permissions and
 16 * limitations under the License.
 17 */
 18using CBAM.Abstractions;
 19using CBAM.SQL.PostgreSQL;
 20using CBAM.SQL.PostgreSQL.Implementation;
 21using System;
 22using System.Collections.Generic;
 23using System.IO;
 24using System.Text;
 25using System.Threading;
 26using System.Threading.Tasks;
 27using UtilPack;
 28using MessageIOArgs = System.ValueTuple<CBAM.SQL.PostgreSQL.BackendABIHelper, System.IO.Stream, System.Threading.Cancell
 29
 30namespace CBAM.SQL.PostgreSQL.Implementation
 31{
 32   internal abstract class BackendMessageObject
 33   {
 34
 35      internal BackendMessageObject( BackendMessageCode code )
 36      {
 37         this.Code = code;
 38      }
 39
 40      internal BackendMessageCode Code { get; }
 41
 42      public static async ValueTask<(BackendMessageObject msg, Int32 msgSize)> ReadBackendMessageAsync(
 43         MessageIOArgs ioArgs,
 44         ResizableArray<ResettableTransformable<Int32?, Int32>> columnSizes
 45         )
 46      {
 47         var args = ioArgs.Item1;
 48         var stream = ioArgs.Item2;
 49         var token = ioArgs.Item3;
 50         var buffer = ioArgs.Item4;
 51         await stream.ReadSpecificAmountAsync( buffer.Array, 0, 5, token );
 52         var code = (BackendMessageCode) buffer.Array[0];
 53         var length = buffer.Array.ReadInt32BEFromBytesNoRef( 1 );
 54         var remaining = length - sizeof( Int32 );
 55         if ( code != BackendMessageCode.DataRow && code != BackendMessageCode.CopyData )
 56         {
 57            // Just read the whole message at once for everything else except DataRow and CopyData messages
 58            buffer.CurrentMaxCapacity = remaining;
 59            await stream.ReadSpecificAmountAsync( buffer.Array, 0, remaining, token );
 60            remaining = 0;
 61         }
 62         var array = buffer.Array;
 63         var encoding = args.Encoding.Encoding;
 64
 65         BackendMessageObject result;
 66         switch ( code )
 67         {
 68            case BackendMessageCode.AuthenticationRequest:
 69               result = new AuthenticationResponse( array, length );
 70               break;
 71            case BackendMessageCode.ErrorResponse:
 72               result = new PgSQLErrorObject( array, encoding, true );
 73               break;
 74            case BackendMessageCode.NoticeResponse:
 75               result = new PgSQLErrorObject( array, encoding, false );
 76               break;
 77            case BackendMessageCode.RowDescription:
 78               result = new RowDescription( array, encoding );
 79               break;
 80            case BackendMessageCode.DataRow:
 81               (result, remaining) = await DataRowObject.ReadDataRow( stream, token, array, columnSizes, remaining );
 82               break;
 83            case BackendMessageCode.ParameterDescription:
 84               result = new ParameterDescription( array );
 85               break;
 86            case BackendMessageCode.ParameterStatus:
 87               result = new ParameterStatus( array, encoding );
 88               break;
 89            case BackendMessageCode.ReadyForQuery:
 90               result = new ReadyForQuery( array );
 91               break;
 92            case BackendMessageCode.BackendKeyData:
 93               result = new BackendKeyData( array );
 94               break;
 95            case BackendMessageCode.CommandComplete:
 96               result = new CommandComplete( array, encoding );
 97               break;
 98            case BackendMessageCode.NotificationResponse:
 99               result = new NotificationMessage( array, encoding );
 100               break;
 101            case BackendMessageCode.CopyInResponse:
 102               result = new CopyInOrOutMessage( array, true );
 103               break;
 104            case BackendMessageCode.CopyOutResponse:
 105               result = new CopyInOrOutMessage( array, false );
 106               break;
 107            case BackendMessageCode.CopyData:
 108               result = new CopyDataMessage( length );
 109               break;
 110            case BackendMessageCode.ParseComplete:
 111               result = MessageWithNoContents.PARSE_COMPLETE;
 112               break;
 113            case BackendMessageCode.BindComplete:
 114               result = MessageWithNoContents.BIND_COMPLETE;
 115               break;
 116            case BackendMessageCode.EmptyQueryResponse:
 117               result = MessageWithNoContents.EMPTY_QUERY;
 118               break;
 119            case BackendMessageCode.NoData:
 120               result = MessageWithNoContents.NO_DATA;
 121               break;
 122            case BackendMessageCode.CopyDone:
 123               result = MessageWithNoContents.COPY_DONE;
 124               break;
 125            case BackendMessageCode.CloseComplete:
 126               result = MessageWithNoContents.CLOSE_COMPLETE;
 127               break;
 128            default:
 129               throw new NotSupportedException( "Not supported backend response: " + code );
 130         }
 131
 132         return (result, remaining);
 133      }
 134   }
 135
 136   internal sealed class PgSQLErrorObject : BackendMessageObject
 137   {
 138      private readonly PgSQLError _error;
 139
 140      public PgSQLErrorObject( Byte[] array, Encoding encoding, Boolean isError )
 141         : base( isError ? BackendMessageCode.ErrorResponse : BackendMessageCode.NoticeResponse )
 142      {
 143         String severity = null,
 144            code = null,
 145            message = null,
 146            detail = null,
 147            hint = null,
 148            position = null,
 149            internalPosition = null,
 150            internalQuery = null,
 151            where = null,
 152            file = null,
 153            line = null,
 154            routine = null,
 155            schemaName = null,
 156            tableName = null,
 157            columnName = null,
 158            datatypeName = null,
 159            constraintName = null;
 160
 161         Byte fieldType;
 162         var offset = 0;
 163         do
 164         {
 165            fieldType = array.ReadByteFromBytes( ref offset );
 166            if ( fieldType != 0 )
 167            {
 168               var str = array.ReadZeroTerminatedStringFromBytes( ref offset, encoding );
 169               switch ( fieldType )
 170               {
 171                  case (Byte) 'S':
 172                     severity = str;
 173                     break;
 174                  case (Byte) 'C':
 175                     code = str;
 176                     break;
 177                  case (Byte) 'M':
 178                     message = str;
 179                     break;
 180                  case (Byte) 'D':
 181                     detail = str;
 182                     break;
 183                  case (Byte) 'H':
 184                     hint = str;
 185                     break;
 186                  case (Byte) 'P':
 187                     position = str;
 188                     break;
 189                  case (Byte) 'p':
 190                     internalPosition = str;
 191                     break;
 192                  case (Byte) 'q':
 193                     internalQuery = str;
 194                     break;
 195                  case (Byte) 'W':
 196                     where = str;
 197                     break;
 198                  case (Byte) 'F':
 199                     file = str;
 200                     break;
 201                  case (Byte) 'L':
 202                     line = str;
 203                     break;
 204                  case (Byte) 'R':
 205                     routine = str;
 206                     break;
 207                  case (Byte) 's':
 208                     schemaName = str;
 209                     break;
 210                  case (Byte) 't':
 211                     tableName = str;
 212                     break;
 213                  case (Byte) 'c':
 214                     columnName = str;
 215                     break;
 216                  case (Byte) 'd':
 217                     datatypeName = str;
 218                     break;
 219                  case (Byte) 'n':
 220                     constraintName = str;
 221                     break;
 222                  default:
 223                     // Unknown error field, just continue
 224                     break;
 225               }
 226            }
 227         } while ( fieldType != 0 );
 228
 229         this._error = new PgSQLError(
 230            severity,
 231            code,
 232            message,
 233            detail,
 234            hint,
 235            position,
 236            internalPosition,
 237            internalQuery,
 238            where,
 239            file,
 240            line,
 241            routine,
 242            schemaName,
 243            tableName,
 244            columnName,
 245            datatypeName,
 246            constraintName
 247            );
 248      }
 249
 250      internal PgSQLError Error
 251      {
 252         get
 253         {
 254            return this._error;
 255         }
 256      }
 257   }
 258
 259   internal sealed class AuthenticationResponse : BackendMessageObject
 260   {
 261      internal enum AuthenticationRequestType
 262      {
 263         AuthenticationOk = 0,
 264         AuthenticationKerberosV4 = 1,
 265         AuthenticationKerberosV5 = 2,
 266         AuthenticationClearTextPassword = 3,
 267         AuthenticationCryptPassword = 4,
 268         AuthenticationMD5Password = 5,
 269         AuthenticationSCMCredential = 6,
 270         AuthenticationGSS = 7,
 271         AuthenticationGSSContinue = 8,
 272         AuthenticationSSPI = 9,
 273         AuthenticationSASL = 10,
 274         AuthenticationSASLContinue = 11,
 275         AuthenticationSASLFinal = 12
 276      }
 277
 278      public AuthenticationResponse( Byte[] array, Int32 messageLength )
 279         : base( BackendMessageCode.AuthenticationRequest )
 280      {
 281         var offset = 0;
 282         this.RequestType = (AuthenticationRequestType) array.ReadPgInt32( ref offset );
 283         // Don't count in message length int32 + auth request type int32
 284         this.AdditionalDataInfo = (offset, messageLength - offset - sizeof( Int32 ));
 285      }
 286
 287      internal AuthenticationRequestType RequestType { get; }
 288
 289      internal (Int32 offset, Int32 count) AdditionalDataInfo { get; }
 290   }
 291
 292   internal sealed class RowDescription : BackendMessageObject
 293   {
 294      internal sealed class FieldData
 295      {
 296         internal readonly String name;
 297         internal readonly Int32 tableID;
 298         internal readonly Int16 colAttr;
 299         internal readonly Int32 dataTypeID;
 300         internal readonly Int16 dataTypeSize;
 301         internal readonly Int32 dataTypeModifier;
 302
 303         internal FieldData( Byte[] array, Encoding encoding, ref Int32 offset )
 304         {
 305            this.name = array.ReadZeroTerminatedStringFromBytes( ref offset, encoding );
 306            this.tableID = array.ReadPgInt32( ref offset );
 307            this.colAttr = array.ReadPgInt16( ref offset );
 308            this.dataTypeID = array.ReadPgInt32( ref offset );
 309            this.dataTypeSize = array.ReadPgInt16( ref offset );
 310            this.dataTypeModifier = array.ReadPgInt32( ref offset );
 311            this.DataFormat = (DataFormat) array.ReadPgInt16( ref offset );
 312         }
 313
 314         internal DataFormat DataFormat { get; }
 315      }
 316
 317      public RowDescription( Byte[] array, Encoding encoding )
 318         : base( BackendMessageCode.RowDescription )
 319      {
 320         var offset = 0;
 321         var fieldCount = array.ReadPgInt16Count( ref offset );
 322         var fields = new FieldData[Math.Max( 0, fieldCount )];
 323         for ( var i = 0; i < fieldCount; ++i )
 324         {
 325            fields[i] = new FieldData( array, encoding, ref offset );
 326         }
 327         this.Fields = fields;
 328      }
 329
 330      internal FieldData[] Fields { get; }
 331   }
 332
 333   internal sealed class ParameterDescription : BackendMessageObject
 334   {
 335
 336      public ParameterDescription( Byte[] array )
 337         : base( BackendMessageCode.ParameterDescription )
 338      {
 339         var offset = 0;
 340         var idCount = array.ReadPgInt16Count( ref offset );
 341         var ids = new Int32[Math.Max( 0, idCount )];
 342         for ( var i = 0; i < idCount; ++i )
 343         {
 344            ids[i] = array.ReadPgInt32( ref offset );
 345         }
 346         this.ObjectIDs = ids;
 347
 348      }
 349
 350      internal Int32[] ObjectIDs { get; }
 351   }
 352
 353   internal sealed class ParameterStatus : BackendMessageObject
 354   {
 355
 356      public ParameterStatus( Byte[] array, Encoding encoding )
 357         : base( BackendMessageCode.ParameterStatus )
 358      {
 359         var offset = 0;
 360         this.Name = array.ReadZeroTerminatedStringFromBytes( ref offset, encoding );
 361         this.Value = array.ReadZeroTerminatedStringFromBytes( ref offset, encoding );
 362      }
 363
 364      internal String Name { get; }
 365
 366      internal String Value { get; }
 367   }
 368
 369   internal sealed class DataRowObject : BackendMessageObject
 370   {
 371      private readonly Int32 _columnCount;
 372      private readonly Transformable<Int32?, Int32>[] _columnSizes;
 373
 374      private DataRowObject(
 375         Int32 columnCount,
 376         ResizableArray<ResettableTransformable<Int32?, Int32>> columnSizes
 377         )
 158378         : base( BackendMessageCode.DataRow )
 379      {
 158380         this._columnCount = columnCount;
 158381         columnSizes.CurrentMaxCapacity = columnCount;
 158382         var array = columnSizes.Array;
 159383         this._columnSizes = array;
 982384         for ( var i = 0; i < columnCount; ++i )
 385         {
 333386            var cur = array[i];
 333387            if ( cur == null )
 388            {
 35389               cur = new ResettableTransformable<Int32?, Int32>( null );
 34390               array[i] = cur;
 391            }
 333392            cur.TryReset();
 393         }
 158394      }
 395
 396      public async Task<Int32> ReadColumnByteCount(
 397         BackendABIHelper args,
 398         Stream stream,
 399         CancellationToken token,
 400         Int32 columnIndex,
 401         ResizableArray<Byte> array
 402         )
 403      {
 333404         var columnSizeHolder = this._columnSizes[columnIndex];
 333405         await columnSizeHolder.TryTransitionOrWaitAsync( async unused =>
 333406         {
 666407            await stream.ReadSpecificAmountAsync( array.Array, 0, sizeof( Int32 ), token );
 666408            var idx = 0;
 666409            return array.Array.ReadPgInt32( ref idx );
 665410         } );
 333411         return columnSizeHolder.Transformed;
 333412      }
 413
 414      public static async ValueTask<(DataRowObject, Int32)> ReadDataRow(
 415         Stream stream,
 416         CancellationToken token,
 417         Byte[] array,
 418         ResizableArray<ResettableTransformable<Int32?, Int32>> columnSizes,
 419         Int32 msgSize
 420         )
 421      {
 157422         await stream.ReadSpecificAmountAsync( array, 0, sizeof( Int16 ), token );
 158423         msgSize -= sizeof( Int16 );
 157424         var idx = 0;
 158425         var colCount = array.ReadPgInt16Count( ref idx );
 159426         return (new DataRowObject( colCount, columnSizes ), msgSize);
 159427      }
 428   }
 429
 430   internal sealed class ReadyForQuery : BackendMessageObject
 431   {
 432
 433      public ReadyForQuery( Byte[] array )
 434         : base( BackendMessageCode.ReadyForQuery )
 435      {
 436         this.Status = (TransactionStatus) array[0];
 437      }
 438
 439      public TransactionStatus Status { get; }
 440   }
 441
 442   internal sealed class BackendKeyData : BackendMessageObject
 443   {
 444
 445      public BackendKeyData( Byte[] array )
 446         : base( BackendMessageCode.BackendKeyData )
 447      {
 448         var idx = 0;
 449         this.ProcessID = array.ReadPgInt32( ref idx );
 450         this.Key = array.ReadPgInt32( ref idx );
 451      }
 452
 453      internal Int32 ProcessID { get; }
 454
 455      internal Int32 Key { get; }
 456   }
 457
 458   internal sealed class CommandComplete : BackendMessageObject
 459   {
 460
 461      public CommandComplete( Byte[] array, Encoding encoding )
 462         : base( BackendMessageCode.CommandComplete )
 463      {
 464         const String INSERT = "INSERT";
 465
 466         var idx = 0;
 467         var tag = array.ReadZeroTerminatedStringFromBytes( ref idx, encoding );
 468         this.FullCommandTag = tag;
 469         idx = 0;
 470         while ( Char.IsWhiteSpace( tag[idx] ) && ++idx < tag.Length ) ;
 471         String actualTag = null;
 472         if ( idx < tag.Length - 1 )
 473         {
 474            var start = idx;
 475            var max = idx;
 476            while ( !Char.IsWhiteSpace( tag[max] ) && ++max < tag.Length ) ;
 477            var isInsert = max - idx == INSERT.Length && tag.IndexOf( INSERT, StringComparison.OrdinalIgnoreCase ) == id
 478            if ( isInsert )
 479            {
 480               // Next word is inserted row id
 481               idx = max + 1;
 482               while ( Char.IsWhiteSpace( tag[idx] ) && ++idx < tag.Length ) ;
 483               max = idx;
 484               while ( !Char.IsWhiteSpace( tag[max] ) && ++max < tag.Length ) ;
 485               if ( max - idx > 0 && Int64.TryParse( tag.Substring( idx, max - idx ), out Int64 insertedID ) )
 486               {
 487                  this.LastInsertedID = insertedID;
 488               }
 489            }
 490
 491            // Last word is affected row count
 492            max = tag.Length - 1;
 493            while ( Char.IsWhiteSpace( tag[max] ) && --max >= 0 ) ;
 494            if ( max > 0 )
 495            {
 496               idx = max;
 497               ++max;
 498               while ( !Char.IsWhiteSpace( tag[idx] ) && --idx >= 0 ) ;
 499               ++idx;
 500               if ( max - idx > 0 && Int32.TryParse( tag.Substring( idx, max - idx ), out Int32 affectedRows ) )
 501               {
 502                  this.AffectedRows = affectedRows;
 503               }
 504
 505               // First integer word marks actual command id
 506               if ( this.AffectedRows.HasValue )
 507               {
 508                  // See if previous word is number (happens only in insert)
 509                  --idx;
 510                  Char c;
 511                  while ( ( Char.IsDigit( ( c = tag[idx] ) ) || c == '-' || c == '+' || Char.IsWhiteSpace( c ) ) && --id
 512                  if ( idx >= 0 )
 513                  {
 514                     actualTag = tag.Substring( 0, idx + 1 );
 515                  }
 516               }
 517            }
 518
 519         }
 520
 521         this.CommandTag = actualTag ?? tag;
 522
 523      }
 524
 525      internal String CommandTag { get; }
 526
 527      internal String FullCommandTag { get; }
 528
 529      internal Int32? AffectedRows { get; }
 530
 531      internal Int64? LastInsertedID { get; }
 532   }
 533
 534   internal sealed class NotificationMessage : BackendMessageObject
 535   {
 536      private readonly NotificationEventArgs _args;
 537
 538      public NotificationMessage( Byte[] array, Encoding encoding )
 539         : base( BackendMessageCode.NotificationResponse )
 540      {
 541         var idx = 0;
 542         this._args = new NotificationEventArgs(
 543            array.ReadPgInt32( ref idx ),
 544            array.ReadZeroTerminatedStringFromBytes( ref idx, encoding ),
 545            array.ReadZeroTerminatedStringFromBytes( ref idx, encoding )
 546            );
 547      }
 548
 549      internal NotificationEventArgs Args
 550      {
 551         get
 552         {
 553            return this._args;
 554         }
 555      }
 556   }
 557
 558   internal sealed class CopyInOrOutMessage : BackendMessageObject
 559   {
 560
 561      public CopyInOrOutMessage( Byte[] array, Boolean isIn )
 562         : base( isIn ? BackendMessageCode.CopyInResponse : BackendMessageCode.CopyOutResponse )
 563      {
 564         var idx = 0;
 565         this.CopyFormat = (DataFormat) array.ReadByteFromBytes( ref idx );
 566         var arraySize = array.ReadPgInt16Count( ref idx );
 567         var formats = new Int16[Math.Max( 0, arraySize )];
 568         for ( var i = 0; i < arraySize; ++i )
 569         {
 570            formats[i] = array.ReadPgInt16( ref idx );
 571         }
 572         this.FieldFormats = formats;
 573      }
 574
 575      internal DataFormat CopyFormat { get; }
 576
 577      internal Int16[] FieldFormats { get; }
 578   }
 579
 580   internal sealed class CopyDataMessage : BackendMessageObject
 581   {
 582      public CopyDataMessage( Int32 messageLength )
 583         : base( BackendMessageCode.CopyData )
 584      {
 585         this.DataSize = messageLength - 4;
 586      }
 587
 588      public Int32 DataSize { get; }
 589   }
 590
 591   internal sealed class MessageWithNoContents : BackendMessageObject
 592   {
 593      public static MessageWithNoContents PARSE_COMPLETE = new MessageWithNoContents( BackendMessageCode.ParseComplete )
 594      public static MessageWithNoContents BIND_COMPLETE = new MessageWithNoContents( BackendMessageCode.BindComplete );
 595      public static MessageWithNoContents CLOSE_COMPLETE = new MessageWithNoContents( BackendMessageCode.CloseComplete )
 596      public static MessageWithNoContents EMPTY_QUERY = new MessageWithNoContents( BackendMessageCode.EmptyQueryResponse
 597      public static MessageWithNoContents NO_DATA = new MessageWithNoContents( BackendMessageCode.NoData );
 598      public static MessageWithNoContents COPY_DONE = new MessageWithNoContents( BackendMessageCode.CopyDone );
 599
 600      private MessageWithNoContents( BackendMessageCode code )
 601         : base( code )
 602      {
 603
 604      }
 605   }
 606
 607
 608   internal enum BackendMessageCode : byte
 609   {
 610      ParseComplete = (Byte) '1',
 611      BindComplete = (Byte) '2',
 612      CloseComplete = (Byte) '3',
 613
 614      NotificationResponse = (Byte) 'A',
 615      CommandComplete = (Byte) 'C',
 616      DataRow = (Byte) 'D',
 617      ErrorResponse = (Byte) 'E',
 618      CopyInResponse = (Byte) 'G',
 619      CopyOutResponse = (Byte) 'H',
 620      EmptyQueryResponse = (Byte) 'I',
 621      BackendKeyData = (Byte) 'K',
 622      NoticeResponse = (Byte) 'N',
 623      AuthenticationRequest = (Byte) 'R',
 624      ParameterStatus = (Byte) 'S',
 625      RowDescription = (Byte) 'T',
 626      FunctionCallResponse = (Byte) 'V',
 627      ReadyForQuery = (Byte) 'Z',
 628
 629      CopyDone = (Byte) 'c',
 630      CopyData = (Byte) 'd',
 631      NoData = (Byte) 'n',
 632      PortalSuspended = (Byte) 's', // We should never get this message, as we always specify to fetch all rows in Execu
 633      ParameterDescription = (Byte) 't',
 634   }
 635}
 636
 637public static partial class E_CBAM
 638{
 639   internal static Int16 ReadPgInt16( this Byte[] array, ref Int32 index )
 640   {
 641      return array.ReadInt16BEFromBytes( ref index );
 642   }
 643
 644   internal static Int32 ReadPgInt16Count( this Byte[] array, ref Int32 index )
 645   {
 646      return array.ReadUInt16BEFromBytes( ref index );
 647   }
 648}