Table of Contents

    • Series Articles
    • Overview of This Article
    • Task4 – Assignment 1: Independently crawl the GitHub Trending page and extract each project's name, URL, and description
      • Complete Code with Comments
    • Task4 – Assignment 2: Independently crawl the Huggingface Papers page
      • Code with Comments
    • Task4 – Assignment 3: Produce a news digest document
      • Code with Comments
      • Open Questions and Reflections
    • Task4 – Assignment 4: Implement email delivery
      • Code with Comments

Series Articles

【The Future of AI – AI Agent Series】【MetaGPT】0. Your First MetaGPT Program

【The Future of AI – AI Agent Series】【MetaGPT】1. How AI Agents Will Reshape the World

【The Future of AI – AI Agent Series】【MetaGPT】2. Implementing Your First Agent

Overview of This Article

Using Task4 of the "Getting Started with MetaGPT Agent Development" course as an example, this article builds a subscription agent from scratch with MetaGPT: it collects web information on a schedule, summarizes it, and automatically pushes the summary to WeChat and email.

Per the tutorial above, you have already learned how to build a practical news-collection assistant for a concrete scenario. Now, try to build an Agent that subscribes to the news you care about:

  • Using the crawler basics you learned earlier (if you aren't comfortable writing crawler code, let GPT help you), define two custom information-gathering Action classes for your Agent
    • Action 1: Following sections 3.2.1 and 3.2.2 of Chapter 4, independently crawl the GitHub Trending page (https://github.com/trending) and extract each project's name, URL, and description
    • Action 2: Independently crawl the Huggingface Papers page (https://huggingface.co/papers); first collect each paper's link (hint: the href attribute in the title element), then follow the link to the paper's detail page (e.g. https://huggingface.co/papers/2312.03818) and extract its title and abstract
  • Referring to section 1.4 of Chapter 3, override the relevant methods so your Agent can automatically generate a table of contents for the summary, split the content into blocks by second-level heading, summarize each block, and assemble a news digest document
  • Customize the Agent's SubscriptionRunner class, independently implementing the Trigger and Callback, so your Agent delivers the digest to a notification channel on a schedule (implementing email delivery is a bonus)

In essence, this lesson lays out the main components of a subscription agent, then has us practice wiring them together with MetaGPT.

  • Main components of a subscription agent (a minimal wiring sketch follows this list):
    • Timer: triggers the subscription task on a schedule
    • Information gathering and summarization: crawler + LLM
    • Callback: delivers the summarized information to a third-party channel (WeChat, email, etc.)
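
MetaGPT packages exactly this wiring as SubscriptionRunner: you subscribe a Role to a trigger and a callback. Below is a minimal sketch of how the three parts plug together; the interval trigger and print callback are illustrative placeholders I wrote for this post, not the real implementations, which follow later in the article:

```python
import asyncio
from metagpt.schema import Message
from metagpt.subscription import SubscriptionRunner

# Placeholder timer: an async generator that yields a Message on every tick.
# The real trigger below uses aiocron for proper cron-style scheduling.
async def interval_trigger(seconds: int = 60):
    while True:
        await asyncio.sleep(seconds)
        yield Message("https://github.com/trending")

# Placeholder callback: the real versions push to WeChat or email.
async def print_callback(msg: Message):
    print(msg.content)

async def demo(role):
    runner = SubscriptionRunner()
    # role = gather + summarize; trigger = when to run; callback = where to deliver
    await runner.subscribe(role, interval_trigger(), print_callback)
    await runner.run()
```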

Task4 – Assignment 1: Independently crawl the GitHub Trending page and extract each project's name, URL, and description

Following the tutorial step by step is enough here. I didn't run into any major problems; the main issue was missing imports...

Complete Code with Comments

  • First, the execution result: the push received on WeChat

  • The complete code with detailed comments

```python
# Load .env into environment variables
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())

import asyncio
from metagpt.subscription import SubscriptionRunner
from metagpt.roles import Searcher
from metagpt.schema import Message
import aiohttp
from bs4 import BeautifulSoup
from metagpt.actions.action import Action
from metagpt.logs import logger


# 1. Action 1: the crawler, which scrapes the GitHub Trending page
class CrawlOSSTrending(Action):
    """Crawl the GitHub Trending page"""

    async def run(self, url: str = "https://github.com/trending"):
        async with aiohttp.ClientSession() as client:
            # async with client.get(url, proxy=CONFIG.global_proxy) as response:
            async with client.get(url) as response:  # 1.1 I can reach GitHub directly, so no proxy is needed
                response.raise_for_status()
                html = await response.text()
        # logger.debug(html)
        soup = BeautifulSoup(html, 'html.parser')
        repositories = []
        ## 1.2 Parse the HTML elements and extract the data we want:
        ##     project name, url, description, language, star count, fork count
        for article in soup.select('article.Box-row'):
            repo_info = {}
            repo_info['name'] = article.select_one('h2 a').text.strip().replace("\n", "").replace(" ", "")
            repo_info['url'] = "https://github.com" + article.select_one('h2 a')['href'].strip()
            # Description
            description_element = article.select_one('p')
            repo_info['description'] = description_element.text.strip() if description_element else None
            # Language
            language_element = article.select_one('span[itemprop="programmingLanguage"]')
            repo_info['language'] = language_element.text.strip() if language_element else None
            # Stars and Forks
            stars_element = article.select('a.Link--muted')[0]
            forks_element = article.select('a.Link--muted')[1]
            repo_info['stars'] = stars_element.text.strip()
            repo_info['forks'] = forks_element.text.strip()
            # Today's Stars
            today_stars_element = article.select_one('span.d-inline-block.float-sm-right')
            repo_info['today_stars'] = today_stars_element.text.strip() if today_stars_element else None
            repositories.append(repo_info)
        return repositories


from typing import Any

TRENDING_ANALYSIS_PROMPT = """# Requirements
You are a GitHub Trending Analyst, aiming to provide users with insightful and personalized recommendations based on the latest
GitHub Trends. Based on the context, fill in the following missing information, generate engaging and informative titles, ensuring users discover repositories aligned with their interests.

# The title about Today's GitHub Trending
## Today's Trends: Uncover the Hottest GitHub Projects Today! Explore the trending programming languages and discover key domains capturing developers' attention. From ** to **, witness the top projects like never before.
## The Trends Categories: Dive into Today's GitHub Trending Domains! Explore featured projects in domains such as ** and **. Get a quick overview of each project, including programming languages, stars, and more.
## Highlights of the List: Spotlight noteworthy projects on GitHub Trending, including new tools, innovative projects, and rapidly gaining popularity, focusing on delivering distinctive and attention-grabbing content for users.

---
# Format Example

# [Title]

## Today's Trends
Today, ** and ** continue to dominate as the most popular programming languages. Key areas of interest include **, ** and **.
The top popular projects are Project1 and Project2.

## The Trends Categories
1. Generative AI
- [Project1](https://github/xx/project1): [detail of the project, such as star total and today, language, ...]
- [Project2](https://github/xx/project2): ...
...

## Highlights of the List
1. [Project1](https://github/xx/project1): [provide specific reasons why this project is recommended]
...

---
# Github Trending
{trending}
"""


## 2. Summarize the crawled data: essentially prompt + crawled data, handed to the LLM for a result
class AnalysisOSSTrending(Action):
    async def run(self, trending: Any):
        return await self._aask(TRENDING_ANALYSIS_PROMPT.format(trending=trending))


from metagpt.roles import Role

## 3. Define the Role: initialize the two Actions above and run them in order
class OssWatcher(Role):
    def __init__(
        self,
        name="Codey",
        profile="OssWatcher",
        goal="Generate an insightful GitHub Trending analysis report.",
        constraints="Only analyze based on the provided GitHub Trending data.",
    ):
        super().__init__(name, profile, goal, constraints)
        self._init_actions([CrawlOSSTrending, AnalysisOSSTrending])  ## 3.1 initialize the two Actions
        self._set_react_mode(react_mode="by_order")  ## 3.2 execute them in order

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self._rc.todo}")
        todo = self._rc.todo
        msg = self.get_memories(k=1)[0]  # 3.4 fetch the latest memory; the crawl result is stored there for the analysis Action
        result = await todo.run(msg.content)
        msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
        self._rc.memory.add(msg)  # 3.3 after the first Action finishes, store its result in memory
        return msg

##============================ end of agent definition =================================================

from pydantic import BaseModel, Field
import time

# Trigger
class OssInfo(BaseModel):
    url: str
    timestamp: float = Field(default_factory=time.time)


from typing import Optional
from pytz import BaseTzInfo
from aiocron import crontab

## 4. The timer, implemented with the aiocron crontab library
class GithubTrendingCronTrigger():
    def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None, url: str = "https://github.com/trending") -> None:
        self.crontab = crontab(spec, tz=tz)  ## 4.1 hand the schedule spec to crontab
        self.url = url

    def __aiter__(self):
        return self

    async def __anext__(self):  ## 4.3 __anext__ is called on every iteration
        await self.crontab.next()  ## 4.2 wait for the next crontab trigger; this blocks until the scheduled time, so no code below runs before then
        return Message(self.url, OssInfo(url=self.url))

##======================== end of timer definition =================================================

import os

# 5. WeChat subscription callback, implemented with wxpusher
# WxPusherClient pushes a message to the specified users
class WxPusherClient:
    def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):
        self.base_url = base_url
        self.token = token or os.environ["WXPUSHER_TOKEN"]  # 5.1 read the token from the environment, so you must set WXPUSHER_TOKEN in your environment or config file

    async def send_message(
        self,
        content,
        summary: Optional[str] = None,
        content_type: int = 1,
        topic_ids: Optional[list[int]] = None,
        uids: Optional[list[int]] = None,
        verify: bool = False,
        url: Optional[str] = None,
    ):
        payload = {
            "appToken": self.token,
            "content": content,
            "summary": summary,
            "contentType": content_type,
            "topicIds": topic_ids or [],
            # 5.2 read the uids from the environment, so you must set WXPUSHER_UIDS
            # uids identify the WeChat accounts to push to; only accounts following your subscription account expose a uid
            "uids": uids or os.environ["WXPUSHER_UIDS"].split(","),
            "verifyPay": verify,
            "url": url,
        }
        url = f"{self.base_url}/api/send/message"
        return await self._request("POST", url, json=payload)

    async def _request(self, method, url, **kwargs):
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, **kwargs) as response:
                response.raise_for_status()
                return await response.json()


# 5.3 WeChat callback wrapper: use WxPusherClient to push to the configured WeChat accounts
async def wxpusher_callback(msg: Message):
    client = WxPusherClient()
    await client.send_message(msg.content, content_type=3)

##======================== end of callback and WeChat push definition =========================================

# 6. Entry point; for easy testing, the schedule fires at the current time + 1 minute
from datetime import datetime, timedelta

current_time = datetime.now()  ## 6.1 current time
target_time = current_time + timedelta(minutes=1)  ## 6.2 target time: now + 1 minute
cron_expression = target_time.strftime('%M %H %d %m %w')  # cron fields: minute hour day month weekday
spec = cron_expression
logger.info(f"cron expression: {spec}")


async def main(spec: str = spec, wxpusher: bool = True):
    callbacks = []
    callbacks.append(wxpusher_callback)

    async def callback(msg):
        await asyncio.gather(*(call(msg) for call in callbacks))  # 6.3 fan the message out to every registered callback

    runner = SubscriptionRunner()
    await runner.subscribe(OssWatcher(), GithubTrendingCronTrigger(spec), callback)  # the subscription runner ties everything together
    await runner.run()


if __name__ == "__main__":
    import fire
    fire.Fire(main)
```
  • Only WeChat push is implemented here; I didn't do Discord. Running the whole flow end to end over WeChat is complete enough.
  • I'm not very familiar with crawlers, so I used the tutorial's code here; I also tried writing my own crawler with GPT's help, which you'll see later.
  • I wasn't familiar with the timer either, and had never used aiohttp for network requests... but the code itself is enough to learn from, and copying a working pattern is straightforward (a minimal aiohttp example is sketched below). I'll break each piece down and study it separately later; they aren't the focus of this course.
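
Since aiohttp and BeautifulSoup were the unfamiliar parts for me, here is a minimal standalone fetch-and-parse sketch, independent of MetaGPT. The CSS selector matches GitHub's markup at the time of writing and may break if the page changes:

```python
import asyncio
import aiohttp
from bs4 import BeautifulSoup

async def fetch_titles(url: str = "https://github.com/trending") -> list[str]:
    # One session per logical task; the context manager closes connections cleanly.
    async with aiohttp.ClientSession() as client:
        async with client.get(url) as response:
            response.raise_for_status()  # surface HTTP errors early
            html = await response.text()
    soup = BeautifulSoup(html, "html.parser")
    # 'article.Box-row h2 a' matches repo title links today; treat it as fragile.
    return [a.text.strip() for a in soup.select("article.Box-row h2 a")]

if __name__ == "__main__":
    print(asyncio.run(fetch_titles())[:5])
```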

Task4 – Assignment 2: Independently crawl the Huggingface Papers page

Since I can't reach Hugging Face without a proxy, I swapped the crawl target: I scrape https://github.com/topics, GitHub's topics page, and then follow each featured topic's link to fetch its introduction, as shown in the figure below. (An untested sketch of the original Huggingface Papers crawl is included after the code, for reference.)


Code with Comments

I won't paste the complete code here; the timer and callback are identical to before. The main change is the crawler Action:

```python
......

## 1. The crawler Action
class CrawlOSSTrending(Action):

    ## 1.2 Visit a topic's detail page and fetch its introduction
    async def _get_brief(self, url):
        async with aiohttp.ClientSession() as client:
            async with client.get(url) as response:
                response.raise_for_status()
                html = await response.text()
        soup = BeautifulSoup(html, 'html.parser')
        topic_list = soup.find_all('div', class_='markdown-body f5 mb-2')
        for topic_item in topic_list:
            topic_intro_tag = topic_item.find('p')
            topic_intro = topic_intro_tag.text.strip() if topic_intro_tag else 'No introduction available.'
            return topic_intro

    ## 1.1 Crawl the topics page
    async def run(self, url: str = "https://github.com/topics"):
        async with aiohttp.ClientSession() as client:
            async with client.get(url) as response:
                response.raise_for_status()
                html = await response.text()
        soup = BeautifulSoup(html, 'html.parser')
        repositories = []
        topic_list = soup.find_all('li', class_='col-12 col-sm-6 col-md-4 mb-4')
        for topic_item in topic_list:
            repo_info = {}
            topic_link = topic_item.find('a', class_='no-underline')['href']
            ## 1.1.1 parse out the topic's name
            repo_info['name'] = topic_item.find('p', class_='f3').text.strip()
            ## 1.1.2 parse out the topic's url
            repo_info['url'] = f'https://github.com{topic_link}'
            ## 1.1.3 visit the detail page via _get_brief to fetch the introduction
            repo_info['description'] = await self._get_brief(repo_info['url'])
            repositories.append(repo_info)
        return repositories

......
```
  • Execution result
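
For completeness, here is an untested sketch of the original assignment target, for readers who can reach huggingface.co. The selectors ("h3 a" for the paper links on the list page, "h1" for the title and the first "p" for the abstract on the detail page) are my assumptions based on the assignment's hint, not verified against the live page:

```python
import aiohttp
from bs4 import BeautifulSoup
from metagpt.actions.action import Action

class CrawlHuggingfacePapers(Action):
    """Untested sketch: crawl https://huggingface.co/papers for titles and abstracts."""

    async def _fetch(self, client: aiohttp.ClientSession, url: str) -> str:
        async with client.get(url) as response:
            response.raise_for_status()
            return await response.text()

    async def run(self, url: str = "https://huggingface.co/papers"):
        papers = []
        async with aiohttp.ClientSession() as client:
            soup = BeautifulSoup(await self._fetch(client, url), "html.parser")
            # Assumption: each paper title is an <a> inside an <h3>, whose href
            # points at a /papers/<id> detail page (per the assignment hint).
            for link in soup.select("h3 a"):
                detail_url = "https://huggingface.co" + link["href"]
                detail = BeautifulSoup(await self._fetch(client, detail_url), "html.parser")
                title = detail.select_one("h1")
                abstract = detail.select_one("p")  # assumption: first <p> holds the abstract
                papers.append({
                    "url": detail_url,
                    "title": title.text.strip() if title else None,
                    "abstract": abstract.text.strip() if abstract else None,
                })
        return papers
```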

Task4 – Assignment 3: Produce a news digest document

Honestly, I wasn't sure what this assignment was really asking for, so I implemented one interpretation based on my own understanding.

I think the point of this assignment isn't to practice anything new, but to learn how to chain existing pieces together and connect the data flow end to end. For example, my implementation steps:

    1. Crawl the GitHub Trending information.
    2. Summarize the GitHub Trending information; the earlier assignment stopped here and handed the result to the callback.
    3. In this assignment, the GitHub Trending summary is instead passed to WriteDirectory to generate a table of contents.
    4. The generated directory entries are passed to WriteContent Actions, which write the content for each heading.

Code with Comments

  • First, the result – the WeChat push

  • The result – the generated Markdown file

  • The code with detailed comments

```python
# Load .env into environment variables
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())

import asyncio
from metagpt.subscription import SubscriptionRunner
from metagpt.roles import Searcher
from metagpt.schema import Message
import aiohttp
from bs4 import BeautifulSoup
from metagpt.actions.action import Action
from metagpt.logs import logger
from datetime import datetime
from typing import Dict
from metagpt.actions.write_tutorial import WriteDirectory, WriteContent  # (re-defined below with custom prompts)
from metagpt.const import TUTORIAL_PATH
from metagpt.roles import Role
from metagpt.utils.file import File
import fire
import time
from metagpt.prompts.tutorial_assistant import DIRECTORY_PROMPT, CONTENT_PROMPT
from metagpt.utils.common import OutputParser


## 1. Directory-writing Action: extract first- and second-level headings from the GitHub Trending summary
class WriteDirectory(Action):
    """Action class for writing tutorial directories."""

    def __init__(self, name: str = "", language: str = "Chinese", *args, **kwargs):
        super().__init__(name, *args, **kwargs)
        self.language = language

    async def run(self, topic: str, *args, **kwargs) -> Dict:
        """Execute the action to generate a tutorial directory according to the topic.

        Args:
            topic: The tutorial topic.

        Returns:
            the tutorial directory information, including {"title": "xxx", "directory": [{"dir 1": ["sub dir 1", "sub dir 2"]}]}.
        """
        # (Chinese prompt: extract the first- and second-level headings from the topic;
        #  first-level headings start with "##", second-level with a number plus ".")
        COMMON_PROMPT = """
        你需要在以下topic中抽取出标题内容,包含一级标题和二级标题。
        the topic "{topic}".
        注意:一级标题以"##"开头,二级标题以数字序号加"."开头。
        """
        DIRECTORY_PROMPT = COMMON_PROMPT + """
        Please provide the specific table of contents for this tutorial, strictly following the following requirements:
        1. The output must be strictly in the specified language, {language}.
        2. Answer strictly in the dictionary format like {{"title": "xxx", "directory": [{{"dir 1": ["sub dir 1", "sub dir 2"]}}, {{"dir 2": ["sub dir 3", "sub dir 4"]}}]}}.
        3. The directory should be as specific and sufficient as possible, with a primary and secondary directory. The secondary directory is in the array.
        4. Do not have extra spaces or line breaks.
        5. Each directory title has practical significance.
        """
        prompt = DIRECTORY_PROMPT.format(topic=topic, language=self.language)
        resp = await self._aask(prompt=prompt)
        return OutputParser.extract_struct(resp, dict)


## 2. Content-writing Action: write the news copy for each directory heading
class WriteContent(Action):
    """Action class for writing tutorial content."""

    def __init__(self, name: str = "", directory: str = "", language: str = "Chinese", *args, **kwargs):
        super().__init__(name, *args, **kwargs)
        self.language = language
        self.directory = directory
        logger.info(f"init writeContent, {directory}")

    async def run(self, topic: str, *args, **kwargs) -> str:
        """Execute the action to write document content according to the directory and topic."""
        # (Chinese prompt: you are a veteran GitHub expert writing a news piece about daily
        #  trending projects, titled "{topic}", at least 500 characters per section heading)
        COMMON_PROMPT = """
        你是一个github技术专家的老手了,现在你需要写一篇关于github每日热门项目趋势的新闻,新闻标题是"{topic}".
        """
        CONTENT_PROMPT = COMMON_PROMPT + """
        现在我将给你这个主题的模块目录标题。
        请根据这个标题,写不少于500字的新闻稿。
        The module directory titles for the topic is as follows:
        {directory}

        Strictly limit output according to the following requirements:
        1. Follow the Markdown syntax format for layout.
        2. The output must be strictly in the specified language, {language}.
        3. Do not have redundant output, including concluding remarks.
        4. Strict requirement not to output the topic "{topic}".
        """
        prompt = CONTENT_PROMPT.format(topic=topic, language=self.language, directory=self.directory)
        return await self._aask(prompt=prompt)


## 3. The crawler: scrape the trending data
class CrawlOSSTrending(Action):
    async def run(self, url: str = "https://github.com/trending"):
        ......  # identical to the earlier code
        return repositories

......  # identical to the earlier code
TRENDING_ANALYSIS_PROMPT


## The Action that summarizes the trending data
class AnalysisOSSTrending(Action):
    async def run(self, trending: Any):
        result = await self._aask(TRENDING_ANALYSIS_PROMPT.format(trending=trending))
        return result


from metagpt.roles import Role

## 4. The subscription agent Role
class OssWatcher(Role):
    def __init__(
        self,
        name="Codey",
        profile="OssWatcher",
        goal="Generate an insightful GitHub Trending analysis report.",
        constraints="Only analyze based on the provided GitHub Trending data.",
    ):
        super().__init__(name, profile, goal, constraints)
        ## 4.1 The crux of this assignment: after summarizing trending, hand the content straight to WriteDirectory for heading extraction
        self._init_actions([CrawlOSSTrending, AnalysisOSSTrending, WriteDirectory(language="Chinese")])
        # self._set_react_mode(react_mode="by_order")  ## 4.2 Unlike before, make sure NOT to add this line! With it set, execution stops after WriteDirectory and never reaches the dynamically added WriteContent Actions; probably related to the execution order — worth checking in the source later
        self.topic = ""
        self.main_title = ""
        self.total_content = ""
        self.language = "Chinese"

    ## 4.3 Runs the trending crawl and summary Actions
    async def _act1(self) -> Message:
        logger.info(f"{self._setting}: ready to {self._rc.todo}")
        todo = self._rc.todo
        msg = self.get_memories(k=1)[0]
        result = await todo.run(msg.content)
        msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
        self._rc.memory.add(msg)  ## 4.4 Question 1: the result is added to memory here and the msg is also returned — does MetaGPT avoid adding the returned message to memory again? When is a manual add needed and when not?
        return msg

    async def _think(self) -> None:
        """Determine the next action to be taken by the role."""
        logger.info(self._rc.state)
        ......  # same as the earlier code

    async def _handle_directory(self, titles: Dict) -> Message:
        """Handle the directories for the tutorial document."""
        self.main_title = titles.get("title")
        directory = f"{self.main_title}\n"
        self.total_content += f"# {self.main_title}"
        actions = list()
        for first_dir in titles.get("directory"):
            key = list(first_dir.keys())[0]
            directory += f"- {key}\n"
            for second_dir in first_dir[key]:
                directory += f"- {second_dir}\n"
                actions.append(WriteContent(language=self.language, directory=second_dir))  ## 4.5 dynamically add a WriteContent Action per directory entry
        self._init_actions(actions)
        self._rc.todo = None
        return Message(content=directory)

    async def _act(self) -> Message:
        """Perform an action as determined by the role."""
        todo = self._rc.todo
        if type(todo) is CrawlOSSTrending or type(todo) is AnalysisOSSTrending:
            return await self._act1()  ## 4.6 trending-related Actions take this path

        time.sleep(20)  ## 4.7 back off 20s before each call to dodge OpenAI rate and token limits ...... a crude workaround

        if type(todo) is WriteDirectory:
            msg = self._rc.memory.get(k=1)[0]  ## 4.8 this is the AnalysisOSSTrending result, a large block of text
            self.topic = msg.content
            resp = await todo.run(topic=self.topic)
            return await self._handle_directory(resp)

        resp = await todo.run(topic="Github今日热门项目")  ## 4.9 a fixed topic; WriteContent writes content from this topic plus the specific directory entry
        if self.total_content != "":
            self.total_content += "\n\n\n"
        self.total_content += resp
        return Message(content=resp, role=self.profile)  ## 4.10 Question 2: note that the msg is returned directly here, without the manual self._rc.memory.add(msg) used in Question 1 — needs clarifying

    async def _react(self) -> Message:
        """Execute the assistant's think and actions."""
        while True:
            await self._think()
            if self._rc.todo is None:
                break
            msg = await self._act()
        ## 4.11 write the file
        root_path = TUTORIAL_PATH / datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        await File.write(root_path, f"{self.main_title}.md", self.total_content.encode('utf-8'))
        return msg

......  # the timer, callback, and entry-point code are unchanged from the earlier assignment
```

Open Questions and Reflections

  1. I haven't carefully verified that the generated project introductions and highlights are fully accurate; they merely look plausible at a glance. Verifying them in detail, or guaranteeing correctness, would require finer-grained control.

  2. The push only delivers the last section, not the full document. This is probably because the msg content handed to the callback only contains the last section; I'll debug it when I have time (a possible fix is sketched after this list).

  3. Some directory entries produced no content. Did this trip an LLM rate or token limit? How to fix it needs further investigation.

  1. My hand-written prompts are rough and only barely usable...
  2. My Python is also rough; I wrote whatever came to mind first, so please bear with me...
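
On point 2: the callback appears to receive the Message produced by the role's reaction, and my _react currently returns only the last WriteContent result. A plausible, untested fix is to return the accumulated total_content instead:

```python
    # Untested sketch: return the full accumulated document from _react so the
    # callback (and hence WeChat/email) receives the whole report, not just the
    # last WriteContent section.
    async def _react(self) -> Message:
        while True:
            await self._think()
            if self._rc.todo is None:
                break
            msg = await self._act()
        root_path = TUTORIAL_PATH / datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        await File.write(root_path, f"{self.main_title}.md", self.total_content.encode("utf-8"))
        return Message(content=self.total_content, role=self.profile)
```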

Task4 – Assignment 4: Implement email delivery

Implementing email delivery means implementing an email callback: inside the callback, an email API (SMTP here) is called to send the message.

Code with Comments

  • First, the execution result

  • The code with detailed comments
```python
......  # the Action, Role, and timer code is unchanged

import email.utils
from metagpt.schema import Message
import smtplib
from email.mime.text import MIMEText


class EmailClient:
    def __init__(self):
        pass

    async def send_email(self, sender_email, receiver_email, subject, message):
        # Sender and receiver addresses
        sender = sender_email
        receiver = receiver_email
        # Build the mail body
        msg = MIMEText(message, 'plain', 'utf-8')
        msg['Subject'] = subject
        msg['From'] = email.utils.formataddr(('同学小张', sender))
        msg['To'] = receiver_email
        # Connect to the SMTP server
        smtp_server = 'smtp.qq.com'
        smtp_port = 25
        smtp_username = '819418509@qq.com'
        smtp_password = 'xxxxxxxxx'  # the authorization code generated when enabling SMTP in QQ Mail settings, NOT the account password
        try:
            server = smtplib.SMTP(smtp_server, smtp_port)
            server.login(smtp_username, smtp_password)
            server.sendmail(sender, receiver, msg.as_string())
            server.quit()
            print("邮件发送成功!")  # mail sent successfully
        except Exception as e:
            print("邮件发送失败:", str(e))  # mail sending failed


async def email_pusher_callback(msg: Message):
    client = EmailClient()
    # Send the mail
    sender_email = '819418509@qq.com'   # sending mailbox
    receiver_email = 'xxxxxxx@163.com'  # receiving mailbox
    subject = '今日GitHub Trending分析'  # "Today's GitHub Trending analysis"
    message = msg.content
    await client.send_email(sender_email, receiver_email, subject, message)

# asyncio.run(email_pusher_callback(Message("test")))  # quick test of the email interface


# Entry point
from datetime import datetime, timedelta

current_time = datetime.now()
target_time = current_time + timedelta(minutes=1)
cron_expression = target_time.strftime('%M %H %d %m %w')
spec = cron_expression
logger.info(f"cron expression: {spec}")


async def main(spec: str = spec, wxpusher: bool = True):
    callbacks = []
    callbacks.append(wxpusher_callback)      # WeChat callback
    callbacks.append(email_pusher_callback)  # email callback
    if not callbacks:
        async def _print(msg: Message):
            print(msg.content)
        callbacks.append(_print)

    async def callback(msg):
        await asyncio.gather(*(call(msg) for call in callbacks))

    runner = SubscriptionRunner()
    await runner.subscribe(OssWatcher(), GithubTrendingCronTrigger(spec), callback)
    await runner.run()


if __name__ == "__main__":
    import fire
    fire.Fire(main)
```
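
One caveat: plain SMTP on port 25 is frequently blocked by ISPs and cloud providers, and QQ Mail also exposes an implicit-TLS endpoint on port 465. If port 25 fails, a variant using the standard library's smtplib.SMTP_SSL (a sketch; same authorization code as above) is worth trying:

```python
import smtplib
from email.mime.text import MIMEText

def send_via_ssl(sender: str, receiver: str, subject: str, body: str, password: str):
    # Same message construction as above, but connect with implicit TLS.
    msg = MIMEText(body, "plain", "utf-8")
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = receiver
    # smtp.qq.com:465 is QQ Mail's SSL endpoint; the password is still the
    # SMTP authorization code generated in the mailbox settings.
    with smtplib.SMTP_SSL("smtp.qq.com", 465) as server:
        server.login(sender, password)
        server.sendmail(sender, [receiver], msg.as_string())
```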

That's it for now. Overall, the subscription agent assignments all run end to end, and I've built a working understanding of the concepts and of how MetaGPT is used. When I have time, I'll break down each step's implementation and details, along with the pitfalls I hit and how I solved them: how to write crawler code with GPT, how the WeChat push works, how the email push works, a brief exploration of the timer, and so on. These supporting skills used in this lesson deserve further study and write-ups of their own.