LMLPHP后院

Kafka 快速入门官方教程技术

maybe yes 发表于 2017-07-18 20:08

希望对初学者有帮助。

此教程假设你开始一个新的环境,不存在 Kafka 和 ZooKeeper 数据。Kafka 控制台脚本在 Unix 环境和 Windows 环境略有不同,在 Windows 上请将 "bin/" 替换为 "bin\windows\" 然后加上脚本后缀 ".bat"。

下载

下载编译好的 Kafka 解压:

$ tar -zxf kafka_2.11-0.11.0.0.tgz
$ cd kafka_2.11-0.11.0.0

启动服务

启动一个单节点 ZooKeeper 实例,默认 2181 端口

$ nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

启动 Kafka,默认 9092 端口

$ nohup bin/kafka-server-start.sh config/server.properties &

创建一个主题

创建一个主题名为 test,包含一个分区,一个副本。

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看主题

$ bin/kafka-topics.sh --list --zookeeper localhost:2181

主题无需手动创建,当插入不存在的主题时会自动创建主题。

生产者发消息

可以通过文件或者标准输入发送消息,一行一条消息。

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

启动消费者

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

在不同消费者终端可以看到消息被显示出来,命令行工具有很多额外的参数,不加参数时会显示它的用法。

建立多代理集群

一个代理就是集群的一个成员。下面扩大集群到三个节点,Windows 中使用 copy 命令

$ cp config/server.properties config/server-1.properties
$ cp config/server.properties config/server-2.properties

配置参考:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

broker_id 在集群的每个节点是唯一并且持久的。在同一台机器上运行需要重新设置日志目录和端口。

$ bin/kafka-server-start.sh config/server-1.properties &
$ bin/kafka-server-start.sh config/server-2.properties &

创建三个副本的主题,如下:

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

使用 describe 命令查看状态:

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

输出示例:

Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

第一行是所有分区的汇总信息,下面每一行展示一个分区信息因为只有一个分区,所以显示一行。

  • leader 负责分区的读和写,随机选举产生。
  • replicas 分区的节点集合。
  • isr 同步中的副本集合,是节点的子集。

容错性测试,删除主节点,如下:

$ ps aux | grep server-1.properties
$ kill -9 {port}

// Windows
wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
taskkill /pid {port} /f

主节点变成 2,节点 1 不再是同步中的副本:

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0

虽然原始的主节点已经失效,但是消费者依然可以读取消息:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

使用 Kafka 连接器导入导出数据

往 test.txt 中写入两行数据

$ echo -e "foo\nbar" > test.txt

启动 Kafka 的连接器,使用默认的测试配置文件

$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

这时,导出了数据在  test.sink.txt 文件:

$ cat test.sink.txt
foo
bar

数据被存储在主题 connect-test 中,查看主题中的数据:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

此时可以继续向 test.txt 中写入数据,你应该可以看到数据出现在 sink 文件中。

使用 Kafka 流处理数据

Kafka Streams 是用来创建关键实时应用的客户端库,数据存储在 Kafka 集群中。Kafka Streams 将客户端的编写简单性和部署标准 Java 和 Scala 应用程序与 Kafka 服务器端集群技术的优势相结合,使这些应用程序具有高度可扩展性,弹性,容错性,分布式和更多功能

2025-01-26 15:16:20 1737875780 0.036612