Bài viết

Production Best Practices và Performance Optimization

Hướng dẫn toàn diện về triển khai OpenAI Agents trong môi trường production. Từ performance optimization, scaling strategies đến cost management và security.

Production Best Practices và Performance Optimization

Sau khi đã thành thạo việc xây dựng agents với đầy đủ tính năng từ tools, handoffs đến tracing, giờ là lúc đưa chúng vào thế giới thực - môi trường production nơi mà hàng nghìn người dùng sẽ tương tác, nơi mà downtime có nghĩa là mất tiền, và nơi mà performance kém có thể làm hỏng trải nghiệm người dùng.

Production không chỉ là việc “chạy code trên server” mà là cả một nghệ thuật tối ưu hóa, giám sát và bảo vệ hệ thống AI. Giống như việc điều hành một nhà hàng - bạn không chỉ cần món ăn ngon mà còn phải đảm bảo phục vụ nhanh, giá cả hợp lý, vệ sinh an toàn và có thể phục vụ được đông khách.

Hôm nay chúng ta sẽ khám phá cách biến những agents “hoạt động tốt trên máy dev” thành những production-grade systems có thể xử lý traffic thực tế, tối ưu chi phí và đảm bảo độ tin cậy cao.

Chiến Lược Deployment

Environment Management

Việc đầu tiên khi chuẩn bị production là thiết lập các environment riêng biệt và rõ ràng:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
import os
from enum import Enum
from typing import Dict, Any, Optional
from dataclasses import dataclass
from agents import Agent, set_default_openai_key, set_default_openai_client
from openai import AsyncOpenAI

class Environment(Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"

@dataclass
class EnvironmentConfig:
    """Cấu hình cho từng environment"""
    name: Environment
    openai_api_key: str
    openai_base_url: Optional[str] = None
    max_concurrent_requests: int = 10
    request_timeout: int = 30
    rate_limit_per_minute: int = 100
    enable_tracing: bool = True
    log_level: str = "INFO"
    cost_limit_per_day: float = 100.0  # USD
    monitoring_enabled: bool = True

class ProductionEnvironmentManager:
    """Quản lý cấu hình cho các environment khác nhau"""
    
    def __init__(self):
        self.configs = self._load_environment_configs()
        self.current_env = self._detect_current_environment()
        
    def _load_environment_configs(self) -> Dict[Environment, EnvironmentConfig]:
        """Load cấu hình từ environment variables"""
        
        configs = {}
        
        # Development config
        configs[Environment.DEVELOPMENT] = EnvironmentConfig(
            name=Environment.DEVELOPMENT,
            openai_api_key=os.getenv("OPENAI_API_KEY_DEV", ""),
            max_concurrent_requests=5,
            request_timeout=60,  # Dài hơn để debug
            rate_limit_per_minute=50,
            enable_tracing=True,
            log_level="DEBUG",
            cost_limit_per_day=10.0,
            monitoring_enabled=False
        )
        
        # Staging config
        configs[Environment.STAGING] = EnvironmentConfig(
            name=Environment.STAGING,
            openai_api_key=os.getenv("OPENAI_API_KEY_STAGING", ""),
            max_concurrent_requests=20,
            request_timeout=30,
            rate_limit_per_minute=200,
            enable_tracing=True,
            log_level="INFO",
            cost_limit_per_day=50.0,
            monitoring_enabled=True
        )
        
        # Production config
        configs[Environment.PRODUCTION] = EnvironmentConfig(
            name=Environment.PRODUCTION,
            openai_api_key=os.getenv("OPENAI_API_KEY_PROD", ""),
            max_concurrent_requests=100,
            request_timeout=20,  # Ngắn hơn cho UX tốt
            rate_limit_per_minute=1000,
            enable_tracing=True,
            log_level="WARNING",  # Chỉ log những gì quan trọng
            cost_limit_per_day=500.0,
            monitoring_enabled=True
        )
        
        return configs
    
    def _detect_current_environment(self) -> Environment:
        """Tự động detect environment hiện tại"""
        
        env_name = os.getenv("ENVIRONMENT", "development").lower()
        
        if env_name in ["prod", "production"]:
            return Environment.PRODUCTION
        elif env_name in ["stage", "staging"]:
            return Environment.STAGING
        else:
            return Environment.DEVELOPMENT
    
    def setup_environment(self, env: Optional[Environment] = None):
        """Setup OpenAI client và các cấu hình cho environment"""
        
        target_env = env or self.current_env
        config = self.configs[target_env]
        
        print(f"🚀 Đang setup environment: {target_env.value}")
        
        # Setup OpenAI client
        if config.openai_base_url:
            client = AsyncOpenAI(
                api_key=config.openai_api_key,
                base_url=config.openai_base_url,
                timeout=config.request_timeout
            )
            set_default_openai_client(client)
        else:
            set_default_openai_key(config.openai_api_key)
        
        # Setup logging
        import logging
        logging.basicConfig(level=getattr(logging, config.log_level))
        
        # Setup tracing
        if not config.enable_tracing:
            from agents import set_tracing_disabled
            set_tracing_disabled(True)
        
        print(f"✅ Environment {target_env.value} đã được setup thành công")
        return config
    
    def get_config(self, env: Optional[Environment] = None) -> EnvironmentConfig:
        """Lấy config cho environment cụ thể"""
        target_env = env or self.current_env
        return self.configs[target_env]

# Production-ready agent factory
class ProductionAgentFactory:
    """Factory để tạo agents tối ưu cho production"""
    
    def __init__(self, env_manager: ProductionEnvironmentManager):
        self.env_manager = env_manager
        self.config = env_manager.get_config()
    
    def create_customer_service_agent(self) -> Agent:
        """Tạo customer service agent tối ưu cho production"""
        
        # Instructions được tối ưu cho performance và consistency
        optimized_instructions = """
        Bạn là chuyên viên hỗ trợ khách hàng chuyên nghiệp của công ty.
        
        NGUYÊN TẮC PHẢN HỒI:
        - Luôn thân thiện, lịch sự và chuyên nghiệp
        - Trả lời ngắn gọn nhưng đầy đủ thông tin
        - Ưu tiên giải pháp thực tế có thể thực hiện ngay
        - Hỏi thêm thông tin nếu câu hỏi không rõ ràng
        
        QUY TRÌNH XỬ LÝ:
        1. Xác định loại yêu cầu (thông tin, khiếu nại, hỗ trợ kỹ thuật)
        2. Cung cấp giải pháp phù hợp hoặc hướng dẫn tiếp theo
        3. Xác nhận khách hàng hiểu và hài lòng
        
        LƯU Ý QUAN TRỌNG:
        - Không đưa ra cam kết về thời gian cụ thể mà chưa được xác nhận
        - Luôn đề xuất escalate cho supervisor nếu vấn đề phức tạp
        - Ghi nhận thông tin khách hàng để theo dõi sau này
        
        Mục tiêu: Giải quyết vấn đề của khách hàng nhanh chóng và hiệu quả.
        """
        
        return Agent(
            name="Customer Service Pro",
            instructions=optimized_instructions,
            model="gpt-4o-mini",  # Cân bằng giữa performance và cost
            model_settings={
                "temperature": 0.3,  # Ít creative hơn để consistent
                "max_tokens": 500,   # Giới hạn để tránh phản hồi quá dài
            }
        )
    
    def create_sales_assistant_agent(self) -> Agent:
        """Tạo sales assistant tối ưu cho conversion"""
        
        sales_instructions = """
        Bạn là chuyên viên tư vấn bán hàng thông minh và thấu hiểu khách hàng.
        
        CHIẾN LƯỢC BÁN HÀNG:
        - Lắng nghe và hiểu nhu cầu thực sự của khách hàng
        - Tư vấn sản phẩm phù hợp nhất, không ép buộc
        - Làm nổi bật lợi ích cụ thể cho từng khách hàng
        - Xây dựng lòng tin qua việc cung cấp thông tin chính xác
        
        CÔNG CỤ PERSUASION:
        - Sử dụng social proof (đánh giá, testimonials)
        - Tạo urgency hợp lý (khuyến mãi có thời hạn)
        - Giải quyết objections một cách tự nhiên
        - Đề xuất upsell/cross-sell phù hợp
        
        QUY TRÌNH CHỐT ĐƠN:
        1. Khảo sát nhu cầu và budget
        2. Giới thiệu sản phẩm phù hợp
        3. Xử lý thắc mắc và lo ngại
        4. Tạo động lực mua hàng
        5. Hướng dẫn thực hiện giao dịch
        
        KPI: Tối ưu conversion rate và customer satisfaction.
        """
        
        return Agent(
            name="Sales Assistant Pro",
            instructions=sales_instructions,
            model="gpt-4o",  # Model tốt hơn cho sales conversations
            model_settings={
                "temperature": 0.7,  # Creative hơn để engaging
                "max_tokens": 600,
            }
        )

# Demo production setup
async def demo_production_setup():
    """Demo việc setup production environment"""
    
    print("🏭 **Production Environment Setup Demo**\n")
    
    # Setup environment manager
    env_manager = ProductionEnvironmentManager()
    
    # Test cấu hình cho các environments khác nhau
    environments = [Environment.DEVELOPMENT, Environment.STAGING, Environment.PRODUCTION]
    
    for env in environments:
        print(f"📋 **Environment: {env.value.upper()}**")
        
        config = env_manager.get_config(env)
        
        print(f"   • Max Concurrent: {config.max_concurrent_requests}")
        print(f"   • Timeout: {config.request_timeout}s")
        print(f"   • Rate Limit: {config.rate_limit_per_minute}/min")
        print(f"   • Cost Limit: ${config.cost_limit_per_day}/day")
        print(f"   • Tracing: {config.enable_tracing}")
        print(f"   • Monitoring: {config.monitoring_enabled}")
        print(f"   • Log Level: {config.log_level}")
        print()
    
    # Setup production environment
    prod_config = env_manager.setup_environment(Environment.PRODUCTION)
    
    # Tạo production agents
    agent_factory = ProductionAgentFactory(env_manager)
    
    customer_agent = agent_factory.create_customer_service_agent()
    sales_agent = agent_factory.create_sales_assistant_agent()
    
    print("🤖 **Production Agents Created:**")
    print(f"{customer_agent.name} - Optimized for support")
    print(f"{sales_agent.name} - Optimized for conversion")
    
    return env_manager, agent_factory

if __name__ == "__main__":
    import asyncio
    asyncio.run(demo_production_setup())

Performance Optimization Strategies

Request Processing Optimization

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
import asyncio
import time
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
import aiohttp
from agents import Agent, Runner

@dataclass
class RequestMetrics:
    """Metrics cho từng request"""
    request_id: str
    start_time: float
    end_time: Optional[float] = None
    processing_time: Optional[float] = None
    tokens_used: int = 0
    success: bool = False
    error_message: Optional[str] = None

class HighPerformanceRequestProcessor:
    """Xử lý requests với performance cao"""
    
    def __init__(self, max_concurrent: int = 50):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.request_queue = asyncio.Queue()
        self.active_requests: Dict[str, RequestMetrics] = {}
        self.completed_requests: List[RequestMetrics] = []
        
        # Performance tracking
        self.total_requests = 0
        self.successful_requests = 0
        self.average_response_time = 0.0
        
        # Start background workers
        self.workers = []
        for i in range(min(max_concurrent, 10)):  # Tối đa 10 workers
            worker = asyncio.create_task(self._request_worker(f"worker_{i}"))
            self.workers.append(worker)
    
    async def _request_worker(self, worker_id: str):
        """Background worker xử lý requests từ queue"""
        
        print(f"🔧 Worker {worker_id} started")
        
        while True:
            try:
                # Lấy request từ queue
                request_data = await self.request_queue.get()
                
                if request_data is None:  # Shutdown signal
                    break
                
                # Xử lý request
                await self._process_single_request(request_data)
                
                # Mark task done
                self.request_queue.task_done()
                
            except Exception as e:
                print(f"❌ Worker {worker_id} error: {str(e)}")
    
    async def _process_single_request(self, request_data: Dict[str, Any]):
        """Xử lý một request cụ thể"""
        
        request_id = request_data["id"]
        agent = request_data["agent"]
        user_input = request_data["input"]
        
        # Tạo metrics tracking
        metrics = RequestMetrics(
            request_id=request_id,
            start_time=time.time()
        )
        
        self.active_requests[request_id] = metrics
        
        async with self.semaphore:  # Limit concurrent requests
            try:
                # Xử lý request với timeout
                result = await asyncio.wait_for(
                    Runner.run(agent, user_input),
                    timeout=30.0  # 30 second timeout
                )
                
                # Update metrics
                metrics.end_time = time.time()
                metrics.processing_time = metrics.end_time - metrics.start_time
                metrics.tokens_used = len(result.final_output.split())  # Approximate
                metrics.success = True
                
                # Store result
                request_data["result"] = result.final_output
                request_data["success"] = True
                
                self.successful_requests += 1
                
            except asyncio.TimeoutError:
                metrics.end_time = time.time()
                metrics.processing_time = metrics.end_time - metrics.start_time
                metrics.error_message = "Request timeout"
                request_data["error"] = "Request timeout after 30 seconds"
                request_data["success"] = False
                
            except Exception as e:
                metrics.end_time = time.time()
                metrics.processing_time = metrics.end_time - metrics.start_time
                metrics.error_message = str(e)
                request_data["error"] = str(e)
                request_data["success"] = False
            
            finally:
                # Update global metrics
                self.total_requests += 1
                
                # Calculate running average
                if metrics.processing_time:
                    self.average_response_time = (
                        (self.average_response_time * (self.total_requests - 1) + metrics.processing_time) 
                        / self.total_requests
                    )
                
                # Move to completed
                self.completed_requests.append(metrics)
                del self.active_requests[request_id]
                
                # Keep only last 1000 completed requests
                if len(self.completed_requests) > 1000:
                    self.completed_requests = self.completed_requests[-1000:]
    
    async def submit_request(self, agent: Agent, user_input: str) -> str:
        """Submit request và chờ kết quả"""
        
        request_id = f"req_{int(time.time() * 1000)}_{len(self.active_requests)}"
        
        # Tạo request data với callback mechanism
        request_data = {
            "id": request_id,
            "agent": agent,
            "input": user_input,
            "future": asyncio.Future()
        }
        
        # Add to queue
        await self.request_queue.put(request_data)
        
        # Wait for completion (polling approach)
        while request_id in self.active_requests or request_data.get("result") is None:
            if request_data.get("success") is not None:  # Completed
                break
            await asyncio.sleep(0.1)  # Poll every 100ms
        
        if request_data.get("success"):
            return request_data["result"]
        else:
            raise Exception(request_data.get("error", "Unknown error"))
    
    def get_performance_stats(self) -> Dict[str, Any]:
        """Lấy thống kê performance"""
        
        success_rate = (self.successful_requests / self.total_requests * 100) if self.total_requests > 0 else 0
        
        # Recent performance (last 100 requests)
        recent_requests = self.completed_requests[-100:]
        recent_avg_time = sum(r.processing_time for r in recent_requests if r.processing_time) / len(recent_requests) if recent_requests else 0
        
        return {
            "total_requests": self.total_requests,
            "successful_requests": self.successful_requests,
            "success_rate": success_rate,
            "average_response_time": self.average_response_time,
            "recent_average_response_time": recent_avg_time,
            "active_requests": len(self.active_requests),
            "queue_size": self.request_queue.qsize(),
            "max_concurrent": self.max_concurrent
        }
    
    async def shutdown(self):
        """Graceful shutdown"""
        print("🛑 Shutting down request processor...")
        
        # Stop workers
        for _ in self.workers:
            await self.request_queue.put(None)  # Shutdown signal
        
        # Wait for workers to finish
        await asyncio.gather(*self.workers, return_exceptions=True)
        
        print("✅ Request processor shutdown complete")

# Load testing system
class LoadTester:
    """Công cụ load testing cho agents"""
    
    def __init__(self, processor: HighPerformanceRequestProcessor):
        self.processor = processor
    
    async def run_load_test(
        self, 
        agent: Agent, 
        test_queries: List[str], 
        concurrent_users: int = 10,
        requests_per_user: int = 5
    ) -> Dict[str, Any]:
        """Chạy load test với concurrent users"""
        
        print(f"🚀 Starting load test:")
        print(f"   • Concurrent users: {concurrent_users}")
        print(f"   • Requests per user: {requests_per_user}")
        print(f"   • Total requests: {concurrent_users * requests_per_user}")
        
        start_time = time.time()
        
        # Tạo tasks cho concurrent users
        user_tasks = []
        
        for user_id in range(concurrent_users):
            user_task = self._simulate_user_requests(
                agent, test_queries, requests_per_user, f"user_{user_id}"
            )
            user_tasks.append(user_task)
        
        # Chạy tất cả user tasks đồng thời
        user_results = await asyncio.gather(*user_tasks, return_exceptions=True)
        
        end_time = time.time()
        total_duration = end_time - start_time
        
        # Tổng hợp kết quả
        total_requests = 0
        successful_requests = 0
        
        for result in user_results:
            if isinstance(result, dict):
                total_requests += result["requests"]
                successful_requests += result["successful"]
        
        # Performance stats
        final_stats = self.processor.get_performance_stats()
        
        return {
            "load_test_summary": {
                "concurrent_users": concurrent_users,
                "requests_per_user": requests_per_user,
                "total_duration": total_duration,
                "requests_per_second": total_requests / total_duration,
                "success_rate": (successful_requests / total_requests * 100) if total_requests > 0 else 0
            },
            "performance_stats": final_stats,
            "individual_user_results": [r for r in user_results if isinstance(r, dict)]
        }
    
    async def _simulate_user_requests(
        self, 
        agent: Agent, 
        queries: List[str], 
        num_requests: int, 
        user_id: str
    ) -> Dict[str, Any]:
        """Mô phỏng requests của một user"""
        
        import random
        
        successful = 0
        failed = 0
        response_times = []
        
        for i in range(num_requests):
            try:
                # Random query
                query = random.choice(queries)
                
                # Measure response time
                start = time.time()
                result = await self.processor.submit_request(agent, query)
                end = time.time()
                
                response_times.append(end - start)
                successful += 1
                
                # Random delay giữa requests (0.1-1s)
                await asyncio.sleep(random.uniform(0.1, 1.0))
                
            except Exception as e:
                failed += 1
                print(f"{user_id} request {i+1} failed: {str(e)}")
        
        return {
            "user_id": user_id,
            "requests": num_requests,
            "successful": successful,
            "failed": failed,
            "avg_response_time": sum(response_times) / len(response_times) if response_times else 0,
            "min_response_time": min(response_times) if response_times else 0,
            "max_response_time": max(response_times) if response_times else 0
        }

# Demo performance optimization
async def demo_performance_optimization():
    """Demo hệ thống performance optimization"""
    
    print("⚡ **Performance Optimization Demo**\n")
    
    # Setup high-performance processor
    processor = HighPerformanceRequestProcessor(max_concurrent=20)
    
    # Tạo test agent
    test_agent = Agent(
        name="Performance Test Agent",
        instructions="""
        Bạn là một AI assistant được tối ưu cho performance testing.
        
        Quy tắc phản hồi:
        - Trả lời ngắn gọn và súc tích
        - Tập trung vào thông tin quan trọng
        - Tránh các câu văn dài dòng
        - Sử dụng bullet points khi phù hợp
        
        Mục tiêu: Cung cấp thông tin hữu ích với thời gian phản hồi tối ưu.
        """,
        model="gpt-4o-mini",  # Model nhanh cho testing
        model_settings={"temperature": 0.3, "max_tokens": 200}
    )
    
    # Test queries
    test_queries = [
        "Machine learning là gì?",
        "Lợi ích của cloud computing",
        "Cách tối ưu database performance",
        "Best practices cho API design",
        "Sự khác biệt giữa REST và GraphQL",
        "Microservices vs Monolith",
        "Docker container hoạt động như thế nào?",
        "CI/CD pipeline là gì?",
        "Cách secure một web application",
        "Performance monitoring tools nào tốt?"
    ]
    
    # Chạy load test
    load_tester = LoadTester(processor)
    
    print("🔥 Bắt đầu load test...")
    
    load_test_results = await load_tester.run_load_test(
        agent=test_agent,
        test_queries=test_queries,
        concurrent_users=8,
        requests_per_user=3
    )
    
    # Hiển thị kết quả
    summary = load_test_results["load_test_summary"]
    print(f"\n📊 **Load Test Results:**")
    print(f"   • Total Duration: {summary['total_duration']:.2f}s")
    print(f"   • Requests/Second: {summary['requests_per_second']:.2f}")
    print(f"   • Success Rate: {summary['success_rate']:.1f}%")
    
    perf_stats = load_test_results["performance_stats"]
    print(f"\n⚡ **Performance Stats:**")
    print(f"   • Total Requests: {perf_stats['total_requests']}")
    print(f"   • Average Response Time: {perf_stats['average_response_time']:.3f}s")
    print(f"   • Recent Average: {perf_stats['recent_average_response_time']:.3f}s")
    print(f"   • Max Concurrent: {perf_stats['max_concurrent']}")
    
    # User performance breakdown
    user_results = load_test_results["individual_user_results"]
    print(f"\n👥 **Per-User Performance:**")
    for user_result in user_results[:5]:  # Show first 5 users
        print(f"{user_result['user_id']}: {user_result['successful']}/{user_result['requests']} success, avg {user_result['avg_response_time']:.3f}s")
    
    # Cleanup
    await processor.shutdown()

if __name__ == "__main__":
    import asyncio
    asyncio.run(demo_performance_optimization())

Cost Optimization và Resource Management

Intelligent Cost Control

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
import time
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from enum import Enum

class CostTier(Enum):
    """Các mức giá khác nhau cho different models"""
    BUDGET = "budget"      # gpt-4o-mini
    STANDARD = "standard"  # gpt-4o
    PREMIUM = "premium"    # gpt-4-turbo, o1

@dataclass
class ModelCostConfig:
    """Cấu hình cost cho từng model"""
    model_name: str
    cost_per_1k_input_tokens: float
    cost_per_1k_output_tokens: float
    tier: CostTier
    performance_score: int  # 1-10, cao hơn = performance tốt hơn

class IntelligentCostManager:
    """Quản lý chi phí thông minh cho OpenAI API"""
    
    def __init__(self, daily_budget: float = 100.0):
        self.daily_budget = daily_budget
        self.current_spend = 0.0
        self.spend_history: List[Dict[str, Any]] = []
        self.last_reset = datetime.now().date()
        
        # Model cost configurations (giá tính theo $)
        self.model_costs = {
            "gpt-4o-mini": ModelCostConfig(
                model_name="gpt-4o-mini",
                cost_per_1k_input_tokens=0.00015,
                cost_per_1k_output_tokens=0.0006,
                tier=CostTier.BUDGET,
                performance_score=7
            ),
            "gpt-4o": ModelCostConfig(
                model_name="gpt-4o",
                cost_per_1k_input_tokens=0.0025,
                cost_per_1k_output_tokens=0.01,
                tier=CostTier.STANDARD,
                performance_score=9
            ),
            "gpt-4-turbo": ModelCostConfig(
                model_name="gpt-4-turbo",
                cost_per_1k_input_tokens=0.01,
                cost_per_1k_output_tokens=0.03,
                tier=CostTier.PREMIUM,
                performance_score=10
            )
        }
        
        # Thresholds cho intelligent switching
        self.cost_thresholds = {
            "warning": 0.7,   # 70% of budget
            "critical": 0.9,  # 90% of budget
            "emergency": 0.95 # 95% of budget
        }
    
    def _reset_daily_budget_if_needed(self):
        """Reset budget nếu qua ngày mới"""
        today = datetime.now().date()
        
        if today > self.last_reset:
            print(f"📅 New day detected, resetting budget")
            print(f"   Yesterday's spend: ${self.current_spend:.4f}")
            
            # Archive yesterday's data
            self.spend_history.append({
                "date": self.last_reset.isoformat(),
                "total_spend": self.current_spend,
                "budget": self.daily_budget,
                "utilization": self.current_spend / self.daily_budget
            })
            
            # Reset for new day
            self.current_spend = 0.0
            self.last_reset = today
    
    def calculate_request_cost(
        self, 
        model: str, 
        input_tokens: int, 
        output_tokens: int
    ) -> float:
        """Tính cost cho một request cụ thể"""
        
        if model not in self.model_costs:
            print(f"⚠️ Unknown model {model}, using default pricing")
            model = "gpt-4o"  # Default fallback
        
        config = self.model_costs[model]
        
        input_cost = (input_tokens / 1000) * config.cost_per_1k_input_tokens
        output_cost = (output_tokens / 1000) * config.cost_per_1k_output_tokens
        
        return input_cost + output_cost
    
    def recommend_model(
        self, 
        complexity_score: int,  # 1-10, cao hơn = phức tạp hơn
        priority: str = "balanced"  # "cost", "performance", "balanced"
    ) -> str:
        """Recommend model dựa trên complexity và priority"""
        
        self._reset_daily_budget_if_needed()
        
        # Check budget status
        budget_utilization = self.current_spend / self.daily_budget
        
        # Emergency mode - chỉ dùng model rẻ nhất
        if budget_utilization >= self.cost_thresholds["emergency"]:
            print(f"🚨 Emergency mode: Budget {budget_utilization:.1%} used, forcing budget model")
            return "gpt-4o-mini"
        
        # Critical mode - prefer budget models
        elif budget_utilization >= self.cost_thresholds["critical"]:
            print(f"⚠️ Critical budget mode: {budget_utilization:.1%} used")
            if complexity_score <= 6:
                return "gpt-4o-mini"
            else:
                return "gpt-4o"  # Minimum acceptable for complex tasks
        
        # Warning mode - be more conservative
        elif budget_utilization >= self.cost_thresholds["warning"]:
            print(f"⚠️ Budget warning: {budget_utilization:.1%} used")
            if priority == "cost" or complexity_score <= 4:
                return "gpt-4o-mini"
            elif complexity_score <= 8:
                return "gpt-4o"
            else:
                return "gpt-4-turbo"
        
        # Normal mode - optimize based on priority
        else:
            if priority == "cost":
                return "gpt-4o-mini" if complexity_score <= 6 else "gpt-4o"
            elif priority == "performance":
                return "gpt-4-turbo" if complexity_score >= 7 else "gpt-4o"
            else:  # balanced
                if complexity_score <= 4:
                    return "gpt-4o-mini"
                elif complexity_score <= 8:
                    return "gpt-4o"
                else:
                    return "gpt-4-turbo"
    
    def record_usage(
        self, 
        model: str, 
        input_tokens: int, 
        output_tokens: int,
        request_metadata: Dict[str, Any] = None
    ):
        """Ghi nhận usage và update budget tracking"""
        
        cost = self.calculate_request_cost(model, input_tokens, output_tokens)
        self.current_spend += cost
        
        # Log usage
        usage_record = {
            "timestamp": datetime.now().isoformat(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost": cost,
            "cumulative_spend": self.current_spend,
            "metadata": request_metadata or {}
        }
        
        print(f"💰 Usage recorded: {model} - ${cost:.4f} (Total today: ${self.current_spend:.4f})")
        
        return usage_record
    
    def get_budget_status(self) -> Dict[str, Any]:
        """Lấy trạng thái budget hiện tại"""
        
        self._reset_daily_budget_if_needed()
        
        utilization = self.current_spend / self.daily_budget
        remaining = self.daily_budget - self.current_spend
        
        # Determine status
        if utilization >= self.cost_thresholds["emergency"]:
            status = "emergency"
        elif utilization >= self.cost_thresholds["critical"]:
            status = "critical"
        elif utilization >= self.cost_thresholds["warning"]:
            status = "warning"
        else:
            status = "healthy"
        
        return {
            "date": self.last_reset.isoformat(),
            "daily_budget": self.daily_budget,
            "current_spend": self.current_spend,
            "remaining_budget": remaining,
            "utilization_percentage": utilization * 100,
            "status": status,
            "recommended_action": self._get_budget_recommendation(status, utilization)
        }
    
    def _get_budget_recommendation(self, status: str, utilization: float) -> str:
        """Đề xuất hành động dựa trên budget status"""
        
        if status == "emergency":
            return "Switch to budget models only, consider increasing daily budget"
        elif status == "critical":
            return "Prefer budget models, monitor usage closely"
        elif status == "warning":
            return "Be more selective with premium models"
        else:
            return "Budget healthy, can use models as needed"
    
    def get_cost_analytics(self) -> Dict[str, Any]:
        """Phân tích chi phí và đưa ra insights"""
        
        if not self.spend_history:
            return {"message": "Insufficient historical data"}
        
        # Recent performance
        recent_days = self.spend_history[-7:] if len(self.spend_history) >= 7 else self.spend_history
        
        avg_daily_spend = sum(day["total_spend"] for day in recent_days) / len(recent_days)
        avg_utilization = sum(day["utilization"] for day in recent_days) / len(recent_days)
        
        # Trends
        if len(recent_days) >= 2:
            recent_trend = recent_days[-1]["total_spend"] - recent_days[-2]["total_spend"]
            trend_direction = "increasing" if recent_trend > 0 else "decreasing" if recent_trend < 0 else "stable"
        else:
            trend_direction = "insufficient_data"
        
        return {
            "analytics_period": f"Last {len(recent_days)} days",
            "average_daily_spend": avg_daily_spend,
            "average_utilization": avg_utilization * 100,
            "trend_direction": trend_direction,
            "total_historical_spend": sum(day["total_spend"] for day in self.spend_history),
            "budget_efficiency": "good" if avg_utilization < 0.8 else "needs_optimization",
            "recommendations": self._generate_cost_recommendations(avg_utilization, trend_direction)
        }
    
    def _generate_cost_recommendations(self, avg_utilization: float, trend: str) -> List[str]:
        """Generate cost optimization recommendations"""
        
        recommendations = []
        
        if avg_utilization > 0.9:
            recommendations.append("Consider increasing daily budget or optimizing model usage")
        
        if trend == "increasing":
            recommendations.append("Monitor increasing cost trend, review model selection strategy")
        
        if avg_utilization < 0.5:
            recommendations.append("Budget utilization is low, could afford higher performance models")
        
        recommendations.append("Regularly review model performance vs cost trade-offs")
        recommendations.append("Consider implementing request batching for similar queries")
        
        return recommendations

# Cost-optimized agent wrapper
class CostOptimizedAgent:
    """Agent wrapper với intelligent cost optimization"""
    
    def __init__(self, base_agent: Agent, cost_manager: IntelligentCostManager):
        self.base_agent = base_agent
        self.cost_manager = cost_manager
        
    def _assess_query_complexity(self, query: str) -> int:
        """Đánh giá độ phức tạp của query (1-10)"""
        
        complexity_indicators = {
            "simple_keywords": ["gì là", "là gì", "nghĩa là", "how to", "what is"],
            "complex_keywords": ["phân tích", "so sánh", "đánh giá", "analyze", "compare", "evaluate"],
            "very_complex_keywords": ["strategy", "chiến lược", "optimization", "tối ưu", "comprehensive"]
        }
        
        query_lower = query.lower()
        
        # Base complexity từ length
        base_complexity = min(len(query) // 50 + 1, 5)  # 1-5 based on length
        
        # Adjust dựa trên keywords
        if any(keyword in query_lower for keyword in complexity_indicators["very_complex_keywords"]):
            complexity = max(base_complexity + 4, 8)  # 8-10
        elif any(keyword in query_lower for keyword in complexity_indicators["complex_keywords"]):
            complexity = max(base_complexity + 2, 6)  # 6-8
        elif any(keyword in query_lower for keyword in complexity_indicators["simple_keywords"]):
            complexity = min(base_complexity, 4)  # 1-4
        else:
            complexity = base_complexity
        
        return min(complexity, 10)
    
    async def run_cost_optimized(
        self, 
        query: str, 
        priority: str = "balanced"
    ) -> Dict[str, Any]:
        """Run agent với cost optimization"""
        
        # Assess complexity
        complexity = self._assess_query_complexity(query)
        
        # Get recommended model
        recommended_model = self.cost_manager.recommend_model(complexity, priority)
        
        print(f"🤖 Query complexity: {complexity}/10")
        print(f"💡 Recommended model: {recommended_model}")
        
        # Update agent model
        original_model = self.base_agent.model
        self.base_agent.model = recommended_model
        
        try:
            # Run agent
            start_time = time.time()
            result = await Runner.run(self.base_agent, query)
            end_time = time.time()
            
            # Estimate token usage (rough approximation)
            input_tokens = len(query.split()) * 1.3  # Approximate tokens
            output_tokens = len(result.final_output.split()) * 1.3
            
            # Record usage
            usage_record = self.cost_manager.record_usage(
                model=recommended_model,
                input_tokens=int(input_tokens),
                output_tokens=int(output_tokens),
                request_metadata={
                    "query_complexity": complexity,
                    "priority": priority,
                    "response_time": end_time - start_time
                }
            )
            
            return {
                "success": True,
                "result": result.final_output,
                "model_used": recommended_model,
                "complexity_score": complexity,
                "cost_info": {
                    "estimated_cost": usage_record["cost"],
                    "input_tokens": usage_record["input_tokens"],
                    "output_tokens": usage_record["output_tokens"]
                },
                "budget_status": self.cost_manager.get_budget_status()
            }
            
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "model_used": recommended_model,
                "complexity_score": complexity
            }
        
        finally:
            # Restore original model
            self.base_agent.model = original_model

# Demo cost optimization
async def demo_cost_optimization():
    """Demo intelligent cost optimization"""
    
    print("💰 **Cost Optimization Demo**\n")
    
    # Setup cost manager với budget nhỏ để demo
    cost_manager = IntelligentCostManager(daily_budget=5.0)  # $5/day for demo
    
    # Tạo base agent
    base_agent = Agent(
        name="Cost-Optimized Assistant",
        instructions="""
        Bạn là một AI assistant được tối ưu về cost và performance.
        
        Nguyên tắc:
        - Trả lời chính xác và hữu ích
        - Tối ưu độ dài phản hồi phù hợp với độ phức tạp câu hỏi
        - Ưu tiên thông tin quan trọng nhất
        - Tránh dài dòng không cần thiết
        """,
        model="gpt-4o"  # Default model
    )
    
    # Wrap với cost optimization
    cost_optimized_agent = CostOptimizedAgent(base_agent, cost_manager)
    
    # Test queries với độ phức tạp khác nhau
    test_scenarios = [
        {
            "query": "Python là gì?",
            "priority": "cost",
            "expected_complexity": "low"
        },
        {
            "query": "So sánh performance giữa React và Vue.js, phân tích ưu nhược điểm của từng framework",
            "priority": "balanced", 
            "expected_complexity": "medium"
        },
        {
            "query": "Xây dựng chiến lược comprehensive để optimize database performance cho enterprise application với millions of users",
            "priority": "performance",
            "expected_complexity": "high"
        },
        {
            "query": "Cách install Docker?",
            "priority": "cost",
            "expected_complexity": "low"
        }
    ]
    
    total_cost = 0.0
    
    for i, scenario in enumerate(test_scenarios, 1):
        print(f"🎯 **Test {i}: {scenario['expected_complexity'].title()} Complexity**")
        print(f"📝 Query: {scenario['query'][:80]}...")
        print(f"🎖️ Priority: {scenario['priority']}")
        
        result = await cost_optimized_agent.run_cost_optimized(
            scenario["query"],
            scenario["priority"]
        )
        
        if result["success"]:
            cost_info = result["cost_info"]
            total_cost += cost_info["estimated_cost"]
            
            print(f"✅ Model: {result['model_used']}")
            print(f"💰 Cost: ${cost_info['estimated_cost']:.4f}")
            print(f"📊 Tokens: {cost_info['input_tokens']:.0f} in, {cost_info['output_tokens']:.0f} out")
            print(f"🎯 Response: {result['result'][:150]}...")
            
            # Budget status
            budget_status = result["budget_status"]
            print(f"💳 Budget: ${budget_status['current_spend']:.4f}/${budget_status['daily_budget']:.2f} ({budget_status['utilization_percentage']:.1f}%)")
            
        else:
            print(f"❌ Error: {result['error']}")
        
        print("-" * 80 + "\n")
    
    # Final cost analytics
    analytics = cost_manager.get_cost_analytics()
    
    print("📈 **Cost Analytics:**")
    if "message" not in analytics:
        print(f"💵 Total Demo Cost: ${total_cost:.4f}")
        print(f"📊 Average Daily Spend: ${analytics['average_daily_spend']:.4f}")
        print(f"📈 Budget Efficiency: {analytics['budget_efficiency']}")
        
        print(f"\n💡 **Recommendations:**")
        for rec in analytics["recommendations"]:
            print(f"{rec}")

if __name__ == "__main__":
    import asyncio
    asyncio.run(demo_cost_optimization())

Security và Compliance

Production Security Framework

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
import hashlib
import secrets
import jwt
from typing import Dict, Any, List, Optional, Set
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
import re

class SecurityLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class SecurityPolicy:
    """Chính sách bảo mật cho agents"""
    max_input_length: int = 10000
    allowed_file_types: Set[str] = None
    rate_limit_per_user: int = 100  # requests per hour
    session_timeout_minutes: int = 30
    require_authentication: bool = True
    log_all_interactions: bool = True
    content_filtering_enabled: bool = True
    pii_detection_enabled: bool = True
    
    def __post_init__(self):
        if self.allowed_file_types is None:
            self.allowed_file_types = {'.txt', '.pdf', '.doc', '.docx'}

class ProductionSecurityManager:
    """Quản lý bảo mật cho production environment"""
    
    def __init__(self, secret_key: str, security_level: SecurityLevel = SecurityLevel.HIGH):
        self.secret_key = secret_key
        self.security_level = security_level
        self.security_policy = self._create_security_policy()
        
        # Tracking security events
        self.security_events: List[Dict[str, Any]] = []
        self.blocked_ips: Set[str] = set()
        self.rate_limits: Dict[str, List[datetime]] = {}
        
        # PII patterns
        self.pii_patterns = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            'phone': r'(\+84|0)[0-9]{9,10}',
            'credit_card': r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
            'ssn': r'\b\d{3}-?\d{2}-?\d{4}\b'
        }
        
        print(f"🔐 Security Manager initialized with {security_level.value} security level")
    
    def _create_security_policy(self) -> SecurityPolicy:
        """Tạo security policy dựa trên security level"""
        
        if self.security_level == SecurityLevel.CRITICAL:
            return SecurityPolicy(
                max_input_length=5000,
                rate_limit_per_user=50,
                session_timeout_minutes=15,
                require_authentication=True,
                log_all_interactions=True,
                content_filtering_enabled=True,
                pii_detection_enabled=True
            )
        elif self.security_level == SecurityLevel.HIGH:
            return SecurityPolicy(
                max_input_length=8000,
                rate_limit_per_user=100,
                session_timeout_minutes=30,
                require_authentication=True,
                log_all_interactions=True,
                content_filtering_enabled=True,
                pii_detection_enabled=True
            )
        elif self.security_level == SecurityLevel.MEDIUM:
            return SecurityPolicy(
                max_input_length=10000,
                rate_limit_per_user=200,
                session_timeout_minutes=60,
                require_authentication=True,
                log_all_interactions=True,
                content_filtering_enabled=True,
                pii_detection_enabled=False
            )
        else:  # LOW
            return SecurityPolicy(
                max_input_length=15000,
                rate_limit_per_user=500,
                session_timeout_minutes=120,
                require_authentication=False,
                log_all_interactions=False,
                content_filtering_enabled=False,
                pii_detection_enabled=False
            )
    
    def generate_secure_token(self, user_id: str, permissions: List[str]) -> str:
        """Tạo secure JWT token"""
        
        payload = {
            'user_id': user_id,
            'permissions': permissions,
            'iat': datetime.utcnow(),
            'exp': datetime.utcnow() + timedelta(hours=24),
            'nonce': secrets.token_hex(16)
        }
        
        token = jwt.encode(payload, self.secret_key, algorithm='HS256')
        
        # Log token generation
        self._log_security_event(
            event_type="token_generated",
            user_id=user_id,
            details={"permissions": permissions}
        )
        
        return token
    
    def validate_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Validate JWT token"""
        
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
            
            # Check if token is not expired
            if datetime.utcnow() > datetime.fromisoformat(payload['exp'].replace('Z', '+00:00')):
                self._log_security_event(
                    event_type="token_expired",
                    user_id=payload.get('user_id'),
                    details={"token_hash": hashlib.sha256(token.encode()).hexdigest()[:16]}
                )
                return None
            
            return payload
            
        except jwt.InvalidTokenError as e:
            self._log_security_event(
                event_type="invalid_token",
                details={"error": str(e), "token_hash": hashlib.sha256(token.encode()).hexdigest()[:16]}
            )
            return None
    
    def check_rate_limit(self, user_id: str, ip_address: str) -> bool:
        """Kiểm tra rate limiting"""
        
        if ip_address in self.blocked_ips:
            self._log_security_event(
                event_type="blocked_ip_access",
                user_id=user_id,
                details={"ip": ip_address}
            )
            return False
        
        now = datetime.now()
        hour_ago = now - timedelta(hours=1)
        
        # Clean old requests
        if user_id in self.rate_limits:
            self.rate_limits[user_id] = [
                req_time for req_time in self.rate_limits[user_id]
                if req_time > hour_ago
            ]
        else:
            self.rate_limits[user_id] = []
        
        # Check rate limit
        current_requests = len(self.rate_limits[user_id])
        
        if current_requests >= self.security_policy.rate_limit_per_user:
            self._log_security_event(
                event_type="rate_limit_exceeded",
                user_id=user_id,
                details={"requests_in_hour": current_requests, "limit": self.security_policy.rate_limit_per_user}
            )
            return False
        
        # Record this request
        self.rate_limits[user_id].append(now)
        return True
    
    def validate_input(self, user_input: str, user_id: str) -> Dict[str, Any]:
        """Validate user input cho security threats"""
        
        validation_result = {
            "valid": True,
            "issues": [],
            "risk_level": "low",
            "sanitized_input": user_input
        }
        
        # Check input length
        if len(user_input) > self.security_policy.max_input_length:
            validation_result["valid"] = False
            validation_result["issues"].append(f"Input too long: {len(user_input)} > {self.security_policy.max_input_length}")
            validation_result["risk_level"] = "medium"
        
        # Check for potential injection attacks
        injection_patterns = [
            r'<script[^>]*>.*?</script>',  # XSS
            r'javascript:',
            r'on\w+\s*=',  # Event handlers
            r'union\s+select',  # SQL injection
            r'drop\s+table',
            r'insert\s+into',
            r'delete\s+from'
        ]
        
        for pattern in injection_patterns:
            if re.search(pattern, user_input, re.IGNORECASE):
                validation_result["valid"] = False
                validation_result["issues"].append(f"Potential injection attack detected: {pattern}")
                validation_result["risk_level"] = "high"
        
        # PII detection
        if self.security_policy.pii_detection_enabled:
            pii_found = self._detect_pii(user_input)
            if pii_found:
                validation_result["issues"].append(f"PII detected: {', '.join(pii_found.keys())}")
                validation_result["risk_level"] = "medium"
                
                # Sanitize PII
                sanitized = user_input
                for pii_type, matches in pii_found.items():
                    for match in matches:
                        sanitized = sanitized.replace(match, f"[{pii_type.upper()}_REDACTED]")
                validation_result["sanitized_input"] = sanitized
        
        # Content filtering
        if self.security_policy.content_filtering_enabled:
            if self._contains_inappropriate_content(user_input):
                validation_result["valid"] = False
                validation_result["issues"].append("Inappropriate content detected")
                validation_result["risk_level"] = "high"
        
        # Log security validation
        if not validation_result["valid"] or validation_result["issues"]:
            self._log_security_event(
                event_type="input_validation_failed",
                user_id=user_id,
                details={
                    "issues": validation_result["issues"],
                    "risk_level": validation_result["risk_level"],
                    "input_length": len(user_input)
                }
            )
        
        return validation_result
    
    def _detect_pii(self, text: str) -> Dict[str, List[str]]:
        """Phát hiện PII trong text"""
        
        pii_found = {}
        
        for pii_type, pattern in self.pii_patterns.items():
            matches = re.findall(pattern, text)
            if matches:
                pii_found[pii_type] = matches
        
        return pii_found
    
    def _contains_inappropriate_content(self, text: str) -> bool:
        """Kiểm tra inappropriate content"""
        
        inappropriate_keywords = [
            'hack', 'exploit', 'malware', 'phishing',
            'illegal', 'fraud', 'scam'
        ]
        
        text_lower = text.lower()
        return any(keyword in text_lower for keyword in inappropriate_keywords)
    
    def _log_security_event(
        self, 
        event_type: str, 
        user_id: str = None, 
        details: Dict[str, Any] = None
    ):
        """Log security events"""
        
        event = {
            "timestamp": datetime.now().isoformat(),
            "event_type": event_type,
            "user_id": user_id,
            "details": details or {},
            "security_level": self.security_level.value
        }
        
        self.security_events.append(event)
        
        # Keep only last 1000 events
        if len(self.security_events) > 1000:
            self.security_events = self.security_events[-1000:]
        
        # Print critical events
        if event_type in ["blocked_ip_access", "rate_limit_exceeded", "input_validation_failed"]:
            print(f"🚨 Security Event: {event_type} - User: {user_id}")
    
    def get_security_report(self) -> Dict[str, Any]:
        """Tạo báo cáo bảo mật"""
        
        if not self.security_events:
            return {"message": "No security events recorded"}
        
        # Event statistics
        event_types = {}
        for event in self.security_events:
            event_type = event["event_type"]
            event_types[event_type] = event_types.get(event_type, 0) + 1
        
        # Recent events (last 24 hours)
        day_ago = datetime.now() - timedelta(hours=24)
        recent_events = [
            event for event in self.security_events
            if datetime.fromisoformat(event["timestamp"]) > day_ago
        ]
        
        # Risk assessment
        high_risk_events = [
            event for event in recent_events
            if event["event_type"] in ["blocked_ip_access", "rate_limit_exceeded", "input_validation_failed"]
        ]
        
        risk_level = "high" if len(high_risk_events) > 10 else "medium" if len(high_risk_events) > 3 else "low"
        
        return {
            "report_generated": datetime.now().isoformat(),
            "security_level": self.security_level.value,
            "total_events": len(self.security_events),
            "events_last_24h": len(recent_events),
            "event_type_breakdown": event_types,
            "current_risk_level": risk_level,
            "blocked_ips_count": len(self.blocked_ips),
            "active_rate_limits": len(self.rate_limits),
            "recent_high_risk_events": len(high_risk_events),
            "security_recommendations": self._generate_security_recommendations(risk_level, event_types)
        }
    
    def _generate_security_recommendations(
        self, 
        risk_level: str, 
        event_types: Dict[str, int]
    ) -> List[str]:
        """Generate security recommendations"""
        
        recommendations = []
        
        if risk_level == "high":
            recommendations.append("Immediate review of security events required")
            
        if event_types.get("rate_limit_exceeded", 0) > 5:
            recommendations.append("Consider implementing IP-based blocking for repeat offenders")
            
        if event_types.get("input_validation_failed", 0) > 10:
            recommendations.append("Review and strengthen input validation rules")
            
        if event_types.get("invalid_token", 0) > 3:
            recommendations.append("Monitor for potential token brute force attempts")
        
        recommendations.extend([
            "Regularly rotate security keys and tokens",
            "Monitor security events dashboard daily",
            "Keep security policies updated with latest threats",
            "Conduct periodic security audits"
        ])
        
        return recommendations

# Secure agent wrapper
class SecureAgent:
    """Agent wrapper với comprehensive security"""
    
    def __init__(self, agent: Agent, security_manager: ProductionSecurityManager):
        self.agent = agent
        self.security_manager = security_manager
        
    async def secure_run(
        self, 
        user_input: str, 
        user_id: str,
        auth_token: str,
        ip_address: str = "127.0.0.1"
    ) -> Dict[str, Any]:
        """Run agent với full security checks"""
        
        # 1. Token validation
        if self.security_manager.security_policy.require_authentication:
            token_payload = self.security_manager.validate_token(auth_token)
            if not token_payload:
                return {
                    "success": False,
                    "error": "Invalid or expired authentication token",
                    "error_code": "AUTH_FAILED"
                }
            
            # Check if user_id matches token
            if token_payload.get("user_id") != user_id:
                return {
                    "success": False,
                    "error": "User ID mismatch",
                    "error_code": "AUTH_MISMATCH"
                }
        
        # 2. Rate limiting
        if not self.security_manager.check_rate_limit(user_id, ip_address):
            return {
                "success": False,
                "error": "Rate limit exceeded",
                "error_code": "RATE_LIMIT"
            }
        
        # 3. Input validation
        validation_result = self.security_manager.validate_input(user_input, user_id)
        
        if not validation_result["valid"]:
            return {
                "success": False,
                "error": "Input validation failed",
                "error_code": "INVALID_INPUT",
                "validation_issues": validation_result["issues"]
            }
        
        # 4. Run agent với sanitized input
        try:
            result = await Runner.run(self.agent, validation_result["sanitized_input"])
            
            # Log successful interaction
            if self.security_manager.security_policy.log_all_interactions:
                self.security_manager._log_security_event(
                    event_type="agent_interaction_success",
                    user_id=user_id,
                    details={
                        "input_length": len(user_input),
                        "output_length": len(result.final_output),
                        "sanitized": validation_result["sanitized_input"] != user_input
                    }
                )
            
            return {
                "success": True,
                "result": result.final_output,
                "security_info": {
                    "input_sanitized": validation_result["sanitized_input"] != user_input,
                    "risk_level": validation_result["risk_level"]
                }
            }
            
        except Exception as e:
            # Log error
            self.security_manager._log_security_event(
                event_type="agent_execution_error",
                user_id=user_id,
                details={"error": str(e)}
            )
            
            return {
                "success": False,
                "error": "Agent execution failed",
                "error_code": "EXECUTION_ERROR"
            }

# Demo production security
async def demo_production_security():
    """Demo comprehensive production security"""
    
    print("🔐 **Production Security Demo**\n")
    
    # Setup security manager
    security_manager = ProductionSecurityManager(
        secret_key="production_secret_key_2025",
        security_level=SecurityLevel.HIGH
    )
    
    # Tạo secure agent
    base_agent = Agent(
        name="Secure Production Agent",
        instructions="""
        Bạn là một AI assistant được triển khai trong môi trường production bảo mật cao.
        
        Nguyên tắc bảo mật:
        - Không bao giờ tiết lộ thông tin nhạy cảm
        - Từ chối các yêu cầu có thể gây hại
        - Báo cáo các hoạt động đáng ngờ
        - Tuân thủ nghiêm ngặt các chính sách bảo mật
        
        Phong cách phản hồi:
        - Chuyên nghiệp và cẩn thận
        - Minh bạch về giới hạn bảo mật
        - Hướng dẫn người dùng sử dụng an toàn
        """
    )
    
    secure_agent = SecureAgent(base_agent, security_manager)
    
    # Generate token cho test user
    test_user_id = "user_12345"
    auth_token = security_manager.generate_secure_token(
        user_id=test_user_id,
        permissions=["chat", "query"]
    )
    
    print(f"🎫 Generated auth token for {test_user_id}")
    
    # Test scenarios
    security_test_cases = [
        {
            "name": "Normal Query",
            "input": "Giải thích machine learning cơ bản",
            "should_succeed": True
        },
        {
            "name": "PII in Input",
            "input": "Tôi tên Nguyễn Văn An, email an@company.com, SĐT 0901234567. Hỏi về AI.",
            "should_succeed": True  # Sẽ sanitize PII
        },
        {
            "name": "Potential XSS Attack",
            "input": "<script>alert('hack')</script> Explain JavaScript",
            "should_succeed": False
        },
        {
            "name": "SQL Injection Attempt",
            "input": "'; DROP TABLE users; -- Explain databases",
            "should_succeed": False
        },
        {
            "name": "Very Long Input",
            "input": ("A" * 12000) + " What is AI?",  # Vượt quá limit
            "should_succeed": False
        }
    ]
    
    for i, test_case in enumerate(security_test_cases, 1):
        print(f"🧪 **Test {i}: {test_case['name']}**")
        
        result = await secure_agent.secure_run(
            user_input=test_case["input"],
            user_id=test_user_id,
            auth_token=auth_token,
            ip_address="192.168.1.100"
        )
        
        success_expected = test_case["should_succeed"]
        actual_success = result["success"]
        
        if actual_success == success_expected:
            status = "✅ PASS"
        else:
            status = "❌ FAIL"
        
        print(f"   {status} - Expected: {success_expected}, Got: {actual_success}")
        
        if result["success"]:
            print(f"   📝 Response: {result['result'][:100]}...")
            if result.get("security_info", {}).get("input_sanitized"):
                print(f"   🧹 Input was sanitized for security")
        else:
            print(f"   🚫 Blocked: {result['error']} ({result.get('error_code')})")
        
        print()
    
    # Test rate limiting
    print("🚦 **Testing Rate Limiting**")
    
    # Make multiple requests to trigger rate limit
    for i in range(5):
        result = await secure_agent.secure_run(
            user_input=f"Test request {i+1}",
            user_id=test_user_id,
            auth_token=auth_token,
            ip_address="192.168.1.100"
        )
        
        if result["success"]:
            print(f"   ✅ Request {i+1}: Success")
        else:
            print(f"   🚫 Request {i+1}: {result['error']}")
    
    # Security report
    print("\n📊 **Security Report**")
    
    security_report = security_manager.get_security_report()
    
    print(f"🔐 Security Level: {security_report['security_level']}")
    print(f"📈 Total Events: {security_report['total_events']}")
    print(f"⏰ Events (24h): {security_report['events_last_24h']}")
    print(f"🚨 Risk Level: {security_report['current_risk_level']}")
    
    print(f"\n📋 **Event Breakdown:**")
    for event_type, count in security_report['event_type_breakdown'].items():
        print(f"{event_type}: {count}")
    
    print(f"\n💡 **Security Recommendations:**")
    for rec in security_report['security_recommendations'][:5]:
        print(f"{rec}")

if __name__ == "__main__":
    from agents import Runner
    import asyncio
    asyncio.run(demo_production_security())

Những Điều Cần Biết Khi Đưa Vào Sản Xuất

Kiến Thức Cốt Lõi

Environment management - Phân chia rõ ràng dev, staging và production
Performance optimization - Request processing với high concurrency
Cost management - Intelligent model selection dựa trên complexity
Security framework - Authentication, validation và monitoring toàn diện
Monitoring và alerting - Theo dõi liên tục để phát hiện vấn đề sớm

Kinh Nghiệm Từ Thực Tế

🎯 Về Trải Nghiệm Người Dùng:

  • Luôn prioritize response time và availability
  • Implement graceful degradation khi hệ thống quá tải
  • Cung cấp feedback rõ ràng khi có lỗi xảy ra
  • Design cho mobile-first nếu có nhiều user mobile
  • Test thoroughly trên các devices và browsers khác nhau

Về Hiệu Suất:

  • Monitor response time như metric quan trọng nhất
  • Implement caching ở nhiều layers khác nhau
  • Use connection pooling để optimize database connections
  • Consider CDN cho static assets và responses
  • Load test với realistic traffic patterns

🔧 Về Triển Khai:

  • Automate deployment process hoàn toàn
  • Implement blue-green deployment để minimize downtime
  • Have rollback strategy sẵn sàng
  • Monitor system health ngay sau mỗi deployment
  • Keep deployment logs để troubleshoot khi cần

Hướng Dẫn Triển Khai Production

📊 Thiết Lập Monitoring:

  • Track key metrics: response time, error rate, throughput
  • Set up alerts cho các thresholds quan trọng
  • Monitor cost và usage patterns daily
  • Implement health checks cho tất cả components
  • Create dashboards cho different stakeholders

🔒 Bảo Mật Production:

  • Implement defense-in-depth strategy
  • Regular security audits và penetration testing
  • Keep all dependencies updated
  • Monitor cho suspicious activities
  • Have incident response plan ready

💰 Quản Lý Chi Phí:

  • Set up budget alerts và automatic controls
  • Monitor usage patterns để optimize model selection
  • Implement request batching khi có thể
  • Regular cost analysis và optimization
  • Consider reserved instances cho predictable workloads

Những Sai Lầm Thường Gặp

Thiếu monitoring → Không biết khi nào system có vấn đề
Không có backup plan → Không thể recover khi có sự cố
Over-engineering từ đầu → Waste time và resources
Ignore security → Vulnerabilities có thể được exploit
Không test với real data → Surprise khi go-live

Chiến Lược Scaling

🏗️ Horizontal Scaling:

  • Design stateless services từ đầu cho easy replication
  • Sử dụng load balancers để distribute traffic evenly
  • Implement database sharding strategies khi single DB không đủ
  • Consider microservices architecture cho large systems
  • Plan cho multi-region deployment để serve global users

📈 Vertical Scaling:

  • Monitor resource utilization patterns để identify bottlenecks
  • Upgrade hardware khi cost-effective và necessary
  • Optimize memory usage và CPU utilization
  • Consider GPU instances cho AI-intensive workloads
  • Profile application performance để identify optimization opportunities

Kế Hoạch Disaster Recovery

🚨 Chuẩn Bị Cho Sự Cố:

  • Document tất cả procedures và runbooks chi tiết
  • Thực hiện regular backup và recovery testing
  • Maintain multiple communication channels
  • Train team members về incident response procedures
  • Keep updated emergency contact lists và escalation paths

🔄 Business Continuity:

  • Define clear RTO (Recovery Time Objective) và RPO (Recovery Point Objective)
  • Implement redundancy ở critical system components
  • Have alternative service providers sẵn sàng
  • Conduct regular disaster recovery drills
  • Maintain updated incident response và communication plans

Tối Ưu Chi Phí Trong Dài Hạn

💡 Chiến Lược Cost Optimization:

  • Analyze usage patterns để predict và optimize costs
  • Implement smart caching để reduce unnecessary API calls
  • Sử dụng cheaper models cho simple, routine queries
  • Consider training custom models cho specific, high-volume use cases
  • Regular review và cleanup unused resources và subscriptions

📊 Cost Tracking:

  • Implement detailed cost attribution và tracking
  • Track ROI của different features và use cases
  • Setup cost allocation tags cho different projects/departments
  • Conduct regular cost review meetings với stakeholders
  • Forecast future costs based on growth projections và plans

Chuẩn Bị Cho Tương Lai

🔮 Technology Evolution:

  • Stay updated với latest AI developments
  • Plan cho model upgrades và migrations
  • Consider emerging technologies như multimodal AI
  • Invest in team training và development
  • Build flexible architecture có thể adapt

🌱 Business Growth:

  • Plan infrastructure scaling cho anticipated growth
  • Consider international expansion requirements
  • Prepare cho regulatory compliance changes
  • Build partnerships với technology vendors
  • Invest in automation để handle scale

Checklist Cuối Cùng Trước Khi Go-Live

✅ Sẵn Sàng Kỹ Thuật:

  • Load testing passed với expected traffic volumes
  • Security audit completed và all vulnerabilities addressed
  • Monitoring và alerting systems hoàn tất và tested
  • Backup và recovery procedures tested successfully
  • Performance benchmarks established và documented

✅ Sẵn Sàng Vận Hành:

  • Team training completed cho all relevant personnel
  • Documentation updated và easily accessible
  • Incident response procedures ready và tested
  • Customer support processes established
  • Communication plans prepared cho different scenarios

✅ Sẵn Sàng Business:

  • User acceptance testing passed với real users
  • Legal và compliance requirements fully met
  • Marketing materials prepared cho launch
  • Customer support team ready với proper training
  • Success metrics defined và tracking systems ready

Chúc mừng bạn đã hoàn thành hành trình khám phá OpenAI Agents SDK! Từ những bước đầu tiên với “Hello World” đến việc triển khai production-grade systems, bạn đã trang bị đầy đủ kiến thức để xây dựng những AI applications mạnh mẽ và đáng tin cậy. Hãy áp dụng những kiến thức này vào projects thực tế và tiếp tục học hỏi từ community để không ngừng cải thiện skills của mình.

Bài viết này được cấp phép bởi tác giả theo giấy phép CC BY 4.0 .