社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  docker

用 Docker 快速搭建 Kafka 集群

运维 • 3 年前 • 284 次点击  

来自:SegmentFault,作者:曾是然

链接:https://segmentfault.com/a/1190000022988499

版本

JDK 14ZookeeperKafka

安装 Zookeeper 和 Kafka

Kafka 依赖 Zookeeper,所以我们需要在安装 Kafka 之前先拥有 Zookeeper。准备如下的 docker-compose.yaml 文件,将文件中的主机地址 192.168.1.100 替换成你自己的环境中的主机地址即可。

version: "3"services:  zookeeper:    image: zookeeper    build:      context: ./    container_name: zookeeper    ports:- 2181:2181    volumes:- ./data/zookeeper/data:/data- ./data/zookeeper/datalog:/datalog- ./data/zookeeper/logs:/logs    restart: always  kafka_node_0:    depends_on:- zookeeper    build:      context: ./    container_name: kafka-node-0    image: wurstmeister/kafka    environment:      KAFKA_BROKER_ID: 0      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9092      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092      KAFKA_NUM_PARTITIONS: 3      KAFKA_DEFAULT_REPLICATION_FACTOR: 2    ports:- 9092:


    
9092    volumes:- ./data/kafka/node_0:/kafka    restart: unless-stopped  kafka_node_1:    depends_on:- kafka_node_0    build:      context: ./    container_name: kafka-node-1    image: wurstmeister/kafka    environment:      KAFKA_BROKER_ID: 1      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9093      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093      KAFKA_NUM_PARTITIONS: 3      KAFKA_DEFAULT_REPLICATION_FACTOR: 2    ports:- 9093:9093    volumes:- ./data/kafka/node_1:/kafka    restart: unless-stopped  kafka_node_2:    depends_on:- kafka_node_1    build:      context: ./    container_name: kafka-node-2    image: wurstmeister/kafka    environment:      KAFKA_BROKER_ID: 2      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9094      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094      KAFKA_NUM_PARTITIONS: 3      KAFKA_DEFAULT_REPLICATION_FACTOR: 2    ports:- 9094:9094    volumes:- ./data/kafka/node_2:/kafka


    
    restart: unless-stopped

输入 docker-compose up -d 运行脚本文件进行集群构建。等待一会儿,得到如下结果即为成功。

SpringBoot 集成 Kafka 集群

创建一个全新的 SpringBoot 工程,在 build.gradle 文件中添加下列依赖。

dependencies {......    implementation 'org.springframework.kafka:spring-kafka:2.5.2.RELEASE'    implementation 'com.alibaba:fastjson:1.2.71'}

1.在 application.properties 进行 Kafka 相关参数配置。

spring.kafka.bootstrap-servers=192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094spring.kafka.producer.retries=0spring.kafka.producer.batch-size=16384spring.kafka.producer.buffer-memory=33554432spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.consumer.auto-offset-reset=latestspring.kafka.consumer.enable-auto-commit=truespring.kafka.consumer.auto-commit-interval=100

2.创建消息体类。

publicclassMessage{privateLong id;privateString message;privateDate sendAt;}

3.创建消息发送者

publicclassSender{@AutowiredprivateKafkaTemplate<String, String> kafkaTemplate;publicvoid send() {Message message = newMessage();        message.setId(System.currentTimeMillis());        message.setMessage(UUID.randomUUID().toString());        message.setSendAt(newDate());        log.info("message = {}", JSON.toJSONString(message));        kafkaTemplate.send("test", JSON.toJSONString(message));}}

4.创建消息接收者

publicclassReceiver{@KafkaListener(topics = {"test"}, groupId = "test")publicvoid listen(ConsumerRecord, ?> record) {Optional> message = Optional.ofNullable(record.value());if(message.isPresent()) {            log.info("receiver record = "+ record);            log.info("receiver message = "+ message.get());}}}

5.测试消息队列

publicclassQueueController{@AutowiredprivateSender sender;


    
@PostMapping("/test")publicvoid testQueue() {        sender.send();        sender.send();        sender.send();}}

得到如下日志即为集成成功。

到这里就我们就成功搭建了一个 Kafka 伪集群,并成功与 SpringBoot 进行整合。

如果你想了解Docker容器技术及应用,欢迎参加8月7日晚的公开课。本次公开课主要围绕Docker容器技术入门和应用实战,系统介绍Docker容器技术、在企业中应用、以及网络管理、储存卷管理和部署Nginx、PHP、MySQL、Redis实战等,干货满满。


●输入m获取文章目录

推荐↓↓↓

Linux学习

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/73836
 
284 次点击