CommunicationGateway.cpp

Go to the documentation of this file.
00001 /*
00002 Copyright 2007, 2008, 2009, 2010, 2011 Instituto de Sistemas e Robotica, Instituto Superior Tecnico
00003 
00004 This file is part of MeRMaID.
00005 
00006 MeRMaID is free software: you can redistribute it and/or modify
00007 it under the terms of the GNU Lesser General Public License as published by
00008 the Free Software Foundation, either version 3 of the License, or
00009 (at your option) any later version.
00010 
00011 MeRMaID is distributed in the hope that it will be useful,
00012 but WITHOUT ANY WARRANTY; without even the implied warranty of
00013 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014 GNU Lesser General Public License for more details.
00015 
00016 You should have received a copy of the GNU Lesser General Public License
00017 along with MeRMaID.  If not, see <http://www.gnu.org/licenses/>.
00018 */
00019 
00020 
00021 
00022 /**
00023  * @Filename CommunicationGateway.cpp
00024  * @Description CommunicationGateway implementation
00025  * @Status Work in Progress
00026  * @Version $Id: CommunicationGateway.cpp 1 2011-03-04 18:13:18Z jreis $
00027  * @Maintainer
00028  */
00029 
00030 
00031 #include "config.h"
00032 
00033 #include <CommunicationGateway.hpp>
00034 #include <DataFactory.hpp>
00035 #include <DataFeedDescription.hpp>
00036 #include <Exception.hpp>
00037 #include <Entity.hpp>
00038 #include <EntityDescription.hpp>
00039 #include <ServiceInterfaceDescription.hpp>
00040 #include <ServiceInstanceDescription.hpp>
00041 #include <ServiceTypeDescription.hpp>
00042 
00043 
00044 #include <iostream>
00045 
00046 #include <sstream>
00047 
00048 // YARP stuff
00049 #include <yarp/os/all.h>
00050 
00051 
00052 // YARP-specific communication stuff
00053 #include <YarpDataFeedReporter.hpp>
00054 #include <YarpServiceAsynchInputPort.hpp>
00055 #include <YarpServiceAsynchRequestPort.hpp>
00056 #include <YarpServiceAsynchRequestType.hpp>
00057 
00058 using namespace mermaid::support::communication;
00059 
00060 using mermaid::support::data::DataFactory;
00061 using mermaid::support::errorhandling::Exception;
00062 using mermaid::support::service::DataFeedDescription;
00063 using mermaid::support::service::Entity;
00064 using mermaid::support::service::EntityDescription;
00065 using mermaid::support::service::ServiceInterfaceDescription;
00066 using mermaid::support::service::ServiceInstanceDescription;
00067 using mermaid::support::service::ServiceTypeDescription;
00068 using std::string;
00069 using yarp::os::Network;
00070 using yarp::os::Port;
00071 
00072 CommunicationGateway::CommunicationGateway()
00073 {
00074 
00075 #ifdef USE_COMM
00076   executingSetupTask_ = false;
00077 #endif
00078   
00079 }; // CommunicationGateway()
00080 
00081 CommunicationGateway::~CommunicationGateway()
00082 {
00083   //std::cerr << "CommunicationGateway::~CommunicationGateway()" << std::endl;
00084 }; // ~CommCommunicationGateway()
00085 
00086 void CommunicationGateway::registerService (shared_ptr<Service> s)
00087 {
00088   //std::cerr << "CommunicationGateway::registerService()" << std::endl;
00089   
00090   shared_ptr<ServiceInstanceDescription> serviceInstanceDescription = s->getServiceInstanceDescription();
00091   std::string serviceInstanceName = serviceInstanceDescription->getServiceInstanceName();
00092   std::string entityName = s->getEntity()->getEntityDescription()->getEntityName();
00093   
00094   //create input port for service
00095   shared_ptr<YarpServiceAsynchInputPort> servicePort (new YarpServiceAsynchInputPort (s));
00096   
00097   //put it in the input port map
00098   serviceAsynchInputPorts_[serviceInstanceName] = servicePort;
00099   
00100   //go through all service types
00101   std::vector<shared_ptr<ServiceTypeDescription> > serviceTypes = s->getServiceInstanceDescription()->getServiceTypeDescriptions();
00102   
00103   std::vector<shared_ptr<ServiceTypeDescription> >::iterator typesIterator;
00104   
00105   for (typesIterator = serviceTypes.begin(); typesIterator != serviceTypes.end(); typesIterator++) {
00106     shared_ptr<ServiceTypeDescription> serviceType = *typesIterator;
00107     
00108     //create data feed ports for service
00109     std::vector< shared_ptr<DataFeedDescription> > dataFeedDescriptions = serviceType->getAllDataFeedDescriptions();
00110     
00111     std::vector< shared_ptr<DataFeedDescription> >::iterator dataFeedIt;
00112     
00113     for (dataFeedIt = dataFeedDescriptions.begin(); dataFeedIt != dataFeedDescriptions.end(); dataFeedIt++) {
00114       shared_ptr<DataFeedDescription> dataFeedDescription = *dataFeedIt;
00115       std::string dataFeedName = dataFeedDescription->getDataFeedName();
00116       std::cerr << "CommunicationGateway::registerService : adding DataFeed port for DataFeed with dataFeedName=\"" << dataFeedName << "\" to Service with entityName=\"" << entityName << "\", serviceInstanceName=\"" << serviceInstanceName << "\"" << std::endl;
00117       
00118       shared_ptr<YarpDataFeedOutputPort> datafeedOutputPort (new YarpDataFeedOutputPort (entityName, serviceInstanceName, dataFeedName));
00119       
00120       addYarpDataFeedOutputPort (datafeedOutputPort);
00121     }
00122   }
00123 };
00124 
00125 void CommunicationGateway::sendServiceRequest (shared_ptr<Service> requesterService, shared_ptr<ServiceRequest> request)
00126 {
00127   //std::cerr << "CommunicationGateway::sendServiceRequest()" << std::endl;
00128   
00129   std::string targetEntityName = request->getTargetEntityName();
00130   std::string targetServiceName = request->getTargetServiceName();
00131   
00132   // get port
00133   shared_ptr<YarpServiceAsynchRequestPort> requestPort = getYarpServiceAsynchRequestPort (requesterService, request);
00134   
00135   //register request
00136   registerServiceRequest (request);
00137   
00138   //send request
00139   YarpServiceAsynchRequestType& ysart = requestPort->prepare();
00140   ysart.convertFromServiceRequest (*request);
00141   requestPort->writeStrict();
00142   request->notifySent();
00143   
00144 }; // sendServiceRequest()
00145 
00146 
00147 void CommunicationGateway::sendServiceReply (shared_ptr<Service> targetService, std::string requesterEntityName, std::string requesterServiceName, shared_ptr<ServiceReply> sr)
00148 {
00149   //std::cerr << "CommunicationGateway::sendServiceReply()" << std::endl;
00150   
00151   //get port
00152   shared_ptr<YarpServiceAsynchOutputPort> outputPort = getYarpServiceAsynchOutputPort (targetService, requesterEntityName, requesterServiceName);
00153   
00154   //send reply
00155   YarpServiceAsynchReplyType& ysart = outputPort->prepare();
00156   ysart.convertFromServiceReply (*sr);
00157   outputPort->writeStrict();
00158   
00159   
00160 }; // sendServiceAsynchReply()
00161 
00162 
00163 shared_ptr<YarpServiceAsynchRequestPort> CommunicationGateway::getYarpServiceAsynchRequestPort (shared_ptr<Service> requesterService, shared_ptr<ServiceRequest> request)
00164 {
00165   std::string targetEntityName = request->getTargetEntityName();
00166   std::string targetServiceName = request->getTargetServiceName();
00167   std::string requesterEntityName = requesterService->getServiceInstanceDescription()->getEntityDescription()->getEntityName();
00168   std::string requesterServiceName = requesterService->getServiceInstanceDescription()->getServiceInstanceName();
00169   //! @TODO port map should depend on the targetEntityName also.
00170   std::pair<std::string, std::string> requesterPair (requesterEntityName, requesterServiceName);
00171   std::pair<std::string, std::string> targetPair (targetEntityName, targetServiceName);
00172   shared_ptr<YarpServiceAsynchRequestPort> requestPort = serviceAsynchRequestPorts_[requesterPair][targetPair];
00173   
00174   if (requestPort == false) {
00175     //we need to create the ports
00176     
00177     //create request port
00178     requestPort = shared_ptr<YarpServiceAsynchRequestPort> (new YarpServiceAsynchRequestPort (requesterService, request));
00179     std::map<std::pair<std::string, std::string>, shared_ptr<YarpServiceAsynchRequestPort> > requesterRequestPorts = serviceAsynchRequestPorts_[requesterPair];
00180     
00181     requesterRequestPorts[targetPair] = requestPort;
00182     serviceAsynchRequestPorts_[requesterPair] = requesterRequestPorts;
00183     
00184     //lets create the reply port also
00185     shared_ptr<YarpServiceAsynchReplyPort> replyPort (new YarpServiceAsynchReplyPort (requesterService, targetEntityName, targetServiceName));
00186     
00187     std::map<std::pair<std::string, std::string>, shared_ptr<YarpServiceAsynchReplyPort> > requesterReplyPorts = serviceAsynchReplyPorts_[requesterPair];
00188     
00189     requesterReplyPorts[targetPair] = replyPort;
00190     serviceAsynchReplyPorts_[requesterPair] = requesterReplyPorts;
00191     
00192   }
00193   
00194   return requestPort;
00195   
00196 }; // getYarpServiceAsynchRequestPort()
00197 
00198 
00199 shared_ptr<YarpServiceAsynchReplyPort> CommunicationGateway::getYarpServiceAsynchReplyPort (std::string targetEntityName, std::string targetServiceName, std::string requesterEntityName, std::string requesterServiceName)
00200 {
00201   std::pair<std::string, std::string> requesterPair (requesterEntityName, requesterServiceName);
00202   std::pair<std::string, std::string> targetPair (targetEntityName, targetServiceName);
00203   return serviceAsynchReplyPorts_[requesterPair][targetPair];
00204 }; // getYarpServiceAsynchReplyPort
00205 
00206 
00207 shared_ptr<YarpServiceAsynchOutputPort> CommunicationGateway::getYarpServiceAsynchOutputPort (shared_ptr<Service> targetService, std::string requesterEntityName, std::string requesterServiceName)
00208 {
00209   std::string targetEntityName = targetService->getServiceInstanceDescription()->getEntityDescription()->getEntityName();
00210   std::string targetServiceName = targetService->getServiceInstanceDescription()->getServiceInstanceName();
00211   std::pair<std::string, std::string> requesterPair (requesterEntityName, requesterServiceName);
00212   std::pair<std::string, std::string> targetPair (targetEntityName, targetServiceName);
00213   
00214   shared_ptr<YarpServiceAsynchOutputPort> outputPort = serviceAsynchOutputPorts_[requesterPair][targetPair];
00215   
00216   if (outputPort == false) {
00217     //we need to create the port
00218     
00219     //create output port
00220     outputPort = shared_ptr<YarpServiceAsynchOutputPort> (new YarpServiceAsynchOutputPort (targetService, requesterEntityName, requesterServiceName));
00221     std::map<std::pair<std::string, std::string>, shared_ptr<YarpServiceAsynchOutputPort> > requesterOutputPorts = serviceAsynchOutputPorts_[requesterPair];
00222     
00223     requesterOutputPorts[targetPair] = outputPort;
00224     serviceAsynchOutputPorts_[requesterPair] = requesterOutputPorts;
00225   }
00226   
00227   return outputPort;
00228 }; // getYarpServiceAsynchOutputPort()
00229 
00230 
00231 void CommunicationGateway::sendToDataFeed (std::string entityName, std::string serviceName, std::string dataFeedName, shared_ptr<DataBox> dataBox)
00232 {
00233   shared_ptr<YarpDataFeedOutputPort> port = getYarpDataFeedOutputPort (entityName, serviceName, dataFeedName);
00234   Bottle& b = port->prepare();
00235   b.clear();
00236   DataFactory::writeDataBoxToBottle (dataBox, b);
00237   port->writeStrict();
00238 }; // sendDataToDataFeed()
00239 
00240 void CommunicationGateway::addYarpDataFeedOutputPort (shared_ptr<YarpDataFeedOutputPort> outputPort)
00241 {
00242   std::string entityName = outputPort->getEntityName();
00243   std::string serviceName = outputPort->getServiceInstanceName();
00244   std::string dataFeedName = outputPort->getDataFeedName();
00245   
00246   std::map<std::string, std::map<std::string, shared_ptr<YarpDataFeedOutputPort> > > entityFeedPorts = dataFeedOutputPorts_[entityName];
00247   
00248   std::map<std::string, shared_ptr<YarpDataFeedOutputPort> > serviceFeedPorts = entityFeedPorts[serviceName];
00249   
00250   serviceFeedPorts[dataFeedName] = outputPort;
00251   entityFeedPorts[serviceName] = serviceFeedPorts;
00252   dataFeedOutputPorts_[entityName] = entityFeedPorts;
00253 }; // addYarpDataFeedOutputPort()
00254 
00255 shared_ptr<YarpDataFeedOutputPort> CommunicationGateway::getYarpDataFeedOutputPort (std::string entityName, std::string serviceName, std::string feedName)
00256 {
00257   shared_ptr<YarpDataFeedOutputPort> port = dataFeedOutputPorts_[entityName][serviceName][feedName];
00258   
00259   if (port == false) {
00260     // we need to create the port
00261     std::stringstream ss;
00262     ss << "CommunicationGateway::getYarpDataFeedOutputPort() : no data feed port exists with entityName=\"" << entityName << "\", serviceName=\"" << serviceName << "\", feedName=\"" << feedName << "\"";
00263     throw Exception (ss.str());
00264   }
00265   
00266   return port;
00267   
00268 }; // getYarpDataFeedOutputPort()
00269 
00270 void CommunicationGateway::addYarpDataFeedInputPort (shared_ptr<YarpDataFeedInputPort> inputPort)
00271 {
00272   std::string consumerEntityName = inputPort->getConsumerService()->getEntity()->getEntityDescription()->getEntityName();
00273   std::string consumerServiceName = inputPort->getConsumerService()->getServiceInstanceDescription()->getServiceInstanceName();
00274   std::string producerEntityName = inputPort->getProducerEntityName();
00275   std::string producerServiceName = inputPort->getProducerServiceName();
00276   std::string dataFeedName = inputPort->getDataFeedName();
00277   
00278   dataFeedInputPorts_[consumerEntityName][consumerServiceName][producerEntityName][producerServiceName][dataFeedName] = inputPort;
00279 }; // addYarpDataFeedInputPort()
00280 
00281 shared_ptr<YarpDataFeedInputPort> CommunicationGateway::getYarpDataFeedInputPort (std::string consumerEntityName, std::string consumerServiceName, std::string producerEntityName, std::string producerServiceName, std::string dataFeedName)
00282 {
00283   shared_ptr<YarpDataFeedInputPort> port = dataFeedInputPorts_[consumerEntityName][consumerServiceName][producerEntityName][producerServiceName][dataFeedName];
00284   
00285   if (port == false) {
00286     //error
00287     std::stringstream ss;
00288     ss << "CommunicationGateway::getYarpDataFeedInputPort() : no data feed port exists with consumerEntityName=\"" << consumerEntityName << "\", consumerServiceName=\"" << consumerServiceName << "\", producerEntityName=\"" << producerEntityName << "\", producerServiceName=\"" << producerServiceName << "\", dataFeedName=\"" << dataFeedName << "\"";
00289     throw Exception (ss.str());
00290   }
00291   
00292   return port;
00293 }; // getYarpDataFeedInputPort()
00294 
00295 void CommunicationGateway::connectToDataFeed (std::string producerEntityName, std::string producerServiceName, std::string datafeedName, shared_ptr<Service> consumerService)
00296 {
00297   //std::cerr << "CommunicationGateway::connectToDataFeed()" << std::endl;
00298   
00299   shared_ptr<YarpDataFeedInputPort> port (new YarpDataFeedInputPort (producerEntityName, producerServiceName, datafeedName, consumerService));
00300   
00301   this->addYarpDataFeedInputPort (port);
00302   
00303   
00304 }; // connectToDataFeed()
00305 
00306 void CommunicationGateway::registerServiceRequest (shared_ptr<ServiceRequest> request)
00307 {
00308   std::string requesterEntityName = request->getRequesterEntityName();
00309   std::string requesterServiceName = request->getRequesterServiceName();
00310   int reqId = request->getRequestId();
00311   
00312   std::map<std::string, std::map<int, shared_ptr<ServiceRequest> > > entityMap = serviceRequests_[requesterEntityName];
00313   
00314   std::map<int, shared_ptr<ServiceRequest> > serviceMap = entityMap[requesterServiceName];
00315   
00316   //test if we already have a request registered with the same ID, for the same entity and service
00317   if (serviceMap.find (reqId) != serviceMap.end()) {
00318     std::stringstream ss;
00319     ss << "CommunicationGateway::registerServiceRequest : ServiceRequest already registered with the following identification: EntityName=" << requesterEntityName << " ,ServiceName=" << requesterServiceName << " ,RequestId=" << reqId;
00320     throw Exception (ss.str());
00321   }
00322   else {
00323     // lets insert the service request to the map
00324     //std::cerr << "CommunicationGateway::registerService : EntityName=" << requesterEntityName << " ,ServiceName=" << requesterServiceName << " ,RequestId=" << reqId << std::endl;
00325     serviceMap[reqId] = request;
00326     entityMap[requesterServiceName] = serviceMap;
00327     serviceRequests_[requesterEntityName] = entityMap;
00328   }
00329 }; // registerServiceRequest
00330 
00331 shared_ptr<ServiceRequest> CommunicationGateway::getServiceRequest (std::string requesterEntityName, std::string requesterServiceName, int requestId)
00332 {
00333   std::map<std::string, std::map<int, shared_ptr<ServiceRequest> > > entityMap = serviceRequests_[requesterEntityName];
00334   std::map<int, shared_ptr<ServiceRequest> > serviceMap = entityMap[requesterServiceName];
00335   
00336   if (serviceMap.find (requestId) == serviceMap.end()) {
00337     std::stringstream ss;
00338     ss << "CommunicationGateway::getServiceRequest : ServiceRequest with the following identification: EntityName=" << requesterEntityName << " ,ServiceName=" << requesterServiceName << " ,RequestId=" << requestId << " does not exist.";
00339     throw Exception (ss.str());
00340   }
00341   else {
00342     return serviceMap[requestId];
00343   }
00344 }; // getServiceRequest
00345 
00346 void CommunicationGateway::unregisterServiceRequest (std::string requesterEntityName, std::string requesterServiceName, int requestId)
00347 {
00348   //! @TODO implement CommunicationGateway::unregisterServiceRequest
00349 }; // unregisterServiceRequest
00350 
00351 #ifdef USE_COMM
00352 
00353 void CommunicationGateway::addDataFeedSetupTask (shared_ptr<Task> dst)
00354 {
00355   std::cout << "CommunicationGateway::addDataFeedSetupTask()" << std::endl;
00356   dataFeedSetupTasks_.push_back (dst);
00357 }
00358 
00359 bool CommunicationGateway::isNewDataFeedSetupTaskAvailable()
00360 {
00361   if (executingSetupTask_ == true) {
00362     return false;
00363   }
00364   else if (dataFeedSetupTasks_.size() > 0) {
00365     return true;
00366   }
00367   else {
00368     return false;
00369   }
00370   
00371 }
00372 
00373 shared_ptr<Task> CommunicationGateway::getDataFeedSetupTask()
00374 {
00375   shared_ptr<Task> t = dataFeedSetupTasks_[0];
00376   dataFeedSetupTasks_.erase (dataFeedSetupTasks_.begin());
00377   executingSetupTask_ = true;
00378   return t;
00379 }
00380 
00381 void CommunicationGateway::notifyDataFeedSetupEnded()
00382 {
00383   executingSetupTask_ = false;
00384 }
00385 
00386 #endif
Generated on Fri Mar 4 22:14:58 2011 for MeRMaID::support by  doxygen 1.6.3