
Atguigu Big Data - Kafka Video Tutorial - Notes 01 [Kafka Basics]

Source: https://blog.csdn.net/weixin_44949135/article/details/136452836

Video: 【尚硅谷】Kafka3.x教程(从入门到调优,深入全面) (Kafka 3.x tutorial, from basics to tuning) - bilibili

  1. Atguigu Big Data - Kafka Video Tutorial - Notes 01 [Kafka Basics]
  2. Atguigu Big Data - Kafka Video Tutorial - Notes 02 [Kafka Integration with External Systems]
  3. Atguigu Big Data - Kafka Video Tutorial - Notes 03 [Kafka Production Tuning Handbook]
  4. Atguigu Big Data - Kafka Video Tutorial - Notes 04 [Kafka Source Code Analysis]

Contents

01_Atguigu Big Data Technology: Kafka

Chapter 1 Kafka Overview

p001

p002

p003

p004

p005

Chapter 2 Kafka Quick Start

p006

p007

p008

p009

Chapter 3 Kafka Producers

p010

p011

p012

p013

p014

Chapter 4 Kafka Broker

Chapter 5 Kafka Consumers

Chapter 6 Kafka-Eagle Monitoring

Chapter 7 Kafka-Kraft Mode


01_Atguigu Big Data Technology: Kafka

Chapter 1 Kafka Overview

p001

p002

p003

  1. Flume: continuously monitors data files; every new log record is picked up and forwarded to the Hadoop cluster.
  2. Kafka: when the data volume is too large for the downstream systems, Kafka buffers the data.
  1. Synchronous processing: handle everything on the spot, step by step, before moving on.
  2. Asynchronous processing: handle the core transaction first and defer the rest to a buffer (see the sketch below).
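A minimal, Kafka-independent sketch of this buffering idea (the class and sizes are illustrative, not from the course): a fast producer thread hands records to a bounded queue, and a slower consumer drains it at its own pace.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BufferDemo {
    public static void main(String[] args) throws InterruptedException {
        // Bounded buffer standing in for Kafka between a fast producer and a slow consumer
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(1000);

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                try {
                    buffer.put("log-" + i); // blocks when the buffer is full: natural back-pressure
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String record = buffer.take(); // drain at the consumer's own pace
                    Thread.sleep(1);               // simulate slow downstream processing
                    System.out.println("processed " + record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true); // let the JVM exit once the producer is done (sketch only)

        producer.start();
        consumer.start();
        producer.join();
    }
}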

p004

The two messaging models of a message queue:

  1. Point-to-point model:
    1. data is produced for only one topic (queue);
    2. a message is deleted once it has been consumed.
  2. Publish/subscribe model:
    1. there can be data for multiple topics;
    2. messages are not deleted after being consumed;
    3. multiple consumers are independent of one another (see the sketch after this list).
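A rough sketch of that last point using the Kafka Java client, assuming the topic first and the broker node001:9092 used later in these notes: two consumers subscribed under different group.id values each independently receive the full stream.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class IndependentConsumersDemo {
    private static KafkaConsumer<String, String> newConsumer(String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node001:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); // a different group = an independent subscriber
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // new groups start from the beginning
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of("first"));
        return consumer;
    }

    public static void main(String[] args) {
        // Both groups receive every message published to "first"; consuming does not delete it.
        try (KafkaConsumer<String, String> g1 = newConsumer("group-1");
             KafkaConsumer<String, String> g2 = newConsumer("group-2")) {
            for (int i = 0; i < 10; i++) {
                for (ConsumerRecord<String, String> r : g1.poll(Duration.ofSeconds(1)))
                    System.out.println("group-1 got: " + r.value());
                for (ConsumerRecord<String, String> r : g2.poll(Duration.ofSeconds(1)))
                    System.out.println("group-2 got: " + r.value());
            }
        }
    }
}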

p005

  1. ZooKeeper: a portion of Kafka's metadata is stored in ZooKeeper. ZooKeeper records which broker servers are online and running, and which replica is the leader of each partition.
  2. Kafka: stores data partitioned across the brokers.

Chapter 2 Kafka Quick Start

p006

  1. Apache Kafka

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
#zookeeper.connect=localhost:2181
zookeeper.connect=node001:2181,node002:2181,node003:2181/kafka

ZooKeeper stores its data as a directory tree, with the zookeeper node under the root. Without the node003:2181/kafka chroot suffix, Kafka's znodes would be scattered directly under ZooKeeper's root; deregistering or deleting the Kafka cluster would then mean removing them one by one, which is bad for later administration. With the chroot, all Kafka metadata sits under a single /kafka subtree.
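To see the effect, one can list the children of the /kafka znode; a small sketch using the ZooKeeper Java client (connection string as configured above, session timeout chosen arbitrarily):

import org.apache.zookeeper.ZooKeeper;

public class ZkChrootCheck {
    public static void main(String[] args) throws Exception {
        // Connect without the chroot so we can inspect the directory tree from the root
        ZooKeeper zk = new ZooKeeper("node001:2181,node002:2181,node003:2181", 30000, event -> {});
        // All of Kafka's znodes are grouped under the single /kafka subtree
        System.out.println(zk.getChildren("/kafka", false));
        zk.close();
    }
}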

[atguigu@node001 ~]$ vim /opt/module/kafka/kafka_2.12-3.0.0/config/server.properties 
[atguigu@node001 ~]$ sudo vim /etc/profile.d/my_env.sh
[atguigu@node001 ~]$ source /etc/profile
[atguigu@node001 ~]$ sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh
==================== node001 ====================
sending incremental file list

sent 47 bytes  received 12 bytes  39.33 bytes/sec
total size is 1,201  speedup is 20.36
==================== node002 ====================
sending incremental file list
my_env.sh

sent 599 bytes  received 47 bytes  1,292.00 bytes/sec
total size is 1,201  speedup is 1.86
==================== node003 ====================
sending incremental file list
my_env.sh

sent 599 bytes  received 47 bytes  1,292.00 bytes/sec
total size is 1,201  speedup is 1.86
[atguigu@node001 ~]$ 
[atguigu@node001 ~]$ zookeeper.sh start
---------- zookeeper node001 start ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- zookeeper node002 start ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- zookeeper node003 start ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[atguigu@node001 ~]$ 
[atguigu@node001 ~]$ 
[atguigu@node001 ~]$ xcall jps
=============== node001 ===============
4291 QuorumPeerMain
4346 Jps
=============== node002 ===============
3570 QuorumPeerMain
3630 Jps
=============== node003 ===============
3426 QuorumPeerMain
3478 Jps
[atguigu@node001 ~]$ cd /opt/module/kafka/kafka_2.12-3.0.0/
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-server-start.sh 
USAGE: bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-server-start.sh -daemon config/server.properties
[atguigu@node001 kafka_2.12-3.0.0]$ jpsall 
================ node001 ================
4817 Jps
4291 QuorumPeerMain
4756 Kafka
================ node002 ================
3570 QuorumPeerMain
3724 Jps
================ node003 ================
3426 QuorumPeerMain
3564 Jps
[atguigu@node001 kafka_2.12-3.0.0]$ 

p007

#!/bin/bash

case $1 in
"start"){
    for i in node001 node002 node003
    do
        echo "--------------- $i Kafka start ---------------"
        ssh $i "/opt/module/kafka/kafka_2.12-3.0.0/bin/kafka-server-start.sh -daemon /opt/module/kafka/kafka_2.12-3.0.0/config/server.properties"
    done
};;
"stop"){
    for i in node001 node002 node003
    do
        echo "--------------- $i Kafka stop ---------------"
        ssh $i "/opt/module/kafka/kafka_2.12-3.0.0/bin/kafka-server-stop.sh"
    done
};;
"status"){
    for i in node001 node002 node003
    do
        echo "--------------- $i Kafka status ---------------"
        # kafka-topics.sh with no arguments only prints usage; listing topics actually
        # verifies that the broker on $i is reachable
        ssh $i "/opt/module/kafka/kafka_2.12-3.0.0/bin/kafka-topics.sh --bootstrap-server $i:9092 --list"
    done
};;
esac
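In the course this script is saved as kafka.sh in a directory on the PATH (e.g. ~/bin), made executable with chmod +x kafka.sh, and distributed to the other nodes. One caveat when shutting the cluster down: stop Kafka first and wait for the broker processes to exit before stopping ZooKeeper, because a broker that has lost its ZooKeeper connection cannot complete a clean shutdown.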

p008

2.2 Kafka Command-Line Operations

[atguigu@node001 kafka_2.12-3.0.0]$ pwd
/opt/module/kafka/kafka_2.12-3.0.0
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh # show the options of the topic command
Create, delete, describe, or change a topic.
Option                                   Description                            
------                                   -----------                            
--alter  Alter the number of partitions, replica assignment, and/or configuration for the topic.
--at-min-isr-partitions  if set when describing topics, only show partitions whose isr count is equal to the configured minimum.
--bootstrap-server <String: server to connect to>  REQUIRED: The Kafka server to connect to.
--command-config <String: command config property file>  Property file containing configs to be passed to Admin Client. This is used only with --bootstrap-server option for describing and altering broker configs.
--config <String: name=value>  A topic configuration override for the topic being created or altered. The following is a list of valid configurations: cleanup.policy, compression.type, delete.retention.ms, file.delete.delay.ms, flush.messages, flush.ms, follower.replication.throttled.replicas, index.interval.bytes, leader.replication.throttled.replicas, local.retention.bytes, local.retention.ms, max.compaction.lag.ms, max.message.bytes, message.downconversion.enable, message.format.version, message.timestamp.difference.max.ms, message.timestamp.type, min.cleanable.dirty.ratio, min.compaction.lag.ms, min.insync.replicas, preallocate, remote.storage.enable, retention.bytes, retention.ms, segment.bytes, segment.index.bytes, segment.jitter.ms, segment.ms, unclean.leader.election.enable. See the Kafka documentation for full details on the topic configs. It is supported only in combination with --create if --bootstrap-server option is used (the kafka-configs CLI supports altering topic configs with a --bootstrap-server option).
--create  Create a new topic.
--delete  Delete a topic
--delete-config <String: name>  A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option). Not supported with the --bootstrap-server option.
--describe  List details for the given topics.
--disable-rack-aware  Disable rack aware replica assignment
--exclude-internal  exclude internal topics when running list or describe command. The internal topics will be listed by default
--help  Print usage information.
--if-exists  if set when altering or deleting or describing topics, the action will only execute if the topic exists.
--if-not-exists  if set when creating topics, the action will only execute if the topic does not already exist.
--list  List all available topics.
--partitions <Integer: # of partitions>  The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected). If not supplied for create, defaults to the cluster default.
--replica-assignment <String: broker_id_for_part1_replica1 : broker_id_for_part1_replica2, broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...>  A list of manual partition-to-broker assignments for the topic being created or altered.
--replication-factor <Integer: replication factor>  The replication factor for each partition in the topic being created. If not supplied, defaults to the cluster default.
--topic <String: topic>  The topic to create, alter, describe or delete. It also accepts a regular expression, except for --create option. Put topic name in double quotes and use the '\' prefix to escape regular expression symbols; e.g. "test\.topic".
--topics-with-overrides  if set when describing topics, only show topics that have overridden configs
--unavailable-partitions  if set when describing topics, only show partitions whose leader is not available
--under-min-isr-partitions  if set when describing topics, only show partitions whose isr count is less than the configured minimum.
--under-replicated-partitions  if set when describing topics, only show under replicated partitions
--version  Display Kafka version.
[atguigu@node001 kafka_2.12-3.0.0]$ 
The two options used in almost every call: --bootstrap-server (REQUIRED: the Kafka server to connect to) and --topic (the topic to create, alter, describe or delete).
  1. [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --list
  2. [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first --create --partitions 1 --replication-factor 3 # create topic first with 1 partition and 3 replicas
  3. [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --describe
  4. [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --alter --partitions 3
  5. [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --describe
  6. [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --alter --partitions 1 # fails: the partition count can only be increased, never decreased!
  7. [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --alter --replication-factor 2 # fails: the replication factor cannot be changed from the command line (a Java AdminClient equivalent is sketched at the end of this section)
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --list
__consumer_offsets
__transaction_state
action_topic
appVideo_topic
display_topic
dwd_examination_test_paper
dwd_examination_test_question
dwd_interaction_comment
dwd_interaction_favor_add
dwd_interaction_review
dwd_learn_play
dwd_trade_cart_add
dwd_trade_order_detail
dwd_trade_pay_suc_detail
dwd_traffic_action_log
dwd_traffic_display_log
dwd_traffic_error_log
dwd_traffic_page_log
dwd_traffic_play_pre_process
dwd_traffic_start_log
dwd_traffic_unique_visitor_detail
dwd_traffic_user_jump_detail
dwd_user_user_login
dwd_user_user_register
error_topic
first
maxwell
nifi
nifiOutput
page_topic
start_topic
topic_db
topic_log
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first --create --partitions 1 --replication-factor 3
Error while executing topic command : Topic 'first' already exists.
[2024-03-04 16:59:58,015] ERROR org.apache.kafka.common.errors.TopicExistsException: Topic 'first' already exists. (kafka.admin.TopicCommand$)
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --create --partitions 1 --replication-factor 3
Created topic first01.
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --describe
Topic: first01  TopicId: 8_ayAUYdRbODZCeFMBE8Cg PartitionCount: 1       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: first01  Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
[atguigu@node001 kafka_2.12-3.0.0]$ 
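As promised above, a rough Java AdminClient sketch of the same create/describe/list operations (the broker address and topic names are carried over from the CLI session; the AdminClient comes from the standard kafka-clients artifact):

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;

public class TopicAdminDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "node001:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Equivalent of: kafka-topics.sh --create --topic first01 --partitions 1 --replication-factor 3
            admin.createTopics(List.of(new NewTopic("first01", 1, (short) 3))).all().get();

            // Equivalent of: kafka-topics.sh --describe --topic first01
            TopicDescription desc = admin.describeTopics(List.of("first01")).all().get().get("first01");
            System.out.println(desc);

            // Equivalent of: kafka-topics.sh --list
            admin.listTopics().names().get().forEach(System.out::println);
        }
    }
}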

p009

[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first --create --partitions 1 --replication-factor 3 # create topic first with 1 partition and 3 replicas

Next, create a producer and send data to the first topic:

[atguigu@node001 ~]$ cd /opt/module/kafka/kafka_2.12-3.0.0/bin
[atguigu@node001 bin]$ ./kafka-console-producer.sh
Missing required option(s) [bootstrap-server]
Option                                   Description                            
------                                   -----------                            
--batch-size <Integer: size>  Number of messages to send in a single batch if they are not being sent synchronously. (default: 200)
--bootstrap-server <String: server to connect to>  REQUIRED unless --broker-list (deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2.
--broker-list <String: broker-list>  DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.
--compression-codec [String: compression-codec]  The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. If specified without value, then it defaults to 'gzip'
--help  Print usage information.
--line-reader <String: reader_class>  The class name of the class to use for reading lines from standard in. By default each line is read as a separate message. (default: kafka.tools.ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on send>  The max time that the producer will block for during a send request (default: 60000)
--max-memory-bytes <Long: total memory in bytes>  The total memory used by the producer to buffer records waiting to be sent to the server. (default: 33554432)
--max-partition-memory-bytes <Long: memory in bytes per partition>  The buffer size allocated for a partition. When records are received which are smaller than this size the producer will attempt to optimistically group them together until this size is reached. (default: 16384)
--message-send-max-retries <Integer>  Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message. (default: 3)
--metadata-expiry-ms <Long: metadata expiration interval>  The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. (default: 300000)
--producer-property <String: producer_prop>  A mechanism to pass user-defined properties in the form key=value to the producer.
--producer.config <String: config file>  Producer config properties file. Note that [producer-property] takes precedence over this config.
--property <String: prop>  A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader. Default properties include: parse.key=true|false, key.separator=<key.separator>, ignore.error=true|false
--request-required-acks <String: request required acks>  The required acks of the producer requests (default: 1)
--request-timeout-ms <Integer: request timeout ms>  The ack timeout of the producer requests. Value must be non-negative and non-zero (default: 1500)
--retry-backoff-ms <Integer>  Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. (default: 100)
--socket-buffer-size <Integer: size>  The size of the tcp RECV size. (default: 102400)
--sync  If set message send requests to the brokers are synchronously, one at a time as they arrive.
--timeout <Integer: timeout_ms>  If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms. (default: 1000)
--topic <String: topic>  REQUIRED: The topic id to produce messages to.
--version  Display Kafka version.
[atguigu@node001 bin]$ 
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-console-producer.sh --bootstrap-server node001:9092 --topic first01 # producer
>hello
>123
[atguigu@node002 kafka_2.12-3.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server node001:9092 --topic first01 # consumer
hello
123
--------------------------------------------------
[atguigu@node002 kafka_2.12-3.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server node001:9092 --topic first01 --from-beginning # --from-beginning reads all data in the topic, including historical messages

Chapter 3 Kafka Producers

p010

Kafka consists of three parts: producers, brokers, and consumers.

3.1.1 How Sending Works

Message sending involves two threads: the main thread and the Sender thread. The main thread creates a double-ended queue, the RecordAccumulator, and writes messages into it; the Sender thread continuously pulls batches out of the RecordAccumulator and sends them to the Kafka brokers. The sketch below shows the producer parameters that govern this buffer.
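A minimal sketch of those parameters (the values shown are the Kafka client defaults, spelled out only for illustration; the broker address is the one used throughout these notes): buffer.memory sizes the RecordAccumulator, and a batch leaves the accumulator once it reaches batch.size bytes or has waited linger.ms milliseconds.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class AccumulatorConfigDemo {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node001:9092");
        // Total size of the RecordAccumulator buffer (default: 32 MB)
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
        // The Sender takes a batch once it reaches batch.size bytes (default: 16 KB) ...
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        // ... or once the batch has waited linger.ms milliseconds (default: 0, send immediately)
        props.put(ProducerConfig.LINGER_MS_CONFIG, "0");
        return props;
    }
}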

p011

3.2 Asynchronous Send API

3.2.1 Plain Asynchronous Send
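A minimal sketch of an asynchronous send with the Java client, in the spirit of the course examples (the topic first and the broker list are assumptions carried over from the CLI sessions above; the callback is optional and only shows when the broker's response arrives):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node001:9092,node002:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 5; i++) {
                // send() is asynchronous: it appends the record to the RecordAccumulator and
                // returns immediately; the callback fires when the Sender thread gets a response.
                producer.send(new ProducerRecord<>("first", "atguigu-" + i), (metadata, exception) -> {
                    if (exception == null) {
                        System.out.println("topic=" + metadata.topic() + ", partition=" + metadata.partition());
                    } else {
                        exception.printStackTrace();
                    }
                });
            }
        } // close() flushes any records still sitting in the accumulator
    }
}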

p012

p013

p014

Chapter 4 Kafka Broker

Chapter 5 Kafka Consumers

Chapter 6 Kafka-Eagle Monitoring

Chapter 7 Kafka-Kraft Mode
