TCP存活检测机制
在阐述TCP提高检测连接存活状态之前,试想为什么在TCP协议设计以及实际开发中,为什么不提供即时的网络连接中断的通知呢?这方面主要有两个方面的原因,其一是这样会消耗掉大量的网络带宽,试想若存在着大量的不成熟的网络应用程序,网络带宽一定会消耗殆尽;其二则要从TCP设计之初说起,因为当年美国国防部设计TCP就是为了让在网络中断的情况下仍然通过其它途径维持通信的能力。目前Linux系统提供了KEEP_A
在阐述TCP提高检测连接存活状态之前,试想为什么在TCP协议设计以及实际开发中,为什么不提供即时的网络连接中断的通知呢?这方面主要有两个方面的原因,其一是这样会消耗掉大量的网络带宽,试想若存在着大量的不成熟的网络应用程序,网络带宽一定会消耗殆尽;其二则要从TCP设计之初说起,因为当年美国国防部设计TCP就是为了让在网络中断的情况下仍然通过其它途径维持通信的能力。目前Linux系统提供了KEEP_ALIVE机制去检测TCP的存活状态。
一、KEEP_ALIVE机制:
| | | |
| A | | B |
|_____| |_____|
^ ^
|--->--->--->-------------- SYN-------------->--->--->---|
|---<---<---<------------ SYN/ACK------------<---<---<---|
|--->--->--->-------------- ACK-------------->--->--->---|
| |
| systemcrash ---> X
|
| systemrestart ---> ^
| |
|--->--->--->-------------- PSH-------------->--->--->---|
|---<---<---<-------------- RST--------------<---<---<---|
| |
int keepalive = 1; // 开启KEEP_ALIVE
int keepidle = 60; // 如该连接在60秒内没有任何数据往来,则进行探测
int keepinterval = 5; //探测时发包的时间间隔为5秒(每个探测包的存活时间)
int keepcount = 3; //探测尝试的总次数.如果第1次探测包就收到响应了,则后2次的不再发.
setsockopt(rs, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepalive , sizeof(keepalive ));
setsockopt(rs, SOL_TCP, TCP_KEEPIDLE, (void*)&keepidle , sizeof(keepidle ));
setsockopt(rs, SOL_TCP, TCP_KEEPINTVL, (void *)&keepinterval , sizeof(keepinterval ));
setsockopt(rs, SOL_TCP, TCP_KEEPCNT, (void *)&keepcount , sizeof(keepcount ));
对于Linux系统,与keepidle,keepinterval以及keepcount所对应的设置值分别为:
/proc/sys/net/ipv4/tcp_keepalive_time
/proc/sys/net/ipv4/tcp_keepalive_intvl
/proc/sys/net/ipv4/tcp_keepalive_probes
二、心跳机制:
本节试图在应用程序中去实现一种简易的心跳机制以及带外数据的心跳机制。示例代码如下:
/*************************************readn.h*******************************/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdarg.h>
#include <string.h>
#include <errno.h>
#include <netdb.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#define INIT() ( program_name = \
strrchr( argv[ 0 ], '/' ) ) ? \
program_name++ : \
( program_name = argv[ 0 ] )
#define EXIT(s) exit( s )
#define CLOSE(s) if ( close( s ) ) error( 1, errno, \
"close failed" )
#define set_errno(e) errno = ( e )
#define isvalidsock(s) ( ( s ) >= 0 )
#define NLISTEN 5 /* max waiting connections */
#define TRUE 1
#define FALSE 0
#define OPTIMIZE_READLINE
typedef unsigned int u_int32_t;
typedef int SOCKET;
/*-----------------Start Heartbeat include file--------------------*/
#define MSG_TYPE1 1 /* application specific msg */
#define MSG_TYPE2 2 /* another one */
#define MSG_HEARTBEAT 3 /* heartbeat message */
typedef struct /* message structure */
{
u_int32_t type; /* MSG_TYPE1, ... */
char data[ 2000 ];
} msg_t;
#define T1 60 /* idle time before heartbeat */
#define T2 10 /* init a heartbeat and timeout to wait for response */
/*-----------------Heartbeat include file--------------------*/
SOCKET tcp_server( char *hname, char *sname ); //setup tcp server
int readvrec( SOCKET fd, char *bp, size_t len );
int readn( SOCKET fd, char *bp, size_t len);
SOCKET tcp_client( char *hname, char *sname );
SOCKET udp_server( char *, char * );
SOCKET udp_client( char *, char *, struct sockaddr_in * );
void set_address( char *hname, char *sname, struct sockaddr_in *sap, char *protocol );
#ifndef OPTIMIZE_READLINE
int readline(int fd, void *vptr, size_t maxlen);
#else
int readline( int fd, char *bufptr, size_t maxlen );
#endif
/**************************************readn.c************************************/
#include "readn.h"
//setup for a tcp server
SOCKET tcp_server( char *hname, char *sname )
{
struct sockaddr_in local;
SOCKET s;
const int on = 1;
set_address( hname, sname, &local, "tcp" );
s = socket( AF_INET, SOCK_STREAM, 0 );
if ( !isvalidsock( s ) )
{
error( 1, errno, "socket call failed" );
}
if ( setsockopt( s, SOL_SOCKET, SO_REUSEADDR, ( char * )&on, sizeof( on ) ) )
{
error( 1, errno, "setsockopt failed" );
}
if ( bind( s, ( struct sockaddr * ) &local, sizeof( local ) ) )
{
error( 1, errno, "bind failed" );
}
if ( listen( s, NLISTEN ) )
{
error( 1, errno, "listen failed" );
}
return s;
}
/* readn - read exactly n bytes */
int readn( SOCKET fd, char *bp, size_t len)
{
int cnt;
int rc;
int var;
/*dragon, setup NO BLOCK option, and maybe cause error about "Resource temporarily unavailable",
*so should exclude errno is EAGAIN
*/
// var = fcntl(fd, F_GETFL, 0);
// fcntl(fd, F_SETFL, var|O_NONBLOCK);
cnt = len;
while ( cnt > 0 )
{
//dragon, default option of recv is block
// rc = recv( fd, bp, cnt, 0 );
rc = recv( fd, bp, cnt, MSG_DONTWAIT); //NON_BLOCK operation
if ( rc < 0 ) /* read error? */
{
if ( errno == EINTR || errno == EAGAIN) /* interrupted? */
continue; /* restart the read */
return -1; /* return error */
}
if ( rc == 0 ) /* EOF? */
{
return len - cnt; /* return short count */
}
bp += rc;
cnt -= rc;
}
return len;
}
/* readvrec - read a variable record */
int readvrec( SOCKET fd, char *bp, size_t len )
{
u_int32_t reclen;
int rc = 0;
/* Retrieve the length of the record */
rc = readn( fd, ( char * )&reclen, sizeof( u_int32_t ) );
if ( rc != sizeof( u_int32_t ) )
{
return rc < 0 ? -1 : 0;
}
reclen = ntohl( reclen );
if ( reclen > len )
{
/*
* Not enough room for the record--
* discard it and return an error.
* Need to discard data in the receive buffer.
*/
//How to discard receive buffer
while ( reclen > 0 )
{
rc = readn( fd, bp, len );
if ( rc != len )
return rc < 0 ? -1 : 0;
reclen -= len;
if ( reclen < len )
len = reclen;
}
return -1;
}
/* Retrieve the record itself */
rc = readn( fd, bp, reclen );
if ( rc != reclen )
{
return rc < 0 ? -1 : 0;
}
return rc;
}
/*-------------------------Before optimize-------------------------*/
//PS: each read just only read one character, we can read more characters and store it to buffer, such as array,
// that is trading time for space.
#ifndef OPTIMIZE_READLINE
int readline(int fd, void *vptr, size_t maxlen)
{
int n, rc;
char c, *ptr;
ptr = vptr;
for (n = 1; n < maxlen; n++) {
if ( (rc = read(fd, &c, 1)) == 1) {
*ptr++ = c;
if (c == '\n')
break;
} else if (rc == 0) {
if (n == 1)
return(0); /* EOF, no data read */
else
break; /* EOF, some data was read */
} else
return(-1); /* error */
}
*ptr = 0;
return(n);
}
#else
int readline( int fd, char *bufptr, size_t len )
{
char *bufx = bufptr;
static char *bp;
static int cnt = 0;
static char b[ 1500 ];
char c;
while ( --len > 0 ) //should first plus to add an extra char buf to store ending character
{
if ( --cnt <= 0 )
{
cnt = recv( fd, b, sizeof( b ), 0 );
if ( cnt < 0 )
{
if ( errno == EINTR )
{
len++; /* the while will decrement */
continue;
}
return -1;
}
if ( cnt == 0 )
{
return 0;
}
bp = b;
}
c = *bp++;
*bufptr++ = c;
if ( c == '\n' )
{
*bufptr = '\0';
return bufptr - bufx;
}
}
set_errno( EMSGSIZE ); //return over size error
return -1;
}
#endif
/*-------------------------End optimize-------------------------*/
/* tcp_client - set up for a TCP client */
SOCKET tcp_client( char *hname, char *sname )
{
struct sockaddr_in peer;
SOCKET s;
set_address( hname, sname, &peer, "tcp" );
s = socket( AF_INET, SOCK_STREAM, 0 );
if ( !isvalidsock( s ) )
{
error( 1, errno, "socket call failed" );
}
if ( connect( s, ( struct sockaddr * )&peer, sizeof( peer ) ) )
{
error( 1, errno, "connect failed" );
}
return s;
}
/* set_address - fill in a sockaddr_in structure */
void set_address( char *hname, char *sname, struct sockaddr_in *sap, char *protocol )
{
struct servent *sp;
struct hostent *hp;
char *endptr;
short port;
bzero( sap, sizeof( *sap ) );
sap->sin_family = AF_INET;
if ( hname != NULL )
{
if ( !inet_aton( hname, &sap->sin_addr ) )
{
hp = gethostbyname( hname );
if ( hp == NULL )
error( 1, 0, "unknown host: %s\n", hname );
sap->sin_addr = *( struct in_addr * )hp->h_addr;
}
}
else
sap->sin_addr.s_addr = htonl( INADDR_ANY );
port = strtol( sname, &endptr, 0 );
if ( *endptr == '\0' )
sap->sin_port = htons( port );
else
{
sp = getservbyname( sname, protocol );
if ( sp == NULL )
error( 1, 0, "unknown service: %s\n", sname );
sap->sin_port = sp->s_port;
}
}
//setup an udp server
SOCKET udp_server( char *hname, char *sname )
{
SOCKET s;
struct sockaddr_in local;
set_address( hname, sname, &local, "udp" );
s = socket( AF_INET, SOCK_DGRAM, 0 );
if ( !isvalidsock( s ) )
{
error( 1, errno, "socket call failed" );
}
if ( bind( s, ( struct sockaddr * ) &local, sizeof( local ) ) )
{
error( 1, errno, "bind failed" );
}
return s;
}
/**************************************hb_client.c**********************************/
#include "readn.h"
static char *program_name = NULL;
static void error( int status, int err, char *fmt, ... )
{
va_list ap;
va_start( ap, fmt );
fprintf( stderr, "%s: ", program_name );
vfprintf( stderr, fmt, ap );
va_end( ap );
if ( err )
{
fprintf( stderr, ": %s (%d)\n", strerror( err ), err );
}
if ( status )
{
EXIT( status );
}
}
int main( int argc, char **argv )
{
fd_set allfd;
fd_set readfd;
msg_t msg;
struct timeval tv;
SOCKET s;
int rc;
int heartbeats = 0;
int cnt = sizeof( msg );
INIT();
s = tcp_client( argv[ 1 ], argv[ 2 ] );
FD_ZERO( &allfd );
FD_SET( s, &allfd );
tv.tv_sec = T1;
tv.tv_usec = 0;
for ( ;; )
{
readfd = allfd;
rc = select( s + 1, &readfd, NULL, NULL, &tv );
if ( rc < 0 )
{
error( 1, errno, "select failure" );
}
else if ( rc == 0 ) /* timed out */
{
if ( ++heartbeats > 3 )
{
error( 1, 0, "connection dead\n" );
}
else
{
error( 0, 0, "sending heartbeat #%d\n", heartbeats );
}
msg.type = htonl( MSG_HEARTBEAT );
rc = send( s, ( char * )&msg, sizeof( msg ), 0 );
if ( rc < 0 )
{
error( 1, errno, "send failure" );
}
tv.tv_sec = T2; //wait response timeout
continue;
}
if ( !FD_ISSET( s, &readfd ) )
{
error( 1, 0, "select returned invalid socket\n" );
}
//rc = 1
rc = recv( s, ( char * )&msg + sizeof( msg ) - cnt, cnt, 0 );
if ( rc == 0 )
{
error( 1, 0, "server terminated\n" );
}
if ( rc < 0 )
{
error( 1, errno, "recv failure" );
}
heartbeats = 0;
tv.tv_sec = T1;
cnt -= rc;
if ( cnt > 0 )
{
continue;
}
cnt = sizeof( msg ); //restore default value
/* process message */
}
}
/********************************hb_server.c***************************************/
#include "readn.h"
static char *program_name = NULL;
static void error( int status, int err, char *fmt, ... )
{
va_list ap;
va_start( ap, fmt );
fprintf( stderr, "%s: ", program_name );
vfprintf( stderr, fmt, ap );
va_end( ap );
if ( err )
fprintf( stderr, ": %s (%d)\n", strerror( err ), err );
if ( status )
EXIT( status );
}
int main( int argc, char **argv )
{
fd_set allfd;
fd_set readfd;
msg_t msg;
struct timeval tv;
SOCKET s;
SOCKET s1;
int rc;
int missed_heartbeats = 0;
int cnt = sizeof( msg );
INIT();
s = tcp_server( NULL, argv[ 1 ] );
s1 = accept( s, NULL, NULL );
if ( !isvalidsock( s1 ) )
{
error( 1, errno, "accept failed" );
}
//fill select timeout
tv.tv_sec = T1 + T2;
tv.tv_usec = 0;
FD_ZERO( &allfd );
FD_SET( s1, &allfd );
for ( ;; )
{
readfd = allfd;
rc = select( s1 + 1, &readfd, NULL, NULL, &tv );
if ( rc < 0 ) //error happens
{
error( 1, errno, "select failure" );
}
else if ( rc == 0 ) /* timed out */
{
if ( ++missed_heartbeats > 3 )
{
error( 1, 0, "connection dead\n" );
}
else
{
error( 0, 0, "missed heartbeat #%d\n", missed_heartbeats );
}
tv.tv_sec = T2;
continue;
}
if ( !FD_ISSET( s1, &readfd ) )
{
error( 1, 0, "select returned invalid socket\n" );
}
rc = recv( s1, ( char * )&msg + sizeof( msg ) - cnt, cnt, 0 );
if ( rc == 0 ) //client has performed an orderly shutdown
{
error( 1, 0, "client terminated\n" );
}
else if ( rc < 0 ) //error happens
{
error( 1, errno, "recv failure" );
}
//every time, if receive heatbeat response, then init missed_heartbeats to 0 and select timeout to T1+T2
missed_heartbeats = 0;
tv.tv_sec = T1 + T2;
cnt -= rc; /* in-line readn */
if ( cnt > 0 )
{
continue;
}
cnt = sizeof( msg );
switch ( ntohl( msg.type ) )
{
case MSG_TYPE1 :
/* process type1 message */
break;
case MSG_TYPE2 :
/* process type2 message */
break;
case MSG_HEARTBEAT :
rc = send( s1, ( char * )&msg, sizeof( msg ), 0 );
if ( rc < 0 )
{
error( 1, errno, "send failure" );
}
break;
default :
error( 1, 0, "unknown message type (%d)\n", ntohl( msg.type ) );
}
}
EXIT( 0 );
}
相应的Makefile文件如下:
all: hb_client hb_server hb_client1 hb_server1
CC = gcc
hb_client:
$(CC) hb_client.c readn.c -o hb_client
hb_server:
$(CC) hb_server.c readn.c -o hb_server
hb_client1:
$(CC) hb_client1.c readn.c -o hb_client1
hb_server1:
$(CC) hb_server1.c readn.c -o hb_server1
clean:
rm -rf hb_client hb_server hb_client1 hb_server1 *.o
上面的简易心跳机制采取了内建消息机制,实现了客户机与服务器的心跳连接检测,但是这个消息与数据域糅合在一起,不具有通用性,因此,可以基于上面的代码进行改进,就是心跳机制与数据处理分隔开,心跳机制也采用单独的连接。修改的代码如下:
/**************************************hb_client1********************************************/
#include "readn.h"
static char *program_name = NULL;
static void error( int status, int err, char *fmt, ... )
{
va_list ap;
va_start( ap, fmt );
fprintf( stderr, "%s: ", program_name );
vfprintf( stderr, fmt, ap );
va_end( ap );
if ( err )
{
fprintf( stderr, ": %s (%d)\n", strerror( err ), err );
}
if ( status )
{
EXIT( status );
}
}
int main( int argc, char **argv )
{
fd_set allfd;
fd_set readfd;
char msg[ 1024 ];
struct timeval tv;
struct sockaddr_in hblisten;
SOCKET sdata;
SOCKET shb;
SOCKET slisten;
int rc;
int hblistenlen = sizeof( hblisten );
int heartbeats = 0;
int maxfd1;
char hbmsg[ 1 ];
INIT();
slisten = tcp_server( NULL, "0" );
rc = getsockname( slisten, ( struct sockaddr * )&hblisten, &hblistenlen );
if ( rc )
{
error( 1, errno, "getsockname failure" );
}
//connect server and send heartbeat port number
sdata = tcp_client( argv[ 1 ], argv[ 2 ] );
rc = send( sdata, ( char * )&hblisten.sin_port, sizeof( hblisten.sin_port ), 0 );
if ( rc < 0 )
{
error( 1, errno, "send failure sending port" );
}
shb = accept( slisten, NULL, NULL );
if ( !isvalidsock( shb ) )
{
error( 1, errno, "accept failure" );
}
FD_ZERO( &allfd );
FD_SET( sdata, &allfd );
FD_SET( shb, &allfd );
maxfd1 = ( sdata > shb ? sdata: shb ) + 1;
tv.tv_sec = T1;
tv.tv_usec = 0;
for ( ;; )
{
readfd = allfd;
rc = select( maxfd1, &readfd, NULL, NULL, &tv );
if ( rc < 0 )
{
error( 1, errno, "select failure" );
}
if ( rc == 0 ) /* timed out */
{
if ( ++heartbeats > 3 )
{
error( 1, 0, "connection dead\n" );
}
error( 0, 0, "sending heartbeat #%d\n", heartbeats );
rc = send( shb, "", 1, 0 );
if ( rc < 0 )
{
error( 1, errno, "send failure" );
}
tv.tv_sec = T2;
continue;
}
if ( FD_ISSET( shb, &readfd ) )
{
rc = recv( shb, hbmsg, 1, 0 );
if ( rc == 0 )
{
error( 1, 0, "server terminated (shb)\n" );
}
if ( rc < 0 )
{
error( 1, errno, "bad recv on shb" );
}
}
//process data
if ( FD_ISSET( sdata, &readfd ) )
{
rc = recv( sdata, msg, sizeof( msg ), 0 );
if ( rc == 0 )
{
error( 1, 0, "server terminated (sdata)\n" );
}
if ( rc < 0 )
{
error( 1, errno, "recv failure" );
}
}
heartbeats = 0;
tv.tv_sec = T1;
}
}
/**********************************************hb_server1***********************************/
#include "readn.h"
static char *program_name = NULL;
static void error( int status, int err, char *fmt, ... )
{
va_list ap;
va_start( ap, fmt );
fprintf( stderr, "%s: ", program_name );
vfprintf( stderr, fmt, ap );
va_end( ap );
if ( err )
{
fprintf( stderr, ": %s (%d)\n", strerror( err ), err );
}
if ( status )
{
EXIT( status );
}
}
int main( int argc, char **argv )
{
fd_set allfd;
fd_set readfd;
char msg[ 1024 ];
struct sockaddr_in peer;
struct timeval tv;
SOCKET s;
SOCKET sdata;
SOCKET shb;
int rc;
int maxfd1;
int missed_heartbeats = 0;
int peerlen = sizeof( peer );
char hbmsg[ 1 ];
INIT();
//receive and parse heartbeat port number from client
s = tcp_server( NULL, argv[ 1 ] );
sdata = accept( s, ( struct sockaddr * )&peer, &peerlen );
if ( !isvalidsock( sdata ) )
{
error( 1, errno, "accept failed" );
}
rc = readn( sdata, ( char * )&peer.sin_port, sizeof( peer.sin_port ) );
if ( rc < 0 )
{
error( 1, errno, "error reading port number" );
}
shb = socket( PF_INET, SOCK_STREAM, 0 );
if ( !isvalidsock( shb ) )
{
error( 1, errno, "shb socket failure" );
}
//heartbeat client
rc = connect( shb, ( struct sockaddr * )&peer, peerlen );
if ( rc )
{
error( 1, errno, "shb connect error" );
}
tv.tv_sec = T1 + T2;
tv.tv_usec = 0;
FD_ZERO( &allfd );
FD_SET( sdata, &allfd );
FD_SET( shb, &allfd );
maxfd1 = ( sdata > shb ? sdata : shb ) + 1;
for ( ;; )
{
readfd = allfd;
rc = select( maxfd1, &readfd, NULL, NULL, &tv );
if ( rc < 0 )
{
error( 1, errno, "select failure" );
}
if ( rc == 0 ) /* timed out */
{
if ( ++missed_heartbeats > 3 )
{
error( 1, 0, "connection dead\n" );
}
error( 0, 0, "missed heartbeat #%d\n", missed_heartbeats );
tv.tv_sec = T2;
continue;
}
if ( FD_ISSET( shb, &readfd ) )
{
rc = recv( shb, hbmsg, 1, 0 );
if ( rc == 0 )
{
error( 1, 0, "client terminated\n" );
}
if ( rc < 0 )
{
error( 1, errno, "shb recv failure" );
}
rc = send( shb, hbmsg, 1, 0 );
if ( rc < 0 )
{
error( 1, errno, "shb send failure" );
}
}
if ( FD_ISSET( sdata, &readfd ) )
{
rc = recv( sdata, msg, sizeof( msg ), 0 );
if ( rc == 0 )
{
error( 1, 0, "client terminated\n" );
}
if ( rc < 0 )
{
error( 1, errno, "recv failure" );
}
}
missed_heartbeats = 0;
tv.tv_sec = T1 + T2;
}
EXIT( 0 );
}
对应的Makefile文件同上。
带外数据的心跳机制:
(未完待续)
更多推荐
所有评论(0)