Summary

Class:CBAM.NATS.NATSPublishData
Assembly:CBAM.NATS
File(s):/repo-dir/contents/Source/Code/CBAM.NATS/Statement.cs
Covered lines:11
Uncovered lines:0
Coverable lines:11
Total lines:195
Line coverage:100%
Branch coverage:50%

Coverage History

Metrics

MethodCyclomatic complexity NPath complexity Sequence coverage Branch coverage
.ctor(...)601%0.5%

File(s)

/repo-dir/contents/Source/Code/CBAM.NATS/Statement.cs

#LineLine coverage
 1/*
 2 * Copyright 2018 Stanislav Muhametsin. All rights Reserved.
 3 *
 4 * Licensed  under the  Apache License,  Version 2.0  (the "License");
 5 * you may not use  this file  except in  compliance with the License.
 6 * You may obtain a copy of the License at
 7 *
 8 *   http://www.apache.org/licenses/LICENSE-2.0
 9 *
 10 * Unless required by applicable law or agreed to in writing, software
 11 * distributed  under the  License is distributed on an "AS IS" BASIS,
 12 * WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or
 13 * implied.
 14 *
 15 * See the License for the specific language governing permissions and
 16 * limitations under the License.
 17 */
 18using CBAM.NATS;
 19using System;
 20using System.Collections.Generic;
 21using System.Linq;
 22using System.Text;
 23using System.Threading;
 24using System.Threading.Tasks;
 25using UtilPack;
 26
 27using TDataProducerResult = System.Threading.Tasks.ValueTask<System.Collections.Generic.IEnumerable<CBAM.NATS.NATSPublis
 28
 29namespace CBAM.NATS
 30{
 31
 32   using TDataProducerFactory = Func<Func<TDataProducerResult>>;
 33
 34   public interface NATSStatementInformation
 35   {
 36      String Subject { get; }
 37   }
 38
 39   public interface NATSPublishStatementInformation
 40   {
 41      //String ReplySubject { get; }
 42
 43      TDataProducerFactory DataProducerFactory { get; }
 44
 45   }
 46
 47   public interface NATSSubscribeStatementInformation : NATSStatementInformation
 48   {
 49      String Queue { get; }
 50
 51      Int64 AutoUnsubscribeAfter { get; }
 52
 53      Func<NATSMessage, Boolean> DynamicUnsubscription { get; }
 54   }
 55
 56   public interface NATSStatement : NATSStatementInformation
 57   {
 58      NATSStatementInformation NATSStatementInformation { get; }
 59   }
 60
 61   public interface NATSSubscribeStatement : NATSStatement, NATSSubscribeStatementInformation
 62   {
 63      new String Queue { get; set; }
 64
 65      new Int64 AutoUnsubscribeAfter { get; set; }
 66
 67      new Func<NATSMessage, Boolean> DynamicUnsubscription { get; set; }
 68
 69   }
 70
 71   public interface NATSPublishStatement : NATSPublishStatementInformation
 72   {
 73      NATSPublishStatementInformation NATSStatementInformation { get; }
 74
 75      new TDataProducerFactory DataProducerFactory { get; set; }
 76
 77
 78   }
 79
 80   public struct NATSPublishData
 81   {
 82      public NATSPublishData(
 83         String subject,
 84         Byte[] data,
 85         Int32 offset = -1,
 86         Int32 count = -1,
 87         String replySubject = null
 88         )
 89      {
 390         this.Subject = ArgumentValidator.ValidateNotEmpty( nameof( subject ), subject );
 391         this.Data = data;
 392         this.Offset = Math.Max( 0, offset );
 393         this.Count = count < 0 ? Math.Max( 0, ( data?.Length ?? 0 ) - this.Offset ) : count;
 394         this.ReplySubject = String.IsNullOrEmpty( replySubject ) ? null : replySubject;
 395      }
 96
 397      public String Subject { get; }
 98
 399      public String ReplySubject { get; }
 100
 3101      public Byte[] Data { get; }
 3102      public Int32 Offset { get; }
 3103      public Int32 Count { get; }
 104
 105   }
 106}
 107
 108public static partial class E_CBAM
 109{
 110   public static NATSPublishStatement WithStaticDataProducerForWholeArray( this NATSPublishStatement statement, String s
 111   {
 112      return statement.WithStaticDataProducer( subject, array, 0, array?.Length ?? 0, replySubject, repeatCount, chunkCo
 113   }
 114
 115   public static NATSPublishStatement WithStaticDataProducer( this NATSPublishStatement statement, String subject, Byte[
 116   {
 117      var chunk = Enumerable.Repeat( new NATSPublishData( subject, array, offset, count, replySubject ), chunkCount );
 118      statement.DataProducerFactory = () =>
 119      {
 120         var remaining = repeatCount;
 121         return () =>
 122         {
 123            if ( remaining > 0 )
 124            {
 125               var original = remaining;
 126               remaining -= chunkCount;
 127               return new TDataProducerResult( remaining >= 0 ? chunk : chunk.Take( (Int32) original ) );
 128            }
 129            else
 130            {
 131               return default;
 132            }
 133         };
 134      };
 135      return statement;
 136   }
 137
 138   public static NATSPublishStatement WithDynamicSynchronousDataProducer( this NATSPublishStatement statement, Func<IEnu
 139   {
 140      var hasMax = repeatCount >= 0;
 141      statement.DataProducerFactory = () =>
 142      {
 143         var remaining = repeatCount;
 144         return () =>
 145         {
 146            return !hasMax || Interlocked.Decrement( ref remaining ) >= 0 ? new TDataProducerResult( producer() ) : defa
 147         };
 148      };
 149      return statement;
 150   }
 151
 152   public static NATSPublishStatement WithDynamicAsynchronousDataProducer( this NATSPublishStatement statement, Func<Tas
 153   {
 154      var hasMax = repeatCount >= 0;
 155      statement.DataProducerFactory = () =>
 156      {
 157         var remaining = repeatCount;
 158         return async () =>
 159         {
 160            return !hasMax || Interlocked.Decrement( ref remaining ) >= 0 ? await producer() : default;
 161         };
 162      };
 163
 164      return statement;
 165   }
 166
 167   public static NATSPublishStatement WithDynamicSynchronousDataProducer( this NATSPublishStatement statement, Func<NATS
 168   {
 169      var hasMax = repeatCount >= 0;
 170      statement.DataProducerFactory = () =>
 171      {
 172         var remaining = repeatCount;
 173         return () =>
 174         {
 175            return !hasMax || Interlocked.Decrement( ref remaining ) >= 0 ? new TDataProducerResult( producer().Singleto
 176         };
 177      };
 178      return statement;
 179   }
 180
 181   public static NATSPublishStatement WithDynamicAsynchronousDataProducer( this NATSPublishStatement statement, Func<Tas
 182   {
 183      var hasMax = repeatCount >= 0;
 184      statement.DataProducerFactory = () =>
 185      {
 186         var remaining = repeatCount;
 187         return async () =>
 188         {
 189            return !hasMax || Interlocked.Decrement( ref remaining ) >= 0 ? ( await producer() ).Singleton() : default;
 190         };
 191      };
 192
 193      return statement;
 194   }
 195}