### Install Requirements

Source: https://github.com/git-disl/memetrans/blob/main/README.md

Install the necessary Python dependencies for the project.

```sh
pip install -r requirements.txt
```

--------------------------------

### Process memecoin data for BigQuery

Source: https://context7.com/git-disl/memetrans/llms.txt

Loads migration data from JSONL files and calculates time windows for transaction analysis. The resulting windows are saved to CSV for BigQuery query generation.

```python
import pandas as pd
from pathlib import Path
import json

def load_memecoin_jsonl(input_path: Path) -> pd.DataFrame:
    """Load memecoin migration data from JSONL file"""
    rows = []
    with input_path.open("r", encoding="utf-8") as f:
        for line in f:
            obj = json.loads(line.strip())
            rows.append({
                "mint": obj["token_address"],
                "migration_ts": int(obj["timestamp"]),
            })
    return pd.DataFrame(rows)

def build_windows(df: pd.DataFrame, pre_hours: int = 24, post_hours: int = 2) -> pd.DataFrame:
    """Build time windows for each memecoin (24h before to 2h after migration)"""
    df = df.copy()
    df["window_start"] = df["migration_ts"] - pre_hours * 3600
    df["window_end"] = df["migration_ts"] + post_hours * 3600
    return df[["mint", "migration_ts", "window_start", "window_end"]]

# Example usage
df = load_memecoin_jsonl(Path("raw_data/memecoin.jsonl"))
windows_df = build_windows(df, pre_hours=24, post_hours=2)
windows_df.to_csv("memecoin_windows.csv", index=False)

# Run BigQuery script generation
# python -m data_pipeline.transaction.bigquery --num_parts 6 --project_id YOUR_PROJECT
```

--------------------------------

### Collect Pump.fun token migration transactions

Source: https://context7.com/git-disl/memetrans/llms.txt

Uses the Solana AsyncClient to monitor the Raydium fee account for migration events. Requires a valid RPC endpoint and the Pump.fun creator address.

```python
import asyncio
from solana.rpc.async_api import AsyncClient
from solders.pubkey import Pubkey
from datetime import datetime, timezone

# Configure RPC endpoint (add your endpoint to data_pipeline/rpc_endpoints.txt)
RPC_URL = "https://api.mainnet-beta.solana.com"
RAYDIUM_FEE_ACCOUNT = "7YttLkHDoNj9wyDur5pM1ejNaAvT9X4eqaYcHQqtj2G5"
PUMP_FUN_CREATOR = "39azUYFWPz3VHgKCf3VChUwbpURdCHRxjWVowf5jUJjg"

async def collect_coins():
    """Collect Pump.fun token migration transactions from Raydium fee account"""
    async with AsyncClient(RPC_URL) as client:
        pubkey = Pubkey.from_string(RAYDIUM_FEE_ACCOUNT)

        # Fetch signatures for the fee account
        sigs_resp = await client.get_signatures_for_address(pubkey, limit=1000)
        sig_infos = sigs_resp.value

        for sig_info in sig_infos:
            if sig_info.err:
                continue

            # Fetch and parse the transaction
            parsed_tx = await client.get_transaction(
                sig_info.signature,
                encoding="jsonParsed",
                commitment="confirmed",
                max_supported_transaction_version=0
            )

            # Filter for Pump.fun creator transactions
            account_keys = parsed_tx.value.transaction.message.account_keys
            creator = str(account_keys[0])

            if creator == PUMP_FUN_CREATOR:
                # Extract token address from post_token_balances
                print(f"Found Pump.fun token migration: {sig_info.signature}")

# Run: python -m data_pipeline.memecoin.coin_collection
asyncio.run(collect_coins())
```

--------------------------------

### Evaluate Memecoin Selection

Source: https://github.com/git-disl/memetrans/blob/main/README.md

Run the evaluation script for the memecoin selection application.

```sh
python memecoin_selection.py
```

--------------------------------

### Train ML Model with Class Imbalance Handling

Source: https://context7.com/git-disl/memetrans/llms.txt

This snippet demonstrates training a PyTorch model using BCEWithLogitsLoss with positive class weighting to handle imbalanced datasets. It includes a standard training loop with optimization and evaluation steps.

```python
pos_weight = torch.tensor(
    (len(y_train) - y_train.sum()) / (y_train.sum() + 1e-8)
).to(device)
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)

for epoch in range(50):
    model.train()
    for batch_X, batch_y in train_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        optimizer.zero_grad()
        logits = model(batch_X)
        loss = criterion(logits, batch_y)
        loss.backward()
        optimizer.step()

    # Evaluate
    model.eval()
    with torch.no_grad():
        y_proba = torch.sigmoid(model(X_test_tensor.to(device))).cpu().numpy()
    auprc = average_precision_score(y_test, y_proba)
    print(f"Epoch {epoch}: AUPRC = {auprc:.4f}")
```

--------------------------------

### Parse Outer Transactions (Post-Migration) - Python

Source: https://context7.com/git-disl/memetrans/llms.txt

Parses post-migration transactions on Raydium DEX, extracting swap events by analyzing token and WSOL balance changes for the Raydium V4 program. Requires defaultdict.

```python
from collections import defaultdict

RAYDIUM_V4 = "5Q544fKrFoe6tsEbD7S8EmxGTJYAKtTVhAW5Q5pge4j1"
WSOL_MINT = "So11111111111111111111111111111111111111112"

def parse_raydium_swap(mint, accounts, token_map, wsol_token_map, fee):
    """Parse a Raydium DEX swap transaction"""
    if RAYDIUM_V4 not in token_map:
        return None

    token_change = token_map[RAYDIUM_V4]
    sol_change = 0

    # Extract WSOL change (represents SOL in/out)
    if len(wsol_token_map[RAYDIUM_V4]) == 1:
        sol_change = wsol_token_map[RAYDIUM_V4][0]
    else:
        for amount in wsol_token_map[RAYDIUM_V4]:
            if amount * token_change < 0:  # Opposite direction = swap
                sol_change = amount

    if token_change == 0 or sol_change == 0:
        return None

    price = sol_change / token_change
    trader_map = {a: v for a, v in token_map.items() if a != RAYDIUM_V4}

    return Transaction(
        mint=mint,
        type="swap",
        trader_map=trader_map,
        token_amount=-token_change,
        sol_amount=-sol_change,
        price=price,
        fee_amount=float(fee / LAMPORTS_PER_SOL),
        # ... other fields
    )

# Run: python -m data_pipeline.transaction.parse_outer_trans
```

--------------------------------

### Train Tree-Based ML Models (RF, XGBoost, LightGBM)

Source: https://context7.com/git-disl/memetrans/llms.txt

Trains Random Forest, XGBoost, or LightGBM classifiers. Requires data preprocessing and handles class imbalance. Use 'rf', 'xgb', or 'lgbm' as the --model argument.

```python
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from sklearn.metrics import average_precision_score, classification_report
import argparse

def data_process():
    """Load and preprocess the feature dataset"""
    Xy_df = pd.read_csv("../dataset/feat_label.csv")

    # Filter for valid samples (sufficient trading activity)
    Xy_df = Xy_df[
        (Xy_df["group3_time_span_valid"] >= 60) &
        (Xy_df["group3_holder_num"] >= 100)
    ]
    Xy_df = Xy_df.sample(frac=1, random_state=42)  # Shuffle

    # Extract features and labels
    feature_cols = [c for c in Xy_df.columns if "group" in c]
    X = Xy_df[feature_cols].values

    # Standardize features
    scaler = StandardScaler()
    X = scaler.fit_transform(X).astype(np.float32)

    # Binary labels: 0 = high-risk, 1 = safe
    y = np.where(Xy_df["label"] == "high", 0, 1)

    # Train/test split (70/30)
    split_idx = int(len(Xy_df) * 0.7)
    return X[:split_idx], y[:split_idx], X[split_idx:], y[split_idx:]

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", type=str, default="rf", choices=["rf", "xgb", "lgbm", "lr"])
    args = parser.parse_args()

    X_train, y_train, X_test, y_test = data_process()

    if args.model == "rf":
        model = RandomForestClassifier(
            n_estimators=800, max_depth=16, min_samples_leaf=3,
            class_weight="balanced_subsample", n_jobs=-1, random_state=42
        )
    elif args.model == "xgb":
        scale_pos_weight = (1 - y_train.mean()) / y_train.mean()
        model = XGBClassifier(
            n_estimators=800, max_depth=6,
            learning_rate=0.05,
            scale_pos_weight=scale_pos_weight,
            eval_metric="aucpr", n_jobs=-1
        )
    elif args.model == "lgbm":
        model = LGBMClassifier(
            n_estimators=2000, learning_rate=0.02, num_leaves=64,
            class_weight="balanced", n_jobs=-1
        )

    model.fit(X_train, y_train)
    y_proba = model.predict_proba(X_test)[:, 1]

    auprc = average_precision_score(y_test, y_proba)
    print(f"AUPRC: {auprc}")  # ~0.56-0.57 for RF/XGB/LGBM

    y_pred = (y_proba >= 0.53).astype(int)
    print(classification_report(y_test, y_pred, digits=4))

# Run: python ml_model_train.py --model rf
```

--------------------------------

### Analyze Fund Flows and Wallet Creators

Source: https://context7.com/git-disl/memetrans/llms.txt

Traces wallet funding history to identify creators and extracts transaction signers via RPC calls.

```python
import asyncio
from solana.rpc.async_api import AsyncClient
from solders.pubkey import Pubkey
from solders.signature import Signature as Sig

async def collect_first_signature(account: str, before_sig_str: str, rpc_endpoints: list):
    """Find the first transaction for a wallet to identify its creator"""
    pubkey = Pubkey.from_string(account)
    before_signature = Sig.from_string(before_sig_str)
    all_sig_infos = []

    rpc_endpoint = rpc_endpoints[0]
    async with AsyncClient(rpc_endpoint) as client:
        while True:
            sigs_resp = await client.get_signatures_for_address(
                pubkey, limit=1000, before=before_signature
            )
            sig_infos = sigs_resp.value
            all_sig_infos.extend(sig_infos)

            if not sig_infos or len(all_sig_infos) >= 20000:
                break
            before_signature = sig_infos[-1].signature

    # Return oldest valid signatures (first transactions)
    valid_sigs = [
        [str(s.signature), s.block_time]
        for s in all_sig_infos if s.err is None
    ]
    return valid_sigs[::-1]  # Reverse to get chronological order

async def get_transaction_signer(session, signature: str, rpc_url: str):
    """Fetch transaction and extract the fee payer (signer)"""
    async with session.post(rpc_url, json={
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getTransaction",
        "params": [signature,
        {
            "commitment": "confirmed",
            "encoding": "json",
            "maxSupportedTransactionVersion": 0
        }]
    }) as resp:
        tx_obj = await resp.json()
        if tx_obj.get("result"):
            return tx_obj["result"]["transaction"]["message"]["accountKeys"][0]
        return None
```

--------------------------------

### Parse Inner Transactions (Pre-Migration) - Python

Source: https://context7.com/git-disl/memetrans/llms.txt

Parses raw parquet files for pre-migration transactions on Pump.fun's bonding curve. Handles SOL and token balance changes to reconstruct trader positions and transaction prices. Requires solders and dataclasses.

```python
import os
import pickle
from dataclasses import dataclass
from decimal import Decimal
from solders.pubkey import Pubkey

# Solana constants
LAMPORTS_PER_SOL = Decimal('1000000000')
BONDING_CURVE_PROGRAM = "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P"

@dataclass
class Transaction:
    mint: str
    type: str  # "mint", "swap", "transfer", "mint&swap", "zero"
    timestamp: int
    signature: str
    trader_map: dict  # {wallet_address: token_amount_change}
    token_amount: float
    sol_amount: float
    fee_amount: float
    price: float
    block_slot: int
    block_index: int

def get_bonding_curve_address(mint: str) -> str:
    """Derive the bonding curve PDA for a token mint"""
    mint_key = Pubkey.from_string(mint)
    program_key = Pubkey.from_string(BONDING_CURVE_PROGRAM)
    addr, _ = Pubkey.find_program_address(
        [b"bonding-curve", bytes(mint_key)],
        program_key
    )
    return str(addr)

def sol_balance_change(balance_changes: list) -> dict:
    """Calculate SOL balance changes for each account"""
    return {b["account"]: b["after"] - b["before"] for b in balance_changes}

def write_tx(file_name: str, tx_list: list, output_root: str):
    """Serialize parsed transactions to pickle file"""
    out_path = os.path.join(output_root, f"{file_name}.pkl")
    with open(out_path, "wb") as f:
        pickle.dump(tx_list, f)

# Run: python -m data_pipeline.transaction.parse_inner_trans
```

--------------------------------

### Evaluate Top-K Memecoin Selection Strategy

Source: https://context7.com/git-disl/memetrans/llms.txt

This Python function evaluates a top-K memecoin selection strategy for trading. It calculates metrics like average return, median return, win rate, and win/loss ratio based on predicted probabilities and actual outcomes, while filtering out certain high-risk tokens.

```python
import numpy as np
import pandas as pd
from sklearn.metrics import average_precision_score, classification_report

def topk_selection(k, y_pred_proba, y_test, mint_list_test, mint2label_info):
    """Evaluate top-K selection strategy for memecoin trading"""
    # Select top K tokens by predicted probability of being safe
    topk_idx = np.argsort(-y_pred_proba)[:k]
    topk_mints = [mint_list_test[i] for i in topk_idx]
    topk_probs = y_pred_proba[topk_idx]

    # Calculate returns for selected tokens
    return_ratio_list = []
    for idx in topk_idx:
        mint = mint_list_test[idx]
        label_info = mint2label_info[mint]
        return_ratio = label_info["return_ratio"]

        # Skip tokens with positive return but high manipulation probability
        if return_ratio > 0 and label_info["pred_proba"] > 0.6:
            continue
        return_ratio_list.append(return_ratio)

    return_ratio_list = np.array(return_ratio_list)

    # Calculate trading metrics
    avg_return = return_ratio_list.mean()
    median_return = np.median(return_ratio_list)
    wins = return_ratio_list[return_ratio_list >= 0]
    losses = return_ratio_list[return_ratio_list < 0]
    win_rate = len(wins) / len(return_ratio_list)
    win_loss_ratio = wins.sum() / abs(losses.sum()) if len(losses) > 0 else np.inf

    print(f"Top-{k} Selection Results:")
    print(f" Average Return: {avg_return:.4f}")
    print(f" Median Return: {median_return:.4f}")
    print(f" Win Rate: {win_rate:.4f}")
    print(f" Win/Loss Ratio: {win_loss_ratio:.4f}")

    return avg_return, median_return, win_rate, win_loss_ratio

# Load model predictions
model_df = pd.read_csv("../results/rf_pred_0.5684831293402106.csv")
y_pred_proba = model_df["prob"].values

# Evaluate at different K values
for k in [100, 200]:
    topk_selection(k,
                   y_pred_proba, y_test, mint_list_test, mint2label_info)
```

--------------------------------

### Fetch Jito Bundle ID - Python

Source: https://context7.com/git-disl/memetrans/llms.txt

Queries the Jito bundle API to identify MEV bundles by transaction signature. Handles rate limiting and retries. Requires requests, time, and random libraries.

```python
import requests
import time
import random

USER_AGENTS = [
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
]

def fetch_bundle_id(tx_sig: str, max_retries: int = 3) -> str:
    """Query Jito API to check if a transaction is part of a MEV bundle"""
    url = f"https://bundles.jito.wtf/api/v1/bundles/transaction/{tx_sig}"

    for attempt in range(max_retries):
        try:
            headers = {
                "User-Agent": random.choice(USER_AGENTS),
                "Accept": "application/json",
                "Referer": "https://explorer.jito.wtf",
            }
            resp = requests.get(url, timeout=10, headers=headers)

            if resp.status_code == 200:
                return resp.json()[0].get("bundle_id", "None")
            elif resp.status_code == 404:
                return "None"  # Not a bundled transaction
            elif resp.status_code == 429:
                time.sleep(2 ** attempt + random.random())  # Rate limit backoff
        except Exception:
            time.sleep(2 ** attempt + random.random())

    return "error"
```

--------------------------------

### Generate On-Chain Behavioral Features

Source: https://context7.com/git-disl/memetrans/llms.txt

Implements Union-Find for wallet clustering, Gini coefficient for concentration, and a feature generation pipeline for memecoin transactions.

```python
import numpy as np
import pandas as pd
from collections import defaultdict, OrderedDict

class UnionFind:
    """Union-Find data structure for clustering related wallets"""
    def __init__(self):
        self.parent = {}
        self.rank = {}

    def find(self, x):
        if x not in self.parent:
            self.parent[x] = x
            self.rank[x] = 0
        root = x
        while self.parent[root] != root:
            root = self.parent[root]
        return root

    def union(self, x, y):
        rx, ry = self.find(x), self.find(y)
        if rx == ry:
            return
        if self.rank[rx] < self.rank[ry]:
            self.parent[rx] = ry
        else:
            self.parent[ry] = rx
            if self.rank[rx] == self.rank[ry]:
                self.rank[rx] += 1

def gini(array: np.ndarray) -> float:
    """Compute Gini coefficient for holding concentration"""
    if len(array) == 0:
        return 0.0
    array = np.sort(array.clip(min=0))
    n = len(array)
    cum = np.cumsum(array, dtype=float)
    return (n + 1 - 2 * np.sum(cum) / cum[-1]) / n if cum[-1] > 0 else 0.0

def feature_generation(mint_address, tx_list, price_dict, sig2bundle, account2signer):
    """Generate full feature set for a memecoin"""
    feat = OrderedDict()
    mint_ts = tx_list[0]["timestamp"]

    # Feature Group 1: Context
    ts_hour = tx_list[-1]["timestamp"] - (tx_list[-1]["timestamp"] % 3600)
    feat["group1_price"] = price_dict[ts_hour]  # SOL price at migration

    # Track holdings and sniper activity
    holdings = defaultdict(int)
    sniper_0s_list = set()  # Traders who bought in same second as mint

    for row in tx_list:
        if "swap" in row["type"]:
            for trader, value in row["trader_map"].items():
                holdings[trader] += value
                if row["timestamp"] == mint_ts:
                    sniper_0s_list.add(trader)

    # Feature Group 2: Holding Concentration
    hold_df = pd.DataFrame(holdings.items(), columns=["trader", "amount"])
    hold_df = hold_df.sort_values("amount", ascending=False)
    total_supply = hold_df.amount.sum() or 1e-6

    feat["group2_holder_gini"] = gini(hold_df[hold_df.amount >= 1].amount.values)
    feat["group2_top1_pct"] = hold_df.head(1).amount.sum() / total_supply
    feat["group2_top10_pct"] = hold_df.head(10).amount.sum() / total_supply
    feat["group2_sniper_0s_hold_pct"] = hold_df[
        hold_df.trader.isin(sniper_0s_list)
    ].amount.sum() / total_supply

    # Feature Group 3: Market Activity
    feat["group3_tx_num"] = len(tx_list)
    feat["group3_holder_num"] = (hold_df.amount >= 1).sum()

    # Feature Group 4: Cluster Statistics (using Union-Find)
    uf = UnionFind()
    # ... cluster wallets by bundle_id and shared signers

    return feat

# Run: python -m data_pipeline.feature.feat_gen
```

--------------------------------

### Train MLP Classifier with PyTorch

Source: https://context7.com/git-disl/memetrans/llms.txt

Implements and trains a Multi-Layer Perceptron (MLP) classifier using PyTorch. Includes batch normalization, dropout, and handles class imbalance with BCEWithLogitsLoss and positive class weighting.

```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.metrics import average_precision_score, classification_report

class MLP(nn.Module):
    def __init__(self, input_dim, hidden_dims=[512, 256], dropout=0.2):
        super().__init__()
        layers = []
        in_dim = input_dim
        for h in hidden_dims:
            layers.append(nn.Linear(in_dim, h))
            layers.append(nn.BatchNorm1d(h))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout))
            in_dim = h
        layers.append(nn.Linear(in_dim, 1))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        return self.net(x).squeeze(1)

# Training loop
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Prepare data (from data_process())
X_train_tensor = torch.from_numpy(X_train).float()
y_train_tensor = torch.from_numpy(y_train).float()
train_loader = DataLoader(
    TensorDataset(X_train_tensor, y_train_tensor),
    batch_size=512, shuffle=True
)

model = MLP(input_dim=X_train.shape[1]).to(device)
```

--------------------------------

### Train Machine Learning Models

Source: https://github.com/git-disl/memetrans/blob/main/README.md

Train the risk prediction model using the specified algorithm.

```sh
cd MemeTrans/risk_prediction
python ml_model_train.py --model rf
```

--------------------------------

### Check Jito Bundle Association

Source: https://context7.com/git-disl/memetrans/llms.txt

Retrieves the bundle ID for a given transaction signature to determine if it is part of a Jito bundle.

```python
tx_signature = "5gNDGvwopnNxoYceBfhL4UWQVUymvUfojRpNw5M5Fdd6T8k8CFGz..."
bundle_id = fetch_bundle_id(tx_signature)
print(f"Bundle ID: {bundle_id}")  # "None" if not bundled
```

=== COMPLETE CONTENT ===

This response contains all available snippets from this library. No additional content exists. Do not make further requests.