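Introduction
Apache Kafka is a distributed publish-subscribe messaging system. It was originally developed at LinkedIn and later became an Apache project. Kafka is a fast, scalable commit log service that is distributed, partitioned, and replicated by design.
Compared with traditional messaging systems, Apache Kafka differs in the following ways:
- It is designed as a distributed system that is easy to scale out;
- It offers high throughput for both publishing and subscribing;
- It supports multiple subscribers and automatically rebalances consumers when one fails;
- It persists messages to disk, so it can serve batch consumers such as ETL jobs as well as real-time applications.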
Installing Kafka
Download
https://kafka.apache.org/downloads
mkdir -p /storage/app/kafka && cd /storage/app/kafka
wget https://mirrors.gigenet.com/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz
tar xvf kafka_2.13-2.7.0.tgz
Edit the configuration
vim /storage/app/kafka/kafka_2.13-2.7.0/config/server.properties
broker.id=1
log.dirs=/storage/app/kafka/kafka_2.13-2.7.0/logs
Note that Kafka depends on ZooKeeper. An external ZooKeeper is recommended, but the ZooKeeper bundled with the Kafka distribution also works.
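The two settings above can also be applied from a script instead of an editor. The following is a minimal sketch, assuming bash and GNU sed (as found on most Linux distributions). `set_prop` is a hypothetical helper, not something that ships with Kafka, and it assumes the values contain no `|` or `&` characters. Keep in mind that `log.dirs` is where the broker stores message data, not its application logs.

```shell
#!/usr/bin/env bash
# set_prop FILE KEY VALUE
# Hypothetical helper: rewrites "KEY=..." in place if the key already exists,
# otherwise appends "KEY=VALUE" to the end of FILE.
set_prop() {
  local file=$1 key=$2 value=$3
  if grep -q "^${key}=" "$file"; then
    # '|' is the sed delimiter so that values containing '/' need no escaping.
    sed -i "s|^${key}=.*|${key}=${value}|" "$file"
  else
    printf '%s\n' "${key}=${value}" >> "$file"
  fi
}

# Path taken from the text above; the calls are skipped if the file is absent.
CONF=/storage/app/kafka/kafka_2.13-2.7.0/config/server.properties
if [ -f "$CONF" ]; then
  set_prop "$CONF" broker.id 1
  set_prop "$CONF" log.dirs /storage/app/kafka/kafka_2.13-2.7.0/logs
fi
```

Running it twice leaves the file unchanged the second time, which makes it convenient for provisioning scripts.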
Start
Start a single-node ZooKeeper instance using the script shipped in the distribution:
/storage/app/kafka/kafka_2.13-2.7.0/bin/zookeeper-server-start.sh \
  -daemon /storage/app/kafka/kafka_2.13-2.7.0/config/zookeeper.properties
Start Kafka:
/storage/app/kafka/kafka_2.13-2.7.0/bin/kafka-server-start.sh \
  /storage/app/kafka/kafka_2.13-2.7.0/config/server.properties
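With `-daemon`, the ZooKeeper script returns immediately, so a script that runs both steps back to back may start the broker before ZooKeeper is listening. A minimal sketch of a readiness check follows. It assumes bash (the `/dev/tcp` redirection is a bash feature) and the default ports of the stock configuration files, 2181 for ZooKeeper and 9092 for Kafka. `wait_for_port` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# wait_for_port HOST PORT [TIMEOUT_SECONDS]
# Hypothetical helper: retries once per second until a TCP connection to
# HOST:PORT succeeds. Returns 0 on success, 1 if the timeout is reached.
wait_for_port() {
  local host=$1 port=$2 timeout=${3:-30} i
  for ((i = 0; i < timeout; i++)); do
    # Bash-only: opening /dev/tcp/HOST/PORT attempts a TCP connect.
    # The subshell closes the descriptor again as soon as it exits.
    if (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null; then
      return 0
    fi
    sleep 1
  done
  return 1
}

# Example (assumes the default ports):
#   wait_for_port localhost 2181 30 || echo "ZooKeeper is not reachable"
#   wait_for_port localhost 9092 60 || echo "Kafka is not reachable"
```

An open port only shows that the process is accepting connections. It does not prove that the broker has finished registering with ZooKeeper.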
Run Kafka under systemd
Create the unit file: vim /etc/systemd/system/kafka.service
[Unit]
Description=kafka
Documentation=https://kafka.apache.org/documentation/
After=network-online.target
Wants=network-online.target
[Service]
EnvironmentFile=-/etc/default/%p
ExecStart=/storage/app/kafka/kafka_2.13-2.7.0/bin/kafka-server-start.sh /storage/app/kafka/kafka_2.13-2.7.0/config/server.properties
# No ExecStop/ExecReload: systemd stops the broker with SIGTERM, which triggers
# Kafka's graceful shutdown. The broker has no reload signal.
# The JVM exits with status 143 after SIGTERM, so count that as a clean stop.
SuccessExitStatus=143
Restart=on-failure
[Install]
WantedBy=multi-user.target
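The unit above manages only the broker. A ZooKeeper started by hand with `-daemon` will not come back after a reboot. One possible approach, sketched here under assumptions, is a companion unit for the bundled ZooKeeper. The unit name `zookeeper.service` and the helper `write_zookeeper_unit` are hypothetical. `KAFKA_HOME` defaults to the install path used in this article.

```shell
#!/usr/bin/env bash
KAFKA_HOME=${KAFKA_HOME:-/storage/app/kafka/kafka_2.13-2.7.0}

# write_zookeeper_unit DIR
# Hypothetical helper: writes DIR/zookeeper.service for the ZooKeeper that
# ships inside the Kafka distribution. No -daemon flag is used, because
# systemd expects the process to stay in the foreground.
write_zookeeper_unit() {
  local dir=$1
  cat > "${dir}/zookeeper.service" <<EOF
[Unit]
Description=zookeeper (bundled with kafka)
After=network-online.target
Wants=network-online.target
[Service]
ExecStart=${KAFKA_HOME}/bin/zookeeper-server-start.sh ${KAFKA_HOME}/config/zookeeper.properties
Restart=on-failure
SuccessExitStatus=143
[Install]
WantedBy=multi-user.target
EOF
}

# As root:
#   write_zookeeper_unit /etc/systemd/system && systemctl daemon-reload
```

To have systemd start ZooKeeper before the broker, the [Unit] section of kafka.service could additionally list `After=zookeeper.service` and `Requires=zookeeper.service`.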
Reload systemd
systemctl daemon-reload
Start Kafka
systemctl start kafka.service
Check Kafka's status
systemctl status kafka.service
root@jansora-Vostro-3669:/storage/app/kafka/kafka_2.13-2.7.0/config# systemctl status kafka.service
● kafka.service - kafka
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: enabled)
Active: active (running) since Tue 2021-01-19 12:27:40 CST; 4s ago
Main PID: 1061890 (java)
Tasks: 74 (limit: 19037)
Memory: 323.4M
CGroup: /system.slice/kafka.service
└─1061890 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineL>
1月 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,115] INFO [ExpirationReaper-1-AlterAcls]: Starting (kafka.server.DelayedOperationPu>
1月 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,144] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChan>
1月 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,172] INFO [SocketServer brokerId=1] Starting socket server acceptors and processors>
1月 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,199] INFO [SocketServer brokerId=1] Started data-plane acceptor and processor(s) fo>
1月 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,199] INFO [SocketServer brokerId=1] Started socket server acceptors and processors >
1月 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,202] INFO Kafka version: 2.7.0 (org.apache.kafka.common.utils.AppInfoParser)
1月 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,202] INFO Kafka commitId: 448719dc99a19793 (org.apache.kafka.common.utils.AppInfoPa>
1月 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,202] INFO Kafka startTimeMs: 1611030463200 (org.apache.kafka.common.utils.AppInfoPa>
1月 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,209] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
1月 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,236] INFO [broker-1-to-controller-send-thread]: Recorded new controller, from now o>
Demo
Send messages
--broker-list
the Kafka broker(s) to connect to
--topic
the Kafka topic
/storage/app/kafka/kafka_2.13-2.7.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Type the messages at the prompt, one per line:
>1
>2
>3
>4
>5
>6
>7
>8
>9
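The producer session above works without creating the topic first because brokers create missing topics automatically by default (`auto.create.topics.enable=true`). Creating the topic explicitly lets you choose the partition count and replication factor. The following is a sketch that wraps `kafka-topics.sh` from the distribution. The function names `create_topic` and `describe_topic` are hypothetical, and the defaults of 1 partition and replication factor 1 are assumptions that suit a single-broker setup.

```shell
#!/usr/bin/env bash
KAFKA_HOME=${KAFKA_HOME:-/storage/app/kafka/kafka_2.13-2.7.0}
BOOTSTRAP=${BOOTSTRAP:-localhost:9092}

# create_topic NAME [PARTITIONS] [REPLICATION_FACTOR]
# Hypothetical wrapper around kafka-topics.sh --create.
create_topic() {
  local name=$1 partitions=${2:-1} replication=${3:-1}
  "${KAFKA_HOME}/bin/kafka-topics.sh" --create \
    --bootstrap-server "$BOOTSTRAP" \
    --topic "$name" \
    --partitions "$partitions" \
    --replication-factor "$replication"
}

# describe_topic NAME
# Prints the partition and replica layout of an existing topic.
describe_topic() {
  "${KAFKA_HOME}/bin/kafka-topics.sh" --describe \
    --bootstrap-server "$BOOTSTRAP" --topic "$1"
}

# Example (needs a running broker):
#   create_topic test 1 1
#   describe_topic test
```

If the topic already exists, for instance because the producer auto-created it, the create call reports an error and leaves the topic untouched.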
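Receive messages
--bootstrap-server
the Kafka broker(s) to connect to
--topic
the Kafka topic
--from-beginning
read from the earliest message in the topic instead of only new ones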
/storage/app/kafka/kafka_2.13-2.7.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
1
2
3
4
5
6
7
8
9
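The same round trip can be scripted, which is handy as a smoke test after installation. This is a sketch rather than a definitive procedure. It assumes the console producer reads messages from standard input (one per line) and uses the consumer's `--max-messages` and `--timeout-ms` options so that the consumer exits on its own. The function names `produce_lines` and `consume_n` and the 10-second timeout are hypothetical choices.

```shell
#!/usr/bin/env bash
KAFKA_HOME=${KAFKA_HOME:-/storage/app/kafka/kafka_2.13-2.7.0}
BOOTSTRAP=${BOOTSTRAP:-localhost:9092}

# produce_lines TOPIC
# Sends every line read from stdin to TOPIC as one message.
produce_lines() {
  "${KAFKA_HOME}/bin/kafka-console-producer.sh" \
    --broker-list "$BOOTSTRAP" --topic "$1"
}

# consume_n TOPIC COUNT
# Reads TOPIC from the beginning and exits after COUNT messages,
# or after 10 seconds without any new message.
consume_n() {
  "${KAFKA_HOME}/bin/kafka-console-consumer.sh" \
    --bootstrap-server "$BOOTSTRAP" --topic "$1" \
    --from-beginning --max-messages "$2" --timeout-ms 10000
}

# Example (needs a running broker):
#   seq 1 9 | produce_lines test
#   consume_n test 9
```

Because `--from-beginning` replays the whole topic, messages sent in earlier sessions are printed first if the topic is not empty.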