### Install Keboola Query Service Python SDK Source: https://github.com/keboola/query-service-api-python-sdk/blob/main/README.md Installs the Keboola Query Service Python SDK using pip. This is the first step to integrating with the Keboola Query Service API. ```bash pip install keboola-query-service ``` -------------------------------- ### Execute Query using Context Manager (Simplified) in Python Source: https://github.com/keboola/query-service-api-python-sdk/blob/main/README.md A simplified example demonstrating the use of a context manager for the `Client`. The context manager ensures the client is automatically closed upon exiting the `with` block, simplifying resource management. ```python from keboola_query_service import Client # Context manager automatically closes the client with Client(base_url="https://query.keboola.com", token="...") as client: results = client.execute_query( branch_id="1261313", workspace_id="2950146661", statements=["SELECT 1 as test"] ) print(results[0].data) # [['1']] ``` -------------------------------- ### Submit and Monitor Job Status with Python SDK Source: https://github.com/keboola/query-service-api-python-sdk/blob/main/README.md Provides examples of using the low-level API to submit a query job without waiting for completion, check its status, wait for it to finish, and retrieve results for a specific statement. This offers more granular control over job execution. ```python # Submit job without waiting job_id = client.submit_job( branch_id="123", workspace_id="456", statements=["SELECT * FROM large_table"] ) # Check status status = client.get_job_status(job_id) print(f"Status: {status.status}") # created, enqueued, processing, completed, failed # Wait for completion final_status = client.wait_for_job(job_id, max_wait_time=300) # Get results for specific statement result = client.get_job_results(job_id, final_status.statements[0].id) ``` -------------------------------- ### Manage Query Job Status and Results Source: https://github.com/keboola/query-service-api-python-sdk/blob/main/README.md Provides examples of methods for managing the lifecycle of submitted query jobs. This includes checking the job status with `get_job_status()`, retrieving the final results with `get_job_results()`, and waiting for completion with `wait_for_job()`. ```python # Assuming 'client' is an initialized Client object and 'job_id' is known # status = client.get_job_status(job_id=job_id) # results = client.get_job_results(job_id=job_id) # client.wait_for_job(job_id=job_id) ``` -------------------------------- ### Retrieve Query History using Python SDK Source: https://context7.com/keboola/query-service-api-python-sdk/llms.txt Shows how to fetch historical query execution records for a specific workspace using the Keboola Query Service Python SDK. The example covers retrieving recent history, paginating through results, and accessing details of each statement. ```python from keboola_query_service import Client with Client(base_url="https://query.keboola.com", token="your-token") as client: # Get recent query history history = client.get_query_history( branch_id="1261313", workspace_id="2950146661", page_size=100 ) print(f"Retrieved {len(history.statements)} historical statements") for stmt in history.statements: print(f"\nJob: {stmt.query_job_id}") print(f"Statement ID: {stmt.id}") print(f"Query: {stmt.query[:80]}...") print(f"Status: {stmt.status}") print(f"Warehouse: {stmt.warehouse}") print(f"Backend size: {stmt.backend_size}") print(f"Rows affected: {stmt.rows_affected}") print(f"Created: {stmt.created_at}") print(f"Executed: {stmt.executed_at}") print(f"Completed: {stmt.completed_at}") if stmt.error: print(f"Error: {stmt.error}") # Paginate through history all_statements = [] after_id = None while True: page = client.get_query_history( branch_id="1261313", workspace_id="2950146661", after_id=after_id, page_size=500 ) all_statements.extend(page.statements) if len(page.statements) < 500: break after_id = page.statements[-1].id print(f"Total historical statements: {len(all_statements)}") ``` -------------------------------- ### Get Job Status Source: https://context7.com/keboola/query-service-api-python-sdk/llms.txt Retrieve the current status and progress of a submitted job. Allows inspection of individual statement statuses and error details. ```APIDOC ## GET /jobs/{job_id}/status ### Description Retrieves the status and details of a specific job. ### Method GET ### Endpoint /jobs/{job_id}/status ### Parameters #### Path Parameters - **job_id** (string) - Required - The ID of the job to check. ### Response #### Success Response (200) - **query_job_id** (string) - The ID of the job. - **status** (string) - The current status of the job (e.g., created, enqueued, processing, completed, failed, canceled). - **created_at** (string) - Timestamp when the job was created. - **changed_at** (string) - Timestamp when the job status was last changed. - **statements** (array of objects) - Details about each statement within the job. - **id** (string) - The ID of the statement. - **query** (string) - The SQL query executed. - **status** (string) - The status of the individual statement. - **rows_affected** (integer) - The number of rows affected by the statement. - **number_of_rows** (integer) - The number of rows returned by the statement. - **error** (object or null) - Error details if the statement failed. #### Response Example ```json { "query_job_id": "job_12345678", "status": "completed", "created_at": "2023-10-27T10:00:00Z", "changed_at": "2023-10-27T10:05:00Z", "statements": [ { "id": "stmt_1", "query": "SELECT * FROM large_dataset WHERE year = 2024", "status": "completed", "rows_affected": 100, "number_of_rows": 100, "error": null } ] } ``` ``` -------------------------------- ### Get Job Results Source: https://context7.com/keboola/query-service-api-python-sdk/llms.txt Retrieves query results for a specific statement within a job, supporting pagination for large datasets. Provides access to data, column metadata, and total row count. ```APIDOC ## Get Job Results ### Description Retrieve query results for a specific statement with pagination. ### Method (Implied by SDK function `get_job_results`) ### Endpoint (Internal to SDK, interacts with job results endpoints) ### Parameters #### Path Parameters None #### Query Parameters - **query_job_id** (string) - Required - The ID of the job. - **statement_id** (string) - Required - The ID of the statement within the job. - **offset** (integer) - Optional - The starting row offset for pagination. Defaults to 0. - **page_size** (integer) - Optional - The number of rows to retrieve per page. Defaults to 500. ### Request Example ```python from keboola_query_service import Client with Client(base_url="https://query.keboola.com", token="your-token") as client: job_id = "job_12345678" statement_id = "stmt_87654321" result = client.get_job_results( query_job_id=job_id, statement_id=statement_id, offset=0, page_size=500 ) print(f"Status: {result.status}") print(f"Rows in page: {len(result.data)}") print(f"Total rows: {result.number_of_rows}") for col in result.columns: print(f"Column: {col.name}, Type: {col.type}, Nullable: {col.nullable}, Length: {col.length}") for row in result.data: print(row) # Paginate through large result sets all_data = [] offset = 0 page_size = 1000 while True: page = client.get_job_results(job_id, statement_id, offset=offset, page_size=page_size) all_data.extend(page.data) if len(page.data) < page_size: break offset += page_size print(f"Retrieved total of {len(all_data)} rows") ``` ### Response #### Success Response (200) - **status** (string) - The status of the statement (e.g., 'success'). - **data** (list) - A list of rows, where each row is a list of values. - **columns** (list) - A list of column metadata objects, each with 'name', 'type', 'nullable', and 'length'. - **number_of_rows** (integer) - The total number of rows for the statement. #### Response Example ```json { "status": "success", "data": [ [1, "Alice", 30], [2, "Bob", 25] ], "columns": [ {"name": "id", "type": "INTEGER", "nullable": false, "length": null}, {"name": "name", "type": "VARCHAR", "nullable": true, "length": 50}, {"name": "age", "type": "INTEGER", "nullable": true, "length": null} ], "number_of_rows": 1000 } ``` ``` -------------------------------- ### Get Query History Source: https://context7.com/keboola/query-service-api-python-sdk/llms.txt Retrieves historical records of query executions within a specified workspace. Supports pagination to fetch large sets of historical data. ```APIDOC ## Get Query History ### Description Retrieve historical query execution records for a workspace. ### Method GET ### Endpoint /branches/{branch_id}/workspaces/{workspace_id}/history ### Parameters #### Path Parameters - **branch_id** (string) - Required - The ID of the branch. - **workspace_id** (string) - Required - The ID of the workspace. #### Query Parameters - **page_size** (integer) - Optional - The number of historical statements to return per page. Defaults to 100. - **after_id** (string) - Optional - Returns statements after the specified statement ID for pagination. ### Request Example ```python from keboola_query_service import Client with Client(base_url="https://query.keboola.com", token="your-token") as client: history = client.get_query_history( branch_id="1261313", workspace_id="2950146661", page_size=100 ) for stmt in history.statements: print(f"Job: {stmt.query_job_id}, Status: {stmt.status}") ``` ### Response #### Success Response (200) - **statements** (array) - A list of historical query statement objects. Each object contains details like `query_job_id`, `id`, `query`, `status`, `warehouse`, `backend_size`, `rows_affected`, `created_at`, `executed_at`, `completed_at`, and `error` if applicable. #### Response Example ```json { "statements": [ { "query_job_id": "job123", "id": "stmt456", "query": "SELECT * FROM my_table", "status": "success", "warehouse": "small", "backend_size": "medium", "rows_affected": 100, "created_at": "2023-10-27T10:00:00Z", "executed_at": "2023-10-27T10:01:00Z", "completed_at": "2023-10-27T10:02:00Z", "error": null } ] } ``` ``` -------------------------------- ### Get Job Status with Python Source: https://context7.com/keboola/query-service-api-python-sdk/llms.txt Retrieves the current status and progress of a submitted query job. It takes a job ID as input and returns a status object containing details like state, creation time, and individual statement statuses. This function is crucial for monitoring job execution and identifying any errors. ```python from keboola_query_service import Client, JobState, StatementState with Client(base_url="https://query.keboola.com", token="your-token") as client: job_id = "job_12345678" # Get current status status = client.get_job_status(job_id) print(f"Job: {status.query_job_id}") print(f"Status: {status.status}" ) # created, enqueued, processing, completed, failed, canceled print(f"Created: {status.created_at}") print(f"Changed: {status.changed_at}") # Check if job is done if status.status.is_terminal(): print("Job completed!") # Examine individual statements for stmt in status.statements: print(f"\nStatement {stmt.id}:") print(f" Query: {stmt.query[:50]}...") print(f" Status: {stmt.status}") print(f" Rows affected: {stmt.rows_affected}") print(f" Number of rows: {stmt.number_of_rows}") if stmt.error: print(f" Error: {stmt.error}") # Check for failures if status.status == JobState.FAILED: failed_stmts = status.get_failed_statements() first_error = status.get_first_error() print(f"Job failed: {first_error}") for stmt in failed_stmts: print(f"Failed statement: {stmt.query} - {stmt.error}") ``` -------------------------------- ### Initialize Keboola Query Service API Client Source: https://github.com/keboola/query-service-api-python-sdk/blob/main/README.md Demonstrates how to instantiate the `Client` object for the Keboola Query Service API. Key parameters include the base URL, authentication token, request/connection timeouts, maximum retry attempts, and a custom user agent. ```python client = Client( base_url="https://query.keboola.com", token="your-token", timeout=120.0, # Request timeout (seconds) connect_timeout=10.0, # Connection timeout (seconds) max_retries=3, # Max retry attempts user_agent="my-app/1.0", # Custom user agent ) ``` -------------------------------- ### Initialize and Execute a Query with Python SDK Source: https://github.com/keboola/query-service-api-python-sdk/blob/main/README.md Demonstrates initializing the Keboola Query Service client with a base URL and token, then executing a SQL query. It shows how to process the returned results, including column names and data. The client should be closed after use. ```python from keboola_query_service import Client # Initialize client # IMPORTANT: Use query.keboola.com (NOT connection.keboola.com) # Don't append /api/v1 - the SDK handles routing automatically client = Client( base_url="https://query.keboola.com", # Query Service URL token="your-storage-api-token" # Your Keboola Storage API token ) # Execute a query # - branch_id: Find in Keboola UI URL or via Storage API # - workspace_id: Your workspace ID from Keboola results = client.execute_query( branch_id="1261313", workspace_id="2950146661", statements=["SELECT * FROM my_table LIMIT 10"] ) # Process results - one QueryResult per statement for result in results: print("Columns:", [col.name for col in result.columns]) print("Data:", result.data) # Always close the client when done client.close() ``` -------------------------------- ### Initialize Keboola Query Service Client in Python Source: https://context7.com/keboola/query-service-api-python-sdk/llms.txt Demonstrates how to initialize the Keboola Query Service client with basic and advanced configurations. It covers setting base URL, API token, timeouts, retries, and using context managers for automatic resource cleanup. Dependencies include the 'keboola_query_service' library. ```python from keboola_query_service import Client # Basic initialization client = Client( base_url="https://query.keboola.com", token="your-storage-api-token" ) # Advanced configuration with custom timeouts and retries client = Client( base_url="https://query.keboola.com", token="your-storage-api-token", timeout=120.0, # Request timeout in seconds connect_timeout=10.0, # Connection timeout in seconds max_retries=3, # Maximum retry attempts for failed requests user_agent="my-app/1.0" # Custom user agent string ) # Best practice: use context manager for automatic cleanup with Client(base_url="https://query.keboola.com", token="your-token") as client: # Client automatically closes when exiting context results = client.execute_query( branch_id="1261313", workspace_id="2950146661", statements=["SELECT 1"] ) ``` -------------------------------- ### Client Initialization Source: https://context7.com/keboola/query-service-api-python-sdk/llms.txt Initializes the Query Service client with authentication and configuration settings. It's recommended to use the client as a context manager for automatic resource cleanup. ```APIDOC ## Client Initialization ### Description Initialize the Query Service client with authentication and configuration settings. Best practice is to use the client as a context manager for automatic cleanup. ### Method `Client()` ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body None ### Request Example ```python from keboola_query_service import Client # Basic initialization client = Client( base_url="https://query.keboola.com", token="your-storage-api-token" ) # Advanced configuration with custom timeouts and retries client = Client( base_url="https://query.keboola.com", token="your-storage-api-token", timeout=120.0, # Request timeout in seconds connect_timeout=10.0, # Connection timeout in seconds max_retries=3, # Maximum retry attempts for failed requests user_agent="my-app/1.0" # Custom user agent string ) # Best practice: use context manager for automatic cleanup with Client(base_url="https://query.keboola.com", token="your-token") as client: # Client automatically closes when exiting context results = client.execute_query( branch_id="1261313", workspace_id="2950146661", statements=["SELECT 1"] ) ``` ### Response #### Success Response (200) - **client** (Client) - An initialized client object. #### Response Example ```json // Client object initialization does not return a JSON response directly. // The example above shows Python code for initialization. ``` ``` -------------------------------- ### Execute Query using Context Manager in Python Source: https://github.com/keboola/query-service-api-python-sdk/blob/main/README.md Shows how to use the `Client` with a context manager (`with` statement) for automatic client closure. This is the recommended approach for managing client resources, ensuring the client is properly closed even if errors occur. ```python from keboola_query_service import Client with Client(base_url="https://query.keboola.com", token="...") as client: # Execute query and wait for results results = client.execute_query( branch_id="123", workspace_id="456", statements=[ "SELECT * FROM orders WHERE date > '2024-01-01'", "SELECT COUNT(*) FROM customers" ], transactional=True # Execute in a transaction ) # Results is a list - one QueryResult per statement orders_result = results[0] count_result = results[1] print(f"Columns: {[c.name for c in orders_result.columns]}") print(f"Rows: {len(orders_result.data)}") ``` -------------------------------- ### Async Query Execution with Python SDK Source: https://github.com/keboola/query-service-api-python-sdk/blob/main/README.md Demonstrates asynchronous query execution using `async` and `await` with the Keboola Query Service SDK. It utilizes `async with` for client management and `execute_query_async` for non-blocking query execution. ```python import asyncio from keboola_query_service import Client async def main(): async with Client(base_url="https://query.keboola.com", token="...") as client: results = await client.execute_query_async( branch_id="1261313", workspace_id="2950146661", statements=["SELECT 1 as test"] ) print(results[0].data) asyncio.run(main()) ``` -------------------------------- ### Retrieve Query History in Python Source: https://github.com/keboola/query-service-api-python-sdk/blob/main/README.md Shows how to fetch the query history for a given branch and workspace using the SDK. It demonstrates pagination with `page_size` and iterating through the historical statements to display their job ID, query snippet, and status. ```python history = client.get_query_history( branch_id="123", workspace_id="456", page_size=100 ) for stmt in history.statements: print(f"{stmt.query_job_id}: {stmt.query[:50]}... ({stmt.status})") ``` -------------------------------- ### Handle Common Errors with Python SDK Source: https://github.com/keboola/query-service-api-python-sdk/blob/main/README.md Demonstrates error handling for various exceptions that can occur during query execution, including authentication issues, validation errors, job failures, and timeouts. Specific exception types like `AuthenticationError`, `ValidationError`, `JobError`, and `TimeoutError` are shown. ```python from keboola_query_service import ( Client, AuthenticationError, ValidationError, JobError, TimeoutError, ) try: results = client.execute_query(...) except AuthenticationError: print("Invalid token") except ValidationError as e: print(f"Invalid request: {e.message}") except JobError as e: print(f"Query failed: {e.message}") for stmt in e.failed_statements: print(f" Statement {stmt['id']}: {stmt['error']}") except TimeoutError as e: print(f"Job {e.job_id} timed out") ``` -------------------------------- ### Submit and Execute a Query Source: https://github.com/keboola/query-service-api-python-sdk/blob/main/README.md This snippet showcases the `execute_query()` method, which submits a SQL query to the Keboola Query Service, waits for its completion, and then returns the results. It's a synchronous operation. ```python # Assuming 'client' is an initialized Client object # results = client.execute_query(sql="SELECT * FROM my_table") ``` -------------------------------- ### Handle Query Service Errors using Python SDK Source: https://context7.com/keboola/query-service-api-python-sdk/llms.txt Illustrates robust error handling for various exceptions raised by the Keboola Query Service Python SDK, including authentication, validation, not found, job, timeout, and general service errors. ```python from keboola_query_service import ( Client, AuthenticationError, ValidationError, NotFoundError, JobError, JobTimeoutError, QueryServiceError, ) with Client(base_url="https://query.keboola.com", token="your-token") as client: try: results = client.execute_query( branch_id="1261313", workspace_id="2950146661", statements=["SELECT * FROM nonexistent_table"] ) except AuthenticationError as e: # Invalid or expired token (401) print(f"Authentication failed: {e.message}") print(f"Status code: {e.status_code}") print(f"Exception ID: {e.exception_id}") # Re-authenticate or refresh token except ValidationError as e: # Invalid request parameters (400) print(f"Validation error: {e.message}") print(f"Context: {e.context}") # Check branch_id, workspace_id, or statement syntax except NotFoundError as e: # Resource not found (404) print(f"Not found: {e.message}") # Verify job ID, workspace ID, or branch ID exists except JobError as e: # Query execution failed print(f"Query job {e.job_id} failed: {e.message}") print(f"Failed statements:") for failed_stmt in e.failed_statements: print(f" Statement {failed_stmt['id']}: {failed_stmt['error']}") # Fix SQL syntax or logic errors except JobTimeoutError as e: # Job exceeded max wait time print(f"Job {e.job_id} timed out: {e.message}") # Increase max_wait_time or optimize query # Optionally cancel: client.cancel_job(e.job_id) except QueryServiceError as e: # Generic error (5xx, network, etc.) print(f"Query service error: {e.message}") print(f"Status: {e.status_code}") print(f"Exception ID: {e.exception_id}") print(f"Context: {e.context}") # Retry logic already applied, check service status ``` -------------------------------- ### Execute SQL Query Synchronously in Python Source: https://context7.com/keboola/query-service-api-python-sdk/llms.txt Shows how to execute SQL queries synchronously using the Keboola Query Service Python SDK. This includes running single or multiple statements transactionally, retrieving results, and processing column information and row data. Requires an initialized client and appropriate workspace/branch IDs. Handles potential transaction failures with `max_wait_time`. ```python from keboola_query_service import Client with Client(base_url="https://query.keboola.com", token="your-token") as client: # Execute single statement results = client.execute_query( branch_id="1261313", workspace_id="2950146661", statements=["SELECT id, name, email FROM customers WHERE created_at > '2024-01-01' LIMIT 100"] ) # Access results - returns list of QueryResult objects result = results[0] print(f"Columns: {[col.name for col in result.columns]}") print(f"Types: {[col.type for col in result.columns]}") print(f"Rows: {len(result.data)}") for row in result.data: print(row) # ['1', 'John Doe', 'john@example.com'] # Execute multiple statements transactionally results = client.execute_query( branch_id="1261313", workspace_id="2950146661", statements=[ "CREATE TEMP TABLE tmp_orders AS SELECT * FROM orders WHERE status = 'pending'", "SELECT COUNT(*) FROM tmp_orders", "SELECT product_id, SUM(amount) as total FROM tmp_orders GROUP BY product_id" ], transactional=True, # All statements succeed or all fail max_wait_time=600.0 # Wait up to 10 minutes ) # Process each statement result count_result = results[1] print(f"Pending orders: {count_result.data[0][0]}") summary_result = results[2] for product_id, total in summary_result.data: print(f"Product {product_id}: ${total}") ``` -------------------------------- ### Synchronous Query Execution with Error Handling (Python) Source: https://context7.com/keboola/query-service-api-python-sdk/llms.txt Shows how to execute multi-statement queries synchronously using the Keboola Query Service API Python SDK. It includes best practices for client initialization, transactional queries, and robust error handling for JobError, JobTimeoutError, and QueryServiceError. Results are processed after successful execution. ```python import os import asyncio from keboola_query_service import ( Client, JobError, JobTimeoutError, QueryServiceError, ) def synchronous_example(): """Synchronous query execution with error handling.""" # Use environment variables for configuration base_url = "https://query.keboola.com" token = os.environ["KBC_TOKEN"] branch_id = os.environ["BRANCH_ID"] workspace_id = os.environ["WORKSPACE_ID"] # Use context manager for automatic resource cleanup with Client( base_url=base_url, token=token, timeout=120.0, max_retries=3 ) as client: try: # Execute multi-statement query transactionally results = client.execute_query( branch_id=branch_id, workspace_id=workspace_id, statements=[ "CREATE TEMP TABLE analysis AS SELECT * FROM sales WHERE year = 2024", "SELECT region, SUM(revenue) as total_revenue FROM analysis GROUP BY region", "SELECT product_id, AVG(price) as avg_price FROM analysis GROUP BY product_id" ], transactional=True, max_wait_time=300.0 ) # Process results regional_summary = results[1] print("Regional Revenue:") for region, revenue in regional_summary.data: print(f" {region}: ${revenue}") product_pricing = results[2] print("\nProduct Pricing:") for product_id, avg_price in product_pricing.data: print(f" Product {product_id}: ${avg_price}") return results except JobError as e: print(f"Query failed: {e.message}") for stmt in e.failed_statements: print(f"Failed: {stmt['error']}") raise except JobTimeoutError as e: print(f"Query timed out: {e.job_id}") client.cancel_job(e.job_id) raise except QueryServiceError as e: print(f"Service error: {e.message} (code: {e.status_code})") raise if __name__ == "__main__": # Run synchronous example synchronous_example() ``` -------------------------------- ### Submit a Query Job Asynchronously Source: https://github.com/keboola/query-service-api-python-sdk/blob/main/README.md Illustrates how to use the `submit_job()` method to submit a SQL query without waiting for it to complete. This is useful for long-running queries where you want to manage the job status and results retrieval separately. ```python # Assuming 'client' is an initialized Client object # job = client.submit_job(sql="SELECT COUNT(*) FROM another_table") ``` -------------------------------- ### Error Handling Source: https://context7.com/keboola/query-service-api-python-sdk/llms.txt Provides guidance on handling various exceptions that can occur during API interactions, including authentication, validation, not found, job-specific, timeout, and general service errors. ```APIDOC ## Error Handling ### Description Handle specific error types with detailed exception information. ### Method All (applies to various operations like job submission, cancellation, etc.) ### Endpoint N/A (This section describes exception handling) ### Parameters N/A ### Request Example ```python from keboola_query_service import ( Client, AuthenticationError, ValidationError, NotFoundError, JobError, JobTimeoutError, QueryServiceError, ) with Client(base_url="https://query.keboola.com", token="your-token") as client: try: client.execute_query( branch_id="1261313", workspace_id="2950146661", statements=["SELECT * FROM nonexistent_table"] ) except AuthenticationError as e: print(f"Authentication failed: {e.message}") except ValidationError as e: print(f"Validation error: {e.message}") except NotFoundError as e: print(f"Not found: {e.message}") except JobError as e: print(f"Query job {e.job_id} failed: {e.message}") except JobTimeoutError as e: print(f"Job {e.job_id} timed out: {e.message}") except QueryServiceError as e: print(f"Query service error: {e.message}") ``` ### Response #### Success Response (N/A - This section describes error exceptions) #### Response Example (Error response examples vary based on exception type. See individual exception descriptions below.) **Common Exception Attributes:** - **message** (string) - A human-readable error message. - **status_code** (integer) - The HTTP status code associated with the error. - **exception_id** (string) - A unique identifier for the error. - **context** (object/string) - Additional context about the error. **Specific Exception Types:** - **AuthenticationError**: Raised for authentication issues (e.g., invalid token). - **ValidationError**: Raised for invalid request parameters. - **NotFoundError**: Raised when a requested resource (e.g., job, workspace) is not found. - **JobError**: Raised when a query job fails during execution. May include `failed_statements`. - **JobTimeoutError**: Raised when a job exceeds its maximum allowed execution time. - **QueryServiceError**: A general error for other issues like network problems or server errors (5xx). ``` -------------------------------- ### Asynchronous Query Execution with Concurrent Operations (Python) Source: https://context7.com/keboola/query-service-api-python-sdk/llms.txt Demonstrates how to execute multiple independent queries concurrently using asynchronous capabilities of the Keboola Query Service API Python SDK. It utilizes `asyncio.gather` for parallel execution and includes error handling for exceptions during the process. Results are aggregated and processed after all concurrent operations complete. ```python import os import asyncio from keboola_query_service import ( Client, JobError, JobTimeoutError, QueryServiceError, ) async def asynchronous_example(): """Async query execution with concurrent operations.""" base_url = "https://query.keboola.com" token = os.environ["KBC_TOKEN"] branch_id = os.environ["BRANCH_ID"] workspace_id = os.environ["WORKSPACE_ID"] async with Client(base_url=base_url, token=token) as client: # Run multiple independent queries concurrently results = await asyncio.gather( client.execute_query_async( branch_id=branch_id, workspace_id=workspace_id, statements=["SELECT COUNT(*) FROM orders WHERE status = 'completed'"] ), client.execute_query_async( branch_id=branch_id, workspace_id=workspace_id, statements=["SELECT SUM(amount) FROM payments WHERE date > CURRENT_DATE - 7"] ), client.execute_query_async( branch_id=branch_id, workspace_id=workspace_id, statements=["SELECT COUNT(DISTINCT customer_id) FROM sessions WHERE timestamp > CURRENT_TIMESTAMP - INTERVAL '1 hour'"] ), return_exceptions=True ) # Process results with error handling completed_orders = results[0][0].data[0][0] if not isinstance(results[0], Exception) else 0 weekly_revenue = results[1][0].data[0][0] if not isinstance(results[1], Exception) else 0 active_users = results[2][0].data[0][0] if not isinstance(results[2], Exception) else 0 print(f"Completed orders: {completed_orders}") print(f"Weekly revenue: ${weekly_revenue}") print(f"Active users (1h): {active_users}") return { "orders": completed_orders, "revenue": weekly_revenue, "active_users": active_users } if __name__ == "__main__": # Run asynchronous example asyncio.run(asynchronous_example()) ``` -------------------------------- ### Retrieve Query History and Stream Results Source: https://github.com/keboola/query-service-api-python-sdk/blob/main/README.md Shows how to access historical query information using `get_query_history()` and how to efficiently process large result sets by streaming them as NDJSON using `stream_results()`. ```python # Assuming 'client' is an initialized Client object # history = client.get_query_history() # for row in client.stream_results(query_id="some-query-id"): # print(row) ``` -------------------------------- ### Submit Query Job with Python Source: https://context7.com/keboola/query-service-api-python-sdk/llms.txt Submits a SQL query job for background execution without waiting for completion. This allows for manual job management. The function requires branch and workspace IDs, and a list of statements. It supports transactional execution and specifies the actor type (USER or SYSTEM). An asynchronous version is also available. ```python from keboola_query_service import Client, ActorType with Client(base_url="https://query.keboola.com", token="your-token") as client: # Submit job and get job ID job_id = client.submit_job( branch_id="1261313", workspace_id="2950146661", statements=[ "SELECT * FROM large_dataset WHERE year = 2024", "SELECT product_id, COUNT(*) FROM large_dataset GROUP BY product_id" ], transactional=True, actor_type=ActorType.USER # or ActorType.SYSTEM for automated processes ) print(f"Job submitted: {job_id}") # Job is now running in the background # You can check status later or wait for completion # Async version async def submit_async(): async with Client(base_url="https://query.keboola.com", token="your-token") as client: job_id = await client.submit_job_async( branch_id="1261313", workspace_id="2950146661", statements=["SELECT 1"], transactional=False ) return job_id ``` -------------------------------- ### Execute Query - Synchronous Source: https://context7.com/keboola/query-service-api-python-sdk/llms.txt Executes SQL statements and waits for results in a blocking manner. Supports single statements, multiple statements transactionally, and provides access to results. ```APIDOC ## Execute Query - Synchronous ### Description Execute SQL statements and wait for results in a blocking manner. This method can execute single or multiple statements transactionally and returns the results for each statement. ### Method `client.execute_query()` ### Endpoint This is a client method and does not correspond to a direct HTTP endpoint in the markdown structure. ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body - **branch_id** (str) - Required - The ID of the branch containing the workspace. - **workspace_id** (str) - Required - The ID of the workspace where the query will be executed. - **statements** (list[str]) - Required - A list of SQL statements to execute. - **transactional** (bool) - Optional - If True, all statements are executed within a single transaction. Defaults to False. - **max_wait_time** (float) - Optional - Maximum time in seconds to wait for the query job to complete. Defaults to None. ### Request Example ```python from keboola_query_service import Client with Client(base_url="https://query.keboola.com", token="your-token") as client: # Execute single statement results = client.execute_query( branch_id="1261313", workspace_id="2950146661", statements=["SELECT id, name, email FROM customers WHERE created_at > '2024-01-01' LIMIT 100"] ) # Access results - returns list of QueryResult objects result = results[0] print(f"Columns: {[col.name for col in result.columns]}") print(f"Types: {[col.type for col in result.columns]}") print(f"Rows: {len(result.data)}") for row in result.data: print(row) # ['1', 'John Doe', 'john@example.com'] # Execute multiple statements transactionally results = client.execute_query( branch_id="1261313", workspace_id="2950146661", statements=[ "CREATE TEMP TABLE tmp_orders AS SELECT * FROM orders WHERE status = 'pending'", "SELECT COUNT(*) FROM tmp_orders", "SELECT product_id, SUM(amount) as total FROM tmp_orders GROUP BY product_id" ], transactional=True, # All statements succeed or all fail max_wait_time=600.0 # Wait up to 10 minutes ) # Process each statement result count_result = results[1] print(f"Pending orders: {count_result.data[0][0]}") summary_result = results[2] for product_id, total in summary_result.data: print(f"Product {product_id}: ${total}") ``` ### Response #### Success Response (200) - **results** (list[QueryResult]) - A list of QueryResult objects, one for each executed statement. - **QueryResult.columns** (list[Column]) - List of column metadata. - **QueryResult.data** (list[list]) - The actual query results as a list of rows, where each row is a list of values. #### Response Example ```json // The response is a list of QueryResult objects, not a simple JSON object. // Example structure for a single QueryResult: // [ // { // "columns": [ // {"name": "id", "type": "integer"}, // {"name": "name", "type": "string"} // ], // "data": [ // [1, "John Doe"], // [2, "Jane Smith"] // ] // } // ] ``` ``` -------------------------------- ### Execute Query - Asynchronous Source: https://context7.com/keboola/query-service-api-python-sdk/llms.txt Execute SQL statements asynchronously using async/await for concurrent operations. This endpoint allows for running single queries or multiple queries concurrently. ```APIDOC ## POST /queries/execute ### Description Executes SQL statements asynchronously. Supports running single or multiple queries concurrently. ### Method POST ### Endpoint /queries/execute ### Parameters #### Query Parameters - **branch_id** (string) - Required - The ID of the branch. - **workspace_id** (string) - Required - The ID of the workspace. #### Request Body - **statements** (array of strings) - Required - A list of SQL statements to execute. ### Request Example ```json { "statements": ["SELECT * FROM large_table LIMIT 1000"] } ``` ### Response #### Success Response (200) - **data** (array of objects) - Contains the results of the executed queries. Each element in the array corresponds to a statement in the request. - **data** (array of arrays) - The actual query results. - **columns** (array of strings) - The names of the columns. #### Response Example ```json [ { "data": [[1, "value1"], [2, "value2"]], "columns": ["id", "name"] } ] ``` ``` -------------------------------- ### Submit Job Source: https://context7.com/keboola/query-service-api-python-sdk/llms.txt Submit a query job for background processing without waiting for completion. This is useful for long-running queries or when manual job management is preferred. Supports both synchronous and asynchronous submission. ```APIDOC ## POST /jobs/submit ### Description Submits a SQL query job to be executed in the background. Returns a job ID for later status checking. ### Method POST ### Endpoint /jobs/submit ### Parameters #### Query Parameters - **branch_id** (string) - Required - The ID of the branch. - **workspace_id** (string) - Required - The ID of the workspace. #### Request Body - **statements** (array of strings) - Required - A list of SQL statements to execute. - **transactional** (boolean) - Optional - Whether to run statements transactionally. - **actor_type** (string) - Optional - The type of actor submitting the job (e.g., USER, SYSTEM). ### Request Example ```json { "statements": [ "SELECT * FROM large_dataset WHERE year = 2024", "SELECT product_id, COUNT(*) FROM large_dataset GROUP BY product_id" ], "transactional": true, "actor_type": "USER" } ``` ### Response #### Success Response (200) - **job_id** (string) - The ID of the submitted job. #### Response Example ```json { "job_id": "job_12345678" } ``` ``` -------------------------------- ### Execute Query Asynchronously with Python Source: https://context7.com/keboola/query-service-api-python-sdk/llms.txt Executes SQL statements asynchronously, allowing for concurrent operations. It uses the `keboola_query_service.Client` and `asyncio` for managing asynchronous tasks. The function accepts branch and workspace IDs, and a list of SQL statements. It returns results for each statement, enabling efficient handling of multiple or large queries. ```python import asyncio from keboola_query_service import Client async def main(): async with Client(base_url="https://query.keboola.com", token="your-token") as client: # Single async query results = await client.execute_query_async( branch_id="1261313", workspace_id="2950146661", statements=["SELECT * FROM large_table LIMIT 1000"] ) print(f"Retrieved {len(results[0].data)} rows") # Run multiple queries concurrently results_batch = await asyncio.gather( client.execute_query_async( branch_id="1261313", workspace_id="2950146661", statements=["SELECT COUNT(*) FROM orders"] ), client.execute_query_async( branch_id="1261313", workspace_id="2950146661", statements=["SELECT COUNT(*) FROM customers"] ), client.execute_query_async( branch_id="1261313", workspace_id="2950146661", statements=["SELECT COUNT(*) FROM products"] ) ) orders_count = results_batch[0][0].data[0][0] customers_count = results_batch[1][0].data[0][0] products_count = results_batch[2][0].data[0][0] print(f"Orders: {orders_count}, Customers: {customers_count}, Products: {products_count}") asyncio.run(main()) ``` -------------------------------- ### Cancel Query Job using Python SDK Source: https://context7.com/keboola/query-service-api-python-sdk/llms.txt Demonstrates how to cancel a running or pending query job using the Keboola Query Service Python SDK. It includes synchronous and asynchronous methods for cancellation and verification of the job status. ```python from keboola_query_service import Client with Client(base_url="https://query.keboola.com", token="your-token") as client: # Submit a long-running job job_id = client.submit_job( branch_id="1261313", workspace_id="2950146661", statements=["SELECT * FROM enormous_table WHERE complex_calculation(col) > 1000"] ) # Cancel the job canceled_job_id = client.cancel_job( query_job_id=job_id, reason="User requested cancellation" ) print(f"Canceled job: {canceled_job_id}") # Verify cancellation status = client.get_job_status(job_id) print(f"Job status: {status.status}") # Should be 'canceled' print(f"Cancellation reason: {status.cancellation_reason}") print(f"Canceled at: {status.canceled_at}") # Async version async def cancel_async(): async with Client(base_url="https://query.keboola.com", token="your-token") as client: await client.cancel_job_async(job_id, reason="Timeout exceeded") ```