3

kubernetes扩展apiserver实现分析

 1 year ago
source link: https://qingwave.github.io/kube-apiserver-aggretation-api/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Apr 24, 2020 · cloud

kubernetes扩展apiserver实现分析



Kubernetes 提供了丰富的扩展功能,实现自定义资源有两种方式CRDAggregation API。相对于CRD,扩展 API 功能更丰富,可以实现单独的存储。今天来聊一聊,k8s 是如是实现扩展 api 的,它与 apiserver 之间又是如何协作的

AggregationApiserver 介绍

Aggregator类似于一个七层负载均衡,将来自用户的请求拦截转发给其他服务器,并且负责整个 APIServer 的 Discovery 功能。

通过APIServices对象关联到某个Service来进行请求的转发,其关联的Service类型进一步决定了请求转发形式。Aggregator包括一个GenericAPIServer和维护自身状态的Controller。其中 GenericAPIServer主要处理apiregistration.k8s.io组下的APIService资源请求。

主要 controller 包括:

  1. apiserviceRegistrationController:负责APIServices中资源的注册与删除;
  2. availableConditionController:维护APIServices的可用状态,包括其引用Service是否可用等;
  3. autoRegistrationController:用于保持 API 中存在的一组特定的APIServices
  4. crdRegistrationController:负责将CRD GroupVersions自动注册到APIServices中;
  5. openAPIAggregationController:将APIServices资源的变化同步至提供的OpenAPI文档;

在 kube-apiserver 中需要增加以下配置来开启 API Aggregation:

--proxy-client-cert-file=/etc/kubernetes/certs/proxy.crt
--proxy-client-key-file=/etc/kubernetes/certs/proxy.key
--requestheader-client-ca-file=/etc/kubernetes/certs/proxy-ca.crt
--requestheader-extra-headers-prefix=X-Remote-Extra-
--requestheader-group-headers=X-Remote-Group
--requestheader-username-headers=X-Remote-User

如果 kube-proxy 没有和 API server 运行在同一台主机上,那么需要确保启用了如下 apiserver 标记:

--enable-aggregator-routing=true

apiserver 启动流程中,分析了AggregationApiserver的初始化流程, 需要了解的可以回去看下。

AggregationApiserver 认证流程

与自定义资源定义(CRD)不同,除标准的 Kubernetes apiserver 外,Aggregation API 还涉及另一个服务器:扩展 apiserver。Kubernetes apiserver 将需要与您的扩展 apiserver 通信,并且您的扩展 apiserver 也需要与 Kubernetes apiserver 通信。为了确保此通信的安全,Kubernetes apiserver 使用 x509 证书向扩展 apiserver 认证。

AggregationApi 的请求链路如下:

defaultHandlerChain->aggregator->aggregation-apiserver->aggregator->user

大致流程如下:

  1. Kubernetes apiserver:对发出请求的用户身份认证,并对请求的 API 路径执行鉴权。
  2. Kubernetes apiserver:将请求转发到扩展 apiserver
  3. 扩展 apiserver:认证来自 Kubernetes apiserver 的请求
  4. 扩展 apiserver:对来自原始用户的请求鉴权
  5. 扩展 apiserver:执行对应操作返回

如图所示: aggregation-apiserver-auth

apiserver 与扩展 apiserver 通过证书认证,

  • apiserver 配置porxy-client证书(使用 requestheader 根证书签发),扩展 apiserver 配置reqeustheader根证书,如果没配置,会默认从 configmap kube-system/extension-apiserver-authentication 去找
  • 扩展 apiserver 通过extension-apiserver-authentication获取 apiserver 的client-ca,生成证书对,apiserver 可以使用client-ca验证它
  • 由于 apiserver->扩展 apiserver 通过reqeustheader方式认证,apiserver 会将接受到的请求经过认证,转换为 header,扩展 apiserver 通过 header 获取用户,再通过 apiserver 接口做权限校验。

有同学有疑问,为什么这里需要做两次认证,两次鉴权。这是由于扩展 apiserveer 是一个单独的服务器,如果接受非 apiserver 的请求也是需要做认证鉴权的。那能不能认证是 apiserver 后就不做鉴权了呢,这得需要 apiserver 在转发请求时加入鉴权信息就行。

AggregationApiserver 处理流程

apiserver 处理逻辑

在 apiserver 认证时,认证接受会将认证信息删除, 可参考前面的[apiserver 认证源码分析]

处理逻辑如下:

  1. 通过context获取 user 信息
  2. 构造请求,删除 reqeustheader 信息,通过 user 重新填充
  3. 通过proxyRoundTripper转发请求

(kube-apiserver-authentication-code.md) aggregation 的hander的实现:

// 通过context获取user
	user, ok := genericapirequest.UserFrom(req.Context())
	if !ok {
		proxyError(w, req, "missing user", http.StatusInternalServerError)
		return
  }
  // 构造请求url,通过apiservice配置的service/namespace随机得到某个endpoint后端
  location := &url.URL{}
	location.Scheme = "https"
	rloc, err := r.serviceResolver.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName, handlingInfo.servicePort)
	if err != nil {
		klog.Errorf("error resolving %s/%s: %v", handlingInfo.serviceNamespace, handlingInfo.serviceName, err)
		proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
		return
	}
	location.Host = rloc.Host
	location.Path = req.URL.Path
  location.RawQuery = req.URL.Query().Encode()

  // we need to wrap the roundtripper in another roundtripper which will apply the front proxy headers
  // 包裹请求信息,将user信息放到header中
	proxyRoundTripper, upgrade, err := maybeWrapForConnectionUpgrades(handlingInfo.restConfig, handlingInfo.proxyRoundTripper, req)
	if err != nil {
		proxyError(w, req, err.Error(), http.StatusInternalServerError)
		return
	}
  proxyRoundTripper = transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), proxyRoundTripper)

  // 调用后端
  handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w})
	handler.ServeHTTP(w, newReq)

根据扩展 apiserver 找到后端时通过 service 获取对应 endpoint 列表,随机选择某个 endpoint、 实现如下:

// ResourceLocation returns a URL to which one can send traffic for the specified service.
func ResolveEndpoint(services listersv1.ServiceLister, endpoints listersv1.EndpointsLister, namespace, id string, port int32) (*url.URL, error) {
	svc, err := services.Services(namespace).Get(id)
	if err != nil {
		return nil, err
	}

	svcPort, err := findServicePort(svc, port)
	if err != nil {
		return nil, err
	}

	switch {
	case svc.Spec.Type == v1.ServiceTypeClusterIP, svc.Spec.Type == v1.ServiceTypeLoadBalancer, svc.Spec.Type == v1.ServiceTypeNodePort:
		// these are fine
	default:
		return nil, fmt.Errorf("unsupported service type %q", svc.Spec.Type)
	}

	eps, err := endpoints.Endpoints(namespace).Get(svc.Name)
	if err != nil {
		return nil, err
	}
	if len(eps.Subsets) == 0 {
		return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svc.Name))
	}

	// Pick a random Subset to start searching from.
	ssSeed := rand.Intn(len(eps.Subsets))

	// Find a Subset that has the port.
	for ssi := 0; ssi < len(eps.Subsets); ssi++ {
		ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)]
		if len(ss.Addresses) == 0 {
			continue
		}
		for i := range ss.Ports {
			if ss.Ports[i].Name == svcPort.Name {
				// Pick a random address.
				// 核心,随机选择endpoint
				ip := ss.Addresses[rand.Intn(len(ss.Addresses))].IP
				port := int(ss.Ports[i].Port)
				return &url.URL{
					Scheme: "https",
					Host:   net.JoinHostPort(ip, strconv.Itoa(port)),
				}, nil
			}
		}
	}
	return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id))
}

ProxyRoundTripper 创建在round_trippers.go

func NewAuthProxyRoundTripper(username string, groups []string, extra map[string][]string, rt http.RoundTripper) http.RoundTripper {
	return &authProxyRoundTripper{
		username: username,
		groups:   groups,
		extra:    extra,
		rt:       rt,
	}
}

func (rt *authProxyRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
  req = utilnet.CloneRequest(req)
  // 包裹user信息
	SetAuthProxyHeaders(req, rt.username, rt.groups, rt.extra)

	return rt.rt.RoundTrip(req)
}

// SetAuthProxyHeaders stomps the auth proxy header fields.  It mutates its argument.
func SetAuthProxyHeaders(req *http.Request, username string, groups []string, extra map[string][]string) {
  // 清楚原始url的requestheader信息
	req.Header.Del("X-Remote-User")
	req.Header.Del("X-Remote-Group")
	for key := range req.Header {
		if strings.HasPrefix(strings.ToLower(key), strings.ToLower("X-Remote-Extra-")) {
			req.Header.Del(key)
		}
	}

  // 通过user重新填充信息
	req.Header.Set("X-Remote-User", username)
	for _, group := range groups {
		req.Header.Add("X-Remote-Group", group)
	}
	for key, values := range extra {
		for _, value := range values {
			req.Header.Add("X-Remote-Extra-"+headerKeyEscape(key), value)
		}
	}
}

扩展 apiserver 处理逻辑

下以 metrics-server 为例说明扩展 apiserver 在收到 apiserver 请求后的处理

与 apiserver 初始化相同,metrics-server 也需要初始化生成genericServer, 然后注册 apigroup pkg/metrics-server/config.go

func (c Config) Complete() (*MetricsServer, error) {
	informer, err := c.informer()
	if err != nil {
		return nil, err
	}
	kubeletClient, err := c.kubeletClient()
	if err != nil {
		return nil, err
	}
	addressResolver := c.addressResolver()

	// 创建scraper,负责抓取监控数据
	scrape := scraper.NewScraper(informer.Core().V1().Nodes().Lister(), kubeletClient, addressResolver, c.ScrapeTimeout)

	scraper.RegisterScraperMetrics(c.ScrapeTimeout)
	RegisterServerMetrics(c.MetricResolution)

	// 生成genericServer, 包裹有 DefaultBuildHandlerChain
	genericServer, err := c.Apiserver.Complete(informer).New("metrics-server", genericapiserver.NewEmptyDelegate())
	if err != nil {
		return nil, err
	}

	store := storage.NewStorage()
	// 注册api
	if err := api.Install(store, informer.Core().V1(), genericServer); err != nil {
		return nil, err
	}
	return &MetricsServer{
		GenericAPIServer: genericServer,
		storage:          store,
		scraper:          scrape,
		resolution:       c.MetricResolution,
	}, nil
}

api 注册代码,通过Build生成 apigroup,调用InstallAPIGroup进行注册 pkg/api/install.go

// InstallStorage builds the metrics for the metrics.k8s.io API, and then installs it into the given API metrics-server.
func Install(metrics MetricsGetter, informers coreinf.Interface, server *genericapiserver.GenericAPIServer) error {
	info := Build(metrics, informers)
	// 注册apigroup
	return server.InstallAPIGroup(&info)
}

// Build constructs APIGroupInfo the metrics.k8s.io API group using the given getters.
func Build(m MetricsGetter, informers coreinf.Interface) genericapiserver.APIGroupInfo {
	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(metrics.GroupName, Scheme, metav1.ParameterCodec, Codecs)

	// 注册metrics相关api
	node := newNodeMetrics(metrics.Resource("nodemetrics"), m, informers.Nodes().Lister())
	pod := newPodMetrics(metrics.Resource("podmetrics"), m, informers.Pods().Lister())
	metricsServerResources := map[string]rest.Storage{
		"nodes": node,
		"pods":  pod,
	}
	apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = metricsServerResources

	return apiGroupInfo
}

同 apiserver,metrics-server 收到请求后会经过DefaultBuildHandlerChain

  • 认证,从 apiserver 转发来的请求是reqeustheader形式,metrics-server 会使用requestheader-ca验证证书
  • 鉴权,同 apiserver 一样

注意, 如果 apiserver 未配置proxy-client证书,metrics-server 认证不通过,即使 apiserver 认证通过,metrics-server 也会认为是匿名用户system:anonymous

最后,metrics-server 执行具体逻辑,返回结果。

扩容 apiserver 的创建,处理流程与 apiserver 完全一样,可以直接调用 apiserver 的库,扩展 apiserver 直接处理请求,不需要经过 webhook,性能更好,更强大的是完全不使用 etcd,替换成时序数据库或者其他数据库。后续可以分析下 CRD 与扩展 apiserver 的区别以及使用场景。


Recommend

  • 32

    本文通过记录kubelet连接apiserver超时问题的原因及修复办法。 上篇文章回顾:一文读懂HBase多租户 背 景 kubernetes是ma...

  • 30

    Kubernetes ApiServer 并发安全机制 30 January 2020 笔者在这篇文章中想要探讨的问题是:当向 Kubernetes ApiServer 并发的发起多个请求,对同一个 API 资源对象进行更新时,Kubernetes Ap...

  • 8

    大家好,我是杨鼎睿,这一次给大家带来的是 ETCD 的源码阅读。本文写就时是三部分,方便大家阅读,合成一篇,分别是 Server 篇, Storage 篇和 Utility 篇。 本文研究了 ETCD 部分的源码,配备源码进行进一步理解,可以加深理解,增强相关设计能力。...

  • 9

    大家好,我是杨鼎睿,这一次给大家带来的是 API Server 的源码阅读。包括之前的 etcd 源码阅读,整个 API Server 共 109 张源码及源码图,文章最后有 API Server 系列目录。欢迎大家的阅读。 本文研究了 API Group 部分的源码,配备源码进行进一步理解,...

  • 21

    大家好,我是杨鼎睿,这一次给大家带来的是 API Server 的源码阅读。包括之前的 etcd 源码阅读,整个 API Server 共 109 张源码及源码图,文章最后有 API Server 系列目录。欢迎大家的阅读。 本文研究了 Cacher 部分的源码,配备源码进行进一步理解,可...

  • 8

    大家好,我是杨鼎睿,这一次给大家带来的是 API Server 的源码阅读。包括之前的 etcd 源码阅读,整个 API Server 共 109 张源码及源码图,文章最后有 API Server 系列目录。欢迎大家的阅读。 本文研究了 Generic API Server 部分的源码,配备源码进行进...

  • 13

    大家好,我是杨鼎睿,这一次给大家带来的是 API Server 的源码阅读。包括之前的 etcd 源码阅读,整个 API Server 共 109 张源码及源码图,文章最后有 API Server 系列目录。欢迎大家的阅读。 本文研究了 Aggregator Server 部分的源码,配备源码进行进一...

  • 3

    大家好,我是杨鼎睿,这一次给大家带来的是 API Server 的源码阅读。包括之前的 etcd 源码阅读,整个 API Server 共 109 张源码及源码图,文章最后有 API Server 系列目录。欢迎大家的阅读。 本文研究了 Master Server 部分的源码,配备源码进行进一步理...

  • 3

    大家好,我是杨鼎睿,这一次给大家带来的是 API Server 的源码阅读。包括之前的 etcd 源码阅读,整个 API Server 共 109 张源码及源码图,文章最后有 API Server 系列目录。欢迎大家的阅读。 本文研究了 CRD 部分的源码,配备源码进行进一步理解,可以加...

  • 4
    • qiankunli.github.io 2 years ago
    • Cache

    Kubernetes源码分析——apiserver

    apiserver 核心职责 提供Kubernetes API 代理集群组件,比如Kubernetes dashboard、流式日志、kubectl exec 会话 声明式API 命令式命令行操作,比如直接 kubectl run

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK