### Install Dependencies and Run Phoenix Models

Source: https://context7.com/xai-org/x-algorithm/llms.txt

This bash snippet sets up and runs the Phoenix models: it installs dependencies with `uv`, then runs the ranking and retrieval model demos.

```bash
# Install dependencies with uv
cd phoenix
uv sync

# Run the ranking model demo
uv run run_ranker.py

# Run the retrieval model demo
uv run run_retrieval.py
```

--------------------------------

### Chain Multiple Filters for Candidate Processing (Rust)

Source: https://context7.com/xai-org/x-algorithm/llms.txt

Demonstrates how to chain multiple filters, including AgeFilter, to process a list of candidates. This example shows a typical pipeline setup using boxed trait objects.

```rust
// Usage: Chain multiple filters
// NOTE: the type parameters were lost in extraction; `ScoredPostsQuery` and
// `PostCandidate` are assumed from the Filter implementations in this document.
let filters: Vec<Box<dyn Filter<ScoredPostsQuery, PostCandidate>>> = vec![
    Box::new(DropDuplicatesFilter),
    Box::new(AgeFilter::new(Duration::from_secs(172800))), // 2 days
    Box::new(SelfTweetFilter),
    Box::new(MutedKeywordFilter::new()),
    Box::new(AuthorSocialgraphFilter),
];
```

--------------------------------

### Manage Posts with Thunder PostStore

Source: https://context7.com/xai-org/x-algorithm/llms.txt

This Rust snippet illustrates the usage of the Thunder PostStore, a thread-safe in-memory store for recent posts. It covers creating the store, inserting posts, retrieving posts by followed users, filtering for video posts, and starting background maintenance tasks.

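To make the store's behavior concrete, here is a minimal single-process sketch in Python of the operations listed above (insert, lookup by followed users with exclusions, video-only lookup, retention trim). It is not Thunder's implementation: `Post`, `TinyPostStore`, and every method and parameter name are hypothetical, and the per-author list layout is an assumption made for illustration.

```python
import threading
import time
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Post:
    post_id: int
    author_id: int
    created_at: float  # Unix seconds
    has_video: bool = False


class TinyPostStore:
    """Hypothetical stand-in for an in-memory post store with a retention window."""

    def __init__(self, retention_seconds: float):
        self.retention_seconds = retention_seconds
        self._lock = threading.Lock()  # guards _by_author across threads
        self._by_author = defaultdict(list)  # author_id -> posts in insertion order

    def insert_posts(self, posts):
        with self._lock:
            for post in posts:
                self._by_author[post.author_id].append(post)

    def get_posts_by_users(self, user_ids, exclude_ids=frozenset(),
                           videos_only=False, now=None):
        """Return unexpired posts by the given authors, skipping excluded IDs."""
        now = time.time() if now is None else now
        cutoff = now - self.retention_seconds
        with self._lock:
            return [
                post
                for uid in user_ids
                for post in self._by_author.get(uid, ())
                if post.created_at >= cutoff
                and post.post_id not in exclude_ids
                and (post.has_video or not videos_only)
            ]

    def trim(self, now=None):
        """Drop posts older than the retention window; return how many were removed."""
        now = time.time() if now is None else now
        cutoff = now - self.retention_seconds
        removed = 0
        with self._lock:
            for author_id in list(self._by_author):
                posts = self._by_author[author_id]
                fresh = [p for p in posts if p.created_at >= cutoff]
                removed += len(posts) - len(fresh)
                if fresh:
                    self._by_author[author_id] = fresh
                else:
                    del self._by_author[author_id]
        return removed


now = 1_700_000_000.0
store = TinyPostStore(retention_seconds=2 * 24 * 60 * 60)
store.insert_posts([
    Post(1, author_id=100, created_at=now - 60),
    Post(2, author_id=100, created_at=now - 3 * 86400),  # older than retention
    Post(3, author_id=200, created_at=now - 120, has_video=True),
])
in_network = store.get_posts_by_users([100, 200], now=now)
videos = store.get_posts_by_users([100, 200], videos_only=True, now=now)
```

A single lock keeps the sketch short; a production store would more likely use sharded or concurrent maps so readers do not block one another. The Rust usage of the real `PostStore` follows.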
```rust
use thunder::posts::post_store::PostStore;
use xai_thunder_proto::LightPost;
use std::sync::Arc;
use std::collections::HashSet;
use std::time::Instant;

// Create post store with 2-day retention and 100ms request timeout
let post_store = Arc::new(PostStore::new(
    2 * 24 * 60 * 60, // retention_seconds
    100,              // request_timeout_ms
));

// Insert posts from Kafka stream
let posts = vec![
    LightPost {
        post_id: 1234567890,
        author_id: 42,
        created_at: chrono::Utc::now().timestamp(),
        is_retweet: false,
        is_reply: false,
        has_video: false,
        in_reply_to_post_id: None,
        in_reply_to_user_id: None,
        source_post_id: None,
        source_user_id: None,
        conversation_id: None,
    },
];
post_store.insert_posts(posts);

// Retrieve posts from followed users
// (the element types were lost in extraction, so they are left to inference
// from the PostStore method signatures)
let following_ids = vec![100, 200, 300, 400, 500];
let exclude_ids = HashSet::new();
let start_time = Instant::now();
let request_user_id = 42;

let in_network_posts = post_store.get_all_posts_by_users(
    &following_ids,
    &exclude_ids,
    start_time,
    request_user_id,
);

// Get video posts only
let video_posts = post_store.get_videos_by_users(
    &following_ids,
    &exclude_ids,
    start_time,
    request_user_id,
);

// Start background maintenance tasks
let store_clone = Arc::clone(&post_store);
store_clone.start_stats_logger();
store_clone.start_auto_trim(30); // Trim every 30 minutes
```

--------------------------------

### Configure and Run Phoenix Ranking Model (Python/JAX)

Source: https://context7.com/xai-org/x-algorithm/llms.txt

This snippet configures and runs the Phoenix ranking model, a transformer-based model that predicts engagement probabilities. It sets up the model configuration, initializes an inference runner, creates an example batch, and reads the ranking outputs.

```python
from recsys_model import PhoenixModelConfig, HashConfig, RecsysBatch, RecsysEmbeddings
from grok import TransformerConfig
from runners import RecsysInferenceRunner, ModelRunner, create_example_batch

# Configure the ranking model
hash_config = HashConfig(
    num_user_hashes=2,
    num_item_hashes=2,
    num_author_hashes=2,
)

model_config = PhoenixModelConfig(
    emb_size=128,
    num_actions=19,  # Number of engagement types to predict
    history_seq_len=128,
    candidate_seq_len=32,
    hash_config=hash_config,
    product_surface_vocab_size=16,
    model=TransformerConfig(
        emb_size=128,
        widening_factor=2,
        key_size=64,
        num_q_heads=2,
        num_kv_heads=2,
        num_layers=2,
        attn_output_multiplier=0.125,
    ),
)

# Initialize inference runner
inference_runner = RecsysInferenceRunner(
    runner=ModelRunner(model=model_config, bs_per_device=0.125),
    name="phoenix_ranker",
)
inference_runner.initialize()

# Create batch with user history and candidates
batch, embeddings = create_example_batch(
    batch_size=1,
    emb_size=128,
    history_len=128,
    num_candidates=32,
    num_actions=19,
    num_user_hashes=2,
    num_item_hashes=2,
    num_author_hashes=2,
)

# Rank candidates - returns probabilities for each action type
ranking_output = inference_runner.rank(batch, embeddings)

# Access predictions
scores = ranking_output.scores                  # [batch, num_candidates, num_actions]
ranked_indices = ranking_output.ranked_indices  # [batch, num_candidates]

# Individual action probabilities
p_favorite = ranking_output.p_favorite_score    # [batch, num_candidates]
p_repost = ranking_output.p_repost_score
p_reply = ranking_output.p_reply_score
p_block = ranking_output.p_block_author_score
```

--------------------------------

### Running Phoenix Models

Source: https://context7.com/xai-org/x-algorithm/llms.txt

Instructions for setting up and running the Phoenix ranking and retrieval models.

```APIDOC
## Running Phoenix Models

### Description
This section provides command-line instructions for installing dependencies and running the Phoenix ranking and retrieval model demonstrations.

### Method
`bash`

### Endpoint
N/A (Command Line)

### Parameters
N/A (Command Line)

### Request Example
The commands are the same as in "Install Dependencies and Run Phoenix Models".

### Response
N/A (Command Line)
```

--------------------------------

### Implement Recommendation Pipeline with CandidatePipeline Trait (Rust)

Source: https://context7.com/xai-org/x-algorithm/llms.txt

Demonstrates how to implement the `CandidatePipeline` trait for building recommendation pipelines. The framework supports parallel execution of stages and configurable error handling, and passes queries and candidates through sources, filters, and scorers.

```rust
use xai_candidate_pipeline::candidate_pipeline::{CandidatePipeline, PipelineResult, HasRequestId};
use xai_candidate_pipeline::source::Source;
use xai_candidate_pipeline::filter::Filter;
use xai_candidate_pipeline::scorer::Scorer;
use xai_candidate_pipeline::hydrator::Hydrator;
use xai_candidate_pipeline::selector::Selector;
use std::sync::Arc;
use tonic::async_trait;

// NOTE: generic parameters were lost in extraction; the `<MyQuery, MyCandidate>`
// parameters and `Option<f64>` below are reconstructed assumptions.

// Define your query type with request tracking
#[derive(Clone)]
struct MyQuery {
    request_id: String,
    user_id: i64,
}

impl HasRequestId for MyQuery {
    fn request_id(&self) -> &str {
        &self.request_id
    }
}

// Define your candidate type
#[derive(Clone, Default)]
struct MyCandidate {
    id: i64,
    score: Option<f64>,
}

// Implement the pipeline
struct MyPipeline {
    sources: Vec<Box<dyn Source<MyQuery, MyCandidate>>>,
    filters: Vec<Box<dyn Filter<MyQuery, MyCandidate>>>,
    scorers: Vec<Box<dyn Scorer<MyQuery, MyCandidate>>>,
    // ... other components
}

// Execute the pipeline
async fn run_pipeline(pipeline: &impl CandidatePipeline<MyQuery, MyCandidate>, query: MyQuery) {
    let result: PipelineResult<MyQuery, MyCandidate> = pipeline.execute(query).await;

    println!("Retrieved: {} candidates", result.retrieved_candidates.len());
    println!("Filtered: {} candidates", result.filtered_candidates.len());
    println!("Selected: {} candidates", result.selected_candidates.len());
}
```

--------------------------------

### Configure and Run Phoenix Retrieval Model

Source: https://context7.com/xai-org/x-algorithm/llms.txt

This Python snippet configures and runs the Phoenix Retrieval Model, a two-tower architecture for similarity search. It sets up the model configuration, initializes a runner, registers a corpus of pre-computed embeddings, and retrieves the top-k candidates per user.

```python
from recsys_retrieval_model import PhoenixRetrievalModelConfig
from recsys_model import HashConfig
from grok import TransformerConfig
from runners import (
    RecsysRetrievalInferenceRunner,
    RetrievalModelRunner,
    create_example_batch,
    create_example_corpus,
)

# Configure retrieval model
retrieval_config = PhoenixRetrievalModelConfig(
    emb_size=128,
    history_seq_len=128,
    candidate_seq_len=32,
    hash_config=HashConfig(num_user_hashes=2, num_item_hashes=2, num_author_hashes=2),
    product_surface_vocab_size=16,
    model=TransformerConfig(
        emb_size=128,
        widening_factor=2,
        key_size=64,
        num_q_heads=2,
        num_kv_heads=2,
        num_layers=2,
    ),
)

# Initialize retrieval runner
retrieval_runner = RecsysRetrievalInferenceRunner(
    runner=RetrievalModelRunner(model=retrieval_config, bs_per_device=0.125),
    name="phoenix_retrieval",
)
retrieval_runner.initialize()

# Create corpus of candidate embeddings (pre-computed)
corpus_embeddings, corpus_post_ids = create_example_corpus(
    corpus_size=1000000,  # 1M candidates
    emb_size=128,
)
retrieval_runner.set_corpus(corpus_embeddings, corpus_post_ids)

# Create user batch (remaining arguments are elided in the source)
batch, embeddings = create_example_batch(batch_size=2, emb_size=128, ...)

# Retrieve top-k candidates per user
output = retrieval_runner.retrieve(batch, embeddings, top_k=100)
top_k_indices = output.top_k_indices    # [batch, top_k] - corpus indices
top_k_scores = output.top_k_scores      # [batch, top_k] - similarity scores
user_repr = output.user_representation  # [batch, emb_size] - user embeddings
```

--------------------------------

### Home Mixer gRPC Service for Ranked Posts

Source: https://context7.com/xai-org/x-algorithm/llms.txt

This Rust snippet demonstrates how to call the Home Mixer gRPC service, which provides ranked posts for a user's feed. It constructs a request and processes the paginated response of scored posts.

```rust
use xai_home_mixer_proto::{ScoredPostsQuery, ScoredPostsResponse, ScoredPost};
use xai_home_mixer_proto::scored_posts_service_server::ScoredPostsService;
use tonic::{Request, Response, Status};

// gRPC request
let request = ScoredPostsQuery {
    viewer_id: 123456789,
    client_app_id: 1,
    country_code: "US".to_string(),
    language_code: "en".to_string(),
    seen_ids: vec![111, 222, 333], // Previously seen post IDs
    served_ids: vec![444, 555],    // Recently served post IDs
    in_network_only: false,        // Include out-of-network content
    is_bottom_request: false,      // Initial feed load
    bloom_filter_entries: vec![],  // Bloom filter for deduplication
};

// Response contains ranked posts
let response: ScoredPostsResponse = client.get_scored_posts(request).await?;
for post in response.scored_posts {
    println!("Post {} by user {} - score: {:.4}",
        post.tweet_id,
        post.author_id,
        post.score
    );
    println!("  In-network: {}", post.in_network);
    println!("  Ancestors: {:?}", post.ancestors);
}
```

--------------------------------

### Run Pytest Tests

Source: https://context7.com/xai-org/x-algorithm/llms.txt

This command runs the pytest suites for the recommendation system models. It names the two test files to run, covering the ranking model and the retrieval model.

```bash
uv run pytest test_recsys_model.py test_recsys_retrieval_model.py
```

--------------------------------

### Thunder PostStore

Source: https://context7.com/xai-org/x-algorithm/llms.txt

The PostStore is a thread-safe in-memory store for recent posts, optimized for fast lookups of in-network content.

```APIDOC
## Thunder PostStore

### Description
This section describes the `PostStore`, a thread-safe in-memory data structure for storing and retrieving recent posts, with sub-millisecond lookup times.

### Method
N/A (Code Example)

### Endpoint
N/A (Code Example)

### Parameters
N/A (Code Example)

### Request Example
The code is the same as in "Manage Posts with Thunder PostStore".

### Response
N/A (Code Example)
```

--------------------------------

### Implement ThunderSource for In-Network Content Retrieval (Rust)

Source: https://context7.com/xai-org/x-algorithm/llms.txt

Shows the implementation of the `Source` trait for `ThunderSource`, which retrieves in-network posts from the Thunder service. The source is enabled per query: it runs only when the query is not restricted to in-network content, that is, for mixed feeds.

```rust
use std::sync::Arc;
use tonic::async_trait;
use xai_candidate_pipeline::source::Source;

// NOTE: generic parameters were lost in extraction; `ThunderClient`,
// `<ScoredPostsQuery, PostCandidate>`, and `Vec<PostCandidate>` are
// reconstructed assumptions.
pub struct ThunderSource {
    pub thunder_client: Arc<ThunderClient>,
}

#[async_trait]
impl Source<ScoredPostsQuery, PostCandidate> for ThunderSource {
    // Optionally disable source based on query
    fn enable(&self, query: &ScoredPostsQuery) -> bool {
        !query.in_network_only // Only run for mixed feeds
    }

    async fn get_candidates(&self, query: &ScoredPostsQuery) -> Result<Vec<PostCandidate>, String> {
        let request = GetInNetworkPostsRequest {
            user_id: query.user_id as u64,
            following_user_ids: query.user_features.followed_user_ids.clone(),
            max_results: 1000,
            exclude_tweet_ids: vec![],
            algorithm: "default".to_string(),
            debug: false,
            is_video_request: false,
        };

        // The source called `self.client`, but the struct field is `thunder_client`
        let response = self.thunder_client
            .get_in_network_posts(request)
            .await
            .map_err(|e| format!("ThunderSource: {}", e))?;

        let candidates: Vec<PostCandidate> = response
            .into_inner()
            .posts
            .into_iter()
            .map(|post| PostCandidate {
                tweet_id: post.post_id,
                author_id: post.author_id as u64,
                ..Default::default()
            })
            .collect();

        Ok(candidates)
    }
}
```

--------------------------------

### Home Mixer gRPC Service

Source: https://context7.com/xai-org/x-algorithm/llms.txt

The HomeMixerServer provides a gRPC endpoint for fetching ranked posts tailored for a user's For You feed.

```APIDOC
## Home Mixer gRPC Service

### Description
This section outlines the `HomeMixerServer`, which exposes a gRPC service for retrieving a ranked list of posts for a user's personalized 'For You' feed.

### Method
N/A (Code Example)

### Endpoint
N/A (Code Example)

### Parameters
N/A (Code Example)

### Request Example
The code is the same as in "Home Mixer gRPC Service for Ranked Posts".

### Response
N/A (Code Example)
```

--------------------------------

### Implement AgeFilter for Candidate Filtering (Rust)

Source: https://context7.com/xai-org/x-algorithm/llms.txt

The AgeFilter partitions candidates by age, keeping those no older than a specified maximum. It implements the Filter trait and takes a Duration for max_age. Candidates arrive as a vector and are returned split into kept and removed lists.

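The partition-by-age step can be sketched in Python before reading the Rust version. This is a sketch under stated assumptions, not the library's code: it assumes post IDs are Twitter-style snowflakes whose upper bits hold milliseconds since the Twitter epoch, and the helper names (`snowflake_created_ms`, `partition_by_age`, `make_id`) are hypothetical. How the real `snowflake` helper treats malformed or future-dated IDs is not shown in the source; here both are removed.

```python
from datetime import timedelta
from typing import List, Optional, Tuple

# Assumption: Twitter-style snowflake layout.
TWITTER_EPOCH_MS = 1288834974657  # 2010-11-04T01:42:54.657Z
TIMESTAMP_SHIFT = 22              # low 22 bits hold worker id and sequence


def snowflake_created_ms(post_id: int) -> Optional[int]:
    """Return the creation time in Unix milliseconds, or None for a non-positive ID."""
    if post_id <= 0:
        return None
    return (post_id >> TIMESTAMP_SHIFT) + TWITTER_EPOCH_MS


def partition_by_age(post_ids: List[int], max_age: timedelta,
                     now_ms: int) -> Tuple[List[int], List[int]]:
    """Split IDs into (kept, removed); IDs without a usable timestamp are removed."""
    max_age_ms = int(max_age.total_seconds() * 1000)
    kept, removed = [], []
    for post_id in post_ids:
        created = snowflake_created_ms(post_id)
        if created is not None and 0 <= now_ms - created <= max_age_ms:
            kept.append(post_id)
        else:
            removed.append(post_id)
    return kept, removed


def make_id(created_ms: int) -> int:
    """Build a snowflake-shaped ID for the demo (worker and sequence bits left at zero)."""
    return (created_ms - TWITTER_EPOCH_MS) << TIMESTAMP_SHIFT


now_ms = 1_700_000_000_000
fresh = make_id(now_ms - 60 * 60 * 1000)           # one hour old
stale = make_id(now_ms - 3 * 24 * 60 * 60 * 1000)  # three days old
kept, removed = partition_by_age([fresh, stale, -1], timedelta(days=2), now_ms)
```

Returning both lists, rather than only the survivors, mirrors the kept/removed result described above and lets a caller log or inspect what each filter dropped. The Rust implementation follows.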
```rust
use tonic::async_trait;
use xai_candidate_pipeline::filter::{Filter, FilterResult};
use std::time::Duration;

pub struct AgeFilter {
    pub max_age: Duration,
}

impl AgeFilter {
    pub fn new(max_age: Duration) -> Self {
        Self { max_age }
    }

    // Posts whose age cannot be derived from the ID are treated as too old
    fn is_within_age(&self, tweet_id: i64) -> bool {
        snowflake::duration_since_creation_opt(tweet_id)
            .map(|age| age <= self.max_age)
            .unwrap_or(false)
    }
}

// NOTE: generic parameters were lost in extraction; `<ScoredPostsQuery, PostCandidate>`,
// `Vec<PostCandidate>`, and `FilterResult<PostCandidate>` are reconstructed assumptions.
#[async_trait]
impl Filter<ScoredPostsQuery, PostCandidate> for AgeFilter {
    fn enable(&self, _query: &ScoredPostsQuery) -> bool {
        true // Always enabled
    }

    async fn filter(
        &self,
        _query: &ScoredPostsQuery,
        candidates: Vec<PostCandidate>,
    ) -> Result<FilterResult<PostCandidate>, String> {
        let (kept, removed): (Vec<_>, Vec<_>) = candidates
            .into_iter()
            .partition(|c| self.is_within_age(c.tweet_id));

        Ok(FilterResult { kept, removed })
    }
}
```

--------------------------------

### Phoenix Retrieval Model (Two-Tower)

Source: https://context7.com/xai-org/x-algorithm/llms.txt

The PhoenixRetrievalModel uses a two-tower architecture for efficient similarity search, suitable for large-scale candidate retrieval.

```APIDOC
## Phoenix Retrieval Model (Two-Tower)

### Description
This section details the configuration and usage of the `PhoenixRetrievalModel`, a two-tower architecture designed for efficient similarity search across millions of candidates.

### Method
N/A (Code Example)

### Endpoint
N/A (Code Example)

### Parameters
N/A (Code Example)

### Request Example
The code is the same as in "Configure and Run Phoenix Retrieval Model".

### Response
N/A (Code Example)
```

--------------------------------

### Implement WeightedScorer for Candidate Scoring (Rust)

Source: https://context7.com/xai-org/x-algorithm/llms.txt

The WeightedScorer calculates a weighted score for each candidate from its engagement predictions. It implements the Scorer trait and updates the candidate's weighted_score field.
The scoring logic multiplies each prediction by a fixed weight and sums the results: positive weights for favorites, replies, retweets, and clicks, and negative weights for "not interested" and author blocks. A missing prediction counts as 0.0.

```rust
use tonic::async_trait;
use xai_candidate_pipeline::scorer::Scorer;

pub struct WeightedScorer;

// NOTE: generic parameters were lost in extraction; `<ScoredPostsQuery, PostCandidate>`
// and `Vec<PostCandidate>` are reconstructed assumptions.
#[async_trait]
impl Scorer<ScoredPostsQuery, PostCandidate> for WeightedScorer {
    async fn score(
        &self,
        _query: &ScoredPostsQuery,
        candidates: &[PostCandidate],
    ) -> Result<Vec<PostCandidate>, String> {
        let scored = candidates
            .iter()
            .map(|c| {
                // Combine Phoenix ML predictions into weighted score
                let s = &c.phoenix_scores;
                let weighted_score =
                    s.favorite_score.unwrap_or(0.0) * 0.5 +
                    s.reply_score.unwrap_or(0.0) * 0.3 +
                    s.retweet_score.unwrap_or(0.0) * 1.0 +
                    s.click_score.unwrap_or(0.0) * 0.1 +
                    s.not_interested_score.unwrap_or(0.0) * -1.0 +
                    s.block_author_score.unwrap_or(0.0) * -50.0;

                // Only weighted_score is set; update() copies it onto the real candidate
                PostCandidate {
                    weighted_score: Some(weighted_score),
                    ..Default::default()
                }
            })
            .collect();

        Ok(scored)
    }

    fn update(&self, candidate: &mut PostCandidate, scored: PostCandidate) {
        candidate.weighted_score = scored.weighted_score;
    }
}
```
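A small worked example shows how the weights in the Rust snippet trade engagement against negative signals. This is an illustrative Python sketch, not part of the library: the dictionary-based candidates, the `WEIGHTS` table, and the `weighted_score` function are hypothetical, the weights are copied from the snippet above, and the probabilities are made up for the example.

```python
# Weights copied from the WeightedScorer snippet; keys mirror its field names.
WEIGHTS = {
    "favorite_score": 0.5,
    "reply_score": 0.3,
    "retweet_score": 1.0,
    "click_score": 0.1,
    "not_interested_score": -1.0,
    "block_author_score": -50.0,
}


def weighted_score(phoenix_scores: dict) -> float:
    """Sum prediction * weight, treating a missing prediction as 0.0."""
    total = 0.0
    for name, weight in WEIGHTS.items():
        value = phoenix_scores.get(name)
        total += (0.0 if value is None else value) * weight
    return total


# Made-up probabilities for two hypothetical candidates.
candidates = {
    "liked_post": {"favorite_score": 0.4, "reply_score": 0.1, "retweet_score": 0.05},
    "risky_post": {"favorite_score": 0.6, "retweet_score": 0.2, "block_author_score": 0.02},
}

scores = {name: weighted_score(preds) for name, preds in candidates.items()}
ranked = sorted(scores, key=scores.get, reverse=True)

for name in ranked:
    print(f"{name}: {scores[name]:.2f}")
```

Here `liked_post` scores 0.4 × 0.5 + 0.1 × 0.3 + 0.05 × 1.0 = 0.28, while `risky_post` scores 0.6 × 0.5 + 0.2 × 1.0 − 0.02 × 50 = −0.50. With a block weight of −50.0, a 2% predicted block probability outweighs the post's stronger favorite and retweet predictions.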