Changeset 21:eb59943050c9 in xplcommon for test/testlocsock.cc


Ignore:
Timestamp:
12/31/12 14:17:32 (11 years ago)
Author:
István Váradi <ivaradi@…>
Branch:
default
Phase:
public
Message:

Added the implementation of the local sockets for Win32 and it seems to work

File:
1 edited

Legend:

Unmodified
Added
Removed
  • test/testlocsock.cc

    r19 r21  
    3636#include <xplcommon/LocalSocket.h>
    3737#include <xplcommon/LocalClientSocket.h>
    38 #include <xplcommon/Connector.h>
     38#include <xplcommon/LocalConnector.h>
    3939#include <xplcommon/ReadingBuffer.h>
    4040#include <xplcommon/WritingBuffer.h>
     
    5757using xplcommon::LocalSocket;
    5858using xplcommon::LocalClientSocket;
    59 using xplcommon::Connector;
     59using xplcommon::LocalConnector;
    6060using xplcommon::ReadingBuffer;
    6161using xplcommon::WritingBuffer;
     
    6666//------------------------------------------------------------------------------
    6767
    68 void communicate(ReadingBuffer& readingBuffer, WritingBuffer& writingBuffer,
     68void communicate(ReadingBuffer* readingBuffer, WritingBuffer* writingBuffer,
    6969                 Waiter& waiter, unsigned seed, const char* prefix,
    7070                 WaitableEvent* quitEvent = 0)
     
    7474    static const size_t rangeToWrite = maxToWrite - minToWrite;
    7575
    76     static const size_t reportingInterval = 100000;
    77 
    78     static const int sleepThreshold = static_cast<int>(RAND_MAX * 0.95);
     76    static const size_t reportingInterval = 10000;
     77
     78    static const int sleepThreshold = static_cast<int>(RAND_MAX * 0.99);
    7979    static const int sleepRemainder = RAND_MAX - sleepThreshold;
    8080    static const unsigned minSleep = 10;
     
    9393
    9494    while(!toQuit) {
     95#if TARGET_API_POSIX
    9596        int r = rand_r(&seed);
     97#endif
     98#if TARGET_API_WIN32
     99        int r = rand();
     100#endif
    96101        size_t toWrite = minToWrite +
    97102            static_cast<size_t>( static_cast<double>(r) *
    98103                                 rangeToWrite / RAND_MAX );
    99104
     105        size_t lastWriteLength = 0;
     106        if (writingBuffer!=0) {
     107            lastWriteLength = min(writingBuffer->getCapacity(), toWrite);
     108            writingBuffer->reset();
     109            writingBuffer->addLength(lastWriteLength);
     110        }
     111
    100112        while(toWrite>0) {
    101             writingBuffer.reset();
    102             size_t length = min(writingBuffer.getCapacity(), toWrite);
    103             writingBuffer.addLength(length);
    104 
    105113            if (quitEvent!=0 && quitEvent->check()) {
    106114                toQuit = true;
     
    108116            }
    109117
    110             if (writingBuffer.write()) {
    111                 numBytesWritten += length;
    112                 toWrite -= length;
    113                 length = 0;
    114 
    115                 if (numBytesWritten>=nextReportWritten) {
    116                     printf("%s: written %zu bytes with %zu blocking and %zu ms sleeping\n",
    117                            prefix, numBytesWritten, numWriteBlocked,
    118                            numMillisSlept);
    119                     nextReportWritten += reportingInterval;
     118            bool hasPending = false;
     119
     120            if (writingBuffer!=0) {
     121                size_t loopCount = 0;
     122                while(toWrite>0) {
     123                    ++loopCount;
     124                    if (writingBuffer->write()) {
     125                        numBytesWritten += lastWriteLength;
     126                        toWrite -= lastWriteLength;
     127
     128                        if (numBytesWritten>=nextReportWritten) {
     129                            printf("%s: written %lu bytes with %lu blocking and %lu ms sleeping\n",
     130                                   prefix,
     131                                   static_cast<unsigned long>(numBytesWritten),
     132                                   static_cast<unsigned long>(numWriteBlocked),
     133                                   static_cast<unsigned long>(numMillisSlept));
     134                            nextReportWritten += reportingInterval;
     135                        }
     136
     137                        if (toWrite>0) {
     138                            if (quitEvent!=0 && quitEvent->check()) {
     139                                toQuit = true;
     140                                break;
     141                            } else {
     142                                lastWriteLength = min(writingBuffer->getCapacity(), toWrite);
     143                                writingBuffer->addLength(lastWriteLength);
     144                                if (loopCount>10) {
     145                                    break;
     146                                }
     147                            }
     148                        }
     149                    } else if (writingBuffer->failed()) {
     150                        printf("%s: writing failed with error: %lu\n",
     151                               prefix,
     152                               static_cast<unsigned long>(writingBuffer->getErrorCode()));
     153                        toQuit = true;
     154                        break;
     155                    } else {
     156                        ++numWriteBlocked;
     157                        hasPending = true;
     158                        break;
     159                    }
    120160                }
    121             } else if (writingBuffer.failed()) {
    122                 printf("%s: writing failed with error: %lu\n",
    123                        prefix,
    124                        static_cast<unsigned long>(writingBuffer.getErrorCode()));
    125                 toQuit = true;
    126                 break;
    127             } else {
    128                 ++numWriteBlocked;
     161                if (toQuit) break;
    129162            }
    130163
    131             if (readingBuffer.read()) {
    132                 numBytesRead += readingBuffer.getLength();
    133                 if (numBytesRead>=nextReportRead) {
    134                     printf("%s: read %zu bytes with %zu blocking and %zu ms sleeping\n",
    135                            prefix, numBytesRead, numReadBlocked, numMillisSlept);
    136                     nextReportRead += reportingInterval;
     164            if (readingBuffer!=0) {
     165                size_t loopCount = 0;
     166                while(true) {
     167                    ++loopCount;
     168                    if (readingBuffer->read()) {
     169                        size_t l = readingBuffer->getLength();
     170                        if (l==0) {
     171                            printf("%s: reading encountered end of file\n", prefix);
     172                            toQuit = true;
     173                            break;
     174                        }
     175                        numBytesRead += l;
     176                        if (numBytesRead>=nextReportRead) {
     177                            printf("%s: read %lu bytes with %lu blocking and %lu ms sleeping\n",
     178                                   prefix,
     179                                   static_cast<unsigned long>(numBytesRead),
     180                                   static_cast<unsigned long>(numReadBlocked),
     181                                   static_cast<unsigned long>(numMillisSlept));
     182                            nextReportRead += reportingInterval;
     183                        }
     184                        readingBuffer->reset();
     185                        if (quitEvent!=0 && quitEvent->check()) {
     186                            toQuit = true;
     187                            break;
     188                        } else if (loopCount>10) {
     189                            break;
     190                        }
     191                    } else if (readingBuffer->failed()) {
     192                        printf("%s: reading failed with error: %lu\n",
     193                               prefix,
     194                               static_cast<unsigned long>(readingBuffer->getErrorCode()));
     195                        toQuit = true;
     196                        break;
     197                    } else {
     198                        ++numReadBlocked;
     199                        hasPending = true;
     200                        break;
     201                    }
    137202                }
    138                 readingBuffer.reset();
    139             } else if (readingBuffer.failed()) {
    140                 printf("%s: reading failed with error: %lu\n",
    141                        prefix,
    142                        static_cast<unsigned long>(readingBuffer.getErrorCode()));
    143                 toQuit = true;
    144                 break;
    145             } else {
    146                 ++numReadBlocked;
     203                if (toQuit) break;
    147204            }
    148205
     206#if TARGET_API_POSIX
    149207            r = rand_r(&seed);
     208#endif
     209#if TARGET_API_WIN32
     210            r = rand();
     211#endif
    150212            if (r>=sleepThreshold) {
    151213                unsigned toSleep = minSleep +
     
    158220            }
    159221
    160             if (length!=0) {
     222            if (toWrite>0 && hasPending) {
    161223                waiter.wait();
    162224                if (waiter.failed()) {
     
    171233    }
    172234
    173     printf("%s: written %zu bytes with %zu blocking\n",
    174            prefix, numBytesWritten, numWriteBlocked);
    175     printf("%s: read %zu bytes with %zu blocking\n",
    176            prefix, numBytesRead, numReadBlocked);
    177     printf("%s: slept %zu ms\n", prefix, numMillisSlept);
     235    printf("%s: written %lu bytes with %lu blocking\n",
     236           prefix,
     237           static_cast<unsigned long>(numBytesWritten),
     238           static_cast<unsigned long>(numWriteBlocked));
     239    printf("%s: read %lu bytes with %lu blocking\n",
     240           prefix,
     241           static_cast<unsigned long>(numBytesRead),
     242           static_cast<unsigned long>(numReadBlocked));
     243    printf("%s: slept %lu ms\n", prefix,
     244           static_cast<unsigned long>(numMillisSlept));
    178245}
    179246
     
    219286    LocalAcceptor& acceptor = serverSocket.getAcceptor();
    220287
     288    bool eventFired = false;
    221289    while(!acceptor.accept()) {
     290        eventFired = event.check();
     291        if (eventFired) break;
    222292        if (acceptor.failed()) {
    223             printf("ServerThread::run: acceptor failed...\n");
     293            printf("ServerThread::run: acceptor failed...: %lu\n",
     294                   (unsigned long)acceptor.getErrorCode());
    224295            return;
    225296        }
     
    228299    }
    229300
     301    if (eventFired) {
     302        printf("ServerThread::run: waiting done, quitting\n");
     303        return;
     304    }
     305
    230306    printf("ServerThread::run: waiting done, received connection\n");
    231307    LocalSocket* socket = acceptor.getSocket();
    232308
    233     communicate(socket->getReadingBuffer(), socket->getWritingBuffer(),
     309    communicate(&socket->getReadingBuffer(), &socket->getWritingBuffer(),
    234310                waiter, seed, "ServerThread", &event);
    235311
     
    243319class ClientThread : public Thread
    244320{
     321private:
     322    Waiter waiter;
     323
     324    WaitableEvent event;
     325
    245326public:
     327    ClientThread();
     328
     329    void signal();
     330
    246331    virtual void run();
    247332};
     333
     334//------------------------------------------------------------------------------
     335
     336ClientThread::ClientThread() :
     337    event(&waiter)
     338{
     339}
     340
     341//------------------------------------------------------------------------------
     342
     343void ClientThread::signal()
     344{
     345    event.fire();
     346}
    248347
    249348//------------------------------------------------------------------------------
     
    258357    printf("ClientThread::run: connecting\n");
    259358
    260     Waiter waiter;
    261 
    262359    LocalClientSocket socket("test", &waiter);
    263360
    264     Connector& connector = socket.getConnector();
     361    LocalConnector& connector = socket.getConnector();
    265362
    266363    while(!connector.connect()) {
     
    275372    printf("ClientThread::run: connected\n");
    276373
    277     communicate(socket.getReadingBuffer(), socket.getWritingBuffer(),
    278                 waiter, seed, "ClientThread");
     374    communicate(&socket.getReadingBuffer(), &socket.getWritingBuffer(),
     375                waiter, seed, "ClientThread", &event);
    279376}
    280377
     
    284381int main()
    285382{
     383#if TARGET_API_POSIX
    286384    signal(SIGPIPE, SIG_IGN);
     385#endif
    287386
    288387    ServerThread serverThread;
     
    294393
    295394    Thread::sleep(60000);
    296     serverThread.signal();
     395    //Thread::sleep(5000);
     396    printf("Signalling the server thread\n");
     397    //serverThread.signal();
     398    clientThread.signal();
    297399    printf("Signalled the server thread\n");
    298400
Note: See TracChangeset for help on using the changeset viewer.