IOThread.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "config.h"
00023
00024 #include "IOThread.hpp"
00025 #include "IOThreadUpdateTask.hpp"
00026 #include "AddStreamReaderTask.hpp"
00027 #include "AddStreamWriterTask.hpp"
00028 #include "WriteTask.hpp"
00029 #include "CloseTask.hpp"
00030 #include <ActiveObjectManagerAce.hpp>
00031 #include <cassert>
00032 #include <iostream>
00033
00034 using namespace mermaid::support::io;
00035 using mermaid::support::activeobject::ActiveObjectManagerAce;
00036
00037
namespace
{
    // Timeout that bounds each pass of the proactor event loop in
    // IOThread::run(). The name indicates the value is meant as milliseconds.
    const int TIMEOUT_MSEC = 1000;

    // Rate handed to the IOThreadUpdateTask created in the IOThread
    // constructor; its unit is defined by IOThreadUpdateTask.
    const int IOTHREAD_UPDATE_RATE = 100;
}
00042
00043 IOThread::~IOThread()
00044 {
00045 }
00046
00047 IOThread::IOThread() : activeObject_ (ActiveObjectManagerAce::getInstance()->createActiveObject()), streamHandler_ (new StreamHandler)
00048 {
00049 activeObject_->start();
00050
00051 shared_ptr<Task> ioThreadUpdateTask (new IOThreadUpdateTask (this, IOTHREAD_UPDATE_RATE));
00052 activeObject_->addTask (ioThreadUpdateTask);
00053 }
00054
00055 void IOThread::addStreamReader (shared_ptr<StreamReader> streamReader)
00056 {
00057 shared_ptr<Task> addStreamReaderTask (new AddStreamReaderTask (this, streamReader));
00058 this->addTask (addStreamReaderTask);
00059 }
00060
00061 void IOThread::addStreamWriter (shared_ptr<StreamWriter> streamWriter)
00062 {
00063 shared_ptr<Task> addStreamWriterTask (new AddStreamWriterTask (this, streamWriter));
00064 this->addTask (addStreamWriterTask);
00065 }
00066
00067 void IOThread::addWriteOperation (int fileDescriptor, size_t bytesToWrite, const char* data, shared_ptr<StreamOperationHandlerMethodBase> streamOperationHandlerWrite)
00068 {
00069 shared_ptr<Task> addWriteTask (new WriteTask (this, fileDescriptor, bytesToWrite, data, streamOperationHandlerWrite));
00070 this->addTask (addWriteTask);
00071 }
00072
00073 void IOThread::addCloseOperation (int fileDescriptor, bool writeEnabled, bool readEnabled)
00074 {
00075 shared_ptr<Task> addCloseTask (new CloseTask (this, fileDescriptor, writeEnabled, readEnabled));
00076 this->addTask (addCloseTask);
00077 }
00078
00079 void IOThread::run()
00080 {
00081 ACE_Time_Value time (0, TIMEOUT_MSEC);
00082
00083
00084 proactor_.proactor_run_event_loop (time);
00085 proactor_.proactor_reset_event_loop();
00086 }
00087
00088 void IOThread::addTask (shared_ptr<Task> task)
00089 {
00090 activeObject_->addTask (task);
00091
00092
00093 std::cerr << "Interrupted" << std::endl;
00094 proactor_.proactor_end_event_loop();
00095 }
00096
00097 ACE_Proactor* IOThread::getProactor()
00098 {
00099 return &proactor_;
00100 }
00101