使用 Kafka、SpringBoot、ReactJS 和 WebSockets 的实时聊天应用程序最终结果

2025-05-27

使用 Kafka、SpringBoot、ReactJS 和 WebSockets 的实时聊天应用程序

最终结果

在本教程中,我们将构建一个简单的实时聊天应用程序,演示如何使用 Kafka 作为消息代理,以及使用 Java、SpringBoot 作为后端和在前端使用 ReactJS。

该项目仅供学习之用。它不包含可用于生产的代码。

什么是 Kafka

Apache Kafka 是一种广受欢迎的分布式消息传递系统,它提供快速、分布式、高度可扩展、高可用性、发布-订阅消息传递系统。

反过来,这解决了一个更困难的问题的一部分:

大型软件系统组件之间的通信和集成。

我们可以用 Kafka 做什么?

  • 消息传递 - 应用程序之间的通信
  • 网站活动跟踪(点击、搜索……)
  • 指标收集——而不是写入日志
  • 源和目标流处理。

安装

在开始项目之前,我们需要下载ZookeeperKafka

您可以从这里下载 Kafka

将压缩文件的内容解压到您指定的文件夹中。
在 Kafka 目录中,转到该bin文件夹​​。在这里,您会找到许多可用于运行 Kafka 应用程序的 Bash 脚本。

如果您使用的是 Windows,则文件夹中也有相同的脚本windows。本教程使用 Linux 命令,但如果您运行的是 Microsoft 操作系统,则只需使用等效的 Windows 版本。

启动 Zookeeper

Zookeeper主要用于管理 Kafka 集群。它包含在下载的 Kafka 目录中。因此,我们无需单独下载。

要启动 zookeeper,请转到bin目录并输入以下命令。

./zookeeper-server-start.sh ../config/zookeeper.properties
Enter fullscreen mode Exit fullscreen mode
启动 Kafka Broker

接下来,要启动 Kafka 代理,请在同一目录中运行以下命令

./kafka-server-start.sh ../config/server.properties
Enter fullscreen mode Exit fullscreen mode

在启动 Kafka 之前,请确保 zookeeper 正在运行,因为 Kafka 从 Zookeeper 接收分区中保存的偏移量等信息。

创建 Kafka 主题

分别运行Zookeeper和Apache Kafka之后,我们可以创建一个Topic,并作为Producer和Consumer来发送和接收数据。

kafka-topics --create --topic kafka-chat --zookeeper localhost:2181 --replication-factor 1 --partitions 1

Enter fullscreen mode Exit fullscreen mode

这里我们创建一个主题kafka-chat来处理聊天消息。稍后我们会在聊天应用程序中用到这个主题。

现在,让我们编写一些代码。

使用 Java、SpringBoot 和 Kafka 进行后端开发

我们将使用 Spring Boot 开发后端。因此,请使用Spring Initializer
下载一个全新的 Spring Boot 项目,并填写以下信息。

  • 项目:Maven项目
  • 语言:Java
  • 群组:com.shubh
  • 工件:kafka-chat-server
  • 依赖项:
    • Apache Kafka 的 Spring
    • Spring for Websocket

为什么选择 WebSocket?

由于 Apache Kafka 无法通过传统的 GET 和 POST 操作将消费者消息即时发送到客户端,因此
我使用 WebSocket 执行了这些操作。WebSocket 提供全双工双向通信,这意味着信息可以同时从客户端流向服务器,也可以反向传输。WebSocket
在聊天应用中被广泛使用。

首先让我们创建一个消息模式来保存消息内容
。Message.java

package com.shubh.kafkachat.model;

public class Message {
    private String sender;
    private String content;
    private String timestamp;

    public Message() {
    }

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    public String getSender() {
        return sender;
    }

    public void setSender(String sender) {
        this.sender = sender;
    }

    public Message(String sender, String content) {
        this.sender = sender;
        this.content = content;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return "Message{" +
                "sender='" + sender + '\'' +
                ", content='" + content + '\'' +
                ", timestamp='" + timestamp + '\'' +
                '}';
    }
}

Enter fullscreen mode Exit fullscreen mode

开发Producer向Kafka Topic推送消息

首先,我们必须为生产者编写一个配置类。

生产者配置.java

@EnableKafka
@Configuration
public class ProducerConfiguration {
    @Bean
    public ProducerFactory<String, Message> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigurations());
    }

    @Bean
    public Map<String, Object> producerConfigurations() {
        Map<String, Object> configurations = new HashMap<>();
        configurations.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configurations.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configurations.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return configurations;
    }

    @Bean
    public KafkaTemplate<String, Message> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Enter fullscreen mode Exit fullscreen mode

这个类创建了一个ProducerFactory知道如何根据我们提供的配置创建生产者的类。

我们还声明了一个KafkaTemplateBean,用于在生产器上执行高级操作。换句话说,该模板可以执行诸如向主题发送消息之类的操作,并有效地隐藏底层细节。

producerConfigurations方法中,我们需要执行以下任务:

  • BOOTSTRAP_SERVERS_CONFIG设置运行Kafka的服务器地址。
  • KEY_SERIALIZER_CLASS_CONFIGVALUE_SERIALIZER_CLASS_CONFIG从 Kafka 队列中反序列化键和值。

下一步是创建一个端点,用于将消息发送到 Kafka 主题。
为此创建以下控制器类。

聊天控制器.java
@RestController
public class ChatController {

    @Autowired
    private KafkaTemplate<String, Message> kafkaTemplate;

    @PostMapping(value = "/api/send", consumes = "application/json", produces = "application/json")
    public void sendMessage(@RequestBody Message message) {
        message.setTimestamp(LocalDateTime.now().toString());
        try {
            //Sending the message to kafka topic queue
            kafkaTemplate.send(KafkaConstants.KAFKA_TOPIC, message).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

}
Enter fullscreen mode Exit fullscreen mode

如你所见,端点非常简单。当我们POST向它发出请求时/api/send,它会注入之前配置的 KafkaTemplate,并向kafka-chat我们之前创建的主题发送一条消息。

让我们测试一下到目前为止构建的所有内容。运行类main中的方法KafakaJavaApp.java。要从命令行运行,请执行以下命令

mvn spring-boot:run
Enter fullscreen mode Exit fullscreen mode

您的服务器应该在端口 8080 上运行,您可以向其发出 API 请求!
您可以使用 Postman 发出 POST 请求,如下所示。

CapturePost

但是,如何知道该命令是否成功向主题发送了消息?目前,您无法在应用程序内部使用消息,这意味着您无法确定!

幸运的是,有一种简单的方法可以立即创建一个消费者进行测试。在 Kafka 目录的 bin 文件夹中,运行以下命令:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-chat
Enter fullscreen mode Exit fullscreen mode

再次点击http://localhost:8080/api/send即可在运行 Kafka 消费者的终端中查看消息

现在让我们使用 Java 代码实现相同的功能。为此,我们需要用 Java 构建一个 Consumer 或 Listener。

开发一个消费者来监听 Kafka Topic。

类似地,ProducerConfig.java我们需要有一个消费者配置来使消费者能够找到经纪人。

监听器配置.java

@EnableKafka
@Configuration
public class ListenerConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Message> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigurations(), new StringDeserializer(), new JsonDeserializer<>(Message.class));
    }

    @Bean
    public Map<String, Object> consumerConfigurations() {
        Map<String, Object> configurations = new HashMap<>();
        configurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER);
        configurations.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.GROUP_ID);
        configurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        configurations.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return configurations;
    }
}
Enter fullscreen mode Exit fullscreen mode

在消费者配置中,与生产者配置类似,我们设置键和值的反序列化器。
此外,我们还需要设置

  • GROUP_ID_CONFIG 设置 Kafka 消费者组 ID
  • AUTO_OFFSET_RESET_CONFIG 用于设置偏移配置。在本项目中,我们使用值“earliest”,以便从头开始获取队列中的所有值。我们也可以使用“latest”来仅获取最新值。

消息监听器.java

@Component
public class MessageListener {
    @Autowired
    SimpMessagingTemplate template;

    @KafkaListener(
            topics = KafkaConstants.KAFKA_TOPIC,
            groupId = KafkaConstants.GROUP_ID
    )
    public void listen(Message message) {
        System.out.println("sending via kafka listener..");
        template.convertAndSend("/topic/group", message);
    }
}

Enter fullscreen mode Exit fullscreen mode

在这个类中,@KafkaListener 注释了将监听 Kafka 队列消息的方法,
而 template.convertAndSend 将转换消息并将其发送到 WebSocket 主题。

接下来,我们需要配置Websocket来将消息发送到客户端系统。

Spring WebSocket 配置

WebSocket配置.java

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // chat client will use this to connect to the server
        registry.addEndpoint("/ws-chat").setAllowedOrigins("*").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app");
        registry.enableSimpleBroker("/topic/");
    }
}
Enter fullscreen mode Exit fullscreen mode

接下来MessageMappingChatController.java

    @MessageMapping("/sendMessage")
    @SendTo("/topic/group")
    public Message broadcastGroupMessage(@Payload Message message) {
        //Sending this message to all the subscribers
        return message;
    }
Enter fullscreen mode Exit fullscreen mode

这将向所有订阅该主题的客户端广播该消息。

接下来我们来开发UI部分。

ReactJS 中的前端开发

我们将创建一个简单的聊天页面,其中包含消息列表和页面底部的文本字段,用于将消息发送到 Kafka 后端。

创建 React 应用

我们将使用 Create React App 来快速启动该应用程序。

npm install --g create-react-app
create-react-app chat-ui
cd chat-ui
Enter fullscreen mode Exit fullscreen mode

安装依赖项

  • axios
  • socketjs
  • 反应-stomp
  • 材质用户界面
npm install socketjs react-stomp material-ui axios
Enter fullscreen mode Exit fullscreen mode

您可以在此处参考material-ui的文档

npm start
Enter fullscreen mode Exit fullscreen mode

复制 CSS 样式

从这里复制 css 样式并将其粘贴到App.css文件中。

接下来,将以下更改添加到App.js

App.js

import React, { useState } from 'react';
import SockJsClient from 'react-stomp';
import './App.css';
import Input from './components/Input/Input';
import LoginForm from './components/LoginForm';
import Messages from './components/Messages/Messages';
import chatAPI from './services/chatapi';
import { randomColor } from './utils/common';


const SOCKET_URL = 'http://localhost:8080/ws-chat/';

const App = () => {
  const [messages, setMessages] = useState([])
  const [user, setUser] = useState(null)

  let onConnected = () => {
    console.log("Connected!!")
  }

  let onMessageReceived = (msg) => {
    console.log('New Message Received!!', msg);
    setMessages(messages.concat(msg));
  }

  let onSendMessage = (msgText) => {
    chatAPI.sendMessage(user.username, msgText).then(res => {
      console.log('Sent', res);
    }).catch(err => {
      console.log('Error Occured while sending message to api');
    })
  }

  let handleLoginSubmit = (username) => {
    console.log(username, " Logged in..");

    setUser({
      username: username,
      color: randomColor()
    })

  }

  return (
    <div className="App">
      {!!user ?
        (
          <>
            <SockJsClient
              url={SOCKET_URL}
              topics={['/topic/group']}
              onConnect={onConnected}
              onDisconnect={console.log("Disconnected!")}
              onMessage={msg => onMessageReceived(msg)}
              debug={false}
            />
            <Messages
              messages={messages}
              currentUser={user}
            />
            <Input onSendMessage={onSendMessage} />
          </>
        ) :
        <LoginForm onSubmit={handleLoginSubmit} />
      }
    </div>
  )
}

export default App;
Enter fullscreen mode Exit fullscreen mode

这里我们使用SocketJsCLientreact-stomp连接到WebSocket。

或者,您也可以使用 SockJS 来sockjs-client创建stompclient并连接到 WebSocket。

接下来,我们需要创建消息子组件来显示消息列表。

import React from 'react'

const Messages = ({ messages, currentUser }) => {

    let renderMessage = (message) => {
        const { sender, content, color } = message;
        const messageFromMe = currentUser.username === message.sender;
        const className = messageFromMe ? "Messages-message currentUser" : "Messages-message";
        return (
            <li className={className}>
                <span
                    className="avatar"
                    style={{ backgroundColor: color }}
                />
                <div className="Message-content">
                    <div className="username">
                        {sender}
                    </div>
                    <div className="text">{content}</div>
                </div>
            </li>
        );
    };

    return (
        <ul className="messages-list">
            {messages.map(msg => renderMessage(msg))}
        </ul>
    )
}


export default Messages
Enter fullscreen mode Exit fullscreen mode

LoginForm.js

import React, { useState } from 'react';
import TextField from '@material-ui/core/TextField';
import Button from '@material-ui/core/Button';

const LoginForm = ({ onSubmit }) => {

    const [username, setUsername] = useState("");
    let handleUserNameChange = event => setUsername(event.target.value);

    let handleSubmit = () => {
        onSubmit(username);
    }

    return (
        <div>
            <TextField
                label="Type your username"
                placeholder="Username"
                onChange={handleUserNameChange}
                margin="normal"
                onKeyPress={event => {
                    if (event.key === 'Enter') {
                        handleSubmit();
                    }
                }}
            />
            <br />
            <Button variant="contained" color="primary" onClick={handleSubmit} >
                Login
             </Button>

        </div>
    )
}

export default LoginForm
Enter fullscreen mode Exit fullscreen mode

最终结果

在多个窗口中打开应用程序,并在一个窗口中发送消息。所有其他浏览器窗口都应显示已发送的消息。

聊天应用程序

我们正在使用 SockJS 来监听从服务器端 WebSocket 发送的消息。

源代码

您可以在我的Github页面中找到完整的源代码

文章来源:https://dev.to/subhransu/realtime-chat-app-using-kafka-springboot-reactjs-and-websockets-lc
PREV
React + TypeScript + ESLint + Prettier 完整设置 ✈
NEXT
8 个你“必须尝试”的 JavaScript 动画库