zl程序教程

您现在的位置是:首页 >  后端

当前栏目

IOCP使用acceptEX进行异步接收

异步 使用 进行 接收 IOCP
2023-06-13 09:15:01 时间

大家好,又见面了,我是你们的朋友全栈君。

示例代码为什么要调用WSAIoctl ()函数

#include <winsock2.h>
#include <windows.h>
#include <string>
#include <iostream>
#include<process.h>
#include <ws2tcpip.h>
#include <mswsock.h>
using namespace std;
//#pragma comment(lib,"MSWSOCK.lib")
#pragma comment(lib,"ws2_32.lib")
#pragma comment(lib,"kernel32.lib")
// Handle of the I/O completion port. main() assigns it and WorkerThread
// dequeues completion packets from it.
HANDLE g_hIOCP;
// Identifies which overlapped operation an IO_DATA record is tracking, so the
// completion handler knows how to interpret a dequeued packet.
enum IO_OPERATION {
    IO_READ  = 0,   // a receive was posted
    IO_WRITE = 1,   // a send was posted
    ACCEPT   = 2    // an AcceptEx was posted
};
class IO_DATA{ 

public:
OVERLAPPED                  Overlapped;
WSABUF                      wsabuf;
int                         nBytes;
IO_OPERATION                opCode;
char						buffer[1024];
int							BufferLen;
SOCKET                      client;
SOCKET						server;
};
// Winsock extension-function pointers and the GUIDs used to look them up.
// main() fills the pointers in through WSAIoctl with
// SIO_GET_EXTENSION_FUNCTION_POINTER.
LPFN_ACCEPTEX lpfnAcceptEx = NULL;// AcceptEx function pointer
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockAddrs = NULL;
// File-scope 1024-byte scratch buffer.
char buffer[1024];
// Renders a 32-bit IPv4 address as dotted-decimal text.
//
// The most significant byte of nValue becomes the first octet, i.e. the value
// is read as a host-byte-order number (0x7F000001 -> "127.0.0.1").
string ValueToIP(const int nValue)
{
    const unsigned int bits = static_cast<unsigned int>(nValue);
    const unsigned int first  = (bits >> 24) & 0xffu;
    const unsigned int second = (bits >> 16) & 0xffu;
    const unsigned int third  = (bits >> 8) & 0xffu;
    const unsigned int fourth = bits & 0xffu;

    char dotted[20];    // "255.255.255.255" needs 16 bytes including the terminator
    sprintf(dotted, "%u.%u.%u.%u", first, second, third, fourth);
    return string(dotted);
}
unsigned int _stdcall WorkerThread (LPVOID WorkThreadContext) { 

IO_DATA *lpIOContext = NULL; 
DWORD nBytes = 0;
DWORD dwFlags = 0; 
int nRet = 0;
DWORD dwIoSize = 0; 
void * lpCompletionKey = NULL;
LPOVERLAPPED lpOverlapped = NULL;
while(1){ 

bool ret = GetQueuedCompletionStatus(g_hIOCP, &dwIoSize,(LPDWORD)&lpCompletionKey,(LPOVERLAPPED *)&lpOverlapped, INFINITE);
if ( !ret ) { 

if ( GetLastError() == WAIT_TIMEOUT ) { 

continue;
}
cout << "ret:" << ret << endl;
}
lpIOContext = (IO_DATA *)lpOverlapped;
if(lpIOContext->opCode == IO_READ) // a read operation complete
{ 

cout << "io_read...." << lpIOContext->wsabuf.buf << "end" << endl;
ZeroMemory(&lpIOContext->Overlapped, sizeof(lpIOContext->Overlapped));
strcpy(buffer, "ok.....\n");
lpIOContext->wsabuf.buf = buffer;
lpIOContext->wsabuf.len = strlen(lpIOContext->wsabuf.buf);
lpIOContext->opCode = IO_WRITE;
lpIOContext->nBytes = strlen(buffer);
dwFlags = 0;
nBytes = 0;
nRet = WSASend(
lpIOContext->client,
&lpIOContext->wsabuf, 1, &nBytes,
dwFlags,
&(lpIOContext->Overlapped), NULL);
if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) { 

cout << "WASSend Failed::Reason Code::"<< WSAGetLastError() << endl;
closesocket(lpIOContext->client);
delete lpIOContext;
continue;
}
memset(buffer, NULL, sizeof(buffer));
}
else if(lpIOContext->opCode == IO_WRITE) //a write operation complete
{ 

cout << "io_write...." << endl;
// Write operation completed, so post Read operation.
lpIOContext->opCode = IO_READ; 
nBytes = 1024;
dwFlags = 0;
lpIOContext->wsabuf.buf = buffer;
lpIOContext->wsabuf.len = nBytes;
lpIOContext->nBytes = nBytes;
ZeroMemory(&lpIOContext->Overlapped, sizeof(lpIOContext->Overlapped));
nRet = WSARecv(
lpIOContext->client,
&lpIOContext->wsabuf, 1, &nBytes,
&dwFlags,
&lpIOContext->Overlapped, NULL);
if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) { 

cout << "WASRecv Failed::Reason Code1::"<< WSAGetLastError() << endl;
closesocket(lpIOContext->client);
delete lpIOContext;
continue;
} 
cout<<lpIOContext->wsabuf.buf<<endl;
} else if (lpIOContext->opCode == ACCEPT) { 

//sockaddr* client_addr, *server_addr;
//GetAcceptExSockaddrs(lpIOContext->wsabuf.buf, 0, sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, &client_addr, (LPINT)sizeof(sockaddr_in), &server_addr, (LPINT)sizeof(sockaddr_in));
//cout << "接收到一个连接 ip:" << ntohs(((sockaddr_in*)client_addr)->sin_port) << " port:" << endl;
SOCKADDR_IN* remote = NULL;
SOCKADDR_IN* local = NULL;
int remoteLen = sizeof(SOCKADDR_IN);
int localLen = sizeof(SOCKADDR_IN);
lpfnGetAcceptExSockAddrs(lpIOContext->wsabuf.buf, 0,
sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, (LPSOCKADDR*)&local, &localLen, (LPSOCKADDR*)&remote, &remoteLen);
cout << "接收到一个连接 local ip:" << ValueToIP(local->sin_addr.s_addr) << "local port:" << ntohs(local->sin_port) << "remote ip:" << ValueToIP(remote->sin_addr.s_addr) << 
"remote port:" << ntohs(remote->sin_port) << endl;
CreateIoCompletionPort((HANDLE)lpIOContext->client,
g_hIOCP, (u_long) 0, 0);
IO_DATA* io_data = new IO_DATA;
io_data->opCode = IO_READ;
io_data->wsabuf.buf = io_data->buffer;
io_data->wsabuf.len = io_data->BufferLen = 1024;
io_data->client = lpIOContext->client;
//{WSAID_ACCEPTEX, WSAID_GETACCEPTEXSOCKADDRS}
ZeroMemory(&io_data->Overlapped, sizeof(io_data->Overlapped));
if (WSARecv(io_data->client, &io_data->wsabuf, 1, (DWORD*)&nBytes, &dwFlags, &io_data->Overlapped, NULL) == SOCKET_ERROR) { 

if (WSAGetLastError() == WSA_IO_PENDING) { 

cout << "WSARecv Pending..." << endl;
}
}
lpIOContext->client = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
lpIOContext->opCode = ACCEPT;
lpIOContext->wsabuf.buf = lpIOContext->buffer;
lpIOContext->wsabuf.len = lpIOContext->BufferLen = 1024;
lpIOContext->nBytes = 0;
memset(&lpIOContext->Overlapped, 0, sizeof(lpIOContext->Overlapped));
while ( lpfnAcceptEx(lpIOContext->server, lpIOContext->client, lpIOContext->wsabuf.buf, 0,
sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, 
&nBytes, &lpIOContext->Overlapped) == FALSE && WSAGetLastError() != ERROR_IO_PENDING) { 

cout << "lpfnAcceptEx 失败 继续accept" << endl;
}
} else { 

cout << "没有匹配的类型" << endl;
}
}
return 0;
}
// Demo thread routine: prints a banner, then ten numbered progress lines with
// a one-second pause after each. The argument is ignored; always returns 0.
unsigned int _stdcall TestThread(LPVOID arg) {
    printf("test thread\n");

    int tick = 0;
    while (tick < 10) {
        printf("run: %d .....\n", tick);
        Sleep(1000);
        ++tick;
    }
    return 0;
}
int main ()
{ 

WSADATA wsaData;
WSAStartup(MAKEWORD(2,2), &wsaData);
SOCKET    m_socket = WSASocket(AF_INET,SOCK_STREAM, IPPROTO_TCP, NULL,0,WSA_FLAG_OVERLAPPED);
SYSTEM_INFO sysInfo;
GetSystemInfo(&sysInfo);
int g_ThreadCount = sysInfo.dwNumberOfProcessors * 2;
g_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0, g_ThreadCount);
CreateIoCompletionPort((HANDLE) m_socket, g_hIOCP, (u_long) 0, 0);
sockaddr_in server;
server.sin_family = AF_INET;
server.sin_port = htons(6000);
server.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
bind(m_socket ,(sockaddr*)&server,sizeof(server));
listen(m_socket, 8);
//CreateIoCompletionPort((HANDLE)m_socket,g_hIOCP,0,0);
for( int i=0;i < 1; ++i){ 

HANDLE  hThread;
hThread = (HANDLE)_beginthreadex(NULL, 0, WorkerThread, 0, 0, NULL);
CloseHandle(hThread);
}
//-------------------------------------------------------------------------------------
DWORD dwBytes = 0;
IO_DATA* per_io_data = new IO_DATA;
per_io_data->wsabuf.buf = per_io_data->buffer;
per_io_data->wsabuf.len = per_io_data->BufferLen = 1024;
per_io_data->opCode = ACCEPT;
memset(&per_io_data->Overlapped, 0, sizeof(per_io_data->Overlapped));
int iResult = WSAIoctl(m_socket, SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx, sizeof (GuidAcceptEx), 
&lpfnAcceptEx, sizeof (lpfnAcceptEx), 
&dwBytes, NULL, NULL);
if (iResult == SOCKET_ERROR) { 

wprintf(L"WSAIoctl failed with error: %u iResult:%d\n", WSAGetLastError(), iResult);
getchar();
closesocket(m_socket);
WSACleanup();
return 1;
}
if (SOCKET_ERROR == WSAIoctl(m_socket, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidGetAcceptExSockAddrs,
sizeof(GuidGetAcceptExSockAddrs), &lpfnGetAcceptExSockAddrs, sizeof(lpfnGetAcceptExSockAddrs),
&dwBytes, NULL, NULL))
{ 

cerr << "WSAIoctl failed with error code: " << WSAGetLastError() << endl;
getchar();
if (INVALID_SOCKET != m_socket)
{ 

closesocket(m_socket);
m_socket = INVALID_SOCKET;
}
//goto EXIT_CODE;
return -1;
}
per_io_data->client = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (per_io_data->client == INVALID_SOCKET) { 

wprintf(L"Create accept socket failed with error: %u\n", WSAGetLastError());
getchar();
closesocket(m_socket);
WSACleanup();
return 1;
}
per_io_data->server = m_socket;
//bool bRetVal = lpfnAcceptEx(m_socket, per_io_data->client, per_io_data->wsabuf.buf,
// per_io_data->wsabuf.len - ((sizeof (SOCKADDR_IN) + 16) * 2),
// sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, 
// &dwBytes, &per_io_data->Overlapped);
bool bRetVal = lpfnAcceptEx(per_io_data->server, per_io_data->client, per_io_data->wsabuf.buf,
0,
sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, 
&dwBytes, &per_io_data->Overlapped);
if (bRetVal == FALSE && WSAGetLastError() != ERROR_IO_PENDING) { 

printf("AcceptEx failed with error: %u\n", WSAGetLastError());
getchar();
closesocket(per_io_data->client);
closesocket(m_socket);
WSACleanup();
return 1;
}
//*****************************************************************************
m_socket = WSASocket(AF_INET,SOCK_STREAM, IPPROTO_TCP, NULL,0,WSA_FLAG_OVERLAPPED);
CreateIoCompletionPort((HANDLE) m_socket, g_hIOCP, (u_long) 0, 0);
server.sin_family = AF_INET;
server.sin_port = htons(6020);
server.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
bind(m_socket ,(sockaddr*)&server,sizeof(server));
listen(m_socket, 8);
IO_DATA* per_io_data2 = new IO_DATA;
per_io_data2->wsabuf.buf = per_io_data2->buffer;
per_io_data2->wsabuf.len = per_io_data2->BufferLen = 1024;
per_io_data2->opCode = ACCEPT;
memset(&per_io_data2->Overlapped, 0, sizeof(per_io_data2->Overlapped));
per_io_data2->client = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
per_io_data2->server = m_socket;
//int iResult = WSAIoctl(m_socket, SIO_GET_EXTENSION_FUNCTION_POINTER,
// &GuidAcceptEx, sizeof (GuidAcceptEx), 
// &lpfnAcceptEx, sizeof (lpfnAcceptEx), 
// &dwBytes, NULL, NULL);
//if (iResult == SOCKET_ERROR) { 

// wprintf(L"WSAIoctl failed with error: %u iResult:%d\n", WSAGetLastError(), iResult);
// getchar();
// closesocket(m_socket);
// WSACleanup();
// return 1;
//}
//if (SOCKET_ERROR == WSAIoctl(m_socket, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidGetAcceptExSockAddrs,
// sizeof(GuidGetAcceptExSockAddrs), &lpfnGetAcceptExSockAddrs, sizeof(lpfnGetAcceptExSockAddrs),
// &dwBytes, NULL, NULL))
//{ 

// cerr << "WSAIoctl failed with error code: " << WSAGetLastError() << endl;
// getchar();
// if (INVALID_SOCKET != m_socket)
// { 

// closesocket(m_socket);
// m_socket = INVALID_SOCKET;
// }
// //goto EXIT_CODE;
// return -1;
//}
bRetVal = lpfnAcceptEx(per_io_data2->server, per_io_data2->client, per_io_data2->wsabuf.buf,
0,
sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, 
&dwBytes, &per_io_data2->Overlapped);
if (bRetVal == FALSE && WSAGetLastError() != ERROR_IO_PENDING) { 

printf("AcceptEx failed with error2222: %u\n", WSAGetLastError());
getchar();
closesocket(per_io_data2->client);
closesocket(m_socket);
WSACleanup();
return 1;
}
//---------------------------------------------------------------------------------------
//SOCKET client = accept( m_socket, NULL, NULL );
//cout << "Client connected." << endl;
//if (CreateIoCompletionPort((HANDLE)client, g_hIOCP, 0, 0) == NULL){ 

// cout << "Binding Client Socket to IO Completion Port Failed::Reason Code::"<< GetLastError() << endl;
// closesocket(client);
//}
//else { //post a recv request
// IO_DATA * data = new IO_DATA;
// memset(buffer, NULL ,1024);
// memset(&data->Overlapped, 0 , sizeof(data->Overlapped));
// data->opCode = IO_READ;
// data->nBytes = 0;
// data->wsabuf.buf = buffer;
// data->wsabuf.len = sizeof(buffer);
// data->client = client;
// DWORD nBytes= 1024 ,dwFlags=0;
// int nRet = WSARecv(client,&data->wsabuf, 1, &nBytes,
// &dwFlags,
// &data->Overlapped, NULL);
// if(nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())){ 

// cout << "WASRecv Failed::Reason Code::"<< WSAGetLastError() << endl;
// closesocket(client);
// delete data;
// }
// cout<<data->wsabuf.buf<<endl;
//}
HANDLE m_oHandle = CreateEvent(NULL, true, false, NULL);
WaitForSingleObject(m_oHandle, INFINITE);
closesocket(m_socket);
WSACleanup();
return 0;
}

代码部分疑惑说明:①AcceptEx函数的作用是向完成端口投递异步的accept操作,只有该函数可以完成此功能

②AcceptEx是微软的扩展函数,不属于winsock2的标准接口(声明在mswsock.h中),所以需要用类似动态加载的方式获取函数指针。Windows提供了函数WSAIoctl和参数SIO_GET_EXTENSION_FUNCTION_POINTER来获取函数指针,宏WSAID_ACCEPTEX是该函数对应的GUID,完成端口用到的其他几个扩展函数(例如GetAcceptExSockaddrs)也是用这种方式取得。WSAIoctl的第一个参数只要是一个有效的socket即可,不必是某个特定的socket。

③AcceptEx参数说明:1)监听socket;2)预先创建的连接socket,等同于其他模式下accept函数返回的socket,但是这里的socket是提前申请好的,所以在高并发连接时不会有新建socket的开销,提高了性能;3)输出缓存,不能为NULL,用来存放连接上收到的第一份数据以及本地地址、远端地址信息;4)缓存中用于接收数据的长度:为0时表示连接建立即返回,不等待数据;不为0时要等连接建立并收到第一份数据时才返回,缓存内是收到的数据,高并发时这样设置可以等到真实数据发送过来时再返回,降低完成端口处理的并发数。因为收到数据时,本地地址信息和远端地址信息也会写入同一缓存,所以该长度要用缓存总长度减去两个地址信息的长度,也即参数5)+6)的和;5)和6):依次为本地地址、远端地址信息的长度,各自必须比所用传输协议的最大地址长度至少多16个字节,这部分空间必须保留,所以要加16;7)实际接收到的字节数,只有操作同步完成时才有效,异步投递时一般用不到;8)OVERLAPPED结构指针,很关键,后文详细说明(见注①)。

6)此时监听socket的accept操作就被投递给了完成端口内核,接下来就是不断询问完成端口是否有操作抵达

参考 https://blog.csdn.net/blwinner/article/details/52882954

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/187868.html原文链接:https://javaforall.cn