利用函数计算流式 gz 打包 ECS 上的单个 超大文件

简介: 在某些业务场景下,生成超大的日志文件或者其他文件, 这些文件需要及时移出并 gz 压缩保存到 OSS,但是压缩文件可能会大于 3G 超出函数计算执行环境的最大内存限制, 本文提供流式解决这个问题的方案

背景

在某些业务场景下,生成超大的日志文件或者其他文件, 这些文件需要及时移出并 gz 压缩保存到 OSS,但是压缩文件可能会大于 3G 超出函数计算执行环境的最大内存限制, 本文提供流式解决这个问题的方案

  • 函数计算配置VPC, 内网打通ecs
  • OSS 和 函数计算在相同 region, 内网传输

示例代码

依赖使用第三方库 paramiko, 但是默认的库在传输大文件上有传输速率限制, 需要做如下改造, 同时构造 paramiko.SFTPClient 的时候需要设置好 window_size 和 max_packet_size 这两个参数

image

import paramiko
import gzip
import oss2
import logging
import os
import time

logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

# config
BUCKET_NAME = "oss-demo"
ECS_INNER_IP = "192.168.22.3"
SSH_PORT = 22
USR_NAME = 'xiaoming'
USR_PWD = '123456'

def handler(event, context):
    start = time.time()
    region = context.region
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, 'oss-' + region + '-internal.aliyuncs.com', BUCKET_NAME)

    scp=paramiko.Transport((ECS_INNER_IP, SSH_PORT))
    scp.connect(username=USR_NAME, password=USR_PWD)
    window_size = 2 ** 28
    max_packet_size = 2 ** 26
    sftp=paramiko.SFTPClient.from_transport(scp,window_size=window_size, max_packet_size=max_packet_size)
    
    pos = 0
    CHUNK_SIZE = 1024*1024
    APPEND_SIZE = 128
     
    LOG_FILE = "test-1.log"
    DST_OBJ = 'dst/' + LOG_FILE + '.gz'
    
    data_out = []
    with sftp.open("/root/" + LOG_FILE, "r") as f:
        while 1:
            data=f.read(CHUNK_SIZE)      
            if not data:
                if len(data_out) > 0 and len(data_out) < APPEND_SIZE:
                  result = bucket.append_object(DST_OBJ, pos, b"".join(data_out))
                break    
            data_out.append(gzip.compress(data))
            if len(data_out) == APPEND_SIZE:
                upload_data = b"".join(data_out)
                result = bucket.append_object(DST_OBJ, pos, b"".join(data_out))
                pos += len(upload_data)
                data_out = []
    
    print("total time = ", time.time() - start)
    return "OK"

这个方案, 虽然解决了函数计算内存限制, 但是对于某些超大文件, 比如15G 以上的文件, 10分钟的时间限制又是一个limit

ImproveMent

OSS 支持分片上传功能,那基于分片上传,配合 FC 的弹性伸缩功能, 可以有如下方案:

image

示例代码:

master:

# -*- coding: utf-8 -*-
import logging
import fc2
import oss2
import paramiko
import math
import json
import time
from threading import Thread

logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

SRC_LOG_FILE =  "/root/huge.log"
DST_FILE = "jiahe/push_log_50000_5G.gz"
BUCKET_NAME = "oss-demo"

ECS_INNER_IP = "192.168.2.12"
SSH_PORT = 22
USR_NAME = 'xiaoming'
USR_PWD = '123456'

SUB_LOG = 5*1024*1024*1024  # 交给每个work函数处理的日志大小
PART_SIZE = 16 * 1024 * 1024  # work 函数上传分片的大小, 和work函数大小必须相同

SERVICE_NMAE = "fc_demo"    # work函数所在的service, 最好在同一个service
SUB_FUNCTION_NAME = "worker"  # work函数的名字

parts = []
def sub_gz(fcClient, subevent):
    content=fcClient.invoke_function(SERVICE_NMAE, SUB_FUNCTION_NAME, json.dumps(subevent)).data
    global parts
    #print(content)
    parts.extend(json.loads(content))

def handler(event, context):
    #start = time.time()
    global parts
    parts = []
    region = context.region
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, 'oss-' + region + '-internal.aliyuncs.com', BUCKET_NAME)

    scp=paramiko.Transport((ECS_INNER_IP, SSH_PORT))
    scp.connect(username=USR_NAME, password=USR_PWD)
    sftp=paramiko.SFTPClient.from_transport(scp)
    info = sftp.stat(SRC_LOG_FILE)
    total_size = info.st_size
    print(total_size)
    sftp.close()

    endpoint = "http://{0}.{1}-internal.fc.aliyuncs.com".format(context.account_id, context.region)
    fcClient = fc2.Client(endpoint=endpoint,accessKeyID=creds.accessKeyId,accessKeySecret=creds.accessKeySecret, securityToken=creds.securityToken,Timeout=900)

    threadNum = int(math.ceil(float(total_size)/SUB_LOG))
    key = DST_FILE
    upload_id = bucket.init_multipart_upload(key).upload_id

    part_step = int(math.ceil(float(SUB_LOG)/PART_SIZE))
    ts = []
    left_size = total_size
    for i in range(threadNum):
        part_start = part_step * i + 1
        size = SUB_LOG if SUB_LOG < left_size else left_size
        subEvt = {
          "src": SRC_LOG_FILE,
          "dst": DST_FILE,
          "offset": i * SUB_LOG,
          "size": size,
          "part_number":  part_start, 
          "upload_id" : upload_id,
        }
        print(i, subEvt)
        t = Thread(target=sub_gz, args=(fcClient, subEvt,))
        left_size = left_size - SUB_LOG
        t.start()
        ts.append(t)

    for t in ts:
        t.join()

    parts.sort(key=lambda k: (k.get('part_number', 0)))
    #print(parts)
    part_objs = []
    for part in parts:
        part_objs.append(oss2.models.PartInfo(part["part_number"], part["etag"], size = part["size"], part_crc = part["part_crc"]))
    bucket.complete_multipart_upload(key, upload_id, part_objs)
    #print(time.time() - start)
    return "ok"

worker:

import paramiko
import gzip
import oss2
import logging
import os
import time
import json

logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

# config
BUCKET_NAME = "oss-demo"
ECS_INNER_IP = "192.168.2.12"
SSH_PORT = 22
USR_NAME = 'xiaoming'
USR_PWD = '123456'

CHUNK_SIZE = 1024*1024
PART_SIZE = 16 * CHUNK_SIZE

def handler(event, context):
    region = context.region
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, 'oss-' + region + '-internal.aliyuncs.com', BUCKET_NAME)

    scp=paramiko.Transport((ECS_INNER_IP, SSH_PORT))
    scp.connect(username=USR_NAME, password=USR_PWD)
    window_size = 2 ** 30
    max_packet_size = 2 ** 28
    sftp=paramiko.SFTPClient.from_transport(scp,window_size=window_size, max_packet_size=max_packet_size)

    evt = json.loads(event)
    src_log_file = evt['src']
    offset = evt['offset']
    size = evt['size']
    dst_gz_file = evt['dst']
    part_number = int(evt['part_number'])
    upload_id = evt['upload_id']

    key =  dst_gz_file
    data_out = []
    partsInfo = []

    with sftp.open(src_log_file, "r") as f:
        f.seek(offset)
        while 1:
            data=f.read(CHUNK_SIZE)
            cur = f.tell()
            if not data or (cur > offset + size):
                if data_out:
                    upload_data = b"".join(data_out)
                    size_to_upload = len(upload_data)
                    # 这里有可能出现分片不足100K的情况, 比如你的文件是 15G+1k, 这个时候出现1K漏单的情况或者即使大于100K但是压缩之后小于100K的情况
                    # 对于日志文件, 可以考虑填充点无效的字符在后面
                    if size_to_upload < 100 * 1024:
                        # fill_data=b"\n\n\n\n\n\n\n\n\n\nAliyunFCFill" + os.urandom(102400)
                        fill_data=b"\t\t\t\t\t\t\t\t\t\t\t\n"*1024*1024*5
                        upload_data += gzip.compress(fill_data) # fill_data 压缩后的结果为>100K
                        size_to_upload = len(upload_data)
                    result = bucket.upload_part(key, upload_id, part_number, upload_data)
                    partsInfo.append({
                        "part_number":part_number,
                        "etag":result.etag,
                        "size":size_to_upload,
                        "part_crc":result.crc})
                break
            data_out.append(gzip.compress(data))
            # 16M gz压缩的结果生成一个分片, oss要求一个分片最小为100K(102400), 通常16M压缩后的文件应该大于100K
            if (cur - offset) % PART_SIZE == 0: 
                upload_data = b"".join(data_out)
                size_to_upload = len(upload_data)
                result = bucket.upload_part(key, upload_id, part_number, upload_data)
                partsInfo.append({
                      "part_number":part_number,
                      "etag":result.etag,
                      "size":size_to_upload,
                      "part_crc":result.crc})
                part_number += 1
                data_out = []
    
    return json.dumps(partsInfo)
相关实践学习
通义万相文本绘图与人像美化
本解决方案展示了如何利用自研的通义万相AIGC技术在Web服务中实现先进的图像生成。
7天玩转云服务器
云服务器ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,可降低 IT 成本,提升运维效率。本课程手把手带你了解ECS、掌握基本操作、动手实操快照管理、镜像管理等。了解产品详情:&nbsp;https://www.aliyun.com/product/ecs
目录
相关文章
|
5月前
|
机器学习/深度学习 存储 监控
内部文件审计:企业文件服务器审计对网络安全提升有哪些帮助?
企业文件服务器审计是保障信息安全、确保合规的关键措施。DataSecurity Plus 是由卓豪ManageEngine推出的审计工具,提供全面的文件访问监控、实时异常告警、用户行为分析及合规报告生成功能,助力企业防范数据泄露风险,满足GDPR、等保等多项合规要求,为企业的稳健发展保驾护航。
155 0
|
5月前
|
安全 Linux Shell
使用SCP命令在CentOS 7上向目标服务器传输文件
以上步骤是在CentOS 7系统上使用SCP命令进行文件传输的基础,操作简洁,易于理解。务必在执行命令前确认好各项参数,尤其是目录路径和文件名,以避免不必要的传输错误。
537 17
|
5月前
|
自然语言处理 Unix Linux
解决服务器中Jupyter笔记本的文件名字符编码问题
通过上述步骤,可以有效解决Jupyter笔记本的文件名字符编码问题,确保所有文件能在服务器上正常访问并交互,避免因编码问题引起的混淆和数据丢失。在处理任何编码问题时,务必谨慎并确保备份,因为文件名变更是
201 17
|
5月前
|
安全 Linux 网络安全
Python极速搭建局域网文件共享服务器:一行命令实现HTTPS安全传输
本文介绍如何利用Python的http.server模块,通过一行命令快速搭建支持HTTPS的安全文件下载服务器,无需第三方工具,3分钟部署,保障局域网文件共享的隐私与安全。
1081 0
|
8月前
|
Python
使用Python实现multipart/form-data文件接收的http服务器
至此,使用Python实现一个可以接收 'multipart/form-data' 文件的HTTP服务器的步骤就讲解完毕了。希望通过我的讲解,你可以更好地理解其中的逻辑,另外,你也可以尝试在实际项目中运用这方面的知识。
383 69
|
Java
java小工具util系列5:java文件相关操作工具,包括读取服务器路径下文件,删除文件及子文件,删除文件夹等方法
java小工具util系列5:java文件相关操作工具,包括读取服务器路径下文件,删除文件及子文件,删除文件夹等方法
248 9
|
9月前
|
JavaScript 应用服务中间件 nginx
Vue项目部署:如何打包并上传至服务器进行部署?
以上就是Vue项目打包及部署的方法,希望对你有所帮助。描述中可能会有一些小疏漏,但基本流程应该没有问题。记住要根据你的实际情况调整对应的目录路径和服务器IP地址等信息。此外,实际操作时可能会遇到各种问题,解决问题的能力是每一位开发者必备的技能。祝你部署顺利!
1892 17
|
9月前
|
前端开发 Cloud Native Java
Java||Springboot读取本地目录的文件和文件结构,读取服务器文档目录数据供前端渲染的API实现
博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
Java||Springboot读取本地目录的文件和文件结构,读取服务器文档目录数据供前端渲染的API实现
|
9月前
|
存储 关系型数据库 分布式数据库
|
11月前
|
存储 安全 网络安全
服务器感染了.baxia勒索病毒,如何确保数据文件完整恢复?
近年来,勒索病毒如.baxia不断演变,利用漏洞、社交工程等手段加密文件,威胁范围扩大。加密货币的兴起使其支付方式更匿名,追踪困难。技术支持尤为重要,添加技术服务号(shuju315),专业团队提供数据恢复方案。面对复杂解密要求,包括赎金支付、个人信息提供和执行特定操作,需保持冷静并寻求帮助。防御措施包括加强安全意识、定期备份数据、安装杀毒软件、避免未知文件、更新系统及制定应急响应计划。
482 11

热门文章

最新文章

相关产品

  • 函数计算