Project.Fengling.QoderVersion/tests/sse_notification_tests/simplified_cap_test.py
sam d88ec60ef4 feat(marketing): 扩展营销码支持品类信息并完善通知机制
- 在MarketingCode聚合中新增品类ID和品类名称字段,完善产品信息结构
- 迁移生成营销码命令,支持传入品类ID和品类名称参数
- 积分发放失败时发送积分获得失败通知集成事件
- 新增通知发送及积分失败通知的集成事件处理器,使用SSE推送通知
- 在积分相关集成事件处理器中添加发送积分变动通知功能
- 移除Notification聚合,相关数据库表删除
- 新增分页结果类型PagedResult,支持营销码查询分页返回
- 营销码查询支持分页参数,返回分页结果数据
2026-02-13 19:00:06 +08:00

192 lines
7.8 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
简化版CAP诊断测试
直接测试扫码后的通知推送情况
"""
import requests
import json
import time
from datetime import datetime
BASE_URL = "http://localhost:5511"
def log(msg, status="INFO"):
timestamp = datetime.now().strftime("%H:%M:%S")
icons = {"PASS": "", "FAIL": "", "INFO": "", "WARN": "⚠️"}
print(f"[{timestamp}] {icons.get(status, '')} {msg}")
def simplified_cap_test():
"""简化版CAP测试"""
log("=== 简化版CAP诊断测试 ===", "INFO")
# 1. 用户登录
log("1. 用户登录...", "INFO")
user_login_data = {
"phone": "15921072307",
"password": "Sl52788542"
}
try:
user_resp = requests.post(
f"{BASE_URL}/api/members/login",
json=user_login_data,
timeout=10
)
if user_resp.status_code == 200:
user_token = user_resp.json()['data']['token']
member_id = user_resp.json()['data']['memberId']
user_headers = {"Authorization": f"Bearer {user_token}"}
log(f"✅ 用户登录成功 - Member ID: {member_id}", "PASS")
else:
log(f"❌ 用户登录失败: {user_resp.status_code}", "FAIL")
return
# 2. 使用已知的可用营销码直接测试
test_code = "011-000050-20260213075254-3805" # 从之前的查询中获取
log(f"2. 使用营销码: {test_code}", "INFO")
# 3. 建立SSE连接
log("3. 建立SSE连接...", "INFO")
sse_connected = False
notifications_received = []
def sse_listener():
nonlocal sse_connected, notifications_received
try:
sse_resp = requests.get(
f"{BASE_URL}/api/notifications/sse",
headers=user_headers,
stream=True,
timeout=45 # 增加超时时间
)
if sse_resp.status_code == 200:
sse_connected = True
log("✅ SSE连接建立成功", "PASS")
for line in sse_resp.iter_lines():
if line:
try:
line_str = line.decode('utf-8')
if line_str.startswith('data: '):
data_str = line_str[6:]
if data_str.strip():
notification = json.loads(data_str)
notifications_received.append(notification)
notification_type = notification.get('type', 'unknown')
title = notification.get('title', '')
log(f" 收到通知 [{notification_type}]: {title}", "INFO")
# 记录时间戳用于分析
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
print(f" 时间: {timestamp}")
except Exception as e:
log(f" 解析消息失败: {e}", "WARN")
except Exception as e:
log(f"❌ SSE连接异常: {e}", "FAIL")
# 在后台启动SSE监听
import threading
sse_thread = threading.Thread(target=sse_listener, daemon=True)
sse_thread.start()
# 等待SSE连接建立
time.sleep(3)
if not sse_connected:
log("❌ SSE连接建立失败", "FAIL")
return
# 4. 执行扫码操作
log("4. 执行扫码操作...", "INFO")
scan_start_time = time.time()
scan_data = {"code": test_code}
scan_resp = requests.post(
f"{BASE_URL}/api/marketing-codes/scan",
json=scan_data,
headers=user_headers,
timeout=15
)
scan_end_time = time.time()
log(f" 扫码耗时: {scan_end_time - scan_start_time:.2f}", "INFO")
if scan_resp.status_code == 200:
scan_result = scan_resp.json()
message = scan_result['data']['message']
earned_points = scan_result['data'].get('earnedPoints', 0)
log(f"✅ 扫码成功: {message} (获得积分: {earned_points})", "PASS")
else:
log(f"❌ 扫码失败: {scan_resp.status_code}", "FAIL")
log(f" 错误详情: {scan_resp.text}", "INFO")
return
# 5. 等待并观察通知
log("5. 等待通知推送...", "INFO")
initial_count = len(notifications_received)
# 等待30秒观察通知
wait_duration = 30
for i in range(wait_duration):
time.sleep(1)
current_count = len(notifications_received)
if current_count > initial_count:
new_notifications = notifications_received[initial_count:]
points_notifications = [
n for n in new_notifications
if '积分' in n.get('title', '') or '积分' in n.get('message', '')
]
if points_notifications:
log(f"✅ 在第{i+1}秒收到积分通知!", "PASS")
for notification in points_notifications:
log(f" 通知内容: {notification.get('title', '')} - {notification.get('message', '')}", "INFO")
break
if i % 5 == 4: # 每5秒报告一次进度
log(f" 等待中... ({i+1}/{wait_duration}秒)", "INFO")
# 6. 最终分析
log("6. 最终结果分析...", "INFO")
final_count = len(notifications_received)
new_notifications = notifications_received[initial_count:] if final_count > initial_count else []
connection_msgs = [n for n in new_notifications if n.get('type') == 'connection']
heartbeat_msgs = [n for n in new_notifications if n.get('type') == 'heartbeat']
points_msgs = [n for n in new_notifications if '积分' in n.get('title', '') or '积分' in n.get('message', '')]
log(f" 新增通知总数: {len(new_notifications)}", "INFO")
log(f" 连接消息: {len(connection_msgs)}", "INFO")
log(f" 心跳消息: {len(heartbeat_msgs)}", "INFO")
log(f" 积分消息: {len(points_msgs)}", "INFO")
# 最终结论
log("=== 测试结论 ===", "INFO")
if len(points_msgs) > 0:
log("🎉 积分通知推送成功!", "PASS")
log("✅ CAP消息队列工作正常", "PASS")
else:
log("❌ 积分通知推送失败", "FAIL")
log("🔍 需要进一步诊断CAP消息处理链路", "WARN")
# 显示所有收到的通知最后10条
if notifications_received:
log("=== 最近收到的通知 ===", "INFO")
recent_notifications = notifications_received[-10:] # 显示最近10条
for i, notification in enumerate(recent_notifications, 1):
msg_type = notification.get('type', 'unknown')
title = notification.get('title', '')
message = notification.get('message', '')
log(f" {i}. [{msg_type}] {title} - {message}", "INFO")
except Exception as e:
log(f"❌ 测试过程中出现异常: {e}", "FAIL")
import traceback
log(f" 详细错误: {traceback.format_exc()}", "INFO")
if __name__ == "__main__":
simplified_cap_test()