JAVA实现WebSocket服务器
in 默认分类 with 0 comment

JAVA实现WebSocket服务器

in 默认分类 with 0 comment

背景

最近做一个物联网相关的项目,需要让服务器主动推送消息给客户端。简单的话可以在客户端进行ajax轮询,但对于服务器来说会有大量的无效请求,客户端一多还容易扑街,于是准备上WebSocket。

环境为Spring Boot。

思路

关于客户端

客户端连接上的时候,将该连接对象存入服务器内存中的Map集合中管理,key为客户端ID,value为该客户端的WebSocket连接对象。

由于WebSocket连接无法跨服务器,后期若扩展的话,想主动向指定客户端下发消息,只能通过中间件查询到该客户端的连接在哪台服务器后,再进行下发消息。

关于交互数据格式

在例如查询这种一应一答模式的操作中,依旧是走http请求,其他通知类的操作就走WebSocket。
交互的过程使用JSON格式的字符串,根据JSON字符串中type的值进行区分通知类型。如下则是一个初始化的通知:

{
    "type":"INIT",
    "client":"user"
}

项目依赖

<!-- https://mvnrepository.com/artifact/org.java-websocket/Java-WebSocket -->
<dependency>
    <groupId>org.java-websocket</groupId>
    <artifactId>Java-WebSocket</artifactId>
    <version>1.5.2</version>
</dependency>

实现

上面这个依赖可以帮我们更好的管理WebSocket消息,只需要继承WebSocketServer类即可监听各类事件:

public class Server extends WebSocketServer {

    public Server(int port) {
        super(new InetSocketAddress(port));
    }

    @Override
    public void onOpen(WebSocket conn, ClientHandshake handshake) {
        System.out.println("有人进来了");
    }

    @Override
    public void onClose(WebSocket conn, int code, String reason, boolean remote) {

    }

    @Override
    public void onMessage(WebSocket conn, String message) {
        System.out.println("收到消息:" + message);
    }

    @Override
    public void onError(WebSocket conn, Exception ex) {

    }

    @Override
    public void onStart() {

    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Server server = new Server(8888);
        server.start();
        System.out.println("服务器端启动,端口号为:" + server.getPort());
        BufferedReader sysin = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            String in = sysin.readLine();
            server.broadcast(in);
            if (in.equals("exit")) {
                server.stop(1000);
                break;
            }
        }
    }
}

主要有以下五个事件:

前面说到的连接管理,就可以在onOpenonCloseonError这三个事件中进行管理,在有客户端连接时将WebSocket对象存入Map,而连接关闭或者连接出错时可以将客户端剔除。

onMessage则是我们的重点,客户端发来的消息都会通过这个方法接收,接收的类型为String,因为一开始约定好了通过JSON字符串进行交互,所以可以写一个工具类,将message和Java类进行映射转换。

更优雅的使用

项目中肯定会存在不同的消息类型,如果全部挤到onMessage一个方法中,通过if/else的方式去判断类型进行处理,后期会显得臃肿难以维护。于是想到了SpringMVC的请求方式,把JSON数据中的type类比SpringMVC的uri,仿照@RequestMessage注解根据type路由到不同的处理方法。

消息类型枚举

这里存储了消息的类型,后面会将Handler都交给Spring管理,根据客户端传入的type值寻找对应的处理器。但为了避免出现Bean名字冲突的情况,故在getName方法中拼接自定义前缀,我这里用的是WebSocketBeanName:

public enum RequestMessageTypeEnum {
    /**
     * 初始化
     */
    INIT();

    /**
     * 防止对应的Mapping Bean名字重复
     * @return
     */
    public String getBeanName(){
        return "WebSocketBeanName:" + this.toString();
    }
}

消息接收对象

简单起见只保留消息类型

@Data
public class RequestMessage {
    /**
     * 消息类型
     */
    private RequestMessageTypeEnum messageType;
}

自定义注解

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface WebSocketMapping {
    RequestMessageTypeEnum value();
}

自定义注解Bean配置别名

由于我们是用type进行路由,为了防止type的值与Spring中其他Bean名字发生冲突,这里给WebSocketMapping注解修饰的类起别名。

@Slf4j
@Component
public class WebSocketMappingRegistryPostProcessor implements BeanDefinitionRegistryPostProcessor {

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        String[] ary = beanFactory.getBeanNamesForAnnotation(WebSocketMapping.class);
        for (String beanName : ary) {
            //通过Spring的beanName获取bean的类型
            Class<?> cls = beanFactory.getType(beanName);
            if (cls != null && cls.getAnnotations().length > 0) {
                for (Annotation annotation : cls.getAnnotations()) {
                    if (annotation instanceof WebSocketMapping) {
                        WebSocketMapping webSocketMapping = (WebSocketMapping) annotation;
                        // 注册bean别名
                        beanFactory.registerAlias(beanName, webSocketMapping.value().getBeanName());
                    }
                }
            }
        }
    }

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
        if (log.isDebugEnabled()) {
            log.debug("postProcessBeanDefinitionRegistry");
        }
    }
}

配置完成后,我们就可以根据WebSocketBeanName: + Type找到对应的Handler。

路由类

@Component
@Slf4j
public class MessageRouter {

    private final ObjectMapper objectMapper = new ObjectMapper();

    private final ApplicationContext applicationContext;

    public MessageRouter(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    /**
     * 根据消息Type找到对应的处理器进行处理
     * @param conn
     * @param message
     */
    public void router(WebSocket conn, String message){
        try {
            RequestMessage requestMessage = objectMapper.readValue(message, RequestMessage.class);
            // 根据类型执行不同操作
            MessageHandler handler = applicationContext.getBean(requestMessage.getMessageType().getBeanName(), MessageHandler.class);
            handler.handle(conn, requestMessage);
        } catch (JsonParseException e){
            e.printStackTrace();
            log.error("数据包格式错误:{}", message);
        } catch(Exception e) {
            e.printStackTrace();
        }
    }
}

到这里环境就都搭建完成了,然后是Handler相关。

Handler接口以及类的使用

定义接口:

public interface MessageHandler {

    /**
     * 消息时间处理器
     * @param conn 该消息的WebSocket连接对象
     * @param requestMessage 消息体
     */
    void handle(WebSocket conn, RequestMessage requestMessage);
}

具体的消息处理类(以INIT类型消息为例):

@WebSocketMapping(RequestMessageTypeEnum.INIT)
@Slf4j
public class InitMessageHandler implements MessageHandler{

    @Override
    public void handle(WebSocket conn, RequestMessage requestMessage) {
        log.info("接收到INIT类型的消息:{}", requestMessage);
    }
}
Responses