Summary

Class:CBAM.NATS.AfterPublishSentArgs
Assembly:CBAM.NATS
File(s):/repo-dir/contents/Source/Code/CBAM.NATS/Connection.cs
Covered lines:0
Uncovered lines:9
Coverable lines:9
Total lines:215
Line coverage:0%

Coverage History

Metrics

MethodCyclomatic complexity NPath complexity Sequence coverage Branch coverage
.ctor(...)100%0%

File(s)

/repo-dir/contents/Source/Code/CBAM.NATS/Connection.cs

#LineLine coverage
 1/*
 2 * Copyright 2018 Stanislav Muhametsin. All rights Reserved.
 3 *
 4 * Licensed  under the  Apache License,  Version 2.0  (the "License");
 5 * you may not use  this file  except in  compliance with the License.
 6 * You may obtain a copy of the License at
 7 *
 8 *   http://www.apache.org/licenses/LICENSE-2.0
 9 *
 10 * Unless required by applicable law or agreed to in writing, software
 11 * distributed  under the  License is distributed on an "AS IS" BASIS,
 12 * WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or
 13 * implied.
 14 *
 15 * See the License for the specific language governing permissions and
 16 * limitations under the License.
 17 */
 18using AsyncEnumeration.Abstractions;
 19using CBAM.Abstractions;
 20using CBAM.NATS;
 21using System;
 22using System.Collections.Generic;
 23using System.Linq;
 24using System.Threading;
 25using System.Threading.Tasks;
 26using UtilPack;
 27using TDataProducerResult = System.Threading.Tasks.ValueTask<System.Collections.Generic.IEnumerable<CBAM.NATS.NATSPublis
 28
 29namespace CBAM.NATS
 30{
 31   using TDataProducerFactory = Func<Func<TDataProducerResult>>;
 32
 33   public interface NATSConnection :
 34      Connection<NATSSubscribeStatement, NATSSubscribeStatementInformation, String, NATSMessage, NATSConnectionVendorFun
 35      Connection<NATSPublishStatement, NATSPublishStatementInformation, TDataProducerFactory, NATSPublishCompleted, NATS
 36      NATSConnectionObservability
 37   {
 38      Task<NATSMessage> RequestAsync( String subject, Byte[] data, Int32 offset, Int32 count );
 39   }
 40
 41   public interface NATSConnectionVendorFunctionality :
 42      ConnectionVendorFunctionality<NATSSubscribeStatement, String>,
 43      ConnectionVendorFunctionality<NATSPublishStatement, TDataProducerFactory>
 44   {
 45
 46   }
 47
 48   // TODO this interface might not be required after all - it might encourage 'bad' behaviour (i.e. the exact moment wh
 49   public interface NATSConnectionObservability
 50   {
 51      event GenericEventHandler<AfterSubscriptionSentArgs> AfterSubscriptionSent;
 52
 53      event GenericEventHandler<AfterPublishSentArgs> AfterPublishSent;
 54   }
 55
 56   public sealed class AfterSubscriptionSentArgs
 57   {
 58      public AfterSubscriptionSentArgs(
 59         String subject,
 60         String queue,
 61         Int64? autoUnsubscribeAfter
 62         )
 63      {
 64         this.Subject = ArgumentValidator.ValidateNotEmpty( nameof( subject ), subject );
 65         this.Queue = queue;
 66         this.AutoUnsubscribeAfter = autoUnsubscribeAfter;
 67      }
 68
 69      public String Subject { get; }
 70
 71      public String Queue { get; }
 72
 73      public Int64? AutoUnsubscribeAfter { get; }
 74   }
 75
 76   public sealed class AfterPublishSentArgs
 77   {
 078      public AfterPublishSentArgs(
 079         String subject,
 080         String replyTo
 081         )
 82      {
 083         this.Subject = ArgumentValidator.ValidateNotEmpty( nameof( subject ), subject );
 084         this.ReplyTo = replyTo;
 085      }
 86
 087      public String Subject { get; }
 88
 089      public String ReplyTo { get; }
 90   }
 91}
 92
 93public static partial class E_CBAM
 94{
 95   public static NATSSubscribeStatement CreateSubscribeStatementBuilder( this NATSConnectionVendorFunctionality vendorFu
 96   {
 97      return vendorFunctionality.CreateStatementBuilder( subject );
 98   }
 99
 100   public static NATSPublishStatement CreatePublishStatementBuilder( this NATSConnectionVendorFunctionality vendorFuncti
 101   {
 102      return vendorFunctionality.CreateStatementBuilder( dataProducer );
 103   }
 104
 105   public static NATSSubscribeStatement CreateSubscribeStatementBuilder( this NATSConnection connection, String subject 
 106   {
 107      return ( (Connection<NATSSubscribeStatement, NATSSubscribeStatementInformation, String, NATSMessage, NATSConnectio
 108   }
 109
 110   public static IAsyncEnumerable<NATSMessage> SubscribeAsync( this NATSConnection connection, String subject, Int64 aut
 111   {
 112      var stmt = connection.CreateSubscribeStatementBuilder( subject );
 113      if ( !String.IsNullOrEmpty( queue ) )
 114      {
 115         stmt.Queue = queue;
 116      }
 117      if ( autoUnsubscribeAfter > 0 )
 118      {
 119         stmt.AutoUnsubscribeAfter = autoUnsubscribeAfter;
 120      }
 121
 122      return connection.PrepareStatementForExecution( stmt );
 123   }
 124
 125   public static NATSPublishStatement CreatePublishStatementBuilder( this NATSConnection connection, Func<Func<TDataProd
 126   {
 127      return ( (Connection<NATSPublishStatement, NATSPublishStatementInformation, Func<Func<TDataProducerResult>>, NATSP
 128   }
 129
 130   public static Task<NATSMessage> RequestAsync( this NATSConnection connection, String subject, Byte[] data )
 131   {
 132      return connection.RequestAsync( subject, data, 0, data?.Length ?? 0 );
 133   }
 134
 135
 136   public static IAsyncEnumerable<NATSPublishCompleted> PublishWithStaticDataProducerForWholeArray( this NATSConnection 
 137   {
 138      return connection.PublishWithStaticDataProducer( subject, array, 0, array?.Length ?? 0, replySubject, repeatCount,
 139   }
 140
 141   public static IAsyncEnumerable<NATSPublishCompleted> PublishWithStaticDataProducer( this NATSConnection connection, S
 142   {
 143      var chunk = Enumerable.Repeat( new NATSPublishData( subject, array, offset, count, replySubject ), chunkCount );
 144      return connection.PrepareStatementForExecution( connection.CreatePublishStatementBuilder( () =>
 145      {
 146         var remaining = repeatCount;
 147         return () =>
 148         {
 149            if ( remaining > 0 )
 150            {
 151               var original = remaining;
 152               remaining -= chunkCount;
 153               return new TDataProducerResult( remaining >= 0 ? chunk : chunk.Take( (Int32) original ) );
 154            }
 155            else
 156            {
 157               return default;
 158            }
 159         };
 160      } ) );
 161   }
 162
 163   public static IAsyncEnumerable<NATSPublishCompleted> PublishWithDynamicSynchronousDataProducer( this NATSConnection c
 164   {
 165      var hasMax = repeatCount >= 0;
 166      return connection.PrepareStatementForExecution( connection.CreatePublishStatementBuilder( () =>
 167      {
 168         var remaining = repeatCount;
 169         return () =>
 170         {
 171            return !hasMax || Interlocked.Decrement( ref remaining ) >= 0 ? new TDataProducerResult( producer() ) : defa
 172         };
 173      } ) );
 174   }
 175
 176   public static IAsyncEnumerable<NATSPublishCompleted> PublishWithDynamicAsynchronousDataProducer( this NATSConnection 
 177   {
 178      var hasMax = repeatCount >= 0;
 179      return connection.PrepareStatementForExecution( connection.CreatePublishStatementBuilder( () =>
 180      {
 181         var remaining = repeatCount;
 182         return async () =>
 183         {
 184            return !hasMax || Interlocked.Decrement( ref remaining ) >= 0 ? await producer() : default;
 185         };
 186      } ) );
 187   }
 188
 189   public static IAsyncEnumerable<NATSPublishCompleted> PublishWithDynamicSynchronousDataProducer( this NATSConnection c
 190   {
 191      var hasMax = repeatCount >= 0;
 192      return connection.PrepareStatementForExecution( connection.CreatePublishStatementBuilder( () =>
 193      {
 194         var remaining = repeatCount;
 195         return () =>
 196         {
 197            return !hasMax || Interlocked.Decrement( ref remaining ) >= 0 ? new TDataProducerResult( producer().Singleto
 198         };
 199      } ) );
 200   }
 201
 202   public static IAsyncEnumerable<NATSPublishCompleted> PublishWithDynamicAsynchronousDataProducer( this NATSConnection 
 203   {
 204      var hasMax = repeatCount >= 0;
 205      return connection.PrepareStatementForExecution( connection.CreatePublishStatementBuilder( () =>
 206      {
 207         var remaining = repeatCount;
 208         return async () =>
 209         {
 210            return !hasMax || Interlocked.Decrement( ref remaining ) >= 0 ? ( await producer() ).Singleton() : default;
 211         };
 212      } ) );
 213   }
 214
 215}