package nats_client import ( "git.inspur.com/sbg-jszt/cfn/cfn-schedule-agent/pkg/log" "github.com/nats-io/nats.go" "time" ) func Publish(subj string, data []byte) error { // Connect Options. opts := []nats.Option{nats.Name("NATS Sample Publisher")} opts = append(opts, nats.UserInfo(NatsConfig.User, NatsConfig.Password)) nc, err := nats.Connect(NatsConfig.Url, opts...) if err != nil { log.Errorf("连接nats失败:%s", err) return err } defer nc.Close() nc.Publish(subj, data) nc.Flush() if err := nc.LastError(); err != nil { //log.Fatal(err) log.Errorf("与nats通信失败:%s", err) return err } return nil } func Request(subj string, data []byte) (*nats.Msg, error) { // Connect Options. opts := []nats.Option{nats.Name("NATS Sample Publisher")} opts = append(opts, nats.UserInfo(NatsConfig.User, NatsConfig.Password)) nc, err := nats.Connect(NatsConfig.Url, opts...) if err != nil { log.Errorf("连接nats失败:%s", err) return nil, err } defer nc.Close() msg, err := nc.Request(subj, data, 5*time.Second) if err != nil { if nc.LastError() != nil { log.Errorf("%v for request", nc.LastError()) } log.Errorf("%v for request", err) return nil, err } nc.Flush() if err := nc.LastError(); err != nil { //log.Fatal(err) log.Errorf("与nats通信失败:%s", err) return nil, err } return msg, nil }