5

go-kit 上手之example stringsvc3 通过代理实现分布式处理

 1 year ago
source link: https://studygolang.com/articles/11938?fr=sidebar
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

go-kit 上手之example stringsvc3 通过代理实现分布式处理

KingEasternSun · 2017-12-13 10:01:02 · 879 次点击 · 预计阅读时间 8 分钟 · 大约8小时之前 开始浏览    
这是一个创建于 2017-12-13 10:01:02 的文章,其中的信息可能已经有所发展或是发生改变。

代理中间件

stringsvc3没有完全按照官网中stringsvc3的写法,而是在stringsvc2的基础上增加了proxy.go
主要就是给uppercase增加了代理中间件,主要步骤分三步:
1)向特定地址代理服务器发送请求的client的编码和解码函数。
2)生成向特定地址代理服务器发送请求的client。
3)用client配合load balancer构建代理服务器中间件。


package main

import (
    "bytes"
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "net/http"
    "net/url"
    "strings"
    "time"

    "github.com/go-kit/kit/sd/lb"

    "golang.org/x/time/rate"

    "github.com/go-kit/kit/endpoint"
    "github.com/go-kit/kit/ratelimit"
    "github.com/sony/gobreaker"

    "io/ioutil"

    "github.com/go-kit/kit/circuitbreaker"

    "github.com/go-kit/kit/log"
    "github.com/go-kit/kit/sd"
    httptransport "github.com/go-kit/kit/transport/http"
)

定义所需类型

type ServiceMiddleware func(StringService) StringService

获取用户指定的代理服务器地址列表,本样例中,用户输入多个代理服务器用”,”分割

func split(s string) []string {
    a := strings.Split(s, ",")
    for i := range a {
        a[i] = strings.TrimSpace(a[i])
    }

    return a
}

根据httptransport.NewClient的参数需要,需要一个将client的request编码的函数(如下encodeRequest),以及将代理服务器返回的数据转为response的函数(如下:decodeUppercaseResponse)

往代理服务发送请求时,将request转为io.ReaderCloser

func encodeRequest(_ context.Context, r *http.Request, request interface{}) error {
    var buf bytes.Buffer
    if err := json.NewEncoder(&buf).Encode(request); err != nil {
        return err
    }
    r.Body = ioutil.NopCloser(&buf)
    return nil

}

func decodeUppercaseResponse(_ context.Context, r *http.Response) (interface{}, error) {
    var response uppercaseResponse
    if err := json.NewDecoder(r.Body).Decode(&response); err != nil {
        return nil, err
    }
    return response, nil
}

创建到特定地址代理服务器的client

func makeUppercaseProxy(ctc context.Context, instance string) endpoint.Endpoint {
    if !strings.HasPrefix(instance, "http") {
        instance = "http://" + instance
    }

    u, err := url.Parse(instance)
    if err != nil {
        panic(err)
    }
    if u.Path == "" {
        u.Path = "/uppercase"
    }

    return httptransport.NewClient(
        "GET",
        u,
        encodeRequest,
        decodeUppercaseResponse,
    ).Endpoint()
}

定义使用了代理机制的新服务

type proxymw struct {
    next      StringService //用于处理Count请求
    uppercase endpoint.Endpoint //load balance处理uppercase
}

//直接用当前服务处理Count请求
func (mw proxymw) Count(ctx context.Context, s string) int {
    return mw.next.Count(ctx, s)
}

//将uppercase请求发往各个代理服务器中(后面会讲到通过Load balancer实现)
func (mw proxymw) Uppercase(ctx context.Context, s string) (string, error) {
    response, err := mw.uppercase(ctx, uppercaseRequest{S: s})
    if err != nil {
        return "", err
    }

    resp := response.(uppercaseResponse)
    if resp.Err != "" {
        return resp.V, errors.New(resp.Err)
    }

    return resp.V, nil
}

根据用户输入的代理服务器地址生成对应的代理服务器中间件

//根据用户输入的多个地址,创建到多个服务器的代理
func proxyMiddleware(ctx context.Context, instances string, logger log.Logger) ServiceMiddleware {
    if instances == "" {
        logger.Log("proxy_to", "none")
        return func(next StringService) StringService { return next }
    }

    var (
        qps         = 100                    //请求频率超过多少会返回错误
        maxAttempts = 3                      //请求在放弃前重试多少次,用于 load balancer
        maxTime     = 250 * time.Millisecond // 请求在放弃前的超时时间,用于 load balancer
    )

    var (
        instanceList = split(instances)
        endpointer   sd.FixedEndpointer
    )
    logger.Log("proxy_to", fmt.Sprint(instanceList))
    for _, instance := range instanceList {
        var e endpoint.Endpoint
        e = makeUppercaseProxy(ctx, instance) //创建client
        e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e) //添加breader
        e = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), qps))(e) //添加limiter
        endpointer = append(endpointer, e)
    }
    balancer := lb.NewRoundRobin(endpointer) //添加load balancer

    //Retry封装一个service load balancer,返回面向特定service method的load balancer。到这个endpoint的请求会自动通过
    //load balancer进行分配到各个代理服务器中。返回失败的请求会自动retry直到成功或者到达最大失败次数或者超时。
    retry := lb.Retry(maxAttempts, maxTime, balancer)//添加retry机制
    return func(next StringService) StringService {
        return proxymw{next, retry}
    }
}
func main() {
    var (
        listen = flag.String("listen", ":8080", "http lisetern address")
        proxy  = flag.String("proxy", "", "optional ")
    )
    flag.Parse()
    logger := log.NewLogfmtLogger(os.Stderr)
    logger = log.With(logger, "listern", *listen, "caller", log.DefaultCaller)
    //注意这里的 filedkeys要和 methodField 中的一致,不然会报错
    //fieldKeys := []string{"metod", "error"}
    //2017/10/19 18:09:28 http: panic serving [::1]:55246: label name "metod" missing in label map
    fieldKeys := []string{"method", "error"}
    requestCount := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
        Namespace: "my_gropu",
        Subsystem: "string_service",
        Name:      "request_count",
        Help:      "Number of requests received.",
    }, fieldKeys)

    requestLatency := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
        Namespace: "my_gropu",
        Subsystem: "string_service",
        Name:      "request_latence_microseconds",
        Help:      "Number of requests in misroseconds.",
    }, fieldKeys)

    countResult := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
        Namespace: "my_gropu",
        Subsystem: "string_service",
        Name:      "count_result",
        Help:      "The result of each count method.",
    }, []string{})

    //svc := stringService{}
    //cannot use logMiddleware literal (type logMiddleware) as type stringService in assignment

    var svc StringService
    svc = stringService{}
    svc = proxyMiddleware(context.Background(), *proxy, logger)(svc)
    svc = logMiddleware{logger, svc}
    svc = instrumentingMiddleware{requestCount, requestLatency, countResult, svc}

    uppercaseHandler := httptransport.NewServer(
        makeUppercaseEndpoint(svc),
        decodeUpperCaseRequest,
        encodeResponse,
    )

    countHandler := httptransport.NewServer(
        makeCountEndpoint(svc),
        decodeCountRequest,
        encodeResponse,
    )
    http.Handle("/uppercase", uppercaseHandler)
    http.Handle("/count", countHandler)
    http.Handle("/metrics", promhttp.Handler())
    logger.Log("msg", "HTTP", "addr", *listen)
    logger.Log("err", http.ListenAndServe(*listen, nil))
}

相对于stringsvc2.md,就增加了

    svc = proxyMiddleware(context.Background(), *proxy, logger)(svc)

可以看到,利用gokit的middlerware方式书写代码,增加功能非常简单。

Sean-MacBook-Air:stringsrv3 kes$ ./main -listen=:8001
listern=:8001 caller=proxy.go:109 proxy_to=none
listern=:8001 caller=main.go:329 msg=HTTP addr=:8001
listern=:8001 caller=main.go:186 logmethod=uppercase input=foo output=FOO err=null took=1.883µs


Sean-MacBook-Air:stringsrv3 kes$ ./main -listen=:8082
listern=:8082 caller=proxy.go:109 proxy_to=none
listern=:8082 caller=main.go:329 msg=HTTP addr=:8082
listern=:8082 caller=main.go:186 logmethod=uppercase input=bar output=BAR err=null took=1.993µs


Sean-MacBook-Air:stringsrv3 kes$ ./main -listen=:8080 -proxy=localhost:8001,localhost:8082
listern=:8080 caller=proxy.go:123 proxy_to="[localhost:8001 localhost:8082]"
listern=:8080 caller=main.go:329 msg=HTTP addr=:8080
listern=:8080 caller=main.go:186 logmethod=uppercase input=foo output=FOO err=null took=4.496073ms
listern=:8080 caller=main.go:186 logmethod=uppercase input=bar output=BAR err=null took=1.983719ms

Sean-MacBook-Air:goproject kes$ for s in foo bar ;do curl -d"{\"s\":\"$s\"}" localhost:8080/uppercase;done
{"v":"FOO"}
{"v":"BAR"}

有疑问加站长微信联系(非本文作者)

280

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK