背景
最近做一个物联网相关的项目,需要让服务器主动推送消息给客户端。简单的话可以在客户端进行ajax轮询,但对于服务器来说会有大量的无效请求,客户端一多还容易扑街,于是准备上WebSocket。
环境为Spring Boot。
思路
关于客户端
客户端连接上的时候,将该连接对象存入服务器内存中的Map集合中管理,key为客户端ID,value为该客户端的WebSocket连接对象。
由于WebSocket连接无法跨服务器,后期若扩展的话,想主动向指定客户端下发消息,只能通过中间件查询到该客户端的连接在哪台服务器后,再进行下发消息。
关于交互数据格式
在例如查询这种一应一答模式的操作中,依旧是走http请求,其他通知类的操作就走WebSocket。
交互的过程使用JSON格式的字符串,根据JSON字符串中type的值进行区分通知类型。如下则是一个初始化的通知:
{
"type":"INIT",
"client":"user"
}
项目依赖
<!-- https://mvnrepository.com/artifact/org.java-websocket/Java-WebSocket -->
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.2</version>
</dependency>
实现
上面这个依赖可以帮我们更好的管理WebSocket消息,只需要继承WebSocketServer类即可监听各类事件:
public class Server extends WebSocketServer {
public Server(int port) {
super(new InetSocketAddress(port));
}
@Override
public void onOpen(WebSocket conn, ClientHandshake handshake) {
System.out.println("有人进来了");
}
@Override
public void onClose(WebSocket conn, int code, String reason, boolean remote) {
}
@Override
public void onMessage(WebSocket conn, String message) {
System.out.println("收到消息:" + message);
}
@Override
public void onError(WebSocket conn, Exception ex) {
}
@Override
public void onStart() {
}
public static void main(String[] args) throws IOException, InterruptedException {
Server server = new Server(8888);
server.start();
System.out.println("服务器端启动,端口号为:" + server.getPort());
BufferedReader sysin = new BufferedReader(new InputStreamReader(System.in));
while (true) {
String in = sysin.readLine();
server.broadcast(in);
if (in.equals("exit")) {
server.stop(1000);
break;
}
}
}
}
主要有以下五个事件:
- onStart: 服务器启动事件
- onOpen: 连接打开事件
- onMessage: 接收消息事件
- onClose: 连接关闭事件
- onError: 连接出错事件
前面说到的连接管理,就可以在onOpen
、onClose
、onError
这三个事件中进行管理,在有客户端连接时将WebSocket对象存入Map,而连接关闭或者连接出错时可以将客户端剔除。
onMessage则是我们的重点,客户端发来的消息都会通过这个方法接收,接收的类型为String,因为一开始约定好了通过JSON字符串进行交互,所以可以写一个工具类,将message和Java类进行映射转换。
更优雅的使用
项目中肯定会存在不同的消息类型,如果全部挤到onMessage一个方法中,通过if/else的方式去判断类型进行处理,后期会显得臃肿难以维护。于是想到了SpringMVC的请求方式,把JSON数据中的type类比SpringMVC的uri,仿照@RequestMessage注解根据type路由到不同的处理方法。
消息类型枚举
这里存储了消息的类型,后面会将Handler都交给Spring管理,根据客户端传入的type值寻找对应的处理器。但为了避免出现Bean名字冲突的情况,故在getName方法中拼接自定义前缀,我这里用的是WebSocketBeanName:
。
public enum RequestMessageTypeEnum {
/**
* 初始化
*/
INIT();
/**
* 防止对应的Mapping Bean名字重复
* @return
*/
public String getBeanName(){
return "WebSocketBeanName:" + this.toString();
}
}
消息接收对象
简单起见只保留消息类型
@Data
public class RequestMessage {
/**
* 消息类型
*/
private RequestMessageTypeEnum messageType;
}
自定义注解
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface WebSocketMapping {
RequestMessageTypeEnum value();
}
自定义注解Bean配置别名
由于我们是用type进行路由,为了防止type的值与Spring中其他Bean名字发生冲突,这里给WebSocketMapping注解修饰的类起别名。
@Slf4j
@Component
public class WebSocketMappingRegistryPostProcessor implements BeanDefinitionRegistryPostProcessor {
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
String[] ary = beanFactory.getBeanNamesForAnnotation(WebSocketMapping.class);
for (String beanName : ary) {
//通过Spring的beanName获取bean的类型
Class<?> cls = beanFactory.getType(beanName);
if (cls != null && cls.getAnnotations().length > 0) {
for (Annotation annotation : cls.getAnnotations()) {
if (annotation instanceof WebSocketMapping) {
WebSocketMapping webSocketMapping = (WebSocketMapping) annotation;
// 注册bean别名
beanFactory.registerAlias(beanName, webSocketMapping.value().getBeanName());
}
}
}
}
}
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
if (log.isDebugEnabled()) {
log.debug("postProcessBeanDefinitionRegistry");
}
}
}
配置完成后,我们就可以根据WebSocketBeanName:
+ Type找到对应的Handler。
路由类
@Component
@Slf4j
public class MessageRouter {
private final ObjectMapper objectMapper = new ObjectMapper();
private final ApplicationContext applicationContext;
public MessageRouter(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
/**
* 根据消息Type找到对应的处理器进行处理
* @param conn
* @param message
*/
public void router(WebSocket conn, String message){
try {
RequestMessage requestMessage = objectMapper.readValue(message, RequestMessage.class);
// 根据类型执行不同操作
MessageHandler handler = applicationContext.getBean(requestMessage.getMessageType().getBeanName(), MessageHandler.class);
handler.handle(conn, requestMessage);
} catch (JsonParseException e){
e.printStackTrace();
log.error("数据包格式错误:{}", message);
} catch(Exception e) {
e.printStackTrace();
}
}
}
到这里环境就都搭建完成了,然后是Handler相关。
Handler接口以及类的使用
定义接口:
public interface MessageHandler {
/**
* 消息时间处理器
* @param conn 该消息的WebSocket连接对象
* @param requestMessage 消息体
*/
void handle(WebSocket conn, RequestMessage requestMessage);
}
具体的消息处理类(以INIT类型消息为例):
@WebSocketMapping(RequestMessageTypeEnum.INIT)
@Slf4j
public class InitMessageHandler implements MessageHandler{
@Override
public void handle(WebSocket conn, RequestMessage requestMessage) {
log.info("接收到INIT类型的消息:{}", requestMessage);
}
}
本文由 visionki 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Apr 30, 2021 at 08:50 am