上文介绍了Signal的基本知识,接下来介绍一下SignalR里面对象的生命周期以及流式处理,这两个比较重要的概念在前文开头的树莓派监控例子中有用到并且比较重要。

功能设计

   树莓派监控PiMonitR这个项目包含了几部分内容,首先是如何获取摄像头数据流,然后通过SignalR,传输到客户端展示拍到的内容,并且客户单能够控制开始监控和停止,本文将实现这一功能。界面设计如下:

▲ 摄像头监控页面

     界面如图所示,有两个按钮,Start Streaming点击之后,就可以通过流式数据传输从服务端传输到页面上来,Stop Stream会停止数据传输。下面是一个image标签,收到传输过来的流式数据之后,将image标签的src属性设置为接收到的数据。

       在 树莓派监控PiMonitR 例子中,使用了MMALSharp - Unofficial C# API for the Raspberry Pi camera. 这个第三方类库来获取树莓派官方摄像头的数据。我这里因为是在Windows平台上开发调试的,所以使用了OpenCVSharp3这个类库来获取本机摄像头数据,因为是台式机,所以这里使用了模拟摄像头WeCam,这些在后文会详细介绍。

SignalR对象的生命周期

   SignalR中最重要的对象就是Hub,当客户端JavaScript调用Hub里的方法时,每一次调用,都会实例化一个新的Hub。从本文的例子中,下面这个是获取摄像头图片的Hub类。

public CameraStreamHub(CameraService cameraSer)
{
    cameraService = cameraSer;
    Console.WriteLine("Guid:" + Guid.NewGuid().ToString());
}

    为了说明这一现象,在CameraStreamHub的构造函数中添加Guid打印,会发现在页面上只要通过JavaScript调用Hub里的方法,Guid打印出来的都不一样,这表示每一次调用CameraStreamHub就会实例化一次。

   这就存在一个问题,类似HTTP是无状态的那样,当一个用户打开我们的摄像头监控主页时,点击开始按钮后,新建了一个CameraStreamHub实例,他想要关闭,却发现关闭不了,因为点击关闭的时候,内部其实是又新建了一个CameraStreamHub的实例,跟之前点击开始的CameraStreamHub不是同一个对象。另外,如果是多个用户,同时打开了这一个主页,我们还需要区分,以及保存状态,SignalR的无状态使得要实现这些功能变得困难,要实现这一功能需要使用到SignalR的用户体系。

SignalR的用户体系

   SignalR本身是无状态的,如何保存用户当前的操作状态和数据呢,那么有一种方法是通过用户来进行。SignalR中的单个用户,可以有多个连接,例如,用户可以连接到桌面设备,以及移动设备上。每一个设备都有独立的SignalR连接,这些连接与相同的用户关联。如果某个消息要发给这个用户,那么所有跟这个用户相连接的设备都会接收到这条消息。一个连接对应的用户可以在Hub的Context.UserIdentifier属性中获得。

   默认情况下SignalR使用ClaimsPrincipal中的ClaimTypes.NameIdentifier来跟一个连接关联,作为某个用户的标识,具体信息可以查看 Use claims to customize identity handling 这篇文章,个人感觉这个跟ASP.NET Core里面的用户验证是一样的。

   SignalR中除了用户之外,还有用户组,组是一些列具有用户名的连接的集合。给组发送消息,组内的所有用户连接都能收到。如果要将消息发送给某个连接或者多个连接,建议使用组来发送,因为组是由SignalR来管理的。某个用户可以存在于多个组,类似聊天程序,某个聊天室就是一个组,这种情况下,使用组非常合适。可以通过一下方法来将用户加入和删除到某个。

public async Task AddToGroup(string groupName)
{
    await Groups.AddToGroupAsync(Context.ConnectionId, groupName);
    await Clients.Group(groupName).SendAsync("Send", $"{Context.ConnectionId} has joined the group {groupName}.");
}

public async Task RemoveFromGroup(string groupName)
{
    await Groups.RemoveFromGroupAsync(Context.ConnectionId, groupName);
    await Clients.Group(groupName).SendAsync("Send", $"{Context.ConnectionId} has left the group {groupName}.");
}

   可以看到,上面是将ConnectionId连接加入到某个组里的,更多详细信息,可以查看MSDN说明文档,Manage users and groups in SignalR

   在本文例子中,我们没有用到验证,所以没有用到用户,用到的是Context.ConnectionId 连接。对于客户端要与服务端的Hub进行通讯,必须要发起一个连接,连接成功之后,就会分配一个ConnectionId,在连接之后的所有操作,都是用这同一个ConnectionId,而不论Hub对象实例化多少次。这正是我们需要的结果。所以只需要把状态保存到Hub对象内部的静态字段中,以ConnectionId作为key即可把状态保存下来。  

   在本例中,还用到了流式处理,有点跟流媒体那样,详细可以查看 Use streaming in ASP.NET Core SignalR

SignalR的流式处理

   在前面的例子中,Hub里面的方法像客户端发送数据时,数据都是准备好了的。但是有些时候,服务端的数据并没有全部准备好,有时候只是准备或者来了一部分,使用流式处理(Streaming)就能够当数据到来时就能立即发送给客户端。而不用等待全部数据的到来。在本例中,我们从摄像头获取的数据是一帧一帧的,并不是全部的数据,所以使用流处理非常合适。为了获取摄像头线信息,并将数据能够推送到客户端,我们在项目下新建了Hubs文件夹,创建了名为CameraStreamHub的Hub类,内容如下:

public class CameraStreamHub : Hub
{
    public static Dictionary<string, bool> isStreamRunning = new Dictionary<string, bool>();
    private readonly CameraService cameraService;

    public CameraStreamHub(CameraService cameraSer)
    {
        cameraService = cameraSer;
        Console.WriteLine("Guid:" + Guid.NewGuid().ToString());
    }

    public ChannelReader<object> StartStream(CancellationToken cancellationT)
    {
        if (isStreamRunning.ContainsKey(Context.ConnectionId))
        {
            isStreamRunning[Context.ConnectionId] = true;
        }
        else
        {
            isStreamRunning.Add(Context.ConnectionId, true);
        }
        Console.WriteLine("Start ConnectionId:" + Context.ConnectionId.ToString());
        var channel = Channel.CreateUnbounded<object>();
        cameraService.Start();
        _ = WriteItemsAsync(channel.Writer, cancellationT);
        return channel.Reader;
    }

    private async Task WriteItemsAsync(ChannelWriter<object> writer, CancellationToken cancellationT)
    {
        try
        {
            bool isRunning;
            isStreamRunning.TryGetValue(Context.ConnectionId, out isRunning);
            while (isRunning)
            {
                cancellationT.ThrowIfCancellationRequested();
                byte[] b = cameraService.CapturePictureAsBytesArray();
                if (b != null)
                {
                    await writer.WriteAsync(b);
                }
                await Task.Delay(10, cancellationT);
                isStreamRunning.TryGetValue(Context.ConnectionId, out isRunning);
            }
        }
        catch (Exception ex)
        {
            writer.TryComplete(ex);
        }
        writer.TryComplete();
    }

    public void StopStream()
    {
        if (isStreamRunning.ContainsKey(Context.ConnectionId))
        {
            isStreamRunning[Context.ConnectionId] = false;
            Console.WriteLine("Stop ConnectionId:" + Context.ConnectionId.ToString());
            Clients.Caller.SendAsync("StopStream");
        }
    }
}

   跟普通的Hub一样,CameraStreamHub继承自Hub,在构造函数中,注入了CameraService类,这个类用来获取摄像头信息。目前这个类是获取本机摄像头数据,如果是笔记本就是笔记本摄像头信息,将来要部署在树莓派上的话,这里CameraService可以抽象成接口,或者重写一下,可以直接参考 树莓派监控PiMonitR 里的代码。我这里是台式机,所以用了虚拟摄像头。这个类的详细信息在下部分讲。

  该类中,定义了一个Dictionary用来保存ConnectionId对应的状态,比如开始和停止,注意,这里一定要用静态方法,才能将状态持久化,还可以通过依赖注入实现(我没试过)。这种静态方法以支持同时开多个不同的客户端分别控制各自的状态。

   Hub里面有两个对外提供的方法,最主要的是StartStream这个Streaming方法,他的返回值是ChannelReader,需要注意的是,这里一定要是ChannelReader(还可以是IAsyncEnumerable<T> )才能表示是Streaming,在StartStream方法中,首先判断是否包含ConnectionId如果包含状态更新为开始,如果没有,添加并设置为开始。

   接下来,使用Channel.CreateUnbounded<object>()创建一个能写入数据的channel对象,往Channel.Writer里面写入数据,就能将写入的数据马上推送到连接的客户端。接下来我们开启摄像头方法,最重要的是接下来的WriteItemsAsync方法。

   在该方法中,循环判断该ConnectionId是否已经开始,再判断是否中断,接下来,通过CameraService来获取一帧数据,如果数据不为空,则通过WriteAsync方法写入到ChannelWriter对象中。

   接下来暂停一段时间,继续循环判断。一旦用户终止或出错,则调用Channel.Writer对象的TryComplete对象表示本次Streaming结束。

   StopStream方法是一个普通的Hub方法,将该ConnectionId的连接状态置为false,并像该对象回调StopStream事件。

   最后,在Startup中,进行注入:

services.AddSignalR();
services.AddSingleton<CameraService>();

   以及进行映射: 

app.UseEndpoints(endpoints =>
{
    ...
    endpoints.MapHub<CameraStreamHub>("/CameraR");
});

CameraService的代码如下,我这里用的是OpenCVSharp3 这个类库,以及VCam虚拟摄像头程序来模拟台式机的摄像头。CameraService的代码如下:

public class CameraService
{
    private Mat latImage;
    private object imagelock = new object();
    private bool IsRuning;
    Thread t;
    public void Start()
    {
        IsRuning = true;
        if (t == null)
        {
            t = new Thread(ImageCapture);
            t.IsBackground = true;
            t.Start();
        }
    }

    public void Stop()
    {
        IsRuning = false;
        Thread.Sleep(100);
        try
        {
            t.Join();
        }
        catch (Exception)
        {
        }
        t = null;
    }

    /// <summary>
    /// 
    /// </summary>
    /// <param name="obj"></param>
    private void ImageCapture(object obj)
    {
        VideoCapture capture = null;
        Mat image = null;
        try
        {
            capture = new VideoCapture(0);//  //using (Window window = new Window("Camera"))
            image = new Mat(); // Frame image buffer
            {
                // When the movie playback reaches end, Mat.data becomes NULL.
                while (IsRuning)
                {
                    capture.Read(image); // same as cvQueryFrame
                    if (image.Empty()) break;
                    lock (imagelock)
                    {
                        latImage = image;
                        //window.ShowImage(image);
                        //Cv2.WaitKey(30);
                    }
                    Thread.Sleep(10);
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
        finally
        {
            image.Dispose();
            capture.Dispose();
        }
    }

    public byte[] CapturePictureAsBytesArray()
    {
        return latImage == null ? null : latImage.ToBytes();
    }

    public Stream CapturePictureStream()
    {
        return latImage == null ? null : new MemoryStream(latImage.ToBytes());
    }
}

   主要逻辑在开启一个后台线程,每个一段时间,通过ImageCapture方法获取第一个摄像头的最新的一帧数据,存储下来,当外部调用CapturePictureAsBytesArray方法是,将最新一帧的图形转换为二进制或者Stream传出来。

  WeCam虚拟摄像头程序,可以在台式机电脑上虚拟出一个摄像头,可以自定义摄像头的内容,整个软件的操作界面如下:

   

▲WeCam虚拟摄像头界面,可以通过添加视频内容来自定义虚拟摄像头内容,图中播放的是电影《明日世界》

   至此,所有服务端的逻辑写完了。接下来看客户端如何使用JavaScript或者.NET Core Client跟Streaming的Hub进行交互。

SignalR Streaming客户端

   JavaScript客户端

   跟上篇一样,客户端可以使用JavaScript和.NET Client以及Java(我不会),首先看下在JavaScript中如何跟Streaming的Hub进行交易。在View\Home\Index.cshtml的内容如下:

@{
    ViewData["Title"] = "Home Page";
}

<div class="text-center">
    <h1 class="display-4">Welcome</h1>
    <p>Learn about <a href="https://docs.microsoft.com/aspnet/core">building Web apps with ASP.NET Core</a>.</p>

    <div>
        <input type="button" id="streamStartButton" value="Start Streaming" />
        <input type="button" id="streamStopButton" value="Stop Streaming" disabled />
    </div>
    <ul id="logContent"></ul>
    <img id="streamContent" width="800" height="600" src="" />
</div>

<script src="~/lib/jquery/dist/jquery.js"></script>
<script src="~/lib/microsoft-signalr/signalr.js"></script>
<script src="~/js/CameraStream.js"></script>

    这个界面就是本文开头部分的界面,内容很简单,两个按钮分别用来控制开始和停止,一个Image用来显示收到的数据帧。在内容底部,引入了signalr.js官方js文件,自定义了一个CameraStream.js文件,主要逻辑在这个文件里,我们在wwwroot下新建js目录,然后在该目录下添加一个CameraStream.js的文件,内容如下:

"use strict";

var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
    return new (P || (P = Promise))(function (resolve, reject) {
        function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
        function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
        function step(result) { result.done ? resolve(result.value) : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected); }
        step((generator = generator.apply(thisArg, _arguments || [])).next());
    });
};
var connection = new signalR.HubConnectionBuilder().withUrl("/CameraR").build();

(() => __awaiter(this, void 0, void 0, function* () {
    try {
        yield connection.start();
    }
    catch (e) {
        console.error(e.toString());
    }
}))();

const streamStartButton = document.getElementById('streamStartButton');
const streamStopButton = document.getElementById('streamStopButton');
const streamContent = document.getElementById('streamContent');
const logContent = document.getElementById('logContent');

document.addEventListener('DOMContentLoaded', function () {
    streamStartButton.addEventListener("click", (event) => __awaiter(this, void 0, void 0, function* () {
        streamStartButton.setAttribute("disabled", "disabled");
        streamStopButton.removeAttribute("disabled");
        try {
            connection.stream("StartStream")
                .subscribe({
                    next: (item) => {
                        streamContent.src = "data:image/jpg;base64," + item;
                    },
                    complete: () => {
                        var li = document.createElement("li");
                        li.textContent = "Stream completed";
                        logContent.appendChild(li);
                    },
                    error: (err) => {
                        var li = document.createElement("li");
                        li.textContent = err;
                        logContent.appendChild(li);
                    },
                });
        }
        catch (e) {
            console.error(e.toString());
        }
        event.preventDefault();
    }));

    streamStopButton.addEventListener("click", function () {
        streamStopButton.setAttribute("disabled", "disabled");
        streamStartButton.removeAttribute("disabled");
        connection.invoke("StopStream").catch(err => console.error(err.toString()));
        event.preventDefault();
    });

    connection.on("StopStream", () => {
        var li = document.createElement("li");
        li.textContent = "stream closed";
        logContent.appendChild(li);
        streamStopButton.setAttribute("disabled", "disabled");
        streamStartButton.removeAttribute("disabled");
    });
});

    _awaiter函数是我抄的树莓派监控PiMonitR ,我们定义了connection,因为是本机,所以不用谢域名信息,在改函数中用 yield 关键字来开启对服务端的连接。这里面最主要的是StreamStartButton的Click事件,可以看到,这里跟之前调用一般的Hub里的方法直接用invoke不同,调用Streaming方法,要使用steam方法,参数为函数方法名“StartStream”接下来使用".subscribe"方法,在next函数里面将收到的内容设置为image的src属性。

   streamStopButton的click事件,就是通过invoke来调用普通的Hub方法, connection.On("StopStream")接收Hub里的SendAsync的回调,跟前文的方式一样这里就不赘述了。

   现在JavaScript的客户单就做好了。

.NET 客户端

  跟上篇一样,我也写了.NET WinForm的展现方式,新建一个Windows Form应用程序,添加对“Microsoft.AspNetCore.SignalR.Client”包的引用,新建一个Form窗体,然后添加一个Picture控件,以及Start,Stop按钮,分别用来开始Stream,停止Stream。

public partial class Form1 : Form
{
    HubConnection connection;
    public Form1()
    {
        InitializeComponent();
        connection = new HubConnectionBuilder().WithUrl("https://localhost:44303/CameraR", options =>
        {
            //options.AccessTokenProvider = () => Task.FromResult("123456");
        }).Build();
        connection.Closed += async (error) => {

            await Task.Delay(new Random().Next(0, 5) * 1000);
            await connection.StartAsync();
        };
        connection.StartAsync();
    }

    private async void btnStart_Click(object sender, EventArgs e)
    {
        var cancellationTokenSource = new CancellationTokenSource();
        var channel = await connection.StreamAsChannelAsync<object>("StartStream", cancellationTokenSource.Token);

        // Wait asynchronously for data to become available
        while (await channel.WaitToReadAsync())
        {
            // Read all currently available data synchronously, before waiting for more data
            while (channel.TryRead(out var count))
            {
                byte[] b = Convert.FromBase64String(count.ToString());
                if (b != null && b.Length > 0)
                    this.BeginInvoke((MethodInvoker)delegate {
                        using (var ms = new MemoryStream(b, 0, b.Length))
                        {
                            pictureBox1.Image = Image.FromStream(ms, true);
                        }
                    });
            }
        }
        Console.WriteLine("Streaming completed");
    }

    private async void btnStop_Click(object sender, EventArgs e)
    {
        try
        {
            await connection.InvokeAsync("StopStream");
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
    }
}

    代码也很简单,在构造函数里初始化和开始连接。在Start方法里面里面,通过“connection.StreamAsChannelAsync<object>("StartStream", cancellationTokenSource.Token)”来获取channel对象,然后循环读取channel里面的内容,读取完成之后,将获取到的二进制转换为byte数组,最后转为设置PictureBox的image属性。

运行

    最后,先把服务端的承载Hub的ASP.NET Core应许运行起来,然后打开页面,另外再开一个浏览器,同时运行WinForm客户端,现在在三个客户端上,都可以看到摄像头Stream出来的信息了,当摄像头播放时,其它几个客户端能够实时更新😁。

▲ SignalR Streaming效果展示,左上Chrome,右上Edge,左下WinForm,右下WeCam虚拟摄像头输出内容