Py学习  »  docker

用 Docker 快速搭建 Kafka 集群

马哥Linux运维 • 3 年前 • 326 次点击  

版本

JDK 14ZookeeperKafka

安装 Zookeeper 和 Kafka

Kafka 依赖 Zookeeper,所以我们需要在安装 Kafka 之前先拥有 Zookeeper。准备如下的 docker-compose.yaml 文件,将文件中的主机地址 192.168.1.100 替换成你自己的环境中的主机地址即可。

version: "3"services:  zookeeper:    image: zookeeper    build:      context: ./    container_name: zookeeper    ports:      - 2181:2181    volumes:      - ./data/zookeeper/data:/data      - ./data/zookeeper/datalog:/datalog      - ./data/zookeeper/logs:/logs    restart: always  kafka_node_0:    depends_on:      - zookeeper    build:      context: ./


    
    container_name: kafka-node-0    image: wurstmeister/kafka    environment:      KAFKA_BROKER_ID: 0      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9092      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092      KAFKA_NUM_PARTITIONS: 3      KAFKA_DEFAULT_REPLICATION_FACTOR: 2    ports:      - 9092:9092    volumes:      - ./data/kafka/node_0:/kafka    restart: unless-stopped  kafka_node_1:    depends_on:      - kafka_node_0    build:      context: ./    container_name: kafka-node-1    image: wurstmeister/kafka    environment:      KAFKA_BROKER_ID: 1      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9093      KAFKA_LISTENERS


    
: PLAINTEXT://0.0.0.0:9093      KAFKA_NUM_PARTITIONS: 3      KAFKA_DEFAULT_REPLICATION_FACTOR: 2    ports:      - 9093:9093    volumes:      - ./data/kafka/node_1:/kafka    restart: unless-stopped  kafka_node_2:    depends_on:      - kafka_node_1    build:      context: ./    container_name: kafka-node-2    image: wurstmeister/kafka    environment:      KAFKA_BROKER_ID: 2      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9094      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094      KAFKA_NUM_PARTITIONS: 3      KAFKA_DEFAULT_REPLICATION_FACTOR: 2    ports:      - 9094:9094    volumes:      - ./data/kafka/node_2:/kafka


    
    restart: unless-stopped

输入 docker-compose up -d 运行脚本文件进行集群构建。等待一会儿,得到如下结果即为成功。

SpringBoot 集成 Kafka 集群

创建一个全新的 SpringBoot 工程,在 build.gradle 文件中添加下列依赖。

dependencies {    ...    ...    implementation 'org.springframework.kafka:spring-kafka:2.5.2.RELEASE'    implementation 'com.alibaba:fastjson:1.2.71'}

1.在 application.properties 进行 Kafka 相关参数配置。

spring.kafka.bootstrap-servers=192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094spring.kafka.producer.retries=0spring.kafka.producer.batch-size=16384spring.kafka.producer.buffer-memory=33554432spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value


    
-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.consumer.auto-offset-reset=latestspring.kafka.consumer.enable-auto-commit=truespring.kafka.consumer.auto-commit-interval=100

2.创建消息体类。

public class Message {    private Long id;    private String message;    private Date sendAt;}

3.创建消息发送者

public class Sender {    @Autowired    private KafkaTemplate<String, String> kafkaTemplate;    public void send() {        Message message = new Message();        message.setId


    
(System.currentTimeMillis());        message.setMessage(UUID.randomUUID().toString());        message.setSendAt(new Date());        log.info("message = {}", JSON.toJSONString(message));        kafkaTemplate.send("test", JSON.toJSONString(message));    }}

4.创建消息接收者

public class Receiver {    @KafkaListener(topics = {"test"}, groupId = "test")    public void listen(ConsumerRecord, ?> record) {        Optional> message = Optional.ofNullable(record.value());        if (message.isPresent()) {            log.info("receiver record = " + record);            log.info("receiver message = " + message.get());        }    }}


    

5.测试消息队列

public class QueueController {    @Autowired    private Sender sender;    @PostMapping("/test")    public void testQueue() {        sender.send();        sender.send();        sender.send();    }}

得到如下日志即为集成成功。

到这里就我们就成功搭建了一个 Kafka 伪集群,并成功与 SpringBoot 进行整合。

如果你想了解Docker容器技术及应用,欢迎参加8月7日晚的公开课。本次公开课主要围绕Docker容器技术入门和应用实战,系统介绍Docker容器技术、在企业中应用、以及网络管理、储存卷管理和部署Nginx、PHP、MySQL、Redis实战等,干货满满。

作者:曾是然
链接:https://segmentfault.com/a/1190000022988499
Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/72227
 
326 次点击