Go:tail,sarama,etcd,elasticsearch
目录:
tail
tail包是github.com/hpcloud/tail
,用于读取日志的文件
基本使用如下所示
package main
import (
"time"
"fmt"
"github.com/hpcloud/tail"
)
func main() {
fileName := "./my.log"
config := tail.Config{
ReOpen: true, // 重新打开
Follow: true, // 是否跟随原文件没有读取完的东西
Location: &tail.SeekInfo{
Offset: 0,
Whence: 2,
}, // 从文件那个位置读取
MustExist: false, // 是否日志文件必须存在
Poll: true, //
}
tails, err := tail.TailFile(fileName, config)
if err != nil {
fmt.Println("tail file failed, err:", err)
return
}
var (
line *tail.Line
ok bool
)
for {
line, ok = <-tails.Lines
if !ok {
fmt.Printf("tail file close reopen, filename: %s\n", tails.Filename)
time.Sleep(time.Second)
continue
}
fmt.Println("line: ", line.Text)
}
}
执行后在./my.log写入日志即可
$ go run main.go
2020/03/01 01:26:03 Waiting for ./my.log to appear...
2020/03/01 01:26:16 Seeked ./my.log - &{Offset:0 Whence:2}
2020/03/01 01:26:27 Re-opening moved/deleted file ./my.log ...
2020/03/01 01:26:27 Successfully reopened ./my.log
line: 123
line: 456
sarama
sarama是github/Shopify/sarama
,用于对kafka进行读写
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
// 初始化kafka producer
config := sarama.NewConfig()
// 配置
config.Producer.RequiredAcks = sarama.WaitForAll // 发送数据需要leader和follow都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner // 使用随机的partition
config.Producer.Return.Successes = true // 成功写入的消息在successes channal返回
// 连接kafka
client, err := sarama.NewSyncProducer([]string{"172.16.129.126:9092"}, config)
if err != nil {
fmt.Println("producer closed, err" , err)
return
}
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log"
msg.Value = sarama.StringEncoder("test")
defer client.Close()
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed, err", err)
return
}
fmt.Printf("pid: %v, offset: %v\n", pid, offset)
}
执行结果
$ go run main.go
pid: 0, offset: 0
通过kafka的console读取
$ ./bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181/kafka --topic web_log --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
test
从kafka中读取数据
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
// kafka consumer
func main() {
consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
if err != nil {
fmt.Printf("fail to start consumer, err:%v\n", err)
return
}
partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
if err != nil {
fmt.Printf("fail to get list of partition:err%v\n", err)
return
}
fmt.Println(partitionList)
for partition := range partitionList { // 遍历所有的分区
// 针对每个分区创建一个对应的分区消费者
pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
return
}
defer pc.AsyncClose()
// 异步从每个分区消费信息
go func(sarama.PartitionConsumer) {
for msg := range pc.Messages() {
fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
}
}(pc)
}
}
etcd
etcd的安装步骤参考github
如果出现
# github.com/coreos/etcd/clientv3/balancer/picker
../../pkg/mod/github.com/coreos/etcd@v3.3.18+incompatible/clientv3/balancer/picker/err.go:37:44: undefined: balancer.PickOptions
../../pkg/mod/github.com/coreos/etcd@v3.3.18+incompatible/clientv3/balancer/picker/roundrobin_balanced.go:55:54: undefined: balancer.PickOptions
# github.com/coreos/etcd/clientv3/balancer/resolver/endpoint
../../pkg/mod/github.com/coreos/etcd@v3.3.18+incompatible/clientv3/balancer/resolver/endpoint/endpoint.go:114:78: undefined: resolver.BuildOption
../../pkg/mod/github.com/coreos/etcd@v3.3.18+incompatible/clientv3/balancer/resolver/endpoint/endpoint.go:182:31: undefined: resolver.ResolveNowOption
将grpc的依赖版本由v1.27.0改为v1.26.0,参考别人博客
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
fmt.Printf("connect to etcd failed, err: %v\n", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
// Put
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "why", "wanghongyu")
cancel()
if err != nil {
fmt.Printf("put to etcd failed, err: %v\n", err)
return
}
fmt.Println("put to etcd success")
// Get
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "why")
cancel()
if err != nil {
fmt.Printf("get from etcd failed, err: %v\n", err)
return
}
fmt.Printf("get from etcd success")
for _, ev := range resp.Kvs {
fmt.Printf("%s: %s\n", ev.Key, ev.Value)
}
}
运行结果
connect to etcd success
put to etcd success
get from etcd successwhy: wanghongyu
watch
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
fmt.Printf("connect to etcd failed, err: %v\n", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
// Watch
ch := cli.Watch(context.Background(), "why")
for wresp := range ch {
for _, ev := range wresp.Events {
fmt.Printf("Type:%v , %v: %v\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value))
}
}
}
启动会夯住
connect to etcd success
另起一个终端修改数据
$ ./etcdctl --endpoints=http://127.0.0.1:2379 get why
why
wanghongyu
$ ./etcdctl --endpoints=http://127.0.0.1:2379 put why 123456
OK
会显示修改的类型和值
connect to etcd success
Type:PUT , why: 123456
Etcd
package main
import (
"context"
"fmt"
"github.com/olivere/elastic/v7"
)
type Person struct {
Name string `json:"name"`
Age int `json:"age"`
Married bool `json:"married"`
}
func main() {
client, err := elastic.NewClient(elastic.SetURL("http://192.168.1.7:9200"))
if err != nil {
// Handle error
panic(err)
}
fmt.Println("connect to es success")
p1 := Person{Name: "rion", Age: 22, Married: false}
put1, err := client.Index().
Index("user").
BodyJson(p1).
Do(context.Background())
if err != nil {
// Handle error
panic(err)
}
fmt.Printf("Indexed user %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type)
}