apisix 最详细源码分析以及手撸一个 apisix
source link: https://segmentfault.com/a/1190000040962770
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
apisix 最详细源码分析以及手撸一个 apisix
- make run 调用 ./bin/apisix start
- 寻找 juajit 路径 运行 /usr/local/Cellar/openresty/1.19.9.1_2/luajit/bin/luajit ./apisix/cli/apisix.lua start
调用 ops.lua 里的 start 方法,初始化配置,ETCD,执行openresty 启动命令
local function start(env, ...) init(env) init_etcd(env, args) util.execute_cmd(env.openresty_args) end
初始化 nginx 配置,通过读取 conf/config.yaml 配合模板 ngx_tpl.lua 生成 nginx config 文件。供 openresty(nginx)使用
local conf_render = template.compile(ngx_tpl) local ngxconf = conf_render(sys_conf) local ok, err = util.write_file(env.apisix_home .. "/conf/nginx.conf", ngxconf) if not ok then util.die("failed to update nginx.conf: ", err, "\n") end
初始化 ETCD,读取ETCD集群配置,进行连接
local version_url = host .. "/version" local errmsg local res, err local retry_time = 0 while retry_time < 2 do res, err = request(version_url, yaml_conf) -- In case of failure, request returns nil followed by an error message. -- Else the first return value is the response body -- and followed by the response status code. if res then break end retry_time = retry_time + 1 print(str_format("Warning! Request etcd endpoint \'%s\' error, %s, retry time=%s", version_url, err, retry_time)) end
执行openresty 启动命令 openresty -p /usr/local/apisix -c /conf/nginx.conf
local openresty_args = [[openresty -p ]] .. apisix_home .. [[ -c ]] .. apisix_home .. [[/conf/nginx.conf]] util.execute_cmd(env.openresty_args)
nginx 配置中嵌入的 apisix 流程
init_by_lua_block { require "resty.core" apisix = require("apisix") local dns_resolver = { "172.19.2.70", "172.19.2.62", } local args = { dns_resolver = dns_resolver, } apisix.http_init(args) } init_worker_by_lua_block { apisix.http_init_worker() } exit_worker_by_lua_block { apisix.http_exit_worker() } access_by_lua_block { apisix.http_access_phase() } header_filter_by_lua_block { apisix.http_header_filter_phase() } body_filter_by_lua_block { apisix.http_body_filter_phase() } log_by_lua_block { apisix.http_log_phase() } proxy_pass $upstream_scheme://apisix_backend$upstream_uri; upstream apisix_backend { server 0.0.0.1; balancer_by_lua_block { apisix.http_balancer_phase() } keepalive 320; keepalive_requests 1000; keepalive_timeout 60s; }
apisix.http_init
1.设置 dns resolver
2.设置实例id
3.启动 privileged agentcore.resolver.init_resolver(args) core.id.init() local process = require("ngx.process") local ok, err = process.enable_privileged_agent()
apisix.http_init_worker
function _M.http_init_worker() local seed, err = core.utils.get_seed_from_urandom() if not seed then core.log.warn('failed to get seed from urandom: ', err) seed = ngx_now() * 1000 + ngx.worker.pid() end math.randomseed(seed) -- for testing only core.log.info("random test in [1, 10000]: ", math.random(1, 10000)) local we = require("resty.worker.events") local ok, err = we.configure({shm = "worker-events", interval = 0.1}) if not ok then error("failed to init worker event: " .. err) end local discovery = require("apisix.discovery.init").discovery if discovery and discovery.init_worker then discovery.init_worker() end require("apisix.balancer").init_worker() load_balancer = require("apisix.balancer") require("apisix.admin.init").init_worker() require("apisix.timers").init_worker() plugin.init_worker() router.http_init_worker() require("apisix.http.service").init_worker() plugin_config.init_worker() require("apisix.consumer").init_worker() if core.config == require("apisix.core.config_yaml") then core.config.init_worker() end require("apisix.debug").init_worker() apisix_upstream.init_worker() require("apisix.plugins.ext-plugin.init").init_worker() local_conf = core.config.local_conf() if local_conf.apisix and local_conf.apisix.enable_server_tokens == false then ver_header = "APISIX" end end
1.初始化 openresty worker event
local we = require("resty.worker.events") local ok, err = we.configure({shm = "worker-events", interval = 0.1}) if not ok then error("failed to init worker event: " .. err) end
2.初始化服务发现
3.初始化balancer组件
4.初始化admin组件(会同步一次插件配置到 etcd)
sync_local_conf_to_etcd(true)
5.初始化timers组件
6.初始化 plugin 组件(清掉旧的 table, 然后把从config-default.yaml 和 config.yaml 文件中读取插件配置放到一个 local_plugins_hash中,并按优先级排序)
local local_conf_path = profile:yaml_path("config-default") local default_conf_yaml, err = util.read_file(local_conf_path) local_conf_path = profile:yaml_path("config") local user_conf_yaml, err = util.read_file(local_conf_path) ok, err = merge_conf(default_conf, user_conf) local local_plugins = core.table.new(32, 0) for name in pairs(local_plugins_hash) do unload_plugin(name) end core.table.clear(local_plugins) core.table.clear(local_plugins_hash) for name in pairs(processed) do load_plugin(name, local_plugins) end -- sort by plugin's priority if #local_plugins > 1 then sort_tab(local_plugins, sort_plugin) end local plugin_metadatas, err = core.config.new("/plugin_metadata", {automatic = true} )
7.初始化router组件(初始化 etcd /global_rules 数据)
local global_rules, err = core.config.new("/global_rules", { automatic = true, item_schema = core.schema.global_rule, checker = plugin_checker, })
8.初始化 servers 组件(从ETCD抓取 services 配置)
services, err = core.config.new("/services", { automatic = true, item_schema = core.schema.service, checker = plugin_checker, filter = filter, })
9.初始化sonsumer组件
10.同步 config_yaml 到各个进程
-- sync data in each non-master process ngx.timer.every(1, read_apisix_yaml)
11.初始化upstream组件
12.初始化 ext-plugin 组件
- apisix.http_exit_worker()
停止 "privileged agent" apisix.http_access_phase()
function _M.http_access_phase() local ngx_ctx = ngx.ctx if not verify_tls_client(ngx_ctx.api_ctx) then return core.response.exit(400) end -- always fetch table from the table pool, we don't need a reused api_ctx local api_ctx = core.tablepool.fetch("api_ctx", 0, 32) ngx_ctx.api_ctx = api_ctx core.ctx.set_vars_meta(api_ctx) local uri = api_ctx.var.uri if local_conf.apisix and local_conf.apisix.delete_uri_tail_slash then if str_byte(uri, #uri) == str_byte("/") then api_ctx.var.uri = str_sub(api_ctx.var.uri, 1, #uri - 1) core.log.info("remove the end of uri '/', current uri: ", api_ctx.var.uri) end end if router.api.has_route_not_under_apisix() or core.string.has_prefix(uri, "/apisix/") then local skip = local_conf and local_conf.apisix.global_rule_skip_internal_api local matched = router.api.match(api_ctx, skip) if matched then return end end router.router_http.match(api_ctx) local route = api_ctx.matched_route if not route then -- run global rule plugin.run_global_rules(api_ctx, router.global_rules, nil) core.log.info("not find any matched route") return core.response.exit(404, {error_msg = "404 Route Not Found"}) end core.log.info("matched route: ", core.json.delay_encode(api_ctx.matched_route, true)) local enable_websocket = route.value.enable_websocket if route.value.plugin_config_id then local conf = plugin_config.get(route.value.plugin_config_id) if not conf then core.log.error("failed to fetch plugin config by ", "id: ", route.value.plugin_config_id) return core.response.exit(503) end route = plugin_config.merge(route, conf) end if route.value.service_id then local service = service_fetch(route.value.service_id) if not service then core.log.error("failed to fetch service configuration by ", "id: ", route.value.service_id) return core.response.exit(404) end route = plugin.merge_service_route(service, route) api_ctx.matched_route = route api_ctx.conf_type = "route&service" api_ctx.conf_version = route.modifiedIndex .. "&" .. service.modifiedIndex api_ctx.conf_id = route.value.id .. "&" .. service.value.id api_ctx.service_id = service.value.id api_ctx.service_name = service.value.name if enable_websocket == nil then enable_websocket = service.value.enable_websocket end else api_ctx.conf_type = "route" api_ctx.conf_version = route.modifiedIndex api_ctx.conf_id = route.value.id end api_ctx.route_id = route.value.id api_ctx.route_name = route.value.name -- run global rule plugin.run_global_rules(api_ctx, router.global_rules, nil) if route.value.script then script.load(route, api_ctx) script.run("access", api_ctx) else local plugins = plugin.filter(route) api_ctx.plugins = plugins plugin.run_plugin("rewrite", plugins, api_ctx) if api_ctx.consumer then local changed route, changed = plugin.merge_consumer_route( route, api_ctx.consumer, api_ctx ) core.log.info("find consumer ", api_ctx.consumer.username, ", config changed: ", changed) if changed then api_ctx.matched_route = route core.table.clear(api_ctx.plugins) api_ctx.plugins = plugin.filter(route, api_ctx.plugins) end end plugin.run_plugin("access", plugins, api_ctx) end local up_id = route.value.upstream_id -- used for the traffic-split plugin if api_ctx.upstream_id then up_id = api_ctx.upstream_id end if up_id then local upstream = get_upstream_by_id(up_id) api_ctx.matched_upstream = upstream else if route.has_domain then local err route, err = parse_domain_in_route(route) if err then core.log.error("failed to get resolved route: ", err) return core.response.exit(500) end api_ctx.conf_version = route.modifiedIndex api_ctx.matched_route = route end local route_val = route.value if route_val.upstream and route_val.upstream.enable_websocket then enable_websocket = true end api_ctx.matched_upstream = (route.dns_value and route.dns_value.upstream) or route_val.upstream end if enable_websocket then api_ctx.var.upstream_upgrade = api_ctx.var.http_upgrade api_ctx.var.upstream_connection = api_ctx.var.http_connection core.log.info("enabled websocket for route: ", route.value.id) end if route.value.service_protocol == "grpc" then api_ctx.upstream_scheme = "grpc" end local code, err = set_upstream(route, api_ctx) if code then core.log.error("failed to set upstream: ", err) core.response.exit(code) end local server, err = load_balancer.pick_server(route, api_ctx) if not server then core.log.error("failed to pick server: ", err) return core.response.exit(502) end api_ctx.picked_server = server set_upstream_headers(api_ctx, server) -- run the before_proxy method in access phase first to avoid always reinit request common_phase("before_proxy") local ref = ctxdump.stash_ngx_ctx() core.log.info("stash ngx ctx: ", ref) ngx_var.ctx_ref = ref local up_scheme = api_ctx.upstream_scheme if up_scheme == "grpcs" or up_scheme == "grpc" then return ngx.exec("@grpc_pass") end if api_ctx.dubbo_proxy_enabled then return ngx.exec("@dubbo_pass") end end
1.初始化 api_ctx 上下文
2.client tls 验证
3.是否为 apisix 已经注册的路径,对请求进行匹配,内部使用 Radix tree 进行匹配
4.向上下文注入该请求匹配的 apisix route ,service 等信息用于后续阶段使用
5.向上下文注入该请求相关的插件,例如:请求对应的路由存在插件,若请求存在对应的 service 则加入 service 定义的插件,以及全局插件
6.调用 "rewrite" 阶段的插件
7.调用 "access" 阶段的插件
8.获取 upstream
9.执行 loadbalancer 选择 server
10.调用 "balancer" 阶段插件
11.判断 upstream ,根据 upstream 类型,grpc dubbo 等进入 @grpc_pass @dubbo_pass 等不同的后续处理流程。 这些配置可在 nginx.conf 中查看apisix.http_balancer_phase()
function _M.http_header_filter_phase() if ngx_var.ctx_ref ~= '' then -- prevent for the table leak local stash_ctx = fetch_ctx() -- internal redirect, so we should apply the ctx if ngx_var.from_error_page == "true" then ngx.ctx = stash_ctx end end core.response.set_header("Server", ver_header) local up_status = get_var("upstream_status") if up_status and #up_status == 3 and tonumber(up_status) >= 500 and tonumber(up_status) <= 599 then set_resp_upstream_status(up_status) elseif up_status and #up_status > 3 then -- the up_status can be "502, 502" or "502, 502 : " local last_status if str_byte(up_status, -1) == str_byte(" ") then last_status = str_sub(up_status, -6, -3) else last_status = str_sub(up_status, -3) end if tonumber(last_status) >= 500 and tonumber(last_status) <= 599 then set_resp_upstream_status(up_status) end end common_phase("header_filter") end
1.设置头 "Server", APISIX
2.设置上游状态头:X-APISIX-Upstream-Status
3.执行 “header_filter” 阶段的插件
apisix.http_body_filter_phase()
function _M.http_body_filter_phase() common_phase("body_filter") end
执行 “body_filter” 阶段的插件
apisix.http_log_phase()
function _M.http_log_phase() if ngx_var.ctx_ref ~= '' then -- prevent for the table leak local stash_ctx = fetch_ctx() -- internal redirect, so we should apply the ctx if ngx_var.from_error_page == "true" then ngx.ctx = stash_ctx end end local api_ctx = common_phase("log") if not api_ctx then return end healthcheck_passive(api_ctx) if api_ctx.server_picker and api_ctx.server_picker.after_balance then api_ctx.server_picker.after_balance(api_ctx, false) end if api_ctx.uri_parse_param then core.tablepool.release("uri_parse_param", api_ctx.uri_parse_param) end core.ctx.release_vars(api_ctx) if api_ctx.plugins then core.tablepool.release("plugins", api_ctx.plugins) end if api_ctx.curr_req_matched then core.tablepool.release("matched_route_record", api_ctx.curr_req_matched) end core.tablepool.release("api_ctx", api_ctx) end
1.执行 “log” 阶段的插件
2.回收 plugins, matched_route_record,api_ctx 缓存apisix.http_balancer_phase()
function _M.http_balancer_phase() local api_ctx = ngx.ctx.api_ctx if not api_ctx then core.log.error("invalid api_ctx") return core.response.exit(500) end load_balancer.run(api_ctx.matched_route, api_ctx, common_phase) end
1.设置 balance 的基本参数(超时时间,连接失败重试次数)
2.选择上游服务器,采用一致性hash算法
3.调用 set_current_peer 设置 proxy_pass
自定义插件逻辑
通过 common_phase 作为自定义插件的方法的公共入口,被 openresty 各个环节调用
local function common_phase(phase_name) local api_ctx = ngx.ctx.api_ctx if not api_ctx then return end plugin.run_global_rules(api_ctx, api_ctx.global_rules, phase_name) if api_ctx.script_obj then script.run(phase_name, api_ctx) return api_ctx, true end return plugin.run_plugin(phase_name, nil, api_ctx) end
真正执行自定义插件逻辑
for i = 1, #plugins, 2 do local phase_func = plugins[i][phase] if phase_func then plugin_run = true local code, body = phase_func(plugins[i + 1], api_ctx) if code or body then if is_http then if code >= 400 then core.log.warn(plugins[i].name, " exits with http status code ", code) end core.response.exit(code, body) else if code >= 400 then core.log.warn(plugins[i].name, " exits with status code ", code) end ngx_exit(1) end end end end
admin api
location /apisix/admin { set $upstream_scheme 'http'; set $upstream_host $http_host; set $upstream_uri ''; allow 127.0.0.0/24; deny all; content_by_lua_block { apisix.http_admin() } }
自定义插件
local function common_phase(phase_name) local api_ctx = ngx.ctx.api_ctx if not api_ctx then return end plugin.run_global_rules(api_ctx, api_ctx.global_rules, phase_name) if api_ctx.script_obj then script.run(phase_name, api_ctx) return api_ctx, true end return plugin.run_plugin(phase_name, nil, api_ctx) end
已知的阶段
preread
ssl
access
balancer
rewrite
header_filter
body_filter
log
admin api
location /apisix/admin { set $upstream_scheme 'http'; set $upstream_host $http_host; set $upstream_uri ''; allow 127.0.0.0/24; deny all; content_by_lua_block { apisix.http_admin() } }
根据路由进行转发
local ok = router:dispatch(get_var("uri"), {method = get_method()})
具体路由 dispatch 逻辑
function _M.dispatch(self, path, opts, ...) if type(path) ~= "string" then error("invalid argument path", 2) end local args local len = select('#', ...) if len > 0 then if not self.args then self.args = {...} else clear_tab(self.args) for i = 1, len do self.args[i] = select(i, ...) end end -- To keep the self.args in safe, -- we can't yield until filter_fun is called args = self.args args[0] = len end local route, err = match_route(self, path, opts or empty_table, args) if not route then if err then return nil, err end return nil end local handler = route.handler if not handler or type(handler) ~= "function" then return nil, "missing handler" end handler(...) return true end
路由与hanlder 的关系
local uri_route = { { paths = [[/apisix/admin/*]], methods = {"GET", "PUT", "POST", "DELETE", "PATCH"}, handler = run, }, { paths = [[/apisix/admin/stream_routes/*]], methods = {"GET", "PUT", "POST", "DELETE", "PATCH"}, handler = run_stream, }, { paths = [[/apisix/admin/plugins/list]], methods = {"GET"}, handler = get_plugins_list, }, { paths = reload_event, methods = {"PUT"}, handler = post_reload_plugins, }, }
调用 run 方法
local function run() local api_ctx = {} core.ctx.set_vars_meta(api_ctx) ngx.ctx.api_ctx = api_ctx local ok, err = check_token(api_ctx) if not ok then core.log.warn("failed to check token: ", err) core.response.exit(401) end local uri_segs = core.utils.split_uri(ngx.var.uri) core.log.info("uri: ", core.json.delay_encode(uri_segs)) -- /apisix/admin/schema/route local seg_res, seg_id = uri_segs[4], uri_segs[5] local seg_sub_path = core.table.concat(uri_segs, "/", 6) if seg_res == "schema" and seg_id == "plugins" then -- /apisix/admin/schema/plugins/limit-count seg_res, seg_id = uri_segs[5], uri_segs[6] seg_sub_path = core.table.concat(uri_segs, "/", 7) end local resource = resources[seg_res] if not resource then core.response.exit(404) end local method = str_lower(get_method()) if not resource[method] then core.response.exit(404) end local req_body, err = core.request.get_body(MAX_REQ_BODY) if err then core.log.error("failed to read request body: ", err) core.response.exit(400, {error_msg = "invalid request body: " .. err}) end if req_body then local data, err = core.json.decode(req_body) if not data then core.log.error("invalid request body: ", req_body, " err: ", err) core.response.exit(400, {error_msg = "invalid request body: " .. err, req_body = req_body}) end req_body = data end local uri_args = ngx.req.get_uri_args() or {} if uri_args.ttl then if not tonumber(uri_args.ttl) then core.response.exit(400, {error_msg = "invalid argument ttl: " .. "should be a number"}) end end local code, data = resource[method](seg_id, req_body, seg_sub_path, uri_args) if code then data = strip_etcd_resp(data) core.response.exit(code, data) end end
根据 resource 定义找到对应的 lua 模板的执行方法
local resources = { routes = require("apisix.admin.routes"), services = require("apisix.admin.services"), upstreams = require("apisix.admin.upstreams"), consumers = require("apisix.admin.consumers"), schema = require("apisix.admin.schema"), ssl = require("apisix.admin.ssl"), plugins = require("apisix.admin.plugins"), proto = require("apisix.admin.proto"), global_rules = require("apisix.admin.global_rules"), stream_routes = require("apisix.admin.stream_routes"), plugin_metadata = require("apisix.admin.plugin_metadata"), plugin_configs = require("apisix.admin.plugin_config"), }
执行对应模块的对应的方法,例如:
GET http://127.0.0.1:9080/apisix/...
就是直接调用 plugin_metadata 的 get 方法
function _M.get(key) local path = "/plugin_metadata" if key then path = path .. "/" .. key end local res, err = core.etcd.get(path, not key) if not res then core.log.error("failed to get metadata[", key, "]: ", err) return 503, {error_msg = err} end return res.status, res.body end
control api
入口 apisix.http_control()
server { listen 127.0.0.1:9090; access_log off; location / { content_by_lua_block { apisix.http_control() } } location @50x.html { set $from_error_page 'true'; content_by_lua_block { require("apisix.error_handling").handle_500() } } }
先注册所有的插件的 control_api 方法,调用 router:dispatch 进行路由分发
function _M.match(uri) if cached_version ~= plugin_mod.load_times then local err router, err = fetch_control_api_router() if router == nil then core.log.error("failed to fetch valid api router: ", err) return false end cached_version = plugin_mod.load_times end core.table.clear(match_opts) match_opts.method = get_method() return router:dispatch(uri, match_opts) end
例如:server_info 插件,注册路径 /v1/server_info 并指定使用 get_server_info函数进行处理
function _M.control_api() return { { methods = {"GET"}, uris ={"/v1/server_info"}, handler = get_server_info, } } end
注册 plugin的 control_api 方法
for _, plugin in ipairs(plugin_mod.plugins) do local api_fun = plugin.control_api if api_fun then local api_route = api_fun() register_api_routes(routes, api_route) end end
通过dispatch 方法调用插件的 handler 方法
local route, err = match_route(self, path, opts or empty_table, args) if not route then if err then return nil, err end return nil end local handler = route.handler if not handler or type(handler) ~= "function" then return nil, "missing handler" end handler(...)
手撸 apisix 地址 :
https://github.com/mousycoder...
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK