// Copyright (c) 2012 by István Váradi // This file is part of libxplcommon, a common utility library for // development related to X-Plane // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are met: // 1. Redistributions of source code must retain the above copyright notice, this // list of conditions and the following disclaimer. // 2. Redistributions in binary form must reproduce the above copyright notice, // this list of conditions and the following disclaimer in the documentation // and/or other materials provided with the distribution. // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR // ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // The views and conclusions contained in the software and documentation are those // of the authors and should not be interpreted as representing official policies, // either expressed or implied, of the FreeBSD Project. //------------------------------------------------------------------------------ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include //------------------------------------------------------------------------------ using hu::varadiistvan::scpl::io::Waiter; using hu::varadiistvan::scpl::io::TCPServerSocket; using hu::varadiistvan::scpl::io::TCPAcceptor; using hu::varadiistvan::scpl::io::TCPSocket; using hu::varadiistvan::scpl::io::TCPClientSocket; using hu::varadiistvan::scpl::io::TCPConnector; using hu::varadiistvan::scpl::io::ReadingBuffer; using hu::varadiistvan::scpl::io::WritingBuffer; using hu::varadiistvan::scpl::io::WaitableEvent; using hu::varadiistvan::scpl::Thread; using hu::varadiistvan::scpl::PseudoRandom; using std::min; //------------------------------------------------------------------------------ void communicate(ReadingBuffer* readingBuffer, WritingBuffer* writingBuffer, Waiter& waiter, unsigned seed, const char* prefix, WaitableEvent* quitEvent = 0) { static const size_t minToWrite = 2076; static const size_t maxToWrite = 1345670; static const size_t reportingInterval = 10000; static const int sleepThreshold = static_cast(PseudoRandom::MAX * 0.99); static const int sleepRemainder = PseudoRandom::MAX - sleepThreshold; static const unsigned minSleep = 10; static const unsigned maxSleep = 250; static const unsigned sleepRange = maxSleep - minSleep; size_t numBytesRead = 0; size_t nextReportRead = reportingInterval; size_t numWriteBlocked = 0; size_t numBytesWritten = 0; size_t nextReportWritten = reportingInterval; size_t numReadBlocked = 0; size_t numMillisSlept = 0; bool toQuit = false; PseudoRandom random(seed); while(!toQuit) { size_t toWrite = random.nextUnsigned(maxToWrite, minToWrite); size_t lastWriteLength = 0; if (writingBuffer!=0) { lastWriteLength = min(writingBuffer->getCapacity(), toWrite); writingBuffer->reset(); writingBuffer->addLength(lastWriteLength); } while(toWrite>0) { if (quitEvent!=0 && quitEvent->check()) { toQuit = true; break; } bool hasPending = false; if (writingBuffer!=0) { size_t loopCount = 0; while(toWrite>0) { ++loopCount; if (writingBuffer->write()) { numBytesWritten += lastWriteLength; toWrite -= lastWriteLength; if (numBytesWritten>=nextReportWritten) { printf("%s: written %lu bytes with %lu blocking and %lu ms sleeping\n", prefix, static_cast(numBytesWritten), static_cast(numWriteBlocked), static_cast(numMillisSlept)); nextReportWritten += reportingInterval; } if (toWrite>0) { if (quitEvent!=0 && quitEvent->check()) { toQuit = true; break; } else { lastWriteLength = min(writingBuffer->getCapacity(), toWrite); writingBuffer->addLength(lastWriteLength); if (loopCount>10) { break; } } } } else if (writingBuffer->failed()) { printf("%s: writing failed with error: %lu\n", prefix, static_cast(writingBuffer->getErrorCode())); toQuit = true; break; } else { ++numWriteBlocked; hasPending = true; break; } } if (toQuit) break; } if (readingBuffer!=0) { size_t loopCount = 0; while(true) { ++loopCount; if (readingBuffer->read()) { size_t l = readingBuffer->getLength(); if (l==0) { printf("%s: reading encountered end of file\n", prefix); toQuit = true; break; } numBytesRead += l; if (numBytesRead>=nextReportRead) { printf("%s: read %lu bytes with %lu blocking and %lu ms sleeping\n", prefix, static_cast(numBytesRead), static_cast(numReadBlocked), static_cast(numMillisSlept)); nextReportRead += reportingInterval; } readingBuffer->reset(); if (quitEvent!=0 && quitEvent->check()) { toQuit = true; break; } else if (loopCount>10) { break; } } else if (readingBuffer->failed()) { printf("%s: reading failed with error: %lu\n", prefix, static_cast(readingBuffer->getErrorCode())); toQuit = true; break; } else { ++numReadBlocked; hasPending = true; break; } } if (toQuit) break; } int r = random.next(); if (r>=sleepThreshold) { unsigned toSleep = minSleep + static_cast(static_cast(r - sleepThreshold) * sleepRange / sleepRemainder); assert(toSleep>=minSleep); assert(toSleep0 && hasPending) { waiter.wait(); if (waiter.failed()) { printf("%s: waiting failed with error: %lu\n", prefix, static_cast(waiter.getErrorCode())); toQuit = true; break; } } } } printf("%s: written %lu bytes with %lu blocking\n", prefix, static_cast(numBytesWritten), static_cast(numWriteBlocked)); printf("%s: read %lu bytes with %lu blocking\n", prefix, static_cast(numBytesRead), static_cast(numReadBlocked)); printf("%s: slept %lu ms\n", prefix, static_cast(numMillisSlept)); } //------------------------------------------------------------------------------ class ServerThread : public Thread { private: Waiter waiter; WaitableEvent event; public: ServerThread(); void signal(); virtual void run(); }; //------------------------------------------------------------------------------ ServerThread::ServerThread() : event(&waiter) { } //------------------------------------------------------------------------------ void ServerThread::signal() { event.fire(); } //------------------------------------------------------------------------------ void ServerThread::run() { unsigned seed = static_cast(time(0)*1.2); printf("ServerThread::run: seed=%u\n", seed); TCPServerSocket serverSocket(12345, &waiter); TCPAcceptor& acceptor = serverSocket.getAcceptor(); bool eventFired = false; while(!acceptor.accept()) { eventFired = event.check(); if (eventFired) break; if (acceptor.failed()) { printf("ServerThread::run: acceptor failed...: %lu\n", (unsigned long)acceptor.getErrorCode()); return; } printf("ServerThread::run: waiting...\n"); waiter.wait(); printf("ServerThread::run: waiting done...\n"); } if (eventFired) { printf("ServerThread::run: waiting done, quitting\n"); return; } printf("ServerThread::run: waiting done, received connection\n"); TCPSocket* socket = acceptor.getSocket(); communicate(&socket->getReadingBuffer(), &socket->getWritingBuffer(), waiter, seed, "ServerThread", &event); delete socket; } //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ class ClientThread : public Thread { private: Waiter waiter; WaitableEvent event; public: ClientThread(); void signal(); virtual void run(); }; //------------------------------------------------------------------------------ ClientThread::ClientThread() : event(&waiter) { } //------------------------------------------------------------------------------ void ClientThread::signal() { event.fire(); } //------------------------------------------------------------------------------ void ClientThread::run() { unsigned seed = static_cast(time(0)*1.8); printf("ClientThread::run: seed=%u\n", seed); printf("ClientThread::run: sleeping\n"); sleep(500); printf("ClientThread::run: connecting\n"); TCPClientSocket socket("127.0.0.1", 12345, &waiter); TCPConnector& connector = socket.getConnector(); while(!connector.connect()) { if (connector.failed()) { printf("ClientThread::run: connector failed...: %lu\n", (unsigned long)connector.getErrorCode()); return; } printf("ClientThread::run: waiting... waiter=%p\n", &waiter); waiter.wait(); printf("ClientThread::run: waiting done\n"); } printf("ClientThread::run: connected\n"); communicate(&socket.getReadingBuffer(), &socket.getWritingBuffer(), waiter, seed, "ClientThread", &event); } //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ int main() { #if TARGET_API_POSIX signal(SIGPIPE, SIG_IGN); #endif ServerThread serverThread; ClientThread clientThread; printf("Starting threads\n"); serverThread.start(); clientThread.start(); Thread::sleep(60000); //Thread::sleep(5000); printf("Signalling the server thread\n"); //serverThread.signal(); clientThread.signal(); printf("Signalled the server thread\n"); printf("Waiting for the client thread\n"); clientThread.join(); printf("Waiting for the server thread\n"); serverThread.join(); printf("Both threads returned\n"); return 0; } //------------------------------------------------------------------------------ // Local Variables: // mode: C++ // c-basic-offset: 4 // indent-tabs-mode: nil // End: