qwert456 发表于 2018-11-14 18:19:43

controller侧与agent侧分布式通迅实现-睿云智合

一.背景 在k8s分布式系统中,通迅成为重要的部分。 本文分享一下如何使用通迅中间件。
本文代码相关技术如下:rabbitmqredisgolang k8s集群与集群之间通讯, 我们都可以使用相同的中间件rabbitmq。
本文使用最简单的模式 LB,单实例的RPC调用。 二.分布式调用结构 2.1 rabbitmq lb模式调用 http://pan.xici.com/group5/M02/4A/95/rBABqVvinw2EKknxAAAAAB_Kd78665.png/1010 . agent1 agent2 agent3 同时上报自己在线时, rabbitmq自动调用 controller1 或 controller2 中其中一个实例,再由controller X 写入redis中去。当controller1、controller2需要所有agent状态时, 读取数据都是redis,所以都是一致的。 . agent1 agent2 agent3 获取配置信息时, rabbitmq也自动调用controller1或 controller2 其中一个实例。再由controller X 读取redis或者mysql数据,再返回给agent。不论是调用到controller1还是controller2,返回的数据都是一致。
2.2 rabbitmq 单实例模式调用 http://pan.xici.com/group5/M00/4A/90/rBABqFvinxmEdL1hAAAAABXjtGs988.png/1010 controller 实例下发配置信息时: setp1. 获得当前在线的agent。setp2. 单实例模式rpc调用。向所有的agent发送配置信息。setp3. 可以明确了解有没有agent时下发配置失败的。如果都失败,则本次调用失败.。只要有一个失败,就可以认为需要重发一次命令。 三.代码实现 3.1. rabbitmq rpc 调用 客户端实现 package ingress import ("fmt""time""context""github.com/wzhliang/xing""wise2c/wisecloud-ingress-agent/communicate""wise2c/wisecloud-ingress-agent/log""wise2c/wisecloud-ingress-agent/common" ) type ControllerClient struct {Producer*xing.ClientClient    communicate.ControllerHelperClient} func NewControllerClient() *ControllerClient { agent := &ControllerClient{} //amqp_url := "amqp://guest:guest@localhost:5672/"amqp_url := fmt.Sprintf("amqp://%s:%s@%s:%d",common.MQUser,common.MQPassword,common.MQHost,common.MQPort,) var err erroragent.Producer, err = xing.NewClient(globalRPCAgentName,amqp_url,xing.SetIdentifier(&xing.NoneIdentifier{}),xing.SetSerializer(&xing.JSONSerializer{}),)if err != nil {log.Error("xing.NewClient() is failed.%s", err.Error())return agent} //LB RPCtarget := fmt.Sprint("ingress.controller")agent.Client = communicate.NewControllerHelperClient(target, agent.Producer) return agent} func (this *ControllerClient) Close() {if this.Producer == nil {return} this.Producer.Close()//this.closed = true} func (this *ControllerClient) OnlineAgent(name string) error {
ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)defer cancel() log.Debug("OnlineAgent(%s)", name)_, err := this.Client.OnlineAgent(ctx,&communicate.OnlineAgentRequest{Name: name, })if err != nil {return err} return nil} func (this *ControllerClient) GetIngressConfigs(uuid string) (string, error){
ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)defer cancel() response, err := this.Client.GetIngressConfigs(ctx,&communicate.GetIngressConfigsRequest{Uuid:   uuid,})if err != nil {return "error", err} return response.Content, err} func AgentHeartbeatToController() {if globalControllerClient == nil {return} var err errorcontent := ""for { err = globalControllerClient.OnlineAgent(globalRPCAgentName)if err != nil {log.Error(err.Error())} //1 time / 2 second.time.Sleep(time.Millisecond*2000)}} 3.2. rabbitmq rpc 调用 服务端实现 package ingress import ("fmt""context""github.com/wzhliang/xing""wise2c/wisecloud-ingress-controller/communicate""wise2c/wisecloud-ingress-controller/log""wise2c/wisecloud-ingress-controller/common") type ControllerServerImp struct{} func (g *ControllerServerImp) OnlineAgent(ctx context.Context, req *communicate.OnlineAgentRequest, rsp *communicate.Void) error {log.Debug("OnlineAgent(%s)", req.Name) err := globalAgentClient.manager.OnlineAgent(req.Name)if err != nil {return err}return nil} func (g *ControllerServerImp) GetIngressConfigs(ctx context.Context, req *communicate.GetIngressConfigsRequest, rsp *communicate.GetIngressConfigsResponse) error { log.Info("GetIngressConfigs(%s)", req.Uuid)rsp.Content = "ok" return nil} func RunRPCServer() {//globalRPCControllerName = fmt.Sprintf("host.controller.%s", common.GetGuid()) //LB RPC.globalRPCControllerName = fmt.Sprintf("ingress.controller") //amqp_url := "amqp://guest:guest@localhost:5672/"amqp_url := fmt.Sprintf("amqp://%s:%s@%s:%d",common.MQUser,common.MQPassword,common.MQHost,common.MQPort,) svc, err := xing.NewService(globalRPCControllerName,amqp_url,xing.SetSerializer(&xing.JSONSerializer{}),xing.SetBrokerTimeout(15, 5),)if err != nil {log.Error(fmt.Sprintf("MQURL=%s NewService is failed. %s", amqp_url, err.Error()))} communicate.RegisterControllerHelperHandler(svc, &ControllerServerImp{}) log.Info("RPC Server is starting. Connect to the rabbitmq[%s].", amqp_url)err = svc.Run()if err != nil {log.Error(err.Error())}} 3.3. rabbitmq 单实例rpc调用 客户端实现1. rpc lb调用实时上报agent是否在线,实现了类似consul的服务发现的功能.2. ClientManager可以通过redis中的实时数据,管理所有的rpc client. 当 agent 下线,或者3秒之间没有上报状态,则清除指定的rpc client.3. 这样每次下发配置时,可以实时发送到每个rpc单实例服务器实例. package ingress import ("fmt""sync""time""errors""context""github.com/wzhliang/xing""github.com/astaxie/beego/utils""wise2c/wisecloud-ingress-controller/communicate""wise2c/wisecloud-ingress-controller/log""wise2c/wisecloud-ingress-controller/common") type AgentHelper struct {mutex         *sync.Mutexclosed       bool Helper      communicate.AgentHelperClientProducer      *xing.Client} func NewAgentHelper(agent_name string) *AgentHelper {var err error agent := &AgentHelper{mutex:   new(sync.Mutex),closed:false,} //amqp_url := "amqp://guest:guest@localhost:5672/"amqp_url := fmt.Sprintf("amqp://%s:%s@%s:%d",common.MQUser,common.MQPassword,common.MQHost,common.MQPort,) agent.Producer, err = xing.NewClient(globalRPCControllerName,amqp_url,xing.SetIdentifier(&xing.NoneIdentifier{}),xing.SetSerializer(&xing.JSONSerializer{}),)if err != nil {log.Error(fmt.Sprintf("MQURL=%s NewClient is failed. %s",amqp_url,err.Error()))return agent} //target := fmt.Sprint("ingress.agent.%s", agent_name)agent.Helper = communicate.NewAgentHelperClient(agent_name, agent.Producer) return agent} func (this *AgentHelper) Close() {this.mutex.Lock()defer this.mutex.Unlock() if this.Producer != nil {this.Producer.Close()}this.closed = true} func (this *AgentHelper) SetIngressConfig(content string) error{this.mutex.Lock()defer this.mutex.Unlock() if this.closed {return errors.New("the client is closed.")} ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)defer cancel() log.Info("SetIngressConfig(%s)", content)_, err := this.Helper.SetIngressConfig(ctx,&communicate.SetIngressConfigRequest{content,}) return err} func (this *AgentHelper) DelIngressConfig(uuid string) error {this.mutex.Lock()defer this.mutex.Unlock() if this.closed {return errors.New("the client is closed.")} ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)defer cancel() log.Info("DelIngressConfig(%s)", uuid)_, err := this.Helper.DelIngressConfig(ctx,&communicate.DelIngressConfigRequest{uuid,}) return err} ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////type ClientManager struct {mutex         *sync.MutexPool   *utils.BeeMap} func NewClientManager() *ClientManager {return &ClientManager{mutex:      new(sync.Mutex),Pool:       utils.NewBeeMap(),}} func (p *ClientManager) Init() {go p.RunConnect()} func (p *ClientManager) RunConnect() {for {names, err := globalRedisClient.GetAgentNames()if err != nil {log.Error(err.Error())} name_map := mapint{}for _, name := range names {name_map = 1} for key, v := range p.Pool.Items() {//log.Error("key=%s", key)_, ok := name_mapif ok {//log.Warning("find the %s", key)continue} if v != nil {log.Warning("Close the AgentHelper %s", key)v.(*AgentHelper).Close()} log.Warning("Delete the Pool %s", key)p.Pool.Delete(key)} for _, name := range names {if !p.Pool.Check(name) {log.Warning("New the AgentHelper %s", name)p.Pool.Set(name, NewAgentHelper(name))}} time.Sleep(time.Second * 1)}} func (p *ClientManager) GetClients() mapinterface{} {return p.Pool.Items()} func (p *ClientManager) OnlineAgent(agent_name string) error {p.mutex.Lock()defer p.mutex.Unlock() globalRedisClient.OnlineAgent(agent_name, "1") if !p.Pool.Check(agent_name) {p.Pool.Set(agent_name, NewAgentHelper(agent_name))} return nil} func (p *ClientManager) OfflineAgent(agent_name string) error {p.mutex.Lock()defer p.mutex.Unlock() globalRedisClient.OfflineAgent(agent_name) agent_helper :=p.Pool.Get(agent_name)if agent_helper != nil {agent_helper.(*AgentHelper).Close()p.Pool.Delete(agent_name)} return nil}////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////type AgentClient struct {manager *ClientManager} func NewAgentClient() *AgentClient {return &AgentClient{manager:    NewClientManager(),}} func (client *AgentClient) Init() {client.manager.Init()} type AgentHandlerCallback func(request, response interface{}, helper *AgentHelper) error func (client *AgentClient) AgentHandler(request, response interface{}, callback AgentHandlerCallback) (err error) {maps := client.manager.GetClients() count := 0for k, v := range maps {name := k.(string)if v == nil {continue} helper := v.(*AgentHelper)if callback == nil {continue} err = callback(request, response, helper)if err != nil {return errors.New(fmt.Sprintf("%s %s", name, err.Error()))} count += 1} if count > 0 {return nil} return errors.New("no agent online.")} func (this *AgentClient) SetIngressConfig(request, response interface{}) error {return this.AgentHandler(request, response, func(request, response interface{}, helper *AgentHelper) error {return helper.SetIngressConfig(request.(string))})} func (this *AgentClient) DelIngressConfig(request, response interface{}) error {return this.AgentHandler(request, response, func(request, response interface{}, helper *AgentHelper) error {return helper.DelIngressConfig(request.(string))})}
3.4. rabbitmq 单实例rpc调用.服务端实现 package ingress import ("fmt""context""github.com/wzhliang/xing""wise2c/wisecloud-ingress-agent/communicate""wise2c/wisecloud-ingress-agent/log""wise2c/wisecloud-ingress-agent/common") type AgentServerImp struct{} func (g *AgentServerImp) SetIngressConfig(ctx context.Context, req *communicate.SetIngressConfigRequest, rsp *communicate.Void) error {log.Info("SetIngressConfig(%s)", req.Content) config := &Wise2cIngressConfig{}err := config.Parse([]byte(req.Content))if err != nil {log.Error(err.Error())return err} globalIngressProcess.SetIngressConfig(config) return nil} func (g *AgentServerImp) DelIngressConfig(ctx context.Context, req *communicate.DelIngressConfigRequest, rsp *communicate.Void) error {log.Info("DelIngressConfig(%s)", req.Uuid) globalIngressProcess.DelIngressConfig(req.Uuid) return nil} func RunRPCServer() {//amqp_url := "amqp://guest:guest@localhost:5672/"amqp_url := fmt.Sprintf("amqp://%s:%s@%s:%d",common.MQUser,common.MQPassword,common.MQHost,common.MQPort,) svc, err := xing.NewService(globalRPCAgentName,amqp_url,xing.SetSerializer(&xing.JSONSerializer{}),)if err != nil {log.Error(fmt.Sprintf("MQURL=%s, NewService() is failed. %s", amqp_url,err.Error()))} communicate.RegisterAgentHelperHandler(svc, &AgentServerImp{}) log.Info("RPC Server is starting. Connect to the rabbitmq[%s].", amqp_url)go LoopRPC(svc)} func LoopRPC(svc *xing.Client) {err := svc.Run()if err != nil {log.Error(err.Error())}} 四.总结 ● 通过 rabbitmq lb调用方式,可以实现从agent侧上报数据到controller侧或者agent侧拉取需要的数据。 ● 通过rabbitmq 单实例调用方式,由于有了之前lb上报agent状态,或者使用第三方 consul.etcd中服务发现功能。我们可以实现从controller侧下发配置到每一个agent,在每个agent实例中完成相同的功能。
页: [1]
查看完整版本: controller侧与agent侧分布式通迅实现-睿云智合