Changeset 31:bbd688924703 in xplcommon for test


Ignore:
Timestamp:
01/02/13 13:58:25 (12 years ago)
Author:
István Váradi <ivaradi@…>
Branch:
default
Phase:
public
Message:

Implemented the data stream

File:
1 edited

Legend:

Unmodified
Added
Removed
  • test/testblkstream.cc

    r30 r31  
    3737#include <xplcommon/LocalClientSocket.h>
    3838#include <xplcommon/LocalConnector.h>
    39 #include <xplcommon/BlockingStream.h>
     39#include <xplcommon/DataStream.h>
    4040#include <xplcommon/WaitableEvent.h>
    4141#include <xplcommon/PseudoRandom.h>
     
    5959using xplcommon::LocalConnector;
    6060using xplcommon::BufferedStream;
    61 using xplcommon::BlockingStream;
     61using xplcommon::DataStream;
    6262using xplcommon::PseudoRandom;
    6363
     
    6969{
    7070private:
    71     BlockingStream* dataStream;
     71    static const uint8_t OP_ADD = 1;
     72
     73    static const uint8_t OP_SUB = 2;
     74
     75    static const uint8_t OP_MUL = 3;
     76
     77    static const uint8_t OP_DIV = 4;
     78
     79private:
     80    static int32_t perform(int32_t value, uint8_t operation, int32_t x);
     81
     82    DataStream* dataStream;
    7283
    7384protected:
     
    8394//------------------------------------------------------------------------------
    8495
     96inline int32_t TestThread::perform(int32_t value, uint8_t operation, int32_t x)
     97{
     98    switch (operation) {
     99      case OP_ADD:
     100        return value + x;
     101      case OP_SUB:
     102        return value - x;
     103      case OP_MUL:
     104        return value * x;
     105      case OP_DIV:
     106        return value / x;
     107      default:
     108        return value;
     109    }
     110}
     111
     112//------------------------------------------------------------------------------
     113
    85114inline TestThread::TestThread() :
    86115    dataStream(0)
     
    99128void TestThread::communicate(BufferedStream& stream, unsigned seed, bool isClient)
    100129{
    101     static const size_t bufferSize = 1024;
    102 
    103     static const size_t minToWrite = 16;
    104     static const size_t maxToWrite = 6124;
    105 
    106     static const size_t reportingInterval = 100000000;
     130    static const size_t reportInterval = 100000;
    107131
    108132    const char* prefix = isClient ? "Client" : "Server";
    109133
    110     dataStream = new BlockingStream(stream);
     134    dataStream = new DataStream(stream);
    111135    PseudoRandom random(seed);
    112136
    113 
    114     size_t numBytesRead = 0;
    115     size_t nextReportRead = reportingInterval;
    116     size_t numBytesWritten = 0;
    117     size_t nextReportWritten = reportingInterval;
    118 
    119137    bool first = true;
    120138
    121     unsigned char* buffer = new unsigned char[bufferSize];
    122 
    123     while(true) {
    124         if (isClient || !first) {
    125             unsigned dataSize = random.nextUnsigned(maxToWrite, minToWrite);
    126             *reinterpret_cast<uint32_t*>(buffer) = dataSize;
    127             while(dataSize>0) {
    128                 unsigned toWrite = min(static_cast<size_t>(dataSize),
    129                                        bufferSize);
    130                 if (!dataStream->write(buffer, toWrite)) break;
    131                 dataSize -= toWrite;
    132                 numBytesWritten += toWrite;
    133                 if (numBytesWritten>=nextReportWritten) {
    134                     printf("%s: written %lu bytes\n", prefix,
    135                            static_cast<unsigned long>(numBytesWritten));
    136                     nextReportWritten += reportingInterval;
     139    size_t numRequests = 0;
     140    size_t nextReportRequests = reportInterval;
     141
     142    int32_t value = 0;
     143    if (isClient) {
     144        value = random.next();
     145        dataStream->writeS32(value);
     146    } else {
     147        value = dataStream->readS32();
     148    }
     149
     150    int32_t newValue = value;
     151
     152    while(*dataStream) {
     153        if (!first || isClient) {
     154            unsigned numOperations = random.nextUnsigned(20, 1);
     155            dataStream->writeU16(numOperations);
     156            //printf("%s: numOperations=%u\n", prefix, numOperations);
     157            for(unsigned i = 0; i<numOperations; ++i) {
     158                double r = random.nextDouble();
     159                uint8_t operation =
     160                    (r<0.4) ? OP_ADD : ((r<0.8) ? OP_SUB :
     161                                        ((r<0.98) ? OP_MUL : OP_DIV));
     162                int32_t x = random.next();
     163
     164                dataStream->writeU8(operation);
     165                dataStream->writeS32(x);
     166
     167                newValue = perform(newValue, operation, x);
     168            }
     169            if (!dataStream->flush()) break;
     170
     171            if (isClient) {
     172                value = newValue;
     173                ++numRequests;
     174                if (numRequests>=nextReportRequests) {
     175                    printf("%s: after %lu requests, value=%d\n",
     176                           prefix, static_cast<unsigned long>(numRequests),
     177                           value);
     178                    nextReportRequests += reportInterval;
    137179                }
    138180            }
    139             if (!dataStream->flush()) break;
    140         }
    141 
    142         uint32_t dataSize = 0;
    143         if (!dataStream->read(&dataSize, sizeof(dataSize))) break;
    144         assert(dataSize>=minToWrite);
    145         assert(dataSize<=maxToWrite);
    146         dataSize -= sizeof(dataSize);
    147         numBytesRead += 4;
    148         while(dataSize>0) {
    149             unsigned toRead = min(static_cast<size_t>(dataSize),
    150                                   bufferSize);
    151             if (!dataStream->read(buffer, toRead)) break;
    152             dataSize -= toRead;
    153             numBytesRead += toRead;
    154             if (numBytesRead>=nextReportRead) {
    155                 printf("%s: read %lu bytes\n", prefix,
    156                        static_cast<unsigned long>(numBytesRead));
    157                 nextReportRead += reportingInterval;
     181        }
     182
     183        unsigned numOperations = dataStream->readU16();
     184        for (unsigned i = 0; *dataStream && i<numOperations; ++i) {
     185            uint8_t operation = dataStream->readU8();
     186            int32_t x = dataStream->readS32();
     187            newValue = perform(newValue, operation, x);
     188        }
     189
     190        if (*dataStream) {
     191            if (!isClient) {
     192                value = newValue;
     193                ++numRequests;
     194                if (numRequests>=nextReportRequests) {
     195                    printf("%s: after %lu requests, value=%d\n",
     196                           prefix, static_cast<unsigned long>(numRequests),
     197                           value);
     198                    nextReportRequests += reportInterval;
     199                }
    158200            }
    159201        }
    160202
     203
    161204        first = false;
    162205    }
    163 
    164     delete [] buffer;
    165206
    166207    if (dataStream->failed()) {
     
    171212    }
    172213
    173     printf("%s: written %lu bytes and read %lu bytes\n", prefix,
    174            static_cast<unsigned long>(numBytesWritten),
    175            static_cast<unsigned long>(numBytesRead));
     214    printf("%s: value=%d, %lu requests\n", prefix, value,
     215           static_cast<unsigned long>(numRequests));
    176216}
    177217
     
    275315
    276316    Thread::sleep(60000);
    277     //Thread::sleep(5000);
    278     printf("Signalling the server thread\n");
    279     serverThread.interrupt();
    280     //clientThread.interrupt();
    281     printf("Signalled the server thread\n");
     317    //Thread::sleep(1000);
     318    printf("Signalling the client thread\n");
     319    //serverThread.interrupt();
     320    clientThread.interrupt();
     321    printf("Signalled the client thread\n");
    282322
    283323    printf("Waiting for the client thread\n");
Note: See TracChangeset for help on using the changeset viewer.