Summary

Class:CBAM.HTTP.Implementation.ReadState
Assembly:CBAM.HTTP.Implementation
File(s):/repo-dir/contents/Source/Code/CBAM.HTTP.Implementation/Connection.cs
Covered lines:4
Uncovered lines:0
Coverable lines:4
Total lines:521
Line coverage:100%

Coverage History

Metrics

MethodCyclomatic complexity NPath complexity Sequence coverage Branch coverage
.ctor()101%0%

File(s)

/repo-dir/contents/Source/Code/CBAM.HTTP.Implementation/Connection.cs

#LineLine coverage
 1/*
 2 * Copyright 2017 Stanislav Muhametsin. All rights Reserved.
 3 *
 4 * Licensed  under the  Apache License,  Version 2.0  (the "License");
 5 * you may not use  this file  except in  compliance with the License.
 6 * You may obtain a copy of the License at
 7 *
 8 *   http://www.apache.org/licenses/LICENSE-2.0
 9 *
 10 * Unless required by applicable law or agreed to in writing, software
 11 * distributed  under the  License is distributed on an "AS IS" BASIS,
 12 * WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or
 13 * implied.
 14 *
 15 * See the License for the specific language governing permissions and
 16 * limitations under the License.
 17 */
 18using CBAM.Abstractions.Implementation;
 19using CBAM.HTTP;
 20using CBAM.HTTP.Implementation;
 21using System;
 22using System.Collections.Generic;
 23using System.IO;
 24using System.Linq;
 25using System.Text;
 26using System.Threading;
 27using System.Threading.Tasks;
 28using UtilPack;
 29
 30namespace CBAM.HTTP.Implementation
 31{
 32   internal sealed class HTTPConnectionImpl<TRequestMetaData> : ConnectionImpl<HTTPStatement<TRequestMetaData>, HTTPStat
 33   {
 34      public HTTPConnectionImpl(
 35         HTTPConnectionFunctionalityImpl<TRequestMetaData> functionality
 36         ) : base( functionality )
 37      {
 38      }
 39
 40      public String ProtocolVersion => HTTPFactory.VERSION_HTTP1_1;
 41   }
 42
 43   internal sealed class HTTPConnectionFunctionalityImpl<TRequestMetaData> : ConnectionFunctionalitySU<HTTPStatement<TRe
 44   {
 45      private readonly ClientProtocolIOState _state;
 46
 47      public HTTPConnectionFunctionalityImpl(
 48         HTTPConnectionVendorImpl<TRequestMetaData> vendor,
 49         ClientProtocolIOState state
 50         ) : base( vendor, AsyncEnumeration.Implementation.Provider.DefaultAsyncProvider.Instance )
 51      {
 52         this._state = ArgumentValidator.ValidateNotNull( nameof( state ), state );
 53      }
 54
 55
 56      public Stream Stream => this._state.Stream;
 57
 58      protected override ReservedForStatement CreateReservationObject( HTTPStatementInformation<TRequestMetaData> stmt )
 59      {
 60         return new ReservedForStatement(
 61#if DEBUG
 62            stmt
 63#endif
 64            );
 65      }
 66
 67      protected override HTTPStatementInformation<TRequestMetaData> GetInformationFromStatement( HTTPStatement<TRequestM
 68      {
 69         return statement?.Information;
 70      }
 71
 72      protected override Task PerformDisposeStatementAsync( ReservedForStatement reservationObject )
 73      {
 74         // Nothing to do as HTTP is stateless protocol
 75         return TaskUtils.CompletedTask;
 76      }
 77
 78      protected override void ValidateStatementOrThrow( HTTPStatementInformation<TRequestMetaData> statement )
 79      {
 80         ArgumentValidator.ValidateNotNull( nameof( statement ), statement );
 81      }
 82
 83      protected override async ValueTask<(HTTPResponseInfo<TRequestMetaData>, Boolean, Func<ValueTask<(Boolean, HTTPResp
 84         HTTPStatementInformation<TRequestMetaData> stmt,
 85         ReservedForStatement reservationObject
 86         )
 87      {
 88         var stmtImpl = (HTTPStatementInformationImpl<TRequestMetaData>) stmt;
 89         var generator = stmtImpl.NextRequestGenerator;
 90         var currentMD = stmtImpl.InitialRequestMetaData;
 91         HTTPResponse currentResponse = default;
 92         async ValueTask<(Boolean, HTTPResponseInfo<TRequestMetaData>)> ReadNextResponse()
 93         {
 94            var requestInfo = await ( generator?.Invoke( new HTTPResponseInfo<TRequestMetaData>( currentResponse, curren
 95            currentMD = requestInfo.RequestMetaData;
 96            // Call this always, as it will take care of reading the previous response content till the end.
 97            currentResponse = await this.SendAndReceive( currentResponse, requestInfo.Request );
 98            return (currentResponse != default, currentResponse == default ? default : new HTTPResponseInfo<TRequestMeta
 99         }
 100
 101         // Send request
 102         return (
 103            new HTTPResponseInfo<TRequestMetaData>( currentResponse = await this.SendAndReceive( default, stmtImpl.Initi
 104            true,
 105            ReadNextResponse
 106            );
 107      }
 108
 109      private async Task<HTTPResponse> SendAndReceive(
 110         HTTPResponse prevResponse,
 111         HTTPRequest request
 112         )
 113      {
 114         var state = this._state;
 115         var buffer = state.ReadState.Buffer;
 116
 117         HTTPResponseContent prevResponseContent;
 118         if ( ( prevResponseContent = prevResponse?.Content ) != null )
 119         {
 120            while ( ( await prevResponseContent.ReadToBuffer( buffer.Array, 0, buffer.CurrentMaxCapacity ) ) > 0 )
 121               ;
 122         }
 123
 124         HTTPResponse retVal;
 125         if ( request != null )
 126         {
 127            var requestMethod = await this._state.SendRequest(
 128               request,
 129               this.CurrentCancellationToken
 130               );
 131            retVal = await this._state.ReceiveResponse(
 132               requestMethod,
 133               this.CurrentCancellationToken
 134               );
 135         }
 136         else
 137         {
 138            retVal = null;
 139         }
 140
 141         return retVal;
 142      }
 143
 144   }
 145
 146   internal abstract class AbstractIOState
 147   {
 148
 149      public AbstractIOState()
 150      {
 151         //this.Lock = new AsyncLock();
 152         this.Buffer = new ResizableArray<Byte>( 0x100 );
 153      }
 154
 155      public ResizableArray<Byte> Buffer { get; }
 156
 157      //public AsyncLock Lock { get; }
 158   }
 159
 160   internal sealed class WriteState : AbstractIOState
 161   {
 162      public WriteState(
 163         ) : base()
 164      {
 165      }
 166   }
 167
 168   internal sealed class ReadState : AbstractIOState
 169   {
 170      public ReadState(
 2171         ) : base()
 172      {
 2173         this.BufferAdvanceState = new BufferAdvanceState();
 2174      }
 175
 2176      public BufferAdvanceState BufferAdvanceState { get; }
 177   }
 178
 179   internal sealed class ClientProtocolIOState
 180   {
 181
 182      public ClientProtocolIOState(
 183         Stream stream,
 184         BinaryStringPool stringPool,
 185         IEncodingInfo encoding,
 186         WriteState writeState,
 187         ReadState readState
 188         )
 189      {
 190         this.Stream = ArgumentValidator.ValidateNotNull( nameof( stream ), stream );
 191         this.StringPool = stringPool ?? BinaryStringPoolFactory.NewNotConcurrentBinaryStringPool( encoding.Encoding );
 192         this.Encoding = ArgumentValidator.ValidateNotNull( nameof( encoding ), encoding );
 193         this.WriteState = writeState ?? new WriteState();
 194         this.ReadState = readState ?? new ReadState();
 195      }
 196
 197      public WriteState WriteState { get; }
 198
 199      public ReadState ReadState { get; }
 200
 201      public Stream Stream { get; }
 202
 203      public BinaryStringPool StringPool { get; }
 204
 205      public IEncodingInfo Encoding { get; }
 206   }
 207
 208   internal sealed class HTTPConnectionVendorImpl<TRequestMetaData> : HTTPConnectionVendorFunctionality<TRequestMetaData
 209   {
 210      internal static HTTPConnectionVendorImpl<TRequestMetaData> Instance { get; } = new HTTPConnectionVendorImpl<TReque
 211
 212      private HTTPConnectionVendorImpl()
 213      {
 214
 215      }
 216
 217      public HTTPStatement<TRequestMetaData> CreateStatementBuilder( HTTPRequestInfo<TRequestMetaData> creationArgs )
 218      {
 219         return new HTTPStatementImpl<TRequestMetaData>( creationArgs );
 220      }
 221   }
 222
 223   internal sealed class HTTPWriterImpl : HTTPWriter
 224   {
 225      private readonly ResizableArray<Byte> _buffer;
 226      private readonly Stream _stream;
 227      private readonly CancellationToken _token;
 228
 229      public HTTPWriterImpl( ResizableArray<Byte> buffer, Stream stream, CancellationToken token )
 230      {
 231         this._buffer = ArgumentValidator.ValidateNotNull( nameof( buffer ), buffer );
 232         this._stream = ArgumentValidator.ValidateNotNull( nameof( stream ), stream );
 233         this._token = token;
 234      }
 235
 236      public Byte[] Buffer => this._buffer.Array;
 237
 238      public async ValueTask<Int64> FlushBufferContents( Int32 offset, Int32 count )
 239      {
 240         await this._stream.WriteAsync( this.Buffer, offset, count, this._token );
 241         return count - offset;
 242      }
 243   }
 244
 245}
 246
 247/// <summary>
 248/// This class contains extension methods for types defined in this assembly.
 249/// </summary>
 250public static partial class E_CBAM
 251{
 252   private const Byte CR = 0x0D; // \r
 253   private const Byte LF = 0x0A; // \n
 254   private const Byte SPACE = 0x20;
 255   private const Byte COLON = 0x3A;
 256
 257   private const String CRLF = "\r\n";
 258   private static readonly Byte[] CRLF_BYTES = new[] { (Byte) '\r', (Byte) '\n' };
 259   private const String SPACE_STR = " ";
 260   private const String COLON_STR = ":";
 261
 262   internal static Task WriteHTTPString( this Stream stream, ResizableArray<Byte> buffer, Encoding encoding, String str,
 263   {
 264      Task retVal;
 265      if ( !String.IsNullOrEmpty( str ) )
 266      {
 267         var strByteCount = encoding.GetByteCount( str );
 268         var count = bufferIndex + strByteCount;
 269         var array = buffer.SetCapacityAndReturnArray( count );
 270         encoding.GetBytes( str, 0, str.Length, array, bufferIndex );
 271         retVal = stream.WriteAsync( array, 0, count, token );
 272      }
 273      else
 274      {
 275         retVal = TaskUtils.CompletedTask;
 276      }
 277
 278      return retVal;
 279   }
 280
 281   internal static async Task<String> SendRequest(
 282      this ClientProtocolIOState state,
 283      HTTPRequest request,
 284      CancellationToken token
 285      )
 286   {
 287      // null as "separator" treats "str" as URI path and query
 288      // Empty string as "separator" prevents all escaping
 289      String EscapeHTTPComponentString( String str, String separator )
 290      {
 291         if ( !String.IsNullOrEmpty( str ) )
 292         {
 293            if ( separator == null && str != "*" && !Uri.IsWellFormedUriString( str, UriKind.RelativeOrAbsolute ) )
 294            {
 295               str = new Uri( "dummy://dummy:1" + ( str[0] == '/' ? "" : "/" ) + str ).PathAndQuery;
 296            }
 297            else if ( !String.IsNullOrEmpty( separator ) && str.IndexOf( separator ) >= 0 )
 298            {
 299               // TODO extremely ineffective, but hopefully we won't be going here very often
 300               str = str.Replace( separator, new String( separator.ToCharArray().SelectMany( s => new[] { '\\', s } ).To
 301            }
 302         }
 303         return str;
 304      }
 305
 306      String method = null;
 307
 308      var stream = state.Stream;
 309      var wState = state.WriteState;
 310      var buffer = wState.Buffer;
 311      var encoding = state.Encoding.Encoding;
 312
 313      // First line - method, path, version
 314      await stream.WriteHTTPString( buffer, encoding, method = EscapeHTTPComponentString( request.Method, SPACE_STR ), t
 315      buffer.Array[0] = SPACE;
 316      var path = request.Path;
 317      if ( String.IsNullOrEmpty( path ) )
 318      {
 319         path = "/";
 320      }
 321      await stream.WriteHTTPString( buffer, encoding, EscapeHTTPComponentString( path, null ), token, 1 );
 322      await stream.WriteHTTPString( buffer, encoding, EscapeHTTPComponentString( request.Version, CRLF ), token, 1 );
 323      // CRLF will be sent as part of sending headers
 324
 325      // Headers
 326      foreach ( var hdr in request.Headers )
 327      {
 328         foreach ( var hdrValue in hdr.Value )
 329         {
 330            buffer.Array[0] = CR;
 331            buffer.Array[1] = LF;
 332            await stream.WriteHTTPString( buffer, encoding, EscapeHTTPComponentString( hdr.Key.Trim(), CRLF ), token, 2 
 333            buffer.Array[0] = COLON;
 334            await stream.WriteHTTPString( buffer, encoding, EscapeHTTPComponentString( hdrValue, CRLF ), token, 1 );
 335         }
 336      }
 337
 338      await stream.WriteHTTPString( buffer, encoding, CRLF + CRLF, token );
 339      await stream.FlushAsync( default );
 340
 341      // Body
 342      var body = request.Content;
 343      if ( body != null )
 344      {
 345         var bodySize = body.ByteCount;
 346         if ( ( bodySize ?? -1 ) != 0 )
 347         {
 348            if ( bodySize.HasValue )
 349            {
 350               buffer.CurrentMaxCapacity = buffer.MaximumSize < 0 ? (Int32) bodySize.Value : (Int32) Math.Min( bodySize.
 351            }
 352
 353            await body.WriteToStream( new HTTPWriterImpl( buffer, stream, token ), bodySize );
 354
 355            await stream.FlushAsync( default );
 356         }
 357      }
 358
 359      return method;
 360   }
 361
 362   internal static async ValueTask<HTTPResponse> ReceiveResponse(
 363      this ClientProtocolIOState state,
 364      String requestMethod,
 365      CancellationToken token
 366      )
 367   {
 368      String UnescapeHTTPComponentString( String str ) //, String separator )
 369      {
 370         //if ( !String.IsNullOrEmpty( str ) )
 371         //{
 372         //   if ( separator == null && str != "*" )
 373         //   {
 374         //      //str = new Uri( str, UriKind.RelativeOrAbsolute ).PathAndQuery;
 375         //   }
 376         //   else if ( !String.IsNullOrEmpty( separator ) &&  )
 377         //      str = new Uri( "dummy://dummy:1" + ( str[0] == '/' ? "" : "/" ) + str ).PathAndQuery;
 378         //}
 379         return str;
 380      }
 381
 382      var streamReadCount = 0x1000;
 383      var stream = state.Stream;
 384      var rState = state.ReadState;
 385      var buffer = rState.Buffer;
 386      var aState = rState.BufferAdvanceState;
 387      var strings = state.StringPool;
 388
 389      // Read first line
 390      if ( aState.BufferTotal > 0 )
 391      {
 392         HTTPUtils.EraseReadData( aState, buffer, true );
 393      }
 394      await stream.ReadUntilMaybeAsync( buffer, aState, CRLF_BYTES, streamReadCount );
 395      var array = buffer.Array;
 396      var idx = Array.IndexOf( array, SPACE );
 397      var version = UnescapeHTTPComponentString( strings.GetString( array, 0, idx ) );
 398
 399      var start = idx + 1;
 400      idx = Array.IndexOf( array, SPACE, start );
 401      var statusCode = UnescapeHTTPComponentString( strings.GetString( array, start, idx - start ) );
 402      Int32.TryParse( statusCode, out var statusCodeInt );
 403
 404      // The rest is message
 405      var statusMessage = UnescapeHTTPComponentString( strings.GetString( array, idx + 1, aState.BufferOffset - idx - 1 
 406      // Read headers - one line at a time
 407      // TODO max header count limit (how many fieldname:fieldvalue lines)
 408      var headers = HTTPFactory.CreateHeadersDictionary();
 409      HTTPUtils.EraseReadData( aState, buffer );
 410      Int32 bufferOffset;
 411      do
 412      {
 413         await stream.ReadUntilMaybeAsync( buffer, aState, CRLF_BYTES, streamReadCount );
 414         bufferOffset = aState.BufferOffset;
 415         if ( bufferOffset > 0 )
 416         {
 417            array = buffer.Array;
 418            idx = Array.IndexOf( array, COLON );
 419            if ( idx > 0 )
 420            {
 421               start = 0;
 422               // In this block, "idx" = count
 423               TrimBeginAndEnd( array, ref start, ref idx, false );
 424               if ( start < idx )
 425               {
 426                  var headerName = UnescapeHTTPComponentString( strings.GetString( array, start, idx ) );
 427                  start = idx + 1;
 428                  idx = bufferOffset - start;
 429                  TrimBeginAndEnd( array, ref start, ref idx, true );
 430                  String headerValue;
 431                  if ( idx > 0 )
 432                  {
 433                     headerValue = UnescapeHTTPComponentString( strings.GetString( array, start, idx ) );
 434                  }
 435                  else
 436                  {
 437                     headerValue = String.Empty;
 438                  }
 439                  headers
 440                     .GetOrAdd_NotThreadSafe( headerName, hn => new List<String>( 1 ) )
 441                     .Add( headerValue );
 442               }
 443            }
 444         }
 445
 446         HTTPUtils.EraseReadData( aState, buffer );
 447      } while ( bufferOffset > 0 );
 448
 449      // Now we can set the content, if it is present
 450      // https://tools.ietf.org/html/rfc7230#section-3.3
 451      var hasContent = CanHaveMessageContent( requestMethod, statusCodeInt );
 452      HTTPResponseContent responseContent;
 453      if ( hasContent )
 454      {
 455
 456         if ( headers.TryGetValue( "Content-Length", out var headerValues )
 457            && headerValues.Count > 0
 458            && Int64.TryParse( headerValues[0], out var contentLengthInt )
 459            )
 460         {
 461            responseContent = HTTPFactory.CreateResponseContentWithKnownByteCount(
 462               stream,
 463               buffer.Array,
 464               aState,
 465               contentLengthInt,
 466               token
 467               );
 468         }
 469         else if ( headers.TryGetValue( "Transfer-Encoding", out headerValues )
 470          && headerValues.Count > 0
 471          && headerValues.Any( xferEncoding => String.Equals( xferEncoding, "chunked", StringComparison.OrdinalIgnoreCas
 472          )
 473         {
 474            responseContent = await HTTPFactory.CreateResponseContentWithChunkedEncoding(
 475               stream,
 476               buffer,
 477               aState,
 478               streamReadCount,
 479               token
 480               );
 481         }
 482         else
 483         {
 484            throw new InvalidOperationException( "Response did not have content length nor recognizable transfer encodin
 485         }
 486      }
 487      else
 488      {
 489         responseContent = EmptyHTTPResponseContent.Instance;
 490      }
 491
 492      return HTTPFactory.CreateResponse( version, statusCodeInt, statusMessage, headers, responseContent );
 493   }
 494
 495
 496   private static Boolean CanHaveMessageContent( String requestMethod, Int32 statusCode )
 497   {
 498      // Request: HEAD method never has content
 499      // Response: 1XX, 204, and 304 never have content
 500      return !( String.Equals( requestMethod, "HEAD" ) || ( statusCode >= 100 && statusCode < 200 ) || statusCode == 204
 501   }
 502
 503   private static void TrimBeginAndEnd( Byte[] array, ref Int32 start, ref Int32 count, Boolean trimEnd )
 504   {
 505      // Trim begin
 506      while ( count > 0 && Char.IsWhiteSpace( (Char) array[start] ) )
 507      {
 508         ++start;
 509         --count;
 510      }
 511      if ( trimEnd )
 512      {
 513         // Trim end
 514         while ( count > 0 && Char.IsWhiteSpace( (Char) array[start + count - 1] ) )
 515         {
 516            --count;
 517         }
 518      }
 519   }
 520}
 521

Methods/Properties

.ctor()
BufferAdvanceState()