当前位置:首页 > 编程笔记 > 正文
已解决

SpringBoot实现SSE构建实时数据单向推送

来自网友在路上 172872提问 提问时间:2023-11-21 23:31:47阅读次数: 72

最佳答案 问答题库728位专家为你答疑解惑

  • SSE 是一种单向通信,只允许服务器向客户端发送数据。客户端无法向服务器发送数据。
  • SSE 建立在 HTTP 协议之上,使用标准 HTTP 请求和响应。
  • SSE 不需要额外的库或协议处理,客户端可以使用浏览器的原生 EventSource API 来接收数据。
  • SSE 支持跨域通信,可以通过 CORS(跨域资源共享)机制进行配置。
  • SSE 在现代浏览器中也得到广泛支持,但与 WebSocket 相比,它的历史要长一些。

import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** 数据管理器用于管理Server-Sent Events (SSE) 的订阅和数据推送。* @author xiaobo*/
@Component
public class DataManager {private final Map<String, List<SseEmitter>> dataEmitters = new HashMap<>();/*** 订阅特定数据类型的SSE连接。** @param dataType 要订阅的数据类型* @param emitter  SSE连接*/public void subscribe(String dataType, SseEmitter emitter) {dataEmitters.computeIfAbsent(dataType, k -> new ArrayList<>()).add(emitter);emitter.onCompletion(() -> removeEmitter(dataType, emitter));emitter.onTimeout(() -> removeEmitter(dataType, emitter));}/*** 推送特定数据类型的数据给所有已订阅的连接。** @param dataType 要推送的数据类型* @param data     要推送的数据*/public void pushData(String dataType, String data) {List<SseEmitter> emitters = dataEmitters.getOrDefault(dataType, new ArrayList<>());emitters.forEach(emitter -> {try {emitter.send(SseEmitter.event().data(data, MediaType.TEXT_PLAIN));} catch (IOException e) {removeEmitter(dataType, emitter);}});}private void removeEmitter(String dataType, SseEmitter emitter) {List<SseEmitter> emitters = dataEmitters.get(dataType);if (emitters != null) {emitters.remove(emitter);}}
}

import com.todoitbo.baseSpringbootDasmart.sse.DataManager;  
import org.springframework.http.MediaType;  
import org.springframework.http.ResponseEntity;  
import org.springframework.web.bind.annotation.GetMapping;  
import org.springframework.web.bind.annotation.PathVariable;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;  import javax.annotation.Resource;  /**  * @author xiaobo  */
@RestController  
@RequestMapping("/environment")  
public class EnvironmentController {  @Resource    private DataManager dataManager;  @GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)  public SseEmitter subscribe() {  SseEmitter emitter = new SseEmitter();  dataManager.subscribe("environment", emitter);  return emitter;  }  // 示例:推送环境监测数据给前端  @GetMapping("/push/{testText}")  public ResponseEntity<String> pushEnvironmentData(@PathVariable String testText) {  dataManager.pushData("environment", testText);  return ResponseEntity.ok("Data pushed successfully.");  }  
}

 如果没有数据产生会出现连接超时问题。

默认情况下,EventSource对象会自动重连,以保持连接的持久性。

第一次订阅SSE连接后,即使后端没有数据产生,之后也能接收到数据。这可以通过定期发送心跳数据

@Scheduled(fixedRate = 30000) // 每30秒发送一次心跳数据
public void sendHeartbeat() {dataManager.pushData("heartbeat", "Heartbeat data");
}

<!DOCTYPE html>
<html>
<head><title>SSE Data Receiver</title>
</head>
<body><h1>Real-time Data Display</h1><div id="data-container"></div><script>const dataContainer = document.getElementById('data-container');// 创建一个 EventSource 对象,指定 SSE 服务器端点的 URLconst eventSource = new EventSource('http://127.0.0.1:13024/environment/subscribe'); // 根据你的控制器端点来设置URLeventSource.onopen = function(event) {};// 添加事件处理程序,监听服务器端发送的事件eventSource.onmessage = (event) => {const data = event.data;// 在这里处理从服务器接收到的数据// 可以将数据显示在页面上或进行其他操作const newDataElement = document.createElement('p');newDataElement.textContent = data;dataContainer.appendChild(newDataElement);};eventSource.onerror = (error) => {// 处理连接错误console.error('Error occurred:', error);// 重新建立连接eventSource.close();setTimeout(() => {// 重新建立连接eventSource = new EventSource('/environment/subscribe');}, 1000); // 1秒后重试};</script>
</body>
</html>

 精简版后端

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;@RestController
@SpringBootApplication
public class SseApplication {private final CopyOnWriteArrayList<SseEmitter> emitters = new CopyOnWriteArrayList<>();private final AtomicLong counter = new AtomicLong();public static void main(String[] args) {SpringApplication.run(SseApplication.class, args);}@GetMapping("/sse")public SseEmitter handleSse() {SseEmitter emitter = new SseEmitter();emitters.add(emitter);emitter.onCompletion(() -> emitters.remove(emitter));new Thread(() -> {try {for (int i = 0; i < 10; i++) {emitter.send(SseEmitter.event().id(String.valueOf(counter.incrementAndGet())).name("message").data("This is message " + i));Thread.sleep(1000);}emitter.complete();} catch (IOException | InterruptedException e) {emitter.completeWithError(e);}}).start();return emitter;}
}

查看全文

99%的人还看了

猜你感兴趣

版权申明

本文"SpringBoot实现SSE构建实时数据单向推送":http://eshow365.cn/6-41663-0.html 内容来自互联网,请自行判断内容的正确性。如有侵权请联系我们,立即删除!