Abstract completion-event-driven dispatch engine for non-blocking I/O stacks.
In a proactor model, I/O operations are submitted to the kernel and their completions arrive asynchronously. The application must correlate each completion back to the computation that requested it, resume that computation, and handle the full range of outcomes — success, partial progress, backpressure, and failure.
takt provides this dispatch algebra as an abstract layer over the kont effect system. A Dispatcher evaluates one algebraic effect at a time, classifying the result according to the iox outcome algebra. A Backend submits operations to an asynchronous engine (e.g., io_uring) and polls for completions. The Loop event loop ties them together: it submits computations, polls the backend, correlates completions by token, and resumes the suspended continuations.
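The submit, poll, correlate, resume cycle described above can be sketched without any takt types. The following is a minimal, self-contained model, not takt's implementation: `token`, `completion`, `fakeBackend`, and `runAll` are hypothetical names chosen for illustration, and the fake backend simply doubles each operand where a real backend would hand the operation to the kernel.

```go
package main

import "fmt"

// token correlates a submission with its completion (hypothetical stand-in).
type token uint64

// completion is what the fake backend reports for a finished operation.
type completion struct {
	tok   token
	value int
}

// fakeBackend completes every submitted operation on a later poll.
// A real backend would hand operations to the kernel instead.
type fakeBackend struct {
	next  token
	ready []completion
}

// submit queues an operation and returns the token that identifies it.
func (b *fakeBackend) submit(operand int) token {
	b.next++
	b.ready = append(b.ready, completion{tok: b.next, value: operand * 2})
	return b.next
}

// poll copies ready completions into out and reports how many were written.
func (b *fakeBackend) poll(out []completion) int {
	n := copy(out, b.ready)
	b.ready = b.ready[n:]
	return n
}

// runAll submits one operation per operand, then polls until every
// suspended continuation has been resumed. maxCompletions must be > 0.
func runAll(b *fakeBackend, operands []int, maxCompletions int) []string {
	results := make([]string, len(operands))
	pending := make(map[token]func(int)) // token -> suspended continuation
	for i, x := range operands {
		i, x := i, x // per-iteration copies for the closure below
		tok := b.submit(x)
		pending[tok] = func(v int) { results[i] = fmt.Sprintf("%d->%d", x, v) }
	}
	buf := make([]completion, maxCompletions)
	for len(pending) > 0 {
		n := b.poll(buf)
		for _, c := range buf[:n] {
			resume := pending[c.tok]
			delete(pending, c.tok) // each continuation is resumed at most once
			resume(c.value)
		}
	}
	return results
}

func main() {
	fmt.Println(runAll(&fakeBackend{}, []int{1, 2, 3}, 2))
}
```

The map keyed by token is the whole correlation mechanism in this sketch: the completion buffer can be smaller than the number of in-flight operations, and results still land with the computation that requested them.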
Two equivalent APIs: Cont (closure-based, straightforward composition) and Expr (frame-based, lower allocation overhead in hot paths).

    go get code.hybscloud.com/takt

Requires Go 1.26+.
Every dispatched operation returns an iox outcome. The dispatcher and stepping API handle each case:
| Outcome | Meaning | Dispatcher | Stepping API |
|---|---|---|---|
| nil | completed | resume | resume, return nil |
| ErrMore | progress, more expected | resume | resume, return ErrMore |
| ErrWouldBlock | no progress | wait | return suspension to caller |
| other | infrastructure failure | panic | return error to caller |
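
As a rough illustration of the table, the decision can be written as a single classification function. This is a sketch under assumptions: `errMore` and `errWouldBlock` are local stand-ins for `iox.ErrMore` and `iox.ErrWouldBlock`, and the `action` type and `classify` function are hypothetical names, not part of takt or iox.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for iox.ErrMore and iox.ErrWouldBlock (assumed to be sentinel errors).
var (
	errMore       = errors.New("more")
	errWouldBlock = errors.New("would block")
)

// action is a hypothetical label for what the caller does with an outcome.
type action int

const (
	resume action = iota // hand the value to the continuation
	wait                 // blocking dispatcher: wait, then retry
	yield                // stepping API: return the suspension to the caller
	fail                 // infrastructure failure: panic or return the error
)

func (a action) String() string {
	return [...]string{"resume", "wait", "yield", "fail"}[a]
}

// classify maps an outcome to a row of the table above.
// stepping selects the "Stepping API" column, otherwise the "Dispatcher" column.
func classify(err error, stepping bool) action {
	switch {
	case err == nil, errors.Is(err, errMore):
		return resume
	case errors.Is(err, errWouldBlock):
		if stepping {
			return yield
		}
		return wait
	default:
		return fail
	}
}

func main() {
	outcomes := []error{nil, errMore, errWouldBlock, errors.New("disk on fire")}
	for _, err := range outcomes {
		fmt.Printf("%v: dispatcher=%v stepping=%v\n", err, classify(err, false), classify(err, true))
	}
}
```

Only the ErrWouldBlock row differs in kind between the two columns: the blocking path waits, while the stepping path hands control back to the caller.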
A Dispatcher maps each algebraic effect to a concrete I/O operation and returns the result with an iox outcome.

    type myDispatcher struct{ /* ... */ }

    func (d *myDispatcher) Dispatch(op kont.Operation) (kont.Resumed, error) {
        // dispatch op, return (value, nil) or (nil, iox.ErrWouldBlock)
        panic("unimplemented") // placeholder so the skeleton compiles
    }

Exec and ExecExpr run a computation to completion, synchronously waiting when the dispatcher yields iox.ErrWouldBlock.

    result := takt.Exec(d, computation)         // Cont-world
    result := takt.ExecExpr(d, exprComputation) // Expr-world

For proactor event loops (e.g., io_uring), Step and Advance evaluate one effect at a time. When the dispatcher yields iox.ErrWouldBlock, the suspension is returned to the caller, letting the event loop reschedule.
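
To make the stepping contract concrete, here is a small self-contained model of it. It does not use takt or kont: `suspension`, `flakyDispatcher`, `advance`, `program`, and `drive` are hypothetical names, `errWouldBlock` stands in for `iox.ErrWouldBlock`, and the dispatcher is rigged to report "would block" the first time it sees each operation so that the yield path is exercised.

```go
package main

import (
	"errors"
	"fmt"
)

var errWouldBlock = errors.New("would block") // stand-in for iox.ErrWouldBlock

// suspension is a hypothetical paused computation: it waits on op and
// continues with resume once the operation yields a value.
type suspension struct {
	op     string
	resume func(v int) (int, *suspension)
}

// flakyDispatcher reports errWouldBlock the first time it sees each op.
type flakyDispatcher struct{ seen map[string]bool }

func newFlakyDispatcher() *flakyDispatcher {
	return &flakyDispatcher{seen: make(map[string]bool)}
}

// dispatch returns len(op) as a fake operation result.
func (d *flakyDispatcher) dispatch(op string) (int, error) {
	if !d.seen[op] {
		d.seen[op] = true
		return 0, errWouldBlock
	}
	return len(op), nil
}

// advance dispatches exactly one operation. On error the same suspension
// is handed back so the caller can reschedule it.
func advance(d *flakyDispatcher, s *suspension) (int, *suspension, error) {
	v, err := d.dispatch(s.op)
	if err != nil {
		return 0, s, err
	}
	r, next := s.resume(v)
	return r, next, nil
}

// program performs two operations in sequence and sums their results.
func program() *suspension {
	return &suspension{op: "read", resume: func(a int) (int, *suspension) {
		return 0, &suspension{op: "write", resume: func(b int) (int, *suspension) {
			return a + b, nil
		}}
	}}
}

// drive plays the role of the event loop and counts how often the
// computation yielded because the dispatcher made no progress.
func drive(d *flakyDispatcher, s *suspension) (result, yields int) {
	for s != nil {
		var err error
		result, s, err = advance(d, s)
		if errors.Is(err, errWouldBlock) {
			yields++ // a real loop would park s until the backend is ready
		} else if err != nil {
			panic(err) // infrastructure failure
		}
	}
	return result, yields
}

func main() {
	result, yields := drive(newFlakyDispatcher(), program())
	fmt.Println(result, yields)
}
```

The point of the sketch is that a "would block" outcome leaves the suspension intact: the caller gets the same suspension back and decides when to try again. With takt itself, the corresponding calls look like this: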

    result, susp := takt.Step[int](exprComputation)
    if susp != nil {
        var err error
        result, susp, err = takt.Advance(d, susp)
        if iox.IsWouldBlock(err) {
            return susp // yield to event loop, reschedule when ready
        }
    }
    // once susp is nil, result is the final value

Compose dispatcher effects with error effects. Throw eagerly short-circuits the computation and discards the pending suspension.

    either := takt.ExecError[string](d, computation)
    // Right on success, Left on Throw

    // Stepping with errors
    either, susp := takt.StepError[string, int](exprComputation)
    if susp != nil {
        var err error
        either, susp, err = takt.AdvanceError[string](d, susp)
        if iox.IsWouldBlock(err) {
            return susp // yield to event loop, reschedule when ready
        }
    }

A Loop drives computations through a Backend. It submits operations, polls for completions, correlates them by Token, and resumes the suspended continuations.
maxCompletions in NewLoop must be greater than zero.
Backend.Poll([]Completion) (int, error) reports both the number of ready completions and any infrastructure poll
failure. Loop treats iox.ErrWouldBlock from Poll as an idle tick rather than a terminal error.
When a completion carries iox.ErrWouldBlock, the loop resubmits the same operation under an affine suspension
lifecycle. If an iox.ErrMore (multishot) completion would resume into a new suspended effect, Poll / Run return
ErrUnsupportedMultishot.
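
The polling rules above can be illustrated with a single-poll helper. This is a sketch under assumptions, not the Loop's actual code: `scriptedBackend` and `pollOnce` are hypothetical, `errWouldBlock` stands in for `iox.ErrWouldBlock`, and the multishot (ErrMore) case is left out entirely.

```go
package main

import (
	"errors"
	"fmt"
)

var errWouldBlock = errors.New("would block") // stand-in for iox.ErrWouldBlock

type completion struct {
	tok uint64
	val int
	err error
}

// scriptedBackend replays a fixed sequence of poll results (hypothetical).
type scriptedBackend struct {
	script    [][]completion // one batch per poll; an empty batch means "nothing ready"
	submitted []uint64       // tokens passed to submit, in order
}

func (b *scriptedBackend) submit(tok uint64) { b.submitted = append(b.submitted, tok) }

func (b *scriptedBackend) poll(out []completion) (int, error) {
	if len(b.script) == 0 {
		return 0, errors.New("script exhausted")
	}
	batch := b.script[0]
	b.script = b.script[1:]
	if len(batch) == 0 {
		return 0, errWouldBlock
	}
	return copy(out, batch), nil
}

// pollOnce performs one poll and returns the values of completed operations.
func pollOnce(b *scriptedBackend, buf []completion) (done []int, err error) {
	n, err := b.poll(buf)
	if errors.Is(err, errWouldBlock) {
		return nil, nil // idle tick: nothing ready, not a failure
	}
	if err != nil {
		return nil, err // infrastructure poll failure
	}
	for _, c := range buf[:n] {
		if errors.Is(c.err, errWouldBlock) {
			b.submit(c.tok) // no progress: resubmit the same operation
			continue
		}
		if c.err != nil {
			return done, c.err
		}
		done = append(done, c.val)
	}
	return done, nil
}

func main() {
	b := &scriptedBackend{script: [][]completion{
		{},                             // nothing ready
		{{tok: 1, err: errWouldBlock}}, // operation made no progress
		{{tok: 1, val: 7}},             // operation completed
	}}
	buf := make([]completion, 4)
	for i := 0; i < 3; i++ {
		done, err := pollOnce(b, buf)
		fmt.Println(done, err, b.submitted)
	}
}
```

The sketch distinguishes the two places ErrWouldBlock can appear: as the return value of the poll itself (an idle tick) and inside an individual completion (a resubmission).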

    loop := takt.NewLoop[*myBackend, int](backend, 64)

    // Submit computations
    loop.SubmitExpr(exprComputation1)
    loop.SubmitExpr(exprComputation2)
    loop.Submit(contComputation) // Cont-world

    // Drive all to completion
    results, err := loop.Run()

- Dispatcher[D Dispatcher[D]]: F-bounded dispatch interface
- Exec[D, R](d D, m kont.Eff[R]) R: blocking Cont-world evaluation
- ExecExpr[D, R](d D, m kont.Expr[R]) R: blocking Expr-world evaluation

- Step[R](m kont.Expr[R]) (R, *kont.Suspension[R]): evaluate to first suspension
- Advance[D, R](d D, susp *kont.Suspension[R]) (R, *kont.Suspension[R], error): dispatch one operation

- ExecError[E, D, R](d D, m kont.Eff[R]) kont.Either[E, R]: blocking with errors
- ExecErrorExpr[E, D, R](d D, m kont.Expr[R]) kont.Either[E, R]: Expr-world with errors
- StepError[E, R](m kont.Expr[R]) (kont.Either[E, R], *kont.Suspension[kont.Either[E, R]]): step with errors
- AdvanceError[E, D, R](d D, susp *kont.Suspension[kont.Either[E, R]]) (kont.Either[E, R], *kont.Suspension[kont.Either[E, R]], error): advance with errors

- Backend[B Backend[B]]: F-bounded async submit/poll interface
- Token: submission-completion correlation (uint64)
- Completion: {Token, Value kont.Resumed, Err error}
- NewLoop[B, R](b B, maxCompletions int) *Loop[B, R]: create event loop (maxCompletions > 0)
- (*Loop[B, R]).SubmitExpr(m kont.Expr[R]) (R, bool, error): step and submit Expr
- (*Loop[B, R]).Submit(m kont.Eff[R]) (R, bool, error): step and submit Cont
- (*Loop[B, R]).Poll() ([]R, error): poll and dispatch completions
- (*Loop[B, R]).Run() ([]R, error): drive all to completion
- (*Loop[B, R]).Pending() int: count pending operations
- ErrUnsupportedMultishot: multishot completion cannot suspend on a new effect

- Reify[A](kont.Eff[A]) kont.Expr[A]: Cont → Expr
- Reflect[A](kont.Expr[A]) kont.Eff[A]: Expr → Cont

- G. D. Plotkin and M. Pretnar. "Handlers of Algebraic Effects." In Proc. ESOP, 2009.
- T. Uustalu and V. Vene. "Comonadic Notions of Computation." In ENTCS 203(5), 2008.
- D. Ahman and A. Bauer. "Runners in Action." In Proc. ESOP, 2020.
MIT License. See LICENSE for details.
©2026 Hayabusa Cloud Co., Ltd.