Python中日志异步发送到远程服务器

简介: 在Python中使用日志最常用的方式就是在控制台和文件中输出日志了,logging模块也很好的提供的相应 的类,使用起来也非常方便,但是有时我们可能会有一些需求,如还需要将日志发送到远端,或者直接写入数 据库,这种需求该如何实现呢?

背景

在Python中使用日志最常用的方式就是在控制台和文件中输出日志了,logging模块也很好的提供的相应 的类,使用起来也非常方便,但是有时我们可能会有一些需求,如还需要将日志发送到远端,或者直接写入数 据库,这种需求该如何实现呢?

StreamHandler和FileHandler

# -*- coding: utf-8 -*-
"""
-------------------------------------------------
 File Name:   loger
 Description :
 Author :    yangyanxing
 date:     2020/9/23
-------------------------------------------------
"""
import logging
import sys
import os
# 初始化logger
logger = logging.getLogger("yyx")
logger.setLevel(logging.DEBUG)
# 设置日志格式
fmt = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d
%H:%M:%S')
# 添加cmd handler
cmd_handler = logging.StreamHandler(sys.stdout)
cmd_handler.setLevel(logging.DEBUG)
cmd_handler.setFormatter(fmt)
# 添加文件的handler
logpath = os.path.join(os.getcwd(), 'debug.log')
file_handler = logging.FileHandler(logpath)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)
# 将cmd和file handler添加到logger中
logger.addHandler(cmd_handler)
logger.addHandler(file_handler)
logger.debug("今天天气不错")

先初始化一个logger, 并且设置它的日志级别是DEBUG,然后添初始化了 cmd_handler和 file_handler,最后将它们添加到logger中, 运行脚本,会在cmd中打印出

[2020-09-23 10:45:56] [DEBUG] 今天天气不错

添加HTTPHandler

# 添加一个httphandler
import logging.handlers
http_handler = logging.handlers.HTTPHandler(r"127.0.0.1:1987", '/api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天气不错")

结果在服务端我们收到了很多信息

{
'name': [b 'yyx'],
'msg': [b
'\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
'args': [b '()'],
'levelname': [b 'DEBUG'],
'levelno': [b '10'],
'pathname': [b 'I:/workplace/yangyanxing/test/loger.py'],
'filename': [b 'loger.py'],
'module': [b 'loger'],
'exc_info': [b 'None'],
'exc_text': [b 'None'],
'stack_info': [b 'None'],
'lineno': [b '41'],
'funcName': [b '<module>'],
'created': [b '1600831054.8881223'],
'msecs': [b '888.1223201751709'],
'relativeCreated': [b '22.99976348876953'],
'thread': [b '14876'],
'threadName': [b 'MainThread'],
'processName': [b 'MainProcess'],
'process': [b '8648'],
'message': [b
'\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
'asctime': [b '2020-09-23 11:17:34']
}

可以说是信息非常之多,但是却并不是我们想要的样子,我们只是想要类似于

[2020-09-23 10:45:56][DEBUG] 今天天气不错

logging.handlers.HTTPHandler 只是简单的将日志所有信息发送给服务端,至于服务端要怎么组织内 容是由服务端来完成. 所以我们可以有两种方法,一种是改服务端代码,根据传过来的日志信息重新组织一 下日志内容, 第二种是我们重新写一个类,让它在发送的时候将重新格式化日志内容发送到服务端。

我们采用第二种方法,因为这种方法比较灵活, 服务端只是用于记录,发送什么内容应该是由客户端来决定。

我们需要重新定义一个类,我们可以参考 logging.handlers.HTTPHandler 这个类,重新写一个httpHandler类

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  def emit(self, record):
    '''
   重写emit方法,这里主要是为了把初始化时的baseParam添加进来
   :param record:
   :return:
   '''
    msg = self.format(record)
    if self.method == "GET":
      if (self.url.find("?") >= 0):
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
msg}))
      requests.get(url, timeout=1)
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Content-length": str(len(msg))
     }
      requests.post(self.url, data={'log': msg}, headers=headers,
timeout=1)

这行代码表示,将会根据日志对象设置的格式返回对应的内容。

{'log': [b'[2020-09-23 11:39:45] [DEBUG]
\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99']}

将bytes类型转一下就得到了

[2020-09-23 11:43:50] [DEBUG] 今天天气不错

异步的发送远程日志

async def post(self):
  print(self.getParam('log'))
  await asyncio.sleep(5)
  self.write({"msg": 'ok'})

此时我们再打印上面的日志

logger.debug("今天天气不错")
logger.debug("是风和日丽的")

得到的输出为

[2020-09-23 11:47:33] [DEBUG] 今天天气不错
[2020-09-23 11:47:38] [DEBUG] 是风和日丽的

那么现在问题来了,原本只是一个记录日志,现在却成了拖累整个脚本的累赘,所以我们需要异步的来 处理远程写日志。

1

使用多线程处理

def emit(self, record):
  msg = self.format(record)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = '&'
    else:
      sep = '?'
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    t = threading.Thread(target=requests.get, args=(url,))
    t.start()
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
      "Content-length": str(len(msg))
   }
    t = threading.Thread(target=requests.post, args=(self.url,), kwargs=
{"data":{'log': msg},

2

使用线程池处理

python 的 concurrent.futures 中有ThreadPoolExecutor, ProcessPoolExecutor类,是线程池和进程池, 就是在初始化的时候先定义几个线程,之后让这些线程来处理相应的函数,这样不用每次都需要新创建线程

exector = ThreadPoolExecutor(max_workers=1) # 初始化一个线程池,只有一个线程
exector.submit(fn, args, kwargs) # 将函数submit到线程池中
exector = ThreadPoolExecutor(max_workers=1)
def emit(self, record):
  msg = self.format(record)
  timeout = aiohttp.ClientTimeout(total=6)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = '&'
    else:
      sep = '?'
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    exector.submit(requests.get, url, timeout=6)
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
      "Content-length": str(len(msg))
   }
    exector.submit(requests.post, self.url, data={'log': msg},
headers=headers, timeout=6)

3

使用异步aiohttp库来发送请求

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  async def emit(self, record):
    msg = self.format(record)
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if (self.url.find("?") >= 0):
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
msg}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
      async with session.get(self.url) as resp:
          print(await resp.text())
      else:
        headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Content-length": str(len(msg))
     }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers)
as session:
      async with session.post(self.url, data={'log': msg}) as resp:
          print(await resp.text())

这时代码执行崩溃了

C:\Python37\lib\logging\__init__.py:894: RuntimeWarning: coroutine
'CustomHandler.emit' was never awaited
self.emit(record)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

究其原因是由于emit方法中使用 async with session.post 函数,它需要在一个使用async 修饰的函数 里执行,所以修改emit函数,使用async来修饰,这里emit函数变成了异步的函数, 返回的是一个 coroutine 对象,要想执行coroutine对象,需要使用await, 但是脚本里却没有在哪里调用 await emit() ,所以崩溃信息 中显示 coroutine 'CustomHandler.emit' was never awaited。

async def main():
  await logger.debug("今天天气不错")
  await logger.debug("是风和日丽的")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

执行依然报错

raise TypeError('An asyncio.Future, a coroutine or an awaitable is '

这似乎就没有办法了,想要使用异步库来发送,但是却没有可以调用await的地方。

import asyncio
async def test(n):
 while n > 0:
   await asyncio.sleep(1)
   print("test {}".format(n))
   n -= 1
 return n

async def test2(n):
 while n >0:
   await asyncio.sleep(1)
   print("test2 {}".format(n))
   n -= 1
def stoploop(task):
 print("执行结束, task n is {}".format(task.result()))
 loop.stop()
loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task2 = loop.create_task(test2(3))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
loop.run_forever()

注意看上面的代码,我们并没有在某处使用await来执行协程,而是通过将协程注册到某个事件循环对象上, 然后调用该循环的 run_forever() 函数,从而使该循环上的协程对象得以正常的执行。

test 5
test2 3
test 4
test2 2
test 3
test2 1
test 2
test 1
执行结束, task n is 0

可以看到,使用事件循环对象创建的task,在该循环执行run_forever() 以后就可以执行了如果不执行 loop.run_forever() 函数,则注册在它上面的协程也不会执行

loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
time.sleep(5)
# loop.run_forever()
loop = asyncio.get_event_loop()
class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  # 使用aiohttp封装发送数据函数
  async def submit(self, data):
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if self.url.find("?") >= 0:
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
data}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as resp:
          print(await resp.text())
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
     }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers)
as session:
        async with session.post(self.url, data={'log': data}) as resp:
          print(await resp.text())
    return True
  def emit(self, record):
    msg = self.format(record)
    loop.create_task(self.submit(msg))
# 添加一个httphandler
http_handler = CustomHandler(r"http://127.0.0.1:1987", 'api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天气不错")
logger.debug("是风和日丽的")
loop.run_forever()

loop.create_task(self.submit(msg)) 也可以使用

asyncio.ensure_future(self.submit(msg), loop=loop) 来代替,目的都是将协程对象注册到事件循环中。

但这种方式有一点要注意,loop.run_forever() 将会一直阻塞,所以需要有个地方调用 loop.stop() 方法. 可以注册到某个task的回调中。

相关实践学习
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
相关文章
|
5月前
|
人工智能 JavaScript API
零基础构建MCP服务器:TypeScript/Python双语言实战指南
作为一名深耕技术领域多年的博主摘星,我深刻感受到了MCP(Model Context Protocol)协议在AI生态系统中的革命性意义。MCP作为Anthropic推出的开放标准,正在重新定义AI应用与外部系统的交互方式,它不仅解决了传统API集成的复杂性问题,更为开发者提供了一个统一、安全、高效的连接框架。在过去几个月的实践中,我发现许多开发者对MCP的概念理解透彻,但在实际动手构建MCP服务器时却遇到了各种技术壁垒。从环境配置的细节问题到SDK API的深度理解,从第一个Hello World程序的调试到生产环境的部署优化,每一个环节都可能成为初学者的绊脚石。因此,我决定撰写这篇全面的实
1156 67
零基础构建MCP服务器:TypeScript/Python双语言实战指南
|
2月前
|
监控 安全 程序员
Python日志模块配置:从print到logging的优雅升级指南
从 `print` 到 `logging` 是 Python 开发的必经之路。`print` 调试简单却难维护,日志混乱、无法分级、缺乏上下文;而 `logging` 支持级别控制、多输出、结构化记录,助力项目可维护性升级。本文详解痛点、优势、迁移方案与最佳实践,助你构建专业日志系统,让程序“有记忆”。
273 0
|
5月前
|
数据采集 存储 JSON
Python爬取知乎评论:多线程与异步爬虫的性能优化
Python爬取知乎评论:多线程与异步爬虫的性能优化
|
5月前
|
数据采集 存储 C++
Python异步爬虫(aiohttp)加速微信公众号图片下载
Python异步爬虫(aiohttp)加速微信公众号图片下载
|
3月前
|
运维 监控 安全
EventLog Analyzer:高效的Web服务器日志监控与审计解决方案
ManageEngine EventLog Analyzer是一款企业级Web服务器日志监控与审计工具,支持Apache、IIS、Nginx等主流服务器,实现日志集中管理、实时威胁检测、合规报表生成及可视化分析,助力企业应对安全攻击与合规挑战,提升运维效率。
246 0
|
4月前
|
人工智能 自然语言处理 安全
Python构建MCP服务器:从工具封装到AI集成的全流程实践
MCP协议为AI提供标准化工具调用接口,助力模型高效操作现实世界。
896 1
|
5月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
5月前
|
安全 Linux 网络安全
Python极速搭建局域网文件共享服务器:一行命令实现HTTPS安全传输
本文介绍如何利用Python的http.server模块,通过一行命令快速搭建支持HTTPS的安全文件下载服务器,无需第三方工具,3分钟部署,保障局域网文件共享的隐私与安全。
1140 0
|
7月前
|
人工智能 安全 Shell
Jupyter MCP服务器部署实战:AI模型与Python环境无缝集成教程
Jupyter MCP服务器基于模型上下文协议(MCP),实现大型语言模型与Jupyter环境的无缝集成。它通过标准化接口,让AI模型安全访问和操作Jupyter核心组件,如内核、文件系统和终端。本文深入解析其技术架构、功能特性及部署方法。MCP服务器解决了传统AI模型缺乏实时上下文感知的问题,支持代码执行、变量状态获取、文件管理等功能,提升编程效率。同时,严格的权限控制确保了安全性。作为智能化交互工具,Jupyter MCP为动态计算环境与AI模型之间搭建了高效桥梁。
549 2
Jupyter MCP服务器部署实战:AI模型与Python环境无缝集成教程

推荐镜像

更多