Changeset 8:acc105036a41 in xplra
- Timestamp:
- 01/04/13 14:24:04 (12 years ago)
- Branch:
- default
- Phase:
- public
- Files:
-
- 5 added
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
misc/client.py
r7 r8 39 39 40 40 @staticmethod 41 def _parseGetSpec(words): 42 """Parse a data query specification from the given argument word lisst. 43 44 If there is some failure, it returns None. 45 46 Otherwise it returns a tuple of two items: 47 - the parsed data, which is a tuple of these items: 48 - the name of the dataref to query 49 - the type of the dataref to query (it is a valid type, checked by 50 the function) 51 - the length of the dataref to query 52 - the offset to query the dataref from 53 54 - the remainder of the argument list.""" 55 if len(words)<2: 56 print >> sys.stderr, "Missing parameters" 57 return None 58 59 nextIndex = 2 60 61 name = words[0] 62 type = words[1] 63 if type not in CLI._types: 64 print >> sys.stderr, "Invalid type" 65 return None 66 67 length = None 68 offset = None 69 if type in ["ia", "fa", "ba", "s"]: 70 if len(words)<4: 71 print >> sys.stderr, "Missing parameters" 72 return None 73 length = int(words[2]) 74 offset = int(words[3]) 75 nextIndex = 4 76 77 return ((name, type, length, offset), words[nextIndex:]) 78 79 @staticmethod 41 80 def _packLength(x): 42 81 """Pack the given integer as a variable-length value.""" … … 60 99 61 100 self._stream = stream 101 self._multiGets = {} 62 102 63 103 self.use_rawinput = True … … 78 118 """Handle the 'get' command""" 79 119 words = self._splitArgs(args) 80 if len(words)<2: 81 print >> sys.stderr, "Missing parameters" 82 return False 83 84 name = words[0] 85 86 type = words[1] 87 if type not in self._types: 88 print >> sys.stderr, "Invalid type" 89 return False 90 91 length = None 92 offset = None 93 if len(words)>3: 94 length = int(words[2]) 95 offset = int(words[3]) 120 result = self._parseGetSpec(words) 121 if result is None: 122 return False 123 124 (name, type, length, offset) = result[0] 96 125 97 126 self._writeU8(0x01) … … 107 136 print self._readValue(type) 108 137 else: 109 print "Error code:", result138 print >> sys.stderr, "Error code:", result 110 139 111 140 def do_set(self, args): … … 143 172 result = self._readU8() 144 173 if result!=0: 145 print "Error code:", result 174 print >> sys.stderr, "Error code:", result 175 176 def do_reg_multiget(self, args): 177 """Handle the 'reg_multiget' command""" 178 words = self._splitArgs(args) 179 specs = [] 180 while words: 181 result = self._parseGetSpec(words) 182 if result is None: 183 return False 184 else: 185 specs.append(result[0]) 186 words = result[1] 187 188 if not specs: 189 return False 190 191 self._writeU8(0x11) 192 self._writeU32(len(specs)) 193 for (name, type, length, offset) in specs: 194 print name, type 195 self._writeString(name) 196 self._writeU8(self._types[type]) 197 if length is not None: 198 self._writeS32(length) 199 self._writeS32(offset) 200 self._flush() 201 202 result = self._readU8() 203 if result==0: 204 id = self._readU32() 205 self._multiGets[id] = \ 206 [type for (name, type, length, offset) in specs] 207 print "ID:", id 208 else: 209 print >> sys.stderr, "Error code:", result 210 211 def do_unreg_multiget(self, args): 212 """Handle the 'unreg_multiget' command""" 213 words = self._splitArgs(args) 214 if len(words)<1: 215 print >> sys.stderr, "Missing parameter" 216 return False 217 218 id = int(words[0]) 219 220 self._writeU8(0x12) 221 self._writeU32(id) 222 self._flush() 223 224 result = self._readU8() 225 if result!=0: 226 print >> sys.stderr, "Error code:", result 227 228 if id in self._multiGets: 229 del self._multiGets[id] 230 231 def do_exec_multiget(self, args): 232 """Handle the 'unreg_multiget' command""" 233 words = self._splitArgs(args) 234 if len(words)<1: 235 print >> sys.stderr, "Missing parameter" 236 return False 237 238 id = int(words[0]) 239 if id not in self._multiGets: 240 print >> sys.stderr, "Invalid ID" 241 return False 242 243 self._writeU8(0x13) 244 self._writeU32(id) 245 self._flush() 246 247 result = self._readU8() 248 if result==0: 249 for type in self._multiGets[id]: 250 value = self._readValue(type) 251 print value 252 elif result==0x02: 253 index = self._readU32() 254 print >> sys.stderr, "Invalid dataref at #%d" % (index,) 255 else: 256 print >> sys.stderr, "Error code:", result 146 257 147 258 def _writeU8(self, x): … … 153 264 self._stream.write(struct.pack("i", x)) 154 265 266 def _writeU32(self, x): 267 """Write the given value as an unsigned, 32-bit value.""" 268 self._stream.write(struct.pack("I", x)) 269 155 270 def _writeFloat(self, x): 156 271 """Write the given value as a single-precision floating point.""" … … 181 296 """Read a signed, 32-bit value from the stream.""" 182 297 (value,) = struct.unpack("i", self._stream.read(4)) 298 return value 299 300 def _readU32(self): 301 """Read an unsigned, 32-bit value from the stream.""" 302 (value,) = struct.unpack("I", self._stream.read(4)) 183 303 return value 184 304 -
src/xplra/GetDataRefTask.cc
r7 r8 42 42 43 43 using xplcommon::DataStream; 44 using xplcommon::Util; 44 45 45 46 using std::string; … … 54 55 uint8_t type = stream.readU8(); 55 56 if (!stream) return 0; 57 58 // Util::debug("hu.varadiistvan.xplra.GetDataRefTask::execute: name='%s', type=0x%02x\n", 59 // name.c_str(), type); 56 60 57 61 if (type==Protocol::TYPE_INT) { -
src/xplra/Makefile.am
r7 r8 24 24 25 25 libxplra_la_SOURCES= \ 26 plugin.cc \ 27 ListenThread.cc \ 28 RequestQueue.cc \ 29 ServerThread.cc \ 30 TaskRequest.cc \ 31 DataRefTask.cc \ 32 GetDataRefTask.cc \ 33 SetDataRefTask.cc 26 plugin.cc \ 27 ListenThread.cc \ 28 RequestQueue.cc \ 29 ServerThread.cc \ 30 TaskRequest.cc \ 31 DataRefTask.cc \ 32 GetDataRefTask.cc \ 33 SetDataRefTask.cc \ 34 MultiTaskRequest.cc \ 35 GetMultiDataRefRequest.cc 34 36 35 37 libxplra_la_LIBADD=$(LIBXPLCOMMON_LIBS) 36 38 37 39 noinst_HEADERS= \ 38 ListenThread.h \ 39 Request.h \ 40 RequestQueue.h \ 41 Protocol.h \ 42 ServerThread.h \ 43 Task.h \ 44 TaskRequest.h \ 45 DataRefTask.h \ 46 GetDataRefTask.h \ 47 SetDataRefTask.h 40 ListenThread.h \ 41 Request.h \ 42 RequestQueue.h \ 43 Protocol.h \ 44 ServerThread.h \ 45 Task.h \ 46 TaskRequest.h \ 47 DataRefTask.h \ 48 GetDataRefTask.h \ 49 SetDataRefTask.h \ 50 MultiTaskRequest.h \ 51 GetMultiDataRefRequest.h -
src/xplra/Protocol.h
r7 r8 50 50 */ 51 51 static const uint8_t COMMAND_SET_SINGLE = 0x02; 52 53 /** 54 * Command: register a multiple-data query request 55 */ 56 static const uint8_t COMMAND_REGISTER_GET_MULTI = 0x11; 57 58 /** 59 * Command: unregister a multiple-data query request 60 */ 61 static const uint8_t COMMAND_UNREGISTER_GET_MULTI = 0x12; 62 63 /** 64 * Command: execute a registered multiple-data query request 65 */ 66 static const uint8_t COMMAND_EXECUTE_GET_MULTI = 0x13; 52 67 53 68 /** … … 112 127 113 128 /** 129 * Result code: invalid count 130 */ 131 static const uint8_t RESULT_INVALID_COUNT = 0x06; 132 133 /** 134 * Result code: invalid ID 135 */ 136 static const uint8_t RESULT_INVALID_ID = 0x07; 137 138 /** 114 139 * Result code: other error 115 140 */ … … 120 145 */ 121 146 static const int MAX_LENGTH = 2048; 147 148 /** 149 * The maximal count of requests in a multiple-data query or 150 * update. 151 */ 152 static const size_t MAX_MULTI_COUNT = 1024; 122 153 }; 123 154 -
src/xplra/Request.h
r2 r8 58 58 virtual ~Request(); 59 59 60 protected: 60 61 /** 61 62 * Execute the request. Make this very fast! -
src/xplra/ServerThread.cc
r7 r8 36 36 #include "SetDataRefTask.h" 37 37 #include "TaskRequest.h" 38 #include "GetMultiDataRefRequest.h" 38 39 39 40 #include <xplcommon/Util.h> … … 74 75 requestQueue(requestQueue), 75 76 bufferedStream(acceptor.getSocket(&waiter)), 76 stream(*bufferedStream) 77 stream(*bufferedStream), 78 nextGetMultiRequestID(1) 77 79 { 78 80 instancesMutex.lock(); … … 86 88 { 87 89 delete bufferedStream; 90 91 for(getMultiRequests_t::iterator i = getMultiRequests.begin(); 92 i!=getMultiRequests.end(); ++i) 93 { 94 delete i->second; 95 } 88 96 89 97 instancesMutex.lock(); … … 115 123 } else if (command==Protocol::COMMAND_SET_SINGLE) { 116 124 if (!handleSetSingle()) break; 125 } else if (command==Protocol::COMMAND_REGISTER_GET_MULTI) { 126 if (!handleRegisterGetMulti()) break; 127 } else if (command==Protocol::COMMAND_UNREGISTER_GET_MULTI) { 128 if (!handleUnregisterGetMulti()) break; 129 } else if (command==Protocol::COMMAND_EXECUTE_GET_MULTI) { 130 if (!handleExecuteGetMulti()) break; 117 131 } else { 118 132 stream.writeU8(Protocol::RESULT_INVALID_COMMAND); … … 131 145 GetDataRefTask* task = GetDataRefTask::create(result, stream); 132 146 133 if (!stream) return false; 134 else if (task==0) { 147 if (!stream) { 148 return false; 149 } else if (task==0) { 135 150 stream.writeU8(result); 136 151 return true; … … 157 172 SetDataRefTask* task = SetDataRefTask::create(result, stream); 158 173 159 if (!stream) return false; 160 else if (task==0) { 174 if (!stream) { 175 return false; 176 } else if (task==0) { 161 177 stream.writeU8(result); 162 178 return true; … … 168 184 stream.writeU8(task->isValid() ? Protocol::RESULT_OK : 169 185 Protocol::RESULT_UNKNOWN_DATAREF); 186 return true; 187 } 188 189 //------------------------------------------------------------------------------ 190 191 bool ServerThread::handleRegisterGetMulti() 192 { 193 uint32_t numTasks = stream.readU32(); 194 if (!stream) { 195 return false; 196 } else if (numTasks==0 || numTasks>Protocol::MAX_MULTI_COUNT) { 197 stream.writeU8(Protocol::RESULT_INVALID_COUNT); 198 return true; 199 } 200 201 uint8_t result = Protocol::RESULT_OK; 202 GetMultiDataRefRequest* request = 203 new GetMultiDataRefRequest(result, numTasks, stream); 204 if (result!=Protocol::RESULT_OK || !stream) { 205 delete request; 206 stream.writeU8(result); 207 return stream; 208 } 209 210 size_t id = nextGetMultiRequestID++; 211 getMultiRequests[id] = request; 212 213 stream.writeU8(Protocol::RESULT_OK); 214 stream.writeU32(id); 215 216 return true; 217 } 218 219 //------------------------------------------------------------------------------ 220 221 bool ServerThread::handleUnregisterGetMulti() 222 { 223 uint32_t id = stream.readU32(); 224 if (!stream) return false; 225 226 getMultiRequests_t::iterator i = getMultiRequests.find(id); 227 if (i==getMultiRequests.end()) { 228 stream.writeU8(Protocol::RESULT_INVALID_ID); 229 } else { 230 GetMultiDataRefRequest* request = i->second; 231 getMultiRequests.erase(i); 232 delete request; 233 stream.writeU8(Protocol::RESULT_OK); 234 } 235 236 return true; 237 } 238 239 //------------------------------------------------------------------------------ 240 241 bool ServerThread::handleExecuteGetMulti() 242 { 243 uint32_t id = stream.readU32(); 244 if (!stream) return false; 245 246 getMultiRequests_t::iterator i = getMultiRequests.find(id); 247 if (i==getMultiRequests.end()) { 248 stream.writeU8(Protocol::RESULT_INVALID_ID); 249 } else { 250 GetMultiDataRefRequest* request = i->second; 251 if (!requestQueue.execute(request)) return false; 252 request->writeResult(stream); 253 } 254 170 255 return true; 171 256 } -
src/xplra/ServerThread.h
r7 r8 41 41 42 42 #include <set> 43 #include <map> 43 44 44 45 //------------------------------------------------------------------------------ … … 49 50 50 51 class RequestQueue; 52 53 class GetMultiDataRefRequest; 51 54 52 55 //------------------------------------------------------------------------------ … … 62 65 */ 63 66 typedef std::set<ServerThread*> instances_t; 67 68 /** 69 * Type for the registered multiple-data requests. 70 */ 71 typedef std::map<size_t, GetMultiDataRefRequest*> getMultiRequests_t; 64 72 65 73 /** … … 100 108 xplcommon::DataStream stream; 101 109 110 /** 111 * The ID of the next multiple-data query request. 112 */ 113 size_t nextGetMultiRequestID; 114 115 /** 116 * The registered multiple-data query requests. 117 */ 118 getMultiRequests_t getMultiRequests; 119 102 120 public: 103 121 /** … … 137 155 */ 138 156 bool handleSetSingle(); 157 158 /** 159 * Handle the COMMAND_REGISTER_GET_MULTI command 160 * 161 * @return true, if we can continue, false if the thread should quit 162 */ 163 bool handleRegisterGetMulti(); 164 165 /** 166 * Handle the COMMAND_UNREGISTER_GET_MULTI command 167 * 168 * @return true, if we can continue, false if the thread should quit 169 */ 170 bool handleUnregisterGetMulti(); 171 172 /** 173 * Handle the COMMAND_EXECUTE_GET_MULTI command 174 * 175 * @return true, if we can continue, false if the thread should quit 176 */ 177 bool handleExecuteGetMulti(); 139 178 }; 140 179
Note:
See TracChangeset
for help on using the changeset viewer.