#include "TLServer.h"
#include <stdlib.h>
#include <iostream>
#include <TMessage.h>
ClassImp(TLServer)
//
//
// Logs:
//
// $Log: TLServer.cc,v $
// Revision 1.9 2003/12/23 08:38:28 cvs
// bug workaround in TLServer done.
//
// Revision 1.8 2003/12/22 14:32:36 cvs
// added simulation of laser -> LDataHeader version 5
// modified hoppserver sigint interruption
//
// Revision 1.7 2003/11/15 17:17:26 cvs
// new scheme is working...
// Clients (hoppclient) are connected as kLC_AMAX and free socket is given
// TLServer (lserver) is multithreading
// LServer (hoppserver) has a file which is always in memory - faster but needs more memory
//
// Revision 1.6 2003/11/15 10:52:35 cvs
// upgraded TLServeer to new scheme
//
// Revision 1.5 2003/11/11 17:33:03 cvs
// LServer upgraded
//
// Revision 1.4 2003/11/03 13:54:40 cvs
// TLServer is now threaded, LData and LDataHeader have now right Draw and Paint methods, LIV is controlled and comments were added.
//
// Revision 1.3 2003/08/20 10:26:50 cvs
// Added scripts directory with two utilities now:
// modify_header which modify existing header and send it database
// deliver_header program and script which deliver header to clients
//
// libLComm was made ROOT compatible and GetBias(z) was added to LDataHeader
//
// Revision 1.2 2003/06/26 09:27:11 cvs
// added log fields...
//
using namespace std;
TLServer::TLServer(const Int_t port, Bool_t reuse, const UInt_t maxmess)
: TServerSocket(port,reuse)
{
//for port and reuse see TServerSocket Class
fNMaxMess=maxmess;
fMon=new TMonitor();
fMon->Add(this);
for(int i=0;i<kLC_MAX;i++)
fSocket[i]=0;
for(int i=0;i<kLC_AMAX-kLC_MAX-1;i++)
fASocket[i]=0;
for(int i=0;i<kLC_AMAX;i++)
for(int j=0;j<kLC_AMAX;j++)
fWaitingClient[i][j]=kFALSE;
for(int i=0;i<kLC_AMAX;i++) pmThread[i]=0;
}
TLServer::~TLServer() {
Thread_stop();
for(int i=0;i<kLC_MAX;i++)
delete fSocket[i];
for(int i=0;i<kLC_AMAX-kLC_MAX-1;i++)
delete fASocket[i];
}
void TLServer::Thread_stop(int num){
//stop thread number num
//if no number is set, then stop all threads!
if (num==-1)
for(int i=0;i<kLC_AMAX;i++)
if (pmThread[i]){
TThread::Delete(pmThread[i]);
delete pmThread[i];
pmThread[i]=0;
}
if (num>-1)
if (pmThread[num]){
TThread::Delete(pmThread[num]);
delete pmThread[num];
pmThread[num]=0;
}
}
void TLServer::Thread_func(void* arg){
//This function is called with each new thread - with each new connection
//First the connection in fS is accepted
//
//there are two types of clients those normal and those who connect
//with TLClient(const ELClientTypes elc=kLC_AMAX). For the last type
//the free socket is found and the connection is accepted.
//See TLServer::Connect() for how the kLC_AMAX clients are connected.
char str[80];
TSocket* fs;
TSocket* s=0;
TMessage* fmess=0;
TLServer* inst = (TLServer*) arg;//parent class
Int_t meid=TThread::SelfId(); // get pthread id
// cout << "nThread 0, id:" <<meid<< " is running..n"<<endl;
//see who wants to connect, always start with new connection
//and call TLServer::Connect()
//and exit. new connection means fs==inst
TThread::Lock();//LLLLLOOOOOCCCCCKKKKK
fs=inst->fS;
inst->fS=0;//clear
if (!fs) {
cerr<<"Thread_func ERROR: No socketn";
TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
return;//End Thread
}
s=inst->Connect(fs,inst);//Accept connection
inst->fMon->Activate(fs);
if(!s) {
cout<<"Connection failed, maybe Client already connectedn";
TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
return;//End Thread
}
TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
//from this point we know this is a client, which wants to talk or read
// find incoming socket
Int_t fromelc=0;//index for fSocket
Int_t afromelc=0;//index for fASocket
// TThread::Lock();//LLLLLOOOOOCCCCCKKKKK
while(s!=inst->fSocket[fromelc] && fromelc<kLC_MAX) fromelc++;
// amax socket
if(fromelc==kLC_MAX)
while(s!=inst->fASocket[afromelc] && afromelc<kLC_AMAX-kLC_MAX-1)
afromelc++;
// TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
int from_elc=fromelc;//this is index for fMessQueue and fWaitingClient
if (fromelc==kLC_MAX) from_elc=afromelc+fromelc+1; //AMAX connection
//start serving
while (1){
Int_t len=s->Recv(fmess);//receive options string
// Close Socket
if(!len || len==-1) {//close connection
//cout<<"Connection closed "<<fromelc<<" "<<afromelc<<endl;
s->Close();
delete s;
TThread::Lock();//LLLLLOOOOOCCCCCKKKKK
if(fromelc==kLC_MAX)
inst->fASocket[afromelc]=0;
else
inst->fSocket[fromelc]=0;
for(int i=0;i<kLC_AMAX;i++)
inst->fWaitingClient[i][from_elc]=kFALSE;
TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
if(fromelc==kLC_MAX)
cout<<" --->Connection closed "<<afromelc<<"n";
else
cout<<" --->Connection closed "<<fromelc<<"n";
if (fmess) delete fmess;
fmess=0;
return;//End Thread
}
//ignore bad options
if(fmess->What() != kMESS_STRING ) {
cout<<"String Expected "<<fmess->What()<<endl;
delete fmess; fmess=0;
continue;
}
//read options string
if (fmess) {
fmess->ReadString(str,80);
delete fmess; fmess=0;
}
char mtype;
Int_t toelc=0;
if (sscanf(str,"%c %d",&mtype,&toelc)!=2) {
cerr<<"Thread_func ERROR: No options stringn";
return;
}
if (fromelc==kLC_MAX){
//AConnection
if(mtype!='N' && mtype!='Q' )
cout<<"AFrom("<<meid<<") "<<s->GetInetAddress().GetHostAddress()<<":"<<afromelc<<":"<<str<<":"<<toelc<<":"<<mtype<<endl;
} else {
//Connection
if(mtype!='N' && mtype!='Q')
cout<<"From ("<<meid<<") "<<s->GetInetAddress().GetHostAddress()<<":"<<fromelc<<":"<<str<<":"<<toelc<<":"<<mtype<<endl;
}
if (fmess) delete fmess;
fmess=0;
if(mtype == 'S') { // sending client
//receive data
s->Recv(fmess);
TMessage *messw = new TMessage(kMESS_OBJECT);
if(fmess->What() == kMESS_STRING ) {
fmess->ReadString(str,80);
messw->SetWhat(kMESS_STRING);
messw->WriteString(str);
} else {
TObject * obj=(TObject*)fmess->ReadObject(fmess->GetClass());
messw->Reset();
messw->WriteObject(obj);
//obj->Dump();
delete obj;
}
delete fmess; fmess=0;
//put data in queue
TThread::Lock();//LLLLLOOOOOCCCCCKKKKK
inst->fMessQueue[from_elc][toelc].push(messw);
// Remove front message (older) if queue too large
if(inst->fMessQueue[from_elc][toelc].size()>inst->fNMaxMess) {
TMessage *messf=inst->fMessQueue[from_elc][toelc].front();
inst->fMessQueue[from_elc][toelc].pop();
delete messf;
cout<<"Message poped("<<meid<<") "<<from_elc<<":"<<toelc<<endl;
}
/* DON'T UNDERSTAND WHY THIS DON'T WORK
if(inst->fWaitingClient[from_elc][toelc]) { // Send to waiting client !!!
TMessage *messr=inst->fMessQueue[from_elc][toelc].front();
inst->fMessQueue[from_elc][toelc].pop();
TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
sprintf(str,"%d",from_elc);
printf("debug1 %sn",str);
inst->fSocket[toelc]->Send(str);
//this line never executes!!!
printf("debug2n");
inst->fSocket[toelc]->Send(*messr);
printf("debug3n");
delete messr;
printf("debug4n");
TThread::Lock();//LLLLLOOOOOCCCCCKKKKK
inst->fWaitingClient[from_elc][toelc]=kFALSE;
}
*/
TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
continue;
}
if (mtype=='Q'){//query for queue size
int size=0;
TThread::Lock();//LLLLLOOOOOCCCCCKKKKK
size=inst->fMessQueue[from_elc][toelc].size();
TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
sprintf(str,"%d",toelc);
s->Send(str);
sprintf(str,"SIZE %d",size);
s->Send(str);
continue;
}
if (mtype=='R' || mtype=='N'){ // reading client
TMessage *messr=0;
TThread::Lock();//LLLLLOOOOOCCCCCKKKKK
if(!inst->fMessQueue[toelc][from_elc].empty()) { // reverse order from, to
messr=inst->fMessQueue[toelc][from_elc].front();
inst->fMessQueue[toelc][from_elc].pop(); // remove message from queue
TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
sprintf(str,"%d",toelc);
s->Send(str);
s->Send(*messr);
//messr->Dump();
delete messr; messr=0;
} else { // no message
if(mtype == 'N') { // nowait client
TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
//sprintf(str,"%d",toelc);
//s->Send(str);
s->Send("EMPTY");
} else { // waiting client R, default
inst->fWaitingClient[toelc][from_elc] = kTRUE;
TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
bool test=kTRUE;
do {
TThread::Lock();//LLLLLOOOOOCCCCCKKKKK
test=inst->fMessQueue[toelc][from_elc].empty();
if (!test){
messr=inst->fMessQueue[toelc][from_elc].front();
inst->fMessQueue[toelc][from_elc].pop(); //remove mesg from queue
inst->fWaitingClient[toelc][from_elc] = kFALSE;
TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
sprintf(str,"%d",toelc);
s->Send(str);
s->Send(*messr);
delete messr; messr=0;
} else
TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
// cout<<"waiting("<<meid<<")"<<s->GetInetAddress().GetHostAddress()<<endl;
sleep(1);
} while (test);
}
}
continue;
}
//ignoring unknown options
cout<<"Unknown option ("<<meid<<") "<<mtype<<" from ("<<from_elc<<")"<<s->GetInetAddress().GetHostAddress()<<endl;
} //end of while(1)
return;
}
TSocket* TLServer::Connect(TSocket* fs,TLServer* inst)
{
//the connection of socket fs is accepted if the appropriate
//communication and ELClientType is avalable.
//Thread must be locked!!!
TSocket* s=Accept();
char rstr[80];
s->Recv(rstr,80);
int elc;
sscanf(rstr,"%d",&elc);
if (elc<kLC_MAX){//NORMAL CONNECTION
if(inst->fSocket[elc]==0) {
inst->fSocket[elc]=s;
inst->fSocket[elc]->Send(rstr);
cout<<ClassName()<<" Send "<<rstr<<endl;
return s;
} else {
// Drop connection
s->Send("Denied");
s->Close();
delete s;
return 0;
}
}
if (elc==kLC_AMAX){//ACONNECTION
int i=0;
while(inst->fASocket[i]!=0 && i<kLC_AMAX-kLC_MAX-1) i++;
if(i>=kLC_AMAX-kLC_MAX-1) {
cout<<"No empty slot!n";
s->Send("Denied");
s->Close();
delete s;
return 0;
}
elc=i;
inst->fASocket[elc]=s;
inst->fASocket[elc]->Send(rstr);
cout<<ClassName()<<" Send "<<rstr<<endl;
return s;
}
if (s) {
s->Close();
delete s;
}
return 0;
}
void TLServer::Listen()
{
//Listen for a connection. For each connection starts new thread
//if there are many connections, which only want to do bad, then
//this method is not good. Connection is not accepted here! Only the
//socket is stored in fS.
//
//Number of threads is equal to the size of LClientType enum.
//With each new thread the Thread_func() is started. Then the
//listening to this client is deactivated, to avoid starting
//new thread for each message.
//Thread pointers are stored in pmThread pointer array.
//
//See Thread_func() for more informations...
while((fS=fMon->Select())) {
//start new thread
TThread::Lock();//LLLLLLLLLLOOOOOOOOOO
int thrnum=0;
while (thrnum<kLC_AMAX) {
if (!pmThread[thrnum]) break;
else {
if (pmThread[thrnum]->GetState()==TThread::kCanceledState) {
Thread_stop(thrnum);
break;
} else {
thrnum++;
}
}
}
TThread::UnLock();//LLLLLLLLLLOOOOOOOOOO
if (thrnum>=kLC_AMAX)
cerr<<"Listen() WARNING: no more threads leftn";
else {
// cout<<"Listen() WARNING: starting thread "<<thrnum<<"n";
char label[20];
sprintf(label,"*%d*",thrnum);
pmThread[thrnum]=new TThread(label,
(void(*) (void *))&Thread_func,
(void*) this);
fMon->DeActivate(fS);
pmThread[thrnum]->Run();
}
}
}
ROOT page - Class index - Top of the page
This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.