### Ariel API Get Databases Example

Source: https://github.com/ibm/api-samples/blob/master/ariel/readme.md

Demonstrates how to retrieve a list of all available catalogs (databases) that can be queried using AQL through the API.

```python
import requests

# Assuming base_url and auth_token are defined
# base_url = "https:///api"
# auth_token = ""

headers = {
    "Accept": "application/json"
}

try:
    # verify=False disables TLS certificate verification
    response = requests.get(f"{base_url}/ariel/databases", headers=headers, verify=False, auth=('', auth_token))
    response.raise_for_status()  # Raise an exception for bad status codes
    print(f"Databases: {response.json()}")
except requests.exceptions.RequestException as e:
    print(f"An error occurred: {e}")
```

--------------------------------

### Extension Management API Reference

Source: https://github.com/ibm/api-samples/blob/master/extension_management/readme.md

Provides details on the Extension Management API endpoints for managing extensions within the IBM system. This includes endpoints for uploading, installing, checking installation status, and deleting extensions.

```APIDOC
Extension Management API:

Base URL: /config/extension_management

Endpoints:

1. Upload Extension:
   - Method: POST
   - URL: /extensions
   - Description: Uploads a new extension file.
   - Request Body:
     - `file`: The extension file (multipart/form-data).
   - Response:
     - `id`: The unique identifier of the uploaded extension.
     - `name`: The name of the extension.
     - `version`: The version of the extension.

2. Install Extension:
   - Method: POST
   - URL: /installations
   - Description: Initiates the installation of an extension.
   - Request Body:
     - `extension_id`: The ID of the extension to install (string).
   - Response:
     - `task_id`: The ID of the installation task.

3. Check Installation Task Status:
   - Method: GET
   - URL: /installations/{task_id}
   - Description: Retrieves the status of an extension installation task.
   - Parameters:
     - `task_id`: The ID of the installation task (path parameter).
   - Response:
     - `status`: The current status of the task (e.g., PENDING, PROCESSING, COMPLETED, FAILED).
     - `message`: A message providing details about the task status.

4. Delete Extension:
   - Method: DELETE
   - URL: /extensions/{extension_id}
   - Description: Deletes an extension from the system.
   - Parameters:
     - `extension_id`: The ID of the extension to delete (path parameter).
   - Response:
     - Success message or status code.

Authentication:
  - Requires Bearer token authentication.

Note:
  - Refer to `https:///api_doc` for the interactive API documentation.
  - Available endpoints can also be retrieved via the API at `/api/help/endpoints`.
```

--------------------------------

### Get Domain Samples

Source: https://github.com/ibm/api-samples/blob/master/domain_management/readme.md

Demonstrates read-only GET requests for the Domain API. Samples show how to retrieve all domains and a specific individual domain, including handling cases where the domain does not exist.

```python
import requests

# Example for getting all domains
response = requests.get('https:///api/config/domain_management/domains', auth=('user', 'password'))
print(response.json())

# Example for getting a specific domain
response = requests.get('https:///api/config/domain_management/domains/1', auth=('user', 'password'))
print(response.json())

# Example for a non-existent domain
response = requests.get('https:///api/config/domain_management/domains/999', auth=('user', 'password'))
# Print the status code as well, so a failed lookup can be told apart from a successful one
print(response.status_code, response.json())
```

--------------------------------

### Manage Extensions with IBM API

Source: https://github.com/ibm/api-samples/blob/master/extension_management/readme.md

Demonstrates how to manage extensions using the IBM REST API. This includes uploading new extensions, installing them, checking the status of installation tasks, and deleting extensions. It uses the /config/extension_management endpoints.
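
Installation is reported through a separate task, so a client usually has to poll until that task settles. The sketch below shows one way to do that. `poll_task`, its parameters, and the treatment of `COMPLETED` and `FAILED` as the terminal states are assumptions made for illustration (the status names come from the reference above); none of this is part of the IBM samples.

```python
import time


def poll_task(fetch_status, interval=5.0, max_attempts=60, sleep=time.sleep):
    """Call fetch_status() until it reports a terminal status or attempts run out.

    fetch_status is a hypothetical callable returning a dict with a 'status' key,
    for example a wrapper around the installation task status request.
    """
    terminal = {"COMPLETED", "FAILED"}  # assumed terminal states
    for attempt in range(max_attempts):
        result = fetch_status()
        if result.get("status") in terminal:
            return result
        if attempt < max_attempts - 1:
            sleep(interval)  # wait before asking again
    raise TimeoutError(f"task still running after {max_attempts} checks")
```

With the sample that follows, this could be called as `poll_task(lambda: check_installation_task(task_id))`. The `sleep` parameter exists only so the wait can be replaced in tests.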
```python
import json
import os

import requests

# --- Configuration ---
API_HOST = ""
API_TOKEN = ""

# --- Upload Extension ---
def upload_extension(file_path):
    url = f"https://{API_HOST}/config/extension_management/extensions"
    headers = {
        "Authorization": f"Bearer {API_TOKEN}"
    }
    # Open the file in a context manager so the handle is closed after the upload
    with open(file_path, 'rb') as fh:
        files = {
            "file": (os.path.basename(file_path), fh)
        }
        response = requests.post(url, headers=headers, files=files)
    return response.json()

# --- Install Extension ---
def install_extension(extension_id):
    url = f"https://{API_HOST}/config/extension_management/installations"
    headers = {
        "Authorization": f"Bearer {API_TOKEN}",
        "Content-Type": "application/json"
    }
    payload = json.dumps({
        "extension_id": extension_id
    })
    response = requests.post(url, headers=headers, data=payload)
    return response.json()

# --- Check Installation Task ---
def check_installation_task(task_id):
    url = f"https://{API_HOST}/config/extension_management/installations/{task_id}"
    headers = {
        "Authorization": f"Bearer {API_TOKEN}"
    }
    response = requests.get(url, headers=headers)
    return response.json()

# --- Delete Extension ---
def delete_extension(extension_id):
    url = f"https://{API_HOST}/config/extension_management/extensions/{extension_id}"
    headers = {
        "Authorization": f"Bearer {API_TOKEN}"
    }
    response = requests.delete(url, headers=headers)
    return response.json()

# --- Example Usage ---
if __name__ == "__main__":
    # Replace with your actual file path and token
    extension_file = "./my_extension.zip"

    # 1. Upload Extension
    print("Uploading extension...")
    upload_result = upload_extension(extension_file)
    print(f"Upload Result: {upload_result}")

    if 'id' in upload_result:
        extension_id = upload_result['id']

        # 2. Install Extension
        print(f"Installing extension {extension_id}...")
        install_result = install_extension(extension_id)
        print(f"Install Result: {install_result}")

        if 'task_id' in install_result:
            task_id = install_result['task_id']

            # 3. Check Installation Task (polling might be needed)
            print(f"Checking installation task {task_id}...")
            task_status = check_installation_task(task_id)
            print(f"Task Status: {task_status}")

        # 4. Delete Extension (use with caution)
        # print(f"Deleting extension {extension_id}...")
        # delete_result = delete_extension(extension_id)
        # print(f"Delete Result: {delete_result}")
    else:
        print("Failed to upload extension.")
```

--------------------------------

### Custom Configuration Section Example

Source: https://github.com/ibm/api-samples/blob/master/readme.md

Demonstrates how to create a custom configuration section within the config.ini file to override or provide specific settings for API clients.

```ini
[my_custom_config]
username = {my_other_username}
password = {my_other_password}
```

--------------------------------

### Get Assets with REST API

Source: https://github.com/ibm/api-samples/blob/master/asset_model/readme.md

Demonstrates how to retrieve asset data using the assets REST API. Requires existing assets on the system.

```python
import requests

# Assuming you have a hostname and authentication token
hostname = ""
api_token = ""

headers = {
    "Authorization": f"Bearer {api_token}"
}

# Get assets
response = requests.get(f"https://{hostname}/api/asset_model/assets", headers=headers)

if response.status_code == 200:
    assets = response.json()
    print("Successfully retrieved assets:")
    # Process assets data; .get() avoids a KeyError if a field is missing
    for asset in assets:
        print(f"- {asset.get('name', 'N/A')} (ID: {asset.get('id', 'N/A')})")
else:
    print(f"Error retrieving assets: {response.status_code} - {response.text}")
```

--------------------------------

### Get Tenant List (Python)

Source: https://github.com/ibm/api-samples/blob/master/tenant/readme.md

Demonstrates how to retrieve a list of tenants using the IBM API. This sample covers sending a basic request and handling the response.
```python
import requests

# Assuming you have authentication details and hostname set up
hostname = ""
api_token = ""

headers = {
    "SEC": api_token
}

url = f"https://{hostname}/api/tenants"

try:
    response = requests.get(url, headers=headers)
    response.raise_for_status()  # Raise an exception for bad status codes
    tenants = response.json()
    print("Tenants:", tenants)
except requests.exceptions.RequestException as e:
    print(f"Error retrieving tenants: {e}")
```

--------------------------------

### QRadar REST API Overview

Source: https://github.com/ibm/api-samples/blob/master/readme.md

This section provides an overview of the QRadar REST API, explaining how endpoints are used and how different HTTP methods (GET, POST, DELETE) can affect their functionality. It also directs users to the interactive API documentation and community forums for more information.

```APIDOC
QRadar REST API:

Access: Via HTTP requests to specific URLs (endpoints) on the QRadar console.
Functionality: Each endpoint performs a specific function, which can vary based on the HTTP method used (GET, POST, DELETE).
Integration: Enables custom business processes and integration with external systems.
Documentation: Interactive help available at `https:///api_doc`.
Endpoint Discovery: Use the `/help/endpoints` endpoint to retrieve a list of available endpoints.
Community: Forums available at https://www.ibm.com/developerworks/community/forums/html/forum?id=b02461a3-9a70-4d73-94e8-c096abe263ca

Key Categories Covered in Samples:
  - Introduction package: Low-level API usage.
  - Reference data package: Endpoints in the `/reference_data` category.
  - Ariel package: Endpoints in the `/ariel` category.
  - Offense package: Endpoints in the `/siem/offenses` category.
  - Domain Management package: Endpoints in the `/config/domain_management/domains` category.
  - Custom Actions package: Endpoints in the `/analytics/custom_actions` category.
  - API CLI client: Command-line access to the API.
  - Shared modules package: Contains shared modules for API interaction.

Requirements:
  - Python: 3.3 or above.
  - QRadar System: 7.2.8 or higher.
```

--------------------------------

### Ariel API Search Workflow Example

Source: https://github.com/ibm/api-samples/blob/master/ariel/readme.md

Demonstrates a complete search workflow using the Ariel API. It submits a search, polls its status until the search finishes, fetches results in JSON and CSV formats, and saves the results permanently.

```python
import requests
import time

# Assuming base_url and auth_token are defined
# base_url = "https:///api"
# auth_token = ""

# 1. Submit a search
search_payload = {
    "query": "SELECT sourceip, destinationip FROM events LIMIT 5"
}
headers = {
    "Content-Type": "application/json",
    "Accept": "application/json"
}

try:
    response = requests.post(f"{base_url}/ariel/searches", headers=headers, json=search_payload, verify=False, auth=('', auth_token))
    response.raise_for_status()
    search_id = response.json().get("search_id")
    print(f"Search submitted with ID: {search_id}")

    # 2. Retrieve search status and results
    # CANCELED and ERROR are also terminal states; without them the loop could poll forever
    terminal_states = ["COMPLETED", "FAILED", "CANCELED", "ERROR"]
    status = ""
    while status not in terminal_states:
        time.sleep(5)  # Wait before checking status
        status_response = requests.get(f"{base_url}/ariel/searches/{search_id}/status", headers=headers, verify=False, auth=('', auth_token))
        status_response.raise_for_status()
        status = status_response.json().get("status")
        print(f"Current status: {status}")

    if status == "COMPLETED":
        # 3. Retrieve results in JSON
        json_results_response = requests.get(f"{base_url}/ariel/searches/{search_id}?output_format=JSON", headers=headers, verify=False, auth=('', auth_token))
        json_results_response.raise_for_status()
        print("\nJSON Results:")
        print(json_results_response.json())

        # 4. Retrieve results in CSV
        csv_results_response = requests.get(f"{base_url}/ariel/searches/{search_id}?output_format=CSV", headers=headers, verify=False, auth=('', auth_token))
        csv_results_response.raise_for_status()
        print("\nCSV Results:")
        print(csv_results_response.text)

        # 5. Save results permanently
        save_response = requests.post(f"{base_url}/ariel/searches/{search_id}/save", headers=headers, verify=False, auth=('', auth_token))
        save_response.raise_for_status()
        print(f"\nSearch results saved. Response: {save_response.json()}")
    else:
        print(f"Search did not complete. Final status: {status}")

except requests.exceptions.RequestException as e:
    print(f"An error occurred during the workflow: {e}")
```

--------------------------------

### Domain Utility Helper Methods

Source: https://github.com/ibm/api-samples/blob/master/domain_management/readme.md

Contains helper methods for domain management operations. The `setup_domain()` method specifically demonstrates creating a new domain using the Domain Management API.

```python
import requests

def setup_domain(hostname, username, password, domain_name, source_id):
    """Helper function to create a new domain."""
    url = f"https://{hostname}/api/config/domain_management/domains"
    payload = {
        "name": domain_name,
        "source_id": source_id,
        "description": "Created by setup_domain helper"
    }
    # json= serializes the payload and sets the Content-Type header to application/json
    response = requests.post(url, json=payload, auth=(username, password))
    return response.json()

# Example usage:
# result = setup_domain('my_qradar.ibm.com', 'admin', 'admin', 'NewDomain', 'source123')
# print(result)
```

--------------------------------

### Get Offenses for IP

Source: https://github.com/ibm/api-samples/blob/master/siem/readme.md

Retrieves all offenses associated with a specific IP address. It first uses GET siem/source_addresses and GET siem/local_destination_addresses to find offenses linked to the IP, then uses the collected offense IDs to query the GET siem/offenses endpoint.
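
The two address lookups can return the same offense more than once, so the offense IDs are merged before the final query. A small sketch of that merge step follows, assuming each returned record carries an `offense_id` field as in this sample. `build_offense_filter` is a hypothetical helper name, and the `id in (...)` filter form is taken from the sample rather than checked against the API.

```python
def build_offense_filter(*address_lists):
    """Merge offense IDs from several address lookups into one filter string."""
    offense_ids = set()
    for records in address_lists:
        for record in records:
            offense_ids.add(record["offense_id"])
    if not offense_ids:
        return None  # nothing to query
    # Sort so the resulting filter is deterministic
    return "id in ({})".format(", ".join(str(i) for i in sorted(offense_ids)))
```

Sorting the IDs keeps the generated filter stable between runs, which makes it easier to log and to compare in tests.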
```python
# Assuming 'ibm_api_client' is a pre-configured client object
# from ibm_platform_services.siem_v1 import *  # Example import

# Placeholder for the actual API client initialization
class MockSIEMClient:
    def get_source_addresses(self, **kwargs):
        print(f"MockSIEMClient.get_source_addresses called with: {kwargs}")
        # Simulate API response for a specific IP
        if 'filter' in kwargs and 'source_address = 192.168.1.10' in kwargs['filter']:
            return {
                'success': True,
                'data': [
                    {'offense_id': 101, 'source_address': '192.168.1.10'},
                    {'offense_id': 105, 'source_address': '192.168.1.10'}
                ]
            }
        return {'success': False, 'data': []}

    def get_local_destination_addresses(self, **kwargs):
        print(f"MockSIEMClient.get_local_destination_addresses called with: {kwargs}")
        # Simulate API response for a specific IP
        if 'filter' in kwargs and 'local_destination_address = 192.168.1.10' in kwargs['filter']:
            return {
                'success': True,
                'data': [
                    {'offense_id': 102, 'local_destination_address': '192.168.1.10'},
                    {'offense_id': 106, 'local_destination_address': '192.168.1.10'}
                ]
            }
        return {'success': False, 'data': []}

    def get_offenses(self, **kwargs):
        print(f"MockSIEMClient.get_offenses called with: {kwargs}")
        # Simulate API response based on offense IDs.
        # Match on the filter prefix only: the caller collects the IDs in a set,
        # so their order inside the parentheses is not guaranteed.
        if kwargs.get('filter', '').startswith('id in ('):
            return {
                'success': True,
                'data': [
                    {'id': 101, 'name': 'Offense A', 'description': 'Related to 192.168.1.10'},
                    {'id': 105, 'name': 'Offense B', 'description': 'Also related to 192.168.1.10'},
                    {'id': 102, 'name': 'Offense C', 'description': 'Destination for 192.168.1.10'},
                    {'id': 106, 'name': 'Offense D', 'description': 'Another destination for 192.168.1.10'}
                ]
            }
        return {'success': False, 'data': []}

siem_client = MockSIEMClient()  # Replace with actual client initialization

def get_offenses_for_ip(ip_address):
    """Gets all offenses associated with a given IP address.

    Args:
        ip_address (str): The IP address to search for.
    """
    associated_offense_ids = set()

    # Find offenses where the IP is a source address
    source_response = siem_client.get_source_addresses(filter=f"source_address = {ip_address}", fields='offense_id')
    if source_response and source_response['success']:
        for item in source_response['data']:
            associated_offense_ids.add(item['offense_id'])

    # Find offenses where the IP is a local destination address
    dest_response = siem_client.get_local_destination_addresses(filter=f"local_destination_address = {ip_address}", fields='offense_id')
    if dest_response and dest_response['success']:
        for item in dest_response['data']:
            associated_offense_ids.add(item['offense_id'])

    if not associated_offense_ids:
        print(f"No offenses found associated with IP address {ip_address}.")
        return

    # Fetch the details of the associated offenses
    # Construct a filter for multiple IDs (e.g., 'id in (1, 2, 3)');
    # sorting keeps the filter string the same from run to run
    id_list_str = ", ".join(map(str, sorted(associated_offense_ids)))
    offenses_filter = f"id in ({id_list_str})"

    offenses_response = siem_client.get_offenses(filter=offenses_filter, fields='id,name,description')

    if offenses_response and offenses_response['success']:
        print(f"Offenses associated with IP {ip_address}:")
        for offense in offenses_response['data']:
            print(f"  - ID: {offense['id']}, Name: {offense['name']}, Description: {offense.get('description', 'N/A')}")
    else:
        print("Could not retrieve details for associated offenses.")

# Example usage:
# get_offenses_for_ip('192.168.1.10')
```

--------------------------------

### API Documentation Reference

Source: https://github.com/ibm/api-samples/blob/master/asset_model/readme.md

Provides information on accessing the API's interactive help page and available endpoints.

```APIDOC
API Endpoints:
  - /asset_model/assets: Retrieve asset data.
  - /asset_model/properties: Retrieve available asset property data.
  - /asset_model/saved_searches: Retrieve available saved searches.
  - /asset_model/assets/search: Search assets using a saved search.
Interactive Help:
  Access the REST API interactive help page at `https:///api_doc`.

Available Endpoints via API:
  GET /api/help/endpoints

Required Privileges:
  - Assets

Authentication:
  Requires a user or authorized token with the 'Assets' privilege. Use 'Authorization: Bearer ' header.
```

--------------------------------

### Basic GET API Call

Source: https://github.com/ibm/api-samples/blob/master/apiclient.md

Makes a basic GET request to an API endpoint that does not require any parameters. This is the simplest way to interact with the API.

```bash
python apiclient.py --api /help/versions --method GET
```

--------------------------------

### API Documentation Reference

Source: https://github.com/ibm/api-samples/blob/master/servers/readme.md

Provides information on accessing the REST API interactive help page and retrieving available endpoints through the API itself. This serves as a reference for available endpoints and their parameters.

```APIDOC
API Endpoints:

1. `/api_doc` (GET)
   - Description: Provides an interactive help page for the REST API, detailing available endpoints and parameters.
   - Access: Typically accessed via a web browser at `https:///api_doc`.

2. `/api/help/versions` (GET)
   - Description: Retrieves a list of available API endpoints and their versions through the API itself.
   - Returns: A JSON object containing information about available API endpoints.

Relevant API Endpoints for System Servers:

- `/system/servers` (GET)
  - Description: Retrieves a list of server hosts in the QRadar deployment.
  - Parameters: None
  - Returns: A list of server objects, each with an 'id' and 'hostname'.

- `/system/servers/{id}/` (GET)
  - Description: Retrieves the general settings for a specific server host.
  - Parameters:
    - `id` (path parameter): The unique identifier of the server host.
  - Returns: A JSON object containing the server's general settings, potentially including `email_server_address`.

- `/system/servers/{id}/` (PUT)
  - Description: Updates the general settings of a specific server host. Currently supports updating `email_server_address`.
  - Parameters:
    - `id` (path parameter): The unique identifier of the server host.
  - Request Body: JSON object with fields to update, e.g., `{"email_server_address": "new.email.server.com"}`.
  - Returns: Status code indicating success or failure.

- `/system/servers/{id}/firewall_rules` (GET)
  - Description: Retrieves the access control firewall rules for a specific server host.
  - Parameters:
    - `id` (path parameter): The unique identifier of the server host.
  - Returns: A list of firewall rule objects.

- `/system/servers/{id}/firewall_rules` (PUT)
  - Description: Updates the access control firewall rules for a specific server host. Requires sending the complete list of desired rules.
  - Parameters:
    - `id` (path parameter): The unique identifier of the server host.
  - Request Body: JSON array representing the complete set of firewall rules.
  - Returns: Status code indicating success or failure.

- `/system/servers/{id}/network_interfaces/ethernet/{interface_name}` (GET)
  - Description: Retrieves the settings for a specific ethernet network interface on a server host.
  - Parameters:
    - `id` (path parameter): The unique identifier of the server host.
    - `interface_name` (path parameter): The name of the ethernet interface (e.g., 'eth0').
  - Returns: A JSON object with the interface's settings.

- `/system/servers/{id}/network_interfaces/ethernet/{interface_name}` (PUT)
  - Description: Updates the settings for a specific ethernet network interface. Only non-management and non-HACrossover interfaces are supported.
  - Parameters:
    - `id` (path parameter): The unique identifier of the server host.
    - `interface_name` (path parameter): The name of the ethernet interface.
  - Request Body: JSON object with fields to update, e.g., `{"ip_address": "192.168.1.100", "netmask": "255.255.255.0"}`.
  - Returns: Status code indicating success or failure.

- `/system/servers/{id}/network_interfaces/bonded` (POST)
  - Description: Creates a new bonded network interface.
  - Parameters:
    - `id` (path parameter): The unique identifier of the server host.
  - Request Body: JSON object specifying the bond name and slave interfaces, e.g., `{"bond_name": "bond0", "ethernet_interfaces": ["eth1", "eth2"]}`.
  - Returns: Status code indicating success or failure.

- `/system/servers/{id}/network_interfaces/bonded/{bond_name}` (PUT)
  - Description: Updates the settings of an existing bonded network interface.
  - Parameters:
    - `id` (path parameter): The unique identifier of the server host.
    - `bond_name` (path parameter): The name of the bonded interface (e.g., 'bond0').
  - Request Body: JSON object with updated settings, e.g., `{"ip_address": "192.168.1.150", "netmask": "255.255.255.0", "ethernet_interfaces": ["eth1", "eth2", "eth3"]}`.
  - Returns: Status code indicating success or failure.

- `/system/servers/{id}/network_interfaces/bonded/{bond_name}` (DELETE)
  - Description: Unbonds (removes) an existing bonded network interface, reverting its slave interfaces to individual ethernet interfaces.
  - Parameters:
    - `id` (path parameter): The unique identifier of the server host.
    - `bond_name` (path parameter): The name of the bonded interface to unbond.
  - Returns: Status code indicating success or failure.
```

--------------------------------

### Vulnerability Instance Search Workflow

Source: https://github.com/ibm/api-samples/blob/master/qvm/readme.md

Demonstrates a complete search workflow using the QVM REST API, including creating a search, monitoring its status, and retrieving paginated results for vulnerability instances, assets, and vulnerabilities.
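
Paging in this workflow is driven by a `Range` header of the form `items=<first>-<last>`, where both bounds are zero-based and inclusive (so `items=0-9` is the first ten rows). As a sketch, the header values for a capped number of rows could be generated like this. `range_headers` and its parameters are illustrative names and not part of the QVM samples.

```python
def range_headers(page_size, max_rows):
    """Yield Range header values covering at most max_rows rows."""
    if page_size < 1:
        raise ValueError("page_size must be at least 1")
    start = 0
    while start < max_rows:
        end = min(start + page_size, max_rows) - 1  # inclusive upper bound
        yield f"items={start}-{end}"
        start = end + 1
```

Each yielded value can be passed as the `Range` header of one request. Clamping the last page to `max_rows` means the client never asks for more rows than it intends to keep.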
```python
import requests
import time

hostname = ""

# --- Step 1: Get a 'High Risk' saved search ---
print("Step 1: Getting 'High Risk' saved search...")
saved_searches_url = f"https://{hostname}/qvm/saved_searches"
headers = {"Accept": "application/json"}

try:
    response = requests.get(saved_searches_url, headers=headers)
    response.raise_for_status()
    saved_searches = response.json()

    high_risk_search = next((s for s in saved_searches if s['name'] == 'High Risk'), None)

    if not high_risk_search:
        raise ValueError("Could not find a saved search named 'High Risk'.")

    search_id = high_risk_search['id']
    print(f"Found 'High Risk' search with ID: {search_id}")

except (requests.exceptions.RequestException, ValueError) as e:
    print(f"Error in Step 1: {e}")
    exit(1)

# --- Step 2: Create a vulnerability instances search ---
print("\nStep 2: Creating vulnerability instances search...")
create_search_url = f"https://{hostname}/qvm/vulnerability_instances/search"
payload = {"search_id": search_id}

try:
    response = requests.post(create_search_url, json=payload, headers=headers)
    response.raise_for_status()
    task_info = response.json()
    task_id = task_info['task_id']
    print(f"Search task created with ID: {task_id}")

except requests.exceptions.RequestException as e:
    print(f"Error in Step 2: {e}")
    exit(1)

# --- Step 3: Monitor search status and retrieve results ---
print("\nStep 3: Monitoring search status and retrieving results...")
status_url_template = f"https://{hostname}/qvm/vulnerability_instances/search/status/{{task_id}}"
vulnerability_instances_url_template = f"https://{hostname}/qvm/vulnerability_instances"
assets_url_template = f"https://{hostname}/qvm/assets"
vulnerabilities_url_template = f"https://{hostname}/qvm/vulnerabilities"

status = ""
while status not in ["completed", "error", "timeout"]:
    try:
        status_url = status_url_template.format(task_id=task_id)
        response = requests.get(status_url, headers=headers)
        response.raise_for_status()
        status_info = response.json()
        status = status_info['status']
        print(f"Current search status: {status}")

        if status not in ["completed", "error", "timeout"]:
            time.sleep(5)  # Wait before checking again

    except requests.exceptions.RequestException as e:
        print(f"Error checking status: {e}")
        time.sleep(5)

if status == "completed":
    print("\nSearch completed successfully. Retrieving results...")

    all_vulnerability_instances = []
    all_asset_ids = set()
    all_vulnerability_ids = set()

    page_size = 10
    total_retrieved = 0
    max_total_rows = 30

    while total_retrieved < max_total_rows:
        range_header = f"items={total_retrieved}-{total_retrieved + page_size - 1}"
        print(f"Fetching page: {range_header}")

        try:
            vuln_instances_url = f"{vulnerability_instances_url_template}?search_id={task_id}"
            response = requests.get(vuln_instances_url, headers={'Range': range_header, **headers})
            response.raise_for_status()
            current_page_instances = response.json()

            if not current_page_instances:
                print("No more vulnerability instances found.")
                break

            all_vulnerability_instances.extend(current_page_instances)

            for instance in current_page_instances:
                if 'asset_id' in instance:
                    all_asset_ids.add(instance['asset_id'])
                if 'vulnerability_id' in instance:
                    all_vulnerability_ids.add(instance['vulnerability_id'])

            total_retrieved += len(current_page_instances)
            if total_retrieved >= max_total_rows:
                break

        except requests.exceptions.RequestException as e:
            print(f"Error retrieving vulnerability instances page: {e}")
            break

    print(f"\nRetrieved a total of {len(all_vulnerability_instances)} vulnerability instances (up to {max_total_rows} requested).")

    # --- Step 4: Get associated assets ---
    if all_asset_ids:
        print(f"\nFetching assets for {len(all_asset_ids)} unique asset IDs...")
        try:
            asset_ids_str = ",".join(map(str, all_asset_ids))
            assets_url = f"{assets_url_template}?asset_ids={asset_ids_str}"
            response = requests.get(assets_url, headers=headers)
            response.raise_for_status()
            assets_data = response.json()
            print(f"Successfully retrieved {len(assets_data)} assets.")
            # Process assets_data as needed
        except requests.exceptions.RequestException as e:
            print(f"Error fetching assets: {e}")
    else:
        print("\nNo asset IDs found in vulnerability instances.")

    # --- Step 5: Get associated vulnerabilities ---
    if all_vulnerability_ids:
        print(f"\nFetching vulnerabilities for {len(all_vulnerability_ids)} unique vulnerability IDs...")
        try:
            vulnerability_ids_str = ",".join(map(str, all_vulnerability_ids))
            vulnerabilities_url = f"{vulnerabilities_url_template}?vulnerability_ids={vulnerability_ids_str}"
            response = requests.get(vulnerabilities_url, headers=headers)
            response.raise_for_status()
            vulnerabilities_data = response.json()
            print(f"Successfully retrieved {len(vulnerabilities_data)} vulnerabilities.")
            # Process vulnerabilities_data as needed
        except requests.exceptions.RequestException as e:
            print(f"Error fetching vulnerabilities: {e}")
    else:
        print("\nNo vulnerability IDs found in vulnerability instances.")

elif status in ["error", "timeout"]:
    print(f"\nSearch task failed with status: {status}")
else:
    print(f"\nSearch task ended with unexpected status: {status}")
```

--------------------------------

### Manage Bonded Network Interfaces

Source: https://github.com/ibm/api-samples/blob/master/servers/readme.md

Demonstrates creating bonded interfaces from existing ethernet interfaces, updating their settings (IP, netmask, slave interfaces), and unbonding them to revert to individual interfaces. Unbonded interfaces enter a monitor state.
```python
import requests

base_url = "https:///api"
auth_token = "YOUR_AUTH_TOKEN"
headers = {
    "SEC": auth_token
}
host_id = "some_host_id"

# Example: Create a bonded interface named bond0 from eth1 and eth2
# create_data = {
#     "bond_name": "bond0",
#     "ethernet_interfaces": ["eth1", "eth2"]
# }
# response_create = requests.post(f"{base_url}/system/servers/{host_id}/network_interfaces/bonded", headers=headers, json=create_data)
# print(f"Bonded interface creation status: {response_create.status_code}")

# Example: Update settings of an existing bonded interface 'bond0'
# update_data = {
#     "ip_address": "192.168.1.150",
#     "netmask": "255.255.255.0",
#     "ethernet_interfaces": ["eth1", "eth2", "eth3"]  # Adding eth3 as a slave
# }
# response_update = requests.put(f"{base_url}/system/servers/{host_id}/network_interfaces/bonded/bond0", headers=headers, json=update_data)
# print(f"Bonded interface update status: {response_update.status_code}")

# Example: Unbond an interface 'bond0'
# response_unbond = requests.delete(f"{base_url}/system/servers/{host_id}/network_interfaces/bonded/bond0", headers=headers)
# print(f"Bonded interface unbond status: {response_unbond.status_code}")
```

--------------------------------

### QVM API Documentation Reference

Source: https://github.com/ibm/api-samples/blob/master/qvm/readme.md

Provides access to the QVM API documentation, listing available endpoints and their parameters. This is a reference point for understanding API capabilities.

```APIDOC
API Documentation Reference:

REST API Interactive Help Page:
  URL: https:///api_doc
  Description: Provides interactive documentation for all available API endpoints and their parameters.

Capabilities Endpoint:
  URL: /api/help/capabilities
  Method: GET
  Description: Retrieves a list of available endpoints through the API itself.
  Parameters: None
  Returns: A JSON object listing available API capabilities.

QVM API Endpoints:
  Requires user or authorized token with 'Assets' and 'Vulnerability Management' privileges.

  /qvm/saved_searches:
    Method: GET
    Description: Retrieves QVM saved search data.
    Parameters: None
    Returns: A list of saved searches, each with an id and name.

  /qvm/saved_searches/{search_id}:
    Method: GET
    Description: Retrieves a specific QVM saved search by its ID.
    Parameters:
      search_id (string): The unique identifier of the saved search.
    Returns: Details of the specified saved search.

  /qvm/vulnerability_instances/search:
    Method: POST
    Description: Creates a vulnerability instances search using a QVM saved search.
    Parameters:
      search_id (string): The ID of the QVM saved search to use.
    Returns:
      task_id (string): The ID of the asynchronous search task.

  /qvm/vulnerability_instances/search/status/{task_id}:
    Method: GET
    Description: Checks the current status of a vulnerability instance search task.
    Parameters:
      task_id (string): The ID of the search task.
    Returns:
      status (string): The current status of the search (e.g., 'running', 'completed', 'error').

  /qvm/vulnerability_instances:
    Method: GET
    Description: Retrieves vulnerability instances results from a completed search.
    Parameters:
      search_id (string): The ID of the completed search task.
      Range (string): For pagination, e.g., 'items=0-9' for the first 10 items.
    Returns: A list of vulnerability instances matching the search criteria.

  /qvm/assets:
    Method: GET
    Description: Retrieves asset information, potentially filtered by asset IDs.
    Parameters:
      asset_ids (string): A comma-separated list of asset IDs to filter by.
    Returns: A list of assets matching the provided IDs.

  /qvm/vulnerabilities:
    Method: GET
    Description: Retrieves vulnerability information, potentially filtered by vulnerability IDs.
    Parameters:
      vulnerability_ids (string): A comma-separated list of vulnerability IDs to filter by.
    Returns: A list of vulnerabilities matching the provided IDs.
```

--------------------------------

### Get Offense Addresses

Source: https://github.com/ibm/api-samples/blob/master/siem/readme.md

Retrieves all source and local destination IP addresses for a given offense. It first fetches offense details using GET siem/offenses, then uses the offense ID to query siem/source_addresses and siem/local_destination_addresses endpoints.

```python
import json

# Assuming 'ibm_api_client' is a pre-configured client object
# from ibm_platform_services.siem_v1 import *  # Example import

# Placeholder for the actual API client initialization
class MockSIEMClient:
    def get_offenses(self, **kwargs):
        print(f"MockSIEMClient.get_offenses called with: {kwargs}")
        # Simulate API response for a specific offense ID
        if 'filter' in kwargs and 'id = 123' in kwargs['filter']:
            return {
                'success': True,
                'data': [
                    {'id': 123, 'name': 'Sample Offense'}
                ]
            }
        return {'success': False, 'data': []}

    def get_source_addresses(self, **kwargs):
        print(f"MockSIEMClient.get_source_addresses called with: {kwargs}")
        # Simulate API response
        if 'filter' in kwargs and 'offense_id = 123' in kwargs['filter']:
            return {
                'success': True,
                'data': [
                    {'source_address': '192.168.1.10'},
                    {'source_address': '10.0.0.5'}
                ]
            }
        return {'success': False, 'data': []}

    def get_local_destination_addresses(self, **kwargs):
        print(f"MockSIEMClient.get_local_destination_addresses called with: {kwargs}")
        # Simulate API response
        if 'filter' in kwargs and 'offense_id = 123' in kwargs['filter']:
            return {
                'success': True,
                'data': [
                    {'local_destination_address': '172.16.0.1'},
                    {'local_destination_address': '192.168.1.1'}
                ]
            }
        return {'success': False, 'data': []}

siem_client = MockSIEMClient()  # Replace with actual client initialization

def get_offense_addresses(offense_id):
    """Gets source and local destination addresses for a given offense ID.

    Args:
        offense_id (int): The ID of the offense.
    """
    # First, verify the offense exists (optional but good practice)
    offense_response = siem_client.get_offenses(filter=f"id = {offense_id}", fields='id,name')
    if not offense_response or not offense_response['success'] or not offense_response['data']:
        print(f"Error: Offense with ID {offense_id} not found.")
        return

    offense_name = offense_response['data'][0]['name']
    print(f"Fetching addresses for offense: {offense_name} (ID: {offense_id})")

    # Get source addresses
    source_addresses_response = siem_client.get_source_addresses(filter=f"offense_id = {offense_id}", fields='source_address')
    source_addresses = []
    if source_addresses_response and source_addresses_response['success']:
        source_addresses = [addr['source_address'] for addr in source_addresses_response['data']]
    print(f"Source Addresses: {source_addresses}")

    # Get local destination addresses
    local_dest_addresses_response = siem_client.get_local_destination_addresses(filter=f"offense_id = {offense_id}", fields='local_destination_address')
    local_dest_addresses = []
    if local_dest_addresses_response and local_dest_addresses_response['success']:
        local_dest_addresses = [addr['local_destination_address'] for addr in local_dest_addresses_response['data']]
    print(f"Local Destination Addresses: {local_dest_addresses}")

# Example usage:
# get_offense_addresses(123)
```

--------------------------------

### API Client Arguments

Source: https://github.com/ibm/api-samples/blob/master/apiclient.md

Details various command-line arguments available for the apiclient.py script, including response format, version selection, and range for paging.

```APIDOC
--api /api_name/endpoint
  - The path to your api endpoint. It is the part of the url that you would append to `http:///restapi/api`.

--method METHOD
  - Determines whether your api request will be a GET, POST, or DELETE.

--params =""
  - Used for query or body parameters. Syntax for multiple parameters: --params param1="value1" param2="value2".

--content_type TYPE or --request_format TYPE
  - Specifies the content type of the body being sent. Required for body parameters.

--response_format RESPONSE_FORMAT
  - Sets the `Accept` header of the request object, determining the Content-type of the response object. Default is `application/json`.

--version VERSION
  - Specifies the version of the endpoint to call. Rounds down to the closest available version.

--range RANGE
  - Specifies the range of items to return if an endpoint supports paging (e.g., "x-y").
```

--------------------------------

### Get QID Records

Source: https://github.com/ibm/api-samples/blob/master/data_classification/readme.md

Demonstrates how to retrieve a list of QID records using the data classification API.

```python
import requests

hostname = ""
api_key = ""

# QID records are served under the data_classification API path
url = f"https://{hostname}/api/data_classification/qid_records"
headers = {
    # Authorized service tokens are sent in the SEC header, as in the other samples
    "SEC": api_key,
    "Accept": "application/json"
}

response = requests.get(url, headers=headers)

if response.status_code == 200:
    records = response.json()
    print("QID Records:")
    for record in records:
        print(f"- {record['name']} (ID: {record['id']})")
else:
    print(f"Error: {response.status_code}")
    print(response.text)
```

--------------------------------

### Retrieve Server Hosts and General Settings

Source: https://github.com/ibm/api-samples/blob/master/servers/readme.md

Demonstrates how to retrieve a list of server hosts in a QRadar deployment and access their general settings. Currently, only updating the email server address is supported via the API.
```python
import requests

# Assuming you have a base URL and authentication token
base_url = "https:///api"
auth_token = "YOUR_AUTH_TOKEN"
headers = {
    "SEC": auth_token
}

# Retrieve a list of server hosts
response_hosts = requests.get(f"{base_url}/system/servers", headers=headers)
server_hosts = response_hosts.json()

# Iterate through hosts and retrieve general settings (example for email server)
for host in server_hosts:
    host_id = host['id']
    response_settings = requests.get(f"{base_url}/system/servers/{host_id}/", headers=headers)
    settings = response_settings.json()
    print(f"Host: {host['hostname']}, Email Server: {settings.get('email_server_address')}")

# Example of updating email server address (replace with actual data)
# update_data = {
#     "email_server_address": "new.email.server.com"
# }
# requests.put(f"{base_url}/system/servers/{host_id}/", headers=headers, json=update_data)
```

--------------------------------

### Get Single QID Record

Source: https://github.com/ibm/api-samples/blob/master/data_classification/readme.md

Demonstrates how to retrieve a single QID record by its ID using the data classification API.

```python
import requests

hostname = ""
api_key = ""
record_id = ""

# QID records are served under the data_classification API path
url = f"https://{hostname}/api/data_classification/qid_records/{record_id}"
headers = {
    # Authorized service tokens are sent in the SEC header, as in the other samples
    "SEC": api_key,
    "Accept": "application/json"
}

response = requests.get(url, headers=headers)

if response.status_code == 200:
    record = response.json()
    print("QID Record Details:")
    print(f"ID: {record['id']}")
    print(f"Name: {record['name']}")
else:
    print(f"Error: {response.status_code}")
    print(response.text)
```

--------------------------------

### Post/Update Custom Actions and Scripts

Source: https://github.com/ibm/api-samples/blob/master/custom_actions/readme.md

Demonstrates posting and updating custom action scripts and custom actions to the system. WARNING: This script modifies the QRadar system and should not be run against a production environment.
```python
import json
import requests

hostname = ""
auth_token = ""

# --- Posting a new custom action script ---
script_name = "my_new_script.py"
script_content = "print('Hello from custom action script!')"
interpreter_name = "python3"

url_post_script = f"https://{hostname}/api/analytics/custom_actions/scripts"
headers = {
    "SEC": auth_token,
    "Content-Type": "application/json"
}

script_payload = {
    "name": script_name,
    "content": script_content,
    "interpreter": interpreter_name
}

response_post_script = requests.post(url_post_script, headers=headers, data=json.dumps(script_payload), verify=False)

if response_post_script.status_code == 201:
    print(f"Successfully posted script: {script_name}")
else:
    print(f"Error posting script: {response_post_script.status_code}")
    print(response_post_script.text)

# --- Updating an existing custom action script (example: changing content) ---
updated_script_content = "print('Updated content for custom action script!')"
url_update_script = f"https://{hostname}/api/analytics/custom_actions/scripts/{script_name}"

update_script_payload = {
    "content": updated_script_content
}

response_update_script = requests.put(url_update_script, headers=headers, data=json.dumps(update_script_payload), verify=False)

if response_update_script.status_code == 200:
    print(f"Successfully updated script: {script_name}")
else:
    print(f"Error updating script: {response_update_script.status_code}")
    print(response_update_script.text)

# --- Posting a new custom action ---
action_name = "run_my_script"
action_description = "Runs my new custom action script."
action_script_name = script_name  # Use the script posted above
action_parameters = [{"name": "input_param", "type": "string", "required": False, "default": "default_value"}]

url_post_action = f"https://{hostname}/api/analytics/custom_actions"

action_payload = {
    "name": action_name,
    "description": action_description,
    "script_name": action_script_name,
    "parameters": action_parameters
}

response_post_action = requests.post(url_post_action, headers=headers, data=json.dumps(action_payload), verify=False)

if response_post_action.status_code == 201:
    print(f"Successfully posted action: {action_name}")
else:
    print(f"Error posting action: {response_post_action.status_code}")
    print(response_post_action.text)

# --- Updating an existing custom action (example: changing description) ---
updated_action_description = "Updated description for my custom action."
url_update_action = f"https://{hostname}/api/analytics/custom_actions/{action_name}"

update_action_payload = {
    "description": updated_action_description
}

response_update_action = requests.put(url_update_action, headers=headers, data=json.dumps(update_action_payload), verify=False)

if response_update_action.status_code == 200:
    print(f"Successfully updated action: {action_name}")
else:
    print(f"Error updating action: {response_update_action.status_code}")
    print(response_update_action.text)
```
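
--------------------------------

Several of the endpoints listed earlier support paging: the QVM `/qvm/vulnerability_instances` endpoint takes a `Range` value such as `items=0-9`, and `apiclient.py` exposes the same idea through `--range`. The block below is a minimal sketch of how a caller might walk such an endpoint page by page. The helper names `range_header` and `iter_pages`, the `fetch` callable, and the rule of stopping on a short page are assumptions made for illustration and are not part of the IBM samples.

```python
from typing import Callable, Dict, Iterator, List


def range_header(start: int, count: int) -> Dict[str, str]:
    """Build a Range header covering `count` items beginning at index `start`."""
    if start < 0 or count < 1:
        raise ValueError("start must be >= 0 and count must be >= 1")
    # Range bounds are inclusive, so 10 items from index 0 is items=0-9.
    return {"Range": f"items={start}-{start + count - 1}"}


def iter_pages(
    fetch: Callable[[Dict[str, str]], List[dict]],
    page_size: int = 10,
) -> Iterator[List[dict]]:
    """Yield non-empty pages until a page comes back shorter than page_size.

    `fetch` is a hypothetical callable: it receives the extra headers for one
    page and returns the decoded JSON list for that page.
    """
    start = 0
    while True:
        page = fetch(range_header(start, page_size))
        if page:
            yield page
        if len(page) < page_size:
            break  # a short (or empty) page means there is nothing left
        start += page_size
```

With `requests`, `fetch` could be something like `lambda h: requests.get(url, headers={**headers, **h}).json()`, where `url` and `headers` are defined as in the earlier samples. Keeping the HTTP call behind a callable leaves the paging logic testable without a live QRadar host.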