Bài viết

System Design Interview: Top K Heavy Hitters - Từ Cơ Bản Đến Tối Ưu

System Design Interview: Top K Heavy Hitters - Từ Cơ Bản Đến Tối Ưu

Đặt Vấn Đề

Top K Heavy Hitters là một trong những bài toán kinh điển trong System Design Interview. Bài toán yêu cầu xác định top K trending events trong các khoảng thời gian cụ thể như:

  • Hashtags phổ biến nhất trên LinkedIn
  • Photos được like nhiều nhất trên Instagram
  • Songs được phát nhiều nhất trên Spotify

Trong các time frame: 1 phút, 5 phút, 1 giờ, và 1 ngày.

Yêu Cầu Hệ Thống

Functional Requirements

  • Tìm top K heavy hitters/events trong các khoảng thời gian: 1 phút, 5 phút, 1 giờ, 1 ngày

Non-Functional Requirements

🔄 Highly Scalable

  • Xử lý được lượng events khổng lồ: hàng tỷ events/ngày
  • Có khả năng mở rộng theo chiều ngang

⚡ High Performance

  • Low latency: Trả về top K list trong vài milliseconds
  • Real-time processing cho các kết quả nhanh

🛡️ Highly Available

  • Đảm bảo uptime cao, không gián đoạn service
  • Fault tolerance khi có sự cố

Phương Pháp Tiếp Cận

Chúng ta sẽ bắt đầu từ solution đơn giản nhất, sau đó phân tích pros/cons và dần tối ưu để đáp ứng các yêu cầu về scalability, availability, và performance.

Solution 1: Hashmap và Heap (Single Server)

Ý Tưởng

  • Sử dụng HashMap để lưu frequency của events
  • Sử dụng Min Heap kích thước K để maintain top K events
  • In-memory processing cho tốc độ cao

Architecture Diagram

Event Stream Events A, B, C... Single Server HashMap Event → Count A: 5, B: 3, C: 8 In-Memory Min Heap Size K Top K Events [C:8, A:5, B:3] Processing Logic Update HashMap → Check Top K → Update Heap Top K Results Low Latency Real-time Performance Characteristics ✓ Pros • Low latency (~ms) • Simple implementation ✗ Cons • Not scalable • Memory limitation

Cách Hoạt Động

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Pseudo code
hashmap = {}  # event -> frequency
min_heap = MinHeap(size=K)

def process_event(event):
    # Update frequency
    hashmap[event] = hashmap.get(event, 0) + 1
    
    # Update top K if needed
    if hashmap[event] impacts top K:
        min_heap.update(event, hashmap[event])

def get_top_k():
    return min_heap.get_all()

Đánh Giá

Pros:

  • Low latency nhờ in-memory processing
  • Đơn giản, dễ implement

Cons:

  • Không scalable: Không thể handle hàng tỷ events trên single server
  • Memory limitation: HashMap sẽ quá lớn

Hashmap & Heap Single Server

Solution 2: Horizontal Scaling với Multiple Servers

Ý Tưởng

  • Partition events across multiple servers bằng shard key
  • Mỗi server maintain HashMap và Min Heap riêng
  • Merge results từ tất cả servers để có final top K

Architecture Diagram

Event Stream Billions/day Load Balancer Shard Key Server 1 HashMap Events A-F MinHeap Top K₁ Server 2 HashMap Events G-M MinHeap Top K₂ Server 3 HashMap Events N-Z MinHeap Top K₃ Merging Service Combine K₁,K₂,K₃ → Final Top K Database Final Top K Results Time Window End Reset & Start Next Window Remaining Problems • Still not fully scalable • Expensive horizontal scaling • Complex result merging • What if events keep increasing?

Cách Hoạt Động

1
2
3
4
5
Events → Load Balancer → Server 1 (HashMap + MinHeap)
                     → Server 2 (HashMap + MinHeap)  
                     → Server 3 (HashMap + MinHeap)
                     ↓
                   Merging Service → Final Top K → Database

Cách Hoạt Động

  1. Events được hash và phân phối đến các servers
  2. Mỗi server xử lý subset của events
  3. Cuối mỗi time window, servers trả về top K lists của họ
  4. Merging service combine các lists thành final result

Đánh Giá

Pros:

  • Cải thiện throughput
  • Phân tán load

Cons:

  • Vẫn không hoàn toàn scalable khi số events tăng liên tục
  • Expensive khi cần thêm nhiều servers
  • Complexity trong việc merge results

Horizontally scaling initial solution

Solution 3: Giải Pháp Hai Phần

Do horizontal scaling vẫn có limitations, chúng ta chia thành 2 parts để giải quyết các vấn đề khác nhau:

Part 1: Count Min Sketch + Min Heap (Real-time, Approximate)

Count Min Sketch là gì?

Count Min Sketch là một probabilistic data structure dùng để estimate frequency của events trong data stream:

  • Fixed size: Không phụ thuộc vào số lượng events
  • Space efficient: Tiết kiệm memory hơn HashMap
  • Approximate results: Có thể overcount do hash collisions

Cách Hoạt Động

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Count Min Sketch matrix: rows × cols
# rows = số hash functions, cols = buckets per hash function
sketch = [[0] * cols for _ in range(rows)]

def update_event(event):
    frequencies = []
    for i in range(rows):
        hash_val = hash_functions[i](event) % cols
        sketch[i][hash_val] += 1
        frequencies.append(sketch[i][hash_val])
    
    # Lấy minimum frequency (tránh overcount)
    estimated_freq = min(frequencies)
    
    # Update min heap nếu cần
    if estimated_freq impacts top K:
        min_heap.update(event, estimated_freq)

Ví Dụ Minh Họa

Giả sử có Count Min Sketch kích thước 3×7 (3 hash functions):

Event A xuất hiện:

  • Hash1(A) % 7 = 2 → sketch[0][2]++
  • Hash2(A) % 7 = 5 → sketch[1][5]++
  • Hash3(A) % 7 = 1 → sketch[2][1]++
  • Frequency(A) = min(sketch[0][2], sketch[1][5], sketch[2][1])

Đánh Giá Part 1

Pros:

  • Real-time và fast
  • Fixed memory footprint
  • Cost effective
  • Simple implementation

Cons:

  • Approximate results (không chính xác 100%)

Count Min Sketch & Min Heap Single Server Solution

Part 2: HDFS + MapReduce (Batch, Exact)

Ý Tưởng

  • Lưu event logs trong distributed file system (HDFS)
  • Sử dụng MapReduce jobs để xử lý batch

HDFS + MapReduce Architecture

HDFS + MapReduce Solution (Batch Processing) Event Stream Kafka Queue HDFS Consumer HDFS Storage event_logs_01.txt event_logs_02.txt event_logs_03.txt MapReduce Processing Pipeline MapReduce Job 1: Frequency Counting Map Phase Input: event_logs Output: (event, 1) Reduce Phase Input: (event, [1,1,1...]) Output: (event, count) MapReduce Job 2: Top K Selection Map Phase Input: (event, count) Output: ("all", (event, count)) Reduce Phase Input: All (event, count) Output: Top K events Intermediate Results Example Data Flow Input: event_logs → [A, B, A, C, A, B, C, C, C] Job 1 Output: A:3, B:2, C:4 Job 2 Output (K=3): [C:4, A:3, B:2] Database Exact Results Batch Updates Execution Schedule Hourly/Daily Batch Jobs High Latency, High Accuracy

Architecture Flow

1
2
3
4
5
6
7
Events → Kafka Queue → HDFS Consumer → HDFS Storage
                                    ↓
                              MapReduce Job 1: Count frequencies
                                    ↓  
                              MapReduce Job 2: Find top K
                                    ↓
                                Database

MapReduce Jobs

Job 1 - Frequency Counting:

1
2
3
4
5
6
7
8
9
# Map phase
def map(event_log):
    for event in parse(event_log):
        if event.timestamp in target_window:
            emit(event.id, 1)

# Reduce phase  
def reduce(event_id, counts):
    emit(event_id, sum(counts))

Job 2 - Top K Selection:

1
2
3
4
5
6
7
8
# Map phase
def map(event_id, frequency):
    emit("all", (event_id, frequency))

# Reduce phase
def reduce(key, event_frequencies):
    top_k = heapq.nlargest(K, event_frequencies, key=lambda x: x[1])
    emit("top_k_result", top_k)

Đánh Giá Part 2

Pros:

  • Exact results (chính xác 100%)
  • Highly scalable với distributed processing
  • Cost effective cho batch processing

Cons:

  • Not real-time (độ trễ cao)
  • Complex setup và maintenance

HDFS and Map Reduce Job Solution

Solution 4: Final Combined Solution

Final Combined Architecture

Final Combined Solution: Real-time + Batch Processing Event Stream Billions/day API Gateway Event Routing Kafka Queue Message Broker Path 1: Real-time Processing Count Min Sketch Processor Matrix 3×7 MinHeap Top K Database (Real-time Results) ~95-99% Accuracy Path 2: Batch Processing HDFS Storage MapReduce Jobs 1. Count Freq 2. Top K Database (Exact Results) 100% Accuracy Real-time Batch Unified Serving Layer Query Interface get_top_k(window, mode) mode: fast | accurate | hybrid Result Merger Combine & Validate Confidence Scoring Response Cache Redis/Memcached TTL-based Metrics • Latency: ~ms • Throughput: 10K+/s • Accuracy: 95-100% • Availability: 99.9% • Scalability: ✓ Optimizations • Event Aggregation • Data Partitioning • Caching Layers • Auto Scaling • Load Balancing

Kiến Trúc Tổng Hợp

Kết hợp Part 1Part 2 để có được cả real-time approximatebatch exact results:

1
2
3
4
5
6
7
8
9
10
11
                            API Gateway
                                 ↓
                               Kafka
                            ↙         ↘
                    Path 1              Path 2
            Count Min Sketch         HDFS Consumer
            (Real-time)                   ↓
                 ↓                      HDFS
              Database              MapReduce Jobs
                                        ↓  
                                    Database

Detailed Flow

Common Components

  1. API Gateway: Route incoming events
  2. Kafka: Message queue cho scalable event processing
  3. Database: Store results (SQL/NoSQL)

Processing Paths

Path 1 - Real-time Processing:

  1. Events từ Kafka → Count Min Sketch processor
  2. Calculate approximate top K real-time
  3. Store results vào database với timestamp

Path 2 - Batch Processing:

  1. Events từ Kafka → HDFS consumer
  2. Store event data trong HDFS
  3. Hourly MapReduce jobs:
    • Job 1: Aggregate event frequencies
    • Job 2: Calculate exact top K
  4. Store precise results vào database

Serving Layer

1
2
3
4
5
6
7
8
9
10
def get_top_k(time_window, precision="fast"):
    if precision == "fast":
        return get_approximate_results(time_window)
    elif precision == "accurate":  
        return get_exact_results(time_window)
    else:
        # Combine both for confidence scoring
        approx = get_approximate_results(time_window)
        exact = get_exact_results(time_window)
        return merge_with_confidence(approx, exact)

Final Solution

Tối Ưu Hóa Nâng Cao

1. Event Aggregation tại API Gateway

1
2
3
4
5
6
7
8
9
# Pre-aggregate events before sending to Kafka
def aggregate_events(events_batch):
    aggregated = {}
    for event in events_batch:
        key = (event.id, event.time_window)
        aggregated[key] = aggregated.get(key, 0) + 1
    
    return [Event(id=k[0], count=v, window=k[1]) 
            for k, v in aggregated.items()]

2. Data Partitioner Pattern

1
2
3
4
5
Kafka → Data Partitioner → Partition by (event_id + time_window)
                    ↓
               Partitioned Kafka Topics
                    ↓  
              Partition Processors

Benefits:

  • Better load balancing
  • Ordered processing per partition
  • Reduced hotspots

3. Caching Layer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Multi-level caching
L1_Cache = {}  # In-memory for frequent queries
L2_Cache = Redis()  # Distributed cache

def get_cached_top_k(time_window):
    # Try L1 first
    if time_window in L1_Cache:
        return L1_Cache[time_window]
    
    # Try L2
    result = L2_Cache.get(f"top_k_{time_window}")
    if result:
        L1_Cache[time_window] = result  # Populate L1
        return result
    
    # Fetch from database
    result = database.get_top_k(time_window)
    
    # Populate both caches
    L2_Cache.set(f"top_k_{time_window}", result, ttl=300)
    L1_Cache[time_window] = result
    
    return result

4. Time Window Management

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class TimeWindowManager:
    def __init__(self):
        self.windows = {
            "1min": SlidingWindow(60),
            "5min": SlidingWindow(300), 
            "1hour": SlidingWindow(3600),
            "1day": SlidingWindow(86400)
        }
    
    def process_event(self, event):
        current_time = time.time()
        for window_name, window in self.windows.items():
            if window.should_process(event.timestamp, current_time):
                window.add_event(event)
                
            if window.should_emit_results(current_time):
                top_k = window.get_top_k()
                self.emit_results(window_name, top_k)

Monitoring và Metrics

Key Metrics cần theo dõi:

📊 Performance Metrics:

  • Latency: P50, P95, P99 response times
  • Throughput: Events processed per second
  • Error Rate: Failed processing percentage

💾 Resource Metrics:

  • Memory Usage: Count Min Sketch memory footprint
  • CPU Utilization: Processing overhead
  • Network I/O: Data transfer rates

🎯 Business Metrics:

  • Accuracy: Approximate vs exact results comparison
  • Freshness: Time lag cho real-time results
  • Coverage: Percentage of events successfully processed

Alerting Strategy

1
2
3
4
5
6
7
8
9
10
11
# Example monitoring setup
def setup_monitoring():
    alerts = [
        Alert("high_latency", "p95_latency > 100ms"),
        Alert("low_throughput", "events_per_sec < 1000"),
        Alert("high_error_rate", "error_rate > 1%"),
        Alert("memory_usage", "memory_usage > 80%")
    ]
    
    for alert in alerts:
        monitor.add_alert(alert)

Trade-offs và Considerations

Real-time vs Accuracy

AspectReal-time (Count Min Sketch)Batch (MapReduce)
LatencyMillisecondsMinutes to Hours
Accuracy~95-99%100%
Resource CostLowHigher
ComplexitySimpleComplex

Scalability Patterns

  • Vertical Scaling: Tăng compute power cho single machines
  • Horizontal Scaling: Thêm machines vào cluster
  • Functional Partitioning: Chia theo event types
  • Temporal Partitioning: Chia theo time windows

Cost Optimization

1
2
3
4
5
6
7
8
# Dynamic resource allocation
def optimize_resources(current_load):
    if current_load > high_threshold:
        scale_up_count_min_sketch_servers()
        increase_kafka_partitions()
    elif current_load < low_threshold:
        scale_down_resources()
        consolidate_partitions()

Kết Luận

Top K Heavy Hitters system design đòi hỏi careful balance giữa real-time performanceaccuracy. Solution cuối cung cấp:

🎯 Hybrid Approach Benefits:

  • Fast approximate results cho real-time use cases
  • Accurate batch results cho analytics và reporting
  • Scalable architecture handle billions of events
  • Cost-effective resource utilization

🚀 Key Takeaways:

  1. Start simple, sau đó optimize dần dần
  2. Understand trade-offs giữa speed, accuracy, và cost
  3. Use appropriate data structures: Count Min Sketch cho approximation
  4. Leverage distributed systems: MapReduce cho exact computation
  5. Monitor everything: Performance, accuracy, và business metrics

📈 Production Considerations:

  • Fault tolerance: Handle server failures gracefully
  • Data consistency: Ensure reliable event processing
  • Security: Protect against malicious events
  • Compliance: Meet data retention và privacy requirements

System này có thể adapt cho nhiều use cases khác nhau từ social media trending đến e-commerce product recommendations, financial fraud detection, và IoT sensor data analysis.

Thành công trong System Design Interview không chỉ là code correct solution, mà còn là demonstrate hiểu biết về scalability principles, trade-offs analysis, và real-world constraints! 💪

Bài viết này được cấp phép bởi tác giả theo giấy phép CC BY 4.0 .