Note
State machines communicate through states.
asyncmachine-go is a distributed workflow engine (technically a pathless control-flow graph with a consensus), which implements AOP and actor model through a clock-based state-machine. It features atomic transitions, relations, transparent RPC, TUI debugger, telemetry, REPL, selective distribution, diagrams, and WASM support.
As a control flow library, it decides which predefined bits of code (transition handlers) to run and in what order, according to the currently active states (flags). Thanks to a novel state machine, the number of handlers can be minimized while maximizing scenario coverage. It's lightweight, fault-tolerant by design, has rule-based mutations, and can target virtually any step-in-time, in any workflow. It's a low-level tool with acceptable performance.
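To picture what a rule-based mutation means, here is a minimal standard-library sketch. It is not the library's implementation: the `schema` and `machine` types and the `add` method are hypothetical names made up for illustration, and only a single "requires" rule is modeled. A mutation is accepted only when every required state is already active.

```go
package main

import "fmt"

// schema maps a state to the states it requires (hypothetical, reduced
// to a single relation for this sketch).
type schema map[string][]string

// machine holds the schema and the set of currently active states.
type machine struct {
	schema schema
	active map[string]bool
}

func newMachine(s schema) *machine {
	return &machine{schema: s, active: map[string]bool{}}
}

// add tries to activate a state and reports whether the mutation was
// accepted. It is rejected when any required state is inactive.
func (m *machine) add(state string) bool {
	for _, req := range m.schema[state] {
		if !m.active[req] {
			return false
		}
	}
	m.active[state] = true
	return true
}

func main() {
	m := newMachine(schema{"Foo": {"Bar"}, "Bar": {}})
	fmt.Println(m.add("Foo")) // false
	fmt.Println(m.add("Bar")) // true
	fmt.Println(m.add("Foo")) // true
}
```

The caller never checks the rule itself; it only requests a mutation and the schema decides the outcome.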
asyncmachine-go takes care of context, select, and panic, while allowing for graph-structured concurrency
with goroutine cancelation. The history log and relations have
vector formats. It aims to create autonomous workflows with organic control flow and stateful APIs.
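For a sense of the `context` and `select` boilerplate this refers to, the following is a plain standard-library sketch of a helper that waits for several channels with a timeout. The names `waitForAll`, `errTimeout`, and `demo` are hypothetical and the code is an assumption about the general pattern, not the library's own helper.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errTimeout is a hypothetical sentinel error for this sketch.
var errTimeout = errors.New("timeout")

// waitForAll blocks until every channel in chans is closed, the context
// is canceled, or the timeout elapses, whichever comes first.
func waitForAll(ctx context.Context, timeout time.Duration, chans ...<-chan struct{}) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	for _, ch := range chans {
		select {
		case <-ch:
			// this channel fired, move on to the next one
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
			return errTimeout
		}
	}
	return nil
}

// demo runs waitForAll twice: once with channels that all close in time,
// and once with a channel that never closes.
func demo() (okErr, timeoutErr error) {
	ready := make(chan struct{})
	payload := make(chan struct{})
	close(ready)
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(payload)
	}()
	okErr = waitForAll(context.Background(), time.Second, ready, payload)

	never := make(chan struct{})
	timeoutErr = waitForAll(context.Background(), 20*time.Millisecond, never)
	return okErr, timeoutErr
}

func main() {
	okErr, timeoutErr := demo()
	fmt.Println(okErr)      // <nil>
	fmt.Println(timeoutErr) // timeout
}
```

A single timer covers the whole wait, so the timeout bounds the total time rather than the time per channel.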
Note
git clone https://github.com/pancsta/asyncmachine-go.git
- binary flag
- node with relations
- AOP aspect
- logical clock
- subscription topic
- multiple methods
- metric
- trace
- lock
- breakpoint
Besides the main use-case of workflows, it can be used for stateful applications of any size - daemons, UIs, configs, bots, firewalls, synchronization consensus, games, smart graphs, microservice orchestration, robots, contracts, streams, DI containers, message broking, test scenarios, simulators, as well as "real-time" systems which rely on instant cancelation.
Note
Flow is state, and state is flow, in a graph.
Minimal - an untyped definition of 2 states and 1 relation, then 1 mutation and a check.
import am "github.com/pancsta/asyncmachine-go/pkg/machine"
// ...
mach := am.New(nil, am.Schema{
	"Foo": {Require: am.S{"Bar"}},
	"Bar": {},
}, nil)
mach.Add1("Foo", nil)
mach.Is1("Foo") // false

Complicated - wait on a multi state (event) and the Ready state with a 1s timeout, then mutate with typed args, on top of a state context.
// state ctx is an expiration ctx
ctx := client.Mach.NewStateCtx(ssC.WorkerReady)
// clock-based subscription
whenPayload := client.Mach.WhenTicks(ssC.WorkerPayload, 1, ctx)
// state mutation
client.RpcWorker.NetMach.Add1(ssW.WorkRequested, Pass(&A{
	Input: 2}))
// WaitFor* wraps select statements
err := amhelp.WaitForAll(ctx, time.Second,
	// post-mutation subscription
	mach.When1(ss.BasicStatesDef.Ready, nil),
	// pre-mutation subscription
	whenPayload)
// check cancelation
if ctx.Err() != nil {
	return // state ctx expired
}
// check error
if err != nil {
	// error state mutation
	client.Mach.AddErr(err, nil)
	return // no err required
}
// client/WorkerPayload and mach/Ready activated

Note
Clock-based navigation in time.
Handlers - Aspect Oriented transition handlers.
// can Foo activate?
func (h *Handlers) FooEnter(e *am.Event) bool {
	return true
}
// with Foo active, can Bar activate?
func (h *Handlers) FooBar(e *am.Event) bool {
	return true
}
// Foo activates
func (h *Handlers) FooState(e *am.Event) {
	h.foo = NewConn()
}
// Foo de-activates
func (h *Handlers) FooEnd(e *am.Event) {
	h.foo.Close()
}

Schemas - relational schemas (aRPC server).
var ServerSchema = am.Schema{
	ssF.ClientConnected: {Require: S{ssS.RpcReady}},
	ssF.ErrDelivery: {Require: S{ssS.Exception}},
	ssF.ErrHandlerTimeout: {
		Add: S{ssS.Exception},
		Multi: true,
		Require: S{ssS.Exception},
	},
	ssF.ErrNetwork: {
		Remove: S{ssS.ClientConnected},
		Require: S{ssS.Exception},
	},
	ssF.ErrNetworkTimeout: {Require: S{ssS.Exception}},
	ssF.ErrOnClient: {Require: S{ssS.Exception}},
	ssF.ErrProviding: {Require: S{ssS.Exception}},
	ssF.ErrRpc: {Require: S{ssS.Exception}},
	ssF.ErrSendPayload: {Require: S{ssS.Exception}},
	ssF.Exception: {Multi: true},
	ssF.HandshakeDone: {
		Remove: S{ssS.Handshaking, ssS.HandshakeDone, ssS.Exception},
		Require: S{ssS.Start, ssS.ClientConnected},
	},
	ssF.Handshaking: {
		Remove: S{ssS.Handshaking, ssS.HandshakeDone},
		Require: S{ssS.Start},
	},
	ssF.Healthcheck: {Multi: true},
	ssF.Heartbeat: {},
	ssF.MetricSync: {Multi: true},
	ssF.Ready: {
		Auto: true,
		Require: S{ssS.HandshakeDone, ssS.RpcReady},
	},
	ssF.RpcAccepting: {
		Remove: S{ssS.RpcStarting, ssS.RpcAccepting, ssS.RpcReady},
		Require: S{ssS.Start},
	},
	ssF.RpcReady: {
		Remove: S{ssS.RpcStarting, ssS.RpcAccepting, ssS.RpcReady},
		Require: S{ssS.Start},
	},
	ssF.RpcStarting: {
		Remove: S{ssS.RpcStarting, ssS.RpcAccepting, ssS.RpcReady},
		Require: S{ssS.Start},
	},
	ssF.SendPayload: {Multi: true},
	ssF.Start: {Add: S{ssS.RpcStarting}},
	ssF.WebSocketTunnel: {},
}

All examples and benchmarks can be found in /examples.
- 🦾 /pkg/machine is the main package
- /docs/manual.md is the go-to
- /docs/diagrams.md try to explain things visually
- /examples show use cases and integrations
  - with /examples/mach_template being ready for copy-paste
  - also Basic, CLI Daemon, aRPC, WASM, WASM Workflow, TUI
- /tools/cmd/am-gen will bootstrap
- /tools/cmd/am-dbg will record every detail
- code in /pkg/node shows a high-level usage
- and reading tests is always a good idea
This monorepo offers the following importable packages, especially:
- 🦾 /pkg/machine - State machine, dependency free, semver compatible.
- /pkg/states - Reusable state schemas, handlers, and piping.
- /pkg/helpers - Useful functions when working with async state machines.
- /pkg/telemetry - Telemetry exporters for dbg, metrics, traces, and logs.
Other packages:
- /pkg/rpc - Remote state machines, with the same API as local ones.
- /pkg/history - History tracking and traversal, including Key-Value and SQL.
- /pkg/integrations - Integrations with NATS and JSON.
- /pkg/graph - Directional multigraph of connected state machines.
- /pkg/node - Distributed worker pools with supervisors.
- /pkg/pubsub - Decentralized PubSub based on libp2p gossipsub.

- /tools/cmd/am-dbg - Multi-client TUI debugger.
- /tools/cmd/am-gen - Generates schema files and Grafana dashboards.
- /tools/cmd/arpc - Network-native REPL and CLI.
- /tools/cmd/am-vis - Generates D2 diagrams.
- /tools/cmd/am-relay - Rotates logs and relays WASM.
Note
Inspecting cause-and-effect in distributed systems.
asyncmachine-go synchronizes state for the following projects:
- secai - AI Workflows framework
- secai Web UI - WebAssembly go-app PWA
- Self-hosting of pkg/rpc, pkg/node, pkg/pubsub
- arpc REPL - Cobra-based REPL
- am-dbg TUI Debugger - Single state-machine TUI app
- libp2p PubSub Simulator - Sandbox simulator for libp2p-pubsub
- libp2p PubSub Benchmark - Benchmark of libp2p-pubsub ported to asyncmachine-go
- scale up, not down
- defaults work by default
- everything can be traced and debugged
- automation is evolution
- state != data
Note
Hundreds of clones.
Under development; the status depends on each package. The bottom layers seem prod grade, the top ones are alpha or testing.
Note
Managing distributed concurrency.
- good first issues
- before
  - ./scripts/dep-taskfile.sh
  - task install-deps
- after
  - task test
  - task format
  - task lint
  - task precommit
- more tooling and diagrams
- bug fixes, optimizations
- network security, ACLs
- ROADMAP.md
Note
Step by step.
asyncmachine-go calls struct methods according to conventions, a schema, and currently active states (e.g. BarEnter, FooFoo, FooBar, BarState). It tackles nondeterminism by embracing it - like a UDP event stream with structure.
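The naming convention can be illustrated with a small reflection-based dispatcher. This is a simplified sketch under stated assumptions, not the library's dispatch code: the `handlers` struct and the `call` and `activate` helpers are hypothetical, handlers take no event argument here, and only the `Enter` and `State` suffixes are modeled.

```go
package main

import (
	"fmt"
	"reflect"
)

// handlers is a hypothetical handler struct; method names follow the
// <State><Suffix> convention.
type handlers struct {
	log []string
}

// BarEnter is a negotiation handler: returning false rejects the activation.
func (h *handlers) BarEnter() bool {
	h.log = append(h.log, "BarEnter")
	return true
}

// BarState is a final handler, called once Bar has been accepted.
func (h *handlers) BarState() {
	h.log = append(h.log, "BarState")
}

// BazEnter always rejects, so Baz never activates.
func (h *handlers) BazEnter() bool {
	h.log = append(h.log, "BazEnter")
	return false
}

// call looks up a method by name and invokes it. A missing method counts
// as acceptance, so handlers stay optional.
func call(target any, name string) bool {
	m := reflect.ValueOf(target).MethodByName(name)
	if !m.IsValid() {
		return true
	}
	out := m.Call(nil)
	if len(out) == 1 && out[0].Kind() == reflect.Bool {
		return out[0].Bool()
	}
	return true
}

// activate runs the negotiation handler first, then the final handler.
func activate(target any, state string) bool {
	if !call(target, state+"Enter") {
		return false
	}
	call(target, state+"State")
	return true
}

func main() {
	h := &handlers{}
	fmt.Println(activate(h, "Bar")) // true
	fmt.Println(activate(h, "Baz")) // false
	fmt.Println(activate(h, "Foo")) // true
	fmt.Println(h.log)              // [BarEnter BarState BazEnter]
}
```

Because a missing method counts as acceptance, a state without any handlers (Foo above) still activates.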
A state is a binary ID, as in a status / switch / flag, e.g. "process RUNNING" or "car BROKEN".
Each state has a counter of activations & deactivations, and all state counters create "machine time". These are logical clocks, and the queue is also (partially) counted.
The same event happening many times will cause only 1 state activation, until the state becomes inactive.
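The per-state counters and the "only 1 activation" rule can be sketched with a plain map. This is an illustration under assumptions, not the library's data structure: the `clock` type and its methods are hypothetical, and the sketch treats an odd counter as "active" and an even one as "inactive".

```go
package main

import "fmt"

// clock maps a state name to its tick counter. In this sketch an odd
// value means "active" and an even value means "inactive" (an assumption
// made for illustration).
type clock map[string]uint64

func (c clock) isActive(state string) bool { return c[state]%2 == 1 }

// add activates a state; activating an already active state is a no-op.
func (c clock) add(state string) {
	if !c.isActive(state) {
		c[state]++
	}
}

// remove deactivates a state; deactivating an inactive state is a no-op.
func (c clock) remove(state string) {
	if c.isActive(state) {
		c[state]++
	}
}

// time returns the counters of the given states, in order. Together they
// form the "machine time" of this sketch.
func (c clock) time(states ...string) []uint64 {
	out := make([]uint64, len(states))
	for i, s := range states {
		out[i] = c[s]
	}
	return out
}

func main() {
	c := clock{}
	// the same event three times causes a single activation
	c.add("Foo")
	c.add("Foo")
	c.add("Foo")
	fmt.Println(c.time("Foo", "Bar")) // [1 0]

	// after a deactivation, the state can activate again
	c.remove("Foo")
	c.add("Foo")
	fmt.Println(c.time("Foo", "Bar")) // [3 0]
}
```

Comparing two snapshots of these counters shows whether a state changed in between, even if it is active in both.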
The complete FAQ is available at FAQ.md.
Note
Don't lose your sync.
