- 在MarketingCode聚合中新增品类ID和品类名称字段,完善产品信息结构 - 迁移生成营销码命令,支持传入品类ID和品类名称参数 - 积分发放失败时发送积分获得失败通知集成事件 - 新增通知发送及积分失败通知的集成事件处理器,使用SSE推送通知 - 在积分相关集成事件处理器中添加发送积分变动通知功能 - 移除Notification聚合,相关数据库表删除 - 新增分页结果类型PagedResult,支持营销码查询分页返回 - 营销码查询支持分页参数,返回分页结果数据
165 lines
6.0 KiB
Python
165 lines
6.0 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
SSE实时通知功能验证测试
|
||
专注验证SSE推送和实时通知接收
|
||
"""
|
||
|
||
import requests
|
||
import time
|
||
import threading
|
||
import json
|
||
|
||
BASE_URL = "http://localhost:5511"
|
||
|
||
def log(msg, status="INFO"):
|
||
icons = {"PASS": "✅", "FAIL": "❌", "INFO": "ℹ️", "WARN": "⚠️"}
|
||
print(f"{icons.get(status, 'ℹ️')} {msg}")
|
||
|
||
def test_sse_realtime_notifications():
|
||
"""测试SSE实时通知功能"""
|
||
log("=== SSE实时通知功能验证 ===", "INFO")
|
||
|
||
# 1. 登录获取token
|
||
log("1. 用户登录...", "INFO")
|
||
login_data = {"phone": "15921072307", "password": "Sl52788542"}
|
||
|
||
response = requests.post(f"{BASE_URL}/api/members/login", json=login_data)
|
||
if response.status_code != 200:
|
||
log(f"登录失败: {response.status_code}", "FAIL")
|
||
return
|
||
|
||
token = response.json()['data']['token']
|
||
member_id = response.json()['data']['memberId']
|
||
headers = {"Authorization": f"Bearer {token}"}
|
||
log(f"登录成功 - Member ID: {member_id}", "PASS")
|
||
|
||
# 2. 建立SSE连接并监听通知
|
||
log("2. 建立SSE连接并监听实时通知...", "INFO")
|
||
|
||
notifications_received = []
|
||
|
||
def listen_sse():
|
||
"""监听SSE通知"""
|
||
try:
|
||
sse_response = requests.get(
|
||
f"{BASE_URL}/api/notifications/sse",
|
||
headers=headers,
|
||
stream=True,
|
||
timeout=30
|
||
)
|
||
|
||
if sse_response.status_code == 200:
|
||
log("SSE连接建立成功", "PASS")
|
||
for line in sse_response.iter_lines():
|
||
if line:
|
||
try:
|
||
line_str = line.decode('utf-8')
|
||
if line_str.startswith('data: '):
|
||
data_str = line_str[6:] # 移除 'data: ' 前缀
|
||
if data_str.strip():
|
||
notification = json.loads(data_str)
|
||
notifications_received.append(notification)
|
||
log(f"收到通知: {notification.get('type', 'unknown')} - {notification.get('title', '')}", "INFO")
|
||
except Exception as e:
|
||
log(f"解析SSE消息失败: {e}", "WARN")
|
||
|
||
except Exception as e:
|
||
log(f"SSE连接异常: {e}", "FAIL")
|
||
|
||
# 启动SSE监听线程
|
||
sse_thread = threading.Thread(target=listen_sse, daemon=True)
|
||
sse_thread.start()
|
||
|
||
# 等待SSE连接建立
|
||
time.sleep(2)
|
||
|
||
# 3. 获取初始积分
|
||
log("3. 检查初始积分...", "INFO")
|
||
current_resp = requests.get(f"{BASE_URL}/api/members/current", headers=headers)
|
||
initial_points = current_resp.json()['availablePoints'] if current_resp.status_code == 200 else 0
|
||
log(f"初始积分: {initial_points}", "INFO")
|
||
|
||
# 4. 执行扫码操作
|
||
log("4. 执行扫码操作...", "INFO")
|
||
scan_data = {"code": "001-000098-20260211095706-7420"}
|
||
scan_resp = requests.post(f"{BASE_URL}/api/marketing-codes/scan", json=scan_data, headers=headers)
|
||
|
||
if scan_resp.status_code == 200:
|
||
result = scan_resp.json()
|
||
message = result['data']['message']
|
||
log(f"扫码结果: {message}", "PASS")
|
||
else:
|
||
log(f"扫码失败: {scan_resp.status_code}", "FAIL")
|
||
return
|
||
|
||
# 5. 等待并验证通知接收
|
||
log("5. 等待通知接收...", "INFO")
|
||
time.sleep(10) # 等待事件处理和通知推送
|
||
|
||
# 6. 验证结果
|
||
log("6. 验证测试结果...", "INFO")
|
||
|
||
# 检查是否收到通知
|
||
if notifications_received:
|
||
log(f"总共收到 {len(notifications_received)} 条通知", "INFO")
|
||
|
||
# 分析通知类型
|
||
connection_noti = [n for n in notifications_received if n.get('type') == 'connection']
|
||
heartbeat_noti = [n for n in notifications_received if n.get('type') == 'heartbeat']
|
||
points_noti = [n for n in notifications_received if '积分' in n.get('title', '') or '积分' in n.get('message', '')]
|
||
|
||
log(f"连接通知: {len(connection_noti)} 条", "INFO")
|
||
log(f"心跳通知: {len(heartbeat_noti)} 条", "INFO")
|
||
log(f"积分相关通知: {len(points_noti)} 条", "INFO")
|
||
|
||
if points_noti:
|
||
log("✅ 成功收到积分相关通知", "PASS")
|
||
for i, noti in enumerate(points_noti[:2]):
|
||
log(f" 积分通知 {i+1}: {noti.get('title')} - {noti.get('message')}", "INFO")
|
||
else:
|
||
log("⚠️ 未收到积分相关通知", "WARN")
|
||
else:
|
||
log("❌ 未收到任何通知", "FAIL")
|
||
|
||
# 7. 验证积分是否增加
|
||
log("7. 验证积分变化...", "INFO")
|
||
current_resp2 = requests.get(f"{BASE_URL}/api/members/current", headers=headers)
|
||
if current_resp2.status_code == 200:
|
||
final_points = current_resp2.json()['availablePoints']
|
||
points_diff = final_points - initial_points
|
||
log(f"积分变化: {initial_points} → {final_points} (+{points_diff})", "INFO")
|
||
|
||
if points_diff > 0:
|
||
log("✅ 积分确实增加了", "PASS")
|
||
else:
|
||
log("⚠️ 积分未增加", "WARN")
|
||
|
||
# 8. 总结
|
||
log("=== 测试总结 ===", "INFO")
|
||
|
||
success_indicators = []
|
||
warning_indicators = []
|
||
|
||
if notifications_received:
|
||
success_indicators.append("SSE连接和消息接收正常")
|
||
else:
|
||
warning_indicators.append("SSE消息接收可能有问题")
|
||
|
||
if points_noti:
|
||
success_indicators.append("积分通知成功推送")
|
||
else:
|
||
warning_indicators.append("积分通知推送可能有问题")
|
||
|
||
if points_diff > 0:
|
||
success_indicators.append("积分处理机制正常")
|
||
else:
|
||
warning_indicators.append("积分处理机制可能需要检查")
|
||
|
||
for indicator in success_indicators:
|
||
log(indicator, "PASS")
|
||
|
||
for indicator in warning_indicators:
|
||
log(indicator, "WARN")
|
||
|
||
if __name__ == "__main__":
|
||
test_sse_realtime_notifications() |