IOThread.cpp

00001 /*
00002 Copyright 2007, 2008, 2009, 2010, 2011 Instituto de Sistemas e Robotica, Instituto Superior Tecnico
00003 
00004 This file is part of MeRMaID.
00005 
00006 MeRMaID is free software: you can redistribute it and/or modify
00007 it under the terms of the GNU Lesser General Public License as published by
00008 the Free Software Foundation, either version 3 of the License, or
00009 (at your option) any later version.
00010 
00011 MeRMaID is distributed in the hope that it will be useful,
00012 but WITHOUT ANY WARRANTY; without even the implied warranty of
00013 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014 GNU Lesser General Public License for more details.
00015 
00016 You should have received a copy of the GNU Lesser General Public License
00017 along with MeRMaID.  If not, see <http://www.gnu.org/licenses/>.
00018 */
00019 
00020 
00021 
00022 #include "config.h"
00023 
00024 #include "IOThread.hpp"
00025 #include "IOThreadUpdateTask.hpp"
00026 #include "AddStreamReaderTask.hpp"
00027 #include "AddStreamWriterTask.hpp"
00028 #include "WriteTask.hpp"
00029 #include "CloseTask.hpp"
00030 #include <ActiveObjectManagerAce.hpp>
00031 #include <cassert>
00032 #include <iostream>
00033 
00034 using namespace mermaid::support::io;
00035 using mermaid::support::activeobject::ActiveObjectManagerAce;
00036 
00037 // timeout of 1 second
00038 static const int TIMEOUT_MSEC = 1000;
00039 
00040 // number of times per second IOThread::run will be executed
00041 static const int IOTHREAD_UPDATE_RATE = 100;
00042 
00043 IOThread::~IOThread()
00044 {
00045 }
00046 
00047 IOThread::IOThread() : activeObject_ (ActiveObjectManagerAce::getInstance()->createActiveObject()), streamHandler_ (new StreamHandler)
00048 {
00049   activeObject_->start();
00050   
00051   shared_ptr<Task> ioThreadUpdateTask (new IOThreadUpdateTask (this, IOTHREAD_UPDATE_RATE));
00052   activeObject_->addTask (ioThreadUpdateTask);
00053 }
00054 
00055 void IOThread::addStreamReader (shared_ptr<StreamReader> streamReader)
00056 {
00057   shared_ptr<Task> addStreamReaderTask (new AddStreamReaderTask (this, streamReader));
00058   this->addTask (addStreamReaderTask);
00059 }
00060 
00061 void IOThread::addStreamWriter (shared_ptr<StreamWriter> streamWriter)
00062 {
00063   shared_ptr<Task> addStreamWriterTask (new AddStreamWriterTask (this, streamWriter));
00064   this->addTask (addStreamWriterTask);
00065 }
00066 
00067 void IOThread::addWriteOperation (int fileDescriptor, size_t bytesToWrite, const char* data, shared_ptr<StreamOperationHandlerMethodBase> streamOperationHandlerWrite)
00068 {
00069   shared_ptr<Task> addWriteTask (new WriteTask (this, fileDescriptor, bytesToWrite, data, streamOperationHandlerWrite));
00070   this->addTask (addWriteTask);
00071 }
00072 
00073 void IOThread::addCloseOperation (int fileDescriptor, bool writeEnabled, bool readEnabled)
00074 {
00075   shared_ptr<Task> addCloseTask (new CloseTask (this, fileDescriptor, writeEnabled, readEnabled));
00076   this->addTask (addCloseTask);
00077 }
00078 
00079 void IOThread::run()
00080 {
00081   ACE_Time_Value time (0, TIMEOUT_MSEC);
00082   
00083   // run the proactor event loop
00084   proactor_.proactor_run_event_loop (time);
00085   proactor_.proactor_reset_event_loop();
00086 }
00087 
00088 void IOThread::addTask (shared_ptr<Task> task)
00089 {
00090   activeObject_->addTask (task);
00091   
00092   // interrupt the event loop so that this task can be processed with minimum delay
00093   std::cerr << "Interrupted" << std::endl;
00094   proactor_.proactor_end_event_loop();
00095 }
00096 
00097 ACE_Proactor* IOThread::getProactor()
00098 {
00099   return &proactor_;
00100 }
00101 
Generated on Fri Mar 4 22:14:58 2011 for MeRMaID::support by  doxygen 1.6.3