Summary

Class:AsyncEnumeration.Implementation.Enumerable.SequentialEnumeratorCurrentInfoWithInt32
Assembly:AsyncEnumeration.Implementation.Enumerable
File(s):/repo-dir/contents/Source/Code/AsyncEnumeration.Implementation.Enumerable/Enumerable.cs
Covered lines:4
Uncovered lines:0
Coverable lines:4
Total lines:347
Line coverage:100%
Tag:7d9974899246b95481b7aa9cd3a1462ae2a67c91

Coverage History

Metrics

MethodCyclomatic complexity NPath complexity Sequence coverage Branch coverage
.ctor(...)101%0%

File(s)

/repo-dir/contents/Source/Code/AsyncEnumeration.Implementation.Enumerable/Enumerable.cs

#LineLine coverage
 1/*
 2 * Copyright 2017 Stanislav Muhametsin. All rights Reserved.
 3 *
 4 * Licensed  under the  Apache License,  Version 2.0  (the "License");
 5 * you may not use  this file  except in  compliance with the License.
 6 * You may obtain a copy of the License at
 7 *
 8 *   http://www.apache.org/licenses/LICENSE-2.0
 9 *
 10 * Unless required by applicable law or agreed to in writing, software
 11 * distributed  under the  License is distributed on an "AS IS" BASIS,
 12 * WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or
 13 * implied.
 14 *
 15 * See the License for the specific language governing permissions and
 16 * limitations under the License.
 17 */
 18using AsyncEnumeration.Abstractions;
 19using System;
 20using System.Collections.Generic;
 21using System.Threading;
 22using System.Threading.Tasks;
 23using UtilPack;
 24
 25namespace AsyncEnumeration.Implementation.Enumerable
 26{
 27   internal sealed class AsyncSequentialOnlyEnumerable<T> : IAsyncEnumerable<T>
 28   {
 29      private readonly Func<SequentialEnumerationStartInfo<T>> _enumerationStart;
 30
 31      public AsyncSequentialOnlyEnumerable(
 32         Func<SequentialEnumerationStartInfo<T>> enumerationStart,
 33         IAsyncProvider asyncProvider
 34         )
 35      {
 36         this._enumerationStart = ArgumentValidator.ValidateNotNull( nameof( enumerationStart ), enumerationStart );
 37         this.AsyncProvider = ArgumentValidator.ValidateNotNull( nameof( asyncProvider ), asyncProvider );
 38      }
 39
 40      public IAsyncEnumerator<T> GetAsyncEnumerator()
 41      {
 42         var startInfo = this._enumerationStart();
 43         return AsyncEnumerationFactory.CreateSequentialEnumerator( startInfo.MoveNext, startInfo.Dispose );
 44      }
 45
 46      public IAsyncProvider AsyncProvider { get; }
 47   }
 48
 49   internal abstract class AbstractAsyncEnumerator<T> : IAsyncEnumerator<T>
 50   {
 51      private const Int32 STATE_INITIAL = 0;
 52      private const Int32 MOVE_NEXT_STARTED = 1;
 53      private const Int32 MOVE_NEXT_ENDED = 2;
 54      private const Int32 STATE_ENDED = 3;
 55      private const Int32 DISPOSING = 4;
 56      private const Int32 DISPOSED = 5;
 57
 58      private const Int32 MOVE_NEXT_STARTED_CURRENT_NOT_READ = 6;
 59      private const Int32 MOVE_NEXT_STARTED_CURRENT_READING = 7;
 60
 61      private Int32 _state;
 62      private readonly SequentialEnumeratorCurrentInfo<T> _current;
 63
 64      public AbstractAsyncEnumerator(
 65         SequentialEnumeratorCurrentInfo<T> currentInfo
 66         )
 67      {
 68         this._state = STATE_INITIAL;
 69         this._current = ArgumentValidator.ValidateNotNull( nameof( currentInfo ), currentInfo );
 70      }
 71
 72      //public Boolean IsConcurrentEnumerationSupported => false;
 73
 74      public async Task<Boolean> WaitForNextAsync()
 75      {
 76         // We can call move next only in initial state, or after we have called it once
 77         var success = false;
 78         //Int32 prevState;
 79         if (
 80            /*( prevState = */Interlocked.CompareExchange( ref this._state, MOVE_NEXT_STARTED, MOVE_NEXT_ENDED )/* ) */=
 81            || /*( prevState = */Interlocked.CompareExchange( ref this._state, MOVE_NEXT_STARTED, MOVE_NEXT_STARTED_CURR
 82            || /*( prevState = */Interlocked.CompareExchange( ref this._state, MOVE_NEXT_STARTED, STATE_INITIAL ) /* ) *
 83            )
 84         {
 85            T current = default;
 86            try
 87            {
 88
 89               var moveNext = this._current.MoveNext;
 90               if ( moveNext == null )
 91               {
 92                  success = false;
 93               }
 94               else
 95               {
 96                  (success, current) = await moveNext();
 97                  if ( success )
 98                  {
 99                     this._current.Current = current;
 100                  }
 101               }
 102            }
 103            finally
 104            {
 105               Interlocked.Exchange( ref this._state, success ? MOVE_NEXT_STARTED_CURRENT_NOT_READ : STATE_ENDED );
 106            }
 107         }
 108         else
 109         {
 110            // Re-entrancy or concurrent with Reset -> exception
 111            // TODO -> Maybe use await + Interlocked.CompareExchange-loop to wait... ? Waiting is always prone to deadlo
 112            throw new InvalidOperationException( "Tried to concurrently move to next or reset." );
 113         }
 114
 115         return success;
 116      }
 117
 118      public T TryGetNext( out Boolean success )
 119      {
 120         var cur = this._current;
 121         success = Interlocked.CompareExchange( ref this._state, MOVE_NEXT_STARTED_CURRENT_READING, MOVE_NEXT_STARTED_CU
 122         if ( success )
 123         {
 124            try
 125            {
 126               return cur.Current;
 127            }
 128            finally
 129            {
 130               Interlocked.Exchange( ref this._state, MOVE_NEXT_ENDED );
 131            }
 132         }
 133         else
 134         {
 135            return default;
 136         }
 137
 138      }
 139
 140      public virtual async Task DisposeAsync()
 141      {
 142         // We can dispose from STATE_INITIAL, MOVE_NEXT_ENDED, STATE_ENDED, and MOVE_NEXT_STARTED_CURRENT_NOT_READ stat
 143         Int32 prevState;
 144         if (
 145            ( prevState = Interlocked.CompareExchange( ref this._state, DISPOSING, STATE_ENDED ) ) == STATE_ENDED
 146            || ( prevState = Interlocked.CompareExchange( ref this._state, DISPOSING, MOVE_NEXT_ENDED ) ) == MOVE_NEXT_E
 147            || ( prevState = Interlocked.CompareExchange( ref this._state, DISPOSING, STATE_INITIAL ) ) == STATE_INITIAL
 148            || ( prevState = Interlocked.CompareExchange( ref this._state, DISPOSING, MOVE_NEXT_STARTED_CURRENT_NOT_READ
 149            )
 150         {
 151            try
 152            {
 153               var task = this._current.Dispose?.Invoke();
 154               if ( task != null )
 155               {
 156                  await task;
 157               }
 158            }
 159            finally
 160            {
 161               Interlocked.Exchange( ref this._state, DISPOSED );
 162            }
 163         }
 164         else
 165         {
 166            throw prevState == DISPOSED ?
 167               new ObjectDisposedException( this.GetType().FullName ) :
 168               new InvalidOperationException( "Enumerator can not be disposed at this stage." );
 169         }
 170      }
 171
 172   }
 173
 174   internal sealed class AsyncEnumerator<T> : AbstractAsyncEnumerator<T>
 175   {
 176      public AsyncEnumerator(
 177         SequentialEnumeratorCurrentInfo<T> currentInfo
 178         ) : base( currentInfo )
 179      {
 180      }
 181   }
 182
 183   internal sealed class AsyncEnumerableExclusive<T> : IAsyncEnumerable<T>
 184   {
 185      private sealed class Enumerator : AbstractAsyncEnumerator<T>
 186      {
 187         private readonly Action _reset;
 188
 189         public Enumerator(
 190            SequentialEnumeratorCurrentInfo<T> currentInfo,
 191            Action reset
 192            ) : base( currentInfo )
 193         {
 194            this._reset = reset;
 195         }
 196
 197         public override async Task DisposeAsync()
 198         {
 199            try
 200            {
 201               await base.DisposeAsync();
 202            }
 203            finally
 204            {
 205               this._reset();
 206            }
 207         }
 208      }
 209
 210      private const Int32 STATE_AVAILABLE = 0;
 211      private const Int32 STATE_RESERVED = 1;
 212
 213      private Int32 _state;
 214      private readonly SequentialEnumeratorCurrentInfo<T> _currentInfo;
 215      private readonly Action _reset;
 216
 217      public AsyncEnumerableExclusive(
 218         SequentialEnumeratorCurrentInfo<T> currentInfo,
 219         IAsyncProvider asyncProvider
 220         )
 221      {
 222         this.AsyncProvider = ArgumentValidator.ValidateNotNull( nameof( asyncProvider ), asyncProvider );
 223         this._currentInfo = currentInfo;
 224         this._reset = () =>
 225         {
 226            Interlocked.Exchange( ref this._state, STATE_AVAILABLE );
 227         };
 228      }
 229
 230      public IAsyncProvider AsyncProvider { get; }
 231
 232      public IAsyncEnumerator<T> GetAsyncEnumerator()
 233      {
 234         return Interlocked.CompareExchange( ref this._state, STATE_RESERVED, STATE_AVAILABLE ) == STATE_AVAILABLE ?
 235            new Enumerator( this._currentInfo, this._reset ) :
 236            throw new InvalidOperationException( "Concurrent enumeration" );
 237      }
 238   }
 239
 240   internal abstract class SequentialEnumeratorCurrentInfo<T>
 241   {
 242      public SequentialEnumeratorCurrentInfo(
 243         MoveNextAsyncDelegate<T> moveNext,
 244         EnumerationEndedDelegate disposeDelegate
 245      )
 246      {
 247         this.MoveNext = moveNext;
 248         this.Dispose = disposeDelegate;
 249      }
 250
 251      public MoveNextAsyncDelegate<T> MoveNext { get; }
 252      public EnumerationEndedDelegate Dispose { get; }
 253
 254      public abstract T Current { get; set; }
 255   }
 256
 257   internal sealed class SequentialEnumeratorCurrentInfoWithObject<T> : SequentialEnumeratorCurrentInfo<T>
 258   {
 259      private Object _current;
 260
 261      public SequentialEnumeratorCurrentInfoWithObject(
 262         MoveNextAsyncDelegate<T> moveNext,
 263         EnumerationEndedDelegate disposeDelegate
 264         ) : base( moveNext, disposeDelegate )
 265      {
 266      }
 267
 268      public override T Current
 269      {
 270         get => (T) this._current;
 271         set => Interlocked.Exchange( ref this._current, value );
 272      }
 273
 274   }
 275
 276   internal sealed class SequentialEnumeratorCurrentInfoWithInt32 : SequentialEnumeratorCurrentInfo<Int32>
 277   {
 278      private Int32 _current;
 279
 280      public SequentialEnumeratorCurrentInfoWithInt32(
 281         MoveNextAsyncDelegate<Int32> moveNext,
 282         EnumerationEndedDelegate disposeDelegate
 7283         ) : base( moveNext, disposeDelegate )
 284      {
 7285      }
 286
 287      public override Int32 Current
 288      {
 42289         get => this._current;
 42290         set => Interlocked.Exchange( ref this._current, value );
 291      }
 292   }
 293
 294   internal sealed class SequentialEnumeratorCurrentInfoWithInt64 : SequentialEnumeratorCurrentInfo<Int64>
 295   {
 296      private Int64 _current;
 297
 298      public SequentialEnumeratorCurrentInfoWithInt64(
 299         MoveNextAsyncDelegate<Int64> moveNext,
 300         EnumerationEndedDelegate disposeDelegate
 301         ) : base( moveNext, disposeDelegate )
 302      {
 303      }
 304
 305      public override Int64 Current
 306      {
 307         get => Interlocked.Read( ref this._current );
 308         set => Interlocked.Exchange( ref this._current, value );
 309      }
 310   }
 311
 312   internal sealed class SequentialEnumeratorCurrentInfoWithFloat32 : SequentialEnumeratorCurrentInfo<Single>
 313   {
 314      private Single _current;
 315
 316      public SequentialEnumeratorCurrentInfoWithFloat32(
 317         MoveNextAsyncDelegate<Single> moveNext,
 318         EnumerationEndedDelegate disposeDelegate
 319         ) : base( moveNext, disposeDelegate )
 320      {
 321      }
 322
 323      public override Single Current
 324      {
 325         get => this._current;
 326         set => Interlocked.Exchange( ref this._current, value );
 327      }
 328   }
 329
 330   internal sealed class SequentialEnumeratorCurrentInfoWithFloat64 : SequentialEnumeratorCurrentInfo<Double>
 331   {
 332      private Int64 _current;
 333
 334      public SequentialEnumeratorCurrentInfoWithFloat64(
 335         MoveNextAsyncDelegate<Double> moveNext,
 336         EnumerationEndedDelegate disposeDelegate
 337         ) : base( moveNext, disposeDelegate )
 338      {
 339      }
 340
 341      public override Double Current
 342      {
 343         get => BitConverter.Int64BitsToDouble( Interlocked.Read( ref this._current ) );
 344         set => Interlocked.Exchange( ref this._current, BitConverter.DoubleToInt64Bits( value ) );
 345      }
 346   }
 347}