工作中会经常遇到监控告警相关问题,监控和告警的目的是要在事中及时发现问题并定位系统问题,那么当系统或平台出现问题了,如何及时暴露这些问题给对应的项目开发人员呢?
本文记录了在Python项目中利用飞书的自定义机器人webhook向飞书群推送文本及图片消息~
飞书机器人自动发送群消息
- 1. 飞书群添加飞书机器人
- 2. 通过webhook发送群消息
- 2.1 发送文本消息
- 2.2 发送富文本消息
- 2.3 发送图片消息
1. 飞书群添加飞书机器人
飞书群-设置-群机器人-添加机器人-自定义机器人;
复制webhook地址,webhook地址就是用来发送消息的接口;
2. 通过webhook发送群消息
参考:飞书官方开发文档
2.1 发送文本消息
注:如想要@具体成员,则需要获取对方的user_id,可参考:获取user_id
class FeishuTalk: # 机器人webhook chatGPT_url = 'https://open.feishu.cn/open-apis/bot/v2/hook/XXX' # 发送文本消息 def sendTextmessage(self, content): url = self.chatGPT_url headers = { "Content-Type": "application/json; charset=utf-8", } payload_message = { "msg_type": "text", "content": { # @ 单个用户 名字 "text": content + "test" # @ 所有人 所有人 # "text": content + "test" } } response = requests.post(url=url, data=json.dumps(payload_message), headers=headers) return response.json # 执行发送文本消息 content = "生活不止眼前的苟且,还有诗和远方!" FeishuTalk().sendTextmessage(content)
2.2 发送富文本消息
富文本可以在一条消息中同时支持文字、At、图片、超链接等元素,可参考:富文本消息
对于需要换行展示的消息,可通过发送 富文本消息 进行实现,比如发送一个榜单消息
# 发送富文本消息 def sendFuTextmessage(self, content): url = self.chatGPT_url headers = { "Content-Type": "application/json; charset=utf-8", } payload_message = { "msg_type": "post", "content": { "post": { "zh_cn": { "title": "微博头条热榜", "content": content } } } } response = requests.post(url=url, data=json.dumps(payload_message), headers=headers) return response# 微博头条榜单 def getHotBand(self): url = "https://www.weibo.com/ajax/statuses/hot_band" headers = { "cookie": "XSRF-TOKEN=iuIb9M_gQ8D4FjMwUthqcink; SUB=_2AkMUpJdaf8NxqwJRmPEVz2Pib4V_zwrEieKi-GaBJRMxHRl-yT92qhALtRB6PyS5tbPLRbsCo0gfSwhlb8PLq3CnqnuA; SUBP=0033WrSXqPxfM72-Ws9jqgMF55529P9D9WFyRDSrne6a4e.bkQHJzd-.; WBPSESS=IawIaCISeX-46VmeRocrJ40RrQZ7YTNxKt6pB9xuTQ-WP-uhwIvsoHpBEQfU2CGlyGf32loDQLI6ykRbGvzNf_mvmCuvfYLwUPDbYHJizUdUKfKplkCi6sPas7wrz6ACVGt8HOr-w8hjNGpZtkeUtLcl0-BFnXMuSPDMToH7QlI=", "x-xsrf-token": "iuIb9M_gQ8D4FjMwUthqcink" } response = requests.get(url=url, headers=headers).json() bandList_all = [] index = 1 for item in response['data']['band_list']: bandDict = {"tag": "text"} bandList = [] bandDict.update({"text": "No." + str(index) + ":" + item['word']}) bandList.append(bandDict) index += 1 bandList_all.append(bandList) return bandList_all# 发送富文本消息content = FeishuTalk().getHotBand()FeishuTalk().sendFuTextmessage(content)
# 注:富文本消息的content消息体格式如下 [ [ { "tag": "text", "text": "第一行 :" } ], [ { "tag": "text", "text": "第二行:" } ]]
2.3 发送图片消息
发送图片要分为两步:第一步通过上传图片接口获取image_key,第二步将上传的图片发送到群。可参考:发送图片
登录【飞书开放平台-开发者后台】,查看app_id与app_secret,通过接口拿到 tenant_access_token,供上传图片接口使用。可参考:
注:需要开通上传图片权限,入口:开发者后台-权限管理-检索【获取与上传图片或文件资源】;
上传图片获取 image_key ,可参考:上传图片获取image_key
class FeishuTalk: # 应用凭证,获取方式查看上面的步骤 app_id = "cli_a37c6ffbdxxxxxxx" app_secret = "mLstZkv0C4d1sxxxxxxxxxxxxxxx" # 机器人webhook chatGPT_url = 'https://open.feishu.cn/open-apis/bot/v2/hook/XXX' def __init__(self): # 获取tenant_access_token,供上传图片接口使用 url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal" headers = { "Content-Type": "application/json; charset=utf-8", } payload_data = { "app_id": self.app_id, "app_secret": self.app_secret } response = requests.post(url=url, data=json.dumps(payload_data), headers=headers).json() self.token = response['tenant_access_token'] # 发送图片 def uploadImage(self, picturePath): image_key_headers = { 'Authorization': 'Bearer ' + self.token, } # 上传图片获取image_key get_image_key_url = "https://open.feishu.cn/open-apis/im/v1/images" form = {'image_type': 'message', 'image': (open(picturePath, 'rb'))} # 需要替换具体的path multi_form = MultipartEncoder(form) image_key_headers['Content-Type'] = multi_form.content_type response = requests.request("POST", get_image_key_url, headers=image_key_headers, data=multi_form).json() # print(response.headers['X-Tt-Logid']) # for debug or oncall image_key = response['data']['image_key'] print("image_key:", image_key) # 发送图片 url = self.chatGPT_url form = {'msg_type': 'image', 'content': {"image_key": image_key} } headers = { 'Authorization': 'Bearer ' + self.token } response = requests.post(url=url, data=json.dumps(form), headers=headers) return response.json()# 发送图片消息picturePath = "E:\PythonCodes\FeishuTalk\picLibs\1.jpg"FeishuTalk().uploadImage(picturePath)