### Create Example Init Script

Source: https://cloud.google.com/dataproc/docs/guides/dpgke/dataproc-gke-custom-images

Creates a shell script named `init-script.sh` that writes "hello world" to a file. This script can be used for initialization tasks when the Dataproc cluster starts.

```bash
cat >init-script.sh <<EOF
echo "hello world" >/tmp/init-script.out
EOF
```

--------------------------------

### Install and Configure Kafka Server

Source: https://cloud.google.com/dataproc/docs/tutorials/dataproc-kafka-tutorial?hl=es

Installs Kafka, configures its properties (including the ZooKeeper connection and broker ID), and starts the Kafka service. It handles both HA and non-HA configurations.

```bash
function install_and_configure_kafka_server() {
  # Find zookeeper list first, before attempting any installation.
  local zookeeper_client_port
  zookeeper_client_port=$(grep 'clientPort' /etc/zookeeper/conf/zoo.cfg | tail -n 1 | cut -d '=' -f 2)

  local zookeeper_list
  zookeeper_list=$(grep '^server\.' /etc/zookeeper/conf/zoo.cfg |
    cut -d '=' -f 2 |
    cut -d ':' -f 1 |
    sort |
    uniq |
    sed "s/$/:${zookeeper_client_port}/" |
    xargs echo |
    sed "s/ /,/g")

  if [[ -z "${zookeeper_list}" ]]; then
    # Didn't find zookeeper quorum in zoo.cfg, but possibly workers just didn't
    # bother to populate it. Check if YARN HA is configured.
    zookeeper_list=$(bdconfig get_property_value --configuration_file \
      /etc/hadoop/conf/yarn-site.xml \
      --name yarn.resourcemanager.zk-address 2>/dev/null)
  fi

  # If all attempts failed, error out.
  if [[ -z "${zookeeper_list}" ]]; then
    err 'Failed to find configured Zookeeper list; try "--num-masters=3" for HA'
  fi

  ZOOKEEPER_ADDRESS="${zookeeper_list%%,*}"

  # Install Kafka from Dataproc distro.
  install_apt_get kafka-server || dpkg -l kafka-server || err 'Unable to install and find kafka-server.'

  mkdir -p /var/lib/kafka-logs
  chown kafka:kafka -R /var/lib/kafka-logs

  if [[ "${ROLE}" == "Master" ]]; then
    # For master nodes, broker ID starts from 10,000.
    if [[ "$(hostname)" == *-m ]]; then
      # non-HA
      BROKER_ID=10000
    else
      # HA
      BROKER_ID=$((10000 + $(hostname | sed 's/.*-m-\([0-9]*\)$/\1/g')))
    fi
  else
    # For worker nodes, broker ID is a randomly generated number less than 10000.
    # 10000 is chosen since the max broker ID allowed being set is 10000.
    BROKER_ID=$((RANDOM % 10000))
  fi

  sed -i 's|log.dirs=/tmp/kafka-logs|log.dirs=/var/lib/kafka-logs|' "${KAFKA_PROP_FILE}"
  sed -i 's|^\(zookeeper\.connect=\).*|\1'${zookeeper_list}'|' "${KAFKA_PROP_FILE}"
  sed -i 's,^\(broker\.id=\).*,\1'${BROKER_ID}',' "${KAFKA_PROP_FILE}"
  echo -e '\nreserved.broker.max.id=100000' >>"${KAFKA_PROP_FILE}"
  echo -e '\ndelete.topic.enable=true' >>"${KAFKA_PROP_FILE}"

  if [[ "${KAFKA_ENABLE_JMX}" == "true" ]]; then
    sed -i '/kafka-run-class.sh/i export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Djava.net.preferIPv4Stack=true"' /usr/lib/kafka/bin/kafka-server-start.sh
    sed -i "/kafka-run-class.sh/i export JMX_PORT=${KAFKA_JMX_PORT}" /usr/lib/kafka/bin/kafka-server-start.sh
  fi

  wait_for_zookeeper

  # Start Kafka.
  service kafka-server restart

  wait_for_kafka
}
```

--------------------------------

### Create a Scala "Hello World" Project

Source: https://cloud.google.com/dataproc/docs/tutorials/spark-scala?hl=de

This snippet shows how to create a basic Scala "Hello World" project from the command line.

```bash
$ mkdir hello
$ cd hello
$ echo \
'object HelloWorld {def main(args: Array[String]) = println("Hello, world!")}' > \
HelloWorld.scala
```

--------------------------------

### Example Dockerfile for Spark Runtime Customization

Source: https://cloud.google.com/dataproc/docs/guides/dataproc-docker-yarn?hl=it

This Dockerfile provides a base for customizing your Spark job's runtime environment.
It includes steps for installing utilities, adding extra JARs, setting up Miniconda3, installing Conda packages, and adding custom Python modules. Use it as a starting point and modify it to fit your needs.

```docker
FROM debian:10-slim

# Suppress interactive prompts.
ENV DEBIAN_FRONTEND=noninteractive

# Required: Install utilities required by Spark scripts.
RUN apt update && apt install -y procps tini

# Optional: Add extra jars.
ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*'
RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
COPY *.jar "${SPARK_EXTRA_JARS_DIR}"

# Optional: Install and configure Miniconda3.
ENV CONDA_HOME=/opt/miniconda3
ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
ENV PYSPARK_DRIVER_PYTHON=${CONDA_HOME}/bin/python

ENV PATH=${CONDA_HOME}/bin:${PATH}
COPY Miniconda3-py39_4.10.3-Linux-x86_64.sh .
RUN bash Miniconda3-py39_4.10.3-Linux-x86_64.sh -b -p /opt/miniconda3 \
  && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
  && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
  && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
  && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict

# Optional: Install Conda packages.
#
# The following packages are installed in the default image. It is strongly
# recommended to include all of them.
#
# Use mamba to install packages quickly.
RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
    && ${CONDA_HOME}/bin/mamba install \
      conda \
      cython \
      fastavro \
      fastparquet \
      gcsfs \
      google-cloud-bigquery-storage \
      google-cloud-bigquery[pandas] \
      google-cloud-bigtable \
      google-cloud-container \
      google-cloud-datacatalog \
      google-cloud-dataproc \
      google-cloud-datastore \
      google-cloud-language \
      google-cloud-logging \
      google-cloud-monitoring \
      google-cloud-pubsub \
      google-cloud-redis \
      google-cloud-spanner \
      google-cloud-speech \
      google-cloud-storage \
      google-cloud-texttospeech \
      google-cloud-translate \
      google-cloud-vision \
      koalas \
      matplotlib \
      nltk \
      numba \
      numpy \
      openblas \
      pandas \
      pyarrow \
      pysal \
      pytables \
      python \
      regex \
      requests \
      rtree \
      scikit-image \
      scikit-learn \
      scipy \
      seaborn \
      sqlalchemy \
      sympy \
      virtualenv

# Optional: Add extra Python modules.
ENV PYTHONPATH=/opt/python/packages
RUN mkdir -p "${PYTHONPATH}"
COPY test_util.py "${PYTHONPATH}"

# Required: Create the 'yarn_docker_user' group/user.
```

--------------------------------

### Create SBT Project Directory and Scala File

Source: https://cloud.google.com/dataproc/docs/tutorials/spark-scala

Set up a new project directory and create a `HelloWorld.scala` file for an SBT project.

```bash
$ mkdir hello
$ cd hello
$ echo \
'object HelloWorld {def main(args: Array[String]) = println("Hello, world!")}' > \
HelloWorld.scala
```

--------------------------------

### Dataproc Quickstart Setup Notes (Python)

Source: https://cloud.google.com/dataproc/docs/samples/dataproc-quickstart?hl=zh-TW

This section shows the opening of the Dataproc quickstart that uses the Python client library. It covers the module docstring with usage instructions, the imports, and the start of the `quickstart()` function. The source page also covers authentication and links to the API reference.

```python
"""
This quickstart sample walks a user through creating a Cloud Dataproc
cluster, submitting a PySpark job from Google Cloud Storage to the
cluster, reading the output of the job and deleting the cluster, all
using the Python client library.
Usage:
    python quickstart.py --project_id <PROJECT_ID> --region <REGION> \
        --cluster_name <CLUSTER_NAME> --job_file_path <GCS_JOB_FILE_PATH>
"""

import argparse
import re

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage


def quickstart(project_id, region, cluster_name, job_file_path):
    # Create the cluster client.
```

--------------------------------

### Dataproc Customization Script Example

Source: https://cloud.google.com/dataproc/docs/guides/dataproc-images

This bash script installs custom packages and updates configurations for Dataproc images. It updates the apt package index, installs `python-dev` and `python-pip`, and then installs the `numpy` package with pip.

```bash
#!/bin/bash
apt-get -y update
apt-get install -y python-dev
apt-get install -y python-pip
pip install numpy
```

--------------------------------

### Example Workflow Instantiation and Output

Source: https://cloud.google.com/dataproc/docs/concepts/workflows/use-workflows

Shows the command that instantiates a workflow template and sample output as the workflow runs to completion, including cluster and job status.

```bash
gcloud beta dataproc workflow-templates instantiate my-template-id \
    --region=us-central1
...
WorkflowTemplate [my-template-id] RUNNING
...
Created cluster: my-template-id-rg544az7mpbfa.
Job ID teragen-rg544az7mpbfa RUNNING
Job ID teragen-rg544az7mpbfa COMPLETED
Job ID terasort-rg544az7mpbfa RUNNING
Job ID terasort-rg544az7mpbfa COMPLETED
Job ID teravalidate-rg544az7mpbfa RUNNING
Job ID teravalidate-rg544az7mpbfa COMPLETED
...
Deleted cluster: my-template-id-rg544az7mpbfa.
WorkflowTemplate [my-template-id] DONE
```

--------------------------------

### Dataproc Quickstart with Java Client Library

Source: https://cloud.google.com/dataproc/docs/samples/dataproc-quickstart?hl=de

This sample uses the Java client library to create a Cloud Dataproc cluster, submit a PySpark job from Google Cloud Storage, read the job output, and delete the cluster.
Before running it, follow the Java setup guide for Managed Service for Apache Spark and set up Application Default Credentials for authentication.

```java
/* This quickstart sample walks a user through creating a Cloud Dataproc
 * cluster, submitting a PySpark job from Google Cloud Storage to the
 * cluster, reading the output of the job and deleting the cluster, all
 * using the Java client library.
 *
 * Usage:
 *     mvn clean package -DskipTests
 *
 *     mvn exec:java -Dexec.args="<PROJECT_ID> <REGION> <CLUSTER_NAME> <GCS_JOB_FILE_PATH>"
 *
 *     You can also set these arguments in the main function instead of providing them via the CLI.
 */

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.dataproc.v1.Cluster;
import com.google.cloud.dataproc.v1.ClusterConfig;
import com.google.cloud.dataproc.v1.ClusterControllerClient;
import com.google.cloud.dataproc.v1.ClusterControllerSettings;
import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
import com.google.cloud.dataproc.v1.InstanceGroupConfig;
import com.google.cloud.dataproc.v1.Job;
import com.google.cloud.dataproc.v1.JobControllerClient;
import com.google.cloud.dataproc.v1.JobControllerSettings;
import com.google.cloud.dataproc.v1.JobMetadata;
import com.google.cloud.dataproc.v1.JobPlacement;
import com.google.cloud.dataproc.v1.PySparkJob;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.protobuf.Empty;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class Quickstart {

  public static void quickstart(
      String projectId, String region, String clusterName, String jobFilePath)
      throws IOException, InterruptedException {
    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);

    // Configure the settings for the cluster controller client.
    ClusterControllerSettings clusterControllerSettings =
        ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();

    // Configure the settings for the job controller client.
    JobControllerSettings jobControllerSettings =
        JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();

    // Create both a cluster controller client and job controller client with the
    // configured settings. The client only needs to be created once and can be reused for
    // multiple requests. Using a try-with-resources closes the client, but this can also be done
    // manually with the .close() method.
    try (ClusterControllerClient clusterControllerClient =
            ClusterControllerClient.create(clusterControllerSettings);
        JobControllerClient jobControllerClient =
            JobControllerClient.create(jobControllerSettings)) {
      // Configure the settings for our cluster.
      InstanceGroupConfig masterConfig =
          InstanceGroupConfig.newBuilder()
              .setMachineTypeUri("n1-standard-2")
              .setNumInstances(1)
              .build();
      InstanceGroupConfig workerConfig =
          InstanceGroupConfig.newBuilder()
              .setMachineTypeUri("n1-standard-2")
              .setNumInstances(2)
              .build();
      ClusterConfig clusterConfig =
          ClusterConfig.newBuilder()
              .setMasterConfig(masterConfig)
              .setWorkerConfig(workerConfig)
              .build();
      // Create the cluster object with the desired cluster config.
      Cluster cluster =
          Cluster.newBuilder().setClusterName(clusterName).setConfig(clusterConfig).build();

      // Create the Cloud Dataproc cluster.
      OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
          clusterControllerClient.createClusterAsync(projectId, region, cluster);
      Cluster clusterResponse = createClusterAsyncRequest.get();
      System.out.println(
          String.format("Cluster created successfully: %s", clusterResponse.getClusterName()));

      // Configure the settings for our job.
      JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
      PySparkJob pySparkJob = PySparkJob.newBuilder().setMainPythonFileUri(jobFilePath).build();
      Job job = Job.newBuilder().setPlacement(jobPlacement).setPysparkJob(pySparkJob).build();

      // Submit an asynchronous request to execute the job.
      OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
          jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
```

--------------------------------

### Launch SBT and Run Code

Source: https://cloud.google.com/dataproc/docs/tutorials/spark-scala

Launch the SBT interactive shell and run the `HelloWorld` application.

```bash
$ sbt
[info] Set current project to hello ...
> run
... Compiling 1 Scala source to .../hello/target/scala-.../classes...
... Running HelloWorld
Hello, world!
[success] Total time: 3 s ...
```

--------------------------------

### Dataproc Cluster Properties Example

Source: https://cloud.google.com/dataproc/docs/concepts/compute/supported-machine-types

A sample of the cluster properties that may be displayed in the terminal after a Dataproc cluster starts.

```yaml
...
properties:
  distcp:mapreduce.map.java.opts: -Xmx1638m
  distcp:mapreduce.map.memory.mb: '2048'
  distcp:mapreduce.reduce.java.opts: -Xmx4915m
  distcp:mapreduce.reduce.memory.mb: '6144'
  mapred:mapreduce.map.cpu.vcores: '1'
  mapred:mapreduce.map.java.opts: -Xmx1638m
...
```

--------------------------------

### Example Output of Connector Verification

Source: https://cloud.google.com/dataproc/docs/tutorials/spark-hbase?hl=it

Shows the expected output when the HBase Spark connector is found, which indicates a successful installation.

```text
-rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
```

--------------------------------

### Dataproc Quickstart Example

Source: https://cloud.google.com/dataproc/docs/samples/dataproc-quickstart?hl=zh-CN

This Python script creates a Dataproc cluster, submits a PySpark job, retrieves its output, and finally deletes the cluster. It requires the `google-cloud-dataproc` and `google-cloud-storage` libraries.

```python
import argparse
import re

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage


def quickstart(project_id, region, cluster_name, job_file_path):
    # Create the cluster client.
    cluster_client = dataproc.ClusterControllerClient(
        client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {
                "num_instances": 1,
                "machine_type_uri": "n1-standard-2",
                "disk_config": {"boot_disk_size_gb": 100},
            },
            "worker_config": {
                "num_instances": 2,
                "machine_type_uri": "n1-standard-2",
                "disk_config": {"boot_disk_size_gb": 100},
            },
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print("Cluster created successfully: {}".format(result.cluster_name))

    # Create the job client.
    job_client = dataproc.JobControllerClient(
        client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
    )

    # Create the job config.
    job = {
        "placement": {"cluster_name": cluster_name},
        "pyspark_job": {"main_python_file_uri": job_file_path},
    }

    operation = job_client.submit_job_as_operation(
        request={"project_id": project_id, "region": region, "job": job}
    )
    response = operation.result()

    # Dataproc job output gets saved to the Google Cloud Storage bucket
    # allocated to the job. Use a regex to obtain the bucket and blob info.
    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

    output = (
        storage.Client()
        .get_bucket(matches.group(1))
        .blob(f"{matches.group(2)}.000000000")
        .download_as_bytes()
        .decode("utf-8")
    )

    print(f"Job finished successfully: {output}")

    # Delete the cluster once the job has terminated.
    operation = cluster_client.delete_cluster(
        request={
            "project_id": project_id,
            "region": region,
            "cluster_name": cluster_name,
        }
    )
    operation.result()

    print("Cluster {} successfully deleted.".format(cluster_name))


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--project_id",
        type=str,
        required=True,
        help="Project to use for creating resources.",
    )
    parser.add_argument(
        "--region",
        type=str,
        required=True,
        help="Region where the resources should live.",
    )
    parser.add_argument(
        "--cluster_name",
        type=str,
        required=True,
        help="Name to use for creating a cluster.",
    )
    parser.add_argument(
        "--job_file_path",
        type=str,
        required=True,
        help="Job in GCS to execute against the cluster.",
    )

    args = parser.parse_args()
    quickstart(args.project_id, args.region, args.cluster_name, args.job_file_path)
```

--------------------------------

### Create Python Example File

Source: https://cloud.google.com/dataproc/docs/guides/dpgke/dataproc-gke-custom-images

Creates a Python file named `test_util.py` with two functions, `hello` and `read_lines`. This demonstrates how to include custom Python code in your image.

```bash
cat >test_util.py <<'EOF'
def hello(name):
    print("hello {}".format(name))

def read_lines(path):
    with open(path) as f:
        return f.readlines()
EOF
```

--------------------------------

### Get Driver Node Group Metadata

Source: https://cloud.google.com/dataproc/docs/guides/node-groups/dataproc-driver-node-groups?hl=pt-BR

This snippet shows how to retrieve metadata for a specific Dataproc driver node group using the Dataproc API.
It includes examples for both curl and the gcloud CLI.

````APIDOC
## GET https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/clusters/CLUSTER_NAME/nodeGroups/NODE_GROUP_ID

### Description
Retrieves metadata for a specific Dataproc node group, which can include driver node groups.

### Method
GET

### Endpoint
https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/clusters/CLUSTER_NAME/nodeGroups/NODE_GROUP_ID

### Parameters
#### Path Parameters
- **PROJECT_ID** (string) - Required - The Google Cloud project ID.
- **REGION** (string) - Required - The region of the cluster.
- **CLUSTER_NAME** (string) - Required - The name of the cluster.
- **NODE_GROUP_ID** (string) - Required - The ID of the node group to describe.

### Request Example (curl)
```bash
curl -X GET \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  "https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/clusters/CLUSTER_NAME/nodeGroups/NODE_GROUP_ID"
```

### Request Example (gcloud CLI)
```bash
gcloud dataproc node-groups describe NODE_GROUP_ID \
  --cluster=CLUSTER_NAME \
  --region=REGION
```

### Response
#### Success Response (200)
Returns a JSON object containing the metadata of the specified node group.
````

--------------------------------

### Example Dataproc initialization script output

Source: https://cloud.google.com/dataproc/docs/guides/dataproc-best-practices?hl=it

Example output from a Dataproc initialization script, showing the Ranger version and a configuration status message.

```text
+ readonly RANGER_VERSION=1.2.0
... Ranger admin password not set. Please use metadata flag - default-password
```

--------------------------------

### Airflow DAG for Dataproc Workflow (Airflow 2)

Source: https://cloud.google.com/dataproc/docs/tutorials/workflow-composer?hl=fr

An example Airflow DAG that uses `DataprocInstantiateWorkflowTemplateOperator` to run a Dataproc workflow template. It relies on an Airflow variable, `project_id`, and is configured to start one day ago.

```python
"""Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
Spark Pi Job.

This DAG relies on an Airflow variable
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocInstantiateWorkflowTemplateOperator,
)
from airflow.utils.dates import days_ago

project_id = "{{var.value.project_id}}"


default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": project_id,
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "dataproc_workflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataprocInstantiateWorkflowTemplateOperator(
        # The task id of your job
        task_id="dataproc_workflow_dag",
        # The template id of your workflow
        template_id="sparkpi",
        project_id=project_id,
        # The region for the template
        region="us-central1",
    )
```

--------------------------------

### Create Dataproc Cluster in Node.js

Source: https://cloud.google.com/dataproc/docs/guides/create-cluster?hl=de

Use the Node.js client library to create a Dataproc cluster.
Make sure the client library is installed and Application Default Credentials are configured. The example shows how to set the master and worker configurations.

```javascript
const dataproc = require('@google-cloud/dataproc');

// TODO(developer): Set the following variables before running.
const projectId = 'YOUR_PROJECT_ID';
const region = 'YOUR_CLUSTER_REGION';
const clusterName = 'YOUR_CLUSTER_NAME';

// Create a client with the endpoint set to the desired cluster region
const client = new dataproc.v1.ClusterControllerClient({
  apiEndpoint: `${region}-dataproc.googleapis.com`,
  projectId: projectId,
});

async function createCluster() {
  // Create the cluster config
  const request = {
    projectId: projectId,
    region: region,
    cluster: {
      clusterName: clusterName,
      config: {
        masterConfig: {
          numInstances: 1,
          machineTypeUri: 'n1-standard-2',
        },
        workerConfig: {
          numInstances: 2,
          machineTypeUri: 'n1-standard-2',
        },
      },
    },
  };

  // Create the cluster and wait for the long-running operation to finish
  const [operation] = await client.createCluster(request);
  const [response] = await operation.promise();

  // Output a success message
  console.log(`Cluster created successfully: ${response.clusterName}`);
}

createCluster().catch(console.error);
```

--------------------------------

### Install Kafka Initialization Action Script

Source: https://cloud.google.com/dataproc/docs/tutorials/dataproc-kafka-tutorial?hl=es

This bash script installs Apache Kafka on a Google Cloud Dataproc cluster. It configures Kafka properties and runs different steps depending on each node's role in the cluster.

```bash
#!/bin/bash
# Copyright 2015 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This script installs Apache Kafka (http://kafka.apache.org) on a Google Cloud
# Dataproc cluster.
set -euxo pipefail readonly ZOOKEEPER_HOME=/usr/lib/zookeeper readonly KAFKA_HOME=/usr/lib/kafka readonly KAFKA_PROP_FILE='/etc/kafka/conf/server.properties' readonly ROLE="$(/usr/share/google/get_metadata_value attributes/dataproc-role)" readonly RUN_ON_MASTER="$(/usr/share/google/get_metadata_value attributes/run-on-master || echo false)" readonly KAFKA_ENABLE_JMX="$(/usr/share/google/get_metadata_value attributes/kafka-enable-jmx || echo false)" readonly KAFKA_JMX_PORT="$(/usr/share/google/get_metadata_value attributes/jmx-port || echo 9999)" readonly INSTALL_KAFKA_PYTHON="$(/usr/share/google/get_metadata_value attributes/install-kafka-python || echo false)" # The first ZooKeeper server address, e.g., "cluster1-m-0:2181". ZOOKEEPER_ADDRESS='' ``` -------------------------------- ### Python Quickstart for Dataproc Cluster and PySpark Job Source: https://cloud.google.com/dataproc/docs/quickstarts/create-cluster-client-libraries?hl=es This script creates a Dataproc cluster, submits a PySpark job, downloads the job output, and deletes the cluster. Ensure you have the necessary Python libraries installed and authentication configured. ```python import argparse import re from google.cloud import dataproc_v1 as dataproc from google.cloud import storage def quickstart(project_id, region, cluster_name, job_file_path): # Create the cluster client. cluster_client = dataproc.ClusterControllerClient( client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)} ) # Create the cluster config. cluster = { "project_id": project_id, "cluster_name": cluster_name, "config": { "master_config": { "num_instances": 1, "machine_type_uri": "n1-standard-2", "disk_config": {"boot_disk_size_gb": 100}, }, "worker_config": { "num_instances": 2, "machine_type_uri": "n1-standard-2", "disk_config": {"boot_disk_size_gb": 100}, }, }, } # Create the cluster. 
operation = cluster_client.create_cluster( request={"project_id": project_id, "region": region, "cluster": cluster} ) result = operation.result() print("Cluster created successfully: {}".format(result.cluster_name)) # Create the job client. job_client = dataproc.JobControllerClient( client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)} ) # Create the job config. job = { "placement": {"cluster_name": cluster_name}, "pyspark_job": {"main_python_file_uri": job_file_path}, } operation = job_client.submit_job_as_operation( request={"project_id": project_id, "region": region, "job": job} ) response = operation.result() # Dataproc job output gets saved to the Google Cloud Storage bucket # allocated to the job. Use a regex to obtain the bucket and blob info. matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri) output = ( storage.Client() .get_bucket(matches.group(1)) .blob(f"{matches.group(2)}.000000000") .download_as_bytes() .decode("utf-8") ) print(f"Job finished successfully: {output}") # Delete the cluster once the job has terminated. operation = cluster_client.delete_cluster( request={ "project_id": project_id, "region": region, "cluster_name": cluster_name, } ) operation.result() print("Cluster {} successfully deleted.".format(cluster_name)) if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "--project_id", type=str, required=True, help="Project to use for creating resources.", ) parser.add_argument( "--region", ``` -------------------------------- ### Instantiate Inline Workflow Template (Python) Source: https://cloud.google.com/dataproc/docs/samples/dataproc-instantiate-inline-workflow-template?hl=de This Python sample demonstrates submitting a workflow template using the Dataproc client library. Ensure you have followed the setup guide for Managed Service for Apache Spark and configured authentication. 
```python
from google.cloud import dataproc_v1 as dataproc


def instantiate_inline_workflow_template(project_id, region):
    """This sample walks a user through submitting a workflow for
    Cloud Dataproc using the Python client library.

    Args:
        project_id (string): Project to use for running the workflow.
        region (string): Region where the workflow resources should live.
    """

    # Create a client with the endpoint set to the desired region.
    workflow_template_client = dataproc.WorkflowTemplateServiceClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    parent = f"projects/{project_id}/regions/{region}"

    template = {
        "jobs": [
            {
                "hadoop_job": {
                    # Adjacent string literals are concatenated by Python.
                    "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/"
                    "hadoop-mapreduce-examples.jar",
                    "args": ["teragen", "1000", "hdfs:///gen/"],
                },
                "step_id": "teragen",
            },
            {
                "hadoop_job": {
                    "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/"
                    "hadoop-mapreduce-examples.jar",
                    "args": ["terasort", "hdfs:///gen/", "hdfs:///sort/"],
                },
                "step_id": "terasort",
                "prerequisite_step_ids": ["teragen"],
            },
        ],
        "placement": {
            "managed_cluster": {
                "cluster_name": "my-managed-cluster",
                "config": {
                    "gce_cluster_config": {
                        # Leave 'zone_uri' empty for 'Auto Zone Placement'
                        # 'zone_uri': ''
                        "zone_uri": "us-central1-a"
                    }
                },
            }
        },
    }

    # Submit the request to instantiate the workflow from an inline template.
    operation = workflow_template_client.instantiate_inline_workflow_template(
        request={"parent": parent, "template": template}
    )
    operation.result()

    # Output a success message.
    print("Workflow ran successfully.")


# Example usage:
# instantiate_inline_workflow_template('your-project-id', 'your-region')
```
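In the inline template above, execution order is expressed only through each job's `step_id` and `prerequisite_step_ids`: a job without prerequisites can start at the beginning of the workflow, and `terasort` waits for `teragen`. With more steps, a misspelled prerequisite or an accidental cycle is easy to introduce. The following is a minimal local sanity check, offered as a sketch: `step_run_order` is a hypothetical helper (not part of the Dataproc client library) that uses Python's standard `graphlib` module (Python 3.9+) to validate the dependencies and return one valid run order. It does not model how Dataproc actually schedules or parallelizes steps.

```python
from graphlib import TopologicalSorter


def step_run_order(template):
    """Return the step_ids of an inline workflow template in an order that
    respects every job's prerequisite_step_ids.

    Hypothetical helper for local checks only; not a Dataproc API.
    """
    jobs = template.get("jobs", [])
    step_ids = {job["step_id"] for job in jobs}

    graph = {}
    for job in jobs:
        prereqs = job.get("prerequisite_step_ids", [])
        # Reject prerequisites that name a step that does not exist.
        unknown = set(prereqs) - step_ids
        if unknown:
            raise ValueError(
                f"step {job['step_id']!r} depends on unknown steps: {sorted(unknown)}"
            )
        # TopologicalSorter expects node -> predecessors, which is exactly
        # what prerequisite_step_ids expresses.
        graph[job["step_id"]] = prereqs

    # static_order() raises graphlib.CycleError if the steps form a cycle.
    return list(TopologicalSorter(graph).static_order())


# A trimmed-down copy of the jobs from the template above.
template = {
    "jobs": [
        {"step_id": "teragen", "hadoop_job": {"args": ["teragen", "1000", "hdfs:///gen/"]}},
        {
            "step_id": "terasort",
            "prerequisite_step_ids": ["teragen"],
            "hadoop_job": {"args": ["terasort", "hdfs:///gen/", "hdfs:///sort/"]},
        },
    ]
}

print(step_run_order(template))  # ['teragen', 'terasort']
```

Because `TopologicalSorter` takes a mapping from each node to its predecessors, `prerequisite_step_ids` can be passed through without inverting the edges. A cyclic template raises `graphlib.CycleError`, which is a subclass of `ValueError`, so callers can handle both failure modes with one `except ValueError`.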