MaiBot/src/think_flow_demo/outer_world.py

135 lines
5.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#定义了来自外部世界的信息
import asyncio
from datetime import datetime
from src.plugins.models.utils_model import LLM_request
from src.plugins.config.config import global_config
from src.common.database import db
#存储一段聊天的大致内容
class Talking_info:
def __init__(self,chat_id):
self.chat_id = chat_id
self.talking_message = []
self.talking_message_str = ""
self.talking_summary = ""
self.last_observe_time = int(datetime.now().timestamp()) #初始化为当前时间
self.observe_times = 0
self.activate = 360
self.oberve_interval = 3
self.llm_summary = LLM_request(
model=global_config.llm_outer_world, temperature=0.7, max_tokens=300, request_type="outer_world")
async def start_observe(self):
while True:
if self.activate <= 0:
print(f"聊天 {self.chat_id} 活跃度不足,进入休眠状态")
await self.waiting_for_activate()
print(f"聊天 {self.chat_id} 被重新激活")
await self.observe_world()
await asyncio.sleep(self.oberve_interval)
async def waiting_for_activate(self):
while True:
# 检查从上次观察时间之后的新消息数量
new_messages_count = db.messages.count_documents({
"chat_id": self.chat_id,
"time": {"$gt": self.last_observe_time}
})
if new_messages_count > 15:
self.activate = 360*(self.observe_times+1)
return
await asyncio.sleep(8) # 每10秒检查一次
async def observe_world(self):
# 查找新消息限制最多20条
new_messages = list(db.messages.find({
"chat_id": self.chat_id,
"time": {"$gt": self.last_observe_time}
}).sort("time", 1).limit(20)) # 按时间正序排列最多20条
if not new_messages:
self.activate += -1
return
# 将新消息添加到talking_message同时保持列表长度不超过20条
self.talking_message.extend(new_messages)
if len(self.talking_message) > 20:
self.talking_message = self.talking_message[-20:] # 只保留最新的20条
self.translate_message_list_to_str()
# print(self.talking_message_str)
self.observe_times += 1
self.last_observe_time = new_messages[-1]["time"]
if self.observe_times > 3:
await self.update_talking_summary()
# print(f"更新了聊天总结:{self.talking_summary}")
async def update_talking_summary(self):
#基于已经有的talking_summary和新的talking_message生成一个summary
prompt = ""
prompt = f"你正在参与一个qq群聊的讨论这个群之前在聊的内容是{self.talking_summary}\n"
prompt += f"现在群里的群友们产生了新的讨论,有了新的发言,具体内容如下:{self.talking_message_str}\n"
prompt += '''以上是群里在进行的聊天,请你对这个聊天内容进行总结,总结内容要包含聊天的大致内容,
以及聊天中的一些重要信息,记得不要分点,不要太长,精简的概括成一段文本\n'''
prompt += "总结概括:"
self.talking_summary, reasoning_content = await self.llm_summary.generate_response_async(prompt)
def translate_message_list_to_str(self):
self.talking_message_str = ""
for message in self.talking_message:
self.talking_message_str += message["detailed_plain_text"]
class SheduleInfo:
def __init__(self):
self.shedule_info = ""
class OuterWorld:
def __init__(self):
self.talking_info_list = [] #装的一堆talking_info
self.shedule_info = "无日程"
# self.interest_info = "麦麦你好"
self.outer_world_info = ""
self.start_time = int(datetime.now().timestamp())
self.llm_summary = LLM_request(
model=global_config.llm_outer_world, temperature=0.7, max_tokens=600, request_type="outer_world_info")
async def check_and_add_new_observe(self):
# 获取所有聊天流
all_streams = db.chat_streams.find({})
# 遍历所有聊天流
for data in all_streams:
stream_id = data.get("stream_id")
# 检查是否已存在该聊天流的观察对象
existing_info = next((info for info in self.talking_info_list if info.chat_id == stream_id), None)
# 如果不存在创建新的Talking_info对象并添加到列表中
if existing_info is None:
print(f"发现新的聊天流: {stream_id}")
new_talking_info = Talking_info(stream_id)
self.talking_info_list.append(new_talking_info)
# 启动新对象的观察任务
asyncio.create_task(new_talking_info.start_observe())
async def open_eyes(self):
while True:
print("检查新的聊天流")
await self.check_and_add_new_observe()
await asyncio.sleep(60)
def get_world_by_stream_id(self,stream_id):
for talking_info in self.talking_info_list:
if talking_info.chat_id == stream_id:
return talking_info
return None
outer_world = OuterWorld()
if __name__ == "__main__":
asyncio.run(outer_world.open_eyes())