### Basic Core.async 'Hello, world!' Example Source: https://github.com/clojure/core.async/wiki/Getting-Started This example demonstrates the fundamental operations of requiring the core.async namespace, creating a buffered channel, sending a message synchronously using '>!!', and receiving it synchronously using ' (require '[clojure.core.async :as async]) nil user> (def a-channel (async/chan 1)) #> user> (async/>!! a-channel "Hello, world!") true user> (async/data logic, no channels (defn doubler-step ([] {:ins {:in "Input values"} :outs {:out "Doubled values"}}) ([state] state) ;; init ([state transition] state) ;; lifecycle ([state _input-id msg] ;; transform: return [new-state {output-map}] [state {:out [(* 2 msg)]}])) ;; Source step: reads from external channel via ::flow/in-ports (defn source-step ([] {:ins {} :outs {:out "Values from external channel"}}) ([{:keys [source-chan] :as state}] (assoc state ::flow/in-ports {:in source-chan})) ([state _transition] state) ([state _in msg] [state {:out [msg]}])) ;; Sink step: writes to external channel via ::flow/out-ports (defn sink-step ([] {:ins {:in "Values to collect"} :outs {}}) ([{:keys [result-chan] :as state}] (assoc state ::flow/out-ports {:out result-chan})) ([state _transition] state) ([state _in msg] [state {}])) ;; Build and run the flow (let [in-chan (a/chan 10) out-chan (a/chan 10) flow-def {:procs {:source {:proc (flow/process #'source-step) :args {:source-chan in-chan}} :double {:proc (flow/process #'doubler-step)} :sink {:proc (flow/process #'sink-step) :args {:result-chan out-chan}}} :conns [[[:source :out] [:double :in]] [[:double :out] [:sink :in]]]} f (flow/create-flow flow-def) {:keys [report-chan error-chan]} (flow/start f)] ;; Inject values (a/>!! in-chan 3) (a/>!! in-chan 7) (Thread/sleep 100) (println (a/poll! out-chan)) ;; => 6 (println (a/poll! out-chan)) ;; => 14 (flow/stop f) (a/close! in-chan)) ``` -------------------------------- ### start Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.flow.html Starts the entire flow from its initial values. Processes begin in a paused state and can be resumed using `resume` or `resume-proc`. Returns a map containing report and error channels. ```APIDOC ## start ### Description Starts the entire flow. Processes begin paused and can be resumed. ### Signature `(start g)` ### Parameters * **g** - The flow graph to start. ### Returns A map with the following keys: * **:report-chan** - A core.async channel for reading 'ping' responses and explicit `::flow/report` outputs from `:transform`. * **:error-chan** - A core.async channel for reading exceptions thrown within the flow. Each entry will contain at least a `::flow/ex` key with the exception, and potentially other keys like `pid`, `state`, and `status`. ``` -------------------------------- ### ProcLauncher.start Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.flow.spi.html Starts a process, typically by launching its thread. It takes process details like pid, arguments, channels, and a resolver. ```APIDOC ## start `(start p {:keys [pid args ins outs resolver]})` return ignored, called for the effect of starting the process (typically, starting its thread) where: :pid - the id of the process in the graph, so that e.g. it can refer to itself in control, reporting etc :args - a map of param->val, as supplied in the graph def :ins - a map of in-id->readable-channel, plus the ::flow/control channel :outs - a map of out-id->writeable-channel, plus the ::flow/error and ::flow/report channels N.B. outputs may be nil if not connected :resolver - an impl of spi/Resolver, which can be used to find channels given their logical [pid cid] coordinates, as well as to obtain Executors corresponding to the logical :mixed/:io/:compute contexts ``` -------------------------------- ### Performance Timing Example (Clojure) Source: https://github.com/clojure/core.async/wiki/Sieve-of-Eratosthenes Measures the execution time of consuming prime numbers from both the naive and doomed prime channel generators. ```clojure (time (consume 2000 (chan-of-primes-naive))) (time (consume 2000 (chan-of-primes-doomed))) ``` -------------------------------- ### Describe Step-Fn Arity Example Source: https://github.com/clojure/core.async/blob/master/doc/flow-guide.md The describe arity must return a static description of the step-fn's :params, :ins, and :outs. Each of these is a map of name (a keyword) to docstring. Input and output channel names should be distinct. ```clojure {:params {:size "Max size"} ;; step-fn params :ins {:in "Input channel"} ;; input channels :outs {:out "Output channel"}} ``` -------------------------------- ### Read messages from channels with alts!! Source: https://github.com/clojure/core.async/blob/master/docs/walkthrough.html Use `alts!!` to read from multiple channels. This example reads a specified number of messages and asserts their content. ```clojure (dotimes [i n] (let [[v c] (a/alts!! cs)] (assert (= "hi" v)))) (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms")) ``` -------------------------------- ### Handle blocking publication scenario Source: https://github.com/clojure/core.async/wiki/Pub-Sub Demonstrates a scenario where a publication might block if a subscribed channel cannot accept values. This setup subscribes to a ':loser' topic. ```clojure (def loser-chan (chan)) (sub our-pub :loser loser-chan) (>!! input-chan {:msg-type :loser :text "I won't be accepted"}) ``` -------------------------------- ### Helper for Integer Channel with Transducer (Clojure) Source: https://github.com/clojure/core.async/wiki/Sieve-of-Eratosthenes Creates a channel that emits integers starting from `start-n`, applying the given transducer `xform`. Useful for building prime number generators. ```clojure (defn chan-of-ints [xform start-n] (let [ints (chan 1 xform)] (go-loop [n start-n] (>! ints n) (recur (inc n))) ints)) ``` -------------------------------- ### ProcLauncher Protocol Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.flow.spi.html Defines the ProcLauncher protocol for creating custom process types. It includes methods to describe process parameters and to start new process instances. ```APIDOC ## ProcLauncher Protocol Note - defining a ProcLauncher is an advanced feature and should not be needed for ordinary use of the library. This protocol is for creating new types of Processes that are not possible to create with ::flow/process. A ProcLauncher is a constructor for a process, a thread of activity. It has two functions - to describe the parameters and input/output requirements of the process and to start it. The launcher should acquire no resources, nor retain any connection to the started process. A launcher may be called upon to start a process more than once, and should start a new process each time start is called. The process launched process must obey the following: It must have 2 logical statuses, :paused and :running. In the :paused status operation is suspended and no output is produced. When the process starts it must be :paused Whenever it is reading or writing to any channel a process must use alts!! and include a read of the ::flow/control channel, giving it priority. Command messages sent over the ::flow/control channel have the keys: ::flow/to - either ::flow/all or a process id ::flow/command - ::flow/stop|pause|resume|ping or process-specific It must act upon any, and only, control messages whose ::flow/to key is its pid or ::flow/all It must act upon the following values of ::flow/command: ::flow/stop - all resources should be cleaned up and any thread(s) should exit ordinarily - there will be no more subsequent use of the process. ::flow/pause - enter the :paused status ::flow/resume - enter the :running status and resume processing ::flow/ping - emit a ping message (format TBD) to the ::flow/report channel containing at least its pid and status A process can define and respond to other commands in its own namespace. A process should not transmit channel objects (use [pid io-id] data coordinates instead) A process should not close channels Finally, if a process encounters an error it must report it on the ::flow/error channel (format TBD) and attempt to continue, though it may subsequently get a ::flow/stop command it must respect ``` -------------------------------- ### Create Channel of Integers Source: https://github.com/clojure/core.async/wiki/Sieve-of-Eratosthenes Generates an infinite sequence of integers starting from a given number and sends them to a core.async channel. This function is used as a source for prime number generation. ```clojure (defn chan-of-ints [start-n] (let [ints (chan)] (go-loop [n start-n] (>! ints n) (recur (inc n))) ints)) ``` -------------------------------- ### Combine `alts!!` with `a/timeout` for Timed Waits Source: https://github.com/clojure/core.async/blob/master/doc/walkthrough.md This example shows how to use `alts!!` with a timeout channel to perform a timed wait for a value on another channel. If the timeout occurs before a value is received, `alts!!` returns nil. ```clojure (let [c (a/chan) begin (System/currentTimeMillis)] (a/alts!! [c (a/timeout 100)]) (println "Gave up after" (- (System/currentTimeMillis) begin))) ``` -------------------------------- ### step-fn Arity 1: init Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.flow.html Initializes the process state. ```APIDOC ## init `(init arg-map) -> initial-state` ### Description Called once to establish initial state. `arg-map` contains param->val mappings. The key `::flow/pid` is added. Optionally returns `::flow/in-ports` and/or `::flow/out-ports` as maps of cid -> channel. Optionally returns `::flow/input-filter` predicate. ``` -------------------------------- ### Create Channels with `chan` Source: https://context7.com/clojure/core.async/llms.txt Demonstrates creating different types of channels: unbuffered, fixed buffer, dropping buffer, sliding buffer, and channels with transducers and exception handlers. Also shows how to close a channel. ```clojure (require '[clojure.core.async :as a]) ;; Unbuffered — put blocks until a reader takes (def rendezvous (a/chan)) ;; Fixed buffer of 10 — up to 10 items before puts block (def buffered (a/chan 10)) ;; Dropping buffer — when full, new puts complete but value is dropped (def dropping (a/chan (a/dropping-buffer 5))) ;; Sliding buffer — when full, oldest value is evicted to make room (def sliding (a/chan (a/sliding-buffer 5))) ;; Channel with transducer (buffer required when using xform) (def evens (a/chan 10 (filter even?))) ;; Channel with transducer + exception handler (def safe (a/chan 10 (map #(/ 10 %)) (fn [ex] (println "Error:" ex) nil))) ;; Close a channel — no more puts accepted; remaining values still takeable (a/close! buffered) ``` -------------------------------- ### ping-proc Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.flow.html Pings a specific process within the flow to get its live status. ```APIDOC ## ping-proc ### Description Pings a specific process within the flow to get its live status. ### Parameters * **flow** (flow) - The flow object. * **process-id** (keyword) - The ID of the process to ping. ### Returns A representation of the live status of the specified process. ``` -------------------------------- ### Efficiently Read from Many Channels with `alts!!` Source: https://github.com/clojure/core.async/blob/master/doc/walkthrough.md Demonstrates creating a large number of go blocks and channels, then efficiently reading from them as they become ready using `alts!!`. This highlights the scalability of core.async. ```clojure (let [n 1000 cs (repeatedly n a/chan) begin (System/currentTimeMillis)] (doseq [c cs] (a/go (>! c "hi"))) (dotimes [i n] (let [[v c] (a/alts!! cs)] (assert (= "hi" v)))) (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms")) ``` -------------------------------- ### Channel of Primes Generator Source: https://github.com/clojure/core.async/wiki/Sieve-of-Eratosthenes Creates a channel that emits prime numbers, starting with 2 and then odd primes. It utilizes the `posmod-sift` for filtering. ```clojure (defn chan-of-primes [] (let [inputs (filter odd? (drop 3 (range))) primes (chan 1 (posmod-sift))] (put! primes 2) (onto-chan primes inputs) primes)) ``` -------------------------------- ### ProcLauncher.describe Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.flow.spi.html Returns a map describing the parameters, inputs, and outputs of a process. ```APIDOC ## describe `(describe p)` returns a map with keys - :params, :ins and :outs, each of which in turn is a map of keyword to docstring :params describes the initial arguments to setup the state for the process :ins enumerates the input[s], for which the graph will create channels :outs enumerates the output[s], for which the graph may create channels. describe may be called by users to understand how to use the proc. It will also be called by the impl in order to discover what channels are needed. ``` -------------------------------- ### `onto-chan!` / `to-chan!` Source: https://context7.com/clojure/core.async/llms.txt Loads collections into channels. `onto-chan!` pours a collection onto an existing channel, while `to-chan!` creates a new channel pre-loaded with a collection's contents. Both close the channel by default upon completion. ```APIDOC ## `onto-chan!` / `to-chan!` — Load collections into channels `onto-chan!` pours a collection onto an existing channel. `to-chan!` creates a new channel pre-loaded with a collection's contents. Both close the channel when done by default. Use `!!` variants for blocking/lazy collections. ```clojure (require '[clojure.core.async :as a :refer [ [1 2 3] ;; to-chan! — convenience: create channel from collection (let [c (a/to-chan! [:a :b :c])] (println ( [:a :b :c] ;; onto-chan! with close? false — reuse channel across multiple pours (let [c (a/chan 10)] ( [1 2 3 4 5 6] ``` ``` -------------------------------- ### Init State Options Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.flow.html The initial state can include :flow/in-ports and :flow/out-ports as maps of channel IDs to core.async channels. It can also contain :flow/input-filter to selectively process inputs. ```clojure Optionally, a returned init state may contain the keys ::flow/in-ports and/or ::flow/out-ports. These should be maps of cid -> a core.async.channel. The cids must not conflict with the in/out ids. These channels will become part of the input/output set of the process, but are not otherwise visible/resolvable within the flow. Ports are a way to allow data to enter or exit the flow from outside of it. Use :transition to coordinate the lifecycle of these external channels. Optionally, _any_ returned state, whether from init, transition or transform, may contain the key ::flow/input-filter, a predicate of cid. Only inputs (including in-ports) satisfying the predicate will be part of the next channel read set. In the absence of this predicate all inputs are read. ``` -------------------------------- ### Transition Step-Fn State Source: https://github.com/clojure/core.async/blob/master/docs/flow-guide.html The transition arity is called during lifecycle events (start, stop, pause, resume) to update the process state. Use this to manage external resources. ```clojure (step-fn state transition) -> state' ``` -------------------------------- ### Import core.async library Source: https://github.com/clojure/core.async/wiki/Pub-Sub Imports the necessary core.async library for pub-sub functionality. ```clojure (use 'clojure.core.async) ``` -------------------------------- ### Transduce with Optimized Range Source: https://github.com/clojure/core.async/wiki/Sieve-of-Eratosthenes Generates prime numbers by applying the `posmod-sift` transducer to a sequence that includes 2 and then only odd numbers starting from 3. This is a slightly optimized approach. ```clojure (transduce (posmod-sift) conj (cons 2 (range 3 100 2))) => [2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97] ``` -------------------------------- ### offer! Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Attempts to immediately put a value into a channel without blocking. Returns true if the offer succeeds, false otherwise. Nil values are not allowed. ```APIDOC ## offer! `(offer! port val)` Puts a val into port if it's possible to do so immediately. nil values are not allowed. Never blocks. Returns true if offer succeeds. ``` -------------------------------- ### step-fn Arity 0: describe Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.flow.html Describes the process, including its parameters, inputs, outputs, and signal selection. ```APIDOC ## describe `() -> description` ### Description Returns a map with optional keys: `:params`, `:ins`, `:outs`, `:signal-select`, `:workload`. - `:params`: Map of keyword to doc string for initial arguments. - `:ins`: Map of keyword to doc string enumerating process inputs. - `:outs`: Map of keyword to doc string enumerating process outputs. - `:signal-select`: A predicate of a signal-id. Messages on approved signals will appear in the transform arity. Can be a set of signal-ids. - `:workload`: Describes the nature of the workload: :mixed, :io, or :compute. ``` -------------------------------- ### Wait on Multiple Channels with `alts!!` Source: https://github.com/clojure/core.async/blob/master/doc/walkthrough.md Use `alts!!` in ordinary threads to wait on multiple channels simultaneously. It returns the value and the channel that succeeded. This example demonstrates reading from two channels in a background thread. ```clojure (let [c1 (a/chan) c2 (a/chan)] (a/thread (while true (let [[v ch] (a/alts!! [c1 c2])] (println "Read" v "from" ch)))) (>!! c1 "hi") (>!! c2 "there")) ``` -------------------------------- ### take! Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Asynchronously takes a value from a port and passes it to a callback function. Handles immediate availability and potential closure. ```APIDOC ## take! ### Description Asynchronously takes a value from a port, passing it to `fn1`. Will pass `nil` if the port is closed. If `on-caller?` is true and the value is immediately available, `fn1` will be called on the calling thread. `fn1` should not perform blocking I/O. ### Signature `(take! port fn1)` `(take! port fn1 on-caller?)` ### Parameters - `port`: The channel to take the value from. - `fn1`: The callback function to receive the value. - `on-caller?`: An optional boolean (defaults to `true`) indicating if `fn1` should be called on the calling thread if the value is immediately available. ### Returns `nil`. ``` -------------------------------- ### Creating a Mix of Input Channels Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Creates a mix of input channels that will feed into a supplied output channel. Allows adding/removing inputs and controlling solo, mute, and pause states. ```clojure (mix out) ``` -------------------------------- ### Use `alts!` within `go` Blocks Source: https://github.com/clojure/core.async/blob/master/doc/walkthrough.md Similar to `alts!!`, `alts!` is used within `go` blocks for waiting on multiple channels. This example shows how to concurrently read from two channels using `go` blocks. ```clojure (let [c1 (a/chan) c2 (a/chan)] (a/go (while true (let [[v ch] (a/alts! [c1 c2])] (println "Read" v "from" ch)))) (a/go (>! c1 "hi")) (a/go (>! c2 "there"))) ``` -------------------------------- ### pub Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Creates a publisher for a channel, partitioning messages into topics based on a topic function. Supports optional buffering. ```APIDOC ## pub ### Description Creates and returns a pub(lication) of the supplied channel, partitioned into topics by the topic-fn. topic-fn will be applied to each value on the channel and the result will determine the 'topic' on which that value will be put. Channels can be subscribed to receive copies of topics using 'sub', and unsubscribed using 'unsub'. Each topic will be handled by an internal mult on a dedicated channel. By default these internal channels are unbuffered, but a buf-fn can be supplied which, given a topic, creates a buffer with desired properties. Each item is distributed to all subs in parallel and synchronously, i.e. each sub must accept before the next item is distributed. Use buffering/windowing to prevent slow subs from holding up the pub. Items received when there are no matching subs get dropped. Note that if buf-fns are used then each topic is handled asynchronously, i.e. if a channel is subscribed to more than one topic it should not expect them to be interleaved identically with the source. ### Signature `(pub ch topic-fn)` `(pub ch topic-fn buf-fn)` ``` -------------------------------- ### Enable Development-time Blocking Call Detection in go Blocks Source: https://context7.com/clojure/core.async/llms.txt Set the JVM system property `clojure.core.async.go-checking=true` to detect accidental use of blocking operations inside go blocks, which can starve the go thread pool. This example shows how to enable it programmatically and handle the resulting exceptions. ```clojure ;; Enable via JVM system property at startup: ;; -Dclojure.core.async.go-checking=true ;; Or programmatically before loading core.async: (System/setProperty "clojure.core.async.go-checking" "true") (require '[clojure.core.async :as a]) ;; This will now throw in the go thread (caught by uncaught exception handler): (a/go (a/ RuntimeException: Blocking call in go block ;; Handle in development: (Thread/setDefaultUncaughtExceptionHandler (reify Thread$UncaughtExceptionHandler (uncaughtException [_ thread ex] (println "Go block violation on" (.getName thread) ":" (.getMessage ex))))) ``` -------------------------------- ### Many Go Blocks with Alts Source: https://github.com/clojure/core.async/blob/master/docs/walkthrough.html Demonstrates creating a large number of lightweight go blocks that communicate via channels. `a/alts!!` is used to efficiently read from these channels as they become ready. ```clojure (let [n 1000 cs (repeatedly n a/chan) begin (System/currentTimeMillis)] (doseq [c cs] (a/go (>! c "hi")))) ``` -------------------------------- ### Create a publication channel with pub Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Creates a publication channel that partitions incoming values into topics based on a topic function. Supports optional buffering. ```clojure (pub ch topic-fn) (pub ch topic-fn buf-fn) ``` -------------------------------- ### Add core.async Dependency Source: https://context7.com/clojure/core.async/llms.txt Instructions for adding core.async to your project using deps.edn or Leiningen, and how to require it in your namespace. ```clojure ;; deps.edn {:deps {org.clojure/clojure {:mvn/version "1.12.0"} org.clojure/core.async {:mvn/version "1.9.865"}}} ;; Leiningen project.clj [org.clojure/core.async "1.9.865"] ;; Namespace require (ns my.app (:require [clojure.core.async :as a :refer [!! ! go go-loop chan close!]])) ``` -------------------------------- ### Describe Map Keys Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.flow.html The description map can include :params, :ins, :outs, :signal-select, and :workload. These define the process's configuration and how it interacts with the flow. ```clojure description is a map with possible keys: :params :ins and :outs, each of which in turn is a map of keyword to doc string :signal-select - a predicate of a signal-id. Messages on approved signals will appear in the transform arity (see below) For the simple case of enumerated signal-ids, use a set, e.g. #{:this/signal :that/signal} If no :signal-select is provided, no signals will be received :workload with possible values of :mixed :io :compute All entries in the describe return map are optional. ``` -------------------------------- ### chan Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Creates a channel with optional buffering, a transducer, and an exception handler. ```APIDOC ## chan ### Description Creates a channel with optional buffering, a transducer, and an exception handler. If a number is provided for buffer, a fixed buffer is used. If a transducer is supplied, a buffer must also be specified. The exception handler is a function that takes a Throwable and returns a value to be placed in the channel. ### Signatures `(chan) (chan buf-or-n) (chan buf-or-n xform) (chan buf-or-n xform ex-handler)` ``` -------------------------------- ### Topic-based publish/subscribe with `pub` and `sub` Source: https://context7.com/clojure/core.async/llms.txt Use `pub` to wrap a channel into a publication, routing values to subscribers based on a topic function. `sub` registers a channel to receive all values matching a given topic. Unmatched values are dropped. ```clojure (require '[clojure.core.async :as a :refer [chan go-loop !! close!]]) (let [input (a/chan 10) ;; :event-type keyword used as topic function pub (a/pub input :event-type) orders (a/chan 10) alerts (a/chan 10)] ;; Subscribe channels to specific topics (a/sub pub :order orders) (a/sub pub :alert alerts) ;; Consumers (go-loop [] (when-let [v (!! input {:event-type :order :payload "Order #1001"}) (>!! input {:event-type :alert :payload "Low stock"}) (>!! input {:event-type :order :payload "Order #1002"}) (>!! input {:event-type :other :payload "Ignored"}) (Thread/sleep 50) (close! input)) ``` -------------------------------- ### Create Fixed Buffer Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Returns a fixed-size buffer of size `n`. Puts will block or park when the buffer is full. ```clojure (buffer n) ``` -------------------------------- ### step-fn Arity 2: transition Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.flow.html Handles state transitions for the process. ```APIDOC ## transition `(transition state transition) -> state'` ### Description Called when the process makes a state transition (e.g., `::flow/resume`, `::flow/pause`, `::flow/stop`). Used to track changes and coordinate resources, especially cleanup on `:stop`. ``` -------------------------------- ### Load Collections into Channels Source: https://context7.com/clojure/core.async/llms.txt onto-chan! pours a collection onto an existing channel, returning a channel that closes when done. to-chan! creates a new channel pre-loaded with a collection's contents. ```clojure (require '[clojure.core.async :as a :refer [ [1 2 3] ;; to-chan! — convenience: create channel from collection (let [c (a/to-chan! [:a :b :c])] (println ( [:a :b :c] ;; onto-chan! with close? false — reuse channel across multiple pours (let [c (a/chan 10)] ( [1 2 3 4 5 6] ``` -------------------------------- ### Create Channel with Custom Buffer Policies Source: https://github.com/clojure/core.async/blob/master/doc/walkthrough.md Create channels with custom buffer policies. `dropping-buffer` drops newest values when full, while `sliding-buffer` drops oldest values. ```clojure ;; Use `dropping-buffer` to drop newest values when the buffer is full: (a/chan (a/dropping-buffer 10)) ``` ```clojure ;; Use `sliding-buffer` to drop oldest values when the buffer is full: (a/chan (a/sliding-buffer 10)) ``` -------------------------------- ### Create Buffered Channel Source: https://github.com/clojure/core.async/blob/master/doc/walkthrough.md Create a channel with a fixed buffer size by passing a number to `a/chan`. This allows for a specified number of values to be held before blocking. ```clojure (a/chan 10) ``` -------------------------------- ### Describe Step-Fn Descriptor Source: https://github.com/clojure/core.async/blob/master/docs/flow-guide.html The describe arity must return a static description of the step-fn’s :params, :ins, and :outs. Each of these is a map of name (a keyword) to docstring. Input and output channel names should be distinct. ```clojure { :params {:size "Max size"} ;; step-fn params :ins {:in "Input channel"} ;; input channels :outs {:out "Output channel"}} ;; output channels ``` -------------------------------- ### take Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Returns a channel that takes at most `n` items from a source channel. The output channel closes after `n` items or when the source closes. ```APIDOC ## take ### Description Returns a channel that will return, at most, `n` items from `ch`. After `n` items have been returned, or `ch` has been closed, the return channel will close. The output channel is unbuffered by default, unless `buf-or-n` is given. ### Signature `(take n ch)` `(take n ch buf-or-n)` ### Parameters - `n`: The maximum number of items to take. - `ch`: The source channel. - `buf-or-n`: Optional buffer size or buffer for the output channel. ``` -------------------------------- ### Put items into a channel with onto-chan!! Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Use this version of onto-chan when accessing the collection might block, such as with a lazy sequence of blocking operations. ```clojure (onto-chan!! ch coll) (onto-chan!! ch coll close?) ``` -------------------------------- ### Create a publication Source: https://github.com/clojure/core.async/wiki/Pub-Sub Creates a publication from an input channel and a topic function. The topic function categorizes messages. ```clojure (def input-chan (chan)) (def our-pub (pub input-chan :msg-type)) ``` -------------------------------- ### Step Function Arity: Init Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.flow.html The 'init' arity (1-arity) is called once to establish the initial state of the process. It can optionally return :flow/in-ports and :flow/out-ports for external channel integration. ```clojure (arg-map) -> initial-state ``` -------------------------------- ### Offering a Value to a Channel Immediately Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Attempts to put a value into a channel without blocking. Returns true if successful, false otherwise. Nil values are not allowed. ```clojure (offer! port val) ``` -------------------------------- ### Lightweight Asynchronous Blocks with `go` Source: https://context7.com/clojure/core.async/llms.txt Demonstrates the `go` block for executing code asynchronously. Channel operations (`>!` / `!! ! go chan close!]]) (let [c (chan)] ;; Producer go block parks on >! until a taker is ready (go (>! c "hello from go")) ;; Consumer go block parks on "hello from go" ;; Thousands of lightweight go blocks — no OS thread per block (let [n 1000 channels (repeatedly n chan) start (System/currentTimeMillis)] (doseq [c channels] (go (>! c :ping))) (dotimes [_ n] (let [[v _] (a/alts!! (vec channels))] (assert (= :ping v)))) (println "Done in" (- (System/currentTimeMillis) start) "ms")) ``` -------------------------------- ### Create a Dropping Buffer Channel Source: https://github.com/clojure/core.async/blob/master/docs/walkthrough.html Use `a/dropping-buffer` with `a/chan` to create a channel that drops the newest values when the buffer is full. This is useful for scenarios where old data is less relevant. ```clojure (a/chan (a/dropping-buffer 10)) ``` -------------------------------- ### Process Definition with Step Function Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.flow.html Defines a process using a step function with four arities: describe, init, transition, and transform. The step function dictates the process's logic, state management, and message handling. ```clojure (process step-fn) ``` ```clojure (process step-fn {:keys [workload compute-timeout-ms], :as opts}) ``` -------------------------------- ### Put items into a channel with onto-chan! Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Copies items from a collection into a channel. The channel is closed by default after copying. Use onto-chan!! if accessing the collection might block. ```clojure (onto-chan! ch coll) (onto-chan! ch coll close?) ``` -------------------------------- ### ioc-alts! Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Performs alternative operations on channels within an I/O context. Takes a state, continuation block, and ports, with optional options. ```APIDOC ## ioc-alts! `(ioc-alts! state cont-block ports & {:as opts})` ``` -------------------------------- ### mix Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Creates a mix of input channels that are put onto a supplied output channel. Supports adding/removing inputs and controlling their behavior (solo, mute, pause) via `toggle` and `solo-mode`. ```APIDOC ## mix `(mix out)` Creates and returns a mix of one or more input channels which will be put on the supplied out channel. Input sources can be added to the mix with 'admix', and removed with 'unmix'. A mix supports soloing, muting and pausing multiple inputs atomically using 'toggle', and can solo using either muting or pausing as determined by 'solo-mode'. Each channel can have zero or more boolean modes set via 'toggle': :solo - when true, only this (ond other soloed) channel(s) will appear in the mix output channel. :mute and :pause states of soloed channels are ignored. If solo-mode is :mute, non-soloed channels are muted, if :pause, non-soloed channels are paused. :mute - muted channels will have their contents consumed but not included in the mix :pause - paused channels will not have their contents consumed (and thus also not included in the mix) ``` -------------------------------- ### >! (put to channel) Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Puts a value into a channel. Nil values are not allowed. Must be called inside a (go ...) block. Parks if no buffer space is available. Returns true unless the channel is already closed. ```APIDOC ## >! (put to channel) ### Description Puts a value into a channel. Nil values are not allowed. Must be called inside a (go ...) block. Parks if no buffer space is available. Returns true unless the channel is already closed. ### Signature `(>! port val)` ``` -------------------------------- ### buffer Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Returns a fixed buffer of a specified size. Puts will block or park when the buffer is full. ```APIDOC ## buffer ### Description Returns a fixed buffer of a specified size. Puts will block or park when the buffer is full. ### Signature `(buffer n)` ``` -------------------------------- ### Init Step-Fn State Source: https://github.com/clojure/core.async/blob/master/docs/flow-guide.html The init arity is called once by the process to take arguments from the flow definition and return the initial state of the process. ```clojure (step-fn arg-map) -> init-state ``` -------------------------------- ### into Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Consumes items from a channel and collects them into a collection, returning a channel with the single resulting collection. The input channel must be closed before a result is produced. ```APIDOC ## into `(into coll ch)` Returns a channel containing the single (collection) result of the items taken from the channel conjoined to the supplied collection. ch must close before into produces a result. ``` -------------------------------- ### onto-chan! Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Puts the contents of a collection into a channel. The channel can optionally be closed after the items are copied. ```APIDOC ## onto-chan! ### Description Puts the contents of coll into the supplied channel. By default the channel will be closed after the items are copied, but can be determined by the close? parameter. Returns a channel which will close after the items are copied. ### Signature `(onto-chan! ch coll)` `(onto-chan! ch coll close?)` ``` -------------------------------- ### Create a Sliding Buffer Channel Source: https://github.com/clojure/core.async/blob/master/docs/walkthrough.html Use `a/sliding-buffer` with `a/chan` to create a channel that drops the oldest values when the buffer is full. This ensures the most recent data is always available. ```clojure (a/chan (a/sliding-buffer 10)) ``` -------------------------------- ### Create Unbuffered Channel Source: https://github.com/clojure/core.async/blob/master/doc/walkthrough.md Create an unbuffered channel using `a/chan`. Unbuffered channels require a rendezvous between producer and consumer for value transfer. ```clojure (a/chan) ``` -------------------------------- ### onto-chan!! Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Similar to onto-chan!, but intended for use when accessing the collection might block. ```APIDOC ## onto-chan!! ### Description Like onto-chan! for use when accessing coll might block, e.g. a lazy seq of blocking operations. ### Signature `(onto-chan!! ch coll)` `(onto-chan!! ch coll close?)` ``` -------------------------------- ### Dynamic fan-in with `mix`, `admix`, `unmix`, and `toggle` Source: https://context7.com/clojure/core.async/llms.txt Control dynamic fan-in from multiple input channels using `mix`. `admix` adds channels, `unmix-all` removes them, and `toggle` controls channel states like `:mute` (values consumed but dropped) or `:solo` (only soloed channels pass through). ```clojure (require '[clojure.core.async :as a :refer [chan !!]]) (let [out (a/chan 10) c1 (a/chan 5) c2 (a/chan 5) c3 (a/chan 5) m (a/mix out)] (a/admix m c1) (a/admix m c2) (a/admix m c3) ;; Mute c2 — its values are consumed but discarded (a/toggle m {c2 {:mute true}}) ;; Solo c1 — only c1 passes through (c3 is paused by default in solo mode) (a/solo-mode m :pause) (a/toggle m {c1 {:solo true}}) (>!! c1 :from-c1) (>!! c2 :from-c2) (>!! c3 :from-c3) (Thread/sleep 50) (println (a/poll! out)) ;; => :from-c1 (only soloed channel passes) (a/unmix-all m) (a/close! out)) ``` -------------------------------- ### to-chan! Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Creates a channel containing the elements of a collection, closing when exhausted. Use for collections where access is not blocking. ```APIDOC ## to-chan! ### Description Creates and returns a channel which contains the contents of `coll`, closing when exhausted. Use this version when accessing `coll` might block. ### Signature `(to-chan! coll)` ### Parameters - `coll`: The collection to convert into a channel. ``` -------------------------------- ### alts!! (multiple channel operations, blocking) Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Similar to alts!, but takes are made as if by !!, and will block until completed. Not intended for use in (go ...) blocks. ```APIDOC ## alts!! (multiple channel operations, blocking) ### Description Similar to alts!, but takes are made as if by !!, and will block until completed. Not intended for use in (go ...) blocks. ### Signature `(alts!! ports & opts)` ``` -------------------------------- ### go Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.html Asynchronously executes a body of code, returning immediately. Channel operations within the body block by parking the thread instead of tying up an OS thread. Returns a channel that receives the result of the body upon completion. ```APIDOC ## go #### macro `(go & body)` Asynchronously executes the body, returning immediately to the calling thread. Additionally, any visible calls to ! and alt!/alts! channel operations within the body will block (if necessary) by 'parking' the calling thread rather than tying up an OS thread (or the only JS thread when in ClojureScript). Upon completion of the operation, the body will be resumed. go blocks should not (either directly or indirectly) perform operations that may block indefinitely. Doing so risks depleting the fixed pool of go block threads, causing all go block processing to stop. This includes core.async blocking ops (those ending in !!) and other blocking IO. Returns a channel which will receive the result of the body when completed. The resources associated with a go block may be reclaimed, and the block never resumed, when the channels with which it interacts are no longer referenced (outside of the go block). ``` -------------------------------- ### Select First Completed Operation with `alts!!` and `alts!` Source: https://context7.com/clojure/core.async/llms.txt Use `alts!!` (blocking) or `alts!` (non-blocking) to wait on multiple channel operations. They return the value and channel of the first operation to complete. Options include `:priority true` for ordered selection and `:default val` for non-blocking behavior. ```clojure (require '[clojure.core.async :as a :refer [chan go thread alts!! alts! >!! !! c1 "hello") (>!! c2 "world") (Thread/sleep 50)) ;; Received hello on c1 ;; Received world on c2 ;; alts! in a go block — with timeout (let [c (a/chan) t (a/timeout 200)] ( Timed out! ;; alts! with :default — non-blocking attempt (let [c (a/chan 1)] ( :nothing-ready ``` -------------------------------- ### process Source: https://github.com/clojure/core.async/blob/master/docs/clojure.core.async.flow.html Defines the logic for a process compliant with the process protocol. It takes a step-fn and an optional options map. ```APIDOC ## process `(process step-fn)` `(process step-fn {:keys [workload compute-timeout-ms], :as opts})` ### Description Given a function of four arities (0-3), aka the 'step-fn', returns a launcher that creates a process compliant with the process protocol. This is the core facility for defining the logic for processes via ordinary functions. ### Arities for step-fn: 0 - 'describe', `() -> description` 1 - 'init', `(arg-map) -> initial-state` 2 - 'transition', `(state transition) -> state'` 3 - 'transform', `(state input msg) -> [state' output-map]` ### Options Map: - `:workload` - one of :mixed, :io or :compute. Overrides :workload from describe. - `:compute-timeout-ms` - timeout for :compute workload (default 5000 msec). ``` -------------------------------- ### Asynchronous Put and Blocking Take Source: https://github.com/clojure/core.async/blob/master/doc/walkthrough.md Launch a background task using `a/thread` to put a value on a channel, then read that value in the current thread using a blocking take. This avoids blocking the main thread for the put operation. ```clojure (let [c (a/chan)] (a/thread (>!! c "hello")) (assert (= "hello" (