Kafka

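Introduction

Official site: http://kafka.apache.org/

Apache Kafka is a distributed publish-subscribe messaging system. It was originally developed at LinkedIn and later became part of the Apache project. Kafka is a fast, scalable commit log service that is distributed by design, partitioned, and replicated.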

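Apache Kafka differs from traditional messaging systems in the following ways:

  • It is designed as a distributed system that is easy to scale out;
  • It offers high throughput for both publishing and subscribing;
  • It supports multiple subscribers and automatically rebalances consumers when one fails;
  • It persists messages to disk, so it can serve batch consumption such as ETL as well as real-time applications.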

Installing Kafka

Download

https://kafka.apache.org/downloads

mkdir -p /storage/app/kafka && cd /storage/app/kafka
wget https://mirrors.gigenet.com/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz
tar xvf kafka_2.13-2.7.0.tgz

Edit the configuration

vim /storage/app/kafka/kafka_2.13-2.7.0/config/server.properties

broker.id=1
log.dirs=/storage/app/kafka/kafka_2.13-2.7.0/logs
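
The same two settings can be applied without opening an editor. The helper below is a minimal sketch, not part of the Kafka distribution: `set_prop` is a hypothetical name, and it assumes each key appears at most once as an uncommented `key=value` line, as `broker.id` and `log.dirs` do in the stock server.properties.

```shell
#!/bin/sh
# set_prop FILE KEY VALUE
# Rewrite the uncommented "KEY=..." line in FILE, or append it if the key is absent.
# Hypothetical helper for illustration. Assumes VALUE contains no '|' or '&',
# which sed would otherwise interpret.
set_prop() {
    file=$1 key=$2 value=$3
    if grep -q "^${key}=" "$file"; then
        # '|' is the sed delimiter because values are often paths containing '/'.
        sed "s|^${key}=.*|${key}=${value}|" "$file" > "${file}.tmp" &&
            mv "${file}.tmp" "$file"
    else
        printf '%s=%s\n' "$key" "$value" >> "$file"
    fi
}

# Example (not executed here):
#   conf=/storage/app/kafka/kafka_2.13-2.7.0/config/server.properties
#   set_prop "$conf" broker.id 1
#   set_prop "$conf" log.dirs /storage/app/kafka/kafka_2.13-2.7.0/logs
```

Writing to a temporary file and moving it into place avoids `sed -i`, whose syntax differs between GNU and BSD sed.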

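Note that Kafka 2.7.0 depends on ZooKeeper. An external ZooKeeper ensemble is recommended, but the ZooKeeper bundled with the package also works. The broker locates ZooKeeper through zookeeper.connect in server.properties, which is set to localhost:2181 in the shipped file.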

Start

Start a single-node ZooKeeper instance with the script bundled in the package:

/storage/app/kafka/kafka_2.13-2.7.0/bin/zookeeper-server-start.sh \
  -daemon /storage/app/kafka/kafka_2.13-2.7.0/config/zookeeper.properties
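
Because `-daemon` returns immediately, the broker may be started before ZooKeeper is accepting connections. The following is a rough sketch of a readiness check; `wait_for_port` is a hypothetical helper, and port 2181 is assumed from the `clientPort` value in the bundled zookeeper.properties.

```shell
#!/bin/sh
# wait_for_port HOST PORT [TIMEOUT_SECONDS]
# Try one TCP connection per second until HOST:PORT accepts it.
# Returns 0 once a connection succeeds, 1 if the timeout (default 30) runs out.
wait_for_port() {
    host=$1 port=$2 remaining=${3:-30}
    while [ "$remaining" -gt 0 ]; do
        # /dev/tcp is a bash feature, so bash is invoked explicitly.
        if bash -c "exec 3<>/dev/tcp/${host}/${port}" 2>/dev/null; then
            return 0
        fi
        sleep 1
        remaining=$((remaining - 1))
    done
    return 1
}

# Example (not executed here):
#   wait_for_port localhost 2181 30 && echo "ZooKeeper is accepting connections"
```

An open port only shows that the process is listening; it does not prove that ZooKeeper is healthy.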

Start Kafka

/storage/app/kafka/kafka_2.13-2.7.0/bin/kafka-server-start.sh \
  /storage/app/kafka/kafka_2.13-2.7.0/config/server.properties

Register Kafka as a systemd service

Create the unit file: vim /etc/systemd/system/kafka.service

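[Unit]
Description=Apache Kafka broker
Documentation=https://kafka.apache.org/documentation/
After=network-online.target
Wants=network-online.target

[Service]
EnvironmentFile=-/etc/default/%p
ExecStart=/storage/app/kafka/kafka_2.13-2.7.0/bin/kafka-server-start.sh /storage/app/kafka/kafka_2.13-2.7.0/config/server.properties
# Stop through the bundled script; SIGHUP shuts the JVM down, so it is not used for stop or reload.
ExecStop=/storage/app/kafka/kafka_2.13-2.7.0/bin/kafka-server-stop.sh
# The JVM exits with status 143 after SIGTERM; treat that as a clean stop.
SuccessExitStatus=143
Restart=on-failure

KillSignal=SIGINT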

[Install]
WantedBy=multi-user.target

Reload systemd

systemctl daemon-reload

Start Kafka

systemctl start kafka.service 

Check the Kafka status

systemctl status kafka.service
● kafka.service - kafka
     Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: enabled)
     Active: active (running) since Tue 2021-01-19 12:27:40 CST; 4s ago
   Main PID: 1061890 (java)
      Tasks: 74 (limit: 19037)
     Memory: 323.4M
     CGroup: /system.slice/kafka.service
             └─1061890 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineL>

Jan 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,115] INFO [ExpirationReaper-1-AlterAcls]: Starting (kafka.server.DelayedOperationPu>
Jan 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,144] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChan>
Jan 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,172] INFO [SocketServer brokerId=1] Starting socket server acceptors and processors>
Jan 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,199] INFO [SocketServer brokerId=1] Started data-plane acceptor and processor(s) fo>
Jan 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,199] INFO [SocketServer brokerId=1] Started socket server acceptors and processors >
Jan 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,202] INFO Kafka version: 2.7.0 (org.apache.kafka.common.utils.AppInfoParser)
Jan 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,202] INFO Kafka commitId: 448719dc99a19793 (org.apache.kafka.common.utils.AppInfoPa>
Jan 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,202] INFO Kafka startTimeMs: 1611030463200 (org.apache.kafka.common.utils.AppInfoPa>
Jan 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,209] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
Jan 19 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,236] INFO [broker-1-to-controller-send-thread]: Recorded new controller, from now o>
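
The status output above reports the unit as "disabled", which means it will not start at boot. A small sketch of enabling it and confirming that it is running follows; `enable_and_check` is a hypothetical helper name, while the `systemctl` and `journalctl` subcommands it uses are standard.

```shell
#!/bin/sh
# enable_and_check UNIT
# Enable UNIT at boot, then report whether it is currently active.
# Hypothetical helper for illustration; run as root or through sudo.
enable_and_check() {
    unit=$1
    systemctl enable "$unit" || return 1
    if systemctl is-active --quiet "$unit"; then
        echo "$unit is running"
    else
        echo "$unit is not running; inspect with: journalctl -u $unit -n 50" >&2
        return 1
    fi
}

# Example (not executed here):
#   enable_and_check kafka.service
```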

Demo
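
The commands in this demo rely on the broker creating the `test` topic automatically on first use (`auto.create.topics.enable` defaults to true). Creating the topic explicitly is an alternative; the sketch below wraps `kafka-topics.sh` in a hypothetical `create_topic` function and assumes a single broker, hence one partition and a replication factor of 1.

```shell
#!/bin/sh
# Install location used throughout this article; override by exporting KAFKA_HOME.
KAFKA_HOME=${KAFKA_HOME:-/storage/app/kafka/kafka_2.13-2.7.0}

# create_topic TOPIC [BOOTSTRAP_SERVER]
# Create TOPIC with one partition and no replication (single-broker assumption).
create_topic() {
    topic=$1
    "$KAFKA_HOME/bin/kafka-topics.sh" --create \
        --bootstrap-server "${2:-localhost:9092}" \
        --topic "$topic" \
        --partitions 1 \
        --replication-factor 1
}

# Example (not executed here):
#   create_topic test
```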

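Send messages

--broker-list: address of the Kafka broker (deprecated since Kafka 2.5 in favor of --bootstrap-server, but still accepted in 2.7.0)

--topic: the Kafka topic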

/storage/app/kafka/kafka_2.13-2.7.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Type the messages at the prompt, one per line:

>1
>2
>3
>4
>5
>6
>7
>8
>9
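
The console producer reads standard input, so the same nine messages can also be sent without typing them. This is a sketch under assumptions: `produce_stdin` is a hypothetical wrapper, and it uses `--bootstrap-server`, which the producer accepts from Kafka 2.5 onward; on older releases substitute `--broker-list`.

```shell
#!/bin/sh
# Install location used throughout this article; override by exporting KAFKA_HOME.
KAFKA_HOME=${KAFKA_HOME:-/storage/app/kafka/kafka_2.13-2.7.0}

# produce_stdin TOPIC [BOOTSTRAP_SERVER]
# Send every line read from standard input to TOPIC as one message.
produce_stdin() {
    "$KAFKA_HOME/bin/kafka-console-producer.sh" \
        --bootstrap-server "${2:-localhost:9092}" \
        --topic "$1"
}

# Example (not executed here): send the numbers 1 to 9, one message per line.
#   seq 1 9 | produce_stdin test
```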

Receive messages

--bootstrap-server: address of the Kafka broker

--topic: the Kafka topic

--from-beginning: read the topic from its earliest message

/storage/app/kafka/kafka_2.13-2.7.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
1
2
3
4
5
6
7
8
9
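
The consumer above keeps running until it is interrupted. For scripted checks it can be made to exit on its own; the sketch below uses the console consumer's `--max-messages` and `--timeout-ms` options. The `consume_n` wrapper and the 10-second timeout are illustrative assumptions.

```shell
#!/bin/sh
# Install location used throughout this article; override by exporting KAFKA_HOME.
KAFKA_HOME=${KAFKA_HOME:-/storage/app/kafka/kafka_2.13-2.7.0}

# consume_n TOPIC COUNT [BOOTSTRAP_SERVER]
# Read TOPIC from the beginning and exit after COUNT messages,
# or after 10 seconds without a new message, whichever comes first.
consume_n() {
    "$KAFKA_HOME/bin/kafka-console-consumer.sh" \
        --bootstrap-server "${3:-localhost:9092}" \
        --topic "$1" \
        --from-beginning \
        --max-messages "$2" \
        --timeout-ms 10000
}

# Example (not executed here):
#   consume_n test 9
```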
