#include "TLServer.h"

#include <stdlib.h>
#include <iostream>
#include <TMessage.h>

ClassImp(TLServer)



//
//
// Logs:
//
//  $Log: TLServer.cc,v $
//  Revision 1.9  2003/12/23 08:38:28  cvs
//  bug workaround in TLServer done.
//
//  Revision 1.8  2003/12/22 14:32:36  cvs
//  added simulation of laser -> LDataHeader version 5
//  modified hoppserver sigint interruption
//
//  Revision 1.7  2003/11/15 17:17:26  cvs
//  new scheme is working...
//  Clients (hoppclient) are connected as kLC_AMAX and free socket is given
//  TLServer (lserver) is multithreading
//  LServer (hoppserver) has a file which is always in memory - faster but needs more memory
//
//  Revision 1.6  2003/11/15 10:52:35  cvs
//  upgraded TLServeer to new scheme
//
//  Revision 1.5  2003/11/11 17:33:03  cvs
//  LServer upgraded
//
//  Revision 1.4  2003/11/03 13:54:40  cvs
//  TLServer is now threaded, LData and LDataHeader have now right Draw and Paint methods, LIV is controlled and comments were added.
//
//  Revision 1.3  2003/08/20 10:26:50  cvs
//  Added scripts directory with two utilities now:
//  modify_header which modify existing header and send it database
//  deliver_header program and script which deliver header to clients
//
//  libLComm was made ROOT compatible and GetBias(z) was added to LDataHeader
//
//  Revision 1.2  2003/06/26 09:27:11  cvs
//  added log fields...
//
using namespace std;

 TLServer::TLServer(const Int_t port, Bool_t reuse, const UInt_t maxmess) 
  : TServerSocket(port,reuse)
{
  //for port and reuse see TServerSocket Class
  fNMaxMess=maxmess;
  fMon=new TMonitor();
  fMon->Add(this);
  for(int i=0;i<kLC_MAX;i++) 
    fSocket[i]=0;
  for(int i=0;i<kLC_AMAX-kLC_MAX-1;i++) 
    fASocket[i]=0;
  for(int i=0;i<kLC_AMAX;i++)
    for(int j=0;j<kLC_AMAX;j++)
      fWaitingClient[i][j]=kFALSE;

  for(int i=0;i<kLC_AMAX;i++) pmThread[i]=0;
}

 TLServer::~TLServer() {
  Thread_stop();
  for(int i=0;i<kLC_MAX;i++) 
    delete fSocket[i];
  for(int i=0;i<kLC_AMAX-kLC_MAX-1;i++) 
    delete fASocket[i];
}

 void TLServer::Thread_stop(int num){
  //stop thread number num
  //if no number is set, then stop all threads!
  if (num==-1) 
    for(int i=0;i<kLC_AMAX;i++)
      if (pmThread[i]){
	TThread::Delete(pmThread[i]);
	delete pmThread[i];
	pmThread[i]=0;
      }
  if (num>-1) 
    if (pmThread[num]){
      TThread::Delete(pmThread[num]);
      delete pmThread[num];
      pmThread[num]=0;
    }
  
}

 void TLServer::Thread_func(void* arg){
  //This function is called with each new thread - with each new connection
  //First the connection in fS is accepted 
  //
  //there are two types of clients those normal and those who connect 
  //with  TLClient(const ELClientTypes elc=kLC_AMAX). For the last type
  //the free socket is found and the connection is accepted. 
  //See TLServer::Connect() for how the kLC_AMAX clients are connected.
  char str[80];
  TSocket* fs;
  TSocket* s=0;
  TMessage* fmess=0;
  TLServer* inst = (TLServer*) arg;//parent class
  Int_t meid=TThread::SelfId(); // get pthread id
  //  cout << "nThread 0, id:" <<meid<< " is running..n"<<endl;

  //see who wants to connect, always  start with new connection 
  //and call TLServer::Connect()
  //and exit.  new connection means  fs==inst
  TThread::Lock();//LLLLLOOOOOCCCCCKKKKK
  fs=inst->fS;
  inst->fS=0;//clear 
  if (!fs) {
    cerr<<"Thread_func ERROR: No socketn";
    TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
    return;//End Thread
  }
  s=inst->Connect(fs,inst);//Accept connection
  inst->fMon->Activate(fs);
  if(!s) {
    cout<<"Connection failed, maybe Client already connectedn";
    TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
    return;//End Thread
  }
  TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
  
  //from this point we know this is a client, which wants to talk or read
  // find incoming socket
  Int_t fromelc=0;//index for fSocket
  Int_t afromelc=0;//index for fASocket
  //  TThread::Lock();//LLLLLOOOOOCCCCCKKKKK
  while(s!=inst->fSocket[fromelc] && fromelc<kLC_MAX) fromelc++;  
  // amax socket
  if(fromelc==kLC_MAX) 
    while(s!=inst->fASocket[afromelc] && afromelc<kLC_AMAX-kLC_MAX-1)
      afromelc++; 
  //  TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
  int from_elc=fromelc;//this is index for fMessQueue and fWaitingClient
  if (fromelc==kLC_MAX) from_elc=afromelc+fromelc+1; //AMAX connection
  
  //start serving 
  while (1){
    Int_t len=s->Recv(fmess);//receive options string
  
    // Close Socket
    if(!len || len==-1) {//close connection
      //cout<<"Connection closed "<<fromelc<<" "<<afromelc<<endl;
      s->Close();
      delete s;
      TThread::Lock();//LLLLLOOOOOCCCCCKKKKK
      if(fromelc==kLC_MAX) 
	inst->fASocket[afromelc]=0;
      else 
	inst->fSocket[fromelc]=0;
      for(int i=0;i<kLC_AMAX;i++)
	inst->fWaitingClient[i][from_elc]=kFALSE;
      TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
      if(fromelc==kLC_MAX) 
	cout<<"  --->Connection closed "<<afromelc<<"n";
      else
	cout<<"  --->Connection closed "<<fromelc<<"n";
      if (fmess) delete fmess;
      fmess=0;
      return;//End Thread
    }

    //ignore bad options
    if(fmess->What() != kMESS_STRING ) {
      cout<<"String Expected "<<fmess->What()<<endl;
      delete fmess; fmess=0;
      continue;
    }
    
    //read options string
    if (fmess) {
      fmess->ReadString(str,80);
      delete fmess; fmess=0;
    }
    
    char mtype;
    Int_t toelc=0;
    if (sscanf(str,"%c %d",&mtype,&toelc)!=2) {
      cerr<<"Thread_func ERROR: No options stringn";
      return;
    }
    if (fromelc==kLC_MAX){
      //AConnection
      if(mtype!='N' && mtype!='Q' )
	cout<<"AFrom("<<meid<<") "<<s->GetInetAddress().GetHostAddress()<<":"<<afromelc<<":"<<str<<":"<<toelc<<":"<<mtype<<endl;
    } else {
      //Connection
      if(mtype!='N' && mtype!='Q')
	cout<<"From ("<<meid<<") "<<s->GetInetAddress().GetHostAddress()<<":"<<fromelc<<":"<<str<<":"<<toelc<<":"<<mtype<<endl;
    }
    if (fmess) delete fmess;
    fmess=0;

    if(mtype == 'S') {  // sending client
      //receive data
      s->Recv(fmess); 
      
      TMessage *messw = new TMessage(kMESS_OBJECT);
      
      if(fmess->What() == kMESS_STRING ) {
	fmess->ReadString(str,80);
	messw->SetWhat(kMESS_STRING);
	messw->WriteString(str);
      } else {
	TObject * obj=(TObject*)fmess->ReadObject(fmess->GetClass());
	messw->Reset();
	messw->WriteObject(obj);
	//obj->Dump();
	delete obj;
      }
      delete fmess; fmess=0;

      //put data in queue
      TThread::Lock();//LLLLLOOOOOCCCCCKKKKK    
      inst->fMessQueue[from_elc][toelc].push(messw);      
      // Remove front message (older) if queue too large
      if(inst->fMessQueue[from_elc][toelc].size()>inst->fNMaxMess) {
	TMessage *messf=inst->fMessQueue[from_elc][toelc].front();
	inst->fMessQueue[from_elc][toelc].pop();
	delete messf;
	cout<<"Message poped("<<meid<<") "<<from_elc<<":"<<toelc<<endl;
      }

      /* DON'T UNDERSTAND WHY THIS DON'T WORK
      if(inst->fWaitingClient[from_elc][toelc]) { // Send to waiting client !!!
	TMessage *messr=inst->fMessQueue[from_elc][toelc].front();
	inst->fMessQueue[from_elc][toelc].pop();
	TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
	sprintf(str,"%d",from_elc);
	printf("debug1 %sn",str);
	inst->fSocket[toelc]->Send(str);
	//this line never executes!!!
	printf("debug2n");
	inst->fSocket[toelc]->Send(*messr);
	printf("debug3n");
	delete messr;
	printf("debug4n");
       	TThread::Lock();//LLLLLOOOOOCCCCCKKKKK
	inst->fWaitingClient[from_elc][toelc]=kFALSE;
      }
      */
      TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
      continue;
    }

    if (mtype=='Q'){//query for queue size
      int size=0;
      TThread::Lock();//LLLLLOOOOOCCCCCKKKKK  
      size=inst->fMessQueue[from_elc][toelc].size();
      TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
      sprintf(str,"%d",toelc);
      s->Send(str);
      sprintf(str,"SIZE %d",size);
      s->Send(str);
      continue;
    }
    
    if (mtype=='R' || mtype=='N'){ // reading client
      TMessage *messr=0;
      TThread::Lock();//LLLLLOOOOOCCCCCKKKKK
      if(!inst->fMessQueue[toelc][from_elc].empty()) { // reverse order from, to
	messr=inst->fMessQueue[toelc][from_elc].front();
	inst->fMessQueue[toelc][from_elc].pop(); // remove message from queue
	TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
	sprintf(str,"%d",toelc);
	s->Send(str);
	s->Send(*messr);
	//messr->Dump();
	delete messr; messr=0;
      } else { // no message
	if(mtype == 'N') { // nowait client
	  TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
	  //sprintf(str,"%d",toelc);
	  //s->Send(str); 
	  s->Send("EMPTY");
	} else {   // waiting client R, default
	  inst->fWaitingClient[toelc][from_elc] = kTRUE;
	  TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
	  bool test=kTRUE;
	  do {
	    TThread::Lock();//LLLLLOOOOOCCCCCKKKKK
	    test=inst->fMessQueue[toelc][from_elc].empty();
	    if (!test){
	      messr=inst->fMessQueue[toelc][from_elc].front();
	      inst->fMessQueue[toelc][from_elc].pop(); //remove mesg from queue
	      inst->fWaitingClient[toelc][from_elc] = kFALSE;
	      TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
	      sprintf(str,"%d",toelc);
	      s->Send(str);
	      s->Send(*messr);
	      delete messr; messr=0;
	    } else 
	      TThread::UnLock();//LLLLLOOOOOCCCCCKKKKK
	    //	    cout<<"waiting("<<meid<<")"<<s->GetInetAddress().GetHostAddress()<<endl;
	    sleep(1);
	  } while (test);
	  
	}
      }
      continue;
    } 

    //ignoring unknown options
    cout<<"Unknown option ("<<meid<<") "<<mtype<<" from ("<<from_elc<<")"<<s->GetInetAddress().GetHostAddress()<<endl;
  } //end of while(1)
  return;  
}

 TSocket* TLServer::Connect(TSocket* fs,TLServer* inst) 
{
  //the connection of socket fs is accepted if the appropriate
  //communication and ELClientType is avalable.
  //Thread must be locked!!!

  TSocket* s=Accept();
  char rstr[80];
  s->Recv(rstr,80);
  int elc;
  sscanf(rstr,"%d",&elc);
  if (elc<kLC_MAX){//NORMAL CONNECTION
    if(inst->fSocket[elc]==0) {
      inst->fSocket[elc]=s;
      inst->fSocket[elc]->Send(rstr);
      cout<<ClassName()<<" Send "<<rstr<<endl;
      return s;
    } else {
      // Drop connection
      s->Send("Denied");
      s->Close();
      delete s;
      return 0;
    }
  }
  if (elc==kLC_AMAX){//ACONNECTION
    int i=0;
    while(inst->fASocket[i]!=0 && i<kLC_AMAX-kLC_MAX-1) i++;
    if(i>=kLC_AMAX-kLC_MAX-1) {
      cout<<"No empty slot!n";
      s->Send("Denied");
      s->Close();
      delete s;
      return 0;
    }
    elc=i;
    inst->fASocket[elc]=s;
    inst->fASocket[elc]->Send(rstr);
    cout<<ClassName()<<" Send "<<rstr<<endl;
    return s;
  }
  if (s) {
    s->Close();
    delete s;
  }
  return 0;  
}


 void TLServer::Listen() 
{
  //Listen for a connection. For each connection starts new thread
  //if there are many connections, which only want to do bad, then 
  //this method is not good. Connection is not accepted here! Only the 
  //socket is stored in fS.
  //
  //Number of threads is equal to the size of LClientType enum.
  //With each new thread the Thread_func() is started. Then the 
  //listening to this client is deactivated, to avoid starting 
  //new thread for each message.
  //Thread pointers are stored in pmThread pointer array.
  //
  //See Thread_func() for more informations...

  while((fS=fMon->Select())) {
    //start new thread
    
    TThread::Lock();//LLLLLLLLLLOOOOOOOOOO
    int thrnum=0;
    while (thrnum<kLC_AMAX) {
      if (!pmThread[thrnum]) break;
      else {
	if (pmThread[thrnum]->GetState()==TThread::kCanceledState) {
	  Thread_stop(thrnum);
	  break;
	} else {
	  thrnum++;
	}
      }
    }
    TThread::UnLock();//LLLLLLLLLLOOOOOOOOOO
    if (thrnum>=kLC_AMAX) 
      cerr<<"Listen() WARNING: no more threads leftn";
    else {
      //      cout<<"Listen() WARNING: starting thread "<<thrnum<<"n";
      char label[20];
      sprintf(label,"*%d*",thrnum);
      pmThread[thrnum]=new TThread(label,
				   (void(*) (void *))&Thread_func,
				   (void*) this);
      fMon->DeActivate(fS); 
      pmThread[thrnum]->Run();
    } 
  }
}


ROOT page - Class index - Top of the page

This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.