KubeVela 代理网关 Cluster Gateway 实现

KubeVela 的多集群管理依赖于 Cluster Gateway 组件,在 KubeVela 的 Helm Chart 中会自动安装。KubeVela 并不会直连集群,而是必须通过 Cluster Gateway 连接集群进行管理。

包括集群管理在内的功能都是依赖于 Cluster Gateway 实现的,所以 Cluster Gateway 是 KubeVela 多集群管理必不可少的一个组件。

KubeVela 通过 Cluster Gateway 访问集群

Cluster Gateway 使用的 apiserver 原生扩展接口 apiserver-aggregation 实现的代理功能。通过向 API Server 注册外部 API,可以将管理平面的 API Server 作为纳管集群的访问的入口,还可以避免内部集群的暴露问题。

Cluster Gateway 向 API Server 注册了以下接口:

Cluster Gateway 代理路径
1
/apis/cluster.core.oam.dev/v1alpha1/clustergateways/{clusterName}/proxy/{api}

接口中的参数如下:

  • clusterName:目标集群名;
  • api:转发到目标集群的 API Server 的请求路径。

入口和授权

通过查看应用注册的 Kubernetes 资源,可以大体了解到应用的权限要求、启动方式和依赖等,对理解应用原理和找到程序入口有很大帮助。

注册 API Server Aggregation

注册 API Server 扩展接口很简单,只需要创建一个 APIService 对象就可以实现:

注册 AA
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
name: v1alpha1.cluster.core.oam.dev
labels:
api: cluster-extension-apiserver
apiserver: "true"
spec:
version: v1alpha1
group: cluster.core.oam.dev
groupPriorityMinimum: 2000
service:
name: gateway-service
namespace: {{ .Release.Namespace }}
port: 9443
versionPriority: 10
insecureSkipTLSVerify: true

这个配置文件,注册了一个版本为 v1alpha1API Group,组名为 cluster.core.oam.dev。在请求 API Server 的 /apis/cluster.core.oam.dev/v1alpha1/ 接口时,请求会被转发到 APIService 声明的 Service 中(示例中是 gateway-service 服务)。

代码中,在构造 API Server 的 Builder 中注册了 ClusterGateway 这个资源:

构造 ApiServer
1
2
3
4
5
6
7
8
9
10
func main() {

// registering metrics
metrics.Register()

cmd, err := builder.APIServer.
// +kubebuilder:scaffold:resource-register
WithResource(&clusterv1alpha1.ClusterGateway{}).
//...
}

ClusterGateway 注册了资源名 clustergateways,对应的处理代理请求的逻辑在 ClusterGatewayProxy 中:

pkg/apis/cluster/v1alpha1/clustergateway_types.go:146
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (in *ClusterGateway) GetGroupVersionResource() schema.GroupVersionResource {
return schema.GroupVersionResource{
Group: config.MetaApiGroupName,
Version: config.MetaApiVersionName,
Resource: config.MetaApiResourceName,
}
}
var MetaApiResourceName = "clustergateways"

func (in *ClusterGateway) GetArbitrarySubResources() []resource.ArbitrarySubResource {
return []resource.ArbitrarySubResource{
&ClusterGatewayProxy{},
&ClusterGatewayHealth{},
}
}

授权

Cluster Gateway 还创建了以下授权配置:

  • ClusterRolecluster-gateway:proxy 访问代理接口的角色;
  • ClusterRoleBinding:将上面的角色授权给 kubevela:client 组和 SA。

这个配置可以让 KubeVela 和 CLI 用户能使用这个资源接口访问集群。

Cluster Gateway 代码实现

由前面注册 APIService 部分可以知道,ClusterGatewayProxy 结构是处理代理逻辑的地方,可以说是 Cluster Gateway 业务逻辑的入口。

ClusterGatewayProxy 实现了 Connecter 接口,当连接进来时会调用对应的 Connect() 方法,在 Connect() 方法中根据请求信息获取 ClusterGateway 信息并构造一个 proxyHandler 对象:

Connect
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
func (c *ClusterGatewayProxy) Connect(ctx context.Context, id string, options runtime.Object, r registryrest.Responder) (http.Handler, error) {
proxyOpts, ok := options.(*ClusterGatewayProxyOptions)
if !ok {
return nil, fmt.Errorf("invalid options object: %#v", options)
}

// 从 Secrets 中获取集群信息,id 是集群名
parentStorage, ok := contextutil.GetParentStorageGetter(ctx)
if !ok {
return nil, fmt.Errorf("no parent storage found")
}
parentObj, err := parentStorage.Get(ctx, id, &metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("no such cluster %v", id)
}
clusterGateway := parentObj.(*ClusterGateway)

reqInfo, _ := request.RequestInfoFrom(ctx)
factory := request.RequestInfoFactory{
APIPrefixes: sets.NewString("api", "apis"),
GrouplessAPIPrefixes: sets.NewString("api"),
}
proxyReqInfo, _ := factory.NewRequestInfo(&http.Request{
URL: &url.URL{
Path: proxyOpts.Path,
},
Method: strings.ToUpper(reqInfo.Verb),
})
proxyReqInfo.Verb = reqInfo.Verb

// 校验权限
if config.AuthorizateProxySubpath {
user, _ := request.UserFrom(ctx)
var attr authorizer.Attributes
if proxyReqInfo.IsResourceRequest {
attr = authorizer.AttributesRecord{
User: user,
APIGroup: proxyReqInfo.APIGroup,
APIVersion: proxyReqInfo.APIVersion,
Resource: proxyReqInfo.Resource,
Subresource: proxyReqInfo.Subresource,
Namespace: proxyReqInfo.Namespace,
Name: proxyReqInfo.Name,
Verb: proxyReqInfo.Verb,
}
} else {
path, _ := url.ParseRequestURI(proxyReqInfo.Path)
attr = authorizer.AttributesRecord{
User: user,
Path: path.Path,
Verb: proxyReqInfo.Verb,
}
}

decision, reason, err := loopback.GetAuthorizer().Authorize(ctx, attr)
if err != nil {
return nil, errors.Wrapf(err, "authorization failed due to %s", reason)
}
if decision != authorizer.DecisionAllow {
return nil, fmt.Errorf("proxying by user %v is forbidden authorization failed", user.GetName())
}
}

// 返回代理实例
return &proxyHandler{
parentName: id,
path: proxyOpts.Path,
impersonate: proxyOpts.Impersonate,
clusterGateway: clusterGateway,
responder: r,
finishFunc: func(code int) {
metrics.RecordProxiedRequestsByResource(proxyReqInfo.Resource, proxyReqInfo.Verb, code)
metrics.RecordProxiedRequestsByCluster(id, code)
},
}, nil

proxyHandler 是一个实现了 http.Handler 接口的结构,接着将调用其 ServeHTTP() 方法来处理请求。

ServeHTTP() 方法中主要处理了以下逻辑:

  • 复制 request,作为转发请求,同时处理 URL 的变化;
  • 使用集群配置构造 RoundTripper,构造的 RoundTripper 支持处理授权和用户伪装的功能(Cluster Gateway 支持用户伪装,这个功能另外讨论,此处不深究);
  • 最后,利用 Kubernetes 提供的 UpgradeAwareHandler 实现代理功能。

UpgradeAwareHandler 代理

UpgradeAwareHandler 基于 http.ReverseProxy 实现:

UpgradeAwareHandler 代理实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// WebSocket 处理,如能升级,直接处理并返回
if h.tryUpgrade(w, req) {
return
}
if h.UpgradeRequired {
h.Responder.Error(w, req, errors.NewBadRequest("Upgrade request required"))
return
}

loc := *h.Location
loc.RawQuery = req.URL.RawQuery

// If original request URL ended in '/', append a '/' at the end of the
// of the proxy URL
if !strings.HasSuffix(loc.Path, "/") && strings.HasSuffix(req.URL.Path, "/") {
loc.Path += "/"
}

proxyRedirect := proxyRedirectsforRootPath(loc.Path, w, req)
if proxyRedirect {
return
}

if h.Transport == nil || h.WrapTransport {
h.Transport = h.defaultProxyTransport(req.URL, h.Transport)
}

// WithContext creates a shallow clone of the request with the same context.
newReq := req.WithContext(req.Context())
newReq.Header = utilnet.CloneHeader(req.Header)
if !h.UseRequestLocation {
newReq.URL = &loc
}
if h.UseLocationHost {
// exchanging req.Host with the backend location is necessary for backends that act on the HTTP host header (e.g. API gateways),
// because req.Host has preference over req.URL.Host in filling this header field
newReq.Host = h.Location.Host
}

// create the target location to use for the reverse proxy
reverseProxyLocation := &url.URL{Scheme: h.Location.Scheme, Host: h.Location.Host}
if h.AppendLocationPath {
reverseProxyLocation.Path = h.Location.Path
}

proxy := httputil.NewSingleHostReverseProxy(reverseProxyLocation)
proxy.Transport = h.Transport
proxy.FlushInterval = h.FlushInterval
proxy.ErrorLog = log.New(noSuppressPanicError{}, "", log.LstdFlags)
if h.RejectForwardingRedirects {
oldModifyResponse := proxy.ModifyResponse
proxy.ModifyResponse = func(response *http.Response) error {
code := response.StatusCode
if code >= 300 && code <= 399 && len(response.Header.Get("Location")) > 0 {
// close the original response
response.Body.Close()
msg := "the backend attempted to redirect this request, which is not permitted"
// replace the response
*response = http.Response{
StatusCode: http.StatusBadGateway,
Status: fmt.Sprintf("%d %s", response.StatusCode, http.StatusText(response.StatusCode)),
Body: io.NopCloser(strings.NewReader(msg)),
ContentLength: int64(len(msg)),
}
} else {
if oldModifyResponse != nil {
if err := oldModifyResponse(response); err != nil {
return err
}
}
}
return nil
}
}
// ... 略
proxy.ServeHTTP(w, newReq)
}

KubeVela 请求 Cluster Gateway

在 KubeVela 中大致有三种需要操作集群的场景:

  • Controller:KubeVela 的核心 Controller,在执行应用管理和下发动作时需要连接集群进行操作;
  • KubeVela API Server:当安装 VelaUX 插件时会安装一个 API 服务,这个服务封装了 KubeVela 的功能并提供接口给 VelaUX 前端使用,在管理集群时也会连接 Cluster Gateway;
  • CLI:用户在使用 CLI 管理集群时(比如纳管集群),会通过 Kubernetes APIServer 连接 Cluster Gateway。

下面根据这三个场景,分别研究一下是怎么实现调用 Cluster Gateway 的。

KubeVela Controller

KubeVela Controller 的启动入口 run() 函数中,使用 ctrl.NewManager() 函数来创建 Manager,这个 Manager 用来保存一些共享的对象,比如 CachesClients,通常创建 Controller 都需要一个 Manager

cmd/core/app/server.go:140
1
2
3
4
5
mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
// ... 略
NewClient: velaclient.DefaultNewControllerClient,
NewCache: sharding.BuildCache(scheme, &v1beta1.Application{}, &v1beta1.ApplicationRevision{}, &v1beta1.ResourceTracker{}),
})

调用 ctrl.NewManager() 时指定了 Client 的工厂方法为 velaclient.DefaultNewControllerClient,KubeVela 会创建定制的多集群客户端,也会在创建 Kubernetes 客户端时增加一些处理逻辑。

DefaultNewControllerClient() -> NewDefaultClient() -> pkgmulticluster.NewClient(),最后调用的 NewClient() 会使用 NewTransportWrapper() 获取一个 multicluster.Transport 并包装到 Client 配置中:

github.com/kubevela/pkg@v0.0.0-20230118103503-4a6096e79c1c/multicluster/client.go:135
1
2
3
4
5
func NewClient(config *rest.Config, options ClientOptions) (client.Client, error) {
wrapped := rest.CopyConfig(config)
wrapped.Wrap(NewTransportWrapper())
// ...略
}

multicluster.Transport 会获取请求的目标集群,如果非本地集群会对请求路径进行修改:

github.com/kubevela/pkg@v0.0.0-20230118103503-4a6096e79c1c/multicluster/transport.go:118
1
2
3
4
5
6
7
8
func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
cluster := t.getClusterFor(req)
if !IsLocal(cluster) {
req = req.Clone(req.Context())
req.URL.Path = formatProxyURL(cluster, req.URL.Path)
}
return t.delegate.RoundTrip(req)
}

formatProxyURL() 函数将在原有 URL 前面增加集群代理的路径:

github.com/kubevela/pkg@v0.0.0-20230118103503-4a6096e79c1c/multicluster/transport.go:93
1
2
3
4
5
6
7
8
9
10
11
12
func formatProxyURL(cluster, originalPath string) string {
originalPath = strings.TrimPrefix(originalPath, "/")
return path.Clean(strings.Join([]string{
"/apis",
clustergatewayconfig.MetaApiGroupName,
clustergatewayconfig.MetaApiVersionName,
clustergatewayconfig.MetaApiResourceName,
cluster,
"proxy",
originalPath,
}, "/"))
}

如集群名为 cluster1,那么增加的前缀路径是:

1
/apis/cluster.core.oam.dev/v1alpha1/clustergateways/cluster1/proxy/

这样请求就会被管理平面的 APIServer 接收到并根据 APIServer Aggregation 配置转发给 Cluster Gateway 处理。

KubeVela API Server

KubeVela 的 API Server 并不是 kube-apiserver 类型的服务,而单纯只是一个 KubeVela 的 API 服务而已,通常提供给前端 VelaUX 使用。

当 KubeVela 的 API Server 启动时,运行通过以下调用链后,来到 NewClient() 函数:

server.Run() -> run() -> restServer.Run() -> s.buildIoCContainer() -> clients.GetKubeClient() -> pkgmulticluster.NewClient()

pkgmulticluster.NewClient 方法就是上面 KubeVela Controller 用来创建 Client 的方法,内部包装了 multicluster.Transport 用于修改请求的路径,将请求通过 APIServer 转发到 Cluster Gateway,这里不再细说。

KubeVela CLI

vela-cli 其实也是相同的实现,最终都是通过 pkgmulticluster.NewClient() 方法创建 Client,将请求通过 APIServer 转发到 Cluster Gateway 来实现。

CLI 这里特别注意的点

CLI 访问 APIServer 的请求(包括连接纳管集群的请求)都是从本地发出的,并且是请求的当前 ~/.kube/config 里配置的集群 Host 的域名。如果集群 Host 有 Context Path 会被去掉。如 ~/.kube/config里配置的集群 Host 如下:

1
https://imoe.tech/apiserver

处理后的请求的是:

1
https://imoe.tech/apis/cluster.core.oam.dev/v1alpha1/clustergateways/cluster1/proxy/

如果代理的时候是通过 Context Path 进行分流代理到不同 APIServer 的话,这里需要特别注意,大概率报错。

创建 Client 的逻辑在每个命令对应的 CommandRunE() 函数中实现,以集群纳管命令 vela cluster join 为例,创建代码在:

NewClusterJoinCommand() -> RunE() —> c.GetClient() -> pkgmulticluster.NewClient()

逻辑和之前介绍的相同,这里不详细讨论了。

总结

Cluster Gateway 利用 apiserver 的 apiserver-aggregation 扩展功能和 UpgradeAwareHandler 代理转发工具实现了访问 APIServer 就可以实现访问纳管集群功能。

KubeVela 的三个组件都是通过自定义 Transport 修改 Kubernetes Client 的请求路径,实现将请求转发到 Cluster Gateway 的 APIServer 扩展点进而访问纳管集群的管理功能。

引用

  1. apiserver-aggregation
作者

Jakes Lee

发布于

2023-04-26

更新于

2023-05-04

许可协议

评论