社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

Python 轻量级日志解决方案,告警日志主动发送飞书消息

Python猫 • 2 年前 • 338 次点击  

△点击上方“Python猫”关注 ,回复“1”领取电子书

作者:二两

来源:懒编程

前言

日志,几乎每个程序都需要有的功能,对于很多比较大型的,多人合作的程序,使用专业的日志解决方案,比如 fluentd,是个不错的选择。

fluentd 就是重了点,你需要搭建 fluentd 服务,然后不同的应用再通过相应的方式将日志信息传导 fluentd 服务中,当然重的好处是强大,它可以兼容多个语言,只有你的 client 实现好就行,还可以在日志传输管道中加入各种 hook,比如某个带关键字的日志要执行某种操作等等。

我的程序比较轻,之前都是运维同学搭建好了 fluentd+ES 一套日志管理系统,现在要自己弄,有点麻烦,所以决定使用其他方式来实现日志的管理。

先列一下我简单的需求:

  • 1. 日志可以存入文件(最基本要求)
  • 2. 日志可以存入 MongoDB(方便搜索分析)
  • 3. 报警日志可以主动告警(方便我及时修复)

日志存文件

首先来实现前两个功能,利用 Python 自带的 logging 便可以实现将日志内容存入文件的功能,代码如下:

import time
import logging
import logging.handlers

LOG_FILENAME = 'main.log'
logger = logging.getLogger()


def set_logger():
    logger.setLevel(logging.INFO)

    formatter = logging.Formatter('%(asctime)s - %(process)d-%(threadName)s - '
                                  '%(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    # log output to file
    file_handler = logging.handlers.RotatingFileHandler(
        LOG_FILENAME, maxBytes=10485760, backupCount=5, encoding="utf-8")
    logger.addHandler(file_handler)


set_logger()

logging 模块标准的写法,利用 logging 的 handler 功能实现格式化,同样利用 handler 功能,将日志存入到本地文件中。

日志存 MongoDB

使用 log4mongo 库,可以让你的 logging 无缝将日志存入到 MongoDB 中,log4mongo 提供了符合 logging 调用格式的 Handler,直接使用则可,代码如下:

import time
import logging
import logging.handlers
from log4mongo.handlers import MongoHandler
from logging import *

LOG_FILENAME = 'main.log'
logger = logging.getLogger()


def set_logger(mongodb=False):
    logger.setLevel(logging.INFO)

    formatter = logging.Formatter('%(asctime)s - %(process)d-%(threadName)s - '
                                  '%(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    if mongodb:
        # log output to mongodb
        db_name = 'bestpitcher_log'
        mon_handler = MongoHandler(host=mongodb_config['host'],
                                   port=int(mongodb_config['port']),
                                   database_name=db_name,
                                   # username=mongodb_config['user'],
                                   # password=mongodb_config['password'],
                                   # authentication_db=db_name
                                   )
        mon_handler.setLevel(logging.INFO)
        logger.addHandler(mon_handler)
    else:
        # log output to file
        file_handler = logging.handlers.RotatingFileHandler(
            LOG_FILENAME, maxBytes=10485760, backupCount=5, encoding="utf-8")
        logger.addHandler(file_handler)


set_logger(mongodb=True)

实例化 MongoHandler,获得相应的 handler,然后添加到 logger 中,便实现了将日志写入 MongoDB 的效果,如下图:

日志报警

项目代码在阿里云上运行,阿里云提供了对日志文件进行监控并通过钉钉报警的功能,跟其他组同事交流,他不希望不是自己组里的项目也使用他这套,会显得很乱,至于会不会乱,不纠结,既然人家不想我这样搞,那就自己搞。

简单调用,使用飞书的 WebHook 机器人可以非常轻松的实现日志推送报警的功能。

在开始编写前,要理清飞书机器人的概念,飞书中其实有两种机器人,如果你通过【飞书机器人】去搜索,就会有点懵。

飞书中,每个群组可以设置一个 WebHook 机器人,这个使用个人版飞书便可以直接使用,非常方便,我们的日志监控就利用 WebHook 机器人,其添加方式如下:

1. 先创建一个群,然后点击设置,然后点击【群机器人】,然后点击【添加机器人】


2. 简单配置 WebHook 机器人

从配置就可以看出,WebHook 机器人的工作原理,通过 HTTP 请求机器人的 webhook 地址,请求数据的格式符合 webhook 文档定义的格式变可以请求成功了。

为了安全,我这里还开启了【签名校验】,即如果你通过中间人攻击抓我的包,包中的内容是加密的,而我的后端程序会使用这个签名校验秘钥对加密内容进行解析,获得真实数据,与 WebHook 交互代码如下:

import base64
import hashlib
import hmac
from datetime import datetime

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util import Retry
import hashlib
import base64
from Crypto.Cipher import AES


from configs import *

timestamp = int(datetime.now().timestamp())


class AESCipher(object):
    def __init__(self, key):
        self.bs = AES.block_size
        self.key = hashlib.sha256(AESCipher.str_to_bytes(key)).digest()

    @staticmethod
    def str_to_bytes(data):
        u_type = type(b"".decode('utf8'))
        if isinstance(data, u_type):
            return data.encode('utf8')
        return data

    @staticmethod
    def _unpad(s):
        return s[:-ord(s[len(s) - 1:])]

    def decrypt(self, enc):
        iv = enc[:AES.block_size]
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        return self._unpad(cipher.decrypt(enc[AES.block_size:]))

    def decrypt_string(self, enc):
        enc = base64.b64decode(enc)
        return self.decrypt(enc).decode('utf8')


class BaseBot:

    def __init__(self):
        self.session = requests.Session()
        # 设置重试
        self.session.mount('https://', HTTPAdapter(
            max_retries=Retry(total=5, method_whitelist=frozenset(['GET''POST']))))

    def  gen_sign(self, secret):
        # 拼接时间戳以及签名校验
        string_to_sign = '{}\n{}'.format(timestamp, secret)

        # 使用 HMAC-SHA256 进行加密
        hmac_code = hmac.new(
            string_to_sign.encode("utf-8"), digestmod=hashlib.sha256
        ).digest()

        # 对结果进行 base64 编码
        sign = base64.b64encode(hmac_code).decode('utf-8')

        return sign


class BaseMsgBot(BaseBot):
    def __init__(self):
        super(BaseMsgBot, self).__init__()

    def send_base_msg(self, msg):
        """
        发送基本的信息
        :return:
        """

        sign = self.gen_sign(WEBHOOK_SECRET)
        params = {
            "timestamp": timestamp,
            "sign": sign,
            "msg_type""text",
            "content": {"text": msg}
        }

        resp = requests.post(WEBHOOK_URL, json=params)
        resp.raise_for_status()
        result = resp.json()
        if result.get("code"and result.get("code") != 0:
            print(f"发送失败:{result['msg']}")
            return
        print("消息发送成功")


if __name__ == '__main__':
    BaseMsgBot().send_base_msg('懒编程YYDS!')

效果如下:

WebHook 机器人是不与我们的后端程序交互的,即无法实现,我发一段指令给他,他执行相应动作这样的效果,但对于单纯的日志监控,WebHook 够用了。

飞书中另外一种机器人是需要通过创建机器人应用的方式创建,这种机器人不在群组里,而是在工作台中,比如下图我创建了自己的应用机器人。


要创建这种机器人,需要企业版飞书,因为机器人获取消息、发送消息的功能需要申请相应的权限,当然,还有国内惯例,通过 APPSECRET 换取 2 小时后会过期的 access_token,这个我也弄了,因为我喜欢通过飞书机器人控制程序的一下动作,比如从日志机器人中发现了严重报错,但日志机器人无法控制程序,而我人在外面,此时可以通过应用机器人执行一些动作。

飞书 WebHook 机器人对接完了,那怎么与 logging 结合在一起使用呢?

因为我已有的项目中已经大量的使用 logger 了,我不希望去逐行修改使用 logger 的方式,而是希望通过某种对 logger 无感的方式来实现日志传递到 WebHook 的效果。

简单阅读 logging 文档,发现没有 Hook 机制,没办法,只能看 logging 源码走继承重写的解决方案了。

这里可以总结一下我对库修改的方式,如果一个库,没有我想要的功能(通过文档判断),我就会去看它的源代码,然后尝试将核心类通过继承的方式弄出来,然后再在继承出的子类中添加自己的逻辑。

简单分析,会发现 logger 下,使用的 info、warning、error 等方法,都会调用_log 方法,_log 方法会进一步执行相应的动作,这些动作我不关心,因为我会通过 super 方法直接使用。

琢磨一下自己的需求,对于 info 基本的日志,当然不需要日志报警,简单记录到 MongoDB 中就好了,对于 error 级别日志,报错了嘛,当然希望主动告诉我,但有时 info 基本,我也希望它主动告诉我,基于上述分析,写出如下代码:

import logging
import logging.handlers
from logging import *

LOG_FILENAME = 'main.log'
LOG_LEVEL = ERROR


class MyLogger(Logger):

    def __init__(self, name, level=NOTSET):
        super(MyLogger, self).__init__(name=name, level=level)

    def _log(
            self,
            level,
            msg,
            args,
            exc_info=None,
            extra=None,
            stack_info=False,
            robot=False
    )
 -> None:

        """

        :param level:
        :param msg:
        :param args:
        :param exc_info:
        :param extra:
        :param stack_info:
        :param robot: 是否要通过飞书机器人将日志发送到飞书上
        :return:
        """

        super(MyLogger, self)._log(level, msg, args, exc_info, extra, stack_info)
        if robot or level >= LOG_LEVEL:
            msg_bot.send_base_msg(msg)

    def __reduce__(self):
        return getLogger, ()


logger = MyLogger('bestpitcher_log', WARNING)

上述代码中,实现 MyLogger 基础于 Logger,然后重写其中的_log 方法,_log 方法中,第一件事便是通过 super 调用父类中_log 方法的逻辑,然后再添加自己的逻辑,即发送信息到飞书 webhook 的逻辑。

_log 方法中,我添加了 robot 参数,如果打印日志时,设置了 robot,则发送到 webhook,此外还有默认日志级别,这里是 ERROR 级别,即 error 日志,就算 robot 为 False,也会主动发送日志到 webhook 中。

简单测试使用一下:

from logger import logger

def test_logger():
    logger.info('[info] 这条日志只会记录在MongoDB中')
    # exc_info 获得报错时的调用链
    logger.error('[error] 这条日志会发送到WebHook机器人上', exc_info=True)
    logger.warning('[warning] 这条日志也会发送到WebHook', robot=True)


if __name__ == '__main__':
    test_logger()

WebHook 效果:

MongoDB 效果:

结尾

项目 Github 地址:https://github.com/ayuLiao/simple-logger
Python猫技术交流群开放啦!群里既有国内一二线大厂在职员工,也有国内外高校在读学生,既有十多年码龄的编程老鸟,也有中小学刚刚入门的新人,学习氛围良好!想入群的同学,请在公号内回复『交流群』,获取猫哥的微信(谢绝广告党,非诚勿扰!)~


还不过瘾?试试它们




Python 的元类设计起源自哪里?

基于 Python 探针完成调用库的数据提取

与 Python 之父聊天:更快的 Python!

2021年,你应该知道的Python打包指南

Python 为什么要有 pass 语句?

编程语言之问:何时该借用,何时该创造?


如果你觉得本文有帮助
请慷慨分享点赞,感谢啦
Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/125607
 
338 次点击