用OpenSocket开发一个简单的高性能高并发HttpServer
2023-06-13 09:17:24 时间
OpenSocket是一个跨全平台的高性能网络并发库。
它使用了高性能IO,Linux和安卓用epoll,Win32用IOCP,iOS和Mac用kqueue,其他系统使用select。
本文用这种高性能socket库,设计开发一个简单的HttpServer。
为了开发方便,我们使用OpenThread作为线程库。使用OpenThread的Actor模式设计高并发HttpServer。
设计思路如下:
创建5条线程,1条线程封装成监听者Listener,另外4条线程封装成接收者Accepter。
监听者Listener负责监听socket连接事件,监听到socket新连接事件后,就把fd发给其中一个接收者Accepter;
接收者Accepter接收到socket的fd后,打开该socket连接,与客户端进行网络通信。
此简单的HttpServer接收到Http报文后,返回(response)一份Http报文,然后关闭socket,完成一次Http短连接操作。
具体源码如下:
#include <assert.h>
#include <map>
#include <set>
#include <memory>
#include <string.h>
#include "opensocket.h"
#include "open/openthread.h"
using namespace open;
const std::string TestServerIp_ = "0.0.0.0";
const int TestServerPort_ = 8888;
// Message type 1: wraps one OpenSocketMsg so it can travel through OpenThread.
struct SocketProto : public OpenThreadProto
{
    // Type id used when registering handlers for this message.
    static int ProtoType() { return 1; }

    // Instance-level accessor; always reports SocketProto's own type id.
    virtual int protoType() const { return SocketProto::ProtoType(); }

    // The socket event carried by this message (shared ownership).
    std::shared_ptr<OpenSocketMsg> data_;
};
// Message type 2: sent by a worker to announce its thread id.
struct RegisterProto : public OpenThreadProto
{
    // Thread id of the sender; -1 until the sender fills it in.
    // NOTE(review): App::SocketFunc writes srcPid_ on a SocketProto, which
    // suggests OpenThreadProto also has a member of this name that this one
    // hides - confirm against openthread.h.
    int srcPid_ = -1;

    RegisterProto() {}

    // Type id used when registering handlers for this message.
    static int ProtoType() { return 2; }

    // Instance-level accessor; always reports RegisterProto's own type id.
    virtual int protoType() const { return RegisterProto::ProtoType(); }
};
// Message type 3: hands a freshly accepted client socket to a worker.
struct NewClientProto : public OpenThreadProto
{
    // Descriptor of the accepted connection; -1 means "not set".
    int accept_fd_ = -1;
    // Address text that accompanied the accept event.
    std::string addr_;

    NewClientProto() {}

    // Type id used when registering handlers for this message.
    static int ProtoType() { return 3; }

    // Instance-level accessor; always reports NewClientProto's own type id.
    virtual int protoType() const { return NewClientProto::ProtoType(); }
};
////////////App//////////////////////
class App
{
static void SocketFunc(const OpenSocketMsg* msg)
{
if (!msg) return;
if (msg->uid_ >= 0)
{
auto proto = std::shared_ptr<SocketProto>(new SocketProto);
proto->srcPid_ = -1;
proto->srcName_ = "OpenSocket";
proto->data_ = std::shared_ptr<OpenSocketMsg>((OpenSocketMsg*)msg);
if (!OpenThread::Send((int)msg->uid_, proto))
printf("SocketFunc dispatch faild pid = %lld\n", msg->uid_);
}
else
{
delete msg;
}
}
public:
static App Instance_;
OpenSocket openSocket_;
App() { openSocket_.run(App::SocketFunc); }
};
App App::Instance_;
////////////Listener//////////////////////
class Listener : public OpenThreadWorker
{
int listen_fd_;
unsigned int balance_;
std::set<int> setSlaveId_;
std::vector<int> vectSlaveId_;
public:
Listener(const std::string& name)
:OpenThreadWorker(name),
listen_fd_(-1)
{
balance_ = 0;
registers(SocketProto::ProtoType(), (OpenThreadHandle)&Listener::onSocketProto);
registers(RegisterProto::ProtoType(), (OpenThreadHandle)&Listener::onRegisterProto);
}
virtual ~Listener() {}
virtual void onStart()
{
listen_fd_ = App::Instance_.openSocket_.listen((uintptr_t)pid(), TestServerIp_, TestServerPort_, 64);
if (listen_fd_ < 0)
{
printf("Listener::onStart faild listen_fd_ = %d\n", listen_fd_);
assert(false);
}
App::Instance_.openSocket_.start((uintptr_t)pid(), listen_fd_);
printf("HTTP: %s:%d\n", TestServerIp_.c_str(), TestServerPort_);
}
void onRegisterProto(const RegisterProto& proto)
{
if (proto.srcPid_ >= 0)
{
if (setSlaveId_.find(proto.srcPid_) == setSlaveId_.end())
{
setSlaveId_.insert(proto.srcPid_);
vectSlaveId_.push_back(proto.srcPid_);
printf("Hello OpenSocket HttpServer, srcPid = %d\n", proto.srcPid_);
}
}
}
// new client socket dispatch to Accept
void notifyToSlave(int accept_fd, const std::string& addr)
{
if (!vectSlaveId_.empty())
{
auto proto = std::shared_ptr<NewClientProto>(new NewClientProto);
proto->accept_fd_ = accept_fd;
proto->addr_ = addr;
if (balance_ >= vectSlaveId_.size())
{
balance_ = 0;
}
int slaveId = vectSlaveId_[balance_++];
if (OpenThread::Send(slaveId, proto))
{
return;
}
printf("Listener::notifyToSlave send faild pid = %d\n", slaveId);
}
App::Instance_.openSocket_.close(pid_, accept_fd);
}
void onSocketProto(const SocketProto& proto)
{
const auto& msg = proto.data_;
switch (msg->type_)
{
case OpenSocket::ESocketAccept:
// linsten new client socket
notifyToSlave(msg->ud_, msg->data());
printf("Listener::onSocket [%s]ESocketAccept:acceptFd = %d\n", ThreadName((int)msg->uid_).c_str(), msg->ud_);
break;
case OpenSocket::ESocketClose:
break;
case OpenSocket::ESocketError:
printf("Listener::onSocket [%s]ESocketError:%s\n", ThreadName((int)msg->uid_).c_str(), msg->info());
break;
case OpenSocket::ESocketWarning:
printf("Listener::onSocket [%s]ESocketWarning:%s\n", ThreadName((int)msg->uid_).c_str(), msg->info());
break;
case OpenSocket::ESocketOpen:
break;
case OpenSocket::ESocketUdp:
case OpenSocket::ESocketData:
assert(false);
break;
default:
break;
}
}
};
////////////HttpRequest//////////////////////
// Accumulates the bytes of one HTTP request and exposes the parsed pieces.
struct HttpRequest
{
    int fd_ = -1;         // socket descriptor of the client, -1 when unset
    std::string addr_;    // client address text
    std::string method_;  // first token of the request line
    std::string url_;     // second token of the request line
    int code_ = -1;       // -1 while the header is incomplete, 0 afterwards
    int clen_ = -1;       // parsed content-length, -1 until a header is parsed
    std::string head_;    // raw header bytes
    std::string body_;    // raw body bytes
    std::map<std::string, std::string> headers_;  // header fields, keys lower-cased

    HttpRequest() {}

    // Parses head_, whose first line has the form: GET /xx/xx HTTP/x.x
    bool parseHeader();
    // Feeds received bytes in; returns true once a whole request is buffered.
    bool pushData(const char* data, size_t size);
};
////////////Accepter//////////////////////
class Accepter : public OpenThreadWorker
{
int listenId_;
std::map<int, HttpRequest> mapClient_;
public:
Accepter(const std::string& name)
:OpenThreadWorker(name),
listenId_(-1)
{
registers(SocketProto::ProtoType(), (OpenThreadHandle)&Accepter::onSocketProto);
registers(NewClientProto::ProtoType(), (OpenThreadHandle)&Accepter::onNewClientProto);
}
virtual ~Accepter() {}
virtual void onStart()
{
while (listenId_ < 0)
{
listenId_ = ThreadId("listener");
OpenThread::Sleep(1000);
}
auto proto = std::shared_ptr<RegisterProto>(new RegisterProto);
proto->srcPid_ = pid();
if (OpenThread::Send(listenId_, proto))
return;
printf("Accepter::onStart send faild pid = %d\n", listenId_);
}
void onNewClientProto(const NewClientProto& proto)
{
int accept_fd = proto.accept_fd_;
if (accept_fd >= 0)
{
auto iter = mapClient_.find(accept_fd);
if (iter != mapClient_.end())
{
assert(false);
mapClient_.erase(iter);
App::Instance_.openSocket_.close(pid(), accept_fd);
return;
}
auto& client = mapClient_[accept_fd];
client.fd_ = accept_fd;
client.addr_ = proto.addr_;
App::Instance_.openSocket_.start(pid_, accept_fd);
}
}
//GET /xx/xx HTTP/x.x
void onReadHttp(const std::shared_ptr<OpenSocketMsg> msg)
{
auto iter = mapClient_.find(msg->fd_);
if (iter == mapClient_.end())
{
App::Instance_.openSocket_.close(pid_, msg->fd_);
return;
}
auto& request = iter->second;
if (!request.pushData(msg->data(), msg->size()))
{
//Header too large.close connet.
if (request.head_.size() > 1024)
App::Instance_.openSocket_.close(pid_, msg->fd_);
return;
}
printf("new client:url = %s\n", request.url_.c_str());
std::string content;
content.append("<div>It's work!</div><br/>" + request.addr_ + "request:" + request.url_);
std::string buffer = "HTTP/1.1 200 OK\r\ncontent-length:" + std::to_string(content.size()) + "\r\n\r\n" + content;
App::Instance_.openSocket_.send(msg->fd_, buffer.data(), (int)buffer.size());
}
virtual void onSocketProto(const SocketProto& proto)
{
const auto& msg = proto.data_;
switch (msg->type_)
{
case OpenSocket::ESocketData:
onReadHttp(msg);
break;
case OpenSocket::ESocketClose:
mapClient_.erase(msg->fd_);
break;
case OpenSocket::ESocketError:
mapClient_.erase(msg->fd_);
printf("Accepter::onStart [%s]ESocketError:%s\n", ThreadName((int)msg->uid_).c_str(), msg->info());
break;
case OpenSocket::ESocketWarning:
printf("Accepter::onStart [%s]ESocketWarning:%s\n", ThreadName((int)msg->uid_).c_str(), msg->info());
break;
case OpenSocket::ESocketOpen:
{
auto iter = mapClient_.find(msg->fd_);
if (iter == mapClient_.end())
{
App::Instance_.openSocket_.close(pid_, msg->fd_);
return;
}
}
break;
case OpenSocket::ESocketAccept:
case OpenSocket::ESocketUdp:
assert(false);
break;
default:
break;
}
}
};
int main()
{
printf("start server==>>\n");
std::vector<OpenThreader*> vectServer = {
new Listener("listener"),
new Accepter("accepter1"),
new Accepter("accepter2"),
new Accepter("accepter3"),
new Accepter("accepter4")
};
for (size_t i = 0; i < vectServer.size(); ++i)
vectServer[i]->start();
printf("wait close==>>\n");
OpenThread::ThreadJoinAll();
for (size_t i = 0; i < vectServer.size(); ++i)
delete vectServer[i];
vectServer.clear();
printf("Pause\n");
return getchar();
}
// Returns `text` without leading and trailing spaces or tabs.
static std::string HttpTrimBlanks(const std::string& text)
{
    const char* const blanks = " \t";
    const size_t first = text.find_first_not_of(blanks);
    if (first == std::string::npos) return std::string();
    const size_t last = text.find_last_not_of(blanks);
    return text.substr(first, last - first + 1);
}

// Parses head_ into method_, url_, headers_ and clen_.
//
// head_ is expected to hold the request line ("GET /xx/xx HTTP/x.x") followed
// by "Name: value" lines, each terminated by "\r\n". Header names are stored
// lower-cased; names and values are stored without surrounding blanks. Lines
// without a colon, or with nothing before it, are ignored.
//
// Returns false only when head_ contains no "\r\n" at all. Returns true
// without touching anything when headers were already parsed or head_ is
// shorter than 12 bytes. clen_ becomes the numeric value of the
// content-length header, or 0 when the request has none.
bool HttpRequest::parseHeader()
{
    if (!headers_.empty() || head_.size() < 12) return true;

    const size_t lineEnd = head_.find("\r\n");
    if (lineEnd == std::string::npos) return false;

    clen_ = -1;
    method_.clear();
    url_.clear();

    // Request line: the method runs up to the first space, the URL is the
    // next run of non-space characters; the protocol version is not kept.
    const std::string requestLine(head_, 0, lineEnd);
    const size_t methodEnd = requestLine.find(' ');
    method_ = requestLine.substr(0, methodEnd);
    if (methodEnd != std::string::npos)
    {
        const size_t urlBegin = requestLine.find_first_not_of(' ', methodEnd);
        if (urlBegin != std::string::npos)
        {
            const size_t urlEnd = requestLine.find(' ', urlBegin);
            const size_t urlLen = (urlEnd == std::string::npos) ? std::string::npos : urlEnd - urlBegin;
            url_ = requestLine.substr(urlBegin, urlLen);
        }
    }

    // Header fields: one "\r\n"-terminated line at a time. A trailing piece
    // without terminator is ignored.
    size_t pos = lineEnd + 2;
    while (pos < head_.size())
    {
        const size_t eol = head_.find("\r\n", pos);
        if (eol == std::string::npos) break;

        const std::string line(head_, pos, eol - pos);
        pos = eol + 2;

        const size_t colon = line.find(':');
        if (colon == std::string::npos || colon == 0) continue;

        std::string key = HttpTrimBlanks(line.substr(0, colon));
        if (key.empty()) continue;
        for (size_t x = 0; x < key.size(); ++x)
        {
            // tolower() needs a value representable as unsigned char.
            key[x] = static_cast<char>(std::tolower(static_cast<unsigned char>(key[x])));
        }
        headers_[key] = HttpTrimBlanks(line.substr(colon + 1));
    }

    // Look the field up without inserting an empty entry when it is absent.
    const std::map<std::string, std::string>::const_iterator found = headers_.find("content-length");
    clen_ = (found != headers_.end()) ? std::atoi(found->second.c_str()) : 0;
    return true;
}
bool HttpRequest::pushData(const char* data, size_t size)
{
if (code_ == -1)
{
head_.append(data, size);
const char* ptr = strstr(head_.data(), "\r\n\r\n");
if (!ptr) return false;
code_ = 0;
body_.append(ptr + 4);
head_.resize(ptr - head_.data() + 2);
if (!parseHeader()) return false;
}
else
{
body_.append(data, size);
}
if (clen_ >= 0)
{
if (clen_ == 0 && clen_ == body_.size())
{
return true;
}
if (clen_ >= body_.size())
{
body_.resize(clen_);
return true;
}
}
else if (body_.size() > 2)
{
if (body_[body_.size() - 2] == '\r' && body_.back() == '\n')
{
body_.pop_back();
body_.pop_back();
return true;
}
}
return false;
}
编译和执行
请安装cmake工具,用cmake可以构建出VS或者XCode工程,就可以在vs或者xcode上编译运行。
源代码:https://github.com/openlinyou/opensocket
https://gitee.com/linyouhappy/opensocket
#克隆项目
git clone https://github.com/openlinyou/opensocket
cd ./opensocket
#创建build工程目录
mkdir build
cd build
cmake ..
#如果是win32,该目录下会生成opensocket.sln,双击它即可启动VS编写和调试代码
make
./httpserver
相关文章
- OpenSocket是跨全平台的高性能高并发网络库
- Go语言实现的可读性更高的并发神库
- OpenHttps是跨全平台的Actor模式、组件设计的高性能、高并发的超轻量、超迷你的Https框架
- java并发编程(1):Java多线程-基本线程类-基础知识复习笔记
- Redis高可用高性能缓存的应用系列06 - 热Key,大Key,并发竞争解决方案
- java并发编程(十)
- Memcached缓存失效时,如何处理高并发的查询DB详解架构师
- PHP与Redis实现高性能并发处理(phpredis并发)
- 实现Redis多线程高性能并发(redis多线程并发)
- 的性能优化MySQL高并发锁的性能提升之道(mysql高并发锁)
- Linux并发连接:探索极致的性能(linux的并发连接)
- 基于Redis的高性能并发锁(redis并发锁)
- 从12306说起 如何构建高并发高性能网站
- 告别Disruptor(一) 简洁优雅的高性能并发队列
- MySQL并发查询优化技巧(mysql 查询并发)
- Oracle数据库的高并发应对之道(oracle数据库并发量)
- Redis实现高性能高并发处理(redis高级并发功能)
- Redis高性能之路开启高并发之门(redis高并发的原因)
- Redis架设高性能分布式系统(redis高并发分布式)
- Redis锁实现安全并发的利器(redis锁是啥)
- Redis配置实现高性能的并发处理(redis 配置 并发)