# Asyncmachine Go

Asyncmachine-go is a distributed workflow engine that implements Aspect-Oriented Programming (AOP) and the actor model through a clock-based state machine. It provides a novel approach to controlling program flow using multi-state machines with atomic transitions, state relations, and a negotiation-based consensus mechanism. The core library (`/pkg/machine`) is dependency-free and serves as the foundation for building fault-tolerant, stateful applications ranging from simple daemons to complex distributed systems.

The framework excels at orchestrating blocking APIs into fully controllable async state machines where write operations are state mutations, read operations are state checks, and subscriptions are state waiting. Key features include transparent RPC for remote state machines, a TUI debugger for step-level inspection, telemetry exporters (OpenTelemetry, Prometheus, Loki), selective distribution over networks, and WebAssembly support. Each state represents multiple concepts simultaneously: a binary flag, a node with relations, an AOP aspect, a logical clock, a subscription topic, methods, metrics, traces, locks, and breakpoints.

## Creating a State Machine

Initialize a new state machine with `am.New()`, passing a schema that defines the states and their relations. The schema maps state names to their properties (`Auto`, `Multi`) and relations (`Require`, `Add`, `Remove`, `After`).
```go
import (
	"context"
	"fmt"

	am "github.com/pancsta/asyncmachine-go/pkg/machine"
)

func main() {
	ctx := context.Background()

	// Define schema with states and relations
	schema := am.Schema{
		"ProcessingFile": {
			Remove: am.S{"FileProcessed"}, // mutually exclusive
		},
		"FileProcessed": {
			Remove: am.S{"ProcessingFile"},
		},
		"InProgress": {
			Auto:    true,                    // auto-activates when possible
			Require: am.S{"ProcessingFile"}, // requires ProcessingFile to be active
		},
	}

	// Create machine with options
	mach := am.New(ctx, schema, &am.Opts{
		Id:       "file-processor",
		LogLevel: am.LogChanges,
	})

	// Add a state; result is am.Executed, am.Canceled, or am.Queued
	result := mach.Add1("ProcessingFile", nil)
	fmt.Println(result)

	// Check state
	fmt.Println(mach.Is1("ProcessingFile")) // true
	fmt.Println(mach.Is1("InProgress"))     // true (auto-activated)
	fmt.Println(mach.StringAll())
	// (ProcessingFile:1 InProgress:1) [FileProcessed:0 Exception:0]
}
```

## State Mutations (Add, Remove, Set)

Mutations change the active states of a machine. `Add` activates the specified states and preserves the ones already active, `Remove` deactivates the specified states, and `Set` replaces all active states with the specified ones.
```go
import (
	"context"
	"errors"

	am "github.com/pancsta/asyncmachine-go/pkg/machine"
)

func main() {
	ctx := context.Background()
	mach := am.New(ctx, am.Schema{
		"Foo": {},
		"Bar": {},
		"Baz": {},
	}, nil)

	// Add mutation - preserves existing states
	mach.Add1("Foo", nil)             // (Foo:1)
	mach.Add(am.S{"Bar", "Baz"}, nil) // (Foo:1 Bar:1 Baz:1)

	// Remove mutation - deactivates specified states
	mach.Remove1("Foo", nil)      // (Bar:1 Baz:1) [Foo:2]
	mach.Remove(am.S{"Bar"}, nil) // (Baz:1) [Foo:2 Bar:2]

	// Set mutation - replaces all states
	mach.Set(am.S{"Foo", "Bar"}, nil) // (Foo:3 Bar:3) [Baz:2]

	// Toggle mutation - switches state on/off
	mach.Toggle1("Foo", nil) // (Bar:3) [Foo:4 Baz:2]
	mach.Toggle1("Foo", nil) // (Foo:5 Bar:3) [Baz:2]

	// Add with arguments
	mach.Add1("Foo", am.A{"key": "value", "count": 42})

	// Error handling via Exception state
	mach.AddErr(errors.New("something failed"), nil)
	mach.AddErrState("ErrNetwork", errors.New("connection failed"), nil)
}
```

## Defining Transition Handlers

Handlers are struct methods that execute during state transitions. Negotiation handlers (`Enter`, `Exit`) can cancel transitions by returning false, while final handlers (`State`, `End`) perform side effects.
```go
import (
	"context"

	am "github.com/pancsta/asyncmachine-go/pkg/machine"
)

type Handlers struct {
	*am.ExceptionHandler // handles built-in Exception state
	Filename string
}

// Negotiation handler - can cancel transition
func (h *Handlers) ProcessingFileEnter(e *am.Event) bool {
	// Read-only validation, no blocking
	if h.Filename == "" {
		return false // cancel transition
	}
	return true // accept transition
}

// Final handler - perform side effects
func (h *Handlers) ProcessingFileState(e *am.Event) {
	mach := e.Machine

	// Create state context (expires when state deactivates)
	stateCtx := mach.NewStateCtx("ProcessingFile")

	// Fork goroutine for blocking work
	go func() {
		if stateCtx.Err() != nil {
			return // state already deactivated
		}

		// Blocking operation (processFile is application code, not shown)
		data, err := processFile(h.Filename, stateCtx)

		// Re-check context after blocking call
		if stateCtx.Err() != nil {
			return // state deactivated during processing
		}
		if err != nil {
			mach.AddErr(err, nil)
			return
		}

		// Transition to next state
		mach.Add1("FileProcessed", am.A{"data": data})
	}()
}

// Final handler (End) - cleanup
func (h *Handlers) ProcessingFileEnd(e *am.Event) {
	// Cleanup resources when state deactivates
}

// Cross-state handler: with Foo active, can Bar activate?
func (h *Handlers) FooBar(e *am.Event) bool {
	return true
}

func main() {
	ctx := context.Background()
	// schema must define the states referenced by the handlers
	mach := am.New(ctx, schema, nil)
	err := mach.BindHandlers(&Handlers{Filename: "data.txt"})
	if err != nil {
		panic(err)
	}
	mach.Add1("ProcessingFile", nil)
}
```

## Waiting for States

Subscribe to state changes using "when methods" that return channels. These are efficient (no goroutine allocation) and reusable.
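To see why channel-returning subscriptions need no goroutine per waiter, here is a toy model in plain Go. It is only a sketch under stated assumptions, not the library's implementation: `toyMachine`, `When`, and `Add` are hypothetical names, and the model covers activation only (no clocks, relations, or contexts). Each subscription is a channel that the mutation itself closes.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// toyMachine is a hypothetical, minimal model of state subscriptions.
type toyMachine struct {
	mu     sync.Mutex
	active map[string]bool
	subs   map[string][]chan struct{}
}

func newToyMachine() *toyMachine {
	return &toyMachine{
		active: map[string]bool{},
		subs:   map[string][]chan struct{}{},
	}
}

// When returns a channel that is closed once state is active.
// No goroutine is started; the channel is simply registered.
func (m *toyMachine) When(state string) <-chan struct{} {
	m.mu.Lock()
	defer m.mu.Unlock()
	ch := make(chan struct{})
	if m.active[state] {
		close(ch) // already active: the wait completes immediately
		return ch
	}
	m.subs[state] = append(m.subs[state], ch)
	return ch
}

// Add activates state and closes every channel waiting for it.
func (m *toyMachine) Add(state string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.active[state] = true
	for _, ch := range m.subs[state] {
		close(ch)
	}
	delete(m.subs, state)
}

func main() {
	m := newToyMachine()
	done := m.When("Done")

	// Activate the state from another goroutine after a short delay.
	go func() {
		time.Sleep(10 * time.Millisecond)
		m.Add("Done")
	}()

	select {
	case <-done:
		fmt.Println("Done is active")
	case <-time.After(time.Second):
		fmt.Println("timeout")
	}
}
```

Because a closed channel can be received from any number of times, the same channel can be shared by several waiters or reused in multiple `select` statements. The library's own "when methods" are shown next.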
```go
import (
	"context"
	"fmt"
	"time"

	am "github.com/pancsta/asyncmachine-go/pkg/machine"
)

func main() {
	// ctx and schema are assumed to be defined as in earlier sections
	mach := am.New(ctx, schema, nil)

	// Wait for single state activation
	<-mach.When1("Ready", nil)

	// Wait for multiple states to be active simultaneously
	<-mach.When(am.S{"Connected", "Authenticated"}, nil)

	// Wait for state deactivation
	<-mach.WhenNot1("Processing", nil)

	// Wait with context cancellation
	waitCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	select {
	case <-mach.When1("Done", waitCtx):
		fmt.Println("completed")
	case <-waitCtx.Done():
		fmt.Println("timeout")
	}

	// Wait for specific arguments
	<-mach.WhenArgs("UserEvent", am.A{"userId": "123"}, nil)

	// Wait for clock tick value
	<-mach.WhenTime1("Foo", 6, nil) // tick >= 6
	<-mach.WhenTicks("Foo", 2, nil) // tick increased by 2

	// Wait for queued mutation to execute
	result := mach.Add1("Foo", nil)
	<-mach.WhenQueue(result)

	// Wait for any error
	<-mach.WhenErr(nil)

	// Custom clock query
	<-mach.WhenQuery(func(c am.Clock) bool {
		return c["Foo"] >= 10 && c["Bar"] >= 2*c["Foo"]
	}, nil)
}
```

## State Relations

Relations define rules between states: `Require` (dependencies), `Remove` (mutual exclusion), `Add` (implied states), and `After` (handler ordering).
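As a rough mental model of how `Require`, `Remove`, and `Add` interact, the following plain-Go sketch resolves a single activation against a schema. It is a deliberately simplified toy, not the library's resolution algorithm: the `rel` type and `activate` function are hypothetical, and `Auto`, `After`, `Multi`, handlers, the queue, and re-checking the requirements of already active states are all left out.

```go
package main

import "fmt"

// rel is a hypothetical, stripped-down stand-in for a state's relations.
type rel struct {
	Require, Remove, Add []string
}

// activate tries to activate name and reports whether it succeeded.
// On failure, active is left unchanged (the mutation is "canceled").
func activate(schema map[string]rel, active map[string]bool, name string) bool {
	// Work on a copy so a failed Require check leaves active untouched.
	target := make(map[string]bool, len(active))
	for s, on := range active {
		if on {
			target[s] = true
		}
	}

	// Add: collect name plus every state it implies, transitively.
	pending := []string{name}
	var added []string
	for len(pending) > 0 {
		s := pending[0]
		pending = pending[1:]
		if target[s] {
			continue
		}
		target[s] = true
		added = append(added, s)
		pending = append(pending, schema[s].Add...)
	}

	// Remove: each newly activated state deactivates the states it lists.
	for _, s := range added {
		for _, r := range schema[s].Remove {
			delete(target, r)
		}
	}

	// Require: each surviving new state needs its dependencies active.
	for _, s := range added {
		if !target[s] {
			continue
		}
		for _, r := range schema[s].Require {
			if !target[r] {
				return false
			}
		}
	}

	// Commit the resolved set.
	for s := range active {
		delete(active, s)
	}
	for s := range target {
		active[s] = true
	}
	return true
}

func main() {
	schema := map[string]rel{
		"Ready":        {Require: []string{"Connected"}},
		"Connected":    {Remove: []string{"Disconnected"}},
		"Disconnected": {Remove: []string{"Connected"}},
		"Start":        {Add: []string{"Loading"}},
	}
	active := map[string]bool{"Disconnected": true}

	fmt.Println(activate(schema, active, "Ready"))     // false: Connected is not active
	fmt.Println(activate(schema, active, "Connected")) // true
	fmt.Println(active["Disconnected"])                // false: removed by Connected
	fmt.Println(activate(schema, active, "Ready"))     // true
	fmt.Println(activate(schema, active, "Start"))     // true
	fmt.Println(active["Loading"])                     // true: implied by Start
}
```

The point of the sketch is that relations are evaluated as part of the mutation itself, so callers never have to check dependencies or clear conflicting states by hand. The library's actual schema syntax and behavior follow.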
```go
import (
	"context"

	am "github.com/pancsta/asyncmachine-go/pkg/machine"
)

func main() {
	ctx := context.Background()
	schema := am.Schema{
		// Require: state needs other states to be active
		"Ready": {
			Require: am.S{"Connected", "Authenticated"},
		},

		// Remove: mutually exclusive states (state groups)
		"Connected": {
			Remove: am.S{"Connecting", "Disconnected"},
		},
		"Connecting": {
			Remove: am.S{"Connected", "Disconnected"},
		},
		"Disconnected": {
			Auto:   true, // auto-activates when possible
			Remove: am.S{"Connected", "Connecting"},
		},

		// Add: activating this state implies others
		"Start": {
			Add: am.S{"Initializing", "Loading"},
		},

		// After: handler execution order
		"Cleanup": {
			After: am.S{"Processing"}, // Cleanup handlers run after Processing
		},

		// Multi: can activate multiple times without deactivating
		"EventReceived": {
			Multi: true,
		},
	}
	mach := am.New(ctx, schema, nil)

	// Require relation example
	mach.Add1("Ready", nil) // Canceled - requires Connected
	mach.Add(am.S{"Connected", "Authenticated"}, nil)
	mach.Add1("Ready", nil) // Executed

	// Remove relation example
	mach.Is1("Disconnected") // false - removed by Connected
	mach.Remove1("Connected", nil)
	mach.Is1("Disconnected") // true - auto-activated

	// Add relation example
	mach.Add1("Start", nil)
	mach.Is(am.S{"Start", "Initializing", "Loading"}) // true
}
```

## Typesafe States with Schema Files

Generate type-safe schema definitions using `am-gen`, or define them manually, to get IDE support and compile-time checking of state names.
```go
// states/ss_mymach.go - generated or hand-written
package states

import am "github.com/pancsta/asyncmachine-go/pkg/machine"

// MyMachStatesDef contains all states
type MyMachStatesDef struct {
	*am.StatesBase

	// Processing indicates file processing is in progress
	Processing string
	// Completed indicates processing finished successfully
	Completed string
	// Error indicates an error occurred
	Error string
}

// MyMachSchema defines relations
var MyMachSchema = am.Schema{
	ssM.Processing: {
		Remove: am.S{ssM.Completed, ssM.Error},
	},
	ssM.Completed: {
		Remove: am.S{ssM.Processing, ssM.Error},
	},
	ssM.Error: {
		Remove: am.S{ssM.Processing, ssM.Completed},
	},
}

var ssM = am.NewStates(MyMachStatesDef{})
var MyMachStates = ssM

// main.go (separate file)
package main

import (
	"context"

	am "github.com/pancsta/asyncmachine-go/pkg/machine"
	"myproject/states"
)

func main() {
	ctx := context.Background()
	ss := states.MyMachStates
	mach := am.New(ctx, states.MyMachSchema, nil)
	err := mach.VerifyStates(ss.Names()) // verify at runtime
	if err != nil {
		panic(err)
	}

	// Type-safe state references
	mach.Add1(ss.Processing, nil)
	mach.Is1(ss.Processing)
	<-mach.When1(ss.Completed, nil)
}
```

## Typesafe Arguments

Structure mutation arguments with type-safe helpers for parsing and passing between handlers.
```go
import (
	"errors"
	"fmt"

	am "github.com/pancsta/asyncmachine-go/pkg/machine"
)

// A contains typed arguments for mutations
type A struct {
	UserId    string `log:"user_id"` // log tag for telemetry
	RequestId string `log:"req_id"`
	Data      []byte
}

// ParseArgs extracts A from event args
func ParseArgs(args am.A) *A {
	if a, ok := args["myapp"].(*A); ok {
		return a
	}
	return nil
}

// Pass wraps A for mutation calls
func Pass(args *A) am.A {
	return am.A{"myapp": args}
}

// Handler using typed args (Handlers is the struct bound via BindHandlers)
func (h *Handlers) ProcessingState(e *am.Event) {
	args := ParseArgs(e.Args)
	if args == nil {
		e.Machine.AddErr(errors.New("missing args"), nil)
		return
	}

	// Use typed fields
	fmt.Printf("Processing user %s, request %s\n", args.UserId, args.RequestId)

	// Pass args to next state
	e.Machine.Add1("Completed", Pass(&A{
		UserId:    args.UserId,
		RequestId: args.RequestId,
	}))
}

// Usage (mach is a machine created with am.New, as in earlier sections)
func main() {
	mach.Add1("Processing", Pass(&A{
		UserId:    "user-123",
		RequestId: "req-456",
		Data:      []byte{1, 2, 3},
	}))
}
```

## RPC Server and Client

Expose state machines over the network with transparent RPC. Clock changes are pushed to clients, and most API methods execute locally on cached state.
```go
import (
	"context"
	"fmt"
	"time"

	amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
	am "github.com/pancsta/asyncmachine-go/pkg/machine"
	arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
	ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)

// Server side
func runServer(ctx context.Context) {
	// Create state source machine (inherits RPC states)
	schema := am.SchemaMerge(ssrpc.StateSourceSchema, am.Schema{
		"Processing": {},
		"Done":       {},
	})
	sourceMach := am.New(ctx, schema, nil)
	sourceMach.VerifyStates(am.SAdd(ssrpc.StateSourceStates.Names(),
		am.S{"Processing", "Done"}))

	// Create RPC server
	server, err := arpc.NewServer(ctx, "localhost:50051", "myserver", sourceMach, nil)
	if err != nil {
		panic(err)
	}
	server.Start()

	err = amhelp.WaitForAll(ctx, 5*time.Second,
		server.Mach.When1(ssrpc.ServerStates.RpcReady, ctx))
	if err != nil {
		panic(err)
	}
	fmt.Println("Server ready")
	<-ctx.Done()
}

// Client side
func runClient(ctx context.Context) {
	schema := am.Schema{"Processing": {}, "Done": {}}

	// Consumer receives payloads
	consumer := am.New(ctx, ssrpc.ConsumerSchema, nil)

	// Create RPC client
	client, err := arpc.NewClient(ctx, "localhost:50051", "myclient", schema,
		&arpc.ClientOpts{Consumer: consumer})
	if err != nil {
		panic(err)
	}
	client.Start()

	err = amhelp.WaitForAll(ctx, 5*time.Second,
		client.Mach.When1(ssrpc.ClientStates.Ready, ctx))
	if err != nil {
		panic(err)
	}

	// Use remote machine with same API as local
	netMach := client.NetMach
	netMach.Add1("Processing", nil)  // remote mutation
	<-netMach.When1("Done", nil)     // local wait (cached state)
	fmt.Println(netMach.Is1("Done")) // local check (cached state)
}
```

## Telemetry and Debugging

Enable debugging with the TUI debugger (`am-dbg`), OpenTelemetry traces, Prometheus metrics, and Loki logging.
```go
import (
	"log"

	amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
	am "github.com/pancsta/asyncmachine-go/pkg/machine"
	amtele "github.com/pancsta/asyncmachine-go/pkg/telemetry"
	// assumed import path for the amprom alias used below
	amprom "github.com/pancsta/asyncmachine-go/pkg/telemetry/prometheus"
)

func main() {
	// ctx and schema are assumed to be defined as in earlier sections
	mach := am.New(ctx, schema, nil)

	// Enable am-dbg telemetry (env: AM_DBG_ADDR=localhost:6831)
	amhelp.MachDebugEnv(mach)
	// Or manually:
	err := amtele.TransitionsToDBG(mach, "localhost:6831")
	if err != nil {
		log.Println(err)
	}

	// OpenTelemetry traces (env: AM_OTEL_ADDR)
	err = amtele.MachBindOtelEnv(mach)
	if err != nil {
		log.Println(err)
	}
	// Or manually with a custom tracer (otelTracer is created elsewhere):
	machTracer := amtele.NewOtelMachTracer(otelTracer, nil)
	mach.BindTracer(machTracer)

	// Prometheus metrics (env: AM_PROM_ADDR)
	metrics := amprom.MachMetricsEnv(mach)
	_ = metrics

	// Loki logging (env: AM_LOKI_ADDR)
	amtele.BindLokiEnv(mach)

	// Set log level:
	// LogNothing, LogChanges, LogOps, LogDecisions, LogEverything
	mach.SetLogLevel(am.LogChanges)
	mach.SetLoggerSimple(log.Printf, am.LogOps)

	// Add breakpoint for debugging
	mach.AddBreakpoint(am.S{"Error"}, nil, false)
}
```

## Helper Functions for Synchronous Calls

Use helper functions to simplify waiting patterns and to write async operations in a synchronous style.
```go
import (
	"time"

	amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
	am "github.com/pancsta/asyncmachine-go/pkg/machine"
)

func main() {
	// ctx and schema are assumed to be defined as in earlier sections
	mach := am.New(ctx, schema, nil)

	// Synchronous add - waits for state to activate
	result := amhelp.Add1Sync(ctx, mach, "Processing", nil)
	// result is am.Executed or am.Canceled, never am.Queued
	_ = result

	// Wait for multiple channels with timeout
	err := amhelp.WaitForAll(ctx, 5*time.Second,
		mach.When1("Ready", nil),
		mach.When1("Connected", nil))
	if ctx.Err() != nil {
		return // context canceled
	}
	if err != nil {
		// timeout or error
		mach.AddErr(err, nil)
		return
	}

	// Wait for any of multiple channels
	err = amhelp.WaitForAny(ctx, 5*time.Second,
		mach.When1("Success", nil),
		mach.When1("Error", nil))

	// Wait with error checking
	// Returns error if timeout OR Exception activates
	err = amhelp.WaitForErrAll(ctx, 5*time.Second, mach,
		mach.When1("Done", nil))

	// Simple wait with context
	if !amhelp.Wait(ctx, 1*time.Second) {
		return // context expired
	}

	// Failsafe mutation with retries
	// Retries: 10, Delay: 100ms, Backoff: 5s, MaxDuration: 5s
	_, err = amhelp.NewReqAdd1(mach, "WorkerRequested", nil).Run(ctx)
}
```

## State Piping Between Machines

Bind states between machines to automatically propagate state changes. This is useful for composing complex workflows from smaller machines.
```go
import (
	am "github.com/pancsta/asyncmachine-go/pkg/machine"
	ampipe "github.com/pancsta/asyncmachine-go/pkg/states/pipes"
)

func main() {
	source := am.New(ctx, sourceSchema, nil)
	target := am.New(ctx, targetSchema, nil)

	// Pipe Ready state: source.Ready -> target.SourceReady
	ampipe.BindReady(source, target, "Ready", "SourceReady")

	// Pipe with activation and deactivation states
	// source.Connected -> target.SourceConnected (on activate)
	// source.Connected deactivate -> target.SourceDisconnected (on deactivate)
	ampipe.Bind(source, target, "Connected", "SourceConnected", "SourceDisconnected")

	// Pipe errors
	ampipe.BindErr(source, target)

	// Manual piping
	handlers := &struct {
		FooState am.HandlerFinal
		FooEnd   am.HandlerFinal
	}{
		FooState: ampipe.Add(source, target, "Foo", "RemoteFoo"),
		FooEnd:   ampipe.Remove(source, target, "Foo", "RemoteFoo"),
	}
	source.BindHandlers(handlers)

	// Now when source.Foo activates, target.RemoteFoo also activates
	source.Add1("Foo", nil)
	target.Is1("RemoteFoo") // true
}
```

## Distributed Worker Pools with pkg/node

Create distributed worker pools with supervisors, workers, and clients communicating via aRPC.
```go
import (
	"context"
	"time"

	amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
	am "github.com/pancsta/asyncmachine-go/pkg/machine"
	amnode "github.com/pancsta/asyncmachine-go/pkg/node"
	ssnode "github.com/pancsta/asyncmachine-go/pkg/node/states"
	arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
)

// Worker handler
type WorkerHandlers struct{}

func (w *WorkerHandlers) WorkRequestedState(e *am.Event) {
	input := e.Args["input"].(int)
	result := input * input

	// Send payload to client
	e.Machine.Add1(ssnode.WorkerStates.ClientSendPayload, arpc.Pass(&arpc.A{
		Name: "result",
		Payload: &arpc.ArgsPayload{
			Name:   "result",
			Data:   result,
			Source: e.Machine.Id(),
		},
	}))
}

func main() {
	ctx := context.Background()
	workerKind := "calculator"

	// Create supervisor (starts and manages workers)
	// workerSchema is the worker's am.Schema, defined elsewhere
	workerBin := []string{"./worker-binary", "--kind", workerKind}
	super, err := amnode.NewSupervisor(ctx, workerKind, workerBin, workerSchema, nil)
	if err != nil {
		panic(err)
	}
	super.Start("localhost:50051")
	amhelp.WaitForAll(ctx, 10*time.Second,
		super.Mach.When1(ssnode.SupervisorStates.PoolReady, ctx))

	// Create client
	client, err := amnode.NewClient(ctx, "myclient", workerKind, workerSchema, nil)
	if err != nil {
		panic(err)
	}
	client.Start([]string{"localhost:50051"})
	amhelp.WaitForAll(ctx, 5*time.Second,
		client.Mach.When1(ssnode.ClientStates.SuperReady, ctx))

	// Request a worker
	client.ReqWorker(ctx)
	amhelp.WaitForAll(ctx, 5*time.Second,
		client.Mach.When1(ssnode.ClientStates.WorkerReady, ctx))

	// Use the worker
	worker := client.WorkerRpc.NetMach
	worker.Add1(ssnode.WorkerStates.WorkRequested, am.A{"input": 5})

	// Wait for and receive payload
	<-client.Mach.When1(ssnode.ClientStates.WorkerPayload, ctx)
}
```

## Clock and State Context

Each state has a tick counter, and together these counters form a logical clock. An odd tick means the state is active, an even tick means it is inactive. State contexts expire when the state deactivates.
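The parity rule can be modeled in a few lines of plain Go, independent of the library. The sketch below is a toy model, not asyncmachine-go's implementation: the `toyClock` type and its methods are hypothetical names, and it ignores relations, handlers, `Multi` states, and the queue. It only shows why an odd tick implies an active state: the counter starts at zero and every activation or deactivation increments it by one.

```go
package main

import "fmt"

// toyClock is a hypothetical, simplified model of per-state tick counters.
type toyClock map[string]uint64

// IsActive reports whether a state is active: odd ticks mean active.
func (c toyClock) IsActive(state string) bool {
	return c[state]%2 == 1
}

// Add activates a state, ticking its counter only if it was inactive.
func (c toyClock) Add(state string) {
	if !c.IsActive(state) {
		c[state]++
	}
}

// Remove deactivates a state, ticking its counter only if it was active.
func (c toyClock) Remove(state string) {
	if c.IsActive(state) {
		c[state]++
	}
}

func main() {
	c := toyClock{}
	fmt.Println(c["Foo"], c.IsActive("Foo")) // 0 false

	c.Add("Foo")
	fmt.Println(c["Foo"], c.IsActive("Foo")) // 1 true

	c.Add("Foo") // already active: no tick in this toy model
	c.Remove("Foo")
	fmt.Println(c["Foo"], c.IsActive("Foo")) // 2 false

	c.Add("Foo")
	fmt.Println(c["Foo"], c.IsActive("Foo")) // 3 true
}
```

A useful consequence is that comparing ticks detects changes a boolean check would miss: a state observed at tick 1 and later at tick 3 is active both times, yet it was deactivated and reactivated in between. The library calls for reading ticks and binding contexts to states follow.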
```go
import (
	"fmt"
	"time"

	am "github.com/pancsta/asyncmachine-go/pkg/machine"
)

func main() {
	// ctx and schema are assumed to be defined as in earlier sections
	mach := am.New(ctx, schema, nil)

	// Check tick values
	mach.Tick("Foo") // 0 (inactive)
	mach.Add1("Foo", nil)
	mach.Tick("Foo") // 1 (active, odd = active)
	mach.Remove1("Foo", nil)
	mach.Tick("Foo") // 2 (inactive, even = inactive)
	mach.Add1("Foo", nil)
	mach.Tick("Foo") // 3 (active again)

	// Get machine time (ticks of the selected states)
	// Named machTime so it does not shadow the time package used below
	machTime := mach.Time(am.S{"Foo", "Bar"}) // am.Time{3, 0}

	// Check if time matches current state
	mach.IsTime(machTime, am.S{"Foo", "Bar"}) // true

	// Create state-bound context
	stateCtx := mach.NewStateCtx("Processing")
	go func() {
		// This context cancels when Processing deactivates
		select {
		case <-stateCtx.Done():
			fmt.Println("Processing ended")
			return
		case <-time.After(10 * time.Second):
			// Continue work
		}
	}()

	// Time-based waiting
	<-mach.WhenTime1("Foo", 5, nil) // wait for tick >= 5
	<-mach.WhenTicks("Foo", 2, nil) // wait for 2 more ticks
}
```

Asyncmachine-go is designed for building autonomous workflows with organic control flow and stateful APIs. The primary use cases include distributed systems orchestration, daemon and service lifecycle management, complex UI state management, RPC-based microservice communication, real-time systems requiring instant cancellation, test scenario modeling, and any application needing predictable async state coordination.

Integration patterns center around defining state schemas that model your domain's workflow states, binding handlers for state transitions, using relations to enforce business rules automatically, leveraging the queue for actor-model concurrency, and composing larger systems through state piping and RPC.

The clock-based design enables precise coordination across distributed components, while the TUI debugger and telemetry exporters provide deep observability into system behavior. Start with simple schemas, add relations as requirements emerge, and scale to distributed deployments using the RPC and node packages.