### Cassandra Schema Setup for Events and Blacklist Tables Source: https://context7.com/xmidt-org/codex-db/llms.txt This snippet provides the CQL commands to set up the Cassandra database schema for the Codex DB project. It includes creating the 'devices' keyspace, the 'events' table with TTL and specific clustering orders, and indexes for filtering and pagination. It also defines the 'blacklist' table. ```cql -- Create keyspace CREATE KEYSPACE IF NOT EXISTS devices; -- Create events table with TTL CREATE TABLE devices.events ( device_id varchar, record_type INT, birthdate BIGINT, deathdate BIGINT, data BLOB, nonce BLOB, alg VARCHAR, kid VARCHAR, row_id TIMEUUID, PRIMARY KEY (device_id, birthdate, record_type) ) WITH CLUSTERING ORDER BY (birthdate DESC, record_type ASC) AND default_time_to_live = 2768400 AND transactions = {'enabled': 'false'}; -- Index for filtering by record type CREATE INDEX search_by_record_type ON devices.events (device_id, record_type, birthdate) WITH CLUSTERING ORDER BY (record_type ASC, birthdate DESC) AND default_time_to_live = 2768400 AND transactions = {'enabled': 'false', 'consistency_level':'user_enforced'}; -- Index for pagination by row_id CREATE INDEX search_by_row_id ON devices.events (device_id, row_id) WITH CLUSTERING ORDER BY (row_id DESC) AND default_time_to_live = 2768400 AND transactions = {'enabled': 'false', 'consistency_level':'user_enforced'}; -- Blacklist table CREATE TABLE devices.blacklist ( device_id varchar PRIMARY KEY, reason varchar ); ``` -------------------------------- ### Implement Database Interfaces for Event Storage and Retrieval (Go) Source: https://context7.com/xmidt-org/codex-db/llms.txt Provides example implementations for core database interfaces: `Inserter`, `RecordGetter`, and `Pruner`. These interfaces define the contract for database operations such as inserting records, retrieving records by device ID (with optional filtering by type), and preparing records for deletion. ```Go package main import ( db "github.com/xmidt-org/codex-db" ) // Inserter interface - for adding records type myInserter struct{} func (m *myInserter) InsertRecords(records ...db.Record) error { // Implementation inserts one or more records for _, record := range records { // Store record in database } return nil } // RecordGetter interface - for querying records type myGetter struct{} func (m *myGetter) GetRecords(deviceID string, limit int, stateHash string) ([]db.Record, error) { // Return records for a device, optionally after a state hash return []db.Record{}, nil } func (m *myGetter) GetRecordsOfType(deviceID string, limit int, eventType db.EventType, stateHash string) ([]db.Record, error) { // Return records filtered by event type return []db.Record{}, nil } func (m *myGetter) GetStateHash(records []db.Record) (string, error) { // Return hash of the latest record for pagination return "uuid-hash", nil } // Pruner interface - for cleanup operations type myPruner struct{} func (m *myPruner) GetRecordsToDelete(shard int, limit int, deathDate int64) ([]db.RecordToDelete, error) { return []db.RecordToDelete{{DeathDate: deathDate, RecordID: 123}}, nil } func (m *myPruner) DeleteRecord(shard int, deathdate int64, recordID int64) error { return nil } ``` -------------------------------- ### Initialize and Use BatchDeleter in Go Source: https://context7.com/xmidt-org/codex-db/llms.txt Shows the setup for a BatchDeleter, which periodically identifies and removes expired records from the database. It requires a connection implementing the db.Pruner interface and specific timing/batch configurations. ```go package main import ( "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics/provider" "github.com/xmidt-org/codex-db/batchDeleter" "github.com/xmidt-org/codex-db/postgresql" ) func main() { var conn *postgresql.Connection config := batchDeleter.Config{ Shard: 0, MaxWorkers: 5, MaxBatchSize: 10, SetSize: 1000, DeleteWaitTime: 100 * time.Millisecond, GetLimit: 100, GetWaitTime: 5 * time.Second, } logger := log.NewNopLogger() metricsProvider := provider.NewDiscardProvider() deleter, err := batchDeleter.NewBatchDeleter( config, logger, metricsProvider, conn, ) if err != nil { panic("failed to create batch deleter: " + err.Error()) } deleter.Start() deleter.Stop() } ``` -------------------------------- ### Cassandra DB Schema Setup Source: https://github.com/xmidt-org/codex-db/blob/main/README.md Defines the Cassandra database schema for the codex project, including keyspaces, tables for events and blacklists, and associated indexes. This setup is crucial for data storage and retrieval within the codex ecosystem. ```cassandraql CREATE KEYSPACE IF NOT EXISTS devices; CREATE TABLE devices.events ( device_id varchar, record_type INT, birthdate BIGINT, deathdate BIGINT, data BLOB, nonce BLOB, alg VARCHAR, kid VARCHAR, row_id TIMEUUID, PRIMARY KEY (device_id, birthdate, record_type)) WITH CLUSTERING ORDER BY (birthdate DESC, record_type ASC) AND default_time_to_live = 2768400 AND transactions = {'enabled': 'false'}; CREATE INDEX search_by_record_type ON devices.events (device_id, record_type, birthdate) WITH CLUSTERING ORDER BY (record_type ASC, birthdate DESC) AND default_time_to_live = 2768400 AND transactions = {'enabled': 'false', 'consistency_level':'user_enforced'}; CREATE INDEX search_by_row_id ON devices.events (device_id, row_id) WITH CLUSTERING ORDER BY (row_id DESC) AND default_time_to_live = 2768400 AND transactions = {'enabled': 'false', 'consistency_level':'user_enforced'}; CREATE TABLE devices.blacklist (device_id varchar PRIMARY KEY, reason varchar); ``` -------------------------------- ### GET /devices Source: https://context7.com/xmidt-org/codex-db/llms.txt Retrieves a paginated list of device IDs that have reported events within a specific time range. ```APIDOC ## GET /devices ### Description Retrieves a paginated list of device IDs that have events within a specified time range. ### Method GET ### Endpoint /devices ### Parameters #### Query Parameters - **startDate** (string) - Required - ISO8601 timestamp for the start of the range - **endDate** (string) - Required - ISO8601 timestamp for the end of the range - **offset** (integer) - Optional - Pagination offset - **limit** (integer) - Optional - Number of records to return ### Response #### Success Response (200) - **devices** (array) - List of device IDs #### Response Example { "devices": ["mac:112233445566", "mac:aabbccddeeff"] } ``` -------------------------------- ### Manage Blacklist with SyncList and ListRefresher (Go) Source: https://context7.com/xmidt-org/codex-db/llms.txt Demonstrates how to manage a device blacklist using SyncList for manual updates and ListRefresher for automatic database synchronization. Supports exact matches and regex patterns. Requires go-kit logging and cassandra connection. ```Go package main import ( "time" "github.com/go-kit/kit/log" "github.com/xmidt-org/codex-db/blacklist" "github.com/xmidt-org/codex-db/cassandra" ) func main() { // Manual blacklist management with SyncList syncList := blacklist.NewEmptySyncList() // Update the blacklist with items items := []blacklist.BlackListedItem{ {ID: "mac:000000000000", Reason: "test device"}, {ID: "mac:aabbcc.*", Reason: "known bad batch"}, // Regex pattern {ID: "mac:112233445566", Reason: "compromised device"}, } syncList.UpdateList(items) // Check if a device is blacklisted if reason, ok := syncList.InList("mac:112233445566"); ok { // Device is blacklisted _ = reason // "compromised device" } // Regex matching also works if reason, ok := syncList.InList("mac:aabbcc112233"); ok { _ = reason // "known bad batch" } // Auto-refreshing blacklist from database var conn *cassandra.Connection // = cassandra.CreateDbConnection(...) refreshConfig := blacklist.RefresherConfig{ UpdateInterval: 1 * time.Minute, // Refresh every minute Logger: log.NewNopLogger(), } stopChan := make(chan struct{}) refreshingList := blacklist.NewListRefresher(refreshConfig, conn, stopChan) // Use the refreshing list - it automatically updates from database if reason, ok := refreshingList.InList("mac:112233445566"); ok { _ = reason } // Stop the refresher close(stopChan) } ``` -------------------------------- ### Initialize and Use BatchInserter in Go Source: https://context7.com/xmidt-org/codex-db/llms.txt Demonstrates how to configure and instantiate a BatchInserter to group database records, reducing round-trips. It covers setting worker counts, batch sizes, and the insertion loop for processing records. ```go package main import ( "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics/provider" db "github.com/xmidt-org/codex-db" "github.com/xmidt-org/codex-db/batchInserter" "github.com/xmidt-org/codex-db/cassandra" ) func main() { var conn *cassandra.Connection config := batchInserter.Config{ ParseWorkers: 2, MaxInsertWorkers: 5, MaxBatchSize: 100, MaxBatchWaitTime: 500 * time.Millisecond, QueueSize: 1000, } logger := log.NewNopLogger() metricsProvider := provider.NewDiscardProvider() inserter, err := batchInserter.NewBatchInserter( config, logger, metricsProvider, conn, nil, ) if err != nil { panic("failed to create batch inserter: " + err.Error()) } inserter.Start() defer inserter.Stop() for i := 0; i < 1000; i++ { record := batchInserter.RecordWithTime{ Record: db.Record{ Type: db.Default, DeviceID: "mac:112233445566", BirthDate: time.Now().UnixNano(), DeathDate: time.Now().Add(32 * 24 * time.Hour).UnixNano(), Data: []byte(`{"event": "data"}`), }, Beginning: time.Now(), } err := inserter.Insert(record) if err != nil { continue } } } ``` -------------------------------- ### Create Cassandra Connection with Configuration Source: https://context7.com/xmidt-org/codex-db/llms.txt Establishes a connection to a Cassandra cluster using a configuration object. It supports SSL/TLS, authentication, retry logic, and integrates with health checkers and metrics providers. ```go config := cassandra.Config{ Hosts: []string{"cassandra-1.example.com", "cassandra-2.example.com"}, Database: "devices", OpTimeout: 10 * time.Second, SSLRootCert: "/path/to/ca.crt", Username: "cassandra_user", Password: "secret_password", NumRetries: 3, } conn, err := cassandra.CreateDbConnection(config, metricsProvider, healthChecker) ``` -------------------------------- ### Define and Initialize a Device Event Record (Go) Source: https://context7.com/xmidt-org/codex-db/llms.txt Demonstrates the creation of a `Record` struct, the primary data structure for device events. It includes fields for event type, device ID, timestamps, payload, and encryption metadata. The `TableName()` method is shown for context. ```Go package main import ( "time" db "github.com/xmidt-org/codex-db" ) func main() { // Create a new record for device event storage record := db.Record{ Type: db.State, // Event type (Default or State) DeviceID: "mac:112233445566", // Unique device identifier BirthDate: time.Now().UnixNano(), // Creation timestamp in nanoseconds DeathDate: time.Now().Add(32 * 24 * time.Hour).UnixNano(), // TTL expiration Data: []byte(`{"event": "online", "timestamp": 1234567890}`), // Event payload Nonce: []byte("random-nonce-value"), // Encryption nonce (if encrypted) Alg: "AES-256-GCM", // Encryption algorithm KID: "key-2024-01", // Key identifier for decryption } // Records are stored in the "events" table tableName := record.TableName() // Returns "events" } ``` -------------------------------- ### POST /db/connection Source: https://context7.com/xmidt-org/codex-db/llms.txt Initializes a connection to the PostgreSQL database with support for SSL, connection pooling, and health monitoring. ```APIDOC ## POST /db/connection ### Description Creates a connection to a PostgreSQL database with SSL support, health checks, and connection pool management. ### Method POST ### Endpoint /db/connection ### Request Body - **Server** (string) - Required - Host address and port - **Username** (string) - Required - Database user - **Database** (string) - Required - Database name - **MaxIdleConns** (integer) - Optional - Max idle connections in pool - **MaxOpenConns** (integer) - Optional - Max open connections in pool ### Response #### Success Response (200) - **status** (string) - Connection status message #### Response Example { "status": "connected" } ``` -------------------------------- ### Create PostgreSQL Database Connection Source: https://context7.com/xmidt-org/codex-db/llms.txt Configures and initializes a PostgreSQL connection with SSL support, connection pooling, and integrated health checks. It requires a configuration struct, a metrics provider, and a health checker instance. ```go func main() { config := postgresql.Config{ Server: "postgres.example.com:5432", Username: "codex_user", Database: "codex_events", SSLRootCert: "/path/to/ca.crt", SSLKey: "/path/to/client.key", SSLCert: "/path/to/client.crt", ConnectTimeout: 10 * time.Second, OpTimeout: 10 * time.Second, NumRetries: 3, WaitTimeMult: 2 * time.Second, MaxIdleConns: 10, MaxOpenConns: 50, PingInterval: time.Second, PruneLimit: 1000, } healthChecker := health.New() metricsProvider := provider.NewDiscardProvider() conn, err := postgresql.CreateDbConnection(config, metricsProvider, healthChecker) if err != nil { panic("failed to connect to PostgreSQL: " + err.Error()) } defer conn.Close() } ``` -------------------------------- ### Create Retry Insert Service with Exponential Backoff (Go) Source: https://context7.com/xmidt-org/codex-db/llms.txt Wraps a database inserter with retry logic using exponential backoff. This enhances reliability by automatically retrying failed inserts. Requires backoff library, go-kit metrics, and a cassandra connection. ```Go package main import ( "time" "github.com/cenkalti/backoff/v3" "github.com/go-kit/kit/metrics/provider" db "github.com/xmidt-org/codex-db" dbretry "github.com/xmidt-org/codex-db/retry" "github.com/xmidt-org/codex-db/cassandra" ) func main() { // Assume conn is an established database connection var conn *cassandra.Connection // = cassandra.CreateDbConnection(...) // Configure exponential backoff backoffConfig := backoff.ExponentialBackOff{ InitialInterval: 100 * time.Millisecond, RandomizationFactor: 0.5, Multiplier: 2.0, MaxInterval: 10 * time.Second, MaxElapsedTime: 1 * time.Minute, // Give up after 1 minute Clock: backoff.SystemClock, } metricsProvider := provider.NewDiscardProvider() // Create retry-enabled inserter retryInserter := dbretry.CreateRetryInsertService( conn, dbretry.WithBackoff(backoffConfig), dbretry.WithMeasures(metricsProvider), ) // Insert with automatic retry on failure record := db.Record{ Type: db.Default, DeviceID: "mac:112233445566", BirthDate: time.Now().UnixNano(), DeathDate: time.Now().Add(32 * 24 * time.Hour).UnixNano(), Data: []byte(`{"event": "test"}`), } // This will retry with exponential backoff if the insert fails err := retryInserter.InsertRecords(record) if err != nil { // Failed after all retries exhausted panic("insert failed after retries: " + err.Error()) } } ``` -------------------------------- ### CreateDbConnection Source: https://context7.com/xmidt-org/codex-db/llms.txt Initializes a connection to a Cassandra cluster with support for SSL/TLS, authentication, and retry logic. ```APIDOC ## FUNCTION CreateDbConnection ### Description Establishes a connection to a Cassandra cluster using the provided configuration, metrics provider, and health checker. ### Method N/A (Go Function) ### Parameters - **config** (cassandra.Config) - Required - Configuration object containing hosts, keyspace, SSL settings, and retry logic. - **metricsProvider** (provider.Provider) - Required - Metrics collection interface. - **healthChecker** (health.Health) - Required - Health checking interface. ### Request Example { "Hosts": ["cassandra-1.example.com"], "Database": "devices", "Username": "cassandra_user", "Password": "secret_password" } ### Response #### Success Response - **conn** (*cassandra.Connection) - The established database connection object. ``` -------------------------------- ### Parse and Use Event Types (Go) Source: https://context7.com/xmidt-org/codex-db/llms.txt Illustrates how to parse `EventType` enums from strings, providing flexibility for configuration. It shows the conversion of string representations like "State" and "Default" into their corresponding enum values and their usage within a `Record`. ```Go package main import ( db "github.com/xmidt-org/codex-db" ) func main() { // Parse event type from string (e.g., from configuration) stateType := db.ParseEventType("State") // Returns db.State defaultType := db.ParseEventType("Default") // Returns db.Default unknownType := db.ParseEventType("Unknown") // Returns db.Default (fallback) // Use event types directly record := db.Record{ Type: db.State, // For online/offline events DeviceID: "mac:112233445566", } // Event types as integers for database storage // db.Default = 0 // db.State = 1 } ``` -------------------------------- ### Retrieve Paginated Device List from Cassandra Source: https://context7.com/xmidt-org/codex-db/llms.txt Demonstrates how to fetch a list of device IDs active within a specific time range using pagination. It iterates through results by updating the offset until no more devices are returned. ```go func listActiveDevices(conn *cassandra.Connection) { endDate := time.Now() startDate := endDate.Add(-24 * time.Hour) offset := 0 limit := 100 for { devices, err := conn.GetDeviceList(startDate, endDate, offset, limit) if err != nil { panic("failed to get device list: " + err.Error()) } if len(devices) == 0 { break } for _, deviceID := range devices { _ = deviceID } offset += len(devices) } } ``` -------------------------------- ### Adapt Go-Kit Logger to HealthLogger Interface (Go) Source: https://context7.com/xmidt-org/codex-db/llms.txt Adapts a standard go-kit logger to the go-health logger interface, enabling unified logging within the health check system. This involves creating a go-kit logger and then wrapping it with the healthlogger.NewHealthLogger function. Supports standard logging methods and field injection. ```Go package main import ( "os" "github.com/go-kit/kit/log" "github.com/xmidt-org/codex-db/healthlogger" ) func main() { // Create a go-kit logger goKitLogger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout)) goKitLogger = log.With(goKitLogger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) // Adapt to go-health logger interface healthLog := healthlogger.NewHealthLogger(goKitLogger) // Use with go-health // healthChecker := health.New() // healthChecker.Logger = healthLog // The health logger supports all standard log methods healthLog.Info("health check initialized") healthLog.Debug("debug message") healthLog.Warn("warning message") healthLog.Error("error message") // Formatted logging healthLog.Infof("check completed in %dms", 150) // With fields logWithFields := healthLog.WithFields(map[string]interface{}{ "component": "database", "host": "cassandra-1", }) logWithFields.Info("connection established") } ``` -------------------------------- ### InsertRecords Source: https://context7.com/xmidt-org/codex-db/llms.txt Inserts one or more event records into Cassandra using efficient batch operations. ```APIDOC ## FUNCTION InsertRecords ### Description Persists device event records to the Cassandra database. Supports batching multiple records in a single call. ### Method N/A (Go Function) ### Parameters - **records** (...db.Record) - Required - Variadic list of record objects to insert. ### Request Example [ { "Type": "Default", "DeviceID": "mac:112233445566", "BirthDate": 1672531200000000000, "Data": "{\"firmware\": \"v2.1.0\"}" } ] ### Response #### Success Response - **error** (error) - Returns nil on successful batch insertion. ``` -------------------------------- ### Retrieve and Paginate Device Records Source: https://context7.com/xmidt-org/codex-db/llms.txt Retrieves device event records from Cassandra with support for filtering by event type. It demonstrates how to use state hashes to implement pagination for large result sets. ```go records, err := conn.GetRecords(deviceID, 100, "") stateRecords, err := conn.GetRecordsOfType(deviceID, 50, db.State, "") stateHash, err := conn.GetStateHash(records) nextPage, _ := conn.GetRecords(deviceID, 100, stateHash) ``` -------------------------------- ### GetRecords / GetRecordsOfType Source: https://context7.com/xmidt-org/codex-db/llms.txt Retrieves device event records from Cassandra with support for filtering by type and pagination. ```APIDOC ## FUNCTION GetRecords / GetRecordsOfType ### Description Fetches event records for a specific device. Use GetRecordsOfType to filter by specific event categories like 'State' or 'Default'. ### Method N/A (Go Function) ### Parameters - **deviceID** (string) - Required - The unique identifier of the device. - **limit** (int) - Required - Maximum number of records to return. - **type** (db.Type) - Optional - Event type filter (used in GetRecordsOfType). - **stateHash** (string) - Optional - Token for pagination to fetch the next page of results. ### Response #### Success Response - **records** ([]db.Record) - A slice of retrieved event records. ### Response Example [ { "Type": "State", "DeviceID": "mac:112233445566", "BirthDate": 1672531200000000000, "Data": "{\"status\": \"online\"}" } ] ``` -------------------------------- ### Insert Records via Batch Operation Source: https://context7.com/xmidt-org/codex-db/llms.txt Performs efficient insertion of multiple event records into the Cassandra database. The method accepts a variadic list of records and processes them as a single batch operation. ```go records := []db.Record{ {Type: db.State, DeviceID: "mac:112233445566", Data: []byte(`{"status": "online"}`)}, {Type: db.Default, DeviceID: "mac:112233445566", Data: []byte(`{"firmware": "v2.1.0"}`)}, } err := conn.InsertRecords(records...) ``` -------------------------------- ### Migrate devices.events table schema to v0.5.0 Source: https://github.com/xmidt-org/codex-db/blob/main/cassandra/README.md This script adds a row_id column of type TIMEUUID to the devices.events table and creates a corresponding index to support state tracking. It is required for the transition from v0.4.0 to v0.5.0 and includes specific clustering and TTL configurations. ```cassandraql ALTER TABLE devices.events ADD row_id TIMEUUID; CREATE INDEX search_by_row_id ON devices.events (device_id, row_id) WITH CLUSTERING ORDER BY (row_id DESC) AND default_time_to_live = 2768400 AND transactions = {'enabled': 'false', 'consistency_level':'user_enforced'}; ``` -------------------------------- ### DELETE /records/expired Source: https://context7.com/xmidt-org/codex-db/llms.txt Identifies and removes expired records from the database using the Pruner interface. ```APIDOC ## DELETE /records/expired ### Description Retrieves a list of expired records and deletes them from the specified shard. ### Method DELETE ### Endpoint /records/expired ### Parameters #### Query Parameters - **shard** (integer) - Required - Shard identifier - **limit** (integer) - Required - Number of records to process - **deathDate** (integer) - Required - Unix timestamp threshold ### Response #### Success Response (200) - **deletedCount** (integer) - Number of records successfully removed #### Response Example { "deletedCount": 50 } ``` -------------------------------- ### Prune Expired PostgreSQL Records Source: https://context7.com/xmidt-org/codex-db/llms.txt Uses the Pruner interface to identify and delete expired records from the database. It retrieves a batch of records based on a shard and death date, then iterates through them to perform individual deletions. ```go func pruneExpiredRecords(conn *postgresql.Connection) { shard := 0 limit := 100 deathDate := time.Now().UnixNano() recordsToDelete, err := conn.GetRecordsToDelete(shard, limit, deathDate) if err != nil { panic("failed to get expired records: " + err.Error()) } for _, record := range recordsToDelete { err := conn.DeleteRecord(shard, record.DeathDate, record.RecordID) if err != nil { continue } } } ``` === COMPLETE CONTENT === This response contains all available snippets from this library. No additional content exists. Do not make further requests.