Kubernetes 代码中的 UpgradeAwareHandler

UpgradeAwareHandler 是 Kubernetes 里很重要的一个代码组件,在 Kubernetes 中用于代理和转发请求。

只要是有转发请求的地方都可以见到他的身影:

  1. kubectl 的命令 exec/attach/log/port-forward 等需要连接到容器的长连接;
  2. APIServer Aggregation 功能,需要将请求转发到外部 APIServer。

第三方的集群网关组件也会利用这个组件来实现转发代理,如:Karmada、KubeVela Cluster Gateway 等。

为什么都使用这个组件来转发请求?本文通过阅读源码,深入研究这个组件的实现原理以及使用方式。

源码

UpgradeAwareHandler 是一个代理转发组件,从名字就可以知道这个组件不只是转发 HTTP 请求,同时对于需要执行 Upgrade 操作的请求如 WebSocket 或 SPDY 的协议也可以很好地支持。

UpgradeAwareHandler 最简单的使用方式是通过 NewUpgradeAwareHandler() 方法构造,下面是 APIServer 里实现请求转发时调用的方法:

pkg/registry/core/pod/rest/subresources.go:216
1
2
3
4
5
func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler {
handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
return handler
}

NewUpgradeAwareHandler() 有以下几个参数:

  • location:上游地址,如果是一个升级请求,会使用这个地址进行 dial;
  • Transport:用来提供自定义的 RoundTripper,通常用户认证等信息都是通过这个参数的 RoundTripper 来配置的;
  • wrapTransport:是否使用默认 RoundTripperTransport 进行包裹;
  • upgradeRequired:请求是否需要升级,如果为 true 但升级失败会返回错误;
  • responder:错误时的响应输出。

构造完成后,调用 handler 的 handler.ServeHTTP(w, req) 方法将请求交由 UpgradeAwareHandler 处理就可以实现代理请求的转发。

请求处理

ServeHTTP 方法分为两类处理:需升级的请求和其它请求,代码如下:

vendor/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go:213
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if h.tryUpgrade(w, req) {
return
}
// 强制升级但升级失败返回错误响应
if h.UpgradeRequired {
h.Responder.Error(w, req, errors.NewBadRequest("Upgrade request required"))
return
}

loc := *h.Location
loc.RawQuery = req.URL.RawQuery

// If original request URL ended in '/', append a '/' at the end of the
// of the proxy URL
if !strings.HasSuffix(loc.Path, "/") && strings.HasSuffix(req.URL.Path, "/") {
loc.Path += "/"
}

// 处理重定向
proxyRedirect := proxyRedirectsforRootPath(loc.Path, w, req)
if proxyRedirect {
return
}

// 如果没配置自定义 Transport 或开启了 WrapTransport 就使用默认 Transport
if h.Transport == nil || h.WrapTransport {
h.Transport = h.defaultProxyTransport(req.URL, h.Transport)
}

// WithContext creates a shallow clone of the request with the same context.
newReq := req.WithContext(req.Context())
newReq.Header = utilnet.CloneHeader(req.Header)
if !h.UseRequestLocation {
newReq.URL = &loc
}
if h.UseLocationHost {
// exchanging req.Host with the backend location is necessary for backends that act on the HTTP host header (e.g. API gateways),
// because req.Host has preference over req.URL.Host in filling this header field
newReq.Host = h.Location.Host
}

// create the target location to use for the reverse proxy
reverseProxyLocation := &url.URL{Scheme: h.Location.Scheme, Host: h.Location.Host}
if h.AppendLocationPath {
reverseProxyLocation.Path = h.Location.Path
}

// 构造代理
proxy := httputil.NewSingleHostReverseProxy(reverseProxyLocation)
proxy.Transport = h.Transport
proxy.FlushInterval = h.FlushInterval
proxy.ErrorLog = log.New(noSuppressPanicError{}, "", log.LstdFlags)
if h.RejectForwardingRedirects {
oldModifyResponse := proxy.ModifyResponse
proxy.ModifyResponse = func(response *http.Response) error {
code := response.StatusCode
if code >= 300 && code <= 399 && len(response.Header.Get("Location")) > 0 {
// close the original response
response.Body.Close()
msg := "the backend attempted to redirect this request, which is not permitted"
// replace the response
*response = http.Response{
StatusCode: http.StatusBadGateway,
Status: fmt.Sprintf("%d %s", response.StatusCode, http.StatusText(response.StatusCode)),
Body: io.NopCloser(strings.NewReader(msg)),
ContentLength: int64(len(msg)),
}
} else {
if oldModifyResponse != nil {
if err := oldModifyResponse(response); err != nil {
return err
}
}
}
return nil
}
}
if h.Responder != nil {
// if an optional error interceptor/responder was provided wire it
// the custom responder might be used for providing a unified error reporting
// or supporting retry mechanisms by not sending non-fatal errors to the clients
proxy.ErrorHandler = h.Responder.Error
}

// 转发代理请求
proxy.ServeHTTP(w, newReq)
}

代码中首先就先调用 tryUpgrade() 尝试进行升级。对于不升级的请求,复制一个 Request 对象,使用 Golang 的内置代理工具 httputil.NewSingleHostReverseProxy 代理。

请求升级

所谓请求升级指的是协议升级机制,是 HTTP/1.1 提供的特殊机制,允许将一个已建立的连接升级成新的、不相容的协议。

注意点

HTTP/2 明确禁止使用此机制;这个机制只属于 HTTP/1.1。

在实践中,这种机制主要用于引导 WebSocket 连接,在 Kubernetes 中还用于引导 SPDY/3.1 协议。

包含 Upgrade 的典型请求类似于:

1
2
3
4
GET /index.html HTTP/1.1
Host: www.example.com
Connection: upgrade
Upgrade: example/1, foo/2

Kubernetes 中判断一个请求是 Upgrade 请求使用以下方法:

vendor/k8s.io/apimachinery/pkg/util/httpstream/httpstream.go:99
1
2
3
4
5
6
7
8
func IsUpgradeRequest(req *http.Request) bool {
for _, h := range req.Header[http.CanonicalHeaderKey(HeaderConnection)] {
if strings.Contains(strings.ToLower(h), strings.ToLower(HeaderUpgrade)) {
return true
}
}
return false
}

该方法只判断 Connection 头内容是不是 Upgrade。在 UpgradeAwareHandler 中只有该方法返回 true 时才执行升级逻辑:

vendor/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go:309
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Request) bool {
if !httpstream.IsUpgradeRequest(req) {
klog.V(6).Infof("Request was not an upgrade")
return false
}
// ...略

clone := utilnet.CloneRequest(req)
// ...略
backendConn, err = h.DialForUpgrade(clone)
if err != nil {
klog.V(6).Infof("Proxy connection error: %v", err)
h.Responder.Error(w, req, err)
return true
}
defer backendConn.Close()
// ...略
}

如果是升级请求就调用 h.DialForUpgrade(clone) 发起连接,DialForUpgrade 方法内还会根据是否配置了 UpgradeTransport 来确定是否将其加入 Transport 处理中:

vendor/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go:470
1
2
3
4
5
6
7
8
9
10
func (h *UpgradeAwareHandler) DialForUpgrade(req *http.Request) (net.Conn, error) {
if h.UpgradeTransport == nil {
return dial(req, h.Transport)
}
updatedReq, err := h.UpgradeTransport.WrapRequest(req)
if err != nil {
return nil, err
}
return dial(updatedReq, h.UpgradeTransport)
}

UpgradeTransportUpgradeRequestRoundTripper 接口的实例,这类实例用于在升级前对 Request 进行包装。这里和前面构造方法里提到的 Transport 是类似的:

  • h.UpgradeTransport:只在 Upgrade 请求时使用到,如果为 nil 则使用 h.Transport 代替;
  • h.Transport:通常传给 httputil.NewSingleHostReverseProxy 在非 Upgrade 请求使用,h.UpgradeTransport 未设置时也给 Upgrade 请求使用。

通过 dialURL 获取到连接后,调用 Write() 写入当前请求的数据:

vendor/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go:495
1
2
3
4
5
6
7
8
9
10
11
12
13
func dial(req *http.Request, transport http.RoundTripper) (net.Conn, error) {
conn, err := dialURL(req.Context(), req.URL, transport)
if err != nil {
return nil, fmt.Errorf("error dialing backend: %v", err)
}

if err = req.Write(conn); err != nil {
conn.Close()
return nil, fmt.Errorf("error sending request: %v", err)
}

return conn, err
}

回到 tryUpgrade() 方法,建立连接后,先读取部分 Response 信息用来判断响应代码是否正确:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Request) bool {
// ...略

backendConn, err = h.DialForUpgrade(clone)
if err != nil {
klog.V(6).Infof("Proxy connection error: %v", err)
h.Responder.Error(w, req, err)
return true
}
defer backendConn.Close()

// determine the http response code from the backend by reading from rawResponse+backendConn
backendHTTPResponse, headerBytes, err := getResponse(io.MultiReader(bytes.NewReader(rawResponse), backendConn))
if err != nil {
klog.V(6).Infof("Proxy connection error: %v", err)
h.Responder.Error(w, req, err)
return true
}

if len(headerBytes) > len(rawResponse) {
// we read beyond the bytes stored in rawResponse, update rawResponse to the full set of bytes read from the backend
rawResponse = headerBytes
}
// ...略
}

getResponse 方法会以 HTTP 的方式读取响应并返回 Response 对象,接着通过状态码判断是否升级。同时,getResponse 方法会读取一份 Response 的原始字节码,用于后面写回客户端通知客户端执行 Upgrade 动作切换协议。

通常对于处理升级的请求,正常第一个响应的内容只有请求头,内容是告知客户端 Upgrade 的结果,是否需要切换协议,后面的内容使用新协议进行传输。

如果是升级失败,后面需要将 Response 回写回客户端。所以获取 Response 后,首先对请求结果进行判断处理,如果响应码不是升级且是正常的响应码,就直接返回错误。

staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go:362
1
2
3
4
5
6
if backendHTTPResponse.StatusCode != http.StatusSwitchingProtocols && backendHTTPResponse.StatusCode < 400 {
err := fmt.Errorf("invalid upgrade response: status code %d", backendHTTPResponse.StatusCode)
klog.Errorf("Proxy upgrade error: %v", err)
h.Responder.Error(w, req, err)
return true
}

如果响应码是错误,需要将错误的响应写回客户端:

staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go:385
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if backendHTTPResponse.StatusCode != http.StatusSwitchingProtocols {
// If the backend did not upgrade the request, echo the response from the backend to the client and return, closing the connection.
klog.V(6).Infof("Proxy upgrade error, status code %d", backendHTTPResponse.StatusCode)
// set read/write deadlines
deadline := time.Now().Add(10 * time.Second)
backendConn.SetReadDeadline(deadline)
requestHijackedConn.SetWriteDeadline(deadline)
// write the response to the client
err := backendHTTPResponse.Write(requestHijackedConn)
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
klog.Errorf("Error proxying data from backend to client: %v", err)
}
// Indicate we handled the request
return true
}

写回客户端是通过对原连接进行劫持实现的,使用 http.Hijacker 对原连接进行劫持后,可以得到原始的 net.Conn

staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go:371
1
2
3
4
5
6
7
8
9
10
11
12
13
requestHijacker, ok := w.(http.Hijacker)
if !ok {
klog.V(6).Infof("Unable to hijack response writer: %T", w)
h.Responder.Error(w, req, fmt.Errorf("request connection cannot be hijacked: %T", w))
return true
}
requestHijackedConn, _, err := requestHijacker.Hijack()
if err != nil {
klog.V(6).Infof("Unable to hijack response: %v", err)
h.Responder.Error(w, req, fmt.Errorf("error hijacking connection: %v", err))
return true
}
defer requestHijackedConn.Close()

注意点

注意,连接被劫持后,原有的 response 对象就不能使用了。

正常升级的请求,成功劫持连接后,将第一次读取到的响应,写回客户端,通知客户端切换协议:

staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go:402
1
2
3
4
5
6
if len(rawResponse) > 0 {
klog.V(6).Infof("Writing %d bytes to hijacked connection", len(rawResponse))
if _, err = requestHijackedConn.Write(rawResponse); err != nil {
utilruntime.HandleError(fmt.Errorf("Error proxying response from backend to client: %v", err))
}
}

由于双方的传输协议正常升级后不再是 HTTP 协议,内容也不再需要关心,所以后面的工作就变成双向数据对拷:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
writerComplete := make(chan struct{})
readerComplete := make(chan struct{})

go func() {
var writer io.WriteCloser
if h.MaxBytesPerSec > 0 {
writer = flowrate.NewWriter(backendConn, h.MaxBytesPerSec)
} else {
writer = backendConn
}
_, err := io.Copy(writer, requestHijackedConn)
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
klog.Errorf("Error proxying data from client to backend: %v", err)
}
close(writerComplete)
}()

go func() {
var reader io.ReadCloser
if h.MaxBytesPerSec > 0 {
reader = flowrate.NewReader(backendConn, h.MaxBytesPerSec)
} else {
reader = backendConn
}
_, err := io.Copy(requestHijackedConn, reader)
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
klog.Errorf("Error proxying data from backend to client: %v", err)
}
close(readerComplete)
}()

// Wait for one half the connection to exit. Once it does the defer will
// clean up the other half of the connection.
select {
case <-writerComplete:
case <-readerComplete:
}
klog.V(6).Infof("Disconnecting from backend proxy %s\n Headers: %v", &location, clone.Header)

数据双向对拷用的是 io.Copy 方法,该方法只有在流出现错误只才会返回,所以使用了两个协程来实现。

当有一个方向的流被关闭时方法返回,requestHijackedConn.Close() 这个 defer 会关闭连接,所有关联的 IO 流都会 EOF 退出。

总结

UpgradeAwareHandler 作为一个代理转发组件,在 httputil.NewSingleHostReverseProxy 的基础上增加了对可升级请求的转发处理。

我们都知道,访问 API Server 是需要一些权限的,但代理转发的全程没有任何关于权限相关地处理,却又能将请求正确送达。这得益于 Golang 中的 RoundTripper 的设计模式,能将自定义逻辑插入到请求处理链中,实现不同模块的解耦。

引用

  1. 协议升级机制

Kubernetes 代码中的 UpgradeAwareHandler

https://blog.imoe.tech/2023/12/13/UpgradeAwareHandler/

作者

Jakes Lee

发布于

2023-12-13

更新于

2024-01-23

许可协议

评论