### Perform Asynchronous Operations with Antiox

Source: https://github.com/rivet-dev/antiox/blob/main/COMPATIBILITY.md

Examples of using asynchronous synchronization methods such as locking a mutex or waiting on a barrier.

```javascript
await mutex.lock();
await semaphore.acquire();
await barrier.wait();
const value = await onceCell.getOrInit(async () => await fetchData());
```

--------------------------------

### Manage Async Tasks with Spawning and JoinSet

Source: https://context7.com/rivet-dev/antiox/llms.txt

Covers spawning asynchronous tasks with AbortSignal support and managing collections of tasks using JoinSet. It includes examples for completion-order iteration, bulk task joining, and error handling.

```typescript
import { spawn, JoinSet, JoinHandle, JoinError, yieldNow, joinAll, tryJoinAll } from "antiox/task";
import { sleep } from "antiox/time";

// Spawn a task; the signal is passed on for cancellation-aware operations
const handle = spawn(async (signal) => {
  const res = await fetch("https://api.example.com/data", { signal });
  return res.json();
});

const data = await handle;
handle.abort();

if (handle.isFinished()) {
  console.log("Task completed");
}

// JoinSet yields results in completion order
const set = new JoinSet();
set.spawn(async (signal) => { await sleep(100, signal); return 1; });
set.spawn(async (signal) => { await sleep(50, signal); return 2; });
set.spawn(async (signal) => { await sleep(150, signal); return 3; });

for await (const result of set) {
  console.log(result);
}

const next = await set.joinNext();
set.abortAll();

// Join several handles at once
const handles = [
  spawn(async () => 1),
  spawn(async () => 2),
  spawn(async () => 3),
];

const results = await joinAll(handles);

try {
  await tryJoinAll(handles);
} catch (e) {}

await yieldNow();
```

--------------------------------

### Broadcast Channel: Pub/Sub Async Pattern in TypeScript

Source: https://context7.com/rivet-dev/antiox/llms.txt

Implements a bounded broadcast channel for publish-subscribe patterns, ensuring all subscribed receivers get every message. Supports multiple subscribers, handling lagged receivers, and asynchronous iteration. Senders can be cloned.
```typescript
import { broadcast, RecvError } from "antiox/sync/broadcast";

// Create broadcast channel with capacity 16
const [tx, rx1] = broadcast(16);

// Subscribe additional receivers
const rx2 = tx.subscribe();
const rx3 = tx.subscribe();

// Send broadcasts to all receivers (returns count of notified receivers)
const notified = tx.send("hello");
console.log(`Notified ${notified} receivers`);

// All receivers get the same message
console.log(await rx1.recv()); // "hello"
console.log(await rx2.recv()); // "hello"
console.log(await rx3.recv()); // "hello"

// Handle lagging receivers (missed messages)
for (let i = 0; i < 20; i++) {
  tx.send(`msg${i}`);
}

try {
  await rx1.recv();
} catch (e) {
  if (e instanceof RecvError && e.kind === "lagged") {
    console.log(`Missed ${e.lagged} messages`);
  }
}

// Async iteration (stops on channel close)
for await (const msg of rx2) {
  console.log(msg);
}

// Clone sender/receiver
const tx2 = tx.clone();
const rx4 = rx1.clone();

// Check receiver count
console.log(tx.receiverCount()); // number of active receivers

// Close channel
tx.close();
```

--------------------------------

### Broadcast Channel - Multi-Consumer Pub/Sub

Source: https://context7.com/rivet-dev/antiox/llms.txt

Creates a bounded broadcast channel where every receiver gets every message. Useful for pub/sub patterns where multiple consumers need all messages.

```APIDOC
## Broadcast Channel - Multi-Consumer Pub/Sub

### Description
Creates a bounded broadcast channel where every receiver gets every message. Useful for pub/sub patterns where multiple consumers need all messages.

### Method
N/A (This is a library usage example, not a direct API endpoint)

### Endpoint
N/A

### Parameters
N/A

### Request Example
```typescript
import { broadcast, RecvError } from "antiox/sync/broadcast";

// Create broadcast channel with capacity 16
const [tx, rx1] = broadcast(16);

// Subscribe additional receivers
const rx2 = tx.subscribe();
const rx3 = tx.subscribe();

// Send broadcasts to all receivers (returns count of notified receivers)
const notified = tx.send("hello");
console.log(`Notified ${notified} receivers`);

// All receivers get the same message
console.log(await rx1.recv()); // "hello"
console.log(await rx2.recv()); // "hello"
console.log(await rx3.recv()); // "hello"

// Handle lagging receivers (missed messages)
for (let i = 0; i < 20; i++) {
  tx.send(`msg${i}`);
}

try {
  await rx1.recv();
} catch (e) {
  if (e instanceof RecvError && e.kind === "lagged") {
    console.log(`Missed ${e.lagged} messages`);
  }
}

// Async iteration (stops on channel close)
for await (const msg of rx2) {
  console.log(msg);
}

// Clone sender/receiver
const tx2 = tx.clone();
const rx4 = rx1.clone();

// Check receiver count
console.log(tx.receiverCount()); // number of active receivers

// Close channel
tx.close();
```

### Response
N/A

### Response Example
N/A
```

--------------------------------

### Initialize Synchronization Primitives in Antiox

Source: https://github.com/rivet-dev/antiox/blob/main/COMPATIBILITY.md

Demonstrates the instantiation of common synchronization primitives in Antiox, mirroring the Tokio API structure.
```javascript
const semaphore = new Semaphore(5);
const mutex = new Mutex();
const rwLock = new RwLock();
const barrier = new Barrier(3);
const onceCell = new OnceCell();
const token = new CancellationToken();
```

--------------------------------

### Implement Actor-like System with Channels and Tasks

Source: https://github.com/rivet-dev/antiox/blob/main/README.md

Demonstrates how to use MPSC channels for communication and oneshot channels for request-response patterns within an async task. This pattern ensures structured concurrency and backpressure.

```typescript
import { channel } from "antiox/sync/mpsc";
import { oneshot, OneshotSender } from "antiox/sync/oneshot";
import { unreachable } from "antiox/panic";
import { spawn } from "antiox/task";

type Msg =
  | { type: "increment"; amount: number }
  | { type: "get"; resTx: OneshotSender };

const [tx, rx] = channel(32);

spawn(async () => {
  let count = 0;
  for await (const msg of rx) {
    switch (msg.type) {
      case "increment":
        count += msg.amount;
        break;
      case "get":
        msg.resTx.send(count);
        break;
      default:
        unreachable(msg);
    }
  }
});

await tx.send({ type: "increment", amount: 5 });

const [resTx, resRx] = oneshot();
await tx.send({ type: "get", resTx });
const value = await resRx;
```

--------------------------------

### Antiox Synchronization Primitives Overview

Source: https://github.com/rivet-dev/antiox/blob/main/COMPATIBILITY.md

Documentation mapping Antiox classes to Tokio Rust equivalents, highlighting supported methods and implementation status.

```APIDOC
## Antiox Synchronization API

### Overview
Antiox provides asynchronous synchronization primitives for JavaScript, mirroring the functionality of the Rust Tokio library.

### Core Components
- **Semaphore**: `new Semaphore(permits)` - Manage a set of permits.
- **Notify**: `new Notify()` - Task notification system.
- **Mutex**: `new Mutex()` - Mutual exclusion for async tasks.
- **RwLock**: `new RwLock()` - Read-Write lock implementation.
- **Barrier**: `new Barrier(n)` - Synchronization point for multiple tasks.
- **Select**: `select()` - Asynchronous branch selection using AbortSignal.
- **OnceCell**: `new OnceCell()` - Single-assignment cell.
- **CancellationToken**: `new CancellationToken()` - Cooperative cancellation.
- **DropGuard**: `new DropGuard()` - Resource cleanup using `Symbol.dispose`.

### Usage Example
```javascript
import { Mutex } from 'antiox/sync/mutex';

const mutex = new Mutex();
const guard = await mutex.lock();
try {
  // Critical section
} finally {
  guard.unlock();
}
```
```

--------------------------------

### Task Spawning and Management with Antiox Task

Source: https://github.com/rivet-dev/antiox/blob/main/README.md

The antiox/task module provides utilities for spawning asynchronous tasks with cooperative cancellation support via AbortSignal. It mirrors Tokio's task spawning capabilities, including task handles, aborting tasks, and using JoinSet for managing multiple tasks.

```typescript
import { spawn, JoinSet, yieldNow } from "antiox/task";

// Spawn a task (returns awaitable JoinHandle)
const handle = spawn(async (signal) => {
  const res = await fetch("https://example.com", { signal });
  return res.text();
});

const result = await handle;

// Abort a task
handle.abort();

// JoinSet for managing multiple tasks
const set = new JoinSet();
set.spawn(async (signal) => 1);
set.spawn(async (signal) => 2);
set.spawn(async (signal) => 3);

for await (const result of set) {
  console.log(result); // 1, 2, 3 (in completion order)
}

// Yield to event loop
await yieldNow();
```

--------------------------------

### Implement Watch Channel for State Broadcasting

Source: https://context7.com/rivet-dev/antiox/llms.txt

Demonstrates creating a watch channel to broadcast state updates to multiple subscribers. It covers initialization, non-blocking reads, conditional updates, and handling receiver lifecycle events.
```typescript
import { watch, RecvError } from "antiox/sync/watch";

const [tx, rx] = watch({ count: 0, name: "initial" });
const rx2 = tx.subscribe();

console.log(rx.borrow());

tx.send({ count: 1, name: "updated" });

await rx.changed();
console.log(rx.borrowAndUpdate());

tx.sendIfModified((current) => {
  if (current.count < 10) {
    current.count++;
    return true;
  }
  return false;
});

async function watchLoop() {
  while (true) {
    try {
      await rx2.changed();
      const value = rx2.borrowAndUpdate();
      console.log("Value changed:", value);
    } catch (e) {
      if (e instanceof RecvError) break;
    }
  }
}

if (tx.isClosed()) {
  console.log("No more receivers");
}
```

--------------------------------

### Task Spawning and JoinSet

Source: https://context7.com/rivet-dev/antiox/llms.txt

Spawn async tasks with cooperative cancellation via AbortSignal. JoinSet manages multiple concurrent tasks with completion-order iteration.

```APIDOC
## Task Spawning and JoinSet

### Description
Spawn async tasks with cooperative cancellation via AbortSignal. JoinSet manages multiple concurrent tasks with completion-order iteration.

### Method
N/A (This is a library API, not an HTTP endpoint)

### Endpoint
N/A

### Parameters
N/A

### Request Example
```typescript
import { spawn, JoinSet, JoinHandle, JoinError, yieldNow, joinAll, tryJoinAll } from "antiox/task";
import { sleep } from "antiox/time";

// Spawn a task (returns awaitable JoinHandle)
const handle = spawn(async (signal) => {
  // Use signal for cancellation-aware operations
  const res = await fetch("https://api.example.com/data", { signal });
  return res.json();
});

// Await result
const data = await handle;

// Abort a running task
handle.abort();

// Check if task finished
if (handle.isFinished()) {
  console.log("Task completed");
}

// JoinSet for managing multiple concurrent tasks
const set = new JoinSet();
set.spawn(async (signal) => { await sleep(100, signal); return 1; });
set.spawn(async (signal) => { await sleep(50, signal); return 2; });
set.spawn(async (signal) => { await sleep(150, signal); return 3; });

// Iterate in completion order
for await (const result of set) {
  console.log(result); // 2, 1, 3 (completion order)
}

// Or get next completed task manually
const next = await set.joinNext(); // returns null when empty

// Abort all tasks in set
set.abortAll();

// Join multiple handles (waits for all, throws on first error)
const handles = [
  spawn(async () => 1),
  spawn(async () => 2),
  spawn(async () => 3),
];
const results = await joinAll(handles); // [1, 2, 3]

// tryJoinAll aborts remaining on first error
try {
  await tryJoinAll(handles);
} catch (e) {
  // Remaining tasks were aborted
}

// Yield to event loop
await yieldNow();
```

### Response
N/A (This is a library API)

### Response Example
N/A
```

--------------------------------

### Implement Binary Heap Priority Queue

Source: https://github.com/rivet-dev/antiox/blob/main/README.md

A max-heap implementation providing O(log n) push and pop operations. Useful for managing prioritized data sets.
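
To make the O(log n) claim concrete, here is a minimal array-backed max-heap sketch. `MaxHeapSketch` is a hypothetical name used only for illustration; it is not antiox's `BinaryHeap` and makes no claim about how the library is implemented. Each `push` sifts the new element up and each `pop` sifts the replacement root down, so both touch at most one node per tree level.

```typescript
// Illustrative stand-in only; not the antiox BinaryHeap implementation.
class MaxHeapSketch {
  private items: number[] = [];

  get length(): number {
    return this.items.length;
  }

  // Append at the end, then swap upward while the parent is smaller.
  push(value: number): void {
    this.items.push(value);
    let i = this.items.length - 1;
    while (i > 0) {
      const parent = (i - 1) >> 1;
      if (this.items[parent] >= this.items[i]) break;
      [this.items[parent], this.items[i]] = [this.items[i], this.items[parent]];
      i = parent;
    }
  }

  // Remove the root, move the last element to the top, then swap downward
  // with the larger child until the heap property holds again.
  pop(): number | undefined {
    if (this.items.length === 0) return undefined;
    const top = this.items[0];
    const last = this.items.pop()!;
    if (this.items.length > 0) {
      this.items[0] = last;
      let i = 0;
      for (;;) {
        const left = 2 * i + 1;
        const right = left + 1;
        let largest = i;
        if (left < this.items.length && this.items[left] > this.items[largest]) largest = left;
        if (right < this.items.length && this.items[right] > this.items[largest]) largest = right;
        if (largest === i) break;
        [this.items[largest], this.items[i]] = [this.items[i], this.items[largest]];
        i = largest;
      }
    }
    return top;
  }
}
```

Pushing 3, 1, 5 into this sketch and popping twice returns 5 and then 3, the same order as the library example that follows.
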
```typescript
import { BinaryHeap } from "antiox/collections/binary_heap";

const heap = new BinaryHeap();
heap.push(3);
heap.push(1);
heap.push(5);
console.log(heap.pop()); // 5
console.log(heap.pop()); // 3
```

--------------------------------

### Time Utilities

Source: https://context7.com/rivet-dev/antiox/llms.txt

Timer primitives including sleep, timeout, and interval generators with AbortSignal support.

```APIDOC
## Time Utilities

### Description
Provides utility functions for handling time-based operations in an async environment, fully integrated with `AbortSignal` for cancellation.

### Functions
- **sleep(ms, signal?)**: Pauses execution for a duration.
- **timeout(ms, promise)**: Wraps a promise with a timeout; throws `TimeoutError` if exceeded.
- **timeoutAt(date, promise)**: Wraps a promise with a specific deadline.
- **interval(ms, signal?)**: Async generator that yields ticks at a fixed interval.

### Example
```typescript
import { timeout, TimeoutError } from "antiox/time";

try {
  const data = await timeout(5000, fetchData());
} catch (e) {
  if (e instanceof TimeoutError) {
    console.log("Operation timed out");
  }
}
```
```

--------------------------------

### Timer Primitives with AbortSignal Integration

Source: https://github.com/rivet-dev/antiox/blob/main/README.md

The antiox/time module offers timer primitives such as sleep, timeout, and interval, all integrated with AbortSignal for cancellation. These utilities are essential for managing time-bound operations and periodic tasks in asynchronous code.
```typescript
import { sleep, timeout, interval, TimeoutError } from "antiox/time";

await sleep(1000);

try {
  const data = await timeout(5000, fetchData());
} catch (e) {
  if (e instanceof TimeoutError) console.log("timed out");
}

// All functions accept an optional AbortSignal for cancellation
const controller = new AbortController();
for await (const tick of interval(1000, controller.signal)) {
  console.log(`Tick ${tick}`);
  if (tick >= 4) break;
}
```

--------------------------------

### Implement Collections with BinaryHeap and Deque

Source: https://context7.com/rivet-dev/antiox/llms.txt

Provides efficient data structures for priority queues (BinaryHeap) and double-ended queues (Deque). BinaryHeap supports custom comparators for min/max heap functionality.

```typescript
import { BinaryHeap } from "antiox/collections/binary_heap";
import { Deque } from "antiox/collections/deque";

const heap = new BinaryHeap();
heap.push(3);
console.log(heap.pop());

const deque = new Deque();
deque.pushFront("front");
deque.push("back");
console.log(deque.shift());
```

--------------------------------

### Async Stream Combinators with Antiox

Source: https://context7.com/rivet-dev/antiox/llms.txt

Provides composable async stream combinators that work with any AsyncIterable. Supports basic transformations like map, filter, take, skip, and collection methods like collect, bufferUnordered, and buffered for concurrent processing. Includes advanced features like merge, chain, zip, chunks, throttle, timeout, fold, enumerate, peekable, and forEachConcurrent for complex stream manipulation. The pipe helper allows for fluent composition of these combinators.
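
As a rough illustration of why such combinators compose, each one can be written as an async generator that consumes one iterable and yields another. The names `mapSketch`, `filterSketch`, `takeSketch`, and `collectSketch` below are hypothetical stand-ins, not antiox's functions, and the sketch assumes nothing about the library's internals.

```typescript
// Illustrative stand-ins only; not the antiox/stream implementations.
type AnyIterable<T> = Iterable<T> | AsyncIterable<T>;

// Transform every item; the mapper may be sync or async.
async function* mapSketch<T, U>(
  source: AnyIterable<T>,
  fn: (item: T) => U | Promise<U>,
): AsyncGenerator<U> {
  for await (const item of source) yield await fn(item);
}

// Keep only the items that satisfy the predicate.
async function* filterSketch<T>(
  source: AnyIterable<T>,
  predicate: (item: T) => boolean,
): AsyncGenerator<T> {
  for await (const item of source) {
    if (predicate(item)) yield item;
  }
}

// Yield at most n items, then stop pulling from the source.
async function* takeSketch<T>(source: AnyIterable<T>, n: number): AsyncGenerator<T> {
  if (n <= 0) return;
  let taken = 0;
  for await (const item of source) {
    yield item;
    if (++taken >= n) return;
  }
}

// Drain an iterable into an array.
async function collectSketch<T>(source: AnyIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of source) out.push(item);
  return out;
}
```

Because every stage is lazy, `takeSketch` ends the whole pipeline as soon as it has enough items, and upstream stages are never asked for more.
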
```typescript
import {
  map, filter, take, skip, chunks, collect, merge, chain, zip,
  bufferUnordered, buffered, throttle, timeout, fold, pipe,
  enumerate, scan, flatMap, takeWhile, skipWhile, filterMap,
  peekable, count, any, all, forEach, forEachConcurrent, chunksTimeout
} from "antiox/stream";

// Basic transformations
const doubled = map([1, 2, 3], (x) => x * 2);
const evens = filter([1, 2, 3, 4], (x) => x % 2 === 0);
const first3 = take(source, 3);
const after5 = skip(source, 5);

// Collect async iterable to array
const results = await collect(doubled); // [2, 4, 6]

// Concurrent execution with bufferUnordered (completion order)
const urls = ["url1", "url2", "url3"];
const responses = await collect(
  bufferUnordered(
    map(urls, (url) => fetch(url)),
    10 // concurrency limit
  )
);

// Concurrent execution with buffered (preserves order)
const orderedResponses = await collect(
  buffered(
    map(urls, (url) => fetch(url)),
    10
  )
);

// Merge multiple streams (interleaved output)
for await (const item of merge(stream1, stream2, stream3)) {
  console.log(item);
}

// Chain streams sequentially
for await (const item of chain(stream1, stream2)) {
  console.log(item);
}

// Zip two streams together
for await (const [a, b] of zip(streamA, streamB)) {
  console.log(a, b);
}

// Batch items into chunks
for await (const batch of chunks(source, 10)) {
  await processBatch(batch);
}

// Time-based chunking
for await (const batch of chunksTimeout(source, 100, 5000)) {
  // Yields when 100 items collected OR 5 seconds elapsed
}

// Pipe for fluent composition
const processed = pipe(
  source,
  (s) => filter(s, (x) => x > 0),
  (s) => map(s, (x) => x * 2),
  (s) => take(s, 100),
  (s) => chunks(s, 10)
);

// Throttle stream (minimum interval between yields)
for await (const item of throttle(source, 100)) {
  console.log(item); // At most 10 items/second
}

// Timeout per item
for await (const item of timeout(source, 5000)) {
  // Throws TimeoutError if item takes > 5s
}

// Fold/reduce
const sum = await fold(source, 0, (acc, x) => acc + x);

// Enumerate with index
for await (const [i, item] of enumerate(source)) {
  console.log(`${i}: ${item}`);
}

// Peekable iterator
const peek = peekable(source);
const next = await peek.peek(); // Look ahead without consuming

// Concurrent forEach
await forEachConcurrent(source, 5, async (item) => {
  await processItem(item);
});
```

--------------------------------

### Communicate via MPSC Channels

Source: https://github.com/rivet-dev/antiox/blob/main/README.md

Multi-producer, single-consumer channels providing backpressure and disconnection detection, similar to Rust's tokio::sync::mpsc.

```typescript
import { channel, unboundedChannel } from "antiox/sync/mpsc";

const [tx, rx] = channel(32);

await tx.send("hello");
const msg = await rx.recv();

const tx2 = tx.clone();
await tx2.send("from tx2");

for await (const msg of rx) {
  console.log(msg);
}

const [utx, urx] = unboundedChannel();
utx.send(42);
```

--------------------------------

### Barrier: N-Task Synchronization in TypeScript

Source: https://context7.com/rivet-dev/antiox/llms.txt

A reusable barrier primitive that synchronizes N tasks. All tasks wait until the specified number of tasks arrive, at which point they are all released simultaneously. The barrier automatically resets after release. Requires 'antiox/sync/barrier'.
```typescript
import { Barrier, BarrierWaitResult } from "antiox/sync/barrier";

// Create barrier for 3 tasks
const barrier = new Barrier(3);

// Spawn tasks that synchronize at barrier
async function worker(id: number) {
  console.log(`Worker ${id} starting`);
  await doWork();

  const result = await barrier.wait();
  if (result.isLeader()) {
    console.log(`Worker ${id} is the leader`);
    // Leader can do cleanup or coordination
  }

  console.log(`Worker ${id} continuing`);
}

// All workers wait until 3rd arrives
Promise.all([worker(1), worker(2), worker(3)]);

// Barrier is reusable - automatically resets after release
```

--------------------------------

### Oneshot Channel - Single-Use Promise-Like Channel

Source: https://context7.com/rivet-dev/antiox/llms.txt

Creates a single-use channel that sends exactly one value. The receiver implements PromiseLike, making it directly awaitable.

```APIDOC
## Oneshot Channel - Single-Use Promise-Like Channel

### Description
Creates a single-use channel that sends exactly one value. The receiver implements PromiseLike, making it directly awaitable.

### Method
N/A (This is a library usage example, not a direct API endpoint)

### Endpoint
N/A

### Parameters
N/A

### Request Example
```typescript
import { oneshot, OneshotSender, RecvError, SendError } from "antiox/sync/oneshot";

// Create oneshot channel
const [tx, rx] = oneshot();

// Send a single value
tx.send("done");

// Receiver is directly awaitable (implements PromiseLike)
const value = await rx; // "done"

// Request-response pattern with oneshot
type Request = { type: "get"; resTx: OneshotSender };

async function handleRequest(req: Request) {
  if (req.type === "get") {
    req.resTx.send(42);
  }
}

const [resTx, resRx] = oneshot();
await handleRequest({ type: "get", resTx });
const result = await resRx; // 42

// Non-blocking receive
try {
  const immediate = rx.tryRecv();
} catch (e) {
  if (e instanceof RecvError) {
    console.log("No value sent yet");
  }
}

// Check if receiver is closed
if (tx.isClosed()) {
  console.log("Receiver dropped");
}

// Wait for receiver to close
await tx.closed();
```

### Response
N/A

### Response Example
N/A
```

--------------------------------

### Stream Combinators

Source: https://context7.com/rivet-dev/antiox/llms.txt

Async stream combinators that work with any AsyncIterable, supporting transformation, concurrency, and flow control.

```APIDOC
## Stream Combinators

### Description
Provides a set of functions to transform, filter, merge, and batch AsyncIterables. These functions are composable and support the `pipe` helper for fluent API usage.

### Usage
- **Transformation**: `map`, `filter`, `take`, `skip`
- **Concurrency**: `bufferUnordered`, `buffered`, `forEachConcurrent`
- **Aggregation**: `collect`, `fold`, `merge`, `chain`, `zip`
- **Flow Control**: `throttle`, `timeout`, `chunks`, `chunksTimeout`

### Example
```typescript
import { pipe, filter, map, take, chunks } from "antiox/stream";

const processed = pipe(
  source,
  (s) => filter(s, (x) => x > 0),
  (s) => map(s, (x) => x * 2),
  (s) => take(s, 100),
  (s) => chunks(s, 10)
);
```
```

--------------------------------

### Halt Execution with Panic Utilities

Source: https://context7.com/rivet-dev/antiox/llms.txt

Provides diverging functions to halt execution, mark unimplemented code, or enforce exhaustive type checking in switch statements, mirroring Rust's panic primitives.

```typescript
import { panic, todo, unreachable } from "antiox/panic";

// Config is an application-defined type, not part of antiox.
function processConfig(config: Config) {
  if (!config.apiKey) {
    panic("API key is required");
  }
}

type Direction = "north" | "south" | "east" | "west";

function move(dir: Direction): [number, number] {
  switch (dir) {
    case "north": return [0, 1];
    case "south": return [0, -1];
    case "east": return [1, 0];
    case "west": return [-1, 0];
    default: unreachable(dir); // exhaustiveness check: every Direction is handled above
  }
}
```

--------------------------------

### MPSC Channel - Multi-Producer Single-Consumer

Source: https://context7.com/rivet-dev/antiox/llms.txt

Creates a bounded or unbounded channel for message passing between async tasks with backpressure support. Bounded channels block senders when full, while unbounded channels never block on send.

```APIDOC
## MPSC Channel - Multi-Producer Single-Consumer

### Description
Creates a bounded or unbounded channel for message passing between async tasks with backpressure support. Bounded channels block senders when full, while unbounded channels never block on send.

### Method
N/A (This is a library usage example, not a direct API endpoint)

### Endpoint
N/A

### Parameters
N/A

### Request Example
```typescript
import { channel, unboundedChannel, SendError, TrySendError } from "antiox/sync/mpsc";

// Bounded channel with backpressure (capacity 32)
const [tx, rx] = channel(32);

// Send messages (blocks if channel is full)
await tx.send("hello");
await tx.send("world");

// Clone sender for multi-producer pattern
const tx2 = tx.clone();
await tx2.send("from tx2");

// Receive messages (returns null when channel closes)
const msg = await rx.recv(); // "hello"

// Non-blocking send/receive with error handling
try {
  tx.trySend("immediate");
} catch (e) {
  if (e instanceof TrySendError) {
    console.log(e.kind); // "full" or "closed"
  }
}

// Async iteration for clean consumption
for await (const message of rx) {
  console.log(message);
}

// Unbounded channel (never blocks on send)
const [utx, urx] = unboundedChannel();
utx.send(42); // synchronous, never blocks
utx.send(43);
const value = await urx.recv(); // 42

// Reserve a permit before sending (for flow control)
const permit = await tx.reserve();
permit.send("reserved value");

// Wait for channel closure
await tx.closed();

// Close sender (disposable pattern)
tx.close();
// or use Symbol.dispose
tx[Symbol.dispose]();
```

### Response
N/A

### Response Example
N/A
```

--------------------------------

### Time Utilities with AbortSignal

Source: https://context7.com/rivet-dev/antiox/llms.txt

Provides timer primitives with AbortSignal integration for cancellation. Includes `sleep` for pausing execution, `timeout` and `timeoutAt` for setting deadlines on promises, and `interval` for creating time-based generators. Handles `TimeoutError` for timed-out operations.
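
For readers unfamiliar with how a timer can honor an `AbortSignal`, the sketch below builds an abortable sleep and a promise timeout from the standard `setTimeout`, `AbortController`, and `Promise.race` APIs. `sleepSketch`, `timeoutSketch`, and `TimeoutErrorSketch` are hypothetical names chosen for this illustration; they are assumptions about one possible approach, not antiox's code.

```typescript
// Illustrative stand-ins only; not the antiox/time implementations.
class TimeoutErrorSketch extends Error {}

// Resolve after `ms`, or reject early if the signal aborts.
function sleepSketch(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal?.aborted) {
      reject(new Error("sleep aborted"));
      return;
    }
    const onAbort = () => {
      clearTimeout(timer); // stop the pending timer so it never fires
      reject(new Error("sleep aborted"));
    };
    const timer = setTimeout(() => {
      signal?.removeEventListener("abort", onAbort);
      resolve();
    }, ms);
    signal?.addEventListener("abort", onAbort, { once: true });
  });
}

// Race the promise against a timer; cancel the timer once the race settles.
function timeoutSketch<T>(ms: number, promise: Promise<T>): Promise<T> {
  const controller = new AbortController();
  const deadline = sleepSketch(ms, controller.signal).then(() => {
    throw new TimeoutErrorSketch(`timed out after ${ms} ms`);
  });
  return Promise.race([promise, deadline]).finally(() => controller.abort());
}
```

Aborting the internal controller in `finally` clears the timer when the wrapped promise wins, so no stray timeout keeps the process alive.
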
```typescript
import { sleep, timeout, timeoutAt, interval, TimeoutError } from "antiox/time";

// Sleep with optional cancellation
await sleep(1000);

const controller = new AbortController();
sleep(5000, controller.signal).catch(() => {
  console.log("Sleep cancelled");
});
controller.abort();

// Timeout wrapper for promises
try {
  const data = await timeout(5000, fetchData());
} catch (e) {
  if (e instanceof TimeoutError) {
    console.log("Operation timed out");
  }
}

// Timeout at specific deadline
const deadline = new Date(Date.now() + 10000);
const result = await timeoutAt(deadline, longOperation());

// Interval generator
for await (const tick of interval(1000, controller.signal)) {
  console.log(`Tick ${tick}`);
  if (tick >= 4) break;
}
```

--------------------------------

### Ensure Cleanup with DropGuard

Source: https://github.com/rivet-dev/antiox/blob/main/README.md

A utility to ensure specific cleanup logic runs when an object is disposed, supporting manual disarming to prevent execution.

```typescript
import { DropGuard } from "antiox/sync/drop_guard";

const guard = new DropGuard(() => cleanup());
guard.disarm();
```

--------------------------------

### Synchronize Tasks with Barrier

Source: https://github.com/rivet-dev/antiox/blob/main/README.md

A synchronization primitive where N tasks wait until all N tasks have arrived at the barrier before proceeding.

```typescript
import { Barrier } from "antiox/sync/barrier";

const barrier = new Barrier(3);
const result = await barrier.wait();
if (result.isLeader()) console.log("I'm the leader");
```

--------------------------------

### Implement Double-Ended Queue

Source: https://github.com/rivet-dev/antiox/blob/main/README.md

A double-ended queue (deque) allowing O(1) push and pop operations from both the front and back of the collection.
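
One common way to get constant-time operations at both ends is a ring buffer: a fixed array plus a moving head index, doubled in size when full. The sketch below assumes that technique purely for illustration. `DequeSketch` is a hypothetical class, and whether antiox's `Deque` uses a ring buffer is not stated in the source. Its method names mirror the ones used in the library examples (`push`, `pushFront`, `shift`, `pop`).

```typescript
// Illustrative stand-in only; not the antiox Deque implementation.
class DequeSketch<T> {
  private buf: (T | undefined)[] = new Array<T | undefined>(8);
  private head = 0; // index of the front element
  private size = 0;

  get length(): number {
    return this.size;
  }

  // Double the capacity when full, copying items in logical order.
  private growIfFull(): void {
    if (this.size < this.buf.length) return;
    const next = new Array<T | undefined>(this.buf.length * 2);
    for (let i = 0; i < this.size; i++) {
      next[i] = this.buf[(this.head + i) % this.buf.length];
    }
    this.buf = next;
    this.head = 0;
  }

  // Add at the back.
  push(value: T): void {
    this.growIfFull();
    this.buf[(this.head + this.size) % this.buf.length] = value;
    this.size++;
  }

  // Add at the front by stepping the head index backward (with wraparound).
  pushFront(value: T): void {
    this.growIfFull();
    this.head = (this.head - 1 + this.buf.length) % this.buf.length;
    this.buf[this.head] = value;
    this.size++;
  }

  // Remove from the front.
  shift(): T | undefined {
    if (this.size === 0) return undefined;
    const value = this.buf[this.head];
    this.buf[this.head] = undefined;
    this.head = (this.head + 1) % this.buf.length;
    this.size--;
    return value;
  }

  // Remove from the back.
  pop(): T | undefined {
    if (this.size === 0) return undefined;
    const index = (this.head + this.size - 1) % this.buf.length;
    const value = this.buf[index];
    this.buf[index] = undefined;
    this.size--;
    return value;
  }
}
```

Growth copies every element, so pushes are O(1) amortized rather than worst-case in this sketch.
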
```typescript
import { Deque } from "antiox/collections/deque";

const dq = new Deque();
dq.push(1);
dq.push(2);
dq.pushFront(0);
console.log(dq.shift()); // 0
console.log(dq.pop()); // 2
```

--------------------------------

### MPSC Channel: Async Message Passing in TypeScript

Source: https://context7.com/rivet-dev/antiox/llms.txt

Implements Multi-Producer Single-Consumer channels for asynchronous message passing. Supports bounded and unbounded configurations, backpressure, cloning senders, and non-blocking operations with error handling. Receivers can be consumed via async iteration.

```typescript
import { channel, unboundedChannel, SendError, TrySendError } from "antiox/sync/mpsc";

// Bounded channel with backpressure (capacity 32)
const [tx, rx] = channel(32);

// Send messages (blocks if channel is full)
await tx.send("hello");
await tx.send("world");

// Clone sender for multi-producer pattern
const tx2 = tx.clone();
await tx2.send("from tx2");

// Receive messages (returns null when channel closes)
const msg = await rx.recv(); // "hello"

// Non-blocking send/receive with error handling
try {
  tx.trySend("immediate");
} catch (e) {
  if (e instanceof TrySendError) {
    console.log(e.kind); // "full" or "closed"
  }
}

// Async iteration for clean consumption
for await (const message of rx) {
  console.log(message);
}

// Unbounded channel (never blocks on send)
const [utx, urx] = unboundedChannel();
utx.send(42); // synchronous, never blocks
utx.send(43);
const value = await urx.recv(); // 42

// Reserve a permit before sending (for flow control)
const permit = await tx.reserve();
permit.send("reserved value");

// Wait for channel closure
await tx.closed();

// Close sender (disposable pattern)
tx.close();
// or use Symbol.dispose
tx[Symbol.dispose]();
```

--------------------------------

### Notify: Simple Signaling in TypeScript

Source: https://context7.com/rivet-dev/antiox/llms.txt

The simplest synchronization primitive, used for waking up waiting tasks.
It allows tasks to wait for a notification and for one or all waiting tasks to be notified. It can also store a permit if no tasks are currently waiting. Requires 'antiox/sync/notify'.

```typescript
import { Notify } from "antiox/sync/notify";

// Create notify
const notify = new Notify();

// Wait for notification
async function waiter() {
  await notify.notified();
  console.log("Was notified!");
}

// Start waiter
waiter();

// Wake one waiter
notify.notifyOne();

// Wake all waiters
notify.notifyWaiters();

// Notify stores a permit if no waiters
notify.notifyOne(); // Stores permit
await notify.notified(); // Consumes stored permit immediately
```

--------------------------------

### Initialize Values with OnceCell

Source: https://context7.com/rivet-dev/antiox/llms.txt

OnceCell provides a thread-safe way to initialize a value exactly once. It supports asynchronous initialization and provides methods to check status or set the value manually.

```typescript
import { OnceCell } from "antiox/sync/once_cell";

const configCell = new OnceCell();

const config = await configCell.getOrInit(async () => {
  const response = await fetch("/api/config");
  return response.json();
});

if (configCell.isInitialized()) {
  console.log(configCell.get());
}
```

--------------------------------

### Halt Execution with Panic Utilities

Source: https://github.com/rivet-dev/antiox/blob/main/README.md

Diverging functions that mirror Rust's panic, todo, and unreachable macros. These are used to handle invariant violations, stub unfinished code, and ensure exhaustive type checking.
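
The exhaustive-checking part relies on TypeScript's `never` type: a function whose parameter is `never` only type-checks when every variant has already been handled. The sketch below shows the idea with a hypothetical `unreachableSketch` helper and a made-up `Shape` union; it is an illustration of the technique, not antiox's `unreachable`.

```typescript
// Illustrative stand-in only; not the antiox/panic implementation.
function unreachableSketch(value: never): never {
  throw new Error(`unreachable: ${JSON.stringify(value)}`);
}

// Hypothetical union used only for this example.
type Shape =
  | { kind: "circle"; r: number }
  | { kind: "square"; side: number };

function area(shape: Shape): number {
  switch (shape.kind) {
    case "circle":
      return Math.PI * shape.r ** 2;
    case "square":
      return shape.side ** 2;
    default:
      // `shape` has type `never` here. Adding a new Shape variant without
      // a matching case turns this call into a compile-time error.
      return unreachableSketch(shape);
  }
}
```

At runtime the helper still throws, which covers values that bypass the type system, such as unvalidated JSON.
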
```typescript
import { panic, todo, unreachable } from "antiox/panic";

if (!isValid) panic("invariant violated");

function processEvent(event: Event): Result {
  switch (event.type) {
    case "click": return handleClick(event);
    case "hover": todo("hover support");
  }
}

type Direction = "north" | "south" | "east" | "west";

function move(dir: Direction) {
  switch (dir) {
    case "north": return [0, 1];
    case "south": return [0, -1];
    case "east": return [1, 0];
    case "west": return [-1, 0];
    default: unreachable(dir);
  }
}
```

--------------------------------

### Semaphore: Concurrency Limiting in TypeScript

Source: https://context7.com/rivet-dev/antiox/llms.txt

A counting semaphore for limiting concurrent access to resources. It allows acquiring single or multiple permits and supports non-blocking acquisition. Includes a utility for rate-limited operations and automatic release. Requires 'antiox/sync/semaphore'.

```typescript
import { Semaphore, SemaphorePermit, AcquireError } from "antiox/sync/semaphore";

// Create semaphore with 3 permits
const sem = new Semaphore(3);

// Acquire single permit
const permit = await sem.acquire();
// ... do work ...
permit.release();

// Acquire multiple permits
const multiPermit = await sem.acquireMany(2);
multiPermit.release();

// Non-blocking acquisition
try {
  const permit = sem.tryAcquire();
  permit.release();
} catch (e) {
  if (e instanceof AcquireError) {
    console.log("No permits available");
  }
}

// Check available permits
console.log(sem.availablePermits());

// Limit concurrent operations
async function rateLimitedFetch(urls: string[]) {
  const sem = new Semaphore(10); // Max 10 concurrent requests

  return Promise.all(urls.map(async (url) => {
    const permit = await sem.acquire();
    try {
      return await fetch(url);
    } finally {
      permit.release();
    }
  }));
}

// Using with automatic release
{
  using permit = await sem.acquire();
  await doWork();
} // permit automatically released
```

--------------------------------

### Watch Channel - Single-Value Broadcast

Source: https://context7.com/rivet-dev/antiox/llms.txt

Creates a channel that holds a single value, notifying receivers when it changes. Ideal for sharing configuration or state that updates over time.

```APIDOC
## Watch Channel - Single-Value Broadcast

### Description
Creates a channel that holds a single value, notifying receivers when it changes. Ideal for sharing configuration or state that updates over time.

### Method
N/A (This is a library API, not an HTTP endpoint)

### Endpoint
N/A

### Parameters
N/A

### Request Example
```typescript
import { watch, RecvError } from "antiox/sync/watch";

// Create watch channel with initial value
const [tx, rx] = watch({ count: 0, name: "initial" });

// Subscribe additional receivers
const rx2 = tx.subscribe();

// Read current value (non-blocking)
console.log(rx.borrow()); // { count: 0, name: "initial" }

// Update the value
tx.send({ count: 1, name: "updated" });

// Wait for changes and read
await rx.changed();
console.log(rx.borrowAndUpdate()); // { count: 1, name: "updated" }

// Conditional update with sendIfModified
tx.sendIfModified((current) => {
  if (current.count < 10) {
    current.count++;
    return true; // value was modified
  }
  return false; // no modification
});

// Watch for changes in a loop
async function watchLoop() {
  while (true) {
    try {
      await rx2.changed();
      const value = rx2.borrowAndUpdate();
      console.log("Value changed:", value);
    } catch (e) {
      if (e instanceof RecvError) break;
      throw e; // surface unexpected errors instead of looping forever
    }
  }
}

// Check if sender is closed (all receivers dropped)
if (tx.isClosed()) {
  console.log("No more receivers");
}
```

### Response
N/A (This is a library API)

### Response Example
N/A
```

--------------------------------

### Ensure Resource Cleanup with DropGuard

Source: https://context7.com/rivet-dev/antiox/llms.txt

DropGuard ensures that a cleanup function is executed when a scope is exited. It provides a disarm method to prevent cleanup if the operation succeeds.

```typescript
import { DropGuard } from "antiox/sync/drop_guard";

{
  const conn = await database.connect();
  using guard = new DropGuard(() => conn.close());

  await performQueries(conn);

  if (shouldKeepConnection) {
    guard.disarm();
  }
}
```

--------------------------------

### Select - Race Async Branches

Source: https://context7.com/rivet-dev/antiox/llms.txt

Race multiple async operations and automatically cancel losers, with TypeScript type narrowing.

```APIDOC
## Select - Race Multiple Async Branches

### Description
Allows racing multiple asynchronous operations. The first to complete wins, and the others are cancelled via `AbortSignal`. The result is returned with a key indicating which branch won.

### Parameters
- **branches** (Object) - Required - An object where keys are identifiers and values are functions accepting an `AbortSignal`.
- **signal** (AbortSignal) - Optional - Parent signal for external cancellation.

### Example
```typescript
const result = await select({
  message: (signal) => rx.recv(signal),
  timeout: (signal) => sleep(5000, signal)
});

if (result.key === "message") {
  console.log("Received:", result.value);
}
```
```

--------------------------------

### Manage Hierarchical Cancellation with CancellationToken

Source: https://context7.com/rivet-dev/antiox/llms.txt

CancellationToken enables tree-structured cancellation, where cancelling a parent token automatically cancels all its children. It supports manual cancellation and integration with Symbol.dispose for scoped cleanup.

```typescript
import { CancellationToken } from "antiox/sync/cancellation_token";
import { spawn } from "antiox/task";

const token = new CancellationToken();
const child1 = token.child();

spawn(async () => {
  await token.cancelled();
  console.log("Token was cancelled");
});

token.cancel();

{
  using scopedToken = new CancellationToken();
  await doWork(scopedToken);
}
```

--------------------------------

### Notify Tasks of Events

Source: https://github.com/rivet-dev/antiox/blob/main/README.md

A lightweight synchronization primitive used to wake one or all tasks waiting for a notification.

```typescript
import { Notify } from "antiox/sync/notify";

const notify = new Notify();

// In the waiting task:
await notify.notified();

// In another task:
notify.notifyOne();
```

--------------------------------

### Async Lazy Initialization with OnceCell

Source: https://github.com/rivet-dev/antiox/blob/main/README.md

The OnceCell primitive allows for asynchronous lazy initialization of a value.
It computes the value exactly once and shares it across multiple asynchronous tasks, ensuring that initialization logic runs only when the value is first needed.

```typescript
import { OnceCell } from "antiox/sync/once_cell";

const cell = new OnceCell();
const config = await cell.getOrInit(async () => loadConfig());
```

--------------------------------

### Single-Use Channel with Oneshot

Source: https://github.com/rivet-dev/antiox/blob/main/README.md

Oneshot provides a single-use channel for sending exactly one value. The receiver can await this value, making it suitable for scenarios where a single result needs to be communicated between tasks.

```typescript
import { oneshot } from "antiox/sync/oneshot";

const [tx, rx] = oneshot();
tx.send("done");
const value = await rx; // "done"
```

--------------------------------

### Read-Write Lock (RwLock) for Concurrent Access

Source: https://github.com/rivet-dev/antiox/blob/main/README.md

RwLock implements a read-write lock, allowing multiple concurrent readers or a single exclusive writer. This is useful for protecting shared data structures where reads are frequent and writes are less common.

```typescript
import { RwLock } from "antiox/sync/rwlock";

const lock = new RwLock({ data: "hello" });

const reader = await lock.read();
console.log(reader.value);
reader.release();

const writer = await lock.write();
writer.value = { data: "world" };
writer.release();
```

--------------------------------

### Single-Value Broadcast with Watch

Source: https://github.com/rivet-dev/antiox/blob/main/README.md

The watch primitive facilitates single-value broadcast communication. A single sender can update a value, and multiple receivers can observe these changes. Receivers can subscribe to receive updates and check for changes.

```typescript
import { watch } from "antiox/sync/watch";

const [tx, rx] = watch("initial");
const rx2 = tx.subscribe();

tx.send("updated");
await rx.changed();
console.log(rx.borrowAndUpdate()); // "updated"
```

--------------------------------

### RwLock: Reader-Writer Lock in TypeScript

Source: https://context7.com/rivet-dev/antiox/llms.txt

Provides a reader-writer lock allowing multiple concurrent readers or one exclusive writer. It is writer-preferring to prevent writer starvation. Supports blocking and non-blocking operations, and automatic release. Requires 'antiox/sync/rwlock'.

```typescript
import { RwLock, RwLockReadGuard, RwLockWriteGuard } from "antiox/sync/rwlock";

// Create RwLock with protected value
const lock = new RwLock({ users: [] as string[], version: 1 });

// Multiple readers can access concurrently
const reader1 = await lock.read();
const reader2 = await lock.read();
console.log(reader1.value.users);
console.log(reader2.value.version);
reader1.release();
reader2.release();

// Writer gets exclusive access
const writer = await lock.write();
writer.value = {
  users: [...writer.value.users, "alice"],
  version: writer.value.version + 1
};
writer.release();

// Using with automatic release
{
  using reader = await lock.read();
  console.log(reader.value);
}

{
  using writer = await lock.write();
  writer.value = { ...writer.value, version: 2 };
}

// Non-blocking variants (named differently so they do not redeclare `writer`)
const readGuard = lock.tryRead();
const writeGuard = lock.tryWrite();
```

--------------------------------

### Oneshot Channel: Single-Value Async Communication in TypeScript

Source: https://context7.com/rivet-dev/antiox/llms.txt

Provides a single-use channel for sending exactly one value asynchronously. The receiver implements PromiseLike, so it can be awaited directly. Supports non-blocking receives and checking channel closure status.

```typescript
import { oneshot, OneshotSender, RecvError, SendError } from "antiox/sync/oneshot";

// Create oneshot channel
const [tx, rx] = oneshot();

// Send a single value
tx.send("done");

// Receiver is directly awaitable (implements PromiseLike)
const value = await rx; // "done"

// Request-response pattern with oneshot
type Request = { type: "get"; resTx: OneshotSender };

async function handleRequest(req: Request) {
  if (req.type === "get") {
    req.resTx.send(42);
  }
}

const [resTx, resRx] = oneshot();
await handleRequest({ type: "get", resTx });
const result = await resRx; // 42

// Non-blocking receive
try {
  const immediate = rx.tryRecv();
} catch (e) {
  if (e instanceof RecvError) {
    console.log("No value sent yet");
  }
}

// Check if receiver is closed
if (tx.isClosed()) {
  console.log("Receiver dropped");
}

// Wait for receiver to close
await tx.closed();
```

--------------------------------

### Mutex: Async Mutual Exclusion in TypeScript

Source: https://context7.com/rivet-dev/antiox/llms.txt

Implements an asynchronous mutex for protecting shared state across await points. It supports blocking locks, non-blocking try locks, and automatic release using Symbol.dispose. Dependencies include the 'antiox/sync/mutex' module.

```typescript
import { Mutex, MutexGuard } from "antiox/sync/mutex";

// Create mutex with protected value
const mutex = new Mutex({ count: 0, data: [] as string[] });

// Acquire lock (blocks until available)
const guard = await mutex.lock();
guard.value = { ...guard.value, count: guard.value.count + 1 };
guard.release();

// Using with automatic release via Symbol.dispose
{
  using guard = await mutex.lock();
  guard.value.data.push("item");
} // guard automatically released

// Non-blocking try lock
try {
  const guard = mutex.tryLock();
  // Use guard...
  guard.release();
} catch (e) {
  console.log("Mutex already locked");
}

// Lock with AbortSignal (named differently so it does not redeclare `guard`)
const controller = new AbortController();
const abortableGuard = await mutex.lock(controller.signal);
```

--------------------------------

### Select - Race Multiple Async Branches

Source: https://context7.com/rivet-dev/antiox/llms.txt

Enables racing multiple asynchronous operations, automatically cancelling the losers. TypeScript narrows the result type based on the winning branch. `select` accepts an object whose keys are identifiers and whose values are functions that receive an AbortSignal and return promises or async iterables; an optional parent AbortSignal allows external cancellation.

```typescript
import { select } from "antiox/sync/select";
import { sleep } from "antiox/time";

// Race multiple async operations
const result = await select({
  message: (signal) => rx.recv(signal),
  timeout: (signal) => sleep(5000, signal),
  userInput: (signal) => getUserInput(signal),
});

// TypeScript narrows based on key
if (result.key === "message") {
  console.log("Received:", result.value); // value is message type
} else if (result.key === "timeout") {
  console.log("Timed out");
} else {
  console.log("User input:", result.value);
}

// With parent AbortSignal for external cancellation
const controller = new AbortController();
const winner = await select({
  fast: (signal) => fastOperation(signal),
  slow: (signal) => slowOperation(signal),
}, controller.signal);
```

--------------------------------

### Race Multiple Async Branches with Select

Source: https://github.com/rivet-dev/antiox/blob/main/README.md

The select primitive allows racing multiple asynchronous operations. It returns the result of the first operation to complete and cancels the others. TypeScript can narrow the result type based on which branch completed.
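
The race-then-cancel behavior described here can be sketched with the standard `Promise.race` and `AbortController` APIs. This is a minimal illustration under assumptions, not antiox's implementation: `raceBranches`, `abortableSleep`, and `demo` are hypothetical names, and the sketch omits the per-key type narrowing that the library provides.

```typescript
// Illustrative sketch built on standard APIs only. `raceBranches` and
// `abortableSleep` are hypothetical names, not part of antiox.
type Branch = (signal: AbortSignal) => Promise<unknown>;

// Resolves after `ms`, or rejects as soon as `signal` aborts.
function abortableSleep(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const timer = setTimeout(resolve, ms);
    signal.addEventListener(
      "abort",
      () => {
        clearTimeout(timer);
        reject(new Error("aborted"));
      },
      { once: true },
    );
  });
}

// Runs every branch with a shared signal and returns the first one to settle,
// tagged with its key. The signal is aborted afterwards so losers can stop.
async function raceBranches(
  branches: Record<string, Branch>,
): Promise<{ key: string; value: unknown }> {
  const controller = new AbortController();
  const tagged = Object.entries(branches).map(([key, run]) =>
    run(controller.signal).then((value) => ({ key, value })),
  );
  try {
    return await Promise.race(tagged);
  } finally {
    controller.abort();
  }
}

async function demo() {
  let slowAborted = false;
  const result = await raceBranches({
    fast: (signal) => abortableSleep(10, signal).then(() => "fast done"),
    slow: (signal) => {
      // Record that the losing branch was told to stop.
      signal.addEventListener("abort", () => { slowAborted = true; }, { once: true });
      return abortableSleep(1000, signal);
    },
  });
  return { result, slowAborted };
}
```

Cancellation in this sketch is cooperative: a losing branch stops early only if it observes the signal it was handed, which is why each branch should pass `signal` on to the operations it awaits.
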

```typescript
import { select } from "antiox/sync/select";
import { sleep } from "antiox/time";

const result = await select({
  // Pass the signal through so the losing branch can be cancelled
  msg: (signal) => rx.recv(signal),
  timeout: (signal) => sleep(5000, signal),
});

if (result.key === "msg") {
  console.log(result.value); // narrowed type
}
```

--------------------------------

### Manage Task Cancellation

Source: https://github.com/rivet-dev/antiox/blob/main/README.md

Tree-structured cancellation mechanism. Cancelling a parent token automatically propagates the cancellation to all child tokens.

```typescript
import { CancellationToken } from "antiox/sync/cancellation_token";
import { spawn } from "antiox/task";

const token = new CancellationToken();
const child = token.child();

spawn(async () => {
  await child.cancelled();
  console.log("cancelled!");
});

token.cancel();
```

--------------------------------

### Broadcast Messages to Multiple Receivers

Source: https://github.com/rivet-dev/antiox/blob/main/README.md

A multi-producer, multi-consumer bounded channel where every subscriber receives every message sent to the channel.

```typescript
import { broadcast } from "antiox/sync/broadcast";

const [tx, rx1] = broadcast(16);
const rx2 = tx.subscribe();

tx.send("hello");
console.log(await rx1.recv()); // "hello"
console.log(await rx2.recv()); // "hello"
```

--------------------------------

### Process Async Streams

Source: https://github.com/rivet-dev/antiox/blob/main/README.md

Functional combinators for AsyncIterable. Provides tools for mapping, filtering, buffering, and merging asynchronous data streams without wrapper objects.

```typescript
import { map, filter, bufferUnordered, collect, pipe, merge, chunks } from "antiox/stream";

const results = await collect(
  bufferUnordered(
    map(urls, (url) => fetch(url)),
    10,
  ),
);

const processed = pipe(
  source,
  (s) => filter(s, (x) => x > 0),
  (s) => map(s, (x) => x * 2),
  (s) => chunks(s, 10),
);

for await (const item of merge(stream1, stream2, stream3)) {
  console.log(item);
}
```

--------------------------------

### Protect Shared State with Mutex

Source: https://github.com/rivet-dev/antiox/blob/main/README.md

An asynchronous mutex that guarantees exclusive access to a value across multiple await points.

```typescript
import { Mutex } from "antiox/sync/mutex";

const mutex = new Mutex({ count: 0 });

const guard = await mutex.lock();
guard.value = { count: guard.value.count + 1 };
guard.release();
```
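
To see why exclusive access across await points matters, the following self-contained sketch contrasts an unprotected read-modify-write with one serialized through a lock. It is an illustration under assumptions, not antiox's implementation: `SketchMutex`, `withLock`, `incrementWithAwait`, and `demo` are hypothetical names, and the sketch uses a callback style rather than the guard objects shown above.

```typescript
// Illustrative only: a minimal promise-chain mutex, not antiox's Mutex.
class SketchMutex<T> {
  private value: T;
  private tail: Promise<void> = Promise.resolve();

  constructor(value: T) {
    this.value = value;
  }

  // Runs `fn` with exclusive access; each caller waits for the previous one.
  async withLock<R>(fn: (value: T) => Promise<R> | R): Promise<R> {
    const previous = this.tail;
    let release: () => void = () => {};
    this.tail = new Promise<void>((resolve) => {
      release = resolve;
    });
    await previous;
    try {
      return await fn(this.value);
    } finally {
      release(); // let the next waiter proceed, even if `fn` threw
    }
  }
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// A read-modify-write with an await between the read and the write.
async function incrementWithAwait(state: { count: number }): Promise<void> {
  const current = state.count;
  await sleep(1); // other tasks can run here
  state.count = current + 1;
}

async function demo(): Promise<{ unsafe: number; safe: number }> {
  const tasks = [1, 2, 3, 4, 5];

  // Without a lock, every task reads 0 before any task writes.
  const unsafeState = { count: 0 };
  await Promise.all(tasks.map(() => incrementWithAwait(unsafeState)));

  // With the lock, each read-modify-write completes before the next starts.
  const mutex = new SketchMutex({ count: 0 });
  await Promise.all(tasks.map(() => mutex.withLock(incrementWithAwait)));
  const safe = await mutex.withLock((state) => state.count);

  return { unsafe: unsafeState.count, safe };
}

demo().then((result) => console.log(result)); // { unsafe: 1, safe: 5 }
```

JavaScript runs on a single thread, but any `await` inside a critical section lets other tasks interleave, which is the situation an async mutex is meant to guard against.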