handy源码分析

Handy网络库分析

最近想学一下Linux的网络编程,于是想先阅读一些网络库,所以这篇博客可能会有一些比较基本的知识点。

直接开始,我们先来看看利用Doxygen生成的Include依赖关系图

include-relationship

我们就按从下往上来分析一下各个头文件所实现的功能

slice.h

定义字符串切片类(包含front() / back() / begin() / end() 等类似容器的操作,但实际的实现是char*)以及切片的一些操作(返回第一个词,返回第一行,返回前n个字符等等)

port_posix.h

用于解决主机字节序到网络字节序的转换,根据 OS_LINUX / OS_MACOSX 实现不同的getHostByName

util.h

这里noncopyable是通过将拷贝构造函数拷贝赋值运算符声明为禁止使用(=delete)实现

这样实现的好处:

  • 在c++11之前实现该功能需要将这两个函数设为private,但依然无法避免成员函数和友元函数的使用
  • 通过更少/更明确的代码实现,且可复用(从noncopyable类派生)

该类的作用:

  • 防止对象被拷贝。尤其是在涉及到文件操作/网络连接的类中,资源的释放/关闭可能会出现问题
1
2
3
4
5
6
7
8
//boost::noncopyable
struct noncopyable
{
noncopyable() = default;

noncopyable(const noncopyable&) = delete;
noncopyable& operator=(const noncopyable&) = delete;
};

具体解释可看Explicitly Defaulted and Deleted Functions

util类则包含

  • 格式化输出string

    进行两次尝试:

    第1次尝试:

    申请char[500]的栈内存,va_start获取可变参数列表,通过vsnprintf()得到格式化后的字符串,判断返回大小是否超出限制,未超出结束尝试并返回,超出则进行第二次尝试

    第2次尝试:

    申请char[30000]的堆内存,并且由unique_ptr管理(销毁),通过vsnprintf()得到格式化后的字符串,判断返回大小是否超出限制,未超出结束尝试并返回,超出则截断,将字符串最后一字节设为结束字符‘\0’

  • 基础的时间函数(std::chrono库)

  • atoi() ---- 将字符串转换为整数,函数内部通过strtol()实现

  • 更改(添加)文件描述符flag(fcntl)

ExitCaller类继承于noncopyable,只有一个私有成员std::function<void()> functor_,即一个无参数返回int的可调用对象,并且在析构时进行调用执行。当实例为局部变量时,作用与golang的defer类似

net.h

net类包含一些设置函数

  • setNonBlock()设置文件的 阻塞/非阻塞IO

  • setReuseAddr() / setReusePort() / setNoDelay() 设置套接字的属性

    SO_REUSEADDR允许 绑定一个正处于TIME_WAIT中的本地地址 / 通配符IP(INADDR_ANY)地址冲突

    SO_REUSEPORT允许多个套接字绑定到同一地址端口组合(所有套接字必须都设置了SO_REUSEPORT)

    TCP_NODELAY 关闭Nagle缓存算法,允许小包的发送(适合延时敏感且数据量较小的情况)

    man socket

    How do SO_REUSEADDR and SO_REUSEPORT differ?

Ip4Addr类则是对sockaddr_in进行了一些简单的封装,提供一些常用的函数(format/getter/validate)

Buffer类是连续的动态伸缩的内存空间,用于缓存

内存空间大小由私有成员变量

char *buf_内存地址起始地址 / size_t b_正在使用内存空间(缓存数据但未使用)的起始地址相对buf的偏移量 / size_t e_正在使用内存空间的结束地址 + 1数据单元地址相对buf的偏移量 / size_t cap_已分配空间大小 / size_t exp_建议/期望分配大小

  • 添加数据:先调用allocRoom(len)分配足够大小空间,然后memcpy()拷贝数据

  • 分配空间:调用makeRoom(len)获得所分配空间的首地址

    在分配过程中,如果内存空间足够(e_ + len <= cap_),无需操作

    如果内存尾部空间不足

    1. 判断当前所需要使用的内存是否小于已分配的一半(size() + len < cap_/2),是则将有效数据移动到起始位置buf_
    2. 调用expand(len)重新分配一块内存,内存大小为max(exp_, max(2 * cap_, size() + len))
  • 获取数据:consume(size_t len)直接b_ += len,所以使用时需保证len的大小正确

codec.h

TCP的粘包/拆包问题:TCP是字节流协议,消息之间没有边界

一般的处理:

  • 发送定长包。如果每个消息的大小都是一样的,那么在接收对等方只要累计接收数据,直到数据等于一个定长的数值就将它作为一个消息。
  • 包尾加上\r\n标记。FTP协议正是这么做的。但问题在于如果数据正文中也含有\r\n,则会误判为消息的边界。
  • 包头加上包体长度。包头是定长的4个字节,说明了包体的长度。接收对等方先接收包体长度,依据包体长度来接收包体。
  • 使用更加复杂的应用层协议

CodecBase类 就是用来处理这些问题,其派生类LineCodec采用第二种方案,LengthCodec采用第三种方案

LengthCodec类的具体编码:“mBdt”(四个字节)+ 消息长度(四个字节)+消息

threads.h

SafeQueue类

要做到线程安全,我们需要利用锁。一般在封装线程安全的类时,我们会直接声明一个mutex成员变量,但是该类是通过私有继承mutex类实现,所以上锁的时候直接传入(*this):lock_guard<mutex> lk(*this)

push()任务时直接上锁,然后将任务添加到链表中,调用notify_one()唤醒一个等待条件变量ready_的线程

wait_ready()使用条件变量阻塞线程,直到 超时 / 队列中不为空(有任务) / 退出

pop_wait()调用wait_ready(),返回空任务(退出/超时)或者队列头任务(队列不为空)

ThreadPool类

构造一个最大等待任务数量为taskCapacity,线程数量为thread的线程池,当线程启动时,遍历线程集合,对于每一个线程,创建一个新的线程执行pop_wait()获取任务并执行,然后交换,即将新线程替换旧线程

logging.h

定义了8个日志等级

1
enum LogLevel { LFATAL = 0, LERROR, LUERR, LWARN, LINFO, LDEBUG, LTRACE, LALL };

Logger类是通过getLogger()获取一个静态单例(c++11保证局部静态变量的初始化是线程安全的),功能

  1. 可以绝对setLogLevel() / 相对adjustLogLevel()设置 / 获取getLogLevel()当前的日志输出等级

  2. 自定义设置日志文件,打开方式

    • O_APPEND将写入追加到文件的尾端
    • O_CREAT不存在则自动创建,权限为#define DEFFILEMODE (S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH) 即 User/Group/Others可读可写
    • O_WRONLY只写
    • O_CLOEXEC设置文件描述符close-on-exec,即当fork()的子进程成功调用exec()类函数时,会自动关闭该文件描述符,避免泄露给子进程

    修改成员函数的时候使用的是dup2(),避免close()+dup()两个函数会受到竞争条件的影响:比如在两个函数之间,成员函数被赋值为另一个fd

  3. 设置日志周期rotateInterval_

    • 如果当前时间与当前日志文件创建时间在同一周期内,无需新建日志文件
    • 否则,保存并重命名为 当前年月日时分
  4. 写入日志logv(int level, const char *file, int line, const char *func, const char *fmt...)

    • 如果写入日志等级高于当前日志等级,退出
    • 检查是否开启新的日志周期(重命名)
    • 写入详细的时间 + tid + 发生的文件/位置 + 事件内容 + ‘\n’

status.h

表示文件操作后的状态

表示格式:

state_[0…3] == length of state_

state_[4…7] == code

state_[8…] == message

file.h

file类中包含了一些文件io的静态成员函数,返回Status

  • getContent(const string &filename, string &cont):将文件内容读(append)到字符串中

  • writeContent(const string &filename, const string &cont):截断并从头往文件写入字符串中内容

  • createDir(const string &name)/ deleteDir(const string &name):创建/删除文件目录

  • getChildren(const string &dir, vector<string> *result):通过readdir()获取遍历目录文件信息,然后它们的文件名 push_back(d_name)*result

  • getFileSize((const string &fname, uint64_t *size)):通过调用stat(const char *path, struct stat *buf) 获取文件大小,并将*size赋值为文件大小buf.st_size

  • deleteFile(const string &fname):通过调用unlink(const char *pathname)删除文件

    unlink() 从文件系统删除一个名字.

    • 如果这个名字是指向文件的最后一个链接并且该文件当前没有被某进程打开,则删除该文件
    • 如果这个名字是指向文件的最后一个链接并且该文件正在被某进程打开,则等待指向该文件的最后一个文件描述符被关闭时删除该文件
    • 如果这个名字指向一个软链接,则删除该软连接
    • 如果名称指向一个socket、fifo或设备,则删除其名称,但打开该对象的进程可以继续使用它。
  • 调用writeContent(tmpName,cont)tmpName文件写入内容成功后,调用renameFile(tmpName,name)重命名

handy_imp.h

AutoContext类通过模板类实现对传入类的内存管理,自动(第一次访问时)new和(析构时)delete

poller.h

结合event_base一起看

基于PollerBase抽象类,根据Linux和MacOS不同平台派生类,实现了对epoll的封装

epoll

epoll把用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无需像 selectpoll 那样每次调用都要重复传入文件描述符集或事件集。但 epoll 需要使用一个额外的文件描述符来唯一标识内核中的这个事件表

这里我们先来看看epoll的几个函数

  • int epoll_create(int size):用来创建上述的文件描述符。size参数现在不起作用,只是给内核一个提示,告诉它事件表需要多大

  • int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

    fd 是要操作的文件描述符,op 参数则指定操作类型,有如下3种:

    • EPOLL_CTL_ADD:往事件表种注册 fd 上的事件
    • EPOLL_CTL_MOD:修改 fd 上的注册事件
    • EPOLL_CTL_DEL:删除 fd 上的注册事件

    event 参数指定事件,epoll_event 结构体定义了:events 成员描述事件类型,data 成员存储用户数据

  • int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout)

    epoll_wait()在一段超时时间内等待一组文件描述符上的事件,timeout 指定超时值,单位是ms,maxevents 指定最多监听多少个事件

    epoll_wait() 如果检测到事件,就将所有就绪的事件从内核事件表中复制它的第二个参数events指向的数组中

Level Trigger

epoll_wait() 检测到有事件发生并将其通知应用程序后,应用程序可以不立即处理该事件。当应用程序下一次调用 epoll_wait() 时,此事件仍然会被通知,直到该事件被处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void lt(epoll_event* events, int number, int epollfd, int listenfd){
char buf[BUFFER_SIZE];
for(int i = 0; i < number; i++){
int sockfd = events[i].data.fd;
if(sockfd == listenfd) { /*Regist*/ }
else if (events[i].events & EPOLLIN){
int ret = recv( sockfd, buf, BUFFER_SIZE - 1, 0);
if(ret <= 0){
close(sockfd);
continue;
}
//data processing
}
else {}
}
}

Edge Trigger

epoll_wait() 检测到有事件发生并将其通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait() 不再向应用程序通知该事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void et(epoll_event* events, int number, int epollfd, int listenfd){
char buf[BUFFER_SIZE];
for(int i = 0; i < number; i++){
int sockfd = events[i].data.fd;
if(sockfd == listenfd){ /*Regist*/ }
else if(events[i].events & EPOLLIN){
while(1){
int ret = recv( sockfd, buf, BUFFER_SIZE - 1, 0);
if(ret < 0){
if( (errno == EAGAIN ) || (errno == EWOULDBLOCK))break;
close(sockfd);
break;
}
else if(ret == 0)close(sockfd);
else { /*data processing*/ }
}
}
else{}
}
}

event_base.h

事件的到来是随机的、异步的。我们无法预知程序何时收到一个客户连接请求 / 暂停信号。所以程序需要循环等待并处理事件,这就是事件循环。在事件循环中,等待事件一般使用I/O复用技术实现。

将系统支持的各种I/O复用系统调用封装成统一的接口,称为事件多路分发器,一般包含三个函数:等待事件(select/poll/epoll_wait),添加事件,删除事件。

事件处理器执行事件对应的业务逻辑,通常包含一个或多个回调函数,这些回调函数在事件循环中被执行。

TimerRepeatable 类是一个带有编号,回调函数的定时器,通过其中的 at 过期时间,interval定时周期控制

事件循环EventBase::loop() -> EventsImp::loop() -> poller::loop_once()

事件分发器EventBase::safeCall() -> EventsImp::safeCall() ->

EventsBase类是对 EventsImp 类再进一层封装,简化操作,方便多线程事件派发器MultiBase的实现

EventsImp 类成员

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
EventBase *base_;		//所属的事件分发器
PollerBase *poller_;
std::atomic<bool> exit_; //设置退出
int wakeupFds_[2]; //用于唤醒IO线程
int nextTimeout_;
SafeQueue<Task> tasks_;

std::map<TimerId, TimerRepeatable> timerReps_; //循环定时任务
std::map<TimerId, Task> timers_; //定时任务
std::atomic<int64_t> timerSeq_; //给每个定时器编号
// 记录每个idle时间(单位秒)下所有的连接。链表中的所有连接,最新的插入到链表末尾。连接若有活动,会把连接从链表中移到链表尾部,做法参考memcache
std::map<int, std::list<IdleNode>> idleConns_; //空闲连接
std::set<TcpConnPtr> reconnectConns_; //重连连接
bool idleEnabled; //是否正在idle watcher模式

//调用poller::loop_once(),然后调用handleTimeouts()处理超时任务
void loop_once(int waitMs);
//循环调用loop_once(10000)直至设置退出,然后清空所有定时任务,清空连接,loop_once(0)
void loop();
//创建管道,然后向poller注册它的可读事件,用于唤醒IO线程
void init();

//如果连接空闲时间超过idle,重置该连接的空闲时间,将其从链表头部放到尾部,执行空闲超时回调
void callIdles();
//启用idle watcher模式,将注册事件添加到对应idle秒的链表末尾
IdleId registerIdle(int idle, const TcpConnPtr &con, const TcpCallBack &cb);
//取消id对应的空闲连接监控事件
void unregisterIdle(const IdleId &id);
//将id从所在的监控链表放到末尾
void updateIdle(const IdleId &id);

//将超时的任务移除并执行,然后调用refreshNearest()
void handleTimeouts();
//检查距离下一次超时的时间
void refreshNearest(const TimerId *tid = NULL);
//对于需要重复执行的超时任务,执行之前原子更新其定时器,回调函数为重新调用repeatableTimeout()
void repeatableTimeout(TimerRepeatable *tr);
//添加定时任务
//对于循环定时任务TimerId.first = -milli,放进 timers_ 和 timerReps_ 里
//对于一次定时任务TimerId.first = milli,放进 timers_ 里
TimerId runAt(int64_t milli, Task &&task, int64_t interval);
//取消定时任务
//与runAt()规则一样,将其从 timers_ / timerReps_ 里删除
bool cancel(TimerId timerid);

EventBase 类只有一个事件分发器,只在主线程上运行,所有的事件处理也是在主线程上

EventBase 类只有一个事件分发器,只在主线程上运行,所有的事件处理也是在主线程上

MultiBase类拥有多个事件分发器EventBase ,在执行loop()的时候,为第0 ~ n-1个事件分发器创建一个线程执行EventBase::loop(),第n个事件分发器(主Reactor,需处理accept)则在主线程上执行,所有的事件处理在各自的线程上

conn.h

分别使用TcpConn记录访问的tcp连接,TcpServer(包含事件分发器)处理具体的事件

enable_shared_from_this<>shared_from_this()

enable_shared_from_this派生的对象可以使用成员函数中的 shared_from_this 方法创建一个 shared_ptr 所有者,与现有 shared_ptr 所有者共享实例所有权。
否则,如果通过 this 创建新的 shared_ptr ,则它与现有的 shared_ptr 所有者不同,这可能导致无效引用或导致该对象被删除多次。

先来看看TcpConn类的一些成员函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//用Channel封装套接字 并 设置回调函数
void TcpConn::attach(EventBase *base, int fd, Ip4Addr local, Ip4Addr peer);
//建立非阻塞socket的tcp连接
void TcpConn::connect(EventBase *base, const string &host, unsigned short port, int timeout, const string &localip);
//将连接放到EventsImp的等待重连队列中,超时之后从队列中取出,重新connnect()
void TcpConn::reconnect();
//设置重连时间间隔,-1: 不重连,0:立即重连,其它:等待毫秒数,未设置不重连
void setReconnectInterval(int milli);
//通过EventBase::safeCall()关闭channel_
void TcpConn::close();

//表示当前连接失败或关闭
//(如果设置)重连 / 移出空闲监控 -> 清除回调函数 -> delete channel
void TcpConn::cleanup(const TcpConnPtr &con);

//判断当前状态是否正确(connected)
//然后调用read()/write()
//根据返回的错误码执行相应的操作
void TcpConn::handleRead(const TcpConnPtr &con);
void TcpConn::handleWrite(const TcpConnPtr &con);

//当接收到消息且处于handshaking状态就需要调用该函数,使连接监听读/写->只监听读
int TcpConn::handleHandshake(const TcpConnPtr &con)

//调用wirte()往socket写入数据,对不同的errno进行处理
//EINTR -> 重新write()
//EAGAIN or EWOULDBLOCK -> 不继续发送,启用写事件监听,HandleWrite()处理
ssize_t TcpConn::isend(const char *buf, size_t len);
//根据参数重载,实质调用isend()发送,如果未发送完成,添加到输出缓冲区,通过handleWrite()发送
void TcpConn::send();

//两种模式: onRead()和onMsg()只能选择其一
//onMsg()也是通过调用onRead()实现,只是需要先decode
void TcpConn::onMsg(CodecBase *codec, const MsgCallBack &cb);
//先encode再send()
void TcpConn::sendMsg(Slice msg);

//连接空闲idle秒后执行回调cb
void TcpConn::addIdleCB(int idle, const TcpCallBack &cb);

输入/输出缓冲区的设置:

输入/输出缓冲区的设置:

为什么 non-blocking 网络编程中应用层 buffer 是必须的?—— 陈硕的Blog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
struct TcpServer : private noncopyable {
TcpServer(EventBases *bases);
// return 0 on sucess, errno on error
//创建socket,为socket设置SO_REUSEADDR/SO_REUSEPORT/FD_CLOEXEC
//通过调用::bind绑定ip和端口
//lsiten()设置该套接字等待处理的队列长度为20
//将套接字封装为listen_channel_,设置套接字接收数据时的执行的函数
//poller::loop_once()->Channel::handleRead()->TcpServer::handleAccept()
int bind(const std::string &host, unsigned short port, bool reusePort = false);
//创建以bases为事件分发器的tcpserver,调用TcpServer::bind()
static TcpServerPtr startServer(EventBases *bases, const std::string &host, unsigned short port, bool reusePort = false);
~TcpServer() { delete listen_channel_; }
Ip4Addr getAddr() { return addr_; }
EventBase *getBase() { return base_; }
//添加tcp连接时,为新的tcp连接所设置的回调函数
void onConnCreate(const std::function<TcpConnPtr()> &cb) { createcb_ = cb; }
void onConnState(const TcpCallBack &cb) { statecb_ = cb; }
void onConnRead(const TcpCallBack &cb) {
readcb_ = cb;
assert(!msgcb_);
}
// 消息处理与Read回调冲突,只能调用一个
void onConnMsg(CodecBase *codec, const MsgCallBack &cb) {
codec_.reset(codec);
msgcb_ = cb;
assert(!readcb_);
}

private:
EventBase *base_;//当前服务器所使用的事件分发器
EventBases *bases_;//负责为新连接分配事件分发器
Ip4Addr addr_;
Channel *listen_channel_;//listen的套接字的Channel封装
//创建的tcp新连接的回调函数
TcpCallBack statecb_, readcb_;
MsgCallBack msgcb_;
std::function<TcpConnPtr()> createcb_;
std::unique_ptr<CodecBase> codec_;
//循环调用accept()当前等待处理的队列
//如果当前连接和分配给新连接的事件分发器一致,则直接创建新连接
//如果不一致,则通过safeCall调用创建新连接
void handleAccept();
};

还有一个半同步半异步服务器HSHA,利用线程池,为每一个消息处理都分配一个worker线程,即同步处理请求,异步管理IO

udp.h

参考conn.h

http.h

HTTP消息是服务器和客户端之间交换数据的方式。有两种类型的消息:客户端发送的用于触发服务器上某个操作的请求和来自服务器的响应

HTTP-RequestHTTP-Response

这里对这两种消息类型进行了抽象HttpMsg和实现HttpRequest / HttpResponse

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
struct HttpMsg {
enum Result {Error, Complete, NotComplete, Continue100,};
HttpMsg() { HttpMsg::clear(); };
//内容添加到buf,返回写入的字节数
virtual int encode(Buffer &buf) = 0;
//尝试从buf中解析,默认复制body内容
virtual Result tryDecode(Slice buf, bool copyBody = true) = 0;
//清空消息相关的字段
virtual void clear();

std::string getHeader(const std::string &n) { return getValueFromMap_(headers, n); }
Slice getBody() { return body2.size() ? body2 : (Slice) body; }

//如果tryDecode返回Complete,则返回已解析的字节数
int getByte() { return scanned_; }

//保存Status line/Headers/Body
std::map<std::string, std::string> headers;
std::string version, body;
// body可能较大,为了避免数据复制,加入body2
Slice body2;

protected:
bool complete_;//是否完整读完一个消息
size_t contentLen_;//消息主体长度
size_t scanned_;//已经读过的长度
//line1存储第一行
//以第一个\r\n\r\n作为header和body的分割,如未找到,则header未读完,返回NotComplete
//循环读入每一行的,以第一个词(保证跟着":")为key, 后面为value保存到headers中
//最后根据headers中的"content-length"判断是否读完整个Http Message,没有则返回Continue100
//如果选择copyBody,则将消息主体拷贝到body中,否则仅仅将指针存放在body2中,返回Complete
Result tryDecode_(Slice buf, bool copyBody, Slice *line1);
std::string getValueFromMap_(std::map<std::string, std::string> &m, const std::string &n);
};

HTTP Request Target Request Target / HTTP Message - MDN web docs

HTTP Request Target Request Target / HTTP Message - MDN web docs

  • origin-form

    absolute-path [ “?” query ]

    the most common form, and it is used with GET, POST, HEAD, and OPTIONS methods.

    e.g. GET /background.png HTTP/1.0 HEAD /test.html?query=alibaba HTTP/1.1

  • absolute-form

    absolute-URI

    e.g. GET http://developer.mozilla.org/en-US/docs/Web/HTTP/Messages HTTP/1.1

  • authority-form

    The authority component of a URL, consisting of the domain name and optionally the port (prefixed by a ‘:’), is called the authority form. It is only used with CONNECT when setting up an HTTP tunnel.

    e.g. CONNECT developer.mozilla.org:80 HTTP/1.1

  • asterisk-form

    Used with OPTIONS, representing the server as a whole.

    e.g. OPTIONS * HTTP/1.1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct HttpRequest : public HttpMsg {
HttpRequest() { clear(); }
std::map<std::string, std::string> args;
//保存HTTP method(GET/PUT/POST/HEAD...)
//query_uri保存完整的request target
//而uri则保存其中的绝对路径,在请求中与查询字符串以?分隔
std::string method, uri, query_uri;
std::string getArg(const std::string &n) { return getValueFromMap_(args, n); }

// override
// 按格式将Start line/headers(手动添加content-length和长连接)/body添加到buf中
virtual int encode(Buffer &buf);
//只支持origin-form的解析
//将query_uri中"?"后面的查询字符串,通过查找"&"和"=",提取query参数到args中
virtual Result tryDecode(Slice buf, bool copyBody = true);
virtual void clear() {
HttpMsg::clear();
args.clear();
method = "GET";
query_uri = uri = "";
}
};

HttpResponse类和HttpRequest只有以下区别(不考虑HTTP1.1和2的区别(chunked)):

  • HTTP response的起始行叫做status line,包含协议版本/状态码/状态的简单描述

    e.g. HTTP/1.1 404 Not Found

  • 与Request Header相对应的是Response Header,提供有关服务器的其他信息

    e.g. Vary / Accept-Ranges

所以解码只需通过HttpMsg继承的tryDecode_(Slice buf, bool copyBody, Slice *line1)完成,然后单独处理line1

HttpConnPtr类是本质上是一条Tcp连接,进一步封装主要是加入了HttpRequest,HttpResponse的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
struct HttpConnPtr {
TcpConnPtr tcp;
HttpConnPtr(const TcpConnPtr &con) : tcp(con) {}
//允许将HttpConnPtr转换成TcpConnPtr,就是获得其中的tcp
operator TcpConnPtr() const { return tcp; }
//重载->,返回shared_ptr<TcpCoon>中存储的指针
TcpConn *operator->() const { return tcp.get(); }
bool operator<(const HttpConnPtr &con) const { return tcp < con.tcp; }

typedef std::function<void(const HttpConnPtr &)> HttpCallBack;
//利用AutoContext类实现HttpContext的内存管理 并 提供访问
HttpRequest &getRequest() const { return tcp->internalCtx_.context<HttpContext>().req; }
HttpResponse &getResponse() const { return tcp->internalCtx_.context<HttpContext>().resp; }
void sendRequest() const { sendRequest(getRequest()); }
void sendResponse() const { sendResponse(getResponse()); }
//将http request编码后放到tcp的output缓冲区进行发送
void sendRequest(HttpRequest &req) const {}
//将http response编码后放到tcp的output缓冲区进行发送
void sendResponse(HttpResponse &resp) const {}
//文件作为Response
//先读取文件内容,然后将内容指针存到getResponse()的body2,sendResponse()
void sendFile(const std::string &filename) const;
//客户端:将tcp的输入缓冲区中已解析为HttpResponse的数据移除,然后清空getResponse()
//服务端:将tcp的输入缓冲区中已解析为HttpRequest的数据移除,然后清空getRequest()
void clearData() const;
//设置tcp数据到达时的回调函数:
//为tcp生成一个HttpConnPtr对象从而使用其中的handleRead()
//将tcp缓冲区中解析到getRequest()/getResponse(),并执行回调函数cb()
void onHttpMsg(const HttpCallBack &cb) const;
//当数据到达
//poller::loop_once()->Channel::handleRead()->TcpConn::handleRead()
//->TcpConn::readcb_()

protected:
struct HttpContext {
HttpRequest req;
HttpResponse resp;
};
//如果是服务器,从tcp的输入缓冲区解析到getRequest()
//解析出错,则关闭tcp连接,解析未完成则请求客户端继续发送,解析完成执行回调函数
//如果是客户端,从tcp的输入缓冲区解析到getResponse()
//解析出错,则关闭tcp连接,解析完成执行回调函数
void handleRead(const HttpCallBack &cb) const;
void logOutput(const char *title) const;
};
struct HttpServer : public TcpServer {
//TcpServer(bases)
//设置当前tcp连接accept其他connect请求时,新的tcp的连接方式:
//创建一条新的tcp连接,用它生成一个HttpConnPtr,设置该HttpConnPtr的onHttpMsg():
//当HttpConnPtr所拥有的tcp数据到来时,解析为httpRequest,并执行对应的cbs_,再执行defcb_
HttpServer(EventBases *bases);
template <class Conn = TcpConn>
void setConnType() {
conncb_ = [] { return TcpConnPtr(new Conn); };
}
//设置当接收到GET uri的请求时,需要执行的函数
void onGet(const std::string &uri, const HttpCallBack &cb) { cbs_["GET"][uri] = cb; }
//设置当接收到何种方法,何种uri的请求时需要执行的函数
void onRequest(const std::string &method, const std::string &uri, const HttpCallBack &cb) { cbs_[method][uri] = cb; }
//设置接收并以对应的方式处理完之后还需要执行的函数
//默认是发送404 Not Found的Http Response
void onDefault(const HttpCallBack &cb) { defcb_ = cb; }
private:
HttpCallBack defcb_; //处理完请求之后的回调函数
std::function<TcpConnPtr()> conncb_;
std::map<std::string, std::map<std::string, HttpCallBack>> cbs_;
//保存[method][uri]对应请求所需执行的函数
};

conf.h

Conf类是对INI配置文件读写的封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 0 success
// -1 IOERROR
// >0 line no of error
int parse(const std::string &filename);

// Get a string value from INI file
std::string get(std::string section, std::string name, std::string default_value);
// Get an integer (long) value from INI file
long getInteger(std::string section, std::string name, long default_value);
// Get a real (floating point double) value from INI file
double getReal(std::string section, std::string name, double default_value);
// Get a boolean value from INI file
bool getBoolean(std::string section, std::string name, bool default_value);
// Get a string value from INI file
std::list<std::string> getStrings(std::string section, std::string name);

std::map<std::string, std::list<std::string>> values_;
std::string filename;

daemon.h

参考Unix Daemon Server Programming

创建守护进程一般有以下几步

  1. fork()父进程,让父进程退出,则孤儿进程会成为 init 的子进程

  2. 调用setsid()创建新会话,当前进程成为 新会话&新进程组 的领导进程,没有控制tty

    在Unix系统中,进程在进程组内运行,因此在一个进程组中的所有进程被视为单个实体,子进程也会继承父进程的进程组和会话。而服务器不应从启动它的进程中接收信号,因此它必须将自己与控制tty分离,所以我们需要通过setsid()进一步独立进程

  3. 大多数服务器以超级用户身份运行,出于安全原因,它们应保护自己创建的文件。umask() 设置用户掩码将防止在创建文件时可能出现的不安全文件特权。

  4. 服务器应在已知目录中运行 chdir("/servers/")

  5. 大多数服务一次只需要运行一个服务器副本。因此我们通过文件锁定loackf来保持单例。服务器的第一个实例将锁定文件,以便其他实例了解实例已在运行。 如果服务器终止,锁将自动释放,以便新实例可以运行。

  6. 进程可能会从用户或进程接收信号,因此最好抓住这些信号并相应地执行操作。 子进程终止时会发送SIGCHLD信号,服务器进程必须忽略或处理这些信号。

  7. 日志消息的处理 -> 标准IO / 写入文件 / 系统日志守护进程syslogd

stat-svr.h

HttpServer添加query 参数stat,方便外部的工具查看应用程序的状态

总结

EventBases负责线程与EventBase(持有poller)之间的联系,底层的poller(使用epoll水平触发) 检测到POLLIN / POLLOUT读写事件时,调用事件对应 ChannelhandleRead() / handleWrite() 执行 I/O,具体的操作取决于上层的协议,如TcpCoon 所持有的套接字的Channel的就会调用TcpConn::handleRead(),然后一次性把 socket 里的数据读完(从操作系统 buffer 搬到应用层 buffer),否则会反复触发 POLLIN 事件,造成 busy-loop 。对于数据不完整的情况,则数据依然存放在应用层buffer中直到构成完整的消息,因此每个 TcpConn 都需配置 input buffer,同理也需要output buffer,并且程序只负责调用send往应用层buffer中添加发送内容,然后由库接手,为 socket 注册 POLLOUT 事件,只要socket可写就将数据从应用层buffer搬到操作系统buffer,直到写完后停止POLLOUT

Reactor

线程模型:

每个线程最多有一个 EventBase,每个TcpConn 所在的线程由其所属的 EventBaseTcpServer通过调用allocBase()分配)决定

所有对IO和buffer的读写,都应该在IO线程中完成

单线程:同步处理请求,同步管理IO,全部在主线程完成,不需要考虑线程安全

多线程:

  • MultiBase —— 通过vector + atomic<int>实现对 EventBase 的分配(Round Robin)

    (类似于多个单线程模型,只是需要考虑线程安全)

    一个reactor对应一个EventBase。主Reactor只有一个,只负责监听新的连接,accept后将这个连接分配到子Reactor上,子Reactor可以有多个,这样可以分摊一个EventBase的压力。

    reactor-master

  • HSHA服务器(同步处理请求,异步管理IO)则主要是实现了上图下部的Thread Pool的例子,可以设置消息模式时,将每一个读写任务添加到queued tasks中即业务线程池,适用于阻塞型或者耗时型的任务,尽可能不影响每一个EventBase::loop()