00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include "config.h"
00032
00033 #include <CommunicationGateway.hpp>
00034 #include <DataFactory.hpp>
00035 #include <DataFeedDescription.hpp>
00036 #include <Exception.hpp>
00037 #include <Entity.hpp>
00038 #include <EntityDescription.hpp>
00039 #include <ServiceInterfaceDescription.hpp>
00040 #include <ServiceInstanceDescription.hpp>
00041 #include <ServiceTypeDescription.hpp>
00042
00043
00044 #include <iostream>
00045
00046 #include <sstream>
00047
00048
00049 #include <yarp/os/all.h>
00050
00051
00052
00053 #include <YarpDataFeedReporter.hpp>
00054 #include <YarpServiceAsynchInputPort.hpp>
00055 #include <YarpServiceAsynchRequestPort.hpp>
00056 #include <YarpServiceAsynchRequestType.hpp>
00057
00058 using namespace mermaid::support::communication;
00059
00060 using mermaid::support::data::DataFactory;
00061 using mermaid::support::errorhandling::Exception;
00062 using mermaid::support::service::DataFeedDescription;
00063 using mermaid::support::service::Entity;
00064 using mermaid::support::service::EntityDescription;
00065 using mermaid::support::service::ServiceInterfaceDescription;
00066 using mermaid::support::service::ServiceInstanceDescription;
00067 using mermaid::support::service::ServiceTypeDescription;
00068 using std::string;
00069 using yarp::os::Network;
00070 using yarp::os::Port;
00071
00072 CommunicationGateway::CommunicationGateway()
00073 {
00074
00075 #ifdef USE_COMM
00076 executingSetupTask_ = false;
00077 #endif
00078
00079 };
00080
00081 CommunicationGateway::~CommunicationGateway()
00082 {
00083
00084 };
00085
00086 void CommunicationGateway::registerService (shared_ptr<Service> s)
00087 {
00088
00089
00090 shared_ptr<ServiceInstanceDescription> serviceInstanceDescription = s->getServiceInstanceDescription();
00091 std::string serviceInstanceName = serviceInstanceDescription->getServiceInstanceName();
00092 std::string entityName = s->getEntity()->getEntityDescription()->getEntityName();
00093
00094
00095 shared_ptr<YarpServiceAsynchInputPort> servicePort (new YarpServiceAsynchInputPort (s));
00096
00097
00098 serviceAsynchInputPorts_[serviceInstanceName] = servicePort;
00099
00100
00101 std::vector<shared_ptr<ServiceTypeDescription> > serviceTypes = s->getServiceInstanceDescription()->getServiceTypeDescriptions();
00102
00103 std::vector<shared_ptr<ServiceTypeDescription> >::iterator typesIterator;
00104
00105 for (typesIterator = serviceTypes.begin(); typesIterator != serviceTypes.end(); typesIterator++) {
00106 shared_ptr<ServiceTypeDescription> serviceType = *typesIterator;
00107
00108
00109 std::vector< shared_ptr<DataFeedDescription> > dataFeedDescriptions = serviceType->getAllDataFeedDescriptions();
00110
00111 std::vector< shared_ptr<DataFeedDescription> >::iterator dataFeedIt;
00112
00113 for (dataFeedIt = dataFeedDescriptions.begin(); dataFeedIt != dataFeedDescriptions.end(); dataFeedIt++) {
00114 shared_ptr<DataFeedDescription> dataFeedDescription = *dataFeedIt;
00115 std::string dataFeedName = dataFeedDescription->getDataFeedName();
00116 std::cerr << "CommunicationGateway::registerService : adding DataFeed port for DataFeed with dataFeedName=\"" << dataFeedName << "\" to Service with entityName=\"" << entityName << "\", serviceInstanceName=\"" << serviceInstanceName << "\"" << std::endl;
00117
00118 shared_ptr<YarpDataFeedOutputPort> datafeedOutputPort (new YarpDataFeedOutputPort (entityName, serviceInstanceName, dataFeedName));
00119
00120 addYarpDataFeedOutputPort (datafeedOutputPort);
00121 }
00122 }
00123 };
00124
00125 void CommunicationGateway::sendServiceRequest (shared_ptr<Service> requesterService, shared_ptr<ServiceRequest> request)
00126 {
00127
00128
00129 std::string targetEntityName = request->getTargetEntityName();
00130 std::string targetServiceName = request->getTargetServiceName();
00131
00132
00133 shared_ptr<YarpServiceAsynchRequestPort> requestPort = getYarpServiceAsynchRequestPort (requesterService, request);
00134
00135
00136 registerServiceRequest (request);
00137
00138
00139 YarpServiceAsynchRequestType& ysart = requestPort->prepare();
00140 ysart.convertFromServiceRequest (*request);
00141 requestPort->writeStrict();
00142 request->notifySent();
00143
00144 };
00145
00146
00147 void CommunicationGateway::sendServiceReply (shared_ptr<Service> targetService, std::string requesterEntityName, std::string requesterServiceName, shared_ptr<ServiceReply> sr)
00148 {
00149
00150
00151
00152 shared_ptr<YarpServiceAsynchOutputPort> outputPort = getYarpServiceAsynchOutputPort (targetService, requesterEntityName, requesterServiceName);
00153
00154
00155 YarpServiceAsynchReplyType& ysart = outputPort->prepare();
00156 ysart.convertFromServiceReply (*sr);
00157 outputPort->writeStrict();
00158
00159
00160 };
00161
00162
00163 shared_ptr<YarpServiceAsynchRequestPort> CommunicationGateway::getYarpServiceAsynchRequestPort (shared_ptr<Service> requesterService, shared_ptr<ServiceRequest> request)
00164 {
00165 std::string targetEntityName = request->getTargetEntityName();
00166 std::string targetServiceName = request->getTargetServiceName();
00167 std::string requesterEntityName = requesterService->getServiceInstanceDescription()->getEntityDescription()->getEntityName();
00168 std::string requesterServiceName = requesterService->getServiceInstanceDescription()->getServiceInstanceName();
00169
00170 std::pair<std::string, std::string> requesterPair (requesterEntityName, requesterServiceName);
00171 std::pair<std::string, std::string> targetPair (targetEntityName, targetServiceName);
00172 shared_ptr<YarpServiceAsynchRequestPort> requestPort = serviceAsynchRequestPorts_[requesterPair][targetPair];
00173
00174 if (requestPort == false) {
00175
00176
00177
00178 requestPort = shared_ptr<YarpServiceAsynchRequestPort> (new YarpServiceAsynchRequestPort (requesterService, request));
00179 std::map<std::pair<std::string, std::string>, shared_ptr<YarpServiceAsynchRequestPort> > requesterRequestPorts = serviceAsynchRequestPorts_[requesterPair];
00180
00181 requesterRequestPorts[targetPair] = requestPort;
00182 serviceAsynchRequestPorts_[requesterPair] = requesterRequestPorts;
00183
00184
00185 shared_ptr<YarpServiceAsynchReplyPort> replyPort (new YarpServiceAsynchReplyPort (requesterService, targetEntityName, targetServiceName));
00186
00187 std::map<std::pair<std::string, std::string>, shared_ptr<YarpServiceAsynchReplyPort> > requesterReplyPorts = serviceAsynchReplyPorts_[requesterPair];
00188
00189 requesterReplyPorts[targetPair] = replyPort;
00190 serviceAsynchReplyPorts_[requesterPair] = requesterReplyPorts;
00191
00192 }
00193
00194 return requestPort;
00195
00196 };
00197
00198
00199 shared_ptr<YarpServiceAsynchReplyPort> CommunicationGateway::getYarpServiceAsynchReplyPort (std::string targetEntityName, std::string targetServiceName, std::string requesterEntityName, std::string requesterServiceName)
00200 {
00201 std::pair<std::string, std::string> requesterPair (requesterEntityName, requesterServiceName);
00202 std::pair<std::string, std::string> targetPair (targetEntityName, targetServiceName);
00203 return serviceAsynchReplyPorts_[requesterPair][targetPair];
00204 };
00205
00206
00207 shared_ptr<YarpServiceAsynchOutputPort> CommunicationGateway::getYarpServiceAsynchOutputPort (shared_ptr<Service> targetService, std::string requesterEntityName, std::string requesterServiceName)
00208 {
00209 std::string targetEntityName = targetService->getServiceInstanceDescription()->getEntityDescription()->getEntityName();
00210 std::string targetServiceName = targetService->getServiceInstanceDescription()->getServiceInstanceName();
00211 std::pair<std::string, std::string> requesterPair (requesterEntityName, requesterServiceName);
00212 std::pair<std::string, std::string> targetPair (targetEntityName, targetServiceName);
00213
00214 shared_ptr<YarpServiceAsynchOutputPort> outputPort = serviceAsynchOutputPorts_[requesterPair][targetPair];
00215
00216 if (outputPort == false) {
00217
00218
00219
00220 outputPort = shared_ptr<YarpServiceAsynchOutputPort> (new YarpServiceAsynchOutputPort (targetService, requesterEntityName, requesterServiceName));
00221 std::map<std::pair<std::string, std::string>, shared_ptr<YarpServiceAsynchOutputPort> > requesterOutputPorts = serviceAsynchOutputPorts_[requesterPair];
00222
00223 requesterOutputPorts[targetPair] = outputPort;
00224 serviceAsynchOutputPorts_[requesterPair] = requesterOutputPorts;
00225 }
00226
00227 return outputPort;
00228 };
00229
00230
00231 void CommunicationGateway::sendToDataFeed (std::string entityName, std::string serviceName, std::string dataFeedName, shared_ptr<DataBox> dataBox)
00232 {
00233 shared_ptr<YarpDataFeedOutputPort> port = getYarpDataFeedOutputPort (entityName, serviceName, dataFeedName);
00234 Bottle& b = port->prepare();
00235 b.clear();
00236 DataFactory::writeDataBoxToBottle (dataBox, b);
00237 port->writeStrict();
00238 };
00239
00240 void CommunicationGateway::addYarpDataFeedOutputPort (shared_ptr<YarpDataFeedOutputPort> outputPort)
00241 {
00242 std::string entityName = outputPort->getEntityName();
00243 std::string serviceName = outputPort->getServiceInstanceName();
00244 std::string dataFeedName = outputPort->getDataFeedName();
00245
00246 std::map<std::string, std::map<std::string, shared_ptr<YarpDataFeedOutputPort> > > entityFeedPorts = dataFeedOutputPorts_[entityName];
00247
00248 std::map<std::string, shared_ptr<YarpDataFeedOutputPort> > serviceFeedPorts = entityFeedPorts[serviceName];
00249
00250 serviceFeedPorts[dataFeedName] = outputPort;
00251 entityFeedPorts[serviceName] = serviceFeedPorts;
00252 dataFeedOutputPorts_[entityName] = entityFeedPorts;
00253 };
00254
00255 shared_ptr<YarpDataFeedOutputPort> CommunicationGateway::getYarpDataFeedOutputPort (std::string entityName, std::string serviceName, std::string feedName)
00256 {
00257 shared_ptr<YarpDataFeedOutputPort> port = dataFeedOutputPorts_[entityName][serviceName][feedName];
00258
00259 if (port == false) {
00260
00261 std::stringstream ss;
00262 ss << "CommunicationGateway::getYarpDataFeedOutputPort() : no data feed port exists with entityName=\"" << entityName << "\", serviceName=\"" << serviceName << "\", feedName=\"" << feedName << "\"";
00263 throw Exception (ss.str());
00264 }
00265
00266 return port;
00267
00268 };
00269
00270 void CommunicationGateway::addYarpDataFeedInputPort (shared_ptr<YarpDataFeedInputPort> inputPort)
00271 {
00272 std::string consumerEntityName = inputPort->getConsumerService()->getEntity()->getEntityDescription()->getEntityName();
00273 std::string consumerServiceName = inputPort->getConsumerService()->getServiceInstanceDescription()->getServiceInstanceName();
00274 std::string producerEntityName = inputPort->getProducerEntityName();
00275 std::string producerServiceName = inputPort->getProducerServiceName();
00276 std::string dataFeedName = inputPort->getDataFeedName();
00277
00278 dataFeedInputPorts_[consumerEntityName][consumerServiceName][producerEntityName][producerServiceName][dataFeedName] = inputPort;
00279 };
00280
00281 shared_ptr<YarpDataFeedInputPort> CommunicationGateway::getYarpDataFeedInputPort (std::string consumerEntityName, std::string consumerServiceName, std::string producerEntityName, std::string producerServiceName, std::string dataFeedName)
00282 {
00283 shared_ptr<YarpDataFeedInputPort> port = dataFeedInputPorts_[consumerEntityName][consumerServiceName][producerEntityName][producerServiceName][dataFeedName];
00284
00285 if (port == false) {
00286
00287 std::stringstream ss;
00288 ss << "CommunicationGateway::getYarpDataFeedInputPort() : no data feed port exists with consumerEntityName=\"" << consumerEntityName << "\", consumerServiceName=\"" << consumerServiceName << "\", producerEntityName=\"" << producerEntityName << "\", producerServiceName=\"" << producerServiceName << "\", dataFeedName=\"" << dataFeedName << "\"";
00289 throw Exception (ss.str());
00290 }
00291
00292 return port;
00293 };
00294
00295 void CommunicationGateway::connectToDataFeed (std::string producerEntityName, std::string producerServiceName, std::string datafeedName, shared_ptr<Service> consumerService)
00296 {
00297
00298
00299 shared_ptr<YarpDataFeedInputPort> port (new YarpDataFeedInputPort (producerEntityName, producerServiceName, datafeedName, consumerService));
00300
00301 this->addYarpDataFeedInputPort (port);
00302
00303
00304 };
00305
00306 void CommunicationGateway::registerServiceRequest (shared_ptr<ServiceRequest> request)
00307 {
00308 std::string requesterEntityName = request->getRequesterEntityName();
00309 std::string requesterServiceName = request->getRequesterServiceName();
00310 int reqId = request->getRequestId();
00311
00312 std::map<std::string, std::map<int, shared_ptr<ServiceRequest> > > entityMap = serviceRequests_[requesterEntityName];
00313
00314 std::map<int, shared_ptr<ServiceRequest> > serviceMap = entityMap[requesterServiceName];
00315
00316
00317 if (serviceMap.find (reqId) != serviceMap.end()) {
00318 std::stringstream ss;
00319 ss << "CommunicationGateway::registerServiceRequest : ServiceRequest already registered with the following identification: EntityName=" << requesterEntityName << " ,ServiceName=" << requesterServiceName << " ,RequestId=" << reqId;
00320 throw Exception (ss.str());
00321 }
00322 else {
00323
00324
00325 serviceMap[reqId] = request;
00326 entityMap[requesterServiceName] = serviceMap;
00327 serviceRequests_[requesterEntityName] = entityMap;
00328 }
00329 };
00330
00331 shared_ptr<ServiceRequest> CommunicationGateway::getServiceRequest (std::string requesterEntityName, std::string requesterServiceName, int requestId)
00332 {
00333 std::map<std::string, std::map<int, shared_ptr<ServiceRequest> > > entityMap = serviceRequests_[requesterEntityName];
00334 std::map<int, shared_ptr<ServiceRequest> > serviceMap = entityMap[requesterServiceName];
00335
00336 if (serviceMap.find (requestId) == serviceMap.end()) {
00337 std::stringstream ss;
00338 ss << "CommunicationGateway::getServiceRequest : ServiceRequest with the following identification: EntityName=" << requesterEntityName << " ,ServiceName=" << requesterServiceName << " ,RequestId=" << requestId << " does not exist.";
00339 throw Exception (ss.str());
00340 }
00341 else {
00342 return serviceMap[requestId];
00343 }
00344 };
00345
00346 void CommunicationGateway::unregisterServiceRequest (std::string requesterEntityName, std::string requesterServiceName, int requestId)
00347 {
00348
00349 };
00350
00351 #ifdef USE_COMM
00352
00353 void CommunicationGateway::addDataFeedSetupTask (shared_ptr<Task> dst)
00354 {
00355 std::cout << "CommunicationGateway::addDataFeedSetupTask()" << std::endl;
00356 dataFeedSetupTasks_.push_back (dst);
00357 }
00358
00359 bool CommunicationGateway::isNewDataFeedSetupTaskAvailable()
00360 {
00361 if (executingSetupTask_ == true) {
00362 return false;
00363 }
00364 else if (dataFeedSetupTasks_.size() > 0) {
00365 return true;
00366 }
00367 else {
00368 return false;
00369 }
00370
00371 }
00372
00373 shared_ptr<Task> CommunicationGateway::getDataFeedSetupTask()
00374 {
00375 shared_ptr<Task> t = dataFeedSetupTasks_[0];
00376 dataFeedSetupTasks_.erase (dataFeedSetupTasks_.begin());
00377 executingSetupTask_ = true;
00378 return t;
00379 }
00380
00381 void CommunicationGateway::notifyDataFeedSetupEnded()
00382 {
00383 executingSetupTask_ = false;
00384 }
00385
00386 #endif