基于 HTTP 构建的服务标准模型包括客户端 Client 和服务端 Server。HTTP 请求从客户端发出,服务端接收到请求后进行处理,然后响应返回给客户端。因此 HTTP 服务器的工作就在于如何接受来自客户端的请求,并向客户端返回响应。典型的 HTTP 服务如下图:
Client
以下是一个简单示例,在例子中,我们通过 http 标准库向给定的 url:http://httpbin.org/get 发送了一个 Get 请求,请求参数为 name=makonike。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 package mainimport ( "fmt" "io" "net/http" ) func main () { resp, err := http.Get("http://httpbin.org/get?name=makonike" ) if err != nil { return } defer resp.Body.Close() body, _ := io.ReadAll(resp.Body) fmt.Println(string (body)) }
在http.Get()
中,它调用的是DefaultClient.Get()
,DefaultClient 是 Client 的一个空实例,实际上调用的是Client.Get()
。
1 2 var DefaultClient = &Client{}
Client 结构体
1 2 3 4 5 6 type Client struct { Transport RoundTripper CheckRedirect func (req *Request, via []*Request) error Jar CookieJar Timeout time.Duration }
Client 结构体总共包含四个字段:
Transport :表示 HTTP 事务,用于处理客户端请求连接并等待服务端的响应
CheckRedirect :用于指定处理重定向策略
Jar :用于存储和管理请求中的 cookie
Timeout :设置客户端请求的最大超时时间,包括连接、任何重定向以及读取响应的时间
初始化请求
Get()
向指定 url 发出 Get 请求,其中先要根据请求类型构造一个完整的请求,包含请求头、请求体和请求参数,然后才根据请求的完整结构来执行请求。NewRequest()
用于构造请求,它会调用NewRequestWithContext()
,它返回一个 Request 结构体,其中包含了一个 HTTP 请求的所有信息。
1 2 3 4 5 6 7 8 9 10 11 12 func (c *Client) Get(url string ) (resp *Response, err error ) { req, err := NewRequest("GET" , url, nil ) if err != nil { return nil , err } return c.Do(req) } func NewRequest (method, url string , body io.Reader) (*Request, error ) { return NewRequestWithContext(context.Background(), method, url, body) }
Request 是请求的抽象结构体,其中有很多字段都是我们已熟知的,这里仅仅列举一部分:
1 2 3 4 5 6 7 type Request struct { Method string URL *url.URL Header Header Body io.ReadCloser ... }
我们直接来看看NewRequestWithContext()
,它的作用是将请求封装成一个 Request 结构体并返回 。它强制请求方法不为空,并校验了请求方法的有效性,同时解析 url,最终封装请求信息为一个 Request 结构体。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 func NewRequestWithContext (ctx context.Context, method, url string , body io.Reader) (*Request, error ) { if method == "" { method = "GET" } if !validMethod(method) { return nil , fmt.Errorf("net/http: invalid method %q" , method) } if ctx == nil { return nil , errors.New("net/http: nil Context" ) } u, err := urlpkg.Parse(url) if err != nil { return nil , err } rc, ok := body.(io.ReadCloser) if !ok && body != nil { rc = io.NopCloser(body) } u.Host = removeEmptyPort(u.Host) req := &Request{ ctx: ctx, Method: method, URL: u, Proto: "HTTP/1.1" , ProtoMajor: 1 , ProtoMinor: 1 , Header: make (Header), Body: rc, Host: u.Host, } ... return req, nil }
获取到一个完整的 Request 结构体后,Get()
会调用c.Do(req)
发送请求,它会调用c.do(req)
,然后调用c.send(req, deadline)
,最终会调用到send()
。Client.Do 的逻辑主要分为两段,一段是调用 send() 发送请求接收 Response ,关闭 Req.Body,另一段是对需要 redirect 的请求执行重定向操作 ,并关闭 Resp.Body。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func () bool , err error ) { ... resp, didTimeout, err = send(req, c.transport(), deadline) if err != nil { return nil , didTimeout, err } ... return resp, nil , nil } func (c *Client) Do(req *Request) (*Response, error ) { return c.do(req) } func (c *Client) do(req *Request) (retres *Response, reterr error ) { if resp, didTimeout, err = c.send(req, deadline); err != nil { reqBodyClosed = true if !deadline.IsZero() && didTimeout() { err = &httpError{ err: err.Error() + " (Client.Timeout exceeded while awaiting headers)" , timeout: true , } } return nil , uerr(err) } var shouldRedirect bool redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0 ]) if !shouldRedirect { return resp, nil } req.closeBody() } func (c *Client) transport() RoundTripper { if c.Transport != nil { return c.Transport } return DefaultTransport }
在Client.send()
调用send()
进行下一步处理前,会先调用c.transport()
获取 DefaultTransport 实例。
1 2 3 4 5 6 7 8 9 10 11 12 var DefaultTransport RoundTripper = &Transport{ Proxy: ProxyFromEnvironment, DialContext: defaultTransportDialContext(&net.Dialer{ Timeout: 30 * time.Second, KeepAlive: 30 * time.Second, }), ForceAttemptHTTP2: true , MaxIdleConns: 100 , IdleConnTimeout: 90 * time.Second, TLSHandshakeTimeout: 10 * time.Second, ExpectContinueTimeout: 1 * time.Second, }
RoundTripper 负责 HTTP 请求的建立,发送,接收 HTTP 应答以及关闭,但是不对 HTTP 响应进行额外处理,例如:redirects, authentication, or cookies 等上层协议细节。Transport 实现了 RoundTripper 接口,它实现的RoundTrip()
方法会具体地处理请求,处理完毕后返回 Response。
1 2 3 type RoundTripper interface { RoundTrip(*Request) (*Response, error ) }
回到Client.send()
,它会调用send()
,这个函数的主要逻辑交给获取到的 Transport 实现的RoundTrip()
来执行,RoundTrip()
会调用到roundTrip()
。
1 2 3 4 5 6 7 8 9 func send (ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func () bool , err error ) { ... resp, err = rt.RoundTrip(req) ... } func (t *Transport) RoundTrip(req *Request) (*Response, error ) { return t.roundTrip(req) }
roundTrip()
中会做两件事,一是调用 Transport 的 getConn() 获取连接 ,二是在获取到连接后,调用 persistConn 的 roundTrip() 发送请求 ,等待响应结果。persistConn 是对连接的一层封装,是持久化连接的抽象,通常表示 keep-alive 连接,也可以表示 non-keep-alive 连接,它与 Client 和 Transport 的关系大致如下图:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 func (t *Transport) roundTrip(req *Request) (*Response, error ) { t.nextProtoOnce.Do(t.onceSetNextProtoDefaults) ctx := req.Context() trace := httptrace.ContextClientTrace(ctx) ... scheme := req.URL.Scheme if altRT := t.alternateRoundTripper(req); altRT != nil { if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol { return resp, err } var err error req, err = rewindBody(req) if err != nil { return nil , err } } ... for { select { case <-ctx.Done(): req.closeBody() return nil , ctx.Err() default : } treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey} cm, err := t.connectMethodForRequest(treq) if err != nil { req.closeBody() return nil , err } pconn, err := t.getConn(treq, cm) if err != nil { t.setReqCanceler(cancelKey, nil ) req.closeBody() return nil , err } var resp *Response if pconn.alt != nil { t.setReqCanceler(cancelKey, nil ) resp, err = pconn.alt.RoundTrip(req) } else { resp, err = pconn.roundTrip(treq) } if err == nil { resp.Request = origReq return resp, nil } ... } } func (t *Transport) alternateRoundTripper(req *Request) RoundTripper { if !t.useRegisteredProtocol(req) { return nil } altProto, _ := t.altProto.Load().(map [string ]RoundTripper) return altProto[req.URL.Scheme] } func (t *Transport) useRegisteredProtocol(req *Request) bool { if req.URL.Scheme == "https" && req.requiresHTTP1() { return false } return true } func (r *Request) requiresHTTP1() bool { return hasToken(r.Header.Get("Connection" ), "upgrade" ) && ascii.EqualFold(r.Header.Get("Upgrade" ), "websocket" ) }
获取连接
getConn()
有两个阶段:
调用queueForIdleConn()
获取空闲连接
当不存在空闲连接时,调用queueForDial()
创建新的连接
可以看到关系图如上。getConn()
先封装请求为 wantConn 结构体,然后调用queueForIdleConn()
获取空闲连接,如果成功获取则返回连接,失败了则调用queueForDial()
创建一个新连接,进行后续的处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error ) { req := treq.Request trace := treq.trace ctx := req.Context() if trace != nil && trace.GetConn != nil { trace.GetConn(cm.addr()) } w := &wantConn{ cm: cm, key: cm.key(), ctx: ctx, ready: make (chan struct {}, 1 ), beforeDial: testHookPrePendingDial, afterDial: testHookPostPendingDial, } defer func () { if err != nil { w.cancel(t, err) } }() if delivered := t.queueForIdleConn(w); delivered { pc := w.pc if pc.alt == nil && trace != nil && trace.GotConn != nil { trace.GotConn(pc.gotIdleConnTrace(pc.idleAt)) } t.setReqCanceler(treq.cancelKey, func (error ) {}) return pc, nil } cancelc := make (chan error , 1 ) t.setReqCanceler(treq.cancelKey, func (err error ) { cancelc <- err }) t.queueForDial(w) select { case <-w.ready: if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil { trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()}) } if w.err != nil { select { case <-req.Cancel: return nil , errRequestCanceledConn case <-req.Context().Done(): return nil , req.Context().Err() case err := <-cancelc: if err == errRequestCanceled { err = errRequestCanceledConn } return nil , err default : } } return w.pc, w.err case <-req.Cancel: return nil , errRequestCanceledConn case <-req.Context().Done(): return nil , req.Context().Err() case err := <-cancelc: if err == errRequestCanceled { err = errRequestCanceledConn } return nil , err } } func (cm *connectMethod) key() connectMethodKey { proxyStr := "" targetAddr := cm.targetAddr if cm.proxyURL != nil { proxyStr = cm.proxyURL.String() if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https" ) && cm.targetScheme == "http" { targetAddr = "" } } return connectMethodKey{ proxy: proxyStr, scheme: cm.targetScheme, addr: targetAddr, onlyH1: cm.onlyH1, } }
来看看queueForIdleConn()
,它先根据 wantConn.key 去 idleConn map 中看看是否存在空闲的 connection list,如果能获取到,则获取列表中最后一个 connection 返回,否则将当前 wantConn 加入到 idleConnWait map 中。这部分的逻辑比较简单,直接看代码就能理解。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool ) { if t.DisableKeepAlives { return false } t.idleMu.Lock() defer t.idleMu.Unlock() t.closeIdle = false ... var oldTime time.Time if t.IdleConnTimeout > 0 { oldTime = time.Now().Add(-t.IdleConnTimeout) } if list, ok := t.idleConn[w.key]; ok { stop := false delivered := false for len (list) > 0 && !stop { pconn := list[len (list)-1 ] tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0 ).Before(oldTime) if tooOld { go pconn.closeConnIfStillIdle() } if pconn.isBroken() || tooOld { list = list[:len (list)-1 ] continue } delivered = w.tryDeliver(pconn, nil ) if delivered { if pconn.alt != nil { } else { t.idleLRU.remove(pconn) list = list[:len (list)-1 ] } } stop = true } if len (list) > 0 { t.idleConn[w.key] = list } else { delete (t.idleConn, w.key) } if stop { return delivered } } if t.idleConnWait == nil { t.idleConnWait = make (map [connectMethodKey]wantConnQueue) } q := t.idleConnWait[w.key] q.cleanFront() q.pushBack(w) t.idleConnWait[w.key] = q return false }
来看看queueForDial()
,它在获取不到同一个 url 对应的空闲连接时调用,会尝试去创建一个连接,这里主要是参数校验,创建连接的逻辑在dialConnFor()
中。调用过程如下:
先校验 MaxConnsPerHost 是否未设置或已到达上限。如果校验不通过则将请求放在 connsPerHostWait map 中。
如果校验通过,会异步调用dialConnFor()
创建连接
dialConnFor()
首先调用 dialConn() 创建 TCP 连接,然后启用两个异步线程来处理数据,然后调用 tryDeliver 将连接绑定在 wantConn 上。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 func (t *Transport) queueForDial(w *wantConn) { w.beforeDial() if t.MaxConnsPerHost <= 0 { go t.dialConnFor(w) return } t.connsPerHostMu.Lock() defer t.connsPerHostMu.Unlock() if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost { if t.connsPerHost == nil { t.connsPerHost = make (map [connectMethodKey]int ) } t.connsPerHost[w.key] = n + 1 go t.dialConnFor(w) return } if t.connsPerHostWait == nil { t.connsPerHostWait = make (map [connectMethodKey]wantConnQueue) } q := t.connsPerHostWait[w.key] q.cleanFront() q.pushBack(w) t.connsPerHostWait[w.key] = q } func (t *Transport) dialConnFor(w *wantConn) { defer w.afterDial() pc, err := t.dialConn(w.ctx, w.cm) delivered := w.tryDeliver(pc, err) if err == nil && (!delivered || pc.alt != nil ) { t.putOrCloseIdleConn(pc) } if err != nil { t.decConnsPerHost(w.key) } }
dialConnFor()
调用dialConn()
创建 TCP 连接,然后调用tryDeliver()
将其与 wantConn 绑定。在dialConn()
中,会根据 scheme 的不同设置不同的连接配置,如下是 HTTP 连接的创建过程,创建完成后会开俩 goroutine 为该连接异步处理读写数据。创建的连接结构体包含了俩 channel:writech 负责写入请求数据,reqch 负责响应数据。开的两个 goroutine 异步循环就是处理其中的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error ) { pconn = &persistConn{ t: t, cacheKey: cm.key(), reqch: make (chan requestAndChan, 1 ), writech: make (chan writeRequest, 1 ), closech: make (chan struct {}), writeErrCh: make (chan error , 1 ), writeLoopDone: make (chan struct {}), } ... if cm.scheme() == "https" && t.hasCustomTLSDialer() { ... } else { conn, err := t.dial(ctx, "tcp" , cm.addr()) if err != nil { return nil , wrapErr(err) } pconn.conn = conn if cm.scheme() == "https" { var firstTLSHost string if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil { return nil , wrapErr(err) } if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil { return nil , wrapErr(err) } } } if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" { if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok { alt := next(cm.targetAddr, pconn.conn.(*tls.Conn)) if e, ok := alt.(erringRoundTripper); ok { return nil , e.RoundTripErr() } return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil } } pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize()) pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize()) go pconn.readLoop() go pconn.writeLoop() return pconn, nil } func (t *Transport) dial(ctx context.Context, network, addr string ) (net.Conn, error ) { if t.DialContext != nil { return t.DialContext(ctx, network, addr) } if t.Dial != nil { c, err := t.Dial(network, addr) if c == nil && err == nil { err = errors.New("net/http: Transport.Dial hook returned (nil, nil)" ) } return c, err } return zeroDialer.DialContext(ctx, network, addr) }
建立连接用到了 DialContext,它用于创建指定网络协议(如 tcp,udp),指定地址的连接,如下:
1 2 3 4 5 6 Dial("tcp" , "golang.org:http" ) Dial("tcp" , "192.0.2.1:http" ) Dial("tcp" , "198.51.100.1:80" ) Dial("udp" , "[2001:db8::1]:domain" ) Dial("udp" , "[fe80::1%lo0]:53" ) Dial("tcp" , ":80" )
通过注释可以看到t.Dial(network, addr)
已经弃用了,这里兼容保留,优先使用 DialContext。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 Dial func (network, addr string ) (net.Conn, error ) func Dial (network, address string ) (Conn, error ) { var d Dialer return d.Dial(network, address) } func (d *Dialer) Dial(network, address string ) (Conn, error ) { return d.DialContext(context.Background(), network, address) }
最初我们获得的 DefaultTransport 中就初始化了 DialContext,其接口实现者 net.Dialer 包含了创建 TCP 连接的各种选项,如超时设置 timeout(TCP 连接建立超时时间,OS 中一般为 3mins),Deadline(限制了确定的时刻,与 timeout 作用类似),TCP 四元组的原始 IP 地址 LocalAddr 等。
DialContext 创建连接分为三个阶段
resolveAddrList:根据本地地址网络类型以及地址族拆分目标地址,返回地址列表。
dialSerial:对 resolveAddrList 返回的地址依次尝试创建连接,返回第一个创建成功的连接,否则返回第一个错误 firstErr。
setKeepAlice:创建完连接后,检查该连接是否为 TCP 连接,如果是,则设置 KeepAlice 时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 func (d *Dialer) DialContext(ctx context.Context, network, address string ) (Conn, error ) { if ctx == nil { panic ("nil context" ) } deadline := d.deadline(ctx, time.Now()) if !deadline.IsZero() { if d, ok := ctx.Deadline(); !ok || deadline.Before(d) { subCtx, cancel := context.WithDeadline(ctx, deadline) defer cancel() ctx = subCtx } } if oldCancel := d.Cancel; oldCancel != nil { subCtx, cancel := context.WithCancel(ctx) defer cancel() go func () { select { case <-oldCancel: cancel() case <-subCtx.Done(): } }() ctx = subCtx } resolveCtx := ctx if trace, _ := ctx.Value(nettrace.TraceKey{}).(*nettrace.Trace); trace != nil { shadow := *trace shadow.ConnectStart = nil shadow.ConnectDone = nil resolveCtx = context.WithValue(resolveCtx, nettrace.TraceKey{}, &shadow) } addrs, err := d.resolver().resolveAddrList(resolveCtx, "dial" , network, address, d.LocalAddr) if err != nil { return nil , &OpError{Op: "dial" , Net: network, Source: nil , Addr: nil , Err: err} } sd := &sysDialer{ Dialer: *d, network: network, address: address, } var primaries, fallbacks addrList if d.dualStack() && network == "tcp" { primaries, fallbacks = addrs.partition(isIPv4) } else { primaries = addrs } var c Conn if len (fallbacks) > 0 { c, err = sd.dialParallel(ctx, primaries, fallbacks) } else { c, err = sd.dialSerial(ctx, primaries) } if err != nil { return nil , err } if tc, ok := c.(*TCPConn); ok && d.KeepAlive >= 0 { setKeepAlive(tc.fd, true ) ka := d.KeepAlive if d.KeepAlive == 0 { ka = defaultTCPKeepAlive } setKeepAlivePeriod(tc.fd, ka) testHookSetKeepAlive(ka) } return c, nil }
发送请求等待响应
分析完 getConn() 后,我们回头来分析 Client.Do 的第二步resp, err = rt.RoundTrip(req)
。它先将请求数据写入到 writech channel 中,writeLoop 接收到数据后就会处理请求。然后roundTrip()
将 requestAndChan 结构体放入 reqch channel。roundTrip()
循环等待,当 readLoop 读取到响应数据后就会通过 requestAndChan 结构体保存的 channel,将数据封装为 responseAndError 结构体回写,这样roundTrip()
接到响应数据后就结束循环等待并返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 func (t *Transport) roundTrip(req *Request) (*Response, error ) { ... pconn, err := t.getConn(treq, cm) if err != nil { t.setReqCanceler(cancelKey, nil ) req.closeBody() return nil , err } var resp *Response if pconn.alt != nil { t.setReqCanceler(cancelKey, nil ) resp, err = pconn.alt.RoundTrip(req) } else { resp, err = pconn.roundTrip(treq) } ... } func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error ) { ... startBytesWritten := pc.nwrite writeErrCh := make (chan error , 1 ) pc.writech <- writeRequest{req, writeErrCh, continueCh} resc := make (chan responseAndError) pc.reqch <- requestAndChan{ req: req.Request, cancelKey: req.cancelKey, ch: resc, addedGzip: requestedGzip, continueCh: continueCh, callerGone: gone, } for { testHookWaitResLoop() select { ... case re := <-resc: if (re.res == nil ) == (re.err == nil ) { panic (fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v" , re.res == nil )) } if debugRoundTrip { req.logf("resc recv: %p, %T/%#v" , re.res, re.err, re.err) } if re.err != nil { return nil , pc.mapRoundTripError(req, startBytesWritten, re.err) } return re.res, nil ... } } }
writeLoop 会请求数据 writeRequest,将 writech channel 中获取到的数据写入 TCP 连接中,并发送到目标服务器。当调用Request.write()
向请求写入数据时,实际上直接将数据写入了 persistConnWriter 封装的 TCP 连接中bw.Flush()
,TCP 协议栈负责将 HTTP 请求中的内容发送到目标服务器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func (pc *persistConn) writeLoop() { defer close (pc.writeLoopDone) for { select { case wr := <-pc.writech: startBytesWritten := pc.nwrite err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh)) ... case <-pc.closech: return } } } type persistConnWriter struct { pc *persistConn } func (w persistConnWriter) Write(p []byte ) (n int , err error ) { n, err = w.pc.conn.Write(p) w.pc.nwrite += int64 (n) return }
TCP 连接中的响应数据通过 roundTrip 传入的 channel 再回写,然后 roundTrip 就会接收到数据并返回获取到的响应数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func (pc *persistConn) readLoop() { closeErr := errReadLoopExiting defer func () { pc.close (closeErr) pc.t.removeIdleConn(pc) }() ... alive := true for alive { pc.readLimit = pc.maxHeaderResponseSize() rc := <-pc.reqch trace := httptrace.ContextClientTrace(rc.req.Context()) var resp *Response if err == nil { resp, err = pc.readResponse(rc, trace) } else { err = transportReadFromServerError{err} closeErr = err } ... select { case rc.ch <- responseAndError{res: resp}: case <-rc.callerGone: return } } }
至此,HTTP 标准包中 Client 端的大致处理流程已经介绍完了,我们总结一下大致步骤:
初始化请求 :首先通过 NewRequest() 校验请求信息,并将请求封装为 request 结构体,然后调用 Client.Do () 发送请求。
发送请求准备 :Client.Do () 中调用了 Client.send() 通过 Client.transport() 获取默认的 Transport 实例,然后调用 send()。
获取连接 :send() 中先通过 getConn() 获取连接,其中先调用 queueForIdleConn() 获取空闲连接,如果获取不到,则调用 queueForDial() 创建新的连接。获取连接前需要校验连接数是否超过限制,然后异步调用 dialConnFor() 创建连接,其中是调用 dialConn() 来创建 TCP 连接,然后启用两个异步 goroutine 来处理数据,调用 tryDeliver() 将连接绑定在 wantConn 上。
发送请求并等待响应 :成功获取连接后,在 send() 的下文调用 RoundTrip(),它将请求数据写入 writech 中,异步执行的 writeLoop 接收到数据后就会处理请求。然后 roundTrip() 将封装的 requestAndChan 结构体放入 reqch,进入循环等待。当 readLoop 读取到响应数据后会通过 requestAndChan 保存的 channel,将数据封装为 responseAndError 结构体写回。循环等待的 roundTrip() 接收到响应数据后就结束循环等待,并返回。
Server
同样以一个简单的例子开头:下面的例子监听 8000 端口,并在客户端请求路径/
时在控制台打印Hello World
。
1 2 3 4 5 6 7 8 func HelloHandler (w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "Hello World" ) } func main () { http.HandleFunc("/" , HelloHandler) http.ListenAndServe(":8000" , nil ) }
借用 luozhiyun 博主的图囊括一下流程:
从上图看出大致步骤:
注册 handler 到 handler map 中。
open listener 开启循环监听,每听到一个连接就会创建一个 goroutine。
在创建好的 goroutine 中 accept loop 循环等待接收数据,异步处理请求。
有数据到来时根据请求地址去 handler map 中 match handler,然后将 request 交给 handler 处理。
注册处理器
上述例子中,使用了http.HandleFunc()
来注册 handler。其调用的是DefaultServeMux.HandleFunc(pattern, handler)
,最终调用mux.Handle(pattern, HandlerFunc(handler))
来为给定的 pattern 注册 handler。如果对应的 pattern 已经存在一个 handler,则会 panic。其实 mux.m 就是一个 hash 表,用于精确匹配。当 pattern 尾字符为’/'时,放入[]muxEntry 用于部分匹配。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 type Handler interface { ServeHTTP(ResponseWriter, *Request) } type HandlerFunc func (ResponseWriter, *Request) func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) { f(w, r) } func (mux *ServeMux) HandleFunc(pattern string , handler func (ResponseWriter, *Request) ) { if handler == nil { panic ("http: nil handler" ) } mux.Handle(pattern, HandlerFunc(handler)) } func (mux *ServeMux) Handle(pattern string , handler Handler) { mux.mu.Lock() defer mux.mu.Unlock() if pattern == "" { panic ("http: invalid pattern" ) } if handler == nil { panic ("http: nil handler" ) } if _, exist := mux.m[pattern]; exist { panic ("http: multiple registrations for " + pattern) } if mux.m == nil { mux.m = make (map [string ]muxEntry) } e := muxEntry{h: handler, pattern: pattern} mux.m[pattern] = e if pattern[len (pattern)-1 ] == '/' { mux.es = appendSorted(mux.es, e) } if pattern[0 ] != '/' { mux.hosts = true } } func appendSorted (es []muxEntry, e muxEntry) []muxEntry { n := len (es) i := sort.Search(n, func (i int ) bool { return len (es[i].pattern) < len (e.pattern) }) if i == n { return append (es, e) } es = append (es, muxEntry{}) copy (es[i+1 :], es[i:]) es[i] = e return es }
循环监听
监听通过调用http.ListenAndServe(":8000", nil)
实现,其调用的是server.ListenAndServe()
。server 封装了给定的 addr 和 handler,handler 通常情况下为 nil,这个 handler 用于处理全局的请求。核心逻辑是net.Listen()
与server.Serve()
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func (srv *Server) ListenAndServe() error { if srv.shuttingDown() { return ErrServerClosed } addr := srv.Addr if addr == "" { addr = ":http" } ln, err := net.Listen("tcp" , addr) if err != nil { return err } return srv.Serve(ln) } func ListenAndServe (addr string , handler Handler) error { server := &Server{Addr: addr, Handler: handler} return server.ListenAndServe() }
Serve() 接收每个连接,并为每个连接创建一个 goroutine 来读取请求,然后调用 srv.Handler 来回复它们。里面用到了一个循环去接收监听到的网络连接,如果并发很高的话,可能会一次性创建太多协程,导致处理不过来的情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func (srv *Server) Serve(l net.Listener) error { ... l = &onceCloseListener{Listener: l} defer l.Close() ... baseCtx := context.Background() ... ctx := context.WithValue(baseCtx, ServerContextKey, srv) for { rw, err := l.Accept() if err != nil { select { case <-srv.getDoneChan(): return ErrServerClosed default : } ... return err } connCtx := ctx ... c := srv.newConn(rw) c.setState(c.rwc, StateNew, runHooks) go c.serve(connCtx) } }
处理请求
读取请求调用的是c.readRequest(ctx)
,分发请求需要看具体的实现,调用这个接口serverHandler{c.server}.ServeHTTP(w, w.req)
时,最终调用的是ServeMux.ServeHTTP()
,然后通过 handler 调用到match()
进行路由匹配。最终是调用handler.ServeHTTP()
处理请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 func (c *conn) serve(ctx context.Context) { c.remoteAddr = c.rwc.RemoteAddr().String() ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr()) ... ctx, cancelCtx := context.WithCancel(ctx) c.cancelCtx = cancelCtx defer cancelCtx() c.r = &connReader{conn: c} c.bufr = newBufioReader(c.r) c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4 <<10 ) for { w, err := c.readRequest(ctx) ... serverHandler{c.server}.ServeHTTP(w, w.req) ... w.cancelCtx() if c.hijacked() { return } w.finishRequest() ... } } func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) { handler := sh.srv.Handler if handler == nil { handler = DefaultServeMux } if req.RequestURI == "*" && req.Method == "OPTIONS" { handler = globalOptionsHandler{} } ... handler.ServeHTTP(rw, req) } func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) { if r.RequestURI == "*" { if r.ProtoAtLeast(1 , 1 ) { w.Header().Set("Connection" , "close" ) } w.WriteHeader(StatusBadRequest) return } h, _ := mux.Handler(r) h.ServeHTTP(w, r) } func (mux *ServeMux) Handler(r *Request) (h Handler, pattern string ) { ... return mux.handler(host, r.URL.Path) } func (mux *ServeMux) handler(host, path string ) (h Handler, pattern string ) { mux.mu.RLock() defer mux.mu.RUnlock() if mux.hosts { h, pattern = mux.match(host + path) } if h == nil { h, pattern = mux.match(path) } if h == nil { h, pattern = NotFoundHandler(), "" } return } func (mux *ServeMux) match(path string ) (h Handler, pattern string ) { v, ok := mux.m[path] if ok { return v.h, v.pattern } for _, e := range mux.es { if strings.HasPrefix(path, e.pattern) { return e.h, e.pattern } } return nil , "" }
最后调用handler.ServeHTTP()
来处理请求。
1 2 3 func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) { f(w, r) }
至此,HTTP 标准库为 Server 端提供请求处理的流程大致介绍完了。简单来看就是注册处理器(在 Go Web 中,挺多人称呼 handler 为“中间件”,虽然我不太能理解= =)、监听端口、处理请求,当请求到来时为其创建一个 goroutine 处理请求,主要是根据 url 来分派 handler,调用 handler 方法来处理请求。
接口型函数
在上文中,我们看到了一个比较特殊的实现:HandlerFunc。HTTP 标准库中定义了一个接口Handler
,代表处理器,只包含一个方法ServeHTTP(ResponseWriter, *Request)
,接着定义了一个函数类型HandlerFunc
,它的参数与返回值都和 Handler 里的 ServeHTTP 方法是一致的。而且 HandlerFunc 还定义了 ServeHTTP 方法,并在其中调用自己,这样就实现了接口 Handler。所以 HadnlerFunc 是一个实现了接口的函数类型,简称为接口型函数。
1 2 3 4 5 6 7 8 9 10 11 type Handler interface { ServeHTTP(ResponseWriter, *Request) } type HandlerFunc func (ResponseWriter, *Request) func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) { f(w, r) }
优势
我们以上文的mux.Handle(pattern, HandlerFunc(handler))
为例,它的作用是使用某个处理器方法来处理匹配路径的请求,而 handler 则代表了处理器方法。
1 2 3 4 5 6 7 func (mux *ServeMux) Handle(pattern string , handler Handler) { ... e := muxEntry{h: handler, pattern: pattern} mux.m[pattern] = e ... }
我们可以使用多种方式来调用这个函数,例如将 HandlerFunc 类型的函数作为参数,这样支持匿名函数,也支持普通的函数,还有一种方式是将实现了 Handler 接口的结构体作为参数,但是这种方式适用于逻辑较为复杂的场景,如果需要的信息比较多,而且还有很多中间状态要保持,那么封装为一个结构体显然更符合情况。
通过接口型函数,该方法的参数既可以是普通的函数类型,也可以是结构体,使用起来更为灵活,可读性更好。
1 2 3 4 5 6 7 8 9 10 func home (w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte ("hello, index page" )) } func main () { http.Handle("/home" , http.HandlerFunc(home)) _ = http.ListenAndServe("localhost:8000" , nil ) }
总结
HTTP 标准库的实现相对来说比较简单,感觉能学到的东西比较少,不过想要学习更多的网络框架、Web 框架,HTTP 标准库还是先要了解的。
参考与推荐阅读