1. Setting up the etcd environment

I will be using etcd v3, so the first step is a local single-node etcd environment.

    docker pull xieyanze/etcd3:latest
    docker run --name etcd-v3.0.9 -d -v /tmp:/data \
        -p 2379:2379 -p 2380:2380 -p 4001:4001 -p 7001:7001 xieyanze/etcd3:latest
    docker exec -it etcd-v3.0.9 sh

etcdctl still defaults to the v2 API. Setting the environment variable ETCDCTL_API=3 switches it to v3.

    export ETCDCTL_API=3
    etcdctl put /test/ok 11
    etcdctl put /test/ok 22
    etcdctl del /test/gg
    # delete every key with the /test prefix
    etcdctl del /test --prefix
    etcdctl get /test/ok
    # prefix query
    etcdctl get /test/ok --prefix

2. Logical structure of the software

  1. k8s master cluster
     dev-7
     dev-8
  2. k8s slave cluster 1 (env1)
     dev-1
     dev-2
     dev-3
  3. k8s slave cluster 2 (env2)
     dev-4
     dev-5
     dev-6

3. Service registration and discovery for controller and agent
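How it works. Note: with etcd v3 (v3.0.9 here), the minimum TTL for a key/value lease is 5 seconds.

  1. Every 2 seconds, each service sends a heartbeat to etcd to prove it is still alive.
  2. When a service exits, it deletes its own key from etcd. If it cannot, the key expires once the TTL runs out and the service goes offline automatically.
  3. When the controller needs the state of the agents, a GET on [ingress/agent/${env_uuid}/] returns the agents currently online.
  4. When an agent needs the state of the controllers, a GET on [ingress/controller] returns the controllers currently online.

controller directory:

    ingress/controller/dev7_xxx
    ingress/controller/dev8_xxx

agent directory:

    ingress/agent/env1/dev1_xxx
    ingress/agent/env1/dev2_xxx
    ingress/agent/env1/dev3_yyy
    ingress/agent/env2/dev4_xxx
    ingress/agent/env2/dev5_xxx
    ingress/agent/env2/dev6_yyy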
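The heartbeat-plus-TTL rule can be tried out without a running etcd. The sketch below is only an in-memory stand-in: Registry, Heartbeat, Remove and Online are hypothetical names, not part of the etcd client, and time is passed in as plain seconds so the behaviour is deterministic. It assumes a 5-second TTL and a 2-second heartbeat, as described above.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Registry is a hypothetical in-memory stand-in for etcd keys bound to a lease.
type Registry struct {
	ttl     int64            // lease length in seconds
	expires map[string]int64 // key -> second at which the key expires
}

func NewRegistry(ttl int64) *Registry {
	return &Registry{ttl: ttl, expires: make(map[string]int64)}
}

// Heartbeat (re)registers key at time now, pushing its expiry to now+ttl.
func (r *Registry) Heartbeat(key string, now int64) {
	r.expires[key] = now + r.ttl
}

// Remove deletes key immediately, as a service does on a clean exit.
func (r *Registry) Remove(key string) {
	delete(r.expires, key)
}

// Online returns the sorted keys under prefix that have not expired at time now.
func (r *Registry) Online(prefix string, now int64) []string {
	var keys []string
	for key, exp := range r.expires {
		if strings.HasPrefix(key, prefix) && now < exp {
			keys = append(keys, key)
		}
	}
	sort.Strings(keys)
	return keys
}

func main() {
	r := NewRegistry(5)
	r.Heartbeat("ingress/agent/env1/dev1_xxx", 0)
	r.Heartbeat("ingress/agent/env1/dev2_xxx", 0)
	// dev1 keeps sending a heartbeat every 2 seconds; dev2 has crashed.
	r.Heartbeat("ingress/agent/env1/dev1_xxx", 2)
	r.Heartbeat("ingress/agent/env1/dev1_xxx", 4)
	fmt.Println(r.Online("ingress/agent/env1/", 4)) // both keys are still listed
	fmt.Println(r.Online("ingress/agent/env1/", 6)) // only dev1_xxx remains
}
```

With a 2-second heartbeat and a 5-second TTL, a service can miss one heartbeat and still stay registered, while a crashed service disappears from the listing at most 5 seconds after its last heartbeat.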
4. Implementing the business logic

4.1 controller side:

  1. Clients call the controller's RESTful API. The controller writes directly to etcd and also writes a copy to MySQL.
  2. If the controller cares about agent changes, it only needs to watch the ingress/agent directory.
  3. The controller is stateless: its instances do not need to synchronize data with each other, so the number of instances can be scaled freely.
  4. If a controller goes down, on restart it loads the data from MySQL and syncs it back into etcd.

4.2 The controller needs to know the execution status of each rule

etcd directory:

    ingress/ingress_config/env1/${config_uuid1}/status/dev1_xxx
    ingress/ingress_config/env1/${config_uuid1}/status/dev2_xxx
    ingress/ingress_config/env1/${config_uuid1}/status/dev3_xxx
    ingress/ingress_config/env1/${config_uuid2}/status/dev1_xxx
    ingress/ingress_config/env1/${config_uuid2}/status/dev2_xxx
    ingress/ingress_config/env1/${config_uuid2}/status/dev3_xxx
    ingress/ingress_config/env2/${config_uuid3}/status/dev4_xxx
    ingress/ingress_config/env2/${config_uuid3}/status/dev5_xxx
    ingress/ingress_config/env2/${config_uuid3}/status/dev6_xxx
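The status keys listed above let the controller decide whether a rule has reached every agent: take the agents currently online in the environment and see which of them have not yet written a status key for the config. A minimal sketch of that comparison follows, using only the standard library. PendingAgents and lastSegment are hypothetical helper names, and "cfg1" is a placeholder for a config uuid. In the real program both key lists would come from prefix GETs against etcd.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// lastSegment returns the part of an etcd key after the final "/".
func lastSegment(key string) string {
	return key[strings.LastIndex(key, "/")+1:]
}

// PendingAgents returns the names of online agents that have not yet
// written a status key for the config, sorted for stable output.
func PendingAgents(agentKeys, statusKeys []string) []string {
	reported := make(map[string]bool, len(statusKeys))
	for _, k := range statusKeys {
		reported[lastSegment(k)] = true
	}
	var pending []string
	for _, k := range agentKeys {
		if name := lastSegment(k); !reported[name] {
			pending = append(pending, name)
		}
	}
	sort.Strings(pending)
	return pending
}

func main() {
	// Keys as a prefix GET on ingress/agent/env1 might return them.
	agents := []string{
		"ingress/agent/env1/dev1_xxx",
		"ingress/agent/env1/dev2_xxx",
		"ingress/agent/env1/dev3_yyy",
	}
	// Keys under the status directory of one config ("cfg1" is a placeholder).
	status := []string{
		"ingress/ingress_config/env1/cfg1/status/dev1_xxx",
		"ingress/ingress_config/env1/cfg1/status/dev3_yyy",
	}
	pending := PendingAgents(agents, status)
	fmt.Println("pending:", pending, "all done:", len(pending) == 0)
}
```

An empty result means every online agent has reported, so the rule counts as fully applied.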
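Each agent writes its execution status directly under the status directory of the config.
To check a rule, first fetch the agent list under ingress/agent/env1, then compare it with the list of completion reports under ingress/ingress_config/env1/${config_uuid1}/status. If every agent appears there, the rule has been applied everywhere.

4.3 agent side:

  1. Agents in each cluster use etcd's watch feature to learn of every data change (create, delete, update) as soon as it happens.
  2. Agents in each cluster fetch the list for their own environment every 3 minutes and synchronize against it.
  3. If an agent goes down, on restart it loads every ingress_config in etcd once.

5. Code examples

5.1 Wrapping etcd clientv3

  1. Connection management, with TLS support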
  2. Create, delete, and query, with support for values that expire automatically
  3. Watch a directory or key for value changes (PUT, DELETE)

    package main

    import (
        "fmt"
        "time"

        // "github.com/coreos/etcd/pkg/transport"
        "github.com/coreos/etcd/clientv3"
        "golang.org/x/net/context"
        // "sync"
    )

    // EtcdData is one key/value pair read back from etcd.
    type EtcdData struct {
        Key   string
        Value string
    }

    // EtcdHelper wraps a clientv3.Client with a per-request timeout.
    type EtcdHelper struct {
        RequestTimeout time.Duration
        Client         *clientv3.Client
    }

    // NewEtcdHelper connects to the local etcd; it returns nil if the connection fails.
    func NewEtcdHelper() *EtcdHelper {
        //tlsInfo := transport.TLSInfo{
        //    CertFile:      "/tmp/test-certs/test-name-1.pem",
        //    KeyFile:       "/tmp/test-certs/test-name-1-key.pem",
        //    TrustedCAFile: "/tmp/test-certs/trusted-ca.pem",
        //}
        //tlsInfo := transport.TLSInfo{
        //    CertFile: "./tls/apiserver.crt",
        //    KeyFile:  "./tls/apiserver.key",
        //}
        //tlsConfig, err := tlsInfo.ClientConfig()
        //if err != nil {
        //    fmt.Printf("%s", err.Error())
        //    return nil
        //}
        //cli, err := clientv3.New(clientv3.Config{
        //    Endpoints:   []string{"dev-7:2379"},
        //    DialTimeout: 3 * time.Second,
        //    TLS:         tlsConfig,
        //})
        cli, err := clientv3.New(clientv3.Config{
            Endpoints: []string{"127.0.0.1:2379"},
            //Endpoints: []string{"http://dev-7:2379"},
            DialTimeout: 3 * time.Second,
        })
        if err != nil {
            fmt.Printf("%s", err.Error())
            return nil
        }
        return &EtcdHelper{
            RequestTimeout: 5 * time.Second,
            Client:         cli,
        }
    }

    // Release closes the underlying client connection.
    func (c *EtcdHelper) Release() {
        if c.Client != nil {
            c.Client.Close()
        }
    }

    // PutValue writes key with a lease, so the key expires after ttl seconds.
    func (c *EtcdHelper) PutValue(key string, value string, ttl int64) error {
        ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
        defer cancel()
        // minimum lease TTL is 5-second
        resp, err := c.Client.Grant(ctx, ttl)
        if err != nil {
            fmt.Printf("%s\n", err.Error())
            return err
        }
        _, err = c.Client.Put(ctx, key, value, clientv3.WithLease(resp.ID))
        if err != nil {
            fmt.Printf("%s\n", err.Error())
            return err
        }
        return nil
    }

    // SetValue writes key without a lease; the key stays until it is deleted.
    func (c *EtcdHelper) SetValue(key string, value string) error {
        ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
        defer cancel()
        _, err := c.Client.Put(ctx, key, value)
        if err != nil {
            fmt.Printf("%s\n", err.Error())
            return err
        }
        return nil
    }

    // GetValue returns every key/value pair whose key starts with key.
    func (c *EtcdHelper) GetValue(key string) []EtcdData {
        ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
        defer cancel()
        resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix())
        if err != nil {
            fmt.Printf("%s\n", err.Error())
            return nil
        }
        var kv_slice []EtcdData
        for _, ev := range resp.Kvs {
            //fmt.Printf("%s : %s\n", ev.Key, ev.Value)
            kv := EtcdData{string(ev.Key), string(ev.Value)}
            kv_slice = append(kv_slice, kv)
        }
        return kv_slice
    }

    // DelValue deletes every key that starts with key.
    func (c *EtcdHelper) DelValue(key string) error {
        ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
        defer cancel()
        _, err := c.Client.Delete(ctx, key, clientv3.WithPrefix())
        if err != nil {
            fmt.Printf("%s\n", err.Error())
            return err
        }
        return nil
    }

    // Watch blocks and prints every change under the key prefix.
    func (c *EtcdHelper) Watch(key string) {
        rch := c.Client.Watch(context.Background(), key, clientv3.WithPrefix())
        for wresp := range rch {
            for _, ev := range wresp.Events {
                fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
            }
        }
    }

    // Listen returns the watch channel for the key prefix so the caller can consume events.
    func (c *EtcdHelper) Listen(key string) clientv3.WatchChan {
        return c.Client.Watch(context.Background(), key, clientv3.WithPrefix())
    }

5.2 Controller implementation

  1. Controller online/offline
  2. The controller sends a periodic heartbeat to etcd.
  3. The controller watches for agent changes. Items 1-3 together provide service registration and discovery.
  4. The controller publishes configuration to etcd, which notifies every agent watching ingress_config.

    ////////////////////////////////////////////////////////////////////////////////
    type ControllerClient struct {
        Period  time.Duration // heartbeat interval
        Name    string
        IP      string
        Helper  *EtcdHelper
        StopCha chan int
        //Lock *sync.Mutex
    }

    func NewControllerClient(name string, host_ip string) *ControllerClient {
        return &ControllerClient{
            Period:  2 * time.Second,
            Name:    name,
            IP:      host_ip,
            Helper:  NewEtcdHelper(),
            StopCha: make(chan int, 10),
            //Lock: new(sync.Mutex),
        }
    }

    // Init starts the heartbeat goroutine and, if display is set, prints agent changes.
    func (cc *ControllerClient) Init(display bool) {
        go func() {
            cc.OnLine()
            for {
                select {
                case <-cc.StopCha:
                    fmt.Println("online goroutine exited.")
                    return
                case <-time.After(cc.Period):
                    cc.OnLine()
                }
            }
        }()
        if display {
            go func() {
                watch_chan := cc.Helper.Listen("/ingress/agent")
                for wresp := range watch_chan {
                    for _, ev := range wresp.Events {
                        fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
                    }
                }
            }()
        }
    }

    // OnLine registers the controller (or refreshes it) with a 5-second TTL.
    func (cc *ControllerClient) OnLine() {
        key := fmt.Sprintf("/ingress/controller/%s", cc.Name)
        //cc.Lock.Lock()
        //defer cc.Lock.Unlock()
        err := cc.Helper.PutValue(key, "1", 5)
        if err != nil {
            fmt.Println(err)
        }
    }

    // OffLine stops the heartbeat and removes the controller's key.
    func (cc *ControllerClient) OffLine() {
        close(cc.StopCha)
        key := fmt.Sprintf("/ingress/controller/%s", cc.Name)
        //cc.Lock.Lock()
        //defer cc.Lock.Unlock()
        err := cc.Helper.DelValue(key)
        if err != nil {
            fmt.Println(err)
        }
    }

    func (cc *ControllerClient) GetIngressConfig(env_uuid string, uuid string) []EtcdData {
        //TODO. first save to mysql.
        key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)
        //cc.Lock.Lock()
        //defer cc.Lock.Unlock()
        return cc.Helper.GetValue(key)
    }

    func (cc *ControllerClient) SetIngressConfig(env_uuid string, uuid string, config string) {
        //TODO. first save to mysql.
        key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)
        //cc.Lock.Lock()
        //defer cc.Lock.Unlock()
        err := cc.Helper.SetValue(key, config)
        if err != nil {
            fmt.Println(err)
        }
    }

    func (cc *ControllerClient) DelIngressConfig(env_uuid string, uuid string) {
        //TODO. first update to mysql.
        key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)
        //cc.Lock.Lock()
        //defer cc.Lock.Unlock()
        err := cc.Helper.DelValue(key)
        if err != nil {
            fmt.Println(err)
        }
    }

5.3 Agent implementation

  1. Agent online/offline
  2. The agent sends a periodic heartbeat to etcd.
  3. The agent watches for controller changes. Items 1-3 together provide service registration and discovery.
  4. The agent watches ingress_config for changes and applies configuration updates, sets, and deletes in real time.

    ////////////////////////////////////////////////////////////////////////////////
    type AgentClient struct {
        LivePeriod       time.Duration // heartbeat interval
        FirstConfigPerid time.Duration // delay before the first sync after a restart
        SyncConfigPeriod time.Duration // interval between periodic syncs
        Name             string
        EnvUUID          string
        IP               string
        Helper           *EtcdHelper
        StopCha          chan struct{}
    }

    func NewAgentClient(name string, env_uuid string, host_ip string) *AgentClient {
        return &AgentClient{
            LivePeriod:       2 * time.Second,
            FirstConfigPerid: 3 * time.Second,
            SyncConfigPeriod: 60 * time.Second,
            Name:             name,
            EnvUUID:          env_uuid,
            IP:               host_ip,
            Helper:           NewEtcdHelper(),
            StopCha:          make(chan struct{}, 1),
        }
    }

    func (ac *AgentClient) Init(display bool) {
        // Heartbeat: I'm still alive, don't take me offline.
        go func() {
            ac.OnLine()
            for {
                select {
                case <-ac.StopCha:
                    return
                case <-time.After(ac.LivePeriod):
                    ac.OnLine()
                }
            }
        }()
        //if display {
        //    go func() {
        //        watch_chan := cc.Helper.Listen("/ingress/agent")
        //        for wresp := range watch_chan {
        //            for _, ev := range wresp.Events {
        //                fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        //            }
        //        }
        //    }()
        //}
        // First sync after a restart, then periodic sync.
        //go func() {
        //    time.Sleep(ac.FirstConfigPerid)
        //    ac.SyncIngressConfigs()
        //    for {
        //        select {
        //        case <-ac.StopCha:
        //            return
        //        case <-time.After(ac.SyncConfigPeriod):
        //            ac.SyncIngressConfigs()
        //        }
        //    }
        //}()
        if display {
            // Watch controller changes (TODO: re-watch after an automatic reconnect).
            go func() {
                watch_chan := ac.Helper.Listen("/ingress/controller")
                for wresp := range watch_chan {
                    for _, ev := range wresp.Events {
                        fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
                    }
                }
            }()
        }
        // Watch ingress_config changes for this environment (TODO: re-watch after an automatic reconnect).
        go func() {
            key := fmt.Sprintf("/ingress/ingress_config/%s", ac.EnvUUID)
            watch_chan := ac.Helper.Listen(key)
            for wresp := range watch_chan {
                for _, ev := range wresp.Events {
                    fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
                    switch ev.Type.String() {
                    case "PUT":
                        fmt.Printf("agent=%s SetIngressConfig(%s, %s)\n", ac.Name, ev.Kv.Key, ev.Kv.Value)
                        //TODO: SetIngressConfig(key, value)
                    case "DELETE":
                        fmt.Printf("agent=%s DelIngressConfig(%s)\n", ac.Name, ev.Kv.Key)
                        //TODO: DelIngressConfig(key)
                    }
                }
            }
        }()
    }

    // OnLine registers the agent (or refreshes it) with a 5-second TTL.
    func (ac *AgentClient) OnLine() {
        key := fmt.Sprintf("/ingress/agent/%s/%s", ac.EnvUUID, ac.Name)
        err := ac.Helper.PutValue(key, fmt.Sprintf(`{"name":"%s", "env_uuid":"%s", "ip":"%s"}`, ac.Name, ac.EnvUUID, ac.IP), 5)
        if err != nil {
            fmt.Println(err)
        }
    }

    // OffLine stops the heartbeat and removes the agent's key.
    func (ac *AgentClient) OffLine() {
        //ac.StopCha <- 1
        close(ac.StopCha)
        key := fmt.Sprintf("/ingress/agent/%s/%s", ac.EnvUUID, ac.Name)
        err := ac.Helper.DelValue(key)
        if err != nil {
            fmt.Println(err)
        }
    }

    // UpdateIngressStatus records that this agent has applied the config identified by uuid.
    func (ac *AgentClient) UpdateIngressStatus(uuid string) {
        key := fmt.Sprintf("/ingress/ingress_config_status/%s/%s/%s", ac.EnvUUID, uuid, ac.Name)
        err := ac.Helper.SetValue(key, "1")
        if err != nil {
            fmt.Println(err)
        }
    }

    // SyncIngressConfigs is called once after the service restarts, and then on a timer.
    func (ac *AgentClient) SyncIngressConfigs() {
        key := fmt.Sprintf("/ingress/ingress_config/%s", ac.EnvUUID)
        kv_slice := ac.Helper.GetValue(key)
        if kv_slice != nil {
            //TODO: ingressConfig.SyncIngressConfigs(kv_slice)
            for _, kv := range kv_slice {
                fmt.Printf("name=%s, key:%s-----value:%s\n", ac.Name, kv.Key, kv.Value)
            }
        }
    }

    ////////////////////////////////////////////////////////////////////////////////
    func main() {
        controller1 := NewControllerClient("dev-7_001", "192.168.0.10")
        controller1.Init(false)
        controller2 := NewControllerClient("dev-8_002", "192.168.0.11")
        controller2.Init(false)
        controller3 := NewControllerClient("dev-8_003", "192.168.0.12")
        controller3.Init(false)

        agent1 := NewAgentClient("dev-1_001", "1", "192.168.0.1")
        agent1.Init(false)
        agent2 := NewAgentClient("dev-2_001", "1", "192.168.0.2")
        agent2.Init(false)
        agent3 := NewAgentClient("dev-3_001", "1", "192.168.0.3")
        agent3.Init(false)
        agent4 := NewAgentClient("dev-4_001", "1", "192.168.0.4")
        agent4.Init(false)
        agent5 := NewAgentClient("dev-5_001", "1", "192.168.0.5")
        agent5.Init(false)
        agent6 := NewAgentClient("dev-6_001", "1", "192.168.0.6")
        agent6.Init(false)
        agent7 := NewAgentClient("dev-7_001", "1", "192.168.0.7")
        agent7.Init(false)
        agent8 := NewAgentClient("dev-8_001", "1", "192.168.0.8")
        agent8.Init(false)
        agent9 := NewAgentClient("dev-9_001", "1", "192.168.0.9")
        agent9.Init(false)
        agent10 := NewAgentClient("dev-10_001", "1", "192.168.0.10")
        agent10.Init(false)

        time.Sleep(time.Second * 1)

        controller3.SetIngressConfig("1", "0001", `{"config":"helloworld"}`)
        controller3.DelIngressConfig("1", "0001")
        controller3.SetIngressConfig("1", "0002", `{"config":"helloworld"}`)
        controller3.DelIngressConfig("1", "0002")
        controller3.SetIngressConfig("1", "0003", `{"config":"helloworld"}`)
        controller3.DelIngressConfig("1", "0003")
        controller3.SetIngressConfig("1", "0004", `{"config":"helloworld"}`)
        controller3.DelIngressConfig("1", "0004")
        controller3.SetIngressConfig("1", "0005", `{"config":"helloworld"}`)
        controller3.DelIngressConfig("1", "0005")

        // Block forever so the heartbeat and watch goroutines keep running.
        forever := make(chan struct{})
        <-forever
    }
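SyncIngressConfigs above only prints what it reads from etcd. One possible way to fill in its TODO is a reconcile step that compares the configs stored in etcd with what the agent has already applied locally. The sketch below is an assumption about how that could look, not code from the original program: Plan and Reconcile are hypothetical names, and both inputs are plain maps from config key to config value.

```go
package main

import (
	"fmt"
	"sort"
)

// Plan lists the config keys an agent should (re)apply and the ones it should remove.
type Plan struct {
	Apply  []string
	Remove []string
}

// Reconcile compares the desired configs (as read from etcd) with the configs
// the agent has applied locally and returns the work needed to converge.
func Reconcile(desired, applied map[string]string) Plan {
	var p Plan
	for key, val := range desired {
		// New keys and keys whose value changed both need to be applied.
		if old, ok := applied[key]; !ok || old != val {
			p.Apply = append(p.Apply, key)
		}
	}
	for key := range applied {
		// Keys that are no longer in etcd were deleted while the agent was not watching.
		if _, ok := desired[key]; !ok {
			p.Remove = append(p.Remove, key)
		}
	}
	sort.Strings(p.Apply)
	sort.Strings(p.Remove)
	return p
}

func main() {
	desired := map[string]string{
		"/ingress/ingress_config/1/0001": `{"config":"helloworld"}`,
		"/ingress/ingress_config/1/0002": `{"config":"changed"}`,
	}
	applied := map[string]string{
		"/ingress/ingress_config/1/0002": `{"config":"helloworld"}`,
		"/ingress/ingress_config/1/0003": `{"config":"helloworld"}`,
	}
	plan := Reconcile(desired, applied)
	fmt.Println("apply: ", plan.Apply)
	fmt.Println("remove:", plan.Remove)
}
```

Running the same step after a restart and on the periodic timer catches any change that the watch missed, for example while the agent was disconnected.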