社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Elasticsearch

基于事件驱动架构构建微服务第13部分:使用来自Apache KAFKA的事件并将投影流传输到ElasticSearch

dotNET跨平台 • 2 年前 • 260 次点击  

原文链接:https://logcorner.com/building-microservices-through-event-driven-architecture-part13-read-model-projection-project-streams-into-elasticsearch/

在本教程中,我将展示如何从KAFKA读取流并将流投影到ElasticSearch中。

我必须使用来自KAFKA的消息,我从KAFKA读取的消息是事件流。所以我必须将这些流投影到结构化表示中。然后我会将投影索引到ElasticSearch中。

所以我会建立一个订阅KAFKA并监听事件的消费者。如果它接收到一个event,它将使用投影来创建该事件的结构演示。最后将其存储到一个nosql数据库ElasticSearch。

投影事件

实际上,读取数千个事件会花费太长时间,相反我们可以预先计算当前状态并将其存储到nosql数据库中。投影可以定义为从一系列事件中导出的当前状态

我定义了一个基本的泛型类Entity,因此每个投影都将从它派生。

我定义了一个抽象的泛型类Projection,它接受一个事件列表并将它们应用于具体类(在我们的例子中是 SpeechProjection)。

SpeechProjection是一个表示我想从事件(SpeechCreatedEvent、SpeechTitleChangedEvent、SpeechDescriptionChangedEvent、SpeechUrlChangedEvent和SpeechTypeChangedEvent)重建其状态的实体的类。

因此,对于与给定实体(语音)相关的每个事件,我必须将事件应用于实体。

ElasticSearch介绍

Elasticsearch是一种分布式RESTful搜索和分析引擎,能够处理越来越多的用例。作为Elastic Stack的核心,它集中存储你的数据,以实现闪电般的快速搜索、微调相关性和可轻松扩展的强大分析。https://www.elastic.co/elasticsearch/

转到以下链接安装elasticsearch:https://www.elastic.co/downloads/elasticsearch

你可以通过使用PowerShell运行以下命令 curl http://localhost:9200/ 或 Invoke-RestMethod http://localhost:9200 来验证安装是否正常

以下代码创建一个通用存储库以连接到弹性搜索,并执行CRUD操作。

创建工作服务

ASP.NET Core Worker Service模板为编写长时间运行的服务应用程序提供了一个起点。

我们可以使用工作服务来构建不需要用户交互或执行定期和长时间运行的工作负载的应用程序。

https://docs.microsoft.com/fr-fr/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-5.0&tabs=visual-studio

我将使用Worker Service构建一个消费服务,该服务消费来自APACHE KAFKA的事件并将它们索引到ElasticSearch

ConsumerHostedService

ConsumerHostedService是承载ConsumerService的后台服务

ConsumerService

ConsumerService调用服务总线,该总线在产生新事件时从Kafka接收通知。

服务总线

KafkaClient

KafkaClient实现了IServiceBusProvider的ReceiveAsync。它订阅了一个Kafka主题,因此当一个事件发布到该主题时,它会通知一个中介服务。

ElasticSearchNotifier实现了INotificationHandler。这个类的职责是反序列化输入事件并将其索引到elasticsearch。

测试

启动zookeeper

zookeeper-server-start.bat config\zookeeper.properties

启动Kafka

kafka-server-start.bat config\server.properties

启动ElasticSearch

启动下列工程:

  • LogCorner.EduSync.SignalR.Server

  • LogCorner.EduSync.Speech.Producer

  • LogCorner.EduSync.Speech.Consumer 启动下列工程:

  • LogCorner.EduSync.Speech.Presentation

启动Postman并且post一个新command 

你应该在消费者控制台上看到以下输出,使用postman上发布的命令

代码源可在此处获得:

https://github.com/logcorner/LogCorner.EduSync.Speech.Command https://github.com/logcorner/LogCorner.EduSync.Speech.ServiceBus/tree/Feature/Task/IndexMessagesToElasticSearch

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/123661
 
260 次点击