kubernetes扩展apiserver实现分析
source link: https://qingwave.github.io/kube-apiserver-aggretation-api/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
Apr 24, 2020 · cloud
kubernetes扩展apiserver实现分析
Kubernetes 提供了丰富的扩展功能,实现自定义资源有两种方式CRD
与Aggregation API
。相对于CRD
,扩展 API 功能更丰富,可以实现单独的存储。今天来聊一聊,k8s 是如是实现扩展 api 的,它与 apiserver 之间又是如何协作的
AggregationApiserver 介绍
Aggregator
类似于一个七层负载均衡,将来自用户的请求拦截转发给其他服务器,并且负责整个 APIServer 的 Discovery 功能。
通过APIServices
对象关联到某个Service
来进行请求的转发,其关联的Service
类型进一步决定了请求转发形式。Aggregator
包括一个GenericAPIServer
和维护自身状态的Controller
。其中 GenericAPIServer
主要处理apiregistration.k8s.io
组下的APIService
资源请求。
主要 controller 包括:
- apiserviceRegistrationController:负责
APIServices
中资源的注册与删除; - availableConditionController:维护
APIServices
的可用状态,包括其引用Service
是否可用等; - autoRegistrationController:用于保持 API 中存在的一组特定的
APIServices
; - crdRegistrationController:负责将
CRD GroupVersions
自动注册到APIServices
中; - openAPIAggregationController:将
APIServices
资源的变化同步至提供的OpenAPI
文档;
在 kube-apiserver 中需要增加以下配置来开启 API Aggregation:
--proxy-client-cert-file=/etc/kubernetes/certs/proxy.crt
--proxy-client-key-file=/etc/kubernetes/certs/proxy.key
--requestheader-client-ca-file=/etc/kubernetes/certs/proxy-ca.crt
--requestheader-extra-headers-prefix=X-Remote-Extra-
--requestheader-group-headers=X-Remote-Group
--requestheader-username-headers=X-Remote-User
如果 kube-proxy 没有和 API server 运行在同一台主机上,那么需要确保启用了如下 apiserver 标记:
--enable-aggregator-routing=true
在apiserver 启动流程中,分析了AggregationApiserver
的初始化流程, 需要了解的可以回去看下。
AggregationApiserver 认证流程
与自定义资源定义(CRD)不同,除标准的 Kubernetes apiserver 外,Aggregation API 还涉及另一个服务器:扩展 apiserver。Kubernetes apiserver 将需要与您的扩展 apiserver 通信,并且您的扩展 apiserver 也需要与 Kubernetes apiserver 通信。为了确保此通信的安全,Kubernetes apiserver 使用 x509 证书向扩展 apiserver 认证。
AggregationApi 的请求链路如下:
defaultHandlerChain->aggregator->aggregation-apiserver->aggregator->user
大致流程如下:
- Kubernetes apiserver:对发出请求的用户身份认证,并对请求的 API 路径执行鉴权。
- Kubernetes apiserver:将请求转发到扩展 apiserver
- 扩展 apiserver:认证来自 Kubernetes apiserver 的请求
- 扩展 apiserver:对来自原始用户的请求鉴权
- 扩展 apiserver:执行对应操作返回
如图所示: aggregation-apiserver-auth
apiserver 与扩展 apiserver 通过证书认证,
- apiserver 配置
porxy-client
证书(使用 requestheader 根证书签发),扩展 apiserver 配置reqeustheader
根证书,如果没配置,会默认从 configmapkube-system/extension-apiserver-authentication
去找 - 扩展 apiserver 通过
extension-apiserver-authentication
获取 apiserver 的client-ca
,生成证书对,apiserver 可以使用client-ca
验证它 - 由于 apiserver->扩展 apiserver 通过
reqeustheader
方式认证,apiserver 会将接受到的请求经过认证,转换为 header,扩展 apiserver 通过 header 获取用户,再通过 apiserver 接口做权限校验。
有同学有疑问,为什么这里需要做两次认证,两次鉴权。这是由于扩展 apiserveer 是一个单独的服务器,如果接受非 apiserver 的请求也是需要做认证鉴权的。那能不能认证是 apiserver 后就不做鉴权了呢,这得需要 apiserver 在转发请求时加入鉴权信息就行。
AggregationApiserver 处理流程
apiserver 处理逻辑
在 apiserver 认证时,认证接受会将认证信息删除, 可参考前面的[apiserver 认证源码分析]
处理逻辑如下:
- 通过
context
获取 user 信息 - 构造请求,删除 reqeustheader 信息,通过 user 重新填充
- 通过
proxyRoundTripper
转发请求
(kube-apiserver-authentication-code.md) aggregation 的hander的实现:
// 通过context获取user
user, ok := genericapirequest.UserFrom(req.Context())
if !ok {
proxyError(w, req, "missing user", http.StatusInternalServerError)
return
}
// 构造请求url,通过apiservice配置的service/namespace随机得到某个endpoint后端
location := &url.URL{}
location.Scheme = "https"
rloc, err := r.serviceResolver.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName, handlingInfo.servicePort)
if err != nil {
klog.Errorf("error resolving %s/%s: %v", handlingInfo.serviceNamespace, handlingInfo.serviceName, err)
proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
return
}
location.Host = rloc.Host
location.Path = req.URL.Path
location.RawQuery = req.URL.Query().Encode()
// we need to wrap the roundtripper in another roundtripper which will apply the front proxy headers
// 包裹请求信息,将user信息放到header中
proxyRoundTripper, upgrade, err := maybeWrapForConnectionUpgrades(handlingInfo.restConfig, handlingInfo.proxyRoundTripper, req)
if err != nil {
proxyError(w, req, err.Error(), http.StatusInternalServerError)
return
}
proxyRoundTripper = transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), proxyRoundTripper)
// 调用后端
handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w})
handler.ServeHTTP(w, newReq)
根据扩展 apiserver 找到后端时通过 service 获取对应 endpoint 列表,随机选择某个 endpoint、 实现如下:
// ResourceLocation returns a URL to which one can send traffic for the specified service.
func ResolveEndpoint(services listersv1.ServiceLister, endpoints listersv1.EndpointsLister, namespace, id string, port int32) (*url.URL, error) {
svc, err := services.Services(namespace).Get(id)
if err != nil {
return nil, err
}
svcPort, err := findServicePort(svc, port)
if err != nil {
return nil, err
}
switch {
case svc.Spec.Type == v1.ServiceTypeClusterIP, svc.Spec.Type == v1.ServiceTypeLoadBalancer, svc.Spec.Type == v1.ServiceTypeNodePort:
// these are fine
default:
return nil, fmt.Errorf("unsupported service type %q", svc.Spec.Type)
}
eps, err := endpoints.Endpoints(namespace).Get(svc.Name)
if err != nil {
return nil, err
}
if len(eps.Subsets) == 0 {
return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svc.Name))
}
// Pick a random Subset to start searching from.
ssSeed := rand.Intn(len(eps.Subsets))
// Find a Subset that has the port.
for ssi := 0; ssi < len(eps.Subsets); ssi++ {
ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)]
if len(ss.Addresses) == 0 {
continue
}
for i := range ss.Ports {
if ss.Ports[i].Name == svcPort.Name {
// Pick a random address.
// 核心,随机选择endpoint
ip := ss.Addresses[rand.Intn(len(ss.Addresses))].IP
port := int(ss.Ports[i].Port)
return &url.URL{
Scheme: "https",
Host: net.JoinHostPort(ip, strconv.Itoa(port)),
}, nil
}
}
}
return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id))
}
ProxyRoundTripper 创建在round_trippers.go
func NewAuthProxyRoundTripper(username string, groups []string, extra map[string][]string, rt http.RoundTripper) http.RoundTripper {
return &authProxyRoundTripper{
username: username,
groups: groups,
extra: extra,
rt: rt,
}
}
func (rt *authProxyRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
req = utilnet.CloneRequest(req)
// 包裹user信息
SetAuthProxyHeaders(req, rt.username, rt.groups, rt.extra)
return rt.rt.RoundTrip(req)
}
// SetAuthProxyHeaders stomps the auth proxy header fields. It mutates its argument.
func SetAuthProxyHeaders(req *http.Request, username string, groups []string, extra map[string][]string) {
// 清楚原始url的requestheader信息
req.Header.Del("X-Remote-User")
req.Header.Del("X-Remote-Group")
for key := range req.Header {
if strings.HasPrefix(strings.ToLower(key), strings.ToLower("X-Remote-Extra-")) {
req.Header.Del(key)
}
}
// 通过user重新填充信息
req.Header.Set("X-Remote-User", username)
for _, group := range groups {
req.Header.Add("X-Remote-Group", group)
}
for key, values := range extra {
for _, value := range values {
req.Header.Add("X-Remote-Extra-"+headerKeyEscape(key), value)
}
}
}
扩展 apiserver 处理逻辑
下以 metrics-server 为例说明扩展 apiserver 在收到 apiserver 请求后的处理
与 apiserver 初始化相同,metrics-server 也需要初始化生成genericServer
, 然后注册 apigroup pkg/metrics-server/config.go
func (c Config) Complete() (*MetricsServer, error) {
informer, err := c.informer()
if err != nil {
return nil, err
}
kubeletClient, err := c.kubeletClient()
if err != nil {
return nil, err
}
addressResolver := c.addressResolver()
// 创建scraper,负责抓取监控数据
scrape := scraper.NewScraper(informer.Core().V1().Nodes().Lister(), kubeletClient, addressResolver, c.ScrapeTimeout)
scraper.RegisterScraperMetrics(c.ScrapeTimeout)
RegisterServerMetrics(c.MetricResolution)
// 生成genericServer, 包裹有 DefaultBuildHandlerChain
genericServer, err := c.Apiserver.Complete(informer).New("metrics-server", genericapiserver.NewEmptyDelegate())
if err != nil {
return nil, err
}
store := storage.NewStorage()
// 注册api
if err := api.Install(store, informer.Core().V1(), genericServer); err != nil {
return nil, err
}
return &MetricsServer{
GenericAPIServer: genericServer,
storage: store,
scraper: scrape,
resolution: c.MetricResolution,
}, nil
}
api 注册代码,通过Build
生成 apigroup,调用InstallAPIGroup
进行注册 pkg/api/install.go
// InstallStorage builds the metrics for the metrics.k8s.io API, and then installs it into the given API metrics-server.
func Install(metrics MetricsGetter, informers coreinf.Interface, server *genericapiserver.GenericAPIServer) error {
info := Build(metrics, informers)
// 注册apigroup
return server.InstallAPIGroup(&info)
}
// Build constructs APIGroupInfo the metrics.k8s.io API group using the given getters.
func Build(m MetricsGetter, informers coreinf.Interface) genericapiserver.APIGroupInfo {
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(metrics.GroupName, Scheme, metav1.ParameterCodec, Codecs)
// 注册metrics相关api
node := newNodeMetrics(metrics.Resource("nodemetrics"), m, informers.Nodes().Lister())
pod := newPodMetrics(metrics.Resource("podmetrics"), m, informers.Pods().Lister())
metricsServerResources := map[string]rest.Storage{
"nodes": node,
"pods": pod,
}
apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = metricsServerResources
return apiGroupInfo
}
同 apiserver,metrics-server 收到请求后会经过DefaultBuildHandlerChain
- 认证,从 apiserver 转发来的请求是
reqeustheader
形式,metrics-server 会使用requestheader-ca
验证证书 - 鉴权,同 apiserver 一样
注意, 如果 apiserver 未配置
proxy-client
证书,metrics-server 认证不通过,即使 apiserver 认证通过,metrics-server 也会认为是匿名用户system:anonymous
最后,metrics-server 执行具体逻辑,返回结果。
扩容 apiserver 的创建,处理流程与 apiserver 完全一样,可以直接调用 apiserver 的库,扩展 apiserver 直接处理请求,不需要经过 webhook,性能更好,更强大的是完全不使用 etcd,替换成时序数据库或者其他数据库。后续可以分析下 CRD 与扩展 apiserver 的区别以及使用场景。
Recommend
-
32
本文通过记录kubelet连接apiserver超时问题的原因及修复办法。 上篇文章回顾:一文读懂HBase多租户 背 景 kubernetes是ma...
-
30
Kubernetes ApiServer 并发安全机制 30 January 2020 笔者在这篇文章中想要探讨的问题是:当向 Kubernetes ApiServer 并发的发起多个请求,对同一个 API 资源对象进行更新时,Kubernetes Ap...
-
8
大家好,我是杨鼎睿,这一次给大家带来的是 ETCD 的源码阅读。本文写就时是三部分,方便大家阅读,合成一篇,分别是 Server 篇, Storage 篇和 Utility 篇。 本文研究了 ETCD 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。...
-
9
大家好,我是杨鼎睿,这一次给大家带来的是 API Server 的源码阅读。包括之前的 etcd 源码阅读,整个 API Server 共 109 张源码及源码图,文章最后有 API Server 系列目录。欢迎大家的阅读。 本文研究了 API Group 部分的源码,配备源码进行进一步理解,...
-
21
大家好,我是杨鼎睿,这一次给大家带来的是 API Server 的源码阅读。包括之前的 etcd 源码阅读,整个 API Server 共 109 张源码及源码图,文章最后有 API Server 系列目录。欢迎大家的阅读。 本文研究了 Cacher 部分的源码,配备源码进行进一步理解,可...
-
8
大家好,我是杨鼎睿,这一次给大家带来的是 API Server 的源码阅读。包括之前的 etcd 源码阅读,整个 API Server 共 109 张源码及源码图,文章最后有 API Server 系列目录。欢迎大家的阅读。 本文研究了 Generic API Server 部分的源码,配备源码进行进...
-
13
大家好,我是杨鼎睿,这一次给大家带来的是 API Server 的源码阅读。包括之前的 etcd 源码阅读,整个 API Server 共 109 张源码及源码图,文章最后有 API Server 系列目录。欢迎大家的阅读。 本文研究了 Aggregator Server 部分的源码,配备源码进行进一...
-
3
大家好,我是杨鼎睿,这一次给大家带来的是 API Server 的源码阅读。包括之前的 etcd 源码阅读,整个 API Server 共 109 张源码及源码图,文章最后有 API Server 系列目录。欢迎大家的阅读。 本文研究了 Master Server 部分的源码,配备源码进行进一步理...
-
3
大家好,我是杨鼎睿,这一次给大家带来的是 API Server 的源码阅读。包括之前的 etcd 源码阅读,整个 API Server 共 109 张源码及源码图,文章最后有 API Server 系列目录。欢迎大家的阅读。 本文研究了 CRD 部分的源码,配备源码进行进一步理解,可以加...
-
4
apiserver 核心职责 提供Kubernetes API 代理集群组件,比如Kubernetes dashboard、流式日志、kubectl exec 会话 声明式API 命令式命令行操作,比如直接 kubectl run
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK