Summary

Class:E_CBAM
Assembly:CBAM.NATS
File(s):/repo-dir/contents/Source/Code/CBAM.NATS/Connection.cs
/repo-dir/contents/Source/Code/CBAM.NATS/Message.cs
/repo-dir/contents/Source/Code/CBAM.NATS/Statement.cs
Covered lines:35
Uncovered lines:99
Coverable lines:134
Total lines:478
Line coverage:26.1%
Branch coverage:50%

Coverage History

Metrics

MethodCyclomatic complexity NPath complexity Sequence coverage Branch coverage
CreateSubscribeStatementBuilder(...)101%0%
CreatePublishStatementBuilder(...)101%0%
CreateSubscribeStatementBuilder(...)101%0%
SubscribeAsync(...)400.833%0.75%
CreatePublishStatementBuilder(...)101%0%
RequestAsync(...)201%0.5%
PublishWithStaticDataProducerForWholeArray(...)201%0.5%
PublishWithStaticDataProducer(...)101%0%
PublishWithDynamicSynchronousDataProducer(...)100%0%
PublishWithDynamicAsynchronousDataProducer(...)100%0%
PublishWithDynamicSynchronousDataProducer(...)100%0%
PublishWithDynamicAsynchronousDataProducer(...)100%0%
CreateDataArray(...)200.857%0.5%
CopyAllDataTo(...)100%0%
WithStaticDataProducerForWholeArray(...)200%0%
WithStaticDataProducer(...)100%0%
WithDynamicSynchronousDataProducer(...)100%0%
WithDynamicAsynchronousDataProducer(...)100%0%
WithDynamicSynchronousDataProducer(...)100%0%
WithDynamicAsynchronousDataProducer(...)100%0%

File(s)

/repo-dir/contents/Source/Code/CBAM.NATS/Connection.cs

#LineLine coverage
 1/*
 2 * Copyright 2018 Stanislav Muhametsin. All rights Reserved.
 3 *
 4 * Licensed  under the  Apache License,  Version 2.0  (the "License");
 5 * you may not use  this file  except in  compliance with the License.
 6 * You may obtain a copy of the License at
 7 *
 8 *   http://www.apache.org/licenses/LICENSE-2.0
 9 *
 10 * Unless required by applicable law or agreed to in writing, software
 11 * distributed  under the  License is distributed on an "AS IS" BASIS,
 12 * WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or
 13 * implied.
 14 *
 15 * See the License for the specific language governing permissions and
 16 * limitations under the License.
 17 */
 18using AsyncEnumeration.Abstractions;
 19using CBAM.Abstractions;
 20using CBAM.NATS;
 21using System;
 22using System.Collections.Generic;
 23using System.Linq;
 24using System.Threading;
 25using System.Threading.Tasks;
 26using UtilPack;
 27using TDataProducerResult = System.Threading.Tasks.ValueTask<System.Collections.Generic.IEnumerable<CBAM.NATS.NATSPublis
 28
 29namespace CBAM.NATS
 30{
 31   using TDataProducerFactory = Func<Func<TDataProducerResult>>;
 32
 33   public interface NATSConnection :
 34      Connection<NATSSubscribeStatement, NATSSubscribeStatementInformation, String, NATSMessage, NATSConnectionVendorFun
 35      Connection<NATSPublishStatement, NATSPublishStatementInformation, TDataProducerFactory, NATSPublishCompleted, NATS
 36      NATSConnectionObservability
 37   {
 38      Task<NATSMessage> RequestAsync( String subject, Byte[] data, Int32 offset, Int32 count );
 39   }
 40
 41   public interface NATSConnectionVendorFunctionality :
 42      ConnectionVendorFunctionality<NATSSubscribeStatement, String>,
 43      ConnectionVendorFunctionality<NATSPublishStatement, TDataProducerFactory>
 44   {
 45
 46   }
 47
 48   // TODO this interface might not be required after all - it might encourage 'bad' behaviour (i.e. the exact moment wh
 49   public interface NATSConnectionObservability
 50   {
 51      event GenericEventHandler<AfterSubscriptionSentArgs> AfterSubscriptionSent;
 52
 53      event GenericEventHandler<AfterPublishSentArgs> AfterPublishSent;
 54   }
 55
 56   public sealed class AfterSubscriptionSentArgs
 57   {
 58      public AfterSubscriptionSentArgs(
 59         String subject,
 60         String queue,
 61         Int64? autoUnsubscribeAfter
 62         )
 63      {
 64         this.Subject = ArgumentValidator.ValidateNotEmpty( nameof( subject ), subject );
 65         this.Queue = queue;
 66         this.AutoUnsubscribeAfter = autoUnsubscribeAfter;
 67      }
 68
 69      public String Subject { get; }
 70
 71      public String Queue { get; }
 72
 73      public Int64? AutoUnsubscribeAfter { get; }
 74   }
 75
 76   public sealed class AfterPublishSentArgs
 77   {
 78      public AfterPublishSentArgs(
 79         String subject,
 80         String replyTo
 81         )
 82      {
 83         this.Subject = ArgumentValidator.ValidateNotEmpty( nameof( subject ), subject );
 84         this.ReplyTo = replyTo;
 85      }
 86
 87      public String Subject { get; }
 88
 89      public String ReplyTo { get; }
 90   }
 91}
 92
 93public static partial class E_CBAM
 94{
 95   public static NATSSubscribeStatement CreateSubscribeStatementBuilder( this NATSConnectionVendorFunctionality vendorFu
 96   {
 297      return vendorFunctionality.CreateStatementBuilder( subject );
 98   }
 99
 100   public static NATSPublishStatement CreatePublishStatementBuilder( this NATSConnectionVendorFunctionality vendorFuncti
 101   {
 2102      return vendorFunctionality.CreateStatementBuilder( dataProducer );
 103   }
 104
 105   public static NATSSubscribeStatement CreateSubscribeStatementBuilder( this NATSConnection connection, String subject 
 106   {
 2107      return ( (Connection<NATSSubscribeStatement, NATSSubscribeStatementInformation, String, NATSMessage, NATSConnectio
 108   }
 109
 110   public static IAsyncEnumerable<NATSMessage> SubscribeAsync( this NATSConnection connection, String subject, Int64 aut
 111   {
 2112      var stmt = connection.CreateSubscribeStatementBuilder( subject );
 2113      if ( !String.IsNullOrEmpty( queue ) )
 114      {
 0115         stmt.Queue = queue;
 116      }
 2117      if ( autoUnsubscribeAfter > 0 )
 118      {
 1119         stmt.AutoUnsubscribeAfter = autoUnsubscribeAfter;
 120      }
 121
 2122      return connection.PrepareStatementForExecution( stmt );
 123   }
 124
 125   public static NATSPublishStatement CreatePublishStatementBuilder( this NATSConnection connection, Func<Func<TDataProd
 126   {
 2127      return ( (Connection<NATSPublishStatement, NATSPublishStatementInformation, Func<Func<TDataProducerResult>>, NATSP
 128   }
 129
 130   public static Task<NATSMessage> RequestAsync( this NATSConnection connection, String subject, Byte[] data )
 131   {
 1132      return connection.RequestAsync( subject, data, 0, data?.Length ?? 0 );
 133   }
 134
 135
 136   public static IAsyncEnumerable<NATSPublishCompleted> PublishWithStaticDataProducerForWholeArray( this NATSConnection 
 137   {
 2138      return connection.PublishWithStaticDataProducer( subject, array, 0, array?.Length ?? 0, replySubject, repeatCount,
 139   }
 140
 141   public static IAsyncEnumerable<NATSPublishCompleted> PublishWithStaticDataProducer( this NATSConnection connection, S
 142   {
 2143      var chunk = Enumerable.Repeat( new NATSPublishData( subject, array, offset, count, replySubject ), chunkCount );
 2144      return connection.PrepareStatementForExecution( connection.CreatePublishStatementBuilder( () =>
 2145      {
 4146         var remaining = repeatCount;
 4147         return () =>
 4148         {
 8149            if ( remaining > 0 )
 4150            {
 6151               var original = remaining;
 6152               remaining -= chunkCount;
 6153               return new TDataProducerResult( remaining >= 0 ? chunk : chunk.Take( (Int32) original ) );
 4154            }
 4155            else
 4156            {
 6157               return default;
 4158            }
 4159         };
 2160      } ) );
 161   }
 162
 163   public static IAsyncEnumerable<NATSPublishCompleted> PublishWithDynamicSynchronousDataProducer( this NATSConnection c
 164   {
 0165      var hasMax = repeatCount >= 0;
 0166      return connection.PrepareStatementForExecution( connection.CreatePublishStatementBuilder( () =>
 0167      {
 0168         var remaining = repeatCount;
 0169         return () =>
 0170         {
 0171            return !hasMax || Interlocked.Decrement( ref remaining ) >= 0 ? new TDataProducerResult( producer() ) : defa
 0172         };
 0173      } ) );
 174   }
 175
 176   public static IAsyncEnumerable<NATSPublishCompleted> PublishWithDynamicAsynchronousDataProducer( this NATSConnection 
 177   {
 0178      var hasMax = repeatCount >= 0;
 0179      return connection.PrepareStatementForExecution( connection.CreatePublishStatementBuilder( () =>
 0180      {
 0181         var remaining = repeatCount;
 0182         return async () =>
 0183         {
 0184            return !hasMax || Interlocked.Decrement( ref remaining ) >= 0 ? await producer() : default;
 0185         };
 0186      } ) );
 187   }
 188
 189   public static IAsyncEnumerable<NATSPublishCompleted> PublishWithDynamicSynchronousDataProducer( this NATSConnection c
 190   {
 0191      var hasMax = repeatCount >= 0;
 0192      return connection.PrepareStatementForExecution( connection.CreatePublishStatementBuilder( () =>
 0193      {
 0194         var remaining = repeatCount;
 0195         return () =>
 0196         {
 0197            return !hasMax || Interlocked.Decrement( ref remaining ) >= 0 ? new TDataProducerResult( producer().Singleto
 0198         };
 0199      } ) );
 200   }
 201
 202   public static IAsyncEnumerable<NATSPublishCompleted> PublishWithDynamicAsynchronousDataProducer( this NATSConnection 
 203   {
 0204      var hasMax = repeatCount >= 0;
 0205      return connection.PrepareStatementForExecution( connection.CreatePublishStatementBuilder( () =>
 0206      {
 0207         var remaining = repeatCount;
 0208         return async () =>
 0209         {
 0210            return !hasMax || Interlocked.Decrement( ref remaining ) >= 0 ? ( await producer() ).Singleton() : default;
 0211         };
 0212      } ) );
 213   }
 214
 215}

/repo-dir/contents/Source/Code/CBAM.NATS/Message.cs

#LineLine coverage
 1/*
 2 * Copyright 2018 Stanislav Muhametsin. All rights Reserved.
 3 *
 4 * Licensed  under the  Apache License,  Version 2.0  (the "License");
 5 * you may not use  this file  except in  compliance with the License.
 6 * You may obtain a copy of the License at
 7 *
 8 *   http://www.apache.org/licenses/LICENSE-2.0
 9 *
 10 * Unless required by applicable law or agreed to in writing, software
 11 * distributed  under the  License is distributed on an "AS IS" BASIS,
 12 * WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or
 13 * implied.
 14 *
 15 * See the License for the specific language governing permissions and
 16 * limitations under the License.
 17 */
 18using CBAM.NATS;
 19using System;
 20using System.Collections.Generic;
 21using System.Text;
 22using UtilPack;
 23
 24namespace CBAM.NATS
 25{
 26   public interface NATSMessage
 27   {
 28      String Subject { get; }
 29      Int64 SubscriptionID { get; }
 30
 31      String ReplyTo { get; }
 32
 33      Int32 DataLength { get; }
 34
 35      Int32 CopyDataTo( Byte[] array, Int32 offsetInMessage, Int32 offsetInArray, Int32 count = -1 );
 36
 37      Byte GetSingleByteAt( Int32 offsetInMessage );
 38   }
 39
 40   public interface NATSPublishCompleted
 41   {
 42
 43   }
 44}
 45
 46public static partial class E_CBAM
 47{
 48
 49   public static Byte[] CreateDataArray( this NATSMessage obj )
 50   {
 251      var len = obj.DataLength;
 52      Byte[] retVal;
 253      if ( len > 0 )
 54      {
 255         retVal = new Byte[obj.DataLength];
 256         obj.CopyDataTo( retVal, 0, 0 );
 257      }
 58      else
 59      {
 060         retVal = Empty<Byte>.Array;
 61      }
 62
 263      return retVal;
 64
 65   }
 66
 067   public static void CopyAllDataTo( this NATSMessage message, Byte[] array ) => message.CopyDataTo( array, 0, 0 );
 68}

/repo-dir/contents/Source/Code/CBAM.NATS/Statement.cs

#LineLine coverage
 1/*
 2 * Copyright 2018 Stanislav Muhametsin. All rights Reserved.
 3 *
 4 * Licensed  under the  Apache License,  Version 2.0  (the "License");
 5 * you may not use  this file  except in  compliance with the License.
 6 * You may obtain a copy of the License at
 7 *
 8 *   http://www.apache.org/licenses/LICENSE-2.0
 9 *
 10 * Unless required by applicable law or agreed to in writing, software
 11 * distributed  under the  License is distributed on an "AS IS" BASIS,
 12 * WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or
 13 * implied.
 14 *
 15 * See the License for the specific language governing permissions and
 16 * limitations under the License.
 17 */
 18using CBAM.NATS;
 19using System;
 20using System.Collections.Generic;
 21using System.Linq;
 22using System.Text;
 23using System.Threading;
 24using System.Threading.Tasks;
 25using UtilPack;
 26
 27using TDataProducerResult = System.Threading.Tasks.ValueTask<System.Collections.Generic.IEnumerable<CBAM.NATS.NATSPublis
 28
 29namespace CBAM.NATS
 30{
 31
 32   using TDataProducerFactory = Func<Func<TDataProducerResult>>;
 33
 34   public interface NATSStatementInformation
 35   {
 36      String Subject { get; }
 37   }
 38
 39   public interface NATSPublishStatementInformation
 40   {
 41      //String ReplySubject { get; }
 42
 43      TDataProducerFactory DataProducerFactory { get; }
 44
 45   }
 46
 47   public interface NATSSubscribeStatementInformation : NATSStatementInformation
 48   {
 49      String Queue { get; }
 50
 51      Int64 AutoUnsubscribeAfter { get; }
 52
 53      Func<NATSMessage, Boolean> DynamicUnsubscription { get; }
 54   }
 55
 56   public interface NATSStatement : NATSStatementInformation
 57   {
 58      NATSStatementInformation NATSStatementInformation { get; }
 59   }
 60
 61   public interface NATSSubscribeStatement : NATSStatement, NATSSubscribeStatementInformation
 62   {
 63      new String Queue { get; set; }
 64
 65      new Int64 AutoUnsubscribeAfter { get; set; }
 66
 67      new Func<NATSMessage, Boolean> DynamicUnsubscription { get; set; }
 68
 69   }
 70
 71   public interface NATSPublishStatement : NATSPublishStatementInformation
 72   {
 73      NATSPublishStatementInformation NATSStatementInformation { get; }
 74
 75      new TDataProducerFactory DataProducerFactory { get; set; }
 76
 77
 78   }
 79
 80   public struct NATSPublishData
 81   {
 82      public NATSPublishData(
 83         String subject,
 84         Byte[] data,
 85         Int32 offset = -1,
 86         Int32 count = -1,
 87         String replySubject = null
 88         )
 89      {
 90         this.Subject = ArgumentValidator.ValidateNotEmpty( nameof( subject ), subject );
 91         this.Data = data;
 92         this.Offset = Math.Max( 0, offset );
 93         this.Count = count < 0 ? Math.Max( 0, ( data?.Length ?? 0 ) - this.Offset ) : count;
 94         this.ReplySubject = String.IsNullOrEmpty( replySubject ) ? null : replySubject;
 95      }
 96
 97      public String Subject { get; }
 98
 99      public String ReplySubject { get; }
 100
 101      public Byte[] Data { get; }
 102      public Int32 Offset { get; }
 103      public Int32 Count { get; }
 104
 105   }
 106}
 107
 108public static partial class E_CBAM
 109{
 110   public static NATSPublishStatement WithStaticDataProducerForWholeArray( this NATSPublishStatement statement, String s
 111   {
 0112      return statement.WithStaticDataProducer( subject, array, 0, array?.Length ?? 0, replySubject, repeatCount, chunkCo
 113   }
 114
 115   public static NATSPublishStatement WithStaticDataProducer( this NATSPublishStatement statement, String subject, Byte[
 116   {
 0117      var chunk = Enumerable.Repeat( new NATSPublishData( subject, array, offset, count, replySubject ), chunkCount );
 0118      statement.DataProducerFactory = () =>
 0119      {
 0120         var remaining = repeatCount;
 0121         return () =>
 0122         {
 0123            if ( remaining > 0 )
 0124            {
 0125               var original = remaining;
 0126               remaining -= chunkCount;
 0127               return new TDataProducerResult( remaining >= 0 ? chunk : chunk.Take( (Int32) original ) );
 0128            }
 0129            else
 0130            {
 0131               return default;
 0132            }
 0133         };
 0134      };
 0135      return statement;
 136   }
 137
 138   public static NATSPublishStatement WithDynamicSynchronousDataProducer( this NATSPublishStatement statement, Func<IEnu
 139   {
 0140      var hasMax = repeatCount >= 0;
 0141      statement.DataProducerFactory = () =>
 0142      {
 0143         var remaining = repeatCount;
 0144         return () =>
 0145         {
 0146            return !hasMax || Interlocked.Decrement( ref remaining ) >= 0 ? new TDataProducerResult( producer() ) : defa
 0147         };
 0148      };
 0149      return statement;
 150   }
 151
 152   public static NATSPublishStatement WithDynamicAsynchronousDataProducer( this NATSPublishStatement statement, Func<Tas
 153   {
 0154      var hasMax = repeatCount >= 0;
 0155      statement.DataProducerFactory = () =>
 0156      {
 0157         var remaining = repeatCount;
 0158         return async () =>
 0159         {
 0160            return !hasMax || Interlocked.Decrement( ref remaining ) >= 0 ? await producer() : default;
 0161         };
 0162      };
 163
 0164      return statement;
 165   }
 166
 167   public static NATSPublishStatement WithDynamicSynchronousDataProducer( this NATSPublishStatement statement, Func<NATS
 168   {
 0169      var hasMax = repeatCount >= 0;
 0170      statement.DataProducerFactory = () =>
 0171      {
 0172         var remaining = repeatCount;
 0173         return () =>
 0174         {
 0175            return !hasMax || Interlocked.Decrement( ref remaining ) >= 0 ? new TDataProducerResult( producer().Singleto
 0176         };
 0177      };
 0178      return statement;
 179   }
 180
 181   public static NATSPublishStatement WithDynamicAsynchronousDataProducer( this NATSPublishStatement statement, Func<Tas
 182   {
 0183      var hasMax = repeatCount >= 0;
 0184      statement.DataProducerFactory = () =>
 0185      {
 0186         var remaining = repeatCount;
 0187         return async () =>
 0188         {
 0189            return !hasMax || Interlocked.Decrement( ref remaining ) >= 0 ? ( await producer() ).Singleton() : default;
 0190         };
 0191      };
 192
 0193      return statement;
 194   }
 195}

Methods/Properties

CreateSubscribeStatementBuilder(CBAM.NATS.NATSConnectionVendorFunctionality,System.String)
CreatePublishStatementBuilder(CBAM.NATS.NATSConnectionVendorFunctionality,System.Func`1<System.Func`1<System.Threading.Tasks.ValueTask`1<System.Collections.Generic.IEnumerable`1<CBAM.NATS.NATSPublishData>>>>)
CreateSubscribeStatementBuilder(CBAM.NATS.NATSConnection,System.String)
SubscribeAsync(CBAM.NATS.NATSConnection,System.String,System.Int64,System.String)
CreatePublishStatementBuilder(CBAM.NATS.NATSConnection,System.Func`1<System.Func`1<System.Threading.Tasks.ValueTask`1<System.Collections.Generic.IEnumerable`1<CBAM.NATS.NATSPublishData>>>>)
RequestAsync(CBAM.NATS.NATSConnection,System.String,System.Byte[])
PublishWithStaticDataProducerForWholeArray(CBAM.NATS.NATSConnection,System.String,System.Byte[],System.String,System.Int64,System.Int32)
PublishWithStaticDataProducer(CBAM.NATS.NATSConnection,System.String,System.Byte[],System.Int32,System.Int32,System.String,System.Int64,System.Int32)
PublishWithDynamicSynchronousDataProducer(CBAM.NATS.NATSConnection,System.Func`1<System.Collections.Generic.IEnumerable`1<CBAM.NATS.NATSPublishData>>,System.Int64)
PublishWithDynamicAsynchronousDataProducer(CBAM.NATS.NATSConnection,System.Func`1<System.Threading.Tasks.Task`1<System.Collections.Generic.IEnumerable`1<CBAM.NATS.NATSPublishData>>>,System.Int64)
PublishWithDynamicSynchronousDataProducer(CBAM.NATS.NATSConnection,System.Func`1<CBAM.NATS.NATSPublishData>,System.Int64)
PublishWithDynamicAsynchronousDataProducer(CBAM.NATS.NATSConnection,System.Func`1<System.Threading.Tasks.Task`1<CBAM.NATS.NATSPublishData>>,System.Int64)
CreateDataArray(CBAM.NATS.NATSMessage)
CopyAllDataTo(CBAM.NATS.NATSMessage,System.Byte[])
WithStaticDataProducerForWholeArray(CBAM.NATS.NATSPublishStatement,System.String,System.Byte[],System.String,System.Int64,System.Int32)
WithStaticDataProducer(CBAM.NATS.NATSPublishStatement,System.String,System.Byte[],System.Int32,System.Int32,System.String,System.Int64,System.Int32)
WithDynamicSynchronousDataProducer(CBAM.NATS.NATSPublishStatement,System.Func`1<System.Collections.Generic.IEnumerable`1<CBAM.NATS.NATSPublishData>>,System.Int64)
WithDynamicAsynchronousDataProducer(CBAM.NATS.NATSPublishStatement,System.Func`1<System.Threading.Tasks.Task`1<System.Collections.Generic.IEnumerable`1<CBAM.NATS.NATSPublishData>>>,System.Int64)
WithDynamicSynchronousDataProducer(CBAM.NATS.NATSPublishStatement,System.Func`1<CBAM.NATS.NATSPublishData>,System.Int64)
WithDynamicAsynchronousDataProducer(CBAM.NATS.NATSPublishStatement,System.Func`1<System.Threading.Tasks.Task`1<CBAM.NATS.NATSPublishData>>,System.Int64)