### Starting Expansion Service with Allowlist
Source: https://beam.apache.org/documentation/programming-guide/index

A command-line example demonstrating how to start the Java expansion service, providing the path to the YAML allowlist file using the `javaClassLookupAllowlistFile` option.

```Shell
java -jar <expansion service jar> --javaClassLookupAllowlistFile=<path to the allowlist file>
```

--------------------------------

### Verify Docker Installation
Source: https://beam.apache.org/documentation/sdks/python-sdk-image-build

Runs a 'hello-world' container to verify that Docker is installed correctly and that the user can run containers without root privileges.

```bash
docker run hello-world
```

--------------------------------

### Install Git
Source: https://beam.apache.org/documentation/sdks/python-sdk-image-build

Installs Git, which is used to download the Apache Beam source code from GitHub. This is a prerequisite for building the SDK image.

```bash
sudo apt-get install -y git
```

--------------------------------

### Generate Maven Archetype for Beam Examples
Source: https://beam.apache.org/documentation/patterns/cross-language

This command generates a Maven project using the Beam examples archetype, which is useful for setting up a Java Beam project, especially for multi-language examples. It requires specifying the Beam version and project details.

```Shell
export BEAM_VERSION=<Beam version>

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.beam \
    -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
    -DarchetypeVersion=$BEAM_VERSION \
    -DgroupId=org.example \
    -DartifactId=multi-language-beam \
    -Dversion="0.1" \
    -Dpackage=org.apache.beam.examples \
    -DinteractiveMode=false
```

--------------------------------

### Download and Install Golang 1.23.2
Source: https://beam.apache.org/documentation/sdks/python-sdk-image-build

This snippet downloads, extracts, and installs Golang version 1.23.2 for Linux AMD64.
It then removes any previous Go installation, sets up the new one in '/usr/local/go', and adds the Go binary directory to the system's PATH environment variable.

```Shell
# Download and install
curl -OL https://go.dev/dl/go1.23.2.linux-amd64.tar.gz
sudo rm -rf /usr/local/go && sudo tar -C /usr/local -xzf go1.23.2.linux-amd64.tar.gz

# Add go to PATH.
export PATH=/usr/local/go/bin:$PATH
```

--------------------------------

### Start Expansion Service (Bash)
Source: https://beam.apache.org/documentation/programming-guide/index

Starts the expansion service using a JAR file on a specified port. This is a command-line utility for initiating the service.

```bash
java -jar /path/to/expansion_service.jar <port>
```

--------------------------------

### Online Clustering Example with Apache Beam
Source: https://beam.apache.org/documentation/ml/overview

Provides an example of implementing online clustering algorithms using Apache Beam. This allows for dynamic clustering of data as it arrives in streaming pipelines.

```Python
# Conceptual example for online clustering
# from apache_beam.ml.clustering.online import OnlineKMeans
# pipeline | 'OnlineClustering' >> OnlineKMeans(num_clusters=5)
```

--------------------------------

### Install Docker on Debian
Source: https://beam.apache.org/documentation/sdks/python-sdk-image-build

This sequence of commands installs Docker on a Debian system. It updates the package list, installs necessary packages for adding repositories, downloads and adds Docker's GPG key, and sets up the Docker APT repository.

```Shell
# Add GPG keys.
sudo apt-get update
sudo apt-get install ca-certificates curl
sudo install -m 0755 -d /etc/apt/keyrings
sudo curl -fsSL https://download.docker.com/linux/debian/gpg -o /etc/apt/keyrings/docker.asc
sudo chmod a+r /etc/apt/keyrings/docker.asc

# Set up the Docker APT repository.
echo \
  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/debian \
  $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt-get update
```

--------------------------------

### Shell: Start Java Expansion Service
Source: https://beam.apache.org/documentation/sdks/python-multi-language-pipelines

Starts the Java ExpansionService for multi-language pipelines. This command assumes a bundled JAR file and specifies the port for the service.

```Shell
java -jar java-prefix-bundled-0.1.jar 12345
```

--------------------------------

### Start Python Expansion Service
Source: https://beam.apache.org/documentation/programming-guide/index

Starts the Apache Beam Python SDK's expansion service using the `python -m` command. It specifies the port and the pickle library for serialization, making Python transforms available to the expansion service.

```Shell
$ python -m apache_beam.runners.portability.expansion_service_test -p $PORT_FOR_EXPANSION_SERVICE --pickle_library=cloudpickle
```

--------------------------------

### Install Docker Packages
Source: https://beam.apache.org/documentation/sdks/python-sdk-image-build

Installs the necessary Docker packages, including Docker Engine, CLI, containerd, and buildx plugin. These are required to run and build Docker containers.

```bash
sudo apt-get install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
```

--------------------------------

### Go SplittableDoFn CreateInitialRestriction Example
Source: https://beam.apache.org/documentation/programming-guide/index

Provides an example of `CreateInitialRestriction` for a SplittableDoFn in Go. This method initializes a restriction for a given filename, setting the start to 0 and the end to the file's length.
```Go
func (fn *splittableDoFn) CreateInitialRestriction(filename string) offsetrange.Restriction {
	return offsetrange.Restriction{
		Start: 0,
		End:   getFileLength(filename),
	}
}
```

--------------------------------

### Install and Configure Pyenv
Source: https://beam.apache.org/documentation/sdks/python-sdk-image-build

This snippet installs Pyenv, a tool for managing multiple Python versions, and configures the shell environment to use Pyenv for Python version management and virtual environment initialization.

```Shell
# Install Pyenv
curl https://pyenv.run | bash

# Add pyenv to PATH.
export PATH="$HOME/.pyenv/bin:$PATH"
eval "$(pyenv init -)"
eval "$(pyenv virtualenv-init -)"
```

--------------------------------

### Start Apache Beam Transform Service (Python)
Source: https://beam.apache.org/documentation/programming-guide/index

Manually starts the Apache Beam Transform service using the Python SDK. Requires specifying the port, Beam version, project name, and the 'up' command via the transform_service_launcher utility.

```bash
python -m apache_beam.utils.transform_service_launcher --port <port> --beam_version <Beam version> --project_name <project name> --command up
```

--------------------------------

### Instantiate BeamJarExpansionService (Python)
Source: https://beam.apache.org/documentation/programming-guide/index

Shows how to instantiate BeamJarExpansionService for starting an expansion service from a JAR released with Beam. It requires a Gradle target for building the shaded JAR.

```python
expansion_service = BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar')
```

--------------------------------

### Install Apache Beam with GCP and DataFrame Packages
Source: https://beam.apache.org/documentation/patterns/cross-language

Installs the Apache Beam Python SDK with necessary packages for GCP integration and DataFrame operations. This is a prerequisite for running the Python expansion service.
```Shell
pip install 'apache-beam[gcp,dataframe]'
```

--------------------------------

### Go Quickstart for Apache Beam WordCount
Source: https://beam.apache.org/documentation/resources/learning-resources

This resource provides instructions on how to set up and execute a WordCount pipeline using the Go SDK for Apache Beam. It's a starting point for Go developers new to Apache Beam.

```Go
// Example of a Go Quickstart for Apache Beam WordCount pipeline
// This would involve setting up a Go module and running a Go program.
```

--------------------------------

### Start Apache Beam Transform Service (Java)
Source: https://beam.apache.org/documentation/programming-guide/index

Manually starts the Apache Beam Transform service using the Java SDK. Requires specifying the JAR file, port, Beam version, project name, and the 'up' command.

```bash
java -jar beam-sdks-java-transform-service-app-<Beam version>.jar --port <port> --beam_version <Beam version> --project_name <project name> --command up
```

--------------------------------

### Apache Beam State Garbage Collection using Windows (Go)
Source: https://beam.apache.org/documentation/programming-guide/index

Provides a Go example for garbage collecting state using windowing. Demonstrates `beam.WindowInto` with `window.NewFixedWindows` to manage state per day.

```Go
items := beam.ParDo(s, statefulDoFn{
	S: state.MakeValueState[int]("S"),
}, elements)
out := beam.WindowInto(s, window.NewFixedWindows(24*time.Hour), items)
```

--------------------------------

### Python Stateful DoFn Example
Source: https://beam.apache.org/documentation/programming-guide/index

A helper function from a Python stateful DoFn example: a stand-in for the business logic that decides whether buffered state should be cleared.
```Python
def _is_clear_buffer_1_required(buffer_1_data):
  # Some business logic
  return True
```

--------------------------------

### Sample Formatted Results
Source: https://beam.apache.org/documentation/programming-guide/index

This section provides sample formatted output strings that represent the joined and processed contact information, consistent across different language examples.

```Text
"amy; ['amy@example.com']; ['111-222-3333', '333-444-5555']",
"carl; ['carl@email.com', 'carl@example.com']; ['444-555-6666']",
"james; []; ['222-333-4444']",
"julia; ['julia@example.com']; []"
```

--------------------------------

### Apache Beam Timer Callback Parameters (Go)
Source: https://beam.apache.org/documentation/programming-guide/index

Provides Go code examples for timer callback functions, demonstrating how to access context, timestamps, state providers, timer providers, keys, and emit functions for debugging.

```Go
func (s *Stateful) ProcessElement(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key, word string, _ func(beam.EventTime, string, string)) error {
	s.ElementBag.Add(sp, word)
	s.MinTime.Add(sp, int64(ts))
	// Process the element here.
	return nil
}

func (s *Stateful) OnTimer(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key string, timer timers.Context, emit func(beam.EventTime, string, string)) {
	log.Infof(ctx, "Timer fired for key %q, for family %q and tag %q", key, timer.Family, timer.Tag)
}
```

--------------------------------

### Python Quickstart for Apache Beam WordCount
Source: https://beam.apache.org/documentation/resources/learning-resources

This resource guides users through setting up and running a WordCount pipeline with the Python SDK for Apache Beam. It's designed for beginners to understand basic data processing concepts.
```Python
# Example of a Python Quickstart for Apache Beam WordCount pipeline
# This would typically involve installing the apache-beam package and running a Python script.
```

--------------------------------

### Anomaly Detection Example with Apache Beam
Source: https://beam.apache.org/documentation/ml/overview

Illustrates how to build an anomaly detection pipeline using Apache Beam. This involves processing data streams or batches to identify unusual patterns or outliers.

```Python
# Conceptual example for anomaly detection
# from apache_beam.ml.anomaly_detection import AnomalyDetector
# pipeline | 'AnomalyDetection' >> AnomalyDetector(model_path='path/to/anomaly_model')
```

--------------------------------

### Install Pyenv Build Dependencies
Source: https://beam.apache.org/documentation/sdks/python-sdk-image-build

Installs essential development libraries and tools required for compiling Python from source using Pyenv. These dependencies include build tools, SSL libraries, compression libraries, and more.

```Shell
# Install dependencies
sudo apt-get install -y make build-essential libssl-dev zlib1g-dev \
  libbz2-dev libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev \
  libncursesw5-dev xz-utils tk-dev libffi-dev liblzma-dev
```

--------------------------------

### Start Job Server for Portable DirectRunner
Source: https://beam.apache.org/documentation/patterns/cross-language

Starts the job server for the portable DirectRunner, which is necessary for running multi-language pipelines locally. This command requires setting the JOB_SERVER_PORT environment variable.

```Shell
export JOB_SERVER_PORT=<port>

python -m apache_beam.runners.portability.local_job_service_main -p $JOB_SERVER_PORT
```

--------------------------------

### Start Python Expansion Service
Source: https://beam.apache.org/documentation/patterns/cross-language

Starts the standard expansion service for Python transforms in Apache Beam.
This command requires setting a port number and can use a glob pattern to specify which fully qualified names to expand.

```Shell
python -m apache_beam.runners.portability.expansion_service_main -p <port> --fully_qualified_name_glob "*"
```

--------------------------------

### Java: Common Task Examples (BigQuery, Pub/Sub, File Writing)
Source: https://beam.apache.org/documentation/resources/learning-resources

Provides examples of common tasks in Java with Apache Beam, focusing on configuration and data handling. Includes setting up BigQuery and Pub/Sub connectors, and writing data to files on a per-window basis.

```Java
// Example for configuring BigQuery
// Example for configuring Pub/Sub
// Example for writing one file per window
```

--------------------------------

### CombiningState Example in Go
Source: https://beam.apache.org/documentation/programming-guide/index

Provides a Go implementation for CombiningState in Apache Beam. It shows how to define a state struct, process elements, and use a combining function for state updates.

```Go
// combiningStateFn keeps track of the number of elements seen.
type combiningStateFn struct {
	// types are the types of the accumulator, input, and output respectively
	Val state.Combining[int, int, int]
}

func (s *combiningStateFn) ProcessElement(p state.Provider, book string, word string, emitWords func(string)) error {
	// Get the value stored in our state
	val, _, err := s.Val.Read(p)
	if err != nil {
		return err
	}
	s.Val.Add(p, 1)
	if val > 10000 {
		// Example of clearing and starting again with an empty bag
		s.Val.Clear(p)
	}
	return nil
}

func combineState(s beam.Scope, input beam.PCollection) beam.PCollection {
	// ...
	// CombineFn param can be a simple fn like this or a structural CombineFn
	cFn := state.MakeCombiningState[int, int, int]("stateKey", func(a, b int) int {
		return a + b
	})
	combined := beam.ParDo(s, combiningStateFn{Val: cFn}, input)
	// ...
}
```

--------------------------------

### Install Python 3.9 using Pyenv
Source: https://beam.apache.org/documentation/sdks/python-sdk-image-build

Installs Python version 3.9 using Pyenv and sets it as the global default Python version for the environment. This process may take several minutes.

```Shell
pyenv install 3.9
pyenv global 3.9
```

--------------------------------

### Install OpenJDK 11 JDK
Source: https://beam.apache.org/documentation/sdks/python-sdk-image-build

This command installs the OpenJDK 11 Java Development Kit (JDK) on a Debian-based system, which is required for running Gradle tasks.

```Shell
sudo apt-get install -y openjdk-11-jdk
```

--------------------------------

### Java Development Environment Setup for Apache Beam
Source: https://beam.apache.org/documentation/resources/learning-resources

This guide details the process of configuring a Java development environment for Apache Beam, specifically using IntelliJ IDEA and Maven. It ensures users can build and run Beam pipelines effectively.

```Java
// Example configuration for IntelliJ and Maven for Apache Beam development
// This might involve setting up pom.xml dependencies and IDE configurations.
```

--------------------------------

### Accessing Window in Go DoFn
Source: https://beam.apache.org/documentation/programming-guide/index

Provides an example of accessing the window in a Go DoFn by declaring a parameter of type `beam.Window` and then type asserting it to the concrete window implementation like `window.IntervalWindow`.

```Go
func MyDoFn(w beam.Window, word string) string {
	iw := w.(window.IntervalWindow)
	...
}
```

--------------------------------

### Python: End-to-End Pipeline Examples
Source: https://beam.apache.org/documentation/resources/learning-resources

Presents end-to-end example pipelines in Python with Apache Beam.
These examples cover diverse applications such as auto-completion, mobile gaming statistics, Julia set calculation, distributed optimization, PI estimation, TF-IDF calculation, and Wikipedia session analysis.

```Python
# Example for auto-complete pipeline
# Example for mobile gaming statistics pipeline
# Example for Julia set calculation pipeline
# Example for distributed optimization tasks pipeline
# Example for estimating PI pipeline
# Example for TF-IDF calculation pipeline
# Example for top Wikipedia sessions pipeline
```

--------------------------------

### BagState Example in Go
Source: https://beam.apache.org/documentation/programming-guide/index

Provides a Go implementation for BagState in Apache Beam, used for accumulating elements. It demonstrates reading, adding, and conditionally clearing the bag state.

```Go
// bagStateFn only emits words that haven't been seen
type bagStateFn struct {
	Bag state.Bag[string]
}

func (s *bagStateFn) ProcessElement(p state.Provider, book, word string, emitWords func(string)) error {
	// Get all values we've written to this bag state in this window.
	vals, ok, err := s.Bag.Read(p)
	if err != nil {
		return err
	}
	if !ok || !contains(vals, word) {
		emitWords(word)
		s.Bag.Add(p, word)
	}
	if len(vals) > 10000 {
		// Example of clearing and starting again with an empty bag
		s.Bag.Clear(p)
	}
	return nil
}
```

--------------------------------

### Java SplittableDoFn Size Calculation Example
Source: https://beam.apache.org/documentation/programming-guide/index

Provides a Java example for the `@GetSize` method in a SplittableDoFn. This method calculates the size of a restriction, incorporating a weight based on the filename to influence work distribution.

```Java
@GetSize
double getSize(@Element String fileName, @Restriction OffsetRange restriction) {
  return (fileName.contains("expensiveRecords") ?
      2 : 1) * (restriction.getTo() - restriction.getFrom());
}
```

--------------------------------

### Configure Pipeline Options (Python)
Source: https://beam.apache.org/documentation/programming-guide/index

Shows how to instantiate Beam PipelineOptions in Python.

```Python
from apache_beam.options.pipeline_options import PipelineOptions

beam_options = PipelineOptions()
```

--------------------------------

### Apache Beam WordCount Pipeline Example
Source: https://beam.apache.org/documentation/resources/learning-resources

This section walks through the code of a simple WordCount pipeline in Apache Beam. It serves as an introductory example to illustrate fundamental data processing concepts, akin to 'Hello World' for data processing.

```Python
import apache_beam as beam

with beam.Pipeline() as pipeline:
  lines = pipeline | 'Create' >> beam.Create(['hello world', 'apache beam'])
  words = lines | 'Split' >> beam.FlatMap(lambda line: line.split(' '))
  word_counts = words | 'Count' >> beam.combiners.Count.PerElement()
  word_counts | 'Print' >> beam.Map(print)
```

--------------------------------

### Go SplittableDoFn CreateTracker Example
Source: https://beam.apache.org/documentation/programming-guide/index

Shows the `CreateTracker` method for a SplittableDoFn in Go, which creates and returns a `sdf.LockRTracker` initialized with the provided `offsetrange.Restriction`.

```Go
func (fn *splittableDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
}
```

--------------------------------

### Install Apache Beam Prism with Go
Source: https://beam.apache.org/documentation/runners/prism

Installs the Apache Beam Prism binary using the 'go install' command. This method is suitable for running Prism on your local machine.
```Shell
go install github.com/apache/beam/sdks/v2/go/cmd/prism@latest
prism
```

--------------------------------

### Create Apache Beam Pipeline (TypeScript)
Source: https://beam.apache.org/documentation/programming-guide/index

Provides an example of creating a Beam pipeline in TypeScript using an asynchronous function that accepts a root object.

```TypeScript
await beam.createRunner().run(function pipeline(root) {
  // Use root to build a pipeline.
});
```

--------------------------------

### Java Data Generator Class Example
Source: https://beam.apache.org/documentation/programming-guide/index

An example Java class demonstrating how to create a transform that can be used directly from the Python API. It shows a static factory method for construction and a builder method for configuration, meeting the requirements for direct usage.

```Java
public class JavaDataGenerator extends PTransform<PBegin, PCollection<String>> {
  . . .

  // The following method satisfies requirement 1.
  // Note that you could use a class constructor instead of a static method.
  public static JavaDataGenerator create(Integer size) {
    return new JavaDataGenerator(size);
  }

  static class JavaDataGeneratorConfig implements Serializable {
    public String prefix;
    public long length;
    public String suffix;
    . . .
  }

  // The following method conforms to requirement 2.
  public JavaDataGenerator withJavaDataGeneratorConfig(JavaDataGeneratorConfig dataConfig) {
    return new JavaDataGenerator(this.size, dataConfig);
  }

  . . .
}
```

--------------------------------

### Confirm Golang Version
Source: https://beam.apache.org/documentation/sdks/python-sdk-image-build

This command checks the installed version of the Go compiler and its operating system/architecture.

```Shell
go version
```

--------------------------------

### CombiningState Example in Python
Source: https://beam.apache.org/documentation/programming-guide/index

Illustrates the use of CombiningState in Python for Beam pipelines.
It defines a state specification and processes elements by adding values to the state.

```Python
class CombiningStateDoFn(DoFn):
  SUM_TOTAL = CombiningValueStateSpec('total', sum)

  def process(self, element, state=DoFn.StateParam(SUM_TOTAL)):
    state.add(1)

_ = (p | 'Read per user' >> ReadPerUser()
       | 'Combine state pardo' >> beam.ParDo(CombiningStateDoFn()))
```

--------------------------------

### Example JDBC IO Integration Test Setup (Java)
Source: https://beam.apache.org/documentation/io/testing

This is a placeholder comment indicating where instructions for running a JDBC I/O integration test in Java would typically be found within a test class. It implies the use of the `integrationTest` Gradle task.

```Java
// See the instructions in the test class (e.g. JdbcIOIT.java)
```

--------------------------------

### Running Beam Expansion Service
Source: https://beam.apache.org/documentation/sdks/python-custom-multi-language-pipelines-guide

Demonstrates how to run a compiled Apache Beam expansion service JAR from the command line. It specifies the JAR file path and a port number for the service to listen on, showing example output of registered transforms.

```Shell
$ java -jar path/to/my-expansion-service.jar 12345
Starting expansion service at localhost:12345
Registered transforms:
...
Registered SchemaTransformProviders:
	beam:schematransform:org.apache.beam:my_transform:v1
```

--------------------------------

### CombiningState Example in Java
Source: https://beam.apache.org/documentation/programming-guide/index

Demonstrates how to use CombiningState in Java to update state using a Beam combiner. It shows the state specification and the process element logic for adding values.
```Java
PCollection<KV<String, ValueT>> perUser = readPerUser();

perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @StateId("state")
  private final StateSpec<CombiningState<Integer, int[], Integer>> numElements =
      StateSpecs.combining(Sum.ofIntegers());

  @ProcessElement
  public void process(@StateId("state") CombiningState<Integer, int[], Integer> state) {
    state.add(1);
  }
}));
```

--------------------------------

### Apache Beam Fixed Time Windows
Source: https://beam.apache.org/documentation/programming-guide/index

Illustrates the concept of fixed time windows, where data is partitioned into non-overlapping intervals of a consistent duration. This example shows a 30-second window duration.

```Conceptual
Elements with timestamps from 0:00:00 up to (but not including) 0:00:30 belong to the first window.
Elements with timestamps from 0:00:30 up to (but not including) 0:01:00 belong to the second window.
```

--------------------------------

### BagState Example in Python
Source: https://beam.apache.org/documentation/programming-guide/index

Illustrates the use of BagState in Python for accumulating elements in Apache Beam. It shows adding elements, reading them, processing, and clearing the state.

```Python
class BagStateDoFn(DoFn):
  ALL_ELEMENTS = BagStateSpec('buffer', coders.VarIntCoder())

  def process(self, element_pair, state=DoFn.StateParam(ALL_ELEMENTS)):
    state.add(element_pair[1])
    if should_fetch():
      all_elements = list(state.read())
      process_values(all_elements)
      state.clear()

_ = (p | 'Read per user' >> ReadPerUser()
       | 'Bag state pardo' >> beam.ParDo(BagStateDoFn()))
```

--------------------------------

### Java Quickstart for Apache Beam WordCount
Source: https://beam.apache.org/documentation/resources/learning-resources

This resource explains how to set up and run a WordCount pipeline using the Java SDK for Apache Beam. It covers the initial steps and execution of a basic data processing pipeline.
```Java
// Example of a Java Quickstart for Apache Beam WordCount pipeline
// This would typically involve setting up a Maven project and running a pipeline class.
```

--------------------------------

### Java GeminiAIClient Implementation
Source: https://beam.apache.org/documentation/io/built-in/webapis

An example of implementing the Caller and SetupTeardown interfaces in Java to integrate the Vertex AI Gemini client with Apache Beam's RequestResponseIO. It shows how to initialize the client in setup() and close it in teardown().

```Java
class GeminiAIClient implements
    Caller<KV<String, GenerateContentRequest>, KV<String, GenerateContentResponse>>,
    SetupTeardown {

  @Override
  public KV<String, GenerateContentResponse> call(KV<String, GenerateContentRequest> requestKV)
      throws UserCodeExecutionException {
    GenerateContentResponse response =
        client.generateContent(requestKV.getValue().getContentsList());
    return KV.of(requestKV.getKey(), response);
  }

  @Override
  public void setup() throws UserCodeExecutionException {
    vertexAI = new VertexAI(getProjectId(), getLocation());
    client = new GenerativeModel(getModelName(), vertexAI);
  }

  @Override
  public void teardown() throws UserCodeExecutionException {
    vertexAI.close();
  }
}
```

--------------------------------

### Java: End-to-End Pipeline Examples
Source: https://beam.apache.org/documentation/resources/learning-resources

Showcases complete end-to-end pipeline examples in Java using Apache Beam. These cover various applications like auto-completion, streaming word extraction, TF-IDF calculation, analyzing Wikipedia sessions, and traffic flow analysis.

```Java
// Example for auto-complete pipeline
// Example for streaming word extract pipeline
// Example for TF-IDF calculation pipeline
// Example for top Wikipedia sessions pipeline
// Example for traffic max lane flow pipeline
// Example for traffic routes pipeline
```

--------------------------------

### Go Stateful DoFn with State and Timers
Source: https://beam.apache.org/documentation/programming-guide/index

A Go implementation of a stateful DoFn using Apache Beam's state and timers.
This example shows how to manage state bags (Buffer1, Buffer2) and event time timers (Watermark) within the ProcessElement and OnTimer methods.

```Go
// stateAndTimersFn is an example stateful DoFn with state and a timer.
type stateAndTimersFn struct {
	Buffer1   state.Bag[string]
	Buffer2   state.Bag[int64]
	Watermark timers.EventTime
}

func (s *stateAndTimersFn) ProcessElement(sp state.Provider, tp timers.Provider, w beam.Window, key string, value int64, emit func(string, int64)) error {
	// ... handle processing elements here, set a callback timer...

	// Read all the data from Buffer1 in this window.
	vals, ok, err := s.Buffer1.Read(sp)
	if err != nil {
		return err
	}
	if ok && s.shouldClearBuffer(vals) {
		// clear the buffer data if required conditions are met.
		s.Buffer1.Clear(sp)
	}

	// Add the value to Buffer2.
	s.Buffer2.Add(sp, value)

	if s.allConditionsMet() {
		// Clear the timer if certain condition met and you don't want to trigger
		// the callback method.
		s.Watermark.Clear(tp)
	}

	emit(key, value)

	return nil
}

func (s *stateAndTimersFn) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emit func(string, int64)) error {
	// Window and key parameters are really useful especially for debugging issues.
	switch timer.Family {
	case s.Watermark.Family:
		// timer expired, emit a different signal
		emit(key, -1)
	}
	return nil
}

func (s *stateAndTimersFn) shouldClearBuffer([]string) bool {
	// some business logic
	return false
}

func (s *stateAndTimersFn) allConditionsMet() bool {
	// other business logic
	return true
}
```

--------------------------------

### Install and Run Beam SQL Shell
Source: https://beam.apache.org/documentation/dsls/sql/shell

Commands to clone the Beam SDK repository, build the JDBC extension with specified runners and I/O connectors, and then launch the Beam SQL shell.
```Shell
./gradlew -p sdks/java/extensions/sql/jdbc -Pbeam.sql.shell.bundled=':runners:flink:1.17,:sdks:java:io:kafka' installDist

./sdks/java/extensions/sql/jdbc/build/install/jdbc/bin/jdbc
```

--------------------------------

### BagState Example in Java
Source: https://beam.apache.org/documentation/programming-guide/index

Demonstrates using BagState in Java to accumulate multiple elements. It shows how to add elements to the bag, read them, process them, and clear the state.

```Java
PCollection<KV<String, ValueT>> perUser = readPerUser();

perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @StateId("state")
  private final StateSpec<BagState<ValueT>> numElements = StateSpecs.bag();

  @ProcessElement
  public void process(
      @Element KV<String, ValueT> element,
      @StateId("state") BagState<ValueT> state) {
    // Add the current element to the bag for this key.
    state.add(element.getValue());
    if (shouldFetch()) {
      // Occasionally we fetch and process the values.
      Iterable<ValueT> values = state.read();
      processValues(values);
      state.clear();  // Clear the state for this key.
    }
  }
}));
```

--------------------------------

### Python DoFn Example
Source: https://beam.apache.org/documentation/programming-guide/index

This Python code demonstrates a ComputeWordLengthFn class that inherits from beam.DoFn. It implements the process method to calculate and return the length of each input element.

```Python
class ComputeWordLengthFn(beam.DoFn):
  def process(self, element):
    return [len(element)]
```

--------------------------------

### Implement Element-wise and Batched DoFn Logic
Source: https://beam.apache.org/documentation/programming-guide/index

Provides an example of a DoFn that implements both element-wise (`process`) and batched (`process_batch`) logic. Beam automatically selects the appropriate method based on the input context.
```Python
class MultiplyByTwo(beam.DoFn):
  def process(self, element: np.int64) -> Iterator[np.int64]:
    # Multiply an individual int64 by 2
    yield element * 2

  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
    # Multiply a _batch_ of int64s by 2
    yield batch * 2
```

--------------------------------

### Global Aggregations in TypeScript
Source: https://beam.apache.org/documentation/programming-guide/index

This TypeScript example demonstrates applying global aggregations to a PCollection using Beam. It shows how to combine elements using sum and product operations.

```TypeScript
const pcoll = root.apply(beam.create([1, 10, 100, 1000]));
const result = pcoll.apply(
  beam
    .groupGlobally()
    .combining((c) => c, (x, y) => x + y, "sum")
    .combining((c) => c, (x, y) => x * y, "product")
);
const expected = { sum: 1111, product: 1000000 }
```

--------------------------------

### JavaScript Beam Create with Row Coder
Source: https://beam.apache.org/documentation/programming-guide/index

Demonstrates creating a Beam pipeline in JavaScript using `beam.create` and specifying the element type with `beam.withRowCoder`. This example shows how to provide an exemplar object to infer the schema.

```JavaScript
const pcoll = root
  .apply(
    beam.create([
      { intField: 1, stringField: "a" },
      { intField: 2, stringField: "b" },
    ])
  )
  // Let beam know the type of the elements by providing an exemplar.
  .apply(beam.withRowCoder({ intField: 0, stringField: "" }));
```

--------------------------------

### Go DoFn Example
Source: https://beam.apache.org/documentation/programming-guide/index

This Go code defines a ComputeWordLengthFn struct that implements the ProcessElement method for Apache Beam. It takes a string and an emit function to output the length of the string.

```Go
// ComputeWordLengthFn is the DoFn to perform on each element in the input PCollection.
type ComputeWordLengthFn struct {}

// ProcessElement is the method to execute for each element.
func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) {
	emit(len(word))
}

// DoFns must be registered with beam.
func init() {
	beam.RegisterType(reflect.TypeOf((*ComputeWordLengthFn)(nil)))
	// 2 inputs and 0 outputs => DoFn2x0
	register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{})
}
```

--------------------------------

### Create Apache Beam Pipeline (Go)

Source: https://beam.apache.org/documentation/programming-guide/index

Illustrates how to initialize Beam and create a pipeline with a root scope in Go. Initialization must occur before pipeline creation.

```Go
// beam.Init() is an initialization hook that must be called
// near the beginning of main(), before creating a pipeline.
beam.Init()

// Create the Pipeline object and root scope.
pipeline, scope := beam.NewPipelineWithRoot()
```

--------------------------------

### Build Beam Python SDK Docker Image

Source: https://beam.apache.org/documentation/sdks/python-sdk-image-build

Executes the Gradle task to build the Docker image for the Beam Python SDK. The example targets Python 3.10, but other versions can be specified using '-PpythonVersion'.

```bash
./gradlew :sdks:python:container:py310:docker
```

--------------------------------

### Stateful DoFn with State and Timer in Python

Source: https://beam.apache.org/documentation/programming-guide/index

A Python example demonstrating a stateful DoFn using BagStateSpec for state management and TimerSpec for timed operations, including accessing PaneInfo, Window, Key, and State parameters.
```Python
class StatefulDoFn(beam.DoFn):
  """An example stateful DoFn with state and timer"""

  BUFFER_STATE_1 = BagStateSpec('buffer1', beam.BytesCoder())
  BUFFER_STATE_2 = BagStateSpec('buffer2', beam.VarIntCoder())
  WATERMARK_TIMER = TimerSpec('watermark_timer', TimeDomain.WATERMARK)

  def process(self,
              element,
              timestamp=beam.DoFn.TimestampParam,
              window=beam.DoFn.WindowParam,
              buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
              buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2),
              watermark_timer=beam.DoFn.TimerParam(WATERMARK_TIMER)):

    # Do your processing here
    key, value = element
    # Read all the data from buffer1
    all_values_in_buffer_1 = [x for x in buffer_1.read()]

    if StatefulDoFn._is_clear_buffer_1_required(all_values_in_buffer_1):
      # clear the buffer data if required conditions are met.
      buffer_1.clear()

    # add the value to buffer 2
    buffer_2.add(value)

    if StatefulDoFn._all_condition_met():
      # Clear the timer if certain condition met and you don't want to trigger
      # the callback method.
      watermark_timer.clear()

    yield element

  @on_timer(WATERMARK_TIMER)
  def on_expiry_1(self,
                  timestamp=beam.DoFn.TimestampParam,
                  window=beam.DoFn.WindowParam,
                  key=beam.DoFn.KeyParam,
                  buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
                  buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2)):
    # Window and key parameters are really useful especially for debugging issues.
    yield 'expired1'

  @staticmethod
  def _all_condition_met():
    # some logic
    return True

  @staticmethod
  def _is_clear_buffer_1_required(buffer_1_data):
    # some logic
    return True
```

--------------------------------

### Build Custom Beam Go SDK Container Image from Scratch (Dockerfile)

Source: https://beam.apache.org/documentation/runtime/environments

This Dockerfile example illustrates building a custom container image for the Apache Beam Go SDK from scratch using Alpine Linux. It involves building the Go bootloader and setting it as the container's entrypoint.
```dockerfile
FROM golang:alpine AS build_base
WORKDIR /tmp/beam
RUN GOBIN=`pwd` go install github.com/apache/beam/sdks/v2/go/container@v2.53.0

FROM alpine:3.9
RUN apk add ca-certificates
COPY --from=build_base /tmp/beam/container /opt/apache/beam/boot
ENTRYPOINT ["/opt/apache/beam/boot"]
```

--------------------------------

### Go SplittableDoFn Initial Split Example

Source: https://beam.apache.org/documentation/programming-guide/index

Demonstrates how to implement `SplitRestriction` in Go for a SplittableDoFn. This function divides a given `offsetrange.Restriction` into multiple smaller restrictions, each approximately 64 MiB in size.

```Go
func (fn *splittableDoFn) SplitRestriction(filename string, rest offsetrange.Restriction) (splits []offsetrange.Restriction) {
	size := int64(64 * (1 << 20))
	i := rest.Start
	for i < rest.End-size {
		// Compute and output 64 MiB size ranges to process in parallel
		end := i + size
		splits = append(splits, offsetrange.Restriction{i, end})
		i = end
	}
	// Output the last range
	splits = append(splits, offsetrange.Restriction{i, rest.End})
	return splits
}
```

--------------------------------

### Python Apache Beam ReadAllFromBigQuery Example

Source: https://beam.apache.org/documentation/io/io-standards

Shows how to use `ReadAllFromBigQuery` for sources that support runtime configuration, enabling batch sources to function as streaming sources. It emphasizes schema richness and type safety.

```Python
ReadAllFromBigQuery
```

--------------------------------

### Apache Beam Sliding Time Windows

Source: https://beam.apache.org/documentation/programming-guide/index

Explains sliding time windows, which can overlap. This example uses a 60-second window duration and a 30-second period, allowing for running computations.

```Conceptual
Each window captures 60 seconds worth of data, but a new window starts every 30 seconds.
This is useful for calculating running averages.
```

--------------------------------

### Add Docker Apt Repository and Update

Source: https://beam.apache.org/documentation/sdks/python-sdk-image-build

Adds the Docker Apt repository to the system and updates the package list to include Docker packages. This ensures that the latest Docker CE is available for installation.

```bash
echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/debian \
  $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt-get update
```

--------------------------------

### Simple Sum Combine (Python)

Source: https://beam.apache.org/documentation/programming-guide/index

Demonstrates a simple sum operation using the Beam Combine transform in Python. This example showcases how to apply a basic aggregation function to a PCollection.

```Python
import apache_beam as beam

with beam.Pipeline() as pipeline:
  numbers = pipeline | 'CreateNumbers' >> beam.Create([1, 2, 3, 4, 5])
  sum_of_numbers = numbers | 'SumNumbers' >> beam.CombineGlobally(sum)
  sum_of_numbers | 'PrintSum' >> beam.Map(print)
```

--------------------------------

### Define Pipeline Options (Yaml)

Source: https://beam.apache.org/documentation/programming-guide/index

Shows a basic YAML structure for defining a pipeline and its options.

```Yaml
pipeline:
  ...
options:
  ...
```

--------------------------------

### Run WordCount Example with DataflowRunner

Source: https://beam.apache.org/documentation/runtime/environments

This command executes the Apache Beam WordCount example using the DataflowRunner. It specifies input and output paths, project, region, temporary location, enables the 'use_runner_v2' experiment, and points to a custom SDK container image.
```Shell
python -m apache_beam.examples.wordcount \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output "${GCS_PATH}/counts" \
  --runner DataflowRunner \
  --project $GCP_PROJECT \
  --region $REGION \
  --temp_location "${GCS_PATH}/tmp/" \
  --experiment=use_runner_v2 \
  --sdk_container_image=$IMAGE_URL
```

--------------------------------

### Python ExternalTransformProvider Initialization

Source: https://beam.apache.org/documentation/sdks/python-custom-multi-language-pipelines-guide

Shows how to initialize the `ExternalTransformProvider` in Python to connect to Apache Beam expansion services. It covers connecting to a running service via address and starting a service from a Java JAR file.

```Python
from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
```

```Python
provider = ExternalTransformProvider("localhost:12345")
```

```Python
from apache_beam.transforms.external import JavaJarExpansionService

provider = ExternalTransformProvider(
    JavaJarExpansionService("path/to/my-expansion-service.jar"))
```

```Python
provider = ExternalTransformProvider([
    "localhost:12345",
    JavaJarExpansionService("path/to/my-expansion-service.jar"),
    JavaJarExpansionService("path/to/another-expansion-service.jar")])
```

--------------------------------

### Python Development Environment Setup for Apache Beam

Source: https://beam.apache.org/documentation/resources/learning-resources

This resource focuses on setting up a Python development environment for Apache Beam, utilizing PyCharm as the IDE. It covers the configurations needed for smooth pipeline development.

```Python
# Example configuration for PyCharm for Apache Beam Python development
# This would involve setting up a virtual environment and project interpreter.
```

--------------------------------

### Branching Pipeline (Java, Python, Go, TypeScript)

Source: https://beam.apache.org/documentation/programming-guide/index

Illustrates creating a branching pipeline where multiple transforms are applied to the same input PCollection.

```Java
[PCollection of database table rows] = [Database Table Reader].apply([Read Transform])
[PCollection of 'A' names] = [PCollection of database table rows].apply([Transform A])
[PCollection of 'B' names] = [PCollection of database table rows].apply([Transform B])
```

```Python
[PCollection of database table rows] = [Database Table Reader] | [Read Transform]
[PCollection of 'A' names] = [PCollection of database table rows] | [Transform A]
[PCollection of 'B' names] = [PCollection of database table rows] | [Transform B]
```

```Go
[PCollection of database table rows] = beam.ParDo(scope, [Read Transform], [Database Table Reader])
[PCollection of 'A' names] = beam.ParDo(scope, [Transform A], [PCollection of database table rows])
[PCollection of 'B' names] = beam.ParDo(scope, [Transform B], [PCollection of database table rows])
```

```TypeScript
[PCollection of database table rows] = [Database Table Reader].apply([Read Transform])
[PCollection of 'A' names] = [PCollection of database table rows].apply([Transform A])
[PCollection of 'B' names] = [PCollection of database table rows].apply([Transform B])
```

--------------------------------

### Run Data Ingestion Pipeline

Source: https://beam.apache.org/documentation/ml/anomaly-detection

Instructions for running the Apache Beam data ingestion pipeline locally or on Google Cloud Dataflow. Requires installation of packages and configuration of Google Cloud variables.

```bash
python main.py
python main.py --mode cloud
```

--------------------------------

### Using RunInference from Java SDK in Apache Beam

Source: https://beam.apache.org/documentation/ml/overview

Demonstrates how to use the RunInference transform from the Apache Beam Java SDK for ML inference. This enables multi-language pipelines where Java components can leverage ML models.
```Java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;

// Example structure for a Java RunInference transform wrapper
// public class RunInferenceJava<InputT, OutputT> extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
//   private final ModelHandler<InputT, OutputT> modelHandler;
//
//   public RunInferenceJava(ModelHandler<InputT, OutputT> modelHandler) {
//     this.modelHandler = modelHandler;
//   }
//
//   @Override
//   public PCollection<OutputT> expand(PCollection<InputT> input) {
//     return input.apply("RunInference", ParDo.of(new RunInferenceDoFn<>(modelHandler)));
//   }
// }
```

--------------------------------

### Per Entity Training in Apache Beam

Source: https://beam.apache.org/documentation/ml/overview

Explains the concept and implementation of per-entity training using Apache Beam. This approach allows for training models tailored to individual entities or user groups within a large dataset.

```Python
# Conceptual example for per-entity training
# def train_model_for_entity(entity_data):
#   # Train a model specific to the entity
#   pass
# pipeline | 'GroupPerEntity' >> beam.GroupByKey()
#          | 'TrainPerEntity' >> beam.MapTuple(train_model_for_entity)
```
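The conceptual snippet above can be made concrete. Below is a minimal, Beam-free sketch of the same pattern: collecting records per entity key (what `beam.GroupByKey()` does) and then fitting one model per key (what `beam.MapTuple(train_model_for_entity)` does). The sample records, entity names, and the mean-as-"model" stand-in are illustrative assumptions, not from the Beam docs.

```python
from collections import defaultdict

# Hypothetical training data: (entity_id, feature_value) pairs.
records = [("user_a", 1.0), ("user_b", 4.0), ("user_a", 3.0), ("user_b", 6.0)]

def train_model_for_entity(entity, values):
    # Stand-in "model": the mean of the entity's values.
    return entity, sum(values) / len(values)

# Equivalent of beam.GroupByKey(): collect the values for each entity key.
grouped = defaultdict(list)
for entity, value in records:
    grouped[entity].append(value)

# Equivalent of beam.MapTuple(train_model_for_entity): one model per entity.
models = dict(train_model_for_entity(e, vs) for e, vs in grouped.items())
print(models)  # {'user_a': 2.0, 'user_b': 5.0}
```

In a real pipeline, `train_model_for_entity` would fit an actual estimator and typically write the per-entity model out to storage rather than return it in memory.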