#!/usr/bin/env python

import sys
import cmd
import struct
import socket
import os

#------------------------------------------------------------------------------

class CLI(cmd.Cmd):
    """Simple command-line interface for the X-Plane Remote Access plugin.

    Every command is encoded as a binary request and written to the stream
    given to the constructor; the reply is read back from the same stream and
    printed. Numbers are packed with the struct module's native byte order
    and sizes.

    The code avoids the print statement and implicit text/bytes mixing, so it
    runs under both Python 2 and Python 3."""

    # The type codes sent to the plugin, keyed by the type names accepted on
    # the command line. "ba" and "s" share the same code.
    _types = { "i" : 0x01, "f" : 0x02, "d" : 0x03,
               "fa" : 0x11, "ia" : 0x12, "ba" : 0x13,
               "s" : 0x13 }

    # The type names whose specification includes a length and an offset.
    _arrayTypes = ("fa", "ia", "ba", "s")

    @staticmethod
    def _printError(message):
        """Print the given message as one line on the standard error."""
        sys.stderr.write(message + "\n")

    @staticmethod
    def _splitArgs(args):
        """Split the given argument string into a list of words.

        Words are separated by whitespace. A double quote at the start of a
        word opens a quoted section, in which whitespace is kept, and the next
        double quote closes it. The quotes themselves are dropped. A double
        quote in the middle of an unquoted word is an ordinary character."""
        words = []
        word = ""
        inQuote = False
        for c in args:
            if c.isspace() and not inQuote:
                if word:
                    words.append(word)
                    word = ""
            elif c=="\"" and (not word or inQuote):
                inQuote = not inQuote
            else:
                word += c

        if word:
            words.append(word)

        return words

    @staticmethod
    def _parseGetSpec(words):
        """Parse a data query specification from the given argument word list.

        If there is some failure, a message is printed on the standard error
        and None is returned. Otherwise a tuple of two items is returned:
        - the parsed data, which is a tuple of these items:
          - the name of the dataref to query
          - the type of the dataref to query (it is a valid type, checked by
            the function)
          - the length of the dataref to query (None for the scalar types)
          - the offset to query the dataref from (None for the scalar types)
        - the remainder of the argument list.

        A non-numeric length or offset raises ValueError."""
        if len(words)<2:
            CLI._printError("Missing parameters")
            return None

        name = words[0]
        typeName = words[1]
        if typeName not in CLI._types:
            CLI._printError("Invalid type")
            return None

        length = None
        offset = None
        nextIndex = 2
        if typeName in CLI._arrayTypes:
            if len(words)<4:
                CLI._printError("Missing parameters")
                return None
            length = int(words[2])
            offset = int(words[3])
            nextIndex = 4

        return ((name, typeName, length, offset), words[nextIndex:])

    @staticmethod
    def _parseGetSpecs(words):
        """Parse all the given words as a sequence of query specifications.

        Returns the list of the parsed specifications (see _parseGetSpec), or
        None, if any of them is invalid."""
        specs = []
        while words:
            result = CLI._parseGetSpec(words)
            if result is None:
                return None
            (spec, words) = result
            specs.append(spec)
        return specs

    @staticmethod
    def _packLength(x):
        """Pack the given integer as a variable-length value.

        The value is cut into 7-bit groups, the least significant group coming
        first. The top bit of a byte is set, if further bytes follow."""
        s = b""
        while True:
            y = x&0x7f
            x >>= 7
            if x>0: y |= 0x80
            s += struct.pack("B", y)
            if x==0:
                return s

    @staticmethod
    def _packString(s):
        """Pack the given string as its variable-length size and its bytes.

        A text (non-byte) string is encoded as UTF-8 first, so that the size
        is the number of bytes written."""
        if not isinstance(s, bytes):
            s = s.encode("utf-8")
        return CLI._packLength(len(s)) + s

    def __init__(self, stream):
        """Construct the CLI.

        stream is a file-like object supporting write(), flush() and read()
        with byte strings, through which the plugin is reached."""
        cmd.Cmd.__init__(self)

        self._stream = stream
        # The types of the registered multi-dataref queries, keyed by ID
        self._multiGets = {}
        # The (type, length) pairs of the registered multi-dataref updates,
        # keyed by ID
        self._multiSets = {}

        self.use_rawinput = True
        self.intro = "\nX-Plane Remote Access plugin command prompt\n"
        self.prompt = "XPLRA> "

        self.daemon = True

    def default(self, line):
        """Handle unhandled commands.

        End-of-file on the input terminates the command loop."""
        if line=="EOF":
            print("")
            return True
        # cmd.Cmd is an old-style class in Python 2, where super() cannot be
        # used, so the base class is called directly
        return cmd.Cmd.default(self, line)

    def do_get(self, args):
        """Handle the 'get' command.

        Usage: get <name> <type> [<length> <offset>]

        The length and the offset are required for the array types
        (fa, ia, ba, s)."""
        words = self._splitArgs(args)
        result = self._parseGetSpec(words)
        if result is None:
            return False

        (name, typeName, length, offset) = result[0]

        self._writeU8(0x01)
        self._writeSpec(name, typeName, length, offset)
        self._flush()

        result = self._readU8()
        if result==0:
            print(self._readValue(typeName))
        else:
            self._printErrorCode(result)

    def do_set(self, args):
        """Handle the 'set' command.

        Usage: set <name> <type> <value> [<length> <offset>]

        The value of an array (fa, ia, ba) is a comma-separated list."""
        words = self._splitArgs(args)
        if len(words)<3:
            self._printError("Missing parameters")
            return False

        name = words[0]
        typeName = words[1]
        if typeName not in self._types:
            self._printError("Invalid type")
            return False

        value = words[2]

        length = None
        offset = None
        if len(words)>4:
            length = int(words[3])
            offset = int(words[4])

        self._writeU8(0x02)
        self._writeSpec(name, typeName, length, offset)
        self._writeValue(typeName, value, length)
        self._flush()

        result = self._readU8()
        if result!=0:
            self._printErrorCode(result)

    def do_multiget(self, args):
        """Handle the 'multiget' command.

        Usage: multiget <name> <type> [<length> <offset>] ..."""
        specs = self._parseGetSpecs(self._splitArgs(args))
        if not specs:
            return False

        self._writeSpecs(0x03, specs)
        self._flush()

        result = self._readU8()
        if result==0:
            for (_, typeName, _, _) in specs:
                print(self._readValue(typeName))
        else:
            self._printMultiError(result)

    def do_multiset(self, args):
        """Handle the 'multiset' command.

        Usage: multiset <name> <type> [<length> <offset>] <value> ..."""
        words = self._splitArgs(args)
        specs = []
        values = []
        while words:
            result = self._parseGetSpec(words)
            if result is None:
                return False
            (spec, words) = result
            # Each specification is followed by the value to set
            if not words:
                self._printError("Missing argument")
                return False
            specs.append(spec)
            values.append(words[0])
            words = words[1:]

        if not specs:
            return False

        self._writeU8(0x04)
        self._writeU32(len(specs))
        for ((name, typeName, length, offset), value) in zip(specs, values):
            self._writeSpec(name, typeName, length, offset)
            self._writeValue(typeName, value, length)
        self._flush()

        result = self._readU8()
        if result!=0:
            self._printMultiError(result)

    def do_reg_multiget(self, args):
        """Handle the 'reg_multiget' command.

        Usage: reg_multiget <name> <type> [<length> <offset>] ...

        The ID printed can be passed to exec_multiget and unreg_multiget."""
        specs = self._parseGetSpecs(self._splitArgs(args))
        if not specs:
            return False

        self._writeSpecs(0x11, specs)
        self._flush()

        result = self._readU8()
        if result==0:
            multiId = self._readU32()
            # The types are needed to decode the reply of exec_multiget
            self._multiGets[multiId] = \
                [typeName for (_, typeName, _, _) in specs]
            print("ID: %s" % (multiId,))
        else:
            self._printErrorCode(result)

    def do_unreg_multiget(self, args):
        """Handle the 'unreg_multiget' command.

        Usage: unreg_multiget <id>"""
        words = self._splitArgs(args)
        if len(words)<1:
            self._printError("Missing parameter")
            return False

        multiId = int(words[0])

        self._writeU8(0x12)
        self._writeU32(multiId)
        self._flush()

        result = self._readU8()
        if result!=0:
            self._printErrorCode(result)

        # The local record is dropped whatever the plugin answered
        self._multiGets.pop(multiId, None)

    def do_exec_multiget(self, args):
        """Handle the 'exec_multiget' command.

        Usage: exec_multiget <id>

        The ID should be one printed by reg_multiget in this session."""
        words = self._splitArgs(args)
        if len(words)<1:
            self._printError("Missing parameter")
            return False

        multiId = int(words[0])
        if multiId not in self._multiGets:
            self._printError("Invalid ID")
            return False

        self._writeU8(0x13)
        self._writeU32(multiId)
        self._flush()

        result = self._readU8()
        if result==0:
            for typeName in self._multiGets[multiId]:
                print(self._readValue(typeName))
        else:
            self._printMultiError(result)

    def do_reg_multiset(self, args):
        """Handle the 'reg_multiset' command.

        Usage: reg_multiset <name> <type> [<length> <offset>] ...

        The ID printed can be passed to exec_multiset and unreg_multiset."""
        specs = self._parseGetSpecs(self._splitArgs(args))
        if not specs:
            return False

        self._writeSpecs(0x21, specs)
        self._flush()

        result = self._readU8()
        if result==0:
            multiId = self._readU32()
            # The types and lengths are needed to encode the values given to
            # exec_multiset
            self._multiSets[multiId] = \
                [(typeName, length) for (_, typeName, length, _) in specs]
            print("ID: %s" % (multiId,))
        else:
            self._printErrorCode(result)

    def do_unreg_multiset(self, args):
        """Handle the 'unreg_multiset' command.

        Usage: unreg_multiset <id>"""
        words = self._splitArgs(args)
        if len(words)<1:
            self._printError("Missing parameter")
            return False

        multiId = int(words[0])

        self._writeU8(0x22)
        self._writeU32(multiId)
        self._flush()

        result = self._readU8()
        if result!=0:
            self._printErrorCode(result)

        # The local record is dropped whatever the plugin answered
        self._multiSets.pop(multiId, None)

    def do_exec_multiset(self, args):
        """Handle the 'exec_multiset' command.

        Usage: exec_multiset <id> <value> ...

        The ID should be one printed by reg_multiset in this session, and a
        value should be given for each dataref registered with it."""
        words = self._splitArgs(args)
        if len(words)<1:
            self._printError("Missing parameter")
            return False

        multiId = int(words[0])
        if multiId not in self._multiSets:
            self._printError("Invalid ID")
            return False

        words = words[1:]
        types = self._multiSets[multiId]
        if len(words)<len(types):
            self._printError("Missing parameter")
            return False

        self._writeU8(0x23)
        self._writeU32(multiId)
        for ((typeName, length), word) in zip(types, words):
            self._writeValue(typeName, word, length)
        self._flush()

        result = self._readU8()
        if result!=0:
            self._printMultiError(result)

    def _printErrorCode(self, result):
        """Print the given result code of a failed command."""
        self._printError("Error code: %s" % (result,))

    def _printMultiError(self, result):
        """Print the given result code of a failed multi-dataref command.

        For the code 0x02 an index is also read from the stream and printed
        instead of the code."""
        if result==0x02:
            index = self._readU32()
            self._printError("Invalid dataref at #%d" % (index,))
        else:
            self._printErrorCode(result)

    def _writeSpec(self, name, type, length, offset):
        """Write a dataref specification into the stream.

        It consists of the name, the code of the type, and, if length is not
        None, the length and the offset."""
        self._writeString(name)
        self._writeU8(self._types[type])
        if length is not None:
            self._writeS32(length)
            self._writeS32(offset)

    def _writeSpecs(self, command, specs):
        """Write the given command code, followed by the number of the given
        dataref specifications and the specifications themselves."""
        self._writeU8(command)
        self._writeU32(len(specs))
        for (name, typeName, length, offset) in specs:
            self._writeSpec(name, typeName, length, offset)

    def _writeU8(self, x):
        """Write the given value as an unsigned, 8-bit value."""
        self._stream.write(struct.pack("B", x))

    def _writeS32(self, x):
        """Write the given value as a signed, 32-bit value."""
        self._stream.write(struct.pack("i", x))

    def _writeU32(self, x):
        """Write the given value as an unsigned, 32-bit value."""
        self._stream.write(struct.pack("I", x))

    def _writeFloat(self, x):
        """Write the given value as a single-precision floating point."""
        self._stream.write(struct.pack("f", x))

    def _writeDouble(self, x):
        """Write the given value as a double-precision floating point."""
        self._stream.write(struct.pack("d", x))

    def _writeLength(self, length):
        """Write the given value as a variable-length value into our stream."""
        self._stream.write(self._packLength(length))

    def _writeString(self, str):
        """Write the given string into the stream."""
        self._stream.write(self._packString(str))

    def _flush(self):
        """Flush our stream."""
        self._stream.flush()

    def _readU8(self):
        """Read an unsigned, 8-bit value from the stream."""
        (value,) = struct.unpack("B", self._stream.read(1))
        return value

    def _readS32(self):
        """Read a signed, 32-bit value from the stream."""
        (value,) = struct.unpack("i", self._stream.read(4))
        return value

    def _readU32(self):
        """Read an unsigned, 32-bit value from the stream."""
        (value,) = struct.unpack("I", self._stream.read(4))
        return value

    def _readFloat(self):
        """Read a single-precision floating point value from the stream."""
        (value,) = struct.unpack("f", self._stream.read(4))
        return value

    def _readDouble(self):
        """Read a double-precision floating point value from the stream."""
        (value,) = struct.unpack("d", self._stream.read(8))
        return value

    def _readLength(self):
        """Read a variable-length value from our stream.

        This is the inverse of _packLength: the first byte carries the least
        significant 7 bits, and the top bit of a byte tells if another one
        follows."""
        # NOTE(review): this used to treat the first byte as the most
        # significant group, which disagreed with _packLength for values of
        # 128 and above. It now follows _packLength -- confirm the byte order
        # against the plugin.
        length = 0
        shift = 0
        while True:
            (x,) = struct.unpack("B", self._stream.read(1))
            length |= (x&0x7f) << shift
            shift += 7
            if x&0x80==0:
                return length

    def _readString(self):
        """Read a string from our stream.

        It is returned as it was read from the stream, without decoding."""
        length = self._readLength()
        return self._stream.read(length)

    def _readValue(self, type):
        """Read the value of the given type from the stream.

        For the scalar types the number read is returned. For the array types
        a signed, 32-bit length is read first:
        - if it is greater than 8192, a warning is printed and None is
          returned without reading anything further,
        - if it is positive, that many items are read and returned as a list,
          or, for the type "s", as a string made of the bytes before the first
          zero byte,
        - if it is 0, an empty list is returned,
        - if it is negative, None is returned.

        None is returned for an unknown type as well."""
        if type=="i":
            return self._readS32()
        elif type=="f":
            return self._readFloat()
        elif type=="d":
            return self._readDouble()
        elif type not in self._arrayTypes:
            return None

        length = self._readS32()
        if length>8192:
            print("Very big length: %s" % (length,))
            return None
        elif length==0:
            return []
        elif length<0:
            return None

        if type=="fa":
            return [self._readFloat() for _ in range(0, length)]
        elif type=="ia":
            return [self._readS32() for _ in range(0, length)]

        data = [self._readU8() for _ in range(0, length)]
        if type=="ba":
            return data

        # All the bytes had to be read from the stream, but the string
        # ends at the first zero byte
        if 0 in data:
            data = data[:data.index(0)]
        return "".join([chr(b) for b in data])

    def _writeValue(self, type, value, length = None):
        """Write the given value of the given type to the stream.

        value is the textual form of the value. For the array types "fa", "ia"
        and "ba" it is a comma-separated list of numbers, for "s" it is the
        string, the characters of which are written as bytes.

        If length is given, exactly that many items are written for an array
        type: a longer value is truncated, a shorter one is padded. Nothing is
        written for an unknown type.

        A value that cannot be converted to a number raises ValueError."""
        if type=="i":
            self._writeS32(int(value))
        elif type=="f":
            self._writeFloat(float(value))
        elif type=="d":
            self._writeDouble(float(value))
        elif type in self._arrayTypes:
            if type=="s":
                values = [ord(c) for c in value]
            else:
                convert = float if type=="fa" else int
                values = [convert(word.strip()) for word in value.split(",")]

            if length is not None:
                if len(values)<length:
                    # NOTE(review): this branch was unreadable in the file
                    # reviewed, padding with zeros is an assumption -- confirm
                    # against the plugin.
                    values += [0] * (length - len(values))
                elif len(values)>length:
                    values = values[:length]

            if type=="fa":
                writeItem = self._writeFloat
            elif type=="ia":
                writeItem = self._writeS32
            else:
                writeItem = self._writeU8

            for item in values:
                writeItem(item)

#------------------------------------------------------------------------------

if __name__ == "__main__":
    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    s.connect("/tmp/xplra-" + os.environ["LOGNAME"])
    # The protocol is binary and the stream is both read and written
    CLI(s.makefile("rwb")).cmdloop()