基于HTTP构建的服务标准模型包括客户端Client和服务端Server。HTTP请求从客户端发出,服务端接收到请求后进行处理,然后响应返回给客户端。因此HTTP服务器的工作就在于如何接受来自客户端的请求,并向客户端返回响应。典型的HTTP服务如下图:

img

Client

以下是一个简单示例,在例子中,我们通过http标准库向给定的url:http://httpbin.org/get发送了一个Get请求,请求参数为name=makonike。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main

import (
"fmt"
"io"
"net/http"
)

func main() {
resp, err := http.Get("http://httpbin.org/get?name=makonike")
if err != nil {
return
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
fmt.Println(string(body))
}

http.Get()中,它调用的是DefaultClient.Get(),DefaultClient是Client的一个空实例,实际上调用的是Client.Get()

1
2
// DefaultClient is the default Client and is used by Get, Head, and Post.
var DefaultClient = &Client{}

Client结构体

1
2
3
4
5
6
type Client struct {
Transport RoundTripper
CheckRedirect func(req *Request, via []*Request) error
Jar CookieJar
Timeout time.Duration
}

Client结构体总共包含四个字段:

  • Transport:表示HTTP事务,用于处理客户端请求连接并等待服务端的响应
  • CheckRedirect:用于指定处理重定向策略
  • Jar:用于存储和管理请求中的cookie
  • Timeout:设置客户端请求的最大超时时间,包括连接、任何重定向以及读取响应的时间

初始化请求

Get()向指定url发出Get请求,其中先要根据请求类型构造一个完整的请求,包含请求头、请求体和请求参数,然后才根据请求的完整结构来执行请求。NewRequest()用于构造请求,它会调用NewRequestWithContext(),它返回一个Request结构体,其中包含了一个HTTP请求的所有信息。

1
2
3
4
5
6
7
8
9
10
11
12
func (c *Client) Get(url string) (resp *Response, err error) {
req, err := NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
return c.Do(req)
}

// NewRequest wraps NewRequestWithContext using context.Background.
func NewRequest(method, url string, body io.Reader) (*Request, error) {
return NewRequestWithContext(context.Background(), method, url, body)
}

Request是请求的抽象结构体,其中有很多字段都是我们已熟知的,这里仅仅列举一部分:

1
2
3
4
5
6
7
type Request struct {
Method string // 请求方法
URL *url.URL // 请求路径
Header Header // 请求头
Body io.ReadCloser // 请求体
...
}

我们直接来看看NewRequestWithContext(),它的作用是将请求封装成一个Request结构体并返回。它强制请求方法不为空,并校验了请求方法的有效性,同时解析url,最终封装请求信息为一个Request结构体。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func NewRequestWithContext(ctx context.Context, method, url string, body io.Reader) (*Request, error) {
// 如果请求方法为空,那么将其设为GET
if method == "" {
method = "GET"
}
// 校验请求方法是否被支持
if !validMethod(method) {
return nil, fmt.Errorf("net/http: invalid method %q", method)
}
// 上下文信息,前文使用的是context.Background(),返回一个空的context
if ctx == nil {
return nil, errors.New("net/http: nil Context")
}
// 解析url
u, err := urlpkg.Parse(url)
if err != nil {
return nil, err
}
rc, ok := body.(io.ReadCloser)
if !ok && body != nil {
rc = io.NopCloser(body)
}
// The host's colon:port should be normalized. See Issue 14836.
u.Host = removeEmptyPort(u.Host)
req := &Request{
ctx: ctx,
Method: method,
URL: u,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Header: make(Header),
Body: rc,
Host: u.Host,
}
...
return req, nil
}

获取到一个完整的Request结构体后,Get()会调用c.Do(req)发送请求,它会调用c.do(req),然后调用c.send(req, deadline),最终会调用到send()。Client.Do的逻辑主要分为两段,一段是调用send()发送请求接收Response,关闭Req.Body,另一段是对需要redirect的请求执行重定向操作,并关闭Resp.Body。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
...
resp, didTimeout, err = send(req, c.transport(), deadline)
if err != nil {
return nil, didTimeout, err
}
...
return resp, nil, nil
}

func (c *Client) Do(req *Request) (*Response, error) {
return c.do(req)
}

func (c *Client) do(req *Request) (retres *Response, reterr error) {
// 发送请求并接收Response
if resp, didTimeout, err = c.send(req, deadline); err != nil {
// c.send() always closes req.Body
reqBodyClosed = true
if !deadline.IsZero() && didTimeout() {
err = &httpError{
err: err.Error() + " (Client.Timeout exceeded while awaiting headers)",
timeout: true,
}
}
return nil, uerr(err)
}
// 检查是否应该重定向
var shouldRedirect bool
redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0])
if !shouldRedirect {
return resp, nil
}
req.closeBody()
}

func (c *Client) transport() RoundTripper {
if c.Transport != nil {
return c.Transport
}
return DefaultTransport
}

Client.send()调用send()进行下一步处理前,会先调用c.transport()获取DefaultTransport实例。

1
2
3
4
5
6
7
8
9
10
11
12
var DefaultTransport RoundTripper = &Transport{
Proxy: ProxyFromEnvironment, // 对特定的请求返回代理
DialContext: defaultTransportDialContext(&net.Dialer{ // 指定底层TCP连接的创建函数
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}),
ForceAttemptHTTP2: true,
MaxIdleConns: 100, // 最大空闲连接数
IdleConnTimeout: 90 * time.Second, // 空闲连接超时时间
TLSHandshakeTimeout: 10 * time.Second, // TLS握手超时时间
ExpectContinueTimeout: 1 * time.Second,
}

RoundTripper负责HTTP请求的建立,发送,接收HTTP应答以及关闭,但是不对HTTP响应进行额外处理,例如:redirects, authentication, or cookies等上层协议细节。Transport实现了RoundTripper接口,它实现的RoundTrip()方法会具体地处理请求,处理完毕后返回Response。

1
2
3
type RoundTripper interface {
RoundTrip(*Request) (*Response, error)
}

回到Client.send(),它会调用send(),这个函数的主要逻辑交给获取到的Transport实现的RoundTrip()来执行,RoundTrip()会调用到roundTrip()

1
2
3
4
5
6
7
8
9
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
...
resp, err = rt.RoundTrip(req)
...
}

func (t *Transport) RoundTrip(req *Request) (*Response, error) {
return t.roundTrip(req)
}

roundTrip()中会做两件事,一是调用Transport的getConn()获取连接,二是在获取到连接后,调用persistConn的roundTrip()发送请求,等待响应结果。persistConn是对连接的一层封装,是持久化连接的抽象,通常表示keep-alive连接,也可以表示non-keep-alive连接,它与Client和Transport的关系大致如下图:

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
func (t *Transport) roundTrip(req *Request) (*Response, error) {
t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
ctx := req.Context()
trace := httptrace.ContextClientTrace(ctx)
...
// 根据url的协议选择对应的实现来替代默认的逻辑,主要用了useRegisteredProtocol()
scheme := req.URL.Scheme
if altRT := t.alternateRoundTripper(req); altRT != nil {
if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
return resp, err
}
var err error
req, err = rewindBody(req)
if err != nil {
return nil, err
}
}
...
for {
select {
case <-ctx.Done():
req.closeBody()
return nil, ctx.Err()
default:
}
// 封装请求
treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}
cm, err := t.connectMethodForRequest(treq)
if err != nil {
req.closeBody()
return nil, err
}
// 获取连接
pconn, err := t.getConn(treq, cm)
if err != nil {
t.setReqCanceler(cancelKey, nil)
req.closeBody()
return nil, err
}
// 等待响应结果
var resp *Response
if pconn.alt != nil {
// HTTP/2 path.
t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest
resp, err = pconn.alt.RoundTrip(req)
} else {
resp, err = pconn.roundTrip(treq)
}
if err == nil {
resp.Request = origReq
return resp, nil
}
...
}
}

func (t *Transport) alternateRoundTripper(req *Request) RoundTripper {
if !t.useRegisteredProtocol(req) {
return nil
}
altProto, _ := t.altProto.Load().(map[string]RoundTripper)
return altProto[req.URL.Scheme]
}

func (t *Transport) useRegisteredProtocol(req *Request) bool {
if req.URL.Scheme == "https" && req.requiresHTTP1() {
return false
}
return true
}

func (r *Request) requiresHTTP1() bool {
return hasToken(r.Header.Get("Connection"), "upgrade") &&
ascii.EqualFold(r.Header.Get("Upgrade"), "websocket")
}

获取连接

getConn()有两个阶段:

  1. 调用queueForIdleConn()获取空闲连接
  2. 当不存在空闲连接时,调用queueForDial()创建新的连接

img

可以看到关系图如上。getConn()先封装请求为wantConn结构体,然后调用queueForIdleConn()获取空闲连接,如果成功获取则返回连接,失败了则调用queueForDial()创建一个新连接,进行后续的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
req := treq.Request
trace := treq.trace
ctx := req.Context()
if trace != nil && trace.GetConn != nil {
trace.GetConn(cm.addr())
}
// 将请求封装成wantConn结构体
w := &wantConn{
cm: cm,
key: cm.key(), // 注意这个字段:其实是connectMethodKey结构体类型
// 是对url关键字段的分解,代表连接的目标方(代理,协议,目的地址)
// 包含了proxyURL,targetAddr、Scheme和onlyH1
// onlyH1 bool // whether to disable HTTP/2 and force HTTP/1
ctx: ctx,
ready: make(chan struct{}, 1),
beforeDial: testHookPrePendingDial,
afterDial: testHookPostPendingDial,
}
defer func() {
if err != nil {
w.cancel(t, err)
}
}()

// Queue for idle connection.
if delivered := t.queueForIdleConn(w); delivered {
pc := w.pc
// Trace only for HTTP/1.
// HTTP/2 calls trace.GotConn itself.
if pc.alt == nil && trace != nil && trace.GotConn != nil {
trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
}
// set request canceler to some non-nil function so we
// can detect whether it was cleared between now and when
// we enter roundTrip
t.setReqCanceler(treq.cancelKey, func(error) {})
return pc, nil
}

cancelc := make(chan error, 1)
t.setReqCanceler(treq.cancelKey, func(err error) { cancelc <- err })

// Queue for permission to dial.
t.queueForDial(w)

// Wait for completion or cancellation.
select {
// 成功获取连接后进入该分支
case <-w.ready:
// Trace success but only for HTTP/1.
// HTTP/2 calls trace.GotConn itself.
if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
}
if w.err != nil {
select {
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
default:
// return below
}
}
return w.pc, w.err
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
}
}

// 对url关键字段的分解,代表连接的目标方(代理,协议,目的地址)
func (cm *connectMethod) key() connectMethodKey {
proxyStr := ""
targetAddr := cm.targetAddr
if cm.proxyURL != nil {
proxyStr = cm.proxyURL.String()
if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" {
targetAddr = ""
}
}
return connectMethodKey{
proxy: proxyStr,
scheme: cm.targetScheme,
addr: targetAddr,
onlyH1: cm.onlyH1,
}
}

来看看queueForIdleConn(),它先根据wantConn.key去idleConn map中看看是否存在空闲的connection list,如果能获取到,则获取列表中最后一个connection返回,否则将当前wantConn加入到idleConnWait map中。这部分的逻辑比较简单,直接看代码就能理解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
// 禁用长连接的话每次都需要新的connection,直接返回false即可
if t.DisableKeepAlives {
return false
}
t.idleMu.Lock()
defer t.idleMu.Unlock()
// 阻止关闭空闲连接,因为现在我们正在找一个空闲连接用
t.closeIdle = false
...
// If IdleConnTimeout is set, calculate the oldest
// persistConn.idleAt time we're willing to use a cached idle
// conn.
var oldTime time.Time
// 设置超时
if t.IdleConnTimeout > 0 {
oldTime = time.Now().Add(-t.IdleConnTimeout)
}

// 看看最近使用的空闲连接,找到w.key相同的connection list
// values是persistConn的列表
// persistConn是对连接的一层封装,通常表示keep-alive连接,也可以表示non-keep-alive连接
if list, ok := t.idleConn[w.key]; ok {
stop := false
delivered := false
for len(list) > 0 && !stop {
// 有连接,获取最后一个
pconn := list[len(list)-1]

// 看看这个connection是不是等太久了
tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
if tooOld {
// 异步清除
go pconn.closeConnIfStillIdle()
}
// 如果标记已断开,或者这个connection等太久了执行异步清除,就得忽略它找下一个。
if pconn.isBroken() || tooOld {
list = list[:len(list)-1]
continue
}
// 尝试将整个connection写到wantConn中
delivered = w.tryDeliver(pconn, nil)
// 如果操作成功,需要将connection从idle connection list中删除
if delivered {
if pconn.alt != nil {
// HTTP/2: multiple clients can share pconn.
// Leave it in the list.
} else {
// HTTP/1: only one client can use pconn.
// Remove it from the list.
t.idleLRU.remove(pconn)
list = list[:len(list)-1]
}
}
stop = true
}
if len(list) > 0 {
t.idleConn[w.key] = list
} else {
// 如果list为0了,则将对应的list从map中删除,可以避免下次判断,尽早去创建连接
// 这里也是为了防止idleConn的list过多。
delete(t.idleConn, w.key)
}
if stop {
return delivered
}
}
// 如果找不到空闲connection
if t.idleConnWait == nil {
t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
}
// 将wantConn加入到idleConnWait map中
q := t.idleConnWait[w.key]
q.cleanFront()
q.pushBack(w)
t.idleConnWait[w.key] = q
return false
}

来看看queueForDial(),它在获取不到同一个url对应的空闲连接时调用,会尝试去创建一个连接,这里主要是参数校验,创建连接的逻辑在dialConnFor()中。调用过程如下:

  1. 先校验MaxConnsPerHost是否未设置或已到达上限。如果校验不通过则将请求放在connsPerHostWait map中。
  2. 如果校验通过,会异步调用dialConnFor()创建连接
  3. dialConnFor()首先调用dialConn()创建TCP连接,然后启用两个异步线程来处理数据,然后调用tryDeliver将连接绑定在wantConn上。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func (t *Transport) queueForDial(w *wantConn) {
w.beforeDial()
// 小于0说明每个host的最大connection数量无限制,直接异步调用dialConnFor(),然后返回
if t.MaxConnsPerHost <= 0 {
go t.dialConnFor(w)
return
}

t.connsPerHostMu.Lock()
defer t.connsPerHostMu.Unlock()
// 连接数没达到上限,异步建立连接
// MaxConnsPerHost 控制某个host的所有连接,包括创建中,正在使用的,以及空闲的连接
// 一旦超过限制,dial会阻塞
if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
if t.connsPerHost == nil {
t.connsPerHost = make(map[connectMethodKey]int)
}
t.connsPerHost[w.key] = n + 1
go t.dialConnFor(w)
return
}
// 建立的连接数已到上限,需要进入等待队列
if t.connsPerHostWait == nil {
t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
}
q := t.connsPerHostWait[w.key]
q.cleanFront()
q.pushBack(w)
t.connsPerHostWait[w.key] = q
}

func (t *Transport) dialConnFor(w *wantConn) {
defer w.afterDial()
// 建立连接
pc, err := t.dialConn(w.ctx, w.cm)
// 连接绑定wantConn
delivered := w.tryDeliver(pc, err)
// 连接建立成功,但是绑定wantConn失败:可能是HTTP/2或被共享的
// 那么将该连接放到idleConnection map中
if err == nil && (!delivered || pc.alt != nil) {
// pconn was not passed to w,
// or it is HTTP/2 and can be shared.
// Add to the idle connection pool.
t.putOrCloseIdleConn(pc)
}
if err != nil {
t.decConnsPerHost(w.key)
}
}

dialConnFor()调用dialConn()创建TCP连接,然后调用tryDeliver()将其与wantConn绑定。在dialConn()中,会根据scheme的不同设置不同的连接配置,如下是HTTP连接的创建过程,创建完成后会开俩goroutine为该连接异步处理读写数据。创建的连接结构体包含了俩channel:writech负责写入请求数据,reqch负责响应数据。开的两个goroutine异步循环就是处理其中的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
// 持久化连接
pconn = &persistConn{
t: t,
cacheKey: cm.key(), // 分解url,as cache key
reqch: make(chan requestAndChan, 1),
writech: make(chan writeRequest, 1),
closech: make(chan struct{}),
writeErrCh: make(chan error, 1),
writeLoopDone: make(chan struct{}),
}
...
// 根据不同scheme进行不同的配置
if cm.scheme() == "https" && t.hasCustomTLSDialer() {
// 执行TLS握手过程
...
} else {
// 建立tcp连接
conn, err := t.dial(ctx, "tcp", cm.addr())
if err != nil {
return nil, wrapErr(err)
}
pconn.conn = conn
if cm.scheme() == "https" {
var firstTLSHost string
if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
return nil, wrapErr(err)
}
if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil {
return nil, wrapErr(err)
}
}
}
// proxy ...
if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
if e, ok := alt.(erringRoundTripper); ok {
// pconn.conn was closed by next (http2configureTransports.upgradeFn).
return nil, e.RoundTripErr()
}
return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
}
}
// 设置读写buffer
pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
// 俩buffer都关联了persistConn
pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
// 为每个连接开俩gorotuine异步处理读写数据
go pconn.readLoop()
go pconn.writeLoop()
return pconn, nil
}

// 建立连接
func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
if t.DialContext != nil {
return t.DialContext(ctx, network, addr)
}
if t.Dial != nil {
c, err := t.Dial(network, addr)
if c == nil && err == nil {
err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
}
return c, err
}
return zeroDialer.DialContext(ctx, network, addr)
}

建立连接用到了DialContext,它用于创建指定网络协议(如tcp,udp),指定地址的连接,如下:

1
2
3
4
5
6
Dial("tcp", "golang.org:http")
Dial("tcp", "192.0.2.1:http")
Dial("tcp", "198.51.100.1:80")
Dial("udp", "[2001:db8::1]:domain")
Dial("udp", "[fe80::1%lo0]:53")
Dial("tcp", ":80")

通过注释可以看到t.Dial(network, addr)已经弃用了,这里兼容保留,优先使用DialContext。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Deprecated: Use DialContext instead, which allows the transport
// to cancel dials as soon as they are no longer needed.
// If both are set, DialContext takes priority.
Dial func(network, addr string) (net.Conn, error)

// Examples:
// Dial("ip4:1", "192.0.2.1")
// Dial("ip6:ipv6-icmp", "2001:db8::1")
// Dial("ip6:58", "fe80::1%lo0")

func Dial(network, address string) (Conn, error) {
var d Dialer
return d.Dial(network, address)
}

func (d *Dialer) Dial(network, address string) (Conn, error) {
return d.DialContext(context.Background(), network, address)
}
// 发现最终也是调用的DialContext()

最初我们获得的DefaultTransport中就初始化了DialContext,其接口实现者net.Dialer包含了创建TCP连接的各种选项,如超时设置timeout(TCP连接建立超时时间,OS中一般为3mins),Deadline(限制了确定的时刻,与timeout作用类似),TCP四元组的原始IP地址LocalAddr等。

DialContext创建连接分为三个阶段

  1. resolveAddrList:根据本地地址网络类型以及地址族拆分目标地址,返回地址列表。
  2. dialSerial:对resolveAddrList返回的地址依次尝试创建连接,返回第一个创建成功的连接,否则返回第一个错误firstErr。
  3. setKeepAlice:创建完连接后,检查该连接是否为TCP连接,如果是,则设置KeepAlice时间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
func (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {
if ctx == nil {
panic("nil context")
}
deadline := d.deadline(ctx, time.Now())
if !deadline.IsZero() {
if d, ok := ctx.Deadline(); !ok || deadline.Before(d) {
subCtx, cancel := context.WithDeadline(ctx, deadline)
defer cancel()
ctx = subCtx
}
}
if oldCancel := d.Cancel; oldCancel != nil {
subCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
select {
case <-oldCancel:
cancel()
case <-subCtx.Done():
}
}()
ctx = subCtx
}

// Shadow the nettrace (if any) during resolve so Connect events don't fire for DNS lookups.
resolveCtx := ctx
if trace, _ := ctx.Value(nettrace.TraceKey{}).(*nettrace.Trace); trace != nil {
shadow := *trace
shadow.ConnectStart = nil
shadow.ConnectDone = nil
resolveCtx = context.WithValue(resolveCtx, nettrace.TraceKey{}, &shadow)
}

addrs, err := d.resolver().resolveAddrList(resolveCtx, "dial", network, address, d.LocalAddr)
if err != nil {
return nil, &OpError{Op: "dial", Net: network, Source: nil, Addr: nil, Err: err}
}

sd := &sysDialer{
Dialer: *d,
network: network,
address: address,
}

var primaries, fallbacks addrList
if d.dualStack() && network == "tcp" {
primaries, fallbacks = addrs.partition(isIPv4)
} else {
primaries = addrs
}

var c Conn
if len(fallbacks) > 0 {
c, err = sd.dialParallel(ctx, primaries, fallbacks)
} else {
c, err = sd.dialSerial(ctx, primaries)
}
if err != nil {
return nil, err
}

if tc, ok := c.(*TCPConn); ok && d.KeepAlive >= 0 {
setKeepAlive(tc.fd, true)
ka := d.KeepAlive
if d.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(tc.fd, ka)
testHookSetKeepAlive(ka)
}
return c, nil
}

发送请求等待响应

img

分析完getConn()后,我们回头来分析Client.Do的第二步resp, err = rt.RoundTrip(req)。它先将请求数据写入到writech channel中,writeLoop接收到数据后就会处理请求。然后roundTrip()将requestAndChan结构体放入reqch channel。roundTrip()循环等待,当readLoop读取到响应数据后就会通过requestAndChan结构体保存的channel,将数据封装为responseAndError结构体回写,这样roundTrip()接到响应数据后就结束循环等待并返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
func (t *Transport) roundTrip(req *Request) (*Response, error) {
...
pconn, err := t.getConn(treq, cm)
if err != nil {
t.setReqCanceler(cancelKey, nil)
req.closeBody()
return nil, err
}

var resp *Response
if pconn.alt != nil {
// HTTP/2 path.
t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest
resp, err = pconn.alt.RoundTrip(req)
} else {
// http/1,走这
resp, err = pconn.roundTrip(treq)
}
...
}

func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
...
startBytesWritten := pc.nwrite
writeErrCh := make(chan error, 1)
// 将请求数据写到writech中
pc.writech <- writeRequest{req, writeErrCh, continueCh}
// 用于接收响应的channel
resc := make(chan responseAndError)
// 将用于接收响应的channel封装为requestAndChan,写到reqch channel中
pc.reqch <- requestAndChan{
req: req.Request,
cancelKey: req.cancelKey,
ch: resc,
addedGzip: requestedGzip,
continueCh: continueCh,
callerGone: gone,
}

for {
testHookWaitResLoop()
select {
...
// 接收响应数据
case re := <-resc:
if (re.res == nil) == (re.err == nil) {
panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
}
if debugRoundTrip {
req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
}
if re.err != nil {
return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
}
// 返回响应数据
return re.res, nil
...
}
}
}

writeLoop会请求数据writeRequest,将writech channel中获取到的数据写入TCP连接中,并发送到目标服务器。当调用Request.write()向请求写入数据时,实际上直接将数据写入了persistConnWriter封装的TCP连接中bw.Flush(),TCP协议栈负责将HTTP请求中的内容发送到目标服务器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (pc *persistConn) writeLoop() {
defer close(pc.writeLoopDone)
for {
select {
case wr := <-pc.writech:
startBytesWritten := pc.nwrite
// 向TCP连接写入数据,并发送到目标服务器。
// 里面是将数据写到pc.bw中,然后调用bw.Flush()
err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
...
case <-pc.closech:
return
}
}
}

type persistConnWriter struct {
pc *persistConn
}

func (w persistConnWriter) Write(p []byte) (n int, err error) {
n, err = w.pc.conn.Write(p)
w.pc.nwrite += int64(n)
return
}

TCP连接中的响应数据通过roundTrip传入的channel再回写,然后roundTrip就会接收到数据并返回获取到的响应数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (pc *persistConn) readLoop() {
closeErr := errReadLoopExiting // default value, if not changed below
defer func() {
pc.close(closeErr)
pc.t.removeIdleConn(pc)
}()
...
alive := true
for alive {
pc.readLimit = pc.maxHeaderResponseSize()
// 获取roundTrip发送的结构体
rc := <-pc.reqch
trace := httptrace.ContextClientTrace(rc.req.Context())

var resp *Response
if err == nil {
// 读取数据
resp, err = pc.readResponse(rc, trace)
} else {
err = transportReadFromServerError{err}
closeErr = err
}
...
// 将响应数据写回到channel中
select {
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}
}
}

至此,HTTP标准包中Client端的大致处理流程已经介绍完了,我们总结一下大致步骤:

  1. 初始化请求:首先通过NewRequest()校验请求信息,并将请求封装为request结构体,然后调用Client.Do()发送请求。
  2. 发送请求准备:Client.Do()中调用了Client.send()通过Client.transport()获取默认的Transport实例,然后调用send()。
  3. 获取连接:send()中先通过getConn()获取连接,其中先调用queueForIdleConn()获取空闲连接,如果获取不到,则调用queueForDial()创建新的连接。获取连接前需要校验连接数是否超过限制,然后异步调用dialConnFor()创建连接,其中是调用dialConn()来创建TCP连接,然后启用两个异步goroutine来处理数据,调用tryDeliver()将连接绑定在wantConn上。
  4. 发送请求并等待响应:成功获取连接后,在send()的下文调用RoundTrip(),它将请求数据写入writech中,异步执行的writeLoop接收到数据后就会处理请求。然后roundTrip()将封装的requestAndChan结构体放入reqch,进入循环等待。当readLoop读取到响应数据后会通过requestAndChan保存的channel,将数据封装为responseAndError结构体写回。循环等待的roundTrip()接收到响应数据后就结束循环等待,并返回。

Server

同样以一个简单的例子开头:下面的例子监听8000端口,并在客户端请求路径/时在控制台打印Hello World

1
2
3
4
5
6
7
8
func HelloHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello World")
}

func main() {
http.HandleFunc("/", HelloHandler)
http.ListenAndServe(":8000", nil)
}

借用luozhiyun博主的图囊括一下流程:

img

从上图看出大致步骤:

  1. 注册handler到handler map中。
  2. open listener 开启循环监听,每听到一个连接就会创建一个goroutine。
  3. 在创建好的goroutine中accept loop循环等待接收数据,异步处理请求。
  4. 有数据到来时根据请求地址去handler map中match handler,然后将request交给handler处理。

注册处理器

上述例子中,使用了http.HandleFunc()来注册handler。其调用的是DefaultServeMux.HandleFunc(pattern, handler),最终调用mux.Handle(pattern, HandlerFunc(handler))来为给定的pattern注册handler。如果对应的pattern已经存在一个handler,则会panic。其实mux.m就是一个hash表,用于精确匹配。当pattern尾字符为’/‘时,放入[]muxEntry用于部分匹配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}

type HandlerFunc func(ResponseWriter, *Request)

// ServeHTTP calls f(w, r).
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
f(w, r)
}

// HandleFunc registers the handler function for the given pattern.
func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
if handler == nil {
panic("http: nil handler")
}
mux.Handle(pattern, HandlerFunc(handler))
}

func (mux *ServeMux) Handle(pattern string, handler Handler) {
mux.mu.Lock()
defer mux.mu.Unlock()

if pattern == "" {
panic("http: invalid pattern")
}
if handler == nil {
panic("http: nil handler")
}
// 已存在,panic
if _, exist := mux.m[pattern]; exist {
panic("http: multiple registrations for " + pattern)
}

if mux.m == nil {
mux.m = make(map[string]muxEntry)
}
e := muxEntry{h: handler, pattern: pattern}
// 其实就是一个map,存储了muxEntry对象,封装了pattern和handler。
mux.m[pattern] = e
// 如果尾字符为'/',则将对应的muxEntry放到[]muxEntry中,用于部分匹配
if pattern[len(pattern)-1] == '/' {
// 保证了长到短有序
mux.es = appendSorted(mux.es, e)
}

if pattern[0] != '/' {
mux.hosts = true
}
}

func appendSorted(es []muxEntry, e muxEntry) []muxEntry {
n := len(es)
i := sort.Search(n, func(i int) bool {
return len(es[i].pattern) < len(e.pattern)
})
if i == n {
return append(es, e)
}
// we now know that i points at where we want to insert
es = append(es, muxEntry{}) // try to grow the slice in place, any entry works.
copy(es[i+1:], es[i:]) // Move shorter entries down
es[i] = e
return es
}

循环监听

监听通过调用http.ListenAndServe(":8000", nil)实现,其调用的是server.ListenAndServe()。server封装了给定的addr和handler,handler通常情况下为nil,这个handler用于处理全局的请求。核心逻辑是net.Listen()server.Serve()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (srv *Server) ListenAndServe() error {
if srv.shuttingDown() {
return ErrServerClosed
}
addr := srv.Addr
if addr == "" {
addr = ":http"
}
// 监听端口
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
// 循环接收监听到的网络请求
return srv.Serve(ln)
}

func ListenAndServe(addr string, handler Handler) error {
server := &Server{Addr: addr, Handler: handler}
return server.ListenAndServe()
}

Serve()接收每个连接,并为每个连接创建一个goroutine来读取请求,然后调用srv.Handler来回复它们。里面用到了一个循环去接收监听到的网络连接,如果并发很高的话,可能会一次性创建太多协程,导致处理不过来的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (srv *Server) Serve(l net.Listener) error {
...
l = &onceCloseListener{Listener: l}
defer l.Close()
...
baseCtx := context.Background()
...
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
rw, err := l.Accept()
if err != nil {
select {
case <-srv.getDoneChan():
return ErrServerClosed
default:
}
...
return err
}
connCtx := ctx
...
// 为每个连接创建新的net/http.conn,是HTTP连接服务端的抽象
c := srv.newConn(rw)
c.setState(c.rwc, StateNew, runHooks) // before Serve can return
go c.serve(connCtx)
}
}

处理请求

读取请求调用的是c.readRequest(ctx),分发请求需要看具体的实现,调用这个接口serverHandler{c.server}.ServeHTTP(w, w.req)时,最终调用的是ServeMux.ServeHTTP(),然后通过handler调用到match()进行路由匹配。最终是调用handler.ServeHTTP()处理请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
func (c *conn) serve(ctx context.Context) {
c.remoteAddr = c.rwc.RemoteAddr().String()
ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
...
// HTTP/1.x from here on.
ctx, cancelCtx := context.WithCancel(ctx)
c.cancelCtx = cancelCtx
defer cancelCtx()

c.r = &connReader{conn: c}
c.bufr = newBufioReader(c.r)
c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)

for {
// 读取请求
w, err := c.readRequest(ctx)
...
// 根据匹配到的handler处理请求
serverHandler{c.server}.ServeHTTP(w, w.req)
...
w.cancelCtx()
if c.hijacked() {
return
}
w.finishRequest()
...
}
}

func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
handler := sh.srv.Handler
// 一般设为nil,会默认填充DefaultServeMux
if handler == nil {
handler = DefaultServeMux
}
// 如果请求*或请求方法为OPTIONS(与同源策略相关,表示一种试探请求)
if req.RequestURI == "*" && req.Method == "OPTIONS" {
handler = globalOptionsHandler{}
}
...
handler.ServeHTTP(rw, req)
}

// 将请求分派给handler
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
if r.RequestURI == "*" {
if r.ProtoAtLeast(1, 1) {
w.Header().Set("Connection", "close")
}
w.WriteHeader(StatusBadRequest)
return
}
h, _ := mux.Handler(r)
h.ServeHTTP(w, r)
}


// 处理器
func (mux *ServeMux) Handler(r *Request) (h Handler, pattern string) {
...
return mux.handler(host, r.URL.Path)
}

func (mux *ServeMux) handler(host, path string) (h Handler, pattern string) {
mux.mu.RLock()
defer mux.mu.RUnlock()

// Host-specific pattern takes precedence over generic ones
if mux.hosts {
h, pattern = mux.match(host + path)
}
if h == nil {
h, pattern = mux.match(path)
}
if h == nil {
h, pattern = NotFoundHandler(), ""
}
return
}

// Find a handler on a handler map given a path string.
// Most-specific (longest) pattern wins.
func (mux *ServeMux) match(path string) (h Handler, pattern string) {
// 先检查精确匹配
v, ok := mux.m[path]
if ok {
return v.h, v.pattern
}
// 找最长的合法匹配,一直匹配到下一个父节点路由,直到根路由
// mux.es包含的路径是长到短有序的
for _, e := range mux.es {
if strings.HasPrefix(path, e.pattern) {
return e.h, e.pattern
}
}
return nil, ""
}

最后调用handler.ServeHTTP()来处理请求。

1
2
3
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
f(w, r)
}

至此,HTTP标准库为Server端提供请求处理的流程大致介绍完了。简单来看就是注册处理器(在Go Web中,挺多人称呼handler为“中间件”,虽然我不太能理解= =)、监听端口、处理请求,当请求到来时为其创建一个goroutine处理请求,主要是根据url来分派handler,调用handler方法来处理请求。

接口型函数

在上文中,我们看到了一个比较特殊的实现:HandlerFunc。HTTP标准库中定义了一个接口Handler,代表处理器,只包含一个方法ServeHTTP(ResponseWriter, *Request),接着定义了一个函数类型HandlerFunc,它的参数与返回值都和Handler里的ServeHTTP方法是一致的。而且HandlerFunc还定义了ServeHTTP方法,并在其中调用自己,这样就实现了接口Handler。所以HadnlerFunc是一个实现了接口的函数类型,简称为接口型函数。

1
2
3
4
5
6
7
8
9
10
11

type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}

type HandlerFunc func(ResponseWriter, *Request)

// ServeHTTP calls f(w, r).
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
f(w, r)
}

优势

我们以上文的mux.Handle(pattern, HandlerFunc(handler))为例,它的作用是使用某个处理器方法来处理匹配路径的请求,而handler则代表了处理器方法。

1
2
3
4
5
6
7

func (mux *ServeMux) Handle(pattern string, handler Handler) {
...
e := muxEntry{h: handler, pattern: pattern}
mux.m[pattern] = e
...
}

我们可以使用多种方式来调用这个函数,例如将HandlerFunc类型的函数作为参数,这样支持匿名函数,也支持普通的函数,还有一种方式是将实现了Handler接口的结构体作为参数,但是这种方式适用于逻辑较为复杂的场景,如果需要的信息比较多,而且还有很多中间状态要保持,那么封装为一个结构体显然更符合情况。

通过接口型函数,该方法的参数既可以是普通的函数类型,也可以是结构体,使用起来更为灵活,可读性更好。

1
2
3
4
5
6
7
8
9
10

func home(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("hello, index page"))
}

func main() {
http.Handle("/home", http.HandlerFunc(home))
_ = http.ListenAndServe("localhost:8000", nil)
}

总结

HTTP标准库的实现相对来说比较简单,感觉能学到的东西比较少,不过想要学习更多的网络框架、Web框架,HTTP标准库还是先要了解的。

参考与推荐阅读