Changeset 8:acc105036a41 in xplra


Ignore:
Timestamp:
01/04/13 14:24:04 (12 years ago)
Author:
István Váradi <ivaradi@…>
Branch:
default
Phase:
public
Message:

Added the registry of multiple-data queries

Files:
5 added
7 edited

Legend:

Unmodified
Added
Removed
  • misc/client.py

    r7 r8  
    3939
    4040    @staticmethod
     41    def _parseGetSpec(words):
     42        """Parse a data query specification from the given argument word lisst.
     43
     44        If there is some failure, it returns None.
     45
     46        Otherwise it returns a tuple of two items:
     47        - the parsed data, which is a tuple of these items:
     48          - the name of the dataref to query
     49          - the type of the dataref to query (it is a valid type, checked by
     50          the function)
     51          - the length of the dataref to query
     52          - the offset to query the dataref from
     53
     54        - the remainder of the argument list."""
     55        if len(words)<2:
     56            print >> sys.stderr, "Missing parameters"
     57            return None
     58
     59        nextIndex = 2
     60
     61        name = words[0]
     62        type = words[1]
     63        if type not in CLI._types:
     64            print >> sys.stderr, "Invalid type"
     65            return None
     66
     67        length = None
     68        offset = None
     69        if type in ["ia", "fa", "ba", "s"]:
     70            if len(words)<4:
     71                print >> sys.stderr, "Missing parameters"
     72                return None
     73            length = int(words[2])
     74            offset = int(words[3])
     75            nextIndex = 4
     76
     77        return ((name, type, length, offset), words[nextIndex:])
     78
     79    @staticmethod
    4180    def _packLength(x):
    4281        """Pack the given integer as a variable-length value."""
     
    6099
    61100        self._stream = stream
     101        self._multiGets = {}
    62102
    63103        self.use_rawinput = True
     
    78118        """Handle the 'get' command"""
    79119        words = self._splitArgs(args)
    80         if len(words)<2:
    81             print >> sys.stderr, "Missing parameters"
    82             return False
    83 
    84         name = words[0]
    85 
    86         type = words[1]
    87         if type not in self._types:
    88             print >> sys.stderr, "Invalid type"
    89             return False
    90 
    91         length = None
    92         offset = None
    93         if len(words)>3:
    94             length = int(words[2])
    95             offset = int(words[3])
     120        result = self._parseGetSpec(words)
     121        if result is None:
     122            return False
     123
     124        (name, type, length, offset) = result[0]
    96125
    97126        self._writeU8(0x01)
     
    107136            print self._readValue(type)
    108137        else:
    109             print "Error code:", result
     138            print >> sys.stderr, "Error code:", result
    110139
    111140    def do_set(self, args):
     
    143172        result = self._readU8()
    144173        if result!=0:
    145             print "Error code:", result
     174            print >> sys.stderr, "Error code:", result
     175
     176    def do_reg_multiget(self, args):
     177        """Handle the 'reg_multiget' command"""
     178        words = self._splitArgs(args)
     179        specs = []
     180        while words:
     181            result = self._parseGetSpec(words)
     182            if result is None:
     183                return False
     184            else:
     185                specs.append(result[0])
     186                words = result[1]
     187
     188        if not specs:
     189            return False
     190
     191        self._writeU8(0x11)
     192        self._writeU32(len(specs))
     193        for (name, type, length, offset) in specs:
     194            print name, type
     195            self._writeString(name)
     196            self._writeU8(self._types[type])
     197            if length is not None:
     198                self._writeS32(length)
     199                self._writeS32(offset)
     200        self._flush()
     201
     202        result = self._readU8()
     203        if result==0:
     204            id = self._readU32()
     205            self._multiGets[id] = \
     206                [type for (name, type, length, offset) in specs]
     207            print "ID:", id
     208        else:
     209            print >> sys.stderr, "Error code:", result
     210
     211    def do_unreg_multiget(self, args):
     212        """Handle the 'unreg_multiget' command"""
     213        words = self._splitArgs(args)
     214        if len(words)<1:
     215            print >> sys.stderr, "Missing parameter"
     216            return False
     217
     218        id = int(words[0])
     219
     220        self._writeU8(0x12)
     221        self._writeU32(id)
     222        self._flush()
     223
     224        result = self._readU8()
     225        if result!=0:
     226            print >> sys.stderr, "Error code:", result
     227
     228        if id in self._multiGets:
     229            del self._multiGets[id]
     230
     231    def do_exec_multiget(self, args):
     232        """Handle the 'unreg_multiget' command"""
     233        words = self._splitArgs(args)
     234        if len(words)<1:
     235            print >> sys.stderr, "Missing parameter"
     236            return False
     237
     238        id = int(words[0])
     239        if id not in self._multiGets:
     240            print >> sys.stderr, "Invalid ID"
     241            return False
     242
     243        self._writeU8(0x13)
     244        self._writeU32(id)
     245        self._flush()
     246
     247        result = self._readU8()
     248        if result==0:
     249            for type in self._multiGets[id]:
     250                value = self._readValue(type)
     251                print value
     252        elif result==0x02:
     253            index = self._readU32()
     254            print >> sys.stderr, "Invalid dataref at #%d" % (index,)
     255        else:
     256            print >> sys.stderr, "Error code:", result
    146257
    147258    def _writeU8(self, x):
     
    153264        self._stream.write(struct.pack("i", x))
    154265
     266    def _writeU32(self, x):
     267        """Write the given value as an unsigned, 32-bit value."""
     268        self._stream.write(struct.pack("I", x))
     269
    155270    def _writeFloat(self, x):
    156271        """Write the given value as a single-precision floating point."""
     
    181296        """Read a signed, 32-bit value from the stream."""
    182297        (value,) = struct.unpack("i", self._stream.read(4))
     298        return value
     299
     300    def _readU32(self):
     301        """Read an unsigned, 32-bit value from the stream."""
     302        (value,) = struct.unpack("I", self._stream.read(4))
    183303        return value
    184304
  • src/xplra/GetDataRefTask.cc

    r7 r8  
    4242
    4343using xplcommon::DataStream;
     44using xplcommon::Util;
    4445
    4546using std::string;
     
    5455    uint8_t type = stream.readU8();
    5556    if (!stream) return 0;
     57
     58    // Util::debug("hu.varadiistvan.xplra.GetDataRefTask::execute: name='%s', type=0x%02x\n",
     59    //             name.c_str(), type);
    5660
    5761    if (type==Protocol::TYPE_INT) {
  • src/xplra/Makefile.am

    r7 r8  
    2424
    2525libxplra_la_SOURCES= \
    26         plugin.cc               \
    27         ListenThread.cc         \
    28         RequestQueue.cc         \
    29         ServerThread.cc         \
    30         TaskRequest.cc          \
    31         DataRefTask.cc          \
    32         GetDataRefTask.cc       \
    33         SetDataRefTask.cc
     26        plugin.cc                       \
     27        ListenThread.cc                 \
     28        RequestQueue.cc                 \
     29        ServerThread.cc                 \
     30        TaskRequest.cc                  \
     31        DataRefTask.cc                  \
     32        GetDataRefTask.cc               \
     33        SetDataRefTask.cc               \
     34        MultiTaskRequest.cc             \
     35        GetMultiDataRefRequest.cc
    3436
    3537libxplra_la_LIBADD=$(LIBXPLCOMMON_LIBS)
    3638
    3739noinst_HEADERS= \
    38         ListenThread.h          \
    39         Request.h               \
    40         RequestQueue.h          \
    41         Protocol.h              \
    42         ServerThread.h          \
    43         Task.h                  \
    44         TaskRequest.h           \
    45         DataRefTask.h           \
    46         GetDataRefTask.h        \
    47         SetDataRefTask.h
     40        ListenThread.h                  \
     41        Request.h                       \
     42        RequestQueue.h                  \
     43        Protocol.h                      \
     44        ServerThread.h                  \
     45        Task.h                          \
     46        TaskRequest.h                   \
     47        DataRefTask.h                   \
     48        GetDataRefTask.h                \
     49        SetDataRefTask.h                \
     50        MultiTaskRequest.h              \
     51        GetMultiDataRefRequest.h
  • src/xplra/Protocol.h

    r7 r8  
    5050     */
    5151    static const uint8_t COMMAND_SET_SINGLE = 0x02;
     52
     53    /**
     54     * Command: register a multiple-data query request
     55     */
     56    static const uint8_t COMMAND_REGISTER_GET_MULTI = 0x11;
     57
     58    /**
     59     * Command: unregister a multiple-data query request
     60     */
     61    static const uint8_t COMMAND_UNREGISTER_GET_MULTI = 0x12;
     62
     63    /**
     64     * Command: execute a registered multiple-data query request
     65     */
     66    static const uint8_t COMMAND_EXECUTE_GET_MULTI = 0x13;
    5267
    5368    /**
     
    112127
    113128    /**
     129     * Result code: invalid count
     130     */
     131    static const uint8_t RESULT_INVALID_COUNT = 0x06;
     132
     133    /**
     134     * Result code: invalid ID
     135     */
     136    static const uint8_t RESULT_INVALID_ID = 0x07;
     137
     138    /**
    114139     * Result code: other error
    115140     */
     
    120145     */
    121146    static const int MAX_LENGTH = 2048;
     147
     148    /**
     149     * The maximal count of requests in a multiple-data query or
     150     * update.
     151     */
     152    static const size_t MAX_MULTI_COUNT = 1024;
    122153};
    123154
  • src/xplra/Request.h

    r2 r8  
    5858    virtual ~Request();
    5959
     60protected:
    6061    /**
    6162     * Execute the request. Make this very fast!
  • src/xplra/ServerThread.cc

    r7 r8  
    3636#include "SetDataRefTask.h"
    3737#include "TaskRequest.h"
     38#include "GetMultiDataRefRequest.h"
    3839
    3940#include <xplcommon/Util.h>
     
    7475    requestQueue(requestQueue),
    7576    bufferedStream(acceptor.getSocket(&waiter)),
    76     stream(*bufferedStream)
     77    stream(*bufferedStream),
     78    nextGetMultiRequestID(1)
    7779{
    7880    instancesMutex.lock();
     
    8688{
    8789    delete bufferedStream;
     90
     91    for(getMultiRequests_t::iterator i = getMultiRequests.begin();
     92        i!=getMultiRequests.end(); ++i)
     93    {
     94        delete i->second;
     95    }
    8896
    8997    instancesMutex.lock();
     
    115123        } else if (command==Protocol::COMMAND_SET_SINGLE) {
    116124            if (!handleSetSingle()) break;
     125        } else if (command==Protocol::COMMAND_REGISTER_GET_MULTI) {
     126            if (!handleRegisterGetMulti()) break;
     127        } else if (command==Protocol::COMMAND_UNREGISTER_GET_MULTI) {
     128            if (!handleUnregisterGetMulti()) break;
     129        } else if (command==Protocol::COMMAND_EXECUTE_GET_MULTI) {
     130            if (!handleExecuteGetMulti()) break;
    117131        } else {
    118132            stream.writeU8(Protocol::RESULT_INVALID_COMMAND);
     
    131145    GetDataRefTask* task = GetDataRefTask::create(result, stream);
    132146
    133     if (!stream) return false;
    134     else if (task==0) {
     147    if (!stream) {
     148        return false;
     149    } else if (task==0) {
    135150        stream.writeU8(result);
    136151        return true;
     
    157172    SetDataRefTask* task = SetDataRefTask::create(result, stream);
    158173
    159     if (!stream) return false;
    160     else if (task==0) {
     174    if (!stream) {
     175        return false;
     176    } else if (task==0) {
    161177        stream.writeU8(result);
    162178        return true;
     
    168184    stream.writeU8(task->isValid() ? Protocol::RESULT_OK :
    169185                   Protocol::RESULT_UNKNOWN_DATAREF);
     186    return true;
     187}
     188
     189//------------------------------------------------------------------------------
     190
     191bool ServerThread::handleRegisterGetMulti()
     192{
     193    uint32_t numTasks = stream.readU32();
     194    if (!stream) {
     195        return false;
     196    } else if (numTasks==0 || numTasks>Protocol::MAX_MULTI_COUNT) {
     197        stream.writeU8(Protocol::RESULT_INVALID_COUNT);
     198        return true;
     199    }
     200
     201    uint8_t result = Protocol::RESULT_OK;
     202    GetMultiDataRefRequest* request =
     203        new GetMultiDataRefRequest(result, numTasks, stream);
     204    if (result!=Protocol::RESULT_OK || !stream) {
     205        delete request;
     206        stream.writeU8(result);
     207        return stream;
     208    }
     209
     210    size_t id = nextGetMultiRequestID++;
     211    getMultiRequests[id] = request;
     212
     213    stream.writeU8(Protocol::RESULT_OK);
     214    stream.writeU32(id);
     215
     216    return true;
     217}
     218
     219//------------------------------------------------------------------------------
     220
     221bool ServerThread::handleUnregisterGetMulti()
     222{
     223    uint32_t id = stream.readU32();
     224    if (!stream) return false;
     225
     226    getMultiRequests_t::iterator i = getMultiRequests.find(id);
     227    if (i==getMultiRequests.end()) {
     228        stream.writeU8(Protocol::RESULT_INVALID_ID);
     229    } else {
     230        GetMultiDataRefRequest* request = i->second;
     231        getMultiRequests.erase(i);
     232        delete request;
     233        stream.writeU8(Protocol::RESULT_OK);
     234    }
     235
     236    return true;
     237}
     238
     239//------------------------------------------------------------------------------
     240
     241bool ServerThread::handleExecuteGetMulti()
     242{
     243    uint32_t id = stream.readU32();
     244    if (!stream) return false;
     245
     246    getMultiRequests_t::iterator i = getMultiRequests.find(id);
     247    if (i==getMultiRequests.end()) {
     248        stream.writeU8(Protocol::RESULT_INVALID_ID);
     249    } else {
     250        GetMultiDataRefRequest* request = i->second;
     251        if (!requestQueue.execute(request)) return false;
     252        request->writeResult(stream);
     253    }
     254
    170255    return true;
    171256}
  • src/xplra/ServerThread.h

    r7 r8  
    4141
    4242#include <set>
     43#include <map>
    4344
    4445//------------------------------------------------------------------------------
     
    4950
    5051class RequestQueue;
     52
     53class GetMultiDataRefRequest;
    5154
    5255//------------------------------------------------------------------------------
     
    6265     */
    6366    typedef std::set<ServerThread*> instances_t;
     67
     68    /**
     69     * Type for the registered multiple-data requests.
     70     */
     71    typedef std::map<size_t, GetMultiDataRefRequest*> getMultiRequests_t;
    6472
    6573    /**
     
    100108    xplcommon::DataStream stream;
    101109
     110    /**
     111     * The ID of the next multiple-data query request.
     112     */
     113    size_t nextGetMultiRequestID;
     114
     115    /**
     116     * The registered multiple-data query requests.
     117     */
     118    getMultiRequests_t getMultiRequests;
     119
    102120public:
    103121    /**
     
    137155     */
    138156    bool handleSetSingle();
     157
     158    /**
     159     * Handle the COMMAND_REGISTER_GET_MULTI command
     160     *
     161     * @return true, if we can continue, false if the thread should quit
     162     */
     163    bool handleRegisterGetMulti();
     164
     165    /**
     166     * Handle the COMMAND_UNREGISTER_GET_MULTI command
     167     *
     168     * @return true, if we can continue, false if the thread should quit
     169     */
     170    bool handleUnregisterGetMulti();
     171
     172    /**
     173     * Handle the COMMAND_EXECUTE_GET_MULTI command
     174     *
     175     * @return true, if we can continue, false if the thread should quit
     176     */
     177    bool handleExecuteGetMulti();
    139178};
    140179
Note: See TracChangeset for help on using the changeset viewer.