#!/usr/bin/env python
"""Command prompt client for the X-Plane Remote Access plugin.

When run as a script, connects to the Unix domain socket
/tmp/xplra-$LOGNAME and starts an interactive prompt on it. The code is
written to run under both Python 2 and Python 3.
"""

import sys
import cmd
import struct
import socket
import os

#------------------------------------------------------------------------------

class CLI(cmd.Cmd):
    """Simple command-line interface for the X-Plane Remote Access plugin."""

    # Maps the type names accepted by the 'get' command to the type codes
    # written to the stream.
    _types = { "i" : 0x01,
               "f" : 0x02,
               "d" : 0x03,
               "fa" : 0x11,
               "ia" : 0x12,
               "ba" : 0x13 }

    @staticmethod
    def _packLength(x):
        """Pack the given integer as a variable-length value.

        The value is emitted in 7-bit groups, least significant group
        first; every byte except the last one has its top bit set.

        Returns the packed bytes. Raises ValueError if x is negative,
        because such a value has no finite encoding in this scheme.
        """
        if x < 0:
            raise ValueError("cannot pack a negative length: %d" % (x,))

        chunks = []
        while True:
            y = x & 0x7f
            x >>= 7
            if x > 0:
                y |= 0x80
            chunks.append(struct.pack("B", y))
            if x == 0:
                return b"".join(chunks)

    @staticmethod
    def _packString(s):
        """Pack the given string as its length followed by its bytes.

        Text that is not already a byte string is encoded as UTF-8 first,
        so that the length prefix always counts bytes.
        """
        if not isinstance(s, bytes):
            s = s.encode("utf-8")
        return CLI._packLength(len(s)) + s

    def __init__(self, stream):
        """Construct the CLI.

        stream is the binary, file-like object used to talk to the plugin;
        it must provide read(), write() and flush().
        """
        cmd.Cmd.__init__(self)

        self._stream = stream

        self.use_rawinput = True
        self.intro = "\nX-Plane Remote Access plugin command prompt\n"
        self.prompt = "XPLRA> "

        # Not consulted anywhere in this file; kept for compatibility.
        self.daemon = True

    def default(self, line):
        """Handle unhandled commands; end-of-file terminates the loop."""
        if line == "EOF":
            print("")
            return True
        # cmd.Cmd is an old-style class on Python 2, so super() cannot be
        # used here; call the base implementation directly.
        return cmd.Cmd.default(self, line)

    def do_get(self, args):
        """get <name> <type> [<length> <offset>]

        Query the value called <name> and print it, or print the result
        code if the query fails. <type> is one of: i, f, d, fa, ia, ba.
        <length> and <offset> are sent only when both are given."""
        words = args.split()
        if len(words) < 2:
            sys.stderr.write("Missing parameters\n")
            return False

        name = words[0]
        typeName = words[1]
        if typeName not in self._types:
            sys.stderr.write("Invalid type\n")
            return False

        # Pack the optional tail up front, so that a malformed or
        # out-of-range number is rejected before anything is written.
        extra = b""
        if len(words) > 3:
            try:
                extra = struct.pack("ii", int(words[2]), int(words[3]))
            except (ValueError, struct.error):
                sys.stderr.write("Invalid length or offset\n")
                return False

        self._writeU8(0x01)
        self._writeString(name)
        self._writeU8(self._types[typeName])
        if extra:
            self._stream.write(extra)
        self._flush()

        result = self._readU8()
        if result == 0:
            print(self._readValue(typeName))
        else:
            print("Result code: %d" % (result,))

    def _readArray(self, readItem):
        """Read a signed 32-bit count, then that many items with readItem.

        Returns the list of items, or None if the count is not positive.
        """
        length = self._readS32()
        if length > 0:
            return [readItem() for _ in range(length)]
        return None

    def _readValue(self, typeName):
        """Read the value of a successful 'get' reply of the given type."""
        if typeName == "i":
            return self._readS32()
        if typeName == "f":
            return self._readFloat()
        if typeName == "d":
            return self._readDouble()
        if typeName == "fa":
            return self._readArray(self._readFloat)
        if typeName == "ia":
            return self._readArray(self._readS32)
        if typeName == "ba":
            data = self._readArray(self._readU8)
            if data is None:
                return None
            # The byte array is shown as text up to the first zero byte.
            chars = []
            for b in data:
                if b == 0:
                    break
                chars.append(chr(b))
            return "".join(chars)
        return None

    def _writeU8(self, x):
        """Write the given value as an unsigned, 8-bit value."""
        self._stream.write(struct.pack("B", x))

    def _writeS32(self, x):
        """Write the given value as a signed, 32-bit value."""
        self._stream.write(struct.pack("i", x))

    def _writeLength(self, length):
        """Write the given value as a variable-length value into our stream."""
        self._stream.write(self._packLength(length))

    def _writeString(self, str):
        """Write the given string into the stream."""
        self._stream.write(self._packString(str))

    def _flush(self):
        """Flush our stream."""
        self._stream.flush()

    def _readU8(self):
        """Read an unsigned, 8-bit value from the stream."""
        (value,) = struct.unpack("B", self._stream.read(1))
        return value

    def _readS32(self):
        """Read a signed, 32-bit value from the stream."""
        (value,) = struct.unpack("i", self._stream.read(4))
        return value

    def _readFloat(self):
        """Read a single-precision floating point value from the stream."""
        (value,) = struct.unpack("f", self._stream.read(4))
        return value

    def _readDouble(self):
        """Read a double-precision floating point value from the stream."""
        (value,) = struct.unpack("d", self._stream.read(8))
        return value

    def _readLength(self):
        """Read a variable-length value from our stream.

        Bytes are consumed until one arrives with its top bit clear.
        """
        # NOTE(review): the 7-bit groups are accumulated most significant
        # first here, whereas _packLength emits the least significant group
        # first. The two agree only for values below 128 - confirm the wire
        # format before relying on this for longer values.
        length = 0
        while True:
            (x,) = struct.unpack("B", self._stream.read(1))
            length <<= 7
            length |= x & 0x7f
            if x & 0x80 == 0:
                return length

    def _readString(self):
        """Read a length-prefixed string from our stream, as raw bytes."""
        length = self._readLength()
        return self._stream.read(length)

#------------------------------------------------------------------------------

if __name__ == "__main__":
    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        s.connect("/tmp/xplra-" + os.environ["LOGNAME"])
        # An explicit binary read/write mode is needed on Python 3, where
        # the default would be a read-only text stream.
        CLI(s.makefile("rwb")).cmdloop()
    finally:
        s.close()