Implementation Details

This page dives into the code behind the core algorithms and components: the inverted index that handles recall, the CTR model that reranks results, and the service layer that orchestrates them.

Table of contents

  1. Inverted Index Implementation
    1. Core Data Structures
    2. Text Preprocessing
    3. Document Indexing
    4. TF-IDF Retrieval
  2. CTR Model Implementation
    1. Feature Extraction
    2. Model Training
    3. CTR Prediction
  3. Service Orchestration
    1. Index Service
  4. Performance Optimization
    1. Memory Optimization
    2. Query Optimization
    3. Scalability
  5. Testing & Validation
    1. Unit Tests
    2. Integration Tests
  6. Key Files Reference

Inverted Index Implementation

Core Data Structures

# File: src/search_engine/index_tab/offline_index.py
import math
from collections import Counter, defaultdict
from typing import List, Tuple

import jieba

class InvertedIndex:
    """Inverted index implementation - Memory-optimized version"""
    
    def __init__(self):
        # Core data structures
        self.index = defaultdict(set)          # term -> set of document IDs
        self.doc_lengths = {}                  # document ID -> length in tokens
        self.documents = {}                    # document ID -> raw content
        self.term_freq = defaultdict(dict)     # term -> {doc_id: frequency}
        self.doc_freq = defaultdict(int)       # term -> document frequency
        
        # Chinese stop words
        self.stop_words = {
            '的', '了', '在', '是', '我', '有', '和', '就', '不', '人', 
            '都', '一', '一个', '上', '也', '很', '到', '说', '要', '去'
        }

Text Preprocessing

def preprocess_text(self, text: str) -> List[str]:
    """Text preprocessing - Chinese word segmentation"""
    # Use jieba for Chinese word segmentation (jieba also emits whitespace
    # and punctuation as standalone tokens)
    words = jieba.lcut(text.lower())
    
    # Filter stop words and short tokens; the length check also drops the
    # single-character and whitespace tokens produced by jieba
    words = [word for word in words 
             if len(word) > 1 and word not in self.stop_words]
    
    return words

Document Indexing

def add_document(self, doc_id: str, content: str):
    """Add document to index - Inverted index construction"""
    # Save original document
    self.documents[doc_id] = content
    
    # Preprocess text
    words = self.preprocess_text(content)
    
    # Calculate document length
    self.doc_lengths[doc_id] = len(words)
    
    # Count word frequencies
    word_freq = Counter(words)
    
    # Update the inverted index, per-document term frequencies, and
    # document frequencies in a single pass
    for word, freq in word_freq.items():
        self.index[word].add(doc_id)
        self.term_freq[word][doc_id] = freq
        self.doc_freq[word] = len(self.index[word])

TF-IDF Retrieval

def search(self, query: str, top_k: int) -> List[Tuple[str, float, str]]:
    """TF-IDF retrieval core logic"""
    # Step 1: Query preprocessing
    query_terms = self.preprocess_text(query)
    if not query_terms:
        return []
    
    # Step 2: Candidate document recall
    candidates = set()
    for term in query_terms:
        if term in self.index:
            candidates.update(self.index[term])
    
    # Step 3: TF-IDF relevance calculation
    scores = {}
    total_docs = len(self.documents)
    
    for doc_id in candidates:
        score = 0.0
        doc_length = self.doc_lengths[doc_id]
        
        for term in query_terms:
            if term in self.term_freq and doc_id in self.term_freq[term]:
                # Calculate TF (term frequency, normalized by document length)
                tf = self.term_freq[term][doc_id] / doc_length
                
                # Calculate IDF (inverse document frequency); a term that
                # occurs in every document gets log(1) = 0 and contributes
                # nothing to the score
                df = self.doc_freq[term]
                idf = math.log(total_docs / df) if df > 0 else 0
                
                # Accumulate the TF-IDF contribution
                score += tf * idf
        
        if score > 0:
            scores[doc_id] = score
    
    # Step 4: Top-K sorting and return
    sorted_results = sorted(scores.items(), 
                           key=lambda x: x[1], 
                           reverse=True)[:top_k]
    
    # Add document summaries (truncate long documents to 200 characters)
    results = []
    for doc_id, score in sorted_results:
        content = self.documents[doc_id]
        summary = content[:200] + "..." if len(content) > 200 else content
        results.append((doc_id, score, summary))
    
    return results
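
To make the flow concrete, here is a minimal usage sketch; the document IDs and contents are made up for illustration:

index = InvertedIndex()
index.add_document("doc1", "机器学习是人工智能的核心技术")
index.add_document("doc2", "数据库系统负责数据的存储与检索")

# Returns up to 3 (doc_id, tfidf_score, summary) tuples, best match first
for doc_id, score, summary in index.search("机器学习", top_k=3):
    print(doc_id, round(score, 4), summary)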

CTR Model Implementation

Feature Extraction

# File: src/search_engine/training_tab/ctr_model.py
from typing import Any, Dict, List, Tuple

import jieba
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

def extract_features(self, ctr_data: List[Dict[str, Any]]) -> Tuple[np.ndarray, np.ndarray]:
    """Extract 7-dimensional feature vector from CTR data"""
    df = pd.DataFrame(ctr_data)
    
    # Feature 1: Position feature (absolute position)
    position_features = df['position'].values.reshape(-1, 1)
    
    # Feature 2: Document length feature (the summary length serves as a
    # proxy for the full document length)
    doc_lengths = df['summary'].str.len().values.reshape(-1, 1)
    
    # Feature 3: Query length feature
    query_lengths = df['query'].str.len().values.reshape(-1, 1)
    
    # Feature 4: Summary length feature
    summary_lengths = df['summary'].str.len().values.reshape(-1, 1)
    
    # Feature 5: Query-summary match score (core relevance feature)
    match_scores = []
    for _, row in df.iterrows():
        query_words = set(jieba.lcut(row['query']))
        summary_words = set(jieba.lcut(row['summary']))
        if len(query_words) > 0:
            match_ratio = len(query_words.intersection(summary_words)) / len(query_words)
        else:
            match_ratio = 0
        match_scores.append(match_ratio)
    match_scores = np.array(match_scores).reshape(-1, 1)
    
    # Features 6-7: Historical CTR features (computed so that the current
    # row's own label does not leak into its feature values)
    query_ctr_features, doc_ctr_features = self._extract_historical_ctr(df)
    
    # Combine all features into 7-dimensional vector
    X = np.column_stack([
        position_features,      # Dimension 1: Position
        doc_lengths,           # Dimension 2: Document length
        query_lengths,         # Dimension 3: Query length
        summary_lengths,       # Dimension 4: Summary length
        match_scores,          # Dimension 5: Match score
        query_ctr_features,    # Dimension 6: Query historical CTR
        doc_ctr_features       # Dimension 7: Document historical CTR
    ])
    
    # Extract labels
    y = df['clicked'].values
    
    return X, y
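
The _extract_historical_ctr helper is referenced but not shown. One leakage-safe construction is a leave-one-out average, where each row's own click is excluded from its aggregate. The sketch below is a hypothetical implementation: it assumes the log rows also carry a doc_id field, and falls back to 0.1 (the same constant predict_ctr uses) when a key has no other history:

def _extract_historical_ctr(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray]:
    """Hypothetical sketch: leave-one-out historical CTR features."""
    def loo_ctr(keys, clicks):
        sums, counts = {}, {}
        for key, click in zip(keys, clicks):
            sums[key] = sums.get(key, 0) + click
            counts[key] = counts.get(key, 0) + 1
        ctrs = []
        for key, click in zip(keys, clicks):
            others = counts[key] - 1
            # Exclude the current row so its own label cannot leak in;
            # fall back to a 0.1 prior when there is no other history
            ctrs.append((sums[key] - click) / others if others > 0 else 0.1)
        return np.array(ctrs).reshape(-1, 1)
    
    clicks = df['clicked'].values
    return loo_ctr(df['query'], clicks), loo_ctr(df['doc_id'], clicks)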

Model Training

def train(self, training_data: List[Dict]) -> Dict[str, float]:
    """Complete model training pipeline"""
    # Step 1: Feature extraction
    X, y = self.extract_features(training_data)
    if len(X) == 0:
        return {"error": "No training data"}
    
    # Step 2: Data splitting
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    
    # Step 3: Feature standardization
    self.scaler = StandardScaler()
    X_train_scaled = self.scaler.fit_transform(X_train)
    X_test_scaled = self.scaler.transform(X_test)
    
    # Step 4: Model training
    self.model = LogisticRegression(
        random_state=42,
        class_weight='balanced',  # Handle class imbalance
        max_iter=1000
    )
    self.model.fit(X_train_scaled, y_train)
    
    # Step 5: Model evaluation
    y_pred = self.model.predict(X_test_scaled)
    y_pred_proba = self.model.predict_proba(X_test_scaled)[:, 1]
    
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "precision": precision_score(y_test, y_pred),
        "recall": recall_score(y_test, y_pred),
        "auc": roc_auc_score(y_test, y_pred_proba),
        "train_samples": len(X_train),
        "test_samples": len(X_test)
    }
    
    self.is_trained = True
    return metrics
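
A short usage sketch showing the record schema the pipeline expects; field names follow extract_features above, doc_id is the assumed field for the historical-CTR features, and all values here are synthetic:

import random

training_data = [
    {
        "query": "机器学习",
        "doc_id": f"doc{i % 10}",
        "position": i % 10 + 1,
        "summary": "机器学习是人工智能的核心技术之一",
        "clicked": int(random.random() < 0.3),
    }
    for i in range(200)
]

model = CTRModel()
metrics = model.train(training_data)
print(metrics["auc"], metrics["accuracy"])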

CTR Prediction

def predict_ctr(self, query: str, doc_id: str, position: int,
                tfidf_score: float, summary: str) -> float:
    """Predict CTR probability for a single sample"""
    if not self.is_trained:
        return 0.5  # Return a neutral probability if untrained
    
    # Construct the 7-dimensional feature vector (same order as training)
    features = np.array([[
        position,                   # Position feature
        len(summary),               # Document length (summary length as proxy)
        len(query),                 # Query length
        len(summary),               # Summary length
        self._calculate_match_score(query, summary),  # Match score
        0.1,                        # Query historical CTR (simplified constant)
        0.1                         # Document historical CTR (simplified constant)
    ]])
    
    # Standardize and predict
    features_scaled = self.scaler.transform(features)
    ctr_prob = self.model.predict_proba(features_scaled)[0][1]
    
    return float(ctr_prob)
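
The _calculate_match_score helper is referenced but not shown; a minimal version, consistent with the match-ratio feature computed in extract_features, would be:

def _calculate_match_score(self, query: str, summary: str) -> float:
    """Fraction of query terms that also appear in the summary."""
    query_words = set(jieba.lcut(query))
    summary_words = set(jieba.lcut(summary))
    if not query_words:
        return 0.0
    return len(query_words & summary_words) / len(query_words)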

Service Orchestration

Index Service

# File: src/search_engine/index_service.py
from typing import List, Tuple

class IndexService:
    """Search service orchestrator"""
    
    def search(self, query: str, top_k: int = 10) -> List[Tuple[str, float, str]]:
        """Unified search interface"""
        # Stage 1: Recall - over-fetch candidates (2x top_k) so the
        # reranking stage has room to promote documents
        candidates = self.inverted_index.search(query, top_k * 2)
        
        # Stage 2: Ranking - CTR reranking
        if self.ctr_enabled:
            return self.rank_with_ctr(query, candidates, top_k)
        else:
            return candidates[:top_k]
    
    def rank_with_ctr(self, query: str, candidates: List, top_k: int):
        """CTR reranking logic"""
        # Feature construction → Model prediction → Reranking
        features = self._build_features(query, candidates)
        ctr_scores = self.model_service.predict_batch(features)
        return self._rerank_by_ctr(candidates, ctr_scores, top_k)

Performance Optimization

Memory Optimization

  • Set data structure: Store document IDs in sets for compact, deduplicated posting storage with O(1) membership updates
  • Sparse matrix: Use a sparse representation for the term-document matrix rather than a dense one
  • Index compression: Optional compression for production deployment; one common scheme is sketched after this list
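
The compression scheme itself is not spelled out in the source. A common approach, sketched below under the assumption of integer document IDs (the index above uses string IDs, which would first need a numeric mapping), is to freeze each posting set into a sorted list and delta-encode the gaps:

from typing import List, Set

def compress_postings(doc_ids: Set[int]) -> List[int]:
    """Delta-encode a posting set: store gaps between sorted doc IDs."""
    if not doc_ids:
        return []
    sorted_ids = sorted(doc_ids)
    # The first ID is stored as-is, every later entry as a gap; gaps are
    # small numbers, which variable-length or bit-packed codecs exploit
    return [sorted_ids[0]] + [b - a for a, b in zip(sorted_ids, sorted_ids[1:])]

def decompress_postings(gaps: List[int]) -> List[int]:
    """Rebuild the sorted doc ID list by summing the gaps back up."""
    ids, current = [], 0
    for gap in gaps:
        current += gap
        ids.append(current)
    return ids

assert decompress_postings(compress_postings({9, 3, 7})) == [3, 7, 9]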

Query Optimization

  • Short-circuit evaluation: Process the rarest (lowest document-frequency) query terms first so candidate sets stay small
  • Result caching: LRU cache for popular queries; a minimal wrapper is sketched after this list
  • Batch processing: Predict CTR for all candidates in one batch rather than one call per document
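
A minimal caching sketch; the wrapper class is illustrative, and functools.lru_cache requires hashable arguments, which the (query, top_k) pair satisfies:

from functools import lru_cache

class CachedIndexService:
    """Hypothetical wrapper adding an LRU cache in front of IndexService."""
    
    def __init__(self, index_service, cache_size: int = 1024):
        self._service = index_service
        
        @lru_cache(maxsize=cache_size)
        def _cached_search(query: str, top_k: int):
            # Return a tuple so the cached value is immutable
            return tuple(self._service.search(query, top_k))
        
        self._cached_search = _cached_search
    
    def search(self, query: str, top_k: int = 10):
        return list(self._cached_search(query, top_k))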

Scalability

  • Horizontal sharding: Hash-based term partitioning across index shards (see the sketch after this list)
  • Incremental updates: Support dynamic document addition and deletion
  • Distributed coordination: Fan queries out to all shards and merge the partial results
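
A compact sketch of the fan-out/merge pattern. For brevity it partitions by document ID rather than by term (the hashing and merge logic are the same); the shard count and md5-based routing are illustrative assumptions:

import hashlib

class ShardedIndex:
    """Hypothetical sketch: hash-partitioned shards with query fan-out."""
    
    def __init__(self, num_shards: int = 4):
        self.shards = [InvertedIndex() for _ in range(num_shards)]
    
    def add_document(self, doc_id: str, content: str):
        # Stable hash so the same document always routes to the same shard
        digest = hashlib.md5(doc_id.encode('utf-8')).hexdigest()
        self.shards[int(digest, 16) % len(self.shards)].add_document(doc_id, content)
    
    def search(self, query: str, top_k: int = 10):
        # Fan out to every shard, then merge partial results by score.
        # Note: each shard computes IDF from its own local statistics; a
        # real system would share global document frequencies
        partials = []
        for shard in self.shards:
            partials.extend(shard.search(query, top_k))
        return sorted(partials, key=lambda r: r[1], reverse=True)[:top_k]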

Testing & Validation

Unit Tests

# Test inverted index
def test_inverted_index():
    index = InvertedIndex()
    index.add_document("doc1", "artificial intelligence machine learning")
    # A second, non-matching document is needed: with a single document,
    # every term has IDF = log(1/1) = 0 and all scores collapse to zero
    index.add_document("doc2", "database storage systems")
    results = index.search("machine learning", top_k=5)
    assert len(results) > 0
    assert results[0][0] == "doc1"

# Test CTR model
def test_ctr_model():
    model = CTRModel()
    training_data = load_test_data()
    metrics = model.train(training_data)
    assert metrics['accuracy'] > 0.5

Integration Tests

# Test end-to-end search pipeline
def test_search_pipeline():
    service = IndexService()
    service.load_documents("test_documents.json")
    results = service.search("test query", top_k=5)
    assert len(results) > 0
    assert all(isinstance(r, tuple) for r in results)

Key Files Reference

File               Description                      Lines
offline_index.py   Inverted index implementation    ~300
ctr_model.py       CTR model implementation         ~400
index_service.py   Service orchestration            ~250
data_service.py    Data collection and management   ~200
model_service.py   Model serving and inference      ~150
