概述
在写 Go 网络程序的时候,有个 TCP 连接的方法系列是我很常用的:SetDeadline
、SetReadDeadline
、SetWriteDeadline
,他们的作用就是等待 TCP 连接的读写操作,如果在预设的时间点还没有读或者写操作的话,读的方法就会直接返回,并且返回一个 os.ErrDeadlineExceeded
类型的错误,这通常用于在判定连接是否已经失联的情况。
为什么要用 Deadline
对 TCP 有一些了解的同学可能都会问,TCP 不是有 Keepalive 么,为什么还需要单独用 Connection 的 Deadline。很明显,我们知道 TCP 的 keepalive 是系统级的,它的配置路径在 /proc/sys/net/ipv4/
目录下:
[root@liqiang.io]# ls -al /proc/sys/net/ipv4/tcp_keepalive_*
-rw-r--r-- 1 root root 0 Oct 29 11:56 /proc/sys/net/ipv4/tcp_keepalive_intvl
-rw-r--r-- 1 root root 0 Oct 29 11:56 /proc/sys/net/ipv4/tcp_keepalive_probes
-rw-r--r-- 1 root root 0 Oct 29 11:56 /proc/sys/net/ipv4/tcp_keepalive_time
如果选择用 TCP 的 keepalive 配置,那么你只能通过系统的全局配置来生效,但是这在企业级网络编程中都是不太实际的,每个应用可能都有自己的独特需求,所以不能一概而论使用同一份配置。
如何使用 Deadline
一个简单的使用 Deadline 的 demo 为:
[root@liqiang.io]# cat test.go
func (c *ApplicationConnection) readLoop() {
var err error
c.conn, err = net.Dial("tcp", c.GetDialerAddr())
if err != nil {
c.logger.Error(c.ctx, "connect to %s: %v", d.GetDialerAddr(), err)
return
}
c.conn.SetReadDeadline(time.Now().Add(time.Second * 10))
for {
bytes, err := io.ReadAll(c.conn)
if err != nil {
if errors.Is(err, os.ErrDeadlineExceeded) {
c.logger.Trace(c.ctx, "Read timeout, send a heartbeat message")
c.heartbeat()
continue
}
c.logger.Error(c.ctx, "Failed to copy data from listener to Dialer: %v", err)
return
}
c.conn.SetReadDeadline(time.Now().Add(time.Second * 10))
c.process(bytes)
}
}
这里我们可以设置一个等待时间,如果在预期时间内连接没有数据可以读取,那么就会返回一个错误,我们即可根据需要对连接进行处理;如果在这期间有数据可以读取,那么我们需要注意要重置一下 Deadline 的值,不然,这个值还是会有效的。
如何实现 Deadline
在知道如何使用之后,接下来的问题就是 Go 又是如何实现 Connection 的 Deadline 的呢?在之前 Go 的源码分析中,我们知道了在网络上 Go 底层还是使用的 epoll,那么丢与 Deadline ,Go runtime 又是如何做到的?
我可以想象的一个解决方法是将 Connection 包装成一个 struct 结构,struct 结构里面包含原始的连接信息,deadline 信息(一个 timer),以及 timer 处理函数,然后在 epoll 中添加对应的 timer,这样,当 timer 被 trigger 的时候,就表示这期间都没有可读的事件,所以可以直接调用处理函数,从而达到 deadline 的效果。
但是,Go 如何实现的,还是要具体地看代码(这里我依照之前的 Go 源码分析逻辑,看的是 1.12 的代码):
[root@liqiang.io]# internal/poll/fd_poll_runtime.go
func (fd *FD) SetReadDeadline(t time.Time) error {
return setDeadlineImpl(fd, t, 'r')
---> internal/poll/fd_poll_runtime.go
func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
... ...
runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
---> runtime/netpoll.go
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
... ...
netpollgoready(rg, 3)
---> runtime/netpoll.go
... ...
pd.rt.f = rtf
pd.rt.when = pd.rd
pd.rt.arg = pd
pd.rt.seq = pd.rseq
addtimer(&pd.rt)
可以看到,Go 的实现大体上和我的设想差不多,只不过多了更多的条件保障之类的。
C++ 如何实现
最近在尝试 C++ 编写一些网络程序,所以自然而然我也想看下 C++ 是否支持类似的功能,但是好像没有直接支持的方式,于是我就基于 libevent 自己实现了一下,代码主要分为几部分:
- 连接封装结构
[root@liqiang.io]# cat echo_server.cpp
struct TimeoutConnection {
long int last_read_ts;
std::string name;
evutil_socket_t fd;
struct event_base *base;
struct bufferevent *bev;
struct event *timeout_event;
};
- 连接建立部分
static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sa, int socklen, void *user_data) {
std::cout << get_current_time() << " listener_cb" << std::endl;
struct event_base *base = (event_base *) user_data;
struct bufferevent *bev;
bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
if (!bev) {
std::cerr << get_current_time() << " failed to constructing bufferevent!" << std::endl;
event_base_loopbreak(base);
return;
}
struct TimeoutConnection *timeoutConn = (struct TimeoutConnection *) malloc(sizeof(struct TimeoutConnection));
timeoutConn->last_read_ts = std::time(nullptr);
timeoutConn->name = "test";
timeoutConn->fd = fd;
timeoutConn->base = base;
timeoutConn->bev = bev;
timeoutConn->timeout_event = event_new(base, -1, EV_PERSIST, timeout_cb, timeoutConn);
bufferevent_setcb(bev, conn_readcb, NULL, conn_eventcb, timeoutConn);
bufferevent_enable(bev, EV_READ);
struct timeval tv = {.tv_sec = 5, .tv_usec = 0};
auto result = event_add(timeoutConn->timeout_event, &tv);
if (result != 0) {
std::cout << "event_add failed" << std::endl;
}
}
- 数据读取部分
[root@liqiang.io]# cat echo_server.cpp
static void conn_readcb(struct bufferevent *bev, void *ptr) {
struct TimeoutConnection *timeoutConn = (struct TimeoutConnection *) ptr;
timeoutConn->last_read_ts = std::time(nullptr);
char buf[1024];
int n;
struct evbuffer *input = bufferevent_get_input(bev);
while ((n = evbuffer_remove(input, buf, sizeof(buf))) > 0) {
std::cout << get_current_time() << " connection " << timeoutConn->name << " recv: " << buf << std::endl;
bufferevent_write(bev, buf, n);
}
}
- timer 实现部分
[root@liqiang.io]# cat echo_server.cpp
static void timeout_cb(evutil_socket_t fd, short what, void *arg) {
std::cout << get_current_time() << " connection timeout invoked" << std::endl;
auto currTs = std::time(nullptr);
struct TimeoutConnection *timeoutConn = (struct TimeoutConnection *) arg;
if (currTs - timeoutConn->last_read_ts > 10) {
std::cout << get_current_time() << " connection " << timeoutConn->name << " timeout" << std::endl;
std::cout << get_current_time() << " connection " << timeoutConn->name << " last read at: " << timeoutConn->last_read_ts << std::endl;
bufferevent_free(timeoutConn->bev);
event_free(timeoutConn->timeout_event);
free(timeoutConn);
return;
}
std::cout << get_current_time() << " connection " << timeoutConn->name << " last read at: " << timeoutConn->last_read_ts << std::endl;
}
概述
到这里差不多就了解了这部分的实现了,倒是也不怎么复杂。但是,可以深究的事情也很多,例如 Go 底层的 timer 是如何实现的这些,但是这些我以前也写过类似的(Linux 实现定时器),所以就不过多深究了。