Usage and Code Walkthrough
Traffic Control: limit-conn
Code Implementation
- Initialization
```lua
local plugin_name = "limit-conn"

local schema = {
    type = "object",
    properties = {
        conn = {type = "integer", exclusiveMinimum = 0},
        burst = {type = "integer", minimum = 0},
        default_conn_delay = {type = "number", exclusiveMinimum = 0},
        only_use_default_delay = {type = "boolean", default = false},
        key = {type = "string"},
        key_type = {type = "string",
            enum = {"var", "var_combination"},
            default = "var",
        },
        rejected_code = {
            type = "integer", minimum = 200, maximum = 599, default = 503
        },
        rejected_msg = {
            type = "string", minLength = 1
        },
        allow_degradation = {type = "boolean", default = false}
    },
    required = {"conn", "burst", "default_conn_delay", "key"}
}

local _M = {
    version = 0.1,
    priority = 1003,
    name = plugin_name,
    schema = schema,
}
```
The priority is 1003. The configuration covers the connection limit (conn), burst size (burst), default connection delay, whether to only use the default delay, the key, the key type, the rejected response code, and the rejected message. The required fields are conn, burst, default_conn_delay, and key.
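For illustration, a configuration like the following (the values are invented for this example) satisfies the schema above: at most one concurrent request per key, no burst allowance, a 0.1-second delay for throttled requests, counted per client address.

```lua
-- Hypothetical limit-conn configuration that passes check_schema
local conf = {
    conn = 1,                  -- max concurrent requests per key
    burst = 0,                 -- no extra burst allowance
    default_conn_delay = 0.1,  -- seconds to delay excess requests
    key_type = "var",
    key = "remote_addr",       -- count connections per client IP
    rejected_code = 503,
}
```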
- Schema validation
```lua
function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end
```
- access method
```lua
function _M.increase(conf, ctx)
    core.log.info("ver: ", ctx.conf_version)

    local lim, err = lrucache(conf, nil, create_limit_obj, conf)
    if not lim then
        core.log.error("failed to instantiate a resty.limit.conn object: ", err)
        if conf.allow_degradation then
            return
        end
        return 500
    end

    local conf_key = conf.key
    local key
    if conf.key_type == "var_combination" then
        local err, n_resolved
        key, err, n_resolved = core.utils.resolve_var(conf_key, ctx.var)
        if err then
            core.log.error("could not resolve vars in ", conf_key, " error: ", err)
        end

        if n_resolved == 0 then
            key = nil
        end
    else
        key = ctx.var[conf_key]
    end

    if key == nil then
        core.log.info("The value of the configured key is empty, use client IP instead")
        -- When the value of key is empty, use client IP instead
        key = ctx.var["remote_addr"]
    end

    key = key .. ctx.conf_type .. ctx.conf_version
    core.log.info("limit key: ", key)

    local delay, err = lim:incoming(key, true)
    if not delay then
        if err == "rejected" then
            if conf.rejected_msg then
                return conf.rejected_code, { error_msg = conf.rejected_msg }
            end
            return conf.rejected_code or 503
        end

        core.log.error("failed to limit conn: ", err)
        if conf.allow_degradation then
            return
        end
        return 500
    end

    if lim:is_committed() then
        if not ctx.limit_conn then
            ctx.limit_conn = core.tablepool.fetch("plugin#limit-conn", 0, 6)
        end

        core.table.insert_tail(ctx.limit_conn, lim, key, delay,
                               conf.only_use_default_delay)
    end

    if delay >= 0.001 then
        sleep(delay)
    end
end
```
A resty.limit.conn object for this configuration is fetched from the LRU cache (and created on a miss). The limit key is then resolved from the request context, and lim:incoming decides whether the connection exceeds the limit: rejected requests get the configured status code, committed connections are recorded in ctx.limit_conn for the later log phase, and any computed delay is applied with sleep.
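The lrucache call caches one resty.limit.conn object per plugin configuration and builds it on a miss via create_limit_obj, which is not shown in the excerpt above. A minimal sketch of that factory, assuming the standard resty.limit.conn API (the shared dict name is an assumption and may differ between versions):

```lua
local limit_conn_new = require("resty.limit.conn").new

-- Sketch of the factory passed to lrucache: one resty.limit.conn
-- instance per plugin configuration. "plugin-limit-conn" is assumed
-- to be a lua_shared_dict declared in nginx.conf.
local function create_limit_obj(conf)
    core.log.info("create new limit-conn plugin instance")
    return limit_conn_new("plugin-limit-conn", conf.conn, conf.burst,
                          conf.default_conn_delay)
end
```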
- log
```lua
function _M.decrease(conf, ctx)
    local limit_conn = ctx.limit_conn
    if not limit_conn then
        return
    end

    for i = 1, #limit_conn, 4 do
        local lim = limit_conn[i]
        local key = limit_conn[i + 1]
        local delay = limit_conn[i + 2]
        local use_delay = limit_conn[i + 3]

        local latency
        if not use_delay then
            if ctx.proxy_passed then
                latency = ctx.var.upstream_response_time
            else
                latency = ctx.var.request_time - delay
            end
        end
        core.log.debug("request latency is ", latency) -- for test

        local conn, err = lim:leaving(key, latency)
        if not conn then
            core.log.error("failed to record the connection leaving request: ", err)
            break
        end
    end

    core.tablepool.release("plugin#limit-conn", limit_conn)
    ctx.limit_conn = nil
    return
end
```
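Note that ctx.limit_conn holds flat tuples of (lim, key, delay, only_use_default_delay), which is why the loop steps by 4. In this phase each tracked connection is released with lim:leaving, passing the observed request latency so resty.limit.conn can refine its delay estimation; afterwards the table is returned to the table pool for reuse.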
Observability: the syslog plugin
- Initialization
```lua
local batch_processor_manager = bp_manager_mod.new("sys logger")

local schema = {
    type = "object",
    properties = {
        host = {type = "string"},
        port = {type = "integer"},
        max_retry_times = {type = "integer", minimum = 1, default = 1},
        retry_interval = {type = "integer", minimum = 0, default = 1},
        flush_limit = {type = "integer", minimum = 1, default = 4096},
        drop_limit = {type = "integer", default = 1048576},
        timeout = {type = "integer", minimum = 1, default = 3},
        sock_type = {type = "string", default = "tcp", enum = {"tcp", "udp"}},
        pool_size = {type = "integer", minimum = 5, default = 5},
        tls = {type = "boolean", default = false},
        include_req_body = {type = "boolean", default = false}
    },
    required = {"host", "port"}
}

local lrucache = core.lrucache.new({
    ttl = 300, count = 512, serial_creating = true,
})

-- syslog uses max_retry_times/retry_interval/timeout
-- instead of max_retry_count/retry_delay/inactive_timeout
local schema = batch_processor_manager:wrap_schema(schema)
schema.max_retry_count = nil
schema.retry_delay = nil
schema.inactive_timeout = nil

local _M = {
    version = 0.1,
    priority = 401,
    name = plugin_name,
    schema = schema,
}
```
The priority is 401. The configuration includes host, port, max_retry_times, retry_interval, and so on: the options required to ship logs to a syslog server.
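For example, a minimal configuration (host and port are placeholders) needs only the two required fields, with everything else falling back to the defaults above:

```lua
-- Hypothetical syslog plugin configuration; only host/port are required
local conf = {
    host = "127.0.0.1",
    port = 5044,
    flush_limit = 1,  -- optional: flush every message instead of buffering
}
```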
- Schema validation
```lua
function _M.check_schema(conf)
    local ok, err = core.schema.check(schema, conf)
    if not ok then
        return false, err
    end

    -- syslog uses max_retry_times/retry_interval/timeout
    -- instead of max_retry_count/retry_delay/inactive_timeout
    conf.max_retry_count = conf.max_retry_times
    conf.retry_delay = conf.retry_interval
    conf.inactive_timeout = conf.timeout
    return true
end
```
Validates the configuration, then maps the syslog-specific names (max_retry_times, retry_interval, timeout) onto the fields the batch processor expects (max_retry_count, retry_delay, inactive_timeout).
- Sending logs
```lua
local function send_syslog_data(conf, log_message, api_ctx)
    local err_msg
    local res = true

    core.log.info("sending a batch logs to ", conf.host, ":", conf.port)

    -- fetch the logger object from lrucache (one per plugin instance)
    local logger, err = core.lrucache.plugin_ctx(
        lrucache, api_ctx, nil, logger_socket.new, logger_socket, {
            host = conf.host,
            port = conf.port,
            flush_limit = conf.flush_limit,
            drop_limit = conf.drop_limit,
            timeout = conf.timeout,
            sock_type = conf.sock_type,
            max_retry_times = conf.max_retry_times,
            retry_interval = conf.retry_interval,
            pool_size = conf.pool_size,
            tls = conf.tls,
        }
    )

    if not logger then
        res = false
        err_msg = "failed when initiating the sys logger processor " .. err
        -- return early: calling logger:log on a nil logger would error
        return res, err_msg
    end

    -- reuse the logger object
    local ok, err = logger:log(core.json.encode(log_message))
    if not ok then
        res = false
        err_msg = "failed to log message " .. err
    end

    return res, err_msg
end
```
The logger:log call here is what actually sends the encoded log message to the syslog server.
- log
```lua
function _M.log(conf, ctx)
    local entry = log_util.get_full_log(ngx, conf)

    if batch_processor_manager:add_entry(conf, entry) then
        return
    end

    -- Generate a function to be executed by the batch processor
    local cp_ctx = core.table.clone(ctx)
    local func = function(entries, batch_max_size)
        local data, err
        if batch_max_size == 1 then
            data, err = core.json.encode(entries[1]) -- encode as single {}
        else
            data, err = core.json.encode(entries) -- encode as array [{}]
        end

        if not data then
            return false, 'error occurred while encoding the data: ' .. err
        end

        return send_syslog_data(conf, data, cp_ctx)
    end

    batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
end
```
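The flow is: log_util.get_full_log builds the log entry from the current request, add_entry tries to append it to an already-running batch processor for this configuration, and only when none exists is a flush function created and handed to add_entry_to_new_processor. The flush function encodes a batch of one entry as a single JSON object and a larger batch as a JSON array before passing the data to send_syslog_data.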
How Plugins Work in APISIX
The previous section analyzed four different types of plugins. How exactly do these plugins work, and what does the dispatch flow inside APISIX look like? With these questions in mind, let's dig into the source code.
apisix.plugin
The load method
It defines how all plugins are loaded, and it is called from init_worker.
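Roughly speaking (the function body is not shown here, so treat this as a summary rather than a quotation), load reads the enabled plugin list from the local configuration, requires each apisix.plugins.<name> module, and sorts the loaded plugins by priority so that later phases can simply iterate them in order.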
The run_plugin method
```lua
function _M.run_plugin(phase, plugins, api_ctx)
    local plugin_run = false
    api_ctx = api_ctx or ngx.ctx.api_ctx
    if not api_ctx then
        return
    end

    plugins = plugins or api_ctx.plugins
    if not plugins or #plugins == 0 then
        return api_ctx
    end

    if phase ~= "log"
        and phase ~= "header_filter"
        and phase ~= "body_filter"
    then
        for i = 1, #plugins, 2 do
            local phase_func = plugins[i][phase]
            if phase_func then
                plugin_run = true
                local code, body = phase_func(plugins[i + 1], api_ctx)
                if code or body then
                    if is_http then
                        if code >= 400 then
                            core.log.warn(plugins[i].name,
                                          " exits with http status code ", code)
                        end

                        core.response.exit(code, body)
                    else
                        if code >= 400 then
                            core.log.warn(plugins[i].name,
                                          " exits with status code ", code)
                        end

                        ngx_exit(1)
                    end
                end
            end
        end
        return api_ctx, plugin_run
    end

    for i = 1, #plugins, 2 do
        local phase_func = plugins[i][phase]
        if phase_func then
            plugin_run = true
            phase_func(plugins[i + 1], api_ctx)
        end
    end

    return api_ctx, plugin_run
end
```
As you can see, for every phase except log, header_filter, and body_filter, each plugin's handler for that phase is invoked with its configuration, and if the handler returns a status code or body, the request is terminated immediately with that response (a warning is logged for codes of 400 and above). For the log, header_filter, and body_filter phases, the handlers are simply executed and their return values are ignored, since the response can no longer be short-circuited at that point.
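For reference, the plugins array that run_plugin iterates stores plugin objects and their per-route configurations in alternating slots, which is why both loops step by 2. A conceptual sketch of its layout (not the actual construction code):

```lua
-- Conceptual layout of the plugins array consumed by run_plugin;
-- it is built elsewhere (e.g. by plugin.filter), shown here only
-- to illustrate the stride-2 iteration.
local plugins = {
    plugin_a, conf_a,  -- plugins[1], plugins[2]
    plugin_b, conf_b,  -- plugins[3], plugins[4]
}
-- run_plugin("rewrite", plugins, api_ctx) calls
-- plugin_a.rewrite(conf_a, api_ctx) if defined,
-- then plugin_b.rewrite(conf_b, api_ctx), in priority order.
```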
apisix.init
http_init_worker
```lua
function _M.http_init_worker()
    local seed, err = core.utils.get_seed_from_urandom()
    if not seed then
        core.log.warn('failed to get seed from urandom: ', err)
        seed = ngx_now() * 1000 + ngx.worker.pid()
    end
    math.randomseed(seed)
    -- for testing only
    core.log.info("random test in [1, 10000]: ", math.random(1, 10000))

    local we = require("resty.worker.events")
    local ok, err = we.configure({shm = "worker-events", interval = 0.1})
    if not ok then
        error("failed to init worker event: " .. err)
    end
    local discovery = require("apisix.discovery.init").discovery
    if discovery and discovery.init_worker then
        discovery.init_worker()
    end
    require("apisix.balancer").init_worker()
    load_balancer = require("apisix.balancer")
    require("apisix.admin.init").init_worker()
    require("apisix.timers").init_worker()

    require("apisix.debug").init_worker()
    plugin.init_worker()
    router.http_init_worker()
    require("apisix.http.service").init_worker()
    plugin_config.init_worker()
    require("apisix.consumer").init_worker()

    if core.config == require("apisix.core.config_yaml") then
        core.config.init_worker()
    end

    apisix_upstream.init_worker()
    require("apisix.plugins.ext-plugin.init").init_worker()

    local_conf = core.config.local_conf()

    if local_conf.apisix and local_conf.apisix.enable_server_tokens == false then
        ver_header = "APISIX"
    end
end
```
This method is the core entry point. The nginx.conf generated by APISIX contains the following directive:
```nginx
init_worker_by_lua_block {
    apisix.http_init_worker()
}
```
So this method runs when the nginx worker processes start; among other things, it calls plugin.init_worker(), which loads all the plugins described above.
http_access_phase
nginx.conf contains the following directive:
```nginx
access_by_lua_block {
    apisix.http_access_phase()
}
```
The main plugin-related calls in this phase are:
```lua
plugin.run_global_rules(api_ctx, router.global_rules, nil)

local plugins = plugin.filter(api_ctx, route)
plugin.run_plugin("rewrite", plugins, api_ctx)
```
The plugin here is the apisix.plugin module (under the plugin directory) analyzed above.
http_header_filter_phase
```nginx
header_filter_by_lua_block {
    apisix.http_header_filter_phase()
}
```
Processes response headers; it does not invoke the plugin module.
http_body_filter_phase
```nginx
body_filter_by_lua_block {
    apisix.http_body_filter_phase()
}
```
Processes the response body; it does not invoke the plugin module.
http_log_phase
```nginx
log_by_lua_block {
    apisix.http_log_phase()
}
```
Handles logging; it does not invoke the plugin module.
Standard Mode Analysis
Plugin Type Classification and Roles
Based on yesterday's analysis, the main plugin types are authentication, security protection, traffic control, serverless, and observability.
Plugin Category Overview
Official Chinese plugin introduction
auth
When a plugin sets type = 'auth', it is an authentication plugin. An authentication plugin must select the corresponding consumer after it runs. For example, the key-auth plugin looks up the consumer via the apikey request header and then binds it with consumer.attach_consumer.
Plugins of the other types do not need to specify a type explicitly in their code.
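A simplified sketch of what such an auth plugin's handler looks like, modeled on the key-auth behavior described above; find_consumer_by_key is a hypothetical stand-in for the real plugin's consumer lookup:

```lua
local core = require("apisix.core")
local consumer_mod = require("apisix.consumer")

-- Sketch of an auth plugin phase handler, modeled on key-auth.
-- find_consumer_by_key is hypothetical: the real plugin builds a
-- key -> consumer cache from the consumer configurations.
function _M.rewrite(conf, ctx)
    local key = core.request.header(ctx, "apikey")
    if not key then
        return 401, {message = "Missing API key in request"}
    end

    local consumer, consumer_conf = find_consumer_by_key(key)
    if not consumer then
        return 401, {message = "Invalid API key in request"}
    end

    -- bind the matched consumer to this request's context
    consumer_mod.attach_consumer(ctx, consumer, consumer_conf)
end
```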
Steps and Methods for Writing a Custom Plugin
Official Chinese guide to writing plugin extensions
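To tie the pieces together, here is a minimal plugin skeleton following the same structure as limit-conn and syslog above; the name, priority, and schema are placeholders, and registering the plugin in config.yaml is covered by the official guide:

```lua
local core = require("apisix.core")

local plugin_name = "my-example"   -- placeholder name

local schema = {
    type = "object",
    properties = {
        message = {type = "string"},
    },
    required = {"message"},
}

local _M = {
    version = 0.1,
    priority = 1000,   -- placeholder; determines ordering within a phase
    name = plugin_name,
    schema = schema,
}

-- called when the plugin configuration is created or updated
function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end

-- phase handlers are discovered by name in plugin.run_plugin;
-- returning a status code (and optional body) ends the request early
function _M.access(conf, ctx)
    core.log.info("conf: ", core.json.delay_encode(conf))
    -- return 403, {message = conf.message}  -- e.g. to reject the request
end

return _M
```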