模拟微信第一篇,nodejs搭建一套高性能分布式的在线文件服务

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
日志服务 SLS,月写入数据量 50GB 1个月
全局流量管理 GTM,标准版 1个月
简介: 模拟微信第一篇,nodejs搭建一套高性能分布式的在线文件服务。深度好文实战好文连载:手把手教对象从零开始,开发一款社交通讯APP

标题:模拟微信第一篇,nodejs搭建一套高性能分布式的在线文件服务。

引言:

1、前言

对象入手了前端开发,为了让她对程序员工作有更深刻理解,准备展示一套前后端兼具的模拟微信开发。微信开发少不了在线文件服务(图片,头像,音乐,视频……),本想省事儿,直接采用FastDFS,或者是腾讯云的对象存储服务。奈何要考虑到从零开始,一步一个脚印的渲染,遂自己动手干吧。

其实用Java,或者golang来实现的话,操作起来更容易。但是呢,还是担心她可能看不懂原生后端语言,毕竟前端开发能看懂的语言就只有JavaScript了,故采用了Nodejs。为了操作的高效性与直观性,不会引入任何web依赖框架(例如:express,egg……),利用NodeJs自带的http模块纯手动实现。

埋个伏笔,建议点个关注,后面还有模拟微信通讯开发的奉献,包括服务端的数据接口,全栈javaScript来实现

2、高性能

成熟的在线文件服务,肯定需要高性能支撑的,每个客户端每时每刻都有文件资源交互的场景。文件传输的数据包要远大于API数据,所以对服务的性能要求较高,不能出现因为下载大文件而导致后续请求阻塞卡顿的场景。

2.1、Nodejs主线程

Node的创始人认为,Web服务器的高并发性能关键点在于事件驱动和异步I/O,并不是线程数的多少。Google发明的Chrome浏览器,使用V8引擎来解析javaScript程序,恰好满足了他的这两点需求,于是他把V8引擎移植到了服务器端,构建了基于服务端运行的JavaScript环境,成就了Nodejs,目的是花最小的硬件成本,追求更高的并发性能。

在大多数web服务模型中,使用的都是多线程来解决并发的问题。像用得最为广泛的Java SpringMVC,每个http请求对应着单独的线程。而每一个客户端连接创建的一个线程,需要耗费1M的内存(jdk默认值),也就是说,理论分析一个8G的服务器可以同时支持连接用户数小于8000,(除了线程创建,还有线程内其他资源开销)

但是Nodejs使用一个主线程处理所有请求,利用异步I/O(也称非阻塞I/O)和事件驱动。省去了创建线程的资源开销,理论分析一个8G的服务器可以同时支持连接用户数为3万~4万。

2.1.1、多线程同步I/O

多线程同步阻塞I_O.jpg

并发请求多线程的特点:

虽然充分利用着硬件多核CPU的优势,但是对线程生命周期的管理和上下文的切换,从微观角度看,也是耗费效率的一环节。

2.1.2、单线程异步I/O

单线程异步非阻塞I_O.jpg

在传统的线程处理机制中,因为同步串行的原因,I/O会阻塞代码的执行;

Nodejs采用的非阻塞I/O,把异步操作丢给了底层的I/O线程池,使得主线程永远在执行计算操作,CPU的核心利用率永远是满状态,主线程通过事件轮询机制与IO线程池交互得到异步数据,当某个I/O执行完成后,会以事件通知的形式执行回调函数

故此,多线程同步I/O的web模型,并不一定比单线程异步I/O的web模型性能好,而且前者对于资源环境的要求很高。

但是Node只是解决了I/O的交互瓶颈,并没有提高I/O速度,只是资源调度的效率提升,要想解决I/O速度,还是应该升级服务器硬件环境,例如把硬盘换成SSD。

2.1.3、局限性

随着硬件资源的更新发展,全面升级为多核CPU,Node单线程模式下的上限就是单个CPU被打满了,无法享受到多核CPU下并发的处理,Nodejs通过提供clusterchild_process API 以牺牲内存,来创建子进程的方式来赋予Node应用程序进一步提升高并发的能力。

2.2、Node Cluster

为了充分发挥多核CPU的优势,Nodejs提供了cluster模块。允许设立一个master进程,和若干个worker进程,由master进程负责监控和协调worker进程的相关状态。各个进程之间,变量和资源引用相对独立。

理想状态下:master数(1) + worker数 = cpu数。充分利用每一核
const cluster = require('cluster');
const cpus = require('os').cpus().length;

if (cluster.isMaster) {
    for (let i = 1; i < cpus; i++) {
        cluster.fork();
    }
    cluster.on('online', function (newWorker) {
        console.log('new worker online: ' + newWorker.id)
    });
} else {
    console.log('worker内部,执行逻辑')
}

如上所示:获取cpu的核数,然后master循环依次fork出worker进程,fork出来的工作进程继续运行当前代码,但是cluster.isMasterfalse,所以会执行到else里面。

因为进程之间的资源不共享,也可以根据业务逻辑,再fork工作进程的时候,为其设置环境变量。

2.2.1、进程通信

worker之间,采用进程间通信交换信息。通信方式有点特殊:

  • worker进程调用send函数,主动发送数据包;
  • master进程会接收到该数据包,然后遍历当前worker数量,广播调用send函数转发原数据包;
  • 每个worker都会接收到该数据包,然后执行相关逻辑;
  • 也可以,在master中,指定具体的worker来发送,由被指定的worker自己接收(当然,这种场景少,基本都是广播);
  • 相当于master是一个调度的中转站,worker之间无法直接通信;

如下所示:

  1. 1号worker主动发送了一条refresh的数据包;
  2. master接收到该数据包后,遍历当前的worker集合,选出3号worker,原样转发给它;
  3. 3号worker接收到数据包后,解析,并且执行对应逻辑;
  4. 相当于3号worker是自己发给自己的,但必须经过master调度;
const cluster = require('cluster');
const cpus = require('os').cpus().length;

if (cluster.isMaster) {
    for (let i = 1; i < cpus; i++) {
        let env = Object.assign({}, process.env, {workerId: i})
        let worker = cluster.fork(env);
        worker.w_id = i
    }
    /**
     * 2、master负责接收通信消息,
     */
    cluster.on('message', (workers, msg) => {
        /**
         * 3、将原消息定向推给3号worker
         */
        for (const id in cluster.workers) {
            if (cluster.workers[id].w_id == 3) {
                cluster.workers[id].send(msg, err => {
                    if (err) {
                        console.log('master send message error')
                    }
                })
            }
        }
    })
}
/**
 * 1、由1号worker进程主动通信
 */
if (process.env.workerId == 1) {
    cluster.worker.send({
        cmd: 'refresh',
        data: {host: '127.0.0.1'}
    }, err => {
        if (err) {
            console.log(`send refresh error, workerId: ${process.env.workerId}`)
        }
    })
}
/**
 * 4、3号worker接收通信消息
 */
if (process.env.workerId == 3) {
    cluster.worker.on('message', msg => {
        if (msg.cmd == 'refresh') {
            console.log(`receive refresh:${JSON.stringify(msg.data)} `)
        }
    })
}
进程之间通信的方式,可以有效解决单节点模式下资源不共享的问题,例如:

1、单机使用内存的限流,需要共享进程之间的请求计数;

2、单机设置IP黑名单的添加与移除,也要实时同步……

2.2.2、负载分发

cluster模块内置一个负载均衡器,默认采用Round-robin算法,由master负责分发,协调各个worker之间的负载。http服务运行时,master负责监听IP/端口,然后将接收到的请求根据负载算法分配给对应worker进程处理,这段话在官方文档写的很清楚了:

如下所示:
Node_master进程分发方式.png

  1. 根据当前机器的CPU核数,fork出CPU核数 - 1的worker进程;
  2. worker进程负责处理http请求;
  3. 这里写法虽然是在worker中,开启了http服务,监听端口,但其实底层实现只有master负责监听IP/端口,然后请求交给worker处理;
  4. 不然的话,这么多进程监听同一个端口,肯定会报错端口已被占用的;
const cluster = require('cluster');
const http = require('http');
const cpus = require('os').cpus().length;

if (cluster.isMaster) {
    for (let i = 1; i < cpus; i++) {
        cluster.fork()
    }
} else {
    http.createServer().on('request', (request, response) => {

        console.log("接收请求");
        console.log(JSON.stringify(request.headers));

        let body = [];
        request.on('data', chunk => {
            body.push(chunk);
        });
        request.on('end', () => {
            console.dir(body.toString());
        });
        response.writeHead(200);
        response.end('OK\n');

    }).listen(8000);
}

3、在线文件服务

一款成熟的在线文件服务,需要提供文件上传,预览,下载的三大功能,从http协议的层面来分析,不管是哪一项功能,都是文件数据报文的传输,没有什么特殊的操作,区别就在于这三者的请求&响应,头信息各不一致。

3.3、文件上传

从本质上来分析,文件上传的主体功能是,客户端获取到文件后,将文件内容以数据包的形式发送给了后端。后端解析数据包,通过I/O流的方式,将文件以原样的名称和格式写入服务器本地硬盘。如何从数据包中提取文件名和文件格式,才是核心所在。

3.3.1、报文抓包

  • 搭建一个简单的http服务,无需提供文件上传的处理功能,只要保证端口能通;
  • postman发起post请求,body类型选择file,造两个file,一个上传简单的文本文件(checkTls.sh),另一个为空;
  • wireshark抓包如下:
POST /upload HTTP/1.1
User-Agent: PostmanRuntime/7.28.4
Accept: */*
Postman-Token: a786fccc-5d6e-4042-9d58-82c41dbbf510
Host: 127.0.0.1:80
Accept-Encoding: gzip, deflate, br
Connection: keep-alive
Content-Type: multipart/form-data; boundary=--------------------------242891718799640360995416
Content-Length: 1174

----------------------------242891718799640360995416
Content-Disposition: form-data; name=""; filename="checkTls.sh"
Content-Type: application/x-sh

#!/bin/bash

# ......https...............
if [ $# -ne 1 ]; then
   echo "................................. ...... /check_ssl.sh www.baidu.com"
else
   #...............host
   host=$1
   #..................
   #end_date=`echo |openssl s_client -servername $host  -connet $host:443 2nssl x509 -noout -dates|awk -F '=' '/notAfter/{print $2}'`
    #....................................
    end_data=`date +%s -d "$(echo |openssl s_client -servername $host  -connect $host:443 2>/dev/null | openssl x509 -noout -dates|awk -F '=' '/notAfter/{print $2}')"`
    #...............
    new_date=$(date +%s)
    #......SSL....................................
    #......SSL..........................................
    days=$(expr $(expr $end_data - $new_date) / 86400)
    echo -e "\033[31m ......$host ...... ......${days}.........  ............... \033[0m"
fi
----------------------------242891718799640360995416
Content-Disposition: form-data; name=""; filename=""


----------------------------242891718799640360995416--
HTTP/1.1 200 OK
Date: Thu, 20 Oct 2022 15:19:11 GMT
Connection: keep-alive
Keep-Alive: timeout=5
Content-Length: 20

{"name":"ikejcwang"}

3.3.2、请求头的设定

Content-Type: multipart/form-data; boundary=--------------------------242891718799640360995416

1、抓包可以看到,即使postman中没有设置Content-Type,文件上传的request.headers['Content-Type']被设置成了multipart/form-data,这一点可以粗暴的记忆,文件上传操作的这个头信息不会变;

2、boundary代表分隔符的内容,设计理念是,post请求的报文体应该是一个完整的form表单,而不会单单只是一个文件,所以为了方便的从请求数据包中区分处理不同的表单项,特地出现这一分割线,本质就是格式化解析报文,重新构造表单内容。

3.3.3、请求体的设定

----------------------------242891718799640360995416
Content-Disposition: form-data; name=""; filename="checkTls.sh"
Content-Type: application/x-sh

#!/bin/bash

# ......https...............
if [ $# -ne 1 ]; then
   echo "................................. ...... /check_ssl.sh www.baidu.com"
else
   #...............host
   host=$1
   #..................
   #end_date=`echo |openssl s_client -servername $host  -connet $host:443 2nssl x509 -noout -dates|awk -F '=' '/notAfter/{print $2}'`
    #....................................
    end_data=`date +%s -d "$(echo |openssl s_client -servername $host  -connect $host:443 2>/dev/null | openssl x509 -noout -dates|awk -F '=' '/notAfter/{print $2}')"`
    #...............
    new_date=$(date +%s)
    #......SSL....................................
    #......SSL..........................................
    days=$(expr $(expr $end_data - $new_date) / 86400)
    echo -e "\033[31m ......$host ...... ......${days}.........  ............... \033[0m"
fi

这一段,是分割符分割出来完整的第一个文件内容,包含内容描述,文件名,内容类型,内容详情,所以需要格式化处理它~

1、Content-Disposition为内容描述字段,name为请求提交数据表单项的字段名,postman这里没填,所以为空,filename为上传文件的文件名;

2、Content-Type含义等同于请求头中的 Content-Type,但是它会具体化文件的格式类型;

3、下面的一个空格之后,才是上传文件的内容详情(可以拿它来做操作),这里我上传的是文本,所以能够被解析出来,要是图片或者其他类型文件,解析的内容就无法辨识了;

4、第二个分割符表示,postman那里创建了两个字段,只不过有一个为空,没有填值,这里也给标记出来了。

3.4、文件预览

3.4.1、响应头的设定

1、判断响应头Content-Type中,设定的类型,浏览器是否可以解析预览,如果可以的话,图片,媒体,文本,PDF……等可以直接在浏览器预览,无需先下载然后打开

以下是预览一个JS文件,设定的Content-Typeapplication/javascript,抓包响应数据如下所示:

HTTP/1.1 200 OK
Content-type: application/javascript;charset=utf-8
Date: Sat, 22 Oct 2022 06:48:29 GMT
Connection: keep-alive
Keep-Alive: timeout=5
Content-Length: 570

/**
 * ............
 */
'use strict';
const os = require('os');
const fs = require('fs');
const settingFile = './etc/setting.json';

run();

function run() {
    process.env.settingsFile = settingFile;
    process.env.settings = fs.readFileSync(settingFile).toString();
    process.workerCount = JSON.parse(process.env.settings).workerCount || Math.max(Math.min(process.env.CPUS_LIMIT || 16, os.cpus().length) - 1, 1);  // .....................
    process.ikeVersion = JSON.parse(process.env.settings).version || new Date().toISOString();
    require('./lib/enging');
}

3.5、文件下载

3.5.1、响应头的设定

1、同样也是判断响应头Content-Type是否是直接下载的类型:application/force-download,是的话,直接将指定链接的文件下载到本地;

2、下载文件的名称,如果响应头有设置Content-Disposition的话,则以其中的filename为主,如果没有该头信息的话,浏览器会根据请求path来最终确定需要指定的文件名称;

以下是下载一个JS文件,设定的Content-Typeapplication/force-download,设定的Content-Dispositionattachment; filename=start.js,抓包响应数据如下所示:

HTTP/1.1 200 OK
Content-Type: application/force-download
Content-Disposition: attachment; filename=start.js
Date: Sat, 22 Oct 2022 06:53:36 GMT
Connection: keep-alive
Keep-Alive: timeout=5
Content-Length: 570

/**
 * ............
 */
'use strict';
const os = require('os');
const fs = require('fs');
const settingFile = './etc/setting.json';

run();

function run() {
    process.env.settingsFile = settingFile;
    process.env.settings = fs.readFileSync(settingFile).toString();
    process.workerCount = JSON.parse(process.env.settings).workerCount || Math.max(Math.min(process.env.CPUS_LIMIT || 16, os.cpus().length) - 1, 1);  // .....................
    process.ikeVersion = JSON.parse(process.env.settings).version || new Date().toISOString();
    require('./lib/enging');
}

4、实现方案

4.1、业务未动,日志先行

虽然是简单的模拟,但也需要参照一款线上服务组件来开发,不管是针对后面的排错,拓展,开源……都是打下了标准的基础,这一点,在任何场景下都屡试不爽。

为了同样兼容Nodecluster多进程高性能的模式,采集日志也是如此,worker进程负责采集日志到本地缓冲区,然后以进程通讯的方式,全部传递给master落盘存储;

大致逻辑如下:

  • 对外提供WriteLog函数,收集日志的类型,日志字段信息,由各个进程调用,采集到的日志信息暂时推送到当前进程的日志队列中:

    1、定时轮询这个队列,清空它,通过进程通信的方式将其发送给master处理;

    2、为了避免高并发下,日志采集汹涌,通信发送的队列太长,每次往队列推送结束后,判断当前队列的长度是否大于配置项日志队列最大长度,如果大于的话,清空它,通过进程通信的方式将其发送给master处理;

  • master进程接收到worker进程发送的日志数据包后落盘处理操作:

    1、将日志解析,先推送到日志缓冲区,并没有进行实时落盘,减少I/O密集操作;

    2、定时轮询缓冲区,15秒一次,清空它,执行落盘计划;

    3、为了避免高并发下日志采集汹涌,缓冲区太大,I/O压力上升;每次推送到日志缓冲区后,判断当前的缓冲区长度是否大于配置项日志最大缓冲长度,如果大于的话,清空它,并且执行落盘操作;

    4、落盘操作,是否小时分割,是否需要创建新文件,内容追加……

  • 通过对外函数,提供主动配置参数(同时设置默认配置):

    日志最大缓冲长度;(日志落盘存储不是实时,为了避开I/O太密集的操作,通过一个缓冲区来接收);

    日志队列最大长度(worker采集的日志并不是实时传递给master,而是有一个队列来存储,定时传递);

    日志存储路径;

    是否启用小时分割;

    日志保存天数(定时清除历史日志);

    禁止日志采集的类型(数组);

  • 定时任务清除历史日志,防止磁盘打满,1小时轮询一次,默认保存时间30天。
这段代码,可以独立于项目存在,可以在任意场景都直接引入它;
/**
 * 日志采集
 */
'use strict'
const fs = require('fs');
const cluster = require('cluster');
const path = require('path');

const logCache = {};  // 异步写入的日志缓存,主线程控制
const lineCaches = {};  // 线程之间的日志缓存,会put到logCache

const logPrefixFormat = 'yyyyMMdd';
const cleanTime = 3600000;  // 清理日志的周期,毫秒粒度,1小时
const flushTime = 15000;    // 刷新日志的周期,毫秒粒度,15秒

let maxLogBufferSize = 8 * 1024 * 1024; // 日志最大缓冲长度
let maxLineCacheSize = 16;  // 行缓存的最大长度
let useHourSuffix = false;    // 是否启用小时做后缀分割
let keepDays = 30;    // 日志保存的天数
let logPath = path.resolve(__dirname);    // 日志路径
let disableCategory = {};   // 禁止采集的日志类型
let maxColumnSize = 4000;   // 行的最大长度

let META_CHARS = {
    '\b': '\\b',
    '\n': '\\n',
    '\f': '\\f',
    '\r': '\\r',
    '"': '""'
};
let escapable = /[\b\n\f\r"]/g;

function escapeChar(char) {
    return META_CHARS[char];
}

/**
 * 添加日志到缓存中
 * @param category
 * @param line
 */
function putLine(category, line) {
    let cat = logCache[category];
    if (!cat) {
        cat = logCache[category] = [];
        cat.charCount = 0;
    }
    cat.push(line);
    cat.charCount += line.length;
    if (cat.charCount >= maxLogBufferSize) {
        console.log('putLine exceed max log buffer size', maxLogBufferSize, 'flushing....');
        exports.FlushLog(category);
    }
}

/**
 * 发送日志到缓存中,主线程直接put到缓存中,worker线程需要通过通信put
 * @param category
 * @param line
 */
function sendLine(category, line) {
    if (cluster.isMaster) {
        putLine(category, line);
    } else {
        cluster.worker.send({
            cmd: 'WriteLog',
            category: category,
            line: line
        }, err => {
            if (err) {
                console.error('Write Log Error: ' + err.toString())
            }
        })
    }
}

/**
 * 递归创建目录
 * @param dir
 */
function mkdirp(dir) {
    let dirNames = dir.split(path.sep);
    let base = dirNames[0];
    if (dir.indexOf('/') == 0) {
        base = base + '/';
    }
    for (let i = 1; i < dirNames.length; i++) {
        base = path.join(base, dirNames[i]);
        !fs.existsSync(base) && fs.mkdirSync(base);
    }
}

function flushLog(targetCategory) {
    if (!cluster.isMaster) {
        return;
    }
    if (!logPath) {
        logPath = path.resolve('log');
    }
    mkdirp(logPath);

    /**
     * 刷新缓存,写日志的步骤
     * @param category
     */
    let checkCategory = category => {
        let toWrite = logCache[category];
        if (toWrite) {
            delete logCache[category];
            let now = new Date();
            let file = logPath + '/' + now.Format(logPrefixFormat) + '.' + category + (useHourSuffix ? '_' + now.getHours() : '') + '.csv';
            let s = fs.createWriteStream(file, {flags: 'a'});
            s.on('error', err => {
                console.error('Flush Log Error: ' + err.message);
            });
            toWrite.forEach(block => {
                s.write(block);
                s.write('\n');
            });
            toWrite.length = 0;
            s.end();
        }
    }
    if (targetCategory) {
        checkCategory(targetCategory);
    } else {
        for (let category in logCache) {
            checkCategory(category);
        }
    }
}

/**
 * 周期执行函数,发送日志
 */
setInterval(async () => {
    let needDelete = [];
    let now = Date.now();
    for (let category in lineCaches) {
        let cat = lineCaches[category];
        if (cat.firstLineTime < (now - 1000)) {
            sendLine(category, cat.join('\n'));
            needDelete.push(category);
        }
    }
    for (let category of needDelete) {
        delete lineCaches[category];
    }
}, 1000);

/**
 * 主线程的工作:
 * 1、定期删除日志文件;
 * 2、定时刷新日志到文件;
 * 3、接收工作线程发来的日志消息并put到缓存
 * 4、程序退出时,刷新日志到文件
 */
if (cluster.isMaster) {
    setInterval(() => {
        let oldEst = new Date(Date.now() - (keepDays * 1000 * 86400)).Format(logPrefixFormat);
        if (logPath && fs.existsSync(logPath)) {
            fs.readdir(logPath, (err, files) => {
                if (!err && files) {
                    files.forEach(f => {
                        let m = f.match(/^([0-9]{8})/);
                        if (m && m[1] <= oldEst) {
                            fs.unlink(path.join(logPath, f), () => {
                            });
                        }
                    });
                }
            });
        }
    }, cleanTime);

    setInterval(flushLog, flushTime);

    cluster.on('online', worker => {
        worker.on('message', msg => {
            if (msg.cmd === 'WriteLog') {
                putLine(msg.category, msg.line);
            }
        });
    });

    process.on('beforeExit', () => {
        exports.FlushLog();
    })
}

/**
 * 为日期对象添加格式化函数
 * @param fmt
 * @constructor
 */
Date.prototype.Format = function (fmt) {
    let options = {
        "y+": this.getFullYear(), //年份
        "M+": this.getMonth() + 1, //月份
        "d+": this.getDate(), //日
        "h+": this.getHours(), //小时
        "m+": this.getMinutes(), //分
        "s+": this.getSeconds(), //秒
        "S+": this.getMilliseconds() //毫秒
    }
    let regexMap = {
        "y+": /(y+)/,
        "M+": /(M+)/,
        "d+": /(d+)/,
        "h+": /(h+)/,
        "m+": /(m+)/,
        "s+": /(s+)/,
        "S+": /(S+)/
    }
    for (let k in options) {
        if (regexMap[k].test(fmt)) {
            fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (options[k]) : (('0000' + options[k]).substr(-RegExp.$1.length)));
        }
    }
    return fmt;
}

/**
 * 设置日志最大缓冲长度
 * @param size
 * @constructor
 */
exports.SetMaxLogBufferSize = function (size) {
    maxLogBufferSize = size;
}
/**
 * 设置最大行缓存长度
 * @param size
 * @constructor
 */
exports.SetMaxLineCacheSize = function (size) {
    maxLineCacheSize = size;
}
/**
 * 设置日志路径
 * @param _logPath
 * @constructor
 */
exports.SetLogPath = function (_logPath) {
    logPath = path.resolve(_logPath);
}
/**
 * 设置是否启用小时分割
 * @param _useHourSuffix
 * @constructor
 */
exports.SetHourSuffix = function (_useHourSuffix) {
    useHourSuffix = _useHourSuffix;
}
/**
 * 设置日志保存的天数
 * @param days
 * @constructor
 */
exports.SetKeepDays = function (days) {
    keepDays = days;
}
/**
 * 获取日志存储路径
 * @returns {*}
 * @constructor
 */
exports.GetLogPath = function () {
    return logPath;
}

/**
 * 获取当前日期:yyyyMMdd
 * @returns {*}
 * @constructor
 */
exports.GetDayYMD = function () {
    return new Date().Format(logPrefixFormat)
}

/**
 * 设置禁止采集的日志类型
 * @param categories
 * @constructor
 */
exports.SetDisableCategory = function (categories = ['ALL']) {
    disableCategory = {};
    categories.forEach(c => {
        if (c) {
            disableCategory[c] = 1;
        }
    });
}

/**
 * 设置行的最大长度
 * @param maxSize
 * @constructor
 */
exports.SetMaxColumnSize = function (maxSize) {
    if (maxSize > 0) {
        maxColumnSize = maxSize;
    } else {
        maxColumnSize = 4000;
    }
}

/**
 * 写日志
 * @param category
 * @param msg
 * @constructor
 */
exports.WriteLog = function (category, msgList) {
    if (!msgList || !category || msgList.length <= 0) {
        return;
    }
    if (disableCategory['ALL'] || disableCategory[category]) {
        return;
    }
    let outMsg = [];
    outMsg.push('"');
    outMsg.push(new Date().Format('yyyy-MM-dd hh:mm:ss'));
    outMsg.push('",');

    msgList.forEach(msg => {
        outMsg.push('"');
        if (msg) {
            msg = String(msg);
            if (msg.length > maxColumnSize) {
                msg = msg.slice(0, maxColumnSize);
            }
            outMsg.push(msg.replace(escapable, escapeChar));
        }
        outMsg.push('"');
        outMsg.push(',');
    });
    outMsg.length--;    // 移除最后一个逗号

    let line = outMsg.join('');
    let cat = lineCaches[category];
    if (!cat) {
        cat = lineCaches[category] = [];
        cat.firstLineTime = Date.now();
    }
    cat.push(line);
    if (cat.length > maxLineCacheSize) {
        sendLine(category, cat.join('\n'));
        delete lineCaches[category];
    }
}

/**
 * 刷新日志缓存
 * @param targetCategory
 * @constructor
 */
exports.FlushLog = flushLog

4.2、配置项的设定

通过配置文件,可以决定部署架构方式,监听的IP/Port,数据存储,日志落盘……相关路径的设置,并且允许默认值填充;

  • nativeIp作为分布式部署访问的前提,为当前机器对外提供访问的IP,会以哈希值填充到在线文件的路径中;
  • 允许对外提供当前启动目录下的etc/settings.json,通过它来引入配置项;
  • 配置允许授权文件上传客户端的AppId,appToken,可以配置多个key-value(防止恶意上传打满磁盘);
  • 配置文件数据的存储位置;
  • 配置文件下载,预览的前缀,集群模式下为协议://域名,作为上传成功的响应;
  • 配置日志加载项,对应上述日志代码;
'use strict'
const fs = require('fs');
const path = require('path');

const settingFile = './etc/settings.json';

process.env.settingsFile = settingFile;
process.env.settings = fs.existsSync(settingFile) ? fs.readFileSync(settingFile).toString() : '{}';

let default_settings = {
    appName: 'ike_httpFileServer',
    bindIP: '0.0.0.0',
    bindPort: 8088,
    nativeIp: '2.5.8.88',
    secrets: {
        ike: '4a5826a55872410499c27aeb860ac195',    // 授权允许的app,key-value分别对呀appId与appToken
    },
    dataDir: path.resolve(__dirname, 'data'),    // 数据文件的存放位置
    dataUrlPrefix: 'http://9.135.218.88:8088',  // 文件下载的前缀,协议+域名(ip)
    log: {
        maxLogBufferSize: 8 * 1024 * 1024,   // 默认日志最大缓冲大小L:8M
        maxLineCacheSize: 16,   // 默认行缓存队列的最大长度:16行
        maxColumnSize: 4000,    // 默认一行日志的最大长度:4000
        path: path.resolve(__dirname, 'log'),    // 默认日志的保存路径,当前
        useHourSuffix: false,   // 默认不启用小时分割
        keepDays: 30,   // 默认日志的保存天数
    }
}
let settings = Object.assign({}, default_settings, JSON.parse(process.env.settings));

exports.settings = settings;

4.3、核心代码

这里才是关键点,引入了上文的log.jssettings.js

  • 前期准备工作:

    引入 log.js,并依次初始化日志的配置项;

    根据配置文件的DataDirLogPath,依次初始化数据存储目录,日志落盘目录;

    根据当前运行机器的CPU核数,fork出CPU核数 - 1 的worker进程数,充分利用机器资源,打满并发;

    master监控worker的健康状态,若有意外死掉的进程,重新fork起来,时刻保证CPU被用满;

    worker在当前的DataDir中,又新建一层文件夹,以预设的当前进程的环境变量workerProcessId命名;

  • 启动http服务,配置日志采集项;
  • 监听http请求->文件上传:

    文件上传的硬性要求,必须是post,path必须是 /upload

    文件上传请求的headers必须携带签名项:appid,apptoken,signature(用作授权的客户端校验);

    文件上传报文的处理,按照上文抓包的分析,格式化处理,支持多文件上传;

    文件落盘存储,新名称采用随机值存储(防止通过在线链接判断其目的);

    文件落盘存储,路径为DataDir/workerProcessId/DAY/,指定存储目录/进程号/日期,方便查看

    上传成功后,响应值为key-value,key为上传的文件原始名称,value为在线链接的完整地址,在线链接的完整地址前面添加当前机器nativeIp的哈希值,作为分布式部署访问的条件;

  • 监听http请求->文件下载:

    文件下载由于是读操作,所以此处鉴权,要求必须是get;

    从请求的path中分离出文件路径,然后前面加上DataDir,判断文件是否存在;是否需要响应404;

    读取文件,将内容Buffer写入响应体中,设置响应头为文件类型,表示预览,浏览器不支持解析的类型直接为下载;

    预留了直接下载的响应头注释,放开那几行代码即可。

  • 根据请求的完成finisherrorabort的状态,来采集不同的日志信息;
'use strict'

const http = require('http')
const fs = require('fs');
const os = require('os');
const cluster = require('cluster');
const nodeUtil = require('util');
const mime = require('mime')
const URL = require('url');
const crypto = require('crypto')
const path = require('path');
const log = require('./log');
const settings = require('./settings').settings;

/**
 * 随机数组,随机位数
 */
const randomArr = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z'];
const randomRange = 16;

/**
 * 子进程数:当前机器cpu核数 - master进程数(1),默认cpu核数最大限制为16
 */
const workCount = Math.min(16, os.cpus().length) - 1;

/**
 * 初始化日志的配置
 */
log.SetMaxColumnSize(settings.log.maxColumnSize);
log.SetMaxLogBufferSize(settings.log.maxLogBufferSize);
log.SetMaxLineCacheSize(settings.log.maxLineCacheSize);
log.SetHourSuffix(settings.log.useHourSuffix);
log.SetLogPath(settings.log.path);
log.SetKeepDays(settings.log.keepDays);

/**
 * 初始化目录
 */
if (!fs.existsSync(settings.dataDir)) {
    fs.mkdirSync(settings.dataDir)
}
if (!fs.existsSync(settings.log.path)) {
    fs.mkdirSync(settings.log.path)
}

/**
 * master进程负责fork子进程
 */
if (cluster.isMaster) {
    let workProcessMap = {};    // 子进程的信息map
    for (let i = 1; i < workCount; i++) {
        console.log('for 循环:' + i, workCount)
        let env = Object.assign({}, process.env, {
            'workerProcessId': i
        });
        let worker = cluster.fork(env);
        workProcessMap[worker.id] = env;
    }
    cluster.on('online', function (newWorker) {
        console.log('new worker online: ' + newWorker.id)
    });
    /**
     * 子进程断开
     */
    cluster.on('disconnect', function (oldWorker) {
        console.log('The worker #' + oldWorker.id + ' has disconnected');
    });
    /**
     * 子进程下线,重新fork
     */
    cluster.on('exit', function (oldWorker, code, signal) {
        console.log(`worker ${oldWorker.id} died (${signal || code}). restarting...`);

        let cp = cluster.fork(workProcessMap[oldWorker.id]);
        console.log('worker restart created, ', cp.id);

        workProcessMap[cp.id] = workProcessMap[oldWorker.id];
        delete workProcessMap[oldWorker.id];
        cp.workerProcessId = workProcessMap[cp.id].workerProcessId;
        console.log('worker log id is', cp.workerProcessId);
        cp.isMasterWorker = (cp.workerProcessId == '-1');
        cp.isRealWorker = (cp.workerProcessId >= 0);
    });
} else if (cluster.isWorker) {
    if (process.env.workerProcessId) {
        let dataDir = path.resolve(settings.dataDir, process.env.workerProcessId.toString());
        if (!fs.existsSync(dataDir)) {
            fs.mkdirSync(dataDir)
        }
        startServer()
    }
}

function startServer() {
    let server = http.createServer();
    server.on('request', listenRequestEvent);
    server.on('close', () => {
        console.log('http Server has Stopped At:' + settings['bindPort'])
    });
    server.on('error', err => {
        console.log('http Server error:' + err.toString());
        setTimeout(() => {
            process.exit(1);
        }, 3000);
    });
    server.listen(settings['bindPort'], settings['bindIP'], settings['backlog'] || 8191, () => {
        console.log('Started Http Server At: ' + settings['bindIP'] + ':' + settings['bindPort'])
    })
}

/**
 * 请求日志:
 * 类型(aborted,finished,error),
 * 结果(fail,success),
 * 动作(上传还是下载),
 * 请求头host,请求路径,客户端IP,耗时,异常信息
 * @param fileLog
 * @param type
 */
function requestLog(fileLog, type) {
    fileLog.duration = Date.now() - fileLog.startTime
    log.WriteLog('request', [
        type, defaultStr(fileLog.result), defaultStr(fileLog.action), defaultStr(fileLog.host), defaultStr(fileLog.path),
        defaultStr(fileLog.remoteAddress), defaultStr(fileLog.duration), defaultStr(fileLog.error)]);
}

/**
 * 异常日志
 * @param args
 */
function exceptionLog(...args) {
    log.WriteLog('exception', [...args])
}

function defaultStr(v) {
    return v ? v : ''
}

/**
 * 监听请求事件(强制要求上传一定是post,path = /upload)
 * @param request
 * @param response
 * @returns {Promise<void>}
 */
async function listenRequestEvent(request, response) {
    request.fileLog = {
        startTime: Date.now(),
        host: request.headers['host'],
        remoteAddress: request.remoteAddress,
    };
    request.on('aborted', () => {
        request.fileLog.result = 'fail'
        requestLog(request.fileLog, 'aborted');
    });
    request.on('finish', () => {
        request.fileLog.result = 'success'
        requestLog(request.fileLog, 'finish');
    })
    request.on('error', (err) => {
        request.fileLog.result = 'fail'
        request.fileLog.error = nodeUtil.inspect(err)
        requestLog(request.fileLog, 'err');
    })
    try {
        let sourceUrl = URL.parse(request.url, true);
        if (request.method.toLowerCase() == 'post') {
            request.fileLog.action = 'upload'
            if (sourceUrl.pathname != '/upload') {
                request.abort()
                return
            }
            if (!checkHeaders(request.headers)) {
                request.fileLog.result = 'fail';
                request.fileLog.error = 'request headers checkout fail';
                response.statusCode = 400;
                response.end('request headers checkout fail, please retry')
                return
            }
            upload(request).then(res => {
                response.statusCode = 200;
                response.setHeader('Content-Type', 'application/json')
                response.end(JSON.stringify(res))
            })
        } else if (request.method.toLowerCase() == 'get') {
            request.fileLog.action = 'download';
            download(request, sourceUrl).then(res => {
                // let temp = sourceUrl.pathname.split('/');
                response.statusCode = 200;
                response.setHeader('Content-type', mime.getType(res.filePath) + ';charset=utf-8')
                // response.setHeader('Content-Type', 'application/force-download');
                // response.setHeader('Content-Disposition', 'attachment; filename=haha' + temp[temp.length - 1]);
                response.end(res.data)
            }).catch(err => {
                console.dir(err)
                if (err.code) {
                    response.statusCode = err.code
                } else {
                    response.statusCode = 500;
                }
                if (err.body) {
                    response.end(err.body.toString)
                } else {
                    response.end(err.toString)
                }
            });
        } else {
            request.fileLog.action = 'unknown'
            request.abort()
            return
        }
    } catch (e) {
        request.fileLog.error = nodeUtil.inspect(e)
        request.abort()
        return
    }
}

/**
 * 验证请求头的签名信息
 * @param headers
 * @returns {boolean}
 */
function checkHeaders(headers) {
    if (!headers['appid'] || !headers['apptoken'] || !headers['signature']) {
        return false;
    }
    let appId = headers['appid'];
    if (!settings.secrets[appId]) {
        return false
    }
    let appToken = headers['apptoken'];
    let signature = hashStr(appId + appToken);
    return headers['signature'] === signature
}

/**
 * 哈希值计算
 * @param str
 * @returns {string}
 */
function hashStr(str) {
    return crypto.createHash('sha256').update(str).digest('hex').toUpperCase()
}

/**
 * 上传,兼容单文件和多文件的上传
 * @param request
 * @returns {Promise<void>}
 */
function upload(request) {
    return new Promise((resolve, _) => {
        let data = Buffer.alloc(0)
        let separator = `--${request.headers['content-type'].split('boundary=')[1]}`
        request.on('data', chunk => {
            data = Buffer.concat([data, chunk])
        })
        request.on('end', () => {
            let bufArr = uploadBufferSplit(data, separator).slice(1, -1)
            let result = [];
            let error = [];
            bufArr.forEach(item => {
                let [head, body] = uploadBufferSplit(item, '\r\n\r\n')
                let headArr = uploadBufferSplit(head, '\r\n').slice(1)    //可能存在两行head,用换行符'\r\n'分割一下,第一个元素是截取后剩下空buffer,剔除
                let headerVal = parseUploadHeader(headArr[0].toString())    // header的第一行为Content-Disposition,通过它能拿到文件名
                if (headerVal.filename) {
                    let viewPathName = `${process.env.workerProcessId}/${log.GetDayYMD()}`   // 对外展示的路径
                    let dirPathName = path.resolve(settings.dataDir, viewPathName)  // 真正存储在磁盘上的路径
                    try {
                        if (!fs.existsSync(dirPathName)) {
                            fs.mkdirSync(dirPathName)
                        }
                        let temp = headerVal.filename.split('.')
                        let createFileName = `${randomStr()}.${temp[temp.length - 1]}`  // 随机文件名
                        let filePath = `${dirPathName}/${createFileName}`
                        let resultPath = `${viewPathName}/${createFileName}`

                        fs.writeFileSync(filePath, body.slice(0, -2))
                        result.push({
                            [headerVal.filename]: `${settings.dataUrlPrefix}/${hashStr(settings.nativeIp)}/${resultPath}`
                        })
                    } catch (e) {
                        error.push(`${headerVal.filename}: ${e}`)
                    }
                } else {
                    error.push(`unable to parse filename: ${headerVal}`)
                    exceptionLog('upload', headerVal, 'unable to parse filename');
                }
            })
            resolve({result, error})
        })
    })
}

/**
 * 多文件上传的缓存数组分割
 * @param buffer
 * @param separator
 * @returns {[]}
 */
function uploadBufferSplit(buffer, separator) {
    const res = []
    let offset = 0;
    let index = buffer.indexOf(separator, 0)
    while (index != -1) {
        res.push(buffer.slice(offset, index))
        offset = index + separator.length
        index = buffer.indexOf(separator, index + separator.length)
    }

    res.push(buffer.slice(offset))

    return res
}

/**
 * 文件上传的头信息格式化,
 * @param header
 * @returns {{}}
 */
function parseUploadHeader(header) {
    const [_, value] = header.split(': ')
    const valueObj = {}
    try {
        value.split('; ').forEach(item => {
            const [key, val = ''] = item.split('=')
            valueObj[key] = val && JSON.parse(val)
        })
    } catch (e) {
        exceptionLog('parseUploadHeader', value, e);
    }
    return valueObj
}

/**
 * 下载
 * @param request
 * @param sourceUrl
 * @returns {Promise<unknown>}
 */
async function download(request, sourceUrl) {
    return new Promise((resolve, reject) => {
        let tempArr = sourceUrl.pathname.split('/');
        tempArr.splice(0, 2);   // 删除第一个空格,和第二个nativeIp的哈希值
        let filePath = `${settings.dataDir}/${tempArr.join('/')}`
        if (!fs.existsSync(filePath)) {
            reject({
                code: 404,
                body: 'file not found'
            })
        } else {
            fs.readFile(filePath, (err, data) => {
                if (err) {
                    reject({
                        code: 500,
                        body: nodeUtil.inspect(err)
                    })
                } else {
                    resolve({filePath, data})
                }
            })
        }
    })
}

/**
 * 随机字符串
 * @returns {string}
 */
function randomStr() {
    let result = ''
    for (let i = 0; i < randomRange; i++) {
        result += randomArr[Math.round(Math.random() * (randomArr.length - 1))];
    }
    return result;
}

4.4、测试

1、打开postman,请求头的设定,新增三个字段appid,apptoken,signature(为appid + apptoken 的sha256值大写);
node_文件上传headers.png

2、body选择上传的文件,可以上传多个;
node_文件上传body.png

3、浏览器预览:
node_文件上传预览.png

4、数据存储结构:

├── data
│   ├── 1
│   │   └── 20221023
│   │       ├── IMoCfUdNRaUMkVVy.sh
│   │       ├── KnJSPvlejvMoSrxH.sh
│   │       ├── QwOOJoUTCYdkmZby.pdf
│   │       ├── iJlEbBwgifNFMgXH.jpeg
│   │       ├── sEMrenkBNPThOUVy.pdf
│   │       └── tYRGuYkiwvJTGBjF.sh
│   ├── 2
│   │   ├── 20221018
│   │   │   ├── YGCYdDgPDMHKzfrg.jpeg
│   │   │   └── kgVjozYwRAYUSpqU.js
│   │   └── 20221023
│   │       ├── UdEbPfVsniDfORru.sh
│   │       └── jyfgVDrvJrIIkWpP.pdf
…………………………

4、分布式部署

分布式部署:鸡蛋不能放到一个篮子里,为了拓宽硬件机房的功能性,更多的场景不管是数据存储,还是在线服务,都是选择多节点组装成集群,采用分布式部署的架构,做到融灾与高性能的兼具性;

4.1、架构图

画了个流程图,大致如下所示:

  • 部署多个文件服务节点,组成文件服务集群(即把上述编译好的js文件,放到多台机器上执行),文件服务节点之间不关联;
  • 部署多个http网关节点,组成网关集群,通过网关可以设置安全组,防刷,限流,可以配置网关为文件服务集群的唯一白名单;
  • 客户端的请求先到http网关,网关在分析请求后,选择转发到具体的文件服务节点;文件上传可以是随机均衡,可以是权重,也可以是会话保持,但是文件下载(预览),http网关必须根据请求path的nativeIp哈希值,从自身配置中找到具体的文件服务节点,然后定向转发,否则会报错404;

_分布式文件服务部署.jpg

4.2、http网关

作为一款http网关,完全透传客户端的请求&响应报文(path,headers,body……),只不过在请求链路的中间,充当安全防控和文件预览的溯源的功能:

  • 文件服务集群的nativeIp列表必须配置在网关集群上,这样子它才可以根据path中的哈希值做到定向转发;
  • 文件服务节点,文件上传响应值的链接前缀,必须配置成http网关集群的域名,这样透传给客户端的最终在线链接才是有效的;
  • 其他的,跟绝大多数网关功能类似……

实现的话,逻辑比较简单,这里就不再续写代码了,本来只是通过“模拟微信开发,nodejs搭建一套高性能分布式的在线文件服务系统”作为芥子,引入分布式部署构建的架构图,而我本地自测和调试的时候,就没必要消耗额外的资源去部署啦,单节点即可。

5、写在文末

上述代码内容言简意赅,没有冗余项,也没有引入第三方框架依赖包,完全是以node原生的模式搭建,是可以直接拿出来使用的。但作者并不是专业的前端coder,本着带对象学习前端开发的方式,才开启了“模拟微信”这一征程,在征程结束之前,即对象还没有拿到一份正式的前端offer,代码是不会放到github的。

每次推送的章节,即便没有开源链接,代码也绝大不会穿插嵌套复杂,都可以直接贴出去引用~

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
2月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
82 3
|
4月前
|
存储 监控 负载均衡
检索服务elasticsearch分布式结构
【8月更文挑战第22天】
49 3
|
4月前
|
存储 监控 固态存储
【vSAN分布式存储服务器数据恢复】VMware vSphere vSAN 分布式存储虚拟化平台VMDK文件1KB问题数据恢复案例
在一例vSAN分布式存储故障中,因替换故障闪存盘后磁盘组失效,一台采用RAID0策略且未使用置备的虚拟机VMDK文件受损,仅余1KB大小。经分析发现,该VMDK文件与内部虚拟对象关联失效导致。恢复方案包括定位虚拟对象及组件的具体物理位置,解析分配空间,并手动重组RAID0结构以恢复数据。此案例强调了深入理解vSAN分布式存储机制的重要性,以及定制化数据恢复方案的有效性。
99 5
|
21天前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
36 5
|
1月前
|
监控 算法 网络协议
|
3月前
|
自然语言处理 搜索推荐 数据库
高性能分布式搜索引擎Elasticsearch详解
高性能分布式搜索引擎Elasticsearch详解
89 4
高性能分布式搜索引擎Elasticsearch详解
|
2月前
|
存储 开发框架 .NET
C#语言如何搭建分布式文件存储系统
C#语言如何搭建分布式文件存储系统
77 2
|
2月前
|
分布式计算 Hadoop 网络安全
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
38 1
|
2月前
|
存储 机器学习/深度学习 缓存
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
51 1
|
3月前
|
数据采集 分布式计算 MaxCompute
MaxCompute 分布式计算框架 MaxFrame 服务正式商业化公告
MaxCompute 分布式计算框架 MaxFrame 服务于北京时间2024年09月27日正式商业化!
99 3