-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathredis_config_client.py
110 lines (88 loc) · 3.99 KB
/
redis_config_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import asyncio
import logging
import os
import socket
from typing import Optional, Any
from pydantic import BaseModel
# Logging setup: configure the root logger at INFO and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Connection settings are read from the environment, with local defaults.
REDIS_URL = os.environ.get("REDIS_URL", "localhost")
REDIS_PORT = int(os.environ.get("REDIS_PORT", "6379"))
REDIS_DB = int(os.environ.get("REDIS_DB", "0"))
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD")  # None when unset
DEFAULT_CONFIG_GROUP = os.environ.get("DEFAULT_CONFIG_GROUP", "default")
# Build the shared async Redis (Valkey) client used by the config client.
import valkey.asyncio as avalkey

try:
    redis_client = avalkey.Valkey(
        host=REDIS_URL,
        port=REDIS_PORT,
        db=REDIS_DB,
        password=REDIS_PASSWORD,
        max_connections=1,
    )
except Exception as exc:
    # Log the construction failure, then let the import fail with the original error.
    logger.error(f"[Redis Config Center] 初始化 Redis 客户端失败: {exc}")
    raise
class ConfigGroup(BaseModel):
    """A named, versioned set of configuration values.

    Instances are parsed from the JSON stored for a group and are compared
    by name and version to decide whether the cached configuration changed.
    """

    # Identifier of the configuration group.
    group_name: str
    # Version number of the group's contents.
    group_version: int
    # The configuration key/value pairs; may be missing (None).
    config_dict: Optional[dict] = None
class ConfigClient:
    """Client that keeps a locally cached configuration group in sync with Redis.

    Configuration groups are stored as JSON values in the Redis hash named
    ``config_server_name``, one hash field per group name. After ``start()``
    is awaited, a background task re-reads the group assigned to this host
    every ``refresh_time`` seconds and swaps the cached copy when the group
    name or version differs.
    """

    def __init__(self, config_server_name: str, refresh_time: int = 5):
        """Create a client.

        Args:
            config_server_name: Key of the Redis hash that holds the groups.
            refresh_time: Seconds to wait between refresh attempts.
        """
        self.config_server_name = config_server_name
        # Placeholder group (version -1, empty config) used until a refresh succeeds.
        self.config_group = ConfigGroup(
            group_name=DEFAULT_CONFIG_GROUP, group_version=-1, config_dict={}
        )
        self.refresh_time = refresh_time
        # Strong reference to the background refresh task; set by start().
        self._refresh_task: Optional[asyncio.Task] = None

    def get_config(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``, or None when it is unavailable."""
        group = self.get_config_group()
        if group is None or group.config_dict is None:
            return None
        return group.config_dict.get(key)

    def get_config_group(self) -> Optional["ConfigGroup"]:
        """Return the currently cached configuration group."""
        return self.config_group

    async def _get_config_group(self, group_name: str) -> Optional["ConfigGroup"]:
        """Fetch and parse one group from Redis.

        Returns None (after logging) when the group does not exist or when
        the read or the JSON validation fails.
        """
        try:
            config_group_data = await redis_client.hget(self.config_server_name, group_name)
            if config_group_data:
                return ConfigGroup.model_validate_json(config_group_data)
            logger.warning(f"[ConfigClient] 配置组 {group_name} 不存在")
        except Exception as e:
            logger.error(f"[ConfigClient] 获取配置组失败: {e}")
        return None

    async def _extract_current_ip_to_group_name(self) -> str:
        """Pick the group name this host should use.

        Returns ``DEFAULT_CONFIG_GROUP`` when no groups exist, when the
        default group is among the stored groups, or when any step fails.
        Otherwise the last octet of the host's IPv4 address, modulo the
        number of groups, indexes into the sorted group names.
        """
        try:
            raw_names = await redis_client.hkeys(self.config_server_name)
            # Accept both bytes and already-decoded str field names.
            group_names = sorted(
                name.decode("utf-8") if isinstance(name, bytes) else name
                for name in raw_names
            )
            if not group_names or DEFAULT_CONFIG_GROUP in group_names:
                return DEFAULT_CONFIG_GROUP
            # Name resolution is a blocking call; run it in the default
            # executor so the event loop is not stalled while it resolves.
            loop = asyncio.get_running_loop()
            host_ip = await loop.run_in_executor(
                None, socket.gethostbyname, socket.gethostname()
            )
            last_octet = int(host_ip.split(".")[-1])
            return group_names[last_octet % len(group_names)]
        except Exception as e:
            logger.error(f"[ConfigClient] 提取当前IP对应的配置组名失败: {e}")
            return DEFAULT_CONFIG_GROUP

    async def _schedule_refresh(self):
        """Refresh the cached group forever, pausing ``refresh_time`` seconds per round."""
        while True:
            try:
                new_group_name = await self._extract_current_ip_to_group_name()
                new_config_group = await self._get_config_group(new_group_name)
                if new_config_group and (
                    new_config_group.group_name != self.config_group.group_name
                    or new_config_group.group_version != self.config_group.group_version
                ):
                    self.config_group = new_config_group
                    logger.info(
                        f"[ConfigClient] 当前配置组 {new_group_name}, 版本 {new_config_group.group_version}, 配置已更新: {new_config_group.config_dict}"
                    )
            except Exception as e:
                logger.error(f"[ConfigClient] 刷新配置失败: {e}")
            # Sleep outside the try block so a failing round still waits
            # before retrying instead of spinning in a tight loop.
            await asyncio.sleep(self.refresh_time)

    async def start(self):
        """Start the background refresh task on the running event loop.

        Calling this again while the task is still running does nothing
        beyond logging a warning, so only one refresh loop exists per client.
        """
        if self._refresh_task is not None and not self._refresh_task.done():
            logger.warning("[ConfigClient] Refresh task is already running; start() ignored.")
            return
        # Keep a reference to the task: the event loop only holds a weak
        # reference, so an unreferenced task may be garbage-collected mid-run.
        loop = asyncio.get_running_loop()
        self._refresh_task = loop.create_task(self._schedule_refresh())
        logger.info("[ConfigClient] Redis client config center started.")
# Module-level shared client; it only begins refreshing once start() is awaited.
config_client = ConfigClient(
    config_server_name="rcc::config::redis_config_center",
    refresh_time=5,
)