nsq实例

1
2
3
4
5
6
7
# Start the lookup daemon with its default settings. There is no trailing '&',
# so this command stays in the foreground of the shell it is run from.
nsqlookupd

# First nsqd: points at lookupd on 0.0.0.0:4160, listens for TCP clients on
# port 4153 and stores data in ./data1. No -http-address is given, so the
# HTTP listener uses nsqd's default. Output goes to ./log/nsqd1.log (the
# ./log directory must already exist for the redirect to succeed).
nsqd --lookupd-tcp-address=0.0.0.0:4160 -tcp-address="0.0.0.0:4153" --data-path=./data1 >./log/nsqd1.log 2>&1 &

# Second nsqd: same lookupd, TCP on port 4154, HTTP on port 4155, data in
# ./data2, output in ./log/nsqd2.log.
nsqd --lookupd-tcp-address=0.0.0.0:4160 -tcp-address="0.0.0.0:4154" --data-path=./data2 -http-address="0.0.0.0:4155" >./log/nsqd2.log 2>&1 &

# Admin web UI, discovering nsqd instances through lookupd's HTTP endpoint at
# 127.0.0.1:4161; output in ./log/nsqadmin.log.
nsqadmin --lookupd-http-address=127.0.0.1:4161 >./log/nsqadmin.log 2>&1 &

创建Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"log"
"github.com/bitly/go-nsq"
"io/ioutil"
"strconv"
)

// nullLogger is a logger whose output is thrown away. It is handed to the
// NSQ producer to silence the client library's own log lines.
var nullLogger = log.New(
	ioutil.Discard, // every write is dropped
	"",             // no prefix
	log.LstdFlags,
)

// sendMsg publishes message to the "a-test" topic.
//
// It tries each nsqd address in order and uses the first one that can be
// created and answers a ping, so the second address acts as a fallback for
// the first. If no address is reachable the process exits via log.Fatalf;
// if publishing fails the function panics with the underlying error.
//
// A new producer is created and stopped on every call, which is fine for a
// demo but wasteful for high-volume publishing.
func sendMsg(message string) {
	// Start from the library's default configuration.
	config := nsq.NewConfig()

	// Candidate nsqd TCP addresses, in priority order. For real high
	// availability, put nginx or HAProxy in front of the nsqd instances.
	// NOTE(review): the nsqd instances started in this document listen on
	// TCP ports 4153/4154, not 4150 — confirm these addresses for your setup.
	addrs := []string{
		"127.0.0.1:4150",
		"192.168.2.68:4150",
	}

	var w *nsq.Producer
	for _, addr := range addrs {
		p, err := nsq.NewProducer(addr, config)
		if err != nil {
			log.Printf("error creating producer for %s: %v", addr, err)
			continue
		}
		if err := p.Ping(); err != nil {
			// Log and move on; a fatal exit here would make the
			// fallback address unreachable.
			log.Printf("error ping %s: %v", addr, err)
			p.Stop()
			continue
		}
		w = p
		break
	}
	if w == nil {
		log.Fatalf("no reachable nsqd among %v", addrs)
	}
	// Deferred so the producer is also shut down when Publish panics below.
	defer w.Stop()

	w.SetLogger(nullLogger, nsq.LogLevelInfo)

	if err := w.Publish("a-test", []byte(message)); err != nil {
		log.Panicf("Could not publish to nsq: %v", err)
	}
}

// main publishes two demo messages, "msg index 10000" and "msg index 10001".
func main() {
	const firstIndex = 10000
	const count = 2

	for offset := 0; offset < count; offset++ {
		payload := "msg index " + strconv.Itoa(firstIndex+offset)
		sendMsg(payload)
	}
}

创建Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"log"
"github.com/bitly/go-nsq"
"fmt"
)

// doSimpleConsumerTask subscribes to topic "a-test" on channel "ch", logs the
// body of every message it receives, and blocks until the consumer's StopChan
// is signalled. It then prints how many messages were received and finished.
//
// It panics if the consumer cannot be created or cannot connect.
func doSimpleConsumerTask() {
	config := nsq.NewConfig()

	q, err := nsq.NewConsumer("a-test", "ch", config)
	if err != nil {
		log.Panicf("Could not create consumer: %v", err)
	}

	q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
		log.Printf("message: %v", string(message.Body))
		// Acknowledge the message so it is not redelivered.
		message.Finish()
		return nil
	}))

	// NOTE(review): ConnectToNSQLookupds expects nsqlookupd HTTP addresses.
	// In the setup commands of this document lookupd HTTP is 127.0.0.1:4161,
	// while 4151/4155 look like nsqd HTTP ports — confirm before relying on
	// these values.
	lookupAddr := []string{
		"127.0.0.1:4151",
		"127.0.0.1:4155",
	}
	if err := q.ConnectToNSQLookupds(lookupAddr); err != nil {
		log.Panicf("Could not connect: %v", err)
	}

	// Nothing in this function stops the consumer, so this blocks until
	// something else calls q.Stop() or the process is terminated.
	<-q.StopChan

	stats := q.Stats()
	// Print the summary; the formatted string must be written out, not just built.
	fmt.Printf("message received %d, finished %d\n", stats.MessagesReceived, stats.MessagesFinished)
}

// main is the consumer program's entry point; all of the work happens in
// doSimpleConsumerTask.
func main() {
	doSimpleConsumerTask()
}