Go:tail,sarama,etcd,elasticsearch

时间:March 1, 2020 分类:

目录:

tail

tail包是github.com/hpcloud/tail,用于读取日志的文件

基本使用如下所示

package main

import (
    "time"
    "fmt"
    "github.com/hpcloud/tail"
)

func main() {
    fileName := "./my.log"
    config := tail.Config{
        ReOpen: true, // 重新打开
        Follow: true, // 是否跟随原文件没有读取完的东西
        Location: &tail.SeekInfo{
            Offset: 0, 
            Whence: 2,
        },  // 从文件那个位置读取
        MustExist: false, // 是否日志文件必须存在
        Poll: true, // 
    }
    tails, err := tail.TailFile(fileName, config)
    if err != nil {
        fmt.Println("tail file failed, err:", err)
        return
    }
    var (
        line *tail.Line
        ok  bool
    )
    for {
        line, ok = <-tails.Lines
        if !ok {
            fmt.Printf("tail file close reopen, filename: %s\n", tails.Filename)
            time.Sleep(time.Second)
            continue
        }
        fmt.Println("line: ", line.Text)
    }
}

执行后在./my.log写入日志即可

$ go run main.go 
2020/03/01 01:26:03 Waiting for ./my.log to appear...
2020/03/01 01:26:16 Seeked ./my.log - &{Offset:0 Whence:2}
2020/03/01 01:26:27 Re-opening moved/deleted file ./my.log ...
2020/03/01 01:26:27 Successfully reopened ./my.log
line:  123
line:  456

sarama

sarama是github/Shopify/sarama,用于对kafka进行读写

package main

import (
    "fmt"
    "github.com/Shopify/sarama"
)

func main() {
        // 初始化kafka producer
        config := sarama.NewConfig()
        // 配置
        config.Producer.RequiredAcks = sarama.WaitForAll // 发送数据需要leader和follow都确认
        config.Producer.Partitioner = sarama.NewRandomPartitioner // 使用随机的partition
        config.Producer.Return.Successes = true // 成功写入的消息在successes channal返回

        // 连接kafka
        client, err := sarama.NewSyncProducer([]string{"172.16.129.126:9092"}, config)
        if err != nil {
                fmt.Println("producer closed, err" , err)
                return
        }

        msg := &sarama.ProducerMessage{}
        msg.Topic = "web_log"
        msg.Value = sarama.StringEncoder("test")
        defer client.Close()
        pid, offset, err := client.SendMessage(msg)
        if err != nil {
                fmt.Println("send msg failed, err", err)
                return
        }
        fmt.Printf("pid: %v, offset: %v\n", pid, offset)
}

执行结果

$ go run main.go 
pid: 0, offset: 0

通过kafka的console读取

$ ./bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181/kafka --topic web_log --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
test

从kafka中读取数据

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

// kafka consumer

func main() {
    consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
    if err != nil {
        fmt.Printf("fail to start consumer, err:%v\n", err)
        return
    }
    partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
    if err != nil {
        fmt.Printf("fail to get list of partition:err%v\n", err)
        return
    }
    fmt.Println(partitionList)
    for partition := range partitionList { // 遍历所有的分区
        // 针对每个分区创建一个对应的分区消费者
        pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
            return
        }
        defer pc.AsyncClose()
        // 异步从每个分区消费信息
        go func(sarama.PartitionConsumer) {
            for msg := range pc.Messages() {
                fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
            }
        }(pc)
    }
}

etcd

etcd的安装步骤参考github

如果出现

# github.com/coreos/etcd/clientv3/balancer/picker
../../pkg/mod/github.com/coreos/etcd@v3.3.18+incompatible/clientv3/balancer/picker/err.go:37:44: undefined: balancer.PickOptions
../../pkg/mod/github.com/coreos/etcd@v3.3.18+incompatible/clientv3/balancer/picker/roundrobin_balanced.go:55:54: undefined: balancer.PickOptions
# github.com/coreos/etcd/clientv3/balancer/resolver/endpoint
../../pkg/mod/github.com/coreos/etcd@v3.3.18+incompatible/clientv3/balancer/resolver/endpoint/endpoint.go:114:78: undefined: resolver.BuildOption
../../pkg/mod/github.com/coreos/etcd@v3.3.18+incompatible/clientv3/balancer/resolver/endpoint/endpoint.go:182:31: undefined: resolver.ResolveNowOption

将grpc的依赖版本由v1.27.0改为v1.26.0,参考别人博客

package main

import (
    "context"
    "fmt"
    "time"
    "go.etcd.io/etcd/clientv3"
)


func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints: []string{"127.0.0.1:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        fmt.Printf("connect to etcd failed, err: %v\n", err)
        return
    }
        fmt.Println("connect to etcd success")
    defer cli.Close()
    // Put
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    _, err = cli.Put(ctx, "why", "wanghongyu")
    cancel()
    if err != nil {
        fmt.Printf("put to etcd failed, err: %v\n", err)
        return
    }
        fmt.Println("put to etcd success")
    // Get
    ctx, cancel = context.WithTimeout(context.Background(), time.Second)
    resp, err := cli.Get(ctx, "why")
    cancel()
    if err != nil {
        fmt.Printf("get from etcd failed, err: %v\n", err)
        return
    }
        fmt.Printf("get from etcd success")
    for _, ev := range resp.Kvs {
        fmt.Printf("%s: %s\n", ev.Key, ev.Value)
    }
}

运行结果

connect to etcd success
put to etcd success
get from etcd successwhy: wanghongyu

watch

package main

import (
    "context"
    "fmt"
    "time"
    "go.etcd.io/etcd/clientv3"
)


func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints: []string{"127.0.0.1:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        fmt.Printf("connect to etcd failed, err: %v\n", err)
        return
    }
        fmt.Println("connect to etcd success")
    defer cli.Close()
    // Watch
    ch := cli.Watch(context.Background(), "why")
    for wresp := range ch {
        for _, ev := range wresp.Events {
            fmt.Printf("Type:%v , %v: %v\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value))
        }
    }
}

启动会夯住

connect to etcd success

另起一个终端修改数据

$ ./etcdctl --endpoints=http://127.0.0.1:2379 get why
why
wanghongyu
$ ./etcdctl --endpoints=http://127.0.0.1:2379 put why 123456
OK

会显示修改的类型和值

connect to etcd success
Type:PUT , why: 123456

Etcd

package main

import (
    "context"
    "fmt"
    "github.com/olivere/elastic/v7"
)

type Person struct {
    Name    string `json:"name"`
    Age     int    `json:"age"`
    Married bool   `json:"married"`
}

func main() {
    client, err := elastic.NewClient(elastic.SetURL("http://192.168.1.7:9200"))
    if err != nil {
        // Handle error
        panic(err)
    }

    fmt.Println("connect to es success")
    p1 := Person{Name: "rion", Age: 22, Married: false}
    put1, err := client.Index().
        Index("user").
        BodyJson(p1).
        Do(context.Background())
    if err != nil {
        // Handle error
        panic(err)
    }
    fmt.Printf("Indexed user %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type)
}