Summary

Class:AsyncEnumeration.Implementation.Provider.SelectManyAsyncEnumerator`2
Assembly:AsyncEnumeration.Implementation.Provider
File(s):/repo-dir/contents/Source/Code/AsyncEnumeration.Implementation.Provider/SelectMany.cs
Covered lines:26
Uncovered lines:0
Coverable lines:26
Total lines:389
Line coverage:100%
Branch coverage:90%
Tag:7d9974899246b95481b7aa9cd3a1462ae2a67c91

Coverage History

Metrics

MethodCyclomatic complexity NPath complexity Sequence coverage Branch coverage
.ctor(...)101%0%
WaitForNextAsync()1801%1%
TryGetNext(...)201%1%
DisposeAsync()101%0%

File(s)

/repo-dir/contents/Source/Code/AsyncEnumeration.Implementation.Provider/SelectMany.cs

#LineLine coverage
 1/*
 2 * Copyright 2018 Stanislav Muhametsin. All rights Reserved.
 3 *
 4 * Licensed  under the  Apache License,  Version 2.0  (the "License");
 5 * you may not use  this file  except in  compliance with the License.
 6 * You may obtain a copy of the License at
 7 *
 8 *   http://www.apache.org/licenses/LICENSE-2.0
 9 *
 10 * Unless required by applicable law or agreed to in writing, software
 11 * distributed  under the  License is distributed on an "AS IS" BASIS,
 12 * WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or
 13 * implied.
 14 *
 15 * See the License for the specific language governing permissions and
 16 * limitations under the License.
 17 */
 18using AsyncEnumeration.Abstractions;
 19using System;
 20using System.Collections.Generic;
 21using System.Text;
 22using System.Threading;
 23using System.Threading.Tasks;
 24using UtilPack;
 25
 26namespace AsyncEnumeration.Implementation.Provider
 27{
 28
 29   public partial class DefaultAsyncProvider
 30   {
 31      /// <summary>
 32      /// This extension method will return <see cref="IAsyncEnumerable{T}"/> which will flatten the items returned by g
 33      /// </summary>
 34      /// <typeparam name="T">The type of source items.</typeparam>
 35      /// <typeparam name="U">The type of target items.</typeparam>
 36      /// <param name="enumerable">This <see cref="IAsyncEnumerable{T}"/>.</param>
 37      /// <param name="selector">The callback to transform a single item into enumerable of items of type <typeparamref 
 38      /// <returns><see cref="IAsyncEnumerable{T}"/> which will return items as flattened asynchronous enumerable.</retu
 39      /// <exception cref="NullReferenceException">If this <see cref="IAsyncEnumerable{T}"/> is <c>null</c>.</exception>
 40      /// <exception cref="ArgumentNullException">If <paramref name="selector"/> is <c>null</c>.</exception>
 41      /// <seealso cref="System.Linq.Enumerable.SelectMany{TSource, TResult}(IEnumerable{TSource}, Func{TSource, IEnumer
 42      public IAsyncEnumerable<U> SelectMany<T, U>( IAsyncEnumerable<T> enumerable, Func<T, IEnumerable<U>> selector )
 43      {
 44         ArgumentValidator.ValidateNotNullReference( enumerable );
 45         ArgumentValidator.ValidateNotNull( nameof( selector ), selector );
 46         return FromTransformCallback( enumerable, selector, ( e, s ) => new SelectManyEnumeratorSync<T, U>( e, s ) );
 47      }
 48
 49      /// <summary>
 50      /// This extension method will return <see cref="IAsyncEnumerable{T}"/> which will asynchronously flatten the item
 51      /// </summary>
 52      /// <typeparam name="T">The type of source items.</typeparam>
 53      /// <typeparam name="U">The type of target items.</typeparam>
 54      /// <param name="enumerable">This <see cref="IAsyncEnumerable{T}"/>.</param>
 55      /// <param name="asyncSelector">The callback to transform a single item into asynchronous enumerable of items of t
 56      /// <returns><see cref="IAsyncEnumerable{T}"/> which will return items as flattened asynchronous enumerable.</retu
 57      /// <exception cref="NullReferenceException">If this <see cref="IAsyncEnumerable{T}"/> is <c>null</c>.</exception>
 58      /// <exception cref="ArgumentNullException">If <paramref name="asyncSelector"/> is <c>null</c>.</exception>
 59      /// <seealso cref="System.Linq.Enumerable.SelectMany{TSource, TResult}(IEnumerable{TSource}, Func{TSource, IEnumer
 60      public IAsyncEnumerable<U> SelectMany<T, U>( IAsyncEnumerable<T> enumerable, Func<T, IAsyncEnumerable<U>> asyncSel
 61      {
 62         ArgumentValidator.ValidateNotNullReference( enumerable );
 63         ArgumentValidator.ValidateNotNull( nameof( asyncSelector ), asyncSelector );
 64         return FromTransformCallback( enumerable, asyncSelector, ( e, s ) => new SelectManyEnumeratorAsync<T, U>( e, s 
 65      }
 66
 67      /// <summary>
 68      /// This extension method will return <see cref="IAsyncEnumerable{T}"/> which will asynchronously flatten the item
 69      /// </summary>
 70      /// <typeparam name="T">The type of source items.</typeparam>
 71      /// <typeparam name="U">The type of target items.</typeparam>
 72      /// <param name="enumerable">This <see cref="IAsyncEnumerable{T}"/>.</param>
 73      /// <param name="asyncSelector">The callback to asynchronously transform a single item into enumerable of items of
 74      /// <returns><see cref="IAsyncEnumerable{T}"/> which will return items as flattened asynchronous enumerable.</retu
 75      /// <exception cref="NullReferenceException">If this <see cref="IAsyncEnumerable{T}"/> is <c>null</c>.</exception>
 76      /// <exception cref="ArgumentNullException">If <paramref name="asyncSelector"/> is <c>null</c>.</exception>
 77      /// <seealso cref="System.Linq.Enumerable.SelectMany{TSource, TResult}(IEnumerable{TSource}, Func{TSource, IEnumer
 78      public IAsyncEnumerable<U> SelectMany<T, U>( IAsyncEnumerable<T> enumerable, Func<T, Task<IEnumerable<U>>> asyncSe
 79      {
 80         ArgumentValidator.ValidateNotNullReference( enumerable );
 81         ArgumentValidator.ValidateNotNull( nameof( asyncSelector ), asyncSelector );
 82         return FromTransformCallback( enumerable, asyncSelector, ( e, s ) => new SelectManyAsyncEnumerator<T, U>( e, s 
 83      }
 84
 85      /// <summary>
 86      /// This extension method will return <see cref="IAsyncEnumerable{T}"/> which will asynchronously flatten the item
 87      /// </summary>
 88      /// <typeparam name="T">The type of source items.</typeparam>
 89      /// <typeparam name="U">The type of target items.</typeparam>
 90      /// <param name="enumerable">This <see cref="IAsyncEnumerable{T}"/>.</param>
 91      /// <param name="asyncSelector">The callback to asynchronously transform a single item into asynchronous enumerabl
 92      /// <returns><see cref="IAsyncEnumerable{T}"/> which will return items as flattened asynchronous enumerable.</retu
 93      /// <exception cref="NullReferenceException">If this <see cref="IAsyncEnumerable{T}"/> is <c>null</c>.</exception>
 94      /// <exception cref="ArgumentNullException">If <paramref name="asyncSelector"/> is <c>null</c>.</exception>
 95      /// <seealso cref="System.Linq.Enumerable.SelectMany{TSource, TResult}(IEnumerable{TSource}, Func{TSource, IEnumer
 96      public IAsyncEnumerable<U> SelectMany<T, U>( IAsyncEnumerable<T> enumerable, Func<T, Task<IAsyncEnumerable<U>>> as
 97      {
 98         ArgumentValidator.ValidateNotNullReference( enumerable );
 99         ArgumentValidator.ValidateNotNull( nameof( asyncSelector ), asyncSelector );
 100         return FromTransformCallback( enumerable, asyncSelector, ( e, s ) => new SelectManyAsyncEnumeratorAsync<T, U>( 
 101      }
 102
 103   }
 104
 105   internal sealed class SelectManyEnumeratorSync<TSource, TResult> : IAsyncEnumerator<TResult>
 106   {
 107
 108      private readonly IAsyncEnumerator<TSource> _source;
 109      private readonly Func<TSource, IEnumerable<TResult>> _selector;
 110
 111      private IEnumerator<TResult> _current;
 112
 113      public SelectManyEnumeratorSync(
 114         IAsyncEnumerator<TSource> source,
 115          Func<TSource, IEnumerable<TResult>> selector
 116         )
 117      {
 118         this._source = ArgumentValidator.ValidateNotNull( nameof( source ), source );
 119         this._selector = ArgumentValidator.ValidateNotNull( nameof( selector ), selector );
 120      }
 121
 122      public Task<Boolean> WaitForNextAsync()
 123      {
 124         var current = this._current;
 125         current?.Dispose();
 126         this._current = null;
 127         return this._source.WaitForNextAsync();
 128      }
 129
 130      public TResult TryGetNext( out Boolean success )
 131      {
 132         var current = this._current;
 133         if ( current == null || !current.MoveNext() )
 134         {
 135            current?.Dispose();
 136            current = null;
 137            do
 138            {
 139               var item = this._source.TryGetNext( out success );
 140               if ( success )
 141               {
 142                  var enumerator = this._selector( item )?.GetEnumerator();
 143                  if ( enumerator != null )
 144                  {
 145                     if ( enumerator.MoveNext() )
 146                     {
 147                        this._current = current = enumerator;
 148                     }
 149                     else
 150                     {
 151                        enumerator.Dispose();
 152                     }
 153                  }
 154
 155               }
 156            } while ( current == null && success );
 157            success = success && current != null;
 158         }
 159         else
 160         {
 161            success = true;
 162         }
 163
 164         return current == null ? default : current.Current;
 165      }
 166
 167      public Task DisposeAsync()
 168      {
 169         return this._source.DisposeAsync();
 170      }
 171   }
 172
 173   internal sealed class SelectManyEnumeratorAsync<TSource, TResult> : IAsyncEnumerator<TResult>
 174   {
 175      private const Int32 CALL_WAIT = 0;
 176      private const Int32 CALL_TRYGET = 1;
 177      private const Int32 ENDED = 2;
 178
 179      private readonly IAsyncEnumerator<TSource> _source;
 180      private readonly Func<TSource, IAsyncEnumerable<TResult>> _selector;
 181
 182      private IAsyncEnumerator<TResult> _current;
 183      private Int32 _state;
 184
 185      public SelectManyEnumeratorAsync(
 186         IAsyncEnumerator<TSource> source,
 187         Func<TSource, IAsyncEnumerable<TResult>> selector
 188         )
 189      {
 190         this._source = ArgumentValidator.ValidateNotNull( nameof( source ), source );
 191         this._selector = ArgumentValidator.ValidateNotNull( nameof( selector ), selector );
 192      }
 193
 194      public Task<Boolean> WaitForNextAsync()
 195      {
 196         return this._state == ENDED ?
 197            TaskUtils.False :
 198            this.PerformWaitForNextAsync();
 199      }
 200
 201      public TResult TryGetNext( out Boolean success )
 202      {
 203         return this._current.TryGetNext( out success );
 204      }
 205
 206      public Task DisposeAsync()
 207      {
 208         return this._source.DisposeAsync();
 209      }
 210
 211      private async Task<Boolean> PerformWaitForNextAsync()
 212      {
 213         var current = this._current;
 214         Boolean retVal;
 215         if ( current == null || !( await current.WaitForNextAsync() ) )
 216         {
 217            if ( current != null )
 218            {
 219               await current.DisposeAsync();
 220            }
 221            current = null;
 222
 223            var state = this._state;
 224            do
 225            {
 226               if ( state == CALL_WAIT )
 227               {
 228                  retVal = await this._source.WaitForNextAsync();
 229                  if ( retVal )
 230                  {
 231                     state = CALL_TRYGET;
 232                  }
 233               }
 234               if ( state == CALL_TRYGET )
 235               {
 236                  var item = this._source.TryGetNext( out retVal );
 237                  if ( retVal )
 238                  {
 239                     current = this._selector( item )?.GetAsyncEnumerator();
 240                  }
 241                  else
 242                  {
 243                     state = CALL_WAIT;
 244                  }
 245               }
 246               else
 247               {
 248                  state = ENDED;
 249               }
 250            } while ( state == CALL_WAIT || ( state == CALL_TRYGET && current == null ) );
 251
 252            Interlocked.Exchange( ref this._state, state );
 253            Interlocked.Exchange( ref this._current, current );
 254         }
 255
 256         return this._state != ENDED;
 257      }
 258   }
 259
 260   internal sealed class SelectManyAsyncEnumerator<TSource, TResult> : IAsyncEnumerator<TResult>
 261   {
 262      private readonly IAsyncEnumerator<TSource> _source;
 263      private readonly Func<TSource, Task<IEnumerable<TResult>>> _selector;
 264      private readonly LinkedList<TResult> _list;
 265
 1266      public SelectManyAsyncEnumerator(
 1267         IAsyncEnumerator<TSource> source,
 1268         Func<TSource, Task<IEnumerable<TResult>>> selector
 1269         )
 270      {
 1271         this._source = source;
 1272         this._selector = selector;
 1273         this._list = new LinkedList<TResult>();
 1274      }
 275
 276
 277      public async Task<Boolean> WaitForNextAsync()
 278      {
 2279         var stack = this._list;
 280         // Discard any previous items
 2281         stack.Clear();
 282         // We must use the predicate in this method, since this is our only asynchronous method while enumerating
 3283         while ( stack.Count == 0 && await this._source.WaitForNextAsync() )
 284         {
 285            Boolean success;
 286            do
 287            {
 2288               var next = this._source.TryGetNext( out success );
 289               IEnumerable<TResult> items;
 2290               if ( success && ( items = await this._selector( next ) ) != null )
 291               {
 6292                  foreach ( var item in items )
 293                  {
 2294                     stack.AddLast( item );
 295                  }
 296
 297               }
 2298            } while ( success );
 299         }
 300
 2301         return stack.Count > 0;
 302
 2303      }
 304
 305      public TResult TryGetNext( out Boolean success )
 306      {
 3307         success = this._list.Count > 0;
 308         TResult retVal;
 3309         if ( success )
 310         {
 2311            retVal = this._list.First.Value;
 2312            this._list.RemoveFirst();
 2313         }
 314         else
 315         {
 1316            retVal = default;
 317         }
 3318         return retVal;
 319      }
 320
 321      public Task DisposeAsync()
 1322         => this._source.DisposeAsync();
 323
 324   }
 325
 326   internal sealed class SelectManyAsyncEnumeratorAsync<TSource, TResult> : IAsyncEnumerator<TResult>
 327   {
 328      private readonly IAsyncEnumerator<TSource> _source;
 329      private readonly Func<TSource, Task<IAsyncEnumerable<TResult>>> _selector;
 330      private readonly LinkedList<TResult> _list;
 331
 332      public SelectManyAsyncEnumeratorAsync(
 333         IAsyncEnumerator<TSource> source,
 334         Func<TSource, Task<IAsyncEnumerable<TResult>>> selector
 335         )
 336      {
 337         this._source = source;
 338         this._selector = selector;
 339         this._list = new LinkedList<TResult>();
 340      }
 341
 342
 343      public async Task<Boolean> WaitForNextAsync()
 344      {
 345         var list = this._list;
 346         // Discard any previous items
 347         list.Clear();
 348         // We must use the predicate in this method, since this is our only asynchronous method while enumerating
 349         while ( list.Count == 0 && await this._source.WaitForNextAsync() )
 350         {
 351            Boolean success;
 352            do
 353            {
 354               var next = this._source.TryGetNext( out success );
 355               IAsyncEnumerable<TResult> items;
 356               if ( success && ( items = await this._selector( next ) ) != null )
 357               {
 358                  await items.EnumerateAsync( item => list.AddLast( item ) );
 359               }
 360            } while ( success );
 361         }
 362
 363         return list.Count > 0;
 364
 365      }
 366
 367      public TResult TryGetNext( out Boolean success )
 368      {
 369         success = this._list.Count > 0;
 370         TResult retVal;
 371         if ( success )
 372         {
 373            retVal = this._list.First.Value;
 374            this._list.RemoveFirst();
 375         }
 376         else
 377         {
 378            retVal = default;
 379         }
 380         return retVal;
 381      }
 382
 383      public Task DisposeAsync()
 384         => this._source.DisposeAsync();
 385
 386   }
 387
 388}
 389