/* 编译命令:gcc -Wall -g -o server server.c -lpthread */

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <errno.h>
#include <signal.h>

#define PORT 8458
#define MAX_LISTEN 20
typedef struct MsgHead{
 unsigned char type;
 unsigned char BL[4];
}MH;

pthread_mutex_t mutex;
pthread_attr_t attr;

char *get_time_string(char *sbuf)
{
    time_t Now;
    struct tm *tm;
    struct tm temp_time;
 
    time (&Now);
 
    tm = localtime_r(&Now, &temp_time);
    sprintf(sbuf, "%.4d%.2d%.2d_%.2d%.2d%.2d",
            tm->tm_year+1900, tm->tm_mon+1, tm->tm_mday,
            tm->tm_hour, tm->tm_min, tm->tm_sec);
 
    return sbuf;
}

char *GetFileName(char *filename)
{
 char strTime[20];
 char strFileName[64];
 
 memset(strTime,0x00,sizeof(strTime));
 memset(strFileName,0x00,sizeof(strFileName));

 sleep(1);

 get_time_string(strTime);

 strcpy(strFileName,"rec_");
 strcat(strFileName,strTime);
 strcat(strFileName,".dat");  
 
 strcpy(filename,strFileName);

 return filename;
}

int tcp_recv(int fd,int len, char *strRecv)
{
 char *p = strRecv;
 int iRet = -1,iRecvLen = -1,count = 0;
 fd_set rset;
 struct timeval tv;
 
 tv.tv_sec = 0;
 tv.tv_usec = 0;

 while(len > 0)
 {
  FD_ZERO(&rset);
  FD_SET(fd,&rset);
  
  iRet = select (fd+1,&rset,NULL,NULL,&tv);
  //printf("iRet = %d\n",iRet);
  if (iRet < 0)
  {
   printf("tcp select error:%s\n",strerror(errno));
   break;
  }
  else if (iRet == 0)
  {
   sleep(1);
   continue;
  }

  iRecvLen = recv(fd,p,len,0);
  
  if (iRecvLen > 0)
  { 
   len -= iRecvLen;
   p += iRecvLen;
   count += iRecvLen;
  } 
 }
 return count; 

void *recv_msg(void *clfd)
{
 MH *MsgHead = NULL;
 int iMHL,iBodyLen,iRet = -1,iType = -1,iRecCount = 0;
 char RecvBuf[1024] = {"\0"},sFileName[64] = {"\0"};
 int fd = *((int *)clfd); 
 FILE *fp = NULL; 

 printf("clfd=%d\n",fd); 
 iMHL = sizeof(MH);

 /* 打开一个文件用于存放sql 语句 */
 pthread_mutex_lock(&mutex);
 GetFileName(sFileName);
 pthread_mutex_unlock(&mutex);
 
 printf("Filename:%s\n",sFileName);
 
 if ( 0 > strlen(sFileName))
 {
  printf("get filename failed\n");
  return NULL;
 }
    
 fp = fopen(sFileName,"w+");
 if (!fp)
 {
  printf("open file %s failed: %s\n",sFileName,strerror(errno));
  return NULL;
 } 

 fprintf(fp,"%s\n","set termout on");
 fprintf(fp,"%s\n","set feed off");
 
 while(1)
 {
  memset(RecvBuf,0X00,sizeof(RecvBuf));
  //Get message head
  iRet = tcp_recv(fd,iMHL,RecvBuf);
  if (iRet != iMHL)
  {
   printf("get msg head failed !\n");
   continue;
  }

  iBodyLen = 0;
  MsgHead = (MH *)RecvBuf;
  iType = (int)MsgHead->type;
  //printf("type :%d\n",iType);
  if (2 == iType)
  {
   printf("\nServer tell:client %d exit!\n\n",fd);
   //sleep(10);
   fprintf(fp,"%s\n","commit;");
   fclose(fp);
   close(fd);
   return 0;
  }

  iBodyLen += MsgHead->BL[0] * 256 * 256 * 256;
  iBodyLen += MsgHead->BL[1] * 256 * 256;
  iBodyLen += MsgHead->BL[2] * 256;
  iBodyLen += MsgHead->BL[3];
  //printf("BodyLen=%d\n",iBodyLen);
  
  memset(RecvBuf,0X00,sizeof(RecvBuf));
  iRet = tcp_recv(fd,iBodyLen,RecvBuf);
  if (iRet == iBodyLen)
  {
   RecvBuf[iBodyLen] = '\0';
   if (1000 > iRecCount)
   {
    fprintf(fp,"%s\n",RecvBuf);
    iRecCount++;
   }
   else
   {
    /* 本文件已写满1000 条记录 */
    fprintf(fp,"%s\n","commit;");
    fclose(fp);

    /* 重新打开一个文件写记录 */
    iRecCount = 0;
    memset(sFileName,0x00,sizeof(sFileName));
    
    pthread_mutex_lock(&mutex);
    GetFileName(sFileName);
    pthread_mutex_unlock(&mutex);
    printf("Filename:%s\n",sFileName);
 
    if ( 0 > strlen(sFileName))
    {
     printf("get filename failed\n");
     return NULL;
    }
    fp = fopen(sFileName,"w+");
    fprintf(fp,"%s\n","set termout on");
    fprintf(fp,"%s\n","set feed off");

    fprintf(fp,"%s\n",RecvBuf);
    iRecCount++;
   }
   
  }
 } 
 return 0;
}

void sig_handler(int signo)
{
 switch (signo) 
 {
  case SIGTERM:
  case SIGINT:
   printf("received SIGTERM or SIGINT(tid:%u)", (unsigned int)pthread_self());
   break;
  
  case SIGHUP:
   printf("received SIGHUP(tid:%u)", (unsigned int)pthread_self());
   break;
  
  case SIGUSR1:
   printf("received SIGUSR1(tid:%u)", (unsigned int)pthread_self());
   break;

 
  case SIGPIPE:
   printf("received SIGPIPE(tid:%u)", (unsigned int)pthread_self());
   break;
  
  default:
   printf("received signal %d(tid:%u)", signo, (unsigned int)pthread_self());
   break;
 }
}

int setup_sig_handlers(void)
{
 struct sigaction act;
 
 act.sa_handler = sig_handler;
 sigemptyset(&act.sa_mask);
 act.sa_flags = 0;
 sigaction(SIGTERM,&act,NULL);
 sigaction(SIGINT, &act, NULL);
 sigaction(SIGHUP, &act, NULL);
 sigaction(SIGUSR1,&act,NULL);
 sigaction(SIGUSR2,&act,NULL);
 sigaction(SIGPIPE, &act, NULL);

 return 0;
}

int main()
{
 struct sockaddr_in SerAddr,CliAddr;
 int serfd = -1,clfd = -1,iSize = -1,count = 0;
 int bufSize = 4 * 1024 * 1024;
 pthread_t tid[256];
 
 pthread_attr_init(&attr);
 pthread_mutex_init(&mutex,NULL);
 
 pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
 
 setup_sig_handlers();
 
 iSize = sizeof(SerAddr);
 serfd = socket(PF_INET,SOCK_STREAM,0);
 if (0 > serfd)
 {
  printf("init socket failed:%d\n",serfd);
  return 0;
 }


 memset(&(SerAddr),0x00,sizeof(struct sockaddr_in));
 memset(&(CliAddr),0x00,sizeof(struct sockaddr_in));

 SerAddr.sin_family = AF_INET;
 SerAddr.sin_port = htons(PORT);
 SerAddr.sin_addr.s_addr = htonl(INADDR_ANY);

 if ( 0  > bind(serfd,(struct sockaddr *)&SerAddr,iSize))
 {
  printf("server bind  to port %d failed \n",PORT);
  return 0;
 }
 
 if ( 0 > setsockopt(serfd,SOL_SOCKET,SO_RCVBUF,&bufSize,sizeof(bufSize)))
 {
  printf("setsockopt error:%s",strerror(errno));
  return -1;
 } 

 if (0 > listen(serfd,MAX_LISTEN))
 {
  printf("listen failed\n");
  return 0;
 }
 
 while(1)
 {
  clfd = accept(serfd,(struct sockaddr *)&CliAddr,&iSize);
  if (0 > clfd)
  {
   printf("accept failed %s",strerror(errno));
   sleep(1);
   continue;
  }
  
  if (count < 256)
  { 
   pthread_create(&tid[count++],&attr,recv_msg,(void *)&clfd);
  }
  else
  {
   printf("\n\ntoo many threads \n\n");
   return - 1;
  }   
 }
 pthread_mutex_destroy(&mutex);
 pthread_attr_destroy(&attr);
 return 0;
}
 
/* 客户端 */

/* 编译命令  gcc -Wall -g -o client client.c */

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <errno.h>
#include <ctype.h>

typedef struct MsgHead{
 unsigned char type;
        unsigned char BL[4];
}MH;

int con_serv()
{
 struct sockaddr_in SerAddr;
 int tcpfd = -1,iPort = 8458,iCount = 0;
 char sIP[64] = {"\0"};
 int sndBufSize = 4 * 1024 * 1024;

 tcpfd = socket(AF_INET,SOCK_STREAM,0);
 
 if(tcpfd < 0)
 {
  printf("socket error:%s\n",strerror(errno));
  return -1;
 }
 
 if ( 0 > setsockopt(tcpfd, SOL_SOCKET, SO_SNDBUF, (char *)&sndBufSize, sizeof(sndBufSize)))
    {
            printf("make_tcp(): setsockopt failed for server address\n");
            return -1;
    } 
 
 memset(&(SerAddr),0x00,sizeof(SerAddr));
 memset(sIP,0x00,sizeof(sIP));
 strcpy(sIP,"127.0.0.1");

 SerAddr.sin_family = AF_INET;
 SerAddr.sin_port = htons(iPort);
 SerAddr.sin_addr.s_addr = inet_addr(sIP);

 for (iCount = 0; iCount < 3; iCount++)
 {
  if (0 > connect(tcpfd,(struct sockaddr *)&SerAddr,sizeof(struct sockaddr)))
  {
   printf("the %d times connect to machine: %s,port:%d failed \n",iCount + 1,sIP,iPort);
  }
  else
  {
   return tcpfd;
  } 
 } 
 return -1; 
 

void tcp_send(int tcpfd,int num)
{
 fd_set wset;
 struct timeval tv = {1,0};
 int i = 0,iRet = -1;
 char buf[128] = {"\0"},*p = NULL;
 char Msg[1024] = {"\0"};
 int iLen = 128;
 MH MsgHead,*MsHe = NULL;

 for (i = 0; i < num; i++)
 {
  memset(Msg,0x00,sizeof(Msg));
  memset(buf,0x00,sizeof(buf));
  p = Msg;
  //sleep(1);
  sprintf(buf,"string%d-string%d-string%d-string%d-string%d",i + 1,i + 1,i + 1,i + 1,i + 1);
  MsgHead.BL[0] = (unsigned char)(strlen(buf) / 256 /256 /256);
  MsgHead.BL[1] = (unsigned char)(strlen(buf) / 256 /256 );
  MsgHead.BL[2] = (unsigned char)(strlen(buf) / 256 );
  MsgHead.BL[3] = (unsigned char)(strlen(buf) % 256 ); 
  MsgHead.type = (unsigned char)0;

  memcpy(p,&MsgHead,sizeof(MH));
  p = p + sizeof(MH);

  memcpy(p,buf,strlen(buf));
  p = NULL;    
  iLen = sizeof(MH) + strlen(buf);
 
  while(iLen > 0)
  {
   FD_ZERO(&wset);
   FD_SET(tcpfd,&wset);
   iRet = select(tcpfd + 1,NULL,&wset,NULL,&tv);
   if ( 0 > iRet)
   {
    printf("select error%s  reconnect\n",strerror(errno));
    close(tcpfd);
    sleep(1);
    tcpfd = con_serv();
    continue ;
   }
   else if ( 0 == iRet)
   {
    continue;
   }
   else
   {
    iRet = send(tcpfd,Msg,iLen,0);
    if ( 0 > iRet )
    {
     printf("send failed\n");
    }
    else
    {
     iLen -= iRet; 
    } 
   }
  } 
 }

 printf("tcpfd:%d already send %d record,",tcpfd,num);
 
 printf("send diconnect request\n");
 MsHe = (MH *)Msg;
    memset(Msg,0x00,sizeof(Msg));
 MsHe->BL[0] = (unsigned char)0;
    MsHe->BL[1] = (unsigned char)0;
    MsHe->BL[2] = (unsigned char)0;
    MsHe->BL[3] = (unsigned char)0;
    MsHe->type = (unsigned char)2;
 iLen = sizeof(MH);
 iRet = send(tcpfd,Msg,iLen,0);
    if ( iRet != iLen)
    {
        printf("send failed\n");
    }
   
 return;
}

void *execute(void *num)
{
 int tcpfd = -1;
 int number = *((int *)num);
 
 printf("pthread_id = %u\n",(unsigned int)pthread_self());

 tcpfd = con_serv();

 if ( 0 < tcpfd )
 {
  tcp_send(tcpfd,number);
 }
 else
 {
  printf("connect to server failed!\n");
  return NULL; 
 } 
 //sleep(10);
 close(tcpfd);
 printf("\nclient exit!\n\n");
 return NULL;
}


int  main(int argc,char **argv)
{
 int num = 0,i = 0,sendrec = 0;
 pthread_t tid;
 
 if (argc == 1)
 {
  num = 5;
 }
 else
 {
  for (i = 0; i < strlen(argv[1]); i++)
  {
   if (!isdigit(argv[1][i]))
   {
    printf("please added a number string\n!");
    return 0;
   }
  }
  num = atoi(argv[1]);
  if (0 >= num || 65536 < num)
  {
   num = 5;
  }
  
 }
 printf("send %d record\n",num);
 
 sendrec = num;
 for (i = 0; i < 5; i++)
 {
  sendrec *= (i + 1);
  pthread_create(&tid,NULL,execute,(void *)&sendrec);
  sleep(1);
  sendrec = num;
 }
 for (i = 0; i < 5; i++)
 {
  pthread_join(tid,NULL);
 }
 return 0;  
}

Logo

更多推荐