""" 启动前后端服务并运行业务流程测试 """ import subprocess import sys import time import os from pathlib import Path def check_port(port: int) -> bool: """检查端口是否被占用""" import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) result = sock.connect_ex(('127.0.0.1', port)) sock.close() return result == 0 def wait_for_server(url: str, max_retries: int = 30) -> bool: """等待服务器启动""" import urllib.request import urllib.error print(f"等待服务器启动: {url}") for i in range(max_retries): try: urllib.request.urlopen(url, timeout=1) print(f"✅ 服务器已启动: {url}") return True except: time.sleep(1) print(f"等待中... ({i+1}/{max_retries})") print(f"❌ 服务器启动超时: {url}") return False def main(): demo_dir = Path(__file__).parent backend_dir = demo_dir / "Backend" / "src" / "Fengling.Backend.Web" admin_dir = demo_dir / "Frontend" / "Fengling.Backend.Admin" h5_dir = demo_dir / "Frontend" / "Fengling.H5" processes = [] try: print("="*60) print("一物一码会员营销系统 - 自动化测试") print("="*60) # 1. 启动后端服务 print("\n【步骤 1/4】启动后端服务...") if not check_port(5511): backend_cmd = ["dotnet", "run"] backend_process = subprocess.Popen( backend_cmd, cwd=str(backend_dir), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) processes.append(("后端服务", backend_process)) # 等待后端启动 if not wait_for_server("http://localhost:5511/health", max_retries=60): print("❌ 后端服务启动失败") return else: print("✅ 后端服务已在运行 (端口 5511)") # 2. 启动管理后台 print("\n【步骤 2/4】启动管理后台...") if not check_port(3000): admin_cmd = ["pnpm", "dev"] admin_process = subprocess.Popen( admin_cmd, cwd=str(admin_dir), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) processes.append(("管理后台", admin_process)) # 等待管理后台启动 if not wait_for_server("http://localhost:3000"): print("❌ 管理后台启动失败") return else: print("✅ 管理后台已在运行 (端口 3000)") # 3. 启动C端应用 print("\n【步骤 3/4】启动C端应用...") if not check_port(5173): h5_cmd = ["pnpm", "dev"] h5_process = subprocess.Popen( h5_cmd, cwd=str(h5_dir), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) processes.append(("C端应用", h5_process)) # 等待C端应用启动 if not wait_for_server("http://localhost:5173"): print("❌ C端应用启动失败") return else: print("✅ C端应用已在运行 (端口 5173)") print("\n所有服务已启动!") print("-"*60) print("后端服务: http://localhost:5511") print("管理后台: http://localhost:3000") print("C端应用: http://localhost:5173") print("-"*60) # 4. 运行测试 print("\n【步骤 4/4】运行业务流程测试...") time.sleep(3) # 给服务一点额外的稳定时间 test_cmd = [sys.executable, "test_business_flow.py"] test_result = subprocess.run(test_cmd, cwd=str(demo_dir)) if test_result.returncode == 0: print("\n✅ 测试执行完成") else: print("\n❌ 测试执行失败") except KeyboardInterrupt: print("\n\n收到中断信号,正在停止服务...") except Exception as e: print(f"\n❌ 发生错误: {e}") finally: # 清理进程 print("\n正在清理进程...") for name, process in processes: try: process.terminate() process.wait(timeout=5) print(f"✅ {name} 已停止") except: process.kill() print(f"⚠️ {name} 已强制停止") if __name__ == "__main__": main()