红联Linux门户
Linux帮助

关于Linux下线程编程的学习

发布时间:2007-12-22 12:48:15来源:红联作者:wmbodbc
最近在学习Linux下的线程编程,我的系统环境Fedora 7.0,设计了一个这样的程序:

这是一个后台服务系统,它对外提供两个服务接口,接收到Client端的请求后,在数据库中查找相关数据,如果OK,再转发给另一个服务器处理,然后再将处理结果转发给客户端。

该服务与另一个服务器之间采用Socket长连接模式,在配置文件中可以定义连接数量,服务启动后,自动产生这么多的线程,线程之间的通讯暂时考虑是消息队列模式。

服务启动后同时启动另外两个线程,分别启动Socket服务,等待客户端连接请求。

另外启动一个线程用来记录通讯数据,是否开启这个功能可以通过配置文件来决定。

主线程负责监视这些子线程的状态,如果有某个线程异常终止、或者由于某些原因没有启动,主线程可以定期将其重新启动,使用alarm定时监视。

另外写了一个测试小程序,挺简单的,通过命令行输入程序并发数量,每个子进程执行测试数量,源码如下(注:原帖的头文件列表被论坛格式吞掉,此处按代码所需补全):[code]#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <arpa/inet.h>

extern int errno;

/*
 * Test-client worker: connects to the server at `sin` and performs
 * `loopnum` sequential request/response round trips.
 *
 * Fixes vs. the original:
 *  - recv() returning 0 (peer closed the connection) previously looped
 *    forever; it is now treated as a fatal error.
 *  - RecvMsg[6] was tested before 7 bytes had necessarily arrived (a read
 *    of uninitialized memory); the flag byte is now only checked once at
 *    least 7 bytes are buffered.
 *  - The outer do/while driven by bJump was dead code (the inner loop can
 *    only exit with RecvMsg[6] == 0xff, so bJump was always 0); removed.
 *  - Error paths now exit with a non-zero status and close the socket.
 */
void workthread(struct sockaddr_in sin, int loopnum)
{
	CLIENTPKG cLientPkg;
	char SendMsg[257];
	char RecvMsg[257];
	int sockfd, nReturn, nLen;
	int ii;

	for (ii = 1; ii <= loopnum; ii++)
	{
		printf("ThreadId=%d,loopnum :%d\n", (int)getpid(), ii);

		sockfd = socket(PF_INET, SOCK_STREAM, 0);
		if (sockfd < 0)
		{
			printf("Build socket Err:%d,%s!\n", errno, strerror(errno));
			exit(1);
		}

		if (connect(sockfd, (struct sockaddr *)&sin, sizeof(sin)) != 0)
		{
			printf("connect fail!\n");
			close(sockfd);
			exit(1);
		}

		/* Build the fixed 256-byte request packet. */
		memset(&cLientPkg, 0, sizeof(cLientPkg));
		memcpy(cLientPkg.pPID, "0000", 4);
		cLientPkg.cCommand = 0x66;
		memcpy(cLientPkg.pAreaID, "0000", 4);
		memcpy(cLientPkg.pNetID, "0000", 4);
		memcpy(cLientPkg.pOperater, "00000", 5);
		memcpy(cLientPkg.pMachID, "0121000100", 10);
		memcpy(cLientPkg.pQueryStart, "20030801", 8);
		memcpy(cLientPkg.pQueryEnd, "20031010", 8);
		memcpy(SendMsg, &cLientPkg, sizeof(cLientPkg));

		if (send(sockfd, SendMsg, 256, 0) < 0)
		{
			printf("Send Message fail.\n");
			close(sockfd);
			exit(1);
		}

		/* Read until byte 6 of the reply carries the 0xff "done" flag;
		 * only test the flag after at least 7 bytes have arrived. */
		nLen = 0;
		for (;;)
		{
			if (nLen >= 7 && RecvMsg[6] == (char)0xff)
				break;

			nReturn = recv(sockfd, RecvMsg + nLen, 256 - nLen, 0);
			if (nReturn < 0)
			{
				printf("Recv errno=%d\n", errno);
				close(sockfd);
				exit(1);
			}
			if (nReturn == 0)
			{
				/* Peer closed before completing the reply. */
				printf("Connection closed by peer, got %d bytes\n", nLen);
				close(sockfd);
				exit(1);
			}
			nLen += nReturn;
		}

		close(sockfd);
	}
}

/*
 * Entry point for the stress-test client.
 * Usage: test [forknum] [loopnum] — forks `forknum` child processes, each
 * performing `loopnum` request/response cycles against the server.
 *
 * Fixes vs. the original:
 *  - main() now has an explicit `int` return type and returns a value
 *    ("return;" from a value-returning function is invalid C).
 *  - Non-positive argument values are rejected.
 *  - The parent reaps all children with wait(), so none is left a zombie.
 *  - fork() failure is detected instead of being treated as "child".
 */
int main(int argc, char *argv[])
{
	char strAdd[16];
	struct sockaddr_in sin;
	int port = 9998;
	int kk;
	int forknum = 10, loopnum = 10;

	system("clear"); /* cosmetic: clear the terminal before the run */

	memset(strAdd, 0, sizeof(strAdd));
	memcpy(strAdd, "192.168.1.197", 13);

	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_addr.s_addr = inet_addr(strAdd);
	sin.sin_port = htons(port);

	if (argc >= 2) forknum = atoi(argv[1]);
	if (argc >= 3) loopnum = atoi(argv[2]);
	if (forknum <= 0 || loopnum <= 0)
	{
		fprintf(stderr, "usage: %s [forknum>0] [loopnum>0]\n", argv[0]);
		return 1;
	}

	for (kk = 1; kk <= forknum; kk++)
	{
		pid_t pid;

		printf("kk:%d\n", kk);
		pid = fork();
		if (pid < 0)
		{
			perror("fork");
			break;
		}
		if (pid == 0)
		{
			workthread(sin, loopnum);
			/* _exit: do not flush stdio buffers inherited from the parent */
			_exit(0);
		}
	}

	/* Reap every child so none is left as a zombie. */
	while (wait(NULL) > 0)
		;

	return 0;
}


服务端处理Client请求的源码如下:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>
#include <limits.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <arpa/inet.h>

extern CT MainCt;
extern int errno;
pthread_cond_t tcn_th_cond = PTHREAD_COND_INITIALIZER;
pthread_key_t p_key; //not used
struct tcn_thread_mgr *tcn_th_mgr = NULL;

/*
 * Handle one client request on the connected socket in `data->sock`:
 * read a CLIENTPKG request, stamp the "done" flag into byte 6, and echo
 * it back via SendDataToClient().  The socket is always closed.
 *
 * Fixes vs. the original:
 *  - recv() on a TCP stream may return fewer bytes than requested; the
 *    read now loops until the full packet (or EOF/error) is seen.
 *  - The buffer is sized with HOSTMSGLEN but was cleared with MSGLEN;
 *    it is now cleared with sizeof so the two cannot disagree.
 *  - The local `socket` was renamed: it shadowed socket(2).
 * NOTE(review): assumes sizeof(CLIENTPKG) <= HOSTMSGLEN — confirm against
 * the packet header definitions.
 */
int TreadClntrequest(struct tcn_thread_data *data)
{
	int sock = data->sock;
	BYTE RecvRecMessage[HOSTMSGLEN + 1];
	int nGot = 0;
	int nRet;

	memset(RecvRecMessage, 0, sizeof(RecvRecMessage));

	LogMessage("run TreadClntrequest:socket=%d", sock);

	/* Accumulate until a whole CLIENTPKG has arrived; stop on error/EOF. */
	while (nGot < (int)sizeof(CLIENTPKG))
	{
		nRet = recv(sock, RecvRecMessage + nGot, sizeof(CLIENTPKG) - nGot, 0);
		if (nRet <= 0)
			break;
		nGot += nRet;
	}

	if (nGot != (int)sizeof(CLIENTPKG))
	{
		LogMessage("recv error! errno=%d, errMsg:%s.", errno, strerror(errno));
	}
	else
	{
		RecvRecMessage[6] = (char)0xFF; /* mark the reply complete for the client */
		SendDataToClient(sock, RecvRecMessage, MAINPACK, MainCt.TraceData);
	}

	close(sock);
	LogMessage("TreadClntrequest finish!");

	return EXECOK;
}

//线程执行时,阻塞所有的信号
/* Block every signal in the calling thread so signals are delivered only
 * to threads that explicitly unblock them.  On any failure the thread
 * terminates itself rather than run with an unknown signal mask. */
void thread_signal_init()
{
	sigset_t all_signals;

	if (sigfillset(&all_signals) != 0)
		pthread_exit(NULL);

	if (pthread_sigmask(SIG_BLOCK, &all_signals, NULL) != 0)
		pthread_exit(NULL);
}

/*
 * Thread body for one accepted client connection.  Services the request,
 * then releases this pool slot back to the thread manager.
 *
 * Fixes vs. the original:
 *  - `data->thread_id = pthread_self()` was written here WITHOUT holding
 *    tcn_th_mgr->lock, racing with the creator's write of the same field
 *    through pthread_create()'s output parameter (done under the lock).
 *    pthread_create() already stores the id, so the write is dropped.
 *  - The active-thread counter is decremented before signalling, so a
 *    woken creator sees the freed capacity immediately.
 */
void* apctclnt_thread_run(void* args)
{
	struct tcn_thread_data *data = (struct tcn_thread_data *)args;

	LogMessage("run apctclnt_thread_run");
	thread_signal_init();

	TreadClntrequest(data);

	pthread_mutex_lock(&(tcn_th_mgr->lock));

	tcn_th_mgr->num_active_threads--;

	/* If the pool was saturated, wake one creator waiting for a slot. */
	if (tcn_th_mgr->num_active_threads + 1 >= tcn_th_mgr->max_threads)
	{
		LogMessage("call pthread_cond_signal");
		pthread_cond_signal(&tcn_th_cond);
	}

	data->thread_id = THREAD_NULL; /* mark this slot reusable */
	pthread_mutex_unlock(&(tcn_th_mgr->lock));

	LogMessage("apctclnt_thread_run thread exit!");
	pthread_exit(NULL);

	return NULL;
}

int acptclnt_thread_create(int socket)
{
int rc;
pthread_t pTreateClnt;
pthread_attr_t a;
int stacksize;
int thread_num;

stacksize = 1024;

pthread_attr_init(&a);
pthread_attr_setdetachstate(&a, PTHREAD_CREATE_DETACHED);//PTHREAD _CREATE_JOINABLE
pthread_attr_setstacksize(&a, stacksize);

LogMessage("acptclnt_thread_create socket= %d", socket);

pthread_mutex_lock(&tcn_th_mgr->lock);

while(tcn_th_mgr->num_active_threads >= tcn_th_mgr->max_threads)
{
LogMessage("call pthread_cond_wait");
pthread_cond_wait(&tcn_th_cond, &tcn_th_mgr->lock);
}

/* search for an open slot to store the thread data in */
for (thread_num = 0; thread_num < tcn_th_mgr->max_threads; thread_num++)
{
if (tcn_th_mgr->thread_data[thread_num].thread_id == THREAD_NULL)
break;
}

tcn_th_mgr->thread_data[thread_num].sock = socket;

rc = pthread_create(&tcn_th_mgr->thread_data[thread_num].thread_id,
&a, apctclnt_thread_run, (void *)(&(tcn_th_mgr->thread_data[thread_num])));
//rc = pthread_create(&pTreateClnt, &a, apctclnt_thread_run, (void*)socket);
if (rc != 0)
{
LogMessage("AcptClnt: create thread fail! rc =%d,errMsg:%s.",rc, strerror(rc));
close(socket);
}
else
{
LogMessage("create thread :apctclnt_thread_run ok.");
tcn_th_mgr->num_active_threads ++;
}

pthread_mutex_unlock(&tcn_th_mgr->lock);

pthread_attr_destroy(&a);

return EXECOK;
}

struct sigaction acptclnt_sa_int;

/*
 * SIGCHLD handler: reap terminated child processes.
 *
 * Fixes vs. the original:
 *  - A single wait3() reaps at most one child, but the kernel coalesces
 *    pending SIGCHLDs; loop with waitpid(..., WNOHANG) until no reapable
 *    child remains.
 *  - errno is saved and restored — the handler may interrupt code between
 *    a failing call and its errno check.
 *  - The LogMessage() call was removed: stdio/locking inside a signal
 *    handler is not async-signal-safe and can deadlock or corrupt state.
 */
void acptclnt_signal_chld(int signal)
{
	int saved_errno = errno;

	(void)signal;

	while (waitpid(-1, NULL, WNOHANG) > 0)
		;

	errno = saved_errno;
}

/*
 * Allocate and initialize the global thread manager (tcn_th_mgr), sized
 * from MainCt.MaxThreadNum.  Returns EXECOK on success, EXECFAIL on any
 * allocation failure (leaving tcn_th_mgr NULL).
 *
 * Fixes vs. the original:
 *  - On the thread_data allocation failure path, tcn_th_mgr was freed but
 *    left dangling; it is now reset to NULL so later code that tests the
 *    pointer (e.g. acptclnt_destroy) cannot touch freed memory.
 *  - pthread_key_create()'s result is no longer silently ignored
 *    (the key is unused elsewhere, so failure is only logged).
 */
int acptclnt_threads_init()
{
	if (pthread_key_create(&p_key, NULL) != 0)
		LogMessage("pthread_key_create failed (key is unused; continuing)");

	tcn_th_mgr = calloc(1, sizeof(struct tcn_thread_mgr));
	if (tcn_th_mgr == NULL)
		return EXECFAIL;

	tcn_th_mgr->max_threads = MainCt.MaxThreadNum;
	tcn_th_mgr->thread_data =
		calloc(tcn_th_mgr->max_threads, sizeof(struct tcn_thread_data));
	if (tcn_th_mgr->thread_data == NULL)
	{
		free(tcn_th_mgr);
		tcn_th_mgr = NULL; /* keep the global consistent with the failure */
		return EXECFAIL;
	}

	pthread_mutex_init(&tcn_th_mgr->lock, NULL);
	tcn_th_mgr->num_active_threads = 0;

	return EXECOK;
}

/*
 * Tear down the thread manager created by acptclnt_threads_init().
 *
 * Fixes vs. the original:
 *  - tcn_th_mgr is reset to NULL after freeing, so a second call (or any
 *    later `if (tcn_th_mgr)` check elsewhere) cannot double-free or use
 *    freed memory.
 *  - The redundant NULL guard before free(thread_data) was dropped —
 *    free(NULL) is a no-op.
 */
void acptclnt_destroy()
{
	if (tcn_th_mgr == NULL)
		return;

	pthread_mutex_destroy(&tcn_th_mgr->lock);
	free(tcn_th_mgr->thread_data);
	free(tcn_th_mgr);
	tcn_th_mgr = NULL;
}

/*
 * Unblock SIGCHLD in this thread and install its handler.
 * Returns EXECOK on success, EXECFAIL otherwise.
 *
 * Fixes vs. the original:
 *  - pthread_sigmask() returns its error code and does NOT set errno, so
 *    strerror(errno) printed an unrelated message; the returned code is
 *    now passed to strerror().
 *  - acptclnt_sa_int.sa_mask and sa_flags are initialized explicitly
 *    rather than relying on the zero-initialized global (sa_flags stays
 *    0, preserving the original semantics — no SA_RESTART).
 */
int acptclnt_signal_init()
{
	sigset_t sigmask;
	int rc;

	sigemptyset(&sigmask);
	if (sigaddset(&sigmask, SIGCHLD))
	{
		LogMessage("sigaddset: %s", strerror(errno));
		return EXECFAIL;
	}

	rc = pthread_sigmask(SIG_UNBLOCK, &sigmask, NULL);
	if (rc != 0)
	{
		LogMessage("Setting thread signal mask: %s", strerror(rc));
		return EXECFAIL;
	}

	acptclnt_sa_int.sa_handler = acptclnt_signal_chld;
	sigemptyset(&acptclnt_sa_int.sa_mask);
	acptclnt_sa_int.sa_flags = 0;
	if (sigaction(SIGCHLD, &acptclnt_sa_int, NULL))
	{
		LogMessage("signal SIGCHLD not registered: %s", strerror(errno));
		return EXECFAIL;
	}

	return EXECOK;
}

/*
功能:接收处理程序
*/
void* AcptClnt()
{
struct sockaddr_in rstLclAddr;
struct sockaddr_in tcp_addr;
int wAddrLen;
socklen_t clilen;
int sockfd,accsockfd;


if (acptclnt_threads_init() != EXECOK)
{
pthread_exit(NULL);
}

if (acptclnt_signal_init() != EXECOK)
{
acptclnt_destroy();
pthread_exit(NULL);
}

wAddrLen = sizeof (rstLclAddr);
sockfd = socket (AF_INET, SOCK_STREAM, 0);
if(sockfd < 0)
{
LogMessage("AcptClnt: Create Socket error! errno=%d, errMsg:%s.",errno,strerror(errno));
acptclnt_destroy();
pthread_exit(NULL);
}
else
{
LogMessage("AcptClnt: Create Socket ok! socket=%d",sockfd);
}

rstLclAddr.sin_family = AF_INET;// "Internet" Address Family
rstLclAddr.sin_addr.s_addr = inet_addr(MainCt.HostAddr );//Local IP address
rstLclAddr.sin_port = htons(MainCt.ClientPort);// Set to "Echo" Port
if (bind (sockfd, (struct sockaddr*)&rstLclAddr, wAddrLen) < 0)
{
LogMessage("AcptClnt: Socket Bind error! IP:%s,Port:%d. errno=%d,errMsg:%s.",\
MainCt.HostAddr,MainCt.ClientPort,errno,strerror(errno));
acptclnt_destroy();
pthread_exit(NULL);
}

if((listen (sockfd, 100)) < 0)
{
LogMessage("AcptClnt: Socket Listen error! errno=%d, errMsg:%s.",errno,strerror(errno));
acptclnt_destroy();
pthread_exit(NULL);
}

LogMessage("AcptClnt Progress: Start Success!");

for(;;)
{
accsockfd = accept(sockfd,(struct sockaddr*) &tcp_addr,&clilen);
if(accsockfd<0)
{
LogMessage("AcptClnt: Socket Accept failed! errno=%d, errMsg:%s.",errno,strerror(errno));
}
else
{
LogMessage("AcptClnt: Accepted Client_Request...socket=%d",accsockfd);
acptclnt_thread_create(accsockfd);
}
continue;
}
}[/code]在这里acptclnt_thread_create函数中创建线程的部分有很多注意的地方,不过到现在理解还不是很深入。

最初的程序没有增加属性的部分,所以线程创建后采用的是缺省属性,当使用测试工具进行测试时 ./test 1 1000(只有一个进程运行,顺序发送1000个测试,是没有并发的情况),这时只能运行大约300次,服务端就会报错pthread_create报错误码为12,大概是不能分配堆栈了,当然程序还有许多可以优化的地方,即将大的数组变量定义为指针类型,需要时再分配,这样可以降低堆栈大小(正确否?)。

增加线程创建时的属性情况,将堆栈设置为1024(其他也设过,比如8*1024等等,也都会有次数的问题),运行./test 1 20000没有问题。

在配置文件中可以设置并发线程的数量,我定义为10个,当采用并发测试:./test 1000 5 ,服务到后来非常慢,已经不提供服务了,报Connect fail,所以考虑是不是子线程结束后没有及时释放资源?考虑增加线程分离属性,运行上面的并发测试也可以了,但是增加并发数量,如./test 5000 5依然会出现问题,甚至异常服务终止。

还没有想出好的方法来解决。(采用fork子进程模式好像比线程模式要稳定些)
文章评论

共有 1 条评论

  1. niutao0602 于 2007-12-30 10:13:32发表:

    Linux下有线程这一概念吗?我只听说过多进程编程!