0x00
I wanted to build a complete microservice architecture, so off I went.
0x01
The features it should have:
- A master/slave distributed cluster structure
- All nodes can talk to each other, with heartbeat detection, remote service calls, an HTTP proxy, and master election
- Task dispatch, task monitoring, and result delivery
- Slave nodes double as cache servers
- Runs as a system service
The idea is somewhat similar to OSPF routing.
0x02
Good. Based on the plan I drew up an architecture diagram.
0x03
Next I wrote the gRPC service definition (communicate.proto), mainly containing the task, heartbeat, and node-election RPCs. As a sample workload, the task is simply to fetch the target's page source.
syntax = "proto3";
package communicate;
// note: newer protoc-gen-go releases require a go_package option
// (or an M flag on the protoc command line) to be set

service Tasks {
    rpc GetTask(ID) returns (Task) {}
    rpc ListTask(Query) returns (ID) {}
    rpc CreateTask(Task) returns (OptResult) {}
    // rpc NotifyTask(ID)
    rpc DelTask(ID) returns (OptResult) {}
    rpc KillTask(ID) returns (OptResult) {}
    rpc GetTaskResult(ID) returns (TaskResult) {}
}
message ID {
    repeated string id = 1;
}
message Task {
    string id = 1;
    string name = 2;
    string target = 3;
    // we can change the type field to an enum once the set of task types is fixed
    string type = 4;
    enum State {
        WAITING = 0;
        INQUEUE = 1;
        RUNNING = 2;
        RUNTIME_ERROR = 3;
        SUCCEED = 4;
        KILLED = 5;
    }
    State state = 5;
    repeated TaskResult results = 6;
}
message TaskResult {
    string code = 1;
}
message Query {
    repeated int32 state = 1;
}
message OptResult {
    map<string, bool> succeed = 1;
}
service HeartBeats {
    rpc IsAlive(HeartBeat) returns (OK) {}
    rpc AskAlive(Node) returns (OK) {}
}
message OK {
    bool ok = 1;
}
message HeartBeat {
    bool alive = 1;
    string who = 2;
}
message Node {
    string name = 1;
    string token = 2;
    string host = 3;
}
service Elections {
    rpc RequestElection(Secret) returns (OptResult) {}
    rpc VerifyNode(Node) returns (OptResult) {}
    rpc UpdateSecret(Node) returns (OptResult) {}
}
message Election {
    Secret secret = 1;
    Node from = 2;
}
message Secret {
    string sec_code = 1;
}
Compile the proto file:
$ mkdir communicate
$ protoc --go_out=./communicate/ --go-grpc_out=./communicate/ --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative communicate.proto
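If the two codegen plugins aren't installed yet, they can be fetched first (this uses the module-aware `go install pkg@version` form, which needs go1.16+, even though the project itself is GOPATH-style):
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest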
0x04
Next up, the web layer; I'm going with gin and vue.
web.go:
package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "os"
    "strconv"

    "github.com/gin-gonic/gin"
    "github.com/go-redis/redis/v8"
    uuid "github.com/satori/go.uuid"
    "google.golang.org/grpc"

    pb "../communicate"
    "../conf"
)

var ctx_ = context.Background()
var rdb = redis.NewClient(&redis.Options{
    Addr:     conf.RedisAddr,
    Password: conf.RedisPwd,
    DB:       conf.RedisDB,
})
var conn, _ = grpc.Dial("localhost:6666", grpc.WithInsecure())
var task_client = pb.NewTasksClient(conn)

type Ids struct {
    Vals []string `json:"ids"`
}

func list_task(c *gin.Context) {
    res, err := task_client.ListTask(ctx_, &pb.Query{})
    tasks := []*pb.Task{}
    if err != nil {
        log.Print(err)
    } else {
        for _, id := range res.Id {
            task := rdb.HGetAll(ctx_, id).Val()
            // state is stored as an int string in the redis hash
            state, _ := strconv.Atoi(task["state"])
            tasks = append(tasks, &pb.Task{Id: id, Name: task["name"], State: pb.Task_State(state)})
        }
    }
    c.HTML(http.StatusOK, "index.html", gin.H{
        "title": "Task center",
        "tasks": tasks,
        "total": len(tasks),
    })
}

func create_task(c *gin.Context) {
    name := c.PostForm("name")
    target := c.PostForm("target")
    if name == "" || target == "" {
        c.HTML(200, "create_task.html", gin.H{
            "title":   "Create Task",
            "err_msg": "name and target are required!",
        })
    } else {
        var task pb.Task
        task.Name = name
        task.Target = target
        task.State = 0
        task.Type = c.PostForm("type")
        id, err := uuid.NewV4()
        if err != nil {
            fmt.Printf("Something went wrong: %s", err)
            return
        }
        task.Id = id.String()
        res, err := task_client.CreateTask(ctx_, &task)
        fmt.Printf("%v\n", res)
        if err != nil {
            log.Print(err)
        }
        c.HTML(200, "create_task.html", gin.H{
            "title": "Create Task",
        })
    }
}

func del_task(c *gin.Context) {
    ids := Ids{}
    // axios sends JSON, so bind a JSON struct rather than form fields
    if err := c.BindJSON(&ids); err != nil {
        log.Print(err)
        return
    }
    ids_ := &pb.ID{Id: ids.Vals}
    res, err := task_client.DelTask(ctx_, ids_)
    if err != nil {
        log.Print(err)
        return
    }
    c.JSON(200, res.Succeed)
}

func main() {
    r := gin.Default()
    workdir := os.Getenv("WORKDIR")
    r.LoadHTMLGlob(workdir + "templates/*")
    r.Static("js", workdir+"js")
    defer conn.Close()
    r.GET("/index", list_task)
    r.GET("/", list_task)
    r.GET("/create_task", func(c *gin.Context) {
        c.HTML(200, "create_task.html", gin.H{
            "title": "Create Task",
        })
    })
    r.POST("/create_task", create_task)
    r.POST("/del_task", del_task)
    r.Run(":8080")
}
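To poke the endpoints without the browser, a couple of curl calls work (the form values here are made up for illustration):
$ curl http://127.0.0.1:8080/index
$ curl -d 'name=demo&target=http://example.com&type=fetch' http://127.0.0.1:8080/create_task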
templates/index.html:
<html>
<head>
    {{ block "css.tmpl" . }}{{end}}
    <title>{{ .title }}</title>
</head>
<body>
{{ block "nav.tmpl" . }}{{end}}
<section class="content" id="showcontent">
    <form method="POST" action="/del_task" id="opt_task">
        <div class="row">
            <div class="col-md-12">
                <div class="box box-primary">
                    <div class="box-header">
                        <h3 class="box-title">Task List</h3>
                    </div>
                    <div class="box-body table-responsive no-padding">
                        <table class="table table-hover">
                            <tbody>
                            <tr>
                                <th>
                                    <input type="checkbox" id="select_all"> Select all / invert
                                </th>
                                <th>ID</th>
                                <th>Name</th>
                                <th>State</th>
                            </tr>
                            {{ range $i, $task := .tasks }}
                            <tr>
                                <td id="{{ $task.Id }}">
                                    <input type="checkbox" class="custom-control-input"
                                           name="box" v-model="task" value='{{$task.Id}}' />
                                </td>
                                <td class="td_bd">
                                    <a href="#"> {{$task.Id}} </a>
                                </td>
                                <td> {{ $task.Name }} </td>
                                <td> {{ $task.State }} </td>
                            </tr>
                            {{ end }}
                            </tbody>
                        </table>
                    </div>
                </div>
            </div>
        </div>
        <h3>Total tasks: <b id="sum"> {{ .total }} </b></h3>
        <a href="#" class="btn btn-primary" role="button" id="stop" name="buy_or_del">
            <span class="glyphicon glyphicon-ok"></span> Stop task</a>
        <a href="#" class="btn btn-danger" role="button" @click.prevent="del">
            <span class="glyphicon glyphicon-trash"></span> Delete task</a>
    </form>
</section>
{{ block "footer.tmpl" . }}{{end}}
<script src="js/tasks.js"></script>
</body>
</html>
js/tasks.js:
Vue.createApp({
    // gin's templates already use {{ }}, so switch vue's text delimiters to [[ ]]
    delimiters: ['[[', ']]'],
    data(){
        return {
            task: []
        }
    },
    methods: {
        del(){
            axios.post('/del_task', {
                ids: this.task
            }).then(function(res){
                var data = res.data;
                for (var id in data){
                    if (data[id]){
                        console.log('deleted ' + id);
                    } else {
                        console.log('failed 2 delete ' + id);
                    }
                }
            }).catch(function(err){
                console.log(err);
            });
        },
        select_all(){
            // TODO
        }
    }
}).mount('#opt_task');
Wow~ getting the front end and back end talking is such a hassle; I hit a pile of problems...
First the Vue app wouldn't mount onto its DOM node; moving the Vue script to the bottom of the body fixed that.
Then axios wouldn't fire the POST request. Three causes: I hadn't refreshed the browser cache after editing the js, I was referencing undefined data, and the click event was bound to the wrong DOM node...
After rebinding the node, reading the data via this, and clearing the local cache, I finally saw the POST come through.
Then, although the request went out, gin wasn't receiving the data I wanted. I dug around for ages without finding the cause; following gin's docs I tried both the Form and FormArray bindings, and neither got the data. With no option left I pulled out the big gun and captured the traffic with Wireshark: axios was sending JSON, with the wanted data nested a few levels deep. Having gin bind a JSON struct to extract the data, plus switching the click handler from the Vue expression to a plain method call, solved it.
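For the record, the capture boiled down to this request shape (reconstructed from the code, not the actual dump):

POST /del_task HTTP/1.1
Content-Type: application/json

{"ids":["<task-id-1>","<task-id-2>"]}

gin's Form/FormArray binders only read urlencoded or multipart bodies, which is why they came up empty; the JSON-tagged Ids struct in web.go above matches this payload directly.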
0x05
接着写grpc的Server端,数据相关就直接用redis搞了(server.go):
package main

import (
    "context"
    "log"
    "net"

    "google.golang.org/grpc"

    pb "../communicate"
    "../conf"
    "../utils"
)

var ctx_ = context.Background()

type Task map[string]interface{}

type TaskServer struct {
    // embed by value: calling a not-yet-implemented method through an
    // embedded nil pointer would panic
    pb.UnimplementedTasksServer
}

func (s *TaskServer) GetTask(ctx context.Context, id *pb.ID) (*pb.Task, error) {
    // TODO:
    // find task from redis or mongo
    return &pb.Task{}, nil
}

func (s *TaskServer) ListTask(ctx context.Context, query *pb.Query) (*pb.ID, error) {
    // TODO: filter by the requested states
    var task_ids pb.ID
    if query.State == nil {
        keys := conf.Rdb.Keys(ctx_, "*").Val()
        for _, key := range keys {
            task_ids.Id = append(task_ids.Id, key)
        }
    }
    return &task_ids, nil
}

func (s *TaskServer) CreateTask(ctx context.Context, task *pb.Task) (*pb.OptResult, error) {
    var result pb.OptResult
    id := task.Id
    if err := conf.Rdb.HMSet(ctx_, id, "id", id, "name", task.Name, "target", task.Target, "type", task.Type, "state", int32(task.State)).Err(); err != nil {
        return &result, err
    }
    if que := utils.GetMinQue(); que != "" {
        go utils.PushTask(que, id)
    }
    result.Succeed = make(map[string]bool)
    result.Succeed[id] = true
    return &result, nil
}

func (s *TaskServer) DelTask(ctx context.Context, ids *pb.ID) (*pb.OptResult, error) {
    var result pb.OptResult
    result.Succeed = make(map[string]bool)
    for _, id := range ids.Id {
        if err := conf.Rdb.Del(ctx_, id).Err(); err == nil {
            result.Succeed[id] = true
        } else {
            result.Succeed[id] = false
            log.Printf("%v", err)
        }
    }
    return &result, nil
}

func (s *TaskServer) KillTask(ctx context.Context, id *pb.ID) (*pb.OptResult, error) {
    // TODO:
    return &pb.OptResult{}, nil
}

func (s *TaskServer) GetTaskResult(ctx context.Context, id *pb.ID) (*pb.TaskResult, error) {
    // TODO:
    return &pb.TaskResult{}, nil
}

type HeartBeatServer struct {
    pb.UnimplementedHeartBeatsServer
}

func (s *HeartBeatServer) IsAlive(ctx context.Context, hb *pb.HeartBeat) (*pb.OK, error) {
    return &pb.OK{}, nil
}

func (s *HeartBeatServer) AskAlive(ctx context.Context, node *pb.Node) (*pb.OK, error) {
    var ok pb.OK
    // TODO: verify the node's token
    ok.Ok = true
    return &ok, nil
}

type ElectionServer struct {
    // TODO:
    pb.UnimplementedElectionsServer
}

func (s *ElectionServer) RequestElection(ctx context.Context, secret *pb.Secret) (*pb.OptResult, error) {
    // TODO:
    return &pb.OptResult{}, nil
}

func (s *ElectionServer) VerifyNode(ctx context.Context, node *pb.Node) (*pb.OptResult, error) {
    // TODO:
    return &pb.OptResult{}, nil
}

func (s *ElectionServer) UpdateSecret(ctx context.Context, node *pb.Node) (*pb.OptResult, error) {
    // TODO:
    return &pb.OptResult{}, nil
}

func main() {
    lis, err := net.Listen("tcp", ":6666")
    if err != nil {
        log.Fatalf("failed 2 listen :6666 : %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterTasksServer(s, &TaskServer{})
    pb.RegisterHeartBeatsServer(s, &HeartBeatServer{})
    pb.RegisterElectionsServer(s, &ElectionServer{})
    go utils.Work()
    go utils.HandlePublic()
    go utils.ImAlive()
    go utils.WillUDie()
    s.Serve(lis)
}
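To smoke-test the rpc server without going through gin, a throwaway client in its own directory can call CreateTask directly. A minimal sketch (the task values here are made up; the web layer normally generates the uuid):

package main

import (
    "context"
    "log"

    "google.golang.org/grpc"

    pb "../communicate"
)

func main() {
    // assumes the rpc server above is already listening on :6666
    conn, err := grpc.Dial("localhost:6666", grpc.WithInsecure())
    if err != nil {
        log.Fatalf("%v", err)
    }
    defer conn.Close()
    client := pb.NewTasksClient(conn)
    res, err := client.CreateTask(context.Background(), &pb.Task{
        Id:     "test-0001", // made-up id for the smoke test
        Name:   "demo",
        Target: "http://example.com",
        Type:   "fetch",
    })
    log.Printf("%v %v", res, err)
}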
conf/settings.go:
package conf

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "log"
    "os"

    "github.com/go-redis/redis/v8"
    "google.golang.org/grpc"

    pb "../communicate"
)

type Node struct {
    Host    string
    Port    int32
    MaxTask int32
    Name    string
    Alive   bool
    Token   string
}

type Config struct {
    RedisAddr string `json:"RedisAddr"`
    RedisPwd  string `json:"RedisPwd"`
    RedisDB   int    `json:"RedisDB"`
    LocalHost string `json:"LocalHost"`
    Token     string `json:"Token"`
    Master    string `json:"Master"`
}

var (
    RedisAddr = "localhost:6379"
    RedisPwd  = ""
    RedisDB   = 2
    TaskQue   = "tasks"
    Rdb       *redis.Client
)

var (
    Name                 = "master"
    Master               = "127.0.0.1"
    NodeType             = "master"
    LocalHost            = "127.0.0.1"
    Token                = "098f6bcd4621d373cade4e832627b4f6"
    HeartBeatGAP   int   = 60
    RPCDefaultPort int32 = 6666
    // typed maps instead of map[string]interface{} save type assertions later
    GrpcConns     = make(map[string]*grpc.ClientConn)
    TaskStubs     = make(map[string]pb.TasksClient)
    HeartStubs    = make(map[string]pb.HeartBeatsClient)
    ElectionStubs = make(map[string]pb.ElectionsClient)
    Nodes         = map[string]Node{
        "172.17.0.1": {
            Host:    "172.17.0.1",
            Port:    6666,
            MaxTask: 10,
            Name:    "node1",
            Alive:   true,
            Token:   "098f6bcd4621d373cade4e832627b4f6",
        },
        "172.17.0.2": {
            Host:    "172.17.0.2",
            Port:    6666,
            MaxTask: 10,
            Alive:   true,
            Name:    "node2",
            Token:   "098f6bcd4621d373cade4e832627b4f6",
        },
    }
    PubChannel   = "Public"
    PubMsgPrefix = map[string]string{
        "0": "Heart Beat",
        "1": "Request Election",
        "2": "Election Vote",
        "3": "Some other thing",
    }
    TaskChannels = []string{"172.17.0.1", "172.17.0.2"}
)

func init() {
    type_ := os.Getenv("NODETYPE")
    if type_ != "" {
        NodeType = type_
    }
    data_path := os.Getenv("DATA")
    if data_path == "" {
        data_path = "../conf/"
    }
    // load & parse config file
    var config Config
    cf, err := os.Open(data_path + "config.json")
    if err != nil {
        log.Printf("[-] open config file failed !")
        log.Printf("%v", err)
    } else {
        defer cf.Close()
        data, _ := ioutil.ReadAll(cf)
        json.Unmarshal(data, &config)
        Rdb = redis.NewClient(&redis.Options{
            Addr:     config.RedisAddr,
            Password: config.RedisPwd,
            DB:       config.RedisDB,
        })
        Token = config.Token
        LocalHost = config.LocalHost
        Master = config.Master
    }
    // load & parse node list
    cf, err = os.Open(data_path + "nodes.json")
    if err != nil {
        log.Printf("[-] open node file failed !")
        log.Printf("%v", err)
    } else {
        defer cf.Close()
        data, _ := ioutil.ReadAll(cf)
        json.Unmarshal(data, &Nodes)
    }
    // dial every known node up front and cache the connections and stubs
    for host, node := range Nodes {
        conn, err := grpc.Dial(fmt.Sprintf("%s:%v", host, node.Port), grpc.WithInsecure())
        if err != nil {
            log.Printf("[-] error: can not connect to %s:%v\n", host, node.Port)
        } else {
            GrpcConns[host] = conn
            TaskStubs[host] = pb.NewTasksClient(conn)
            HeartStubs[host] = pb.NewHeartBeatsClient(conn)
            ElectionStubs[host] = pb.NewElectionsClient(conn)
        }
    }
}
utils/tools.go:
package utils

import (
    "context"
    "log"
    "time"

    "../conf"
)

var ctx = context.Background()

// GetMinQue picks the host advertising the largest MaxTask value,
// i.e. the node assumed to have the most spare task capacity
func GetMinQue() string {
    var size int32 = 0
    h := ""
    for host, node := range conf.Nodes {
        if size < node.MaxTask {
            size = node.MaxTask
            h = host
        }
    }
    return h
}

func PushTask(que, task_id string) {
    if err := conf.Rdb.Publish(ctx, que, task_id).Err(); err != nil {
        log.Printf("%v", err)
    } else {
        log.Printf("[*] push task(%v) 2 %v", task_id, que)
    }
}

func Work() {
    // every node subscribes to the channel named after its own host
    sub := conf.Rdb.Subscribe(ctx, conf.LocalHost)
    for {
        msg, err := sub.ReceiveMessage(ctx)
        if err != nil {
            continue
        }
        id := msg.Payload
        log.Printf("recv task: %v", id)
        go start_work(GetTask(id))
    }
}

func GetTask(id string) map[string]string {
    result := conf.Rdb.HGetAll(ctx, id)
    if result.Err() != nil {
        return nil
    }
    return result.Val()
}

func start_work(task map[string]string) {
    if task != nil {
        // TODO:
        // do some work
        log.Printf("[*] starting 2 run task: %v", task["id"])
        time.Sleep(10 * time.Second)
        go NotifyMaster(task["id"])
        go save_result()
    }
}

func NotifyMaster(id string) {
    log.Printf("[+] I finished task: %s", id)
}

func save_result() {
    // TODO
}

func HandlePublic() {
    sub := conf.Rdb.Subscribe(ctx, conf.PubChannel)
    for {
        msg, err := sub.ReceiveMessage(ctx)
        if err != nil {
            continue
        }
        msg_ := msg.Payload
        log.Printf("recv pub msg: %v", msg_)
        // public messages look like "<flag>:<payload>"
        if len(msg_) > 2 {
            flag := string(msg_[0])
            real_msg := msg_[2:]
            if flag == "0" {
                HeardPeopleBeating(real_msg)
            } else if flag == "1" {
                // TODO: election messages
            }
        }
    }
}
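start_work's body is still a TODO; since the sample task is just grabbing the target's page source, it might eventually look something like this sketch (fetch_source is a hypothetical helper, and where the result lands is still undecided):

package utils

import (
    "io/ioutil"
    "net/http"
)

// fetch_source is a hypothetical sketch of the "fetch source" task body:
// GET the task's target URL and return the raw page source.
func fetch_source(task map[string]string) (string, error) {
    resp, err := http.Get(task["target"])
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    return string(body), err
}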
OK, the core task flow of dispatch and deletion is working,
and it runs fine on a single machine:
the gin web service talks to the grpc server, and the cluster nodes also talk to each other over grpc.
Next, the grpc client side:
...well, it just hit me that the client is already baked into the gin layer...
The main points in this part:
- Every node reads the json config files
- A cluster may have exactly one master; the rest must be slaves. A node learns its own type from the NODETYPE environment variable, which makes building docker images convenient
- Every node subscribes to the channel named after its own host, which is how it receives tasks
- The config reserves a Public channel
0x06
Originally I meant to do heartbeats over grpc too, but then realized grpc can't broadcast: it's strictly point-to-point. Ugh.
So for now I had to take the detour of redis's publish feature -_-
Conveniently, the Public channel reserved above fits here perfectly. Such foresight, hahaha.
The heartbeat side is mostly done (heart.go):
package utils

import (
    "encoding/json"
    "errors"
    "fmt"
    "log"
    "sync"
    "time"

    "google.golang.org/grpc"

    pb "../communicate"
    "../conf"
)

type Beat struct {
    Host string
    Name string
}

var (
    // deadline counts down once per heartbeat gap; reaching 0 means dead
    deadline = make(map[string]int32)
    m        = sync.Mutex{}
)

func init() {
    for _, node := range conf.Nodes {
        deadline[node.Host] = 10
    }
}

func ImAlive() {
    for {
        msg := Beat{
            Host: conf.LocalHost,
            Name: conf.Name,
        }
        msg_, _ := json.Marshal(msg)
        conf.Rdb.Publish(ctx, conf.PubChannel, "0:"+string(msg_))
        log.Printf("[*] I'm beating")
        time.Sleep(time.Duration(conf.HeartBeatGAP) * time.Second)
    }
}

func HeardPeopleBeating(msg string) {
    var beat Beat
    if err := json.Unmarshal([]byte(msg), &beat); err != nil {
        log.Printf("[-] invalid heart beat message")
    } else if beat.Host != conf.LocalHost {
        log.Printf("[*] heard %v's heart beat", beat.Name)
        UpdateNodes(beat)
    }
}

// note: conf.Nodes itself is touched from several goroutines;
// strictly speaking it needs its own lock too
func UpdateNodes(beat Beat) {
    is_known_node := false
    for host, node := range conf.Nodes {
        if node.Host == beat.Host {
            // node is a copy, so write the modified value back into the map
            node.Alive = true
            conf.Nodes[host] = node
            is_known_node = true
            m.Lock()
            if deadline[beat.Host] < 10 {
                deadline[beat.Host]++
            }
            m.Unlock()
        }
    }
    if !is_known_node {
        go ValidateNode(beat)
    }
}

func ValidateNode(beat Beat) {
    TryEstablish(beat)
}

func TryEstablish(beat Beat) {
    host := beat.Host
    conn, err := grpc.Dial(fmt.Sprintf("%s:%v", host, conf.RPCDefaultPort), grpc.WithInsecure())
    if err != nil {
        log.Printf("[-] warning: %s is not a node.", host)
    } else {
        heart_client, err := AskWhoUAre(conn, beat)
        if err == nil {
            conf.GrpcConns[host] = conn
            conf.HeartStubs[host] = heart_client
            conf.TaskStubs[host] = pb.NewTasksClient(conn)
            conf.ElectionStubs[host] = pb.NewElectionsClient(conn)
            var node conf.Node
            node.Host = host
            node.Name = beat.Name
            node.Port = conf.RPCDefaultPort
            node.Alive = true
            conf.Nodes[host] = node
            m.Lock()
            deadline[host] = 10 // start the new node's liveness countdown
            m.Unlock()
        } else {
            conn.Close()
        }
    }
}

func AskWhoUAre(conn *grpc.ClientConn, beat Beat) (pb.HeartBeatsClient, error) {
    heart_client := pb.NewHeartBeatsClient(conn)
    var node pb.Node
    node.Name = beat.Name
    node.Host = beat.Host
    res, err := heart_client.AskAlive(ctx, &node)
    if err == nil && res.Ok {
        return heart_client, nil
    }
    return nil, errors.New("node is illegal")
}

func WillUDie() {
    for {
        time.Sleep(time.Duration(conf.HeartBeatGAP) * time.Second)
        for _, node := range conf.Nodes {
            m.Lock()
            deadline[node.Host]--
            dead := deadline[node.Host] <= 0
            m.Unlock()
            if dead {
                log.Printf("[-] warning: %v is dead!", node.Host)
                delete(conf.Nodes, node.Host)
                delete(deadline, node.Host)
                IsMasterDead(node)
            }
        }
    }
}

func IsMasterDead(node conf.Node) {
    if node.Host == conf.Master {
        ReqElection()
    }
}
- Beat once per minute
- When a heartbeat arrives from a node not on the known list, validate it; if it passes, append it to the known-node list
- When a known node's heartbeat hasn't been heard for ten minutes (the counter starts at 10 and drops once per 60-second gap), actively check whether it's still alive and decide whether to drop it from the node list
0x07
Ah, writing Go is tiring. Time for a break; let's play with docker instead...
Build an image based on ubuntu (Dockerfile). The web and rpc programs were built ahead of time with go build.
(The ELF go1.15 produces is over 2 MB smaller than go1.14's. Slick.)
FROM ubuntu
WORKDIR /work/
ENV WORKDIR /work/
ENV GOPATH /root/go
ENV NODETYPE slave
ENV DATA /data/
RUN apt update
RUN DEBIAN_FRONTEND=noninteractive apt install -y tzdata
# RUN apt install -y golang-1.14 && ln -s /usr/lib/go-1.14/bin/go /usr/bin/go
RUN apt install -y vim iproute2 wget curl redis-server git &&\
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
RUN mkdir -p ${WORKDIR} ${DATA}
COPY conf/*.json ${DATA}
COPY rpc/rpc web run.sh ${WORKDIR}
COPY conf/redis.conf /etc/redis/redis.conf
EXPOSE 8080
ENTRYPOINT ["sh", "/work/run.sh"]
Right after the first build, the rpc program kept hitting nil-pointer errors and the container wouldn't stay up; eventually I realized redis was missing.
Then, even after adding redis-server, RUN service redis-server start still wouldn't bring redis up.
Finally it clicked: an image is only a filesystem, and a service only exists at runtime, so redis has to be pulled up when the container starts.
So I wrapped everything in a layer of bash:
#!/bin/bash
service redis-server start
/work/web &
/work/rpc
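A slightly sturdier variant of the same script would exec the last process, so that PID 1 receives docker stop's SIGTERM directly (just a sketch of the same idea):
#!/bin/bash
set -e
service redis-server start
/work/web &
exec /work/rpc    # exec replaces the shell, so signals reach rpc directly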
Build and run the image:
$ sudo docker build -t micro:arch --no-cache .
$ sudo docker run -d -p 8080:8080 image_id
$ sudo docker logs container_id
The web program panicked on launch, saying it couldn't find the static files to load.
The odd part: the very first run loaded them fine; I just went to grab a meal, and after a restart they wouldn't load, with no changes to environment or code in between. Magic.
So I switched to absolute paths built from WORKDIR for loading the resources.
OK, runs perfectly!
0x08
Now let's try building a cluster with docker-compose (docker-compose.yml):
version: "3"
services:
master:
image: "micro:arch"
environment:
- NODETYPE=master
container_name: master
ports:
- "8080:8080"
expose:
- "6666"
volumes:
- "./conf:/data"
slave1:
image: micro:arch
container_name: slave1
depends_on:
- master
environment:
- NODETYPE=slave
ports:
- "8081:8080"
expose:
- "6666"
volumes:
- "./conf:/data"
slave2:
image: micro:arch
container_name: slave2
depends_on:
- master
environment:
- NODETYPE=slave
ports:
- "8082:8080"
expose:
- "6666"
volumes:
- "./conf:/data"
You can keep scaling out to as many slave nodes as you want, but scaling must not create port conflicts, and the related config files have to change accordingly.
Note that compose (v1) is a python package:
$ pip3 install docker-compose
$ docker-compose up -d
Per the yaml above, I brought up three nodes here.
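To confirm the three containers actually came up:
$ sudo docker-compose ps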
0x09
Then it suddenly hit me that every node was using its own redis; all nodes need to connect to the master node's redis instead.
So I modified the local redis.conf to bind on all interfaces and COPY'd it in through the Dockerfile, only to find the config file was incompatible and redis wouldn't start --
Fine, patch the config with sed at build time instead:
RUN sed -i "s/^bind 127.0.0.1 ::1$/bind 0.0.0.0 ::1/" /etc/redis/redis.conf
Rebuild the image, bring the compose cluster up.
With a shared redis, the data is finally consistent across nodes.
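For the slaves to actually reach the shared instance, their config.json has to point at the master rather than loopback; inside the compose network the service name resolves via docker's DNS, so something like this (hypothetical values; only RedisAddr and LocalHost differ from the config shown in 0x0B):
{
    "LocalHost": "slave1",
    "Token": "098f6bcd4621d373cade4e832627b4f6",
    "RedisAddr": "master:6379",
    "RedisPwd": "",
    "RedisDB": 2,
    "Master": "master"
}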
0x0A
Then I needed a reverse proxy with nginx. The file name is arbitrary as long as it ends in .conf; if it doesn't, you have to include your/conf/file in nginx's default config.
/etc/nginx/conf.d/a.conf:
server {
    listen 81;
    server_name master;
    index index.html;
    location / {
        proxy_pass http://127.0.0.1:8080;
        proxy_set_header Host $host;
        proxy_set_header Source-IP $remote_addr;
    }
}
Here the proxy listens on port 81.
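A quick sanity check after reloading nginx:
$ sudo nginx -s reload
$ curl -i http://127.0.0.1:81/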
0x0B
These are the config files the programs load at runtime.
config.json:
{
"LocalHost": "127.0.0.1",
"Token": "098f6bcd4621d373cade4e832627b4f6",
"RedisAddr": "127.0.0.1:6379",
"RedisPwd": "",
"RedisDB": 2,
"Master": "master"
}
nodes.json:
{
"slave1": {
"Host": "slave1",
"Port": 6666,
"MaxTask": 10,
"Name": "node1",
"Alive": true,
"Token": "098f6bcd4621d373cade4e832627b4f6"
},
"slave2": {
"Host": "slave2",
"Port": 6666,
"MaxTask": 10,
"Alive": true,
"Name": "node2",
"Token": "098f6bcd4621d373cade4e832627b4f6"
}
}
0x0C
Finally, if the programs run in a VM or on bare metal, they'll want start-on-boot and restart-after-crash, which systemd can provide.
rpc.service :
[Unit]
Description=it's rpc service
[Service]
Type=simple
Restart=on-failure
RestartSec=60
Environment=NODETYPE='master' DATA='/data/' WORKDIR='dir/to/work/'
ExecStart=path/to/rpc
[Install]
WantedBy=multi-user.target
web.service:
[Unit]
Description=it's gin web service
After=rpc.service
[Service]
Type=simple
Restart=on-failure
RestartSec=60
Environment=NODETYPE='master' DATA='/data/' WORKDIR='dir/to/work/'
ExecStart=path/to/web
[Install]
WantedBy=multi-user.target
Because web establishes its grpc connection the moment it starts, the rpc program has to come up first, hence After=rpc.service.
Restart=on-failure with RestartSec=60 retries the start 60 seconds after a crash. Type is simple rather than forking, since the Go binaries don't daemonize, and a WatchdogSec would require the programs to ping systemd's watchdog, which they don't.
Put the service files under /etc/systemd/system/,
then enable start-on-boot:
$ sudo systemctl enable rpc.service
$ sudo systemctl enable web.service
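And to start them immediately and check on them:
$ sudo systemctl start rpc.service web.service
$ systemctl status rpc.service web.service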
If everything runs in docker, you can skip systemd and just let compose manage the containers.
0x0D
The final directory tree:
$ tree .
.
├── communicate
│   ├── communicate_grpc.pb.go
│   └── communicate.pb.go
├── communicate.proto
├── conf
│   ├── config.json
│   ├── nginx.conf
│   ├── nodes.json
│   ├── rpc.service
│   ├── settings.go
│   └── web.service
├── docker-compose.yml
├── Dockerfile
├── rpc
│   ├── rpc
│   └── server.go
├── run.sh
├── utils
│   ├── election.go
│   ├── heart.go
│   └── tools.go
└── web
    ├── cache
    ├── js
    │   └── tasks.js
    ├── templates
    │   ├── create_task.html
    │   ├── css.tmpl
    │   ├── footer.tmpl
    │   ├── index.html
    │   ├── nav.tmpl
    │   └── test.html
    ├── web
    └── web.go

8 directories, 26 files
Here rpc/rpc and web/web are the ELF binaries produced by go build.