#!/usr/bin/env python3
"""Generate a member JWT token for SSE testing.

The script first tries to obtain a real token from the backend at
``BASE_URL`` by creating a test member and logging in as that member.
If that fails, it signs a mock token locally with PyJWT. When run as a
script, the resulting token, member ID and SSE URL are printed and written
to ``test_token.txt``.
"""

import json
import time
from datetime import datetime, timedelta, timezone

import requests

# Configuration
BASE_URL = "http://localhost:5511"
SSE_PATH = "/api/notifications/sse"

# Without a timeout ``requests`` waits forever on an unresponsive backend,
# which would prevent the fallback to the mock token from ever running.
REQUEST_TIMEOUT = 10  # seconds

# NOTE(review): development-only credentials, hard-coded as in the original
# script. Do not reuse them outside a local test environment.
ADMIN_EMAIL = "admin@example.com"
ADMIN_PASSWORD = "Admin123!"
TEST_PASSWORD = "Test123!"

# NOTE(review): intended to match the backend's JWT signing key; confirm
# against the backend configuration.
JWT_SECRET = "YourVerySecretKeyForJwtTokenGeneration12345!"

TOKEN_FILE = "test_token.txt"

# Phone number used for each member created during this run, keyed by
# ``str(member_id)``, so that the login step can reuse the same number.
_MEMBER_PHONES = {}


def log_message(message, status="INFO"):
    """Print *message* prefixed with the current time and a status icon.

    Args:
        message: Text to print.
        status: One of "PASS", "FAIL", "INFO" or "WARN"; any other value
            is shown with the "INFO" icon.
    """
    timestamp = datetime.now().strftime("%H:%M:%S")
    status_icon = {
        "PASS": "✅",
        "FAIL": "❌",
        "INFO": "ℹ️",
        "WARN": "⚠️",
    }
    print(f"[{timestamp}] {status_icon.get(status, 'ℹ️')} {message}")


def _new_test_phone():
    """Return a test phone number derived from the current Unix time."""
    return f"13800138{int(time.time()) % 10000:04d}"


def _response_data(response):
    """Return the ``data`` object of a JSON response, or ``{}`` if absent.

    Also tolerates a non-object body and ``"data": null``, both of which
    would otherwise raise on the chained ``.get`` calls.
    """
    body = response.json()
    data = body.get("data") if isinstance(body, dict) else None
    return data if isinstance(data, dict) else {}


def _sse_url(token):
    """Return the SSE test URL for *token*, built from ``BASE_URL``."""
    return f"{BASE_URL}{SSE_PATH}?token={token}"


def create_test_member():
    """Create a test member on the backend.

    Logs in as the administrator and creates the member through
    ``/api/members``; if the admin login does not return HTTP 200, falls
    back to ``/api/members/register``. The phone number used is recorded
    so that ``get_member_token`` can log in with the same number.

    Returns:
        The ``id`` reported by the backend, or ``None`` on any failure.
    """
    # Generate the phone once so both code paths (and the later login)
    # agree on it.
    phone = _new_test_phone()
    nickname = f"测试用户{int(time.time()) % 1000}"

    try:
        # Try the administrator account first to obtain privileges.
        admin_response = requests.post(
            f"{BASE_URL}/api/admins/login",
            json={"email": ADMIN_EMAIL, "password": ADMIN_PASSWORD},
            timeout=REQUEST_TIMEOUT,
        )

        if admin_response.status_code != 200:
            log_message("管理员登录失败,尝试直接创建会员", "WARN")
            # Call the member registration API directly (if it exists).
            response = requests.post(
                f"{BASE_URL}/api/members/register",
                json={
                    "phone": phone,
                    "password": TEST_PASSWORD,
                    "nickname": nickname,
                },
                timeout=REQUEST_TIMEOUT,
            )
            success_text = "会员注册成功"
            failure_text = "会员注册失败"
        else:
            # Admin login succeeded: create the member with the admin token.
            admin_token = _response_data(admin_response).get("token")
            response = requests.post(
                f"{BASE_URL}/api/members",
                headers={"Authorization": f"Bearer {admin_token}"},
                json={
                    "phone": phone,
                    "nickname": nickname,
                    "initialPoints": 1000,
                },
                timeout=REQUEST_TIMEOUT,
            )
            success_text = "管理员创建会员成功"
            failure_text = "管理员创建会员失败"

        if response.status_code != 200:
            log_message(f"{failure_text}: {response.text}", "FAIL")
            return None

        member_id = _response_data(response).get("id")
        if member_id is not None:
            _MEMBER_PHONES[str(member_id)] = phone
        log_message(f"{success_text}: {member_id}", "PASS")
        return member_id
    except Exception as e:
        # Deliberately broad: any failure here must fall through to the
        # mock-token path in main() rather than abort the script.
        log_message(f"创建会员异常: {e}", "FAIL")
        return None


def get_member_token(member_id, phone=None):
    """Log in as a member and return its token.

    Args:
        member_id: ID returned by ``create_test_member``; used to look up
            the phone number the member was created with.
        phone: Optional phone number to log in with. Defaults to the
            number recorded for *member_id*, or to a freshly derived
            number if none was recorded.

    Returns:
        ``(token, member_id)`` as reported by the backend, or
        ``(None, None)`` on any failure.
    """
    # Reuse the phone the member was created with. Re-deriving it from the
    # clock yields a different number once the second has ticked over.
    login_phone = (
        phone
        or _MEMBER_PHONES.get(str(member_id))
        or _new_test_phone()
    )

    try:
        # Simulate a member login.
        login_response = requests.post(
            f"{BASE_URL}/api/members/login",
            json={
                "phone": login_phone,
                "password": TEST_PASSWORD,  # the default password
            },
            timeout=REQUEST_TIMEOUT,
        )

        if login_response.status_code != 200:
            log_message(
                f"会员登录失败: {login_response.status_code} - "
                f"{login_response.text}",
                "FAIL",
            )
            return None, None

        data = _response_data(login_response)
        token = data.get("token")
        returned_member_id = data.get("memberId")

        if not (token and returned_member_id):
            log_message("登录响应格式不正确", "FAIL")
            return None, None

        log_message("会员登录成功", "PASS")
        log_message(f"Token: {token[:50]}...", "INFO")
        log_message(f"Member ID: {returned_member_id}", "INFO")
        return token, returned_member_id
    except Exception as e:
        # Deliberately broad: see create_test_member.
        log_message(f"会员登录异常: {e}", "FAIL")
        return None, None


def generate_mock_token(member_id):
    """Sign a mock member token locally (for testing only).

    Args:
        member_id: Member ID to embed in the token; converted to ``str``.

    Returns:
        An HS256-signed JWT string, valid for 24 hours, with role "Member".

    Raises:
        ImportError: If PyJWT is not installed.
    """
    # Imported lazily so the real-token path works without PyJWT installed.
    import jwt

    member_id = str(member_id)
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    now = datetime.now(timezone.utc)

    payload = {
        "sub": member_id,
        "name": f"测试用户{member_id[-4:]}",
        "role": "Member",
        "member_id": member_id,
        "exp": now + timedelta(hours=24),
        "iat": now,
        "jti": str(int(time.time())),
    }

    token = jwt.encode(payload, JWT_SECRET, algorithm="HS256")
    # PyJWT 1.x returns bytes; normalise so the URL and file get plain text.
    if isinstance(token, bytes):
        token = token.decode("ascii")
    return token


def main():
    """Obtain a token for SSE testing and print it.

    Returns:
        ``(token, member_id)``: a real pair from the backend when member
        creation and login succeed, otherwise a locally signed mock pair.
    """
    print("=" * 60)
    print("生成SSE测试用的会员Token")
    print("=" * 60)

    # Method 1: create a real member and fetch its token.
    log_message("方法1: 创建真实会员获取Token")
    member_id = create_test_member()

    if member_id:
        token, returned_id = get_member_token(member_id)
        if token:
            print("\n🎉 成功获取真实Token!")
            print(f"📋 Token: {token}")
            print(f"👤 Member ID: {returned_id}")
            print(f"🔗 SSE测试URL: {_sse_url(token)}")
            return token, returned_id

    # Method 2: generate a mock token.
    log_message("方法2: 生成模拟Token", "WARN")
    mock_member_id = "00000000-0000-0000-0000-000000000001"  # mock member ID
    mock_token = generate_mock_token(mock_member_id)

    print("\n⚠️ 使用模拟Token (仅用于开发测试)")
    print(f"📋 Mock Token: {mock_token}")
    print(f"👤 Mock Member ID: {mock_member_id}")
    print(f"🔗 SSE测试URL: {_sse_url(mock_token)}")

    return mock_token, mock_member_id


def _save_token_info(token, member_id, path=TOKEN_FILE):
    """Write the token, member ID and SSE URL to *path* as KEY=value lines."""
    with open(path, "w", encoding="utf-8") as f:
        f.write(f"TOKEN={token}\n")
        f.write(f"MEMBER_ID={member_id}\n")
        f.write(f"SSE_URL={_sse_url(token)}\n")


if __name__ == "__main__":
    token, member_id = main()

    # Save to a file for use by the tests.
    _save_token_info(token, member_id)

    print("\n💾 Token信息已保存到 test_token.txt 文件")