zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

一个简易socket通信结构

2023-03-31 11:01:48 时间

服务端

工作需要又需要用到socketTCP通讯,这么多年了,终于稍微能写点了。让我说其实也说不出个啥来,看了很多的异步后稍微对异步socket的导流 endreceive后 再beginreceive 形成一个内循环有了个认识,加上我自己的封包拆包机制,然后再仿那些其它的大多数代码结构弄点onReceive事件进行 收包触发。整个过程就算差不多了 ,基本上是能够可靠运行的 靠谱的 中规中矩的,要说啥创新读到嘛真的谈不上。代码中写了很多low逼注释 也是为了方便自己理解 请无视。下面是server端代码,使用异步机制accept 异步receive ,成员有 clients代表当前在线的客户端 客户端socket包装为EndpointClient ,有onClientAddDel 代表客户端上线掉线事件,有onReceive代表所有客户端的收包事件,clients由于是异步的多线程访问就要涉及多线程管控 所以使用lock ,服务端有sendToAll() 和SendToSomeOne()毫无疑问这也是通过调用特定的clients来做的。

以下是服务端代码

  1 public class MsgServerSchedule
  2 {
  3 
  4 
  5     Socket serverSocket;
  6     public Action<List<string>> onClientAddDel;
  7     public Action<Telegram_Base> onReceive;
  8     bool _isRunning = false;
  9 
 10     
 11     int port;
 12 
 13     public TelgramType telType;
 14 
 15     static List<EndpointClient> clients;
 16 
 17     public bool isRunning { get { return _isRunning; } }
 18     public MsgServerSchedule(int _port)
 19     {
 20         //any 就决定了 ip地址格式是v4
 21         //IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, 7654);
 22         //socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
 23 
 24         this.port = _port;
 25 
 26         clients = new List<EndpointClient>();
 27 
 28         Console.WriteLine("constructor");
 29 
 30     }
 31 
 32     public void Start()
 33     {
 34         try
 35         {
 36             IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, port);
 37             serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
 38             serverSocket.Bind(endPoint);
 39             serverSocket.Listen(port);
 40 
 41             serverSocket.BeginAccept(new AsyncCallback(AcceptCallback), serverSocket);
 42 
 43             _isRunning = true;
 44             Console.WriteLine("start");
 45         }
 46         catch (Exception ex)
 47         {
 48             _isRunning = false;
 49             serverSocket = null;
 50 
 51             Console.WriteLine("服务启动出现错误,可能端口已被占用:"+port);
 52             Console.WriteLine(ex.Message);
 53         }
 54        
 55     }
 56 
 57     public void Stop()
 58     {
 59         for (int i = 0; i < clients.Count; i++)
 60         {
 61             clients[i].Close();                
 62         }
 63         ClientAddDelGetList(null, EndPointClientsChangeType.ClearAll);
 64         serverSocket.Close();
 65         _isRunning = false;
 66     }
 67 
 68     public void SendToAll(Telegram_Base tel)
 69     {
 70         for (int i = 0; i < clients.Count; i++)
 71         {
 72             clients[i].Send(tel);
 73         }
 74     }
 75 
 76     public void SendToSomeOne(Telegram_Base tel)
 77     {
 78         for (int i = 0; i < clients.Count; i++)
 79         {
 80             if(clients[i].remoteIPPort==tel.remoteIPPort)
 81             {
 82                 clients[i].Send(tel);
 83                 break;
 84             }
 85         }
 86     }
 87 
 88     //新增与删除客户端 秉持原子操作
 89     List<string> ClientAddDelGetList(EndpointClient cli, EndPointClientsChangeType changeType)
 90     {
 91         //异步同时有新客户端上线 与下线 不进行资源互斥访问会报错
 92         lock (this)
 93         {
 94             if (changeType == EndPointClientsChangeType.Add)
 95             {
 96                 clients.Add(cli);
 97             }
 98             else if(changeType== EndPointClientsChangeType.Del)
 99             {
100                 var beRemoveClient = clients.First(r => r.remoteIPPort == cli.remoteIPPort);
101                 if (beRemoveClient != null)
102                     clients.Remove(beRemoveClient);
103             }
104             else if(changeType== EndPointClientsChangeType.ClearAll)
105             {
106                 clients.Clear();
107             }
108             else if (changeType == EndPointClientsChangeType.GetAll)
109             {
110                 List<string> onLines = new List<string>(clients.Count);
111                 for (int i = 0; i < clients.Count; i++)
112                 {
113                     onLines.Add(clients[i].remoteIPPort);
114                 }
115                 return onLines;
116             }
117             else
118             {
119                 return null;
120             }
121         }
122         return null;
123     }
124     //异步监听客户端 有客户端到来时的回调
125     private void AcceptCallback(IAsyncResult iar)
126     {
127         //server端一直在receive 能够感知到客户端掉线 (连上后 关闭客户端 server立即有错误爆出)
128         //但是同情况 关闭server端 客户端无错误爆出 直到点发送 才有错误爆出
129         //由此得出 处于receive才会有掉线感知  ,send时发现发不出去自然也会有感知 跟人的正常思维理解是一样的
130         //虽然tcp是所谓的长连接 ,通过反复测试  ->但是双方相互都处在一个静止状态 是无法 确定在不在的  
131         //连上后平常的情况下 并没有数据流通 的 ,双方只是一个状态的保持而已。
132         //这也是为什么 好多服务 客户端 程序 都有个心跳机制(由此我们可以想到继续完善 弄个客户端列表 心跳超时的剔除列表 正常发消息send 或receive 异常的剔除列表 删除clientSocket
133         //其实非要说吧 一般情况 服务端一直在receive 不用心跳其实也是可以的(客户端可能是真的需要
134         //tcp底层就已经有了一个判断对方在不在的机制 , 对方直接关程序 结束进程 这些 只要tcp在receive就立即能够感知 所以说心跳 用不用看情况吧
135 
136         //tcp 不会丢包 哪怕是连接建立了   你还没开始receive   对方却先发了,
137         //对方只要是发了的数据 都由操作系统像个缓存样给你放那的 不会掉 你再隔10秒开始receive都能rec的到
138 
139         //tcp甚至在拔掉网线 再重新插上 都可以保证数据一致性
140         //tcp的包顺序能够保证 先发的先到
141 
142         //nures代码中那些beginreceivexxx  异步receive的核心机制就是 ,假定数据到的时候把数据保存到xxx数组
143         //真正endreceive的时候 其实数据已经接收 处理完成了
144 
145         try
146         {
147 
148             //处理完当前accept
149             Socket currentSocket = serverSocket.EndAccept(iar);
150 
151             EndpointClient client = new EndpointClient(currentSocket,telType);
152 
153             //新增客户端
154             ClientAddDelGetList(client, EndPointClientsChangeType.Add);
155             
156             if (onClientAddDel != null)
157             {
158                 List<string> onlines = ClientAddDelGetList(null, EndPointClientsChangeType.GetAll);
159                 onClientAddDel(onlines);
160 
161                 //客户端异常掉线
162                 client.onClientDel = new Action<string>((_remoteIPPort) =>
163                 {
164                     ClientAddDelGetList(new EndpointClient(){ remoteIPPort=_remoteIPPort} , EndPointClientsChangeType.Del);
165 
166                     List<string> onlines2 = ClientAddDelGetList(null, EndPointClientsChangeType.GetAll);
167                     onClientAddDel(onlines2);
168                 });
169             }
170 
171             
172 
173             //这句到时调用完成后 就会自动把 receivebuffer填充 //要接收的字节数 系统底层操作一次接收多少字节 其实意义不大
174             //总是从0开始(就是说并发时会覆盖
175 
176             Console.WriteLine(string.Format("new client ->{0}", currentSocket.RemoteEndPoint.ToString()));
177 
178             //currentSocket.Close();
179             //Application.Exit();
180 
181             //Thread.Sleep(1000 * 10);
182             client.onReceive += this.onReceive;
183 
184             client.BeginReceive();
185 
186 
187             //立即开始accept新的客户端
188             if (isRunning == true && serverSocket != null)
189                 serverSocket.BeginAccept(AcceptCallback, serverSocket);
190             //beginAccept 最开始的方法可以不一样 ,但最终肯定是一个不断accept的闭环过程
191             //整个过程就像个导流样 ,最开始用异步导流到一个固定的点 然后让其循环源源不断运转
192 
193             //加asynccallback 有什么不一样么
194             //socket.BeginAccept(new AsyncCallback( AcceptCallback), socket);
195 
196         }
197         catch (Exception ex)
198         {
199             Console.WriteLine("AcceptCallback Error");
200             Console.WriteLine(ex.Message);
201         }
202 
203     }
204 
205    
206 }

EndpointClient终端代码代表客户端的对口人,他的onReceive 等资源从服务端继承 ,如果服务端想给某个特定客户端发数据则会调用他们中的某一个 毫无疑问这是通过remoteIPport来判断的,这些都是编写基本socket结构轻车熟路的老套路

以下EndpointClient代码

  1 public class EndpointClient
  2 {
  3     Socket workingSocket;
  4     static int receiveBufferLenMax = 5000;
  5     byte[] onceReadDatas = new byte[receiveBufferLenMax];
  6     List<byte> receiveBuffer = new List< byte>(receiveBufferLenMax);
  7 
  8     public string remoteIPPort { get; set; }
  9     
 10     //当前残留数据区 接收数据的起始指针(也代表缓冲区数据长度
 11     int receiveBufferLen = 0;
 12 
 13 
 14     TelgramType telType;
 15 
 16     public Action<Telegram_Base> onReceive;
 17     public Action<string> onClientDel;
 18 
 19     public EndpointClient()
 20     {
 21 
 22     }
 23     public EndpointClient(Socket _socket,TelgramType _telType)
 24     {
 25         this.remoteIPPort = _socket.RemoteEndPoint.ToString();
 26         this.telType = _telType;
 27         workingSocket = _socket;
 28     }
 29 
 30     public void Send(Telegram_Base tel)
 31     {
 32         //try
 33         //{
 34             if(workingSocket==null)
 35             {
 36                 Console.WriteLine("未初始化的EndpointClient");
 37                 return;
 38             }
 39             if (tel is Telegram_Schedule)
 40             {
 41                 Telegram_Schedule telBeSend = tel as Telegram_Schedule;
 42                 if (telBeSend.dataBytes.Length != telBeSend.dataLen)
 43                 {
 44                     Console.WriteLine("尝试发送数据长度格式错误的报文");
 45                     return;
 46                 }
 47 
 48                 byte[] sendBytesHeader = telBeSend.dataBytesHeader;
 49                 byte[] sendbytes = telBeSend.dataBytes;
 50 
 51                 //数据超过缓冲区长度 会导致无法拆包
 52                 if (sendbytes.Length <= receiveBufferLenMax)
 53                 {
 54                     workingSocket.BeginSend(sendBytesHeader, 0, sendBytesHeader.Length, 0, null, null);
 55                     workingSocket.BeginSend(sendbytes, 0, sendbytes.Length, 0,null,null
 56                     
 57                     );
 58                 }
 59                 else
 60                 {
 61                     Console.WriteLine("发送到调度客户端的数据超过缓冲区长度");
 62                     throw new Exception("发送到调度客户端的数据超过缓冲区长度");
 63                 }
 64 
 65             }
 66             else if (tel is Telegram_SDBMsg)
 67             {
 68 
 69             }
 70 
 71         //}
 72         //catch (Exception ex)
 73         //{
 74 
 75         //    Console.WriteLine(ex.Message);
 76         //    throw ex;
 77         //}
 78     }
 79 
 80     public void BeginReceive()
 81     {
 82         if (workingSocket == null)
 83         {
 84             Console.WriteLine("未初始化的EndpointClient");
 85             return;
 86         }
 87 
 88         receiveBufferLen = 0;
 89         workingSocket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax, SocketFlags.None,
 90             ReceiveCallback,
 91         this);
 92     }
 93     private void ReceiveCallback(IAsyncResult iar)
 94     {
 95         try
 96         {
 97             EndpointClient cli = (EndpointClient)iar.AsyncState;
 98             Socket socket = cli.workingSocket;
 99             int reads = socket.EndReceive(iar);
100 
101             if (reads > 0)
102             {
103 
104                 for (int i = 0; i < reads; i++)
105                 {
106                     receiveBuffer.Add(onceReadDatas[i]);
107                 }
108 
109                 //具体填充了多少看返回值 此时 数据已经在buffer中了
110                 receiveBufferLen += reads;
111                 //加完了后解析 阻塞式处理 结束后开启新的接收
112                 SloveTelData();
113 
114                 if (receiveBufferLenMax - receiveBufferLen > 0)
115                 {
116                     //接收完了 继续beginreceive 开启异步的下次接收 (如果缓冲区有残留数据 则接收长度变短 ,没接收到的让其留在socket不会丢失 下次接收)
117                     socket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax - receiveBufferLen, SocketFlags.None, ReceiveCallback, this);
118                 }
119                 else//阻塞式处理都完成一遍了 都还没清理出任何缓冲区空间 毫无疑问 整体运转机制已经挂了 不用beginreceive下一次了
120                 {
121                     Close();
122                     //移除自己
123                     if (onClientDel != null)
124                     {
125                         onClientDel(remoteIPPort);
126                     }
127                     Console.WriteLine("服务端接口解析数据出现异常");
128                     throw new Exception("服务端接口解析数据出现异常");
129                 }
130             }
131             else//reads==0 客户端已关闭
132             {
133                 Close();
134                 //移除自己
135                 if (onClientDel != null)
136                 {
137                     onClientDel(remoteIPPort);
138                 }
139             }
140         }
141         catch (Exception ex)
142         {
143             Close();
144             //移除自己
145             if (onClientDel != null)
146             {
147                 onClientDel(remoteIPPort);
148             }
149 
150             Console.WriteLine("ReceiveCallback Error");
151             Console.WriteLine(ex.Message);
152         }
153 
154     }
155     void SloveTelData()
156     {
157         //进行数据解析
158         SloveTelDataUtil slo = new SloveTelDataUtil();
159         
160         if (telType == TelgramType.Schedule)
161         {
162             List<Telegram_Schedule> dataEntitys = slo.Slove_Telegram_Schedule(receiveBuffer, receiveBufferLen, this.remoteIPPort);
163             //buffer已经被处理一遍了 使用新的长度
164             receiveBufferLen = receiveBuffer.Count;
165             //解析出的每一个对象都触发 onreceive
166             for (int i = 0; i < dataEntitys.Count; i++)
167             {
168                 if (onReceive != null)
169                     onReceive(dataEntitys[i]);
170             }
171         }
172         else if (telType == TelgramType.SDBMsg)
173         {
174 
175         }
176 
177     }
178 
179    
180     public void Close()
181     {
182         try
183         {
184             receiveBuffer.Clear();
185             receiveBufferLen = 0;
186             if (workingSocket != null && workingSocket.Connected)
187                 workingSocket.Close();
188         }
189         catch (Exception ex)
190         {
191             Console.WriteLine(ex.Message);
192         }
193         
194     }
195 }

数据拆包与封包粘包处理

上面的代码可以看到 数据包处理都在receiveCallback里 SloveTelData,也是通用的套路 ,解析到完整的包后从缓冲区移除 解析多少个包触发多少次事件,残余数据留在缓冲区 然后继续开始新的beginReceive往缓冲区加。在异步机制中 到达endReceive的时候数据已经在缓冲区里了,这个自不用多说噻。数据包和粘包逻辑在公共类库里供客户端服务端共同调用

以下是粘包处理逻辑

  1 public class SloveTelDataUtil
  2 {
  3     List<Telegram_Schedule> solveList;
  4     public SloveTelDataUtil()
  5     {
  6     }
  7     
  8     List<byte> buffer;
  9     int bufferLen;
 10     int bufferIndex = 0;
 11     string remoteIPPort;
 12     public List<Telegram_Schedule> Slove_Telegram_Schedule( List< byte> _buffer,int _bufferLen,string _remoteIPPort)
 13     {
 14 
 15         solveList = new List<Telegram_Schedule>();
 16 
 17         bufferIndex = 0;
 18 
 19         buffer = _buffer;
 20         bufferLen = _bufferLen;
 21         remoteIPPort = _remoteIPPort;
 22 
 23         //小于最小长度 直接返回
 24         if (bufferLen < 12)
 25             return solveList;
 26 
 27         //进行数据解析
 28         bool anaysisOK = false;
 29         while (anaysisOK=AnaysisData_Schedule()==true)//直到解析的不能解析为止
 30         {                
 31         }
 32         return solveList;
 33     }
 34 
 35     public bool AnaysisData_Schedule()
 36     {
 37         if (bufferLen - bufferIndex < GlobalSymbol.Headerlen)
 38             return false;
 39 
 40         //解析出一个数据对象
 41         Telegram_Schedule telObj = new Telegram_Schedule();
 42 
 43         //必定是大于最小数据大小的
 44         telObj.dataBytesHeader = new byte[GlobalSymbol.Headerlen];
 45         buffer.CopyTo(bufferIndex, telObj.dataBytesHeader, 0, GlobalSymbol.Headerlen);
 46 
 47         byte[] btsHeader = new byte[4];
 48         byte[] btsCommand = new byte[4];
 49         byte[] btsLen = new byte[4];
 50 
 51         btsHeader[0] = buffer[bufferIndex];
 52         btsHeader[1] = buffer[bufferIndex+1];
 53         btsHeader[2] = buffer[bufferIndex+2];
 54         btsHeader[3] = buffer[bufferIndex+3];
 55 
 56         bufferIndex += 4;
 57 
 58         btsCommand[0] = buffer[bufferIndex];
 59         btsCommand[1] = buffer[bufferIndex + 1];
 60         btsCommand[2] = buffer[bufferIndex + 2];
 61         btsCommand[3] = buffer[bufferIndex + 3];
 62 
 63         bufferIndex += 4;
 64 
 65         btsLen[0] = buffer[bufferIndex];
 66         btsLen[1] = buffer[bufferIndex + 1];
 67         btsLen[2] = buffer[bufferIndex + 2];
 68         btsLen[3] = buffer[bufferIndex + 3];
 69 
 70         bufferIndex += 4;
 71 
 72         
 73 
 74         int dataLen = BitConverter.ToInt32(btsLen, 0);
 75         telObj.header = BitConverter.ToUInt32(btsHeader, 0);
 76         telObj.command = BitConverter.ToInt32(btsCommand, 0);
 77         telObj.remoteIPPort = remoteIPPort;
 78 
 79         if(dataLen>0)
 80         {
 81             //数据区小于得到的数据长度 说明数据部分还没接收到 不删除缓冲区 不做任何处理
 82             //下次来了连着头一起解析
 83             if (bufferLen - GlobalSymbol.Headerlen < dataLen)
 84             {
 85 
 86                 bufferIndex -= 12;//
 87 
 88 
 89                 return false;
 90 
 91             }
 92             else
 93             {
 94 
 95                 telObj.dataLen = dataLen;
 96                 telObj.dataBytes = new byte[dataLen];
 97                 buffer.CopyTo(bufferIndex, telObj.dataBytes, 0, dataLen);
 98                 
 99                 solveList.Add(telObj);
100                 //bufferIndex += dataLen;
101 
102                 //解析成功一次 移除已解析的
103                 for (int i = 0; i < GlobalSymbol.Headerlen+dataLen; i++)
104                 {
105                     buffer.RemoveAt(0);
106                 }
107                 bufferIndex = 0;
108                 bufferLen = buffer.Count;
109                 return true;
110             }
111         }
112         else
113         {
114             
115             telObj.dataLen = 0;
116             solveList.Add(telObj);
117             //bufferIndex += 0;
118             //解析成功一次 移除已解析的
119             for (int i = 0; i < GlobalSymbol.Headerlen; i++)
120             {
121                 buffer.RemoveAt(0);
122             }
123             //解析成功一次因为移除了缓冲区 bufferIndex置0
124             bufferIndex = 0;
125             bufferLen = buffer.Count;
126             return true;
127         }
128 
129     }
130 
131     
132     public List<Telegram_SDBMsg> Slove_Telegram_SDBMsg(ref byte[] buffer)
133     {
134         return new List<Telegram_SDBMsg>();
135     }
136 }

我们看到用到的数据包对象是Telegram_Schedule ,中间保存有报文数据,数据发送的目标等信息。

以下是数据包结构代码

 1 public class Telegram_Base
 2 {
 3     public string remoteIPPort { get; set; }
 4     //数据内容
 5     public byte[] dataBytes { get; set; }
 6     //头部内容的序列化
 7     public byte[] dataBytesHeader { get; set; }
 8 
 9     public string jsonStr { get; set; }
10     virtual public void SerialToBytes()
11     {
12 
13     }
14 
15     virtual public void SloveToTel()
16     {
17 
18     }
19 
20 }
21 
22 public class Telegram_Schedule:Telegram_Base
23 {
24     
25     //头部标识 4字节
26     public UInt32 header { get; set; }
27     //命令对应枚举的 int 4字节
28     public int command { get; set; }
29     //数据长度 4字节
30     public int dataLen { get; set; }
31 
32     
33 
34     override public void SerialToBytes()
35     {
36         //有字符串数据 但是待发送字节是空
37         if ((string.IsNullOrEmpty(jsonStr) == false ))//&& (dataBytes==null || dataBytes.Length==0)
38         {
39             dataBytes = Encoding.UTF8.GetBytes(jsonStr);
40             dataLen = dataBytes.Length;
41             dataBytesHeader = new byte[GlobalSymbol.Headerlen];
42           
43             header = GlobalSymbol.HeaderSymbol;
44             
45             byte[] btsHeader = BitConverter.GetBytes(header);
46             byte[] btsCommand = BitConverter.GetBytes(command);
47             byte[] btsLen = BitConverter.GetBytes(dataLen);
48 
49             Array.Copy(btsHeader, 0, dataBytesHeader, 0, 4);
50             Array.Copy(btsCommand, 0, dataBytesHeader, 4, 4);
51             Array.Copy(btsLen, 0, dataBytesHeader, 8, 4);
52 
53         }
54         else if((string.IsNullOrEmpty(jsonStr) == true )&& (dataBytes==null || dataBytes.Length==0)){
55             dataLen = 0;
56             dataBytes = new byte[0];
57 
58             dataBytesHeader = new byte[GlobalSymbol.Headerlen];
59 
60             header = GlobalSymbol.HeaderSymbol;
61 
62             byte[] btsHeader = BitConverter.GetBytes(header);
63             byte[] btsCommand = BitConverter.GetBytes(command);
64             byte[] btsLen = BitConverter.GetBytes(dataLen);
65 
66             Array.Copy(btsHeader, 0, dataBytesHeader, 0, 4);
67             Array.Copy(btsCommand, 0, dataBytesHeader, 4, 4);
68             Array.Copy(btsLen, 0, dataBytesHeader, 8, 4);
69         }
70     }
71 
72     override public void SloveToTel()
73     {
74         //只解析字符串数据部分 ,header 和len 在接收之初就已解析
75         this.jsonStr = Encoding.UTF8.GetString(this.dataBytes);
76     }
77 
78 }

客户端代码

最后是客户端,有了上面的结构,客户端就不足为谈了,稍微了解socket的人都熟知套路的 基本跟EndpointClient一致

  1 public class MsgClientSchedule
  2 {
  3     Socket workingSocket;
  4     //缓冲区最大数据长度
  5     static int receiveBufferLenMax = 5000;
  6     //单次receive数据(取决于tcp底层封包 但是不会超过缓冲区最大长度
  7     byte[] onceReadDatas = new byte[receiveBufferLenMax];
  8     //未解析到完整数据包时的残余数据保存区
  9     List<byte> receiveBuffer = new List<byte>(receiveBufferLenMax);
 10 
 11     string serverIP { get; set; }
 12     int serverPort { get; set; }
 13     public string localIPPort { get; set; }
 14 
 15     //残余缓冲区数据长度
 16     int receiveBufferLen = 0;
 17 
 18     bool _isConnected { get; set; }
 19 
 20     TelgramType telType;
 21 
 22     //收一个包时触发
 23     public Action<Telegram_Base> onReceive;
 24     //与服务端断链时触发
 25     public Action<string> onClientDel;
 26 
 27 
 28     public bool isConnected { get { return _isConnected; } }
 29     public MsgClientSchedule(string _serverIP,int _port)
 30     {
 31         serverIP = _serverIP;
 32         serverPort = _port;
 33         _isConnected = false;
 34     }
 35 
 36     public void Connect()
 37     {
 38         try
 39         {
 40             workingSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.IP);
 41             IPEndPoint ipport = new IPEndPoint(IPAddress.Parse(serverIP), serverPort);
 42             workingSocket.Connect(ipport);
 43 
 44             localIPPort = workingSocket.LocalEndPoint.ToString();
 45             _isConnected = true;
 46             BeginReceive();
 47         }
 48         catch (Exception ex)
 49         {
 50             workingSocket = null;
 51             _isConnected = false;
 52 
 53             Console.WriteLine(ex.Message);
 54         }
 55 
 56     }
 57 
 58 
 59 
 60 
 61     public void Send(Telegram_Base tel)
 62     {
 63         try
 64         {
 65             if(_isConnected==false)
 66             {
 67                 Console.WriteLine("未连接到服务器");
 68                 return;
 69             }
 70             if (tel is Telegram_Schedule)
 71             {
 72                 Telegram_Schedule telBeSend = tel as Telegram_Schedule;
 73                 if (telBeSend.dataBytes.Length != telBeSend.dataLen)
 74                 {
 75                     Console.WriteLine("尝试发送数据长度格式错误的报文");
 76                     return;
 77                 }
 78                 byte[] sendBytesHeader = telBeSend.dataBytesHeader;
 79                 byte[] sendbytes = telBeSend.dataBytes;
 80 
 81                 //数据超过缓冲区长度 会导致无法拆包
 82                 if (sendbytes.Length <= receiveBufferLenMax)
 83                 {
 84                     workingSocket.BeginSend(sendBytesHeader, 0, sendBytesHeader.Length, 0, null, null);
 85                     workingSocket.BeginSend(sendbytes, 0, sendbytes.Length, 0, null, null
 86                         
 87                     );
 88                 }
 89                 else
 90                 {
 91                     Console.WriteLine("发送到调度客户端的数据超过缓冲区长度");
 92                     throw new Exception("发送到调度客户端的数据超过缓冲区长度");
 93                 }
 94 
 95                 
 96             }
 97             else if (tel is Telegram_SDBMsg)
 98             {
 99 
100             }
101 
102         }
103         catch (Exception ex)
104         {
105             Close();
106             Console.WriteLine(ex.Message);
107             //throw ex;
108         }
109     }
110 
111     public void BeginReceive()
112     {
113         receiveBufferLen = 0;
114         workingSocket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax, SocketFlags.None,
115             ReceiveCallback,
116             
117         this);
118     }
119     private void ReceiveCallback(IAsyncResult iar)
120     {
121         try
122         {
123             MsgClientSchedule cli = (MsgClientSchedule)iar.AsyncState;
124             Socket socket = cli.workingSocket;
125             int reads = socket.EndReceive(iar);
126 
127             if (reads > 0)
128             {
129 
130                 for (int i = 0; i < reads; i++)
131                 {
132                     receiveBuffer.Add(onceReadDatas[i]);
133                 }
134 
135                 //具体填充了多少看返回值 此时 数据已经在buffer中了
136 
137                 receiveBufferLen += reads;
138 
139                 //加完了后解析 阻塞式处理 结束后开启新的接收
140                 SloveTelData();
141 
142 
143 
144                 if (receiveBufferLenMax - receiveBufferLen > 0)
145                 {
146                     //接收完了 继续beginreceive 开启异步的下次接收 (如果缓冲区有残留数据 则接收长度变短 ,没接收到的让其留在socket不会丢失 下次接收)
147                     socket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax - receiveBufferLen, SocketFlags.None, ReceiveCallback, this);
148                 }
149                 else//阻塞式处理都完成一遍了 都还没清理出任何缓冲区空间 毫无疑问 整体运转机制已经挂了 不用beginreceive下一次了
150                 {
151                     Close();
152                     
153                     Console.WriteLine("服务端接口解析数据出现异常");
154                     throw new Exception("服务端接口解析数据出现异常");
155                 }
156             }
157             else//reads==0客户端已关闭
158             {
159                 Close();                    
160             }
161         }
162         catch (Exception ex)
163         {
164             Close();
165             
166             Console.WriteLine("ReceiveCallback Error");
167             Console.WriteLine(ex.Message);
168         }
169 
170     }
171     private void SloveTelData()
172     {
173         
174         //进行数据解析
175         SloveTelDataUtil slo = new SloveTelDataUtil();
176 
177         if (telType == TelgramType.Schedule)
178         {
179             List<Telegram_Schedule> dataEntitys = slo.Slove_Telegram_Schedule(receiveBuffer, receiveBufferLen,serverIP+":"+serverPort.ToString());
180             //buffer已经被处理一遍了 使用新的长度
181             receiveBufferLen = receiveBuffer.Count;
182             //解析出的每一个对象都触发 onreceive
183             for (int i = 0; i < dataEntitys.Count; i++)
184             {
185                 if (onReceive != null)
186                     onReceive(dataEntitys[i]);
187             }
188         }
189         else if (telType == TelgramType.SDBMsg)
190         {
191 
192         }
193 
194     }
195 
196 
197     public void Close()
198     {
199         try
200         {
201             _isConnected = false;
202 
203             receiveBuffer.Clear();
204             receiveBufferLen = 0;
205             if (workingSocket != null && workingSocket.Connected)
206                 workingSocket.Close();
207         }
208         catch (Exception ex)
209         {
210             Console.WriteLine(ex.Message);
211         }
212 
213     }
214 
215 }

服务端调用

构建一个winform基本项目

 1 List<string> clients;
 2 MsgServerSchedule server;
 3 private void btn_start_Click(object sender, EventArgs e)
 4 {
 5     server = new MsgServerSchedule(int.Parse(tbx_port.Text));
 6 
 7 
 8     server.Start();
 9     if (server.isRunning == true)
10     {
11         btn_start.Enabled = false;
12 
13         server.onReceive += new Action<Telegram_Base>(
14         (tel) =>
15         {
16             this.BeginInvoke(new Action(() =>
17             {
18                 if (tel is Telegram_Schedule)
19                 {
20                     Telegram_Schedule ts = tel as Telegram_Schedule;
21                     ts.SloveToTel();
22                     Console.WriteLine(string.Format("commandType:{0}", ((ScheduleTelCommandType)ts.command).ToString()));
23 
24                     tbx_msgs.Text += ts.remoteIPPort + ">" + ts.jsonStr + "
";
25 
26                     //数据回发测试
27                     string fromip = ts.remoteIPPort;
28                     string srcMsg = ts.jsonStr;
29                     string fromServerMsg = ts.jsonStr + " -from server";
30                     ts.jsonStr = fromServerMsg;
31 
32 
33                     //如果消息里有指向信息 则转送到对应的客户端
34                     if (clients != null)
35                     {
36                         string to = null;
37                         for (int i = 0; i < clients.Count; i++)
38                         {
39                             if (srcMsg.Contains(clients[i]))
40                             {
41                                 to = clients[i];
42                                 break;
43                             }
44                         }
45 
46                         if (to != null)
47                         {
48                             ts.remoteIPPort = to;
49                             string toMsg;
50                             //toMsg= srcMsg.Replace(to, "");
51                             toMsg = srcMsg.Replace(to, fromip);
52                             ts.jsonStr = toMsg;
53                             ts.SerialToBytes();
54 
55                             server.SendToSomeOne(ts);
56                         }
57                         else
58                         {
59                             ts.SerialToBytes();
60                             server.SendToSomeOne(ts);
61                         }
62                     }
63                 }
64             }));
65 
66         }
67         );
68 
69         server.onClientAddDel += new Action<List<string>>((onlines) =>
70         {
71             this.BeginInvoke(
72                 new Action(() =>
73                 {
74                     clients = onlines;
75                     listbox_clients.Items.Clear();
76 
77                     for (int i = 0; i < onlines.Count; i++)
78                     {
79                         listbox_clients.Items.Add(onlines[i]);
80                     }
81                 }));
82         });
83     }
84 }
85 private void btn_sendAll_Click(object sender, EventArgs e)
86 {
87     Telegram_Schedule tel = new Telegram_Schedule();
88     tel.header = GlobalSymbol.HeaderSymbol;
89     tel.command = (int)ScheduleTelCommandType.StartC2S;
90     tel.jsonStr = tbx_sendAll.Text;
91     tel.SerialToBytes();
92 
93     server.SendToAll(tel);
94 }

客户端调用

 1 MsgClientSchedule client;
 2 
 3 private void btn_start_Click(object sender, EventArgs e)
 4 {
 5     client = new MsgClientSchedule(tbx_ip.Text, int.Parse(tbx_port.Text));
 6 
 7     client.Connect();
 8 
 9     if (client.isConnected == true)
10     {
11         btn_start.Enabled = false;
12         
13         label1.Text = client.localIPPort;
14 
15         client.onReceive = new Action<Telegram_Base>((tel) =>
16         {
17             this.BeginInvoke(
18                 new Action(() =>
19                 {
20                     tel.SloveToTel();
21                     tbx_rec.Text += tel.jsonStr + "
";
22 
23                 }));
24         });
25     }
26 
27 }
28 
29 
30 
31 private void btn_send_Click(object sender, EventArgs e)
32 {
33 
34     if (client == null || client.isConnected == false)
35         return;
36 
37     //for (int i = 0; i < 2; i++)
38     //{
39         Telegram_Schedule tel = new Telegram_Schedule();
40         tel.command = (int)ScheduleTelCommandType.MsgC2S;
41     
42         tel.jsonStr = tbx_remoteip.Text+">"+ tbx_msgSend.Text;
43         tel.SerialToBytes();//发出前要先序列化
44 
45         client.Send(tel);
46     //}
47     
48 }

实现效果

可以多客户端连接互相自由发送消息,服务端可以编写转发规则代码,那些什么棋牌啊 互动白板 以及其他类似的应用就可以基于此之上发挥想象了