type Reader interface { ReadHeader(*Message, MessageType) error ReadBody(interface{}) error }
// Writer is the write half of a codec. A single call receives the Message
// and the body value to write.
type Writer interface {
	Write(*Message, interface{}) error
}
// Marshaler is a simple encoding interface used for the broker/transport
// where headers are not supported by the underlying implementation.
type Marshaler interface {
	// Marshal encodes the given value to bytes.
	Marshal(interface{}) ([]byte, error)
	// Unmarshal decodes the bytes into the given value.
	Unmarshal([]byte, interface{}) error
	// String returns a string describing the marshaler.
	String() string
}
// Message represents detailed information about // the communication, likely followed by the body. // In the case of an error, body may be nil. type Message struct { Id string Type MessageType Target string Method string Endpoint string Error string
// The values read from the socket Header map[string]string Body []byte }
funcconfigure(c *consulRegistry, opts ...registry.Option) { // set opts for _, o := range opts { o(&c.opts) }
// use default config config := consul.DefaultConfig()
if c.opts.Context != nil { // Use the consul config passed in the options, if available if co, ok := c.opts.Context.Value("consul_config").(*consul.Config); ok { config = co } if cn, ok := c.opts.Context.Value("consul_connect").(bool); ok { c.connect = cn }
// Use the consul query options passed in the options, if available if qo, ok := c.opts.Context.Value("consul_query_options").(*consul.QueryOptions); ok && qo != nil { c.queryOptions = qo } if as, ok := c.opts.Context.Value("consul_allow_stale").(bool); ok { c.queryOptions.AllowStale = as } }
// check if there are any addrs iflen(c.opts.Addrs) > 0 { addr, port, err := net.SplitHostPort(c.opts.Addrs[0]) if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { port = "8500" addr = c.opts.Addrs[0] config.Address = fmt.Sprintf("%s:%s", addr, port) } elseif err == nil { config.Address = fmt.Sprintf("%s:%s", addr, port) } }
if config.HttpClient == nil { config.HttpClient = new(http.Client) }
// Selector builds on the registry as a mechanism to pick nodes // and mark their status. This allows host pools and other things // to be built using various algorithms. type Selector interface { Init(opts ...Option) error Options() Options // Select returns a function which should return the next node Select(service string, opts ...SelectOption) (Next, error) // Mark sets the success/error against a node Mark(service string, node *registry.Node, err error) // Reset returns state back to zero for a service Reset(service string) // Close renders the selector unusable Close() error // Name of the selector String() string }
// Server is a simple micro server abstraction type Server interface { Options() Options Init(...Option) error Handle(Handler) error NewHandler(interface{}, ...HandlerOption) Handler NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber Subscribe(Subscriber) error Start() error Stop() error String() string }
// Router handle serving messages type Router interface { // ServeRequest processes a request to completion ServeRequest(context.Context, Request, Response) error }
// Message is an async message interface
type Message interface {
	// Topic returns the topic as a string.
	Topic() string
	// Payload returns the message payload as an untyped value.
	Payload() interface{}
	// ContentType returns the content type as a string.
	ContentType() string
}
// Request is a synchronous request interface type Request interface { // Service name requested Service() string // The action requested Method() string // Endpoint name requested Endpoint() string // Content type provided ContentType() string // Header of the request Header() map[string]string // Body is the initial decoded value Body() interface{} // Read the undecoded request body Read() ([]byte, error) // The encoded message stream Codec() codec.Reader // Indicates whether its a stream Stream() bool }
// Response is the response writer for unencoded messages type Response interface { // Encoded writer Codec() codec.Writer // Write the header WriteHeader(map[string]string) // write a response directly to the client Write([]byte) error }
// Stream represents a stream established with a client. // A stream can be bidirectional which is indicated by the request. // The last error will be left in Error(). // EOF indicates end of the stream. type Stream interface { Context() context.Context Request() Request Send(interface{}) error Recv(interface{}) error Error() error Close() error }
// Handler interface represents a request handler. It's generated // by passing any type of public concrete object with endpoints into server.NewHandler. // Most will pass in a struct. // // Example: // // type Greeter struct {} // // func (g *Greeter) Hello(context, request, response) error { // return nil // } // type Handler interface { Name() string Handler() interface{} Endpoints() []*registry.Endpoint Options() HandlerOptions }
// Subscriber interface represents a subscription to a given topic using // a specific subscriber function or object with endpoints. type Subscriber interface { Topic() string Subscriber() interface{} Endpoints() []*registry.Endpoint Options() SubscriberOptions }
// Service is an interface that wraps the lower level libraries // within go-micro. Its a convenience method for building // and initialising services. type Service interface { Init(...Option) Options() Options Client() client.Client Server() server.Server Run() error String() string }
# Install the protobuf compiler, then fetch the Go protobuf packages and
# the protoc-gen-micro plugin. One command per line: run together on a
# single line, brew would treat "go", "get", ... as formula names.
brew install protobuf
go get -u -v github.com/golang/protobuf/{proto,protoc-gen-go}
go get -u -v github.com/micro/protoc-gen-micro
protobuf 也可以直接从源码安装：
# Download the protobuf 3.6.1 release tarball, build and install it from
# source, then print the protoc help text to confirm it runs.
wget https://github.com/protocolbuffers/protobuf/releases/download/v3.6.1/protobuf-all-3.6.1.tar.gz
tar zxvf protobuf-all-3.6.1.tar.gz
cd protobuf-3.6.1/
./autogen.sh
./configure
make
make install
protoc -h
// Init initialises options. Additionally it calls cmd.Init // which parses command line flags. cmd.Init is only called // on first Init. func(s *service) Init(opts ...Option) { // process options for _, o := range opts { o(&s.opts) }
s.once.Do(func() { // Initialise the command flags, overriding new service _ = s.opts.Cmd.Init( cmd.Broker(&s.opts.Broker), cmd.Registry(&s.opts.Registry), cmd.Transport(&s.opts.Transport), cmd.Client(&s.opts.Client), cmd.Server(&s.opts.Server), ) }) }
// The registry provides an interface for service discovery // and an abstraction over varying implementations // {consul, etcd, zookeeper, ...} type Registry interface { Init(...Option) error Options() Options Register(*Service, ...RegisterOption) error Deregister(*Service) error GetService(string) ([]*Service, error) ListServices() ([]*Service, error) Watch(...WatchOption) (Watcher, error) String() string }
// Watcher is an interface that returns updates // about services within the registry. type Watcher interface { // Next is a blocking call Next() (*Result, error) Stop() }
// used for default selection as the fall back defaultClient = "rpc" defaultServer = "rpc" defaultBroker = "http" defaultRegistry = "mdns" defaultSelector = "registry" defaultTransport = "http"
func(c *cmd) Before(ctx *cli.Context) error { // If flags are set then use them otherwise do nothing var serverOpts []server.Option var clientOpts []client.Option
// Set the client if name := ctx.String("client"); len(name) > 0 { // only change if we have the client and type differs if cl, ok := c.opts.Clients[name]; ok && (*c.opts.Client).String() != name { *c.opts.Client = cl() } }
// Set the server if name := ctx.String("server"); len(name) > 0 { // only change if we have the server and type differs if s, ok := c.opts.Servers[name]; ok && (*c.opts.Server).String() != name { *c.opts.Server = s() } }
// Set the broker if name := ctx.String("broker"); len(name) > 0 && (*c.opts.Broker).String() != name { b, ok := c.opts.Brokers[name] if !ok { return fmt.Errorf("Broker %s not found", name) }
// Set the registry if name := ctx.String("registry"); len(name) > 0 && (*c.opts.Registry).String() != name { r, ok := c.opts.Registries[name] if !ok { return fmt.Errorf("Registry %s not found", name) }
// Set the selector if name := ctx.String("selector"); len(name) > 0 && (*c.opts.Selector).String() != name { s, ok := c.opts.Selectors[name] if !ok { return fmt.Errorf("Selector %s not found", name) }
// No server option here. Should there be? clientOpts = append(clientOpts, client.Selector(*c.opts.Selector)) }
// Set the transport if name := ctx.String("transport"); len(name) > 0 && (*c.opts.Transport).String() != name { t, ok := c.opts.Transports[name] if !ok { return fmt.Errorf("Transport %s not found", name) }
// Parse the server options metadata := make(map[string]string) for _, d := range ctx.StringSlice("server_metadata") { var key, val string parts := strings.Split(d, "=") key = parts[0] iflen(parts) > 1 { val = strings.Join(parts[1:], "=") } metadata[key] = val }
if val := time.Duration(ctx.GlobalInt("register_interval")); val > 0 { serverOpts = append(serverOpts, server.RegisterInterval(val*time.Second)) }
// client opts if r := ctx.Int("client_retries"); r >= 0 { clientOpts = append(clientOpts, client.Retries(r)) }
if t := ctx.String("client_request_timeout"); len(t) > 0 { d, err := time.ParseDuration(t) if err != nil { return fmt.Errorf("failed to parse client_request_timeout: %v", t) } clientOpts = append(clientOpts, client.RequestTimeout(d)) }
if r := ctx.Int("client_pool_size"); r > 0 { clientOpts = append(clientOpts, client.PoolSize(r)) }
if t := ctx.String("client_pool_ttl"); len(t) > 0 { d, err := time.ParseDuration(t) if err != nil { return fmt.Errorf("failed to parse client_pool_ttl: %v", t) } clientOpts = append(clientOpts, client.PoolTTL(d)) }
// We have some command line opts for the server. // Lets set it up iflen(serverOpts) > 0 { if err := (*c.opts.Server).Init(serverOpts...); err != nil { log.Fatalf("Error configuring server: %v", err) } }
// Use an init option? iflen(clientOpts) > 0 { if err := (*c.opts.Client).Init(clientOpts...); err != nil { log.Fatalf("Error configuring client: %v", err) } }
// check if we already have a deadline d, ok := ctx.Deadline() if !ok { // no deadline so we create a new one ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout) } else { // got a deadline so no need to setup context // but we need to set the timeout we pass along opt := WithRequestTimeout(d.Sub(time.Now())) opt(&callOpts) }
// should we noop right here? select { case <-ctx.Done(): return errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err())) default: }
// make copy of call method rcall := r.call
// wrap the call in reverse for i := len(callOpts.CallWrappers); i > 0; i-- { rcall = callOpts.CallWrappers[i-1](rcall) }
// make the call err = rcall(ctx, node, request, response, callOpts) r.opts.Selector.Mark(request.Service(), node, err) return err }
// one buffer slot per attempt, so a finished attempt can always deliver
// its result without blocking
ch := make(chan error, callOpts.Retries+1)
var gerr error

for i := 0; i <= callOpts.Retries; i++ {
	// run the attempt in the background so the select below can also
	// watch ctx
	go func(i int) {
		ch <- call(i)
	}(i)

	select {
	case <-ctx.Done():
		return errors.Timeout("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()))
	case err := <-ch:
		// if the call succeeded lets bail early
		if err == nil {
			return nil
		}

		// ask the retry hook for a decision; an error from the hook itself is returned as-is
		retry, rerr := callOpts.Retry(ctx, request, i, err)
		if rerr != nil {
			return rerr
		}

		// ... (excerpt ends here; the use of retry and gerr and the
		// closing braces of the case, select and loop are not shown)
// get optional handlers if h.ht.opts.Context != nil { handlers, ok := h.ht.opts.Context.Value("http_handlers").(map[string]http.Handler) if ok { for pattern, handler := range handlers { mux.Handle(pattern, handler) } } }
// default http2 server srv := &http.Server{ Handler: mux, }
// insecure connection use h2c if !(h.ht.opts.Secure || h.ht.opts.TLSConfig != nil) { srv.Handler = h2c.NewHandler(mux, &http2.Server{}) }
// declare a local error to see if we errored out already // keep track of the type, to make sure we return // the same one consistently rawStream := &rpcStream{ context: ctx, codec: cc.(codec.Codec), request: r, id: req.msg.Id, }
// Invoke the method, providing a new value for the reply. fn := func(ctx context.Context, req Request, stream interface{})error { returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(stream)}) if err := returnValues[0].Interface(); err != nil { // the function returned an error, we use that return err.(error) } elseif serr := rawStream.Error(); serr == io.EOF || serr == io.ErrUnexpectedEOF { returnnil } else { // no error, we send the special EOS error return lastStreamResponseError } }
// wrap the handler for i := len(router.hdlrWrappers); i > 0; i-- { fn = router.hdlrWrappers[i-1](fn) }
// Client API for Say service type SayService interface { //... others Stream(ctx context.Context, in *model.SRequest, opts ...client.CallOption) (Say_StreamService, error) BidirectionalStream(ctx context.Context, opts ...client.CallOption) (Say_BidirectionalStreamService, error) }