When using or transitioning to Go modules support:
# Go client latest or explicit version
go get github.com/nats-io/nats.go/@latest
go get github.com/nats-io/nats.go/@v1.9.1

# For latest NATS Server, add /v2 at the end
go get github.com/nats-io/nats-server/v2

# NATS Server v1 is installed otherwise
# go get github.com/nats-io/nats-server
The helper method creates two callback handlers: one presents the user JWT and the other signs the nonce challenge from the server. The core client library never has direct access to your private key; it simply invokes the callback to sign the server challenge. The helper loads the credentials on each connect or reconnect and wipes the memory it used afterwards.
The helper can also take two entries, one for the JWT and one for the NKey seed file.
Bare NKeys are also supported. The NKey seed should be in a read-only file, e.g. seed.txt:
> cat seed.txt
# This is my seed nkey!
SUAGMJH5XLGZKQQWAWKRZJIGMOU4HPFUYLXJMXOO5NLFEO2OOQJ5LPRDPM
This is a helper function that loads and decodes the seed and performs the signing of the server nonce. It clears memory between invocations. Alternatively, you can use the low-level option and provide the public key and a signature callback yourself.
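The signature-callback idea can be sketched with the standard library alone. NKeys are based on Ed25519, so the sketch below uses `crypto/ed25519` directly; it does not use the nats.go or nkeys packages. The names `signer`, `newSigner`, `verifyChallenge` and `roundTrip` are hypothetical, and the real client handles seed decoding and memory wiping that this sketch omits.

```go
package main

import (
	"crypto/ed25519"
	"crypto/rand"
	"fmt"
)

// signer is a hypothetical stand-in for a signature callback: it receives
// the server nonce and returns a signature, so the private key never has
// to leave the caller's code.
type signer func(nonce []byte) ([]byte, error)

// newSigner returns a callback that signs nonces with the given private key.
func newSigner(priv ed25519.PrivateKey) signer {
	return func(nonce []byte) ([]byte, error) {
		return ed25519.Sign(priv, nonce), nil
	}
}

// verifyChallenge plays the server role: it checks the signature against
// the public key the client presented.
func verifyChallenge(pub ed25519.PublicKey, nonce, sig []byte) bool {
	return ed25519.Verify(pub, nonce, sig)
}

// roundTrip generates a throwaway key pair, signs the nonce `signed`, and
// reports whether that signature verifies against the nonce `presented`.
func roundTrip(signed, presented string) (bool, error) {
	pub, priv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		return false, err
	}
	sig, err := newSigner(priv)([]byte(signed))
	if err != nil {
		return false, err
	}
	return verifyChallenge(pub, []byte(presented), sig), nil
}

func main() {
	ok, err := roundTrip("server-nonce", "server-nonce")
	if err != nil {
		panic(err)
	}
	fmt.Println("matching nonce verifies:", ok)

	ok, err = roundTrip("server-nonce", "another-nonce")
	if err != nil {
		panic(err)
	}
	fmt.Println("different nonce verifies:", ok)
}
```

Because the library only ever sees the callback, the key material can live wherever the application chooses to keep it.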
// tls as a scheme will enable secure connections by default. This will also verify the server name.
nc, err := nats.Connect("tls://nats.demo.io:4443")

// If you are using a self-signed certificate, you need to have a tls.Config with RootCAs setup.
// We provide a helper method to make this case easier.
nc, err = nats.Connect("tls://localhost:4443", nats.RootCAs("./configs/certs/ca.pem"))

// If the server requires a client certificate, there is a helper function for that too:
cert := nats.ClientCert("./configs/certs/client-cert.pem", "./configs/certs/client-key.pem")
nc, err = nats.Connect("tls://localhost:4443", cert)
me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street"}
// Send via Go channels
sendCh <- me

// Receive via Go channels
who := <-recvCh
Wildcard Subscriptions
// "*" matches any single token, at any level of the subject.
nc.Subscribe("foo.*.baz", func(m *nats.Msg) {
	fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})

nc.Subscribe("foo.bar.*", func(m *nats.Msg) {
	fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})

// ">" matches any length of the tail of a subject, and can only be the last token
// E.g. 'foo.>' will match 'foo.bar', 'foo.bar.baz', 'foo.foo.bar.bax.22'
nc.Subscribe("foo.>", func(m *nats.Msg) {
	fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})

// Matches all of the above
nc.Publish("foo.bar.baz", []byte("Hello World"))
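To make the two wildcard rules concrete, here is a minimal standalone matcher. It is a sketch of the matching rules described in the comments above, not the server's implementation; `matchSubject` is a hypothetical name, and the function does not validate subjects (for example, empty tokens are not rejected).

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether subject matches pattern, where "*" stands
// for exactly one token and ">" stands for one or more trailing tokens.
func matchSubject(pattern, subject string) bool {
	pt := strings.Split(pattern, ".")
	st := strings.Split(subject, ".")
	for i, p := range pt {
		if p == ">" {
			// ">" must be the last pattern token and needs at least
			// one subject token left to consume.
			return i == len(pt)-1 && i < len(st)
		}
		if i >= len(st) {
			return false // subject ran out of tokens
		}
		if p != "*" && p != st[i] {
			return false // literal token mismatch
		}
	}
	// Without ">", both sides must have the same number of tokens.
	return len(pt) == len(st)
}

func main() {
	for _, pattern := range []string{"foo.*.baz", "foo.bar.*", "foo.>", "foo.*"} {
		fmt.Printf("%-10s matches foo.bar.baz: %v\n", pattern, matchSubject(pattern, "foo.bar.baz"))
	}
}
```

The last pattern, `foo.*`, is included to show that `*` covers exactly one token and therefore does not match a three-token subject.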
Queue Groups
// All subscriptions with the same queue name will form a queue group.
// Each message will be delivered to only one subscriber per queue group,
// using queuing semantics. You can have as many queue groups as you wish.
// Normal subscribers will continue to work as expected.

nc.QueueSubscribe("foo", "job_workers", func(_ *nats.Msg) {
	received++
})
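The delivery rule (one member per queue group, every normal subscriber) can be illustrated with a toy in-memory broker. Everything here (`broker`, `queueSubscribe`, `simulate`) is hypothetical and single-subject; it also picks group members round-robin purely to keep the output deterministic, which is an assumption of this sketch and not a statement about how the server chooses a member.

```go
package main

import "fmt"

// broker is a toy, single-subject stand-in for a server.
type broker struct {
	plain  []func(msg string)            // normal subscribers
	groups map[string][]func(msg string) // queue name -> group members
	next   map[string]int                // queue name -> next member index
}

func newBroker() *broker {
	return &broker{
		groups: map[string][]func(string){},
		next:   map[string]int{},
	}
}

func (b *broker) subscribe(h func(string)) { b.plain = append(b.plain, h) }

func (b *broker) queueSubscribe(queue string, h func(string)) {
	b.groups[queue] = append(b.groups[queue], h)
}

// publish delivers msg to every normal subscriber and to exactly one
// member of each queue group (round-robin in this sketch).
func (b *broker) publish(msg string) {
	for _, h := range b.plain {
		h(msg)
	}
	for q, members := range b.groups {
		i := b.next[q] % len(members)
		members[i](msg)
		b.next[q] = i + 1
	}
}

// simulate publishes `messages` messages to `workers` members of one
// queue group plus one normal subscriber, and returns the per-worker
// counts and the normal subscriber's count.
func simulate(workers, messages int) ([]int, int) {
	b := newBroker()
	counts := make([]int, workers)
	plain := 0
	for w := 0; w < workers; w++ {
		w := w // capture the loop variable for the closure
		b.queueSubscribe("job_workers", func(string) { counts[w]++ })
	}
	b.subscribe(func(string) { plain++ })
	for m := 0; m < messages; m++ {
		b.publish("job")
	}
	return counts, plain
}

func main() {
	counts, plain := simulate(3, 9)
	fmt.Println("per-worker counts:", counts)
	fmt.Println("normal subscriber count:", plain)
}
```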
var servers = "nats://localhost:1222, nats://localhost:1223, nats://localhost:1224"

nc, err := nats.Connect(servers)

// Optionally set ReconnectWait and MaxReconnect attempts.
// This example means 10 seconds total per backend.
nc, err = nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2*time.Second))

// Optionally disable randomization of the server pool
nc, err = nats.Connect(servers, nats.DontRandomize())

// Setup callbacks to be notified on disconnects, reconnects and connection closed.
nc, err = nats.Connect(servers,
	nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
		fmt.Printf("Got disconnected! Reason: %q\n", err)
	}),
	nats.ReconnectHandler(func(nc *nats.Conn) {
		fmt.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
	}),
	nats.ClosedHandler(func(nc *nats.Conn) {
		fmt.Printf("Connection closed. Reason: %q\n", nc.LastError())
	}),
)

// When connecting to a mesh of servers with auto-discovery capabilities,
// you may need to provide a username/password or token in order to connect
// to any server in that mesh when authentication is required.
// Instead of providing the credentials in the initial URL, you will use
// new option setters:
nc, err = nats.Connect("nats://localhost:4222", nats.UserInfo("foo", "bar"))

// For token based authentication:
nc, err = nats.Connect("nats://localhost:4222", nats.Token("S3cretT0ken"))

// You can even pass the two at the same time in case one of the servers
// in the mesh requires a token instead of user name and password.
nc, err = nats.Connect("nats://localhost:4222", nats.UserInfo("foo", "bar"), nats.Token("S3cretT0ken"))

// Note that if credentials are specified in the initial URLs, they take
// precedence over the credentials specified through the options.
// For instance, in the connect call below, the client library will use
// the user "my" and password "pwd" to connect to localhost:4222, however,
// it will use username "foo" and password "bar" when (re)connecting to
// a different server URL that it got as part of the auto-discovery.
nc, err = nats.Connect("nats://my:pwd@localhost:4222", nats.UserInfo("foo", "bar"))
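The precedence rule in the last comment can be sketched with `net/url` from the standard library. `pickCredentials` is a hypothetical helper written for this illustration; it mirrors the rule as described above and is not the client library's own code. The second URL, `nats://localhost:4223`, stands in for a discovered server and is also an assumption.

```go
package main

import (
	"fmt"
	"net/url"
)

// pickCredentials returns the user and password to use for serverURL:
// credentials embedded in the URL win, otherwise the option values apply.
func pickCredentials(serverURL, optUser, optPass string) (string, string, error) {
	u, err := url.Parse(serverURL)
	if err != nil {
		return "", "", err
	}
	if u.User != nil {
		pass, _ := u.User.Password()
		return u.User.Username(), pass, nil
	}
	return optUser, optPass, nil
}

func main() {
	servers := []string{
		"nats://my:pwd@localhost:4222", // initial URL with embedded credentials
		"nats://localhost:4223",        // hypothetical discovered server
	}
	for _, s := range servers {
		user, pass, err := pickCredentials(s, "foo", "bar")
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> user=%s password=%s\n", s, user, pass)
	}
}
```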
NATS Streaming subscriptions are similar to NATS subscriptions, but clients may start their subscription at an earlier point in the message stream, allowing them to receive messages that were published before this client registered interest.
// Subscribe starting with most recently published value
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
	fmt.Printf("Received a message: %s\n", string(m.Data))
}, stan.StartWithLastReceived())

// Receive all stored values in order
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
	fmt.Printf("Received a message: %s\n", string(m.Data))
}, stan.DeliverAllAvailable())

// Receive messages starting at a specific sequence number
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
	fmt.Printf("Received a message: %s\n", string(m.Data))
}, stan.StartAtSequence(22))

// Subscribe starting at a specific time
var startTime time.Time
...
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
	fmt.Printf("Received a message: %s\n", string(m.Data))
}, stan.StartAtTime(startTime))

// Subscribe starting a specific amount of time in the past (e.g. 30 seconds ago).
// time.ParseDuration returns two values, so pass a time.Duration directly.
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
	fmt.Printf("Received a message: %s\n", string(m.Data))
}, stan.StartAtTimeDelta(30*time.Second))
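How a start position translates into a point in the stored stream can be sketched with a small in-memory log. The types and functions below (`storedMsg`, `firstAtOrAfterSeq`, `firstWithinDelta`, `startSeqs`) are hypothetical and only model the selection logic; the sample log assumes one message per second, and `startSeqs` assumes each position exists in the log.

```go
package main

import (
	"fmt"
	"time"
)

// storedMsg is a hypothetical record of a message kept by the server.
type storedMsg struct {
	Seq  uint64
	Time time.Time
}

// firstAtOrAfterSeq returns the index of the first message whose sequence
// is >= seq, or len(msgs) if there is none.
func firstAtOrAfterSeq(msgs []storedMsg, seq uint64) int {
	for i, m := range msgs {
		if m.Seq >= seq {
			return i
		}
	}
	return len(msgs)
}

// firstAtOrAfterTime returns the index of the first message published at
// or after t, or len(msgs) if there is none.
func firstAtOrAfterTime(msgs []storedMsg, t time.Time) int {
	for i, m := range msgs {
		if !m.Time.Before(t) {
			return i
		}
	}
	return len(msgs)
}

// firstWithinDelta treats a time delta as "now minus delta".
func firstWithinDelta(msgs []storedMsg, now time.Time, delta time.Duration) int {
	return firstAtOrAfterTime(msgs, now.Add(-delta))
}

// sampleLog builds n messages published one second apart, newest last.
func sampleLog(n int, newest time.Time) []storedMsg {
	msgs := make([]storedMsg, n)
	for i := range msgs {
		age := time.Duration(n-1-i) * time.Second
		msgs[i] = storedMsg{Seq: uint64(i + 1), Time: newest.Add(-age)}
	}
	return msgs
}

// startSeqs returns the first sequence delivered when starting at seq,
// when starting deltaSeconds in the past, and when starting with the
// last stored message.
func startSeqs(n int, seq uint64, deltaSeconds int) (uint64, uint64, uint64) {
	now := time.Date(2020, 1, 1, 12, 0, 0, 0, time.UTC)
	msgs := sampleLog(n, now)
	bySeq := msgs[firstAtOrAfterSeq(msgs, seq)].Seq
	delta := time.Duration(deltaSeconds) * time.Second
	byDelta := msgs[firstWithinDelta(msgs, now, delta)].Seq
	last := msgs[len(msgs)-1].Seq
	return bySeq, byDelta, last
}

func main() {
	bySeq, byDelta, last := startSeqs(30, 22, 5)
	fmt.Println("start at sequence 22 begins with:", bySeq)
	fmt.Println("start 5s in the past begins with:", byDelta)
	fmt.Println("start with last received begins with:", last)
}
```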
Durable Subscriptions
Replay of messages offers great flexibility for clients wishing to begin processing at some earlier point in the data stream. However, some clients just need to pick up where they left off from an earlier session, without having to manually track their position in the stream of messages. Durable subscriptions allow clients to assign a durable name to a subscription when it is created. Doing this causes the NATS Streaming server to track the last acknowledged message for that clientID + durable name, so that only messages since the last acknowledged message will be delivered to the client.
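The bookkeeping described above, a last-acknowledged position kept per clientID + durable name, can be sketched as a small in-memory model. The `server` type and its methods are hypothetical and exist only for this illustration; they are not the NATS Streaming server's data structures.

```go
package main

import "fmt"

// durableKey identifies a durable subscription: client ID plus durable name.
type durableKey struct{ clientID, durable string }

// server is a toy in-memory stand-in for the streaming server.
type server struct {
	log     []string              // message at index i has sequence i+1
	lastAck map[durableKey]uint64 // last acknowledged sequence per durable
}

func newServer() *server {
	return &server{lastAck: map[durableKey]uint64{}}
}

func (s *server) publish(data string) { s.log = append(s.log, data) }

// ack records seq as acknowledged for the durable if it moves the
// position forward and refers to a stored message.
func (s *server) ack(clientID, durable string, seq uint64) {
	k := durableKey{clientID, durable}
	if seq > s.lastAck[k] && seq <= uint64(len(s.log)) {
		s.lastAck[k] = seq
	}
}

// resume returns the messages after the last acknowledged one for this
// clientID + durable name. An unknown durable starts from the beginning
// in this sketch.
func (s *server) resume(clientID, durable string) []string {
	return s.log[s.lastAck[durableKey{clientID, durable}]:]
}

func main() {
	s := newServer()
	for _, m := range []string{"a", "b", "c", "d"} {
		s.publish(m)
	}
	s.ack("me", "my-durable", 2) // the client processed "a" and "b", then went away

	fmt.Println("same durable resumes with:", s.resume("me", "my-durable"))
	fmt.Println("other durable resumes with:", s.resume("me", "other"))
}
```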
// Create a NATS connection that you can configure the way you want
nc, err = nats.Connect("tls://localhost:4443", nats.ClientCert("mycerts/client-cert.pem", "mycerts/client-key.pem"))
if (err != nil)
...

// Then pass it to the stan.Connect() call.
sc, err = stan.Connect("test-cluster", "me", stan.NatsConn(nc))
if (err != nil)
...

// Note that you will be responsible for closing the NATS Connection after the streaming
// connection has been closed.
ah := func(nuid string, err error) {
	// process the ack
	...
}

for i := 1; i < 1000; i++ {
	// If the server is unable to keep up with the publisher, the number of outstanding acks will eventually
	// reach the max and this call will block
	guid, _ := sc.PublishAsync("foo", []byte("Hello World"), ah)
}
// Subscribe with manual ack mode and a max in-flight limit of 25
sc.Subscribe("foo", func(m *stan.Msg) {
	fmt.Printf("Received message #: %s\n", string(m.Data))
	...
	// Does not ack, or takes a very long time to ack
	...
	// Message delivery will suspend when the number of unacknowledged messages reaches 25
}, stan.SetManualAckMode(), stan.MaxInflight(25))
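The effect of a max in-flight limit can be modelled with a simple counter of unacknowledged messages. This is a sketch of the flow-control idea only; `subscription`, `tryDeliver` and `ack` are hypothetical names and not part of the stan.go API.

```go
package main

import "fmt"

// subscription is a toy model of a manual-ack subscription with a cap on
// the number of unacknowledged messages.
type subscription struct {
	maxInflight int
	unacked     map[uint64]bool
}

func newSubscription(maxInflight int) *subscription {
	return &subscription{maxInflight: maxInflight, unacked: map[uint64]bool{}}
}

// tryDeliver records seq as in flight and reports true, or reports false
// when the in-flight limit has been reached and delivery must wait.
func (s *subscription) tryDeliver(seq uint64) bool {
	if len(s.unacked) >= s.maxInflight {
		return false
	}
	s.unacked[seq] = true
	return true
}

// ack marks seq as acknowledged, freeing one in-flight slot.
func (s *subscription) ack(seq uint64) { delete(s.unacked, seq) }

func main() {
	sub := newSubscription(25)

	delivered := 0
	for seq := uint64(1); seq <= 30; seq++ {
		if sub.tryDeliver(seq) {
			delivered++
		}
	}
	fmt.Println("delivered before any ack:", delivered)

	sub.ack(1) // acknowledging one message frees one slot
	fmt.Println("delivery resumes after ack:", sub.tryDeliver(26))
}
```

A slow or non-acking handler therefore bounds how much work is outstanding for that subscription, which is the behavior the comment in the snippet above describes.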