@@ -139,6 +139,52 @@ public async Task<string> EnqueueBatchAsync(string queue, string tenant, List<Ta
139139 return batchId ;
140140 }
141141
142+ public async Task < string > EnqueueTaskAsync ( string queue , string tenant , TaskData task )
143+ {
144+ var taskId = Guid . NewGuid ( ) . ToString ( "D" ) ;
145+
146+ var q = task . Queue ?? queue ;
147+
148+ var db = _redis . GetDatabase ( ) ;
149+ var tra = db . CreateTransaction ( ) ;
150+ #pragma warning disable CS4014
151+
152+ tra . HashSetAsync ( Prefix ( $ "task:{ taskId } ") , [
153+ new HashEntry ( "id" , taskId ) ,
154+ new HashEntry ( "tenant" , tenant ) ,
155+ new HashEntry ( "data" , task . Data ) ,
156+ new HashEntry ( "topic" , task . Topic ) ,
157+ new HashEntry ( "queue" , q ) ,
158+ new HashEntry ( "retries" , task . Retries ?? _options . Retries )
159+ ] ) ;
160+
161+ if ( task . DelayUntil . HasValue )
162+ tra . SortedSetAddAsync ( Prefix ( $ "queue:{ queue } :pushback") , taskId ,
163+ task . DelayUntil . Value . ToUnixTimeSeconds ( ) ) ;
164+ else
165+ {
166+ tra . SortedSetAddAsync ( Prefix ( "queues" ) , q , DateTimeOffset . UtcNow . ToUnixTimeSeconds ( ) ) ;
167+ tra . ListLeftPushAsync ( Prefix ( $ "queue:{ q } ") , taskId ) ;
168+ tra . PublishAsync ( RedisChannel . Literal ( Prefix ( $ "queue:{ q } :event") ) , "fetch" ) ;
169+ }
170+
171+ #pragma warning enable CS4014
172+ await tra . ExecuteAsync ( ) . ConfigureAwait ( false ) ;
173+ return taskId ;
174+ }
175+
176+ public Task < string > EnqueueTaskAsync ( string queue , string tenant , Expression < Func < Task > > methodCall , DateTimeOffset ? delayUntil = null , int ? retries = null )
177+ {
178+ return EnqueueTaskAsync ( queue , tenant , new TaskData
179+ {
180+ Topic = "internal:expression:v1" ,
181+ Data = _options . ExpressionExecutor . Serialize ( methodCall ) ,
182+ DelayUntil = delayUntil ,
183+ Queue = queue ,
184+ Retries = retries
185+ } ) ;
186+ }
187+
142188 public async Task < bool > AppendBatchAsync ( string queue , string tenant , string batchId , List < TaskData > tasks )
143189 {
144190 var db = _redis . GetDatabase ( ) ;
@@ -306,6 +352,29 @@ public async Task<bool> FetchAsync()
306352 _tasks . TryAdd ( info . TaskId , info ) ;
307353 _actionBlock . Post ( info ) ;
308354 }
355+ else // single task
356+ {
357+ var linkedCts = CancellationTokenSource . CreateLinkedTokenSource ( _shutdown . Token ) ;
358+
359+ var info = new TaskContext
360+ {
361+ Processor = this ,
362+ TaskId = j ,
363+ Tenant = ( string ) taskData [ "tenant" ] ! ,
364+ Queue = ( string ? ) taskData [ "queue" ] ,
365+ CancelSource = linkedCts ,
366+ CancelToken = linkedCts . Token ,
367+ Topic = ( string ? ) taskData [ "topic" ] ?? string . Empty ,
368+ Data = ( byte [ ] ) taskData [ "data" ] ! ,
369+ BatchId = null ,
370+ Retries = ( int ? ) taskData [ "retries" ] ,
371+ IsCancellation = false ,
372+ IsContinuation = false
373+ } ;
374+
375+ _tasks . TryAdd ( info . TaskId , info ) ;
376+ _actionBlock . Post ( info ) ;
377+ }
309378
310379 return true ;
311380 }
@@ -575,7 +644,7 @@ private async Task Process(TaskContext task)
575644 tra . HashIncrementAsync ( Prefix ( $ "batch:{ task . BatchId } ") , "failed" ,
576645 flags : CommandFlags . FireAndForget ) ;
577646 //remaining = tra.HashDecrementAsync(Prefix($"batch:{task.BatchId}"), "remaining");
578- if ( _options . Deadletter && ( ! string . IsNullOrEmpty ( task . BatchId ) || _options . DeadletterUniqueSchedules ) )
647+ if ( _options . Deadletter && ( string . IsNullOrEmpty ( task . ScheduleId ) || _options . DeadletterSchedules ) )
579648 tra . SortedSetAddAsync ( Prefix ( $ "queue:{ task . Queue } :deadletter") , task . TaskId ,
580649 DateTimeOffset . UtcNow . ToUnixTimeSeconds ( ) ) ;
581650 else
0 commit comments