Changeset 31:bbd688924703 in xplcommon


Ignore:
Timestamp:
01/02/13 13:58:25 (12 years ago)
Author:
István Váradi <ivaradi@…>
Branch:
default
Phase:
public
Message:

Implemented the data stream

Files:
2 added
4 edited

Legend:

Unmodified
Added
Removed
  • src/xplcommon/BlockingStream.cc

    r30 r31  
    4646//------------------------------------------------------------------------------
    4747
    48 bool BlockingStream::isInterrupted()
     48inline bool BlockingStream::checkInterrupted()
    4949{
    50     if (failed()) return false;
    51 
    5250    if (!interrupted) {
    5351        interrupted = event.check();
     
    8381//------------------------------------------------------------------------------
    8482
     83bool BlockingStream::skip(size_t length)
     84{
     85    ReadingBuffer& readingBuffer = stream.getReadingBuffer();
     86    while (length>0) {
     87        size_t toSkip = min(length, readingBuffer.getLength() - readingOffset);
     88        readingOffset += toSkip;
     89        length -= toSkip;
     90
     91        if (length!=0) {
     92            if (!fill()) return false;
     93        }
     94    }
     95
     96    return true;
     97}
     98
     99//------------------------------------------------------------------------------
     100
    85101bool BlockingStream::write(const void* src, size_t length)
    86102{
     
    105121{
    106122    WritingBuffer& writingBuffer = stream.getWritingBuffer();
    107     while (!failed()) {
    108         if (isInterrupted()) return false;
     123    while (*this) {
     124        if (checkInterrupted()) return false;
    109125
    110126        if (writingBuffer.write()) {
     
    112128        } else if (writingBuffer.failed()) {
    113129            setErrorCode(writingBuffer.getErrorCode());
    114             return false;
    115130        } else {
    116131            Waiter* waiter = stream.getWaiter();
     
    132147    readingBuffer.reset();
    133148    readingOffset = 0;
    134     while (true) {
    135         if (isInterrupted()) return false;
     149    while (*this) {
     150        if (checkInterrupted()) return false;
    136151
    137152        if (readingBuffer.read()) {
    138             return !readingBuffer.isEmpty();
     153            eof = readingBuffer.isEmpty();
     154            return !eof;
    139155        } else if (readingBuffer.failed()) {
    140156            setErrorCode(readingBuffer.getErrorCode());
    141             return false;
    142157        } else {
    143158            Waiter* waiter = stream.getWaiter();
     
    149164        }
    150165    }
     166    return false;
    151167}
    152168
  • src/xplcommon/BlockingStream.h

    r30 r31  
    3535
    3636#include "BufferedStream.h"
     37
    3738#include "Waiter.h"
    3839#include "WaitableEvent.h"
     
    7677    size_t readingOffset;
    7778
     79    /**
     80     * Indicate if the end-of-file has been reached while reading.
     81     */
     82    bool eof;
     83
    7884public:
    7985    /**
     
    8389     */
    8490    BlockingStream(BufferedStream& stream);
     91
     92    /**
     93     * Determine if the stream has neither failed nor been
     94     * interrupted.
     95     */
     96    operator bool() const;
    8597
    8698    /**
     
    102114     */
    103115    bool read(void* dest, size_t length);
     116
     117    /**
     118     * Skip the given number of bytes.
     119     */
     120    bool skip(size_t length);
    104121
    105122    /**
     
    125142     */
    126143    bool fill();
     144
     145    /**
     146     * Check for the stream being interrupted.
     147     *
     148     * @return if the stream is interrupted, false otherwise
     149     */
     150    bool checkInterrupted();
    127151};
    128152
     
    135159    event(stream.getWaiter()),
    136160    interrupted(false),
    137     readingOffset(0)
     161    readingOffset(0),
     162    eof(false)
    138163{
     164}
     165
     166//------------------------------------------------------------------------------
     167
     168inline BlockingStream::operator bool() const
     169{
     170    return !failed() && !interrupted && !eof;
    139171}
    140172
     
    144176{
    145177    event.fire();
     178}
     179
     180//------------------------------------------------------------------------------
     181
     182inline bool BlockingStream::isInterrupted()
     183{
     184    return interrupted;
    146185}
    147186
  • src/xplcommon/Makefile.am

    r30 r31  
    1111libxplcommon_la_SOURCES= \
    1212        PseudoRandom.cc         \
    13         BlockingStream.cc
     13        BlockingStream.cc       \
     14        DataStream.cc
    1415
    1516if TARGET_API_POSIX
     
    3940        LocalConnector.h        \
    4041        LocalClientSocket.h     \
    41         BlockingStream.h
     42        BlockingStream.h        \
     43        DataStream.h
  • test/testblkstream.cc

    r30 r31  
    3737#include <xplcommon/LocalClientSocket.h>
    3838#include <xplcommon/LocalConnector.h>
    39 #include <xplcommon/BlockingStream.h>
     39#include <xplcommon/DataStream.h>
    4040#include <xplcommon/WaitableEvent.h>
    4141#include <xplcommon/PseudoRandom.h>
     
    5959using xplcommon::LocalConnector;
    6060using xplcommon::BufferedStream;
    61 using xplcommon::BlockingStream;
     61using xplcommon::DataStream;
    6262using xplcommon::PseudoRandom;
    6363
     
    6969{
    7070private:
    71     BlockingStream* dataStream;
     71    static const uint8_t OP_ADD = 1;
     72
     73    static const uint8_t OP_SUB = 2;
     74
     75    static const uint8_t OP_MUL = 3;
     76
     77    static const uint8_t OP_DIV = 4;
     78
     79private:
     80    static int32_t perform(int32_t value, uint8_t operation, int32_t x);
     81
     82    DataStream* dataStream;
    7283
    7384protected:
     
    8394//------------------------------------------------------------------------------
    8495
     96inline int32_t TestThread::perform(int32_t value, uint8_t operation, int32_t x)
     97{
     98    switch (operation) {
     99      case OP_ADD:
     100        return value + x;
     101      case OP_SUB:
     102        return value - x;
     103      case OP_MUL:
     104        return value * x;
     105      case OP_DIV:
     106        return value / x;
     107      default:
     108        return value;
     109    }
     110}
     111
     112//------------------------------------------------------------------------------
     113
    85114inline TestThread::TestThread() :
    86115    dataStream(0)
     
    99128void TestThread::communicate(BufferedStream& stream, unsigned seed, bool isClient)
    100129{
    101     static const size_t bufferSize = 1024;
    102 
    103     static const size_t minToWrite = 16;
    104     static const size_t maxToWrite = 6124;
    105 
    106     static const size_t reportingInterval = 100000000;
     130    static const size_t reportInterval = 100000;
    107131
    108132    const char* prefix = isClient ? "Client" : "Server";
    109133
    110     dataStream = new BlockingStream(stream);
     134    dataStream = new DataStream(stream);
    111135    PseudoRandom random(seed);
    112136
    113 
    114     size_t numBytesRead = 0;
    115     size_t nextReportRead = reportingInterval;
    116     size_t numBytesWritten = 0;
    117     size_t nextReportWritten = reportingInterval;
    118 
    119137    bool first = true;
    120138
    121     unsigned char* buffer = new unsigned char[bufferSize];
    122 
    123     while(true) {
    124         if (isClient || !first) {
    125             unsigned dataSize = random.nextUnsigned(maxToWrite, minToWrite);
    126             *reinterpret_cast<uint32_t*>(buffer) = dataSize;
    127             while(dataSize>0) {
    128                 unsigned toWrite = min(static_cast<size_t>(dataSize),
    129                                        bufferSize);
    130                 if (!dataStream->write(buffer, toWrite)) break;
    131                 dataSize -= toWrite;
    132                 numBytesWritten += toWrite;
    133                 if (numBytesWritten>=nextReportWritten) {
    134                     printf("%s: written %lu bytes\n", prefix,
    135                            static_cast<unsigned long>(numBytesWritten));
    136                     nextReportWritten += reportingInterval;
     139    size_t numRequests = 0;
     140    size_t nextReportRequests = reportInterval;
     141
     142    int32_t value = 0;
     143    if (isClient) {
     144        value = random.next();
     145        dataStream->writeS32(value);
     146    } else {
     147        value = dataStream->readS32();
     148    }
     149
     150    int32_t newValue = value;
     151
     152    while(*dataStream) {
     153        if (!first || isClient) {
     154            unsigned numOperations = random.nextUnsigned(20, 1);
     155            dataStream->writeU16(numOperations);
     156            //printf("%s: numOperations=%u\n", prefix, numOperations);
     157            for(unsigned i = 0; i<numOperations; ++i) {
     158                double r = random.nextDouble();
     159                uint8_t operation =
     160                    (r<0.4) ? OP_ADD : ((r<0.8) ? OP_SUB :
     161                                        ((r<0.98) ? OP_MUL : OP_DIV));
     162                int32_t x = random.next();
     163
     164                dataStream->writeU8(operation);
     165                dataStream->writeS32(x);
     166
     167                newValue = perform(newValue, operation, x);
     168            }
     169            if (!dataStream->flush()) break;
     170
     171            if (isClient) {
     172                value = newValue;
     173                ++numRequests;
     174                if (numRequests>=nextReportRequests) {
     175                    printf("%s: after %lu requests, value=%d\n",
     176                           prefix, static_cast<unsigned long>(numRequests),
     177                           value);
     178                    nextReportRequests += reportInterval;
    137179                }
    138180            }
    139             if (!dataStream->flush()) break;
    140         }
    141 
    142         uint32_t dataSize = 0;
    143         if (!dataStream->read(&dataSize, sizeof(dataSize))) break;
    144         assert(dataSize>=minToWrite);
    145         assert(dataSize<=maxToWrite);
    146         dataSize -= sizeof(dataSize);
    147         numBytesRead += 4;
    148         while(dataSize>0) {
    149             unsigned toRead = min(static_cast<size_t>(dataSize),
    150                                   bufferSize);
    151             if (!dataStream->read(buffer, toRead)) break;
    152             dataSize -= toRead;
    153             numBytesRead += toRead;
    154             if (numBytesRead>=nextReportRead) {
    155                 printf("%s: read %lu bytes\n", prefix,
    156                        static_cast<unsigned long>(numBytesRead));
    157                 nextReportRead += reportingInterval;
     181        }
     182
     183        unsigned numOperations = dataStream->readU16();
     184        for (unsigned i = 0; *dataStream && i<numOperations; ++i) {
     185            uint8_t operation = dataStream->readU8();
     186            int32_t x = dataStream->readS32();
     187            newValue = perform(newValue, operation, x);
     188        }
     189
     190        if (*dataStream) {
     191            if (!isClient) {
     192                value = newValue;
     193                ++numRequests;
     194                if (numRequests>=nextReportRequests) {
     195                    printf("%s: after %lu requests, value=%d\n",
     196                           prefix, static_cast<unsigned long>(numRequests),
     197                           value);
     198                    nextReportRequests += reportInterval;
     199                }
    158200            }
    159201        }
    160202
     203
    161204        first = false;
    162205    }
    163 
    164     delete [] buffer;
    165206
    166207    if (dataStream->failed()) {
     
    171212    }
    172213
    173     printf("%s: written %lu bytes and read %lu bytes\n", prefix,
    174            static_cast<unsigned long>(numBytesWritten),
    175            static_cast<unsigned long>(numBytesRead));
     214    printf("%s: value=%d, %lu requests\n", prefix, value,
     215           static_cast<unsigned long>(numRequests));
    176216}
    177217
     
    275315
    276316    Thread::sleep(60000);
    277     //Thread::sleep(5000);
    278     printf("Signalling the server thread\n");
    279     serverThread.interrupt();
    280     //clientThread.interrupt();
    281     printf("Signalled the server thread\n");
     317    //Thread::sleep(1000);
     318    printf("Signalling the client thread\n");
     319    //serverThread.interrupt();
     320    clientThread.interrupt();
     321    printf("Signalled the client thread\n");
    282322
    283323    printf("Waiting for the client thread\n");
Note: See TracChangeset for help on using the changeset viewer.