KubeVela 的多集群管理依赖于 Cluster Gateway 组件,在 KubeVela 的 Helm Chart 中会自动安装。KubeVela 并不会直连集群,而是必须通过 Cluster Gateway 连接集群进行管理。
包括集群管理在内的功能都是依赖于 Cluster Gateway 实现的,所以 Cluster Gateway 是 KubeVela 多集群管理必不可少的一个组件。
Cluster Gateway 使用的 apiserver 原生扩展接口 apiserver-aggregation 实现的代理功能。通过向 API Server 注册外部 API,可以将管理平面的 API Server 作为纳管集群的访问的入口,还可以避免内部集群的暴露问题。
Cluster Gateway 向 API Server 注册了以下接口:
Cluster Gateway 代理路径 1 /apis/cluster.core.oam.dev/v1alpha1/clustergateways/{clusterName}/proxy/{api}
接口中的参数如下:
clusterName
:目标集群名;
api
:转发到目标集群的 API Server 的请求路径。
入口和授权 通过查看应用注册的 Kubernetes 资源,可以大体了解到应用的权限要求、启动方式和依赖等,对理解应用原理和找到程序入口有很大帮助。
注册 API Server Aggregation 注册 API Server 扩展接口很简单,只需要创建一个 APIService 对象就可以实现:
注册 AA 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 apiVersion: apiregistration.k8s.io/v1 kind: APIService metadata: name: v1alpha1.cluster.core.oam.dev labels: api: cluster-extension-apiserver apiserver: "true" spec: version: v1alpha1 group: cluster.core.oam.dev groupPriorityMinimum: 2000 service: name: gateway-service namespace: {{ .Release.Namespace }} port: 9443 versionPriority: 10 insecureSkipTLSVerify: true
这个配置文件,注册了一个版本为 v1alpha1
的 API Group
,组名为 cluster.core.oam.dev
。在请求 API Server 的 /apis/cluster.core.oam.dev/v1alpha1/
接口时,请求会被转发到 APIService
声明的 Service
中(示例中是 gateway-service
服务)。
代码中,在构造 API Server 的 Builder 中注册了 ClusterGateway
这个资源:
构造 ApiServer 1 2 3 4 5 6 7 8 9 10 func main () { metrics.Register() cmd, err := builder.APIServer. WithResource(&clusterv1alpha1.ClusterGateway{}). }
ClusterGateway
注册了资源名 clustergateways
,对应的处理代理请求的逻辑在 ClusterGatewayProxy
中:
pkg/apis/cluster/v1alpha1/clustergateway_types.go:146 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func (in *ClusterGateway) GetGroupVersionResource() schema.GroupVersionResource { return schema.GroupVersionResource{ Group: config.MetaApiGroupName, Version: config.MetaApiVersionName, Resource: config.MetaApiResourceName, } } var MetaApiResourceName = "clustergateways" func (in *ClusterGateway) GetArbitrarySubResources() []resource.ArbitrarySubResource { return []resource.ArbitrarySubResource{ &ClusterGatewayProxy{}, &ClusterGatewayHealth{}, } }
授权 Cluster Gateway 还创建了以下授权配置:
ClusterRole
:cluster-gateway:proxy
访问代理接口的角色;
ClusterRoleBinding
:将上面的角色授权给 kubevela:client
组和 SA。
这个配置可以让 KubeVela 和 CLI 用户能使用这个资源接口访问集群。
Cluster Gateway 代码实现 由前面注册 APIService
部分可以知道,ClusterGatewayProxy
结构是处理代理逻辑的地方,可以说是 Cluster Gateway 业务逻辑的入口。
ClusterGatewayProxy
实现了 Connecter
接口,当连接进来时会调用对应的 Connect()
方法,在 Connect()
方法中根据请求信息获取 ClusterGateway
信息并构造一个 proxyHandler
对象:
Connect 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 func (c *ClusterGatewayProxy) Connect(ctx context.Context, id string , options runtime.Object, r registryrest.Responder) (http.Handler, error ) { proxyOpts, ok := options.(*ClusterGatewayProxyOptions) if !ok { return nil , fmt.Errorf("invalid options object: %#v" , options) } parentStorage, ok := contextutil.GetParentStorageGetter(ctx) if !ok { return nil , fmt.Errorf("no parent storage found" ) } parentObj, err := parentStorage.Get(ctx, id, &metav1.GetOptions{}) if err != nil { return nil , fmt.Errorf("no such cluster %v" , id) } clusterGateway := parentObj.(*ClusterGateway) reqInfo, _ := request.RequestInfoFrom(ctx) factory := request.RequestInfoFactory{ APIPrefixes: sets.NewString("api" , "apis" ), GrouplessAPIPrefixes: sets.NewString("api" ), } proxyReqInfo, _ := factory.NewRequestInfo(&http.Request{ URL: &url.URL{ Path: proxyOpts.Path, }, Method: strings.ToUpper(reqInfo.Verb), }) proxyReqInfo.Verb = reqInfo.Verb if config.AuthorizateProxySubpath { user, _ := request.UserFrom(ctx) var attr authorizer.Attributes if proxyReqInfo.IsResourceRequest { attr = authorizer.AttributesRecord{ User: user, APIGroup: proxyReqInfo.APIGroup, APIVersion: proxyReqInfo.APIVersion, Resource: proxyReqInfo.Resource, Subresource: proxyReqInfo.Subresource, Namespace: proxyReqInfo.Namespace, Name: proxyReqInfo.Name, Verb: proxyReqInfo.Verb, } } else { path, _ := url.ParseRequestURI(proxyReqInfo.Path) attr = authorizer.AttributesRecord{ User: user, Path: path.Path, Verb: proxyReqInfo.Verb, } } decision, reason, err := loopback.GetAuthorizer().Authorize(ctx, attr) if err != nil { return nil , errors.Wrapf(err, "authorization failed due to %s" , reason) } if decision != authorizer.DecisionAllow { return nil , fmt.Errorf("proxying by user %v is forbidden authorization failed" , user.GetName()) } } return &proxyHandler{ parentName: id, path: proxyOpts.Path, impersonate: proxyOpts.Impersonate, clusterGateway: clusterGateway, responder: r, finishFunc: func (code int ) { metrics.RecordProxiedRequestsByResource(proxyReqInfo.Resource, proxyReqInfo.Verb, code) metrics.RecordProxiedRequestsByCluster(id, code) }, }, nil
proxyHandler
是一个实现了 http.Handler
接口的结构,接着将调用其 ServeHTTP()
方法来处理请求。
在 ServeHTTP()
方法中主要处理了以下逻辑:
复制 request
,作为转发请求,同时处理 URL 的变化;
使用集群配置构造 RoundTripper
,构造的 RoundTripper
支持处理授权和用户伪装的功能(Cluster Gateway 支持用户伪装,这个功能另外讨论,此处不深究);
最后,利用 Kubernetes 提供的 UpgradeAwareHandler
实现代理功能。
UpgradeAwareHandler 代理 UpgradeAwareHandler
基于 http.ReverseProxy
实现:
UpgradeAwareHandler 代理实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { if h.tryUpgrade(w, req) { return } if h.UpgradeRequired { h.Responder.Error(w, req, errors.NewBadRequest("Upgrade request required" )) return } loc := *h.Location loc.RawQuery = req.URL.RawQuery if !strings.HasSuffix(loc.Path, "/" ) && strings.HasSuffix(req.URL.Path, "/" ) { loc.Path += "/" } proxyRedirect := proxyRedirectsforRootPath(loc.Path, w, req) if proxyRedirect { return } if h.Transport == nil || h.WrapTransport { h.Transport = h.defaultProxyTransport(req.URL, h.Transport) } newReq := req.WithContext(req.Context()) newReq.Header = utilnet.CloneHeader(req.Header) if !h.UseRequestLocation { newReq.URL = &loc } if h.UseLocationHost { newReq.Host = h.Location.Host } reverseProxyLocation := &url.URL{Scheme: h.Location.Scheme, Host: h.Location.Host} if h.AppendLocationPath { reverseProxyLocation.Path = h.Location.Path } proxy := httputil.NewSingleHostReverseProxy(reverseProxyLocation) proxy.Transport = h.Transport proxy.FlushInterval = h.FlushInterval proxy.ErrorLog = log.New(noSuppressPanicError{}, "" , log.LstdFlags) if h.RejectForwardingRedirects { oldModifyResponse := proxy.ModifyResponse proxy.ModifyResponse = func (response *http.Response) error { code := response.StatusCode if code >= 300 && code <= 399 && len (response.Header.Get("Location" )) > 0 { response.Body.Close() msg := "the backend attempted to redirect this request, which is not permitted" *response = http.Response{ StatusCode: http.StatusBadGateway, Status: fmt.Sprintf("%d %s" , response.StatusCode, http.StatusText(response.StatusCode)), Body: io.NopCloser(strings.NewReader(msg)), ContentLength: int64 (len (msg)), } } else { if oldModifyResponse != nil { if err := oldModifyResponse(response); err != nil { return err } } } return nil } } proxy.ServeHTTP(w, newReq) }
KubeVela 请求 Cluster Gateway 在 KubeVela 中大致有三种需要操作集群的场景:
Controller
:KubeVela 的核心 Controller,在执行应用管理和下发动作时需要连接集群进行操作;
KubeVela API Server
:当安装 VelaUX 插件时会安装一个 API 服务,这个服务封装了 KubeVela 的功能并提供接口给 VelaUX 前端使用,在管理集群时也会连接 Cluster Gateway;
CLI
:用户在使用 CLI 管理集群时(比如纳管集群),会通过 Kubernetes APIServer 连接 Cluster Gateway。
下面根据这三个场景,分别研究一下是怎么实现调用 Cluster Gateway 的。
KubeVela Controller KubeVela Controller 的启动入口 run()
函数中,使用 ctrl.NewManager()
函数来创建 Manager
,这个 Manager
用来保存一些共享的对象,比如 Caches
和 Clients
,通常创建 Controller 都需要一个 Manager
。
cmd/core/app/server.go:140 1 2 3 4 5 mgr, err := ctrl.NewManager(restConfig, ctrl.Options{ NewClient: velaclient.DefaultNewControllerClient, NewCache: sharding.BuildCache(scheme, &v1beta1.Application{}, &v1beta1.ApplicationRevision{}, &v1beta1.ResourceTracker{}), })
调用 ctrl.NewManager()
时指定了 Client
的工厂方法为 velaclient.DefaultNewControllerClient
,KubeVela 会创建定制的多集群客户端,也会在创建 Kubernetes 客户端时增加一些处理逻辑。
DefaultNewControllerClient() -> NewDefaultClient() -> pkgmulticluster.NewClient()
,最后调用的 NewClient()
会使用 NewTransportWrapper()
获取一个 multicluster.Transport
并包装到 Client
配置中:
github.com/kubevela/pkg@v0.0.0-20230118103503-4a6096e79c1c/multicluster/client.go:135 1 2 3 4 5 func NewClient (config *rest.Config, options ClientOptions) (client.Client, error ) { wrapped := rest.CopyConfig(config) wrapped.Wrap(NewTransportWrapper()) }
multicluster.Transport
会获取请求的目标集群,如果非本地集群会对请求路径进行修改:
github.com/kubevela/pkg@v0.0.0-20230118103503-4a6096e79c1c/multicluster/transport.go:118 1 2 3 4 5 6 7 8 func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error ) { cluster := t.getClusterFor(req) if !IsLocal(cluster) { req = req.Clone(req.Context()) req.URL.Path = formatProxyURL(cluster, req.URL.Path) } return t.delegate.RoundTrip(req) }
formatProxyURL()
函数将在原有 URL 前面增加集群代理的路径:
github.com/kubevela/pkg@v0.0.0-20230118103503-4a6096e79c1c/multicluster/transport.go:93 1 2 3 4 5 6 7 8 9 10 11 12 func formatProxyURL (cluster, originalPath string ) string { originalPath = strings.TrimPrefix(originalPath, "/" ) return path.Clean(strings.Join([]string { "/apis" , clustergatewayconfig.MetaApiGroupName, clustergatewayconfig.MetaApiVersionName, clustergatewayconfig.MetaApiResourceName, cluster, "proxy" , originalPath, }, "/" )) }
如集群名为 cluster1
,那么增加的前缀路径是:
1 /apis/cluster.core.oam.dev/v1alpha1/clustergateways/cluster1/proxy/
这样请求就会被管理平面的 APIServer 接收到并根据 APIServer Aggregation 配置转发给 Cluster Gateway 处理。
KubeVela API Server KubeVela 的 API Server 并不是 kube-apiserver 类型的服务,而单纯只是一个 KubeVela 的 API 服务而已,通常提供给前端 VelaUX 使用。
当 KubeVela 的 API Server 启动时,运行通过以下调用链后,来到 NewClient()
函数:
server.Run() -> run() -> restServer.Run() -> s.buildIoCContainer() -> clients.GetKubeClient() -> pkgmulticluster.NewClient()
pkgmulticluster.NewClient
方法就是上面 KubeVela Controller 用来创建 Client
的方法,内部包装了 multicluster.Transport
用于修改请求的路径,将请求通过 APIServer 转发到 Cluster Gateway,这里不再细说。
KubeVela CLI vela-cli 其实也是相同的实现,最终都是通过 pkgmulticluster.NewClient()
方法创建 Client
,将请求通过 APIServer 转发到 Cluster Gateway 来实现。
CLI 访问 APIServer 的请求(包括连接纳管集群的请求)都是从本地发出的,并且是请求的当前 ~/.kube/config
里配置的集群 Host 的域名。如果集群 Host 有 Context Path 会被去掉。如 ~/.kube/config
里配置的集群 Host 如下:
1 https://imoe.tech/apiserver
处理后的请求的是:
1 https://imoe.tech/apis/cluster.core.oam.dev/v1alpha1/clustergateways/cluster1/proxy/
如果代理的时候是通过 Context Path 进行分流代理到不同 APIServer 的话,这里需要特别注意,大概率报错。
创建 Client
的逻辑在每个命令对应的 Command
的 RunE()
函数中实现,以集群纳管命令 vela cluster join
为例,创建代码在:
NewClusterJoinCommand() -> RunE() —> c.GetClient() -> pkgmulticluster.NewClient()
逻辑和之前介绍的相同,这里不详细讨论了。
总结 Cluster Gateway 利用 apiserver 的 apiserver-aggregation 扩展功能和 UpgradeAwareHandler 代理转发工具实现了访问 APIServer 就可以实现访问纳管集群功能。
KubeVela 的三个组件都是通过自定义 Transport
修改 Kubernetes Client 的请求路径,实现将请求转发到 Cluster Gateway 的 APIServer 扩展点进而访问纳管集群的管理功能。
引用
apiserver-aggregation