75

golang nats[5] cluster集群

 6 years ago
source link: https://studygolang.com/articles/14320?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

集群模式

nats的集群模式对客户端来说并不是透明的。

所以集群对发布者和订阅者都是有影响的。

发布者和订阅者都知道连接的是一个集群的服务,而不是一个单点服务,换句话说发布者订阅者都必须指明集群中各个节点的地址。

当然,发布者和订阅者可以只针对集群中的某节点发布消息和订阅消息,不过这并不是集群模式的目的。

目的

提高可用性和可伸缩性。

实现原理

可用性,多个节点,挂掉任意一个,不影响整个集群对外提供服务。

伸缩性,服务端支持随意增加节点。订阅者可以感知服务端节点的变动,但是发布者并不能自动感知。

3个node的集群

$ gnatsd -p 4222 -m 4333 -cluster nats://localhost:4248 -routes nats://localhost:5248,nats://localhost:6248 -DV  
$ gnatsd -p 5222 -m 5333 -cluster nats://localhost:5248 -routes nats://localhost:4248,nats://localhost:6248 -DV 
$ gnatsd -p 6222 -m 6333 -cluster nats://localhost:6248 -routes nats://localhost:4248,nats://localhost:5248 -DV

-p 端口:服务端口,发布者,订阅者需要使用此端口。

-m 端口: 监控端口。

-cluster 地址:作为集群节点对其他节点提供服务的地址,其他节点需要连接的地址。(其他节点的-routes 可以填写此地址)

-routes 地址:此节点主动连接(路由到)的其他节点的地址列表(也就是其他节点的 -cluster 地址)

-DV Debug and trace

gnatsd -p 服务提供端口 -m 服务监控端口 -cluster 集群内node地址 -routes 集群内其他node地址列表 -DV

Server

package main

import (
    "github.com/nats-io/go-nats"
    "log"
    "flag"
    "fmt"
    "time"
)

// url is the comma-separated list of NATS server addresses handed to
// nats.Connect in init. The commented-out lines are alternative targets
// (a remote host, the library default, and the full three-node list).
const (
    //url   = "nats://192.168.3.125:4222"
    //url = nats.DefaultURL
    url = "nats://localhost:4222,nats://localhost:6222"
    //url = "nats://localhost:4222,nats://localhost:5222,nats://localhost:6222"
)

// Package-level connection state, assigned once by init.
var (
    nc  *nats.Conn // connection used by startService for subscriptions
    err error      // result of the nats.Connect call in init
)

// init connects to the NATS servers listed in url and stores the connection
// in the package-level nc before main runs.
//
// Servers are tried in the order listed (DontRandomize). Reconnect behaviour
// is configured with MaxReconnects(5) and ReconnectWait(2s). Every connection
// event is printed to stdout.
//
// Each handler reports on the connection passed to it by the library rather
// than on the package-level nc, which is only assigned once Connect returns.
// If the initial connection fails the process exits, because the rest of the
// program cannot do anything useful without a connection.
func init() {
    options := []nats.Option{
        nats.DontRandomize(),
        nats.MaxReconnects(5),
        nats.ReconnectWait(2 * time.Second),
        nats.DisconnectHandler(func(_ *nats.Conn) {
            fmt.Printf("Got disconnected!\n")
        }),
        nats.ReconnectHandler(func(conn *nats.Conn) {
            fmt.Printf("Got reconnected to %v!\n", conn.ConnectedUrl())
        }),
        nats.ClosedHandler(func(conn *nats.Conn) {
            fmt.Printf("Connection closed. Reason: %q\n", conn.LastError())
        }),
        nats.DiscoveredServersHandler(func(conn *nats.Conn) {
            // Report the servers that were discovered, not the one we are
            // currently connected to.
            fmt.Printf("Got Discover Server %v!\n", conn.DiscoveredServers())
        }),
        nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, e error) {
            fmt.Printf("Got Error Server %v!\n", e)
        }),
    }

    nc, err = nats.Connect(url, options...)
    if err != nil {
        // Fail fast: continuing with a nil connection would leave the
        // process blocked forever without ever receiving a message.
        log.Fatalln("error:", err)
    }
}

// main parses the command-line flags, starts one subscriber worker and then
// blocks forever so the process stays alive to receive messages.
//
// Flags:
//   -servername  name for server (default "y")
//   -group       group name for Subscribe (default "")
//   -subj        subject name (default "abc")
func main() {
    servername := flag.String("servername", "y", "name for server")
    queueGroup := flag.String("group", "", "group name for Subscribe")
    subj := flag.String("subj", "abc", "subject name")
    flag.Parse()

    log.Println(*servername, *queueGroup, *subj)

    workerName := *servername + " worker1"
    startService(*subj, workerName, *queueGroup)
    //startService(*subj, *servername+" worker2", *queueGroup)
    //startService(*subj, *servername+" worker3", *queueGroup)

    // Park the main goroutine; the subscription is served from other goroutines.
    select {}
}

//receive message
func startService(subj, name, queue string) {
    go async(nc, subj, name, queue)
}

// async registers a queue subscription for subj on nc. Every message that
// arrives is logged together with the worker name. A subscription error is
// reported through checkErr; the subscription handle itself is discarded.
func async(nc *nats.Conn, subj, name, queue string) {
    onMessage := func(msg *nats.Msg) {
        log.Println(name, "Received a message From Async : ", string(msg.Data))
    }

    _, subErr := nc.QueueSubscribe(subj, queue, onMessage)
    checkErr(subErr)
}

// checkErr reports whether err is nil. A non-nil error is logged with an
// "error:" prefix before false is returned.
func checkErr(err error) bool {
    ok := err == nil
    if !ok {
        log.Println("error:", err)
    }
    return ok
}

Client

package main

import (
    "github.com/nats-io/go-nats"
    "log"
    "strconv"
    "github.com/pborman/uuid"
    "flag"
    "time"
    "fmt"
)

// url is the comma-separated list of NATS server addresses handed to
// nats.Connect in init; here it names all three cluster nodes. The
// commented-out lines are alternative targets kept for experimentation.
const (
    //url   = "nats://192.168.3.125:4222"
    //url = "nats://localhost:4222"
    //url = "nats://localhost:4222,nats://localhost:6222"
    url = "nats://localhost:4222,nats://localhost:5222,nats://localhost:6222"
    //url = "nats://localhost:5222"
)

// Package-level connection state, assigned once by init.
var (
    nc  *nats.Conn // connection used by startClient to publish
    err error      // result of the nats.Connect call in init
)

// init connects to the NATS servers listed in url and stores the connection
// in the package-level nc before main runs.
//
// Servers are tried in the order listed (DontRandomize). Reconnect behaviour
// is configured with MaxReconnects(10) and ReconnectWait(2s). Disconnect,
// reconnect and close events are printed to stdout.
//
// Each handler reports on the connection passed to it by the library rather
// than on the package-level nc, which is only assigned once Connect returns.
// If the initial connection fails the process exits, because nothing can be
// published without a connection.
func init() {
    options := []nats.Option{
        nats.DontRandomize(),
        nats.MaxReconnects(10),
        nats.ReconnectWait(2 * time.Second),
        nats.DisconnectHandler(func(_ *nats.Conn) {
            fmt.Printf("Got disconnected!\n")
        }),
        nats.ReconnectHandler(func(conn *nats.Conn) {
            fmt.Printf("Got reconnected to %v!\n", conn.ConnectedUrl())
        }),
        nats.ClosedHandler(func(conn *nats.Conn) {
            fmt.Printf("Connection closed. Reason: %q\n", conn.LastError())
        }),
    }

    nc, err = nats.Connect(url, options...)
    if err != nil {
        // Fail fast instead of carrying on with a nil connection.
        log.Fatalln(err)
    }

    // Placeholder: discovered-server notifications are deliberately ignored.
    nc.SetDiscoveredServersHandler(func(conn *nats.Conn) {
    })
}

// main parses the -subj flag (default "abc"), publishes to that subject and
// then flushes the connection so the message has reached the server before
// the process exits.
func main() {
    subj := flag.String("subj", "abc", "subject name")
    flag.Parse()
    log.Println(*subj)
    startClient(*subj)

    // Flush waits for the server to acknowledge everything sent so far, which
    // is deterministic, unlike sleeping for a fixed second and hoping the
    // buffered publish went out in time.
    checkErr(nc.Flush())
}

// startClient publishes a single message on subj through the package-level
// connection. The payload is a freshly generated UUID followed by " Golang "
// and the loop index; the UUID is logged first so the message can be matched
// on the subscriber side.
func startClient(subj string) {
    for i := 0; i < 1; i++ {
        id := uuid.New()
        log.Println(id)

        payload := []byte(id + " Golang " + strconv.Itoa(i))
        // Report a failed publish instead of dropping the error silently.
        checkErr(nc.Publish(subj, payload))
        //nc.Publish(subj, []byte(id+" Rain "+strconv.Itoa(i)))
        //nc.Publish(subj, []byte(id+" Fog "+strconv.Itoa(i)))
        //nc.Publish(subj, []byte(id+" Cloudy "+strconv.Itoa(i)))
    }
}

// checkErr reports whether err is nil. A non-nil error is logged before
// false is returned.
func checkErr(err error) bool {
    ok := err == nil
    if !ok {
        log.Println(err)
    }
    return ok
}

注意

  • 发布者和订阅者都需要指明3个节点的url地址
    nats://localhost:4222,nats://localhost:5222,nats://localhost:6222
  • 如果3个node都不可用,发布者会发送消息失败。
  • 如果3个node至少有一个可用,订阅者就会收到消息。
  • 如果3个node全都不可用,订阅者会自动断开连接。
  • 增加一个node nats://localhost:7222 ,订阅者可以自动连接。
  • 增加node后,原来的3个node全都不可用时,订阅者不会断开连接,可以接收从新node发布的消息。
  • 3个node恢复后,订阅者可以接收3个node的消息。

后续

发布者和订阅者

  • 原始集群中node都不可用
  • 主动查询可用node
  • 接受可用node通知
  • 向可用node发送消息,订阅可用node的消息
  • 以上内容需要配合服务发现中间件或者自己实现

配置文件启动

$ gnatsd -c nodea.cfg
$ gnatsd -c nodeb.cfg
$ gnatsd -c nodec.cfg

nodea.cfg

listen: localhost:4222 # host/port to listen for client connections

http: localhost:4333 # HTTP monitoring port

# Authorization for client connections
#authorization {
  #user:     yasenagat
  # ./util/mkpasswd -p T0pS3cr3t
  #password: $2a$11$W2zko751KUvVy59mUTWmpOdWjpEm5qhcCZRd05GjI/sSOT.xtiHyG
  #ytc
  #token:   $2a$11$ZuYXelbdaRQnOcADEx40yOtinCvEi9c3X64K2Kyx7wLJq7ECPUnA2
  #timeout:  1
#}

# Cluster definition

cluster {

  listen: localhost:4248 # host/port for inbound route connections

  # Authorization for route connections
  #authorization {
    #user: user2
    # ./util/mkpasswd -p T0pS3cr3tT00!
    #password: $2a$11$xH8dkGrty1cBNtZjhPeWJewu/YPbSU.rXJWmS6SFilOBXzmZoMk9m
    #yctc
    #token: $2a$11$d/RrRseSiPOd/fxurspFquSirrjseRFRFGHdRbte7D8wj2laCLcVS
    #timeout: 0.5
  #}

  # Routes are actively solicited and connected to from this server.
  # Other servers can connect to us if they supply the correct credentials
  # in their routes definitions from above.

  routes = [
    nats-route://127.0.0.1:5248
    nats-route://127.0.0.1:6248
  ]
}

# logging options
debug:   false
trace:   true
logtime: false
log_file: "nodea.log"

# pid file
pid_file: "nodea.pid"

# Some system overrides

# max_connections
max_connections: 100

# max_subscriptions (per connection)
max_subscriptions: 1000

# maximum protocol control line
max_control_line: 512

# maximum payload
max_payload: 65536

# Duration the server can block on a socket write to a client.  Exceeding the
# deadline will designate a client as a slow consumer.
write_deadline: "2s"

nodeb.cfg

listen: localhost:5222 # host/port to listen for client connections

http: localhost:5333 # HTTP monitoring port

# Authorization for client connections
authorization {
  #user:     yasenagat
  # ./util/mkpasswd -p T0pS3cr3t
  #password: $2a$11$W2zko751KUvVy59mUTWmpOdWjpEm5qhcCZRd05GjI/sSOT.xtiHyG
  #ytb
  token:   $2a$11$ToARKoxzTSTXxKCljOFe4eDmiPQ/EcaB0M7V8mGE1tfgOv97.iECe
  timeout:  1
}

# Cluster definition

cluster {

  listen: localhost:5248 # host/port for inbound route connections

  # Authorization for route connections
  authorization {
    #user: user1
    # ./util/mkpasswd -p T0pS3cr3tT00!
    #password: pass1
    #yctb
    token: $2a$11$EriHSUV8WO7PWUXTxOCY5uP7MhAswLE2tqQQPuz6kaoF89KhO8CcW
    timeout: 0.5
  }

  # Routes are actively solicited and connected to from this server.
  # Other servers can connect to us if they supply the correct credentials
  # in their routes definitions from above.

  routes = [
    nats-route://127.0.0.1:4248
    nats-route://127.0.0.1:6248
  ]
}

# logging options
debug:   false
trace:   true
logtime: false
log_file: "nodeb.log"

# pid file
pid_file: "nodeb.pid"

# Some system overrides

# max_connections
max_connections: 100

# max_subscriptions (per connection)
max_subscriptions: 1000

# maximum protocol control line
max_control_line: 512

# maximum payload
max_payload: 65536

# Duration the server can block on a socket write to a client.  Exceeding the
# deadline will designate a client as a slow consumer.
write_deadline: "2s"

nodec.cfg

listen: localhost:6222 # host/port to listen for client connections

http: localhost:6333 # HTTP monitoring port

# Authorization for client connections
#authorization {
  #user:     yasenagat
  # ./util/mkpasswd -p T0pS3cr3t
  #password: $2a$11$W2zko751KUvVy59mUTWmpOdWjpEm5qhcCZRd05GjI/sSOT.xtiHyG
  #ytc
  #token:   $2a$11$HZy0M3lcxxzJRsFhtAoiX.jCuqKLyztcYYZPWRtlR.APhs/4mFYGC
  #timeout:  1
#}

# Cluster definition

cluster {

  listen: localhost:6248 # host/port for inbound route connections

  # Authorization for route connections
  #authorization {
    #user: user2
    # ./util/mkpasswd -p T0pS3cr3tT00!
    #password: $2a$11$xH8dkGrty1cBNtZjhPeWJewu/YPbSU.rXJWmS6SFilOBXzmZoMk9m
    #yctc
    #token: $2a$11$srwaIbFHGwIt37t3GrPynOHSpZ2LHTtw1QXWuznXGOaknEwulP4o6
    #timeout: 0.5
  #}

  # Routes are actively solicited and connected to from this server.
  # Other servers can connect to us if they supply the correct credentials
  # in their routes definitions from above.

  routes = [
    nats-route://127.0.0.1:5248
    nats-route://127.0.0.1:4248
  ]
}

# logging options
debug:   false
trace:   true
logtime: false
log_file: "nodec.log"

# pid file
pid_file: "nodec.pid"

# Some system overrides

# max_connections
max_connections: 100

# max_subscriptions (per connection)
max_subscriptions: 1000

# maximum protocol control line
max_control_line: 512

# maximum payload
max_payload: 65536

# Duration the server can block on a socket write to a client.  Exceeding the
# deadline will designate a client as a slow consumer.
write_deadline: "2s"

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK