Project.Fengling.QoderVersion/tests/test_business_flow.py
sam d88ec60ef4 feat(marketing): 扩展营销码支持品类信息并完善通知机制
- 在MarketingCode聚合中新增品类ID和品类名称字段,完善产品信息结构
- 迁移生成营销码命令,支持传入品类ID和品类名称参数
- 积分发放失败时发送积分获得失败通知集成事件
- 新增通知发送及积分失败通知的集成事件处理器,使用SSE推送通知
- 在积分相关集成事件处理器中添加发送积分变动通知功能
- 移除Notification聚合,相关数据库表删除
- 新增分页结果类型PagedResult,支持营销码查询分页返回
- 营销码查询支持分页参数,返回分页结果数据
2026-02-13 19:00:06 +08:00

522 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
一物一码会员营销系统 - 业务流程测试
测试范围:
1. 管理后台Admin登录、营销码生成、礼品管理、积分规则配置、订单管理
2. C端应用H5登录/注册、扫码积分、积分商城、积分兑换、订单查看
"""
from playwright.sync_api import sync_playwright, Page, expect
import time
import json
from datetime import datetime
class BusinessFlowTester:
def __init__(self, admin_url: str, h5_url: str):
self.admin_url = admin_url
self.h5_url = h5_url
self.test_results = []
self.marketing_codes = []
self.gift_id = None
def log_test(self, test_name: str, status: str, message: str = ""):
"""记录测试结果"""
result = {
"test": test_name,
"status": status,
"message": message,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
self.test_results.append(result)
status_emoji = "" if status == "PASS" else ""
print(f"{status_emoji} [{test_name}] {status}: {message}")
def test_admin_login(self, page: Page):
"""测试管理后台登录"""
try:
print("\n=== 测试:管理后台登录 ===")
page.goto(self.admin_url)
page.wait_for_load_state('networkidle')
# 检查是否在登录页
page.wait_for_selector('input[type="text"]', timeout=5000)
# 输入登录信息Mock登录
page.fill('input[type="text"]', 'admin')
page.fill('input[type="password"]', 'admin123')
# 点击登录按钮
page.click('button[type="submit"]')
# 等待跳转到仪表盘
page.wait_for_url('**/dashboard', timeout=10000)
# 验证仪表盘加载成功
page.wait_for_selector('text=仪表盘', timeout=5000)
self.log_test("管理后台登录", "PASS", "成功登录并跳转到仪表盘")
return True
except Exception as e:
self.log_test("管理后台登录", "FAIL", str(e))
return False
def test_admin_dashboard(self, page: Page):
"""测试管理后台仪表盘"""
try:
print("\n=== 测试:管理后台仪表盘 ===")
# 检查统计卡片
page.wait_for_selector('text=总礼品数', timeout=5000)
page.wait_for_selector('text=待处理订单', timeout=5000)
# 截图保存
page.screenshot(path='test_screenshots/admin_dashboard.png', full_page=True)
self.log_test("管理后台仪表盘", "PASS", "仪表盘数据展示正常")
return True
except Exception as e:
self.log_test("管理后台仪表盘", "FAIL", str(e))
return False
def test_generate_marketing_codes(self, page: Page):
"""测试营销码生成"""
try:
print("\n=== 测试:营销码生成 ===")
# 导航到营销码管理页面
page.goto(f"{self.admin_url}/marketing-codes")
page.wait_for_load_state('networkidle')
# 填写生成表单
batch_no = f"BATCH-{int(time.time())}"
page.fill('input[name="batchNo"]', batch_no)
page.fill('input[name="productId"]', 'PROD-001')
page.fill('input[name="productName"]', '测试产品A')
page.fill('input[name="quantity"]', '10')
# 提交生成
page.click('button:has-text("生成营销码")')
# 等待生成结果
page.wait_for_selector('text=生成成功', timeout=10000)
# 获取生成的营销码
time.sleep(2)
code_elements = page.locator('table tbody tr td:first-child').all()
self.marketing_codes = [el.inner_text() for el in code_elements[:5]]
print(f"生成的营销码示例: {self.marketing_codes[:3]}")
# 截图
page.screenshot(path='test_screenshots/marketing_codes.png', full_page=True)
self.log_test("营销码生成", "PASS", f"成功生成营销码,批次号: {batch_no}")
return True
except Exception as e:
self.log_test("营销码生成", "FAIL", str(e))
return False
def test_create_points_rule(self, page: Page):
"""测试创建积分规则"""
try:
print("\n=== 测试:创建积分规则 ===")
# 导航到积分规则页面
page.goto(f"{self.admin_url}/points-rules")
page.wait_for_load_state('networkidle')
# 填写规则表单
page.fill('input[name="ruleName"]', '测试产品积分规则')
# 选择规则类型(产品规则)
page.click('select[name="ruleType"]')
page.click('option[value="1"]') # Product = 1
# 填写积分值
page.fill('input[name="pointsValue"]', '100')
# 填写产品ID
page.fill('input[name="productId"]', 'PROD-001')
# 提交创建
page.click('button:has-text("创建规则")')
# 等待成功提示
page.wait_for_selector('text=创建成功', timeout=10000)
# 截图
page.screenshot(path='test_screenshots/points_rule.png', full_page=True)
self.log_test("创建积分规则", "PASS", "成功创建产品积分规则")
return True
except Exception as e:
self.log_test("创建积分规则", "FAIL", str(e))
return False
def test_create_gift(self, page: Page):
"""测试创建礼品"""
try:
print("\n=== 测试:创建礼品 ===")
# 导航到礼品列表
page.goto(f"{self.admin_url}/gifts")
page.wait_for_load_state('networkidle')
# 点击创建按钮
page.click('button:has-text("创建礼品")')
page.wait_for_url('**/gifts/create')
# 填写礼品信息
page.fill('input[name="name"]', '测试礼品-电子产品')
# 选择礼品类型(实物)
page.click('select[name="giftType"]')
page.click('option[value="1"]') # Physical = 1
page.fill('textarea[name="description"]', '这是一个测试用的电子产品礼品')
page.fill('input[name="imageUrl"]', 'https://via.placeholder.com/400')
page.fill('input[name="requiredPoints"]', '500')
page.fill('input[name="totalStock"]', '100')
page.fill('input[name="redemptionLimit"]', '2')
# 提交创建
page.click('button:has-text("保存")')
# 等待成功并跳转回列表
page.wait_for_url('**/gifts', timeout=10000)
page.wait_for_selector('text=创建成功', timeout=5000)
# 截图
page.screenshot(path='test_screenshots/gift_created.png', full_page=True)
self.log_test("创建礼品", "PASS", "成功创建礼品")
return True
except Exception as e:
self.log_test("创建礼品", "FAIL", str(e))
return False
def test_gift_shelf_management(self, page: Page):
"""测试礼品上下架"""
try:
print("\n=== 测试:礼品上下架 ===")
# 确保在礼品列表页面
page.goto(f"{self.admin_url}/gifts")
page.wait_for_load_state('networkidle')
# 找到第一个礼品的上架开关
switch = page.locator('button[role="switch"]').first
# 点击上架
switch.click()
page.wait_for_selector('text=上架成功', timeout=5000)
# 等待并再次点击下架
time.sleep(1)
switch.click()
page.wait_for_selector('text=下架成功', timeout=5000)
self.log_test("礼品上下架", "PASS", "礼品上下架功能正常")
return True
except Exception as e:
self.log_test("礼品上下架", "FAIL", str(e))
return False
def test_h5_register(self, page: Page):
"""测试C端会员注册"""
try:
print("\n=== 测试C端会员注册 ===")
page.goto(f"{self.h5_url}/register")
page.wait_for_load_state('networkidle')
# 填写注册信息
phone = f"138{int(time.time()) % 100000000}"
page.fill('input[name="phone"]', phone)
page.fill('input[name="password"]', '123456')
page.fill('input[name="nickname"]', '测试用户')
# 提交注册
page.click('button:has-text("注册")')
# 等待注册成功并跳转
page.wait_for_url('**/home', timeout=10000)
# 截图
page.screenshot(path='test_screenshots/h5_register.png', full_page=True)
self.log_test("C端会员注册", "PASS", f"成功注册会员,手机号: {phone}")
return True
except Exception as e:
self.log_test("C端会员注册", "FAIL", str(e))
return False
def test_h5_scan_code(self, page: Page):
"""测试C端扫码积分"""
try:
print("\n=== 测试C端扫码积分 ===")
if not self.marketing_codes:
self.log_test("C端扫码积分", "SKIP", "没有可用的营销码")
return False
# 导航到扫码页面
page.goto(f"{self.h5_url}/scan")
page.wait_for_load_state('networkidle')
# 模拟扫码(输入营销码)
marketing_code = self.marketing_codes[0]
page.fill('input[name="marketingCode"]', marketing_code)
page.click('button:has-text("确认")')
# 等待积分获取结果
page.wait_for_selector('text=获得积分', timeout=10000)
# 截图
page.screenshot(path='test_screenshots/h5_scan_code.png', full_page=True)
self.log_test("C端扫码积分", "PASS", f"成功扫码获得积分,营销码: {marketing_code}")
return True
except Exception as e:
self.log_test("C端扫码积分", "FAIL", str(e))
return False
def test_h5_mall(self, page: Page):
"""测试C端积分商城"""
try:
print("\n=== 测试C端积分商城 ===")
# 导航到商城页面
page.goto(f"{self.h5_url}/mall")
page.wait_for_load_state('networkidle')
# 检查商品列表
page.wait_for_selector('text=积分商城', timeout=5000)
# 查找第一个商品
first_gift = page.locator('.gift-card').first
first_gift.click()
# 等待商品详情页
page.wait_for_url('**/mall/gift/**')
# 截图
page.screenshot(path='test_screenshots/h5_mall.png', full_page=True)
self.log_test("C端积分商城", "PASS", "积分商城展示正常")
return True
except Exception as e:
self.log_test("C端积分商城", "FAIL", str(e))
return False
def test_h5_redeem_gift(self, page: Page):
"""测试C端积分兑换"""
try:
print("\n=== 测试C端积分兑换 ===")
# 假设已经在商品详情页
# 点击兑换按钮
page.click('button:has-text("立即兑换")')
# 填写收货地址(如果是实物礼品)
page.wait_for_selector('input[name="consignee"]', timeout=5000)
page.fill('input[name="consignee"]', '张三')
page.fill('input[name="phone"]', '13800138000')
page.fill('input[name="address"]', '北京市朝阳区xxx街道xxx号')
# 确认兑换
page.click('button:has-text("确认兑换")')
# 等待兑换成功
page.wait_for_selector('text=兑换成功', timeout=10000)
# 截图
page.screenshot(path='test_screenshots/h5_redeem.png', full_page=True)
self.log_test("C端积分兑换", "PASS", "成功兑换礼品")
return True
except Exception as e:
self.log_test("C端积分兑换", "FAIL", str(e))
return False
def test_h5_orders(self, page: Page):
"""测试C端订单查询"""
try:
print("\n=== 测试C端订单查询 ===")
# 导航到订单页面
page.goto(f"{self.h5_url}/orders")
page.wait_for_load_state('networkidle')
# 检查订单列表
page.wait_for_selector('text=我的订单', timeout=5000)
# 点击第一个订单查看详情
first_order = page.locator('.order-card').first
if first_order.count() > 0:
first_order.click()
page.wait_for_url('**/orders/**')
# 截图
page.screenshot(path='test_screenshots/h5_orders.png', full_page=True)
self.log_test("C端订单查询", "PASS", "订单列表展示正常")
return True
except Exception as e:
self.log_test("C端订单查询", "FAIL", str(e))
return False
def test_h5_points_detail(self, page: Page):
"""测试C端积分明细"""
try:
print("\n=== 测试C端积分明细 ===")
# 导航到积分明细页面
page.goto(f"{self.h5_url}/points")
page.wait_for_load_state('networkidle')
# 检查积分明细列表
page.wait_for_selector('text=积分明细', timeout=5000)
# 截图
page.screenshot(path='test_screenshots/h5_points.png', full_page=True)
self.log_test("C端积分明细", "PASS", "积分明细展示正常")
return True
except Exception as e:
self.log_test("C端积分明细", "FAIL", str(e))
return False
def test_admin_order_management(self, page: Page):
"""测试管理后台订单管理"""
try:
print("\n=== 测试:管理后台订单管理 ===")
# 导航到订单列表
page.goto(f"{self.admin_url}/orders")
page.wait_for_load_state('networkidle')
# 检查订单列表
page.wait_for_selector('text=兑换订单', timeout=5000)
# 筛选待处理订单
page.click('select[name="status"]')
page.click('option[value="1"]') # Pending = 1
# 等待筛选结果
time.sleep(2)
# 查看第一个订单详情
first_order = page.locator('table tbody tr').first
if first_order.count() > 0:
detail_button = first_order.locator('button:has-text("详情")')
detail_button.click()
page.wait_for_url('**/orders/**')
# 截图订单详情
page.screenshot(path='test_screenshots/admin_order_detail.png', full_page=True)
self.log_test("管理后台订单管理", "PASS", "订单管理功能正常")
return True
except Exception as e:
self.log_test("管理后台订单管理", "FAIL", str(e))
return False
def generate_report(self):
"""生成测试报告"""
print("\n" + "="*60)
print("测试报告")
print("="*60)
total = len(self.test_results)
passed = sum(1 for r in self.test_results if r['status'] == 'PASS')
failed = sum(1 for r in self.test_results if r['status'] == 'FAIL')
skipped = sum(1 for r in self.test_results if r['status'] == 'SKIP')
print(f"\n总测试数: {total}")
print(f"通过: {passed}")
print(f"失败: {failed}")
print(f"跳过: {skipped} ⏭️")
print(f"成功率: {(passed/total*100):.2f}%")
print("\n详细结果:")
print("-"*60)
for result in self.test_results:
status_emoji = "" if result['status'] == 'PASS' else "" if result['status'] == 'FAIL' else "⏭️"
print(f"{status_emoji} {result['test']}: {result['message']}")
# 保存JSON报告
with open('test_report.json', 'w', encoding='utf-8') as f:
json.dump(self.test_results, f, ensure_ascii=False, indent=2)
print("\n测试报告已保存到: test_report.json")
print("截图已保存到: test_screenshots/ 目录")
def run_tests():
"""运行所有测试"""
admin_url = "http://localhost:3000"
h5_url = "http://localhost:5173"
print("一物一码会员营销系统 - 业务流程测试")
print(f"管理后台地址: {admin_url}")
print(f"C端应用地址: {h5_url}")
print("-"*60)
tester = BusinessFlowTester(admin_url, h5_url)
with sync_playwright() as p:
# 启动浏览器headless模式
browser = p.chromium.launch(headless=True)
# 测试管理后台
print("\n【开始测试管理后台】")
admin_page = browser.new_page()
if tester.test_admin_login(admin_page):
tester.test_admin_dashboard(admin_page)
tester.test_generate_marketing_codes(admin_page)
tester.test_create_points_rule(admin_page)
tester.test_create_gift(admin_page)
tester.test_gift_shelf_management(admin_page)
tester.test_admin_order_management(admin_page)
admin_page.close()
# 测试C端应用
print("\n【开始测试C端应用】")
h5_page = browser.new_page()
h5_page.set_viewport_size({"width": 375, "height": 667}) # 模拟移动设备
if tester.test_h5_register(h5_page):
tester.test_h5_scan_code(h5_page)
tester.test_h5_mall(h5_page)
# tester.test_h5_redeem_gift(h5_page) # 需要有足够积分
tester.test_h5_orders(h5_page)
tester.test_h5_points_detail(h5_page)
h5_page.close()
browser.close()
# 生成测试报告
tester.generate_report()
if __name__ == "__main__":
# 创建截图目录
import os
os.makedirs('test_screenshots', exist_ok=True)
run_tests()