#!/usr/bin/env python3

import sys
import cmd
import struct
import socket
import os

#------------------------------------------------------------------------------

class CLI(cmd.Cmd):
    """Simple command-line interface for the X-Plane Remote Access plugin.

    Every command is translated into a binary request written to the stream
    given to the constructor; the reply is read back from the same stream and
    printed.

    NOTE(review): the source this class was restored from had lost a span of
    text between do_exec_multiset() and _readValue(). The primitive read/write
    helpers and the missing parts of those two methods were reconstructed
    from how the surviving code uses them; any further commands defined in
    that span could not be recovered. See the NOTE(review) comments below.
    """

    # The type names accepted on the command line, mapped to the type codes
    # sent to the plugin. "ba" and "s" share the same code.
    _types = { "i" : 0x01,
               "f" : 0x02,
               "d" : 0x03,
               "fa" : 0x11,
               "ia" : 0x12,
               "ba" : 0x13,
               "s" : 0x13 }

    # The type names that denote arrays and thus need a length and an offset.
    _arrayTypes = ("fa", "ia", "ba", "s")

    # NOTE(review): the wire formats below are reconstructed. Native byte
    # order with standard sizes is assumed, as the client talks to a plugin
    # on the same machine via a UNIX socket -- confirm against the plugin.
    _U8 = struct.Struct("=B")
    _S32 = struct.Struct("=i")
    _U32 = struct.Struct("=I")
    _FLOAT = struct.Struct("=f")
    _DOUBLE = struct.Struct("=d")

    @staticmethod
    def _splitArgs(args):
        """Split the given argument string into a list of words.

        Words are separated by whitespace. A double quote at the beginning
        of a word starts a quoted section in which whitespace is kept; the
        next double quote ends it. The quotes themselves are dropped."""
        words = []
        word = ""
        inQuote = False
        for c in args:
            if c.isspace() and not inQuote:
                if word:
                    words.append(word)
                    word = ""
            elif c=="\"" and (not word or inQuote):
                inQuote = not inQuote
            else:
                word += c
        if word:
            words.append(word)
        return words

    @staticmethod
    def _parseGetSpec(words):
        """Parse a data query specification from the given argument word list.

        If there is some failure (too few words, unknown type, non-numeric
        length or offset), an error message is printed and None is returned.
        Otherwise it returns a tuple of two items:
        - the parsed data, which is a tuple of these items:
          - the name of the dataref to query
          - the type of the dataref to query (it is a valid type, checked by
            the function)
          - the length of the dataref to query (None for scalar types)
          - the offset to query the dataref from (None for scalar types)
        - the remainder of the argument list."""
        if len(words)<2:
            print("Missing parameters", file=sys.stderr)
            return None

        (name, typeName) = words[:2]
        if typeName not in CLI._types:
            print("Invalid type", file=sys.stderr)
            return None

        if typeName not in CLI._arrayTypes:
            return ((name, typeName, None, None), words[2:])

        if len(words)<4:
            print("Missing parameters", file=sys.stderr)
            return None

        try:
            length = int(words[2])
            offset = int(words[3])
        except ValueError:
            # Without this the exception would terminate the command loop
            print("Invalid length or offset", file=sys.stderr)
            return None

        return ((name, typeName, length, offset), words[4:])

    @staticmethod
    def _parseSpecs(words):
        """Parse all the given words as a sequence of query specifications.

        Returns the list of the parsed specifications (see _parseGetSpec), or
        None if any of them is invalid."""
        specs = []
        while words:
            result = CLI._parseGetSpec(words)
            if result is None:
                return None
            (spec, words) = result
            specs.append(spec)
        return specs

    @staticmethod
    def _parseId(words):
        """Parse a registration ID from the first one of the given words.

        Returns the ID, or None (after printing a message) if it is missing
        or is not a number fitting into an unsigned, 32-bit value."""
        if not words:
            print("Missing parameter", file=sys.stderr)
            return None
        try:
            value = int(words[0])
        except ValueError:
            value = -1
        if not 0 <= value <= 0xffffffff:
            print("Invalid ID", file=sys.stderr)
            return None
        return value

    @staticmethod
    def _packLength(x):
        """Pack the given integer as a variable-length value.

        The value is emitted 7 bits at a time, least significant group first;
        the top bit of each byte signals that more bytes follow.

        Raises ValueError for negative values, which cannot be encoded."""
        if x<0:
            raise ValueError("cannot pack a negative length: %d" % (x,))
        packed = bytearray()
        while True:
            y = x&0x7f
            x >>= 7
            if x>0:
                y |= 0x80
            packed.append(y)
            if x==0:
                return bytes(packed)

    @staticmethod
    def _packString(s):
        """Pack the given string as its UTF-8 bytes preceded by their
        variable-length byte count."""
        # The length prefix must count the encoded bytes, not the characters
        data = s.encode("utf-8")
        return CLI._packLength(len(data)) + data

    @staticmethod
    def _packValue(type, value, length = None):
        """Convert the given textual value of the given type into the bytes
        to be sent.

        Array values ("fa", "ia", "ba") are comma-separated lists of numbers,
        a string ("s") is sent as the code of each of its characters. If a
        length is given, the array is padded with zeros or truncated to it.
        Unknown types produce no bytes.

        Raises ValueError if the value cannot be converted or packed."""
        try:
            if type=="i":
                return CLI._S32.pack(int(value))
            elif type=="f":
                return CLI._FLOAT.pack(float(value))
            elif type=="d":
                return CLI._DOUBLE.pack(float(value))
            elif type not in CLI._arrayTypes:
                return bytes()

            if type=="s":
                values = [ord(c) for c in value]
            else:
                convert = float if type=="fa" else int
                values = [convert(word.strip()) for word in value.split(",")]

            if length is not None:
                if len(values)<length:
                    # NOTE(review): the padding was lost from the source,
                    # zero-filling is presumed -- confirm
                    filler = 0.0 if type=="fa" else 0
                    values += [filler] * (length - len(values))
                elif len(values)>length:
                    values = values[:length]

            itemFormat = {"fa": "f", "ia": "i"}.get(type, "B")
            return struct.pack("=%d%s" % (len(values), itemFormat), *values)
        except (struct.error, OverflowError) as e:
            raise ValueError(str(e)) from e

    @staticmethod
    def _tryPackValue(type, value, length = None):
        """Like _packValue, but print a message and return None on failure."""
        try:
            return CLI._packValue(type, value, length)
        except ValueError as e:
            print("Invalid value:", e, file=sys.stderr)
            return None

    def __init__(self, stream):
        """Construct the CLI.

        stream is the binary, readable and writable file object connected to
        the plugin."""
        cmd.Cmd.__init__(self)

        self._stream = stream
        # Registered multi-dataref queries: ID -> list of type names
        self._multiGets = {}
        # Registered multi-dataref updates: ID -> list of (type name, length)
        self._multiSets = {}

        self.use_rawinput = True
        self.intro = "\nX-Plane Remote Access plugin command prompt\n"
        self.prompt = "XPLRA> "

        self.daemon = True

    def default(self, line):
        """Handle unhandled commands. End of input terminates the loop."""
        if line=="EOF":
            print()
            return True
        else:
            return super(CLI, self).default(line)

    def do_get(self, args):
        """Handle the 'get' command: get <name> <type> [<length> <offset>]"""
        result = self._parseGetSpec(self._splitArgs(args))
        if result is None:
            return False

        spec = result[0]

        self._writeU8(0x01)
        self._writeSpec(*spec)
        self._flush()

        if self._readStatus():
            print(self._readValue(spec[1]))

    def do_set(self, args):
        """Handle the 'set' command:
        set <name> <type> <value> [<length> <offset>]

        The length and the offset are required for the array types, and are
        ignored for the scalar ones."""
        words = self._splitArgs(args)
        if len(words)<3:
            print("Missing parameters", file=sys.stderr)
            return False

        (name, typeName, value) = words[:3]
        if typeName not in self._types:
            print("Invalid type", file=sys.stderr)
            return False

        length = None
        offset = None
        if typeName in self._arrayTypes:
            # Same rule as in _parseGetSpec: arrays always carry both
            if len(words)<5:
                print("Missing parameters", file=sys.stderr)
                return False
            try:
                length = int(words[3])
                offset = int(words[4])
            except ValueError:
                print("Invalid length or offset", file=sys.stderr)
                return False

        # Convert the value before anything is written, so that a bad value
        # does not leave half a request in the stream
        packed = self._tryPackValue(typeName, value, length)
        if packed is None:
            return False

        self._writeU8(0x02)
        self._writeSpec(name, typeName, length, offset)
        self._stream.write(packed)
        self._flush()

        self._readStatus()

    def do_multiget(self, args):
        """Handle the 'multiget' command:
        multiget <name> <type> [<length> <offset>] ..."""
        specs = self._parseSpecs(self._splitArgs(args))
        if not specs:
            return False

        self._writeU8(0x03)
        self._writeSpecs(specs)
        self._flush()

        if self._readStatus(indexed = True):
            for (_, typeName, _, _) in specs:
                print(self._readValue(typeName))

    def do_multiset(self, args):
        """Handle the 'multiset' command:
        multiset <name> <type> [<length> <offset>] <value> ..."""
        words = self._splitArgs(args)

        items = []
        while words:
            result = self._parseGetSpec(words)
            if result is None:
                return False

            (spec, words) = result
            if not words:
                print("Missing argument", file=sys.stderr)
                return False

            packed = self._tryPackValue(spec[1], words[0], spec[2])
            if packed is None:
                return False

            items.append((spec, packed))
            words = words[1:]

        if not items:
            return False

        self._writeU8(0x04)
        self._writeU32(len(items))
        for (spec, packed) in items:
            self._writeSpec(*spec)
            self._stream.write(packed)
        self._flush()

        self._readStatus(indexed = True)

    def do_reg_multiget(self, args):
        """Handle the 'reg_multiget' command:
        reg_multiget <name> <type> [<length> <offset>] ...

        On success the ID of the registered query is printed."""
        specs = self._parseSpecs(self._splitArgs(args))
        if not specs:
            return False

        self._writeU8(0x11)
        self._writeSpecs(specs)
        self._flush()

        if self._readStatus():
            regId = self._readU32()
            self._multiGets[regId] = [typeName
                                      for (_, typeName, _, _) in specs]
            print("ID:", regId)

    def do_unreg_multiget(self, args):
        """Handle the 'unreg_multiget' command: unreg_multiget <id>"""
        regId = self._parseId(self._splitArgs(args))
        if regId is None:
            return False

        self._writeU8(0x12)
        self._writeU32(regId)
        self._flush()

        self._readStatus()
        self._multiGets.pop(regId, None)

    def do_exec_multiget(self, args):
        """Handle the 'exec_multiget' command: exec_multiget <id>"""
        regId = self._parseId(self._splitArgs(args))
        if regId is None:
            return False

        if regId not in self._multiGets:
            print("Invalid ID", file=sys.stderr)
            return False

        self._writeU8(0x13)
        self._writeU32(regId)
        self._flush()

        if self._readStatus(indexed = True):
            for typeName in self._multiGets[regId]:
                print(self._readValue(typeName))

    def do_reg_multiset(self, args):
        """Handle the 'reg_multiset' command:
        reg_multiset <name> <type> [<length> <offset>] ...

        On success the ID of the registered update is printed."""
        specs = self._parseSpecs(self._splitArgs(args))
        if not specs:
            return False

        self._writeU8(0x21)
        self._writeSpecs(specs)
        self._flush()

        if self._readStatus():
            regId = self._readU32()
            self._multiSets[regId] = [(typeName, length)
                                      for (_, typeName, length, _) in specs]
            print("ID:", regId)

    def do_unreg_multiset(self, args):
        """Handle the 'unreg_multiset' command: unreg_multiset <id>"""
        regId = self._parseId(self._splitArgs(args))
        if regId is None:
            return False

        self._writeU8(0x22)
        self._writeU32(regId)
        self._flush()

        self._readStatus()
        self._multiSets.pop(regId, None)

    def do_exec_multiset(self, args):
        """Handle the 'exec_multiset' command: exec_multiset <id> <value> ..."""
        words = self._splitArgs(args)
        regId = self._parseId(words)
        if regId is None:
            return False

        if regId not in self._multiSets:
            print("Invalid ID", file=sys.stderr)
            return False

        words = words[1:]
        types = self._multiSets[regId]
        if len(words)<len(types):
            print("Missing parameter", file=sys.stderr)
            return False

        # NOTE(review): everything below this point in the method was lost
        # from the source and is reconstructed by analogy with do_multiset().
        # The command code 0x23 is presumed from 0x21/0x22 and the
        # 0x11/0x12/0x13 codes of the multiget family -- confirm.
        payload = []
        for ((typeName, length), word) in zip(types, words):
            packed = self._tryPackValue(typeName, word, length)
            if packed is None:
                return False
            payload.append(packed)

        self._writeU8(0x23)
        self._writeU32(regId)
        self._stream.write(b"".join(payload))
        self._flush()

        self._readStatus(indexed = True)

    # NOTE(review): the helpers from here to _readValue() are reconstructed
    # from their call sites, the originals were lost from the source.

    def _writeU8(self, x):
        """Write the given value as an unsigned, 8-bit value."""
        self._stream.write(self._U8.pack(x))

    def _writeS32(self, x):
        """Write the given value as a signed, 32-bit value."""
        self._stream.write(self._S32.pack(x))

    def _writeU32(self, x):
        """Write the given value as an unsigned, 32-bit value."""
        self._stream.write(self._U32.pack(x))

    def _writeFloat(self, x):
        """Write the given value as a single-precision floating point value."""
        self._stream.write(self._FLOAT.pack(x))

    def _writeDouble(self, x):
        """Write the given value as a double-precision floating point value."""
        self._stream.write(self._DOUBLE.pack(x))

    def _writeString(self, s):
        """Write the given string in the format produced by _packString."""
        self._stream.write(self._packString(s))

    def _writeSpec(self, name, typeName, length, offset):
        """Write a dataref specification: the name, the type code and, if
        there is a length, the length and the offset."""
        self._writeString(name)
        self._writeU8(self._types[typeName])
        if length is not None:
            self._writeS32(length)
            self._writeS32(offset)

    def _writeSpecs(self, specs):
        """Write the number of the given specifications followed by the
        specifications themselves."""
        self._writeU32(len(specs))
        for spec in specs:
            self._writeSpec(*spec)

    def _flush(self):
        """Flush the buffered output to the plugin."""
        self._stream.flush()

    def _readExactly(self, size):
        """Read exactly the given number of bytes.

        Raises EOFError if the stream ends before that."""
        data = self._stream.read(size) if size>0 else bytes()
        if data is None or len(data)<size:
            raise EOFError("the connection was closed")
        return data

    def _readU8(self):
        """Read an unsigned, 8-bit value."""
        return self._U8.unpack(self._readExactly(self._U8.size))[0]

    def _readS32(self):
        """Read a signed, 32-bit value."""
        return self._S32.unpack(self._readExactly(self._S32.size))[0]

    def _readU32(self):
        """Read an unsigned, 32-bit value."""
        return self._U32.unpack(self._readExactly(self._U32.size))[0]

    def _readFloat(self):
        """Read a single-precision floating point value."""
        return self._FLOAT.unpack(self._readExactly(self._FLOAT.size))[0]

    def _readDouble(self):
        """Read a double-precision floating point value."""
        return self._DOUBLE.unpack(self._readExactly(self._DOUBLE.size))[0]

    def _readStatus(self, indexed = False):
        """Read the result code of a request and report any failure.

        If indexed is True, the result code 0x02 is followed by the index of
        the offending dataref, which is read and reported.

        Returns True if the request succeeded (the code is 0)."""
        result = self._readU8()
        if result==0:
            return True

        if indexed and result==0x02:
            index = self._readU32()
            print("Invalid dataref at #%d" % (index,), file=sys.stderr)
        else:
            print("Error code:", result, file=sys.stderr)
        return False

    def _readValue(self, type):
        """Read the value of the given type from the stream.

        Returns a number for the scalar types, a list of numbers for "fa",
        "ia" and "ba", and a string cut at the first zero byte for "s". An
        array of zero length is returned as an empty list. None is returned
        if the length is negative or larger than 8192."""
        # NOTE(review): the scalar branches and the reading of the length were
        # lost from the source; they mirror _writeValue() -- confirm.
        if type=="i":
            return self._readS32()
        elif type=="f":
            return self._readFloat()
        elif type=="d":
            return self._readDouble()
        elif type not in self._arrayTypes:
            return None

        length = self._readS32()
        if length>8192:
            print("Very big length:", length)
            return None
        elif length==0:
            return []
        elif length<0:
            return None

        if type=="fa":
            return [self._readFloat() for _ in range(length)]
        elif type=="ia":
            return [self._readS32() for _ in range(length)]

        data = self._readExactly(length)
        if type=="ba":
            return list(data)

        # A string ends at the first zero byte; each byte is one character
        return data.split(b"\0", 1)[0].decode("latin-1")

    def _writeValue(self, type, value, length = None):
        """Write the given value of the given type to the stream.

        See _packValue for the accepted values. Raises ValueError if the
        value cannot be converted; nothing is written in that case."""
        self._stream.write(self._packValue(type, value, length))

#------------------------------------------------------------------------------

if __name__ == "__main__":
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        s.connect("/tmp/xplra-" + os.environ["LOGNAME"])
        with s.makefile("rwb") as stream:
            CLI(stream).cmdloop()