目录
一.下载JDK
二.下载kafka
三.启动
四.示例
1.创建工程logagent,在其目录中创建kafka目录和taillog目录
2.kafka.go
3.taillog.go
4.main.go
5.编译运行
一.下载JDK
这里下载JDK8,要与kafka版本适配,因为我第一次下载的JDK22,kafka_server运行有问题。
下载地址:
Java Downloads | Oracle
配置环境环境变量:
二.下载kafka
官方地址:
Apache Kafka
找到这个版本,下载2.11-2.3.0.tgz
三.启动
1.进入D:\kafka_2.12-2.3.0目录下 (我是下载在D盘的)
输入命令的时候可以使用Tab提示
2.在此路径再开启一个cmd 输入红色命令
此时server和zookeeper都启动了
四.示例
1.创建工程logagent,在其目录中创建kafka目录和taillog目录
kafka下创建kafka.go taillog下创建taillog.go 在logagent根目录下创建main.go
配置ini文件
在当前目录下创建一个config文件夹,放置一个config.ini文件
#config.ini
[kafka]
address = 127.0.0.1:9092
topic = web_log[taillog]
path=./my.log
下载第三方库:
go get github.com/IBM/sarama
go get github.com/hpcloud/tail
go get gopkg.in/ini.v1
2.kafka.go
package kafkaimport ("fmt""github.com/IBM/sarama"
)// 专门往kafka写日志的模块
var (client sarama.SyncProducer //声明了一个全局的kafka连接
)// 初始化client
func Init(addr []string) (err error){config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partitionconfig.Producer.Return.Successes = true // 成功交付的消息将在success channel返回client, err = sarama.NewSyncProducer(addr, config)if err != nil {fmt.Println("producer closed, err:", err)return} return
}func SendToKafka(topic,data string){msg := &sarama.ProducerMessage{}msg.Topic = "web_log"msg.Value = sarama.StringEncoder(data)//发送到kafkapid, offset, err := client.SendMessage(msg)if err != nil {fmt.Println("send msg failed, err:", err)return}fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
3.taillog.go
package taillogimport ("fmt""github.com/hpcloud/tail"
)// 专门收集日志文件模块
var tails *tail.Tailfunc Init(filename string) (err error) {config := tail.Config{ReOpen: true,Follow: true,Location: &tail.SeekInfo{Offset: 0, Whence: 2},MustExist: false,Poll: true,}tails, err = tail.TailFile(filename, config)if err != nil {fmt.Printf("tail %s failed, err:%v\n", filename, err)return}return
}func ReadChan() <-chan *tail.Line {return tails.Lines
}
4.main.go
package mainimport ("fmt""mymod/studygo/logagent/kafka""mymod/studygo/logagent/taillog""time""gopkg.in/ini.v1"
)func run(topic string) {//读取日志//发送到kafkafor {select {case line := <-taillog.ReadChan():kafka.SendToKafka(topic, line.Text)default:time.Sleep(time.Second * 1)}}}
func main() {//加载配置文件cfg, err := ini.Load("./config/config.ini")if err != nil {fmt.Println("load config failed,err:", err)return}address := cfg.Section("kafka").Key("address").String()topic := cfg.Section("kafka").Key("topic").String()taillogPath := cfg.Section("taillog").Key("path").String()//初始化kafka连接err = kafka.Init([]string{address})if err != nil {fmt.Println("init kafka failed,err:", err)return}fmt.Println("init kafka scucess!")//打开日志文件准备收集日志err = taillog.Init(taillogPath)if err != nil {fmt.Println("init taillog failed,err:", err)return}fmt.Println("init taillog scucess!")run(topic)
}
5.编译运行
如果此时该目录下没有my.log,会等待。不要停掉程序!
此时我们创建一个my.log并且输入内容
在CMD中使用命令:模拟消费者
bin\windows\kafka-console-consumer.bat --bootstrap-server=127.0.0.1:9092 -topic=web_log --from-beginning
就可以看到从日志中读取的内容了。