Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members  

PsnSvm.cxx

Go to the documentation of this file.
00001 /*
00002  * This file is part of openMask © INRIA, CNRS, Universite de Rennes 1 1993-2002, thereinafter the Software
00003  * 
00004  * The Software has been developped within the Siames Project. 
00005  * INRIA, the University of Rennes 1 and CNRS jointly hold intellectual property rights
00006  * 
00007  * The Software has been registered with the Agence pour la Protection des
00008  * Programmes (APP) under registration number IDDN.FR.001.510008.00.S.P.2001.000.41200
00009  *  
00010  * This file may be distributed under the terms of the Q Public License
00011  * version 1.0 as defined by Trolltech AS of Norway and appearing in the file
00012  * LICENSE.QPL included in the packaging of this file.
00013  *
00014  * Licensees holding valid specific licenses issued by INRIA, CNRS or Université de Rennes 1 
00015  * for the software may use this file in accordance with that specific license
00016  *
00017  */
00018 #include <unistd.h>
00019 #include <signal.h>
00020 #include <PsnSvm.h>
00021 #include <PsnSvmLink.h>
00022 #include "PsSynchronisationMessage.h"
00023 #include <PsnPvmMessage.h>
00024 #include <PsnPvmMulticastMessage.h>
00025 #include <PsnPvmUnicastMessage.h>
00026 #include <PsnProcess.h>
00027 #include <PsPvmController.h>
00028 #include "PsPvmNameServer.h"
00029 #include "PsPvmCentralNameServer.h"
00030 #include "stdio.h"
00031 #include "pvm3.h"
00032 #include "PsnPvmSvm.h"
00033 
00034 string PsnSvm::_slaveGroupName = "spawnedControllers";
00035 string PsnSvm::_masterSiteName = "OpenMaskMasterDistributedSite" ;
00036 //-----------------------------------------------------------------------
00037 
00038 PsnSvm::PsnSvm (PsNameToPointerMap<PsnProcess> * tab, const PsDate & latence) : 
00039    _processTable ( tab ),
00040    _synchronisationLatency ( latence ),
00041    _siteId ( -1 ), //negative values indicates an error
00042    _numberOfSlaves ( 0 )
00043 {
00044 }
00045 
00046 
00047 PsnSvm::~PsnSvm () 
00048 {
00049 }
00050 
00051 
00052 const PsDate & PsnSvm::getSynchronisationLatency() 
00053 {
00054   return _synchronisationLatency;
00055 }
00056 
00057 
00058 
00059 void PsnSvm::init (const PsDate & initialSimulationDate) 
00060 {
00061    if ( getParentSiteId () == 0 )  // wasn't created by a spawn
00062      {
00063        createDistributedSimulation ( initialSimulationDate ) ;
00064      }
00065    else
00066      {
00067        connectToDistributedSimulation () ;
00068      }
00069 }
00070 
00071 
00072 //-----------------------------------------------------------------------
00073 
00074 
00075 
00076 // aspects fonctionnement secondaire
00077 //****************************************************
00078 // RK 1302 : Suppression du nom du pss
00079 
00080 void PsnSvm::createDistributedSimulation(const PsDate & initialSimulationDate )
00081 {
00082 #ifdef _DEBUGPVMEXEC
00083    cerr << "PsnSvm::createDistributedSimulation : creating the distributed simulation" << endl ;
00084 #endif
00085    _siteName = _masterSiteName ;
00086 
00088      
00089      PsNameToPointerMap<PsnProcess>::iterator i ;
00090      
00091      for (i = _processTable->begin () ; i != _processTable->end () ; ++i ) 
00092        {
00093          addNewWorkstation ( (*i).second->getHostMachineName () ) ;
00094        }
00095 
00096      
00097      // creation des processus locaux
00098 #ifdef _DEBUGPVMEXEC
00099      cerr << "Creation des processus locaux..................." << endl ;
00100 #endif
00101      PsnProcess * processInfo ;
00102      int adrLoc ;
00103 
00104      vector<int> processSiteIds ;
00105      
00106      for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 
00107        {
00108          processInfo = (*i).second ;
00109          
00110          // spawn a new process on the specified workstation
00111          adrLoc = spawnProcess (processInfo) ;
00112          
00113          // create a SvmLink, add it to processInfo
00114          processInfo->setSvmLink (createSvmLink (adrLoc)) ; 
00115          
00116          _idToProcessTable.insert ( make_pair ( adrLoc,processInfo ) ) ;
00117 
00118          processInfo->getSvmLink ()->getOutgoingBuffer().insertTimeStamp ( initialSimulationDate ) ;
00119 
00120          // send the name of the process (in the Svm) to the created process
00121          processInfo->getSvmLink ()->getOutgoingBuffer()<<(*i).first ;
00122          
00123          processInfo->getSvmLink ()->getOutgoingBuffer().send (PsnPvmMessage::SiteName) ;
00124 
00125          //prepare processSiteIds for the next step
00126          processSiteIds.push_back ( processInfo->getSvmLink ()->getTID() ) ;
00127 
00128 #ifdef _DEBUGPVMMESS
00129          cerr<<typeid((*i).first).name()<<endl;
00130          cerr<<(*i).first<<" of id "<<processInfo->getSvmLink ()->getTID()<<endl;
00131 #endif
00132        }
00133      
00134 
00135      // Encodage des attributs Svm de chaque processus
00136      PsnPvmMulticastMessage nameToIdMessage ( processSiteIds ) ;
00137      nameToIdMessage.insertTimeStamp ( initialSimulationDate ) ;
00138      
00139      for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 
00140        {
00141          nameToIdMessage << *((*i).second) ;
00142        }
00143      
00144      // Transfert des adresses vers chaque controleur local
00145      nameToIdMessage.send ( PsnPvmMessage::ProcessTable ) ;
00146      
00147      serveNameRequestsUntilEnd () ;
00148 
00149 }
00150 
00151 
00152 
00153 void PsnSvm::serveNameRequestsUntilEnd (void) 
00154 {
00155   bool serving = true ;
00156 
00157   //convert the actual name Server to a PvmCentralNameServer
00158   PsPvmCentralNameServer * nameServer = new PsPvmCentralNameServer ( *PsName::getNameServer() ) ;
00159   PsName::setNameServer (nameServer ) ;
00160 
00161 
00162   PsnPvmIncomingMessage receiveBuffer ;
00163   pair<PsnPvmMessage::MessageTag,  int> receivedRequest ;  
00164 
00165   while ( serving )
00166     {
00167 
00168       receivedRequest = waitForAnyRequests ( receiveBuffer ) ;
00169 
00170       switch (receivedRequest.first)
00171         {
00172         case PsnPvmMessage::EndOfSimulation :
00173            {
00174               const PsDate & endDate (receiveBuffer.getMessageDate() );
00175               PsName endedProcess ;
00176               receiveBuffer>>endedProcess ;
00177               cerr<<endedProcess<<" finished at "<<endDate<<endl;
00178               _processTable->erase ( endedProcess ) ;
00179               if ( _processTable->size() == 0 ) 
00180                  {  
00181                     disconnectFromDistributedSimulation ( endDate ) ;
00182                     serving = false ;
00183                  }
00184            }
00185            break;
00186         case PsnPvmMessage::NameServiceGetId :
00187           {
00188             int stringLength ;
00189             receiveBuffer>>stringLength ;
00190             char * stringId = new char[stringLength] ;
00191             receiveBuffer>>stringId ;
00192             assert ( stringId [stringLength - 1] == 0 ) ;
00193             cerr<<"Requested id for string "<<stringId<<" of length "<<stringLength
00194                 <<" from "<< hex << receivedRequest.second << dec;
00195             PsName::idType id = nameServer->getIdentifier( PsString(stringId) ) ;
00196             cerr<<" sending "<<id<<endl;
00197             delete [] stringId ;
00198             pvm_initsend( PsnPvmSvm::pvmDataEncoding ) ;
00199             pvm_pklong (&id, 1, 1);
00200             pvm_send (receivedRequest.second,PsnPvmMessage::NameServiceReturnId) ;  
00201           }
00202           break;
00203         case PsnPvmMessage::NameServiceGetString:
00204           {
00205             PsName::idType id ;
00206             receiveBuffer>>id ;
00207             cerr<<"Requested String for id "<<id
00208                 <<" from "<< hex << receivedRequest.second<< dec ;
00209             const PsString & resultingString ( nameServer->getStringAssociatedTo (id) ) ;
00210             cerr<<" sending "<<resultingString<<endl;
00211             pvm_initsend( PsnPvmSvm::pvmDataEncoding ) ;
00212             pvm_pkstr (const_cast<char *> (resultingString.getCString() ) ) ;
00213             pvm_send (receivedRequest.second,PsnPvmMessage::NameServiceReturnString) ;  
00214           }
00215           break;
00216         case PsnPvmMessage::NameServiceVerifyLocalNameServer:
00217           {
00218             nameServer->verifyCompatibilityWithLocalNameServer (receivedRequest.second, receiveBuffer) ;
00219           }
00220           break;
00221         default:
00222           //an unexpected request is received
00223           switch (PsController::warningLevel)
00224             {
00225             case PsController::AllWarnings :
00226               cerr<<"PsnSvm::serveNameRequestsUntilEnd() : unexpected request "<<receivedRequest.first<<endl;
00227               break ;
00228             case PsController::FatalWarnings :
00229               cerr<<"PsnSvm::serveNameRequestsUntilEnd() : unexpected request"<<receivedRequest.first<<endl;
00230               exit (2) ;              
00231               break;
00232             default://nothing to do
00233               break;
00234             }
00235           break;
00236         }
00237     }
00238 }
00239 
00240 
00241 
00242 void PsnSvm::connectToDistributedSimulation () 
00243 {
00244 #ifdef _DEBUGPVMEXEC
00245   cerr<<"PsnSvm::connectToDistributedSimulation (): connecting to the distributed simulation"<<endl;
00246 #endif
00247 
00248    PsnPvmIncomingMessage * np = NULL ;
00249 
00250    // connect to the global controller
00251    _linkToCentralSite = createSvmLink ( getParentSiteId () ) ;
00252 
00253    // join the group of slave processes
00254    joinSvmGroup (_slaveGroupName) ;
00255 
00256 
00257    // create a distributed version of the nameServer
00258    PsPvmNameServer * pvmNameServer = new PsPvmNameServer ( getParentSiteId() , *PsName::getNameServer() ) ; 
00259    PsName::setNameServer ( pvmNameServer ) ; 
00260 
00261    // find out the name of this site 
00262    PsDate initialDate ;
00263    np = & _linkToCentralSite->waitForMessage (PsnPvmMessage::SiteName) ;
00264    *np >> _siteName ;
00265 
00266    // find out the adresses of the other sites 
00267    // - extract the timestamp of the communication
00268    _linkToCentralSite->waitForMessage (PsnPvmMessage::ProcessTable) ;
00269 
00270    // - extract, for each process, it's adress in the virtual machine and create a link
00271    PsNameToPointerMap<PsnProcess>::iterator i ;
00272    for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 
00273      {
00274        ++_numberOfSlaves ; //we add ourselves, but as _numberOfSlaves is initialised to 0, it's ok
00275 
00276       // Remarque : creer un canal sur soi-meme n'est pas genant,
00277       // on met juste a jour les tids dans la table des pss
00278 
00279       PsnProcess * p = (*i).second ;
00280       // creation d'un canal sans destinataire 
00281       p->setSvmLink (createSvmLink ()) ;
00282       // maj du destinataire
00283       *np  >> *p ;
00284 
00285       _idToProcessTable.insert ( make_pair ( p->getSvmLink()->getTID(),p ) ) ;
00286    }
00287 
00288    //wait for all other sites to be connected to the distributed simulation.
00289    //avoids sending messages to sites still connected or worse, not yet connected
00290    syncDistributedSites () ;
00291 }
00292 
00293 //-----------------------------------------------------------------------
00294 
00295 void PsnSvm::disconnectFromDistributedSimulation (const PsDate & endOfSimulationDate ) 
00296 {
00297 #ifdef _DEBUGPVMEXEC
00298    cout <<"PsnSvm::disconnectFromDistributedSimulation"<< endl ;
00299 #endif
00300    int recepient = getParentSiteId () ;
00301 
00302    if ( recepient != 0 )
00303       // a local PvmController has finished. Notify the central controller
00304       {
00305          PsnPvmUnicastMessage endMessage (recepient) ;
00306          
00307          endMessage.insertTimeStamp ( endOfSimulationDate ) ;
00308  
00309          endMessage << getSiteName () ;
00310 
00311          endMessage.send ( PsnPvmMessage::EndOfSimulation ) ;
00312       }
00313    else
00314       //the central controller is deconnecting. Notify all local controllers
00315       {
00316          vector<int> processSiteIds ;
00317          for (PsNameToPointerMap<PsnProcess>::iterator i = _processTable->begin () ; 
00318               i != _processTable-> end () ; 
00319               i ++) 
00320             {
00321                processSiteIds.push_back ( (i->second)->getSvmLink ()->getTID() ) ;
00322             }
00323          PsnPvmMulticastMessage endOfSimulationMessage ( processSiteIds ) ;          
00324 
00325          endOfSimulationMessage.insertTimeStamp ( endOfSimulationDate ) ;
00326 
00327          endOfSimulationMessage << getSiteName () ;
00328 
00329          endOfSimulationMessage.send ( PsnPvmMessage::EndOfSimulation ) ;
00330       }
00331 }
00332 
00333 
00334 //-----------------------------------------------------------------------
00335 
00336 
00337 void PsnSvm::broadcast (PsnPvmMessage::MessageTag tag, PsnPvmMulticastMessage * mess)
00338 {
00339   //the first implementation, commented, dies in pvm with an unexpected error
00340   //I haven't had time to trace the problem, so the second implementation is used
00341 
00342   /* first implementation
00343   initBeforeMessagePacking () ;
00344   
00345   if ( mess != NULL )
00346     {
00347       mess->packMessage() ;
00348     }
00349   else
00350     {
00351       mess = new PsnPvmOutgoingMessage () ;
00352       mess->insertTimeStamp(0) ;
00353       *mess << "dummy";
00354       mess->packMessage() ;
00355     }
00356   
00357   broadcastToGroup ( slaveGroup, tag ) ;
00358   */
00359 
00360 #ifdef _DEBUGPVMMESS
00361   cerr<<"PsnSvm::broadcast ("<<tag<<","<<mess<<") ; "<<endl;
00362 #endif
00363 
00364   PsNameToPointerMap<PsnProcess>::iterator processIter ; 
00365   PsnSvmLink * canal ; // Canal vers le precedent processus
00366   
00367   if (mess == NULL )
00368     {
00369       vector<int> distantRecepients ;
00370       for (processIter = _processTable->begin () ; processIter != _processTable->end () ; ++processIter) 
00371         {
00372           if (_siteName != (*processIter).second->getProcessName ()) 
00373             {
00374               // On connait alors le canal a observer
00375               distantRecepients.push_back ( (*processIter).second->getSvmLink ()->getTID () ) ;
00376             }
00377         }
00378 
00380       mess = new PsnPvmMulticastMessage ( distantRecepients ) ;
00381       mess->insertTimeStamp(0) ;
00382     }
00383 
00384   mess -> send ( tag ) ;
00385 
00386 }
00387 
00388 
00389 
00390 void PsnSvm::synchronizeOn ( PsPvmController & parsingController, PsnPvmMessage::MessageTag tag )
00391 {
00392   broadcast ( tag ) ;
00393 #ifdef _DEBUGPVMSYNCHRO
00394   cerr<< "PsnSvm::waitForAllProcesses("<<tag<<") "<<_numberOfSlaves <<endl;
00395   usleep(1000000) ;
00396 #endif
00397 
00398   int numberOfReadyProcesses = 1 ; //this process has entered the barrier
00399   
00400   while ( numberOfReadyProcesses != _numberOfSlaves )
00401      {
00402 
00403         // here, we ignore the results, because we suppose one process will only send tag once during synchronization
00404         waitForAnswerToBlockingRequest ( parsingController, tag ) ;
00405 
00406         numberOfReadyProcesses++ ;
00407 
00408 #ifdef _DEBUGPVMMESS
00409   cerr<< "PsnSvm::exitSoftBarrier numberToEnterBarrier: "<<numberOfReadyProcesses<<endl;
00410   usleep(1000000) ;
00411 #endif
00412 
00413      }
00414 }
00415 
00416 
00417 void PsnSvm::syncDistributedSites () 
00418 {
00419    groupBarrier ( _slaveGroupName, _numberOfSlaves ) ;
00420 }
00421 
00422 //-----------------------------------------------------------------------
00423 
00424 PsnSvmLink * PsnSvm::getLinkToProcessNamed (const PsName & nomPss) 
00425 {
00426    // On recupere le pss associe a l'objet
00427    if (_processTable->find (nomPss) != _processTable->end ()) 
00428       {
00429          // On a bien trouve le processus
00430          PsnProcess * pss = _processTable->getObjectOfIndex (nomPss) ;
00431          return (pss->getSvmLink ()) ;
00432       } 
00433    else {
00434       // On n'a pas le processus demande
00435 #ifdef _USESSTREAM
00436       ostringstream o ;
00437       o << "PsnSvm::getLinkToProcessNamed : Impossible de trouver le processus "
00438         << nomPss << " afin de recuperer son canal !" ;
00439       PsController::warning (o.str(), PsController::SomeWarnings ) ;
00440 #else
00441       ostrstream o ;
00442       o << "PsnSvm::getLinkToProcessNamed : Impossible de trouver le processus "
00443         << nomPss << " afin de recuperer son canal !" ;
00444       o.put ('\0') ;
00445       PsController::warning (o.str(), PsController::SomeWarnings ) ;
00446       delete o.str() ;
00447 #endif
00448       return NULL ;
00449    }
00450 }
00451 
00452 //-----------------------------------------------------------------------
00453 
00454 PsnProcess * PsnSvm::getProcessDescriptorNamed (const PsName & nomPss) 
00455 {
00456    // On recupere le pss associe a l'objet
00457    if (_processTable->find (nomPss) != _processTable->end ()) 
00458       {
00459          // On a bien trouve le processus
00460          PsnProcess * pss = _processTable->getObjectOfIndex (nomPss) ;
00461          return (pss) ;
00462       } 
00463    else 
00464       {
00465          // On n'a pas le processus demande
00466 #ifdef _USESSTREAM
00467          ostringstream o  ;
00468          o << "PsnSvm::getProcessDescriptorNamed: process " << nomPss << " unknown !" ;
00469          PsController::warning (o.str(), PsController::SomeWarnings ) ;
00470 #else
00471          ostrstream o  ;
00472          o << "PsnSvm::getProcessDescriptorNamed: process " << nomPss << " unknown !" ;
00473          o.put('\0') ;
00474          PsController::warning (o.str(), PsController::SomeWarnings ) ;
00475          delete o.str() ;
00476 #endif
00477          return NULL ;
00478       }
00479 }
00480 
00481 
00482 //-----------------------------------------------------------------------
00483 
00484 void PsnSvm::processReceivedMessages (PsPvmController & parsingController, const PsnPvmMessage::MessageTag tag) 
00485 {
00486 #ifdef _DEBUGPVMMESS
00487    cerr << "PsnSvm::processReceivedMessages"<<endl;
00488 #endif
00489    PsNameToPointerMap<PsnProcess>::iterator pPss ; // Iterateur pour parcours tableau
00490    PsnSvmLink * canal ; // Canal vers le precedent processus
00491    PsnPvmIncomingMessage * messageRecu = NULL ;
00492    for (pPss = _processTable->begin () ; 
00493         pPss != _processTable->end () ; 
00494         pPss ++) 
00495       {
00496          if (_siteName != (*pPss).second->getProcessName ()) 
00497             {
00498                // On connait alors le canal a observer
00499                canal = (*pPss).second->getSvmLink () ;
00500 #ifdef _DEBUGPVMMESS
00501                cout << "PsnSvm::processReceivedMessages from "
00502                     << (*pPss).second->getProcessName () << " tag : "
00503                     << tag << endl ;
00504                //canal->affiche () ;
00505 #endif
00506                // On fait une reception non bloquante
00507                messageRecu = canal->testForMessage (tag) ;
00508                while ( messageRecu->hasMessage () ) 
00509                   {
00510 #ifdef _DEBUGPVMMESS
00511                      cerr << "PsnSvm::processReceivedMessages : message received of size " <<messageRecu->getSize()<< endl;
00512 #endif        
00513 #ifdef _DEBUGPVMMESS
00514                      cerr << "PsnSvm::processReceivedMessages: extracted emission date: " <<messageREcu->getMessageDate()<< endl;
00515 #endif        
00516                      //cerr << "PsnSvm::processReceivedMessages : date " << dateEmission << endl ;
00517                      // On envoie le message et l'objet au controleur local
00518                      parsingController.parseSynchronisationMessage (messageRecu) ;
00519                      messageRecu = canal->testForMessage (tag) ;
00520                   }
00521             }
00522       }
00523 }
00524 
00525 //-----------------------------------------------------------------------
00526 
00527 void PsnSvm::waitAndProcessMessages (PsPvmController & parsingController, 
00528                                      const PsnPvmMessage::MessageTag tag) 
00529 {
00530 #ifdef _DEBUGPVMMESS
00531    cerr<<"PsnSvm::waitAndProcessMessages tag "<<tag<<endl<<"Processus : "<<endl;
00532    PsNameToPointerMap<PsnProcess>::iterator pPs ;
00533    for (pPs = _processTable->begin () ; pPs != _processTable->end () ; pPs ++) {
00534       cerr<<(*pPs).second->getProcessName ();
00535    }
00536    cerr<<endl;
00537 #endif
00538    PsNameToPointerMap<PsnProcess>::iterator pPss ; // Iterateur pour parcours tableau
00539    PsnSvmLink * canal ; // Canal vers le precedent processus
00540    PsnPvmIncomingMessage * messageRecu ;
00541 #ifdef _DEBUGPVMMESS
00542    cerr<<"PsnSvm::waitAndProcessMessages écoute de tous les processus"<<endl;
00543 #endif
00544    for (pPss = _processTable->begin () ; pPss!= _processTable->end () ; pPss ++) {
00545 #ifdef _DEBUGPVMMESS
00546       cerr<<"PsnSvm::waitAndProcessMessages écoute d'un de plus : "<<(*pPss).second->getProcessName ()<<endl;
00547 #endif
00548       if (_siteName != (*pPss).second->getProcessName ()) {
00549          // On connait alors le canal a observer
00550          canal = (*pPss).second->getSvmLink () ;
00551 #ifdef _DEBUGPVMMESS
00552          cout << "PsmSvm::waitAndProcessMessages sur "
00553               << (*pPss).second->getProcessName () << " tag : "
00554               << tag << endl ;
00555          canal->affiche () ;
00556 #endif
00557          // On recoit le message en bloquant
00558          messageRecu = & canal->waitForMessage (tag) ;
00559 #ifdef _DEBUGPVMMESS
00560          cerr << "PsnSvm::waitAndProcessMessages : message received"<< endl ;
00561 #endif
00562          // On envoie le message et l'objet au controleur local
00563          parsingController.parseSynchronisationMessage (messageRecu) ;
00564       }
00565    }
00566 }
00567 
00568 //-----------------------------------------------------------------------
00569 
00570 void PsnSvm::waitForMessage (PsPvmController & parsingController, const PsnPvmMessage::MessageTag tag) 
00571 {
00572    PsNameToPointerMap<PsnProcess>::iterator pPss ; // Iterateur pour parcours tableau
00573    PsnSvmLink * canal ; // Canal vers le precedent processus
00574    PsnPvmIncomingMessage * messageRecu = NULL ;
00575    bool rienRecu=true;
00576    while (rienRecu) {//on ne teste pas la nullite du message, 
00577       //car en cas de message ne contenant que la date, on le vide !
00578       for (pPss = _processTable->begin () ; pPss!= _processTable->end () ; pPss ++) {
00579          if (_siteName != (*pPss).second->getProcessName ()) {
00580             // On connait alors le canal a observer
00581             canal = (*pPss).second->getSvmLink () ;
00582 #ifdef _DEBUGPVMMESS
00583             cout << "PsmSvm::waitForMessage sur "
00584                  << (*pPss).second->getProcessName () << " tag : "
00585                  << tag << endl ;
00586             canal->affiche () ;
00587 #endif
00588             // On recoit le message en bloquant
00589             messageRecu = canal->testForMessage (tag) ;
00590             // On envoie le message et l'objet au controleur local
00591             if (messageRecu->hasMessage() ) 
00592                {
00593 #ifdef _DEBUGPVMMESS
00594                cerr << "PsnSvm::waitForMessage : message received" << endl ;
00595 #endif
00596                parsingController.parseSynchronisationMessage (messageRecu) ;
00597                rienRecu=false;
00598             }
00599          }
00600       }
00601       messageRecu = _linkToCentralSite->testForMessage (tag) ;
00602       //cerr << "PsnSvm::waitForMessage : message recu : " << *messageRecu << endl ;
00603       if (messageRecu->hasMessage() ) 
00604          {
00605 #ifdef _DEBUGPVMMESS
00606          cerr << "PsnSvm::waitForMessage : message received" << endl ;
00607 #endif
00608          parsingController.parseSynchronisationMessage (messageRecu) ;
00609          rienRecu=false;
00610       }
00611    }
00612 }
00613 
00614 //-----------------------------------------------------------------------
00615 
00616 void PsnSvm::synchroniseReceiveAndProcessMessages (PsPvmController & parsingController, const PsnPvmMessage::MessageTag tag) 
00617 {
00618    PsNameToPointerMap<PsnProcess> aRecevoir ;
00619 
00620    //First make a list of processes that we need to receive values from
00621    for (   PsNameToPointerMap<PsnProcess>::iterator pPssE = _processTable->begin () ; 
00622            pPssE != _processTable->end () ; 
00623            pPssE ++) 
00624       {
00625          if ( (pPssE->first != _siteName) &&
00626               ( pPssE->second->getDateOfLastMessage () +
00627                 pPssE->second->getPeriod () +
00628                 _synchronisationLatency < parsingController.getSimulatedDate () ) ) 
00629             {
00630                aRecevoir.addObjectWithIndex (pPssE->first, pPssE->second) ;
00631 #ifdef _DEBUGPVMMESS
00632                cerr << "PsnSvm::synchroniseReceiveAndProcessMessages: trying reception from: "
00633                     << pPssE->first << endl ;
00634 #endif
00635             }
00636       }
00637 
00638    
00639    PsnPvmIncomingMessage receiveBuffer ;
00640    PsnPvmMessage::MessageTag receivedRequest ;  
00641    PsnProcess * receivingProcess ;
00642    
00643    while (! aRecevoir.empty () ) 
00644       { 
00645 
00646          for ( PsNameToPointerMap<PsnProcess>::iterator processIterator = _processTable->begin () ;
00647                processIterator != _processTable->end () ;
00648                ++processIterator 
00649                )
00650             {
00651                //this new implementation doesn't do blocking receives, as some processes could send urgent requests
00652                receivingProcess = processIterator->second ;
00653 
00654                receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ;
00655                
00656                if ( receiveBuffer.hasMessage() )
00657                   {
00658                      
00659                      switch (receivedRequest)
00660                         {
00661                         case PsnPvmMessage::SynchronisationMessage :
00662                            {
00663                               //have the controller process the message
00664                               parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
00665                               
00666                               receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ;
00667                               
00668                               //an additionnal test could be performed here to make sure correct date of last message has happened
00669                               aRecevoir.erase ( receivingProcess->getProcessName() ) ;
00670                            }
00671                         break ;
00672                         case PsnPvmMessage::MirrorNeedsInitialValues :
00673                            {
00674                               parsingController.sendInitialValuesToMirror ( receiveBuffer ) ;      
00675                            }
00676                         break ;
00677                         case PsnPvmMessage::InitialValuesForMirror :
00678                            {
00679                               //have the controller process the message
00680                               parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
00681                               
00682                            }
00683                         break ;
00684                         case PsnPvmMessage::LocalInitSuccessfull :
00685                         case PsnPvmMessage::ProcessTable :
00686                         case PsnPvmMessage::EndOfSimulation :
00687                         case PsnPvmMessage::EnterBarrier :
00688                         case PsnPvmMessage::SiteName :
00689                         case PsnPvmMessage::ExitBarrier :
00690                         case PsnPvmMessage::NameServiceGetId :
00691                         case PsnPvmMessage::NameServiceGetString :
00692                         case PsnPvmMessage::NameServiceReturnId :
00693                         case PsnPvmMessage::NameServiceReturnString :
00694                         case PsnPvmMessage::NameServiceVerifyLocalNameServer :
00695                         case PsnPvmMessage::NameServiceVerifyResult :
00696                         default:
00697                            {
00698                               PsController::warning ("PsnSvm::waitForAnswerToBlockingRequest(): received unexpected message",
00699                                                      PsController::AllWarnings ) ;
00700                            }
00701                         }
00702                   }
00703             }
00704       }
00705          
00706    
00707 #ifdef _DEBUGPVMMESS
00708    cerr << "PsnSvm::synchroniseReceiveAndProcessMessages : on n'est pas bloque la " << endl ;
00709 #endif
00710 }
00711 
00712 
00713 //-----------------------------------------------------------------------
00714 
00715 void PsnSvm::sendCurrentBuffersWithTag(const PsnPvmMessage::MessageTag tag) 
00716 {
00717    PsNameToPointerMap<PsnProcess>::iterator pPss ; // Iterateur pour parcours tableau
00718    PsnSvmLink * canal ; // Canal vers le precedent processus
00719 
00720 #ifdef _DEBUGPVMMESS
00721    cerr<<"PsnSvm::sendCurrentBufferWithTag tag "<<tag<<endl<<"Processus : "<<endl;
00722    PsNameToPointerMap<PsnProcess>::iterator pPs ;
00723    for (pPs = _processTable->begin () ; pPs != _processTable->end () ; pPs ++) 
00724      {
00725        cerr<<(*pPs).second->getProcessName ();
00726      }
00727    cerr<<endl;
00728 #endif
00729 
00730    for (pPss = _processTable->begin () ; pPss != _processTable->end () ; pPss ++) 
00731      {
00732 #ifdef _DEBUGPVMMESS
00733        cerr<<"Traitement des emissions vers "<<(*pPss).second->getProcessName ()<<endl;
00734 #endif
00735        if (_siteName != (*pPss).second->getProcessName ()) 
00736          {
00737            // On connait alors le canal a observer
00738            canal = (*pPss).second->getSvmLink () ;
00739 
00740            if ( tag == PsnPvmMessage::SynchronisationMessage ) 
00741              {
00742                 (*pPss).second->getSvmLink ()->getOutgoingBuffer()<<PsSynchronisationMessage::endOfSynchronisationFragment ;
00743              }
00744 
00745            // On transmet le message du canal
00746            canal->sendOutgoingBuffer ( tag ) ;
00747 #ifdef _DEBUGPVMMESS
00748            cerr << "PsmSvm::transmission avec tag : "
00749                 << tag << endl ;
00750 #endif
00751          }
00752 #ifdef _DEBUGPVMMESS
00753        else 
00754          {
00755            cerr << "Rien ŕ faire "<<endl ;
00756            //canal->affiche () ;
00757          }
00758 #endif
00759      }
00760 }
00761 
00762 //-----------------------------------------------------------------------
00763 
00764 void PsnSvm::waitForMessageFrom (PsPvmController & parsingController,
00765                                  const PsName & nomPss,
00766                                  const PsnPvmMessage::MessageTag tag) 
00767 {
00768    PsnSvmLink * canal ; // Canal vers le precedent processus
00769    PsnPvmIncomingMessage * messageRecu = NULL ;
00770    
00771    // On recupere le canal a observer
00772    canal = getLinkToProcessNamed (nomPss) ; 
00773 #ifdef _DEBUGPVMMESS
00774    cout << "PsmSvm::waitForMessageFrom sur "
00775         << nomPss << " tag : "
00776         << tag << endl ;
00777    canal->affiche () ;
00778 #endif
00779    // On recoit le message en bloquant
00780    messageRecu = & canal->waitForMessage (tag) ;
00781    // On envoie le message et l'objet au controleur local
00782    parsingController.parseSynchronisationMessage (messageRecu) ;
00783    // On libere le message ???
00784    //delete messageRecu;
00785 }
00786 
00787 
00788 //-----------------------------------------------------------------------
00790 
00791 void PsnSvm::timestampCurrentSendBuffers (const PsDate & date) 
00792 {
00793    PsnPvmOutgoingMessage * message ;
00794    PsNameToPointerMap<PsnProcess>::iterator i ;
00795    for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 
00796      {
00797         if (_siteName != (*i).second->getProcessName ())
00798            { 
00799               i->second->getSvmLink ()->getOutgoingBuffer ().insertTimeStamp ( date ) ;
00800            }
00801      }
00802 }
00803 
00804 
00805 const PsName & PsnSvm::getSiteName() const
00806 {
00807    return _siteName;
00808 }
00809 
00810 //-----------------------------------------------------------------------
00811 
00812 PsnProcess * PsnSvm::waitForAnswerToBlockingRequest (PsPvmController & parsingController, PsnPvmMessage::MessageTag tag)
00813 {
00814    // this implementation assumes that only oine thread at a time will enter this function
00815    // numberOfThread is an unsafe method to test this
00816    static int numberOfThreads = 0 ;
00817    ++numberOfThreads ;
00818    assert ( numberOfThreads == 1) ;
00819 
00820    PsnProcess * result ;
00821 
00822    PsnPvmIncomingMessage receiveBuffer ;
00823    pair<PsnPvmMessage::MessageTag,  int> receivedRequest ;  
00824    
00825    bool serving = true ;
00826    
00827    while ( serving ) 
00828       {
00829 
00830       receivedRequest = waitForAnyRequests ( receiveBuffer ) ;
00831 
00832       //find the sender process (result of this member function)
00833       map<int,PsnProcess *>::iterator i = _idToProcessTable.find ( receivedRequest.second ) ;
00834       
00835       // the message should come from a known process
00836       assert ( i != _idToProcessTable.end() ) ;
00837       
00838       result = i->second ;
00839       
00840       switch (receivedRequest.first)
00841         {
00842         case PsnPvmMessage::SynchronisationMessage :
00843            {
00844               //have the controller process the message
00845               parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
00846 
00847               assert ( result != NULL ) ;
00848       
00849               result->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ;
00850            }
00851         break ;
00852         case PsnPvmMessage::MirrorNeedsInitialValues :
00853            {
00854               parsingController.sendInitialValuesToMirror ( receiveBuffer ) ;      
00855            }
00856         break ;
00857         case PsnPvmMessage::InitialValuesForMirror :
00858            {
00859               //have the controller process the message
00860               parsingController.parseSynchronisationMessage ( &receiveBuffer ) ;
00861               
00862            }
00863         break ;
00864         case PsnPvmMessage::LocalInitSuccessfull :
00865         case PsnPvmMessage::ProcessTable :
00866         case PsnPvmMessage::EndOfSimulation :
00867         case PsnPvmMessage::EnterBarrier :
00868         case PsnPvmMessage::SiteName :
00869         case PsnPvmMessage::ExitBarrier :
00870         case PsnPvmMessage::NameServiceGetId :
00871         case PsnPvmMessage::NameServiceGetString :
00872         case PsnPvmMessage::NameServiceReturnId :
00873         case PsnPvmMessage::NameServiceReturnString :
00874         case PsnPvmMessage::NameServiceVerifyLocalNameServer :
00875         case PsnPvmMessage::NameServiceVerifyResult :
00876         default:
00877            {
00878               PsController::warning ("PsnSvm::waitForAnswerToBlockingRequest(): received unexpected message",
00879                                      PsController::AllWarnings ) ;
00880            }
00881         }
00882 
00883         
00884 
00885       serving = ( receivedRequest.first != tag ) ;
00886 
00887       }
00888 
00889    --numberOfThreads ;
00890 
00891    return result ;
00892 }
00893 
00894 
00895 
00896 

logo OpenMask

Documentation generated on Mon Nov 25 15:25:01 2002

Generated with doxygen 1.2.12 by Dimitri van Heesch ,   1997-2001