CrazyAirhead

疯狂的傻瓜,傻瓜也疯狂——傻方能执著,疯狂才专注!

0%

Web 开发 —— 高阶 WebSocket 和 SSE

说明

随着大语言模型的流行,因为使用了 SSE 或者 Websocket的技术进行流式的交换,使得 SSE 和 Websocket也火了起来,今天我们来讲解,如何在 Solon 中实现 Websocket 服务端和 SSE 服务端。

Websocket

WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。它使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

我们这里通过一个简易的 im 的服务做为例子。

依赖

如果使用了solon-web,默认使用的是smart-http,已经集成了 websocket,不需要添加其他的依赖。

插件 适配框架 包大小 信号协议支持 端口
solon-boot-smarthttp smart-http (aio) 0.4Mb http, ws 相同端口
solon-boot-jetty + solon-boot-jetty-add-websocket jetty (nio) 1.9Mb http, ws 相同端口
solon-boot-undertow undertow (nio) 4.3Mb http, ws 相同端口
solon-boot-websocket websocket (nio) 0.4Mb ws 独立端口
solon-boot-websocket-netty netty (nio) ws 独立端口

独立的 WebSocket 插件,会使用独立的端口,且默认为:主端口 + 10000,但都可以通过配置进行修改。

启用

在主类开启 Websocket。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.example.demo.web;

import org.noear.solon.Solon;
import org.noear.solon.annotation.SolonMain;

/**
* @author airhead
*/
@SolonMain
public class DemoWeb05App {
public static void main(String[] args) {
Solon.start(
DemoWeb05App.class,
args,
app -> {
app.enableWebSocket(true);
});
}
}

开放端点

这里集成了 SimpleWebSocketListener,已经提供了基本的实现,可以根据实际的需要实现需要的接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.example.demo.web.im.endpoint;

import com.example.demo.web.im.service.ImService;
import java.io.IOException;
import org.noear.solon.annotation.Inject;
import org.noear.solon.net.annotation.ServerEndpoint;
import org.noear.solon.net.websocket.WebSocket;
import org.noear.solon.net.websocket.listener.SimpleWebSocketListener;

/**
* @author airhead
*/
@ServerEndpoint("/im.ws")
public class ImWebsocket extends SimpleWebSocketListener {
@Inject private ImService service;

@Override
public void onOpen(WebSocket socket) {
service.onOpen(socket);
}

@Override
public void onMessage(WebSocket socket, String text) throws IOException {
service.onMessage(socket, text);
}
}

实现逻辑

此处只是做简单的鉴权,和消息的回复,如果要实现完整的 IM 服务,还需要做会话的管理的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.example.demo.web.im.service;

import java.io.IOException;
import org.dromara.hutool.core.text.StrUtil;
import org.noear.solon.annotation.Component;
import org.noear.solon.net.websocket.WebSocket;

/**
* @author airhead
*/
@Component
public class ImService {
public void onOpen(WebSocket socket) {
String token = socket.param("token");
if (StrUtil.isBlank(token)) {
socket.close();
}

// 省略了管理 socket 的管理
}

public void onMessage(WebSocket socket, String text) throws IOException {
socket.send("> " + text + "\r\n" + "消息已阅");
}
}

验证

img

SSE

SSE 全称是 Server-Sent Event,网页自动获取来自服务器的更新,SSE 是单向消息传递。

我们这里通过一个简易的 LLM 的服务做为例子。

依赖

要实现 SSE 需要引入 solon-web-sse。

1
2
3
4
5
6
7
dependencies {
implementation platform(project(":demo-parent"))

implementation("org.noear:solon-web")
implementation("org.noear:solon-web-sse")

}

开放端点

客户端先要调用 open 方法,初始化 SseEmitter 连接,之后就可以通过 SseEmitter 给客户端发送消息了,为了方便测试这里还提供了 send 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.example.demo.web.chat.controller;

import com.example.demo.web.chat.service.ChatService;
import org.noear.solon.annotation.Controller;
import org.noear.solon.annotation.Inject;
import org.noear.solon.annotation.Mapping;
import org.noear.solon.web.sse.SseEmitter;

/**
* @author airhead
*/
@Controller
@Mapping("/chat")
public class ChatController {
@Inject private ChatService service;

@Mapping(value = "/open/{id}")
public SseEmitter open(String id) {
return service.open(id);
}

@Mapping("/send/{id}")
public String send(String id) {
return service.send(id);
}

@Mapping("/close/{id}")
public String close(String id) {
return service.close(id);
}
}

实现逻辑

与 Websocket 不同的是,SSE 是单向连接,请求的过程是不带会话连接的,所以需要自己管理好会话。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.example.demo.web.chat.service;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.noear.snack.ONode;
import org.noear.solon.Utils;
import org.noear.solon.annotation.Component;
import org.noear.solon.core.handle.Result;
import org.noear.solon.web.sse.SseEmitter;
import org.noear.solon.web.sse.SseEvent;

/**
* @author airhead
*/
@Component
@Slf4j
public class ChatService {
static Map<String, SseEmitter> emitterMap = new HashMap<>();

public SseEmitter open(String id) {
SseEmitter sseEmitter =
new SseEmitter(60 * 1000L)
.onCompletion(() -> emitterMap.remove(id))
.onError(e -> log.error("初始化 sse 错误", e))
.onInited(s -> emitterMap.put(id, s));

try {
sseEmitter.send("Ok");
} catch (IOException e) {
log.error("发送 sse 错误", e);
throw new RuntimeException(e);
}

// 初始化后,才能使用
return sseEmitter;
}

public String send(String id) {
SseEmitter emitter = emitterMap.get(id);
if (emitter == null) {
return "No user: " + id;
}

String msg = "test msg -> " + System.currentTimeMillis();
System.out.println(msg);
try {
emitter.send(msg);
// reconnectTime 用于提示前端重连时间
emitter.send(new SseEvent().id(Utils.guid()).data(msg).reconnectTime(1000L));
emitter.send(ONode.stringify(Result.succeed(msg)));
} catch (IOException e) {
log.error("发送 sse 错误", e);
throw new RuntimeException(e);
}

return "Ok";
}

public String close(String id) {
SseEmitter emitter = emitterMap.get(id);
if (emitter != null) {
emitter.complete();
}

return "Ok";
}
}

验证

ApiFox 支持测试 SSE,连接成功之后就可以调用 send 方法。

img

也可以直接使用 curl 进行测试

img

欢迎关注我的其它发布渠道