
PsnSvm Class Reference

Defines an abstract message passing virtual machine. More...

#include <PsnSvm.h>


Public Methods

 PsnSvm (PsNameToPointerMap< PsnProcess > *tab, const PsDate &latence)
 constructor

virtual ~PsnSvm ()
 Destructor.

virtual void init (const PsDate &initialSimulationDate)
 initialise the distributed virtual machine

virtual void syncDistributedSites ()
 sync all the distributed sites.

virtual void connectToDistributedSimulation ()
 connect the distributed virtual machine to its peers

virtual void createDistributedSimulation (const PsDate &initialSimulationDate)
 create a distributed simulation

virtual void broadcast (PsnPvmMessage::MessageTag, PsnPvmMulticastMessage *message=NULL)
 broadcast a message

virtual void synchronizeOn (PsPvmController &, PsnPvmMessage::MessageTag tag)
 enter a synchronisation barrier between all distributed sites except the central site. This is a blocking call, but the virtual machine still answers requests.

virtual void disconnectFromDistributedSimulation (const PsDate &deconnexionDate)
 Disconnect the site from the distributed simulation.

virtual const PsName & getSiteName () const
 get the site name in the simulation

virtual PsnSvmLink * getLinkToProcessNamed (const PsName &processName)
 get access to the link to a given process

virtual PsnProcess * getProcessDescriptorNamed (const PsName &processName)
 get access to a given process descriptor

void processReceivedMessages (PsPvmController &parsingController, const PsnPvmMessage::MessageTag tag)
 Attempt to receive and process messages from all sites, in a non blocking fashion.

void waitAndProcessMessages (PsPvmController &parsingController, const PsnPvmMessage::MessageTag tag)
 receive and process messages from all sites, and wait for the messages to arrive if necessary

void waitForMessage (PsPvmController &parsingController, const PsnPvmMessage::MessageTag tag)
 wait until a message has arrived from one site

void waitForMessageFrom (PsPvmController &parsingController, const PsName &processName, const PsnPvmMessage::MessageTag tag)
 receive a message from a particular process

void synchroniseReceiveAndProcessMessages (PsPvmController &parsingController, const PsnPvmMessage::MessageTag tag)
 receive and process messages from all sites (except the central site), and wait until at least one recent message (as defined by the latency) has arrived from each of these sites

PsnProcesswaitForAnswerToBlockingRequest (PsPvmController &parsingController, PsnPvmMessage::MessageTag tag)
 receive and process messages from all sites (except the central site), and wait until at least one message with tag tag has arrived

void sendCurrentBuffersWithTag (const PsnPvmMessage::MessageTag tag)
 send the messages contained in the send buffers of all links

void timestampCurrentSendBuffers (const PsDate &date)
 timestamp the messages contained in the send buffers of all links

virtual const PsDate & getSynchronisationLatency ()
 get the authorized latency for the relaxed receive


Protected Methods

virtual void addNewWorkstation (const PsName &m)=0
 add a new machine to the virtual machine

virtual void removeWorkstation (const PsName &m)=0
 remove a work station from the virtual machine

virtual int spawnProcess (PsnProcess *p)=0
 spawn a copy of this process on another site

virtual PsnSvmLink * createSvmLink (const int &d=0)=0
 create a link to a distant site

virtual int getParentSiteId ()=0
 get the id of the site that spawned this process.

virtual int getSiteId ()=0
 get the id in the svm of this site

virtual void groupBarrier (string groupName, int numberToJoin)=0
 block the calling process until numberToJoin processes of group groupName have called groupBarrier

virtual void broadcastToGroup (string groupName, PsnPvmMessage::MessageTag)=0
 broadcast an empty tagged message to all members of a broadcast group

virtual void initBeforeMessagePacking ()=0
 do any useful initialisations before messages are packed

virtual int nonblockingReceive (PsnPvmMessage::MessageTag tag)=0
 try to receive messages in a non-blocking fashion; the received messages are lost

virtual pair< PsnPvmMessage::MessageTag, int > waitForAnyRequests (PsnPvmIncomingMessage &receiveBuffer)=0
 wait for any message sent to this site.

virtual void joinSvmGroup (string &groupName)=0
 join a synchronisation group, i.e. a group of processes in the Siames virtual machine

virtual void serveNameRequestsUntilEnd ()
 for the central site: go into server mode, receiving and processing any received request until all sites have disconnected


Protected Attributes

PsNameToPointerMap< PsnProcess > * _processTable
 map containing all information about the different processes

map< int, PsnProcess * > _idToProcessTable
 map storing the correspondence between a process id and its PsnProcess

int _siteId
 workstation identifier in the svm

PsnSvmLink * _linkToCentralSite
 the link to the central site

PsName _siteName
 Name of the process associated to this site.

int _numberOfSlaves
 number of slaves ( PvmControllers ) in the simulation

const PsDate _synchronisationLatency
 the authorized latency, in milliseconds, when synchronising all the sites


Static Protected Attributes

string _slaveGroupName = "spawnedControllers"
 name of the group all the slave processes join

string _masterSiteName = "OpenMaskMasterDistributedSite"
 reserved name of the master distributed site


Detailed Description

Defines an abstract message passing virtual machine.

Author:
Siames
Version:
2.1

Definition at line 40 of file PsnSvm.h.


Constructor & Destructor Documentation

PsnSvm::PsnSvm (PsNameToPointerMap< PsnProcess > * tab, const PsDate & latence)

constructor

Parameters:
tab a table mapping each process name to the data structure defining that process
latence the authorized synchronisation latency

Definition at line 38 of file PsnSvm.cxx.

References PsDate.

PsnSvm::PsnSvm (PsNameToPointerMap<PsnProcess> * tab, const PsDate & latence) :
   _processTable ( tab ),
   _synchronisationLatency ( latence ),
   _siteId ( -1 ), // a negative value indicates an error
   _numberOfSlaves ( 0 )
{
}

PsnSvm::~PsnSvm () [virtual]

Destructor.

Definition at line 47 of file PsnSvm.cxx.

PsnSvm::~PsnSvm ()
{
}


Member Function Documentation

virtual void PsnSvm::addNewWorkstation (const PsName & m) [protected, pure virtual]

add a new machine to the virtual machine

Implemented in PsnPvmSvm.

Referenced by createDistributedSimulation().

void PsnSvm::broadcast (PsnPvmMessage::MessageTag tag, PsnPvmMulticastMessage * mess = NULL) [virtual]

broadcast a message

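If mess is not NULL, it is reused and sent as is. Otherwise an empty, timestamped multicast message addressed to every site other than this one is created and sent.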

Definition at line 337 of file PsnSvm.cxx.

References _processTable, _siteName, map< PsName, PsnProcess * >::begin(), map< PsName, PsnProcess * >::end(), PsnPvmOutgoingMessage::insertTimeStamp(), PsnPvmMessage::MessageTag, and vector< T, Alloc >::push_back().

Referenced by synchronizeOn().

void PsnSvm::broadcast (PsnPvmMessage::MessageTag tag, PsnPvmMulticastMessage * mess)
{
  // the first implementation, commented out below, dies in pvm with an unexpected error
  // I haven't had time to trace the problem, so the second implementation is used

  /* first implementation
  initBeforeMessagePacking () ;
  
  if ( mess != NULL )
    {
      mess->packMessage() ;
    }
  else
    {
      mess = new PsnPvmOutgoingMessage () ;
      mess->insertTimeStamp(0) ;
      *mess << "dummy";
      mess->packMessage() ;
    }
  
  broadcastToGroup ( slaveGroup, tag ) ;
  */

#ifdef _DEBUGPVMMESS
  cerr<<"PsnSvm::broadcast ("<<tag<<","<<mess<<") ; "<<endl;
#endif

  PsNameToPointerMap<PsnProcess>::iterator processIter ; 
  
  if (mess == NULL )
    {
      // no message supplied: build an empty one addressed to every other site
      vector<int> distantRecepients ;
      for (processIter = _processTable->begin () ; processIter != _processTable->end () ; ++processIter) 
        {
          if (_siteName != (*processIter).second->getProcessName ()) 
            {
              // a distant site: add the id of its link to the recipients
              distantRecepients.push_back ( (*processIter).second->getSvmLink ()->getTID () ) ;
            }
        }

      mess = new PsnPvmMulticastMessage ( distantRecepients ) ;
      mess->insertTimeStamp(0) ;
    }

  mess -> send ( tag ) ;

}
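
When no message is supplied, broadcast() builds its recipient list by walking the process table and skipping the local site. The selection step might be sketched as follows, assuming a plain std::map from process name to task id in place of PsNameToPointerMap and PsnSvmLink. The helper collectDistantRecipients and the map layout are illustrative names, not part of the documented class.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical helper: gathers the task ids of every site except the local one.
// The real code reads each id through PsnProcess::getSvmLink()->getTID().
std::vector<int> collectDistantRecipients(
    const std::map<std::string, int>& tidByProcessName,
    const std::string& localSiteName)
{
    std::vector<int> recipients;
    for (std::map<std::string, int>::const_iterator it = tidByProcessName.begin();
         it != tidByProcessName.end(); ++it) {
        if (it->first != localSiteName) {   // never address the broadcast to ourselves
            recipients.push_back(it->second);
        }
    }
    return recipients;
}
```

The resulting vector plays the role of distantRecepients in the listing above: it is what the multicast message would be constructed from.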

virtual void PsnSvm::broadcastToGroup (string groupName, PsnPvmMessage::MessageTag) [protected, pure virtual]

broadcast an empty tagged message to all members of a broadcast group

Implemented in PsnPvmSvm.

void PsnSvm::connectToDistributedSimulation () [virtual]

connect the distributed virtual machine to its peers

Definition at line 242 of file PsnSvm.cxx.

References _idToProcessTable, _linkToCentralSite, _numberOfSlaves, _processTable, _siteName, _slaveGroupName, map< PsName, PsnProcess * >::begin(), createSvmLink(), map< PsName, PsnProcess * >::end(), PsName::getNameServer(), getParentSiteId(), PsnProcess::getSvmLink(), PsnSvmLink::getTID(), map< int, PsnProcess * >::insert(), joinSvmGroup(), make_pair(), PsnPvmMessage::ProcessTable, PsDate, PsName::setNameServer(), PsnProcess::setSvmLink(), PsnPvmMessage::SiteName, syncDistributedSites(), and PsnSvmLink::waitForMessage().

Referenced by init().

void PsnSvm::connectToDistributedSimulation ()
{
#ifdef _DEBUGPVMEXEC
  cerr<<"PsnSvm::connectToDistributedSimulation (): connecting to the distributed simulation"<<endl;
#endif

   PsnPvmIncomingMessage * np = NULL ;

   // connect to the global controller
   _linkToCentralSite = createSvmLink ( getParentSiteId () ) ;

   // join the group of slave processes
   joinSvmGroup (_slaveGroupName) ;


   // create a distributed version of the nameServer
   PsPvmNameServer * pvmNameServer = new PsPvmNameServer ( getParentSiteId() , *PsName::getNameServer() ) ; 
   PsName::setNameServer ( pvmNameServer ) ; 

   // find out the name of this site 
   PsDate initialDate ;
   np = & _linkToCentralSite->waitForMessage (PsnPvmMessage::SiteName) ;
   *np >> _siteName ;

   // find out the addresses of the other sites 
   // - extract the timestamp of the communication
   _linkToCentralSite->waitForMessage (PsnPvmMessage::ProcessTable) ;

   // - extract, for each process, its address in the virtual machine and create a link
   PsNameToPointerMap<PsnProcess>::iterator i ;
   for (i = _processTable->begin () ; i != _processTable->end () ; ++i) 
     {
       ++_numberOfSlaves ; // we count ourselves too, but as _numberOfSlaves is initialised to 0, it's ok

       // Note: creating a link to ourselves is harmless,
       // it only updates the tids in the process table

       PsnProcess * p = (*i).second ;
       // create a link without a recipient
       p->setSvmLink (createSvmLink ()) ;
       // update the recipient
       *np  >> *p ;

       _idToProcessTable.insert ( make_pair ( p->getSvmLink()->getTID(),p ) ) ;
     }

   // wait for all other sites to be connected to the distributed simulation.
   // avoids sending messages to sites still connecting or, worse, not yet connected
   syncDistributedSites () ;
}

void PsnSvm::createDistributedSimulation (const PsDate & initialSimulationDate) [virtual]

create a distributed simulation

Parameters:
initialSimulationDate the initial simulation date, needed to start timestamping the exchanged messages

Definition at line 80 of file PsnSvm.cxx.

References _idToProcessTable, _masterSiteName, _processTable, _siteName, addNewWorkstation(), map< PsName, PsnProcess * >::begin(), createSvmLink(), map< PsName, PsnProcess * >::end(), PsnSvmLink::getOutgoingBuffer(), PsnProcess::getSvmLink(), PsnSvmLink::getTID(), map< int, PsnProcess * >::insert(), PsnPvmOutgoingMessage::insertTimeStamp(), make_pair(), PsnPvmMessage::ProcessTable, PsDate, vector< T, Alloc >::push_back(), PsnPvmMulticastMessage::send(), PsnPvmOutgoingMessage::send(), serveNameRequestsUntilEnd(), PsnProcess::setSvmLink(), PsnPvmMessage::SiteName, and spawnProcess().

Referenced by init().

void PsnSvm::createDistributedSimulation (const PsDate & initialSimulationDate)
{
#ifdef _DEBUGPVMEXEC
   cerr << "PsnSvm::createDistributedSimulation : creating the distributed simulation" << endl ;
#endif
   _siteName = _masterSiteName ;

     PsNameToPointerMap<PsnProcess>::iterator i ;
     
     // add the host machine of each process to the virtual machine
     for (i = _processTable->begin () ; i != _processTable->end () ; ++i ) 
       {
         addNewWorkstation ( (*i).second->getHostMachineName () ) ;
       }

     
     // create the local processes
#ifdef _DEBUGPVMEXEC
     cerr << "Creating the local processes..................." << endl ;
#endif
     PsnProcess * processInfo ;
     int adrLoc ;

     vector<int> processSiteIds ;
     
     for (i = _processTable->begin () ; i != _processTable->end () ; ++i) 
       {
         processInfo = (*i).second ;
         
         // spawn a new process on the specified workstation
         adrLoc = spawnProcess (processInfo) ;
         
         // create a SvmLink, add it to processInfo
         processInfo->setSvmLink (createSvmLink (adrLoc)) ; 
         
         _idToProcessTable.insert ( make_pair ( adrLoc,processInfo ) ) ;

         processInfo->getSvmLink ()->getOutgoingBuffer().insertTimeStamp ( initialSimulationDate ) ;

         // send the name of the process (in the Svm) to the created process
         processInfo->getSvmLink ()->getOutgoingBuffer()<<(*i).first ;
         
         processInfo->getSvmLink ()->getOutgoingBuffer().send (PsnPvmMessage::SiteName) ;

         // prepare processSiteIds for the next step
         processSiteIds.push_back ( processInfo->getSvmLink ()->getTID() ) ;

#ifdef _DEBUGPVMMESS
         cerr<<typeid((*i).first).name()<<endl;
         cerr<<(*i).first<<" of id "<<processInfo->getSvmLink ()->getTID()<<endl;
#endif
       }
     

     // encode the Svm attributes of each process
     PsnPvmMulticastMessage nameToIdMessage ( processSiteIds ) ;
     nameToIdMessage.insertTimeStamp ( initialSimulationDate ) ;
     
     for (i = _processTable->begin () ; i != _processTable->end () ; ++i) 
       {
         nameToIdMessage << *((*i).second) ;
       }
     
     // send the addresses to each local controller
     nameToIdMessage.send ( PsnPvmMessage::ProcessTable ) ;
     
     // the central site now only answers requests until every site has finished
     serveNameRequestsUntilEnd () ;

}
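
The listing above bootstraps the sites in two phases: each spawned process first receives its own name (tag SiteName), and only once every process has an id does the central site multicast the complete table (tag ProcessTable). A rough model of that ordering, assuming string payloads and made-up site ids starting at 100 in place of real spawnProcess() results (bootstrapSites and ToyMailboxes are hypothetical names):

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical mailbox per spawned site id: the messages it would receive, in order.
typedef std::map<int, std::vector<std::string> > ToyMailboxes;

ToyMailboxes bootstrapSites(const std::vector<std::string>& processNames)
{
    ToyMailboxes mailboxes;
    std::string table;
    int nextSiteId = 100;  // assumption: stands in for the id returned by spawnProcess()

    // Phase 1: spawn each process and tell it its own name.
    for (std::size_t k = 0; k < processNames.size(); ++k) {
        const int siteId = nextSiteId++;
        mailboxes[siteId].push_back("SiteName:" + processNames[k]);
        table += processNames[k] + "=" + std::to_string(siteId) + ";";
    }

    // Phase 2: every id is now known, so multicast the full table to all sites.
    for (ToyMailboxes::iterator box = mailboxes.begin(); box != mailboxes.end(); ++box) {
        box->second.push_back("ProcessTable:" + table);
    }
    return mailboxes;
}
```

Splitting the work this way means a site never receives a partial table; this matches connectToDistributedSimulation(), which waits for SiteName before ProcessTable.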

virtual PsnSvmLink* PsnSvm::createSvmLink (const int & d = 0) [protected, pure virtual]

create a link to a distant site

Parameters:
d the site id of the distant site

Implemented in PsnPvmSvm.

Referenced by connectToDistributedSimulation(), and createDistributedSimulation().

void PsnSvm::disconnectFromDistributedSimulation (const PsDate & deconnexionDate) [virtual]

Disconnect the site from the distributed simulation.

Parameters:
deconnexionDate the date at which the disconnection is happening (used to timestamp the disconnection message). This will trigger disconnection on all sites.

Definition at line 295 of file PsnSvm.cxx.

References _processTable, map< PsName, PsnProcess * >::begin(), PsnPvmMessage::EndOfSimulation, getParentSiteId(), getSiteName(), PsnPvmOutgoingMessage::insertTimeStamp(), PsDate, vector< T, Alloc >::push_back(), PsnPvmMulticastMessage::send(), and PsnPvmUnicastMessage::send().

Referenced by serveNameRequestsUntilEnd().

void PsnSvm::disconnectFromDistributedSimulation (const PsDate & endOfSimulationDate)
{
#ifdef _DEBUGPVMEXEC
   cout <<"PsnSvm::disconnectFromDistributedSimulation"<< endl ;
#endif
   int recepient = getParentSiteId () ;

   if ( recepient != 0 )
      // a local PvmController has finished. Notify the central controller
      {
         PsnPvmUnicastMessage endMessage (recepient) ;
         
         endMessage.insertTimeStamp ( endOfSimulationDate ) ;
 
         endMessage << getSiteName () ;

         endMessage.send ( PsnPvmMessage::EndOfSimulation ) ;
      }
   else
      // the central controller is disconnecting. Notify all local controllers
      {
         vector<int> processSiteIds ;
         for (PsNameToPointerMap<PsnProcess>::iterator i = _processTable->begin () ; 
              i != _processTable-> end () ; 
              ++i) 
            {
               processSiteIds.push_back ( (i->second)->getSvmLink ()->getTID() ) ;
            }
         PsnPvmMulticastMessage endOfSimulationMessage ( processSiteIds ) ;          

         endOfSimulationMessage.insertTimeStamp ( endOfSimulationDate ) ;

         endOfSimulationMessage << getSiteName () ;

         endOfSimulationMessage.send ( PsnPvmMessage::EndOfSimulation ) ;
      }
}

PsnSvmLink * PsnSvm::getLinkToProcessNamed (const PsName & processName) [virtual]

get access to the link to a given process

Definition at line 424 of file PsnSvm.cxx.

References _processTable, map< PsName, PsnProcess * >::end(), map< PsName, PsnProcess * >::find(), PsNameToPointerMap< PsnProcess >::getObjectOfIndex(), PsnProcess::getSvmLink(), PsController::SomeWarnings, and PsController::warning().

Referenced by waitForMessageFrom().

PsnSvmLink * PsnSvm::getLinkToProcessNamed (const PsName & nomPss)
{
   // look up the process associated with the name
   if (_processTable->find (nomPss) != _processTable->end ()) 
      {
         // the process was found
         PsnProcess * pss = _processTable->getObjectOfIndex (nomPss) ;
         return (pss->getSvmLink ()) ;
      } 
   else {
      // the requested process is unknown
#ifdef _USESSTREAM
      ostringstream o ;
      o << "PsnSvm::getLinkToProcessNamed: cannot find process "
        << nomPss << " to retrieve its link!" ;
      PsController::warning (o.str(), PsController::SomeWarnings ) ;
#else
      ostrstream o ;
      o << "PsnSvm::getLinkToProcessNamed: cannot find process "
        << nomPss << " to retrieve its link!" ;
      o.put ('\0') ;
      PsController::warning (o.str(), PsController::SomeWarnings ) ;
      // str() froze the buffer: unfreeze it so the stream releases it itself
      // (the buffer is an array, so a plain delete would be incorrect)
      o.rdbuf()->freeze (0) ;
#endif
      return NULL ;
   }
}

virtual int PsnSvm::getParentSiteId () [protected, pure virtual]

get the id of the site that spawned this process.

Returns:
the site id, 0 meaning the process wasn't spawned by the virtual machine

Implemented in PsnPvmSvm.

Referenced by connectToDistributedSimulation(), disconnectFromDistributedSimulation(), and init().

PsnProcess * PsnSvm::getProcessDescriptorNamed (const PsName & processName) [virtual]

get access to a given process descriptor

Definition at line 454 of file PsnSvm.cxx.

References _processTable, map< PsName, PsnProcess * >::end(), map< PsName, PsnProcess * >::find(), PsNameToPointerMap< PsnProcess >::getObjectOfIndex(), PsController::SomeWarnings, and PsController::warning().

PsnProcess * PsnSvm::getProcessDescriptorNamed (const PsName & nomPss)
{
   // look up the process associated with the name
   if (_processTable->find (nomPss) != _processTable->end ()) 
      {
         // the process was found
         PsnProcess * pss = _processTable->getObjectOfIndex (nomPss) ;
         return (pss) ;
      } 
   else 
      {
         // the requested process is unknown
#ifdef _USESSTREAM
         ostringstream o  ;
         o << "PsnSvm::getProcessDescriptorNamed: process " << nomPss << " unknown !" ;
         PsController::warning (o.str(), PsController::SomeWarnings ) ;
#else
         ostrstream o  ;
         o << "PsnSvm::getProcessDescriptorNamed: process " << nomPss << " unknown !" ;
         o.put('\0') ;
         PsController::warning (o.str(), PsController::SomeWarnings ) ;
         // str() froze the buffer: unfreeze it so the stream releases it itself
         // (the buffer is an array, so a plain delete would be incorrect)
         o.rdbuf()->freeze (0) ;
#endif
         return NULL ;
      }
}
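
Both name lookups follow the same pattern: search the table, return the entry when it exists, otherwise warn and return NULL. A minimal sketch of that pattern, assuming a std::map keyed by std::string in place of PsNameToPointerMap (ToyProcess and findProcess are illustrative names). It reuses the iterator returned by find() instead of searching the table a second time:

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-in for PsnProcess.
struct ToyProcess {
    std::string hostMachineName;
};

// Returns the descriptor registered under `name`, or a null pointer when the
// name is unknown. Callers must check the result before dereferencing it.
ToyProcess* findProcess(const std::map<std::string, ToyProcess*>& table,
                        const std::string& name)
{
    std::map<std::string, ToyProcess*>::const_iterator found = table.find(name);
    if (found == table.end()) {
        return nullptr;   // the real code also emits a PsController::warning here
    }
    return found->second;
}
```

Because an unknown name yields a null pointer rather than an exception, a caller such as waitForMessageFrom() would need to test the returned link before using it.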

virtual int PsnSvm::getSiteId () [protected, pure virtual]

get the id in the svm of this site

Implemented in PsnPvmSvm.

const PsName & PsnSvm::getSiteName () const [virtual]

get the site name in the simulation

Definition at line 805 of file PsnSvm.cxx.

References _siteName.

Referenced by disconnectFromDistributedSimulation().

const PsName & PsnSvm::getSiteName () const
{
   return _siteName;
}

const PsDate & PsnSvm::getSynchronisationLatency () [virtual]

get the authorized latency for the relaxed receive

Definition at line 52 of file PsnSvm.cxx.

References _synchronisationLatency, and PsDate.

const PsDate & PsnSvm::getSynchronisationLatency ()
{
  return _synchronisationLatency;
}

virtual void PsnSvm::groupBarrier (string groupName, int numberToJoin) [protected, pure virtual]

block the calling process until numberToJoin processes of group groupName have called groupBarrier

Parameters:
numberToJoin number of processes to wait for
groupName the name of the process group

Implemented in PsnPvmSvm.

Referenced by syncDistributedSites().

void PsnSvm::init (const PsDate & initialSimulationDate) [virtual]

initialise the distributed virtual machine

Definition at line 59 of file PsnSvm.cxx.

References connectToDistributedSimulation(), createDistributedSimulation(), getParentSiteId(), and PsDate.

void PsnSvm::init (const PsDate & initialSimulationDate)
{
   if ( getParentSiteId () == 0 )  // wasn't created by a spawn
     {
       createDistributedSimulation ( initialSimulationDate ) ;
     }
   else
     {
       connectToDistributedSimulation () ;
     }
}
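
init() decides the role of a site purely from its parent id: 0 means the process was started by hand and becomes the central site, anything else means it was spawned and must connect to its parent. A small sketch of that dispatch, assuming a toy class in which the parent id is passed to the constructor rather than queried from the message-passing layer (ToySvm and role() are hypothetical):

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for PsnSvm: only the init() dispatch is modelled.
class ToySvm {
public:
    explicit ToySvm(int parentSiteId) : _parentSiteId(parentSiteId) {}
    virtual ~ToySvm() {}

    // Mirrors PsnSvm::init(): a parent id of 0 means "not created by a spawn".
    void init()
    {
        if (getParentSiteId() == 0) {
            _role = "central";  // the real class calls createDistributedSimulation()
        } else {
            _role = "local";    // the real class calls connectToDistributedSimulation()
        }
    }

    const std::string& role() const { return _role; }

protected:
    // Pure virtual in PsnSvm; a concrete subclass asks the underlying layer.
    virtual int getParentSiteId() const { return _parentSiteId; }

private:
    int _parentSiteId;
    std::string _role;
};
```

The same executable can therefore serve as both master and slave; which branch runs depends only on how the process was launched.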

virtual void PsnSvm::initBeforeMessagePacking () [protected, pure virtual]

do any useful initialisations before messages are packed

Implemented in PsnPvmSvm.

virtual void PsnSvm::joinSvmGroup (string & groupName) [protected, pure virtual]

join a synchronisation group, i.e. a group of processes in the Siames virtual machine

Parameters:
groupName the name of the group of processes to join. This parameter is not const, because of the underlying C compatibility layer

Implemented in PsnPvmSvm.

Referenced by connectToDistributedSimulation().

virtual int PsnSvm::nonblockingReceive (PsnPvmMessage::MessageTag tag) [protected, pure virtual]

try to receive messages in a non-blocking fashion; the received messages are lost

Parameters:
tag the tag of the messages to receive
Returns:
number of received messages with that tag

Implemented in PsnPvmSvm.

void PsnSvm::processReceivedMessages (PsPvmController & parsingController, const PsnPvmMessage::MessageTag tag)

Attempt to receive and process messages from all sites, in a non blocking fashion.

Parameters:
parsingController the controller which will parse the received messages
tag the tag of the messages to receive

Definition at line 484 of file PsnSvm.cxx.

References _processTable, _siteName, map< PsName, PsnProcess * >::begin(), map< PsName, PsnProcess * >::end(), PsnPvmMessage::getSize(), PsnPvmIncomingMessage::hasMessage(), PsnPvmMessage::MessageTag, PsPvmController::parseSynchronisationMessage(), and PsnSvmLink::testForMessage().

void PsnSvm::processReceivedMessages (PsPvmController & parsingController, const PsnPvmMessage::MessageTag tag)
{
#ifdef _DEBUGPVMMESS
   cerr << "PsnSvm::processReceivedMessages"<<endl;
#endif
   PsNameToPointerMap<PsnProcess>::iterator pPss ; // iterator over the process table
   PsnSvmLink * canal ; // link to the process being handled
   PsnPvmIncomingMessage * messageRecu = NULL ;
   for (pPss = _processTable->begin () ; 
        pPss != _processTable->end () ; 
        ++pPss) 
      {
         if (_siteName != (*pPss).second->getProcessName ()) 
            {
               // a distant site: this is the link to poll
               canal = (*pPss).second->getSvmLink () ;
#ifdef _DEBUGPVMMESS
               cout << "PsnSvm::processReceivedMessages from "
                    << (*pPss).second->getProcessName () << " tag : "
                    << tag << endl ;
               //canal->affiche () ;
#endif
               // non-blocking receive
               messageRecu = canal->testForMessage (tag) ;
               while ( messageRecu->hasMessage () ) 
                  {
#ifdef _DEBUGPVMMESS
                     cerr << "PsnSvm::processReceivedMessages : message received of size " <<messageRecu->getSize()<< endl;
                     cerr << "PsnSvm::processReceivedMessages: extracted emission date: " <<messageRecu->getMessageDate()<< endl;
#endif        
                     // hand the message over to the local controller
                     parsingController.parseSynchronisationMessage (messageRecu) ;
                     messageRecu = canal->testForMessage (tag) ;
                  }
            }
      }
}
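
The loop above polls every distant link and keeps parsing for as long as a message is already waiting, so it never blocks on an idle site. A sketch of the same drain pattern, assuming an in-memory queue per link instead of a PVM receive buffer (ToyLink, its testForMessage signature and drainAllLinks are hypothetical and differ from the PsnSvmLink API):

```cpp
#include <cassert>
#include <deque>
#include <map>
#include <string>
#include <vector>

// Hypothetical link: a queue of pending payloads that is polled, never waited on.
struct ToyLink {
    std::deque<std::string> pending;

    // Returns true and fills `out` if a message was waiting, false otherwise.
    bool testForMessage(std::string& out)
    {
        if (pending.empty()) {
            return false;
        }
        out = pending.front();
        pending.pop_front();
        return true;
    }
};

// Drains every link except the local site's and returns what would be parsed.
std::vector<std::string> drainAllLinks(std::map<std::string, ToyLink>& links,
                                       const std::string& localSiteName)
{
    std::vector<std::string> parsed;
    for (std::map<std::string, ToyLink>::iterator it = links.begin();
         it != links.end(); ++it) {
        if (it->first == localSiteName) {
            continue;                       // no link to poll for ourselves
        }
        std::string message;
        while (it->second.testForMessage(message)) {
            parsed.push_back(it->first + ":" + message);
        }
    }
    return parsed;
}
```

A site with nothing pending simply contributes no entries, which is the difference from the blocking variants such as waitAndProcessMessages().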

virtual void PsnSvm::removeWorkstation (const PsName & m) [protected, pure virtual]

remove a work station from the virtual machine

Implemented in PsnPvmSvm.

void PsnSvm::sendCurrentBuffersWithTag (const PsnPvmMessage::MessageTag tag)

send the messages contained in the send buffers of all links

Parameters:
tag the tag to associate to the message

Definition at line 715 of file PsnSvm.cxx.

References _processTable, _siteName, map< PsName, PsnProcess * >::begin(), map< PsName, PsnProcess * >::end(), PsSynchronisationMessage::endOfSynchronisationFragment, PsnPvmMessage::MessageTag, PsnSvmLink::sendOutgoingBuffer(), and PsnPvmMessage::SynchronisationMessage.

void PsnSvm::sendCurrentBuffersWithTag (const PsnPvmMessage::MessageTag tag)
{
   PsNameToPointerMap<PsnProcess>::iterator pPss ; // iterator over the process table
   PsnSvmLink * canal ; // link to the process being handled

#ifdef _DEBUGPVMMESS
   cerr<<"PsnSvm::sendCurrentBufferWithTag tag "<<tag<<endl<<"Processes : "<<endl;
   PsNameToPointerMap<PsnProcess>::iterator pPs ;
   for (pPs = _processTable->begin () ; pPs != _processTable->end () ; ++pPs) 
     {
       cerr<<(*pPs).second->getProcessName ();
     }
   cerr<<endl;
#endif

   for (pPss = _processTable->begin () ; pPss != _processTable->end () ; ++pPss) 
     {
#ifdef _DEBUGPVMMESS
       cerr<<"Handling emissions to "<<(*pPss).second->getProcessName ()<<endl;
#endif
       if (_siteName != (*pPss).second->getProcessName ()) 
         {
           // a distant site: this is the link to send on
           canal = (*pPss).second->getSvmLink () ;

           if ( tag == PsnPvmMessage::SynchronisationMessage ) 
             {
                // close the synchronisation fragment before sending
                canal->getOutgoingBuffer()<<PsSynchronisationMessage::endOfSynchronisationFragment ;
             }

           // send the link's outgoing buffer
           canal->sendOutgoingBuffer ( tag ) ;
#ifdef _DEBUGPVMMESS
           cerr << "PsnSvm::transmission with tag : "
                << tag << endl ;
#endif
         }
#ifdef _DEBUGPVMMESS
       else 
         {
           cerr << "Nothing to do "<<endl ;
           //canal->affiche () ;
         }
#endif
     }
}

void PsnSvm::serveNameRequestsUntilEnd () [protected, virtual]

for the central site: go into server mode, receiving and processing any received request until all sites have disconnected

Definition at line 153 of file PsnSvm.cxx.

References _processTable, PsController::AllWarnings, disconnectFromDistributedSimulation(), PsnPvmMessage::EndOfSimulation, map< PsName, PsnProcess * >::erase(), PsController::FatalWarnings, pair< T1, T2 >::first, PsString::getCString(), PsNameServerT< STL_ALLOC >::getIdentifier(), PsnPvmIncomingMessage::getMessageDate(), PsName::getNameServer(), PsNameServerT< STL_ALLOC >::getStringAssociatedTo(), PsName::idType, PsnPvmMessage::NameServiceGetId, PsnPvmMessage::NameServiceGetString, PsnPvmMessage::NameServiceReturnId, PsnPvmMessage::NameServiceReturnString, PsnPvmMessage::NameServiceVerifyLocalNameServer, PsDate, PsnPvmSvm::pvmDataEncoding, pair< T1, T2 >::second, PsName::setNameServer(), map< PsName, PsnProcess * >::size(), PsPvmCentralNameServer::verifyCompatibilityWithLocalNameServer(), and waitForAnyRequests().

Referenced by createDistributedSimulation().

00154 {
00155   bool serving = true ;
00156 
00157   //convert the actual name Server to a PvmCentralNameServer
00158   PsPvmCentralNameServer * nameServer = new PsPvmCentralNameServer ( *PsName::getNameServer() ) ;
00159   PsName::setNameServer (nameServer ) ;
00160 
00161 
00162   PsnPvmIncomingMessage receiveBuffer ;
00163   pair<PsnPvmMessage::MessageTag,  int> receivedRequest ;  
00164 
00165   while ( serving )
00166     {
00167 
00168       receivedRequest = waitForAnyRequests ( receiveBuffer ) ;
00169 
00170       switch (receivedRequest.first)
00171         {
00172         case PsnPvmMessage::EndOfSimulation :
00173            {
00174               const PsDate & endDate (receiveBuffer.getMessageDate() );
00175               PsName endedProcess ;
00176               receiveBuffer>>endedProcess ;
00177               cerr<<endedProcess<<" finished at "<<endDate<<endl;
00178               _processTable->erase ( endedProcess ) ;
00179               if ( _processTable->size() == 0 ) 
00180                  {  
00181                     disconnectFromDistributedSimulation ( endDate ) ;
00182                     serving = false ;
00183                  }
00184            }
00185            break;
00186         case PsnPvmMessage::NameServiceGetId :
00187           {
00188             int stringLength ;
00189             receiveBuffer>>stringLength ;
00190             char * stringId = new char[stringLength] ;
00191             receiveBuffer>>stringId ;
00192             assert ( stringId [stringLength - 1] == 0 ) ;
00193             cerr<<"Requested id for string "<<stringId<<" of length "<<stringLength
00194                 <<" from "<< hex << receivedRequest.second << dec;
00195             PsName::idType id = nameServer->getIdentifier( PsString(stringId) ) ;
00196             cerr<<" sending "<<id<<endl;
00197             delete [] stringId ;
00198             pvm_initsend( PsnPvmSvm::pvmDataEncoding ) ;
00199             pvm_pklong (&id, 1, 1);
00200             pvm_send (receivedRequest.second,PsnPvmMessage::NameServiceReturnId) ;  
00201           }
00202           break;
00203         case PsnPvmMessage::NameServiceGetString:
00204           {
00205             PsName::idType id ;
00206             receiveBuffer>>id ;
00207             cerr<<"Requested String for id "<<id
00208                 <<" from "<< hex << receivedRequest.second<< dec ;
00209             const PsString & resultingString ( nameServer->getStringAssociatedTo (id) ) ;
00210             cerr<<" sending "<<resultingString<<endl;
00211             pvm_initsend( PsnPvmSvm::pvmDataEncoding ) ;
00212             pvm_pkstr (const_cast<char *> (resultingString.getCString() ) ) ;
00213             pvm_send (receivedRequest.second,PsnPvmMessage::NameServiceReturnString) ;  
00214           }
00215           break;
00216         case PsnPvmMessage::NameServiceVerifyLocalNameServer:
00217           {
00218             nameServer->verifyCompatibilityWithLocalNameServer (receivedRequest.second, receiveBuffer) ;
00219           }
00220           break;
00221         default:
00222           //an unexpected request is received
00223           switch (PsController::warningLevel)
00224             {
00225             case PsController::AllWarnings :
00226               cerr<<"PsnSvm::serveNameRequestsUntilEnd() : unexpected request "<<receivedRequest.first<<endl;
00227               break ;
00228             case PsController::FatalWarnings :
00229               cerr<<"PsnSvm::serveNameRequestsUntilEnd() : unexpected request"<<receivedRequest.first<<endl;
00230               exit (2) ;              
00231               break;
00232             default://nothing to do
00233               break;
00234             }
00235           break;
00236         }
00237     }
00238 }
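
The NameServiceGetId and NameServiceGetString requests handled above are inverse lookups on the name server. The sketch below shows one possible two-way table. NameTable and its allocation policy (a new identifier is assigned the first time a string is requested) are assumptions made for illustration, not the OpenMask name server.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical two-way name table: string -> id and id -> string.
class NameTable {
public:
    // Returns the identifier of `name`, assigning the next free one if the
    // string has not been seen before (assumed policy).
    long getIdentifier(const std::string& name) {
        std::map<std::string, long>::const_iterator found = ids_.find(name);
        if (found != ids_.end()) {
            return found->second;
        }
        const long id = static_cast<long>(names_.size());
        ids_.emplace(name, id);
        names_.push_back(name);
        return id;
    }

    // Returns the string registered under `id`; the id must be known.
    const std::string& getStringAssociatedTo(long id) const {
        assert(id >= 0 && static_cast<std::size_t>(id) < names_.size());
        return names_[static_cast<std::size_t>(id)];
    }

private:
    std::map<std::string, long> ids_;   // forward lookup
    std::vector<std::string> names_;    // reverse lookup, indexed by id
};
```

Keeping both directions in one object means a site that receives an identifier can always ask for the string that produced it.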

virtual int PsnSvm::spawnProcess (PsnProcess p) [protected, pure virtual]
 

spawn a copy of this process on another site

Implemented in PsnPvmSvm.

Referenced by createDistributedSimulation().

void PsnSvm::syncDistributedSites   [virtual]
 

sync all the distributed sites.

blocks the calling process until all sites except the central site have called this member function

Definition at line 417 of file PsnSvm.cxx.

References _numberOfSlaves, _slaveGroupName, and groupBarrier().

Referenced by connectToDistributedSimulation().

00418 {
00419    groupBarrier ( _slaveGroupName, _numberOfSlaves ) ;
00420 }

void PsnSvm::synchroniseReceiveAndProcessMessages (PsPvmController parsingController, const PsnPvmMessage::MessageTag tag)
 

receive and process messages from all sites (except the central site), and wait until a sufficiently recent message (as defined by the synchronisation latency) has arrived from each of these sites

Parameters:
parsingController the controller which will parse the received messages
tag the tag of the messages to receive

Definition at line 616 of file PsnSvm.cxx.

References _processTable, _siteName, _synchronisationLatency, PsNameToPointerMap< typeObjet >::addObjectWithIndex(), PsController::AllWarnings, map< PsName, PsnProcess * >::begin(), map< PsName, typeObjet * >::empty(), map< PsName, PsnProcess * >::end(), PsnPvmMessage::EndOfSimulation, PsnPvmMessage::EnterBarrier, map< PsName, typeObjet * >::erase(), PsnPvmMessage::ExitBarrier, PsnPvmIncomingMessage::getMessageDate(), PsnProcess::getProcessName(), PsController::getSimulatedDate(), PsnProcess::getSvmLink(), PsnPvmIncomingMessage::hasMessage(), PsnPvmMessage::InitialValuesForMirror, PsnPvmMessage::LocalInitSuccessfull, PsnPvmMessage::MessageTag, PsnPvmMessage::MirrorNeedsInitialValues, PsnPvmMessage::NameServiceGetId, PsnPvmMessage::NameServiceGetString, PsnPvmMessage::NameServiceReturnId, PsnPvmMessage::NameServiceReturnString, PsnPvmMessage::NameServiceVerifyLocalNameServer, PsnPvmMessage::NameServiceVerifyResult, PsPvmController::parseSynchronisationMessage(), PsnPvmMessage::ProcessTable, PsPvmController::sendInitialValuesToMirror(), PsnProcess::setDateOfLastMessage(), PsnPvmMessage::SiteName, PsnPvmMessage::SynchronisationMessage, PsnSvmLink::testForAnyMessage(), and PsController::warning().

00617 {
00618    PsNameToPointerMap<PsnProcess> aRecevoir ;
00619 
00620    //First make a list of processes that we need to receive values from
00621    for (   PsNameToPointerMap<PsnProcess>::iterator pPssE = _processTable->begin () ; 
00622            pPssE != _processTable->end () ; 
00623            pPssE ++) 
00624       {
00625          if ( (pPssE->first != _siteName) &&
00626               ( pPssE->second->getDateOfLastMessage () +
00627                 pPssE->second->getPeriod () +
00628                 _synchronisationLatency < parsingController.getSimulatedDate () ) ) 
00629             {
00630                aRecevoir.addObjectWithIndex (pPssE->first, pPssE->second) ;
00631 #ifdef _DEBUGPVMMESS
00632                cerr << "PsnSvm::synchroniseReceiveAndProcessMessages: trying reception from: "
00633                     << pPssE->first << endl ;
00634 #endif
00635             }
00636       }
00637 
00638    
00639    PsnPvmIncomingMessage receiveBuffer ;
00640    PsnPvmMessage::MessageTag receivedRequest ;  
00641    PsnProcess * receivingProcess ;
00642    
00643    while (! aRecevoir.empty () ) 
00644       { 
00645 
00646          for ( PsNameToPointerMap<PsnProcess>::iterator processIterator = _processTable->begin () ;
00647                processIterator != _processTable->end () ;
00648                ++processIterator 
00649                )
00650             {
00651                //this new implementation doesn't do blocking receives, as some processes could send urgent requests
00652                receivingProcess = processIterator->second ;
00653 
00654                receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ;
00655                
00656                if ( receiveBuffer.hasMessage() )
00657                   {
00658                      
00659                      switch (receivedRequest)
00660                         {
00661                         case PsnPvmMessage::SynchronisationMessage :
00662                            {
00663                               //have the controller process the message
00664                               parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
00665                               
00666                               receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ;
00667                               
00668                               //an additional test could be performed here to make sure correct date of last message has happened
00669                               aRecevoir.erase ( receivingProcess->getProcessName() ) ;
00670                            }
00671                         break ;
00672                         case PsnPvmMessage::MirrorNeedsInitialValues :
00673                            {
00674                               parsingController.sendInitialValuesToMirror ( receiveBuffer ) ;      
00675                            }
00676                         break ;
00677                         case PsnPvmMessage::InitialValuesForMirror :
00678                            {
00679                               //have the controller process the message
00680                               parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
00681                               
00682                            }
00683                         break ;
00684                         case PsnPvmMessage::LocalInitSuccessfull :
00685                         case PsnPvmMessage::ProcessTable :
00686                         case PsnPvmMessage::EndOfSimulation :
00687                         case PsnPvmMessage::EnterBarrier :
00688                         case PsnPvmMessage::SiteName :
00689                         case PsnPvmMessage::ExitBarrier :
00690                         case PsnPvmMessage::NameServiceGetId :
00691                         case PsnPvmMessage::NameServiceGetString :
00692                         case PsnPvmMessage::NameServiceReturnId :
00693                         case PsnPvmMessage::NameServiceReturnString :
00694                         case PsnPvmMessage::NameServiceVerifyLocalNameServer :
00695                         case PsnPvmMessage::NameServiceVerifyResult :
00696                         default:
00697                            {
00698                               PsController::warning ("PsnSvm::waitForAnswerToBlockingRequest(): received unexpected message",
00699                                                      PsController::AllWarnings ) ;
00700                            }
00701                         }
00702                   }
00703             }
00704       }
00705          
00706    
00707 #ifdef _DEBUGPVMMESS
00708    cerr << "PsnSvm::synchroniseReceiveAndProcessMessages : on n'est pas bloque la " << endl ;
00709 #endif
00710 }
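
The first loop of the listing above selects every remote process whose last message is older than the current simulated date by more than its period plus the synchronisation latency. The following sketch isolates that selection rule. It uses plain doubles in place of PsDate and a hypothetical ProcessInfo struct in place of PsnProcess; both are assumptions for illustration.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for PsnProcess: only the two dates the rule needs.
struct ProcessInfo {
    double dateOfLastMessage;  // date carried by the last message received
    double period;             // interval between two messages of the process
};

// True when the last message is too old to be accepted at date `now`.
bool needsFreshMessage(const ProcessInfo& p, double latency, double now) {
    assert(p.period >= 0.0 && latency >= 0.0);
    return p.dateOfLastMessage + p.period + latency < now;
}

// Names of all remote processes that must still deliver a message.
std::vector<std::string> processesToWaitFor(
        const std::map<std::string, ProcessInfo>& table,
        const std::string& localSite, double latency, double now) {
    std::vector<std::string> pending;
    for (const auto& entry : table) {
        if (entry.first != localSite &&
            needsFreshMessage(entry.second, latency, now)) {
            pending.push_back(entry.first);
        }
    }
    return pending;
}
```

A larger latency shrinks the list of processes to wait for, so the site blocks less often but works with older values.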

void PsnSvm::synchronizeOn (PsPvmController &, PsnPvmMessage::MessageTag tag) [virtual]
 

enter a synchronisation barrier between all distributed sites except the central site. This is a blocking call, but the virtual machine still answers requests while it waits.

Definition at line 390 of file PsnSvm.cxx.

References _numberOfSlaves, broadcast(), PsnPvmMessage::MessageTag, and waitForAnswerToBlockingRequest().

00391 {
00392   broadcast ( tag ) ;
00393 #ifdef _DEBUGPVMSYNCHRO
00394   cerr<< "PsnSvm::waitForAllProcesses("<<tag<<") "<<_numberOfSlaves <<endl;
00395   usleep(1000000) ;
00396 #endif
00397 
00398   int numberOfReadyProcesses = 1 ; //this process has entered the barrier
00399   
00400   while ( numberOfReadyProcesses != _numberOfSlaves )
00401      {
00402 
00403         // here, we ignore the results, because we suppose one process will only send tag once during synchronization
00404         waitForAnswerToBlockingRequest ( parsingController, tag ) ;
00405 
00406         numberOfReadyProcesses++ ;
00407 
00408 #ifdef _DEBUGPVMMESS
00409   cerr<< "PsnSvm::exitSoftBarrier numberToEnterBarrier: "<<numberOfReadyProcesses<<endl;
00410   usleep(1000000) ;
00411 #endif
00412 
00413      }
00414 }
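
The barrier above counts arrivals: the caller counts itself as the first ready process, then waits for one message carrying the barrier tag from each other site, while other messages are still served in between. The sketch below reproduces that counting logic on a simulated message queue. The Tag enumeration, BarrierResult and softBarrier are hypothetical names, and a real implementation would block on the network instead of reading a deque.

```cpp
#include <cassert>
#include <deque>

// Hypothetical, reduced set of message tags.
enum class Tag { EnterBarrier, SynchronisationMessage, NameServiceGetId };

struct BarrierResult {
    int arrivals;             // sites counted in the barrier, caller included
    int otherMessagesServed;  // messages handled while waiting
};

// Consumes messages until `numberOfSites` sites have entered the barrier.
// Assumes each site sends `barrierTag` only once, as the listing does.
BarrierResult softBarrier(std::deque<Tag>& incoming, Tag barrierTag,
                          int numberOfSites) {
    assert(numberOfSites >= 1);
    BarrierResult result{1, 0};  // the calling site has entered the barrier
    while (result.arrivals != numberOfSites) {
        assert(!incoming.empty());  // a real implementation would block here
        const Tag received = incoming.front();
        incoming.pop_front();
        if (received == barrierTag) {
            ++result.arrivals;
        } else {
            ++result.otherMessagesServed;  // request served during the wait
        }
    }
    return result;
}
```

Because unrelated requests are served inside the loop, a site waiting in the barrier cannot starve a peer that needs an answer before it can reach the barrier itself.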

void PsnSvm::timestampCurrentSendBuffers const PsDate   date
 

timestamp the messages contained in the send buffers of all links

Definition at line 791 of file PsnSvm.cxx.

References _processTable, _siteName, map< PsName, PsnProcess * >::begin(), map< PsName, PsnProcess * >::end(), and PsDate.

00792 {
00793    PsnPvmOutgoingMessage * message ;
00794    PsNameToPointerMap<PsnProcess>::iterator i ;
00795    for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 
00796      {
00797         if (_siteName != (*i).second->getProcessName ())
00798            { 
00799               i->second->getSvmLink ()->getOutgoingBuffer ().insertTimeStamp ( date ) ;
00800            }
00801      }
00802 }

void PsnSvm::waitAndProcessMessages (PsPvmController &parsingController, const PsnPvmMessage::MessageTag tag)
 

receive and process messages from all sites, and wait for the messages to arrive if necessary

Parameters:
parsingController the controller which will parse the received messages
tag the tag of the messages to receive

Definition at line 527 of file PsnSvm.cxx.

References _processTable, _siteName, map< PsName, PsnProcess * >::begin(), map< PsName, PsnProcess * >::end(), PsnPvmMessage::MessageTag, PsPvmController::parseSynchronisationMessage(), and PsnSvmLink::waitForMessage().

00529 {
00530 #ifdef _DEBUGPVMMESS
00531    cerr<<"PsnSvm::waitAndProcessMessages tag "<<tag<<endl<<"Processus : "<<endl;
00532    PsNameToPointerMap<PsnProcess>::iterator pPs ;
00533    for (pPs = _processTable->begin () ; pPs != _processTable->end () ; pPs ++) {
00534       cerr<<(*pPs).second->getProcessName ();
00535    }
00536    cerr<<endl;
00537 #endif
00538    PsNameToPointerMap<PsnProcess>::iterator pPss ; // Iterator used to traverse the table
00539    PsnSvmLink * canal ; // Link to the previous process
00540    PsnPvmIncomingMessage * messageRecu ;
00541 #ifdef _DEBUGPVMMESS
00542    cerr<<"PsnSvm::waitAndProcessMessages écoute de tous les processus"<<endl;
00543 #endif
00544    for (pPss = _processTable->begin () ; pPss!= _processTable->end () ; pPss ++) {
00545 #ifdef _DEBUGPVMMESS
00546       cerr<<"PsnSvm::waitAndProcessMessages écoute d'un de plus : "<<(*pPss).second->getProcessName ()<<endl;
00547 #endif
00548       if (_siteName != (*pPss).second->getProcessName ()) {
00549          // The link to watch is now known
00550          canal = (*pPss).second->getSvmLink () ;
00551 #ifdef _DEBUGPVMMESS
00552          cout << "PsmSvm::waitAndProcessMessages sur "
00553               << (*pPss).second->getProcessName () << " tag : "
00554               << tag << endl ;
00555          canal->affiche () ;
00556 #endif
00557          // Receive the message (blocking)
00558          messageRecu = & canal->waitForMessage (tag) ;
00559 #ifdef _DEBUGPVMMESS
00560          cerr << "PsnSvm::waitAndProcessMessages : message received"<< endl ;
00561 #endif
00562          // Send the message and the object to the local controller
00563          parsingController.parseSynchronisationMessage (messageRecu) ;
00564       }
00565    }
00566 }

PsnProcess * PsnSvm::waitForAnswerToBlockingRequest (PsPvmController parsingController, PsnPvmMessage::MessageTag tag)
 

receive and process messages from all sites (except the central site), and wait until at least one message with tag tag has arrived

Parameters:
parsingController the controller wich will parse the received messages
tag the tag of the messages to receive
Returns:
a pointer to the process having sent the unblocking message

Definition at line 812 of file PsnSvm.cxx.

References _idToProcessTable, PsController::AllWarnings, map< int, PsnProcess * >::end(), PsnPvmMessage::EndOfSimulation, PsnPvmMessage::EnterBarrier, PsnPvmMessage::ExitBarrier, map< int, PsnProcess * >::find(), pair< T1, T2 >::first, PsnPvmIncomingMessage::getMessageDate(), PsnPvmMessage::InitialValuesForMirror, PsnPvmMessage::LocalInitSuccessfull, PsnPvmMessage::MessageTag, PsnPvmMessage::MirrorNeedsInitialValues, PsnPvmMessage::NameServiceGetId, PsnPvmMessage::NameServiceGetString, PsnPvmMessage::NameServiceReturnId, PsnPvmMessage::NameServiceReturnString, PsnPvmMessage::NameServiceVerifyLocalNameServer, PsnPvmMessage::NameServiceVerifyResult, PsPvmController::parseSynchronisationMessage(), PsnPvmMessage::ProcessTable, pair< T1, T2 >::second, PsPvmController::sendInitialValuesToMirror(), PsnProcess::setDateOfLastMessage(), PsnPvmMessage::SiteName, PsnPvmMessage::SynchronisationMessage, waitForAnyRequests(), and PsController::warning().

Referenced by synchronizeOn().

00813 {
00814    // this implementation assumes that only one thread at a time will enter this function
00815    // numberOfThreads is an unsafe way to test this
00816    static int numberOfThreads = 0 ;
00817    ++numberOfThreads ;
00818    assert ( numberOfThreads == 1) ;
00819 
00820    PsnProcess * result ;
00821 
00822    PsnPvmIncomingMessage receiveBuffer ;
00823    pair<PsnPvmMessage::MessageTag,  int> receivedRequest ;  
00824    
00825    bool serving = true ;
00826    
00827    while ( serving ) 
00828       {
00829 
00830       receivedRequest = waitForAnyRequests ( receiveBuffer ) ;
00831 
00832       //find the sender process (result of this member function)
00833       map<int,PsnProcess *>::iterator i = _idToProcessTable.find ( receivedRequest.second ) ;
00834       
00835       // the message should come from a known process
00836       assert ( i != _idToProcessTable.end() ) ;
00837       
00838       result = i->second ;
00839       
00840       switch (receivedRequest.first)
00841         {
00842         case PsnPvmMessage::SynchronisationMessage :
00843            {
00844               //have the controller process the message
00845               parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
00846 
00847               assert ( result != NULL ) ;
00848       
00849               result->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ;
00850            }
00851         break ;
00852         case PsnPvmMessage::MirrorNeedsInitialValues :
00853            {
00854               parsingController.sendInitialValuesToMirror ( receiveBuffer ) ;      
00855            }
00856         break ;
00857         case PsnPvmMessage::InitialValuesForMirror :
00858            {
00859               //have the controller process the message
00860               parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
00861               
00862            }
00863         break ;
00864         case PsnPvmMessage::LocalInitSuccessfull :
00865         case PsnPvmMessage::ProcessTable :
00866         case PsnPvmMessage::EndOfSimulation :
00867         case PsnPvmMessage::EnterBarrier :
00868         case PsnPvmMessage::SiteName :
00869         case PsnPvmMessage::ExitBarrier :
00870         case PsnPvmMessage::NameServiceGetId :
00871         case PsnPvmMessage::NameServiceGetString :
00872         case PsnPvmMessage::NameServiceReturnId :
00873         case PsnPvmMessage::NameServiceReturnString :
00874         case PsnPvmMessage::NameServiceVerifyLocalNameServer :
00875         case PsnPvmMessage::NameServiceVerifyResult :
00876         default:
00877            {
00878               PsController::warning ("PsnSvm::waitForAnswerToBlockingRequest(): received unexpected message",
00879                                      PsController::AllWarnings ) ;
00880            }
00881         }
00882 
00883         
00884 
00885       serving = ( receivedRequest.first != tag ) ;
00886 
00887       }
00888 
00889    --numberOfThreads ;
00890 
00891    return result ;
00892 }
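
The listing above checks single-threaded entry with a plain static counter, which its own comment calls unsafe. One possible alternative, given here only as a sketch, is an atomic flag released by a scope guard. SingleEntryGuard is a hypothetical helper that does not exist in OpenMask.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical scope guard: marks a function as "in use" for the lifetime
// of the guard and reports whether this caller was the first one in.
class SingleEntryGuard {
public:
    explicit SingleEntryGuard(std::atomic<bool>& flag)
        : flag_(flag), acquired_(!flag.exchange(true)) {}

    // Only the guard that acquired the flag releases it.
    ~SingleEntryGuard() {
        if (acquired_) {
            flag_.store(false);
        }
    }

    SingleEntryGuard(const SingleEntryGuard&) = delete;
    SingleEntryGuard& operator=(const SingleEntryGuard&) = delete;

    bool acquired() const { return acquired_; }

private:
    std::atomic<bool>& flag_;
    bool acquired_;
};
```

A function would declare a static std::atomic<bool>, construct the guard on entry and assert on acquired(). The flag is then released on every exit path, including early returns.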

virtual pair<PsnPvmMessage::MessageTag, int> PsnSvm::waitForAnyRequests (PsnPvmIncomingMessage receiveBuffer) [protected, pure virtual]
 

wait for any message sent to this site.

This call is blocking

Parameters:
receiveBuffer the user buffer used to receive any message
Returns:
a pair giving the tag of the received message and the siteId of the sender

Implemented in PsnPvmSvm.

Referenced by serveNameRequestsUntilEnd(), and waitForAnswerToBlockingRequest().

void PsnSvm::waitForMessage (PsPvmController parsingController, const PsnPvmMessage::MessageTag tag)
 

wait until a message has arrived from one site

Parameters:
parsingController the controller which will parse the received messages
tag the tag of the messages to receive

Definition at line 570 of file PsnSvm.cxx.

References _linkToCentralSite, _processTable, _siteName, map< PsName, PsnProcess * >::begin(), map< PsName, PsnProcess * >::end(), PsnPvmIncomingMessage::hasMessage(), PsnPvmMessage::MessageTag, PsPvmController::parseSynchronisationMessage(), and PsnSvmLink::testForMessage().

00571 {
00572    PsNameToPointerMap<PsnProcess>::iterator pPss ; // Iterator used to traverse the table
00573    PsnSvmLink * canal ; // Link to the previous process
00574    PsnPvmIncomingMessage * messageRecu = NULL ;
00575    bool rienRecu=true;
00576    while (rienRecu) {//we do not test whether the message is null,
00577       //because a message that contains only the date gets emptied!
00578       for (pPss = _processTable->begin () ; pPss!= _processTable->end () ; pPss ++) {
00579          if (_siteName != (*pPss).second->getProcessName ()) {
00580             // The link to watch is now known
00581             canal = (*pPss).second->getSvmLink () ;
00582 #ifdef _DEBUGPVMMESS
00583             cout << "PsmSvm::waitForMessage sur "
00584                  << (*pPss).second->getProcessName () << " tag : "
00585                  << tag << endl ;
00586             canal->affiche () ;
00587 #endif
00588             // Poll the link for a message
00589             messageRecu = canal->testForMessage (tag) ;
00590             // Send the message and the object to the local controller
00591             if (messageRecu->hasMessage() ) 
00592                {
00593 #ifdef _DEBUGPVMMESS
00594                cerr << "PsnSvm::waitForMessage : message received" << endl ;
00595 #endif
00596                parsingController.parseSynchronisationMessage (messageRecu) ;
00597                rienRecu=false;
00598             }
00599          }
00600       }
00601       messageRecu = _linkToCentralSite->testForMessage (tag) ;
00602       //cerr << "PsnSvm::waitForMessage : message recu : " << *messageRecu << endl ;
00603       if (messageRecu->hasMessage() ) 
00604          {
00605 #ifdef _DEBUGPVMMESS
00606          cerr << "PsnSvm::waitForMessage : message received" << endl ;
00607 #endif
00608          parsingController.parseSynchronisationMessage (messageRecu) ;
00609          rienRecu=false;
00610       }
00611    }
00612 }
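
The listing above polls every link in turn and repeats the pass until at least one link has delivered a message. The sketch below shows that polling pattern on in-memory queues. Link is a hypothetical stand-in for PsnSvmLink, and the maxPasses bound is an addition for illustration: the listing itself loops until a message arrives.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

// Hypothetical stand-in for PsnSvmLink, backed by an in-memory queue.
struct Link {
    std::queue<std::string> inbox;

    // Non-blocking test: returns true and fills `out` if a message is pending.
    bool testForMessage(std::string& out) {
        if (inbox.empty()) {
            return false;
        }
        out = inbox.front();
        inbox.pop();
        return true;
    }
};

// Polls every link once per pass and stops after the first pass that
// delivered at least one message, or after `maxPasses` empty passes.
std::vector<std::string> pollUntilMessage(std::vector<Link>& links,
                                          int maxPasses) {
    std::vector<std::string> received;
    for (int pass = 0; pass < maxPasses && received.empty(); ++pass) {
        for (Link& link : links) {
            std::string message;
            if (link.testForMessage(message)) {
                received.push_back(message);  // at most one message per link
            }
        }
    }
    return received;
}
```

A pass takes at most one message from each link, so a link with several pending messages keeps the remainder for the next call.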

void PsnSvm::waitForMessageFrom (PsPvmController parsingController, const PsName processName, const PsnPvmMessage::MessageTag tag)
 

receive a message from a particular process

Parameters:
parsingController the controller which will parse the received messages
tag the tag of the messages to receive
processName the name of the process to receive a message from

Definition at line 764 of file PsnSvm.cxx.

References getLinkToProcessNamed(), PsnPvmMessage::MessageTag, PsPvmController::parseSynchronisationMessage(), and PsnSvmLink::waitForMessage().

00767 {
00768    PsnSvmLink * canal ; // Link to the previous process
00769    PsnPvmIncomingMessage * messageRecu = NULL ;
00770    
00771    // Get the link to watch
00772    canal = getLinkToProcessNamed (nomPss) ; 
00773 #ifdef _DEBUGPVMMESS
00774    cout << "PsmSvm::waitForMessageFrom sur "
00775         << nomPss << " tag : "
00776         << tag << endl ;
00777    canal->affiche () ;
00778 #endif
00779    // Receive the message (blocking)
00780    messageRecu = & canal->waitForMessage (tag) ;
00781    // Send the message and the object to the local controller
00782    parsingController.parseSynchronisationMessage (messageRecu) ;
00783    // Free the message???
00784    //delete messageRecu;
00785 }


Member Data Documentation

map<int,PsnProcess *> PsnSvm::_idToProcessTable [protected]
 

map storing the correspondence between processId and PsnProcess

Definition at line 240 of file PsnSvm.h.

Referenced by connectToDistributedSimulation(), createDistributedSimulation(), and waitForAnswerToBlockingRequest().

PsnSvmLink* PsnSvm::_linkToCentralSite [protected]
 

the link to the central site

Definition at line 248 of file PsnSvm.h.

Referenced by connectToDistributedSimulation(), and waitForMessage().

string PsnSvm::_masterSiteName = "OpenMaskMasterDistributedSite" [static, protected]
 

reserved name of the master distributed site

Definition at line 35 of file PsnSvm.cxx.

Referenced by createDistributedSimulation().

int PsnSvm::_numberOfSlaves [protected]
 

number of slaves ( PvmControllers ) in the simulation

Definition at line 257 of file PsnSvm.h.

Referenced by connectToDistributedSimulation(), syncDistributedSites(), and synchronizeOn().

PsNameToPointerMap<PsnProcess>* PsnSvm::_processTable [protected]
 

map containing all information about the different processes

Definition at line 237 of file PsnSvm.h.

Referenced by broadcast(), connectToDistributedSimulation(), createDistributedSimulation(), disconnectFromDistributedSimulation(), getLinkToProcessNamed(), getProcessDescriptorNamed(), processReceivedMessages(), sendCurrentBuffersWithTag(), serveNameRequestsUntilEnd(), synchroniseReceiveAndProcessMessages(), timestampCurrentSendBuffers(), waitAndProcessMessages(), and waitForMessage().

int PsnSvm::_siteId [protected]
 

workstation identifier in the svm

Definition at line 244 of file PsnSvm.h.

PsName PsnSvm::_siteName [protected]
 

Name of the process associated to this site.

Definition at line 252 of file PsnSvm.h.

Referenced by broadcast(), connectToDistributedSimulation(), createDistributedSimulation(), getSiteName(), processReceivedMessages(), sendCurrentBuffersWithTag(), synchroniseReceiveAndProcessMessages(), timestampCurrentSendBuffers(), waitAndProcessMessages(), and waitForMessage().

string PsnSvm::_slaveGroupName = "spawnedControllers" [static, protected]
 

name of the group all the slave processes join

Definition at line 34 of file PsnSvm.cxx.

Referenced by connectToDistributedSimulation(), and syncDistributedSites().

const PsDate PsnSvm::_synchronisationLatency [protected]
 

the authorized latency, in milliseconds, when synchronising all the sites

Definition at line 262 of file PsnSvm.h.

Referenced by getSynchronisationLatency(), and synchroniseReceiveAndProcessMessages().


The documentation for this class was generated from the following files:

Documentation generated on Mon Nov 25 15:26:30 2002

Generated with doxygen 1.2.12 by Dimitri van Heesch, 1997-2001