当前位置: 首页> 游戏> 手游 > 营销型网站建设极速建站_线上投票怎么做_百度互联网营销是什么_湖南长沙疫情最新消息

营销型网站建设极速建站_线上投票怎么做_百度互联网营销是什么_湖南长沙疫情最新消息

时间:2025/7/12 0:03:59来源:https://blog.csdn.net/z1zyy/article/details/146315413 浏览次数:1次
营销型网站建设极速建站_线上投票怎么做_百度互联网营销是什么_湖南长沙疫情最新消息

目录

一.下载JDK

二.下载kafka

 三.启动

四.示例

1.创建工程logagent,在其目录中创建kafka目录和taillog目录

2.kafka.go

3.taillog.go

4.main.go

5.编译运行


一.下载JDK

这里下载JDK8,要与kafka版本适配,因为我第一次下载的JDK22,kafka_server运行有问题。

下载地址:

Java Downloads | Oracle

配置环境环境变量:

二.下载kafka

官方地址:

Apache Kafka

找到这个版本,下载2.11-2.3.0.tgz

 三.启动

1.进入D:\kafka_2.12-2.3.0目录下 (我是下载在D盘的)

输入命令的时候可以使用Tab提示

2.在此路径再开启一个cmd 输入红色命令

此时server和zookeeper都启动了

四.示例

1.创建工程logagent,在其目录中创建kafka目录和taillog目录

kafka下创建kafka.go  taillog下创建taillog.go  在logagent根目录下创建main.go

配置ini文件

在当前目录下创建一个config文件夹,放置一个config.ini文件

#config.ini

[kafka]
address = 127.0.0.1:9092
topic = web_log[taillog]
path=./my.log

下载第三方库:

go get github.com/IBM/sarama

go get github.com/hpcloud/tail

go get gopkg.in/ini.v1

2.kafka.go

package kafkaimport ("fmt""github.com/IBM/sarama"
)// 专门往kafka写日志的模块
var (client sarama.SyncProducer //声明了一个全局的kafka连接
)// 初始化client
func Init(addr []string) (err error){config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partitionconfig.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回client, err = sarama.NewSyncProducer(addr, config)if err != nil {fmt.Println("producer closed, err:", err)return} return
}func SendToKafka(topic,data string){msg := &sarama.ProducerMessage{}msg.Topic = "web_log"msg.Value = sarama.StringEncoder(data)//发送到kafkapid, offset, err := client.SendMessage(msg)if err != nil {fmt.Println("send msg failed, err:", err)return}fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

3.taillog.go

package taillogimport ("fmt""github.com/hpcloud/tail"
)// 专门收集日志文件模块
var tails *tail.Tailfunc Init(filename string) (err error) {config := tail.Config{ReOpen:    true,Follow:    true,Location:  &tail.SeekInfo{Offset: 0, Whence: 2},MustExist: false,Poll:      true,}tails, err = tail.TailFile(filename, config)if err != nil {fmt.Printf("tail %s failed, err:%v\n", filename, err)return}return
}func ReadChan() <-chan *tail.Line {return tails.Lines
}

4.main.go

package mainimport ("fmt""mymod/studygo/logagent/kafka""mymod/studygo/logagent/taillog""time""gopkg.in/ini.v1"
)func run(topic string) {//读取日志//发送到kafkafor {select {case line := <-taillog.ReadChan():kafka.SendToKafka(topic, line.Text)default:time.Sleep(time.Second * 1)}}}
func main() {//加载配置文件cfg, err := ini.Load("./config/config.ini")if err != nil {fmt.Println("load config failed,err:", err)return}address := cfg.Section("kafka").Key("address").String()topic := cfg.Section("kafka").Key("topic").String()taillogPath := cfg.Section("taillog").Key("path").String()//初始化kafka连接err = kafka.Init([]string{address})if err != nil {fmt.Println("init kafka failed,err:", err)return}fmt.Println("init kafka scucess!")//打开日志文件准备收集日志err = taillog.Init(taillogPath)if err != nil {fmt.Println("init taillog failed,err:", err)return}fmt.Println("init taillog scucess!")run(topic)
}

5.编译运行

如果此时该目录下没有my.log,会等待。不要停掉程序!

此时我们创建一个my.log并且输入内容

在CMD中使用命令:模拟消费者

bin\windows\kafka-console-consumer.bat --bootstrap-server=127.0.0.1:9092 -topic=web_log --from-beginning

就可以看到从日志中读取的内容了。

关键字:营销型网站建设极速建站_线上投票怎么做_百度互联网营销是什么_湖南长沙疫情最新消息

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: