go的net/http标准库提供了一套完整的http请求和响应服务
服务端部分:go标准库---net/http服务端
今天从客户端的角度来学习分析下,受限介绍Client相关的数据结构。
1、数据结构
(1)对应服务端,客户端也封装了一个对象,相较Server,Client的成员少一些,其中最主要的就是Transport,负责客户端请求发起到返回响应的实现。
type Client struct {Transport RoundTripper //http请求的具体实现Jar CookieJar // cookieTimeout time.Duration //超时
}
(2) Transport是一个RoundTripper的接口类型,该接口类型实现一个RoundTrip方法,该方法负责实现传入请求,返回请求对应的响应,是客户端请求的核心
type RoundTripper interface {RoundTrip(*Request) (*Response, error)
}
(3) Request是请求的结构体,包含了请求方法、请求体、请求地址,协议等信息
type Request struct {Method string //请求方法URL *url.URL //请求的地址信息Header Header //请求头Proto string // 协议Body io.ReadCloser //请求体Host string //服务器主机Response *Response// ...}
(4)Response响应体,包含请求状态、协议、响应头、响应体、请求体等
type Response struct {StatusCode int // 请求状态Proto string // 协议Header Header //相应头Body io.ReadCloser // 响应体Request *Request // 对应的请求体// ...
}
2、流程分析
以Post请求为例,按照如下的请求函数进行梳理,如果直接调用标准库的请求函数,将使用默认的客户端实例DefaultClient,入参有路径和文本类型一级请求体参数
var DefaultClient = &Client{}func Post(url, contentType string, body io.Reader) (resp *Response, err error) {return DefaultClient.Post(url, contentType, body)
}
在Client.Post中,首先根据传入的参数构建一个Request结构体,在请求头中传入文本类型,最后执行Client.Do函数
func (c *Client) Post(url, contentType string, body io.Reader) (resp *Response, err error) {req, err := NewRequest("POST", url, body)if err != nil {return nil, err}req.Header.Set("Content-Type", contentType)return c.Do(req)
}
首先介绍下NewRequest这个函数,主要是构造Request实例,其中调用的是NewRequestWithContext这个函数,先检查请求的方法,如果为空默认是Get方法,接着检验方法的合法性,并格式化处理地址,最终返回一个Request实例
func NewRequest(method, url string, body io.Reader) (*Request, error) {return NewRequestWithContext(context.Background(), method, url, body)
}func NewRequestWithContext(ctx context.Context, method, url string, body io.Reader) (*Request, error) {if method == "" {method = "GET"}if !validMethod(method) {return nil, fmt.Errorf("net/http: invalid method %q", method)}// ...u, err := urlpkg.Parse(url)// ...req := &Request{ctx: ctx,Method: method,URL: u,Proto: "HTTP/1.1",ProtoMajor: 1,ProtoMinor: 1,Header: make(Header),Body: rc,Host: u.Host,}// ...return req, nil
}
返回Request实例后作为参数依次执行Client.Do、Client.do,通过Client.send发送请求并处理cookie,send方法中传入c.transport()。返回transport实例
func (c *Client) Do(req *Request) (*Response, error) {return c.do(req)
}func (c *Client) do(req *Request) (retres *Response, reterr error) {// ...var (deadline = c.deadline()reqs []*Requestresp *ResponsecopyHeaders = c.makeHeadersCopier(req))// ...for {// ...if resp, didTimeout, err = c.send(req, deadline);// ...}
}func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {if c.Jar != nil {for _, cookie := range c.Jar.Cookies(req.URL) {req.AddCookie(cookie)}}resp, didTimeout, err = send(req, c.transport(), deadline)if err != nil {return nil, didTimeout, err}if c.Jar != nil {if rc := resp.Cookies(); len(rc) > 0 {c.Jar.SetCookies(req.URL, rc)}}return resp, nil, nil
}func (c *Client) transport() RoundTripper {if c.Transport != nil {return c.Transport}return DefaultTransport
}
在send中,通过RoundTrip实现请求连接和交互,每次请求都会创建一个transportRequest请求,通过Transport.getConn获取一个请求连接
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {// ...resp, err = rt.RoundTrip(req)// ...return resp, nil, nil
}func (t *Transport) roundTrip(req *Request) (*Response, error) {// ...for {// ...treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}cm, err := t.connectMethodForRequest(treq)pconn, err := t.getConn(treq, cm)var resp *Responseif pconn.alt != nil {resp, err = pconn.alt.RoundTrip(req)} else {resp, err = pconn.roundTrip(treq)}// ...}
}
getConn中,辉县创建一个wantConn对象,通过queueForIdleConn处理限制连接队列并返回一个空闲可用的连接*persistConn,如果没有找到,调用queueForDial会创建新的连接,如果连接准备好了就返回
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {// ...w := &wantConn{cm: cm,key: cm.key(),ctx: ctx,ready: make(chan struct{}, 1),beforeDial: testHookPrePendingDial,afterDial: testHookPostPendingDial,}defer func() {if err != nil {w.cancel(t, err)}}()if delivered := t.queueForIdleConn(w); delivered {pc := w.pcif pc.alt == nil && trace != nil && trace.GotConn != nil {trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))}t.setReqCanceler(treq.cancelKey, func(error) {})return pc, nil}t.queueForDial(w)select {case <-w.ready:// ...return w.pc, w.errcase <-req.Cancel:// ...case <-req.Context().Done():// ...case err := <-cancelc:// ...}
}
接下来介绍queueForIdleConn和queueForDial,从现有的连接队列中获取空闲连接,如果有的话,尝试是否能够绑定到wantConn上,并关闭wantConn的ready通道,唤醒其他地方的ready等待
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {// ...if list, ok := t.idleConn[w.key]; ok {for len(list) > 0 && !stop {pconn := list[len(list)-1]delivered = w.tryDeliver(pconn, nil)if delivered {// ...}}stop = true}if stop {return delivered}}// ...
}func (w *wantConn) tryDeliver(pc *persistConn, err error) bool {// ...w.ctx = nilw.pc = pcw.err = errclose(w.ready)return true
}
queueForDial中会创建新的连接,异步执行dialConnFor,dialConn和tryDeliver配合绑定
func (t *Transport) queueForDial(w *wantConn) {if t.MaxConnsPerHost <= 0 {go t.dialConnFor(w)return}if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {go t.dialConnFor(w)return}
}func (t *Transport) dialConnFor(w *wantConn) {// ...pc, err := t.dialConn(ctx, w.cm)delivered := w.tryDeliver(pc, err)// ...
}
其中dailConn首先创建一个persistConn实例,dial创建连接,赋值给pconn并返回,同时一部读写请求和响应
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {pconn = &persistConn{t: t,cacheKey: cm.key(),reqch: make(chan requestAndChan, 1),writech: make(chan writeRequest, 1),closech: make(chan struct{}),writeErrCh: make(chan error, 1),writeLoopDone: make(chan struct{}),}conn, err := t.dial(ctx, "tcp", cm.addr())pconn.conn = conngo pconn.readLoop()go pconn.writeLoop()return pconn, nil
}
readLoop负责读取响应
func (pc *persistConn) readLoop() { // ...alive := truefor alive {// ...rc := <-pc.reqch// ...var resp *Response// ...resp, err = pc.readResponse(rc, trace)// ... }
}
writeLoop负责接收请求的信息,将数据传递到服务端
func (pc *persistConn) writeLoop() { for {select {case wr := <-pc.writech:// ...err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))// ... }
}
roundTrip与上面的读写异步配合,将请求写入writech,在writeLoop中接受并传递给服务端,同时也通过reqch在readLoop中接受服务端的响应数据。
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {// ...writeErrCh := make(chan error, 1)pc.writech <- writeRequest{req, writeErrCh, continueCh}resc := make(chan responseAndError)pc.reqch <- requestAndChan{req: req.Request,cancelKey: req.cancelKey,ch: resc,addedGzip: requestedGzip,continueCh: continueCh,callerGone: gone,}// ...for { select {// ...case re := <-resc:// ...return re.res, nil// ...}}
}