-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.py
121 lines (105 loc) · 4.29 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import asyncio
import json
import logging
import websockets
import aiohttp
import os
from dotenv import load_dotenv
# Load environment variables (python-dotenv's load_dotenv) before main() reads them with os.getenv.
load_dotenv()
# Configure logging: root logger at INFO level, plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TunnelClient:
    """Client side of a WebSocket tunnel to a local HTTP service.

    The client dials ``{server_url}/ws/{client_id}``, announces the tunnel it
    wants (local port, public port, optional custom domain) and then replays
    every ``tunnel_request`` message it receives against
    ``http://localhost:{local_port}``, sending the outcome back to the server
    as a ``tunnel_response`` message.

    Args:
        server_url: Base WebSocket URL of the tunnel server.
        local_port: Port of the local HTTP service that requests are replayed to.
        public_port: Public port sent to the server in the tunnel request.
        custom_domain: Optional custom domain sent in the tunnel request.
    """

    # Seconds to wait before re-dialling after a dropped or failed connection.
    RECONNECT_DELAY = 5

    def __init__(self, server_url, local_port, public_port, custom_domain=None):
        self.server_url = server_url
        self.local_port = local_port
        self.public_port = public_port
        self.custom_domain = custom_domain
        self.websocket = None   # live connection, or None while disconnected
        self.client_id = None   # assigned by start()
        self.running = False    # True between start() and stop()

    async def connect_websocket(self):
        """Keep a connection to the server open until the client is stopped.

        The loop only runs while ``self.running`` is true, so ``start()``
        (which sets the flag and the client id) is the intended entry point.
        After a dropped connection or an error the client waits
        ``RECONNECT_DELAY`` seconds and dials again; once ``stop()`` has
        cleared the flag the method returns instead of reconnecting.
        """
        while self.running:
            try:
                async with websockets.connect(
                    f"{self.server_url}/ws/{self.client_id}"
                ) as websocket:
                    self.websocket = websocket
                    logger.info("Connected to server")
                    # Announce the tunnel we want.
                    await self.send_tunnel_request()
                    # Keep the connection open and process incoming messages.
                    while True:
                        message = await websocket.recv()
                        await self.handle_message(message)
            except websockets.exceptions.ConnectionClosed:
                # A close triggered by stop() is expected; stay quiet then.
                if self.running:
                    logger.warning("Connection closed, attempting to reconnect...")
            except Exception as e:
                logger.error("Error in websocket connection: %s", e)
            finally:
                # Never leave a closed socket behind for other methods to use.
                self.websocket = None
            if self.running:
                await asyncio.sleep(self.RECONNECT_DELAY)

    async def send_tunnel_request(self):
        """Send the ``tunnel_request`` registration message over the websocket."""
        request = {
            "type": "tunnel_request",
            "local_port": self.local_port,
            "public_port": self.public_port,
            "custom_domain": self.custom_domain,
        }
        await self.websocket.send(json.dumps(request))

    async def handle_message(self, message):
        """Decode one server message and dispatch it on its ``type`` field.

        Malformed input (invalid JSON, undecodable bytes, or a JSON value that
        is not an object) is logged and dropped so that a single bad message
        cannot tear down the connection. Messages with an unknown or missing
        ``type`` are ignored.
        """
        try:
            data = json.loads(message)
        except (json.JSONDecodeError, UnicodeDecodeError):
            logger.error("Invalid JSON received: %s", message)
            return
        if not isinstance(data, dict):
            logger.error("Invalid JSON received: %s", message)
            return

        message_type = data.get("type")
        if message_type == "tunnel_request":
            # The server is forwarding a request for the local service.
            await self.handle_tunnel_request(data)
        elif message_type == "tunnel_response":
            logger.info("Tunnel response received: %s", data.get("status"))

    async def handle_tunnel_request(self, data):
        """Replay a forwarded request against the local service and reply.

        ``data`` must provide ``method``, ``path`` and ``headers``; ``body`` is
        optional. Any failure while building or performing the local request
        is reported to the server as a status-500 ``tunnel_response`` carrying
        the error text. Failures while sending the reply itself propagate to
        the caller.
        """
        try:
            # Open a connection to the local service.
            async with aiohttp.ClientSession() as session:
                async with session.request(
                    method=data["method"],
                    url=f"http://localhost:{self.local_port}{data['path']}",
                    headers=data["headers"],
                    data=data.get("body"),
                ) as response:
                    # NOTE(review): the body is read as text and the headers
                    # are flattened into a plain dict - confirm the server
                    # never needs binary bodies or repeated header names.
                    reply = {
                        "type": "tunnel_response",
                        "status": response.status,
                        "headers": dict(response.headers),
                        "body": await response.text(),
                    }
        except Exception as e:
            logger.error("Error handling tunnel request: %s", e)
            reply = {
                "type": "tunnel_response",
                "status": 500,
                "error": str(e),
            }
        # Send the outcome back to the server. This is done outside the try
        # block so a broken websocket is not mistaken for a local failure.
        await self.websocket.send(json.dumps(reply))

    async def start(self):
        """Generate a random client id and run the connection loop until stopped."""
        self.running = True
        self.client_id = os.urandom(16).hex()
        try:
            await self.connect_websocket()
        finally:
            # Also covers cancellation, so the flag never stays stale.
            self.running = False

    async def stop(self):
        """Stop reconnecting and close the current connection, if any."""
        self.running = False
        if self.websocket is not None:
            await self.websocket.close()
async def main():
    """Build a TunnelClient from environment settings and run it until interrupted.

    Reads SERVER_URL, LOCAL_PORT, PUBLIC_PORT and CUSTOM_DOMAIN from the
    environment, falling back to the defaults below for the first three.
    """
    # Take these values from the environment (or a loaded configuration file).
    server_url = os.getenv("SERVER_URL", "ws://localhost:8080")
    local_port = int(os.getenv("LOCAL_PORT", "8000"))
    public_port = int(os.getenv("PUBLIC_PORT", "8888"))
    custom_domain = os.getenv("CUSTOM_DOMAIN")
    client = TunnelClient(server_url, local_port, public_port, custom_domain)
    try:
        await client.start()
    except KeyboardInterrupt:
        logger.info("Shutting down client...")
    except asyncio.CancelledError:
        # Under asyncio.run(), Ctrl-C reaches this coroutine as a task
        # cancellation rather than KeyboardInterrupt; re-raise after logging
        # so the cancellation is not swallowed.
        logger.info("Shutting down client...")
        raise
    finally:
        # Close the websocket on every exit path, not only on a clean return.
        await client.stop()
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() cancels main() on Ctrl-C and then re-raises
        # KeyboardInterrupt here; treat it as a normal shutdown rather than
        # ending with a traceback.
        logger.info("Client interrupted, exiting")