www.pudn.com > fanccMSNr.src.rar > MessageDispatcher.cpp
// MessageDispatcher.cpp : ±¸Çö ÆÄÀÏÀÔ´Ï´Ù. // #include "stdafx.h" #include "DumbMessenger.h" #include "MessageDispatcher.h" #include "messenger/MIMEMessage.hpp" #include "messenger/MessageFactory.hpp" #include// CMessageDispatcher CMessageDispatcher::CMessageDispatcher(): listeners() { serverSocket=NULL; } CMessageDispatcher::~CMessageDispatcher() { if(serverSocket!=NULL) { delete serverSocket; } } void CMessageDispatcher::connect(const string &address, unsigned long port) { // ´ÝÈ÷Áö ¾Ê¾Ò´Âµ¥ Á¢¼ÓÇÏ·Á´Â °æ¿ì¸¦ ¹è·ÁÇÑ´Ù. //Close(); if(Create()==0) { int error=GetLastError(); if(error!=WSAEWOULDBLOCK) { TRACE("ERROR in Create() in CMessageDispatcher::connect(). GetLastError==%d\n", error); throw SocketException(); } } if(Connect(address.data(), port)==0) { int error=GetLastError(); if(error!=WSAEWOULDBLOCK) { TRACE("ERROR in Connect() in CMessageDispatcher::connect(). GetLastError==%d\n", error); throw SocketException(); } } } namespace { class ServerSocket: public CAsyncSocket { protected: CAsyncSocket *connection; public: ServerSocket(CAsyncSocket &connection) { this->connection=&connection; } virtual void OnAccept(int nErrorCode) { //TODO: handle nErrorCode Accept(*connection); } }; } int CMessageDispatcher::listen(int preferredPort) { if(serverSocket==NULL) { serverSocket=new ServerSocket(*this); } int port=preferredPort; // ´ÝÈ÷Áö ¾Ê¾Ò´Âµ¥ Á¢¼ÓÇÏ·Á´Â °æ¿ì¸¦ ¹è·ÁÇÑ´Ù. serverSocket->Close(); while(serverSocket->Create(port)==0) { int error=GetLastError(); if(error==WSAEPROTONOSUPPORT) { port++; // retry Create() continue; } TRACE("ERROR in Create() in CMessageDispatcher::listen(). GetLastError==%d\n", error); throw SocketException(); } if(serverSocket->Listen()==0) { int error=GetLastError(); TRACE("ERROR in Listen() in CMessageDispatcher::listen(). GetLastError==%d\n", error); throw SocketException(); } return port; } void CMessageDispatcher::getAddress(string &address) { CString socketAddress; UINT port; GetSockName(socketAddress, port); address=(LPCSTR)socketAddress; } void CMessageDispatcher::addMessageListener(MessageListener &messageListener) { listeners.push_back(&messageListener); } void CMessageDispatcher::removeMessageListener(MessageListener &messageListener) { list ::iterator location=find(listeners.begin(), listeners.end(), &messageListener); if(location!=listeners.end()) { listeners.erase(location); } } MessageDispatcher *CMessageDispatcher::newInstance() { return new CMessageDispatcher; } //#include void CMessageDispatcher::sendMessage(Message &message) { const string &stringMessage=message.toString(); TRACE("send: \"%s\"\n", stringMessage.data()); // static ofstream out("out_log.txt"); // out<< message.toString(); sendBytes((const unsigned char *)stringMessage.data(), stringMessage.length()); } bool CMessageDispatcher::sendBytes(const unsigned char *bytes, size_t length) { if(Send(bytes, (int)length)==SOCKET_ERROR) { int error=GetLastError(); if(error==WSAEWOULDBLOCK) { return false; } TRACE("ERROR in CMessageDispatcher::sendMessage(). GetLastError==%d\n", error); throw SocketException(); } return true; } // CMessageDispatcher ¸â¹ö ÇÔ¼öÀÔ´Ï´Ù. void CMessageDispatcher::OnConnect(int nErrorCode) { // ÀÌÀü ¿¬°á¿¡ ³²¾ÆÀÖ´ø ¹öÆÛ¸¦ Áö¿î´Ù. buffer.erase(buffer.begin(), buffer.end()); list ::iterator listener; for(listener=listeners.begin(); listener!=listeners.end(); listener++) { if( nErrorCode==0) { (*listener)->connected(); } else { TRACE("About to failed() in CMessageDispatcher::OnConnect() nErrorCode==%d\n", nErrorCode); (*listener)->failed(); } } } void CMessageDispatcher::OnClose(int nErrorCode) { // ¿¡·¯°¡ ¹ß»ýÇßÀ» ¶§¸¸ È£ÃâÇÑ´Ù. if(nErrorCode!=0) { list ::iterator listener; for(listener=listeners.begin(); listener!=listeners.end(); listener++) { TRACE("About to failed() in CMessageDispatcher::OnClose() nErrorCode==%d\n", nErrorCode); (*listener)->failed(); } } } void CMessageDispatcher::OnSend(int nErrorCode) { list ::iterator listener; // ¿¡·¯°¡ ¹ß»ýÇßÀ» ¶§¸¸ È£ÃâÇÑ´Ù. if(nErrorCode!=0) { for(listener=listeners.begin(); listener!=listeners.end(); listener++) { TRACE("About to failed() in CMessageDispatcher::OnSend() nErrorCode==%d\n", nErrorCode); (*listener)->failed(); } } for(listener=listeners.begin(); listener!=listeners.end(); listener++) { (*listener)->sendable(); } } // CAsyncSocket¿¡¼ ¿À¹ö¶óÀ̵å void CMessageDispatcher::OnReceive(int nErrorCode) { if(nErrorCode!=0) { list ::iterator listener; for(listener=listeners.begin(); listener!=listeners.end(); listener++) { TRACE("About to failed() in CMessageDispatcher::OnReceive() nErrorCode==%d\n", nErrorCode); (*listener)->failed(); } return; } static const int nByteBuffer=500; char byteBuffer[nByteBuffer+1]; int nReceived = Receive(byteBuffer, nByteBuffer); if(nReceived==0 || nReceived==SOCKET_ERROR) { int error=GetLastError(); if(error!=WSAEWOULDBLOCK) { TRACE("ERROR in CMessageDispatcher::OnReceive(). GetLastError==%d\n", error); // OnReceive¿¡¼ ÀбⰡ ½ÇÆÐÇÏ¸é ±×Àú ¾Æ¹« Àϵµ ÇÏÁö ¾Ê´Â´Ù. return; } } byteBuffer[nReceived]='\0'; // TRACE("Bytes from socket: \"%s\"\n", byteBuffer); buffer.append(byteBuffer, nReceived); Message *message=NULL; try { while((message=MessageFactory::createMessage(buffer))!=NULL) { TRACE("message arrived: \"%s\"", message->toString().data()); // static ofstream out("in_log.txt"); // out<< message->toString(); list ::iterator listener; for( listener=listeners.begin(); listener!=listeners.end(); listener++) { (*listener)->messageArrived(*message); } delete message; message=NULL; } } catch(exception e) { if(message!=NULL) { delete message; message=NULL; } // TODO: handle exception } }