Applying the distributed component etcd - 睿云智合 Tech Talk
I. Setting up the etcd environment

I will use etcd v3, so first set up a local single-node etcd environment:

docker pull xieyanze/etcd3:latest
docker run --name etcd-v3.0.9 -d -v /tmp:/data \
    -p 2379:2379 -p 2380:2380 -p 4001:4001 -p 7001:7001 xieyanze/etcd3:latest
docker exec -it etcd-v3.0.9 sh

By default etcdctl still uses the v2 API. Switch it to v3 by setting the environment variable ETCDCTL_API=3:

export ETCDCTL_API=3
etcdctl put /test/ok 11
etcdctl put /test/ok 22
etcdctl del /test/gg
# delete every key with the /test prefix
etcdctl del /test --prefix
etcdctl get /test/ok
# prefix query
etcdctl get /test/ok --prefix

II. Software logical structure

Figure: http://pan.xici.com/group5/M00/4A/91/rBABqFvio-WEYWjSAAAAAKqpzmE461.png/1010

1. k8s master cluster
dev-7
dev-8
2. k8s slave cluster 1 (env1)
dev-1
dev-2
dev-3
3. k8s slave cluster 2 (env2)
dev-4
dev-5
dev-6

III. Service registration and discovery for controller and agent
How it works. Note: in the etcd v3 release used here, the minimum TTL of a key (i.e. of its lease) is 5 seconds.
1. Every 2 seconds, each service sends a heartbeat to etcd to show that it is still alive.
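2. When a service exits, it deletes its own etcd key. If it does not (for example, it crashes), the key goes offline automatically once its TTL expires.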
3. To learn the agents' status, the controller simply does a GET and receives the agents that are currently online.
4. To learn the controllers' status, an agent simply does a GET and receives the controllers that are currently online.

Figure: http://pan.xici.com/group4/M01/47/5E/rBABplvio_GETOFwAAAAAFGkEwc974.png/1010

Controller directory:
Key | Value | TTL (s)
ingress/controller/dev7_xxx | {"ip":xxx} | 5
ingress/controller/dev8_xxx | {"ip":xxx} | 5

Agent directory:
Key | Value | TTL (s)
ingress/agent/env1/dev1_xxx | {"ip":xxx} | 5
ingress/agent/env1/dev2_xxx | {"ip":xxx} | 5
ingress/agent/env1/dev3_yyy | {"ip":xxx} | 5
ingress/agent/env2/dev4_xxx | {"ip":xxx} | 5
ingress/agent/env2/dev5_xxx | {"ip":xxx} | 5
ingress/agent/env2/dev6_yyy | {"ip":xxx} | 5
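IV. Implementing the business logic

4.1 Controller side
1. Clients call the controller's RESTful API. The controller writes directly to etcd and also writes a copy to MySQL.
2. If the controller cares about agent changes, it only needs to watch the ingress/agent directory.
3. The controller is stateless: its instances never need to synchronize data with one another, so it can be scaled to any number of instances.
4. If a controller crashes, on restart it loads the data from MySQL and syncs it into etcd.

4.2 The controller needs to know the rule execution status

Figure: http://pan.xici.com/group4/M01/47/5E/rBABplvio_yEEKb5AAAAAKQrHVY482.png/1010

etcd directory: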
Key | Value | TTL (s)
ingress/ingress_config/env1/${config_uuid1}/status/dev1_xxx | 1 | 5
ingress/ingress_config/env1/${config_uuid1}/status/dev2_xxx | 1 | 5
ingress/ingress_config/env1/${config_uuid1}/status/dev3_xxx | 1 | 5
ingress/ingress_config/env1/${config_uuid2}/status/dev1_xxx | 1 | 5
ingress/ingress_config/env1/${config_uuid2}/status/dev2_xxx | 1 | 5
ingress/ingress_config/env1/${config_uuid2}/status/dev3_xxx | 1 | 5
ingress/ingress_config/env2/${config_uuid3}/status/dev4_xxx | 1 | 5
ingress/ingress_config/env2/${config_uuid3}/status/dev5_xxx | 1 | 5
ingress/ingress_config/env2/${config_uuid3}/status/dev6_xxx | 1 | 5
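With this layout, deciding whether a rule has been applied everywhere comes down to a set comparison between the live agent list and the status entries of one config. A minimal sketch, assuming the agent name is the last path segment of both the agent key and the status key, as in the tables above. `allApplied` is a hypothetical helper, `uuid1` stands in for `${config_uuid1}`, and in practice both inputs would come from two prefix GETs (e.g. via the `GetValue` wrapper in section 5.1).

```go
package main

import (
	"fmt"
	"path"
)

// allApplied is a hypothetical helper: it reports whether every live agent has
// written a status entry for one config, and lists the agents still pending.
// agentKeys come from a prefix GET on ingress/agent/<env>, statusKeys from a
// prefix GET on ingress/ingress_config/<env>/<config_uuid>/status.
func allApplied(agentKeys, statusKeys []string) (bool, []string) {
	reported := make(map[string]bool, len(statusKeys))
	for _, k := range statusKeys {
		reported[path.Base(k)] = true // assumption: last segment is the agent name
	}
	var pending []string
	for _, k := range agentKeys {
		if name := path.Base(k); !reported[name] {
			pending = append(pending, name)
		}
	}
	return len(pending) == 0, pending
}

func main() {
	agents := []string{
		"ingress/agent/env1/dev1_xxx",
		"ingress/agent/env1/dev2_xxx",
		"ingress/agent/env1/dev3_xxx",
	}
	statuses := []string{
		"ingress/ingress_config/env1/uuid1/status/dev1_xxx",
		"ingress/ingress_config/env1/uuid1/status/dev2_xxx",
	}
	ok, pending := allApplied(agents, statuses)
	fmt.Println(ok, pending) // false [dev3_xxx]
}
```

As written, an empty agent list passes the check trivially, so a caller may want to treat "no agents online" as a separate case.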
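Each agent writes its execution status directly into the config's status directory. To check a rule, the controller first gets the agent list under ingress/agent/env1 and compares it with the completion list under ingress/ingress_config/env1/${config_uuid1}/status. If every agent has an entry there, the rule has been applied successfully everywhere.

4.3 Agent side
1. Agents in the different clusters use etcd's watch feature to learn about every data change (create, delete, update) as soon as it happens.
2. Every 3 minutes, each agent also fetches the full list for its own environment and syncs it.
3. If an agent crashes, on restart it reloads all ingress_config entries from etcd once.

V. Code examples

5.1 A wrapper around etcd clientv3
1. Connection management, with TLS support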
2. Put, delete and get, including values that expire automatically (TTL)
3. Watch a directory or key for value changes (PUT, DELETE)

package main

import (
	"context"
	"fmt"
	"time"

	// "github.com/coreos/etcd/pkg/transport"
	"github.com/coreos/etcd/clientv3"
	// "sync"
)

// EtcdData is one key/value pair read from etcd.
type EtcdData struct {
	Key   string
	Value string
}

// EtcdHelper wraps a clientv3 client and applies a timeout to every request.
type EtcdHelper struct {
	RequestTimeout time.Duration
	Client         *clientv3.Client
}

func NewEtcdHelper() *EtcdHelper {
	// TLS variant:
	//tlsInfo := transport.TLSInfo{
	//	CertFile:      "/tmp/test-certs/test-name-1.pem",
	//	KeyFile:       "/tmp/test-certs/test-name-1-key.pem",
	//	TrustedCAFile: "/tmp/test-certs/trusted-ca.pem",
	//}
	//tlsInfo := transport.TLSInfo{
	//	CertFile: "./tls/apiserver.crt",
	//	KeyFile:  "./tls/apiserver.key",
	//}
	//tlsConfig, err := tlsInfo.ClientConfig()
	//if err != nil {
	//	fmt.Printf("%s\n", err.Error())
	//	return nil
	//}
	//cli, err := clientv3.New(clientv3.Config{
	//	Endpoints:   []string{"dev-7:2379"},
	//	DialTimeout: 3 * time.Second,
	//	TLS:         tlsConfig,
	//})

	cli, err := clientv3.New(clientv3.Config{
		Endpoints: []string{"127.0.0.1:2379"},
		//Endpoints: []string{"http://dev-7:2379"},
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return nil
	}
	return &EtcdHelper{
		RequestTimeout: 5 * time.Second,
		Client:         cli,
	}
}

func (c *EtcdHelper) Release() {
	if c.Client != nil {
		c.Client.Close()
	}
}

// PutValue grants a new lease of ttl seconds and writes key=value under it,
// so the key disappears unless it is written again before the TTL runs out.
func (c *EtcdHelper) PutValue(key string, value string, ttl int64) error {
	ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
	defer cancel()

	// minimum lease TTL is 5 seconds
	resp, err := c.Client.Grant(ctx, ttl)
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return err
	}
	_, err = c.Client.Put(ctx, key, value, clientv3.WithLease(resp.ID))
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return err
	}
	return nil
}

// SetValue writes key=value with no TTL.
func (c *EtcdHelper) SetValue(key string, value string) error {
	ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
	defer cancel()

	_, err := c.Client.Put(ctx, key, value)
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return err
	}
	return nil
}

// GetValue returns every key/value pair whose key starts with key (prefix query).
func (c *EtcdHelper) GetValue(key string) []EtcdData {
	ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
	defer cancel()
	resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix())
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return nil
	}
	var kv_slice []EtcdData
	for _, ev := range resp.Kvs {
		//fmt.Printf("%s : %s\n", ev.Key, ev.Value)
		kv := EtcdData{string(ev.Key), string(ev.Value)}
		kv_slice = append(kv_slice, kv)
	}
	return kv_slice
}

// DelValue deletes key and every key that starts with it (prefix delete).
func (c *EtcdHelper) DelValue(key string) error {
	ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
	defer cancel()
	_, err := c.Client.Delete(ctx, key, clientv3.WithPrefix())
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return err
	}
	return nil
}

// Watch prints every change under the key prefix; it blocks until the watch channel closes.
func (c *EtcdHelper) Watch(key string) {
	rch := c.Client.Watch(context.Background(), key, clientv3.WithPrefix())
	for wresp := range rch {
		for _, ev := range wresp.Events {
			fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}

// Listen returns the watch channel for the key prefix so callers can handle events themselves.
func (c *EtcdHelper) Listen(key string) clientv3.WatchChan {
	return c.Client.Watch(context.Background(), key, clientv3.WithPrefix())
}

5.2 Controller implementation
1. Bringing the controller online and taking it offline
2. The controller periodically sends heartbeats to etcd.
3. The controller watches for agent changes. Items 1-3 together implement service registration and discovery.
4. The controller pushes configurations to etcd, which notifies every agent watching ingress_config for changes.

////////////////////////////////////////////////////////////////////////
type ControllerClient struct {
	Period  time.Duration
	Name    string
	IP      string
	Helper  *EtcdHelper
	StopCha chan int
	//Lock *sync.Mutex
}

func NewControllerClient(name string, host_ip string) *ControllerClient {
	return &ControllerClient{
		Period:  2 * time.Second, // heartbeat interval, well below the 5 s TTL
		Name:    name,
		IP:      host_ip,
		Helper:  NewEtcdHelper(),
		StopCha: make(chan int, 10),
		//Lock: new(sync.Mutex),
	}
}

func (cc *ControllerClient) Init(display bool) {
	// Heartbeat: re-register every Period until OffLine closes StopCha.
	go func() {
		cc.OnLine()
		for {
			select {
			case <-cc.StopCha:
				fmt.Println("online goroutine exited.")
				return
			case <-time.After(cc.Period):
				cc.OnLine()
			}
		}
	}()

	if display {
		// Print every agent change (registration, expiry, deletion).
		go func() {
			watch_chan := cc.Helper.Listen("/ingress/agent")
			for wresp := range watch_chan {
				for _, ev := range wresp.Events {
					fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
				}
			}
		}()
	}
}

func (cc *ControllerClient) OnLine() {
	key := fmt.Sprintf("/ingress/controller/%s", cc.Name)
	//cc.Lock.Lock()
	//defer cc.Lock.Unlock()
	err := cc.Helper.PutValue(key, "1", 5)
	if err != nil {
		fmt.Println(err)
	}
}

func (cc *ControllerClient) OffLine() {
	close(cc.StopCha)
	key := fmt.Sprintf("/ingress/controller/%s", cc.Name)
	//cc.Lock.Lock()
	//defer cc.Lock.Unlock()
	err := cc.Helper.DelValue(key)
	if err != nil {
		fmt.Println(err)
	}
}

func (cc *ControllerClient) GetIngressConfig(env_uuid string, uuid string) []EtcdData {
	//TODO. first save to mysql.
	key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)
	//cc.Lock.Lock()
	//defer cc.Lock.Unlock()
	return cc.Helper.GetValue(key)
}

func (cc *ControllerClient) SetIngressConfig(env_uuid string, uuid string, config string) {
	//TODO. first save to mysql.
	key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)
	//cc.Lock.Lock()
	//defer cc.Lock.Unlock()
	err := cc.Helper.SetValue(key, config)
	if err != nil {
		fmt.Println(err)
	}
}

func (cc *ControllerClient) DelIngressConfig(env_uuid string, uuid string) {
	//TODO. first update to mysql.
	key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)
	//cc.Lock.Lock()
	//defer cc.Lock.Unlock()
	err := cc.Helper.DelValue(key)
	if err != nil {
		fmt.Println(err)
	}
}

5.3 Agent implementation
1. Bringing the agent online and taking it offline
2. The agent periodically sends heartbeats to etcd.
3. The agent watches for controller changes. Items 1-3 together implement service registration and discovery.
4. The agent watches ingress_config for changes and, in real time, sets or updates configurations and deletes removed ones.

////////////////////////////////////////////////////////////////////////
type AgentClient struct {
	LivePeriod       time.Duration
	FirstConfigPerid time.Duration
	SyncConfigPeriod time.Duration

	Name    string
	EnvUUID string
	IP      string
	Helper  *EtcdHelper

	StopCha chan struct{}
}

func NewAgentClient(name string, env_uuid string, host_ip string) *AgentClient {
	return &AgentClient{
		LivePeriod:       2 * time.Second,
		FirstConfigPerid: 3 * time.Second,
		SyncConfigPeriod: 60 * time.Second,

		Name:    name,
		EnvUUID: env_uuid,
		IP:      host_ip,
		Helper:  NewEtcdHelper(),
		StopCha: make(chan struct{}, 1),
	}
}

func (ac *AgentClient) Init(display bool) {
	// I'm still alive, don't kill me.
	go func() {
		ac.OnLine()
		for {
			select {
			case <-ac.StopCha:
				return
			case <-time.After(ac.LivePeriod):
				ac.OnLine()
			}
		}
	}()

	//if display {
	//	go func() {
	//		watch_chan := cc.Helper.Listen("/ingress/agent")
	//		for wresp := range watch_chan {
	//			for _, ev := range wresp.Events {
	//				fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
	//			}
	//		}
	//	}()
	//}

	// First sync after a restart, then periodic syncs.
	//go func() {
	//	time.Sleep(ac.FirstConfigPerid)
	//	ac.SyncIngressConfigs()
	//
	//	for {
	//		select {
	//		case <-ac.StopCha:
	//			return
	//		case <-time.After(ac.SyncConfigPeriod):
	//			ac.SyncIngressConfigs()
	//		}
	//	}
	//}()

	if display {
		// Watch controller changes (TODO: re-watch after an automatic reconnect).
		go func() {
			watch_chan := ac.Helper.Listen("/ingress/controller")
			for wresp := range watch_chan {
				for _, ev := range wresp.Events {
					fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
				}
			}
		}()
	}

	// Watch ingress_config changes in this environment (TODO: re-watch after an automatic reconnect).
	go func() {
		key := fmt.Sprintf("/ingress/ingress_config/%s", ac.EnvUUID)
		watch_chan := ac.Helper.Listen(key)
		for wresp := range watch_chan {
			for _, ev := range wresp.Events {
				fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
				switch ev.Type {
				case clientv3.EventTypePut:
					fmt.Printf("agent=%s SetIngressConfig(%s, %s)\n", ac.Name, ev.Kv.Key, ev.Kv.Value)
					//TODO: SetIngressConfig(key, value)
				case clientv3.EventTypeDelete:
					fmt.Printf("agent=%s DelIngressConfig(%s)\n", ac.Name, ev.Kv.Key)
					//TODO: DelIngressConfig(key)
				}
			}
		}
	}()
}

func (ac *AgentClient) OnLine() {
	key := fmt.Sprintf("/ingress/agent/%s/%s", ac.EnvUUID, ac.Name)
	err := ac.Helper.PutValue(key, fmt.Sprintf(`{"name":"%s", "env_uuid":"%s", "ip":"%s"}`, ac.Name, ac.EnvUUID, ac.IP), 5)
	if err != nil {
		fmt.Println(err)
	}
}

func (ac *AgentClient) OffLine() {
	//ac.StopCha <- 1
	close(ac.StopCha)
	key := fmt.Sprintf("/ingress/agent/%s/%s", ac.EnvUUID, ac.Name)
	err := ac.Helper.DelValue(key)
	if err != nil {
		fmt.Println(err)
	}
}

// UpdateIngressStatus records that this agent has applied config uuid.
// Assumption: the value "1" marks the config as applied on this agent.
func (ac *AgentClient) UpdateIngressStatus(uuid string) {
	key := fmt.Sprintf("/ingress/ingress_config_status/%s/%s/%s", ac.EnvUUID, uuid, ac.Name)
	err := ac.Helper.SetValue(key, "1")
	if err != nil {
		fmt.Println(err)
	}
}

// Call once after the service restarts, then periodically.
func (ac *AgentClient) SyncIngressConfigs() {
	key := fmt.Sprintf("/ingress/ingress_config/%s", ac.EnvUUID)
	kv_slice := ac.Helper.GetValue(key)
	if kv_slice != nil {
		//TODO: ingressConfig.SyncIngressConfigs(kv_slice)
		for _, kv := range kv_slice {
			fmt.Printf("name=%s, key:%s-----value:%s\n", ac.Name, kv.Key, kv.Value)
		}
	}
}

////////////////////////////////////////////////////////////////////////
func main() {
	controller1 := NewControllerClient("dev-7_001", "192.168.0.10")
	controller1.Init(false)
	controller2 := NewControllerClient("dev-8_002", "192.168.0.11")
	controller2.Init(false)
	controller3 := NewControllerClient("dev-8_003", "192.168.0.12")
	controller3.Init(false)

	agent1 := NewAgentClient("dev-1_001", "1", "192.168.0.1")
	agent1.Init(false)
	agent2 := NewAgentClient("dev-2_001", "1", "192.168.0.2")
	agent2.Init(false)
	agent3 := NewAgentClient("dev-3_001", "1", "192.168.0.3")
	agent3.Init(false)
	agent4 := NewAgentClient("dev-4_001", "1", "192.168.0.4")
	agent4.Init(false)
	agent5 := NewAgentClient("dev-5_001", "1", "192.168.0.5")
	agent5.Init(false)
	agent6 := NewAgentClient("dev-6_001", "1", "192.168.0.6")
	agent6.Init(false)
	agent7 := NewAgentClient("dev-7_001", "1", "192.168.0.7")
	agent7.Init(false)
	agent8 := NewAgentClient("dev-8_001", "1", "192.168.0.8")
	agent8.Init(false)
	agent9 := NewAgentClient("dev-9_001", "1", "192.168.0.9")
	agent9.Init(false)
	agent10 := NewAgentClient("dev-10_001", "1", "192.168.0.10")
	agent10.Init(false)

	time.Sleep(time.Second * 1)
	controller3.SetIngressConfig("1", "0001", `{"config":"helloworld"}`)
	controller3.DelIngressConfig("1", "0001")
	controller3.SetIngressConfig("1", "0002", `{"config":"helloworld"}`)
	controller3.DelIngressConfig("1", "0002")
	controller3.SetIngressConfig("1", "0003", `{"config":"helloworld"}`)
	controller3.DelIngressConfig("1", "0003")
	controller3.SetIngressConfig("1", "0004", `{"config":"helloworld"}`)
	controller3.DelIngressConfig("1", "0004")
	controller3.SetIngressConfig("1", "0005", `{"config":"helloworld"}`)
	controller3.DelIngressConfig("1", "0005")

	// block forever
	forever := make(chan struct{})
	<-forever
}
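Section 4.3 has each agent reload all configs after a restart and resync periodically, while `SyncIngressConfigs` above leaves the actual application as a TODO. One way to fill that gap, sketched here as an assumption rather than the author's code, is to diff the full etcd snapshot against what the agent has applied locally: anything new or changed must be set, and anything missing from the snapshot must be deleted (for example, a DELETE event missed while the watch was down). `diffConfigs` and the `local` map are hypothetical, the config values are made up, and `EtcdData` mirrors the type from section 5.1 so the block compiles on its own.

```go
package main

import (
	"fmt"
	"sort"
)

// EtcdData mirrors the key/value type from section 5.1 so this block compiles alone.
type EtcdData struct {
	Key   string
	Value string
}

// diffConfigs (hypothetical) compares a full etcd snapshot, the desired state,
// with the configs the agent has applied locally (key -> value) and returns the
// entries to set (new or changed) and the keys to delete (no longer in etcd).
func diffConfigs(snapshot []EtcdData, local map[string]string) ([]EtcdData, []string) {
	var toSet []EtcdData
	desired := make(map[string]bool, len(snapshot))
	for _, kv := range snapshot {
		desired[kv.Key] = true
		if cur, ok := local[kv.Key]; !ok || cur != kv.Value {
			toSet = append(toSet, kv)
		}
	}
	var toDelete []string
	for k := range local {
		if !desired[k] {
			toDelete = append(toDelete, k)
		}
	}
	sort.Strings(toDelete) // map iteration order is random; sort for stable output
	return toSet, toDelete
}

func main() {
	// What GetValue("/ingress/ingress_config/1") might return after a restart.
	snapshot := []EtcdData{
		{"/ingress/ingress_config/1/0001", `{"config":"v2"}`},
		{"/ingress/ingress_config/1/0003", `{"config":"new"}`},
	}
	// What the agent had applied before it went down.
	local := map[string]string{
		"/ingress/ingress_config/1/0001": `{"config":"v1"}`,
		"/ingress/ingress_config/1/0002": `{"config":"old"}`,
	}
	toSet, toDelete := diffConfigs(snapshot, local)
	for _, kv := range toSet {
		fmt.Println("set", kv.Key)
	}
	fmt.Println("delete", toDelete)
	// set /ingress/ingress_config/1/0001
	// set /ingress/ingress_config/1/0003
	// delete [/ingress/ingress_config/1/0002]
}
```

Because the diff works from a full snapshot, the same function covers both the one-off reload after a restart and the periodic resync. If status entries share the ingress_config prefix (as in the layout of section 4.2), they would need to be filtered out of the snapshot first.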