Changeset 21:eb59943050c9 in xplcommon
- Timestamp:
- 12/31/12 14:17:32 (12 years ago)
- Branch:
- default
- Phase:
- public
- Files:
-
- 22 added
- 3 deleted
- 20 edited
- 1 moved
Legend:
- Unmodified
- Added
- Removed
-
src/xplcommon/BufferedStream.h
r17 r21 58 58 public: 59 59 /** 60 * Destroy the buffered waitable.60 * Destroy the buffered stream. 61 61 */ 62 62 virtual ~BufferedStream(); … … 64 64 /** 65 65 * Get the reference of the reading buffer. Note, that it can be a 66 * 0 reference, if the waitablehas no reading buffer.66 * 0 reference, if the stream has no reading buffer. 67 67 */ 68 68 ReadingBuffer& getReadingBuffer(); … … 70 70 /** 71 71 * Get the reference of the writing buffer. Note, that it can be a 72 * 0 reference, if the waitablehas no writing buffer.72 * 0 reference, if the stream has no writing buffer. 73 73 */ 74 74 WritingBuffer& getWritingBuffer(); -
src/xplcommon/LocalAcceptor.h
r17 r21 32 32 //------------------------------------------------------------------------------ 33 33 34 #include " Acceptor.h"34 #include "Failable.h" 35 35 36 36 //------------------------------------------------------------------------------ … … 48 48 * connection. 49 49 */ 50 class LocalAcceptor : public Acceptor50 class LocalAcceptor : public FailableReference<LocalAcceptor> 51 51 { 52 52 public: 53 54 /** 55 * Start accepting a connection. 56 * 57 * If a connection is already accepted, but not retrieved yet, it 58 * returns true. 59 * 60 * If a new socket becomes immediately available, it returns true. 61 * 62 * If some error occurs, the acceptor is marked as failed. 63 */ 64 bool accept(); 65 53 66 /** 54 67 * Get the local socket accepted last. If no socket was accepted, 55 * return 0. The local socket's waiter will be the given one 56 * (which can be 0). 68 * return 0. The local socket's waiter will be the given one. If 69 * the server socket has no waiter, this one should be 0. If the 70 * server socket has a waiter, this one should not be 0. In other 71 * words, if the server socket is non-blocking, the accepted one 72 * should also be non-blocking, if the server socket is blocking, 73 * the accepted one should also be blocking. 57 74 */ 58 75 LocalSocket* getSocket(Waiter* waiter, -
src/xplcommon/LocalClientSocket.h
r17 r21 32 32 //------------------------------------------------------------------------------ 33 33 34 #include " ClientSocket.h"34 #include "BufferedStream.h" 35 35 36 36 //------------------------------------------------------------------------------ … … 44 44 //------------------------------------------------------------------------------ 45 45 46 class LocalConnector; 47 48 //------------------------------------------------------------------------------ 49 46 50 /** 47 51 * Client socket that connects to a server socket locally. On Linux it … … 50 54 * The server socket's name should be given. 51 55 */ 52 class LocalClientSocket : public ClientSocket56 class LocalClientSocket : public BufferedStream 53 57 { 54 58 public: … … 65 69 size_t readingCapacity = DEFAULT_CAPACITY, 66 70 size_t writingCapacity = DEFAULT_CAPACITY); 71 72 /** 73 * Get the connector. Use this object to connect to another socket. 74 */ 75 LocalConnector& getConnector(); 67 76 }; 68 77 -
src/xplcommon/LocalConnector.h
r17 r21 28 28 // either expressed or implied, of the FreeBSD Project. 29 29 30 #ifndef XPLCOMMON_ CONNECTOR_H31 #define XPLCOMMON_ CONNECTOR_H30 #ifndef XPLCOMMON_LOCALCONNECTOR_H 31 #define XPLCOMMON_LOCALCONNECTOR_H 32 32 //------------------------------------------------------------------------------ 33 33 … … 36 36 //------------------------------------------------------------------------------ 37 37 38 #if 0 // The public interface of a connector for a client socket38 #if 0 // The public interface of a connector for a local client socket 39 39 40 40 //------------------------------------------------------------------------------ … … 49 49 * only acquired from a client socket. 50 50 */ 51 class Connector : public FailableReference<Connector>51 class LocalConnector : public FailableReference<LocalConnector> 52 52 { 53 53 public: … … 73 73 //------------------------------------------------------------------------------ 74 74 75 #endif // 0, The public interface of a connector for a client socket75 #endif // 0, The public interface of a connector for a local client socket 76 76 77 77 //------------------------------------------------------------------------------ … … 85 85 //------------------------------------------------------------------------------ 86 86 87 #include "win32/ Connector.h"87 #include "win32/LocalConnector.h" 88 88 89 namespace xplcommon { typedef win32:: ConnectorConnector; }89 namespace xplcommon { typedef win32::LocalConnector LocalConnector; } 90 90 91 91 //------------------------------------------------------------------------------ … … 95 95 //------------------------------------------------------------------------------ 96 96 97 #include "posix/ Connector.h"97 #include "posix/LocalConnector.h" 98 98 99 namespace xplcommon { typedef posix:: ConnectorConnector; }99 namespace xplcommon { typedef posix::LocalConnector LocalConnector; } 100 100 101 101 //------------------------------------------------------------------------------ … … 104 104 105 105 //------------------------------------------------------------------------------ 106 #endif // XPLCOMMON_ CONNECTOR_H106 #endif // XPLCOMMON_LOCALCONNECTOR_H 107 107 108 108 // Local Variables: -
src/xplcommon/Makefile.am
r17 r21 32 32 ReadingBuffer.h \ 33 33 WritingBuffer.h \ 34 Acceptor.h \35 34 LocalAcceptor.h \ 36 35 LocalSocket.h \ 37 36 LocalServerSocket.h \ 38 Connector.h \ 39 ClientSocket.h \ 37 LocalConnector.h \ 40 38 LocalClientSocket.h -
src/xplcommon/ReadingBuffer.h
r17 r21 95 95 //------------------------------------------------------------------------------ 96 96 97 //#include "win32/ReadingBuffer.h"97 #include "win32/ReadingBuffer.h" 98 98 99 //namespace xplcommon { typedef win32::ReadingBuffer ReadingBuffer; }99 namespace xplcommon { typedef win32::ReadingBuffer ReadingBuffer; } 100 100 101 101 //------------------------------------------------------------------------------ -
src/xplcommon/WritingBuffer.h
r17 r21 101 101 //------------------------------------------------------------------------------ 102 102 103 //#include "win32/WritingBuffer.h"103 #include "win32/WritingBuffer.h" 104 104 105 //namespace xplcommon { typedef win32::WritingBuffer WritingBuffer; }105 namespace xplcommon { typedef win32::WritingBuffer WritingBuffer; } 106 106 107 107 //------------------------------------------------------------------------------ -
src/xplcommon/posix/ClientSocket.cc
r16 r21 41 41 42 42 using xplcommon::posix::ClientSocket; 43 using xplcommon::posix::Connector;44 45 //------------------------------------------------------------------------------46 47 ClientSocket::~ClientSocket()48 {49 delete connector;50 }51 52 //------------------------------------------------------------------------------53 54 Connector& ClientSocket::getConnector()55 {56 if (connector==0) {57 connector = createConnector();58 }59 return *connector;60 }61 43 62 44 //------------------------------------------------------------------------------ … … 64 46 void ClientSocket::handleEvents(short events) 65 47 { 66 if (connector !=0 && connector->connecting && (events&POLLOUT)==POLLOUT) {67 connector ->handleWritable();48 if (connector.connecting && (events&POLLOUT)==POLLOUT) { 49 connector.handleWritable(); 68 50 events &= ~POLLOUT; 69 51 } -
src/xplcommon/posix/ClientSocket.h
r14 r21 51 51 private: 52 52 /** 53 * The connector , if any. It is created only, if requested.53 * The connector. 54 54 */ 55 Connector *connector;55 Connector& connector; 56 56 57 57 protected: … … 59 59 * Construct the socket with the given parameters. 60 60 */ 61 ClientSocket(int domain, int type, Waiter* waiter = 0, int protocol = 0, 61 ClientSocket(int domain, int type, Connector& connector, 62 Waiter* waiter = 0, int protocol = 0, 62 63 size_t readingCapacity = DEFAULT_CAPACITY, 63 64 size_t writingCapacity = DEFAULT_CAPACITY); 64 65 65 public:66 /**67 * Destroy the socket.68 */69 virtual ~ClientSocket();70 71 /**72 * Get the connector. Use this object to connect to another socket.73 */74 Connector& getConnector();75 76 66 protected: 77 /**78 * Create a new connector of the correct type.79 */80 virtual Connector* createConnector() = 0;81 82 67 /** 83 68 * Handle any events on the file descriptor. If a connection is in … … 105 90 //------------------------------------------------------------------------------ 106 91 107 inline ClientSocket::ClientSocket(int domain, int type, Waiter* waiter, 108 int protocol, size_t readingCapacity, 92 inline ClientSocket::ClientSocket(int domain, int type, Connector& connector, 93 Waiter* waiter, int protocol, 94 size_t readingCapacity, 109 95 size_t writingCapacity) : 110 96 Socket(domain, type, waiter, protocol, readingCapacity, writingCapacity), 111 connector( 0)97 connector(connector) 112 98 { 113 99 } -
src/xplcommon/posix/LocalClientSocket.cc
r15 r21 32 32 #include "LocalClientSocket.h" 33 33 34 #include " Connector.h"34 #include "LocalConnector.h" 35 35 #include "LocalServerSocket.h" 36 36 … … 40 40 41 41 using xplcommon::posix::LocalClientSocket; 42 using xplcommon::posix::Connector;43 44 //------------------------------------------------------------------------------45 46 /**47 * The connector class used by the local client.48 */49 class LocalClientSocket::LocalConnector : public Connector50 {51 private:52 /**53 * The reference to the corresponding socket's address.54 */55 const struct sockaddr_un& address;56 57 public:58 /**59 * Create the connector for the given local client socket.60 */61 LocalConnector(LocalClientSocket* socket);62 63 protected:64 /**65 * Get the address.66 */67 virtual const struct sockaddr* getAddress(size_t& addrlen);68 };69 70 //------------------------------------------------------------------------------71 72 inline73 LocalClientSocket::LocalConnector::LocalConnector(LocalClientSocket* socket) :74 Connector(socket),75 address(socket->sun)76 {77 }78 79 //------------------------------------------------------------------------------80 81 const struct sockaddr*82 LocalClientSocket::LocalConnector::getAddress(size_t& addrlen)83 {84 addrlen = sizeof(address);85 return reinterpret_cast<const struct sockaddr*>(&address);86 }87 42 88 43 //------------------------------------------------------------------------------ … … 92 47 size_t readingCapacity, 93 48 size_t writingCapacity) : 94 ClientSocket(AF_UNIX, SOCK_STREAM, waiter, 0, 95 readingCapacity, writingCapacity) 49 ClientSocket(AF_UNIX, SOCK_STREAM, connector, waiter, 0, 50 readingCapacity, writingCapacity), 51 connector(this, name) 96 52 { 97 sun.sun_family = AF_UNIX;98 LocalServerSocket::setupPath(sun, name);99 }100 101 //------------------------------------------------------------------------------102 103 Connector* LocalClientSocket::createConnector()104 {105 return new LocalConnector(this);106 53 } 107 54 -
src/xplcommon/posix/LocalClientSocket.h
r15 r21 34 34 #include "ClientSocket.h" 35 35 36 #include <sys/un.h>36 #include "LocalConnector.h" 37 37 38 38 //------------------------------------------------------------------------------ … … 53 53 private: 54 54 /** 55 * The connector class.56 */57 class LocalConnector;58 59 /**60 55 * The address to use for connecting. 61 56 */ 62 struct sockaddr_un sun;57 LocalConnector connector; 63 58 64 59 public: … … 76 71 size_t writingCapacity = DEFAULT_CAPACITY); 77 72 78 protected:79 73 /** 80 * Create a new connector of the correct type.74 * Get the connector for this client. 81 75 */ 82 virtual Connector* createConnector();76 LocalConnector& getConnector(); 83 77 84 78 friend class LocalConnector; 85 79 }; 80 81 //------------------------------------------------------------------------------ 82 // Inline definitions 83 //------------------------------------------------------------------------------ 84 85 inline LocalConnector& LocalClientSocket::getConnector() 86 { 87 return connector; 88 } 86 89 87 90 //------------------------------------------------------------------------------ -
src/xplcommon/posix/Makefile.am
r16 r21 15 15 ClientSocket.cc \ 16 16 LocalServerSocket.cc \ 17 LocalConnector.cc \ 17 18 LocalClientSocket.cc 18 19 … … 35 36 LocalAcceptor.h \ 36 37 LocalServerSocket.h \ 38 LocalConnector.h \ 37 39 LocalClientSocket.h -
src/xplcommon/posix/ReadingBuffer.cc
r16 r21 34 34 #include "BufferedStream.h" 35 35 36 #include <cassert> 37 36 38 #include <poll.h> 37 39 … … 60 62 } else { 61 63 stream.events &= ~POLLIN; 62 addLength(result); 64 size_t added = addLength(result); 65 assert(added==result); 66 assert(getLength()==result); 63 67 } 64 68 return result>=0; -
src/xplcommon/posix/Waitable.h
r14 r21 93 93 94 94 /** 95 * Indicate if this waitable is valid, i.e. the file descriptor is96 * non-negative.97 */98 bool isValid() const;99 100 /**101 95 * Determine if the waitable is ready or not, i.e. if there is no 102 96 * need to wait for it. … … 143 137 //------------------------------------------------------------------------------ 144 138 145 inline bool Waitable::isValid() const146 {147 return fd>=0;148 }149 150 //------------------------------------------------------------------------------151 152 139 inline bool Waitable::ready() 153 140 { -
src/xplcommon/win32/Event.cc
r20 r21 40 40 //------------------------------------------------------------------------------ 41 41 42 bool Event::isFired() const 43 { 44 DWORD result = WaitForSingleObject(handle, (waiter==0) ? INFINITE : 0); 45 if (result==WAIT_FAILED) { 46 eventFailable.setErrorCode(GetLastError()); 47 return false; 48 } else if (result==WAIT_OBJECT_0) { 49 return true; 50 } else { 51 return false; 52 } 53 } 54 55 //------------------------------------------------------------------------------ 56 42 57 void Event::addTo(Waiter& w) 43 58 { -
src/xplcommon/win32/Event.h
r20 r21 32 32 //------------------------------------------------------------------------------ 33 33 34 #include " ../Failable.h"34 #include "EventFailable.h" 35 35 36 36 #include <windows.h> … … 49 49 * Wrapper for an event. 50 50 */ 51 class Event : public ::xplcommon::Failable51 class Event 52 52 { 53 53 private: 54 54 /** 55 * The object receiving the failure codes. 56 */ 57 EventFailable& eventFailable; 58 59 /** 55 60 * The handle of the event. 56 61 */ … … 66 71 * Construct the event. 67 72 */ 68 Event( );73 Event(EventFailable& eventFailable); 69 74 70 75 /** … … 96 101 97 102 /** 103 * Determine if the event is being waited for, i.e. it is 104 * associated with a waiter. 105 */ 106 bool isWaited() const; 107 108 /** 98 109 * Set the event. 99 110 */ 100 111 void fire(); 112 113 /** 114 * Determine if the event is fired or not. 115 */ 116 bool isFired() const; 101 117 102 118 /** … … 106 122 */ 107 123 bool clear(); 124 125 protected: 126 /** 127 * Set the error code on the failable object. 128 */ 129 void setErrorCode(errorCode_t errorCode); 108 130 }; 109 131 … … 112 134 //------------------------------------------------------------------------------ 113 135 114 inline Event::Event() : 136 inline Event::Event(EventFailable& eventFailable) : 137 eventFailable(eventFailable), 115 138 handle(CreateEvent(0, true, false, 0)), 116 139 waiter(0) 117 140 { 118 if (handle==0) setErrorCode(GetLastError());141 if (handle==0) eventFailable.setErrorCode(GetLastError()); 119 142 } 120 143 … … 143 166 //------------------------------------------------------------------------------ 144 167 168 inline bool Event::isWaited() const 169 { 170 return waiter!=0; 171 } 172 173 //------------------------------------------------------------------------------ 174 145 175 inline void Event::fire() 146 176 { 147 177 if (!SetEvent(handle)) { 148 setErrorCode(GetLastError());178 eventFailable.setErrorCode(GetLastError()); 149 179 } 150 180 } … … 155 185 { 156 186 if (!ResetEvent(handle)) { 157 setErrorCode(GetLastError());187 eventFailable.setErrorCode(GetLastError()); 158 188 return false; 159 189 } else { 160 190 return true; 161 191 } 192 } 193 194 //------------------------------------------------------------------------------ 195 196 inline void Event::setErrorCode(errorCode_t errorCode) 197 { 198 eventFailable.setErrorCode(errorCode); 162 199 } 163 200 -
src/xplcommon/win32/Makefile.am
r20 r21 6 6 Overlapped.cc \ 7 7 Waiter.cc \ 8 WaitableEvent.cc 8 BufferedStream.cc \ 9 ReadingBuffer.cc \ 10 WritingBuffer.cc \ 11 LocalAcceptor.cc \ 12 LocalServerSocketBase.cc\ 13 LocalConnector.cc \ 14 LocalClientSocket.cc 9 15 10 16 include_xplcommon_win32dir=$(includedir)/xplcommon/win32 11 17 include_xplcommon_win32_HEADERS=\ 12 18 Thread.h \ 19 EventFailable.h \ 13 20 Event.h \ 14 21 Overlapped.h \ 15 22 Waiter.h \ 16 WaitableEvent.h 23 WaitableEvent.h \ 24 Overlappable.h \ 25 Completer.h \ 26 StreamBuffer.h \ 27 BufferedStream.h \ 28 ReadingBuffer.h \ 29 WritingBuffer.h \ 30 LocalSocket.h \ 31 LocalAcceptor.h \ 32 LocalServerSocketBase.h \ 33 LocalServerSocket.h \ 34 LocalConnector.h \ 35 LocalClientSocket.h -
src/xplcommon/win32/Overlapped.cc
r20 r21 46 46 //------------------------------------------------------------------------------ 47 47 48 bool Overlapped::getResult(DWORD& size, HANDLE file) 49 { 50 bool result = GetOverlappedResult(file, &overlapped, &size, FALSE); 51 if (!result) { 52 DWORD error = GetLastError(); 53 if (error!=ERROR_IO_PENDING && error!=ERROR_IO_INCOMPLETE) { 54 reset(); 55 setErrorCode(error); 56 } 57 } else { 58 reset(); 59 } 60 return result; 61 } 62 63 //------------------------------------------------------------------------------ 64 48 65 // Local Variables: 49 66 // mode: C++ -
src/xplcommon/win32/Overlapped.h
r20 r21 59 59 * 0, and the event handle will be set. 60 60 */ 61 Overlapped( );61 Overlapped(EventFailable& eventFailable); 62 62 63 63 /** … … 75 75 */ 76 76 const OVERLAPPED* get() const; 77 78 /** 79 * Get the result of the overlapped operation. 80 * 81 * If the operation has completed, the event will be cleared. 82 * 83 * @param size on successful return, it will contain the number of 84 * bytes read or written, which may be 0 for an end-of-file 85 * condition 86 * @param file the handle of the file on which the operation in 87 * question was performed 88 * 89 * @return true if the operation has completed, or false if it has 90 * not, or a failure has occured. 91 */ 92 bool getResult(DWORD& size, HANDLE file); 77 93 }; 78 94 … … 81 97 //------------------------------------------------------------------------------ 82 98 83 inline Overlapped::Overlapped() 99 inline Overlapped::Overlapped(EventFailable& eventFailable) : 100 Event(eventFailable) 84 101 { 85 102 reset(); -
src/xplcommon/win32/WaitableEvent.h
r20 r21 33 33 34 34 #include "Event.h" 35 #include "EventFailable.h" 35 36 36 37 //------------------------------------------------------------------------------ … … 43 44 * An event which can be waited for. 44 45 */ 45 class WaitableEvent : public Event 46 class WaitableEvent : public Event, public EventFailable 46 47 { 47 48 public: … … 62 63 //------------------------------------------------------------------------------ 63 64 64 inline WaitableEvent::WaitableEvent(Waiter* waiter) 65 inline WaitableEvent::WaitableEvent(Waiter* waiter) : 66 Event(static_cast<EventFailable&>(*this)) 65 67 { 66 68 if (waiter!=0) addTo(*waiter); 69 } 70 71 //------------------------------------------------------------------------------ 72 73 inline bool WaitableEvent::check() 74 { 75 if (isFired()) { 76 clear(); 77 return true; 78 } else { 79 return false; 80 } 67 81 } 68 82 -
test/testlocsock.cc
r19 r21 36 36 #include <xplcommon/LocalSocket.h> 37 37 #include <xplcommon/LocalClientSocket.h> 38 #include <xplcommon/ Connector.h>38 #include <xplcommon/LocalConnector.h> 39 39 #include <xplcommon/ReadingBuffer.h> 40 40 #include <xplcommon/WritingBuffer.h> … … 57 57 using xplcommon::LocalSocket; 58 58 using xplcommon::LocalClientSocket; 59 using xplcommon:: Connector;59 using xplcommon::LocalConnector; 60 60 using xplcommon::ReadingBuffer; 61 61 using xplcommon::WritingBuffer; … … 66 66 //------------------------------------------------------------------------------ 67 67 68 void communicate(ReadingBuffer & readingBuffer, WritingBuffer&writingBuffer,68 void communicate(ReadingBuffer* readingBuffer, WritingBuffer* writingBuffer, 69 69 Waiter& waiter, unsigned seed, const char* prefix, 70 70 WaitableEvent* quitEvent = 0) … … 74 74 static const size_t rangeToWrite = maxToWrite - minToWrite; 75 75 76 static const size_t reportingInterval = 10000 0;77 78 static const int sleepThreshold = static_cast<int>(RAND_MAX * 0.9 5);76 static const size_t reportingInterval = 10000; 77 78 static const int sleepThreshold = static_cast<int>(RAND_MAX * 0.99); 79 79 static const int sleepRemainder = RAND_MAX - sleepThreshold; 80 80 static const unsigned minSleep = 10; … … 93 93 94 94 while(!toQuit) { 95 #if TARGET_API_POSIX 95 96 int r = rand_r(&seed); 97 #endif 98 #if TARGET_API_WIN32 99 int r = rand(); 100 #endif 96 101 size_t toWrite = minToWrite + 97 102 static_cast<size_t>( static_cast<double>(r) * 98 103 rangeToWrite / RAND_MAX ); 99 104 105 size_t lastWriteLength = 0; 106 if (writingBuffer!=0) { 107 lastWriteLength = min(writingBuffer->getCapacity(), toWrite); 108 writingBuffer->reset(); 109 writingBuffer->addLength(lastWriteLength); 110 } 111 100 112 while(toWrite>0) { 101 writingBuffer.reset();102 size_t length = min(writingBuffer.getCapacity(), toWrite);103 writingBuffer.addLength(length);104 105 113 if (quitEvent!=0 && quitEvent->check()) { 106 114 toQuit = true; … … 108 116 } 109 117 110 if (writingBuffer.write()) { 111 numBytesWritten += length; 112 toWrite -= length; 113 length = 0; 114 115 if (numBytesWritten>=nextReportWritten) { 116 printf("%s: written %zu bytes with %zu blocking and %zu ms sleeping\n", 117 prefix, numBytesWritten, numWriteBlocked, 118 numMillisSlept); 119 nextReportWritten += reportingInterval; 118 bool hasPending = false; 119 120 if (writingBuffer!=0) { 121 size_t loopCount = 0; 122 while(toWrite>0) { 123 ++loopCount; 124 if (writingBuffer->write()) { 125 numBytesWritten += lastWriteLength; 126 toWrite -= lastWriteLength; 127 128 if (numBytesWritten>=nextReportWritten) { 129 printf("%s: written %lu bytes with %lu blocking and %lu ms sleeping\n", 130 prefix, 131 static_cast<unsigned long>(numBytesWritten), 132 static_cast<unsigned long>(numWriteBlocked), 133 static_cast<unsigned long>(numMillisSlept)); 134 nextReportWritten += reportingInterval; 135 } 136 137 if (toWrite>0) { 138 if (quitEvent!=0 && quitEvent->check()) { 139 toQuit = true; 140 break; 141 } else { 142 lastWriteLength = min(writingBuffer->getCapacity(), toWrite); 143 writingBuffer->addLength(lastWriteLength); 144 if (loopCount>10) { 145 break; 146 } 147 } 148 } 149 } else if (writingBuffer->failed()) { 150 printf("%s: writing failed with error: %lu\n", 151 prefix, 152 static_cast<unsigned long>(writingBuffer->getErrorCode())); 153 toQuit = true; 154 break; 155 } else { 156 ++numWriteBlocked; 157 hasPending = true; 158 break; 159 } 120 160 } 121 } else if (writingBuffer.failed()) { 122 printf("%s: writing failed with error: %lu\n", 123 prefix, 124 static_cast<unsigned long>(writingBuffer.getErrorCode())); 125 toQuit = true; 126 break; 127 } else { 128 ++numWriteBlocked; 161 if (toQuit) break; 129 162 } 130 163 131 if (readingBuffer.read()) { 132 numBytesRead += readingBuffer.getLength(); 133 if (numBytesRead>=nextReportRead) { 134 printf("%s: read %zu bytes with %zu blocking and %zu ms sleeping\n", 135 prefix, numBytesRead, numReadBlocked, numMillisSlept); 136 nextReportRead += reportingInterval; 164 if (readingBuffer!=0) { 165 size_t loopCount = 0; 166 while(true) { 167 ++loopCount; 168 if (readingBuffer->read()) { 169 size_t l = readingBuffer->getLength(); 170 if (l==0) { 171 printf("%s: reading encountered end of file\n", prefix); 172 toQuit = true; 173 break; 174 } 175 numBytesRead += l; 176 if (numBytesRead>=nextReportRead) { 177 printf("%s: read %lu bytes with %lu blocking and %lu ms sleeping\n", 178 prefix, 179 static_cast<unsigned long>(numBytesRead), 180 static_cast<unsigned long>(numReadBlocked), 181 static_cast<unsigned long>(numMillisSlept)); 182 nextReportRead += reportingInterval; 183 } 184 readingBuffer->reset(); 185 if (quitEvent!=0 && quitEvent->check()) { 186 toQuit = true; 187 break; 188 } else if (loopCount>10) { 189 break; 190 } 191 } else if (readingBuffer->failed()) { 192 printf("%s: reading failed with error: %lu\n", 193 prefix, 194 static_cast<unsigned long>(readingBuffer->getErrorCode())); 195 toQuit = true; 196 break; 197 } else { 198 ++numReadBlocked; 199 hasPending = true; 200 break; 201 } 137 202 } 138 readingBuffer.reset(); 139 } else if (readingBuffer.failed()) { 140 printf("%s: reading failed with error: %lu\n", 141 prefix, 142 static_cast<unsigned long>(readingBuffer.getErrorCode())); 143 toQuit = true; 144 break; 145 } else { 146 ++numReadBlocked; 203 if (toQuit) break; 147 204 } 148 205 206 #if TARGET_API_POSIX 149 207 r = rand_r(&seed); 208 #endif 209 #if TARGET_API_WIN32 210 r = rand(); 211 #endif 150 212 if (r>=sleepThreshold) { 151 213 unsigned toSleep = minSleep + … … 158 220 } 159 221 160 if ( length!=0) {222 if (toWrite>0 && hasPending) { 161 223 waiter.wait(); 162 224 if (waiter.failed()) { … … 171 233 } 172 234 173 printf("%s: written %zu bytes with %zu blocking\n", 174 prefix, numBytesWritten, numWriteBlocked); 175 printf("%s: read %zu bytes with %zu blocking\n", 176 prefix, numBytesRead, numReadBlocked); 177 printf("%s: slept %zu ms\n", prefix, numMillisSlept); 235 printf("%s: written %lu bytes with %lu blocking\n", 236 prefix, 237 static_cast<unsigned long>(numBytesWritten), 238 static_cast<unsigned long>(numWriteBlocked)); 239 printf("%s: read %lu bytes with %lu blocking\n", 240 prefix, 241 static_cast<unsigned long>(numBytesRead), 242 static_cast<unsigned long>(numReadBlocked)); 243 printf("%s: slept %lu ms\n", prefix, 244 static_cast<unsigned long>(numMillisSlept)); 178 245 } 179 246 … … 219 286 LocalAcceptor& acceptor = serverSocket.getAcceptor(); 220 287 288 bool eventFired = false; 221 289 while(!acceptor.accept()) { 290 eventFired = event.check(); 291 if (eventFired) break; 222 292 if (acceptor.failed()) { 223 printf("ServerThread::run: acceptor failed...\n"); 293 printf("ServerThread::run: acceptor failed...: %lu\n", 294 (unsigned long)acceptor.getErrorCode()); 224 295 return; 225 296 } … … 228 299 } 229 300 301 if (eventFired) { 302 printf("ServerThread::run: waiting done, quitting\n"); 303 return; 304 } 305 230 306 printf("ServerThread::run: waiting done, received connection\n"); 231 307 LocalSocket* socket = acceptor.getSocket(); 232 308 233 communicate( socket->getReadingBuffer(),socket->getWritingBuffer(),309 communicate(&socket->getReadingBuffer(), &socket->getWritingBuffer(), 234 310 waiter, seed, "ServerThread", &event); 235 311 … … 243 319 class ClientThread : public Thread 244 320 { 321 private: 322 Waiter waiter; 323 324 WaitableEvent event; 325 245 326 public: 327 ClientThread(); 328 329 void signal(); 330 246 331 virtual void run(); 247 332 }; 333 334 //------------------------------------------------------------------------------ 335 336 ClientThread::ClientThread() : 337 event(&waiter) 338 { 339 } 340 341 //------------------------------------------------------------------------------ 342 343 void ClientThread::signal() 344 { 345 event.fire(); 346 } 248 347 249 348 //------------------------------------------------------------------------------ … … 258 357 printf("ClientThread::run: connecting\n"); 259 358 260 Waiter waiter;261 262 359 LocalClientSocket socket("test", &waiter); 263 360 264 Connector& connector = socket.getConnector();361 LocalConnector& connector = socket.getConnector(); 265 362 266 363 while(!connector.connect()) { … … 275 372 printf("ClientThread::run: connected\n"); 276 373 277 communicate( socket.getReadingBuffer(),socket.getWritingBuffer(),278 waiter, seed, "ClientThread" );374 communicate(&socket.getReadingBuffer(), &socket.getWritingBuffer(), 375 waiter, seed, "ClientThread", &event); 279 376 } 280 377 … … 284 381 int main() 285 382 { 383 #if TARGET_API_POSIX 286 384 signal(SIGPIPE, SIG_IGN); 385 #endif 287 386 288 387 ServerThread serverThread; … … 294 393 295 394 Thread::sleep(60000); 296 serverThread.signal(); 395 //Thread::sleep(5000); 396 printf("Signalling the server thread\n"); 397 //serverThread.signal(); 398 clientThread.signal(); 297 399 printf("Signalled the server thread\n"); 298 400
Note:
See TracChangeset
for help on using the changeset viewer.