### Run Rust Project with Cargo

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_0/chapter_0_0.md

Builds the Rust project if necessary and runs it. Until dataflow logic is added, this command runs Cargo's default 'Hello, world!' program.

```shell
cargo run
```

--------------------------------

### Create New Rust Project with Cargo

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_0/chapter_0_0.md

Initializes a new Rust project directory named `my_project` using Cargo, Rust's package manager and build tool. This command sets up the basic file structure required for a Rust application.

```shell
cargo new my_project
```

--------------------------------

### Run Differential Dataflow Server

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/server/README.md

Invokes the main server binary for Differential Dataflow. The server starts and waits for input, indicating it is ready to receive commands and dataflows.

```Shell
cargo run --bin server
```

--------------------------------

### Execute Differential Dataflow Hello Example with Inspection

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/README.md

Runs the `hello` example of differential dataflow on a large graph (10 million nodes, 50 million edges) with a batch size of 1, and enables inspection to observe the initial computation time and the resulting degree distribution. Each `observed` line reports a degree and the number of nodes with that degree; the final line reports the loading time for the initial graph.
```Shell
cargo run --release --example hello -- 10000000 50000000 1 inspect
    Finished release [optimized + debuginfo] target(s) in 0.04s
     Running `target/release/examples/hello 10000000 50000000 1 inspect`
observed: ((1, 336908), 0, 1)
observed: ((2, 843854), 0, 1)
observed: ((3, 1404462), 0, 1)
observed: ((4, 1751921), 0, 1)
observed: ((5, 1757099), 0, 1)
observed: ((6, 1459805), 0, 1)
observed: ((7, 1042894), 0, 1)
observed: ((8, 653178), 0, 1)
observed: ((9, 363983), 0, 1)
observed: ((10, 181423), 0, 1)
observed: ((11, 82478), 0, 1)
observed: ((12, 34407), 0, 1)
observed: ((13, 13216), 0, 1)
observed: ((14, 4842), 0, 1)
observed: ((15, 1561), 0, 1)
observed: ((16, 483), 0, 1)
observed: ((17, 143), 0, 1)
observed: ((18, 38), 0, 1)
observed: ((19, 8), 0, 1)
observed: ((20, 3), 0, 1)
observed: ((22, 1), 0, 1)
round 0 finished after 15.470465014s (loading)
```

--------------------------------

### Execute Differential Dataflow Hello Example without Inspection

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/README.md

Runs the `hello` example with the same large dataset but disables inspection to measure the raw performance of subsequent updates without the overhead of printing detailed output. The output shows the initial loading time and subsequent update times, demonstrating high update throughput.

```Shell
cargo run --release --example hello -- 10000000 50000000 1 no_inspect
    Finished release [optimized + debuginfo] target(s) in 0.04s
     Running `target/release/examples/hello 10000000 50000000 1 no_inspect`
round 0 finished after 15.586969662s (loading)
round 1 finished after 1.070239ms
round 2 finished after 2.303187ms
round 3 finished after 208.45µs
round 4 finished after 163.224µs
round 5 finished after 118.792µs
...
```

--------------------------------

### Run Differential Dataflow Hello Example with Batch Size 100

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/README.md

Executes the `hello` example of differential dataflow using the `count_total` optimization. This command runs on a graph with 10 million nodes and 50 million edges, applying updates in batches of 100 across two workers (`-w2`), showcasing improved performance with a moderate batch size.

```Shell
cargo run --release --example hello -- 10000000 50000000 100 no_inspect -w2
```

--------------------------------

### Run Differential Dataflow Hello Example with Batch Size 10

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/README.md

Executes the `hello` example of differential dataflow using the `count_total` optimization. This command runs on a graph with 10 million nodes and 50 million edges, applying updates in batches of 10 across two workers (`-w2`), demonstrating the initial performance improvements for smaller batch sizes.

```Shell
cargo run --release --example hello -- 10000000 50000000 10 no_inspect -w2
```

--------------------------------

### Initial Latency Output from Degree Distribution

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/server/README.md

Displays an example of the initial latency distribution reported by the 'degr_dist' computation. The higher latencies observed here are typically due to the computation catching up on historical data.

```Log Output
delays: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 6, 16, 18, 64, 66, 0, 9, 0, 0, 0, 0, 0, 0, 0]
```

--------------------------------

### Optimized Latency Output from Degree Distribution

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/server/README.md

Presents examples of latency distributions when the system is run with release optimizations. These outputs demonstrate further improved performance, with latencies typically falling into lower microsecond bins.
```Log Output
delays: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0, 16, 6, 26, 46, 14, 0, 9, 0, 0, 0, 0, 0, 0, 0]
delays: [0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
delays: [0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
delays: [0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
delays: [0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
delays: [0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
delays: [0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
```

--------------------------------

### Build Random Graph Dataflow Library

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/server/README.md

Navigates to the 'random_graph' dataflow directory and compiles the Rust project. This command produces 'librandom_graph.dylib', a shared library ready to be loaded by the server.

```Shell
cd dataflows/random_graph
cargo build
```

--------------------------------

### Run Differential Dataflow Hello Example with Batch Size 100000

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/README.md

Executes the `hello` example of differential dataflow using the `count_total` optimization. This command runs on a graph with 10 million nodes and 50 million edges, applying updates in large batches of 100,000 across two workers (`-w2`), illustrating the significant performance gains achieved with larger batching, absorbing over one million rounds of updates per second.
```Shell
cargo run --release --example hello -- 10000000 50000000 100000 no_inspect -w2
```

--------------------------------

### Build Degree Distribution Dataflow Library

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/server/README.md

Navigates to the 'degr_dist' dataflow directory and compiles the Rust project. This command produces 'libdegr_dist.dylib', another shared library for server use.

```Shell
cd ../degr_dist
cargo build
```

--------------------------------

### Run Differential Dataflow Hello Example with Multiple Workers

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/README.md

These commands execute the `hello` example of Differential Dataflow using two workers (`-w2`) to process a graph with 10 million nodes and 50 million edges. Each command uses a different batch size (1, 10, 100, and 100,000 updates per round) to demonstrate how worker efficiency changes with the workload size.

```Shell
cargo run --release --example hello -- 10000000 50000000 1 no_inspect -w2
```

```Shell
cargo run --release --example hello -- 10000000 50000000 10 no_inspect -w2
```

```Shell
cargo run --release --example hello -- 10000000 50000000 100 no_inspect -w2
```

```Shell
cargo run --release --example hello -- 10000000 50000000 100000 no_inspect -w2
```

--------------------------------

### Load Random Graph Dataflow into Server

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/server/README.md

Loads the compiled 'librandom_graph.dylib' into the running Differential Dataflow server. It binds a random graph with 1,000 nodes and a sliding window over 2,000 edges to the specified `<graph_name>`, updating ten times per second.
```Application CLI
load ./dataflows/random_graph/target/debug/librandom_graph.dylib build <graph_name> 1000 2000 10
```

--------------------------------

### Add Timely and Differential Dataflow Dependencies

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_0/chapter_0_0.md

Modifies the `Cargo.toml` configuration file to include `timely` and `differential-dataflow` as project dependencies. This step is essential for using the dataflow libraries in the Rust project.

```toml
[package]
name = "my_project"
version = "0.1.0"
authors = ["Your Name "]

[dependencies]
timely = "=0.13.0"
differential-dataflow = "=0.13.0"
```

--------------------------------

### Rust Example: Using the Join Operator

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_2/chapter_2_5.md

This Rust example demonstrates how to use the `join` operator in Differential Dataflow. It first re-keys the `manages` collection from `(m2, m1)` to `(m1, m2)` to enable joining on `m1`, then performs the join with the original `manages` collection, and finally inspects the results.

```Rust
# extern crate timely;
# extern crate differential_dataflow;
# use timely::dataflow::Scope;
# use differential_dataflow::Collection;
# use differential_dataflow::operators::Join;
# use differential_dataflow::lattice::Lattice;
# fn example<G: Scope>(manages: &Collection<G, (u64, u64)>)
# where G::Timestamp: Lattice
# {
    manages
        .map(|(m2, m1)| (m1, m2))
        .join(&manages)
        .inspect(|x| println!("{:?}", x));
# }
```

--------------------------------

### Stable Latency Output from Degree Distribution

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/server/README.md

Shows examples of improved latency distributions after the initial catch-up phase. These outputs indicate more stable and lower latencies, typically in the range of one to four milliseconds, when running without release optimizations.
```Log Output
delays: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
delays: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
delays: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
delays: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
delays: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
delays: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
```

--------------------------------

### Rust Example: Applying the Consolidate Operator

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_2/chapter_2_4.md

This Rust code illustrates the effect of the `consolidate` operator. By inserting `consolidate()` before `inspect()`, the code ensures that all updates for a specific element at a given time are combined into a single physical tuple, simplifying the output and improving efficiency.

```Rust
# extern crate timely;
# extern crate differential_dataflow;
# use timely::dataflow::Scope;
# use differential_dataflow::Collection;
# use differential_dataflow::lattice::Lattice;
# fn example<G: Scope>(manages: &Collection<G, (u64, u64)>)
# where G::Timestamp: Lattice
# {
    manages
        .map(|(m2, m1)| (m1, m2))
        .concat(&manages)
        .consolidate()
        .inspect(|x| println!("{:?}", x));
# }
```

--------------------------------

### Execute Naive Differential Delta Query (Rust)

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/dogsdogsdogs/README.md

This command runs the `delta_query.rs` example on the LiveJournal graph, processing nodes one at a time. It highlights the initial fast performance followed by a significant slowdown when a high-degree node is encountered, demonstrating the limitations of the naive approach.
```Shell
Echidnatron% cargo run --release --example delta_query -- ~/Projects/Datasets/livejournal 1
```

--------------------------------

### Rust Crate and Trait Imports for Differential Dataflow

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/server/README.md

This snippet shows the initial Rust boilerplate for a Differential Dataflow application. It declares the external crates `timely`, `differential_dataflow`, and `dd_server`, and imports the modules and traits needed for dataflow operations and shared state management.

```Rust
extern crate timely;
extern crate differential_dataflow;
extern crate dd_server;

use std::rc::Rc;
use std::cell::RefCell;

use timely::dataflow::operators::inspect::Inspect;
use timely::dataflow::operators::Probe;
use differential_dataflow::operators::CountTotal;

use dd_server::{Environment, TraceHandle};
```

--------------------------------

### Execute Worst-Case Optimal Delta Query (Rust)

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/dogsdogsdogs/README.md

This command runs the `delta_query_wcoj.rs` example, which employs a worst-case optimal join strategy. It demonstrates significantly improved and more consistent performance compared to the naive implementation, even when encountering high-degree nodes.

```Shell
Echidnatron% cargo run --release --example delta_query_wcoj -- ~/Projects/Datasets/livejournal 1
```

--------------------------------

### Attach Degree Distribution Computation to Graph

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/server/README.md

Attaches the pre-defined degree distribution computation from 'libdegr_dist.dylib' to the previously loaded graph identified by `<graph_name>`. This initiates the computation of changes to node out-degree counts.
```Application CLI
load ./dataflows/degr_dist/target/debug/libdegr_dist.dylib build <graph_name>
```

--------------------------------

### Run Differential Dataflow with 10 Concurrent Update Rounds

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/README.md

This command executes the 'hello' example of Differential Dataflow on a graph with 10,000,000 nodes and 50,000,000 edges, batching updates in groups of 10. The output shows the time taken for initial loading and subsequent batches, illustrating improved performance compared to single updates.

```Shell
Echidnatron% cargo run --release --example hello -- 10000000 50000000 10 no_inspect
    Finished release [optimized + debuginfo] target(s) in 0.04s
     Running `target/release/examples/hello 10000000 50000000 10 no_inspect`
round 0 finished after 15.556475008s (loading)
round 10 finished after 421.219µs
round 20 finished after 1.56369ms
round 30 finished after 338.54µs
round 40 finished after 351.843µs
round 50 finished after 339.608µs
...
```

--------------------------------

### Defining a Differential Dataflow Computation in Rust

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/server/README.md

This Rust function, `build`, defines the core logic of the `degr_dist` computation. It takes an `Environment` context, imports the shared graph trace into the dataflow, computes each source node's out-degree with `count_total`, counts how many nodes have each degree with a second `count_total`, and probes the result. The function expects exactly one argument: the name under which the trace handle is registered.

```Rust
#[no_mangle]
pub fn build((dataflow, handles, probe, _timer, args): Environment) -> Result<(), String> {

    if args.len() != 1 { return Err(format!("expected one argument, instead: {:?}", args)); }

    handles
        .get_mut::<Rc<RefCell<Option<TraceHandle>>>>(&args[0])?
        .borrow_mut().as_mut().unwrap()
        .import(dataflow)
        .as_collection(|k,v| (k.clone(), v.clone()))
        .map(|(src, _dst)| src as usize).count_total()
        .map(|(_src, cnt)| cnt as usize).count_total()
        .probe_with(probe);

    Ok(())
}
```

--------------------------------

### Run Differential Dataflow with 100 Concurrent Update Rounds

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/README.md

This command executes the 'hello' example of Differential Dataflow on a graph with 10,000,000 nodes and 50,000,000 edges, batching updates in groups of 100. The output shows further performance improvements as the batch size increases, indicating better throughput.

```Shell
Echidnatron% cargo run --release --example hello -- 10000000 50000000 100 no_inspect
    Finished release [optimized + debuginfo] target(s) in 0.04s
     Running `target/release/examples/hello 10000000 50000000 100 no_inspect`
round 0 finished after 15.528724145s (loading)
round 100 finished after 2.567577ms
round 200 finished after 1.861168ms
round 300 finished after 1.753794ms
round 400 finished after 1.528285ms
round 500 finished after 1.416605ms
...
```

--------------------------------

### Optimized Friends of Friends Query with Data Arrangements in Rust

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_5/chapter_5_1.md

This Rust snippet refactors the 'friends of friends' query to use data arrangements for improved efficiency. By calling `arrange_by_key()` on the `knows` collection once, the indexed representation is shared across both `join_core` operations, reducing memory and computation overhead compared to the naive approach.

```Rust
extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::operators::JoinCore;
use differential_dataflow::operators::arrange::ArrangeByKey;

fn main() {

    // define a new timely dataflow computation.
    timely::execute_from_args(::std::env::args(), move |worker| {

        let mut knows = differential_dataflow::input::InputSession::new();
        let mut query = differential_dataflow::input::InputSession::new();

        worker.dataflow(|scope| {

            let knows = knows.to_collection(scope);
            let query = query.to_collection(scope);

            // Arrange the data first! (by key).
            let knows = knows.arrange_by_key();

            // Same logic as before, with a new method name.
            query.join_core(&knows, |x,q,y| Some((*y,*q)))
                 .join_core(&knows, |y,q,z| Some((*q,*z)))
                 .inspect(|result| println!("result {:?}", result));

        });

#       // to help with type inference ...
#       knows.update_at((0,0), 0usize, 1isize);
#       query.update_at((0,0), 0usize, 1isize);
    });
}
```

--------------------------------

### Naive Friends of Friends Query with Differential Dataflow

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_5/chapter_5_1.md

This Rust snippet demonstrates a basic implementation of a 'friends of friends' query using `differential_dataflow::operators::Join`. It sets up two input collections, `knows` and `query`, and performs two consecutive join operations to find the desired relation. This approach re-indexes data for each join.

```Rust
extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::operators::Join;

fn main() {

    // define a new timely dataflow computation.
    timely::execute_from_args(::std::env::args(), move |worker| {

        let mut knows = differential_dataflow::input::InputSession::new();
        let mut query = differential_dataflow::input::InputSession::new();

        worker.dataflow(|scope| {

            let knows = knows.to_collection(scope);
            let query = query.to_collection(scope);

            // Hop from x to y, then from y to z.
            query.join_map(&knows, |x,q,y| (*y,*q))
                 .join_map(&knows, |y,q,z| (*q,*z))
                 .inspect(|result| println!("result {:?}", result));

        });

#       // to help with type inference ...
#       knows.update_at((0,0), 0usize, 1isize);
#       query.update_at((0,0), 0usize, 1isize);
    });
}
```

--------------------------------

### Shell: Example Output of Differential Dataflow Changes

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_0/chapter_0_2.md

This console output shows the results of running the differential dataflow program after applying the input modifications. It illustrates how the system processes and outputs the changes to the skip-level reports over various timestamps, reflecting the dynamic updates to the organizational structure.

```Console Output
Echidnatron% cargo run -- 10
     Running `target/debug/my_project`
((0, (0, 0)), 0, 1)
((0, (0, 1)), 0, 1)
((0, (0, 2)), 2, 1)
((1, (0, 2)), 0, 1)
((1, (0, 2)), 2, -1)
((1, (0, 3)), 0, 1)
((1, (0, 4)), 4, 1)
((1, (0, 5)), 5, 1)
((2, (0, 4)), 2, 1)
((2, (0, 4)), 4, -1)
((2, (0, 5)), 2, 1)
((2, (0, 5)), 5, -1)
((2, (0, 6)), 6, 1)
((2, (0, 7)), 7, 1)
((2, (0, 8)), 8, 1)
((2, (1, 4)), 0, 1)
((2, (1, 4)), 2, -1)
((2, (1, 5)), 0, 1)
((2, (1, 5)), 2, -1)
((3, (1, 6)), 0, 1)
((3, (1, 6)), 6, -1)
((3, (1, 7)), 0, 1)
((3, (1, 7)), 7, -1)
((3, (1, 9)), 9, 1)
((4, (1, 8)), 4, 1)
((4, (1, 8)), 8, -1)
((4, (1, 9)), 4, 1)
((4, (1, 9)), 9, -1)
((4, (2, 8)), 0, 1)
((4, (2, 8)), 4, -1)
((4, (2, 9)), 0, 1)
((4, (2, 9)), 4, -1)
Echidnatron%
```

--------------------------------

### Importing and Querying a Shared Dataflow Arrangement (Rust)

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_5/chapter_5_3.md

This Rust example illustrates how to import a previously exported `trace` (arrangement) into new dataflows. It shows how to construct multiple query sets, each reading a part of the shared `knows` collection using `semijoin`. The `trace.import(scope)` line is crucial for bringing the shared arrangement into the current dataflow scope for efficient operations.

```rust
for round in 1 ..
1_000 {

    worker.dataflow(|scope| {

        // Round-specific query set.
        let query =
        (round .. round + 3)
            .to_stream(scope)
            .map(move |x| (x, round, 1))
            .as_collection();

        // Import arrangement, extract keys from `query`.
        trace
            .import(scope)
            .semijoin(&query)
            .consolidate()
            .inspect(move |x| println!("{:?}\t{:?}", timer.elapsed(), x))
            .probe_with(&mut probe);

    });

    // Change the collection a bit.
    input.remove((round, round));
    input.advance_to(round + 1);
    input.flush();

    // Run until all computations are current.
    while probe.less_than(input.time()) {
        worker.step();
    }

}
```

--------------------------------

### Execute Worst-Case Optimal Delta Query with Batching (Rust)

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/dogsdogsdogs/README.md

This command executes the worst-case optimal delta query example with a batch size of 1000 updates per round. It illustrates how increasing the batch size can improve overall throughput for the WCOJ implementation, allowing for more distinct updates per second.

```Shell
Echidnatron% cargo run --release --example delta_query_wcoj -- ~/Projects/Datasets/livejournal 1000
```

--------------------------------

### Create Differential Dataflow Collection Using Scope's new_collection

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_3/chapter_3_1.md

This Rust example shows an alternative way to create a differential dataflow collection directly from a timely dataflow scope using the `new_collection` method. The input handle must be returned from the `dataflow` closure to be used externally for managing the collection.

```Rust
    // define a new computation.
    let mut input = worker.dataflow(|scope| {

        // create a new collection from our input.
        let (input, manages) = scope.new_collection();

        // ...

        input
    });
```

--------------------------------

### Rust Example: Inspecting Data Without Consolidation

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_2/chapter_2_4.md

This Rust code demonstrates a scenario where mapping and concatenating a collection can lead to multiple physical tuples for the same logical element and timestamp. The `inspect` operator reveals these duplicates, highlighting the need for consolidation in certain contexts.

```Rust
# extern crate timely;
# extern crate differential_dataflow;
# use timely::dataflow::Scope;
# use differential_dataflow::Collection;
# use differential_dataflow::lattice::Lattice;
# use differential_dataflow::operators::Reduce;
# fn example<G: Scope>(manages: &Collection<G, (u64, u64)>)
# where G::Timestamp: Lattice
# {
    manages
        .map(|(m2, m1)| (m1, m2))
        .concat(&manages)
        .inspect(|x| println!("{:?}", x));
# }
```

--------------------------------

### Run Differential Dataflow with 100,000 Concurrent Update Rounds

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/README.md

This command executes the 'hello' example of Differential Dataflow on a graph with 10,000,000 nodes and 50,000,000 edges, batching updates in groups of 100,000. While less 'interactive', this large batch size achieves significantly higher throughput, averaging about five microseconds per update.

```Shell
Echidnatron% cargo run --release --example hello -- 10000000 50000000 100000 no_inspect
    Finished release [optimized + debuginfo] target(s) in 0.04s
     Running `target/release/examples/hello 10000000 50000000 100000 no_inspect`
round 0 finished after 15.65053789s (loading)
round 100000 finished after 505.210924ms
round 200000 finished after 524.069497ms
round 300000 finished after 470.77752ms
round 400000 finished after 621.325393ms
round 500000 finished after 472.791742ms
...
```

--------------------------------

### Differential Dataflow: Map and Join to Reverse Pairs

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_2/chapter_2_1.md

This Rust example uses `map` to reverse `(m2, m1)` pairs to `(m1, m2)` in a collection. It then `join`s the result with the original `manages` collection. The final output is inspected, demonstrating chained operations and count preservation.

```Rust
# extern crate timely;
# extern crate differential_dataflow;
# use timely::dataflow::Scope;
# use differential_dataflow::Collection;
# use differential_dataflow::lattice::Lattice;
# use differential_dataflow::operators::Join;
# fn example<G: Scope>(manages: &Collection<G, (u64, u64)>)
# where G::Timestamp: Lattice
# {
    manages
        .map(|(m2, m1)| (m1, m2))
        .join(&manages)
        .inspect(|x| println!("{:?}", x));
# }
```

--------------------------------

### Rust: Perform Interactive Data Updates with Worker Synchronization

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_a/chapter_a_3.md

This example shows how to perform interactive data modifications (removals and insertions) in a timely dataflow application. For each update, the workers are synchronized by advancing the input time, flushing, and waiting for the probe to catch up, allowing for step-by-step processing and progress reporting.

```rust
// make changes, but await completion.
// each worker starts at its own index and strides by the number of workers.
let mut person = 1 + worker.index();
while person < size {
    input.remove((person/2, person));
    input.insert((person/3, person));
    input.advance_to(person);
    input.flush();
    // step the worker until the output has caught up with the input time.
    while probe.less_than(&input.time()) { worker.step(); }
    println!("{:?}\tstep {} complete", worker.timer().elapsed(), person);
    person += worker.peers();
}
```

--------------------------------

### Rust: Optimizing Transitive Closure with Enter Operator

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_2/chapter_2_7.md

This example shows how to use the `enter` operator to bring the original `manages` collection into the iteration loop. This allows the `transitive` collection to be extended by one step along the original `manages` relation, potentially improving performance compared to extending along `transitive` itself.

```rust
# extern crate timely;
# extern crate differential_dataflow;
# use timely::dataflow::Scope;
# use differential_dataflow::Collection;
# use differential_dataflow::operators::{Join, Threshold};
# use differential_dataflow::operators::{Iterate, iterate::Variable};
# use differential_dataflow::lattice::Lattice;
# fn example<G: Scope>(manages: &Collection<G, (u64, u64)>)
# where G::Timestamp: Lattice
# {
    manages   // transitive contains (manager, person) for many hops.
        .iterate(|transitive| {

            let manages = manages.enter(&transitive.scope());

            transitive
                .map(|(mk, m1)| (m1, mk))
                .join(&manages)
                .map(|(m1, (mk, p))| (mk, p))
                .concat(&manages)
                .distinct()
        });
# }
```

--------------------------------

### Stashing Differential Dataflow Trace Handle in Rust

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/server/README.md

This snippet demonstrates how to capture and stash a `TraceHandle` from a Differential Dataflow computation. After probing a collection, the resulting trace is wrapped in a `RefCell<Option<TraceHandle>>` and registered in a shared `handles` map, allowing other parts of the application to access the dataflow's output.

```Rust
    let trace =
    // ..
    // lots of stuff ..
        .probe_with(probe)
        .as_collection()
        .arrange_by_key_u()
        .trace;

    *trace_handle.borrow_mut() = Some(trace);

    handles.set::<Rc<RefCell<Option<TraceHandle>>>>(name.to_owned(), trace_handle);
```

--------------------------------

### Rust Example: Finding Duplicate Packages with `reduce`

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_1/chapter_1_2.md

This Rust code snippet demonstrates the `reduce` operator applied to an input collection of `(person, package)` pairs. It iterates through packages associated with a person, identifies those with a count greater than one (duplicates), and pushes them to the output. The snippet highlights the need to handle `(val, count)` pairs and the use of `.clone()`, because the closure only receives references to the input values.

```rust
ordered_by
    .map(|(package, person)| (person, package))
    .reduce(|person, packages, duplicates| {
        // `packages` holds (value reference, count) pairs for this person.
        for (package, count) in packages.iter() {
            if *count > 1 {
                // clone the referenced package to produce an owned output value.
                duplicates.push(((*package).clone(), *count));
            }
        }
    });
```

--------------------------------

### Modifying Differential Dataflow Input Collection with Inserts and Removes in Rust

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_1/chapter_1_1.md

This example illustrates how to interact with an existing Differential Dataflow input collection. It demonstrates inserting new elements, removing existing ones, and, crucially, advancing the collection's logical time using `advance_to()` to signal that no further changes will be made at earlier times.

```Rust
    // drive the input around here.
    input.insert("hello".to_string());
    input.insert("world".to_string());
    input.advance_to(1);
    input.remove("hello".to_string());
    input.insert("goodbye".to_string());
    input.advance_to(2);
```

--------------------------------

### Enter a nested scope with an arranged collection in Differential Dataflow (Rust)

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_5/chapter_5_4.md

This Rust example demonstrates how to use the `enter` method to bring an already arranged collection into a nested iterative scope in Differential Dataflow. Unlike regular collections which are re-entered, an arranged collection is simply wrapped with timestamp-extending logic, allowing it to be efficiently reused within the new scope without requiring re-arrangement, which is beneficial for iterative computations.

```Rust
extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::operators::Join;
use differential_dataflow::operators::Threshold;
use differential_dataflow::operators::Iterate;
use differential_dataflow::operators::arrange::ArrangeByKey;

fn main() {

    // define a new timely dataflow computation.
    timely::execute_from_args(::std::env::args(), move |worker| {

        let mut knows = differential_dataflow::input::InputSession::new();
        let mut query = differential_dataflow::input::InputSession::new();

        worker.dataflow(|scope| {

            let knows = knows.to_collection(scope);
            let query = query.to_collection(scope);

            // Arrange the data first! (by key).
            let knows = knows.arrange_by_key();

            // Reachability queries.
            query.iterate(|reach| {

                let knows = knows.enter(&reach.scope());
                let query = query.enter(&reach.scope());

                knows.join_map(reach, |x,y,q| (*y,*q))
                     .concat(&query)
                     .distinct()
            });

        });

#       // to help with type inference ...
#       knows.update_at((0,0), 0usize, 1isize);
#       query.update_at((0,0), 0usize, 1isize);
    });
}
```

--------------------------------

### Arranging Data by Self for Distinct and Semijoin Operations in Differential Dataflow (Rust)

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_5/chapter_5_2.md

Illustrates `arrange_by_self()`, which takes the collection's elements themselves as keys, with no associated values. Operators like `distinct`, `count`, and `semijoin` rely on this form of arrangement. The example extends the 'friends of friends' problem to check additional 'knows' relationships by semijoining against the `arrange_by_self` arrangement. The same arrangement is reused across the semijoin steps instead of being rebuilt for each one, which reduces resource use.

```Rust
extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::operators::JoinCore;
use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::operators::arrange::ArrangeBySelf;

fn main() {

    // define a new timely dataflow computation.
    timely::execute_from_args(::std::env::args(), move |worker| {

        let mut knows = differential_dataflow::input::InputSession::new();
        let mut query = differential_dataflow::input::InputSession::new();

        worker.dataflow(|scope| {

            let knows = knows.to_collection(scope);
            let query = query.to_collection(scope);

            // Arrange the data first! (by key and self).
            let knows_by_key = knows.arrange_by_key();
            let knows_by_self = knows.arrange_by_self();

            // The same outputs as in the previous example.
            let candidates =
            query.join_core(&knows_by_key, |x,q,y| Some((*y,(*x,*q))))
                 .join_core(&knows_by_key, |y,(x,q),z| Some((*q,(*x,*y,*z))));

            // Repeatedly put pairs of nodes as keys, and semijoin with knows.
            candidates
                .map(|(q,(x,y,z))| ((x,z),(q,y)))
                .join_core(&knows_by_self, |&(x,z),&(q,y),&()| Some(((y,z),(q,x))))
                .join_core(&knows_by_self, |&(y,z),&(q,x),&()| Some(((z,x),(q,y))))
                .join_core(&knows_by_self, |&(z,x),&(q,y),&()| Some(((y,x),(q,z))))
                .join_core(&knows_by_self, |&(y,x),&(q,z),&()| Some((q,(x,y,z))))
                .inspect(|result| println!("result {:?}", result));

        });

#       // to help with type inference ...
#       knows.update_at((0,0), 0usize, 1isize);
#       query.update_at((0,0), 0usize, 1isize);
    });
}
```

--------------------------------

### Rust Differential Dataflow Variable Usage in Subgraph

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_2/chapter_2_7.md

The Rust function `example` shows how to use a `Variable` within a `scoped` subgraph in Differential Dataflow. It takes a `Collection` as input, enters it into the subgraph, creates a `Variable` initialized from the entered collection, applies a `logic` function to the variable, sets the variable to the result, and then leaves the subgraph with that result. This pattern is common in iterative algorithms and fixed-point computations.

```Rust
# fn example<'a, G: Scope>(collection: &Collection<G, (u64, u64)>) //, logic: impl Fn(&Variable, (u64, u64), isize>) -> Collection, (u64, u64)>)
# where G::Timestamp: Lattice
# {
collection.scope().scoped("inner", |subgraph| {
    let variable = Variable::new_from(collection.enter(subgraph), 1);
    let result = logic(&variable);
    variable.set(&result);
    result.leave()
});
# }
```

--------------------------------

### Measure Initial Computation Performance (Single Thread)

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_0/chapter_0_3.md

Times the initial computation of the skip-level management collection for 10 million people on a single worker thread, with no subsequent modifications to the input. The output shows the command executed and its timing statistics, which indicate the throughput of the initial data loading and processing.

```shell
Echidnatron% time cargo run --release -- 10000000
cargo run --release --example hello 10000000 -w1  2.74s user 1.00s system 98% cpu 3.786 total
Echidnatron%
```

--------------------------------

### Measure Initial Computation Performance (Two Threads)

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_0/chapter_0_3.md

Times the same initial computation of the skip-level management collection for 10 million people with two worker threads (`-w2`). The output shows the command executed and its timing statistics: total elapsed time falls to 2.402s at 191% CPU utilization, compared with 3.786s at 98% for the single-threaded run, so parallelism speeds up the initial data loading.

```shell
Echidnatron% time cargo run --release -- 10000000 -w2
cargo run --release --example hello 10000000 -w2  3.34s user 1.27s system 191% cpu 2.402 total
Echidnatron%
```

--------------------------------

### Console Output: Initial Load and First Interactive Steps

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_a/chapter_a_3.md

This console log from a timely dataflow application shows the elapsed time at which the initial data load finishes, followed by the completion times of the first few interactive update steps. Each step completes within tens of microseconds of the one before it.

```text
Echidnatron% cargo run --release -- 10000000
    Finished release [optimized] target(s) in 0.24s
     Running `target/release/my_project 10000000`
4.092895186s    data loaded
4.092975626s    step 1 complete
4.093021676s    step 2 complete
4.093041130s    step 3 complete
4.093110803s    step 4 complete
4.093144075s    step 5 complete
4.093187645s    step 6 complete
4.093208245s    step 7 complete
4.093236460s    step 8 complete
4.093281793s    step 9 complete
```

--------------------------------

### Setting up a Timely Dataflow Execution Environment in Rust

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_1/chapter_1_0.md

This Rust snippet is the standard skeleton for a timely dataflow application. It initializes a timely dataflow execution environment with `timely::execute_from_args`; inside the closure, each worker constructs a dataflow and then drives its inputs. The differential dataflow examples that follow build on this structure.

```Rust
extern crate timely;
extern crate differential_dataflow;

fn main() {

    // prepare a timely dataflow execution environment.
    timely::execute_from_args(std::env::args(), |worker| {

        // create a differential dataflow.
        let mut input = worker.dataflow::<usize,_,_>(|scope| {

            // create inputs, build dataflow, return input handle.

        });

        // drive the input around here.

    }).unwrap();
}
```

--------------------------------

### Creating and Exporting a Sharable Differential Dataflow Arrangement (Rust)

Source: https://github.com/timelydataflow/differential-dataflow/blob/master/mdbook/src/chapter_5/chapter_5_3.md

This Rust snippet creates a dynamic collection from an `InputSession` and arranges it by key. The key step is returning the `.trace` field of the arrangement from the dataflow closure, which makes the arranged collection available for re-introduction into other dataflows so that they can share its state.

```rust
extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::operators::JoinCore;
use differential_dataflow::operators::arrange::ArrangeByKey;

fn main() {

    // define a new timely dataflow computation.
    timely::execute_from_args(::std::env::args(), move |worker| {

        let mut knows = differential_dataflow::input::InputSession::new();

        // Input and arrange a dynamic collection.
        let mut trace = worker.dataflow(|scope| {

            let knows = knows.to_collection(scope);
            let knows = knows.arrange_by_key();

            // Return the `.trace` field of the arrangement.
            knows.trace

        });

#       // to help with type inference ...
#       knows.update_at((0,0), 0usize, 1isize);
    });
}
```
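
To make the idea of a shared arrangement more concrete, here is a toy model that uses only the Rust standard library. It is a sketch under stated assumptions, not Differential Dataflow's implementation or API: `ToyTrace`, `ToyHandle`, and `lookup` are hypothetical names invented for illustration, and `update_at` here only mirrors the shape of the call in the snippet above. The model keeps an index of `(value, time, diff)` updates per key, builds it once, and reads it through two cloned handles.

```rust
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;

/// Toy stand-in for an arrangement's trace (hypothetical, not the library type):
/// updates are indexed by key, each recorded as (value, time, diff).
#[derive(Default)]
struct ToyTrace {
    by_key: BTreeMap<u32, Vec<(u32, usize, isize)>>,
}

impl ToyTrace {
    /// Record a change to `(key, val)` at `time` with multiplicity `diff`.
    fn update_at(&mut self, (key, val): (u32, u32), time: usize, diff: isize) {
        self.by_key.entry(key).or_default().push((val, time, diff));
    }

    /// Accumulate the values stored under `key` as of `time`,
    /// dropping any value whose accumulated count is zero.
    fn lookup(&self, key: u32, time: usize) -> Vec<(u32, isize)> {
        let mut counts: BTreeMap<u32, isize> = BTreeMap::new();
        if let Some(updates) = self.by_key.get(&key) {
            for &(val, t, diff) in updates {
                if t <= time {
                    *counts.entry(val).or_insert(0) += diff;
                }
            }
        }
        counts.into_iter().filter(|&(_, count)| count != 0).collect()
    }
}

/// Hypothetical handle type: cloning it shares the index instead of copying it.
type ToyHandle = Rc<RefCell<ToyTrace>>;

fn main() {
    let trace: ToyHandle = Rc::new(RefCell::new(ToyTrace::default()));

    // Two readers share the one index, as two dataflows might share one trace.
    let first_reader = Rc::clone(&trace);
    let second_reader = Rc::clone(&trace);

    trace.borrow_mut().update_at((0, 1), 0, 1);
    trace.borrow_mut().update_at((0, 2), 0, 1);
    trace.borrow_mut().update_at((0, 1), 1, -1); // retract (0, 1) at time 1

    println!("{:?}", first_reader.borrow().lookup(0, 0)); // [(1, 1), (2, 1)]
    println!("{:?}", second_reader.borrow().lookup(0, 1)); // [(2, 1)]
}
```

Both readers see every update because they hold handles to the same index and no copy is made. The toy leaves out what a real trace also has to handle, such as compaction and coordination with the dataflow's progress.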