WebSocket服务器(物联网下行通知神器)

HttpServer是一个轻量级Web服务器,用于在嵌入式设备以及客户端环境中提供简单Web服务,同时也支持标准WebSocket服务。

WebSocket服务器(物联网下行通知神器)

本文例程基于vs2022,基础例程可参考:https://www.yuque.com/smartstone/nx/httpserver

WebSocket服务端

WebSocket服务端功能由HttpServer提供,只是映射到WebSocket特有的处理器上。

vs2022新建.NET6.0控制台项目,Nuget引用 NewLife.Core,使用以下例程:

using NewLife.Http;
using NewLife.Log;
using System;


XTrace.UseConsole();


var server = new HttpServer
{
    Port = 8080,
    Log = XTrace.Log,
    SessionLog = XTrace.Log
};
server.Map("/ws", new MyWebSocket());
server.Start();


Console.ReadLine();


class MyWebSocket : IHttpHandler
{
    /// 处理请求
    /// 
    public virtual void ProcessRequest(IHttpContext context)
    {
        var ws = context.WebSocket;
        ws.Handler = ProcessMessage;


        var source = new CancellationTokenSource();
        Task.Run(() => consumeMessage(ws, "nodeCode", source));


        WriteLog("WebSocket连接 {0}", context.Connection.Remote);
    }


    /// 处理消息
    /// 
    /// 
    public virtual void ProcessMessage(WebSocket socket, WebSocketMessage message)
    {
        var remote = socket.Context.Connection.Remote;
        var msg = message.Payload?.ToStr();
        switch (message.Type)
        {
            case WebSocketMessageType.Text:
                WriteLog("WebSocket收到[{0}] {1}", message.Type, msg);
                // 群发所有客户端
                socket.SendAll($"[{remote}]说,{msg}");
                break;
            case WebSocketMessageType.Close:
                WriteLog("WebSocket关闭[{0}] [{1}] {2}", remote, message.CloseStatus, message.StatusDescription);
                break;
            case WebSocketMessageType.Ping:
            case WebSocketMessageType.Pong:
                WriteLog("WebSocket心跳[{0}] {1}", message.Type, msg);
                break;
            default:
                WriteLog("WebSocket收到[{0}] {1}", message.Type, msg);
                break;
        }
    }


    private void WriteLog(String format, params Object[] args) => XTrace.WriteLine(format, args);
}

映射路由/ws到一个自定义处理器MyWebSocket上,该处理器包括了 ProcessRequest 和 ProcessMessage 。

  • ProcessRequest。收到WebSocket请求时触发一次,此时可验证访问者是否合法,例如借助JWT等Token技术。Handler属性设置为ProcessMessage,用于处理后续WebSocket消息。

  • ProcessMessage。建立WebSocket握手后,每次收到WebSocket消息(数据帧),都将调用该方法,包括二进制、文本、心跳和断开等多种消息类型。

  • Send。发送消息给客户端。

  • SendAll。群发消息给所有客户端。

  • Close。关闭连接。

跑起来:

WebSocket服务器(物联网下行通知神器)

可以看到,仍然是普通HttpServer监听8080端口。保持打开,不要关闭,下面客户端测试需要用到

WebClient客户端

借助.NET自身的ClientWebSocket,可以轻松构建WebSocket通信。

vs2022新建.NET6.0控制台项目,Nuget引用 NewLife.Core,使用以下例程:

using NewLife;
using NewLife.Data;
using NewLife.Log;
using System;
using System.Net.WebSockets;


XTrace.UseConsole();


var client = new ClientWebSocket();
await client.ConnectAsync(new Uri("ws://127.0.0.1:8080/ws"), default);
await client.SendAsync("Hello NewLife".GetBytes(), WebSocketMessageType.Text, true, default);


var buf = new Byte[1024];
var rs = await client.ReceiveAsync(buf, default);
XTrace.WriteLine(new Packet(buf, 0, rs.Count).ToStr());


await client.CloseAsync(WebSocketCloseStatus.NormalClosure, "通信完成", default);
XTrace.WriteLine("Close [{0}] {1}", client.CloseStatus, client.CloseStatusDescription);


Console.ReadLine();

建立到服务端的连接后,向服务端发送字符串“Hello NewLife”,然后使用1024缓冲区接收一次响应数据,接着友好断开连接。

跑起来:

WebSocket服务器(物联网下行通知神器)

查看服务端:

WebSocket服务器(物联网下行通知神器)

可以看到,服务端ProcessRequest收到了客户端的WebSocket连接请求。两次ProcessMessage,第一次收到Text数据帧,也就是文本“Hello NewLife”,第二次是Close数据帧。

客户端也收到了服务端SendAll群发的数据,感兴趣的同学可以多开几个客户端试试。

物联网平台中使用

在物联网平台中,设备与服务端建立WebSocket长连接后,可以实时下发通知。

我们使用消息队列架构,如果队列中有消息,则通过WebSocket推给设备端。

消息大循环结合WebSocket如下:

    private async Task consumeMessage(WebSocket socket, String node, CancellationTokenSource source)
    {
        var cancellationToken = source.Token;
        var queue = QueueHost.GetQueue($"cmd:{node}");
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                var msg = await queue.TakeOneAsync(10_000);
                if (msg != null)
                {
                    XTrace.WriteLine("WebSocket发送 {0} {1}", node, msg);


                    socket.Send(msg.GetBytes(), WebSocketMessageType.Text);
                }
                else
                {
                    await Task.Delay(1_000, cancellationToken);
                }
            }
        }
        catch (Exception ex)
        {
            XTrace.WriteException(ex);
        }
        finally
        {
            source.Cancel();
        }
    }

再修改 ProcessRequest ,握手后异步启动大循环:

    /// 处理请求
    /// 
    public virtual void ProcessRequest(IHttpContext context)
    {
        var ws = context.WebSocket;
        ws.Handler = ProcessMessage;


        var source = new CancellationTokenSource();
        Task.Run(() => consumeMessage(ws, "nodeCode", source));


        WriteLog("WebSocket连接 {0}", context.Connection.Remote);
    }

借助Redis消息队列,每个设备一个Topic,对应一个WebSocket连接和消费大循环。

注:以上代码来自星尘 StarServer。

本文章来源于互联网,如有侵权,请联系删除!

相关推荐: 适合于物联网通信的FEC纠错码

在物联网通信中,经常遇到数据传输成功率的问题,这时候使用FEC纠错码,即可完美实现成功率100%。 感谢Luigi Rizzo大神开源了FEC的代码,这是一种基于前向纠错的编码技术(又称RS-FEC),通过在原始传输码中加入冗余纠错码,对信息进行特定的编码处理…