Using the distributed component etcd - a 睿云智合 tech talk

Posted on 2018-11-14 18:22:40


1. Setting up the etcd environment

I will be using etcd v3, so the first step is to bring up a single-node etcd environment locally.

docker pull xieyanze/etcd3:latest
docker run --name etcd-v3.0.9 -d -v /tmp:/data \
          -p 2379:2379 -p 2380:2380 -p 4001:4001 -p 7001:7001 xieyanze/etcd3:latest

docker exec -it etcd-v3.0.9 sh
# etcdctl still defaults to the v2 API here; set the environment variable ETCDCTL_API=3 to switch to v3.
export ETCDCTL_API=3
etcdctl put /test/ok 11
etcdctl put /test/ok 22
etcdctl del  /test/gg
# delete every key that has the /test prefix
etcdctl del  /test --prefix
etcdctl get /test/ok
# prefix query
etcdctl get /test/ok --prefix
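
One thing to keep in mind with --prefix: the match is a plain string prefix, so /test also covers a key such as /testing/other, while /test/ does not. The Go sketch below only imitates that matching rule in memory. MatchPrefix and the /testing/other key are made up for illustration and are not part of etcd or etcdctl.

```go
package main

import (
	"fmt"
	"strings"
)

// MatchPrefix returns the keys a prefix query would select: every key that
// starts with prefix, compared as a plain string.
func MatchPrefix(keys []string, prefix string) []string {
	var out []string
	for _, k := range keys {
		if strings.HasPrefix(k, prefix) {
			out = append(out, k)
		}
	}
	return out
}

func main() {
	keys := []string{"/test/ok", "/test/gg", "/testing/other"}
	// Without the trailing slash the unrelated /testing/other key is selected too.
	fmt.Println(MatchPrefix(keys, "/test"))
	// With the trailing slash only the children of /test are selected.
	fmt.Println(MatchPrefix(keys, "/test/"))
}
```

Ending a directory-style prefix with a slash is a cheap way to avoid deleting or watching more than intended.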

2. Software logical structure


                               

1. k8s master cluster
dev-7
dev-8

2. k8s slave cluster 1 env1
dev-1
dev-2
dev-3

3. k8s slave cluster 2 env2
dev-4
dev-5
dev-6

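3. Controller and agent service registration and discovery


How it works:

Note: in etcd v3, the minimum TTL for a key/value is 5 seconds.
1. Every 2 seconds, each service sends a heartbeat to etcd to prove it is still alive.
2. When a service exits, it deletes its own etcd key. Otherwise the key disappears once the TTL expires and the service goes offline automatically.
3. When the controller needs the agents' status, a GET on [ingress/agent/${env_uuid}/] returns the agents that are currently online.
4. When an agent needs the controllers' status, a GET on [ingress/controller] returns the controllers that are currently online.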


                               

Controller directory

Key | Value | TTL (seconds)
ingress/controller/dev7_xxx | {"ip":xxx} | 5
ingress/controller/dev8_xxx | {"ip":xxx} | 5

Agent directory

Key | Value | TTL (seconds)
ingress/agent/env1/dev1_xxx | {"ip":xxx} | 5
ingress/agent/env1/dev2_xxx | {"ip":xxx} | 5
ingress/agent/env1/dev3_yyy | {"ip":xxx} | 5
ingress/agent/env2/dev4_xxx | {"ip":xxx} | 5
ingress/agent/env2/dev5_xxx | {"ip":xxx} | 5
ingress/agent/env2/dev6_yyy | {"ip":xxx} | 5
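
The key layout in these tables is regular enough to build and parse with small helpers. The sketch below is only an illustration: ControllerKey, AgentKey and ParseAgentKey are hypothetical names, and accepting an optional leading slash is my assumption, made because the Go code in section 5 writes the same keys with a leading slash.

```go
package main

import (
	"fmt"
	"strings"
)

// ControllerKey builds the registration key of a controller instance.
func ControllerKey(name string) string {
	return "ingress/controller/" + name
}

// AgentKey builds the registration key of an agent in the given environment.
func AgentKey(env, name string) string {
	return "ingress/agent/" + env + "/" + name
}

// ParseAgentKey splits an agent key back into environment and agent name.
// A single leading slash is tolerated. ok is false for any other shape.
func ParseAgentKey(key string) (env, name string, ok bool) {
	parts := strings.Split(strings.TrimPrefix(key, "/"), "/")
	if len(parts) != 4 || parts[0] != "ingress" || parts[1] != "agent" {
		return "", "", false
	}
	if parts[2] == "" || parts[3] == "" {
		return "", "", false
	}
	return parts[2], parts[3], true
}

func main() {
	key := AgentKey("env1", "dev1_xxx")
	fmt.Println(key)
	env, name, ok := ParseAgentKey("/" + key)
	fmt.Println(env, name, ok)
	_, _, ok = ParseAgentKey(ControllerKey("dev7_xxx"))
	fmt.Println(ok)
}
```

A parser like this is handy in a watch handler, where only the raw key of the changed entry is available.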

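4. Implementing the business logic

4.1 controller side:

1. Clients call the controller's RESTful API. The controller writes directly to etcd and also writes a copy to MySQL.

2. If the controller cares about agent changes, it only needs to watch the ingress/agent directory.

3. The controller is stateless. Nothing has to be synchronized between instances, so it can be scaled to any number of instances.

4. If a controller goes down, on restart it loads the data from the MySQL database and syncs it into etcd.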
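
Points 1 and 4 together describe a write path with a durable copy plus a restore step. Below is a rough sketch of that idea under stated assumptions: a plain map stands in for MySQL, memKV stands in for etcd, and KV, ConfigStore, Save and Restore are hypothetical names. Writing the durable copy first is my choice here; the text above only says both writes happen.

```go
package main

import (
	"errors"
	"fmt"
)

// KV is a hypothetical, minimal view of the etcd side: just Put.
type KV interface {
	Put(key, value string) error
}

// memKV is an in-memory KV used only in this sketch.
type memKV map[string]string

func (m memKV) Put(key, value string) error {
	m[key] = value
	return nil
}

// ConfigStore keeps a durable copy (a map standing in for MySQL) next to the KV.
type ConfigStore struct {
	durable map[string]string
	kv      KV
}

func NewConfigStore(kv KV) *ConfigStore {
	return &ConfigStore{durable: make(map[string]string), kv: kv}
}

// Save records the config durably first, then publishes it to the KV.
func (s *ConfigStore) Save(key, value string) error {
	if key == "" {
		return errors.New("empty key")
	}
	s.durable[key] = value
	return s.kv.Put(key, value)
}

// Restore republishes every durable record into kv, as a restarted controller
// would, and returns how many records were written.
func (s *ConfigStore) Restore(kv KV) (int, error) {
	n := 0
	for k, v := range s.durable {
		if err := kv.Put(k, v); err != nil {
			return n, err
		}
		n++
	}
	s.kv = kv
	return n, nil
}

func main() {
	store := NewConfigStore(memKV{})
	if err := store.Save("ingress/ingress_config/env1/0001", `{"config":"helloworld"}`); err != nil {
		fmt.Println("save failed:", err)
		return
	}
	fresh := memKV{} // pretend etcd came back empty
	n, err := store.Restore(fresh)
	fmt.Println(n, err, fresh["ingress/ingress_config/env1/0001"])
}
```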

4.2 The controller needs to know the rule execution status


                               

etcd directory

Key | Value | TTL (seconds)
ingress/ingress_config/env1/${config_uuid1}/status/dev1_xxx | 1 | 5
ingress/ingress_config/env1/${config_uuid1}/status/dev2_xxx | 1 | 5
ingress/ingress_config/env1/${config_uuid1}/status/dev3_xxx | 1 | 5
ingress/ingress_config/env1/${config_uuid2}/status/dev1_xxx | 1 | 5
ingress/ingress_config/env1/${config_uuid2}/status/dev2_xxx | 1 | 5
ingress/ingress_config/env1/${config_uuid2}/status/dev3_xxx | 1 | 5
ingress/ingress_config/env2/${config_uuid3}/status/dev4_xxx | 1 | 5
ingress/ingress_config/env2/${config_uuid3}/status/dev5_xxx | 1 | 5
ingress/ingress_config/env2/${config_uuid3}/status/dev6_xxx | 1 | 5

Each agent writes its execution status directly under the status directory of the config.
To check a rule, first get the list of agents under ingress/agent/env1, then compare it with the list of completion reports under ingress/ingress_config/env1/${config_uuid1}/status. If every agent appears in both lists, the rule has been applied successfully everywhere.
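
The comparison just described boils down to a set difference on the last path element of each key. Here is a minimal sketch, assuming both key lists have already been read from etcd; MissingReports is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"path"
	"sort"
)

// MissingReports returns, sorted, the names of online agents that have no
// status key yet. An empty result means every online agent has reported.
func MissingReports(agentKeys, statusKeys []string) []string {
	reported := make(map[string]bool, len(statusKeys))
	for _, k := range statusKeys {
		reported[path.Base(k)] = true // the last path element is the agent name
	}
	var missing []string
	for _, k := range agentKeys {
		if name := path.Base(k); !reported[name] {
			missing = append(missing, name)
		}
	}
	sort.Strings(missing)
	return missing
}

func main() {
	agents := []string{
		"ingress/agent/env1/dev1_xxx",
		"ingress/agent/env1/dev2_xxx",
		"ingress/agent/env1/dev3_xxx",
	}
	status := []string{
		"ingress/ingress_config/env1/uuid1/status/dev1_xxx",
		"ingress/ingress_config/env1/uuid1/status/dev3_xxx",
	}
	missing := MissingReports(agents, status)
	fmt.Println("all applied:", len(missing) == 0, "missing:", missing)
}
```

Note that with no agents online the result is also empty, so a caller may want to treat an empty agent list as a separate case.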

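4.3 agent side:

1. Agents in the different clusters use etcd's watch feature to learn about every data change (create, delete, update) as soon as it happens.

2. Agents in the different clusters fetch the config list of their own environment every 3 minutes and reconcile against it.

3. If an agent goes down, on restart it loads all ingress_config entries from etcd once.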
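
The periodic sync in point 2 and the reload in point 3 both come down to comparing what etcd says with what the agent has applied locally. The following sketch shows that comparison, assuming both sides are available as key-to-config maps; Reconcile is a hypothetical name and the actual apply and delete steps are left out.

```go
package main

import (
	"fmt"
	"sort"
)

// Reconcile compares the configs stored in etcd (desired) with the configs
// the agent has applied locally (applied). toSet lists keys that are new or
// changed, toDelete lists keys that no longer exist in etcd. Both are sorted.
func Reconcile(desired, applied map[string]string) (toSet, toDelete []string) {
	for k, v := range desired {
		if old, ok := applied[k]; !ok || old != v {
			toSet = append(toSet, k)
		}
	}
	for k := range applied {
		if _, ok := desired[k]; !ok {
			toDelete = append(toDelete, k)
		}
	}
	sort.Strings(toSet)
	sort.Strings(toDelete)
	return toSet, toDelete
}

func main() {
	desired := map[string]string{
		"ingress/ingress_config/env1/0001": `{"config":"helloworld"}`,
		"ingress/ingress_config/env1/0002": `{"config":"v2"}`,
	}
	applied := map[string]string{
		"ingress/ingress_config/env1/0002": `{"config":"v1"}`,
		"ingress/ingress_config/env1/0003": `{"config":"stale"}`,
	}
	toSet, toDelete := Reconcile(desired, applied)
	fmt.Println("set:", toSet)
	fmt.Println("delete:", toDelete)
}
```

After a restart the applied map is empty, so everything in etcd ends up in toSet, which matches point 3.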

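5. Code examples

5.1 Wrapping etcd clientv3

1. Connection management, with TLS support
2. Create, delete and query, plus writes that expire automatically
3. Watch a directory or key for value changes (PUT, DELETE)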

package main

import (
	"fmt"
	"time"

	// "github.com/coreos/etcd/pkg/transport"
	"github.com/coreos/etcd/clientv3"
	"golang.org/x/net/context"
	// "sync"
)

// EtcdData is one key/value pair read back from etcd.
type EtcdData struct {
	Key   string
	Value string
}

// EtcdHelper wraps a clientv3 client and applies a timeout to each request.
type EtcdHelper struct {
	RequestTimeout time.Duration
	Client         *clientv3.Client
}

// NewEtcdHelper connects to the local etcd. It returns nil if the client cannot be created.
func NewEtcdHelper() *EtcdHelper {
	//tlsInfo := transport.TLSInfo{
	// CertFile:      "/tmp/test-certs/test-name-1.pem",
	// KeyFile:       "/tmp/test-certs/test-name-1-key.pem",
	// TrustedCAFile: "/tmp/test-certs/trusted-ca.pem",
	//}
	//tlsInfo := transport.TLSInfo{
	// CertFile:      "./tls/apiserver.crt",
	// KeyFile:       "./tls/apiserver.key",
	//}
	//tlsConfig, err := tlsInfo.ClientConfig()
	//if err != nil {
	// fmt.Printf("%s", err.Error())
	// return nil
	//}
	//cli, err := clientv3.New(clientv3.Config{
	// Endpoints: []string{"dev-7:2379"},
	// DialTimeout: 3 * time.Second,
	// TLS:         tlsConfig,
	//})
	cli, err := clientv3.New(clientv3.Config{
		Endpoints: []string{"127.0.0.1:2379"},
		//Endpoints: []string{"http://dev-7:2379"},
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return nil
	}
	return &EtcdHelper{
		RequestTimeout: 5 * time.Second,
		Client:         cli,
	}
}

// Release closes the underlying client connection.
func (c *EtcdHelper) Release() {
	if c.Client != nil {
		c.Client.Close()
	}
}

// PutValue writes key under a fresh lease so that it expires after ttl seconds.
func (c *EtcdHelper) PutValue(key string, value string, ttl int64) error {
	ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
	defer cancel()
	// minimum lease TTL is 5-second
	// Use ctx (not context.TODO()) so the request timeout also covers Grant.
	resp, err := c.Client.Grant(ctx, ttl)
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return err
	}
	_, err = c.Client.Put(ctx, key, value, clientv3.WithLease(resp.ID))
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return err
	}
	return nil
}

// SetValue writes key without a lease, so it never expires.
func (c *EtcdHelper) SetValue(key string, value string) error {
	ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
	defer cancel()
	_, err := c.Client.Put(ctx, key, value)
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return err
	}
	return nil
}

// GetValue returns every key/value pair whose key starts with key.
func (c *EtcdHelper) GetValue(key string) []EtcdData {
	ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
	defer cancel()
	resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix())
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return nil
	}
	var kv_slice []EtcdData
	for _, ev := range resp.Kvs {
		//fmt.Printf("%s : %s\n", ev.Key, ev.Value)
		kv := EtcdData{string(ev.Key), string(ev.Value)}
		kv_slice = append(kv_slice, kv)
	}
	return kv_slice
}

// DelValue deletes every key that starts with key (prefix delete).
func (c *EtcdHelper) DelValue(key string) error {
	ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
	defer cancel()
	_, err := c.Client.Delete(ctx, key, clientv3.WithPrefix())
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return err
	}
	return nil
}

// Watch blocks and prints every change under the key prefix.
func (c *EtcdHelper) Watch(key string) {
	rch := c.Client.Watch(context.Background(), key, clientv3.WithPrefix())
	for wresp := range rch {
		for _, ev := range wresp.Events {
			fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}

// Listen returns the watch channel for the key prefix so the caller can consume events itself.
func (c *EtcdHelper) Listen(key string) clientv3.WatchChan {
	return c.Client.Watch(context.Background(), key, clientv3.WithPrefix())
}

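5.2 Controller implementation

1. Controller online and offline handling
2. The controller periodically sends a heartbeat to etcd.
3. The controller watches for agent changes. (Items 1-3 complete service registration and discovery.)
4. The controller pushes config to etcd, which notifies every agent watching ingress_config for changes.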

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ControllerClient registers one controller instance in etcd.
type ControllerClient struct {
	Period  time.Duration // heartbeat interval
	Name    string
	IP      string
	Helper  *EtcdHelper
	StopCha chan int
	//Lock     *sync.Mutex
}

func NewControllerClient(name string, host_ip string) *ControllerClient {
	return &ControllerClient{
		Period:  2 * time.Second, // a real duration instead of a bare 2
		Name:    name,
		IP:      host_ip,
		Helper:  NewEtcdHelper(),
		StopCha: make(chan int, 10),
		//Lock:    new(sync.Mutex),
	}
}

// Init starts the heartbeat goroutine and, if display is true, prints agent changes.
func (cc *ControllerClient) Init(display bool) {
	go func() {
		cc.OnLine()
		for {
			select {
			case <-cc.StopCha:
				fmt.Printf("online goroutine exited.\n")
				return
			case <-time.After(cc.Period):
				cc.OnLine()
			}
		}
	}()
	if display {
		go func() {
			watch_chan := cc.Helper.Listen("/ingress/agent")
			for wresp := range watch_chan {
				for _, ev := range wresp.Events {
					fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
				}
			}
		}()
	}
}

// OnLine writes the controller key with a 5-second TTL (one heartbeat).
func (cc *ControllerClient) OnLine() {
	key := fmt.Sprintf("/ingress/controller/%s", cc.Name)
	//cc.Lock.Lock()
	//defer cc.Lock.Unlock()
	err := cc.Helper.PutValue(key, "1", 5)
	if err != nil {
		fmt.Println(err)
	}
}

// OffLine stops the heartbeat and deletes the controller key. Call it only once.
func (cc *ControllerClient) OffLine() {
	close(cc.StopCha)
	key := fmt.Sprintf("/ingress/controller/%s", cc.Name)
	//cc.Lock.Lock()
	//defer cc.Lock.Unlock()
	err := cc.Helper.DelValue(key)
	if err != nil {
		fmt.Println(err)
	}
}

func (cc *ControllerClient) GetIngressConfig(env_uuid string, uuid string) []EtcdData {
	key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)
	//cc.Lock.Lock()
	//defer cc.Lock.Unlock()
	return cc.Helper.GetValue(key)
}

func (cc *ControllerClient) SetIngressConfig(env_uuid string, uuid string, config string) {
	//TODO. first save to mysql.
	key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)
	//cc.Lock.Lock()
	//defer cc.Lock.Unlock()
	err := cc.Helper.SetValue(key, config)
	if err != nil {
		fmt.Println(err)
	}
}

func (cc *ControllerClient) DelIngressConfig(env_uuid string, uuid string) {
	//TODO. first update to mysql.
	key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)
	//cc.Lock.Lock()
	//defer cc.Lock.Unlock()
	err := cc.Helper.DelValue(key)
	if err != nil {
		fmt.Println(err)
	}
}

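5.3 Agent implementation

1. Agent online and offline handling
2. The agent periodically sends a heartbeat to etcd.
3. The agent watches for controller changes. (Items 1-3 complete service registration and discovery.)
4. The agent watches ingress_config for changes and applies config creation, updates and deletions in real time.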

////////////////////////////////////////////////////////////////////////////////////////////
// AgentClient registers one agent instance in etcd and watches the configs of its environment.
type AgentClient struct {
	LivePeriod        time.Duration // heartbeat interval
	FirstConfigPeriod time.Duration // delay before the first sync after a restart
	SyncConfigPeriod  time.Duration // interval between periodic syncs
	Name              string
	EnvUUID           string
	IP                string
	Helper            *EtcdHelper
	StopCha           chan struct{}
}

func NewAgentClient(name string, env_uuid string, host_ip string) *AgentClient {
	return &AgentClient{
		LivePeriod:        2 * time.Second,
		FirstConfigPeriod: 3 * time.Second,
		SyncConfigPeriod:  60 * time.Second,
		Name:              name,
		EnvUUID:           env_uuid,
		IP:                host_ip,
		Helper:            NewEtcdHelper(),
		StopCha:           make(chan struct{}, 1),
	}
}

func (ac *AgentClient) Init(display bool) {
	// Heartbeat: I'm still alive, don't remove me.
	go func() {
		ac.OnLine()
		for {
			select {
			case <-ac.StopCha:
				return
			case <-time.After(ac.LivePeriod):
				ac.OnLine()
			}
		}
	}()
	//if display {
	// go func() {
	// watch_chan := ac.Helper.Listen("/ingress/agent")
	// for wresp := range watch_chan {
	// for _, ev := range wresp.Events {
	// fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
	// }
	// }
	// }()
	//}
	// First sync after a restart, then periodic sync.
	//go func() {
	//
	// time.Sleep(ac.FirstConfigPeriod)
	// ac.SyncIngressConfigs()
	//
	// for {
	// select {
	// case <-ac.StopCha:
	// return
	// case <-time.After(ac.SyncConfigPeriod):
	// ac.SyncIngressConfigs()
	// }
	// }
	//}()
	if display {
		// Watch controller changes (TODO: re-watch after an automatic reconnect).
		go func() {
			watch_chan := ac.Helper.Listen("/ingress/controller")
			for wresp := range watch_chan {
				for _, ev := range wresp.Events {
					fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
				}
			}
		}()
	}
	// Watch ingress_config changes for this environment (TODO: re-watch after an automatic reconnect).
	go func() {
		key := fmt.Sprintf("/ingress/ingress_config/%s", ac.EnvUUID)
		watch_chan := ac.Helper.Listen(key)
		for wresp := range watch_chan {
			for _, ev := range wresp.Events {
				fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
				// Go switch cases do not fall through, so no break is needed.
				switch ev.Type.String() {
				case "PUT":
					fmt.Printf("agent=%s SetIngressConfig(%s, %s)\n", ac.Name, ev.Kv.Key, ev.Kv.Value)
					//TODO: SetIngressConfig(key, value)
				case "DELETE":
					fmt.Printf("agent=%s DelIngressConfig(%s)\n", ac.Name, ev.Kv.Key)
					//TODO: DelIngressConfig(key)
				}
			}
		}
	}()
}

// OnLine writes the agent key with a 5-second TTL (one heartbeat).
func (ac *AgentClient) OnLine() {
	key := fmt.Sprintf("/ingress/agent/%s/%s", ac.EnvUUID, ac.Name)
	err := ac.Helper.PutValue(key, fmt.Sprintf(`{"name":"%s", "env_uuid":"%s", "ip":"%s"}`, ac.Name, ac.EnvUUID, ac.IP), 5)
	if err != nil {
		fmt.Println(err)
	}
}

// OffLine stops the heartbeat and deletes the agent key. Call it only once.
func (ac *AgentClient) OffLine() {
	//ac.StopCha <- 1
	close(ac.StopCha)
	key := fmt.Sprintf("/ingress/agent/%s/%s", ac.EnvUUID, ac.Name)
	err := ac.Helper.DelValue(key)
	if err != nil {
		fmt.Println(err)
	}
}

// UpdateIngressStatus reports that this agent has applied the config.
// The original called DelValue here, which would remove the report instead of
// writing it; value "1" and TTL 5 follow the status table in section 4.2.
// Note that this key prefix differs from the layout shown in that table.
func (ac *AgentClient) UpdateIngressStatus(uuid string) {
	key := fmt.Sprintf("/ingress/ingress_config_status/%s/%s/%s", ac.EnvUUID, uuid, ac.Name)
	err := ac.Helper.PutValue(key, "1", 5)
	if err != nil {
		fmt.Println(err)
	}
}

// SyncIngressConfigs is called once after a restart and then periodically.
func (ac *AgentClient) SyncIngressConfigs() {
	key := fmt.Sprintf("/ingress/ingress_config/%s", ac.EnvUUID)
	kv_slice := ac.Helper.GetValue(key)
	if kv_slice != nil {
		//TODO: ingressConfig.SyncIngressConfigs(kv_slice)
		for _, kv := range kv_slice {
			fmt.Printf("name=%s, key:%s-----value:%s\n", ac.Name, kv.Key, kv.Value)
		}
	}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func main() {
	controller1 := NewControllerClient("dev-7_001", "192.168.0.10")
	controller1.Init(false)
	controller2 := NewControllerClient("dev-8_002", "192.168.0.11")
	controller2.Init(false)
	controller3 := NewControllerClient("dev-8_003", "192.168.0.12")
	controller3.Init(false)
	agent1 := NewAgentClient("dev-1_001", "1", "192.168.0.1")
	agent1.Init(false)
	agent2 := NewAgentClient("dev-2_001", "1", "192.168.0.2")
	agent2.Init(false)
	agent3 := NewAgentClient("dev-3_001", "1", "192.168.0.3")
	agent3.Init(false)
	agent4 := NewAgentClient("dev-4_001", "1", "192.168.0.4")
	agent4.Init(false)
	agent5 := NewAgentClient("dev-5_001", "1", "192.168.0.5")
	agent5.Init(false)
	agent6 := NewAgentClient("dev-6_001", "1", "192.168.0.6")
	agent6.Init(false)
	agent7 := NewAgentClient("dev-7_001", "1", "192.168.0.7")
	agent7.Init(false)
	agent8 := NewAgentClient("dev-8_001", "1", "192.168.0.8")
	agent8.Init(false)
	agent9 := NewAgentClient("dev-9_001", "1", "192.168.0.9")
	agent9.Init(false)
	agent10 := NewAgentClient("dev-10_001", "1", "192.168.0.10")
	agent10.Init(false)
	time.Sleep(time.Second)
	controller3.SetIngressConfig("1", "0001", `{"config":"helloworld"}`)
	controller3.DelIngressConfig("1", "0001")
	controller3.SetIngressConfig("1", "0002", `{"config":"helloworld"}`)
	controller3.DelIngressConfig("1", "0002")
	controller3.SetIngressConfig("1", "0003", `{"config":"helloworld"}`)
	controller3.DelIngressConfig("1", "0003")
	controller3.SetIngressConfig("1", "0004", `{"config":"helloworld"}`)
	controller3.DelIngressConfig("1", "0004")
	controller3.SetIngressConfig("1", "0005", `{"config":"helloworld"}`)
	controller3.DelIngressConfig("1", "0005")
	// Block forever so the heartbeat and watch goroutines keep running.
	forever := make(chan struct{})
	<-forever
}
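
The agent code leaves re-watching after a disconnect as a to-do. One possible shape for it is a loop that reopens the event channel whenever the current one is closed. The sketch below uses plain string channels instead of an etcd watch, so it says nothing about how clientv3 itself behaves; Rewatch and the open callback are hypothetical, and backoff between attempts and resuming from the last seen revision are deliberately left out.

```go
package main

import "fmt"

// Rewatch consumes events from the channel returned by open. When that
// channel is closed it calls open again, until open returns nil. It returns
// the number of channels (watch sessions) it consumed.
func Rewatch(open func() <-chan string, handle func(string)) int {
	sessions := 0
	for {
		ch := open()
		if ch == nil {
			return sessions
		}
		sessions++
		for ev := range ch {
			handle(ev)
		}
	}
}

func main() {
	// Two simulated watch sessions; the first one ends after two events.
	sessions := [][]string{{"PUT a", "PUT b"}, {"DELETE a"}}
	next := 0
	open := func() <-chan string {
		if next >= len(sessions) {
			return nil // nothing left to watch in this demo
		}
		ch := make(chan string, len(sessions[next]))
		for _, ev := range sessions[next] {
			ch <- ev
		}
		close(ch)
		next++
		return ch
	}
	n := Rewatch(open, func(ev string) { fmt.Println("handle", ev) })
	fmt.Println("sessions:", n)
}
```

In the agent, open would wrap Helper.Listen and handle would contain the PUT and DELETE switch. Running a full SyncIngressConfigs after each reopen would cover events missed while disconnected.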
