19

golang如何使用Spring Cloud Stream

 3 years ago
source link: https://studygolang.com/articles/31018
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

慎重声明,只代表本人观点,不一定代表实际。看了差不多半天Spring Cloud Stream中的kafka源代码,差不多断断续续折腾了一个月,终于在golang中使用kafka发送给Spring Cloud Stream并且成功处理

Spring Cloud Stream当使用@StreamListener中的condition,通过head进行选择的时候,其中MessageHeader是需要包含三个信息:

id UUID类型

contentType 字符串类型,内容类型,可以为:application/json

spring_json_header_types header中的值类型,使用golang的时候,例如:{"partitionKey":"java.lang.String","scst_partition":"java.lang.Integer","contentType":"java.lang.String"}

注意id必须基于java序列化格式,可以参考: https://www.jianshu.com/p/08fe6ffe26d5

直接上代码:

func testKafka() {
    sarama.Logger = log.New(os.Stdout, "", log.LstdFlags)

    config := sarama.NewConfig()

    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    config.Version = sarama.MaxVersion

    client, err := sarama.NewClient(strings.Split("localhost:9092", ","), config)

    if err != nil {
        log.Fatalf("unable to create kafka client: %q", err)
    }

    producer, err := sarama.NewSyncProducerFromClient(client)

    if err != nil {
        log.Fatalf("unable to create producer: %q", err)
    }

    defer producer.Close()

    serviceErrorLog := &ServiceErrorLog{ApplicationName: "test-service", ServerIp: "127.0.0.1", Path: "/", QueryParams: "/QueryParams", Message: "Message", Trace: "Trace", LogTime: time.Now().Format(DATE_TIME_PATTERN)}

    if err == nil {
        key, err1 := uuid.NewRandom()

        if err1 != nil {
            log.Fatalf("unable to create uuid: %q", err1)
        }

        //headers := &MessageHeader{Id: key.String(), ContentType: "application/json", PartitionKey: "service-error-logs", Timestamp: time.Now().UnixNano()}

        //genericMessage := &ServiceErrorLogGenericMessage{Headers: *headers, Payload: *serviceErrorLog}

        text, _ := json.Marshal(serviceErrorLog)

        id := utility.UUIDJavaBytes(key)
        contentType := []byte("application/json")
        partitionKey := []byte("service-error-logs")
        springJsonHeaderTypes := []byte{123, 34, 112, 97, 114, 116, 105, 116, 105, 111, 110, 75, 101, 121, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 115, 99, 115, 116, 95, 112, 97, 114, 116, 105, 116, 105, 111, 110, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 99, 111, 110, 116, 101, 110, 116, 84, 121, 112, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125}

        message := &sarama.ProducerMessage{
            Topic: "log-service-topic",
            Headers: []sarama.RecordHeader{
                {Key: []byte("id"), Value: id},
                {Key: []byte("contentType"), Value: contentType},
                {Key: []byte("partitionKey"), Value: partitionKey},
                {Key: []byte("spring_json_header_types"), Value: springJsonHeaderTypes},
            },
            Value: sarama.StringEncoder(text)}

        fmt.Println(message)

        partition, offset, err := producer.SendMessage(message)

        fmt.Println(partition, offset, err)

    }

}

有疑问加站长微信联系

iiUfA3j.png!mobile

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK