### Setup Sample DataFrames Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.lateralJoin.html Initializes sample 'customers' and 'orders' DataFrames for demonstrating lateral joins. This setup is required before running the join examples. ```python from pyspark.sql import functions as sf from pyspark.sql import Row customers_data = [ Row(customer_id=1, name="Alice"), Row(customer_id=2, name="Bob"), Row(customer_id=3, name="Charlie"), Row(customer_id=4, name="Diana") ] customers = spark.createDataFrame(customers_data) orders_data = [ Row(order_id=101, customer_id=1, order_date="2024-01-10", items=[Row(product="laptop", quantity=5), Row(product="mouse", quantity=12)]), Row(order_id=102, customer_id=1, order_date="2024-02-15", items=[Row(product="phone", quantity=2), Row(product="charger", quantity=15)]), Row(order_id=105, customer_id=1, order_date="2024-03-20", items=[Row(product="tablet", quantity=4)]), Row(order_id=103, customer_id=2, order_date="2024-01-12", items=[Row(product="tablet", quantity=8)]), Row(order_id=104, customer_id=2, order_date="2024-03-05", items=[Row(product="laptop", quantity=7)]), Row(order_id=106, customer_id=3, order_date="2024-04-05", items=[Row(product="monitor", quantity=1)]), ] orders = spark.createDataFrame(orders_data) ``` -------------------------------- ### Setup Sample DataFrame Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.scalar.html Sets up a sample DataFrame for subsequent examples. Ensure SparkSession is available. 
```python data = [ (1, "Alice", 45000, 101), (2, "Bob", 54000, 101), (3, "Charlie", 29000, 102), (4, "David", 61000, 102), (5, "Eve", 48000, 101), ] employees = spark.createDataFrame(data, ["id", "name", "salary", "department_id"]) ``` -------------------------------- ### Start a thread with inheritable_thread_target wrapper Source: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.inheritable_thread_target.html This example demonstrates how to start a new thread using the `inheritable_thread_target` wrapper to mimic JVM thread behavior closely. ```python >>> Thread(target=inheritable_thread_target(target_func)).start() ``` -------------------------------- ### Arrow Batch Data Source Example Setup Source: https://spark.apache.org/docs/latest/api/python/_sources/tutorial/sql/python_data_source.rst.txt This example demonstrates the basic structure for a Python DataSource that supports direct Arrow Batch serialization for improved performance. It includes necessary imports and class definition. ```python from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition from pyspark.sql import SparkSession import pyarrow as pa # Define the ArrowBatchDataSource class ArrowBatchDataSource(DataSource): """ A Data Source for testing Arrow Batch Serialization """ ``` -------------------------------- ### Starting a Stream with Trigger and Options Source: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/streaming/readwriter.html Starts a streaming query with a specified processing time trigger and other options like `queryName`, `outputMode`, and `format`. This example demonstrates writing to a memory sink with a 5-second processing interval. 
```python df = spark.readStream.format("rate").load() q = df.writeStream.trigger(processingTime='5 seconds').start( queryName='that_query', outputMode="append", format='memory') print(q.name) print(q.isActive) q.stop() ``` -------------------------------- ### Setup Sample Data for Customers and Orders Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.exists.html Sets up two sample DataFrames, 'customers' and 'orders', for use in subsequent examples. This involves creating PySpark DataFrames from Python lists. ```python data_customers = [ (101, "Alice", "USA"), (102, "Bob", "Canada"), (103, "Charlie", "USA"), (104, "David", "Australia") ] data_orders = [ (1, 101, "2023-01-15", 250), (2, 102, "2023-01-20", 300), (3, 103, "2023-01-25", 400), (4, 101, "2023-02-05", 150) ] customers = spark.createDataFrame( data_customers, ["customer_id", "customer_name", "country"]) orders = spark.createDataFrame( data_orders, ["order_id", "customer_id", "order_date", "total_amount"]) ``` -------------------------------- ### Start a thread with SparkSession and inheritable_thread_target for Spark Connect Source: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.inheritable_thread_target.html This example shows how to start a thread with the `inheritable_thread_target` wrapper, explicitly passing the Spark session for proper inheritance of tags, especially when using Spark Connect. ```python >>> Thread(target=inheritable_thread_target(session)(target_func)).start() ``` -------------------------------- ### Cache and Refresh Table Example Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Catalog.refreshTable.html Demonstrates caching a table, inserting data, showing the cached data, and then refreshing the table to reflect changes (or lack thereof). This example requires SparkSession and temporary directory setup. 
```python import tempfile with tempfile.TemporaryDirectory(prefix="refreshTable") as d: _ = spark.sql("DROP TABLE IF EXISTS tbl1") _ = spark.sql( "CREATE TABLE tbl1 (col STRING) USING TEXT LOCATION '{}'".format(d)) _ = spark.sql("INSERT INTO tbl1 SELECT 'abc'") spark.catalog.cacheTable("tbl1") spark.table("tbl1").show() ``` ```python spark.table("tbl1").count() ``` ```python spark.catalog.refreshTable("tbl1") spark.table("tbl1").count() ``` -------------------------------- ### Basic Input Partition Example Source: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/datasource.html Demonstrates a simple implementation of returning a list of input partitions. ```python >>> def partitions(self): ... return [InputPartition(1)] ``` -------------------------------- ### Generate DatetimeIndex excluding the start boundary Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.date_range.html This example uses `inclusive='right'` to exclude the start date if it falls on a boundary, generating a DatetimeIndex that includes the end date but not the start date. ```python >>> ps.date_range( ... start='2017-01-01', end='2017-01-04', inclusive='right' ... ) DatetimeIndex(['2017-01-02', '2017-01-03', '2017-01-04'], dtype='datetime64[ns]', freq=None) ``` -------------------------------- ### Install PySpark with Extra Dependencies Source: https://spark.apache.org/docs/latest/api/python/getting_started/install.html Install PySpark with specific component dependencies. For example, install Spark SQL, pandas API on Spark (with Plotly for data visualization), or Spark Connect. ```bash # Spark SQL pip install pyspark[sql] ``` ```bash # pandas API on Spark pip install pyspark[pandas_on_spark] plotly # to plot your data, you can install plotly together. 
``` ```bash # Spark Connect pip install pyspark[connect] ``` -------------------------------- ### Generate DatetimeIndex by specifying start and periods Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.date_range.html This example demonstrates generating a DatetimeIndex using a start date and the number of periods (days). ```python >>> ps.date_range(start='1/1/2018', periods=8) DatetimeIndex(['2018-01-01', '2018-01-02', '2018-01-03', '2018-01-04', '2018-01-05', '2018-01-06', '2018-01-07', '2018-01-08'], dtype='datetime64[ns]', freq=None) ``` -------------------------------- ### Basic Partitioning with Rate Source Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.partitionBy.html Demonstrates basic partitioning using the 'value' column from the rate source. This is a simple example to show the method's usage. ```python >>> df = spark.readStream.format("rate").load() >>> df.writeStream.partitionBy("value") <...streaming.readwriter.DataStreamWriter object ...> ``` -------------------------------- ### Series values Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.values.html Example of getting NumPy representation for a Series. ```Python >>> ps.Series([1, 2, 3]).values array([1, 2, 3]) ``` -------------------------------- ### Create DataFrame for examples Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.GroupedData.agg.html Initializes a DataFrame with sample data for demonstrating aggregation functions. 
```python import pandas as pd from pyspark.sql import functions as sf df = spark.createDataFrame( [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"]) df.show() ``` -------------------------------- ### Generate DatetimeIndex by specifying start and end dates Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.date_range.html This example shows how to generate a DatetimeIndex by providing the start and end dates. The default frequency is daily. ```python >>> ps.date_range(start='1/1/2018', end='1/08/2018') DatetimeIndex(['2018-01-01', '2018-01-02', '2018-01-03', '2018-01-04', '2018-01-05', '2018-01-06', '2018-01-07', '2018-01-08'], dtype='datetime64[ns]', freq=None) ``` -------------------------------- ### Input Partition Example with Strings Source: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/datasource.html Illustrates returning a list of InputPartition objects, each containing a string value. ```python >>> def partitions(self): ... return [InputPartition('a'), InputPartition('b'), InputPartition('c')] ``` -------------------------------- ### Get Month Names from DatetimeIndex Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DatetimeIndex.month_name.html This example demonstrates how to get the month names from a DatetimeIndex using the month_name() method. The default locale (English) is used. ```Python import pyspark.pandas as ps idx = ps.date_range(start='2018-01', freq='ME', periods=3) idx.month_name() ``` -------------------------------- ### Create DataFrame for examples Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumnsRenamed.html Initializes a DataFrame with 'age' and 'name' columns for subsequent examples. 
```python
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
```

--------------------------------

### Recover Partitions Example

Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Catalog.recoverPartitions.html

This example creates a partitioned table, writes data to it, and then calls `recoverPartitions` to update the catalog. It shows the table's contents before and after recovering partitions. This method only works with partitioned tables, not views.

```python
import tempfile

with tempfile.TemporaryDirectory(prefix="recoverPartitions") as d:
    _ = spark.sql("DROP TABLE IF EXISTS tbl1")
    # Write partitioned data directly to the directory, bypassing the catalog.
    spark.range(1).selectExpr(
        "id as key", "id as value").write.partitionBy("key").mode("overwrite").save(d)
    _ = spark.sql(
        "CREATE TABLE tbl1 (key LONG, value LONG)"
        "USING parquet OPTIONS (path \"{}\") PARTITIONED BY (key)".format(d))
    spark.table("tbl1").show()
    # Register the partitions found on disk with the catalog.
    spark.catalog.recoverPartitions("tbl1")
    spark.table("tbl1").show()

_ = spark.sql("DROP TABLE tbl1")
```

--------------------------------

### input_file_block_start

Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html

Gets the start offset of the current input file block.

```APIDOC
## input_file_block_start

### Description
Returns the start offset of the block being read, or -1 if not available.

### Signature
`input_file_block_start`()
```

--------------------------------

### Initialize SparkSession

Source: https://spark.apache.org/docs/latest/api/python/_sources/getting_started/quickstart_ps.ipynb.txt

This snippet shows how to create a SparkSession, which is the entry point to Spark functionality. Ensure PySpark is installed and configured in your environment.

```python
from pyspark.sql import SparkSession

# Reuse the active session if one exists; otherwise create a new one.
spark = SparkSession.builder.getOrCreate()
```

--------------------------------

### Basic PySpark Streaming Example

Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.start.html

Loads data from a 'rate' source and starts a streaming query that writes to a 'memory' sink. This is the minimal pattern for starting a streaming query.

```python
>>> df = spark.readStream.format("rate").load()
>>> q = df.writeStream.format('memory').queryName('this_query').start()
>>> q.isActive
True
>>> q.name
'this_query'
>>> q.stop()
>>> q.isActive
False
```

--------------------------------

### Find indices between two times (excluding start time)

Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DatetimeIndex.indexer_between_time.html

This example uses `indexer_between_time` to find the index locations between '00:01' and '00:02', excluding the start time.

```Python
psidx.indexer_between_time("00:01", "00:02", include_start=False)
```

--------------------------------

### Start a streaming query

Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQuery.awaitTermination.html

This snippet sets up and starts a streaming query using the 'rate' format and a 'memory' sink. A running query is a prerequisite for using `awaitTermination`.

```python
sdf = spark.readStream.format("rate").load()
sq = sdf.writeStream.format('memory').queryName('query_awaitTermination').start()
```

--------------------------------

### Create DataFrame and Column Instances

Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.html

Demonstrates how to create a DataFrame and then select or create Column instances from it.
```python
df = spark.createDataFrame(
    [(2, "Alice"), (5, "Bob")], ["age", "name"])

# Select a column out of a DataFrame
df.name
df["name"]

# Create from an expression
df.age + 1
1 / df.age
```

--------------------------------

### Calculate Date Difference (End - Start)

Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.datediff.html

`datediff(end, start)` returns the number of days from `start` to `end`. Here `d2` is passed as `end` and `d1` as `start`, so the result is positive when `d2` is later than `d1`. The snippet assumes that `sf` (`pyspark.sql.functions`) and a DataFrame `df` with date columns `d1` and `d2` were defined in an earlier example.

```python
df.select('*', sf.datediff(df.d2, df.d1)).show()
```

--------------------------------

### KernelDensity Example

Source: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.mllib.stat.KernelDensity.html

Demonstrates how to initialize KernelDensity, set sample data, and estimate the probability density at given points.

```python
>>> from pyspark.mllib.stat import KernelDensity
>>> kd = KernelDensity()
>>> sample = sc.parallelize([0.0, 1.0])
>>> kd.setSample(sample)
>>> kd.estimate([0.0, 1.0])
array([ 0.12938758, 0.12938758])
```

--------------------------------

### Get Input File Block Start Offset with input_file_block_start

Source: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions/builtin.html

Returns the start offset of the block currently being read from an input file, or -1 if the offset is not available. This is useful for tracking where in a file each row came from.

```python
from pyspark.sql import functions as sf

df = spark.read.text("python/test_support/sql/ages_newlines.csv", lineSep=",")
df.select(sf.input_file_block_start()).show()
```

--------------------------------

### Series values with object dtype

Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.values.html

Example of getting NumPy representation for a Series with object dtype.
```Python >>> ps.Series(list('aabc')).values array(['a', 'a', 'b', 'c'], dtype=object) ``` -------------------------------- ### Create, List, and Drop Temporary View Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.catalog.html Demonstrates the process of creating a temporary view, listing all tables to confirm its creation, and then dropping the temporary view. ```python >>> spark.range(1).createTempView("test_view") >>> spark.catalog.listTables() [Table(name='test_view', catalog=None, namespace=[], description=None, ... >>> _ = spark.catalog.dropTempView("test_view") ``` -------------------------------- ### PCA Initialization and Usage Source: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.PCA.html Demonstrates how to initialize PCA, set parameters, fit a DataFrame, and transform data. ```APIDOC ## PCA Initialization and Usage ### Description Initializes a PCA transformer, sets its parameters, fits it to a dataset, and transforms the data. ### Parameters - **k** (int, optional) - The number of principal components to keep. Defaults to None. - **inputCol** (string, optional) - The name of the input column containing feature vectors. - **outputCol** (string, optional) - The name of the output column for the transformed vectors. 
### Example
```python
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

# Keep the top 2 principal components.
pca = PCA(k=2, inputCol="features")
pca.setOutputCol("pca_features")
model = pca.fit(df)
model.transform(df).collect()[0].pca_features
```
```

--------------------------------

### Overlay string with specified length

Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.overlay.html

Replaces a portion of the source string with the replacement string, specifying both the starting position and the number of bytes to replace. This example replaces 2 bytes starting from position 7.

```python
>>> df.select("*", sf.overlay("x", "y", 7, 2)).show()
+---------+----+-------------------+
|        x|   y|overlay(x, y, 7, 2)|
+---------+----+-------------------+
|SPARK_SQL|CORE|        SPARK_COREL|
+---------+----+-------------------+
```

--------------------------------

### PowerIterationClustering Example

Source: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.clustering.PowerIterationClustering.html

Demonstrates how to initialize PowerIterationClustering, set parameters, run the algorithm, and display cluster assignments. It also shows how to save and load the estimator.

```python
>>> from pyspark.ml.clustering import PowerIterationClustering
>>> data = [(1, 0, 0.5),
...         (2, 0, 0.5), (2, 1, 0.7),
...         (3, 0, 0.5), (3, 1, 0.7), (3, 2, 0.9),
...         (4, 0, 0.5), (4, 1, 0.7), (4, 2, 0.9), (4, 3, 1.1),
...         (5, 0, 0.5), (5, 1, 0.7), (5, 2, 0.9), (5, 3, 1.1), (5, 4, 1.3)]
>>> df = spark.createDataFrame(data).toDF("src", "dst", "weight").repartition(1)
>>> pic = PowerIterationClustering(k=2, weightCol="weight")
>>> pic.setMaxIter(40)
PowerIterationClustering...
>>> assignments = pic.assignClusters(df)
>>> assignments.sort(assignments.id).show(truncate=False)
+---+-------+
|id |cluster|
+---+-------+
|0  |0      |
|1  |0      |
|2  |0      |
|3  |0      |
|4  |0      |
|5  |1      |
+---+-------+
>>> pic_path = temp_path + "/pic"
>>> pic.save(pic_path)
>>> pic2 = PowerIterationClustering.load(pic_path)
>>> pic2.getK()
2
>>> pic2.getMaxIter()
40
>>> pic2.assignClusters(df).take(6) == assignments.take(6)
True
```

--------------------------------

### Default Input Partition Implementation

Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.datasource.InputPartition.html

Demonstrates how to use the default InputPartition implementation by returning an instance of it from the partitions() method.

```python
>>> def partitions(self):
...     return [InputPartition(1)]
```

--------------------------------

### Getting index size example

Source: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/pandas/indexes/base.html

Demonstrates how to retrieve the number of elements in a pyspark.pandas DataFrame's index.

```python
import pyspark.pandas as ps

df = ps.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)],
                  columns=['dogs', 'cats'],
                  index=list('abcd'))
df.index.size
```

--------------------------------

### Option Class Example

Source: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/pandas/config.html

Demonstrates how to create and validate an Option instance. This is useful for understanding how configuration parameters are defined and checked.

```python
>>> option = Option(
...     key='option.name',
...     doc="this is a test option",
...     default="default",
...     types=(float, int),
...     check_func=(lambda v: v > 0, "should be a positive float"))
>>> option.validate('abc')  # doctest: +NORMALIZE_WHITESPACE
Traceback (most recent call last):
  ...
TypeError: The value for option 'option.name' was <class 'str'>;
however, expected types are [(<class 'float'>, <class 'int'>)].
>>> option.validate(-1.1)
Traceback (most recent call last):
  ...
ValueError: should be a positive float >>> option.validate(1.1) ``` -------------------------------- ### Create DataFrame Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.assign.html Initializes a pyspark.pandas DataFrame with a single column 'temp_c'. This is a setup step for subsequent examples. ```python >>> df = ps.DataFrame({'temp_c': [17.0, 25.0]}, ... index=['Portland', 'Berkeley']) >>> df ``` -------------------------------- ### Initial DataFrame Partitions Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.coalesce.html Demonstrates the initial partitioning of a DataFrame before applying coalesce. This example sets up a DataFrame with 3 partitions. ```python from pyspark.sql import functions as sf spark.range(0, 10, 1, 3).select( sf.spark_partition_id().alias("partition") ).distinct().sort("partition").show() ``` -------------------------------- ### Train FPGrowth Model and Save/Load Source: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/mllib/fpm.html Demonstrates training an FPGrowth model with specified minimum support and partitions, then saving and loading the model to verify its integrity. ```python >>> data = [["a", "b", "c"], ["a", "b", "d", "e"], ["a", "c", "e"], ["a", "c", "f"]] >>> rdd = sc.parallelize(data, 2) >>> model = FPGrowth.train(rdd, 0.6, 2) >>> sorted(model.freqItemsets().collect()) [FreqItemset(items=['a'], freq=4), FreqItemset(items=['c'], freq=3), ... 
>>> model_path = temp_path + "/fpm" >>> model.save(sc, model_path) >>> sameModel = FPGrowthModel.load(sc, model_path) >>> sorted(model.freqItemsets().collect()) == sorted(sameModel.freqItemsets().collect()) True ``` -------------------------------- ### KernelDensity Initialization and Usage Example Source: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/mllib/stat/KernelDensity.html Demonstrates how to initialize KernelDensity, set sample data, and estimate probability densities. Ensure 'sc' is an active SparkContext. ```python # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # from typing import Iterable, Optional import numpy as np from numpy import ndarray from pyspark.mllib.common import callMLlibFunc from pyspark.core.rdd import RDD class KernelDensity: """ Estimate probability density at required points given an RDD of samples from the population. Examples -------- >>> kd = KernelDensity() >>> sample = sc.parallelize([0.0, 1.0]) >>> kd.setSample(sample) >>> kd.estimate([0.0, 1.0]) array([ 0.12938758, 0.12938758]) """ def __init__(self) -> None: self._bandwidth: float = 1.0 self._sample: Optional[RDD[float]] = None def setBandwidth(self, bandwidth: float) -> None: """Set bandwidth of each sample. 
Defaults to 1.0""" self._bandwidth = bandwidth def setSample(self, sample: RDD[float]) -> None: """Set sample points from the population. Should be a RDD""" if not isinstance(sample, RDD): raise TypeError("samples should be a RDD, received %s" % type(sample)) self._sample = sample def estimate(self, points: Iterable[float]) -> ndarray: """Estimate the probability density at points""" points = list(points) densities = callMLlibFunc("estimateKernelDensity", self._sample, self._bandwidth, points) return np.asarray(densities) ``` -------------------------------- ### Get Spark Start Time Source: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/core/context.html Retrieves the epoch time when the SparkContext was initialized. Available since version 1.5.0. ```python >>> _ = sc.startTime ``` -------------------------------- ### Partitioning an RDD with partitionBy Source: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.partitionBy.html Demonstrates how to partition an RDD of key-value pairs using `partitionBy`. The example creates an RDD, partitions it into 2 partitions, and then collects the partitions to verify that elements are distributed. It checks if there's any overlap between the elements in the first two partitions, expecting none for a successful partition. ```python >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x)) >>> sets = pairs.partitionBy(2).glom().collect() >>> len(set(sets[0]).intersection(set(sets[1]))) == 0 True ``` -------------------------------- ### Create a DataFrame Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.groupby.SeriesGroupBy.nsmallest.html Initializes a pyspark.pandas DataFrame for use in subsequent examples. This setup is common for demonstrating groupby operations. ```Python >>> df = ps.DataFrame({'a': [1, 1, 1, 2, 2, 2, 3, 3, 3], ... 
'b': [1, 2, 2, 2, 3, 3, 3, 4, 4]}, columns=['a', 'b'])
```

--------------------------------

### Reading from Text Source with Specified Path

Source: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/streaming/readwriter.html

Shows how to read data from a text file source. This example first writes a temporary text file and then sets up a streaming query that reads it and writes the output to the console. The query runs for 3 seconds before it is stopped.

```python
import tempfile
import time

with tempfile.TemporaryDirectory(prefix="format") as d:
    # Write a temporary text file to read it.
    spark.createDataFrame(
        [("hello",), ("this",)]).write.mode("overwrite").format("text").save(d)

    # Start a streaming query to read the text file.
    q = spark.readStream.format("text").load(d).writeStream.format("console").start()
    time.sleep(3)
    q.stop()
```

--------------------------------

### Pandas UDF Example

Source: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions/builtin.html

Defines and uses a Pandas UDF for scalar operations. Ensure pandas is installed and configured for PySpark.

```Python
import pandas as pd
from pyspark.sql.functions import col, udf, PandasUDFType
from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def pd_calc(a: pd.Series, b: pd.Series) -> pd.Series:
    return a + 10 * b

# Check the evaluation type assigned to the decorated function.
pd_calc.evalType == PandasUDFType.SCALAR
# Arguments are passed by keyword.
spark.range(2).select(pd_calc(b=col("id") * 10, a="id")).show()
```

--------------------------------

### Get Current Timestamp (now)

Source: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions/builtin.html

Returns the current timestamp at the start of query evaluation. This is an alternative to `current_timestamp`, available from Spark 3.5.0.
```python from pyspark.sql import functions as sf spark.range(1).select(sf.now()).show(truncate=False) ``` -------------------------------- ### Create a DataFrame Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.all.html Creates a sample DataFrame for demonstrating the .all() method. This setup is required before using the .all() method. ```python import pyspark.pandas as ps df = ps.DataFrame({ 'col1': [True, True, True], 'col2': [True, False, False], 'col3': [0, 0, 0], 'col4': [1, 2, 3], 'col5': [True, True, None], 'col6': [True, False, None]}, columns=['col1', 'col2', 'col3', 'col4', 'col5', 'col6']) ``` -------------------------------- ### LDA Example Source: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.clustering.LDA.html Demonstrates initializing LDA, setting parameters, fitting a model to data, and inspecting the results. It covers model saving and loading for both LDA and its distributed/local model variants. 
```python
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA, DistributedLDAModel, LocalLDAModel

# Assumes an active SparkSession `spark` and a writable directory `temp_path`.
df = spark.createDataFrame([[1, Vectors.dense([0.0, 1.0])],
                            [2, SparseVector(2, {0: 1.0})]], ["id", "features"])
lda = LDA(k=2, seed=1, optimizer="em")
lda.setMaxIter(10)
lda.getMaxIter()
lda.clear(lda.maxIter)
model = lda.fit(df)
model.setSeed(1)
model.getTopicDistributionCol()
model.isDistributed()
localModel = model.toLocal()
localModel.isDistributed()
model.vocabSize()
model.describeTopics().show()
model.topicsMatrix()

# Save and reload the estimator, the distributed model, and the local model
lda_path = temp_path + "/lda"
lda.save(lda_path)
sameLDA = LDA.load(lda_path)
distributed_model_path = temp_path + "/lda_distributed_model"
model.save(distributed_model_path)
sameModel = DistributedLDAModel.load(distributed_model_path)
local_model_path = temp_path + "/lda_local_model"
localModel.save(local_model_path)
sameLocalModel = LocalLDAModel.load(local_model_path)
model.transform(df).take(1) == sameLocalModel.transform(df).take(1)
```

--------------------------------

### PySpark curdate() Function Example

Source: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions/builtin.html

Returns the current date at the start of query evaluation. All calls within the same query return the same value.

```python
import pyspark.sql.functions as sf
spark.range(1).select(sf.curdate()).show()  # doctest: +SKIP
```

--------------------------------

### Initialize and Configure GaussianMixture

Source: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.clustering.GaussianMixture.html

Demonstrates how to create a GaussianMixture instance with parameters such as k (number of clusters), tolerance, and seed, and how to change maxIter afterwards.

```python
>>> from pyspark.ml.linalg import Vectors
>>> from pyspark.ml.clustering import GaussianMixture
>>> data = [(Vectors.dense([-0.1, -0.05 ]),),
...         (Vectors.dense([-0.01, -0.1]),),
...         (Vectors.dense([0.9, 0.8]),),
...         (Vectors.dense([0.75, 0.935]),),
...         (Vectors.dense([-0.83, -0.68]),),
...         (Vectors.dense([-0.91, -0.76]),)]
>>> df = spark.createDataFrame(data, ["features"])
>>> gm = GaussianMixture(k=3, tol=0.0001, seed=10)
>>> gm.getMaxIter()
100
>>> gm.setMaxIter(30)
GaussianMixture...
>>> gm.getMaxIter()
30
```

--------------------------------

### Generate DatetimeIndex including both boundaries

Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.date_range.html

This example shows the behavior of `inclusive="both"` (the default), where both the start and end dates are included in the generated DatetimeIndex.

```python
>>> ps.date_range(
...     start='2017-01-01', end='2017-01-04', inclusive="both"
... )
DatetimeIndex(['2017-01-01', '2017-01-02', '2017-01-03', '2017-01-04'], dtype='datetime64[ns]', freq=None)
```

--------------------------------

### Get DataStreamWriter Instance

Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.writeStream.html

Demonstrates how to access the DataStreamWriter interface from a streaming DataFrame. This is the entry point for configuring and starting a streaming query.

```python
>>> import time
>>> import tempfile
>>> df = spark.readStream.format("rate").load()
>>> type(df.writeStream)
```

--------------------------------

### MinMaxScaler Initialization and Usage

Source: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.MinMaxScaler.html

Demonstrates how to initialize MinMaxScaler, set input and output columns, fit it to data, and transform the data. It also shows how to access the original min and max values from the fitted model.

```APIDOC
## MinMaxScaler Initialization and Usage

### Description
This example shows how to use the `MinMaxScaler` to rescale features in a DataFrame. It covers initialization, setting parameters, fitting the scaler to data, and transforming the data. It also demonstrates saving and loading the scaler and its fitted model.

### Parameters
- `min` (float, optional): The lower bound of the output range. Defaults to 0.0.
- `max` (float, optional): The upper bound of the output range. Defaults to 1.0.
- `inputCol` (string, optional): The name of the input column.
- `outputCol` (string, optional): The name of the output column.

### Example
```python
from pyspark.ml.feature import MinMaxScaler, MinMaxScalerModel
from pyspark.ml.linalg import Vectors

# Assumes an active SparkSession `spark` and a writable directory `temp_path`.
df = spark.createDataFrame([(Vectors.dense([0.0]),), (Vectors.dense([2.0]),)], ["a"])
mmScaler = MinMaxScaler(outputCol="scaled")
mmScaler.setInputCol("a")
model = mmScaler.fit(df)
model.setOutputCol("scaledOutput")
print(model.originalMin)
print(model.originalMax)
model.transform(df).show()

# Saving and Loading
minMaxScalerPath = temp_path + "/min-max-scaler"
mmScaler.save(minMaxScalerPath)
loadedMMScaler = MinMaxScaler.load(minMaxScalerPath)
print(loadedMMScaler.getMin() == mmScaler.getMin())
print(loadedMMScaler.getMax() == mmScaler.getMax())
modelPath = temp_path + "/min-max-scaler-model"
model.save(modelPath)
loadedModel = MinMaxScalerModel.load(modelPath)
print(loadedModel.originalMin == model.originalMin)
print(loadedModel.originalMax == model.originalMax)
print(loadedModel.transform(df).take(1) == model.transform(df).take(1))
```
```

--------------------------------

### Initialize and Use RankingMetrics

Source: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.mllib.evaluation.RankingMetrics.html

Demonstrates how to initialize RankingMetrics with an RDD of (predicted ranking, ground-truth labels) pairs, and then compute ranking evaluation metrics such as precision, mean average precision, NDCG, and recall at different ranks.

```python
from pyspark.mllib.evaluation import RankingMetrics

# Assumes an active SparkContext `sc`.
predictionAndLabels = sc.parallelize([
    ([1, 6, 2, 7, 8, 3, 9, 10, 4, 5], [1, 2, 3, 4, 5]),
    ([4, 1, 5, 6, 2, 7, 3, 8, 9, 10], [1, 2, 3]),
    ([1, 2, 3, 4, 5], [])])
metrics = RankingMetrics(predictionAndLabels)
metrics.precisionAt(1)
metrics.precisionAt(5)
metrics.precisionAt(15)
metrics.meanAveragePrecision
metrics.meanAveragePrecisionAt(1)
metrics.meanAveragePrecisionAt(2)
metrics.ndcgAt(3)
metrics.ndcgAt(10)
metrics.recallAt(1)
metrics.recallAt(5)
metrics.recallAt(15)
```

--------------------------------

### Get a function by name

Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Catalog.getFunction.html

Use `getFunction` to retrieve a function object by its name. This example first registers a function with `CREATE FUNCTION` and then retrieves it.

```python
>>> _ = spark.sql(
...     "CREATE FUNCTION my_func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
>>> spark.catalog.getFunction("my_func1")
Function(name='my_func1', catalog='spark_catalog', namespace=['default'], ...
```

--------------------------------

### Get Row Slice with Single Column Label

Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.loc.html

Retrieves a slice of rows for a single column. Both the start and stop labels of the slice are included.

```python
df.loc['cobra':'viper', 'max_speed']
```

--------------------------------

### SQLTransformer Example

Source: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.SQLTransformer.html

Demonstrates creating a SQLTransformer, applying it to a DataFrame, and saving/loading the transformer. Use this to define custom transformations using SQL.

```python
>>> from pyspark.ml.feature import SQLTransformer
>>> df = spark.createDataFrame([(0, 1.0, 3.0), (2, 2.0, 5.0)], ["id", "v1", "v2"])
>>> sqlTrans = SQLTransformer(
...     statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
>>> sqlTrans.transform(df).head()
Row(id=0, v1=1.0, v2=3.0, v3=4.0, v4=3.0)
>>> sqlTransformerPath = temp_path + "/sql-transformer"
>>> sqlTrans.save(sqlTransformerPath)
>>> loadedSqlTrans = SQLTransformer.load(sqlTransformerPath)
>>> loadedSqlTrans.getStatement() == sqlTrans.getStatement()
True
>>> loadedSqlTrans.transform(df).take(1) == sqlTrans.transform(df).take(1)
True
```

--------------------------------

### Initialize Spark Session and DataFrame

Source: https://spark.apache.org/docs/latest/api/python/_sources/getting_started/quickstart_ps.ipynb.txt

This snippet demonstrates how to create a SparkSession and a DataFrame. Ensure Spark is installed and configured.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

data = [("Alice", 1), ("Bob", 2)]
columns = ["name", "id"]
df = spark.createDataFrame(data, columns)
df.show()
```

--------------------------------

### DataFrame.spark.hint

Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.spark.hint.html

Specifies a hint on the current DataFrame. This can be used to guide Spark's query optimizer, for example, by suggesting a broadcast join.

```APIDOC
## DataFrame.spark.hint

### Description
Specifies a hint on the current DataFrame.

### Parameters
- **name** (string) - The name of the hint (e.g., "broadcast").
- **parameters** (any) - Optional parameters for the hint.

### Returns
- **ret** (DataFrame) - The DataFrame with the hint applied.

### Example
```python
>>> import pyspark.pandas as ps
>>> df1 = ps.DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'],
...                     'value': [1, 2, 3, 5]},
...                    columns=['lkey', 'value']).set_index('lkey')
>>> df2 = ps.DataFrame({'rkey': ['foo', 'bar', 'baz', 'foo'],
...                     'value': [5, 6, 7, 8]},
...                    columns=['rkey', 'value']).set_index('rkey')
>>> merged = df1.merge(df2.spark.hint("broadcast"), left_index=True, right_index=True)
>>> merged.spark.explain()
== Physical Plan ==
...BroadcastHashJoin...
```
```

--------------------------------

### LinearSVC Usage Example

Source: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.LinearSVC.html

Demonstrates how to instantiate, configure, train, and use a LinearSVC model. Includes setting parameters, fitting the model to data, making predictions, and saving/loading the model.

```python
from pyspark.sql import Row
from pyspark.ml.classification import LinearSVC, LinearSVCModel
from pyspark.ml.linalg import Vectors

# Assumes an active SparkContext `sc` and a writable directory `temp_path`.
df = sc.parallelize([
    Row(label=1.0, features=Vectors.dense(1.0, 1.0, 1.0)),
    Row(label=0.0, features=Vectors.dense(1.0, 2.0, 3.0))]).toDF()
svm = LinearSVC()
svm.getMaxIter()
svm.setMaxIter(5)
svm.getMaxIter()
svm.getRegParam()
svm.setRegParam(0.01)
svm.getRegParam()
model = svm.fit(df)
model.setPredictionCol("newPrediction")
model.getPredictionCol()
model.setThreshold(0.5)
model.getThreshold()
model.getMaxBlockSizeInMB()
model.coefficients
model.intercept
model.numClasses
model.numFeatures

# Predict on a single unseen row
test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, -1.0, -1.0))]).toDF()
model.predict(test0.head().features)
model.predictRaw(test0.head().features)
result = model.transform(test0).head()
result.newPrediction
result.rawPrediction

# Save and reload the estimator and the fitted model
svm_path = temp_path + "/svm"
svm.save(svm_path)
svm2 = LinearSVC.load(svm_path)
svm2.getMaxIter()
model_path = temp_path + "/svm_model"
model.save(model_path)
model2 = LinearSVCModel.load(model_path)
bool(model.coefficients[0] == model2.coefficients[0])
model.intercept == model2.intercept
model.transform(test0).take(1) == model2.transform(test0).take(1)
```

--------------------------------

### PySpark current_time() Function Example (Specified Precision)

Source: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions/builtin.html

Returns the current time at the start of query evaluation with
a specified precision for fractional digits of seconds.

```python
from pyspark.sql import functions as sf
spark.range(1).select(sf.current_time(3).alias("time")).show()  # doctest: +SKIP
```

--------------------------------

### Manage Catalog and Views

Source: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/session.html

Use the Catalog interface to manage databases, tables, and views. This example demonstrates creating, listing, and dropping a temporary view.

```python
>>> spark.catalog
<...Catalog object ...>

# Create a temp view, show the list, and drop it.
>>> spark.range(1).createTempView("test_view")
>>> spark.catalog.listTables()  # doctest: +SKIP
[Table(name='test_view', catalog=None, namespace=[], description=None, ...
>>> _ = spark.catalog.dropTempView("test_view")
```

--------------------------------

### Generate DatetimeIndex with auto-generated frequency

Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.date_range.html

This example generates a DatetimeIndex by specifying start, end, and periods, allowing the frequency to be determined automatically (linearly spaced).

```python
>>> ps.date_range(
...     start='2018-04-24', end='2018-04-27', periods=3
... )
DatetimeIndex(['2018-04-24 00:00:00', '2018-04-25 12:00:00',
               '2018-04-27 00:00:00'],
              dtype='datetime64[ns]', freq=None)
```

--------------------------------

### DataFrame.between_time

Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.between_time.html

Select values between particular times of the day (example: 9:00-9:30 AM). By setting `start_time` to be later than `end_time`, you can get the times that are _not_ between the two times.

```APIDOC
## DataFrame.between_time

### Description
Select values between particular times of the day. This method filters the DataFrame to include rows within a specified time range.
It can also be used to select times outside a given range by reversing the start and end times.

### Method
DataFrame.between_time

### Parameters
* **start_time** (datetime.time or str) - Initial time as a time filter limit.
* **end_time** (datetime.time or str) - End time as a time filter limit.
* **inclusive** ({“both”, “neither”, “left”, “right”}, default “both”) - Include boundaries; whether to set each bound as closed or open. New in version 4.0.0.
* **axis** ({0 or ‘index’, 1 or ‘columns’}, default 0) - Determine range time on index or columns value.

### Returns
* **DataFrame** - Data from the original object filtered to the specified time range.

### Raises
* **TypeError** - If the index is not a `DatetimeIndex`

### See Also
* `at_time` - Select values at a particular time of the day.
* `first` - Select initial periods of time series based on a date offset.
* `last` - Select final periods of time series based on a date offset.
* `DatetimeIndex.indexer_between_time` - Get just the index locations for values between particular times of the day.

### Examples
```python
>>> idx = pd.date_range('2018-04-09', periods=4, freq='1D20min')
>>> psdf = ps.DataFrame({'A': [1, 2, 3, 4]}, index=idx)
>>> psdf
                     A
2018-04-09 00:00:00  1
2018-04-10 00:20:00  2
2018-04-11 00:40:00  3
2018-04-12 01:00:00  4
```

```python
>>> psdf.between_time('0:15', '0:45')
                     A
2018-04-10 00:20:00  2
2018-04-11 00:40:00  3
```

You get the times that are _not_ between two times by setting `start_time` later than `end_time`:

```python
>>> psdf.between_time('0:45', '0:15')
                     A
2018-04-09 00:00:00  1
2018-04-12 01:00:00  4
```
```

--------------------------------

### Initialize DataFrames for where() examples

Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.where.html

Sets up two DataFrames, `df1` and `df2`, which are used in subsequent examples to demonstrate the `where` functionality. It also enables the pandas-on-Spark option `compute.ops_on_diff_frames`.

```python
from pyspark.pandas.config import set_option, reset_option
import pyspark.pandas as ps

# Allow operations that combine two different DataFrames
set_option("compute.ops_on_diff_frames", True)
df1 = ps.DataFrame({'A': [0, 1, 2, 3, 4], 'B': [100, 200, 300, 400, 500]})
df2 = ps.DataFrame({'A': [0, -1, -2, -3, -4], 'B': [-100, -200, -300, -400, -500]})
df1
```

--------------------------------

### Import necessary libraries for Pandas UDFs

Source: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html

Before using Pandas UDFs, import pandas and the pandas_udf function. This is a standard setup for most Pandas UDF examples.

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf
```

--------------------------------

### Create DataFrame for Sorting Examples

Source: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/pandas/frame.html

Initializes a pandas-on-Spark DataFrame with specified columns and index for demonstrating sorting functionality.

```python
import pyspark.pandas as ps

df = ps.DataFrame({
    'col1': ['A', 'B', None, 'D', 'C'],
    'col2': [2, 9, 8, 7, 4],
    'col3': [0, 9, 4, 2, 3],
}, columns=['col1', 'col2', 'col3'], index=['a', 'b', 'c', 'd', 'e'])
```
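
The sorting frame above could be exercised roughly as follows. This is a minimal sketch, not taken from the source: it rebuilds the same data with plain pandas so it runs without a Spark session, on the assumption that pandas-on-Spark `sort_values` accepts the same `by`, `ascending`, and `na_position` arguments. The variable names `by_col1`, `by_col2_desc`, and `nulls_first` are illustrative only.

```python
import pandas as pd

# Same data as the pandas-on-Spark frame, built with plain pandas (assumption:
# swapping pd.DataFrame for ps.DataFrame gives the same ordering on Spark).
df = pd.DataFrame({
    'col1': ['A', 'B', None, 'D', 'C'],
    'col2': [2, 9, 8, 7, 4],
    'col3': [0, 9, 4, 2, 3],
}, columns=['col1', 'col2', 'col3'], index=['a', 'b', 'c', 'd', 'e'])

# Ascending sort on one column; missing values go last by default
by_col1 = df.sort_values(by='col1')

# Descending sort on a numeric column
by_col2_desc = df.sort_values(by='col2', ascending=False)

# Ascending sort with missing values placed first
nulls_first = df.sort_values(by='col1', na_position='first')

print(by_col1)
print(by_col2_desc)
print(nulls_first)
```

Each call returns a new frame and leaves `df` unchanged, so the three orderings can be compared side by side.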