Java Server-Sent Event 服务端发送事件
实现后端单方面向前端发消息
1·后端提供订阅接口
import lombok.extern.slf4j.Slf4j;
import service.ServerSentEventService;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;import javax.annotation.Resource;/*** 用于提醒前端刷新列表*/
/**
 * REST controller exposing the Server-Sent Events subscription endpoint.
 * The stream is used to prompt the front end to refresh its list.
 */
@Slf4j
@RestController
public class ServerSentEventController {

    /** Service that owns the shared event stream. */
    @Resource
    private ServerSentEventService serverSentEventService;

    /**
     * Subscribes the caller to the event stream.
     *
     * <p>The response is produced as {@code text/event-stream}, so each
     * element of the returned {@link Flux} is written to the client as one
     * SSE message.
     *
     * @return the stream obtained from {@link ServerSentEventService#getFlux()}
     */
    @GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamEvents() {
        final Flux<String> subscription = serverSentEventService.getFlux();
        log.info("flux :{}", subscription);
        return subscription;
    }
}
import org.springframework.stereotype.Service;import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;@Slf4j
@Service
public class ServerSentEventService {private Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();public Flux<String> getFlux() {Flux<String> flux = sink.asFlux().onErrorResume(e -> {log.info("SSE-Flux出错: " + e.getMessage());log.info("重新创建flux。");sink = Sinks.many().multicast().onBackpressureBuffer();return sink.asFlux();}).doOnTerminate(() -> {log.info("SSE-客户端断开连接");log.info("重新创建Sink。");sink = Sinks.many().multicast().onBackpressureBuffer();}).startWith("服务器收到订阅");return flux;}public void sendMessage(String message) {sink.tryEmitNext(message);}
}
2·前端订阅接口
/**
 * Opens the SSE subscription to the `/stream` endpoint and logs every
 * message received from the server.
 */
const connect = () => {
  // Calling connect() twice used to overwrite the reference and leave the
  // earlier connection open; close it first so only one stream is live.
  // The typeof guard keeps this safe when eventSource has not been set yet.
  if (typeof eventSource !== 'undefined' && eventSource) {
    eventSource.close();
  }
  // For a backend on another origin pass an absolute URL instead,
  // e.g. new EventSource('http://localhost/stream').
  eventSource = new EventSource('/stream');
  eventSource.onmessage = event => {
    console.log(`收到服务端消息:${event.data}`);
    // Handle the message here.
  };
};

/**
 * Closes the SSE connection if one exists and logs its resulting readyState.
 */
const closeConnection = () => {
  if (eventSource) {
    console.log('尝试关闭连接');
    eventSource.close();
    console.log('连接状态' + eventSource.readyState);
  }
};
3·后端发消息
//注入服务端发送事件service
@Resource
private ServerSentEventService serverSentEventService;//调用发消息方法serverSentEventService.sendMessage("%s_error_%s".formatted(requestUUID,"方案名不能为空。"));