【aria2源码解读】【kafka producer源码分析】【ubuntu编译火狐源码】kvs源码

时间:2025-01-18 12:00:27 来源:光年框架源码 编辑:个性主页源码

1.gRPC负载均衡(自定义负载均衡策略--etcd实现)

kvs源码

gRPC负载均衡(自定义负载均衡策略--etcd实现)

       èƒŒæ™¯

       åœ¨å·¥ä½œå­¦ä¹ ä¸­ä½¿ç”¨gRPC的地方比较多,通常我们都使用的是自带的负载均衡算法,但是在某些场景下我们需要对服务的版本进行控制比如[appV2只能去链接userV3],aria2源码解读在这样的情况下就只能选自定义负载均衡策略

目标

       å®žçŽ°åŸºäºŽç‰ˆæœ¬ï¼ˆversion)的grpc负载均衡器,了解过程后可自己实现更多的负载均衡功能

       æ³¨å†Œä¸­å¿ƒ

       EtcdLease是一种检测客户端存活状况的机制。群集授予具有生存时间的租约。如果etcd群集在给定的TTL时间内未收到keepAlive,则租约到期。为了将租约绑定到键值存储中,每个key最多可以附加一个租约

       æœåŠ¡æ³¨å†Œ(注册服务)

       å®šæ—¶æŠŠæœ¬åœ°æœåŠ¡ï¼ˆAPP)地址,版本等信息注册到服务器

       æœåŠ¡å‘现(客户端发起服务解析请求(APP))

       æŸ¥è¯¢æ³¨å†Œä¸­å¿ƒï¼ˆAPP)下有那些服务

       å¹¶å‘所有的服务建立HTTP2长链接

       é€šè¿‡Etcdwatch监听服务(APP),通过变化更新链接

       è´Ÿè½½å‡è¡¡(客户端发起请求(APP))

       è´Ÿè½½å‡è¡¡é€‰æ‹©åˆé€‚的服务(APPHTTP2长链接)

       å‘起调用

服务注册(注册服务)

       æºç register.go

funcNewRegister(opt...RegisterOptions)(*Register,error){ s:=&Register{ opts:newOptions(opt...),}varctx,cancel=context.WithTimeout(context.Background(),time.Duration(s.opts.RegisterTtl)*time.Second)defercancel()data,err:=json.Marshal(s.opts)iferr!=nil{ returnnil,err}etcdCli,err:=clientv3.New(s.opts.EtcdConf)iferr!=nil{ returnnil,err}s.etcdCli=etcdCli//申请租约resp,err:=etcdCli.Grant(ctx,s.opts.RegisterTtl)iferr!=nil{ returns,err}s.name=fmt.Sprintf("%s/%s",s.opts.Node.Path,s.opts.Node.Id)//注册节点_,err=etcdCli.Put(ctx,s.name,string(data),clientv3.WithLease(resp.ID))iferr!=nil{ returns,err}//续约租约s.keepAliveChan,err=etcdCli.KeepAlive(context.Background(),resp.ID)iferr!=nil{ returns,err}returns,nil}

       åœ¨etcd里面我们可以看到如下信息APPv1版本服务在节点的key/hwholiday/srv/app/app-beb3cb-eb-eb-d-2cfdc7c

{ "node":{ "name":"app","path":"/hwholiday/srv/app","id":"app-beb3cb-eb-eb-d-2cfdc7c","version":"v1","address":"...:"}}

       APPv2版本服务在节点的key/hwholiday/srv/app/app-beb3cb-eb-eb-d-2cfdc7c

{ "node":{ "name":"app","path":"/hwholiday/srv/app","id":"app--eb-eb-c0-2cfdc7c","version":"v2","address":"...:"},}服务发现(客户端发起服务解析请求(APP))

       æºç discovery.go实现grpc内的resolver.Builder接口(Builder创建一个解析器,用于监视名称解析更新)

funcNewDiscovery(opt...ClientOptions)resolver.Builder{ s:=&Discovery{ opts:newOptions(opt...),}etcdCli,err:=clientv3.New(s.opts.EtcdConf)iferr!=nil{ panic(err)}s.etcdCli=etcdClireturns}//Build当调用`grpc.Dial()`时执行func(d*Discovery)Build(targetresolver.Target,ccresolver.ClientConn,optsresolver.BuildOptions)(resolver.Resolver,error){ d.cc=ccres,err:=d.etcdCli.Get(context.Background(),d.opts.SrvName,clientv3.WithPrefix())iferr!=nil{ returnnil,err}for_,v:=rangeres.Kvs{ iferr=d.AddNode(v.Key,v.Value);err!=nil{ log.Println(err)continue}}gofunc(dd*Discovery){ dd.watcher()}(d)returnd,err}//根据官方的建议我们把从注册中心拿到的服务信息储存到Attributes中//Attributescontainsarbitrarydataabouttheresolverintendedfor//consumptionbytheloadbalancingpolicy.//属性包含有关供负载平衡策略使用的解析器的任意数据。//Attributes*attributes.Attributesfunc(d*Discovery)AddNode(key,val[]byte)error{ vardata=new(register.Options)err:=json.Unmarshal(val,data)iferr!=nil{ returnerr}addr:=resolver.Address{ Addr:data.Node.Address}addr=SetNodeInfo(addr,data)d.Node.Store(string(key),addr)returnd.cc.UpdateState(resolver.State{ Addresses:d.GetAddress()})}负载均衡(客户端发起请求(APP))

       æºç version_balancer.go

       gRPC提供了PickerBuilder和Picker接口让我们实现自己的负载均衡策略

//PickerBuilder创建balancer.Picker。typePickerBuilderinterface{ //Build返回一个选择器,gRPC将使用它来选择一个SubConn。Build(infoPickerBuildInfo)balancer.Picker}//gRPC使用Picker来选择一个SubConn来发送RPC。//每次平衡器的内部状态发生变化时,它都会从它的快照中生成一个新的选择器。//gRPC使用的选择器可以通过ClientConn.UpdateState()更新。typePickerinterface{ //选择合适的子链接发送请求Pick(infoPickInfo)(PickResult,error)}

       ä»Žä¸Šé¢å¾—知我们可以干事的地方在Build方法或者Pick方法(调用gRPC方法时先执行Build再执行Pick)

       Build(infoPickerBuildInfo)balancer.Pickerinfo里面有服务的链接,和链接对应的刚刚通过AddNode方法存入的服务信息这里我们可以基于grpc-client层面来做负载,比如(加权随机负载)

       Pick(infoPickInfo)(PickResult,error)info里面有调用的方法名和context.Context通过context.Context我们可以获得这个来获取发起请求的时候填入的参数,这样我们可以很灵活的针对每个方法进行不同的负载这里我们可以基于grpc-client-api层面来做负载

func(*rrPickerBuilder)Build(infobase.PickerBuildInfo)balancer.Picker{ iflen(info.ReadySCs)==0{ returnbase.NewErrPicker(balancer.ErrNoSubConnAvailable)}varscs=make(map[balancer.SubConn]*register.Options,len(info.ReadySCs))forconn,addr:=rangeinfo.ReadySCs{ nodeInfo:=GetNodeInfo(addr.Address)ifnodeInfo!=nil{ scs[conn]=nodeInfo}}iflen(scs)==0{ returnbase.NewErrPicker(balancer.ErrNoSubConnAvailable)}return&rrPicker{ node:scs,}}func(p*rrPicker)Pick(infobalancer.PickInfo)(balancer.PickResult,error){ p.mu.Lock()deferp.mu.Unlock()version:=info.Ctx.Value("version")varsubConns[]balancer.SubConnforconn,node:=rangep.node{ ifversion!=""{ ifnode.Node.Version==version.(string){ subConns=append(subConns,conn)}}}iflen(subConns)==0{ returnbalancer.PickResult{ },errors.New("nomatchfoundconn")}index:=rand.Intn(len(subConns))sc:=subConns[index]returnbalancer.PickResult{ SubConn:sc},nil}客户的使用我们定义的version负载均衡策略r:=discovery.NewDiscovery(discovery.SetName("hwholiday.srv.app"),discovery.SetEtcdConf(clientv3.Config{ Endpoints:[]string{ "...:"},DialTimeout:time.Second*5,}))resolver.Register(r)//连接服务器conn,err:=grpc.Dial("hwholiday.srv.app",//没有使用这个参数grpc.WithDefaultServiceConfig(fmt.Sprintf(`{ "LoadBalancingPolicy":"%s"}`,"version")),grpc.WithInsecure(),)iferr!=nil{ log.Fatalf("net.Connecterr:%v",err)}deferconn.Close()//调用服务apiClient:=api.NewApiClient(conn)ctx:=context.WithValue(context.Background(),"version","v1")_,err=apiClient.ApiTest(ctx,&api.Request{ Input:"v1v1v1v1v1"})iferr!=nil{ fmt.Println(err)}运行效果

       æµ‹è¯•æºç 

       è¿è¡ŒAPP服务v1,调用grpc-client使用v1

       APP打印

       å¯åŠ¨æˆåŠŸ===>0.0.0.0:

       input:"v1v1v1v1v1"

       grpc-client打印

       ===RUNTestClient

       v1v1v1v1v1v1v1v1v1v1

       è¿è¡ŒAPP服务v1,调用grpc-client使用v2

       APP打印

       å¯åŠ¨æˆåŠŸ===>0.0.0.0:

       grpc-client打印

       ===RUNTestClient

       rpcerror:code=Unavailabledesc=nomatchfoundconn

总结

       è¯¦æƒ…介绍地址

       æºç åœ°å€:/hwholiday/learning_tools/tree/master/etcd

       é€šè¿‡å­¦ä¹ æˆ‘们可以实现基于version的负载策略,这里只是提供一种思路怎么去实现可能我的这个例子不太适合这个,但是提供了一种思路,欢迎一起讨论。

copyright © 2016 powered by 皮皮网   sitemap