#!/usr/bin/env python3 """ SSE实时通知功能验证测试 专注验证SSE推送和实时通知接收 """ import requests import time import threading import json BASE_URL = "http://localhost:5511" def log(msg, status="INFO"): icons = {"PASS": "✅", "FAIL": "❌", "INFO": "ℹ️", "WARN": "⚠️"} print(f"{icons.get(status, 'ℹ️')} {msg}") def test_sse_realtime_notifications(): """测试SSE实时通知功能""" log("=== SSE实时通知功能验证 ===", "INFO") # 1. 登录获取token log("1. 用户登录...", "INFO") login_data = {"phone": "15921072307", "password": "Sl52788542"} response = requests.post(f"{BASE_URL}/api/members/login", json=login_data) if response.status_code != 200: log(f"登录失败: {response.status_code}", "FAIL") return token = response.json()['data']['token'] member_id = response.json()['data']['memberId'] headers = {"Authorization": f"Bearer {token}"} log(f"登录成功 - Member ID: {member_id}", "PASS") # 2. 建立SSE连接并监听通知 log("2. 建立SSE连接并监听实时通知...", "INFO") notifications_received = [] def listen_sse(): """监听SSE通知""" try: sse_response = requests.get( f"{BASE_URL}/api/notifications/sse", headers=headers, stream=True, timeout=30 ) if sse_response.status_code == 200: log("SSE连接建立成功", "PASS") for line in sse_response.iter_lines(): if line: try: line_str = line.decode('utf-8') if line_str.startswith('data: '): data_str = line_str[6:] # 移除 'data: ' 前缀 if data_str.strip(): notification = json.loads(data_str) notifications_received.append(notification) log(f"收到通知: {notification.get('type', 'unknown')} - {notification.get('title', '')}", "INFO") except Exception as e: log(f"解析SSE消息失败: {e}", "WARN") except Exception as e: log(f"SSE连接异常: {e}", "FAIL") # 启动SSE监听线程 sse_thread = threading.Thread(target=listen_sse, daemon=True) sse_thread.start() # 等待SSE连接建立 time.sleep(2) # 3. 获取初始积分 log("3. 检查初始积分...", "INFO") current_resp = requests.get(f"{BASE_URL}/api/members/current", headers=headers) initial_points = current_resp.json()['availablePoints'] if current_resp.status_code == 200 else 0 log(f"初始积分: {initial_points}", "INFO") # 4. 执行扫码操作 log("4. 执行扫码操作...", "INFO") scan_data = {"code": "001-000098-20260211095706-7420"} scan_resp = requests.post(f"{BASE_URL}/api/marketing-codes/scan", json=scan_data, headers=headers) if scan_resp.status_code == 200: result = scan_resp.json() message = result['data']['message'] log(f"扫码结果: {message}", "PASS") else: log(f"扫码失败: {scan_resp.status_code}", "FAIL") return # 5. 等待并验证通知接收 log("5. 等待通知接收...", "INFO") time.sleep(10) # 等待事件处理和通知推送 # 6. 验证结果 log("6. 验证测试结果...", "INFO") # 检查是否收到通知 if notifications_received: log(f"总共收到 {len(notifications_received)} 条通知", "INFO") # 分析通知类型 connection_noti = [n for n in notifications_received if n.get('type') == 'connection'] heartbeat_noti = [n for n in notifications_received if n.get('type') == 'heartbeat'] points_noti = [n for n in notifications_received if '积分' in n.get('title', '') or '积分' in n.get('message', '')] log(f"连接通知: {len(connection_noti)} 条", "INFO") log(f"心跳通知: {len(heartbeat_noti)} 条", "INFO") log(f"积分相关通知: {len(points_noti)} 条", "INFO") if points_noti: log("✅ 成功收到积分相关通知", "PASS") for i, noti in enumerate(points_noti[:2]): log(f" 积分通知 {i+1}: {noti.get('title')} - {noti.get('message')}", "INFO") else: log("⚠️ 未收到积分相关通知", "WARN") else: log("❌ 未收到任何通知", "FAIL") # 7. 验证积分是否增加 log("7. 验证积分变化...", "INFO") current_resp2 = requests.get(f"{BASE_URL}/api/members/current", headers=headers) if current_resp2.status_code == 200: final_points = current_resp2.json()['availablePoints'] points_diff = final_points - initial_points log(f"积分变化: {initial_points} → {final_points} (+{points_diff})", "INFO") if points_diff > 0: log("✅ 积分确实增加了", "PASS") else: log("⚠️ 积分未增加", "WARN") # 8. 总结 log("=== 测试总结 ===", "INFO") success_indicators = [] warning_indicators = [] if notifications_received: success_indicators.append("SSE连接和消息接收正常") else: warning_indicators.append("SSE消息接收可能有问题") if points_noti: success_indicators.append("积分通知成功推送") else: warning_indicators.append("积分通知推送可能有问题") if points_diff > 0: success_indicators.append("积分处理机制正常") else: warning_indicators.append("积分处理机制可能需要检查") for indicator in success_indicators: log(indicator, "PASS") for indicator in warning_indicators: log(indicator, "WARN") if __name__ == "__main__": test_sse_realtime_notifications()