Summary

Class:CBAM.Abstractions.Implementation.ReservedForStatement
Assembly:CBAM.Abstractions.Implementation
File(s):/repo-dir/contents/Source/Code/CBAM.Abstractions.Implementation/Connection.Stream.cs
Covered lines:12
Uncovered lines:0
Coverable lines:12
Total lines:441
Line coverage:100%

Coverage History

Metrics

MethodCyclomatic complexity NPath complexity Sequence coverage Branch coverage
.ctor()101%0%

File(s)

/repo-dir/contents/Source/Code/CBAM.Abstractions.Implementation/Connection.Stream.cs

#LineLine coverage
 1/*
 2 * Copyright 2017 Stanislav Muhametsin. All rights Reserved.
 3 *
 4 * Licensed  under the  Apache License,  Version 2.0  (the "License");
 5 * you may not use  this file  except in  compliance with the License.
 6 * You may obtain a copy of the License at
 7 *
 8 *   http://www.apache.org/licenses/LICENSE-2.0
 9 *
 10 * Unless required by applicable law or agreed to in writing, software
 11 * distributed  under the  License is distributed on an "AS IS" BASIS,
 12 * WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or
 13 * implied.
 14 *
 15 * See the License for the specific language governing permissions and
 16 * limitations under the License.
 17 */
 18using AsyncEnumeration.Abstractions;
 19using AsyncEnumeration.Implementation.Enumerable;
 20using CBAM.Abstractions;
 21using CBAM.Abstractions.Implementation;
 22using System;
 23using System.Collections.Generic;
 24using System.Text;
 25using System.Threading;
 26using System.Threading.Tasks;
 27using UtilPack;
 28
 29namespace CBAM.Abstractions.Implementation
 30{
 31
 32   /// <summary>
 33   /// This interface provides a way to execute some code with checks that the connection is still useable, using <see c
 34   /// This is typically desireable when connection uses unseekable stream (SU) as its communication channel to remote e
 35   /// </summary>
 36   /// <remarks>
 37   /// These methods should really be protected methods of <see cref="ConnectionFunctionalitySU{TStatement, TStatementIn
 38   /// </remarks>
 39   public interface ConnectionFunctionalitySU
 40   {
 41      /// <summary>
 42      /// This method will make sure that connection is reserved for given statement, and if so, execute the given async
 43      /// </summary>
 44      /// <param name="reservedState">The <see cref="ReservedForStatement"/> object identifying the statement the connec
 45      /// <param name="action">The asynchronous callback to execute, if the connection is reserved to statement represen
 46      /// <param name="throwIfBusy">If set to <c>true</c>, will throw an exception if currently underlying stream is res
 47      /// <returns>A task which will complete when <paramref name="action"/> is completed and connection reservation sta
 48      /// <exception cref="InvalidOperationException">If <paramref name="throwIfBusy"/> is <c>true</c>, and if this conn
 49      Task UseStreamWithinStatementAsync( ReservedForStatement reservedState, Func<Task> action, Boolean throwIfBusy );
 50
 51      /// <summary>
 52      /// This method will make sure that connection is reserved for given statement, and if so, execute the given async
 53      /// </summary>
 54      /// <typeparam name="T">The asynchronous return type of <paramref name="func"/>.</typeparam>
 55      /// <param name="reservedState">The <see cref="ReservedForStatement"/> object identifying the statement the connec
 56      /// <param name="func">The asynchronous callback to execute, if the connection is reserved to statement represente
 57      /// <returns>A task which will return result of <paramref name="func"/> and complete when <paramref name="func"/> 
 58      /// <exception cref="InvalidOperationException">If this connection is not reserved to statement represented by giv
 59      ValueTask<T> UseStreamWithinStatementAsync<T>( ReservedForStatement reservedState, Func<ValueTask<T>> func );
 60
 61      /// <summary>
 62      /// This method will make sure that connection is not reserved for any statement, and if so, execute the given asy
 63      /// </summary>
 64      /// <param name="reservedState">The <see cref="ReservedForStatement"/> object identifying the statement reservatio
 65      /// <param name="action">The asynchronous callback to execute, if the connection is not reserved for any statement
 66      /// <param name="oneTimeOnly">If <c>true</c>, the connection reservation state will be returned to original after 
 67      /// <param name="throwIfBusy">If set to <c>true</c>, will throw an exception if currently underlying stream is res
 68      /// <returns>A task which will complete when <paramref name="action"/> is completed and connection reservation sta
 69      /// <exception cref="InvalidOperationException">If <paramref name="throwIfBusy"/> is <c>true</c>, and if this conn
 70      Task UseStreamOutsideStatementAsync( ReservedForStatement reservedState, Func<Task> action, Boolean oneTimeOnly, B
 71
 72      /// <summary>
 73      /// This method will make sure that connection is not reserved for any statement, and if so, execute the given asy
 74      /// </summary>
 75      /// <typeparam name="T">The asynchronous return type of <paramref name="func"/>.</typeparam>
 76      /// <param name="reservedState">The <see cref="ReservedForStatement"/> object identifying the statement reservatio
 77      /// <param name="func">The asynchronous callback to execute, if the connection is not reserved for any statement.<
 78      /// <param name="oneTimeOnly">If <c>true</c>, the connection reservation state will be returned to original after 
 79      /// <returns>A task which will return result of <paramref name="func"/> and complete when <paramref name="func"/> 
 80      /// <exception cref="InvalidOperationException">If this connection is already reserved to another statement.</exce
 81      ValueTask<T> UseStreamOutsideStatementAsync<T>( ReservedForStatement reservedState, Func<ValueTask<T>> func, Boole
 82
 83      /// <summary>
 84      /// This method will make sure that connection is reserved to given statement, and then perform potentially asynch
 85      /// </summary>
 86      /// <param name="reservationObject">The <see cref="ReservedForStatement"/> object identifying the statement reserv
 87      /// <returns>A task which will complete when dipose operations are done, and the connection reservation state has 
 88      /// <exception cref="InvalidOperationException">If this connection is not reserved to statement represented by giv
 89      Task DisposeStatementAsync( ReservedForStatement reservationObject );
 90   }
 91
 92   /// <summary>
 93   /// This class extends <see cref="PooledConnectionFunctionality{TStatement, TStatementInformation, TStatementCreation
 94   /// In such scenarios, the whole stream is typically reserved for execution of single statement at a time.
 95   /// </summary>
 96   /// <typeparam name="TStatement">The type of statement to modify/query remote resource.</typeparam>
 97   /// <typeparam name="TStatementInformation">The read-only information about the statement.</typeparam>
 98   /// <typeparam name="TStatementCreationArgs">The type of arguments to create a new statement.</typeparam>
 99   /// <typeparam name="TEnumerableItem">The type of items enumerated by statement.</typeparam>
 100   /// <typeparam name="TVendor">The type of <see cref="ConnectionVendorFunctionality{TStatement, TStatementCreationArgs
 101   public abstract class ConnectionFunctionalitySU<TStatement, TStatementInformation, TStatementCreationArgs, TEnumerabl
 102      where TStatement : TStatementInformation
 103      where TVendor : ConnectionVendorFunctionality<TStatement, TStatementCreationArgs>
 104   {
 105      private sealed class NotInUse : ConnectionStreamUsageState
 106      {
 107         private NotInUse()
 108         {
 109
 110         }
 111
 112         public static readonly NotInUse Instance = new NotInUse();
 113      }
 114
 115
 116
 117      private ConnectionStreamUsageState _currentlyExecutingStatement;
 118
 119      /// <summary>
 120      /// Creates a new instance of <see cref="ConnectionFunctionalitySU{TStatement, TStatementInformation, TStatementCr
 121      /// </summary>
 122      /// <param name="vendor">The <see cref="ConnectionVendorFunctionality{TStatement, TStatementCreationArgs}"/>.</par
 123      /// <param name="asyncProvider">The optional custom <see cref="IAsyncProvider"/>.</param>
 124      public ConnectionFunctionalitySU(
 125         TVendor vendor,
 126         IAsyncProvider asyncProvider
 127         )
 128         : base( vendor )
 129      {
 130         this._currentlyExecutingStatement = NotInUse.Instance;
 131         this.AsyncProvider = ArgumentValidator.ValidateNotNull( nameof( asyncProvider ), asyncProvider );
 132      }
 133
 134      /// <summary>
 135      /// Gets the <see cref="IAsyncProvider"/> that is used for creating <see cref="IAsyncEnumerable{T}"/>s
 136      /// </summary>
 137      protected IAsyncProvider AsyncProvider { get; }
 138
 139      /// <summary>
 140      /// Implements <see cref="DefaultConnectionFunctionality{TStatement, TStatementInformation, TStatementCreationArgs
 141      /// The <see cref="ExecuteStatement(TStatementInformation, ReservedForStatement)"/> method will be used to perform
 142      /// </summary>
 143      /// <param name="metadata">The statement information.</param>
 144      /// <returns>The <see cref="IAsyncEnumerable{T}"/> that can sequentially enumerate items from underlying stream.</
 145      /// <seealso cref="AsyncEnumerationFactory.CreateExclusiveSequentialEnumerable{T}(SequentialEnumerationStartInfo{T
 146      protected override IAsyncEnumerable<TEnumerableItem> CreateEnumerable(
 147         TStatementInformation metadata
 148         )
 149      {
 150         ReservedForStatement reservation = null;
 151         Func<ValueTask<(Boolean, TEnumerableItem)>> moveNext = null;
 152
 153         return AsyncEnumerationFactory.CreateExclusiveSequentialEnumerable( AsyncEnumerationFactory.CreateSequentialSta
 154               async () =>
 155               {
 156                  TEnumerableItem item;
 157                  Boolean success;
 158                  if ( reservation == null && Interlocked.CompareExchange( ref reservation, this.CreateReservationObject
 159                  {
 160                     (item, success, moveNext) = await this.UseStreamOutsideStatementAsync(
 161                        reservation,
 162                        async () => await this.ExecuteStatement( metadata, reservation ),
 163                        false
 164                        );
 165                  }
 166                  else
 167                  {
 168                     if ( moveNext == null )
 169                     {
 170                        success = false;
 171                        item = default;
 172                     }
 173                     else
 174                     {
 175                        (success, item) = await this.UseStreamWithinStatementAsync( reservation, moveNext );
 176                     }
 177                  }
 178
 179                  return (success, item);
 180               },
 181               () =>
 182               {
 183                  var seenReservation = reservation;
 184
 185                  Interlocked.Exchange( ref reservation, null );
 186                  Interlocked.Exchange( ref moveNext, null );
 187
 188                  return this.DisposeStatementAsync( seenReservation );
 189               } ),
 190               this.AsyncProvider
 191               );
 192
 193      }
 194
 195      /// <summary>
 196      /// Derived classes should override this abstract method to provide custom execution logic for statement.
 197      /// </summary>
 198      /// <param name="stmt">The read-only information about statement being executed.</param>
 199      /// <param name="reservationObject">The reservation object created by <see cref="CreateReservationObject(TStatemen
 200      /// <returns>Information about the backend result and how to enumerate more results.</returns>
 201      /// <remarks>
 202      /// The connection will be marked as reserved to the statement before this method is called.
 203      /// </remarks>
 204      protected abstract ValueTask<(TEnumerableItem, Boolean, Func<ValueTask<(Boolean, TEnumerableItem)>>)> ExecuteState
 205
 206      /// <summary>
 207      /// Derived classes should override this abstract method to create custom <see cref="ReservedForStatement"/> objec
 208      /// </summary>
 209      /// <param name="stmt">The read-only information about statement being executed.</param>
 210      /// <returns>A new instance of <see cref="ReservedForStatement"/> which will be used to mark this connection as be
 211      protected abstract ReservedForStatement CreateReservationObject( TStatementInformation stmt );
 212
 213
 214      /// <inheritdoc />
 215      public async Task UseStreamOutsideStatementAsync( ReservedForStatement reservedState, Func<Task> action, Boolean o
 216      {
 217         if ( ReferenceEquals( Interlocked.CompareExchange( ref this._currentlyExecutingStatement, reservedState, NotInU
 218         {
 219            try
 220            {
 221               await this.UseStreamWithinStatementAsync( reservedState, action );
 222            }
 223            finally
 224            {
 225               if ( oneTimeOnly )
 226               {
 227                  Interlocked.Exchange( ref this._currentlyExecutingStatement, NotInUse.Instance );
 228               }
 229            }
 230         }
 231         else if ( throwIfBusy )
 232         {
 233            throw new InvalidOperationException( "The connection is currently being used by another statement." );
 234         }
 235      }
 236
 237      /// <inheritdoc />
 238      public async ValueTask<T> UseStreamOutsideStatementAsync<T>( ReservedForStatement reservedState, Func<ValueTask<T>
 239      {
 240         if ( ReferenceEquals( Interlocked.CompareExchange( ref this._currentlyExecutingStatement, reservedState, NotInU
 241         {
 242            try
 243            {
 244               return await this.UseStreamWithinStatementAsync( reservedState, func );
 245            }
 246            finally
 247            {
 248               if ( oneTimeOnly )
 249               {
 250                  Interlocked.Exchange( ref this._currentlyExecutingStatement, NotInUse.Instance );
 251               }
 252            }
 253         }
 254         else
 255         {
 256            throw new InvalidOperationException( "The connection is currently being used by another statement." );
 257         }
 258      }
 259
 260      /// <inheritdoc />
 261      public async Task UseStreamWithinStatementAsync( ReservedForStatement reservedState, Func<Task> action, Boolean th
 262      {
 263         ArgumentValidator.ValidateNotNull( nameof( reservedState ), reservedState );
 264         ConnectionStreamUsageState prevState;
 265         if ( ReferenceEquals( ( prevState = Interlocked.CompareExchange( ref this._currentlyExecutingStatement, reserve
 266            || ReferenceEquals( prevState, reservedState.UsageState ) // Re-entrance
 267            )
 268         {
 269            try
 270            {
 271               await action();
 272            }
 273            finally
 274            {
 275               Interlocked.Exchange( ref this._currentlyExecutingStatement, prevState );
 276            }
 277         }
 278         else if ( throwIfBusy )
 279         {
 280            throw new InvalidOperationException( "The stream is not reserved for this statement." );
 281         }
 282      }
 283
 284      /// <inheritdoc />
 285      public async ValueTask<T> UseStreamWithinStatementAsync<T>( ReservedForStatement reservedState, Func<ValueTask<T>>
 286      {
 287         ArgumentValidator.ValidateNotNull( nameof( reservedState ), reservedState );
 288         ConnectionStreamUsageState prevState;
 289         if ( ReferenceEquals( ( prevState = Interlocked.CompareExchange( ref this._currentlyExecutingStatement, reserve
 290            || ReferenceEquals( prevState, reservedState.UsageState ) // Re-entrance
 291            )
 292         {
 293            try
 294            {
 295               return await func();
 296            }
 297            finally
 298            {
 299               Interlocked.Exchange( ref this._currentlyExecutingStatement, prevState );
 300            }
 301         }
 302         else
 303         {
 304            throw new InvalidOperationException( "The stream is not reserved for this statement." );
 305         }
 306      }
 307
 308      /// <inheritdoc />
 309      public async Task DisposeStatementAsync( ReservedForStatement reservationObject )
 310      {
 311         try
 312         {
 313            await this.UseStreamWithinStatementAsync( reservationObject, () => this.PerformDisposeStatementAsync( reserv
 314         }
 315         finally
 316         {
 317            Interlocked.Exchange( ref this._currentlyExecutingStatement, NotInUse.Instance );
 318         }
 319      }
 320
 321      /// <summary>
 322      /// Derived classes should override this abstract method to implement custom dispose functionality when the statem
 323      /// </summary>
 324      /// <param name="reservationObject">The <see cref="ReservedForStatement"/> identifying the statement.</param>
 325      /// <returns>The task which will complete when the dispose procedure has been completed.</returns>
 326      protected abstract Task PerformDisposeStatementAsync( ReservedForStatement reservationObject );
 327
 328      /// <summary>
 329      /// This property implements <see cref="PooledConnectionFunctionality.CanBeReturnedToPool"/> by checking that this
 330      /// </summary>
 331      /// <value>Will return <c>true</c> if this connection is not reserved for any statement.</value>
 332      public override Boolean CanBeReturnedToPool => ReferenceEquals( this._currentlyExecutingStatement, NotInUse.Instan
 333
 334   }
 335
 336
 337   /// <summary>
 338   /// This is common class which is used to mark <see cref="ConnectionFunctionalitySU{TStatement, TStatementInformation
 339   /// </summary>
 340   public abstract class ConnectionStreamUsageState
 341   {
 342
 343   }
 344
 345   /// <summary>
 346   /// This class is used to mark <see cref="ConnectionFunctionalitySU{TStatement, TStatementInformation, TStatementCrea
 347   /// Whenever any actual communication with remote resource is being done, the value of <see cref="UsageState"/> prope
 348   /// </summary>
 349   public class ReservedForStatement : ConnectionStreamUsageState
 350   {
 351      /// <summary>
 352      /// Creates a new instance of <see cref="ReservedForStatement"/> and also new instance of <see cref="CurrentlyInUs
 353      /// </summary>
 96354      public ReservedForStatement(
 96355#if DEBUG
 96356         Object statement
 96357#endif
 96358         )
 359      {
 360#if DEBUG
 361         this.Statement = statement;
 362#endif
 96363         this.UsageState = new CurrentlyInUse(
 96364#if DEBUG
 96365            statement
 96366#endif
 96367            );
 98368      }
 369#if DEBUG
 370      public Object Statement { get; }
 371#endif
 372      /// <summary>
 373      /// Gets the <see cref="CurrentlyInUse"/> object used to mark <see cref="ConnectionFunctionalitySU{TStatement, TSt
 374      /// </summary>
 375      /// <value>The <see cref="CurrentlyInUse"/> object used to mark <see cref="ConnectionFunctionalitySU{TStatement, T
 778376      public CurrentlyInUse UsageState { get; }
 377   }
 378
 379   /// <summary>
 380   /// This class is used to mark <see cref="ConnectionFunctionalitySU{TStatement, TStatementInformation, TStatementCrea
 381   /// </summary>
 382   /// <remarks>
 383   /// The instances of this class can only be created by constructor of <see cref="ReservedForStatement"/> class.
 384   /// </remarks>
 385   public sealed class CurrentlyInUse : ConnectionStreamUsageState
 386   {
 387      internal CurrentlyInUse(
 388#if DEBUG
 389         Object statement
 390#endif
 391         )
 392      {
 393#if DEBUG
 394         this.Statement = statement;
 395#endif
 396      }
 397
 398#if DEBUG
 399      public Object Statement { get; }
 400#endif
 401   }
 402}
 403
 404/// <summary>
 405/// Contains extension methods for types defined in this assembly.
 406/// </summary>
 407public static partial class E_CBAM
 408{
 409   private static readonly ReservedForStatement _NoStatement = new ReservedForStatement(
 410#if DEBUG
 411         null
 412#endif
 413         );
 414
 415   /// <summary>
 416   /// This method will make sure that connection is not reserved for any statement, and if so, execute the given asynch
 417   /// </summary>
 418   /// <param name="functionality">This <see cref="ConnectionFunctionalitySU"/>.</param>
 419   /// <param name="action">The asynchronous callback to execute, if connection is not reserved for any statement.</para
 420   /// <param name="throwIfBusy">If set to <c>true</c>, will throw an exception if currently underlying stream is reserv
 421   /// <returns>A task which will complete when <paramref name="action"/> is completed and connection reservation state 
 422   /// <exception cref="NullReferenceException">If this <see cref="ConnectionFunctionalitySU"/> is <c>null</c>.</excepti
 423   /// <exception cref="InvalidOperationException">If <paramref name="throwIfBusy"/> is <c>true</c>, and if this connect
 424   public static Task UseStreamOutsideStatementAsync( this ConnectionFunctionalitySU functionality, Func<Task> action, B
 425   {
 426      return functionality.UseStreamOutsideStatementAsync( _NoStatement, action, true, throwIfBusy );
 427   }
 428
 429   /// <summary>
 430   /// This method will make sure that connection is not reserved for any statement, and if so, execute the given asynch
 431   /// </summary>
 432   /// <param name="functionality">This <see cref="ConnectionFunctionalitySU"/>.</param>
 433   /// <param name="func">The asynchronous callback to execute, if connection is not reserved for any statement.</param>
 434   /// <returns>A task which will complete when <paramref name="func"/> is completed and connection reservation state ha
 435   /// <exception cref="NullReferenceException">If this <see cref="ConnectionFunctionalitySU"/> is <c>null</c>.</excepti
 436   /// <exception cref="InvalidOperationException">If this connection is reserved to any statement.</exception>
 437   public static ValueTask<T> UseStreamOutsideStatementAsync<T>( this ConnectionFunctionalitySU functionality, Func<Valu
 438   {
 439      return functionality.UseStreamOutsideStatementAsync( _NoStatement, func, true );
 440   }
 441}

Methods/Properties

.ctor()
UsageState()