00001 /* 00002 * This file is part of openMask © INRIA, CNRS, Universite de Rennes 1 1993-2002, thereinafter the Software 00003 * 00004 * The Software has been developped within the Siames Project. 00005 * INRIA, the University of Rennes 1 and CNRS jointly hold intellectual property rights 00006 * 00007 * The Software has been registered with the Agence pour la Protection des 00008 * Programmes (APP) under registration number IDDN.FR.001.510008.00.S.P.2001.000.41200 00009 * 00010 * This file may be distributed under the terms of the Q Public License 00011 * version 1.0 as defined by Trolltech AS of Norway and appearing in the file 00012 * LICENSE.QPL included in the packaging of this file. 00013 * 00014 * Licensees holding valid specific licenses issued by INRIA, CNRS or Université de Rennes 1 00015 * for the software may use this file in accordance with that specific license 00016 * 00017 */ 00018 #include <unistd.h> 00019 #include <signal.h> 00020 #include <PsnSvm.h> 00021 #include <PsnSvmLink.h> 00022 #include "PsSynchronisationMessage.h" 00023 #include <PsnPvmMessage.h> 00024 #include <PsnPvmMulticastMessage.h> 00025 #include <PsnPvmUnicastMessage.h> 00026 #include <PsnProcess.h> 00027 #include <PsPvmController.h> 00028 #include "PsPvmNameServer.h" 00029 #include "PsPvmCentralNameServer.h" 00030 #include "stdio.h" 00031 #include "pvm3.h" 00032 #include "PsnPvmSvm.h" 00033 00034 string PsnSvm::_slaveGroupName = "spawnedControllers"; 00035 string PsnSvm::_masterSiteName = "OpenMaskMasterDistributedSite" ; 00036 //----------------------------------------------------------------------- 00037 00038 PsnSvm::PsnSvm (PsNameToPointerMap<PsnProcess> * tab, const PsDate & latence) : 00039 _processTable ( tab ), 00040 _synchronisationLatency ( latence ), 00041 _siteId ( -1 ), //negative values indicates an error 00042 _numberOfSlaves ( 0 ) 00043 { 00044 } 00045 00046 00047 PsnSvm::~PsnSvm () 00048 { 00049 } 00050 00051 00052 const PsDate & 
PsnSvm::getSynchronisationLatency() 00053 { 00054 return _synchronisationLatency; 00055 } 00056 00057 00058 00059 void PsnSvm::init (const PsDate & initialSimulationDate) 00060 { 00061 if ( getParentSiteId () == 0 ) // wasn't created by a spawn 00062 { 00063 createDistributedSimulation ( initialSimulationDate ) ; 00064 } 00065 else 00066 { 00067 connectToDistributedSimulation () ; 00068 } 00069 } 00070 00071 00072 //----------------------------------------------------------------------- 00073 00074 00075 00076 // aspects fonctionnement secondaire 00077 //**************************************************** 00078 // RK 1302 : Suppression du nom du pss 00079 00080 void PsnSvm::createDistributedSimulation(const PsDate & initialSimulationDate ) 00081 { 00082 #ifdef _DEBUGPVMEXEC 00083 cerr << "PsnSvm::createDistributedSimulation : creating the distributed simulation" << endl ; 00084 #endif 00085 _siteName = _masterSiteName ; 00086 00088 00089 PsNameToPointerMap<PsnProcess>::iterator i ; 00090 00091 for (i = _processTable->begin () ; i != _processTable->end () ; ++i ) 00092 { 00093 addNewWorkstation ( (*i).second->getHostMachineName () ) ; 00094 } 00095 00096 00097 // creation des processus locaux 00098 #ifdef _DEBUGPVMEXEC 00099 cerr << "Creation des processus locaux..................." 
<< endl ; 00100 #endif 00101 PsnProcess * processInfo ; 00102 int adrLoc ; 00103 00104 vector<int> processSiteIds ; 00105 00106 for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 00107 { 00108 processInfo = (*i).second ; 00109 00110 // spawn a new process on the specified workstation 00111 adrLoc = spawnProcess (processInfo) ; 00112 00113 // create a SvmLink, add it to processInfo 00114 processInfo->setSvmLink (createSvmLink (adrLoc)) ; 00115 00116 _idToProcessTable.insert ( make_pair ( adrLoc,processInfo ) ) ; 00117 00118 processInfo->getSvmLink ()->getOutgoingBuffer().insertTimeStamp ( initialSimulationDate ) ; 00119 00120 // send the name of the process (in the Svm) to the created process 00121 processInfo->getSvmLink ()->getOutgoingBuffer()<<(*i).first ; 00122 00123 processInfo->getSvmLink ()->getOutgoingBuffer().send (PsnPvmMessage::SiteName) ; 00124 00125 //prepare processSiteIds for the next step 00126 processSiteIds.push_back ( processInfo->getSvmLink ()->getTID() ) ; 00127 00128 #ifdef _DEBUGPVMMESS 00129 cerr<<typeid((*i).first).name()<<endl; 00130 cerr<<(*i).first<<" of id "<<processInfo->getSvmLink ()->getTID()<<endl; 00131 #endif 00132 } 00133 00134 00135 // Encodage des attributs Svm de chaque processus 00136 PsnPvmMulticastMessage nameToIdMessage ( processSiteIds ) ; 00137 nameToIdMessage.insertTimeStamp ( initialSimulationDate ) ; 00138 00139 for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 00140 { 00141 nameToIdMessage << *((*i).second) ; 00142 } 00143 00144 // Transfert des adresses vers chaque controleur local 00145 nameToIdMessage.send ( PsnPvmMessage::ProcessTable ) ; 00146 00147 serveNameRequestsUntilEnd () ; 00148 00149 } 00150 00151 00152 00153 void PsnSvm::serveNameRequestsUntilEnd (void) 00154 { 00155 bool serving = true ; 00156 00157 //convert the actual name Server to a PvmCentralNameServer 00158 PsPvmCentralNameServer * nameServer = new PsPvmCentralNameServer ( *PsName::getNameServer() ) ; 00159 
PsName::setNameServer (nameServer ) ; 00160 00161 00162 PsnPvmIncomingMessage receiveBuffer ; 00163 pair<PsnPvmMessage::MessageTag, int> receivedRequest ; 00164 00165 while ( serving ) 00166 { 00167 00168 receivedRequest = waitForAnyRequests ( receiveBuffer ) ; 00169 00170 switch (receivedRequest.first) 00171 { 00172 case PsnPvmMessage::EndOfSimulation : 00173 { 00174 const PsDate & endDate (receiveBuffer.getMessageDate() ); 00175 PsName endedProcess ; 00176 receiveBuffer>>endedProcess ; 00177 cerr<<endedProcess<<" finished at "<<endDate<<endl; 00178 _processTable->erase ( endedProcess ) ; 00179 if ( _processTable->size() == 0 ) 00180 { 00181 disconnectFromDistributedSimulation ( endDate ) ; 00182 serving = false ; 00183 } 00184 } 00185 break; 00186 case PsnPvmMessage::NameServiceGetId : 00187 { 00188 int stringLength ; 00189 receiveBuffer>>stringLength ; 00190 char * stringId = new char[stringLength] ; 00191 receiveBuffer>>stringId ; 00192 assert ( stringId [stringLength - 1] == 0 ) ; 00193 cerr<<"Requested id for string "<<stringId<<" of length "<<stringLength 00194 <<" from "<< hex << receivedRequest.second << dec; 00195 PsName::idType id = nameServer->getIdentifier( PsString(stringId) ) ; 00196 cerr<<" sending "<<id<<endl; 00197 delete [] stringId ; 00198 pvm_initsend( PsnPvmSvm::pvmDataEncoding ) ; 00199 pvm_pklong (&id, 1, 1); 00200 pvm_send (receivedRequest.second,PsnPvmMessage::NameServiceReturnId) ; 00201 } 00202 break; 00203 case PsnPvmMessage::NameServiceGetString: 00204 { 00205 PsName::idType id ; 00206 receiveBuffer>>id ; 00207 cerr<<"Requested String for id "<<id 00208 <<" from "<< hex << receivedRequest.second<< dec ; 00209 const PsString & resultingString ( nameServer->getStringAssociatedTo (id) ) ; 00210 cerr<<" sending "<<resultingString<<endl; 00211 pvm_initsend( PsnPvmSvm::pvmDataEncoding ) ; 00212 pvm_pkstr (const_cast<char *> (resultingString.getCString() ) ) ; 00213 pvm_send (receivedRequest.second,PsnPvmMessage::NameServiceReturnString) ; 
00214 } 00215 break; 00216 case PsnPvmMessage::NameServiceVerifyLocalNameServer: 00217 { 00218 nameServer->verifyCompatibilityWithLocalNameServer (receivedRequest.second, receiveBuffer) ; 00219 } 00220 break; 00221 default: 00222 //an unexpected request is received 00223 switch (PsController::warningLevel) 00224 { 00225 case PsController::AllWarnings : 00226 cerr<<"PsnSvm::serveNameRequestsUntilEnd() : unexpected request "<<receivedRequest.first<<endl; 00227 break ; 00228 case PsController::FatalWarnings : 00229 cerr<<"PsnSvm::serveNameRequestsUntilEnd() : unexpected request"<<receivedRequest.first<<endl; 00230 exit (2) ; 00231 break; 00232 default://nothing to do 00233 break; 00234 } 00235 break; 00236 } 00237 } 00238 } 00239 00240 00241 00242 void PsnSvm::connectToDistributedSimulation () 00243 { 00244 #ifdef _DEBUGPVMEXEC 00245 cerr<<"PsnSvm::connectToDistributedSimulation (): connecting to the distributed simulation"<<endl; 00246 #endif 00247 00248 PsnPvmIncomingMessage * np = NULL ; 00249 00250 // connect to the global controller 00251 _linkToCentralSite = createSvmLink ( getParentSiteId () ) ; 00252 00253 // join the group of slave processes 00254 joinSvmGroup (_slaveGroupName) ; 00255 00256 00257 // create a distributed version of the nameServer 00258 PsPvmNameServer * pvmNameServer = new PsPvmNameServer ( getParentSiteId() , *PsName::getNameServer() ) ; 00259 PsName::setNameServer ( pvmNameServer ) ; 00260 00261 // find out the name of this site 00262 PsDate initialDate ; 00263 np = & _linkToCentralSite->waitForMessage (PsnPvmMessage::SiteName) ; 00264 *np >> _siteName ; 00265 00266 // find out the adresses of the other sites 00267 // - extract the timestamp of the communication 00268 _linkToCentralSite->waitForMessage (PsnPvmMessage::ProcessTable) ; 00269 00270 // - extract, for each process, it's adress in the virtual machine and create a link 00271 PsNameToPointerMap<PsnProcess>::iterator i ; 00272 for (i = _processTable->begin () ; i != 
_processTable->end () ; i ++) 00273 { 00274 ++_numberOfSlaves ; //we add ourselves, but as _numberOfSlaves is initialised to 0, it's ok 00275 00276 // Remarque : creer un canal sur soi-meme n'est pas genant, 00277 // on met juste a jour les tids dans la table des pss 00278 00279 PsnProcess * p = (*i).second ; 00280 // creation d'un canal sans destinataire 00281 p->setSvmLink (createSvmLink ()) ; 00282 // maj du destinataire 00283 *np >> *p ; 00284 00285 _idToProcessTable.insert ( make_pair ( p->getSvmLink()->getTID(),p ) ) ; 00286 } 00287 00288 //wait for all other sites to be connected to the distributed simulation. 00289 //avoids sending messages to sites still connected or worse, not yet connected 00290 syncDistributedSites () ; 00291 } 00292 00293 //----------------------------------------------------------------------- 00294 00295 void PsnSvm::disconnectFromDistributedSimulation (const PsDate & endOfSimulationDate ) 00296 { 00297 #ifdef _DEBUGPVMEXEC 00298 cout <<"PsnSvm::disconnectFromDistributedSimulation"<< endl ; 00299 #endif 00300 int recepient = getParentSiteId () ; 00301 00302 if ( recepient != 0 ) 00303 // a local PvmController has finished. Notify the central controller 00304 { 00305 PsnPvmUnicastMessage endMessage (recepient) ; 00306 00307 endMessage.insertTimeStamp ( endOfSimulationDate ) ; 00308 00309 endMessage << getSiteName () ; 00310 00311 endMessage.send ( PsnPvmMessage::EndOfSimulation ) ; 00312 } 00313 else 00314 //the central controller is deconnecting. 
Notify all local controllers 00315 { 00316 vector<int> processSiteIds ; 00317 for (PsNameToPointerMap<PsnProcess>::iterator i = _processTable->begin () ; 00318 i != _processTable-> end () ; 00319 i ++) 00320 { 00321 processSiteIds.push_back ( (i->second)->getSvmLink ()->getTID() ) ; 00322 } 00323 PsnPvmMulticastMessage endOfSimulationMessage ( processSiteIds ) ; 00324 00325 endOfSimulationMessage.insertTimeStamp ( endOfSimulationDate ) ; 00326 00327 endOfSimulationMessage << getSiteName () ; 00328 00329 endOfSimulationMessage.send ( PsnPvmMessage::EndOfSimulation ) ; 00330 } 00331 } 00332 00333 00334 //----------------------------------------------------------------------- 00335 00336 00337 void PsnSvm::broadcast (PsnPvmMessage::MessageTag tag, PsnPvmMulticastMessage * mess) 00338 { 00339 //the first implementation, commented, dies in pvm with an unexpected error 00340 //I haven't had time to trace the problem, so the second implementation is used 00341 00342 /* first implementation 00343 initBeforeMessagePacking () ; 00344 00345 if ( mess != NULL ) 00346 { 00347 mess->packMessage() ; 00348 } 00349 else 00350 { 00351 mess = new PsnPvmOutgoingMessage () ; 00352 mess->insertTimeStamp(0) ; 00353 *mess << "dummy"; 00354 mess->packMessage() ; 00355 } 00356 00357 broadcastToGroup ( slaveGroup, tag ) ; 00358 */ 00359 00360 #ifdef _DEBUGPVMMESS 00361 cerr<<"PsnSvm::broadcast ("<<tag<<","<<mess<<") ; "<<endl; 00362 #endif 00363 00364 PsNameToPointerMap<PsnProcess>::iterator processIter ; 00365 PsnSvmLink * canal ; // Canal vers le precedent processus 00366 00367 if (mess == NULL ) 00368 { 00369 vector<int> distantRecepients ; 00370 for (processIter = _processTable->begin () ; processIter != _processTable->end () ; ++processIter) 00371 { 00372 if (_siteName != (*processIter).second->getProcessName ()) 00373 { 00374 // On connait alors le canal a observer 00375 distantRecepients.push_back ( (*processIter).second->getSvmLink ()->getTID () ) ; 00376 } 00377 } 00378 00380 mess = 
new PsnPvmMulticastMessage ( distantRecepients ) ; 00381 mess->insertTimeStamp(0) ; 00382 } 00383 00384 mess -> send ( tag ) ; 00385 00386 } 00387 00388 00389 00390 void PsnSvm::synchronizeOn ( PsPvmController & parsingController, PsnPvmMessage::MessageTag tag ) 00391 { 00392 broadcast ( tag ) ; 00393 #ifdef _DEBUGPVMSYNCHRO 00394 cerr<< "PsnSvm::waitForAllProcesses("<<tag<<") "<<_numberOfSlaves <<endl; 00395 usleep(1000000) ; 00396 #endif 00397 00398 int numberOfReadyProcesses = 1 ; //this process has entered the barrier 00399 00400 while ( numberOfReadyProcesses != _numberOfSlaves ) 00401 { 00402 00403 // here, we ignore the results, because we suppose one process will only send tag once during synchronization 00404 waitForAnswerToBlockingRequest ( parsingController, tag ) ; 00405 00406 numberOfReadyProcesses++ ; 00407 00408 #ifdef _DEBUGPVMMESS 00409 cerr<< "PsnSvm::exitSoftBarrier numberToEnterBarrier: "<<numberOfReadyProcesses<<endl; 00410 usleep(1000000) ; 00411 #endif 00412 00413 } 00414 } 00415 00416 00417 void PsnSvm::syncDistributedSites () 00418 { 00419 groupBarrier ( _slaveGroupName, _numberOfSlaves ) ; 00420 } 00421 00422 //----------------------------------------------------------------------- 00423 00424 PsnSvmLink * PsnSvm::getLinkToProcessNamed (const PsName & nomPss) 00425 { 00426 // On recupere le pss associe a l'objet 00427 if (_processTable->find (nomPss) != _processTable->end ()) 00428 { 00429 // On a bien trouve le processus 00430 PsnProcess * pss = _processTable->getObjectOfIndex (nomPss) ; 00431 return (pss->getSvmLink ()) ; 00432 } 00433 else { 00434 // On n'a pas le processus demande 00435 #ifdef _USESSTREAM 00436 ostringstream o ; 00437 o << "PsnSvm::getLinkToProcessNamed : Impossible de trouver le processus " 00438 << nomPss << " afin de recuperer son canal !" 
; 00439 PsController::warning (o.str(), PsController::SomeWarnings ) ; 00440 #else 00441 ostrstream o ; 00442 o << "PsnSvm::getLinkToProcessNamed : Impossible de trouver le processus " 00443 << nomPss << " afin de recuperer son canal !" ; 00444 o.put ('\0') ; 00445 PsController::warning (o.str(), PsController::SomeWarnings ) ; 00446 delete o.str() ; 00447 #endif 00448 return NULL ; 00449 } 00450 } 00451 00452 //----------------------------------------------------------------------- 00453 00454 PsnProcess * PsnSvm::getProcessDescriptorNamed (const PsName & nomPss) 00455 { 00456 // On recupere le pss associe a l'objet 00457 if (_processTable->find (nomPss) != _processTable->end ()) 00458 { 00459 // On a bien trouve le processus 00460 PsnProcess * pss = _processTable->getObjectOfIndex (nomPss) ; 00461 return (pss) ; 00462 } 00463 else 00464 { 00465 // On n'a pas le processus demande 00466 #ifdef _USESSTREAM 00467 ostringstream o ; 00468 o << "PsnSvm::getProcessDescriptorNamed: process " << nomPss << " unknown !" ; 00469 PsController::warning (o.str(), PsController::SomeWarnings ) ; 00470 #else 00471 ostrstream o ; 00472 o << "PsnSvm::getProcessDescriptorNamed: process " << nomPss << " unknown !" 
; 00473 o.put('\0') ; 00474 PsController::warning (o.str(), PsController::SomeWarnings ) ; 00475 delete o.str() ; 00476 #endif 00477 return NULL ; 00478 } 00479 } 00480 00481 00482 //----------------------------------------------------------------------- 00483 00484 void PsnSvm::processReceivedMessages (PsPvmController & parsingController, const PsnPvmMessage::MessageTag tag) 00485 { 00486 #ifdef _DEBUGPVMMESS 00487 cerr << "PsnSvm::processReceivedMessages"<<endl; 00488 #endif 00489 PsNameToPointerMap<PsnProcess>::iterator pPss ; // Iterateur pour parcours tableau 00490 PsnSvmLink * canal ; // Canal vers le precedent processus 00491 PsnPvmIncomingMessage * messageRecu = NULL ; 00492 for (pPss = _processTable->begin () ; 00493 pPss != _processTable->end () ; 00494 pPss ++) 00495 { 00496 if (_siteName != (*pPss).second->getProcessName ()) 00497 { 00498 // On connait alors le canal a observer 00499 canal = (*pPss).second->getSvmLink () ; 00500 #ifdef _DEBUGPVMMESS 00501 cout << "PsnSvm::processReceivedMessages from " 00502 << (*pPss).second->getProcessName () << " tag : " 00503 << tag << endl ; 00504 //canal->affiche () ; 00505 #endif 00506 // On fait une reception non bloquante 00507 messageRecu = canal->testForMessage (tag) ; 00508 while ( messageRecu->hasMessage () ) 00509 { 00510 #ifdef _DEBUGPVMMESS 00511 cerr << "PsnSvm::processReceivedMessages : message received of size " <<messageRecu->getSize()<< endl; 00512 #endif 00513 #ifdef _DEBUGPVMMESS 00514 cerr << "PsnSvm::processReceivedMessages: extracted emission date: " <<messageREcu->getMessageDate()<< endl; 00515 #endif 00516 //cerr << "PsnSvm::processReceivedMessages : date " << dateEmission << endl ; 00517 // On envoie le message et l'objet au controleur local 00518 parsingController.parseSynchronisationMessage (messageRecu) ; 00519 messageRecu = canal->testForMessage (tag) ; 00520 } 00521 } 00522 } 00523 } 00524 00525 //----------------------------------------------------------------------- 00526 00527 void 
PsnSvm::waitAndProcessMessages (PsPvmController & parsingController, 00528 const PsnPvmMessage::MessageTag tag) 00529 { 00530 #ifdef _DEBUGPVMMESS 00531 cerr<<"PsnSvm::waitAndProcessMessages tag "<<tag<<endl<<"Processus : "<<endl; 00532 PsNameToPointerMap<PsnProcess>::iterator pPs ; 00533 for (pPs = _processTable->begin () ; pPs != _processTable->end () ; pPs ++) { 00534 cerr<<(*pPs).second->getProcessName (); 00535 } 00536 cerr<<endl; 00537 #endif 00538 PsNameToPointerMap<PsnProcess>::iterator pPss ; // Iterateur pour parcours tableau 00539 PsnSvmLink * canal ; // Canal vers le precedent processus 00540 PsnPvmIncomingMessage * messageRecu ; 00541 #ifdef _DEBUGPVMMESS 00542 cerr<<"PsnSvm::waitAndProcessMessages écoute de tous les processus"<<endl; 00543 #endif 00544 for (pPss = _processTable->begin () ; pPss!= _processTable->end () ; pPss ++) { 00545 #ifdef _DEBUGPVMMESS 00546 cerr<<"PsnSvm::waitAndProcessMessages écoute d'un de plus : "<<(*pPss).second->getProcessName ()<<endl; 00547 #endif 00548 if (_siteName != (*pPss).second->getProcessName ()) { 00549 // On connait alors le canal a observer 00550 canal = (*pPss).second->getSvmLink () ; 00551 #ifdef _DEBUGPVMMESS 00552 cout << "PsmSvm::waitAndProcessMessages sur " 00553 << (*pPss).second->getProcessName () << " tag : " 00554 << tag << endl ; 00555 canal->affiche () ; 00556 #endif 00557 // On recoit le message en bloquant 00558 messageRecu = & canal->waitForMessage (tag) ; 00559 #ifdef _DEBUGPVMMESS 00560 cerr << "PsnSvm::waitAndProcessMessages : message received"<< endl ; 00561 #endif 00562 // On envoie le message et l'objet au controleur local 00563 parsingController.parseSynchronisationMessage (messageRecu) ; 00564 } 00565 } 00566 } 00567 00568 //----------------------------------------------------------------------- 00569 00570 void PsnSvm::waitForMessage (PsPvmController & parsingController, const PsnPvmMessage::MessageTag tag) 00571 { 00572 PsNameToPointerMap<PsnProcess>::iterator pPss ; // Iterateur pour 
parcours tableau 00573 PsnSvmLink * canal ; // Canal vers le precedent processus 00574 PsnPvmIncomingMessage * messageRecu = NULL ; 00575 bool rienRecu=true; 00576 while (rienRecu) {//on ne teste pas la nullite du message, 00577 //car en cas de message ne contenant que la date, on le vide ! 00578 for (pPss = _processTable->begin () ; pPss!= _processTable->end () ; pPss ++) { 00579 if (_siteName != (*pPss).second->getProcessName ()) { 00580 // On connait alors le canal a observer 00581 canal = (*pPss).second->getSvmLink () ; 00582 #ifdef _DEBUGPVMMESS 00583 cout << "PsmSvm::waitForMessage sur " 00584 << (*pPss).second->getProcessName () << " tag : " 00585 << tag << endl ; 00586 canal->affiche () ; 00587 #endif 00588 // On recoit le message en bloquant 00589 messageRecu = canal->testForMessage (tag) ; 00590 // On envoie le message et l'objet au controleur local 00591 if (messageRecu->hasMessage() ) 00592 { 00593 #ifdef _DEBUGPVMMESS 00594 cerr << "PsnSvm::waitForMessage : message received" << endl ; 00595 #endif 00596 parsingController.parseSynchronisationMessage (messageRecu) ; 00597 rienRecu=false; 00598 } 00599 } 00600 } 00601 messageRecu = _linkToCentralSite->testForMessage (tag) ; 00602 //cerr << "PsnSvm::waitForMessage : message recu : " << *messageRecu << endl ; 00603 if (messageRecu->hasMessage() ) 00604 { 00605 #ifdef _DEBUGPVMMESS 00606 cerr << "PsnSvm::waitForMessage : message received" << endl ; 00607 #endif 00608 parsingController.parseSynchronisationMessage (messageRecu) ; 00609 rienRecu=false; 00610 } 00611 } 00612 } 00613 00614 //----------------------------------------------------------------------- 00615 00616 void PsnSvm::synchroniseReceiveAndProcessMessages (PsPvmController & parsingController, const PsnPvmMessage::MessageTag tag) 00617 { 00618 PsNameToPointerMap<PsnProcess> aRecevoir ; 00619 00620 //First make a list of processes that we need to receive values from 00621 for ( PsNameToPointerMap<PsnProcess>::iterator pPssE = _processTable->begin 
() ; 00622 pPssE != _processTable->end () ; 00623 pPssE ++) 00624 { 00625 if ( (pPssE->first != _siteName) && 00626 ( pPssE->second->getDateOfLastMessage () + 00627 pPssE->second->getPeriod () + 00628 _synchronisationLatency < parsingController.getSimulatedDate () ) ) 00629 { 00630 aRecevoir.addObjectWithIndex (pPssE->first, pPssE->second) ; 00631 #ifdef _DEBUGPVMMESS 00632 cerr << "PsnSvm::synchroniseReceiveAndProcessMessages: trying reception from: " 00633 << pPssE->first << endl ; 00634 #endif 00635 } 00636 } 00637 00638 00639 PsnPvmIncomingMessage receiveBuffer ; 00640 PsnPvmMessage::MessageTag receivedRequest ; 00641 PsnProcess * receivingProcess ; 00642 00643 while (! aRecevoir.empty () ) 00644 { 00645 00646 for ( PsNameToPointerMap<PsnProcess>::iterator processIterator = _processTable->begin () ; 00647 processIterator != _processTable->end () ; 00648 ++processIterator 00649 ) 00650 { 00651 //this new implementation doesn't do blocking receives, as some processes could send urgent requests 00652 receivingProcess = processIterator->second ; 00653 00654 receivedRequest = receivingProcess->getSvmLink ()->testForAnyMessage ( receiveBuffer ) ; 00655 00656 if ( receiveBuffer.hasMessage() ) 00657 { 00658 00659 switch (receivedRequest) 00660 { 00661 case PsnPvmMessage::SynchronisationMessage : 00662 { 00663 //have the controller process the message 00664 parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 00665 00666 receivingProcess->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ; 00667 00668 //an additionnal test could be performed here to make sure correct date of last message has happened 00669 aRecevoir.erase ( receivingProcess->getProcessName() ) ; 00670 } 00671 break ; 00672 case PsnPvmMessage::MirrorNeedsInitialValues : 00673 { 00674 parsingController.sendInitialValuesToMirror ( receiveBuffer ) ; 00675 } 00676 break ; 00677 case PsnPvmMessage::InitialValuesForMirror : 00678 { 00679 //have the controller process the message 00680 
parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 00681 00682 } 00683 break ; 00684 case PsnPvmMessage::LocalInitSuccessfull : 00685 case PsnPvmMessage::ProcessTable : 00686 case PsnPvmMessage::EndOfSimulation : 00687 case PsnPvmMessage::EnterBarrier : 00688 case PsnPvmMessage::SiteName : 00689 case PsnPvmMessage::ExitBarrier : 00690 case PsnPvmMessage::NameServiceGetId : 00691 case PsnPvmMessage::NameServiceGetString : 00692 case PsnPvmMessage::NameServiceReturnId : 00693 case PsnPvmMessage::NameServiceReturnString : 00694 case PsnPvmMessage::NameServiceVerifyLocalNameServer : 00695 case PsnPvmMessage::NameServiceVerifyResult : 00696 default: 00697 { 00698 PsController::warning ("PsnSvm::waitForAnswerToBlockingRequest(): received unexpected message", 00699 PsController::AllWarnings ) ; 00700 } 00701 } 00702 } 00703 } 00704 } 00705 00706 00707 #ifdef _DEBUGPVMMESS 00708 cerr << "PsnSvm::synchroniseReceiveAndProcessMessages : on n'est pas bloque la " << endl ; 00709 #endif 00710 } 00711 00712 00713 //----------------------------------------------------------------------- 00714 00715 void PsnSvm::sendCurrentBuffersWithTag(const PsnPvmMessage::MessageTag tag) 00716 { 00717 PsNameToPointerMap<PsnProcess>::iterator pPss ; // Iterateur pour parcours tableau 00718 PsnSvmLink * canal ; // Canal vers le precedent processus 00719 00720 #ifdef _DEBUGPVMMESS 00721 cerr<<"PsnSvm::sendCurrentBufferWithTag tag "<<tag<<endl<<"Processus : "<<endl; 00722 PsNameToPointerMap<PsnProcess>::iterator pPs ; 00723 for (pPs = _processTable->begin () ; pPs != _processTable->end () ; pPs ++) 00724 { 00725 cerr<<(*pPs).second->getProcessName (); 00726 } 00727 cerr<<endl; 00728 #endif 00729 00730 for (pPss = _processTable->begin () ; pPss != _processTable->end () ; pPss ++) 00731 { 00732 #ifdef _DEBUGPVMMESS 00733 cerr<<"Traitement des emissions vers "<<(*pPss).second->getProcessName ()<<endl; 00734 #endif 00735 if (_siteName != (*pPss).second->getProcessName ()) 00736 { 00737 
// On connait alors le canal a observer 00738 canal = (*pPss).second->getSvmLink () ; 00739 00740 if ( tag == PsnPvmMessage::SynchronisationMessage ) 00741 { 00742 (*pPss).second->getSvmLink ()->getOutgoingBuffer()<<PsSynchronisationMessage::endOfSynchronisationFragment ; 00743 } 00744 00745 // On transmet le message du canal 00746 canal->sendOutgoingBuffer ( tag ) ; 00747 #ifdef _DEBUGPVMMESS 00748 cerr << "PsmSvm::transmission avec tag : " 00749 << tag << endl ; 00750 #endif 00751 } 00752 #ifdef _DEBUGPVMMESS 00753 else 00754 { 00755 cerr << "Rien ŕ faire "<<endl ; 00756 //canal->affiche () ; 00757 } 00758 #endif 00759 } 00760 } 00761 00762 //----------------------------------------------------------------------- 00763 00764 void PsnSvm::waitForMessageFrom (PsPvmController & parsingController, 00765 const PsName & nomPss, 00766 const PsnPvmMessage::MessageTag tag) 00767 { 00768 PsnSvmLink * canal ; // Canal vers le precedent processus 00769 PsnPvmIncomingMessage * messageRecu = NULL ; 00770 00771 // On recupere le canal a observer 00772 canal = getLinkToProcessNamed (nomPss) ; 00773 #ifdef _DEBUGPVMMESS 00774 cout << "PsmSvm::waitForMessageFrom sur " 00775 << nomPss << " tag : " 00776 << tag << endl ; 00777 canal->affiche () ; 00778 #endif 00779 // On recoit le message en bloquant 00780 messageRecu = & canal->waitForMessage (tag) ; 00781 // On envoie le message et l'objet au controleur local 00782 parsingController.parseSynchronisationMessage (messageRecu) ; 00783 // On libere le message ??? 
00784 //delete messageRecu; 00785 } 00786 00787 00788 //----------------------------------------------------------------------- 00790 00791 void PsnSvm::timestampCurrentSendBuffers (const PsDate & date) 00792 { 00793 PsnPvmOutgoingMessage * message ; 00794 PsNameToPointerMap<PsnProcess>::iterator i ; 00795 for (i = _processTable->begin () ; i != _processTable->end () ; i ++) 00796 { 00797 if (_siteName != (*i).second->getProcessName ()) 00798 { 00799 i->second->getSvmLink ()->getOutgoingBuffer ().insertTimeStamp ( date ) ; 00800 } 00801 } 00802 } 00803 00804 00805 const PsName & PsnSvm::getSiteName() const 00806 { 00807 return _siteName; 00808 } 00809 00810 //----------------------------------------------------------------------- 00811 00812 PsnProcess * PsnSvm::waitForAnswerToBlockingRequest (PsPvmController & parsingController, PsnPvmMessage::MessageTag tag) 00813 { 00814 // this implementation assumes that only oine thread at a time will enter this function 00815 // numberOfThread is an unsafe method to test this 00816 static int numberOfThreads = 0 ; 00817 ++numberOfThreads ; 00818 assert ( numberOfThreads == 1) ; 00819 00820 PsnProcess * result ; 00821 00822 PsnPvmIncomingMessage receiveBuffer ; 00823 pair<PsnPvmMessage::MessageTag, int> receivedRequest ; 00824 00825 bool serving = true ; 00826 00827 while ( serving ) 00828 { 00829 00830 receivedRequest = waitForAnyRequests ( receiveBuffer ) ; 00831 00832 //find the sender process (result of this member function) 00833 map<int,PsnProcess *>::iterator i = _idToProcessTable.find ( receivedRequest.second ) ; 00834 00835 // the message should come from a known process 00836 assert ( i != _idToProcessTable.end() ) ; 00837 00838 result = i->second ; 00839 00840 switch (receivedRequest.first) 00841 { 00842 case PsnPvmMessage::SynchronisationMessage : 00843 { 00844 //have the controller process the message 00845 parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 00846 00847 assert ( result != NULL ) ; 
00848 00849 result->setDateOfLastMessage ( receiveBuffer.getMessageDate () ) ; 00850 } 00851 break ; 00852 case PsnPvmMessage::MirrorNeedsInitialValues : 00853 { 00854 parsingController.sendInitialValuesToMirror ( receiveBuffer ) ; 00855 } 00856 break ; 00857 case PsnPvmMessage::InitialValuesForMirror : 00858 { 00859 //have the controller process the message 00860 parsingController.parseSynchronisationMessage ( &receiveBuffer ) ; 00861 00862 } 00863 break ; 00864 case PsnPvmMessage::LocalInitSuccessfull : 00865 case PsnPvmMessage::ProcessTable : 00866 case PsnPvmMessage::EndOfSimulation : 00867 case PsnPvmMessage::EnterBarrier : 00868 case PsnPvmMessage::SiteName : 00869 case PsnPvmMessage::ExitBarrier : 00870 case PsnPvmMessage::NameServiceGetId : 00871 case PsnPvmMessage::NameServiceGetString : 00872 case PsnPvmMessage::NameServiceReturnId : 00873 case PsnPvmMessage::NameServiceReturnString : 00874 case PsnPvmMessage::NameServiceVerifyLocalNameServer : 00875 case PsnPvmMessage::NameServiceVerifyResult : 00876 default: 00877 { 00878 PsController::warning ("PsnSvm::waitForAnswerToBlockingRequest(): received unexpected message", 00879 PsController::AllWarnings ) ; 00880 } 00881 } 00882 00883 00884 00885 serving = ( receivedRequest.first != tag ) ; 00886 00887 } 00888 00889 --numberOfThreads ; 00890 00891 return result ; 00892 } 00893 00894 00895 00896
| Documentation generated on Mon Nov 25 15:25:01 2002 |
Generated with doxygen 1.2.12 by Dimitri van Heesch, 1997-2001 |