gomicro深度学习

1.整体架构介绍

产品嘴里的一个小项目,从立项到开发上线,随着时间和需求的不断激增,会越来越复杂,变成一个大项目,如果前期项目架构没设计的不好,代码会越来越臃肿,难以维护,后期的每次产品迭代上线都会牵一发而动全身。项目微服务化,松耦合模块间的关系,是一个很好的选择,随然增加了维护成本,但是还是很值得的。

img

微服务化项目除了稳定性我个人还比较关心的几个问题:

  • 一: 服务间数据传输的效率和安全性。

  • 二: 服务的动态扩充,也就是服务的注册和发现,服务集群化。

  • 三: 微服务功能的可订制化,因为并不是所有的功能都会很符合你的需求,难免需要根据自己的需要二次开发一些功能。

go-micro是go语言下的一个很好的rpc微服务框架,功能很完善,而且我关心的几个问题也解决的很好:

  • 一:服务间传输格式为protobuf,效率上没的说,非常的快,也很安全。

  • 二:go-micro的服务注册和发现是多种多样的。我个人比较喜欢etcdv3的服务服务发现和注册。

  • 三:主要的功能都有相应的接口,只要实现相应的接口,就可以根据自己的需要订制插件。

1.1通信流程

go-micro的通信流程大至如下

img

Server监听客户端的调用,和Brocker推送过来的信息进行处理。并且Server端需要向Register注册自己的存在或消亡,这样Client才能知道自己的状态。

Register服务的注册的发现。

Client端从Register中得到Server的信息,然后每次调用都根据算法选择一个的Server进行通信,当然通信是要经过编码/解码,选择传输协议等一系列过程的。

如果有需要通知所有的Server端可以使用Brocker进行信息的推送。

Brocker信息队列进行信息的接收和发布。

go-micro之所以可以高度订制和他的框架结构是分不开的,go-micro由8个关键的interface组成,每一个interface都可以根据自己的需求重新实现,这8个主要的inteface也构成了go-micro的框架结构。

img

这些接口go-micro都有他自己默认的实现方式,还有一个go-plugins是对这些接口实现的可替换项。你也可以根据需求实现自己的插件。

1.2Transort

服务之间通信的接口。也就是服务发送和接收的最终实现方式,是由这些接口定制的。

源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type Transport interface {
Init(...Option) error
Options() Options
Dial(addr string, opts ...DialOption) (Client, error)
Listen(addr string, opts ...ListenOption) (Listener, error)
String() string
}

type Socket interface {
Recv(*Message) error
Send(*Message) error
Close() error
Local() string
Remote() string
}

type Client interface {
Socket
}

type Listener interface {
Addr() string
Close() error
Accept(func(Socket)) error
}

Transport 的Listen方法是一般是Server端进行调用的,他监听一个端口,等待客户端调用。

Transport 的Dial就是客户端进行连接服务的方法。他返回一个Client接口,这个接口返回一个Client接口,这个Client嵌入了Socket接口,这个接口的方法就是具体发送和接收通信的信息。

http传输是go-micro默认的同步通信机制。当然还有很多其他的插件:grpc,nats,tcp,udp,rabbitmq,nats,都是目前已经实现了的方式。在go-plugins里你都可以找到。

1.3Codec

有了传输方式,下面要解决的就是传输编码和解码问题,go-micro有很多种编码解码方式,默认的实现方式是protobuf,当然也有其他的实现方式,json、protobuf、jsonrpc、mercury等等。

源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
type Codec interface {
Reader
Writer
Close() error
String() string
}

type Reader interface {
ReadHeader(*Message, MessageType) error
ReadBody(interface{}) error
}

type Writer interface {
Write(*Message, interface{}) error
}

// Marshaler is a simple encoding interface used for the broker/transport
// where headers are not supported by the underlying implementation.
type Marshaler interface {
Marshal(interface{}) ([]byte, error)
Unmarshal([]byte, interface{}) error
String() string
}

// Message represents detailed information about
// the communication, likely followed by the body.
// In the case of an error, body may be nil.
type Message struct {
Id string
Type MessageType
Target string
Method string
Endpoint string
Error string

// The values read from the socket
Header map[string]string
Body []byte
}

Codec接口的Write方法就是编码过程,两个Read是解码过程。

1.4Registry

服务的注册和发现,目前实现的consul,mdns, etcd,etcdv3,zookeeper,kubernetes.等等,

1
2
3
4
5
6
7
8
9
10
11
12
13
// The registry provides an interface for service discovery
// and an abstraction over varying implementations
// {consul, etcd, zookeeper, ...}
type Registry interface {
Init(...Option) error
Options() Options
Register(*Service, ...RegisterOption) error
Deregister(*Service) error
GetService(string) ([]*Service, error)
ListServices() ([]*Service, error)
Watch(...WatchOption) (Watcher, error)
String() string
}

简单来说,就是Service 进行Register,来进行注册,Client 使用watch方法进行监控,当有服务加入或者删除时这个方法会被触发,以提醒客户端更新Service信息。

默认的是服务注册和发现是mdns

consul.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

func configure(c *consulRegistry, opts ...registry.Option) {
// set opts
for _, o := range opts {
o(&c.opts)
}

// use default config
config := consul.DefaultConfig()

if c.opts.Context != nil {
// Use the consul config passed in the options, if available
if co, ok := c.opts.Context.Value("consul_config").(*consul.Config); ok {
config = co
}
if cn, ok := c.opts.Context.Value("consul_connect").(bool); ok {
c.connect = cn
}

// Use the consul query options passed in the options, if available
if qo, ok := c.opts.Context.Value("consul_query_options").(*consul.QueryOptions); ok && qo != nil {
c.queryOptions = qo
}
if as, ok := c.opts.Context.Value("consul_allow_stale").(bool); ok {
c.queryOptions.AllowStale = as
}
}

// check if there are any addrs
if len(c.opts.Addrs) > 0 {
addr, port, err := net.SplitHostPort(c.opts.Addrs[0])
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
port = "8500"
addr = c.opts.Addrs[0]
config.Address = fmt.Sprintf("%s:%s", addr, port)
} else if err == nil {
config.Address = fmt.Sprintf("%s:%s", addr, port)
}
}

if config.HttpClient == nil {
config.HttpClient = new(http.Client)
}

// requires secure connection?
if c.opts.Secure || c.opts.TLSConfig != nil {

config.Scheme = "https"
// We're going to support InsecureSkipVerify
config.HttpClient.Transport = newTransport(c.opts.TLSConfig)
}

// set timeout
if c.opts.Timeout > 0 {
config.HttpClient.Timeout = c.opts.Timeout
}

// create the client
client, _ := consul.NewClient(config)

// set address/client
c.Address = config.Address
c.Client = client
}

我个人比较喜欢etcdv3集群。大家可以根据自己的喜好选择。

1.5Selector

以Registry为基础,Selector 是客户端级别的负载均衡,当有客户端向服务发送请求时, selector根据不同的算法从Registery中的主机列表,得到可用的Service节点,进行通信。目前实现的有循环算法和随机算法,默认的是随机算法。

源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Selector builds on the registry as a mechanism to pick nodes
// and mark their status. This allows host pools and other things
// to be built using various algorithms.
type Selector interface {
Init(opts ...Option) error
Options() Options
// Select returns a function which should return the next node
Select(service string, opts ...SelectOption) (Next, error)
// Mark sets the success/error against a node
Mark(service string, node *registry.Node, err error)
// Reset returns state back to zero for a service
Reset(service string)
// Close renders the selector unusable
Close() error
// Name of the selector
String() string
}

默认的是实现是本地缓存,当前实现的有blacklist,label,named等方式。

1.6Broker

Broker是消息发布和订阅的接口。很简单的一个例子,因为服务的节点是不固定的,如果有需要修改所有服务行为的需求,可以使服务订阅某个主题,当有信息发布时,所有的监听服务都会收到信息,根据你的需要做相应的行为。

源码:

1
2
3
4
5
6
7
8
9
10
11
// Broker is an interface used for asynchronous messaging.
type Broker interface {
Init(...Option) error
Options() Options
Address() string
Connect() error
Disconnect() error
Publish(topic string, m *Message, opts ...PublishOption) error
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
String() string
}

Broker默认的实现方式是http方式,但是这种方式不要在生产环境用。go-plugins里有很多成熟的消息队列实现方式,有kafka、nsq、rabbitmq、redis,等等。

1.7Client

Client是请求服务的接口,他封装Transport和Codec进行rpc调用,也封装了Brocker进行信息的发布。

源码:

1
2
3
4
5
6
7
8
9
10
type Client interface {
Init(...Option) error
Options() Options
NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
Publish(ctx context.Context, msg Message, opts ...PublishOption) error
String() string
}

当然他也支持双工通信 Stream 这些具体的实现方式和使用方式,以后会详细解说。

默认的是rpc实现方式,他还有grpc和http方式,在go-plugins里可以找到

1.8Server

Server看名字大家也知道是做什么的了。监听等待rpc请求。监听broker的订阅信息,等待信息队列的推送等。

源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// Server is a simple micro server abstraction
type Server interface {
Options() Options
Init(...Option) error
Handle(Handler) error
NewHandler(interface{}, ...HandlerOption) Handler
NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
Subscribe(Subscriber) error
Start() error
Stop() error
String() string
}

// Router handle serving messages
type Router interface {
// ServeRequest processes a request to completion
ServeRequest(context.Context, Request, Response) error
}

// Message is an async message interface
type Message interface {
Topic() string
Payload() interface{}
ContentType() string
}

// Request is a synchronous request interface
type Request interface {
// Service name requested
Service() string
// The action requested
Method() string
// Endpoint name requested
Endpoint() string
// Content type provided
ContentType() string
// Header of the request
Header() map[string]string
// Body is the initial decoded value
Body() interface{}
// Read the undecoded request body
Read() ([]byte, error)
// The encoded message stream
Codec() codec.Reader
// Indicates whether its a stream
Stream() bool
}

// Response is the response writer for unencoded messages
type Response interface {
// Encoded writer
Codec() codec.Writer
// Write the header
WriteHeader(map[string]string)
// write a response directly to the client
Write([]byte) error
}

// Stream represents a stream established with a client.
// A stream can be bidirectional which is indicated by the request.
// The last error will be left in Error().
// EOF indicates end of the stream.
type Stream interface {
Context() context.Context
Request() Request
Send(interface{}) error
Recv(interface{}) error
Error() error
Close() error
}

// Handler interface represents a request handler. It's generated
// by passing any type of public concrete object with endpoints into server.NewHandler.
// Most will pass in a struct.
//
// Example:
//
// type Greeter struct {}
//
// func (g *Greeter) Hello(context, request, response) error {
// return nil
// }
//
type Handler interface {
Name() string
Handler() interface{}
Endpoints() []*registry.Endpoint
Options() HandlerOptions
}

// Subscriber interface represents a subscription to a given topic using
// a specific subscriber function or object with endpoints.
type Subscriber interface {
Topic() string
Subscriber() interface{}
Endpoints() []*registry.Endpoint
Options() SubscriberOptions
}

默认的是rpc实现方式,他还有grpc和http方式,在go-plugins里可以找到

1.9Service

Service是Client和Server的封装,他包含了一系列的方法使用初始值去初始化Service和Client,使我们可以很简单的创建一个rpc服务。

源码:

1
2
3
4
5
6
7
8
9
10
11
// Service is an interface that wraps the lower level libraries
// within go-micro. Its a convenience method for building
// and initialising services.
type Service interface {
Init(...Option)
Options() Options
Client() client.Client
Server() server.Server
Run() error
String() string
}

2.入门例子

上一篇帖子简单介绍了go-micro的整体框架结构,这一篇主要写go-micro使用方式的例子,中间会穿插一些go-micro的源码,和调用流程图,帮大家更好的理解go-micro的底层。更详细更具体的调用流程和细节,会在以后的帖子里详细讲解。

例子的github地址: gomicrorpc 跑一遍例子,也就会明白个大概。

安装所需要的环境

go-micro服务发现默认使用的是consul,(2019年源码修改了默认使用mdns)

1
2
brew install consul
consul agent -dev

者直接使用使用docker跑

1
docker run -p 8300:8300 -p 8301:8301 -p 8301:8301/udp -p 8302:8302/udp -p 8302:8302 -p 8400:8400 -p 8500:8500 -p 53:53/udp consul

我个人更喜欢etcdv3原因我上一篇也有提到过,gomicro服务发现不支持consul集群,我之前也写过etcdv3 集群的搭建和使用帖子,有时间大家可以看一下

安装go-micro框架

1
go get github.com/micro/go-micro

安装protobuf和依赖 prtobuf的基础知识我这里就不讲了,如果不了解的可以看一下官方文档,就是一个跨平台,跨语言的数据序列化库,简单易学。

是go-micro用于帮助我们生成服务接口和一系列的调用代码

1
2
3
brew install protobuf
go get -u -v github.com/golang/protobuf/{proto,protoc-gen-go}
go get -u -v github.com/micro/protoc-gen-micro

protobuf也可以直接从源码安装

1
2
3
4
5
6
7
8
wget https://github.com/protocolbuffers/protobuf/releases/download/v3.6.1/protobuf-all-3.6.1.tar.gz
tar zxvf protobuf-all-3.6.1.tar.gz
cd protobuf-3.6.1/
./autogen.sh
./configure
make
make install
protoc -h

安装micro工具包,这个安装是可选项,micro提供了一系列的工具来帮助我们更好的使用go-micro。

1
go get github.com/micro/micro

例子1

创建proto文件common.proto,这个文件包含了传入和返回的参数,参数包含了常用的基础类型、数组、map等。还有一个Say 服务,这个服务里有一个rpc方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
syntax = "proto3";

package model;

message SayParam {
string msg = 1;
}

message Pair {
int32 key = 1;
string values = 2;
}

message SayResponse {
string msg = 1;
// 数组
repeated string values = 2;
// map
map<string, Pair> header = 3;
RespType type = 4;
}

enum RespType {
NONE = 0;
ASCEND = 1;
DESCEND = 2;
}

// 服务接口
service Say {
rpc Hello(SayParam) returns (SayResponse) {}
}

在根目录下运行,生成两个模板文件

1
protoc --proto_path=$GOPATH/src:. --micro_out=. --go_out=. example1/proto/*.proto 

一个文件是proto的go 结构文件,还有一个go-micro rpc的接口文件。

server 端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
type Say struct {}

func (s *Say) Hello(ctx context.Context, req *model.SayParam, rsp *model.SayResponse) error {
fmt.Println("received", req.Msg)
rsp.Header = make(map[string]*model.Pair)
rsp.Header["name"] = &model.Pair{Key: 1, Values: "abc"}

rsp.Msg = "hello world"
rsp.Values = append(rsp.Values, "a", "b")
rsp.Type = model.RespType_DESCEND

return nil
}


func main() {
// 我这里用的etcd 做为服务发现,如果使用consul可以去掉
reg := etcdv3.NewRegistry(func(op *registry.Options){
op.Addrs = []string{
"http://192.168.3.34:2379", "http://192.168.3.18:2379", "http://192.168.3.110:2379",
}
})

// 初始化服务
service := micro.NewService(
micro.Name("lp.srv.eg1"),
micro.Registry(reg),
)
service.Init()
// 注册 Handler
model.RegisterSayHandler(service.Server(), new(Say))

// run server
if err := service.Run(); err != nil {
panic(err)
}
}

服务发现我使用的是etcdv3 替换了默认的consul

micro.NewService 初始化服务,然后返回一个Service接口的实例,newService()方法的大概流程如下,

img

先是给各个接口初始化默认值,再使用传入的值替换默认值,这也是go-micro可替换插件的地方。

service有一个Init()可选方法,这是一个单例方法,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Init initialises options. Additionally it calls cmd.Init
// which parses command line flags. cmd.Init is only called
// on first Init.
func (s *service) Init(opts ...Option) {
// process options
for _, o := range opts {
o(&s.opts)
}

s.once.Do(func() {
// Initialise the command flags, overriding new service
_ = s.opts.Cmd.Init(
cmd.Broker(&s.opts.Broker),
cmd.Registry(&s.opts.Registry),
cmd.Transport(&s.opts.Transport),
cmd.Client(&s.opts.Client),
cmd.Server(&s.opts.Server),
)
})
}

用于始化cmd的一些信息

service.Run()方法 调用流程

img

因为在初始化的时候没有指定端口,系统会自动分配一个端口号分给Server,并把这个server的信息注册到Register。

BeferStart和AfterStart也都是可以自定义的

client 端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func main() {
// 我这里用的etcd 做为服务发现,如果使用consul可以去掉
reg := etcdv3.NewRegistry(func(op *registry.Options){
op.Addrs = []string{
"http://192.168.3.34:2379", "http://192.168.3.18:2379", "http://192.168.3.110:2379",
}
})

// 初始化服务
service := micro.NewService(
micro.Registry(reg),
)
service.Init()
sayClent := model.NewSayService("lp.srv.eg1", service.Client())


rsp, err := sayClent.Hello(context.Background(), &model.SayParam{Msg: "hello server"})
if err != nil {
panic(err)
}

fmt.Println(rsp)

}

上面根据proto文件的生成的两个文件中有一个是rpc的接口文件,接口文件已经帮我们把调用方法的整个流程封装好了。

只需要给出服务名称和licent就可以。然后调用Hello方法

源码:

1
2
3
4
5
6
7
8
9
func (c *sayService) Hello(ctx context.Context, in *SayParam, opts ...client.CallOption) (*SayResponse, error) {
req := c.c.NewRequest(c.name, "Say.Hello", in)
out := new(SayResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}

主要的流程里都在c.c.Call方法里。简单来说流程如下

img

就是得到节点信息address,根据address去查询 pool里是否有连接,如果有则取出来,如果没有则创建,然后进行数据传输,传输完成后把client放回到pool内。pool的大小也是可以控制的,这部分的代码读起来特别爽,具体的细节和处理流程会在以后的帖子里详细讲解

例子2

例子1,做了一个简单的服务,已经不能再简单了,只是为了能让大家熟悉一下go-micro。看完例子1后应该会有更多的想法,想使用更多的go-micro的功能,比如protobuf生成的类都在一起,如果想model和api分开怎么处理,怎么使用go-micro的双向流,怎么使用消息推送,等等。所以我就双做了一个小例子,这个例子里包含了一些东西。

这个例子我就只说一下组织结构,也没有多少代码,大家有时间看一下就ok了。

proto下的两个文件夹,一个model一个rpcapi,是把数据和api分开,api引用了model

看一下rpcapi

1
2
3
4
5
6
7
8
9
10
11
syntax = "proto3";

package rpcapi;
import "github.com/lpxxn/gomicrorpc/example2/proto/model/common.proto";

// 服务接口
service Say {
rpc Hello(model.SayParam) returns (model.SayResponse) {}
rpc Stream(model.SRequest) returns (stream model.SResponse) {}

}

​ import了model里的common.proto

在生成的时候一个只要go_out另一个只要micro_out就好了

1
2
protoc --proto_path=$GOPATH/src:. --go_out=. example2/proto/model/*.proto 
protoc --proto_path=$GOPATH/src:. --micro_out=. example2/proto/rpcapi/*.proto

订阅一个信息

1
2
3
4
// Register Subscribers
if err := server.Subscribe(server.NewSubscriber(common.Topic1, subscriber.Handler)); err != nil {
panic(err)
}

当有信息发送时,所有订阅了lp.srv.eg2.topic1这个信息的服务都会收到信息

客户端发送信息

1
2
p := micro.NewPublisher(common.Topic1, service.Client())
p.Publish(context.TODO(), &model.SayParam{Msg: lib.RandomStr(lib.Random(3, 10))})

如果是生产环境一定不要用go-micro默认的信息发布和订阅处理方式,micro的插件plugin里是有很多成熟的插件。

使用双向流的小功能

这个方法只是每次向客户端发送一些数据,每次只发送一部分。比如我们给客户端推送的数据很大时,一次性全都推过去,是不太正确的做法,分批推送还是比较好的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (s *Say) Stream(ctx context.Context, req *model.SRequest, stream rpcapi.Say_StreamStream) error {

for i := 0; i < int(req.Count); i++ {
rsp := &model.SResponse{}
for j := lib.Random(3, 5); j < 10; j++ {
rsp.Value = append(rsp.Value, lib.RandomStr(lib.Random(3, 10)))
}
if err := stream.Send(rsp); err != nil {
return err
}
// 模拟处理过程
time.Sleep(time.Microsecond * 50)
}
return nil

return nil
}

3.Registry服务的注册和发现

服务的注册与发现是微服务必不可少的功能,这样系统才能有更高的性能,更高的可用性。go-micro框架的服务发现有自己能用的接口Registry。只要实现这个接口就可以定制自己的服务注册和发现。

go-micro在客户端做的负载,典型的Balancing-aware Client模式。

img

服务端把服务的地址信息保存到Registry, 然后定时的心跳检查,或者定时的重新注册服务。客户端监听Registry,最好是把服务信息保存到本地,监听服务的变动,更新缓存。当调用服务端的接口是时,根据客户端的服务列表和负载算法选择服务端进行通信。

go-micro的能用Registry接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// The registry provides an interface for service discovery
// and an abstraction over varying implementations
// {consul, etcd, zookeeper, ...}
type Registry interface {
Init(...Option) error
Options() Options
Register(*Service, ...RegisterOption) error
Deregister(*Service) error
GetService(string) ([]*Service, error)
ListServices() ([]*Service, error)
Watch(...WatchOption) (Watcher, error)
String() string
}

// Watcher is an interface that returns updates
// about services within the registry.
type Watcher interface {
// Next is a blocking call
Next() (*Result, error)
Stop()
}

这个接口还是很简单明了的,看方法也大概能猜到主要的作用

Register方法和Deregister是服务端用于注册服务的,Watcher接口是客户端用于监听服务信息变化的。

接下来我以go-micro的etcdv3为Registry的例给大家详细讲解一下go-micro的详细服务发现过程

go-micro 服务端注册服务

流程图

img

服务端看上去流程还是比较简单的,当服务端调用Run()方法时,会调用service.Start()方法。这个除了监听端口,启动服务,还会把服务的ip端口号信息,和所有的公开接口的元数据信息保存到我们选择的Register服务器上去。

看上去没有问题,但是,如果我们的节点发生故障,也是需要告诉Register把我们的节点信息删除掉。

Run()方法中有个go s.run(ex) 方法的调用,这个方法就是根据我们设置interval去重新注册服务,当然比较保险的方式是我们把服务的ttl也设置上,这样当服务在未知的情况下崩溃,到了ttl的时间Register服务也会自动把信息删除掉。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (s *service) run(exit chan bool) {
if s.opts.RegisterInterval <= time.Duration(0) {
return
}

t := time.NewTicker(s.opts.RegisterInterval)

for {
select {
case <-t.C:
s.register()
case <-exit:
t.Stop()
return
}
}
}

设置服务的ttl和 interval

1
2
3
4
5
6
7
// 初始化服务
service := micro.NewService(
micro.Name(common.ServiceName),
micro.RegisterTTL(time.Second*30),
micro.RegisterInterval(time.Second*20),
micro.Registry(reg),
)

ttl就是注册服务的过期时间,interval就是间隔多久再次注册服务。如果系统崩溃,过期时间也会把服务删除掉。客户端当然也会有相应的判断,下面会详细解说

客户端发现服务

客户端的服务发现要步骤多一些,但并不复杂,他涉及到服务选择Selector和服务发现Register两部分。

Selector是基于服务发现的,根据你选择的主机选择算法,返回主机的信息。默认的情况,go-micro是每次要得到服务器主机的信息都要去Register去获取。但是查看cmd.go的源码你会发现默认初始化的值,selector的默认flag是cache。DefaultSelectors里的cache对应的就是初始化cacheSelector方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
DefaultFlags = []cli.Flag{
cli.StringFlag{
Name: "client",
EnvVar: "MICRO_CLIENT",
Usage: "Client for go-micro; rpc",
},
cli.StringFlag{
Name: "client_request_timeout",
EnvVar: "MICRO_CLIENT_REQUEST_TIMEOUT",
Usage: "Sets the client request timeout. e.g 500ms, 5s, 1m. Default: 5s",
},
cli.IntFlag{
Name: "client_retries",
EnvVar: "MICRO_CLIENT_RETRIES",
Value: client.DefaultRetries,
Usage: "Sets the client retries. Default: 1",
},
cli.IntFlag{
Name: "client_pool_size",
EnvVar: "MICRO_CLIENT_POOL_SIZE",
Usage: "Sets the client connection pool size. Default: 1",
},
cli.StringFlag{
Name: "client_pool_ttl",
EnvVar: "MICRO_CLIENT_POOL_TTL",
Usage: "Sets the client connection pool ttl. e.g 500ms, 5s, 1m. Default: 1m",
},
cli.IntFlag{
Name: "register_ttl",
EnvVar: "MICRO_REGISTER_TTL",
Usage: "Register TTL in seconds",
},
cli.IntFlag{
Name: "register_interval",
EnvVar: "MICRO_REGISTER_INTERVAL",
Usage: "Register interval in seconds",
},
cli.StringFlag{
Name: "server",
EnvVar: "MICRO_SERVER",
Usage: "Server for go-micro; rpc",
},
cli.StringFlag{
Name: "server_name",
EnvVar: "MICRO_SERVER_NAME",
Usage: "Name of the server. go.micro.srv.example",
},
cli.StringFlag{
Name: "server_version",
EnvVar: "MICRO_SERVER_VERSION",
Usage: "Version of the server. 1.1.0",
},
cli.StringFlag{
Name: "server_id",
EnvVar: "MICRO_SERVER_ID",
Usage: "Id of the server. Auto-generated if not specified",
},
cli.StringFlag{
Name: "server_address",
EnvVar: "MICRO_SERVER_ADDRESS",
Usage: "Bind address for the server. 127.0.0.1:8080",
},
cli.StringFlag{
Name: "server_advertise",
EnvVar: "MICRO_SERVER_ADVERTISE",
Usage: "Used instead of the server_address when registering with discovery. 127.0.0.1:8080",
},
cli.StringSliceFlag{
Name: "server_metadata",
EnvVar: "MICRO_SERVER_METADATA",
Value: &cli.StringSlice{},
Usage: "A list of key-value pairs defining metadata. version=1.0.0",
},
cli.StringFlag{
Name: "broker",
EnvVar: "MICRO_BROKER",
Usage: "Broker for pub/sub. http, nats, rabbitmq",
},
cli.StringFlag{
Name: "broker_address",
EnvVar: "MICRO_BROKER_ADDRESS",
Usage: "Comma-separated list of broker addresses",
},
cli.StringFlag{
Name: "registry",
EnvVar: "MICRO_REGISTRY",
Usage: "Registry for discovery. consul, mdns",
},
cli.StringFlag{
Name: "registry_address",
EnvVar: "MICRO_REGISTRY_ADDRESS",
Usage: "Comma-separated list of registry addresses",
},
cli.StringFlag{
Name: "selector",
EnvVar: "MICRO_SELECTOR",
Usage: "Selector used to pick nodes for querying",
},
cli.StringFlag{
Name: "transport",
EnvVar: "MICRO_TRANSPORT",
Usage: "Transport mechanism used; http",
},
cli.StringFlag{
Name: "transport_address",
EnvVar: "MICRO_TRANSPORT_ADDRESS",
Usage: "Comma-separated list of transport addresses",
},
}

DefaultBrokers = map[string]func(...broker.Option) broker.Broker{
"http": http.NewBroker,
"memory": memory.NewBroker,
"nats": nats.NewBroker,
}

DefaultClients = map[string]func(...client.Option) client.Client{
"rpc": client.NewClient,
"mucp": cmucp.NewClient,
"grpc": cgrpc.NewClient,
}

DefaultRegistries = map[string]func(...registry.Option) registry.Registry{
"consul": consul.NewRegistry,
"gossip": gossip.NewRegistry,
"mdns": mdns.NewRegistry,
"memory": rmem.NewRegistry,
}

DefaultSelectors = map[string]func(...selector.Option) selector.Selector{
"default": selector.NewSelector,
"dns": dns.NewSelector,
"cache": selector.NewSelector,
"router": router.NewSelector,
"static": static.NewSelector,
}

DefaultServers = map[string]func(...server.Option) server.Server{
"rpc": server.NewServer,
"mucp": smucp.NewServer,
"grpc": sgrpc.NewServer,
}

DefaultTransports = map[string]func(...transport.Option) transport.Transport{
"memory": tmem.NewTransport,
"http": thttp.NewTransport,
"grpc": tgrpc.NewTransport,
}

// used for default selection as the fall back
defaultClient = "rpc"
defaultServer = "rpc"
defaultBroker = "http"
defaultRegistry = "mdns"
defaultSelector = "registry"
defaultTransport = "http"

但是当你在执行service.Init()方法时,go-micro会把默认的selector替换成cacheSelector,具体的实现是在cmd.go的Before方法里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185

func (c *cmd) Before(ctx *cli.Context) error {
// If flags are set then use them otherwise do nothing
var serverOpts []server.Option
var clientOpts []client.Option

// Set the client
if name := ctx.String("client"); len(name) > 0 {
// only change if we have the client and type differs
if cl, ok := c.opts.Clients[name]; ok && (*c.opts.Client).String() != name {
*c.opts.Client = cl()
}
}

// Set the server
if name := ctx.String("server"); len(name) > 0 {
// only change if we have the server and type differs
if s, ok := c.opts.Servers[name]; ok && (*c.opts.Server).String() != name {
*c.opts.Server = s()
}
}

// Set the broker
if name := ctx.String("broker"); len(name) > 0 && (*c.opts.Broker).String() != name {
b, ok := c.opts.Brokers[name]
if !ok {
return fmt.Errorf("Broker %s not found", name)
}

*c.opts.Broker = b()
serverOpts = append(serverOpts, server.Broker(*c.opts.Broker))
clientOpts = append(clientOpts, client.Broker(*c.opts.Broker))
}

// Set the registry
if name := ctx.String("registry"); len(name) > 0 && (*c.opts.Registry).String() != name {
r, ok := c.opts.Registries[name]
if !ok {
return fmt.Errorf("Registry %s not found", name)
}

*c.opts.Registry = r()
serverOpts = append(serverOpts, server.Registry(*c.opts.Registry))
clientOpts = append(clientOpts, client.Registry(*c.opts.Registry))

if err := (*c.opts.Selector).Init(selector.Registry(*c.opts.Registry)); err != nil {
log.Fatalf("Error configuring registry: %v", err)
}

clientOpts = append(clientOpts, client.Selector(*c.opts.Selector))

if err := (*c.opts.Broker).Init(broker.Registry(*c.opts.Registry)); err != nil {
log.Fatalf("Error configuring broker: %v", err)
}
}

// Set the selector
if name := ctx.String("selector"); len(name) > 0 && (*c.opts.Selector).String() != name {
s, ok := c.opts.Selectors[name]
if !ok {
return fmt.Errorf("Selector %s not found", name)
}

*c.opts.Selector = s(selector.Registry(*c.opts.Registry))

// No server option here. Should there be?
clientOpts = append(clientOpts, client.Selector(*c.opts.Selector))
}

// Set the transport
if name := ctx.String("transport"); len(name) > 0 && (*c.opts.Transport).String() != name {
t, ok := c.opts.Transports[name]
if !ok {
return fmt.Errorf("Transport %s not found", name)
}

*c.opts.Transport = t()
serverOpts = append(serverOpts, server.Transport(*c.opts.Transport))
clientOpts = append(clientOpts, client.Transport(*c.opts.Transport))
}

// Parse the server options
metadata := make(map[string]string)
for _, d := range ctx.StringSlice("server_metadata") {
var key, val string
parts := strings.Split(d, "=")
key = parts[0]
if len(parts) > 1 {
val = strings.Join(parts[1:], "=")
}
metadata[key] = val
}

if len(metadata) > 0 {
serverOpts = append(serverOpts, server.Metadata(metadata))
}

if len(ctx.String("broker_address")) > 0 {
if err := (*c.opts.Broker).Init(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...)); err != nil {
log.Fatalf("Error configuring broker: %v", err)
}
}

if len(ctx.String("registry_address")) > 0 {
if err := (*c.opts.Registry).Init(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...)); err != nil {
log.Fatalf("Error configuring registry: %v", err)
}
}

if len(ctx.String("transport_address")) > 0 {
if err := (*c.opts.Transport).Init(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...)); err != nil {
log.Fatalf("Error configuring transport: %v", err)
}
}

if len(ctx.String("server_name")) > 0 {
serverOpts = append(serverOpts, server.Name(ctx.String("server_name")))
}

if len(ctx.String("server_version")) > 0 {
serverOpts = append(serverOpts, server.Version(ctx.String("server_version")))
}

if len(ctx.String("server_id")) > 0 {
serverOpts = append(serverOpts, server.Id(ctx.String("server_id")))
}

if len(ctx.String("server_address")) > 0 {
serverOpts = append(serverOpts, server.Address(ctx.String("server_address")))
}

if len(ctx.String("server_advertise")) > 0 {
serverOpts = append(serverOpts, server.Advertise(ctx.String("server_advertise")))
}

if ttl := time.Duration(ctx.GlobalInt("register_ttl")); ttl > 0 {
serverOpts = append(serverOpts, server.RegisterTTL(ttl*time.Second))
}

if val := time.Duration(ctx.GlobalInt("register_interval")); val > 0 {
serverOpts = append(serverOpts, server.RegisterInterval(val*time.Second))
}

// client opts
if r := ctx.Int("client_retries"); r >= 0 {
clientOpts = append(clientOpts, client.Retries(r))
}

if t := ctx.String("client_request_timeout"); len(t) > 0 {
d, err := time.ParseDuration(t)
if err != nil {
return fmt.Errorf("failed to parse client_request_timeout: %v", t)
}
clientOpts = append(clientOpts, client.RequestTimeout(d))
}

if r := ctx.Int("client_pool_size"); r > 0 {
clientOpts = append(clientOpts, client.PoolSize(r))
}

if t := ctx.String("client_pool_ttl"); len(t) > 0 {
d, err := time.ParseDuration(t)
if err != nil {
return fmt.Errorf("failed to parse client_pool_ttl: %v", t)
}
clientOpts = append(clientOpts, client.PoolTTL(d))
}

// We have some command line opts for the server.
// Lets set it up
if len(serverOpts) > 0 {
if err := (*c.opts.Server).Init(serverOpts...); err != nil {
log.Fatalf("Error configuring server: %v", err)
}
}

// Use an init option?
if len(clientOpts) > 0 {
if err := (*c.opts.Client).Init(clientOpts...); err != nil {
log.Fatalf("Error configuring client: %v", err)
}
}

return nil
}

cacheSelector 会把从Register里获取的主机信息缓存起来。并设置超时时间,如果超时则重新获取。在获取主机信息的时候他会单独跑一个协程,去watch服务的注册,如果有新节点发现,则加到缓存中,如果有节点故障则删除缓存中的节点信息。当client还要根据selector选择的主机选择算法才能得到主机信息,目前只有两种算法,循环和随机法。为了增加执行效率,很client端也会设置缓存连接池,这个点,以后会详细说。

所以大概的客户端服务发现流程是下面这样

img

主要的调用过程都在Call方法内

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99

func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
// make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}

next, err := r.next(request, callOpts)
if err != nil {
return err
}

// check if we already have a deadline
d, ok := ctx.Deadline()
if !ok {
// no deadline so we create a new one
ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
} else {
// got a deadline so no need to setup context
// but we need to set the timeout we pass along
opt := WithRequestTimeout(d.Sub(time.Now()))
opt(&callOpts)
}

// should we noop right here?
select {
case <-ctx.Done():
return errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))
default:
}

// make copy of call method
rcall := r.call

// wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- {
rcall = callOpts.CallWrappers[i-1](rcall)
}

// return errors.New("go.micro.client", "request timeout", 408)
call := func(i int) error {
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {
return errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
}

// only sleep if greater than 0
if t.Seconds() > 0 {
time.Sleep(t)
}

// select next node
node, err := next()
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
}

// make the call
err = rcall(ctx, node, request, response, callOpts)
r.opts.Selector.Mark(request.Service(), node, err)
return err
}

ch := make(chan error, callOpts.Retries+1)
var gerr error

for i := 0; i <= callOpts.Retries; i++ {
go func(i int) {
ch <- call(i)
}(i)

select {
case <-ctx.Done():
return errors.Timeout("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()))
case err := <-ch:
// if the call succeeded lets bail early
if err == nil {
return nil
}

retry, rerr := callOpts.Retry(ctx, request, i, err)
if rerr != nil {
return rerr
}

if !retry {
return err
}

gerr = err
}
}

return gerr
}

主要的思路是

  1. 从Selector里得到选择主机策略方法next。

  2. 根据Retory是否重试调用服务,调用服务的过程是,从next 方法内得到主机,连接并传输数据 ,如果失败则重试,重试时,会根据主机选择策略方法next重新得到一个新的主机进行操作。

4.rpc方法调用过程详解

上一篇帖子go微服务框架go-micro深度学习(三) Registry服务的注册和发现详细解释了go-micro是如何做服务注册和发现在,服务端注册server信息,client获取server的地址信息,就可以和服务建立连接,然后就可以进行通信了。这篇帖子详细说一下,go-micro的通信协议、编码,和具体服务方法的调用过程是如何实现的,文中的代码还是我github上的例子: gomicrorpc

go-micro 支持很多通信协议:http、tcp、grpc等,支持的编码方式也很多有json、protobuf、bytes、jsonrpc等。也可以根据自己的需要实现通信协议和编码方式。go-micro 默认的通信协议是http,默认的编码方式是protobuf,我就以默认的方式来分解他的具体实现。

img

服务的启动

go-micro在启动的时候会选择默认通信协议http和protobuf编码方式,但他是如何路由到具体方法的?在go-micro服务端启动的时候我们需要注册Handler,也就是我们具体实现结构体 ,如例子中注册方法时,我们调用的RegisterSayHandler方法

1
2
// 注册 Handler
rpcapi.RegisterSayHandler(service.Server(), new(handler.Say))

这个方法内部的体实现主要是利用了反射的力量,注册的对象是实现了rpc接口的方法,如我们的Say实现了SayHandler。go-micro默认的router会利用反射把Say对象的信息完全提取出来,解析出结构体内的方法及方法的参数,保存到一个map内-> map[结构体名称][方法信息集合]

具体的实现在rpc_router.go里router的Handle(Handler)方法,组织完成后map的是下图这样,保存了很多反射信息,用以将来调用。

下面是这个方法的主要代码,删除了一些,很希望大家读一下rpc_router.go里面的代码,prepareMethod方法是具体利用反射提取信息的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (router *router) Handle(h Handler) error {
router.mu.Lock()
defer router.mu.Unlock()
// ....

rcvr := h.Handler()
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)

// check name
// ....
s.name = h.Name()
s.method = make(map[string]*methodType)

// Install the methods
for m := 0; m < s.typ.NumMethod(); m++ {
method := s.typ.Method(m)
// prepareMethod会把所有解析的信息返回来
if mt := prepareMethod(method); mt != nil {
s.method[method.Name] = mt
}
}
// .....
// save handler
router.serviceMap[s.name] = s
return nil
}

serviceMap里保存的就是反射后的信息,下图是我用goland的debug得到的保存信息

img

路由信息处理完后,主要的工作就已经完成了,然后注册服务并启动服务,启动的服务是一个http的服务,我们可以看一下http_transport.go里的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

func (h *httpTransportListener) Accept(fn func(Socket)) error {
// create handler mux
mux := http.NewServeMux()

// register our transport handler
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
var buf *bufio.ReadWriter
var con net.Conn

// read a regular request
if r.ProtoMajor == 1 {
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
r.Body = ioutil.NopCloser(bytes.NewReader(b))
// hijack the conn
hj, ok := w.(http.Hijacker)
if !ok {
// we're screwed
http.Error(w, "cannot serve conn", http.StatusInternalServerError)
return
}

conn, bufrw, err := hj.Hijack()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer conn.Close()
buf = bufrw
con = conn
}

// save the request
ch := make(chan *http.Request, 1)
ch <- r

fn(&httpTransportSocket{
ht: h.ht,
w: w,
r: r,
rw: buf,
ch: ch,
conn: con,
local: h.Addr(),
remote: r.RemoteAddr,
})
})

// get optional handlers
if h.ht.opts.Context != nil {
handlers, ok := h.ht.opts.Context.Value("http_handlers").(map[string]http.Handler)
if ok {
for pattern, handler := range handlers {
mux.Handle(pattern, handler)
}
}
}

// default http2 server
srv := &http.Server{
Handler: mux,
}

// insecure connection use h2c
if !(h.ht.opts.Secure || h.ht.opts.TLSConfig != nil) {
srv.Handler = h2c.NewHandler(mux, &http2.Server{})
}

// begin serving
return srv.Serve(h.listener)
}

服务的简单流程图如下 ,选择通信协议和编码方式->注册服务方法->启动服务并注册服务信息

img

客户端调用服务方法

客户端在启动的时候也要选择默认的通信协议http,和protobuf编码。客户端在调用rpc方法的时候如:

1
rsp, err := client.Hello(context.Background(), &model.SayParam{Msg: "hello server"})

go-micro为我们自动生成的rpcapi.micro.go里我们可以看一上Hello的具体实现,没有几行代码,但内部还是做了很多工作

1
2
3
4
5
6
7
8
9
func (c *sayService) Hello(ctx context.Context, in *model.SayParam, opts ...client.CallOption) (*model.SayResponse, error) {
req := c.c.NewRequest(c.name, "Say.Hello", in)
out := new(model.SayResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}

他的实现方式是封装request,然后调用服务方法。这个request 是非常重要的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func newRequest(service, endpoint string, request interface{}, contentType string, reqOpts ...RequestOption) Request {
var opts RequestOptions

for _, o := range reqOpts {
o(&opts)
}

// set the content-type specified
if len(opts.ContentType) > 0 {
contentType = opts.ContentType
}

return &rpcRequest{
service: service,
method: endpoint,
endpoint: endpoint,
body: request,
contentType: contentType,
opts: opts,
}
}

这个方法返回一个rpcRequest里面包含了详细的调用信息,servicec:服务名,method和endpoint目前是一样的是方法名这里是Say.Hello,contentType是protobuf,body 是具体的信息,也要要进行编码的内容,使用protobuf进行编码,然后会把这些信息放到一个http.Request里,再从Register或者从缓存获取服务器信息,连接服务器,发送数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

func (h *httpTransportClient) Send(m *Message) error {
header := make(http.Header)

for k, v := range m.Header {
header.Set(k, v)
}

reqB := bytes.NewBuffer(m.Body)
defer reqB.Reset()
buf := &buffer{
reqB,
}

req := &http.Request{
Method: "POST",
URL: &url.URL{
Scheme: "http",
Host: h.addr,
},
Header: header,
Body: buf,
ContentLength: int64(reqB.Len()),
Host: h.addr,
}

h.Lock()
h.bl = append(h.bl, req)
select {
case h.r <- h.bl[0]:
h.bl = h.bl[1:]
default:
}
h.Unlock()

// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}

return req.Write(h.conn)
}

简单流程图如下

client:封装参数-> 编码数据->连接服务->发送数据->接收返回数据,并解码。

service: 接收数据->解码数据,找到相应的实例和方法,利用反射调用具体方法->编码返数据->发送给客户端。

img

服务端处理请求

当服务端监接收到数据后,从http的Request里的Header中读取到相应的信息:编码方式,endpoint,请求的数据,由路由器进行对比和匹配找到保存的反射信息,利用反射把请求的数据根据相应的编码方式进行解码,再利用反射调用具体的方法,处理完把返回数据进行编码,组织一个http.Response传输给用户,客户端接收到数据后进行解码读取数据。 rpc_router.go里的call方法就是具体的调用过程方法,有时间大家可以读一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81

func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) error {
defer router.freeRequest(req)

function := mtype.method.Func
var returnValues []reflect.Value

r := &rpcRequest{
service: req.msg.Target,
contentType: req.msg.Header["Content-Type"],
method: req.msg.Method,
endpoint: req.msg.Endpoint,
body: req.msg.Body,
}

// only set if not nil
if argv.IsValid() {
r.rawBody = argv.Interface()
}

if !mtype.stream {
fn := func(ctx context.Context, req Request, rsp interface{}) error {
returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)})

// The return value for the method is an error.
if err := returnValues[0].Interface(); err != nil {
return err.(error)
}

return nil
}

// wrap the handler
for i := len(router.hdlrWrappers); i > 0; i-- {
fn = router.hdlrWrappers[i-1](fn)
}

// execute handler
if err := fn(ctx, r, replyv.Interface()); err != nil {
return err
}

// send response
return router.sendResponse(sending, req, replyv.Interface(), cc, true)
}

// declare a local error to see if we errored out already
// keep track of the type, to make sure we return
// the same one consistently
rawStream := &rpcStream{
context: ctx,
codec: cc.(codec.Codec),
request: r,
id: req.msg.Id,
}

// Invoke the method, providing a new value for the reply.
fn := func(ctx context.Context, req Request, stream interface{}) error {
returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(stream)})
if err := returnValues[0].Interface(); err != nil {
// the function returned an error, we use that
return err.(error)
} else if serr := rawStream.Error(); serr == io.EOF || serr == io.ErrUnexpectedEOF {
return nil
} else {
// no error, we send the special EOS error
return lastStreamResponseError
}
}

// wrap the handler
for i := len(router.hdlrWrappers); i > 0; i-- {
fn = router.hdlrWrappers[i-1](fn)
}

// client.Stream request
r.stream = true

// execute handler
return fn(ctx, r, rawStream)
}

5.stream 调用过程详解

上一篇写了一下rpc调用过程的实现方式,简单来说就是服务端把实现了接口的结构体对象进行反射,抽取方法,签名,保存,客户端调用的时候go-micro封请求数据,服务端接收到请求时,找到需要调用调用的对象和对应的方法,利用反射进行调用,返回数据。 但是没有说stream的实现方式,感觉单独写一篇帖子来说这个更好一些。上一篇帖子是基础,理解了上一篇,stream实现原理一点即破。先说一下使用方式,再说原理。
当前go-micro对 rpc 调用的方式大概如下:
普通的rpc调用 是这样:

1
2
3
4
1.连接服务器或者从缓存池得到连接
2.客户端 ->发送数据 -> 服务端接收
3.服务端 ->返回数据 -> 客户端处理数据
4.关闭连接或者把连接返回到缓存池

当前 rps stream的实现方式 是这样子:

1
2
3
4
1. 连接服务器
2. 客户端多次发送请求-> 服务端接收
3. 服务端多次返回数据-> 客户端处理数据
4. 关闭连接

当数据量比较大的时候我们可以用stream方式分批次传输数据。对于客户端还是服务端没有限制,我们可以根据自己的需要使用stream方式,使用方式也非常的简单,在定义接口的时候在参数或者返回值前面加上stream然后就可以多次进行传输了,使用的代码还是之前写的例子,代码都在github上:
比如我的例子中定义了两个使用stream的接口,一个只在返回值使用stream,另一个是在参数和返回值前都加上了stream,最终的使用方式没有区别

1
2
rpc Stream(model.SRequest) returns (stream model.SResponse) {}
rpc BidirectionalStream(stream model.SRequest) returns (stream model.SResponse) {}

看一下go-micro为我们生成的代码rpcapi.micro.go里,不要被吓到,生成了很多代码,但是没啥理解不了的
Server端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Server API for Say service
type SayHandler interface {
// .... others
Stream(context.Context, *model.SRequest, Say_StreamStream) error
BidirectionalStream(context.Context, Say_BidirectionalStreamStream) error
}
type Say_StreamStream interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*model.SResponse) error
}
type Say_BidirectionalStreamStream interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*model.SResponse) error
Recv() (*model.SRequest, error)
}
// .... others

Client端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Client API for Say service
type SayService interface {
//... others
Stream(ctx context.Context, in *model.SRequest, opts ...client.CallOption) (Say_StreamService, error)
BidirectionalStream(ctx context.Context, opts ...client.CallOption) (Say_BidirectionalStreamService, error)
}

type Say_StreamService interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Recv() (*model.SResponse, error)
}

type Say_BidirectionalStreamService interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*model.SRequest) error
Recv() (*model.SResponse, error)
}

    你会发现参数前面加了 Stream后,生成的代码会把你的参数变成一个接口,这个接口主要要的方法是

1
2
3
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error

剩下的两个接口方法是根据你是发送还是接收生成的,如果有发送就会有Send(你的参数),如果有接收会生成Rev() (你的参数, error),但这两个方法只是为了让你使用时方便,里面调用的还是SendMsg(interface)和RecvMsg(interface)方法,但是他们是怎么工作的,如何多次发送和接收传输的数据,是不是感觉很神奇。

我就以TsBidirectionalStream 方法为例开始分析,上一篇和再早之前的帖子已经说了服务端启动的时候都做了哪些操作,这里就不再赘述,
服务端的实现,很简单,不断的获取客户端发过来的数据,再给客户端一次一次的返回一些数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
模拟数据
*/
func (s *Say) BidirectionalStream(ctx context.Context, stream rpcapi.Say_BidirectionalStreamStream) error {
for {
req, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
for i := int64(0); i < req.Count; i++ {
if err := stream.Send(&model.SResponse{Value: []string {lib.RandomStr(lib.Random(3, 6))}}); err != nil {
return err
}
}
}
return nil
}

启动服务,服务开始监听客户端传过来的数据.....
客户端调用服务端方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 调用 
func TsBidirectionalStream(client rpcapi.SayService) {
rspStream, err := client.BidirectionalStream(context.Background())
if err != nil {
panic(err)
}
// send
go func() {
rspStream.Send(&model.SRequest{Count: 2})
rspStream.Send(&model.SRequest{Count: 5})
// close the stream
if err := rspStream.Close(); err != nil {
fmt.Println("stream close err:", err)
}
}()
// recv
idx := 1
for {
rsp, err := rspStream.Recv()

if err == io.EOF {
break
} else if err != nil {
panic(err)
}

fmt.Printf("test stream get idx %d data %v\n", idx, rsp)
idx++
}
fmt.Println("Read Value End")
}

当客户端在调用rpc的stream方法是要很得到stream

1
2
3
4
5
6
7
8
9
10
rspStream, err := client.BidirectionalStream(context.Background())
//
func (c *sayService) BidirectionalStream(ctx context.Context, opts ...client.CallOption) (Say_BidirectionalStreamService, error) {
req := c.c.NewRequest(c.name, "Say.BidirectionalStream", &model.SRequest{})
stream, err := c.c.Stream(ctx, req, opts...)
if err != nil {
return nil, err
}
return &sayServiceBidirectionalStream{stream}, nil
}

这个调用c.c.Stream(ctx, req, opts...)是关键,他的内部实现就是和服务器进行连接,然后返回一个stream,进行操作。

1
2
客户端:和服务端建立连接,返回Stream,进行接收和发送数据
服务端:接收客户端连接请求,利用反射找到相应的方法,组织Strem,传给方法,进行数据的发送和接收

建立连接的时候就是一次rpc调用,服务端接受连接,然后客户端发送一次调用,但是传输的是空数据,服务端利用反射找到具体的方法,组织stream,调用具体方法,利用这个连接,客户端和服务端进行多次通信。

img