### Get Source

Source: https://airbytehq.github.io/PyAirbyte/airbyte/sources/util.html

Initializes and returns an Airbyte Source instance, handling installation and execution methods.

```APIDOC
## GET /api/source

### Description
Initializes and returns an Airbyte Source instance. It automatically selects an appropriate execution method (YAML manifest, PyPI, or Docker) if none is explicitly specified.

### Method
GET

### Endpoint
/api/source

### Parameters
#### Query Parameters
- **name** (string) - Required - The name of the connector.
- **config** (dict) - Optional - The configuration for the connector. If not provided, it must be set later using `set_config`.
- **config_change_callback** (ConfigChangeCallback) - Optional - A callback function to be invoked when the connector's configuration changes.
- **streams** (string | list[string]) - Optional - A list of stream names to select. Use "*" for all streams. If not provided, streams can be selected later using `select_streams()` or `select_all_streams()`.
- **version** (string) - Optional - The connector version. Defaults to the currently installed version, or the latest available version if none is installed. Can be set to "latest" to force the use of the latest version.
- **use_python** (bool | Path | str) - Optional - Specifies the Python interpreter. `True` uses the current interpreter (inferred if `pip_url` is set). `False` uses Docker. A `Path` specifies the interpreter path. A `str` specifies a Python version (e.g., "3.11"), which will be installed by uv if not present.
- **pip_url** (string) - Optional - The pip URL for the connector. If not provided, it is inferred from the connector name.
- **local_executable** (Path | str) - Optional - Path to an already installed connector executable. If set, the connector is assumed to be installed and will be executed using this path. Otherwise, the connector will be installed in a virtual environment.
- **docker_image** (bool | str) - Optional - If set, the connector will be executed using Docker. `True` uses the default image. A string specifies a custom image name. If `version` is also specified and the image name lacks a tag, the version will be appended as a tag (e.g., `my-image:0.1.0`).
- **use_host_network** (boolean) - Optional - Whether to use the host's network. Defaults to false.
- **source_manifest** (bool | dict | Path | str) - Optional - Specifies the source manifest. Can be `True` to use the default manifest, a `dict` for an inline manifest, or a `Path`/`str` pointing to a manifest file.
- **install_if_missing** (boolean) - Optional - Whether to automatically install the connector if it is missing. Defaults to true.
- **install_root** (Path) - Optional - The root directory for connector installations.
- **no_executor** (boolean) - Optional - If true, prevents the automatic selection or creation of an executor. Defaults to false.

### Request Example
```json
{
  "name": "source-stripe",
  "config": {
    "api_key": "sk_test_..."
  },
  "version": "0.1.0",
  "streams": ["customers", "charges"]
}
```

### Response
#### Success Response (200)
- **Source** (object) - An instance of the Airbyte Source.

#### Response Example
```json
{
  "source_instance": ""
}
```
```

--------------------------------

### Get Available Connectors

Source: https://airbytehq.github.io/PyAirbyte/airbyte/registry.html

Returns a sorted list of available connector names, filtered by install type and Docker availability. Defaults to installable connectors.

```python
def get_available_connectors(
    install_type: InstallType | str | None = InstallType.INSTALLABLE,
) -> list[str]:
    """Return a list of all available connectors.

    Connectors will be returned in alphabetical order, with the standard prefix "source-".

    Args:
        install_type: The type of installation for the connector.
            Defaults to `InstallType.INSTALLABLE`.
    """
    if install_type is None or install_type == InstallType.INSTALLABLE:
        # Filter for installable connectors (default behavior).
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            return sorted(_get_registry_cache().keys())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
        return sorted(
            [
                connector_name
                for connector_name, conn_info in _get_registry_cache().items()
                if conn_info.language in {Language.PYTHON, Language.MANIFEST_ONLY}
            ]
        )

    if not isinstance(install_type, InstallType):
        install_type = InstallType(install_type)

    if install_type == InstallType.PYTHON:
        return sorted(
            connector_name
            for connector_name, conn_info in _get_registry_cache().items()
            if conn_info.pypi_package_name is not None
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return sorted(
            connector_name
            for connector_name, conn_info in _get_registry_cache().items()
            if conn_info.language == Language.JAVA
        )

    if install_type in {InstallType.DOCKER, InstallType.ANY}:
        return sorted(_get_registry_cache().keys())

    if install_type == InstallType.YAML:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if InstallType.YAML in conn.install_types
        )

    # pragma: no cover  # Should never be reached.
    raise exc.PyAirbyteInputError(
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )
```

--------------------------------

### Get Available Connectors

Source: https://airbytehq.github.io/PyAirbyte/airbyte/registry.html

Returns a list of all available connectors, optionally filtered by installation type. Connectors are returned in alphabetical order.

```APIDOC
## GET /connectors

### Description
Returns a list of all available connectors, optionally filtered by installation type.

### Method
GET

### Endpoint
/connectors

### Parameters
#### Query Parameters
- **install_type** (InstallType | str | None) - Optional - The type of installation for the connector. Defaults to `InstallType.INSTALLABLE`.

### Response
#### Success Response (200)
- **list[str]** - A list of connector names.

#### Response Example
```json
[
  "source-airbyte-example",
  "source-postgres",
  "destination-snowflake"
]
```
```

--------------------------------

### Get Source

Source: https://airbytehq.github.io/PyAirbyte/airbyte/sources.html

Retrieves a connector by name and version, with options for installation and execution methods.

```APIDOC
## GET /api/v1/sources/get

### Description
Get a connector by name and version.
If an explicit install or execution method is requested (e.g. `local_executable`, `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method. Otherwise, an appropriate method will be selected based on the available connector metadata.

### Method
GET

### Endpoint
/api/v1/sources/get

### Parameters
#### Query Parameters
- **name** (str) - Required - Connector name.
- **config** (dict[str, typing.Any] | None) - Optional - Connector config. If not provided, you need to set it later via the `set_config` method.
- **config_change_callback** (Callable[[dict[str, typing.Any]], None] | None) - Optional - Callback function to be called when the connector config changes.
- **streams** (str | list[str] | None) - Optional - List of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the `select_streams()` or `select_all_streams()` method.
- **version** (str | None) - Optional - Connector version. If not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
- **use_python** (bool | pathlib.Path | str | None) - Optional - Python interpreter specification:
  - `True`: Use the current Python interpreter. (Inferred if `pip_url` is set.)
  - `False`: Use Docker instead.
  - `Path`: Use the interpreter at this path.
  - `str`: Use a specific Python version, e.g. "3.11" or "3.11.10". If the version is not yet installed, it will be installed by uv. (This generally adds less than 3 seconds to install times.)
- **pip_url** (str | None) - Optional - Connector pip URL. If not provided, the pip URL will be inferred from the connector name.
- **local_executable** (pathlib.Path | str | None) - Optional - If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
- **docker_image** (bool | str | None) - Optional - If set, the connector will be executed using Docker. You can specify `True` to use the default image, or a string to specify a custom image name.
- **use_host_network** (bool) - Optional - Default: False - Use the host network for Docker execution.
- **source_manifest** (bool | dict | pathlib.Path | str | None) - Optional - If set, the connector will be executed using the provided source manifest.
- **install_if_missing** (bool) - Optional - Default: True - Install the connector if it is missing.
- **install_root** (pathlib.Path | None) - Optional - The root directory for installing the connector.
- **no_executor** (bool) - Optional - Default: False - Do not use an executor for the connector.

### Response
#### Success Response (200)
- **Source** (Source) - The configured Source object.

#### Response Example
```json
{
  "source_instance": ""
}
```
```

--------------------------------

### Get Available Connectors

Source: https://airbytehq.github.io/PyAirbyte/airbyte/sources/registry.html

Retrieves a list of all available connectors. Connectors are returned in alphabetical order, prefixed with "source-". The behavior depends on whether Docker is installed and on the specified `install_type`.

```python
def get_available_connectors(
    install_type: InstallType | str | None = InstallType.INSTALLABLE,
) -> list[str]:
    """Return a list of all available connectors.

    Connectors will be returned in alphabetical order, with the standard prefix "source-".

    Args:
        install_type: The type of installation for the connector.
            Defaults to `InstallType.INSTALLABLE`.
    """
    if install_type is None or install_type == InstallType.INSTALLABLE:
        # Filter for installable connectors (default behavior).
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            return sorted(_get_registry_cache().keys())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
        return sorted(
            [
                connector_name
                for connector_name, conn_info in _get_registry_cache().items()
                if conn_info.language in {Language.PYTHON, Language.MANIFEST_ONLY}
            ]
        )

    if not isinstance(install_type, InstallType):
        install_type = InstallType(install_type)

    if install_type == InstallType.PYTHON:
        return sorted(
            connector_name
            for connector_name, conn_info in _get_registry_cache().items()
            if conn_info.pypi_package_name is not None
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return sorted(
            connector_name
            for connector_name, conn_info in _get_registry_cache().items()
            if conn_info.language == Language.JAVA
        )

    if install_type in {InstallType.DOCKER, InstallType.ANY}:
        return sorted(_get_registry_cache().keys())

    if install_type == InstallType.YAML:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if InstallType.YAML in conn.install_types
        )

    # pragma: no cover  # Should never be reached.
    raise exc.PyAirbyteInputError(
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )
```

--------------------------------

### List Available Airbyte Connectors

Source: https://airbytehq.github.io/PyAirbyte/airbyte/mcp/registry.html

Use this function to get a list of all available Airbyte connectors. It supports filtering by keyword, connector type (source/destination), and installation type (python, yaml, java, docker). If no install types are specified, all connectors are returned.
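The filtering described above can be pictured as a chain of independent checks that a connector must pass. The sketch below is a minimal, self-contained illustration under stated assumptions, not PyAirbyte's implementation: `ConnectorEntry` and `filter_connectors` are hypothetical names, the connector type is assumed to be read from the `source-`/`destination-` name prefix, and keyword matching is assumed to be a case-insensitive substring test.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass(frozen=True)
class ConnectorEntry:
    """Hypothetical, simplified registry entry (not PyAirbyte's metadata model)."""

    name: str
    install_types: frozenset[str] = field(default_factory=frozenset)


def filter_connectors(
    entries: list[ConnectorEntry],
    keyword_filter: str | None = None,
    connector_type_filter: str | None = None,
    install_types: set[str] | None = None,
) -> list[str]:
    """Return the sorted names of connectors that pass every supplied filter."""
    matches: list[str] = []
    for entry in entries:
        # Assumed: case-insensitive substring match on the connector name.
        if keyword_filter and keyword_filter.lower() not in entry.name.lower():
            continue
        # Assumed: "source"/"destination" is read from the name prefix.
        if connector_type_filter and not entry.name.startswith(f"{connector_type_filter}-"):
            continue
        # When no install types are requested, this check is skipped entirely.
        if install_types and not (install_types & entry.install_types):
            continue
        matches.append(entry.name)
    return sorted(matches)
```

Keeping each filter as an early `continue` means an omitted filter simply never rejects anything, which mirrors the "no install types means all connectors" behavior described above.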
```python
from airbyte.mcp.registry import list_connectors

# List all connectors
all_connectors = list_connectors()

# List only source connectors
source_connectors = list_connectors(connector_type_filter="source")

# List connectors that can be installed via YAML
yaml_connectors = list_connectors(install_types="yaml")

# List connectors with 'postgres' in their name
postgres_connectors = list_connectors(keyword_filter="postgres")
```

--------------------------------

### Failed Job Statuses Example

Source: https://airbytehq.github.io/PyAirbyte/airbyte/cloud/constants.html

An example showing the `FAILED_STATUSES` set with specific `JobStatusEnum` values.

```python
FAILED_STATUSES: set[airbyte_api.models.jobstatusenum.JobStatusEnum] = {, }
"""The set of `.JobStatusEnum` strings that indicate a sync job has failed."""
```

--------------------------------

### Define Connector Installation Types

Source: https://airbytehq.github.io/PyAirbyte/airbyte/registry.html

Defines the different types of installations supported for connectors, such as YAML, Python, Docker, and Java. Use these enums to specify or check how a connector can be installed.

```python
from enum import Enum


class InstallType(str, Enum):
    """The type of installation for a connector."""

    YAML = "yaml"
    """Manifest-only connectors that can be run without Docker."""

    PYTHON = "python"
    """Python-based connectors available via PyPI."""

    DOCKER = "docker"
    """Docker-based connectors (returns all connectors for backward compatibility)."""

    JAVA = "java"
    """Java-based connectors."""

    INSTALLABLE = "installable"
    """Connectors installable in the current environment (environment-sensitive).

    Returns all connectors if Docker is installed, otherwise only Python and YAML.
    """

    ANY = "any"
    """All connectors in the registry (environment-independent)."""
```

--------------------------------

### Get Sync Start Time

Source: https://airbytehq.github.io/PyAirbyte/airbyte/cloud.html

Retrieves the start time of the synchronization job.
```python
@property
def start_time(self) -> datetime:
    ...
```

--------------------------------

### Deploy and Manage Source with CloudWorkspace

Source: https://airbytehq.github.io/PyAirbyte/airbyte/cloud/workspaces.html

Example demonstrating how to initialize a CloudWorkspace, deploy a source, run a check, and permanently delete the source.

```python
import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)

# Deploy a source to the workspace
source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
    name="test-source",
    source=source,
)

# Run a check on the deployed source and raise an exception if the check fails
check_result = deployed_source.check(raise_on_error=True)

# Permanently delete the newly-created source
workspace.permanently_delete_source(deployed_source)
```

--------------------------------

### Get Connector Metadata

Source: https://airbytehq.github.io/PyAirbyte/airbyte/mcp/registry.html

Retrieves metadata for a given connector, including installation details and the configuration specification. Reading the configuration specification requires running the connector, so the connector is installed first if it is not already installed.

```python
with contextlib.suppress(Exception):
    connector_metadata = get_connector_metadata(connector_name)

config_spec_jsonschema: dict[str, Any] | None = None
with contextlib.suppress(Exception):
    # This requires running the connector. Install it if it isn't already installed.
    connector.install()
    config_spec_jsonschema = connector.config_spec

manifest_url = _DEFAULT_MANIFEST_URL.format(
    source_name=connector_name,
    version="latest",
)

return ConnectorInfo(
    connector_name=connector.name,
    connector_metadata=connector_metadata,
    documentation_url=connector.docs_url,
    config_spec_jsonschema=config_spec_jsonschema,
    manifest_url=manifest_url,
)
```

--------------------------------

### Configure Airbyte Cloud Client using Environment Variables

Source: https://airbytehq.github.io/PyAirbyte/airbyte/cloud/client_config.html

This example shows how to configure the client by resolving credentials from environment variables. Ensure `AIRBYTE_CLOUD_CLIENT_ID` and `AIRBYTE_CLOUD_CLIENT_SECRET` are set.

```python
from airbyte.cloud.client_config import CloudClientConfig

# Resolves from AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET,
```

--------------------------------

### Initialize CacheBase

Source: https://airbytehq.github.io/PyAirbyte/airbyte/caches.html

Initializes the cache, sets up SQL processors, and configures catalog and state backends. Ensure necessary configurations are provided.

```python
class CacheBase(SqlConfig, AirbyteWriterInterface):  # noqa: PLR0904
    """Base configuration for a cache.

    Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings
    and basic connectivity to the SQL database.

    The cache is responsible for managing the state of the data synced to the cache, including the
    stream catalog and stream state. The cache also provides the mechanism to read and write data
    to the SQL backend specified in the `SqlConfig` class.
    """

    cache_dir: Path = Field(default=Path(constants.DEFAULT_CACHE_ROOT))
    """The directory to store the cache in."""

    cleanup: bool = TEMP_FILE_CLEANUP
    """Whether to clean up the cache after use."""

    _name: str = PrivateAttr()

    _sql_processor_class: ClassVar[type[SqlProcessorBase]]
    _read_processor: SqlProcessorBase = PrivateAttr()

    _catalog_backend: CatalogBackendBase = PrivateAttr()
    _state_backend: StateBackendBase = PrivateAttr()

    paired_destination_name: ClassVar[str | None] = None
    paired_destination_config_class: ClassVar[type | None] = None

    @property
    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
        """Return a dictionary of destination configuration values."""
        raise NotImplementedError(
            f"The type '{type(self).__name__}' does not define an equivalent destination "
            "configuration."
        )

    def __init__(self, **data: Any) -> None:  # noqa: ANN401
        """Initialize the cache and backends."""
        super().__init__(**data)

        # Create a temporary processor to do the work of ensuring the schema exists
        temp_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
            state_writer=StdOutStateWriter(),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Initialize the catalog and state backends
        self._catalog_backend = SqlCatalogBackend(
            sql_config=self,
            table_prefix=self.table_prefix or "",
        )
        self._state_backend = SqlStateBackend(
            sql_config=self,
            table_prefix=self.table_prefix or "",
        )

        # Now we can create the SQL read processor
        self._read_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
```

--------------------------------

### Colab Cache Usage

Source: https://airbytehq.github.io/PyAirbyte/airbyte.html

Examples of how to get and configure the Colab cache.

```APIDOC
## Colab Cache Usage

### Description
Examples of how to get and configure the Colab cache.

### Default Cache
```python
from airbyte.caches.colab import get_colab_cache

colab_cache = get_colab_cache()
```

### Custom Cache
```python
custom_cache = get_colab_cache(
    cache_name="my_custom_cache",
    sub_dir="Airbyte/custom_cache",
    drive_name="My Company Drive",
)
```
```

--------------------------------

### BigQueryCache Initialization

Source: https://airbytehq.github.io/PyAirbyte/airbyte/caches/bigquery.html

Example of how to initialize the BigQueryCache with project, dataset, and credentials.

```APIDOC
## BigQueryCache Initialization

### Description
Initializes the BigQuery cache with necessary configuration details.

### Method
Constructor

### Endpoint
N/A (Class Initialization)

### Parameters
#### Path Parameters
None

#### Query Parameters
None

#### Request Body
None

### Request Example
```python
import airbyte as ab
from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="myproject",
    dataset_name="mydataset",
    credentials_path="path/to/credentials.json",
)
```

### Response
#### Success Response (200)
N/A (Constructor)

#### Response Example
N/A
```

--------------------------------

### Get Job Start Time

Source: https://airbytehq.github.io/PyAirbyte/airbyte/cloud/sync_results.html

Retrieves the start time of the sync job in UTC. Handles potential ISO format errors by attempting to fetch raw job info.
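The parse-then-fallback flow described here can be sketched with the standard library alone. This is an assumption-laden illustration, not PyAirbyte's code: `datetime.fromisoformat` stands in for `ab_datetime_parse`, and `fetch_raw` is a hypothetical callback standing in for the raw `/jobs/get` request.

```python
from __future__ import annotations

from datetime import datetime, timezone
from typing import Callable


def _as_utc(value: datetime) -> datetime:
    """Treat naive timestamps as UTC and convert aware ones to UTC."""
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value.astimezone(timezone.utc)


def parse_start_time(
    primary: str | None,
    fetch_raw: Callable[[], str | None],
) -> datetime:
    """Parse `primary`; if it is not a valid ISO string, retry with a raw fallback value."""
    try:
        return _as_utc(datetime.fromisoformat(primary))
    except (ValueError, TypeError) as e:
        if "Invalid isoformat string" in str(e):
            # Hypothetical stand-in for re-fetching the raw job record.
            raw_value = fetch_raw()
            if raw_value:
                return _as_utc(datetime.fromisoformat(raw_value))
        # Any other error, or an empty fallback, re-raises the original exception.
        raise
```

Re-raising with a bare `raise` keeps the original traceback, so a caller still sees why the first parse failed when no usable fallback exists.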
```python
@property
def start_time(self) -> datetime:
    """Return the start time of the sync job in UTC."""
    try:
        return ab_datetime_parse(self._fetch_latest_job_info().start_time)
    except (ValueError, TypeError) as e:
        if "Invalid isoformat string" in str(e):
            job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
                api_root=self.workspace.api_root,
                path="/jobs/get",
                json={"id": self.job_id},
                client_id=self.workspace.client_id,
                client_secret=self.workspace.client_secret,
                bearer_token=self.workspace.bearer_token,
            )
            raw_start_time = job_info_raw.get("startTime")
            if raw_start_time:
                return ab_datetime_parse(raw_start_time)
        raise
```

--------------------------------

### CloudWorkspace Configuration and Initialization

Source: https://airbytehq.github.io/PyAirbyte/airbyte/cloud.html

Demonstrates how to initialize a CloudWorkspace instance using environment variables or explicit arguments.

```APIDOC
## CloudWorkspace Initialization

### Description
Initializes a CloudWorkspace instance, automatically resolving credentials and the workspace ID from environment variables or explicit arguments.

### Environment Variables
- `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
- `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for the client credentials flow).
- `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for the client credentials flow).
- `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as an argument).
- `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).

### Arguments
- **workspace_id** (str | None): The workspace ID. If not provided, it will be resolved from the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
- **api_root** (str | None): The API root URL. If not provided, it will be resolved from the `AIRBYTE_CLOUD_API_URL` environment variable, or default to the Airbyte Cloud API.

### Returns
A `CloudWorkspace` instance configured with credentials from the environment.
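### Resolution Sketch
The lookup order described above (explicit argument, then environment variable, then default) can be sketched as follows. This is a hypothetical illustration, not the `from_env` implementation. `resolve_setting`, `resolve_auth`, and `SecretNotFoundError` are invented names, the default API root value is an assumption, and preferring the bearer token over client credentials when both are set is also an assumption.

```python
from __future__ import annotations

import os

# Assumed default API root; verify against PyAirbyte's own constants before relying on it.
DEFAULT_API_ROOT = "https://api.airbyte.com/v1"


class SecretNotFoundError(LookupError):
    """Hypothetical stand-in for `PyAirbyteSecretNotFoundError`."""


def resolve_setting(explicit: str | None, env_var: str, default: str | None = None) -> str:
    """Return the explicit value, else the environment variable, else the default."""
    if explicit:
        return explicit
    value = os.environ.get(env_var) or default
    if not value:
        raise SecretNotFoundError(f"Missing required setting: {env_var}")
    return value


def resolve_auth() -> dict[str, str]:
    """Use a bearer token if present; otherwise require both client credentials."""
    token = os.environ.get("AIRBYTE_CLOUD_BEARER_TOKEN")
    if token:
        return {"bearer_token": token}
    return {
        "client_id": resolve_setting(None, "AIRBYTE_CLOUD_CLIENT_ID"),
        "client_secret": resolve_setting(None, "AIRBYTE_CLOUD_CLIENT_SECRET"),
    }
```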
### Raises
- `PyAirbyteSecretNotFoundError`: If required credentials are not found in the environment.

### Example
```python
# With workspace_id from environment
workspace = CloudWorkspace.from_env()

# With explicit workspace_id
workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
```
```

--------------------------------

### Example Usage of Get Connector Version History

Source: https://airbytehq.github.io/PyAirbyte/airbyte/registry.html

Demonstrates how to call `get_connector_version_history` and iterate through the results to print each version and its release date.

```python
>>> versions = get_connector_version_history("source-faker", num_versions_to_validate=3)
>>> for v in versions[:5]:
...     print(f"{v.version}: {v.release_date}")
```

--------------------------------

### Initialize Cloud Workspace with Client Credentials

Source: https://airbytehq.github.io/PyAirbyte/airbyte/cloud/workspaces.html

Instantiate the `CloudWorkspace` class using your workspace ID, client ID, and client secret for authentication.

```python
workspace = CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)
```

--------------------------------

### Get Start Time

Source: https://airbytehq.github.io/PyAirbyte/airbyte/cloud/sync_results.html

Retrieves the start time of the sync job in UTC. It attempts to parse the start time from the latest job info. If that value is not a valid date string, it falls back to fetching the raw job info.
```python
@property
def start_time(self) -> datetime:
    """Return the start time of the sync job in UTC."""
    try:
        return ab_datetime_parse(self._fetch_latest_job_info().start_time)
    except (ValueError, TypeError) as e:
        if "Invalid isoformat string" in str(e):
            job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
                api_root=self.workspace.api_root,
                path="/jobs/get",
                json={"id": self.job_id},
                client_id=self.workspace.client_id,
                client_secret=self.workspace.client_secret,
                bearer_token=self.workspace.bearer_token,
            )
            raw_start_time = job_info_raw.get("startTime")
            if raw_start_time:
                return ab_datetime_parse(raw_start_time)
        raise
```

--------------------------------

### Run Sync

Source: https://airbytehq.github.io/PyAirbyte/airbyte/cloud.html

This section describes how to start a data synchronization (sync) for an Airbyte connection.

```APIDOC
## POST /connections/{connection_id}/sync

### Description
Starts a data synchronization job for the specified Airbyte connection. Optionally waits for the job to complete.

### Method
POST

### Endpoint
`/connections/{connection_id}/sync`

### Parameters
#### Query Parameters
- **wait** (bool) - Optional - If true, the request will wait for the sync job to complete. Defaults to true.
- **wait_timeout** (int) - Optional - The maximum time in seconds to wait for the sync job to complete. Defaults to 300 seconds.

### Request Body
This endpoint does not require a request body.

### Response
#### Success Response (200)
- **SyncResult** (object) - An object containing the result of the sync operation, including the job ID and status.

#### Response Example
```json
{
  "job_id": "123e4567-e89b-12d3-a456-426614174000",
  "status": "completed"
}
```

### Raises
- **AirbyteWorkspaceMismatchError:** If the connection belongs to a different workspace.
- **AirbyteMissingResourceError:** If the connection doesn't exist.
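### Wait Behavior Sketch
The `wait` / `wait_timeout` behavior described above amounts to polling until the job reaches a final status or the deadline passes. The sketch below is a hypothetical illustration, not PyAirbyte's implementation. `wait_for_job` and its `get_status` callback are invented names, the lowercase status strings are assumptions, and `sleep`/`clock` are injectable so the loop can be tested without real delays.

```python
from __future__ import annotations

import time
from typing import Callable

# Assumed, simplified set of terminal job statuses.
FINAL_STATUSES = {"succeeded", "failed", "cancelled"}


def wait_for_job(
    get_status: Callable[[], str],
    wait_timeout: float = 300,
    poll_interval: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll until the job reaches a final status or `wait_timeout` seconds elapse."""
    deadline = clock() + wait_timeout
    while True:
        status = get_status()
        if status in FINAL_STATUSES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"Job still {status!r} after {wait_timeout} seconds.")
        sleep(poll_interval)
```

Using a monotonic clock avoids false timeouts if the system wall clock is adjusted while waiting.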
```

--------------------------------

### Get Connector

Source: https://airbytehq.github.io/PyAirbyte/airbyte/sources/util.html

Retrieves a connector by name and version, with options for specifying installation and execution methods.

```APIDOC
## Get Connector

### Description
Get a connector by name and version. If an explicit install or execution method is requested (e.g. `local_executable`, `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method. Otherwise, an appropriate method will be selected based on the available connector metadata.

### Method
GET (Conceptual - this describes a function call, not a direct HTTP endpoint)

### Endpoint
N/A (This is a Python function)

### Parameters
#### Path Parameters
None

#### Query Parameters
None

#### Request Body
None

### Request Example
```python
import airbyte as ab

# Explicit install/execution options (`pip_url`, `local_executable`, `docker_image`,
# `source_manifest`) each select a single method, so they are not combined here.
# Omitting them lets PyAirbyte choose the method from the connector metadata.
source = ab.get_source(
    "source-faker",
    config={"count": 100},
    streams="*",
    install_if_missing=True,
)
```

### Response
#### Success Response (200)
- **Source** (object) - An object representing the connector.

#### Response Example
```json
{
  "name": "source-faker",
  "version": "0.1.0",
  "config": { ... },
  "streams": "*"
}
```

### Error Handling
- If the connector is not found or cannot be installed or executed, an appropriate exception will be raised.
```

--------------------------------

### Initialize BigQuery Cache

Source: https://airbytehq.github.io/PyAirbyte/airbyte/caches/bigquery.html

Instantiate the BigQueryCache with the project, dataset, and credentials path. Ensure the credentials file exists and is accessible.
```python
import airbyte as ab
from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="myproject",
    dataset_name="mydataset",
    credentials_path="path/to/credentials.json",
)
```

--------------------------------

### CloudWorkspace Initialization

Source: https://airbytehq.github.io/PyAirbyte/airbyte/cloud/workspaces.html

Demonstrates how to initialize the CloudWorkspace class with different authentication methods.

```APIDOC
## CloudWorkspace Initialization

### Description
Initialize the CloudWorkspace to interact with Airbyte Cloud or self-managed instances.

### Authentication Methods
Two methods are supported (mutually exclusive):
1. OAuth2 client credentials (client_id + client_secret)
2. Bearer token authentication

### Example with Client Credentials
```python
workspace = CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)
```

### Example with Bearer Token
```python
workspace = CloudWorkspace(
    workspace_id="...",
    bearer_token="...",
)
```

### Example with Self-Managed Instance
```python
workspace = CloudWorkspace(
    workspace_id="...",
    api_root="http://localhost:8000",
    client_id="...",
    client_secret="...",
)
```
```

--------------------------------

### Get Sync Job Start Time

Source: https://airbytehq.github.io/PyAirbyte/airbyte/cloud.html

Retrieves the start time of the sync job in UTC. Includes error handling for invalid date formats and a fallback to a raw API request if necessary.
```python
@property
def start_time(self) -> datetime:
    """Return the start time of the sync job in UTC."""
    try:
        return ab_datetime_parse(self._fetch_latest_job_info().start_time)
    except (ValueError, TypeError) as e:
        if "Invalid isoformat string" in str(e):
            job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
                api_root=self.workspace.api_root,
                path="/jobs/get",
                json={"id": self.job_id},
                client_id=self.workspace.client_id,
                client_secret=self.workspace.client_secret,
                bearer_token=self.workspace.bearer_token,
            )
            raw_start_time = job_info_raw.get("startTime")
            if raw_start_time:
                return ab_datetime_parse(raw_start_time)
        raise
```

--------------------------------

### Dataset Initialization and Configuration

Source: https://airbytehq.github.io/PyAirbyte/airbyte/datasets.html

Explains how dataset objects are initialized, including how stream configurations are handled: either retrieved from the cache or explicitly set.

```APIDOC
## Dataset Initialization

### Description
Initializes a dataset object, optionally retrieving the stream configuration from a cache or skipping that lookup.

### Method
`__init__`

### Parameters
#### Path Parameters
None

#### Query Parameters
None

#### Request Body
- **cache** (object) - Required - The cache object containing data and schema information.
- **stream_name** (str) - Required - The name of the stream to associate with this dataset.
- **query_statement** (object) - Required - The SQL query statement to select data.
- **stream_configuration** (object or bool or None) - Optional - Configuration for the stream. If None, the configuration is retrieved from the cache. If False, retrieval is skipped.
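### Stream Configuration Sketch
The three accepted forms of `stream_configuration` can be sketched as a small resolver. This is a hypothetical illustration, not the dataset's actual constructor. `resolve_stream_configuration` is an invented name, a plain dict stands in for the cache's catalog, and returning `None` when the stream is missing from the catalog is an assumption.

```python
from __future__ import annotations

from typing import Any, Literal


def resolve_stream_configuration(
    stream_name: str,
    stream_configuration: dict[str, Any] | Literal[False] | None,
    cached_catalog: dict[str, dict[str, Any]],
) -> dict[str, Any] | None:
    """Resolve the explicit, cached, or skipped forms of a stream configuration."""
    if stream_configuration is False:
        # False: the caller opted out of the lookup entirely.
        return None
    if stream_configuration is None:
        # None: look the stream up in the cache's catalog; a missing entry yields None.
        return cached_catalog.get(stream_name)
    # Anything else is treated as an explicit configuration and used unchanged.
    return stream_configuration
```

Checking `is False` and `is None` separately matters here: both values are falsy, but they mean different things to the caller.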
### Request Example
```json
{
  "cache": "",
  "stream_name": "users",
  "query_statement": "",
  "stream_configuration": null
}
```

### Response
#### Success Response (200)
None (initialization method)

#### Response Example
None
```

--------------------------------

### Get Source API

Source: https://airbytehq.github.io/PyAirbyte/airbyte/experimental.html

Retrieves a connector by name and version, with options for specifying installation and execution methods.

```APIDOC
## GET /api/sources

### Description
Retrieves a connector by name and version. If an explicit install or execution method is requested, the connector will be executed using that method. Otherwise, an appropriate method will be selected based on available connector metadata.

### Method
GET

### Endpoint
/api/sources

### Parameters
#### Query Parameters
- **name** (string) - Required - The name of the connector.
- **config** (object) - Optional - The connector configuration. If not provided, it needs to be set later via the `set_config` method.
- **config_change_callback** (function) - Optional - A callback function to be called when the connector config changes.
- **streams** (string or list[string]) - Optional - A list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, it can be set later via the `select_streams()` or `select_all_streams()` method.
- **version** (string) - Optional - The connector version. If not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. Can also be set to "latest" to force the use of the latest available version.
- **use_python** (boolean or path or string) - Optional - Python interpreter specification: `True` for the current interpreter, `False` for Docker, a path for a specific interpreter, or a string for a specific Python version (e.g., "3.11").
- **pip_url** (string) - Optional - The connector pip URL. If not provided, it will be inferred from the connector name.
- **local_executable** (path or string) - Optional - Path to a local executable for the connector.
- **docker_image** (boolean or string) - Optional - Specifies whether to use a Docker image for the connector.
- **use_host_network** (boolean) - Optional - Whether to use the host network for Docker containers. Defaults to False.
- **source_manifest** (boolean or object or path or string) - Optional - Specifies whether to use a source manifest.
- **install_if_missing** (boolean) - Optional - Whether to install the connector if it is missing. Defaults to True.
- **install_root** (path) - Optional - The root directory for installation.
- **no_executor** (boolean) - Optional - If True, no executor will be used.

### Response
#### Success Response (200)
- **source** (object) - The connector source object.

#### Response Example
```json
{
  "source": ""
}
```
```

--------------------------------

### Deploy Source to Airbyte Cloud Workspace

Source: https://airbytehq.github.io/PyAirbyte/airbyte/cloud/workspaces.html

Initialize a CloudWorkspace with your credentials and deploy a source to it. The deployed source can then be checked for errors.

```python
import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)

# Deploy a source to the workspace
source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
    name="test-source",
    source=source,
)

# Run a check on the deployed source and raise an exception if the check fails
check_result = deployed_source.check(raise_on_error=True)
```

--------------------------------

### Cache Initialization and Configuration

Source: https://airbytehq.github.io/PyAirbyte/airbyte/caches/base.html

Details on how the cache is initialized, including its directory, cleanup settings, and the underlying SQL processor and backends.
```APIDOC
## Cache Class Overview

### Description
The cache is responsible for managing the state of the data synced to the cache, including the stream catalog and stream state. It provides mechanisms to read and write data to the SQL backend specified in the `SqlConfig` class.

### Class Attributes
- **cache_dir** (Path) - The directory to store the cache in.
- **cleanup** (bool) - Whether to clean up the cache after use.
- **_sql_processor_class** (ClassVar[type[SqlProcessorBase]]) - The SQL processor class to use.
- **paired_destination_name** (ClassVar[str | None]) - The name of the paired destination.
- **paired_destination_config_class** (ClassVar[type | None]) - The configuration class for the paired destination.

### Initialization (`__init__`)
Initializes the cache and backends. It creates a temporary processor to ensure the schema exists, then initializes the catalog and state backends, and finally creates the read-only SQL processor.

### Context Manager Support
- **`__enter__`**: Enters the context manager, returning the cache instance.
- **`__exit__`**: Exits the context manager and cleans up resources by calling `close()`.

### Resource Management
- **`close()`**: Closes all database connections and disposes of connection pools. This method is idempotent.
- **`__del__`**: Cleans up resources when the cache is garbage collected, suppressing exceptions.

### Other Methods
- **`paired_destination_config`**: Returns a dictionary of destination configuration values. Raises `NotImplementedError` if not defined by the subclass.
- **`config_hash`**: Returns a hash of the cache configuration.
- **`execute_sql`**: Executes one or more SQL statements against the cache's SQL backend within a single transaction.
```

--------------------------------

### Get Destination

Source: https://airbytehq.github.io/PyAirbyte/airbyte/destinations/util.html

Retrieves a destination connector by its name and version, with options for configuration, execution environment, and installation.

```APIDOC
## GET /api/v1/destinations/get

### Description
Retrieves a destination connector by its name and version. This function allows detailed configuration of how the destination is executed, including the Python interpreter, Docker image, network settings, and installation behavior.

### Method
GET

### Endpoint
/api/v1/destinations/get

### Parameters
#### Query Parameters
- **name** (string) - Required - The name of the connector.
- **config** (dict[str, Any]) - Optional - The configuration for the connector. If not provided, it must be set later using the `set_config` method.
- **config_change_callback** (ConfigChangeCallback) - Optional - A callback function to be invoked when the connector's configuration changes.
- **version** (string) - Optional - The version of the connector to use. If not provided, the currently installed version is used. If no version is installed, the latest available version is used. Can be set to "latest" to force the use of the latest version.
- **use_python** (bool | Path | str) - Optional - Specifies the Python interpreter to use. `True` uses the current interpreter (inferred if `pip_url` is set). `False` uses Docker. A `Path` specifies a direct interpreter path. A `str` specifies a Python version (e.g., "3.11"), which will be installed if missing.
- **pip_url** (string) - Optional - The pip URL for the connector. If not provided, it is inferred from the connector name.
- **local_executable** (Path | str) - Optional - If set, assumes the connector is already installed and uses this path for execution.
- **docker_image** (str | bool) - Optional - If set, executes the connector using Docker. `True` uses the default image; a custom image name can also be provided. If `version` is specified and the image name lacks a tag, the version is appended as a tag.
- **use_host_network** (bool) - Optional - If `True` and `docker_image` is set, uses the host network. Ignored if `docker_image` is not set.
- **install_if_missing** (bool) - Optional - If `True`, installs the connector if it's not available locally. Ignored when `local_executable` is set.
- **install_root** (Path) - Optional - The root directory for creating the virtual environment. Defaults to the current working directory.
- **no_executor** (bool) - Optional - If `True`, uses `NoOpExecutor` to fetch specs from the registry without local installation. Useful for validating configuration before deployment.

### Response
#### Success Response (200)
- **Destination** (Destination) - An instance of the Destination class configured for execution.

#### Response Example
```json
{
  "destination_instance": ""
}
```
```

--------------------------------

### Initialize PostgresCache

Source: https://airbytehq.github.io/PyAirbyte/airbyte/caches/postgres.html

Instantiate the PostgresCache with connection details. Ensure the `POSTGRES_PASSWORD` environment variable is set so the password is not hard-coded.

```python
import airbyte as ab
from airbyte.caches import PostgresCache

cache = PostgresCache(
    host="myhost",
    port=5432,
    username="myusername",
    password=ab.get_secret("POSTGRES_PASSWORD"),
    database="mydatabase",
)
```

--------------------------------

### Get Job Details and Configuration

Source: https://airbytehq.github.io/PyAirbyte/airbyte/cloud/sync_results.html

Access detailed job information, including start time, job URL, and destination configuration.

```APIDOC
## GET /jobs/{job_id}

### Description
Retrieves comprehensive details about an Airbyte sync job, including its start time and associated URLs.

### Method
GET

### Endpoint
/jobs/{job_id}

### Parameters
#### Path Parameters
- **job_id** (string) - Required - The unique identifier of the job.
### Response
#### Success Response (200)
- **job_id** (string) - The unique identifier of the job.
- **start_time** (string) - The start time of the sync job in ISO 8601 format.
- **job_url** (string) - The URL to the job history in the Airbyte Cloud UI.
- **connection_info** (object) - Information about the connection associated with the job.
  - **destination_id** (string) - The ID of the destination.
- **destination_configuration** (object) - The configuration details for the destination.

#### Response Example
```json
{
  "job_id": "123e4567-e89b-12d3-a456-426614174000",
  "start_time": "2023-10-27T10:00:00Z",
  "job_url": "https://cloud.airbyte.com/workspaces/your-workspace-id/connections/your-connection-id/job-history",
  "connection_info": {
    "destination_id": "dest-abcde"
  },
  "destination_configuration": {
    "host": "your-db-host",
    "database": "your-database"
  }
}
```
```

--------------------------------

### Get Destination Connector

Source: https://airbytehq.github.io/PyAirbyte/airbyte/destinations.html

Retrieves a destination connector by name and version. Supports various installation methods, including a Python interpreter, a pip URL, local executables, and Docker images. Use `no_executor=True` to fetch specs without local installation.

```python
# Resolve an executor for the connector (Python, pip URL, local executable, or Docker)
executor = get_connector_executor(
    name=name,
    version=version,
    use_python=use_python,
    pip_url=pip_url,
    local_executable=local_executable,
    docker_image=docker_image,
    use_host_network=use_host_network,
    install_if_missing=install_if_missing,
    install_root=install_root,
    no_executor=no_executor,
)
```

```python
# Wrap the resolved executor in a Destination instance
return Destination(
    name=name,
    config=config,
    config_change_callback=config_change_callback,
    executor=executor,
)
```

--------------------------------

### CloudWorkspace Initialization

Source: https://airbytehq.github.io/PyAirbyte/airbyte/cloud/workspaces.html

Demonstrates how to initialize a CloudWorkspace object, either by inferring the workspace ID from environment variables or by providing it explicitly.
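The precedence described here (an explicit ID wins; otherwise the environment is consulted) can be sketched as a small standalone helper. This is an illustration under assumptions, not PyAirbyte's `from_env()` implementation: `resolve_workspace_id` is hypothetical, and the `AIRBYTE_CLOUD_WORKSPACE_ID` variable name is assumed rather than confirmed by this page.

```python
from __future__ import annotations

import os
from collections.abc import Mapping

# ASSUMPTION: illustrative variable name; check which variable
# CloudWorkspace.from_env() actually reads before relying on it.
WORKSPACE_ID_ENV_VAR = "AIRBYTE_CLOUD_WORKSPACE_ID"


def resolve_workspace_id(
    workspace_id: str | None = None,
    env: Mapping[str, str] | None = None,
) -> str:
    """Hypothetical helper: prefer an explicit ID, else read it from the environment."""
    if workspace_id:
        return workspace_id
    environ = os.environ if env is None else env
    value = environ.get(WORKSPACE_ID_ENV_VAR, "").strip()
    if not value:
        raise ValueError(
            f"No workspace_id given and {WORKSPACE_ID_ENV_VAR} is not set."
        )
    return value


print(resolve_workspace_id("explicit-id", env={}))  # explicit-id
print(resolve_workspace_id(env={WORKSPACE_ID_ENV_VAR: "from-env"}))  # from-env
```

Passing `env` explicitly keeps the helper testable without touching the real process environment.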
```APIDOC
## CloudWorkspace Initialization

### Description
Initialize a `CloudWorkspace` object. The `workspace_id` can be automatically inferred from the environment or provided explicitly.

### Usage
```python
from airbyte.cloud import CloudWorkspace

# With workspace_id from environment
workspace = CloudWorkspace.from_env()

# With explicit workspace_id
workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
```
```

--------------------------------

### Get Job Log

Source: https://airbytehq.github.io/PyAirbyte/airbyte/mcp/cloud.html

Retrieves the log text for a specific job attempt, with options to limit the number of lines and specify the starting point.

```APIDOC
## GET /api/v1/jobs/{job_id}/attempts/{attempt_number}/logs

### Description
Retrieves the log text for a specific job attempt. Allows limiting the number of log lines returned and specifying the starting line or tail.

### Method
GET

### Endpoint
/api/v1/jobs/{job_id}/attempts/{attempt_number}/logs

### Parameters
#### Path Parameters
- **job_id** (integer) - Required - The ID of the job.
- **attempt_number** (integer) - Required - The attempt number for the job.

#### Query Parameters
- **max_lines** (integer) - Optional - The maximum number of log lines to return. If 0, all lines are returned.
- **from_tail** (integer) - Optional - If specified, return the last `from_tail` lines of the log.
- **line_offset** (integer) - Optional - If specified, return lines starting from this offset (0-based index).

### Response
#### Success Response (200)
- **LogReadResult** - An object containing the log text and related metadata.
  - **log_text** (string) - The retrieved log text.
  - **log_text_start_line** (integer) - The 1-based starting line number of the returned log text.
  - **log_text_line_count** (integer) - The number of lines returned in `log_text`.
  - **total_log_lines_available** (integer) - The total number of log lines available for the attempt.
  - **job_id** (integer) - The ID of the job.
  - **attempt_number** (integer) - The attempt number.
#### Response Example
```json
{
  "log_text": "INFO: Starting sync...\nDEBUG: Processing record 1...\nINFO: Sync completed.",
  "log_text_start_line": 1,
  "log_text_line_count": 3,
  "total_log_lines_available": 150,
  "job_id": 123,
  "attempt_number": 1
}
```

#### Error Response (404)
- **message** (string) - "Log not found for job {job_id}, attempt {attempt_number}"

#### Error Response Example
```json
{
  "message": "Log not found for job 123, attempt 1"
}
```
```

--------------------------------

### Source State Provider Initialization

Source: https://airbytehq.github.io/PyAirbyte/airbyte/destinations.html

Combines the cache state provider and the destination state provider into a single `JoinedStateProvider`.

```python
source_state_provider: StateProviderBase
source_state_provider = JoinedStateProvider(
    primary=cache_state_provider,
    secondary=destination_state_provider,
)
```

--------------------------------

### Create BigQuery Client with Credentials

Source: https://airbytehq.github.io/PyAirbyte/airbyte/caches/bigquery.html

Instantiates a BigQuery Python client. If `credentials_path` is set, the client authenticates with that service account key file; otherwise it falls back to the default Google Cloud credentials.

```python
# Imports used by this method
import google.auth
from google.cloud import bigquery
from google.oauth2 import service_account


def get_vendor_client(self) -> bigquery.Client:
    """Return a BigQuery python client."""
    if self.credentials_path:
        # Authenticate with the service account key file
        credentials = service_account.Credentials.from_service_account_file(
            self.credentials_path
        )
    else:
        # Fall back to Application Default Credentials
        credentials, _ = google.auth.default()

    return bigquery.Client(credentials=credentials, location=self.dataset_location)
```

--------------------------------

### Get No-Op Destination

Source: https://airbytehq.github.io/PyAirbyte/airbyte/destinations/util.html

Retrieves a no-operation destination, useful for performance benchmarking without writing data to a real destination. Supports conditional installation.

```python
def get_noop_destination(install_if_missing: bool = True) -> Destination:
    """Get a devnull (no-op) destination.

    This is useful for performance benchmarking of sources, without
    adding the overhead of writing data to a real destination.
    """
    return get_destination(
        "destination-dev-null",
        config={
            "test_destination": {
                "test_destination_type": "SILENT",
            }
        },
        docker_image=True,
        install_if_missing=install_if_missing,
    )
```

--------------------------------

### Enter Context Manager

Source: https://airbytehq.github.io/PyAirbyte/airbyte/caches/base.html

Enters the context manager for the SQL cache.

```python
def __enter__(self) -> Self:
    """Enter context manager."""
    return self
```
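The cache overview lists a lifecycle: `__enter__` returns the instance, `__exit__` calls `close()`, `close()` is idempotent, `__del__` suppresses cleanup errors, and `execute_sql` runs its statements in one transaction. A self-contained sketch of that pattern, backed by the standard-library `sqlite3` module, is below. `ToyCache` is a hypothetical stand-in for illustration only, not PyAirbyte's cache base class.

```python
from __future__ import annotations

import contextlib
import sqlite3
from types import TracebackType


class ToyCache:
    """Hypothetical stand-in that mirrors the documented cache lifecycle."""

    def __init__(self, db_path: str = ":memory:") -> None:
        # isolation_level=None keeps sqlite3 in autocommit mode, so the
        # transaction boundaries in execute_sql are managed explicitly.
        self._conn: sqlite3.Connection | None = sqlite3.connect(
            db_path, isolation_level=None
        )

    @property
    def is_closed(self) -> bool:
        return self._conn is None

    def _require_conn(self) -> sqlite3.Connection:
        if self._conn is None:
            raise RuntimeError("Cache is closed.")
        return self._conn

    def execute_sql(self, *statements: str) -> None:
        """Run all statements in a single transaction; roll back if any fails."""
        conn = self._require_conn()
        conn.execute("BEGIN")
        try:
            for statement in statements:
                conn.execute(statement)
        except Exception:
            conn.execute("ROLLBACK")
            raise
        conn.execute("COMMIT")

    def query(self, sql: str) -> list[tuple]:
        return self._require_conn().execute(sql).fetchall()

    def close(self) -> None:
        """Close the connection. Idempotent: extra calls are no-ops."""
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def __enter__(self) -> ToyCache:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        self.close()

    def __del__(self) -> None:
        # Never let cleanup errors escape during garbage collection.
        with contextlib.suppress(Exception):
            self.close()


with ToyCache() as cache:
    cache.execute_sql("CREATE TABLE users (id INTEGER)", "INSERT INTO users VALUES (1)")
    with contextlib.suppress(sqlite3.OperationalError):
        # The second statement fails (no such table), so the first insert is rolled back too.
        cache.execute_sql("INSERT INTO users VALUES (2)", "INSERT INTO missing VALUES (3)")
    print(cache.query("SELECT id FROM users"))  # [(1,)]
cache.close()  # already closed by __exit__; calling again is safe
```

Making `close()` idempotent is what lets `__exit__`, an explicit `close()`, and `__del__` all call it without coordinating with each other.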