Skip to content

Commit 673bd45

Browse files
committed
Add Go and Sleep functions
1 parent 1e1c862 commit 673bd45

4 files changed

Lines changed: 207 additions & 46 deletions

File tree

benchmark_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package async_test
2+
3+
import (
4+
"context"
5+
"sync"
6+
"testing"
7+
"time"
8+
9+
"github.com/b97tsk/async"
10+
)
11+
12+
func BenchmarkAsyncGo(b *testing.B) {
13+
var wg sync.WaitGroup // For keeping track of goroutines.
14+
15+
var myExecutor async.Executor
16+
17+
myExecutor.Autorun(myExecutor.Run)
18+
19+
b.ReportAllocs()
20+
21+
for b.Loop() {
22+
myExecutor.SpawnBlocking(async.Go(
23+
context.Background(), &wg,
24+
func(context.Context) async.Task {
25+
return nil
26+
},
27+
))
28+
}
29+
30+
wg.Wait()
31+
}
32+
33+
func BenchmarkAsyncSleep(b *testing.B) {
34+
var wg sync.WaitGroup // For keeping track of goroutines.
35+
36+
var myExecutor async.Executor
37+
38+
myExecutor.Autorun(myExecutor.Run)
39+
40+
b.ReportAllocs()
41+
42+
for b.Loop() {
43+
myExecutor.SpawnBlocking(async.Sleep(context.Background(), &wg, 0))
44+
}
45+
46+
wg.Wait()
47+
}
48+
49+
func BenchmarkTimeAfterFunc(b *testing.B) {
50+
var wg sync.WaitGroup // For keeping track of goroutines.
51+
52+
var myExecutor async.Executor
53+
54+
myExecutor.Autorun(myExecutor.Run)
55+
56+
sleep := func(d time.Duration) async.Task {
57+
return func(co *async.Coroutine) async.Result {
58+
var sig async.Signal
59+
wg.Add(1)
60+
tm := time.AfterFunc(d, func() {
61+
defer wg.Done()
62+
myExecutor.Spawn(func(co *async.Coroutine) async.Result {
63+
sig.Notify()
64+
return co.End()
65+
})
66+
})
67+
co.CleanupFunc(func() {
68+
if tm.Stop() {
69+
wg.Done()
70+
}
71+
})
72+
return co.Await(&sig).End()
73+
}
74+
}
75+
76+
b.ReportAllocs()
77+
78+
for b.Loop() {
79+
myExecutor.SpawnBlocking(sleep(0))
80+
}
81+
82+
wg.Wait()
83+
}

coroutine.go

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package async
22

33
import (
4+
"context"
45
"fmt"
56
"iter"
7+
"runtime/debug"
68
"slices"
79
"sync"
10+
"time"
811
)
912

1013
type action int
@@ -802,14 +805,28 @@ func (co *Coroutine) raise() Result {
802805
return Result{action: doRaise}
803806
}
804807

805-
// Panic returns a [Result] that will cause co to behave like there's a panic.
808+
// Panic returns a [Result] that will cause co to behave as there's a panic.
806809
// Unlike the built-in panic function, Panic leaves no stack trace behind.
807810
// Please use with caution.
808811
func (co *Coroutine) Panic(v any) Result {
809812
if v == nil {
810813
panic("async: Panic called with nil argument")
811814
}
812-
co.ps.push(v, nil)
815+
co.ps.Push(v, nil)
816+
co.flag |= flagPanicking
817+
return Result{action: doRaise}
818+
}
819+
820+
// PanicWithStackTrace returns a [Result] that will cause co to behave as
821+
// there's a panic.
822+
// PanicWithStackTrace takes an extra stacktrace argument, which makes it
823+
// very much like the built-in panic function.
824+
// Useful when one wants to propagate panics from goroutines.
825+
func (co *Coroutine) PanicWithStackTrace(v any, stacktrace []byte) Result {
826+
if v == nil {
827+
panic("async: PanicWithStackTrace called with nil argument")
828+
}
829+
co.ps.Push(v, stacktrace)
813830
co.flag |= flagPanicking
814831
return Result{action: doRaise}
815832
}
@@ -1292,3 +1309,77 @@ func MergeSeq(concurrency int, seq iter.Seq[Task]) Task {
12921309
}).End()
12931310
}
12941311
}
1312+
1313+
// A Goer is for spawning goroutines and keeping track of them.
1314+
//
1315+
// For go1.25 and later, a [sync.WaitGroup] would satisfy this interface.
1316+
type Goer interface {
1317+
Go(f func())
1318+
}
1319+
1320+
// Go returns a [Task] that uses g to spawn a goroutine to run f, which takes
1321+
// a [context.Context] as argument that will be canceled when the running
1322+
// coroutine or ctx is canceled.
1323+
// The return value of f, a [Task], if non-nil, will be run after f returns.
1324+
// To cancel Go, f must return a [Task] that terminates Go, such as [Exit].
1325+
// If f panics, Go propagates it.
1326+
// Go completes only when everything is settled.
1327+
func Go(ctx context.Context, g Goer, f func(ctx context.Context) Task) Task {
1328+
return func(co *Coroutine) Result {
1329+
ctx, cancel := context.WithCancel(ctx)
1330+
co.CleanupFunc(cancel)
1331+
var state struct {
1332+
Signal
1333+
done bool
1334+
t Task
1335+
v any
1336+
s []byte
1337+
}
1338+
t := func(co *Coroutine) Result {
1339+
switch {
1340+
case state.done:
1341+
if state.t != nil {
1342+
return co.Transition(state.t)
1343+
}
1344+
if state.v != nil {
1345+
return co.PanicWithStackTrace(state.v, state.s)
1346+
}
1347+
if co.Canceled() && !co.NonCancelable() {
1348+
return co.Exit()
1349+
}
1350+
return co.End()
1351+
case co.Canceled():
1352+
return co.HardYield(&state)
1353+
default:
1354+
state.done = true
1355+
state.Notify()
1356+
return co.End()
1357+
}
1358+
}
1359+
e, w := co.Executor(), co.Weight()
1360+
g.Go(func() {
1361+
defer func() {
1362+
if v := recover(); v != nil {
1363+
state.v, state.s = v, debug.Stack()
1364+
}
1365+
e.SpawnWeighted(w, t)
1366+
}()
1367+
state.t = f(ctx)
1368+
})
1369+
return co.SoftAwait(&state).Then(t)
1370+
}
1371+
}
1372+
1373+
// Sleep returns a [Task] that awaits until a period of time elapses, and then
1374+
// ends.
1375+
// When ctx is canceled, the coroutine that runs Sleep exits.
1376+
func Sleep(ctx context.Context, g Goer, d time.Duration) Task {
1377+
return Go(ctx, g, func(ctx context.Context) Task {
1378+
select {
1379+
case <-ctx.Done():
1380+
return Exit()
1381+
case <-time.After(d):
1382+
return nil
1383+
}
1384+
})
1385+
}

example_test.go

Lines changed: 25 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,29 +1033,12 @@ func ExampleMergeSeq() {
10331033

10341034
myExecutor.Autorun(func() { wg.Go(myExecutor.Run) })
10351035

1036-
sleep := func(d time.Duration) async.Task {
1037-
return func(co *async.Coroutine) async.Result {
1038-
var sig async.Signal
1039-
wg.Add(1) // Keep track of timers too.
1040-
tm := time.AfterFunc(d, func() {
1041-
defer wg.Done()
1042-
myExecutor.Spawn(async.Do(sig.Notify))
1043-
})
1044-
co.CleanupFunc(func() {
1045-
if tm.Stop() {
1046-
wg.Done()
1047-
}
1048-
})
1049-
return co.Await(&sig).End()
1050-
}
1051-
}
1052-
10531036
myExecutor.Spawn(async.MergeSeq(3, func(yield func(async.Task) bool) {
10541037
defer fmt.Println("done")
10551038
for n := 1; n <= 6; n++ {
10561039
d := time.Duration(n*100) * time.Millisecond
10571040
f := func() { fmt.Println(n) }
1058-
t := sleep(d).Then(async.Do(f))
1041+
t := async.Sleep(context.Background(), &wg, d).Then(async.Do(f))
10591042
if !yield(t) {
10601043
return
10611044
}
@@ -1066,13 +1049,13 @@ func ExampleMergeSeq() {
10661049
fmt.Println("--- SEPARATOR ---")
10671050

10681051
myExecutor.Spawn(async.Select(
1069-
sleep(1000*time.Millisecond), // Cancel the following task after a period of time.
1052+
async.Sleep(context.Background(), &wg, 1000*time.Millisecond), // Cancel the following task after 1s.
10701053
async.MergeSeq(3, func(yield func(async.Task) bool) {
10711054
defer fmt.Println("done")
10721055
for n := 1; ; n++ { // Infinite loop.
10731056
d := time.Duration(n*100) * time.Millisecond
10741057
f := func() { fmt.Println(n) }
1075-
t := sleep(d).Then(async.Do(f))
1058+
t := async.Sleep(context.Background(), &wg, d).Then(async.Do(f))
10761059
if !yield(t) {
10771060
return
10781061
}
@@ -1124,23 +1107,6 @@ func Example_panicAndRecover() {
11241107
})
11251108
})
11261109

1127-
sleep := func(d time.Duration) async.Task {
1128-
return func(co *async.Coroutine) async.Result {
1129-
var sig async.Signal
1130-
wg.Add(1) // Keep track of timers too.
1131-
tm := time.AfterFunc(d, func() {
1132-
defer wg.Done()
1133-
myExecutor.Spawn(async.Do(sig.Notify))
1134-
})
1135-
co.CleanupFunc(func() {
1136-
if tm.Stop() {
1137-
wg.Done()
1138-
}
1139-
})
1140-
return co.Await(&sig).End()
1141-
}
1142-
}
1143-
11441110
recover := func(co *async.Coroutine) async.Result {
11451111
fmt.Println(co.Recover())
11461112
return co.End()
@@ -1178,7 +1144,7 @@ func Example_panicAndRecover() {
11781144
async.Defer(recover),
11791145
func(co *async.Coroutine) async.Result {
11801146
co.Spawn(async.Block(
1181-
sleep(100*time.Millisecond),
1147+
async.Sleep(context.Background(), &wg, 100*time.Millisecond),
11821148
async.Do(func() { panic("A") }), // Panics after 100ms.
11831149
))
11841150
co.Spawn(async.Block(
@@ -1282,6 +1248,24 @@ func Example_panicAndRecover() {
12821248
wg.Wait()
12831249
fmt.Println("--- SEPARATOR ---")
12841250

1251+
myExecutor.Spawn(async.Join(
1252+
async.Block(
1253+
async.Defer(recover),
1254+
async.Go(context.Background(), &wg, func(_ context.Context) async.Task {
1255+
panic("A")
1256+
}),
1257+
),
1258+
async.Block(
1259+
async.Defer(recover),
1260+
async.Go(context.Background(), &wg, func(_ context.Context) async.Task {
1261+
return async.Panic("A")
1262+
}),
1263+
),
1264+
))
1265+
1266+
wg.Wait()
1267+
fmt.Println("--- SEPARATOR ---")
1268+
12851269
myExecutor.Spawn(func(_ *async.Coroutine) async.Result {
12861270
panic(dummyError) // Unrecovered panics get repanicked when (*async.Executor).Run returns.
12871271
})
@@ -1314,5 +1298,8 @@ func Example_panicAndRecover() {
13141298
// B
13151299
// <nil>
13161300
// --- SEPARATOR ---
1301+
// A
1302+
// A
1303+
// --- SEPARATOR ---
13171304
// dummy error recovered!
13181305
}

panicstack.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,18 @@ func (ps *panicstack) Try(f func()) (ok bool) {
2727
if _, ok := v.(dummy); ok {
2828
return // Ignore dummy values.
2929
}
30-
ps.push(v, debug.Stack())
30+
ps.Push(v, debug.Stack())
3131
}
3232
}()
3333
f()
3434
return true
3535
}
3636

37-
func (ps *panicstack) push(v any, stack []byte) {
37+
func (ps *panicstack) Push(v any, stacktrace []byte) {
3838
s := *ps
3939
n := len(s)
4040
repanicked := n != 0 && equal(v, s[n-1].value)
41-
s = append(s, panicitem{v, stack, repanicked, false})
41+
s = append(s, panicitem{v, stacktrace, repanicked, false})
4242
*ps = s
4343
}
4444

@@ -49,7 +49,7 @@ func equal(a, b any) bool {
4949

5050
type panicitem struct {
5151
value any
52-
stack []byte
52+
stacktrace []byte
5353
repanicked bool
5454
recovered bool
5555
}
@@ -72,9 +72,9 @@ func (pv *panicvalue) Error() string {
7272
case p.recovered:
7373
b.WriteString(" (recovered)")
7474
}
75-
if p.stack != nil {
75+
if p.stacktrace != nil {
7676
b.WriteString("\n\n")
77-
b.Write(p.stack)
77+
b.Write(p.stacktrace)
7878
}
7979
}
8080
return b.String()

0 commit comments

Comments
 (0)