zl程序教程

您现在的位置是:首页 >  其他

当前栏目

SignalR服务器端消息推送

2023-03-14 22:55:04 时间

SignalR服务器端消息推送

某些场景下,需要服务端向客户端发送请求。.net中采用封装了WebSocet的SignalR进行消息的处理。WebSocket独立于http,但是WebSocket服务器一般都部署在Web服务器上,所以需要借助http完成初始握手,并共享http的端口。

SignalR基本使用

SignalR中一个重要的组件就是集线器hub,他用于在WebSocket服务器端和所有客户端之间进行数据交换,所有连接到同一个集线器上的程序都可以互相通信。

  1. 创建一个继承自Hub的类(Microsoft.AspNetCore.SignalR命名空间)的类,所有客户端和服务器都通过这个集线器进行通信。

public class ChatRoomHub:Hub

{

   public Task SendPublicMessage(string message)

   {

         string connId = this.Context.ConnectionId;//获得发送消息端的连接ID

         string msg = $"{connId} {DateTime.Now}:{message}";

       //发送到连接到集线器的所有客户端上

         return Clients.All.SendAsync("ReceivePublicMessage", msg);

   }

}

  1. 编辑Program.cs,在builder.Build之前调用

builder.Services.AddSignalR();

//如果是前后端分离项目,WebSocket初始化握手需要通过http,所以启用跨域

string[] urls = new[] { "http://localhost:3000" };

builder.Services.AddCors(options =>

   options.AddDefaultPolicy(builder => builder.WithOrigins(urls)

       .AllowAnyMethod().AllowAnyHeader().AllowCredentials())

);

var app = builder.Build();

app.UseCors();

//在MapControllers之前调用,启用中间件

//当客户端通过SignalRq请求/Hubs/ChatRoomHub时,由ChatRoomHub处理

app.MapHub<ChatRoomHub>("/Hubs/ChatRoomHub");

app.MapControllers();

  1. 前端vue组件

<template>

 <div>

   <input type="text" v-model="state.userMessage" v-on:keypress="txtMsgOnkeypress" />

   <div> <ul> <li v-for="(msg,index) in state.messages" :key="index" >{{msg}}</li> </ul>

   </div>

 </div>

</template>

<script>

import * as signalR from '@microsoft/signalr'

export default {

 data() {

   return {

     name: "Login",

     state: {

       userMessage: "",

       messages: [],

     },

     connection: "",

   };

 },

 mounted() {

   this.connectInit();

 },

 methods: {

   async txtMsgOnkeypress(e) {

     if (e.keyCode != 13) return;

       //invoke调用集线器的方法,后面的方法名为集线器中定义的方法名

     await this.connection.invoke("SendPublicMessage", this.state.userMessage);

     this.state.userMessage = "";

   },

   async connectInit() {

       //创建客户端到服务端的连接

     this.connection = new signalR.HubConnectionBuilder()

       .withUrl("http://localhost:7112/Hubs/ChatRoomHub")//服务端的地址

       .withAutomaticReconnect()//断开后重新连接,但是ConnectionId会改变

       .build();//构建完成一个客户端到集线器的连接

     await this.connection.start();//启动连接

       //用on来检测服务器使用SendAsync方法发送的消息,注意名称要相同

     this.connection.on("ReceivePublicMessage", (msg) => {

       this.state.messages.push(msg);

     });

   },

 },

};

</script>

 

<style lang="less" scoped>

</style>

 

SignalR分布部署

假设聊天室被部署到两台服务器上,客户端1、2在A服务器,客户端3、4在B服务器上,此时,1只能和2通信,3只能和4通信。微软提供了Redis服务器来解决这个问题。

  1. Nugt安装Microsoft.AspNetCore.SignalR.StackExchangeRedis
  2. 在Program.cs中的builder.Services.AddSignalR()后面加上

//第一个参数为redis服务器连接字符串

builder.Services.AddSignalR().AddStackExchangeRedis("127.0.0.1", options =>

{

   options.Configuration.ChannelPrefix = "Test1_";

});

SignalR身份验证

要求只有通过验证的用户才能连接集线器。

使用步骤如下(身份验证部分可参考(8.2 JWT(代替Session)):

  1. 在配置系统中配置一个名字为JWT的节点,配置相应的节点,并且创建一个JWTOption类。
  2. NuGet安装Microsoft.AspNetCore.Authentication.JwtBearer
  3. 对JWT进行配置在builder.Build之前添加

services.Configure<JWTOptions>(builder.Configuration.GetSection("JWT"));//实体配置类

services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)//配置授权的各种属性

.AddJwtBearer(x => //配置JWT的承载

{

   //配置JWT绑定到JWTOptions新的实例,返回一个JWTOptions实例

     JWTOptions? jwtOpt = builder.Configuration.GetSection("JWT").Get<JWTOptions>();

     byte[] keyBytes = Encoding.UTF8.GetBytes(jwtOpt.SigningKey);

     var secKey = new SymmetricSecurityKey(keyBytes);

     x.TokenValidationParameters = new()//设置令牌验证参数

     {

         ValidateIssuer = false,

         ValidateAudience = false,

         ValidateLifetime = true,

         ValidateIssuerSigningKey = true,

         IssuerSigningKey = secKey

     };

    x.Events = new JwtBearerEvents

       {

           OnMessageReceived = context =>

           {

               //JWT默认放到了Authorization请求头中,但是WebSocket不支持请求头,

               //所以将JWT放到了URL中,然后在服务器中检测URL中的JWT

               var accessToken = context.Request.Query["access_token"];

               var path = context.HttpContext.Request.Path;

               if (!string.IsNullOrEmpty(accessToken) &&

                   (path.StartsWithSegments("/Hubs/ChatRoomHub")))

               {

                   //如果请求URL中有JWT并且请求路径为集线器

                   //就把JWT复制给Token,这样就可以直接解析和使用JWT了

                   context.Token = accessToken;

               }

               return Task.CompletedTask;

           }

       };

});

 

  1. 在Program.cs中的app.UseAuthorization()前面加上app.UseAuthentication(),解决跨域和MapHub

 builder.Services.AddSignalR();

 //如果是前后端分离项目,WebSocket初始化握手需要通过http,所以启用跨域

 string[] urls = new[] { "http://localhost:3000" };

 builder.Services.AddCors(options =>

     options.AddDefaultPolicy(builder => builder.WithOrigins(urls)

         .AllowAnyMethod().AllowAnyHeader().AllowCredentials())

 );

 var app = builder.Build();

 app.UseCors();

 //在MapControllers之前调用,启用中间件

 //当客户端通过SignalRq请求/Hubs/ChatRoomHub时,由ChatRoomHub处理

 app.MapHub<ChatRoomHub>("/Hubs/ChatRoomHub");

 app.UseAuthentication();

 app.UseAuthorization();

 app.MapControllers();


  1. 在控制类中增加登陆并且创建JWT的操作方法 (参考8.2 JWT(代替Session)
  2. 在集线器类上增加[Authorize]

   [Authorize]

   public class ChatRoomHub:Hub

   {

       public Task SendPublicMessage(string message)

       {

           //可以直接拿到name

           string name = this.Context.User!.FindFirst(ClaimTypes.Name)!.Value;

           string msg = $"{name} {DateTime.Now}:{message}";

           return Clients.All.SendAsync("ReceivePublicMessage", msg);

       }

   }

//[Authorize]可以加到集线器类上,也可以加到类中某个方法上

//如果加到方法上,则任意客户端可以连接到集线器,只是不能调用那个方法,这样不推荐

  1. 前端页面

image

<template>

 <div>

   <fieldset>

     <legend>登录</legend>

     <div>

       用户名:<input  type="text"  v-model="state.loginData.name"  />

     </div>

     <div>

       密码:<input  type="password" v-model="state.loginData.password" >

     </div>

     <div>

       <input type="button" value="登录" v-on:click="loginClick" />

     </div>

   </fieldset>

   公屏: <input type="text"  v-model="state.userMessage" v-on:keypress="txtMsgOnkeypress" />

   <div>  <ul> <li v-for="(msg,index) in state.messages"  :key="index"  >{{msg}}</li> </ul>

   </div>

 </div>

</template>

<script>

import * as signalR from "@microsoft/signalr";

import axios from 'axios';

export default {

 data() {

   return {

     connection: '',

     state: {

       accessToken: "",

       userMessage: "",

       messages: [],

       loginData: { name: "", password: "" },

       privateMsg: { destUserName: "", message: "" },

     },

   };

 },

 methods: {

   async startConn() {

       const transport = signalR.HttpTransportType.WebSockets;

       //skipNegotiation跳过协商

       //transport强制采用的通信方式

       const options = { skipNegotiation: true, transport: transport };

       //将JWT传递给服务器端

       options.accessTokenFactory = () => this.state.accessToken;

     this.connection = new signalR.

     HubConnectionBuilder()

                   .withUrl('http://localhost:7173/Hubs/ChatRoomHub', options)

                   .withAutomaticReconnect().build();

     try {

       await this.connection.start();

     } catch (err) {

       alert(err);

       return;

     }

     this.connection.on("ReceivePublicMessage", (msg) => {

       this.state.messages.push(msg);

     });

     alert("登陆成功可以聊天了");

   },

   async loginClick() {

     

   const {data:resp} = await axios.post('http://localhost:7173/api/Identity/Login',

                   this.state.loginData);

                   console.log(resp);

               this.state.accessToken = resp.data;

               this.startConn();

   },

   async txtMsgOnkeypress(e) {

     if (e.keyCode != 13) return;

     try {

       await this.connection.invoke(

         "SendPublicMessage",

         this.state.userMessage

       );

     } catch (err) {

       alert(err);

       return;

     }

     this.state.userMessage = "";

   }

     

 },

};

</script>

<style scoped>

</style>

 

针对部分客户端的消息推送

之前使用了Clients.All.SendAsync向连接到当前集线器的所有客户端进行消息推送,但是某些场景需要针对特定用户进行消息推送。

进行客户端筛选的时候,有3个筛选参数,ConnectionId,组以及用户ID。

参数说明
ConnectionId是SignalR为每个客户端分配的Id
组有唯一的名字,对于连接到同一集线器的用户,可以自定义分组
用户ID对应于Claim.NameIdentifier的Claim值

另外集线器(Hub)有一个Groups属性,他可以对组成员进行管理。在将连接加入到组中的时候,如果组不存在则自动创建,注意,当客户端重连之后,需要将连接重新加入组。

方法名参数说明
AddToGroupAsyncstring connectionId,string groupName将connectionId放到groupName组中
RemoveFromGroupAsyncstring connectionId,string groupName将connectionId从groupName组中移除

集线器(Hub)的Clients属性可以对当前集线器用户进行筛选。

方法名参数说明
Caller只读属性获取当前连接的客户端
Others只读属性获取除了当前连接外的所有客户端
OthersInGroupstring groupName获取组中除了当前连接之外的所有客户端
All只读属性获取所有客户端
AllExceptIReadOnlyList<string>excludedConnectionIds所有客户端,除了ConnectionId在excludedConnectionIds之外的所有客户端
Clientstring connectionId获取connectionId客户端
ClientsIReadOnlyList<string>connectionIds获取包含在connectionIds的客户端
Groupstring groupNamegroupName组中的客户端
GroupsIReadOnlyList<string>groupNames获取多个组的客户端
GroupsExceptstring groupName,IReadOnlyList<string>excludedConnectionIds获取所有组名为groupName的组中,除了ConnectionId在excludedConnectionIds中的客户端
Userstring userId获取用户id为userId的客户端
UsersIReadOnlyList<string> userIds包含在userIds中的客户端

基于上面的代码,增加向特定客户端发送消息的功能

  1. 集线器类中增加

      //参数包含目标用户名

     public async Task<string> SendPrivateMessage(string destUserName, string message)

       {

           User? destUser = UserManager.FindByName(destUserName);//获取目标用户

           if (destUser == null)

           {

               return "DestUserNotFound";

           }

           string destUserId = destUser.Id.ToString();//目标用户的id

           string srcUserName = this.Context.User!.FindFirst(ClaimTypes.Name)!.Value;//发送端的用户

           string time = DateTime.Now.ToShortTimeString();

           //过滤出目标用户,并发送消息

           await this.Clients.User(destUserId).SendAsync("ReceivePrivateMessage",

               srcUserName, time, message);

           return "ok";

       }

  1. 前端页面增加私聊功能

image

//在template中增加

...

<div>

     私聊给<input

       type="text"

       v-model="state.privateMsg.destUserName"

     />

     <input

       type="text"

       v-model="state.privateMsg.message"

       v-on:keypress="txtPrivateMsgOnkeypress"

     />

</div>

 

<script>

   //增加私聊接收方法

   ...

this.connection.on("ReceivePrivateMessage", (srcUser, time, msg) => {

       this.state.messages.push(srcUser + " " + time + "===" + msg);

     });

   //增加私聊发送方法

   ...

async txtPrivateMsgOnkeypress(e) {

     if (e.keyCode != 13) return;

     const destUserName = this.state.privateMsg.destUserName;

     const msg = this.state.privateMsg.message;

     try {

       const ret = await connection.invoke(

         "SendPrivateMessage",

         destUserName,

         msg

       );

       if (ret != "ok") {

         alert(ret);

       }

     } catch (err) {

       alert(err);

       return;

     }

     state.privateMsg.message = "";

   }

</script>

注意:SignalR不会消息持久化,如果目标用户不在线就收不到消息,再次上线仍然收不到。如果需要持久化,则需要自行保存在数据库

外部向集线器推送消息

不通过集线器向客户端发送消息。

实现新增一个用户,向聊天室所有客户端推送欢迎xxx的消息。

  1. 在控制器中通过构造函数注入IHubContext服务,并向连接到ChatRoomHub集线器中的客户端推送消息。

public class Test1Controller : ControllerBase

   {

         private readonly IHubContext<ChatRoomHub> hubContext;

         public Test1Controller(IHubContext<ChatRoomHub> hubContext)

         {

             this.hubContext = hubContext;

         }

}

  1. 为控制器增加一个用于新增用户的操作。

         [HttpPost]

         public async Task<IActionResult> AddUser(AddNewUserRequest req)

         {

             //这里省略执行用户注册的代码

             await hubContext.Clients.All.SendAsync("UserAdded", req.UserName);

             return Ok();

         }

  1. 在前端增加UserAdded的监听代码

this.connection.on("UserAdded", (userName) => {

       this.state.messages.push("系统消息:欢迎" + userName + "加入我们!");

     });

注意:IHubContext不能向“当前连接的所有客户端(Caller)”、“除了当前连接之外的客户端”推送消息(others),因为实在集线器之外调用,所以请求不在一个SignalR连接中,也就没有SignalR连接的概念

建议:在使用SignalR的时候,Hub类中不应该有数据库操作等比较好事的操作,Hub类只应该用于消息发布,且SignalR客户端给服务器端传递消息的时间不能超过30s,否则会报错