MaiBot/helm-chart/preprocessor/preprocessor.py

238 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/bin/python3
# 此脚本会被helm chart的post-install hook触发在正式部署后通过k8s的job自动运行一次。
# 这个脚本的作用是在部署helm chart时迁移旧版ConfigMap到配置文件并调整adapter的配置文件中的服务监听和服务连接字段。
# - 迁移旧版ConfigMap到配置文件是因为0.11.6-beta之前版本的helm chart将各个配置文件存储在k8s的ConfigMap中
# 由于功能复杂度提升自0.11.6-beta版本开始配置文件采用文件形式存储到存储卷中。
# 从旧版升级来的用户会通过这个脚本自动执行配置的迁移。
# - 需要调整adapter的配置文件的原因是:
# 1. core的Service的DNS名称是动态的由安装实例名拼接无法在adapter的配置文件中提前确定。
# 用于对外连接的maibot_server.host和maibot_server.port字段会被替换为core的Service对应的DNS名称和8000端口硬编码用户无需配置
# 2. 为了使adapter监听所有地址以及保持chart中配置的端口号需要在adapter的配置文件中覆盖这些配置。
# 用于监听的napcat_server.host和napcat_server.port字段会被替换为0.0.0.0和8095端口实际映射到的Service端口会在Service中配置
import os
import toml
import time
import base64
from kubernetes import client, config
from kubernetes.client.exceptions import ApiException
from datetime import datetime, timezone
config.load_incluster_config()
core_api = client.CoreV1Api()
apps_api = client.AppsV1Api()
# 读取部署的关键信息
with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", 'r') as f:
namespace = f.read().strip()
release_name = os.getenv("RELEASE_NAME").strip()
config_adapter_b64 = os.getenv("CONFIG_ADAPTER_B64")
config_core_env_b64 = os.getenv("CONFIG_CORE_ENV_B64")
config_core_bot_b64 = os.getenv("CONFIG_CORE_BOT_B64")
config_core_model_b64 = os.getenv("CONFIG_CORE_MODEL_B64")
def log(func: str, msg: str, level: str = 'INFO'):
print(f'[{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}] [{level}] [{func}] {msg}')
def migrate_old_config():
"""迁移旧版配置"""
func_name = 'migrate_old_config'
log(func_name, 'Checking whether there are old configmaps to migrate...')
old_configmap_version = None
status_migrating = { # 存储adapter的config.toml、core的bot_config.toml和model_config.toml三个文件的迁移状态
'adapter_config.toml': False,
'core_bot_config.toml': False,
'core_model_config.toml': False
}
# 如果存储卷中已存在配置文件,则跳过迁移
if os.path.isfile('/app/config/core/bot_config.toml') or os.path.isfile('/app/config/core/model_config.toml') or \
os.path.isfile('/app/config/adapter/config.toml'):
log(func_name, 'Found existing config file(s) in PV. Migration will be ignored. Done.')
return
def migrate_cm_to_file(cm_name: str, key_name: str, file_path: str) -> bool:
"""检测是否有指定名称的configmap如果有的话备份到指定的配置文件里并删除configmap返回是否已备份"""
try:
cm = core_api.read_namespaced_config_map(
name=cm_name,
namespace=namespace
)
log(func_name, f'\tMigrating `{key_name}` of `{cm_name}`...')
with open(file_path, 'w', encoding='utf-8') as _f:
_f.write(cm.data[key_name])
core_api.delete_namespaced_config_map(
name=cm_name,
namespace=namespace
)
log(func_name, f'\tSuccessfully migrated `{key_name}` of `{cm_name}`.')
except ApiException as e:
if e.status == 404:
return False
return True
# 对于0.11.5-beta版本adapter的config.toml、core的bot_config.toml和model_config.toml均存储于不同的ConfigMap需要依次迁移
if True not in status_migrating.values():
status_migrating['adapter_config.toml'] = migrate_cm_to_file(f'{release_name}-maibot-adapter-config',
'config.toml',
'/app/config/adapter/config.toml')
status_migrating['core_bot_config.toml'] = migrate_cm_to_file(f'{release_name}-maibot-core-bot-config',
'bot_config.toml',
'/app/config/core/bot_config.toml')
status_migrating['core_model_config.toml'] = migrate_cm_to_file(f'{release_name}-maibot-core-model-config',
'model_config.toml',
'/app/config/core/model_config.toml')
if True in status_migrating.values():
old_configmap_version = '0.11.5-beta'
# 对于低于0.11.5-beta的版本adapter的1个配置和core的3个配置位于各自的configmap中
if True not in status_migrating.values():
status_migrating['adapter_config.toml'] = migrate_cm_to_file(f'{release_name}-maibot-adapter',
'config.toml',
'/app/config/adapter/config.toml')
status_migrating['core_bot_config.toml'] = migrate_cm_to_file(f'{release_name}-maibot-core',
'bot_config.toml',
'/app/config/core/bot_config.toml')
status_migrating['core_model_config.toml'] = migrate_cm_to_file(f'{release_name}-maibot-core',
'model_config.toml',
'/app/config/core/model_config.toml')
if True in status_migrating.values():
old_configmap_version = 'before 0.11.5-beta'
if old_configmap_version:
log(func_name, f'Migrating status for version `{old_configmap_version}`:')
for k, v in status_migrating.items():
log(func_name, f'\t{k}: {v}')
if False in status_migrating.values():
log(func_name, 'There is/are config(s) that not been migrated. Please check the config manually.',
level='WARNING')
else:
log(func_name, 'Successfully migrated old configs. Done.')
else:
log(func_name, 'Old config not found. Ignoring migration. Done.')
def write_config_files():
"""当注入了配置文件时一般是首次安装或者用户指定覆盖将helm chart注入的配置写入存储卷中的实际文件"""
func_name = 'write_config_files'
log(func_name, 'Detecting config files...')
if config_adapter_b64:
log(func_name, '\tWriting `config.toml` of adapter...')
config_str = base64.b64decode(config_adapter_b64).decode("utf-8")
with open('/app/config/adapter/config.toml', 'w', encoding='utf-8') as _f:
_f.write(config_str)
log(func_name, '\t`config.toml` of adapter wrote.')
if True: # .env直接覆盖
log(func_name, '\tWriting .env file of core...')
config_str = base64.b64decode(config_core_env_b64).decode("utf-8")
with open('/app/config/core/.env', 'w', encoding='utf-8') as _f:
_f.write(config_str)
log(func_name, '\t`.env` of core wrote.')
if config_core_bot_b64:
log(func_name, '\tWriting `bot_config.toml` of core not found. Creating...')
config_str = base64.b64decode(config_core_bot_b64).decode("utf-8")
with open('/app/config/core/bot_config.toml', 'w', encoding='utf-8') as _f:
_f.write(config_str)
log(func_name, '\t`bot_config.toml` of core wrote.')
if config_core_model_b64:
log(func_name, '\tWriting `model_config.toml` of core...')
config_str = base64.b64decode(config_core_model_b64).decode("utf-8")
with open('/app/config/core/model_config.toml', 'w', encoding='utf-8') as _f:
_f.write(config_str)
log(func_name, '\t`model_config.toml` of core wrote.')
log(func_name, 'Detection done.')
def reconfigure_adapter():
"""调整adapter的配置文件的napcat_server和maibot_server字段使其Service能被napcat连接以及连接到core的Service"""
func_name = 'reconfigure_adapter'
log(func_name, 'Reconfiguring `config.toml` of adapter...')
with open('/app/config/adapter/config.toml', 'r', encoding='utf-8') as _f:
config_adapter = toml.load(_f)
config_adapter.setdefault('napcat_server', {})
config_adapter['napcat_server']['host'] = '0.0.0.0'
config_adapter['napcat_server']['port'] = 8095
config_adapter.setdefault('maibot_server', {})
config_adapter['maibot_server']['host'] = f'{release_name}-maibot-core' # 根据release名称动态拼接core服务的DNS名称
config_adapter['maibot_server']['port'] = 8000
with open('/app/config/adapter/config.toml', 'w', encoding='utf-8') as _f:
_f.write(toml.dumps(config_adapter))
log(func_name, 'Reconfiguration done.')
def _scale_statefulsets(statefulsets: list[str], replicas: int, wait: bool = False, timeout: int = 300):
"""调整指定几个statefulset的副本数wait参数控制是否等待调整完成再返回"""
statefulsets = set(statefulsets)
for name in statefulsets:
apps_api.patch_namespaced_stateful_set_scale(
name=name,
namespace=namespace,
body={"spec": {"replicas": replicas}}
)
if not wait:
return
start_time = time.time()
while True:
remaining_pods = []
pods = core_api.list_namespaced_pod(namespace).items
for pod in pods:
owners = pod.metadata.owner_references or []
for owner in owners:
if owner.kind == "StatefulSet" and owner.name in statefulsets:
remaining_pods.append(pod.metadata.name)
if not remaining_pods:
return
elapsed = time.time() - start_time
if elapsed > timeout:
raise TimeoutError(
f"Timeout waiting for Pods to be deleted. "
f"Remaining Pods: {remaining_pods}"
)
time.sleep(5)
def _restart_statefulset(name: str, ignore_error: bool = False):
"""重启指定的statefulset"""
now = datetime.now(timezone.utc).isoformat()
body = {
"spec": {
"template": {
"metadata": {
"annotations": {
"kubectl.kubernetes.io/restartedAt": now
}
}
}
}
}
try:
apps_api.patch_namespaced_stateful_set(
name=name,
namespace=namespace,
body=body
)
except ApiException as e:
if ignore_error:
pass
else:
raise e
if __name__ == '__main__':
log('main', 'Start to process data before install/upgrade...')
log('main', 'Scaling adapter and core to 0...')
_scale_statefulsets([f'{release_name}-maibot-adapter', f'{release_name}-maibot-core'], 0, wait=True)
migrate_old_config()
write_config_files()
reconfigure_adapter()
log('main', 'Scaling adapter and core to 1...')
_scale_statefulsets([f'{release_name}-maibot-adapter', f'{release_name}-maibot-core'], 1)
log('main', 'Process done.')