// Work on a copy of the configured location so that edits to loc do not
// touch h.Location, and carry over the incoming request's raw query string.
loc := *h.Location
loc.RawQuery = req.URL.RawQuery
// If original request URL ended in '/', append a '/' at the end of the // of the proxy URL if !strings.HasSuffix(loc.Path, "/") && strings.HasSuffix(req.URL.Path, "/") { loc.Path += "/" }
// 如果没配置自定义 Transport 或开启了 WrapTransport 就使用默认 Transport if h.Transport == nil || h.WrapTransport { h.Transport = h.defaultProxyTransport(req.URL, h.Transport) }
// WithContext creates a shallow clone of the request with the same context. newReq := req.WithContext(req.Context()) newReq.Header = utilnet.CloneHeader(req.Header) if !h.UseRequestLocation { newReq.URL = &loc } if h.UseLocationHost { // exchanging req.Host with the backend location is necessary for backends that act on the HTTP host header (e.g. API gateways), // because req.Host has preference over req.URL.Host in filling this header field newReq.Host = h.Location.Host }
// create the target location to use for the reverse proxy reverseProxyLocation := &url.URL{Scheme: h.Location.Scheme, Host: h.Location.Host} if h.AppendLocationPath { reverseProxyLocation.Path = h.Location.Path }
// 构造代理 proxy := httputil.NewSingleHostReverseProxy(reverseProxyLocation) proxy.Transport = h.Transport proxy.FlushInterval = h.FlushInterval proxy.ErrorLog = log.New(noSuppressPanicError{}, "", log.LstdFlags) if h.RejectForwardingRedirects { oldModifyResponse := proxy.ModifyResponse proxy.ModifyResponse = func(response *http.Response)error { code := response.StatusCode if code >= 300 && code <= 399 && len(response.Header.Get("Location")) > 0 { // close the original response response.Body.Close() msg := "the backend attempted to redirect this request, which is not permitted" // replace the response *response = http.Response{ StatusCode: http.StatusBadGateway, Status: fmt.Sprintf("%d %s", response.StatusCode, http.StatusText(response.StatusCode)), Body: io.NopCloser(strings.NewReader(msg)), ContentLength: int64(len(msg)), } } else { if oldModifyResponse != nil { if err := oldModifyResponse(response); err != nil { return err } } } returnnil } } if h.Responder != nil { // if an optional error interceptor/responder was provided wire it // the custom responder might be used for providing a unified error reporting // or supporting retry mechanisms by not sending non-fatal errors to the clients proxy.ErrorHandler = h.Responder.Error }
funcIsUpgradeRequest(req *http.Request)bool { for _, h := range req.Header[http.CanonicalHeaderKey(HeaderConnection)] { if strings.Contains(strings.ToLower(h), strings.ToLower(HeaderUpgrade)) { returntrue } } returnfalse }
func(h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Request) bool { if !httpstream.IsUpgradeRequest(req) { klog.V(6).Infof("Request was not an upgrade") returnfalse } // ...略
// determine the http response code from the backend by reading from rawResponse+backendConn backendHTTPResponse, headerBytes, err := getResponse(io.MultiReader(bytes.NewReader(rawResponse), backendConn)) if err != nil { klog.V(6).Infof("Proxy connection error: %v", err) h.Responder.Error(w, req, err) returntrue }
iflen(headerBytes) > len(rawResponse) { // we read beyond the bytes stored in rawResponse, update rawResponse to the full set of bytes read from the backend rawResponse = headerBytes } // ...略 }
if backendHTTPResponse.StatusCode != http.StatusSwitchingProtocols {
	// If the backend did not upgrade the request, echo the response from the
	// backend to the client and return, closing the connection.
	klog.V(6).Infof("Proxy upgrade error, status code %d", backendHTTPResponse.StatusCode)
	// Bound both the backend read and the client write to 10 seconds from now.
	deadline := time.Now().Add(10 * time.Second)
	backendConn.SetReadDeadline(deadline)
	requestHijackedConn.SetWriteDeadline(deadline)
	// Write the response to the client; an error caused by an already-closed
	// connection is not logged.
	err := backendHTTPResponse.Write(requestHijackedConn)
	if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
		klog.Errorf("Error proxying data from backend to client: %v", err)
	}
	// Indicate we handled the request.
	return true
}
gofunc() { var writer io.WriteCloser if h.MaxBytesPerSec > 0 { writer = flowrate.NewWriter(backendConn, h.MaxBytesPerSec) } else { writer = backendConn } _, err := io.Copy(writer, requestHijackedConn) if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { klog.Errorf("Error proxying data from client to backend: %v", err) } close(writerComplete) }()
gofunc() { var reader io.ReadCloser if h.MaxBytesPerSec > 0 { reader = flowrate.NewReader(backendConn, h.MaxBytesPerSec) } else { reader = backendConn } _, err := io.Copy(requestHijackedConn, reader) if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { klog.Errorf("Error proxying data from backend to client: %v", err) } close(readerComplete) }()
// Wait for one half the connection to exit. Once it does the defer will // clean up the other half of the connection. select { case <-writerComplete: case <-readerComplete: } klog.V(6).Infof("Disconnecting from backend proxy %s\n Headers: %v", &location, clone.Header)