使用 Kafka、SpringBoot、ReactJS 和 WebSockets 的实时聊天应用程序
最终结果
在本教程中,我们将构建一个简单的实时聊天应用程序,演示如何使用 Kafka 作为消息代理,以及使用 Java、SpringBoot 作为后端和在前端使用 ReactJS。
该项目仅供学习之用。它不包含可用于生产的代码。
什么是 Kafka
Apache Kafka 是一种广受欢迎的分布式消息传递系统,它提供快速、分布式、高度可扩展、高可用性、发布-订阅消息传递系统。
反过来,这解决了一个更困难的问题的一部分:
大型软件系统组件之间的通信和集成。
我们可以用 Kafka 做什么?
- 消息传递 - 应用程序之间的通信
- 网站活动跟踪(点击、搜索……)
- 指标收集——而不是写入日志
- 源和目标流处理。
安装
在开始项目之前,我们需要下载Zookeeper和Kafka。
您可以从这里下载 Kafka 。
将压缩文件的内容解压到您指定的文件夹中。
在 Kafka 目录中,转到该bin
文件夹。在这里,您会找到许多可用于运行 Kafka 应用程序的 Bash 脚本。
如果您使用的是 Windows,则文件夹中也有相同的脚本windows
。本教程使用 Linux 命令,但如果您运行的是 Microsoft 操作系统,则只需使用等效的 Windows 版本。
启动 Zookeeper
Zookeeper主要用于管理 Kafka 集群。它包含在下载的 Kafka 目录中。因此,我们无需单独下载。
要启动 zookeeper,请转到bin目录并输入以下命令。
./zookeeper-server-start.sh ../config/zookeeper.properties
启动 Kafka Broker
接下来,要启动 Kafka 代理,请在同一目录中运行以下命令
./kafka-server-start.sh ../config/server.properties
在启动 Kafka 之前,请确保 zookeeper 正在运行,因为 Kafka 从 Zookeeper 接收分区中保存的偏移量等信息。
创建 Kafka 主题
分别运行Zookeeper和Apache Kafka之后,我们可以创建一个Topic,并作为Producer和Consumer来发送和接收数据。
kafka-topics --create --topic kafka-chat --zookeeper localhost:2181 --replication-factor 1 --partitions 1
这里我们创建一个主题kafka-chat
来处理聊天消息。稍后我们会在聊天应用程序中用到这个主题。
现在,让我们编写一些代码。
使用 Java、SpringBoot 和 Kafka 进行后端开发
我们将使用 Spring Boot 开发后端。因此,请使用Spring Initializer
下载一个全新的 Spring Boot 项目,并填写以下信息。
- 项目:Maven项目
- 语言:Java
- 群组:com.shubh
- 工件:kafka-chat-server
- 依赖项:
- Apache Kafka 的 Spring
- Spring for Websocket
为什么选择 WebSocket?
由于 Apache Kafka 无法通过传统的 GET 和 POST 操作将消费者消息即时发送到客户端,因此
我使用 WebSocket 执行了这些操作。WebSocket 提供全双工双向通信,这意味着信息可以同时从客户端流向服务器,也可以反向传输。WebSocket
在聊天应用中被广泛使用。
首先让我们创建一个消息模式来保存消息内容
。Message.java
package com.shubh.kafkachat.model;
public class Message {
private String sender;
private String content;
private String timestamp;
public Message() {
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getSender() {
return sender;
}
public void setSender(String sender) {
this.sender = sender;
}
public Message(String sender, String content) {
this.sender = sender;
this.content = content;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
@Override
public String toString() {
return "Message{" +
"sender='" + sender + '\'' +
", content='" + content + '\'' +
", timestamp='" + timestamp + '\'' +
'}';
}
}
开发Producer向Kafka Topic推送消息
首先,我们必须为生产者编写一个配置类。
生产者配置.java
@EnableKafka
@Configuration
public class ProducerConfiguration {
@Bean
public ProducerFactory<String, Message> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigurations());
}
@Bean
public Map<String, Object> producerConfigurations() {
Map<String, Object> configurations = new HashMap<>();
configurations.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configurations.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configurations.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return configurations;
}
@Bean
public KafkaTemplate<String, Message> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
这个类创建了一个ProducerFactory
知道如何根据我们提供的配置创建生产者的类。
我们还声明了一个KafkaTemplate
Bean,用于在生产器上执行高级操作。换句话说,该模板可以执行诸如向主题发送消息之类的操作,并有效地隐藏底层细节。
在producerConfigurations
方法中,我们需要执行以下任务:
BOOTSTRAP_SERVERS_CONFIG
设置运行Kafka的服务器地址。KEY_SERIALIZER_CLASS_CONFIG
并VALUE_SERIALIZER_CLASS_CONFIG
从 Kafka 队列中反序列化键和值。
下一步是创建一个端点,用于将消息发送到 Kafka 主题。
为此创建以下控制器类。
聊天控制器.java
@RestController
public class ChatController {
@Autowired
private KafkaTemplate<String, Message> kafkaTemplate;
@PostMapping(value = "/api/send", consumes = "application/json", produces = "application/json")
public void sendMessage(@RequestBody Message message) {
message.setTimestamp(LocalDateTime.now().toString());
try {
//Sending the message to kafka topic queue
kafkaTemplate.send(KafkaConstants.KAFKA_TOPIC, message).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
如你所见,端点非常简单。当我们POST
向它发出请求时/api/send
,它会注入之前配置的 KafkaTemplate,并向kafka-chat
我们之前创建的主题发送一条消息。
让我们测试一下到目前为止构建的所有内容。运行类main
中的方法KafakaJavaApp.java
。要从命令行运行,请执行以下命令
mvn spring-boot:run
您的服务器应该在端口 8080 上运行,您可以向其发出 API 请求!
您可以使用 Postman 发出 POST 请求,如下所示。
但是,如何知道该命令是否成功向主题发送了消息?目前,您无法在应用程序内部使用消息,这意味着您无法确定!
幸运的是,有一种简单的方法可以立即创建一个消费者进行测试。在 Kafka 目录的 bin 文件夹中,运行以下命令:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-chat
再次点击http://localhost:8080/api/send
即可在运行 Kafka 消费者的终端中查看消息
现在让我们使用 Java 代码实现相同的功能。为此,我们需要用 Java 构建一个 Consumer 或 Listener。
开发一个消费者来监听 Kafka Topic。
类似地,ProducerConfig.java
我们需要有一个消费者配置来使消费者能够找到经纪人。
监听器配置.java
@EnableKafka
@Configuration
public class ListenerConfig {
@Bean
ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, Message> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigurations(), new StringDeserializer(), new JsonDeserializer<>(Message.class));
}
@Bean
public Map<String, Object> consumerConfigurations() {
Map<String, Object> configurations = new HashMap<>();
configurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER);
configurations.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.GROUP_ID);
configurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
configurations.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return configurations;
}
}
在消费者配置中,与生产者配置类似,我们设置键和值的反序列化器。
此外,我们还需要设置
- GROUP_ID_CONFIG 设置 Kafka 消费者组 ID
- AUTO_OFFSET_RESET_CONFIG 用于设置偏移配置。在本项目中,我们使用值“earliest”,以便从头开始获取队列中的所有值。我们也可以使用“latest”来仅获取最新值。
消息监听器.java
@Component
public class MessageListener {
@Autowired
SimpMessagingTemplate template;
@KafkaListener(
topics = KafkaConstants.KAFKA_TOPIC,
groupId = KafkaConstants.GROUP_ID
)
public void listen(Message message) {
System.out.println("sending via kafka listener..");
template.convertAndSend("/topic/group", message);
}
}
在这个类中,@KafkaListener 注释了将监听 Kafka 队列消息的方法,
而 template.convertAndSend 将转换消息并将其发送到 WebSocket 主题。
接下来,我们需要配置Websocket来将消息发送到客户端系统。
Spring WebSocket 配置
WebSocket配置.java
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// chat client will use this to connect to the server
registry.addEndpoint("/ws-chat").setAllowedOrigins("*").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
registry.enableSimpleBroker("/topic/");
}
}
接下来MessageMapping
在ChatController.java
@MessageMapping("/sendMessage")
@SendTo("/topic/group")
public Message broadcastGroupMessage(@Payload Message message) {
//Sending this message to all the subscribers
return message;
}
这将向所有订阅该主题的客户端广播该消息。
接下来我们来开发UI部分。
ReactJS 中的前端开发
我们将创建一个简单的聊天页面,其中包含消息列表和页面底部的文本字段,用于将消息发送到 Kafka 后端。
创建 React 应用
我们将使用 Create React App 来快速启动该应用程序。
npm install --g create-react-app
create-react-app chat-ui
cd chat-ui
安装依赖项
- axios
- socketjs
- 反应-stomp
- 材质用户界面
npm install socketjs react-stomp material-ui axios
您可以在此处参考material-ui的文档。
npm start
复制 CSS 样式
从这里复制 css 样式并将其粘贴到App.css
文件中。
接下来,将以下更改添加到App.js
App.js
import React, { useState } from 'react';
import SockJsClient from 'react-stomp';
import './App.css';
import Input from './components/Input/Input';
import LoginForm from './components/LoginForm';
import Messages from './components/Messages/Messages';
import chatAPI from './services/chatapi';
import { randomColor } from './utils/common';
const SOCKET_URL = 'http://localhost:8080/ws-chat/';
const App = () => {
const [messages, setMessages] = useState([])
const [user, setUser] = useState(null)
let onConnected = () => {
console.log("Connected!!")
}
let onMessageReceived = (msg) => {
console.log('New Message Received!!', msg);
setMessages(messages.concat(msg));
}
let onSendMessage = (msgText) => {
chatAPI.sendMessage(user.username, msgText).then(res => {
console.log('Sent', res);
}).catch(err => {
console.log('Error Occured while sending message to api');
})
}
let handleLoginSubmit = (username) => {
console.log(username, " Logged in..");
setUser({
username: username,
color: randomColor()
})
}
return (
<div className="App">
{!!user ?
(
<>
<SockJsClient
url={SOCKET_URL}
topics={['/topic/group']}
onConnect={onConnected}
onDisconnect={console.log("Disconnected!")}
onMessage={msg => onMessageReceived(msg)}
debug={false}
/>
<Messages
messages={messages}
currentUser={user}
/>
<Input onSendMessage={onSendMessage} />
</>
) :
<LoginForm onSubmit={handleLoginSubmit} />
}
</div>
)
}
export default App;
这里我们使用SocketJsCLient来react-stomp
连接到WebSocket。
或者,您也可以使用 SockJS 来sockjs-client
创建stompclient
并连接到 WebSocket。
接下来,我们需要创建消息子组件来显示消息列表。
import React from 'react'
const Messages = ({ messages, currentUser }) => {
let renderMessage = (message) => {
const { sender, content, color } = message;
const messageFromMe = currentUser.username === message.sender;
const className = messageFromMe ? "Messages-message currentUser" : "Messages-message";
return (
<li className={className}>
<span
className="avatar"
style={{ backgroundColor: color }}
/>
<div className="Message-content">
<div className="username">
{sender}
</div>
<div className="text">{content}</div>
</div>
</li>
);
};
return (
<ul className="messages-list">
{messages.map(msg => renderMessage(msg))}
</ul>
)
}
export default Messages
LoginForm.js
import React, { useState } from 'react';
import TextField from '@material-ui/core/TextField';
import Button from '@material-ui/core/Button';
const LoginForm = ({ onSubmit }) => {
const [username, setUsername] = useState("");
let handleUserNameChange = event => setUsername(event.target.value);
let handleSubmit = () => {
onSubmit(username);
}
return (
<div>
<TextField
label="Type your username"
placeholder="Username"
onChange={handleUserNameChange}
margin="normal"
onKeyPress={event => {
if (event.key === 'Enter') {
handleSubmit();
}
}}
/>
<br />
<Button variant="contained" color="primary" onClick={handleSubmit} >
Login
</Button>
</div>
)
}
export default LoginForm
最终结果
在多个窗口中打开应用程序,并在一个窗口中发送消息。所有其他浏览器窗口都应显示已发送的消息。
我们正在使用 SockJS 来监听从服务器端 WebSocket 发送的消息。
源代码
您可以在我的Github页面中找到完整的源代码。
文章来源:https://dev.to/subhransu/realtime-chat-app-using-kafka-springboot-reactjs-and-websockets-lc