mirror of https://github.com/Mai-with-u/MaiBot.git
feat:可记录和查询群昵称
parent
ff56bd043c
commit
4032a2ca9e
|
|
@ -84,10 +84,19 @@ class HeartFCMessageReceiver:
|
|||
|
||||
logger.info(f"[{mes_name}]{userinfo.user_nickname}:{processed_plain_text}") # type: ignore
|
||||
|
||||
# 如果是群聊,获取群号和群昵称
|
||||
group_id = None
|
||||
group_nick_name = None
|
||||
if chat.group_info:
|
||||
group_id = chat.group_info.group_id # type: ignore
|
||||
group_nick_name = userinfo.user_cardname # type: ignore
|
||||
|
||||
_ = Person.register_person(
|
||||
platform=message.message_info.platform, # type: ignore
|
||||
user_id=message.message_info.user_info.user_id, # type: ignore
|
||||
nickname=userinfo.user_nickname, # type: ignore
|
||||
group_id=group_id,
|
||||
group_nick_name=group_nick_name,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
|
|
|
|||
|
|
@ -267,6 +267,7 @@ class PersonInfo(BaseModel):
|
|||
platform = TextField() # 平台
|
||||
user_id = TextField(index=True) # 用户ID
|
||||
nickname = TextField(null=True) # 用户昵称
|
||||
group_nick_name = TextField(null=True) # 群昵称列表 (JSON格式,存储 [{"group_id": str, "group_nick_name": str}])
|
||||
memory_points = TextField(null=True) # 个人印象的点
|
||||
know_times = FloatField(null=True) # 认识时间 (时间戳)
|
||||
know_since = FloatField(null=True) # 首次印象总结时间
|
||||
|
|
|
|||
|
|
@ -12,6 +12,50 @@ from .tool_registry import register_memory_retrieval_tool
|
|||
logger = get_logger("memory_retrieval_tools")
|
||||
|
||||
|
||||
def _format_group_nick_names(group_nick_name_field) -> str:
|
||||
"""格式化群昵称信息
|
||||
|
||||
Args:
|
||||
group_nick_name_field: 群昵称字段(可能是字符串JSON或None)
|
||||
|
||||
Returns:
|
||||
str: 格式化后的群昵称信息字符串
|
||||
"""
|
||||
if not group_nick_name_field:
|
||||
return ""
|
||||
|
||||
try:
|
||||
# 解析JSON格式的群昵称列表
|
||||
group_nick_names_data = json.loads(group_nick_name_field) if isinstance(group_nick_name_field, str) else group_nick_name_field
|
||||
|
||||
if not isinstance(group_nick_names_data, list) or not group_nick_names_data:
|
||||
return ""
|
||||
|
||||
# 格式化群昵称列表
|
||||
group_nick_list = []
|
||||
for item in group_nick_names_data:
|
||||
if isinstance(item, dict):
|
||||
group_id = item.get("group_id", "未知群号")
|
||||
group_nick_name = item.get("group_nick_name", "未知群昵称")
|
||||
group_nick_list.append(f" - 群号 {group_id}:{group_nick_name}")
|
||||
elif isinstance(item, str):
|
||||
# 兼容旧格式(如果存在)
|
||||
group_nick_list.append(f" - {item}")
|
||||
|
||||
if group_nick_list:
|
||||
return "群昵称:\n" + "\n".join(group_nick_list)
|
||||
return ""
|
||||
except (json.JSONDecodeError, TypeError, ValueError) as e:
|
||||
logger.warning(f"解析群昵称信息失败: {e}")
|
||||
# 如果解析失败,尝试显示原始内容(截断)
|
||||
if isinstance(group_nick_name_field, str):
|
||||
preview = group_nick_name_field[:200]
|
||||
if len(group_nick_name_field) > 200:
|
||||
preview += "..."
|
||||
return f"群昵称(原始数据):{preview}"
|
||||
return ""
|
||||
|
||||
|
||||
async def query_person_info(person_name: str) -> str:
|
||||
"""根据person_name查询用户信息,使用模糊查询
|
||||
|
||||
|
|
@ -68,6 +112,11 @@ async def query_person_info(person_name: str) -> str:
|
|||
if record.user_id:
|
||||
result_parts.append(f"平台用户ID:{record.user_id}")
|
||||
|
||||
# 群昵称信息
|
||||
group_nick_name_str = _format_group_nick_names(getattr(record, "group_nick_name", None))
|
||||
if group_nick_name_str:
|
||||
result_parts.append(group_nick_name_str)
|
||||
|
||||
# 名称设定原因
|
||||
if record.name_reason:
|
||||
result_parts.append(f"名称设定原因:{record.name_reason}")
|
||||
|
|
@ -132,6 +181,11 @@ async def query_person_info(person_name: str) -> str:
|
|||
if record.user_id:
|
||||
result_parts.append(f"平台用户ID:{record.user_id}")
|
||||
|
||||
# 群昵称信息
|
||||
group_nick_name_str = _format_group_nick_names(getattr(record, "group_nick_name", None))
|
||||
if group_nick_name_str:
|
||||
result_parts.append(group_nick_name_str)
|
||||
|
||||
# 名称设定原因
|
||||
if record.name_reason:
|
||||
result_parts.append(f"名称设定原因:{record.name_reason}")
|
||||
|
|
@ -219,7 +273,7 @@ def register_tool():
|
|||
"""注册工具"""
|
||||
register_memory_retrieval_tool(
|
||||
name="query_person_info",
|
||||
description="根据查询某个用户的所有信息。名称、昵称、平台、用户ID、qq号等",
|
||||
description="根据查询某个用户的所有信息。名称、昵称、平台、用户ID、qq号、群昵称等",
|
||||
parameters=[
|
||||
{
|
||||
"name": "person_name",
|
||||
|
|
|
|||
|
|
@ -160,7 +160,9 @@ def levenshtein_distance(s1: str, s2: str) -> int:
|
|||
|
||||
class Person:
|
||||
@classmethod
|
||||
def register_person(cls, platform: str, user_id: str, nickname: str):
|
||||
def register_person(
|
||||
cls, platform: str, user_id: str, nickname: str, group_id: Optional[str] = None, group_nick_name: Optional[str] = None
|
||||
):
|
||||
"""
|
||||
注册新用户的类方法
|
||||
必须输入 platform、user_id 和 nickname 参数
|
||||
|
|
@ -169,6 +171,8 @@ class Person:
|
|||
platform: 平台名称
|
||||
user_id: 用户ID
|
||||
nickname: 用户昵称
|
||||
group_id: 群号(可选,仅在群聊时提供)
|
||||
group_nick_name: 群昵称(可选,仅在群聊时提供)
|
||||
|
||||
Returns:
|
||||
Person: 新注册的Person实例
|
||||
|
|
@ -182,7 +186,11 @@ class Person:
|
|||
|
||||
if is_person_known(person_id=person_id):
|
||||
logger.debug(f"用户 {nickname} 已存在")
|
||||
return Person(person_id=person_id)
|
||||
person = Person(person_id=person_id)
|
||||
# 如果是群聊,更新群昵称
|
||||
if group_id and group_nick_name:
|
||||
person.add_group_nick_name(group_id, group_nick_name)
|
||||
return person
|
||||
|
||||
# 创建Person实例
|
||||
person = cls.__new__(cls)
|
||||
|
|
@ -201,6 +209,11 @@ class Person:
|
|||
person.know_since = time.time()
|
||||
person.last_know = time.time()
|
||||
person.memory_points = []
|
||||
person.group_nick_name = [] # 初始化群昵称列表
|
||||
|
||||
# 如果是群聊,添加群昵称
|
||||
if group_id and group_nick_name:
|
||||
person.add_group_nick_name(group_id, group_nick_name)
|
||||
|
||||
# 同步到数据库
|
||||
person.sync_to_database()
|
||||
|
|
@ -217,6 +230,7 @@ class Person:
|
|||
self.platform = platform
|
||||
self.nickname = global_config.bot.nickname
|
||||
self.person_name = global_config.bot.nickname
|
||||
self.group_nick_name: list[dict[str, str]] = []
|
||||
return
|
||||
|
||||
self.user_id = ""
|
||||
|
|
@ -255,6 +269,7 @@ class Person:
|
|||
self.know_since = None
|
||||
self.last_know: Optional[float] = None
|
||||
self.memory_points = []
|
||||
self.group_nick_name: list[dict[str, str]] = [] # 群昵称列表,存储 {"group_id": str, "group_nick_name": str}
|
||||
|
||||
# 从数据库加载数据
|
||||
self.load_from_database()
|
||||
|
|
@ -342,6 +357,31 @@ class Person:
|
|||
return memory_list
|
||||
return random.sample(memory_list, num)
|
||||
|
||||
def add_group_nick_name(self, group_id: str, group_nick_name: str):
|
||||
"""
|
||||
添加或更新群昵称
|
||||
|
||||
Args:
|
||||
group_id: 群号
|
||||
group_nick_name: 群昵称
|
||||
"""
|
||||
if not group_id or not group_nick_name:
|
||||
return
|
||||
|
||||
# 检查是否已存在该群号的记录
|
||||
for item in self.group_nick_name:
|
||||
if item.get("group_id") == group_id:
|
||||
# 更新现有记录
|
||||
item["group_nick_name"] = group_nick_name
|
||||
self.sync_to_database()
|
||||
logger.debug(f"更新用户 {self.person_id} 在群 {group_id} 的群昵称为 {group_nick_name}")
|
||||
return
|
||||
|
||||
# 添加新记录
|
||||
self.group_nick_name.append({"group_id": group_id, "group_nick_name": group_nick_name})
|
||||
self.sync_to_database()
|
||||
logger.debug(f"添加用户 {self.person_id} 在群 {group_id} 的群昵称 {group_nick_name}")
|
||||
|
||||
def load_from_database(self):
|
||||
"""从数据库加载个人信息数据"""
|
||||
try:
|
||||
|
|
@ -372,6 +412,21 @@ class Person:
|
|||
else:
|
||||
self.memory_points = []
|
||||
|
||||
# 处理group_nick_name字段(JSON格式的列表)
|
||||
if record.group_nick_name:
|
||||
try:
|
||||
loaded_group_nick_names = json.loads(record.group_nick_name)
|
||||
# 确保是列表格式
|
||||
if isinstance(loaded_group_nick_names, list):
|
||||
self.group_nick_name = loaded_group_nick_names
|
||||
else:
|
||||
self.group_nick_name = []
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
logger.warning(f"解析用户 {self.person_id} 的group_nick_name字段失败,使用默认值")
|
||||
self.group_nick_name = []
|
||||
else:
|
||||
self.group_nick_name = []
|
||||
|
||||
logger.debug(f"已从数据库加载用户 {self.person_id} 的信息")
|
||||
else:
|
||||
self.sync_to_database()
|
||||
|
|
@ -403,6 +458,9 @@ class Person:
|
|||
)
|
||||
if self.memory_points
|
||||
else json.dumps([], ensure_ascii=False),
|
||||
"group_nick_name": json.dumps(self.group_nick_name, ensure_ascii=False)
|
||||
if self.group_nick_name
|
||||
else json.dumps([], ensure_ascii=False),
|
||||
}
|
||||
|
||||
# 检查记录是否存在
|
||||
|
|
|
|||
Loading…
Reference in New Issue