### Quick Start: Set Up Environment and Install contentctl

Source: https://github.com/splunk/security_content/wiki/2.-Installation-and-Usage

A quick start guide to setting up the development environment: clone the repository, create and activate a Python virtual environment, and install the contentctl package.

```shell
git clone https://github.com/splunk/security_content.git
cd security_content
python3.11 -m venv .venv
source .venv/bin/activate
pip install contentctl
```

--------------------------------

### Set Up Splunk Security Content Environment

Source: https://github.com/splunk/security_content/blob/develop/README.md

Clones the Splunk Security Content repository, sets up a Python virtual environment, and installs the contentctl package used to manage detections.

```shell
git clone https://github.com/splunk/security_content.git
cd security_content
python3.11 -m venv .venv
source .venv/bin/activate
pip install contentctl
```

--------------------------------

### Clone Splunk Security Content Repository

Source: https://github.com/splunk/security_content/wiki/2.-Installation-and-Usage

Clones the official Splunk Security Content repository from GitHub. This is the first step in getting the project files and working with the content.

```shell
git clone https://github.com/splunk/security_content.git
```

--------------------------------

### Python Builder Pattern: Security Content Director Example

Source: https://github.com/splunk/security_content/wiki/3.1-‐-Security-Content-Code

Illustrates the Builder design pattern in Python: a director (`security_content_director`) constructs a complex security content object by calling a fixed sequence of methods on a `DetectionBuilder` instance, each of which sets object properties or adds components.
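The repository code that follows is a single director method. To show the division of labor in isolation, here is a minimal self-contained sketch of the same pattern. All names in it (`Detection`, `Director`, and the reduced set of `add*` methods) are hypothetical simplifications, not contentctl's actual classes.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Detection:
    """Hypothetical product: the object the builder assembles step by step."""
    path: str = ""
    deployment: Optional[str] = None
    macros: list[str] = field(default_factory=list)
    lookups: list[str] = field(default_factory=list)


class DetectionBuilder:
    """Hypothetical builder: each method adds one part to the object under construction."""

    def __init__(self) -> None:
        self.reset()

    def reset(self) -> None:
        # Start a fresh object so consecutive builds do not share state
        self._detection = Detection()

    def setObject(self, path: str) -> None:
        self._detection.path = path

    def addDeployment(self, deployment: str) -> None:
        self._detection.deployment = deployment

    def addMacros(self, macros: list[str]) -> None:
        self._detection.macros.extend(macros)

    def addLookups(self, lookups: list[str]) -> None:
        self._detection.lookups.extend(lookups)

    def getObject(self) -> Detection:
        return self._detection


class Director:
    """Hypothetical director: knows the order of the build steps, not how each step works."""

    def constructDetection(self, builder: DetectionBuilder, path: str, deployment: str,
                           macros: list[str], lookups: list[str]) -> None:
        builder.reset()
        builder.setObject(path)
        builder.addDeployment(deployment)
        builder.addMacros(macros)
        builder.addLookups(lookups)


# Usage: the caller drives the director, then collects the result from the builder
builder = DetectionBuilder()
director = Director()
director.constructDetection(builder, "detections/example.yml", "example_deployment", ["example_macro"], [])
first = builder.getObject()
director.constructDetection(builder, "detections/other.yml", "example_deployment", [], ["example_lookup"])
second = builder.getObject()
```

Because `reset()` creates a new object at the start of every construction, `first` and `second` stay independent even though they come from the same builder instance.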
```Python
import os

# Director method; DetectionBuilder is defined elsewhere in the contentctl project.
def constructDetection(self, builder: DetectionBuilder, path: str, deployments: list, playbooks: list,
                       baselines: list, tests: list, attack_enrichment: dict, macros: list,
                       lookups: list) -> None:
    builder.reset()
    # Load the detection YAML file relative to this module
    builder.setObject(os.path.join(os.path.dirname(__file__), path))
    builder.addDeployment(deployments)
    builder.addRBA()
    builder.addNesFields()
    builder.addAnnotations()
    builder.addMappings()
    builder.addBaseline(baselines)
    builder.addPlaybook(playbooks)
    builder.addUnitTest(tests)
    builder.addMitreAttackEnrichment(attack_enrichment)
    builder.addMacros(macros)
    builder.addLookups(lookups)
```

--------------------------------

### Clone and Install Splunk Security Content

Source: https://github.com/splunk/security_content/blob/develop/README.md

Steps to clone the Splunk Security Content repository and install contentctl, the prerequisite tool for managing the content: clone the repository, change into its directory, and install contentctl with pip.

```shell
git clone https://github.com/splunk/security_content.git
cd security_content
pip install contentctl
```

--------------------------------

### Install contentctl

Source: https://github.com/splunk/security_content/wiki/2.-Installation-and-Usage

Installs the contentctl Python package with pip. contentctl is the prerequisite tool for validating, building, and testing Splunk security content.

```shell
pip install contentctl
```

--------------------------------

### Commit and Push Changes

Source: https://github.com/splunk/security_content/wiki/4.1-‐-Contributing-to-the-Project

Commit your changes with a descriptive message and push them to your forked repository. This prepares your work for a pull request.
```bash
cd security_content
git add -A
git commit -m "Describe your change here"
git push -u origin your-bugfix-branch-name
```

--------------------------------

### Splunk MLTK Stage Command Example (Splunk SPL)

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_processnames_using_pretrained_model_in_dsdl.ipynb

An example Splunk Search Processing Language (SPL) query that uses the MLTK `fit` command with `mode=stage`. It generates sample data and stages it, together with its metadata, in the DSDL container under `app:barebone_template`, so that it can be loaded into the notebook for model development.

```splunk
| makeresults count=10
| streamstats c as i
| eval s = i%3
| eval feature_{s}=0
| foreach feature_* [eval <<FIELD>>=random()/pow(2,31)]
| fit MLTKContainer mode=stage algo=barebone_template _time feature_* i into app:barebone_template
```

--------------------------------

### Run Pytest for Security Content

Source: https://github.com/splunk/security_content/wiki/3.1-‐-Security-Content-Code

Runs the project's pytest suite. The command sets `PYTHONPATH` to the repository root and then runs pytest, which collects test files whose names start with `test_` and test functions whose names start with `test_`.

```bash
export PYTHONPATH="/path/to/security_content/"
pytest -s bin/contentctl_project
```

--------------------------------

### Clone Repository and Create Branch

Source: https://github.com/splunk/security_content/wiki/4.1-‐-Contributing-to-the-Project

Instructions for forking the Splunk security_content repository, cloning your fork locally, and creating a new branch from `develop` to begin work. This is the standard Git workflow for contributing to the project.
```bash
git clone git@github.com:YOUR_GITHUB_USERNAME/security_content.git
cd security_content
# This project uses 'develop' for all development activity, so create your branch off that
git checkout -b your-bugfix-branch-name develop
```

--------------------------------

### Get Model Summary and Versions

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_dns_txt_records_using_pretrained_model_in_dsdl.ipynb

Returns a dictionary containing the installed NumPy and Pandas versions, as a summary of the environment the model depends on.

```Python
import numpy as np
import pandas as pd

def summary(model=None):
    returns = {"version": {"numpy": np.__version__, "pandas": pd.__version__}}
    return returns
```

--------------------------------

### Example Usage: Load Model

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_dns_data_exfiltration_using_pretrained_model_in_dsdl.ipynb

Calls the `load` function to retrieve the pre-trained model, typically before making predictions.

```python
load("")
```

--------------------------------

### Print Library Versions (Python)

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_processnames_using_pretrained_model_in_dsdl.ipynb

A utility cell for development or testing that prints the installed NumPy and Pandas versions, to verify the environment setup.

```python
# THIS CELL IS NOT EXPORTED - free notebook cell for testing or development purposes
# (np and pd are imported in an earlier notebook cell)
print("numpy version: " + np.__version__)
print("pandas version: " + pd.__version__)
```

--------------------------------

### Test Library Versions

Source: https://github.com/splunk/security_content/blob/develop/notebooks/pretrained_dga_model_dsdl.ipynb

A utility cell that prints the installed versions of key libraries: NumPy, Pandas, and TensorFlow. This is useful for debugging and checking compatibility.

```python
# THIS CELL IS NOT EXPORTED - free notebook cell for testing or development purposes
# (np, pd and tf are imported in an earlier notebook cell)
print("numpy version: " + np.__version__)
print("pandas version: " + pd.__version__)
print("tensorflow version: " + tf.__version__)
```

--------------------------------

### Apply Trained Model

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_dns_txt_records_using_pretrained_model_in_dsdl.ipynb

Applies a previously trained model (here `barebone_model`) to new data. The data is prepared the same way as for the `fit` command, and the `apply` command writes the model's output to the `the_meaning_of_life` field.

```SPL
| makeresults count=10
| streamstats c as i
| eval s = i%3
| eval feature_{s}=0
| foreach feature_* [eval <<FIELD>>=random()/pow(2,31)]
| apply barebone_model as the_meaning_of_life
```

--------------------------------

### Print Library Versions

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_dns_txt_records_using_pretrained_model_in_dsdl.ipynb

A utility cell that prints the installed versions of key Python libraries: NumPy, Pandas, and PyTorch. This is useful for verifying the environment and checking compatibility.

```Python
# THIS CELL IS NOT EXPORTED - free notebook cell for testing or development purposes
# (np, pd and torch are imported in an earlier notebook cell)
print("numpy version: " + np.__version__)
print("pandas version: " + pd.__version__)
print("torch version: " + torch.__version__)
```

--------------------------------

### Initialize SplunkHEC Client

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_processnames_using_pretrained_model_in_dsdl.ipynb

Imports the SplunkHEC client from the dsdlsupport library and instantiates it. This must be done before sending any data to Splunk's HTTP Event Collector (HEC).
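The dsdlsupport `SplunkHEC` client hides the wire format. For reference, Splunk's HEC event endpoint (`/services/collector/event`) expects JSON objects with an `event` key plus optional metadata such as `time` (epoch seconds), `host`, `source`, `sourcetype`, and `index`, and several objects can be concatenated in one request body. The stdlib-only sketch below builds such a body without sending it; `build_hec_event` and `build_hec_batch` are hypothetical helpers, not part of dsdlsupport.

```python
import json
import time
from typing import Any, Optional


def build_hec_event(event: Any, sourcetype: Optional[str] = None,
                    timestamp: Optional[float] = None) -> dict:
    """Wrap a payload in the JSON envelope accepted by HEC's event endpoint."""
    envelope = {"event": event,
                "time": time.time() if timestamp is None else timestamp}
    if sourcetype is not None:
        envelope["sourcetype"] = sourcetype
    return envelope


def build_hec_batch(events: list) -> str:
    """Serialize several envelopes for one request: concatenated JSON objects, not a JSON array."""
    return "".join(json.dumps(envelope) for envelope in events)


# Usage: three "hello world" events with a fixed timestamp
body = build_hec_batch([build_hec_event({"message": f"hello world {i}"}, timestamp=0) for i in range(3)])
# The body would be POSTed with the header "Authorization: Splunk <HEC token>"
```

Whether `SplunkHEC.send` batches events in exactly this way is not shown in the notebook; the sketch only illustrates the documented event envelope.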
```python
from dsdlsupport import SplunkHEC

hec = SplunkHEC.SplunkHEC()
```

--------------------------------

### Create a New Detection

Source: https://github.com/splunk/security_content/wiki/2.-Installation-and-Usage

Starts creating a new detection file with the contentctl CLI. The tool prompts you with questions whose answers populate the detection.yml file.

```shell
contentctl new
```

--------------------------------

### Initialize and Load Model (Python)

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_processnames_using_pretrained_model_in_dsdl.ipynb

Initializes the ProcessnameClassifier model, loads its pre-trained weights from `MODEL_DIRECTORY` onto the CPU, and sets it to evaluation mode for inference.

```python
import torch

# ProcessnameClassifier, n_letters, n_hidden, n_categories and MODEL_DIRECTORY
# are defined elsewhere in the notebook
def init(df, param):
    model = ProcessnameClassifier(n_letters, n_hidden, n_categories)
    model.load_state_dict(torch.load(MODEL_DIRECTORY, map_location=torch.device('cpu')))
    model.eval()  # switch off training-only behavior such as dropout
    return model
```

--------------------------------

### Navigate to Security Content Directory

Source: https://github.com/splunk/security_content/wiki/2.-Installation-and-Usage

Changes the current directory to the root of the cloned Splunk Security Content repository, where the subsequent commands must be run.

```shell
cd security_content
```

--------------------------------

### Model Initialization Function

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_dns_txt_records_using_pretrained_model_in_dsdl.ipynb

Initializes the DNSTxtClassifier model for inference: it loads the pre-trained weights from the model file, moves the model to `device` (CPU or GPU), and sets it to evaluation mode.
```python
import os
import torch

# Assumes DNSTxtClassifier, device, MODEL_DIRECTORY and the hyperparameters
# vocab_size, embedding_dim, hidden_size, fc_hidden_size and dropout are the
# global constants defined in the notebook. They must match the values used to
# train the saved weights, otherwise load_state_dict() fails with a shape mismatch.
def init(df, param):
    model = DNSTxtClassifier(vocab_size, embedding_dim, hidden_size, fc_hidden_size, dropout=dropout)
    # Ensure the model path is correct and the file exists
    model_path = os.path.join(MODEL_DIRECTORY, 'detect_suspicious_dns_txt_records_using_pretrained_model_in_dsdl.pt')
    model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
    model = model.to(device)
    model.eval()
    return model
```

--------------------------------

### Initialize SplunkSearch

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_dns_txt_records_using_pretrained_model_in_dsdl.ipynb

Creates an instance of the `SplunkSearch` class, which is used to run Splunk queries and manage the retrieved data.

```Python
# Requires the SplunkSearch module to be imported first
search = SplunkSearch.SplunkSearch()
```

--------------------------------

### Send JSON Object to Splunk

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_processnames_using_pretrained_model_in_dsdl.ipynb

Sends a JSON object to Splunk, for example to log structured data or event details. The payload includes a timestamp.

```python
# example to send a JSON object, e.g. to log some data
from datetime import datetime

response = hec.send({'event': {'message': 'operation done', 'log_level': 'INFO'},
                     'time': datetime.now().timestamp()})
```

--------------------------------

### Test Model Initialization

Source: https://github.com/splunk/security_content/blob/develop/notebooks/pretrained_dga_model_dsdl.ipynb

Calls the `init` function to load the deep learning model and prints the model object to verify that initialization succeeded. This cell is for testing only.

```python
# THIS CELL IS NOT EXPORTED - free notebook cell for testing or development purposes
print(init(df, param))
```

--------------------------------

### Send JSON Object to Splunk HEC

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_dns_txt_records_using_pretrained_model_in_dsdl.ipynb

Sends a JSON object to Splunk via HEC, typically for logging or profiling. The example includes event data and a timestamp.

```Python
from datetime import datetime

response = hec.send({'event': {'message': 'operation done', 'log_level': 'INFO'},
                     'time': datetime.now().timestamp()})
```

--------------------------------

### Write Security Content Objects to .conf Files (Python)

Source: https://github.com/splunk/security_content/wiki/3.1-‐-Security-Content-Code

The `ObjToConfAdapter` class implements the Adapter pattern to write security content objects into Splunk configuration files. It first writes the headers of the Splunk `.conf` files, then renders each object type (detections, stories, investigations, and so on) into them with Jinja2 templates.
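The adapter that follows dispatches on content type with an `if`/`elif` chain. Where a type maps only to a fixed list of (template, `.conf` file) pairs, the same dispatch can be expressed as a lookup table. This is a hypothetical refactoring sketch, not contentctl code: `ContentType` stands in for `SecurityContentType`, and the writer function is injected so the example runs without Jinja2 or the project's `ConfWriter`.

```python
import os
from enum import Enum
from typing import Callable, Dict, List, Tuple


class ContentType(Enum):
    """Hypothetical stand-in for contentctl's SecurityContentType."""
    detections = "detections"
    stories = "stories"
    baselines = "baselines"


# (template, target .conf file) pairs per content type
TEMPLATES: Dict[ContentType, List[Tuple[str, str]]] = {
    ContentType.detections: [
        ("savedsearches_detections.j2", "default/savedsearches.conf"),
        ("analyticstories_detections.j2", "default/analyticstories.conf"),
        ("macros_detections.j2", "default/macros.conf"),
    ],
    ContentType.stories: [("analyticstories_stories.j2", "default/analyticstories.conf")],
    ContentType.baselines: [("savedsearches_baselines.j2", "default/savedsearches.conf")],
}

# Signature of the injected writer: (template name, output file path, objects)
WriteConfFile = Callable[[str, str, list], None]


class TableDrivenConfAdapter:
    """Adapts content objects to .conf output by handing each (template, file) pair to a writer."""

    def __init__(self, write_conf_file: WriteConfFile) -> None:
        self._write_conf_file = write_conf_file

    def writeObjects(self, objects: list, output_path: str, content_type: ContentType) -> None:
        for template, conf_file in TEMPLATES.get(content_type, []):
            self._write_conf_file(template, os.path.join(output_path, conf_file), objects)


# Usage: record the calls instead of rendering templates
calls = []
adapter = TableDrivenConfAdapter(lambda template, path, objs: calls.append((template, path, len(objs))))
adapter.writeObjects(["detection_a", "detection_b"], "out", ContentType.detections)
```

Types with extra side effects, such as investigations (which also write workbench panels) and lookups (which copy CSV files), would still need dedicated code alongside the table.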
```Python
import os
import glob
import shutil

from bin.contentctl_project.contentctl_core.application.adapter.adapter import Adapter
from bin.contentctl_project.contentctl_infrastructure.adapter.conf_writer import ConfWriter
from bin.contentctl_project.contentctl_core.domain.entities.enums.enums import SecurityContentType


class ObjToConfAdapter(Adapter):

    def writeHeaders(self, output_folder: str) -> None:
        ConfWriter.writeConfFileHeader(os.path.join(output_folder, 'default/analyticstories.conf'))
        ConfWriter.writeConfFileHeader(os.path.join(output_folder, 'default/savedsearches.conf'))
        ConfWriter.writeConfFileHeader(os.path.join(output_folder, 'default/collections.conf'))
        ConfWriter.writeConfFileHeader(os.path.join(output_folder, 'default/es_investigations.conf'))
        ConfWriter.writeConfFileHeader(os.path.join(output_folder, 'default/macros.conf'))
        ConfWriter.writeConfFileHeader(os.path.join(output_folder, 'default/transforms.conf'))
        ConfWriter.writeConfFileHeader(os.path.join(output_folder, 'default/workflow_actions.conf'))

    def writeObjects(self, objects: list, output_path: str, type: SecurityContentType = None) -> None:
        if type == SecurityContentType.detections:
            ConfWriter.writeConfFile('savedsearches_detections.j2', os.path.join(output_path, 'default/savedsearches.conf'), objects)
            ConfWriter.writeConfFile('analyticstories_detections.j2', os.path.join(output_path, 'default/analyticstories.conf'), objects)
            ConfWriter.writeConfFile('macros_detections.j2', os.path.join(output_path, 'default/macros.conf'), objects)

        elif type == SecurityContentType.stories:
            ConfWriter.writeConfFile('analyticstories_stories.j2', os.path.join(output_path, 'default/analyticstories.conf'), objects)

        elif type == SecurityContentType.baselines:
            ConfWriter.writeConfFile('savedsearches_baselines.j2', os.path.join(output_path, 'default/savedsearches.conf'), objects)

        elif type == SecurityContentType.investigations:
            ConfWriter.writeConfFile('savedsearches_investigations.j2', os.path.join(output_path, 'default/savedsearches.conf'), objects)
            ConfWriter.writeConfFile('analyticstories_investigations.j2', os.path.join(output_path, 'default/analyticstories.conf'), objects)

            workbench_panels = []
            for investigation in objects:
                if investigation.inputs:
                    response_file_name_xml = investigation.lowercase_name + "___response_task.xml"
                    workbench_panels.append(investigation)
                    # Escape the search so it can be embedded in the panel XML
                    investigation.search = investigation.search.replace(">", "&gt;")
                    investigation.search = investigation.search.replace("<", "&lt;")
                    panel_path = os.path.join(output_path, 'default/data/ui/panels/', "workbench_panel_" + response_file_name_xml)
                    ConfWriter.writeConfFileHeader(panel_path)
                    ConfWriter.writeConfFile('panel.j2', panel_path, [investigation.search])

            ConfWriter.writeConfFile('es_investigations_investigations.j2', os.path.join(output_path, 'default/es_investigations.conf'), workbench_panels)
            ConfWriter.writeConfFile('workflow_actions.j2', os.path.join(output_path, 'default/workflow_actions.conf'), workbench_panels)

        elif type == SecurityContentType.lookups:
            ConfWriter.writeConfFile('collections.j2', os.path.join(output_path, 'default/collections.conf'), objects)
            ConfWriter.writeConfFile('transforms.j2', os.path.join(output_path, 'default/transforms.conf'), objects)
            # Copy the lookup CSV files into the app's lookups directory
            files = glob.iglob(os.path.join(os.path.dirname(__file__), '../../../..', 'lookups', '*.csv'))
            for file in files:
                if os.path.isfile(file):
                    shutil.copy(file, os.path.join(output_path, 'lookups'))

        elif type == SecurityContentType.macros:
            # The source snippet is truncated here: ConfWriter.writeConfFile('macros.j2', ...
            ...
```

--------------------------------

### Test Data Staging

Source: https://github.com/splunk/security_content/blob/develop/notebooks/pretrained_dga_model_dsdl.ipynb

Runs the `stage` function to load a sample dataset and its parameters, then prints descriptive statistics for the data and the loaded parameters. This cell is for testing only.
```python
# THIS CELL IS NOT EXPORTED - free notebook cell for testing or development purposes
df, param = stage("pretrained_dga_model_dsdl")
print(df.describe())
print(param)
```

--------------------------------

### Send 10 Hello World Events

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_processnames_using_pretrained_model_in_dsdl.ipynb

Sends a given number of "hello world" events to Splunk with the `send_hello_world` method. This is useful for testing connectivity and basic data transmission to HEC.

```python
# example to send 10 hello world events
response = hec.send_hello_world(10)
```

--------------------------------

### Create New MLTK Detection Content

Source: https://github.com/splunk/security_content/wiki/6.2-‐-How-to-add-ML-model-files-to-ESCU

Uses the `contentctl.py` tool to generate a new detection content template for MLTK. The `-p` option sets the project path and `-t` sets the type of content to create.

```shell
python contentctl.py -p . new_content -t detection
```

--------------------------------

### Import Libraries and Define Constants

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_dns_txt_records_using_pretrained_model_in_dsdl.ipynb

Imports the Python libraries used for deep learning (PyTorch, NumPy, Pandas), text vectorization (scikit-learn), and data handling, and defines global constants for the model hyperparameters (vocabulary size, embedding dimension, and so on) and the model directory path.
```Python
# this definition exposes all python module imports that should be available in all subsequent commands
import numpy as np
import pandas as pd
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data.sampler import SubsetRandomSampler
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from sklearn.feature_extraction.text import CountVectorizer
from torch.autograd import Variable
from torch.optim import lr_scheduler
from collections import Counter, OrderedDict
import pickle

# global constants
MODEL_DIRECTORY = "/srv/app/model/data/detect_suspicious_dns_txt_records_using_pretrained_model_in_dsdl/"
vocab_size = 10002
embedding_dim = 64
hidden_size = 64
fc_hidden_size = 64
num_output_nodes = 1
dropout = 0.5
```

--------------------------------

### Load Model from File

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_dns_data_exfiltration_using_pretrained_model_in_dsdl.ipynb

Loads a pre-trained model from a file: it creates a `DNSExfiltration` model, loads its state dictionary from the PyTorch weights file onto the CPU, and sets the model to evaluation mode.

```python
import torch

# DNSExfiltration and MODEL_DIRECTORY are defined elsewhere in the notebook
def load(name):
    model = DNSExfiltration(98)
    model.load_state_dict(torch.load(MODEL_DIRECTORY + 'detect_dns_data_exfiltration_using_pretrained_model_in_dsdl.pt',
                                     map_location=torch.device('cpu')))
    model = model.to('cpu')
    model.eval()
    return model
```

--------------------------------

### Import SplunkSearch Library

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_dns_txt_records_using_pretrained_model_in_dsdl.ipynb

Imports the custom `SplunkSearch` library, which provides helpers for retrieving data from Splunk for analysis in the notebook environment.
```Python
import libs.SplunkSearch as SplunkSearch
```

--------------------------------

### Model Summary (Python)

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_processnames_using_pretrained_model_in_dsdl.ipynb

Returns a summary of the model environment: the installed versions of key libraries such as NumPy and Pandas. This is useful for tracking dependencies and environment details.

```python
# return a model summary
def summary(model=None):
    returns = {"version": {"numpy": np.__version__, "pandas": pd.__version__}}
    return returns
```

--------------------------------

### Fit Model (Python)

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_processnames_using_pretrained_model_in_dsdl.ipynb

Placeholder for training the model. It performs no training and returns a simple info object; a real implementation would contain the training loop.

```python
# train your model
# returns a fit info json object and may modify the model object
def fit(model, df, param):
    # model.fit()  # Placeholder for actual training
    info = {"message": "model trained"}
    return info
```

--------------------------------

### Python Factory Pattern for Splunk Security Content Processing

Source: https://github.com/splunk/security_content/wiki/3.1-‐-Security-Content-Code

This code shows the factory pattern used in Splunk's security content implementation. For a given content type, it collects the YAML files to load and, for each file, uses the director and the matching builder to construct an object (lookup, macro, detection, and so on), which it appends to the corresponding list in the output DTO.

```python
import os

# Assuming SecurityContentType, Utils, and builder objects are defined elsewhere
# from contentctl.contentctl_core.application.factory.security_content_type import SecurityContentType
# from contentctl.contentctl_core.utils.utils import Utils


class Factory:
    # ... (other methods)

    def createSecurityContent(self, type: SecurityContentType) -> list:
        # Results are accumulated in self.output_dto; `objects` is returned empty.
        objects = []
        if type == SecurityContentType.deployments:
            files = Utils.get_all_yml_files_from_directory(os.path.join(self.input_dto.input_path, str(type.name), 'ESCU'))
        elif type == SecurityContentType.unit_tests:
            files = Utils.get_all_yml_files_from_directory(os.path.join(self.input_dto.input_path, 'tests'))
        else:
            files = Utils.get_all_yml_files_from_directory(os.path.join(self.input_dto.input_path, str(type.name)))

        for file in files:
            # Skip files whose names contain 'ssa__'
            if 'ssa__' not in file:
                if type == SecurityContentType.lookups:
                    self.input_dto.director.constructLookup(self.input_dto.basic_builder, file)
                    self.output_dto.lookups.append(self.input_dto.basic_builder.getObject())

                elif type == SecurityContentType.macros:
                    self.input_dto.director.constructMacro(self.input_dto.basic_builder, file)
                    self.output_dto.macros.append(self.input_dto.basic_builder.getObject())

                elif type == SecurityContentType.deployments:
                    self.input_dto.director.constructDeployment(self.input_dto.basic_builder, file)
                    self.output_dto.deployments.append(self.input_dto.basic_builder.getObject())

                elif type == SecurityContentType.playbooks:
                    self.input_dto.director.constructPlaybook(self.input_dto.playbook_builder, file,
                                                              self.output_dto.detections)
                    self.output_dto.playbooks.append(self.input_dto.playbook_builder.getObject())

                elif type == SecurityContentType.baselines:
                    self.input_dto.director.constructBaseline(self.input_dto.baseline_builder, file,
                                                              self.output_dto.deployments)
                    baseline = self.input_dto.baseline_builder.getObject()
                    self.output_dto.baselines.append(baseline)

                elif type == SecurityContentType.investigations:
                    self.input_dto.director.constructInvestigation(self.input_dto.investigation_builder, file)
                    investigation = self.input_dto.investigation_builder.getObject()
                    self.output_dto.investigations.append(investigation)

                elif type == SecurityContentType.stories:
                    self.input_dto.director.constructStory(self.input_dto.story_builder, file,
                                                           self.output_dto.detections, self.output_dto.baselines,
                                                           self.output_dto.investigations)
                    story = self.input_dto.story_builder.getObject()
                    self.output_dto.stories.append(story)

                elif type == SecurityContentType.detections:
                    self.input_dto.director.constructDetection(self.input_dto.detection_builder, file,
                                                               self.output_dto.deployments, self.output_dto.playbooks,
                                                               self.output_dto.baselines, self.output_dto.tests,
                                                               self.input_dto.attack_enrichment,
                                                               self.output_dto.macros, self.output_dto.lookups)
                    detection = self.input_dto.detection_builder.getObject()
                    self.output_dto.detections.append(detection)

                elif type == SecurityContentType.unit_tests:
                    self.input_dto.director.constructTest(self.input_dto.basic_builder, file)
                    test = self.input_dto.basic_builder.getObject()
                    self.output_dto.tests.append(test)

        return objects
```

--------------------------------

### Initialize Deep Learning Model

Source: https://github.com/splunk/security_content/blob/develop/notebooks/pretrained_dga_model_dsdl.ipynb

Defines the `init` function, which loads a pre-trained TensorFlow Keras model from `MODEL_DIRECTORY`. The model detects domains produced by domain generation algorithms (DGA).

```python
import tensorflow as tf

# initialize your model
# available inputs: data and parameters
# returns the model object which will be used as a reference to call fit, apply and summary subsequently
# (MODEL_DIRECTORY is defined elsewhere in the notebook)
def init(df, param):
    model = tf.keras.models.load_model(MODEL_DIRECTORY + "pretrained_dga_model_dsdl")
    return model
```

--------------------------------

### Notable Event Configuration

Source: https://github.com/splunk/security_content/wiki/5.3-‐-ESCU-‐-savedsearch.conf-spec

Defines the parameters for configuring notable events: the fields included in the summary, a detailed rule description, a concise title, the security domain, and the severity. The example values describe a search that detects failed MFA challenge attempts in Okta.

```APIDOC
action.notable.param.nes_fields
  - Specifies the fields to include in the notable event summary.
action.notable.param.rule_description
  - Provides a detailed description of the analytic rationale behind the search; here, identifying failed MFA challenge attempts in Okta authentication.
action.notable.param.rule_title
  - Sets a concise title for the notable event generated by this search.
action.notable.param.security_domain
  - Associates the search with a security domain; here, identity.
action.notable.param.severity
  - Defines the severity of the notable event; here, high.
```

--------------------------------

### Initialize SplunkSearch for Data Retrieval (Python)

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_processnames_using_pretrained_model_in_dsdl.ipynb

Imports and instantiates the SplunkSearch class from the dsdlsupport library. The resulting object is used to query Splunk and retrieve data for analysis.

```python
from dsdlsupport import SplunkSearch

search = SplunkSearch.SplunkSearch()
```

--------------------------------

### Load Model from Name

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_dns_txt_records_using_pretrained_model_in_dsdl.ipynb

Loads a pre-trained model with PyTorch: it creates a DNSTxtClassifier, loads its state dictionary from `MODEL_DIRECTORY` onto the CPU, moves the model to `device`, and sets it to evaluation mode.
```Python
import os
import torch

# Assuming DNSTxtClassifier, MODEL_DIRECTORY, device, vocab_size, embedding_dim,
# hidden_size, fc_hidden_size and dropout are defined elsewhere in the notebook
def load(name):
    model = DNSTxtClassifier(vocab_size, embedding_dim, hidden_size, fc_hidden_size, dropout=dropout)
    model_path = os.path.join(MODEL_DIRECTORY, 'detect_suspicious_dns_txt_records_using_pretrained_model_in_dsdl.pt')
    model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
    model = model.to(device)
    model.eval()
    return model
```

--------------------------------

### Splunk Deployment Configuration YAML

Source: https://github.com/splunk/security_content/wiki/4.2-‐-Customize-to-Your-Environment

This YAML configuration defines the scheduling and alert actions for Splunk detections. It customizes the cron schedule, the search time range, and email notifications, and links the deployment to analytic stories through tags.
```yaml
name: Schedule Credential Dumping Daily
id: bc91a8cd-35e7-4bb2-6140-e756cc46f214
date: '2020-04-27'
description: Schedule Credential Dumping Daily with Email notification to the SOC
author: Jose Hernandez
scheduling:
  cron_schedule: '0 0 * * *'
  earliest_time: -1d@d
  latest_time: -10m@m
  schedule_window: auto
alert_action:
  email:
    message: Splunk Alert $name$ triggered %fields%
    subject: Splunk Alert $name$
    to: soc@splunk.com
tags:
  analytics_story: Credential Dumping
```

--------------------------------

### Splunk MLTK Model Fitting and Application (APIDOC)

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_processnames_using_pretrained_model_in_dsdl.ipynb

Shows how to use the Splunk Machine Learning Toolkit (MLTK) to fit a model named `barebone_model` and then apply it. Both searches are written in Splunk's Search Processing Language (SPL).

```APIDOC
## Fit Model using MLTK
| makeresults count=10
| streamstats c as i
| eval s = i%3
| eval feature_{s}=0
| foreach feature_* [eval <<FIELD>>=random()/pow(2,31)]
| fit MLTKContainer algo=barebone s from feature_* into app:barebone_model

## Apply Model using MLTK
| makeresults count=10
| streamstats c as i
| eval s = i%3
| eval feature_{s}=0
| foreach feature_* [eval <<FIELD>>=random()/pow(2,31)]
| apply barebone_model as the_meaning_of_life
```

--------------------------------

### Model Summary Information

Source: https://github.com/splunk/security_content/blob/develop/notebooks/pretrained_dga_model_dsdl.ipynb

Returns a dictionary with the versions of key libraries used by the model, such as NumPy and Pandas. This helps with debugging and checking compatibility across environments.
```python
def summary(model=None):
    returns = {"version": {"numpy": np.__version__, "pandas": pd.__version__}}
    return returns
```

--------------------------------

### Apply Model for DNS Data Exfiltration Detection

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_dns_data_exfiltration_using_pretrained_model_in_dsdl.ipynb

Orchestrates the application of the DNS exfiltration model: it prepares the input data with `prepare_input_df`, runs predictions with `predict`, and merges the results back into the original data.

```python
import pandas as pd
import torch
from torch.utils.data import DataLoader

# prepare_input_df, predict and the module-level lists text_rows, size_avg and
# entropy_avg are defined elsewhere in the notebook
def apply(model, df, param):
    df.drop(['_time'], axis=1, inplace=True, errors='ignore')
    recent_df = prepare_input_df(df)
    # Model input: feature columns only
    input_df = recent_df.drop(['src', 'query', 'rank', 'request_without_domain', 'tld'], axis=1)
    # Drop the feature columns from recent_df before adding the predictions
    recent_df.drop(['request_without_domain', 'tld', 'len', 'entropy', 'size_avg', 'entropy_avg'], axis=1, inplace=True)
    recent_df.drop(range(0, 94), axis=1, inplace=True)
    input_tensor = torch.FloatTensor(input_df.values)
    # shuffle=False keeps the predictions in the same row order as recent_df
    dataloader = DataLoader(input_tensor, shuffle=False, batch_size=256)
    predict_is_exfiltration_proba, predict_is_exfiltration = predict(dataloader, model)
    recent_df['pred_is_dns_data_exfiltration_proba'] = predict_is_exfiltration_proba
    recent_df['pred_is_dns_data_exfiltration'] = predict_is_exfiltration
    print(recent_df.columns)
    print(df.columns)
    text_rows.clear()
    size_avg.clear()
    entropy_avg.clear()
    output = pd.merge(recent_df, df, on=['src', 'query', 'rank'], how='right')
    return output
```

--------------------------------

### Load Staged Data

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_dns_txt_records_using_pretrained_model_in_dsdl.ipynb

Uses the `stage` function to load the dataset named `barebone_template` from local CSV and JSON files, returning the DataFrame and its parameters.
```python
# THIS CELL IS NOT EXPORTED - free notebook cell for testing or development purposes
df, param = stage("barebone_template")
```

--------------------------------

### Create New Detection

Source: https://github.com/splunk/security_content/blob/develop/README.md

Runs `contentctl new`, which prompts interactively for the details needed to generate a new detection YAML file.

```shell
contentctl new
```

--------------------------------

### Text Tokenization and Preprocessing Utilities

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_dns_txt_records_using_pretrained_model_in_dsdl.ipynb

Provides utilities for text analysis: building a character n-gram analyzer, tokenizing text, mapping tokens to vocabulary indices, and cleaning and normalizing text strings. These functions prepare the input data for the classification model.

```python
from sklearn.feature_extraction.text import CountVectorizer
import pandas as pd
import pickle
import json
import torch
import torch.nn as nn

# Assuming these variables are defined elsewhere in the context:
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# MODEL_DIRECTORY = "/path/to/models/"

# --- Tokenization and Analyzer Setup ---
# Character n-grams of length 2 to 4, built only inside word boundaries
v = CountVectorizer(analyzer='char_wb', ngram_range=(2, 4), lowercase=True)
analyzer = v.build_analyzer()

def tokenize(s):
    tokens = analyzer(s)
    try:
        tokens.remove(' ')
    except ValueError:
        pass
    # Strip the padding spaces that char_wb adds around each word
    tokens = list(map(str.strip, tokens))
    return tokens

# --- Text Preprocessing Helper Functions ---
def remove_spaces_key_value(text):
    # Collapse spaces around '=', '-' and ':' separators
    return (text.replace(' = ', '=').replace(' =', '=').replace('= ', '=')
                .replace(' - ', '-').replace(' -', '-').replace('- ', '-')
                .replace(' : ', ':').replace(': ', ':').replace(' :', ':'))

def preprocess_text(text):
    text = remove_spaces_key_value(text)
    # Remove quotes and lowercase
    text = text.replace('"', '').replace("'", "").lower()
    return text

def get_token_index(text, vocab):
    idxs = []
    tokens = tokenize(text)
    for token in tokens:
        if token not in vocab:
            # Out-of-vocabulary tokens map to the vocabulary's unknown-token entry
            idxs.append(vocab[''])
        else:
            idxs.append(vocab[token])
    return idxs
```

--------------------------------

### Display Staged Parameters (Python)

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_processnames_using_pretrained_model_in_dsdl.ipynb

Displays the staged parameters, i.e. the dictionary that `stage` loads from the JSON file. Use it to inspect metadata or configuration associated with the staged data.

```python
param
```

--------------------------------

### Extract Model Artifacts

Source: https://github.com/splunk/security_content/wiki/6.3-‐-How-to-deploy-pre-trained-Deep-Learning-models-for-ESCU

Extracts a pre-trained model artifact from a `.tar.gz` archive into a directory inside the Splunk DSDL container. Run it from the JupyterLab terminal.

```shell
tar -xf app/model/data/.tar.gz -C app/model/data//
```

--------------------------------

### Model Loading Utility

Source: https://github.com/splunk/security_content/blob/develop/notebooks/pretrained_dga_model_dsdl.ipynb

Loads the saved `pretrained_dga_model_dsdl` Keras model (architecture and weights) from `MODEL_DIRECTORY` so it can be used for predictions. The `name` argument is not used; the model path is hard-coded.

```python
import tensorflow as tf

# MODEL_DIRECTORY is assumed to be defined elsewhere in the notebook
def load(name):
    model = tf.keras.models.load_model(MODEL_DIRECTORY + "pretrained_dga_model_dsdl")
    return model
```

--------------------------------

### Stage Data into Notebook Environment (Python)

Source: https://github.com/splunk/security_content/blob/develop/notebooks/detect_suspicious_processnames_using_pretrained_model_in_dsdl.ipynb

Defines a function that loads data from a CSV file and a JSON file in a `data/` directory. It is intended for staging data into the notebook environment, typically after the data has been exported from Splunk with the `mode=stage` option.
```python
import json

import pandas as pd


# this cell is not executed from MLTK and should only be used for staging data into the notebook environment
def stage(name):
    # Staged events exported from Splunk
    with open("data/" + name + ".csv", "r") as f:
        df = pd.read_csv(f)
    # Accompanying parameters
    with open("data/" + name + ".json", "r") as f:
        param = json.load(f)
    return df, param
```
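To try `stage` without exporting anything from Splunk, you can create the `<name>.csv` and `<name>.json` pair by hand. The sketch below is a minimal round trip under that assumption. `write_stage`, the `data_dir` parameter, the sample feature columns and the parameter dictionary are all hypothetical illustrations, not part of the notebook or of DSDL.

```python
import json
import os
import tempfile

import pandas as pd


def write_stage(name, rows, param, data_dir="data"):
    """Hypothetical helper: write the CSV/JSON pair that stage() reads."""
    os.makedirs(data_dir, exist_ok=True)
    pd.DataFrame(rows).to_csv(os.path.join(data_dir, name + ".csv"), index=False)
    with open(os.path.join(data_dir, name + ".json"), "w") as f:
        json.dump(param, f)


def stage(name, data_dir="data"):
    """Same logic as the notebook's stage(), with the directory made configurable."""
    df = pd.read_csv(os.path.join(data_dir, name + ".csv"))
    with open(os.path.join(data_dir, name + ".json")) as f:
        param = json.load(f)
    return df, param


# Usage: round-trip a tiny staged dataset through a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    write_stage(
        "barebone_template",
        rows=[{"feature_0": 0.1, "feature_1": 0.5}, {"feature_0": 0.7, "feature_1": 0.2}],
        param={"algo": "barebone"},  # hypothetical parameter dict for illustration
        data_dir=tmp,
    )
    df, param = stage("barebone_template", data_dir=tmp)

print(df.shape)  # (2, 2)
```

The default `data_dir="data"` preserves the original `data/` layout. Passing a scratch directory keeps test files out of the notebook's working tree.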