基于 HTTP 构建的服务标准模型包括客户端Client 和服务端Server。HTTP 请求从客户端发出,服务端接收到请求后进行处理,然后响应返回给客户端。因此 HTTP 服务器的工作就在于如何接受来自客户端的请求,并向客户端返回响应。典型的 HTTP 服务如下图:

img

Client

以下是一个简单示例,在例子中,我们通过 http 标准库向给定的 url:http://httpbin.org/get发送了一个 Get 请求,请求参数为 name=makonike。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main

import (
"fmt"
"io"
"net/http"
)

func main() {
resp, err := http.Get("http://httpbin.org/get?name=makonike")
if err != nil {
return
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
fmt.Println(string(body))
}

http.Get()中,它调用的是DefaultClient.Get(),DefaultClient 是 Client 的一个空实例,实际上调用的是Client.Get()

1
2
// DefaultClient is the default Client and is used by Get, Head, and Post.
var DefaultClient = &Client{}

Client 结构体

1
2
3
4
5
6
type Client struct {
Transport RoundTripper
CheckRedirect func(req *Request, via []*Request) error
Jar CookieJar
Timeout time.Duration
}

Client 结构体总共包含四个字段:

  • Transport:表示 HTTP 事务,用于处理客户端请求连接并等待服务端的响应
  • CheckRedirect:用于指定处理重定向策略
  • Jar:用于存储和管理请求中的 cookie
  • Timeout:设置客户端请求的最大超时时间,包括连接、任何重定向以及读取响应的时间

初始化请求

Get()向指定 url 发出 Get 请求,其中先要根据请求类型构造一个完整的请求,包含请求头、请求体和请求参数,然后才根据请求的完整结构来执行请求。NewRequest()用于构造请求,它会调用NewRequestWithContext(),它返回一个 Request 结构体,其中包含了一个 HTTP 请求的所有信息。

1
2
3
4
5
6
7
8
9
10
11
12
func (c *Client) Get(url string) (resp *Response, err error) {
req, err := NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
return c.Do(req)
}

// NewRequest wraps NewRequestWithContext using context.Background.
func NewRequest(method, url string, body io.Reader) (*Request, error) {
return NewRequestWithContext(context.Background(), method, url, body)
}

Request 是请求的抽象结构体,其中有很多字段都是我们已熟知的,这里仅仅列举一部分:

1
2
3
4
5
6
7
type Request struct {
Method string // 请求方法
URL *url.URL // 请求路径
Header Header // 请求头
Body io.ReadCloser // 请求体
...
}

我们直接来看看NewRequestWithContext(),它的作用是将请求封装成一个 Request 结构体并返回。它强制请求方法不为空,并校验了请求方法的有效性,同时解析 url,最终封装请求信息为一个 Request 结构体。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func NewRequestWithContext(ctx context.Context, method, url string, body io.Reader) (*Request, error) {
// 如果请求方法为空,那么将其设为 GET
if method == "" {
method = "GET"
}
// 校验请求方法是否被支持
if !validMethod(method) {
return nil, fmt.Errorf("net/http: invalid method %q", method)
}
// 上下文信息,前文使用的是 context.Background(),返回一个空的 context
if ctx == nil {
return nil, errors.New("net/http: nil Context")
}
// 解析 url
u, err := urlpkg.Parse(url)
if err != nil {
return nil, err
}
rc, ok := body.(io.ReadCloser)
if !ok && body != nil {
rc = io.NopCloser(body)
}
// The host's colon:port should be normalized. See Issue 14836.
u.Host = removeEmptyPort(u.Host)
req := &Request{
ctx: ctx,
Method: method,
URL: u,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Header: make(Header),
Body: rc,
Host: u.Host,
}
...
return req, nil
}

获取到一个完整的 Request 结构体后,Get()会调用c.Do(req)发送请求,它会调用c.do(req),然后调用c.send(req, deadline),最终会调用到send()Client.Do 的逻辑主要分为两段,一段是调用 send() 发送请求接收 Response,关闭 Req.Body,另一段是对需要 redirect 的请求执行重定向操作,并关闭 Resp.Body。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
...
resp, didTimeout, err = send(req, c.transport(), deadline)
if err != nil {
return nil, didTimeout, err
}
...
return resp, nil, nil
}

func (c *Client) Do(req *Request) (*Response, error) {
return c.do(req)
}

func (c *Client) do(req *Request) (retres *Response, reterr error) {
// 发送请求并接收 Response
if resp, didTimeout, err = c.send(req, deadline); err != nil {
// c.send() always closes req.Body
reqBodyClosed = true
if !deadline.IsZero() && didTimeout() {
err = &httpError{
err: err.Error() + " (Client.Timeout exceeded while awaiting headers)",
timeout: true,
}
}
return nil, uerr(err)
}
// 检查是否应该重定向
var shouldRedirect bool
redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0])
if !shouldRedirect {
return resp, nil
}
req.closeBody()
}

func (c *Client) transport() RoundTripper {
if c.Transport != nil {
return c.Transport
}
return DefaultTransport
}

Client.send()调用send()进行下一步处理前,会先调用c.transport()获取 DefaultTransport 实例。

1
2
3
4
5
6
7
8
9
10
11
12
var DefaultTransport RoundTripper = &Transport{
Proxy: ProxyFromEnvironment, // 对特定的请求返回代理
DialContext: defaultTransportDialContext(&net.Dialer{ // 指定底层 TCP 连接的创建函数
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}),
ForceAttemptHTTP2: true,
MaxIdleConns: 100, // 最大空闲连接数
IdleConnTimeout: 90 * time.Second, // 空闲连接超时时间
TLSHandshakeTimeout: 10 * time.Second, // TLS 握手超时时间
ExpectContinueTimeout: 1 * time.Second,
}

RoundTripper 负责 HTTP 请求的建立,发送,接收 HTTP 应答以及关闭,但是不对 HTTP 响应进行额外处理,例如:redirects, authentication, or cookies 等上层协议细节。Transport 实现了 RoundTripper 接口,它实现的RoundTrip()方法会具体地处理请求,处理完毕后返回 Response。

1
2
3
type RoundTripper interface {
RoundTrip(*Request) (*Response, error)
}

回到Client.send(),它会调用send(),这个函数的主要逻辑交给获取到的 Transport 实现的RoundTrip()来执行,RoundTrip()会调用到roundTrip()

1
2
3
4
5
6
7
8
9
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
...
resp, err = rt.RoundTrip(req)
...
}

func (t *Transport) RoundTrip(req *Request) (*Response, error) {
return t.roundTrip(req)
}

roundTrip()中会做两件事,一是调用 Transport 的 getConn() 获取连接,二是在获取到连接后,调用 persistConn 的 roundTrip() 发送请求,等待响应结果。persistConn 是对连接的一层封装,是持久化连接的抽象,通常表示 keep-alive 连接,也可以表示 non-keep-alive 连接,它与 Client 和 Transport 的关系大致如下图:

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
func (t *Transport) roundTrip(req *Request) (*Response, error) {
t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
ctx := req.Context()
trace := httptrace.ContextClientTrace(ctx)
...
// 根据 url 的协议选择对应的实现来替代默认的逻辑,主要用了 useRegisteredProtocol()
scheme := req.URL.Scheme
if altRT := t.alternateRoundTripper(req); altRT != nil {
if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
return resp, err
}
var err error
req, err = rewindBody(req)
if err != nil {
return nil, err
}
}
...
for {
select {
case <-ctx.Done():
req.closeBody()
return nil, ctx.Err()
default:
}
// 封装请求
treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}
cm, err := t.connectMethodForRequest(treq)
if err != nil {
req.closeBody()
return nil, err
}
// 获取连接
pconn, err := t.getConn(treq, cm)
if err != nil {
t.setReqCanceler(cancelKey, nil)
req.closeBody()
return nil, err
}
// 等待响应结果
var resp *Response
if pconn.alt != nil {
// HTTP/2 path.
t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest
resp, err = pconn.alt.RoundTrip(req)
} else {
resp, err = pconn.roundTrip(treq)
}
if err == nil {
resp.Request = origReq
return resp, nil
}
...
}
}

func (t *Transport) alternateRoundTripper(req *Request) RoundTripper {
if !t.useRegisteredProtocol(req) {
return nil
}
altProto, _ := t.altProto.Load().(map[string]RoundTripper)
return altProto[req.URL.Scheme]
}

func (t *Transport) useRegisteredProtocol(req *Request) bool {
if req.URL.Scheme == "https" && req.requiresHTTP1() {
return false
}
return true
}

func (r *Request) requiresHTTP1() bool {
return hasToken(r.Header.Get("Connection"), "upgrade") &&
ascii.EqualFold(r.Header.Get("Upgrade"), "websocket")
}

获取连接

getConn()有两个阶段:

  1. 调用queueForIdleConn()获取空闲连接
  2. 当不存在空闲连接时,调用queueForDial()创建新的连接

img

可以看到关系图如上。getConn()先封装请求为 wantConn 结构体,然后调用queueForIdleConn()获取空闲连接,如果成功获取则返回连接,失败了则调用queueForDial()创建一个新连接,进行后续的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
req := treq.Request
trace := treq.trace
ctx := req.Context()
if trace != nil && trace.GetConn != nil {
trace.GetConn(cm.addr())
}
// 将请求封装成 wantConn 结构体
w := &wantConn{
cm: cm,
key: cm.key(), // 注意这个字段:其实是 connectMethodKey 结构体类型
// 是对 url 关键字段的分解,代表连接的目标方 (代理,协议,目的地址)
// 包含了 proxyURL,targetAddr、Scheme 和 onlyH1
// onlyH1 bool // whether to disable HTTP/2 and force HTTP/1
ctx: ctx,
ready: make(chan struct{}, 1),
beforeDial: testHookPrePendingDial,
afterDial: testHookPostPendingDial,
}
defer func() {
if err != nil {
w.cancel(t, err)
}
}()

// Queue for idle connection.
if delivered := t.queueForIdleConn(w); delivered {
pc := w.pc
// Trace only for HTTP/1.
// HTTP/2 calls trace.GotConn itself.
if pc.alt == nil && trace != nil && trace.GotConn != nil {
trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
}
// set request canceler to some non-nil function so we
// can detect whether it was cleared between now and when
// we enter roundTrip
t.setReqCanceler(treq.cancelKey, func(error) {})
return pc, nil
}

cancelc := make(chan error, 1)
t.setReqCanceler(treq.cancelKey, func(err error) { cancelc <- err })

// Queue for permission to dial.
t.queueForDial(w)

// Wait for completion or cancellation.
select {
// 成功获取连接后进入该分支
case <-w.ready:
// Trace success but only for HTTP/1.
// HTTP/2 calls trace.GotConn itself.
if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
}
if w.err != nil {
select {
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
default:
// return below
}
}
return w.pc, w.err
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
}
}

// 对 url 关键字段的分解,代表连接的目标方 (代理,协议,目的地址)
func (cm *connectMethod) key() connectMethodKey {
proxyStr := ""
targetAddr := cm.targetAddr
if cm.proxyURL != nil {
proxyStr = cm.proxyURL.String()
if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" {
targetAddr = ""
}
}
return connectMethodKey{
proxy: proxyStr,
scheme: cm.targetScheme,
addr: targetAddr,
onlyH1: cm.onlyH1,
}
}

来看看queueForIdleConn(),它先根据 wantConn.key 去 idleConn map 中看看是否存在空闲的 connection list,如果能获取到,则获取列表中最后一个 connection 返回,否则将当前 wantConn 加入到 idleConnWait map 中。这部分的逻辑比较简单,直接看代码就能理解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
// 禁用长连接的话每次都需要新的 connection,直接返回 false 即可
if t.DisableKeepAlives {
return false
}
t.idleMu.Lock()
defer t.idleMu.Unlock()
// 阻止关闭空闲连接,因为现在我们正在找一个空闲连接用
t.closeIdle = false
...
// If IdleConnTimeout is set, calculate the oldest
// persistConn.idleAt time we're willing to use a cached idle
// conn.
var oldTime time.Time
// 设置超时
if t.IdleConnTimeout > 0 {
oldTime = time.Now().Add(-t.IdleConnTimeout)
}

// 看看最近使用的空闲连接,找到 w.key 相同的 connection list
// values 是 persistConn 的列表
// persistConn 是对连接的一层封装,通常表示 keep-alive 连接,也可以表示 non-keep-alive 连接
if list, ok := t.idleConn[w.key]; ok {
stop := false
delivered := false
for len(list) > 0 && !stop {
// 有连接,获取最后一个
pconn := list[len(list)-1]

// 看看这个 connection 是不是等太久了
tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
if tooOld {
// 异步清除
go pconn.closeConnIfStillIdle()
}
// 如果标记已断开,或者这个 connection 等太久了执行异步清除,就得忽略它找下一个。
if pconn.isBroken() || tooOld {
list = list[:len(list)-1]
continue
}
// 尝试将整个 connection 写到 wantConn 中
delivered = w.tryDeliver(pconn, nil)
// 如果操作成功,需要将 connection 从 idle connection list 中删除
if delivered {
if pconn.alt != nil {
// HTTP/2: multiple clients can share pconn.
// Leave it in the list.
} else {
// HTTP/1: only one client can use pconn.
// Remove it from the list.
t.idleLRU.remove(pconn)
list = list[:len(list)-1]
}
}
stop = true
}
if len(list) > 0 {
t.idleConn[w.key] = list
} else {
// 如果 list 为 0 了,则将对应的 list 从 map 中删除,可以避免下次判断,尽早去创建连接
// 这里也是为了防止 idleConn 的 list 过多。
delete(t.idleConn, w.key)
}
if stop {
return delivered
}
}
// 如果找不到空闲 connection
if t.idleConnWait == nil {
t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
}
// 将 wantConn 加入到 idleConnWait map 中
q := t.idleConnWait[w.key]
q.cleanFront()
q.pushBack(w)
t.idleConnWait[w.key] = q
return false
}

来看看queueForDial(),它在获取不到同一个 url 对应的空闲连接时调用,会尝试去创建一个连接,这里主要是参数校验,创建连接的逻辑在dialConnFor()中。调用过程如下:

  1. 先校验 MaxConnsPerHost 是否未设置或已到达上限。如果校验不通过则将请求放在 connsPerHostWait map 中。
  2. 如果校验通过,会异步调用dialConnFor()创建连接
  3. dialConnFor()首先调用 dialConn() 创建 TCP 连接,然后启用两个异步线程来处理数据,然后调用 tryDeliver 将连接绑定在 wantConn 上。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func (t *Transport) queueForDial(w *wantConn) {
w.beforeDial()
// 小于 0 说明每个 host 的最大 connection 数量无限制,直接异步调用 dialConnFor(),然后返回
if t.MaxConnsPerHost <= 0 {
go t.dialConnFor(w)
return
}

t.connsPerHostMu.Lock()
defer t.connsPerHostMu.Unlock()
// 连接数没达到上限,异步建立连接
// MaxConnsPerHost 控制某个 host 的所有连接,包括创建中,正在使用的,以及空闲的连接
// 一旦超过限制,dial 会阻塞
if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
if t.connsPerHost == nil {
t.connsPerHost = make(map[connectMethodKey]int)
}
t.connsPerHost[w.key] = n + 1
go t.dialConnFor(w)
return
}
// 建立的连接数已到上限,需要进入等待队列
if t.connsPerHostWait == nil {
t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
}
q := t.connsPerHostWait[w.key]
q.cleanFront()
q.pushBack(w)
t.connsPerHostWait[w.key] = q
}

func (t *Transport) dialConnFor(w *wantConn) {
defer w.afterDial()
// 建立连接
pc, err := t.dialConn(w.ctx, w.cm)
// 连接绑定 wantConn
delivered := w.tryDeliver(pc, err)
// 连接建立成功,但是绑定 wantConn 失败:可能是 HTTP/2 或被共享的
// 那么将该连接放到 idleConnection map 中
if err == nil && (!delivered || pc.alt != nil) {
// pconn was not passed to w,
// or it is HTTP/2 and can be shared.
// Add to the idle connection pool.
t.putOrCloseIdleConn(pc)
}
if err != nil {
t.decConnsPerHost(w.key)
}
}

dialConnFor()调用dialConn()创建 TCP 连接,然后调用tryDeliver()将其与 wantConn 绑定。在dialConn()中,会根据 scheme 的不同设置不同的连接配置,如下是 HTTP 连接的创建过程,创建完成后会开俩 goroutine 为该连接异步处理读写数据。创建的连接结构体包含了俩 channel:writech 负责写入请求数据,reqch 负责响应数据。开的两个 goroutine 异步循环就是处理其中的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
// 持久化连接
pconn = &persistConn{
t: t,
cacheKey: cm.key(), // 分解 url,as cache key
reqch: make(chan requestAndChan, 1),
writech: make(chan writeRequest, 1),
closech: make(chan struct{}),
writeErrCh: make(chan error, 1),
writeLoopDone: make(chan struct{}),
}
...
// 根据不同 scheme 进行不同的配置
if cm.scheme() == "https" && t.hasCustomTLSDialer() {
// 执行 TLS 握手过程
...
} else {
// 建立 tcp 连接
conn, err := t.dial(ctx, "tcp", cm.addr())
if err != nil {
return nil, wrapErr(err)
}
pconn.conn = conn
if cm.scheme() == "https" {
var firstTLSHost string
if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
return nil, wrapErr(err)
}
if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil {
return nil, wrapErr(err)
}
}
}
// proxy ...
if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
if e, ok := alt.(erringRoundTripper); ok {
// pconn.conn was closed by next (http2configureTransports.upgradeFn).
return nil, e.RoundTripErr()
}
return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
}
}
// 设置读写 buffer
pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
// 俩 buffer 都关联了 persistConn
pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
// 为每个连接开俩 gorotuine 异步处理读写数据
go pconn.readLoop()
go pconn.writeLoop()
return pconn, nil
}

// 建立连接
func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
if t.DialContext != nil {
return t.DialContext(ctx, network, addr)
}
if t.Dial != nil {
c, err := t.Dial(network, addr)
if c == nil && err == nil {
err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
}
return c, err
}
return zeroDialer.DialContext(ctx, network, addr)
}

建立连接用到了 DialContext,它用于创建指定网络协议(如 tcp,udp),指定地址的连接,如下:

1
2
3
4
5
6
Dial("tcp", "golang.org:http")
Dial("tcp", "192.0.2.1:http")
Dial("tcp", "198.51.100.1:80")
Dial("udp", "[2001:db8::1]:domain")
Dial("udp", "[fe80::1%lo0]:53")
Dial("tcp", ":80")

通过注释可以看到t.Dial(network, addr)已经弃用了,这里兼容保留,优先使用 DialContext。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Deprecated: Use DialContext instead, which allows the transport
// to cancel dials as soon as they are no longer needed.
// If both are set, DialContext takes priority.
Dial func(network, addr string) (net.Conn, error)

// Examples:
// Dial("ip4:1", "192.0.2.1")
// Dial("ip6:ipv6-icmp", "2001:db8::1")
// Dial("ip6:58", "fe80::1%lo0")

func Dial(network, address string) (Conn, error) {
var d Dialer
return d.Dial(network, address)
}

func (d *Dialer) Dial(network, address string) (Conn, error) {
return d.DialContext(context.Background(), network, address)
}
// 发现最终也是调用的 DialContext()

最初我们获得的 DefaultTransport 中就初始化了 DialContext,其接口实现者 net.Dialer 包含了创建 TCP 连接的各种选项,如超时设置 timeout(TCP 连接建立超时时间,OS 中一般为 3mins),Deadline(限制了确定的时刻,与 timeout 作用类似),TCP 四元组的原始 IP 地址 LocalAddr 等。

DialContext 创建连接分为三个阶段

  1. resolveAddrList:根据本地地址网络类型以及地址族拆分目标地址,返回地址列表。
  2. dialSerial:对 resolveAddrList 返回的地址依次尝试创建连接,返回第一个创建成功的连接,否则返回第一个错误 firstErr。
  3. setKeepAlice:创建完连接后,检查该连接是否为 TCP 连接,如果是,则设置 KeepAlice 时间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
func (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {
if ctx == nil {
panic("nil context")
}
deadline := d.deadline(ctx, time.Now())
if !deadline.IsZero() {
if d, ok := ctx.Deadline(); !ok || deadline.Before(d) {
subCtx, cancel := context.WithDeadline(ctx, deadline)
defer cancel()
ctx = subCtx
}
}
if oldCancel := d.Cancel; oldCancel != nil {
subCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
select {
case <-oldCancel:
cancel()
case <-subCtx.Done():
}
}()
ctx = subCtx
}

// Shadow the nettrace (if any) during resolve so Connect events don't fire for DNS lookups.
resolveCtx := ctx
if trace, _ := ctx.Value(nettrace.TraceKey{}).(*nettrace.Trace); trace != nil {
shadow := *trace
shadow.ConnectStart = nil
shadow.ConnectDone = nil
resolveCtx = context.WithValue(resolveCtx, nettrace.TraceKey{}, &shadow)
}

addrs, err := d.resolver().resolveAddrList(resolveCtx, "dial", network, address, d.LocalAddr)
if err != nil {
return nil, &OpError{Op: "dial", Net: network, Source: nil, Addr: nil, Err: err}
}

sd := &sysDialer{
Dialer: *d,
network: network,
address: address,
}

var primaries, fallbacks addrList
if d.dualStack() && network == "tcp" {
primaries, fallbacks = addrs.partition(isIPv4)
} else {
primaries = addrs
}

var c Conn
if len(fallbacks) > 0 {
c, err = sd.dialParallel(ctx, primaries, fallbacks)
} else {
c, err = sd.dialSerial(ctx, primaries)
}
if err != nil {
return nil, err
}

if tc, ok := c.(*TCPConn); ok && d.KeepAlive >= 0 {
setKeepAlive(tc.fd, true)
ka := d.KeepAlive
if d.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(tc.fd, ka)
testHookSetKeepAlive(ka)
}
return c, nil
}

发送请求等待响应

img

分析完 getConn() 后,我们回头来分析 Client.Do 的第二步resp, err = rt.RoundTrip(req)。它先将请求数据写入到 writech channel 中,writeLoop 接收到数据后就会处理请求。然后roundTrip()将 requestAndChan 结构体放入 reqch channel。roundTrip()循环等待,当 readLoop 读取到响应数据后就会通过 requestAndChan 结构体保存的 channel,将数据封装为 responseAndError 结构体回写,这样roundTrip()接到响应数据后就结束循环等待并返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
func (t *Transport) roundTrip(req *Request) (*Response, error) {
...
pconn, err := t.getConn(treq, cm)
if err != nil {
t.setReqCanceler(cancelKey, nil)
req.closeBody()
return nil, err
}

var resp *Response
if pconn.alt != nil {
// HTTP/2 path.
t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest
resp, err = pconn.alt.RoundTrip(req)
} else {
// http/1,走这
resp, err = pconn.roundTrip(treq)
}
...
}

func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
...
startBytesWritten := pc.nwrite
writeErrCh := make(chan error, 1)
// 将请求数据写到 writech 中
pc.writech <- writeRequest{req, writeErrCh, continueCh}
// 用于接收响应的 channel
resc := make(chan responseAndError)
// 将用于接收响应的 channel 封装为 requestAndChan,写到 reqch channel 中
pc.reqch <- requestAndChan{
req: req.Request,
cancelKey: req.cancelKey,
ch: resc,
addedGzip: requestedGzip,
continueCh: continueCh,
callerGone: gone,
}

for {
testHookWaitResLoop()
select {
...
// 接收响应数据
case re := <-resc:
if (re.res == nil) == (re.err == nil) {
panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
}
if debugRoundTrip {
req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
}
if re.err != nil {
return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
}
// 返回响应数据
return re.res, nil
...
}
}
}

writeLoop 会请求数据 writeRequest,将 writech channel 中获取到的数据写入 TCP 连接中,并发送到目标服务器。当调用Request.write()向请求写入数据时,实际上直接将数据写入了 persistConnWriter 封装的 TCP 连接中bw.Flush(),TCP 协议栈负责将 HTTP 请求中的内容发送到目标服务器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (pc *persistConn) writeLoop() {
defer close(pc.writeLoopDone)
for {
select {
case wr := <-pc.writech:
startBytesWritten := pc.nwrite
// 向 TCP 连接写入数据,并发送到目标服务器。
// 里面是将数据写到 pc.bw 中,然后调用 bw.Flush()
err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
...
case <-pc.closech:
return
}
}
}

type persistConnWriter struct {
pc *persistConn
}

func (w persistConnWriter) Write(p []byte) (n int, err error) {
n, err = w.pc.conn.Write(p)
w.pc.nwrite += int64(n)
return
}

TCP 连接中的响应数据通过 roundTrip 传入的 channel 再回写,然后 roundTrip 就会接收到数据并返回获取到的响应数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (pc *persistConn) readLoop() {
closeErr := errReadLoopExiting // default value, if not changed below
defer func() {
pc.close(closeErr)
pc.t.removeIdleConn(pc)
}()
...
alive := true
for alive {
pc.readLimit = pc.maxHeaderResponseSize()
// 获取 roundTrip 发送的结构体
rc := <-pc.reqch
trace := httptrace.ContextClientTrace(rc.req.Context())

var resp *Response
if err == nil {
// 读取数据
resp, err = pc.readResponse(rc, trace)
} else {
err = transportReadFromServerError{err}
closeErr = err
}
...
// 将响应数据写回到 channel 中
select {
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}
}
}

至此,HTTP 标准包中 Client 端的大致处理流程已经介绍完了,我们总结一下大致步骤:

  1. 初始化请求:首先通过 NewRequest() 校验请求信息,并将请求封装为 request 结构体,然后调用 Client.Do() 发送请求。
  2. 发送请求准备Client.Do() 中调用了 Client.send() 通过 Client.transport() 获取默认的 Transport 实例,然后调用 send()。
  3. 获取连接:send() 中先通过 getConn() 获取连接,其中先调用 queueForIdleConn() 获取空闲连接,如果获取不到,则调用 queueForDial() 创建新的连接。获取连接前需要校验连接数是否超过限制,然后异步调用 dialConnFor() 创建连接,其中是调用 dialConn() 来创建 TCP 连接,然后启用两个异步 goroutine 来处理数据,调用 tryDeliver() 将连接绑定在 wantConn 上。
  4. 发送请求并等待响应:成功获取连接后,在 send() 的下文调用 RoundTrip(),它将请求数据写入 writech 中,异步执行的 writeLoop 接收到数据后就会处理请求。然后 roundTrip() 将封装的 requestAndChan 结构体放入 reqch,进入循环等待。当 readLoop 读取到响应数据后会通过 requestAndChan 保存的 channel,将数据封装为 responseAndError 结构体写回。循环等待的 roundTrip() 接收到响应数据后就结束循环等待,并返回。

Server

同样以一个简单的例子开头:下面的例子监听 8000 端口,并在客户端请求路径/时在控制台打印Hello World

1
2
3
4
5
6
7
8
func HelloHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello World")
}

func main() {
http.HandleFunc("/", HelloHandler)
http.ListenAndServe(":8000", nil)
}

借用 luozhiyun 博主的图囊括一下流程:

img

从上图看出大致步骤:

  1. 注册 handler 到 handler map 中。
  2. open listener 开启循环监听,每听到一个连接就会创建一个 goroutine。
  3. 在创建好的 goroutine 中 accept loop 循环等待接收数据,异步处理请求。
  4. 有数据到来时根据请求地址去 handler map 中 match handler,然后将 request 交给 handler 处理。

注册处理器

上述例子中,使用了http.HandleFunc()来注册 handler。其调用的是DefaultServeMux.HandleFunc(pattern, handler),最终调用mux.Handle(pattern, HandlerFunc(handler))来为给定的 pattern 注册 handler。如果对应的 pattern 已经存在一个 handler,则会 panic。其实 mux.m 就是一个 hash 表,用于精确匹配。当 pattern 尾字符为’/'时,放入[]muxEntry 用于部分匹配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}

type HandlerFunc func(ResponseWriter, *Request)

// ServeHTTP calls f(w, r).
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
f(w, r)
}

// HandleFunc registers the handler function for the given pattern.
func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
if handler == nil {
panic("http: nil handler")
}
mux.Handle(pattern, HandlerFunc(handler))
}

func (mux *ServeMux) Handle(pattern string, handler Handler) {
mux.mu.Lock()
defer mux.mu.Unlock()

if pattern == "" {
panic("http: invalid pattern")
}
if handler == nil {
panic("http: nil handler")
}
// 已存在,panic
if _, exist := mux.m[pattern]; exist {
panic("http: multiple registrations for " + pattern)
}

if mux.m == nil {
mux.m = make(map[string]muxEntry)
}
e := muxEntry{h: handler, pattern: pattern}
// 其实就是一个 map,存储了 muxEntry 对象,封装了 pattern 和 handler。
mux.m[pattern] = e
// 如果尾字符为'/',则将对应的 muxEntry 放到 []muxEntry 中,用于部分匹配
if pattern[len(pattern)-1] == '/' {
// 保证了长到短有序
mux.es = appendSorted(mux.es, e)
}

if pattern[0] != '/' {
mux.hosts = true
}
}

func appendSorted(es []muxEntry, e muxEntry) []muxEntry {
n := len(es)
i := sort.Search(n, func(i int) bool {
return len(es[i].pattern) < len(e.pattern)
})
if i == n {
return append(es, e)
}
// we now know that i points at where we want to insert
es = append(es, muxEntry{}) // try to grow the slice in place, any entry works.
copy(es[i+1:], es[i:]) // Move shorter entries down
es[i] = e
return es
}

循环监听

监听通过调用http.ListenAndServe(":8000", nil)实现,其调用的是server.ListenAndServe()。server 封装了给定的 addr 和 handler,handler 通常情况下为 nil,这个 handler 用于处理全局的请求。核心逻辑是net.Listen()server.Serve()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (srv *Server) ListenAndServe() error {
if srv.shuttingDown() {
return ErrServerClosed
}
addr := srv.Addr
if addr == "" {
addr = ":http"
}
// 监听端口
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
// 循环接收监听到的网络请求
return srv.Serve(ln)
}

func ListenAndServe(addr string, handler Handler) error {
server := &Server{Addr: addr, Handler: handler}
return server.ListenAndServe()
}

Serve() 接收每个连接,并为每个连接创建一个 goroutine 来读取请求,然后调用 srv.Handler 来回复它们。里面用到了一个循环去接收监听到的网络连接,如果并发很高的话,可能会一次性创建太多协程,导致处理不过来的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (srv *Server) Serve(l net.Listener) error {
...
l = &onceCloseListener{Listener: l}
defer l.Close()
...
baseCtx := context.Background()
...
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
rw, err := l.Accept()
if err != nil {
select {
case <-srv.getDoneChan():
return ErrServerClosed
default:
}
...
return err
}
connCtx := ctx
...
// 为每个连接创建新的net/http.conn,是HTTP连接服务端的抽象
c := srv.newConn(rw)
c.setState(c.rwc, StateNew, runHooks) // before Serve can return
go c.serve(connCtx)
}
}

处理请求

读取请求调用的是c.readRequest(ctx),分发请求需要看具体的实现,调用这个接口serverHandler{c.server}.ServeHTTP(w, w.req)时,最终调用的是ServeMux.ServeHTTP(),然后通过 handler 调用到match()进行路由匹配。最终是调用handler.ServeHTTP()处理请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
func (c *conn) serve(ctx context.Context) {
c.remoteAddr = c.rwc.RemoteAddr().String()
ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
...
// HTTP/1.x from here on.
ctx, cancelCtx := context.WithCancel(ctx)
c.cancelCtx = cancelCtx
defer cancelCtx()

c.r = &connReader{conn: c}
c.bufr = newBufioReader(c.r)
c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)

for {
// 读取请求
w, err := c.readRequest(ctx)
...
// 根据匹配到的 handler 处理请求
serverHandler{c.server}.ServeHTTP(w, w.req)
...
w.cancelCtx()
if c.hijacked() {
return
}
w.finishRequest()
...
}
}

func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
handler := sh.srv.Handler
// 一般设为 nil,会默认填充 DefaultServeMux
if handler == nil {
handler = DefaultServeMux
}
// 如果请求*或请求方法为 OPTIONS(与同源策略相关,表示一种试探请求)
if req.RequestURI == "*" && req.Method == "OPTIONS" {
handler = globalOptionsHandler{}
}
...
handler.ServeHTTP(rw, req)
}

// 将请求分派给 handler
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
if r.RequestURI == "*" {
if r.ProtoAtLeast(1, 1) {
w.Header().Set("Connection", "close")
}
w.WriteHeader(StatusBadRequest)
return
}
h, _ := mux.Handler(r)
h.ServeHTTP(w, r)
}


// 处理器
func (mux *ServeMux) Handler(r *Request) (h Handler, pattern string) {
...
return mux.handler(host, r.URL.Path)
}

func (mux *ServeMux) handler(host, path string) (h Handler, pattern string) {
mux.mu.RLock()
defer mux.mu.RUnlock()

// Host-specific pattern takes precedence over generic ones
if mux.hosts {
h, pattern = mux.match(host + path)
}
if h == nil {
h, pattern = mux.match(path)
}
if h == nil {
h, pattern = NotFoundHandler(), ""
}
return
}

// Find a handler on a handler map given a path string.
// Most-specific (longest) pattern wins.
func (mux *ServeMux) match(path string) (h Handler, pattern string) {
// 先检查精确匹配
v, ok := mux.m[path]
if ok {
return v.h, v.pattern
}
// 找最长的合法匹配,一直匹配到下一个父节点路由,直到根路由
// mux.es 包含的路径是长到短有序的
for _, e := range mux.es {
if strings.HasPrefix(path, e.pattern) {
return e.h, e.pattern
}
}
return nil, ""
}

最后调用handler.ServeHTTP()来处理请求。

1
2
3
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
f(w, r)
}

至此,HTTP 标准库为 Server 端提供请求处理的流程大致介绍完了。简单来看就是注册处理器(在 Go Web 中,挺多人称呼 handler 为“中间件”,虽然我不太能理解= =)、监听端口、处理请求,当请求到来时为其创建一个 goroutine 处理请求,主要是根据 url 来分派 handler,调用 handler 方法来处理请求。

接口型函数

在上文中,我们看到了一个比较特殊的实现:HandlerFunc。HTTP 标准库中定义了一个接口Handler,代表处理器,只包含一个方法ServeHTTP(ResponseWriter, *Request),接着定义了一个函数类型HandlerFunc,它的参数与返回值都和 Handler 里的 ServeHTTP 方法是一致的。而且 HandlerFunc 还定义了 ServeHTTP 方法,并在其中调用自己,这样就实现了接口 Handler。所以 HadnlerFunc 是一个实现了接口的函数类型,简称为接口型函数。

1
2
3
4
5
6
7
8
9
10
11

type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}

type HandlerFunc func(ResponseWriter, *Request)

// ServeHTTP calls f(w, r).
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
f(w, r)
}

优势

我们以上文的mux.Handle(pattern, HandlerFunc(handler))为例,它的作用是使用某个处理器方法来处理匹配路径的请求,而 handler 则代表了处理器方法。

1
2
3
4
5
6
7

func (mux *ServeMux) Handle(pattern string, handler Handler) {
...
e := muxEntry{h: handler, pattern: pattern}
mux.m[pattern] = e
...
}

我们可以使用多种方式来调用这个函数,例如将 HandlerFunc 类型的函数作为参数,这样支持匿名函数,也支持普通的函数,还有一种方式是将实现了 Handler 接口的结构体作为参数,但是这种方式适用于逻辑较为复杂的场景,如果需要的信息比较多,而且还有很多中间状态要保持,那么封装为一个结构体显然更符合情况。

通过接口型函数,该方法的参数既可以是普通的函数类型,也可以是结构体,使用起来更为灵活,可读性更好。

1
2
3
4
5
6
7
8
9
10

func home(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("hello, index page"))
}

func main() {
http.Handle("/home", http.HandlerFunc(home))
_ = http.ListenAndServe("localhost:8000", nil)
}

总结

HTTP 标准库的实现相对来说比较简单,感觉能学到的东西比较少,不过想要学习更多的网络框架、Web 框架,HTTP 标准库还是先要了解的。

参考与推荐阅读