# X-Plane Remote Access client module
#-------------------------------------------------------------------------------

import os
import struct
import array
import io

#-------------------------------------------------------------------------------

# Command codes: written as the first byte of every request.
COMMAND_GET_SINGLE = 0x01
COMMAND_SET_SINGLE = 0x02
COMMAND_GET_MULTI = 0x03
COMMAND_SET_MULTI = 0x04
COMMAND_REGISTER_GET_MULTI = 0x11
COMMAND_UNREGISTER_GET_MULTI = 0x12
COMMAND_EXECUTE_GET_MULTI = 0x13
COMMAND_REGISTER_SET_MULTI = 0x21
COMMAND_UNREGISTER_SET_MULTI = 0x22
COMMAND_EXECUTE_SET_MULTI = 0x23
COMMAND_GET_VERSIONS = 0x31
COMMAND_RELOAD_PLUGINS = 0x32
COMMAND_SHOW_MESSAGE = 0x41
COMMAND_REGISTER_HOTKEYS = 0x51
COMMAND_QUERY_HOTKEYS = 0x52
COMMAND_UNREGISTER_HOTKEYS = 0x53

# Dataref type codes: written as one byte after the dataref name.
TYPE_INT = 0x01
TYPE_FLOAT = 0x02
TYPE_DOUBLE = 0x03
TYPE_FLOAT_ARRAY = 0x11
TYPE_INT_ARRAY = 0x12
TYPE_BYTE_ARRAY = 0x13

# Result codes: the first byte of every reply (see XPlane._checkResult).
RESULT_OK = 0x00
RESULT_INVALID_COMMAND = 0x01
RESULT_UNKNOWN_DATAREF = 0x02
RESULT_INVALID_TYPE = 0x03
RESULT_INVALID_LENGTH = 0x04
RESULT_INVALID_OFFSET = 0x05
RESULT_INVALID_COUNT = 0x06
RESULT_INVALID_ID = 0x07
RESULT_INVALID_DURATION = 0x08
RESULT_OTHER_ERROR = 0xff

# Hotkey modifier bits.
# NOTE(review): presumably OR-ed into the 16-bit codes passed to
# XPlane.registerHotkeys -- confirm against the plugin documentation.
HOTKEY_MODIFIER_SHIFT = 0x0100
HOTKEY_MODIFIER_CONTROL = 0x0200

# Maps each array type code to (array module type code, bytes per element on
# the wire). The element sizes are the ones this client has always used.
_ARRAY_TYPES = {
    TYPE_FLOAT_ARRAY : ("f", 4),
    TYPE_INT_ARRAY : ("i", 4),
    TYPE_BYTE_ARRAY : ("B", 1)
}

#-------------------------------------------------------------------------------

def _arrayFromBytes(arr, data):
    """Append the raw bytes in data to the array arr.

    array.fromstring() was removed in Python 3.9, so frombytes() is used
    when available, with fromstring() kept as the fallback for old
    interpreters."""
    if hasattr(arr, "frombytes"):
        arr.frombytes(data)
    else:
        arr.fromstring(data)

def _arrayToBytes(arr):
    """Get the raw bytes of the array arr.

    array.tostring() was removed in Python 3.9, so tobytes() is used when
    available, with tostring() kept as the fallback for old interpreters."""
    if hasattr(arr, "tobytes"):
        return arr.tobytes()
    else:
        return arr.tostring()

#-------------------------------------------------------------------------------

class ProtocolException(Exception):
    """Exception to signify protocol errors.

    The instance carries the result code in resultCode and the optional
    extra value read with it in parameter."""

    _message = { RESULT_INVALID_COMMAND : "invalid command",
                 RESULT_UNKNOWN_DATAREF : "unknown dataref",
                 RESULT_INVALID_TYPE : "invalid type",
                 RESULT_INVALID_LENGTH : "invalid length",
                 RESULT_INVALID_OFFSET : "invalid offset",
                 RESULT_INVALID_COUNT : "invalid count",
                 RESULT_INVALID_ID : "invalid ID",
                 RESULT_INVALID_DURATION : "invalid duration",
                 RESULT_OTHER_ERROR : "other error" }

    @staticmethod
    def getMessage(resultCode):
        """Get the message for the given result code.

        Codes missing from the table produce "unknown error code <code>"."""
        if resultCode in ProtocolException._message:
            return ProtocolException._message[resultCode]
        else:
            # repr() replaces the backtick syntax, which Python 3 rejects.
            return "unknown error code " + repr(resultCode)

    def __init__(self, resultCode, parameter = None):
        """Construct the exception.

        resultCode is one of the RESULT_* values. parameter is appended to
        the message only for RESULT_UNKNOWN_DATAREF, where _checkResult reads
        it as a 32-bit value following the result code."""
        message = "xplra.ProtocolException: " + self.getMessage(resultCode)
        if parameter is not None and resultCode==RESULT_UNKNOWN_DATAREF:
            message += " (# %d)" % (parameter,)

        super(ProtocolException, self).__init__(message)
        self.resultCode = resultCode
        self.parameter = parameter

#-------------------------------------------------------------------------------

if os.name=="nt":
    import win32file

    class Win32NamedPipe(io.RawIOBase):
        """A stream object to represent a Win32 named pipe."""

        def __init__(self, name):
            """Construct the pipe with the given name."""
            # Set first, so that close() and closed work even if
            # CreateFile raises.
            self._handle = None
            self._handle = win32file.CreateFile(name,
                                                win32file.GENERIC_READ |
                                                win32file.GENERIC_WRITE,
                                                0, None,
                                                win32file.OPEN_EXISTING, 0, None)

        def close(self):
            """Close the pipe, if not closed already."""
            if self._handle is not None:
                win32file.CloseHandle(self._handle)
                self._handle = None

        @property
        def closed(self):
            """Determine if the pipe is closed or not."""
            return self._handle is None

        def fileno(self):
            """Get the file number, which is impossible."""
            raise IOError("Win32NamedPipe.fileno: not supported")

        def flush(self):
            """Flush the stream."""
            pass

        def isatty(self):
            """Determine if the stream is a terminal, which it is not."""
            return False

        def read(self, n = -1):
            """Read at most the given number of bytes from the stream.

            Reading everything (n==-1) is delegated to readall(), which is
            not supported."""
            if n==-1:
                return self.readall()
            else:
                (error, data) = win32file.ReadFile(self._handle, n)
                return data

        def readable(self):
            """Determine if the stream is readable, which it is."""
            return self._handle is not None

        def readall(self):
            """Read all bytes from the stream, which is not supported."""
            raise IOError("Win32NamedPipe.readall: not supported")

        def readinto(self, buffer):
            """Read into the given buffer.

            Returns the number of bytes stored in the buffer."""
            (error, data) = win32file.ReadFile(self._handle, len(buffer))
            length = len(data)
            buffer[:length] = data
            return length

        def readline(self, limit = -1):
            """Read a line, which is currently not supported."""
            raise IOError("Win32NamedPipe.readline: not supported")

        def readlines(self, hint = -1):
            """Read lines, which is currently not supported."""
            raise IOError("Win32NamedPipe.readlines: not supported")

        def seek(self, offset, whence = io.SEEK_SET):
            """Seek in the stream, which is not supported."""
            raise IOError("Win32NamedPipe.seek: not supported")

        def seekable(self):
            """Determine if the stream is seekable, which it is not."""
            return False

        def tell(self):
            """Get the current position, which is not supported."""
            raise IOError("Win32NamedPipe.tell: not supported")

        def truncate(self, size = None):
            """Truncate the stream, which is not supported."""
            raise IOError("Win32NamedPipe.truncate: not supported")

        def writable(self):
            """Determine if the stream is writable, which it is."""
            return self._handle is not None

        def write(self, buffer):
            """Write the given buffer into the stream.

            The buffer must provide tobytes(). Returns the number of bytes
            written as reported by WriteFile."""
            (error, written) = win32file.WriteFile(self._handle, buffer.tobytes())
            return written

        def writelines(self, lines):
            """Write the given lines, which is not supported."""
            raise IOError("Win32NamedPipe.writelines: not supported")

#-------------------------------------------------------------------------------

class XPlane(object):
    """The main class representing the connection to X-Plane.

    Multi-byte values are exchanged in the native byte order and size of the
    machine running this client, as the struct formats carry no byte-order
    prefix."""

    @staticmethod
    def _packLength(x):
        """Pack the given integer as a variable-length value.

        The value is emitted in groups of 7 bits, least significant group
        first. Every byte but the last has its top bit set. Returns the
        packed bytes. Raises ValueError for a negative value, which cannot
        be represented (and would never terminate the loop)."""
        if x<0:
            raise ValueError("xplra.XPlane._packLength: negative length")

        packed = bytearray()
        while True:
            y = x&0x7f
            x >>= 7
            if x>0: y |= 0x80
            packed.append(y)
            if x==0:
                return bytes(packed)

    @staticmethod
    def _packString(s):
        """Pack the given string.

        The result is the variable-length encoded byte length followed by
        the bytes of the string. Text strings are encoded first, so that the
        length prefix counts bytes."""
        if not isinstance(s, bytes):
            # NOTE(review): the wire encoding is not specified in this file;
            # UTF-8 is assumed, which equals ASCII for plain dataref names.
            s = s.encode("utf-8")
        return XPlane._packLength(len(s)) + s

    def __init__(self):
        """Construct the object."""
        self._stream = None
        self._multiBuffers = []

    def connect(self):
        """Try to connect to X-Plane.

        Does nothing if already connected. On Windows the named pipe
        \\\\.\\pipe\\\\xplra is opened, elsewhere the Unix socket
        /tmp/xplra-$LOGNAME. After connecting, every multi-dataref buffer
        created by this object is re-registered."""
        if self._stream is not None:
            return

        if os.name=="nt":
            pipe = Win32NamedPipe(r'\\.\pipe\\xplra')
            self._stream = io.BufferedRWPair(pipe, pipe)
        else:
            import socket
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            try:
                s.connect("/tmp/xplra-" + os.environ["LOGNAME"])
                # An explicit binary read/write mode is needed: the default
                # mode of makefile() is read-only text on Python 3.
                self._stream = s.makefile("rwb")
            except BaseException:
                # Do not leak the socket if the connection cannot be set up.
                s.close()
                raise

        for multiBuffer in self._multiBuffers:
            multiBuffer._reregister()

    @property
    def isConnected(self):
        """Determine if we are connected to the simulator."""
        return self._stream is not None

    def createMultiGetter(self):
        """Create a new multi-dataref getter for this X-Plane object."""
        # NOTE(review): MultiGetter is not defined in the visible part of
        # this file.
        multiGetter = MultiGetter(self)
        self._multiBuffers.append(multiGetter)
        return multiGetter

    def createMultiSetter(self):
        """Create a new multi-dataref setter for this X-Plane object."""
        # NOTE(review): MultiSetter is not defined in the visible part of
        # this file.
        multiSetter = MultiSetter(self)
        self._multiBuffers.append(multiSetter)
        return multiSetter

    def destroyMultiBuffer(self, multiBuffer):
        """Destroy the given multi-dataref buffer.

        It actually removes it from the list of multiple buffers and
        unregisters it safely.

        Returns the result of the safe unregistration if the buffer was found,
        None otherwise.
        """
        if multiBuffer in self._multiBuffers:
            self._multiBuffers.remove(multiBuffer)
            return multiBuffer.unregisterSafely()
        else:
            return None

    def getVersions(self):
        """Get the versions of X-Plane, XPLM and XPLRA as a tuple."""
        self._writeU8(COMMAND_GET_VERSIONS)
        self._flush()
        self._checkResult()
        return (self._readS32(), self._readS32(), self._readS32())

    def reloadPlugins(self):
        """Reload the plugins in X-Plane.

        After this, this connection becomes invalid."""
        self._writeU8(COMMAND_RELOAD_PLUGINS)
        self._flush()
        self._checkResult()

    def getInt(self, name):
        """Get the value of the integer dataref with the given name."""
        return self._getSingle(name, TYPE_INT)

    def getFloat(self, name):
        """Get the value of the float dataref with the given name."""
        return self._getSingle(name, TYPE_FLOAT)

    def getDouble(self, name):
        """Get the value of the double dataref with the given name."""
        return self._getSingle(name, TYPE_DOUBLE)

    def getFloatArray(self, name, length = -1, offset = 0):
        """Get the value of the float array dataref with the given name."""
        return self._getSingle(name, TYPE_FLOAT_ARRAY, length, offset)

    def getIntArray(self, name, length = -1, offset = 0):
        """Get the value of the integer array dataref with the given name."""
        return self._getSingle(name, TYPE_INT_ARRAY, length, offset)

    def getByteArray(self, name, length = -1, offset = 0):
        """Get the value of the byte array dataref with the given name."""
        return self._getSingle(name, TYPE_BYTE_ARRAY, length, offset)

    def getString(self, name, offset = 0):
        """Get the value of the byte array dataref with the given name as a
        string.

        The string ends before the first 0 byte, if there is one."""
        chars = []
        for b in self.getByteArray(name, offset = offset):
            if b==0: break
            chars.append(chr(b))
        return "".join(chars)

    def setInt(self, name, value):
        """Set the value of the integer dataref with the given name."""
        self._setSingle(name, TYPE_INT, value)

    def setFloat(self, name, value):
        """Set the value of the float dataref with the given name."""
        self._setSingle(name, TYPE_FLOAT, value)

    def setDouble(self, name, value):
        """Set the value of the double dataref with the given name."""
        self._setSingle(name, TYPE_DOUBLE, value)

    def setFloatArray(self, name, value, offset = 0):
        """Set the value of the float array dataref with the given name."""
        self._setSingle(name, TYPE_FLOAT_ARRAY, value, offset = offset)

    def setIntArray(self, name, value, offset = 0):
        """Set the value of the integer array dataref with the given name."""
        self._setSingle(name, TYPE_INT_ARRAY, value, offset = offset)

    def setByteArray(self, name, value, offset = 0):
        """Set the value of the byte array dataref with the given name."""
        self._setSingle(name, TYPE_BYTE_ARRAY, value, offset = offset)

    def setString(self, name, value, length, offset = 0):
        """Set the value of the byte array dataref with the given name from a
        string.

        The string will be padded with 0's so that its length is the given
        one. A longer string is cut to that length."""
        value = [ord(c) for c in value[:length]]
        value += [0] * (length - len(value))
        self.setByteArray(name, value, offset)

    def showMessage(self, message, duration):
        """Show a message in the simulator window for the given duration.

        The duration is a floating-point number denoting seconds."""
        self._writeU8(COMMAND_SHOW_MESSAGE)
        self._writeString(message)
        self._writeFloat(duration)
        self._flush()
        self._checkResult()

    def registerHotkeys(self, hotkeyCodes):
        """Register the given hotkey codes for watching.

        The codes are sent as a 32-bit count followed by one unsigned 16-bit
        value per code."""
        self._writeU8(COMMAND_REGISTER_HOTKEYS)
        self._writeU32(len(hotkeyCodes))
        for hotkeyCode in hotkeyCodes:
            self._writeU16(hotkeyCode)
        self._flush()
        self._checkResult()

    def queryHotkeys(self):
        """Query the state of the hotkeys registered previously.

        Returns a list of booleans, one per state byte in the reply."""
        self._writeU8(COMMAND_QUERY_HOTKEYS)
        self._flush()
        self._checkResult()

        length = self._readU32()
        return [self._readU8()!=0 for _ in range(0, length)]

    def unregisterHotkeys(self):
        """Unregister the previously registered hotkeys."""
        self._writeU8(COMMAND_UNREGISTER_HOTKEYS)
        self._flush()
        self._checkResult()

    def disconnect(self):
        """Disconnect from X-Plane.

        The stream is forgotten even if closing it fails."""
        if self._stream is not None:
            try:
                self._stream.close()
            finally:
                self._stream = None

    def _checkResult(self, resultCode = None, parameter = None, multi = False):
        """Check the given result code.

        If it is None, it will be read first. In that case, if multi is True
        and the code is RESULT_UNKNOWN_DATAREF, a 32-bit parameter is read as
        well. If the code is not RESULT_OK, a ProtocolException is thrown."""
        if resultCode is None:
            resultCode = self._readU8()
            if multi and resultCode==RESULT_UNKNOWN_DATAREF:
                parameter = self._readU32()

        if resultCode!=RESULT_OK:
            raise ProtocolException(resultCode, parameter)

    def _getSingle(self, name, type, length = None, offset = None):
        """Get the single value of the given name and type.

        The length and the offset are sent only if both are given, which is
        the case for the array types."""
        self._writeU8(COMMAND_GET_SINGLE)
        self._writeString(name)
        self._writeU8(type)
        if length is not None and offset is not None:
            self._writeS32(length)
            self._writeS32(offset)

        self._flush()
        self._checkResult()
        return self._readValue(type)

    def _readValue(self, type):
        """Read the value of the given type from the stream.

        Scalars are returned as numbers and arrays as lists. For any other
        type code a 32-bit length is still consumed and None is returned."""
        if type==TYPE_INT:
            return self._readS32()
        elif type==TYPE_FLOAT:
            return self._readFloat()
        elif type==TYPE_DOUBLE:
            return self._readDouble()

        length = self._readS32()
        if type not in _ARRAY_TYPES:
            return None

        (typeCode, elementSize) = _ARRAY_TYPES[type]
        arr = array.array(typeCode)
        if length>0:
            _arrayFromBytes(arr, self._stream.read(length*elementSize))
        return arr.tolist()

    def _setSingle(self, name, type, value, offset = None):
        """Set the single value of the given name and type.

        If an offset is given, the value is an array: its length and the
        offset are sent before the elements."""
        self._writeU8(COMMAND_SET_SINGLE)
        self._writeString(name)
        self._writeU8(type)
        if offset is not None:
            self._writeS32(len(value))
            self._writeS32(offset)

        self._writeValue(type, value)
        self._flush()
        self._checkResult()

    def _writeValue(self, type, value):
        """Write a value of the given type.

        Raises ValueError for an unknown type code."""
        if type==TYPE_INT:
            self._writeS32(int(value))
        elif type==TYPE_FLOAT:
            self._writeFloat(float(value))
        elif type==TYPE_DOUBLE:
            self._writeDouble(float(value))
        elif type in _ARRAY_TYPES:
            arr = array.array(_ARRAY_TYPES[type][0])
            # list() lets tuples and other sequences through as well.
            arr.fromlist(list(value))
            self._stream.write(_arrayToBytes(arr))
        else:
            raise ValueError("xplra.XPlane._writeValue: unknown type %r" %
                             (type,))

    def _writeU8(self, x):
        """Write the given value as an unsigned, 8-bit value."""
        self._stream.write(struct.pack("B", x))

    def _writeU16(self, x):
        """Write the given value as an unsigned, 16-bit value."""
        self._stream.write(struct.pack("H", x))

    def _writeS32(self, x):
        """Write the given value as a signed, 32-bit value."""
        self._stream.write(struct.pack("i", x))

    def _writeU32(self, x):
        """Write the given value as an unsigned, 32-bit value."""
        self._stream.write(struct.pack("I", x))

    def _writeFloat(self, x):
        """Write the given value as a single-precision floating point."""
        self._stream.write(struct.pack("f", x))

    def _writeDouble(self, x):
        """Write the given value as a double-precision floating point."""
        self._stream.write(struct.pack("d", x))

    def _writeLength(self, length):
        """Write the given value as a variable-length value into our stream."""
        self._stream.write(self._packLength(length))

    def _writeString(self, str):
        """Write the given string into the stream."""
        self._stream.write(self._packString(str))

    def _flush(self):
        """Flush our stream."""
        self._stream.flush()

    def _readU8(self):
        """Read an unsigned, 8-bit value from the stream."""
        (value,) = struct.unpack("B", self._stream.read(1))
        return value

    def _readS32(self):
        """Read a signed, 32-bit value from the stream."""
        (value,) = struct.unpack("i", self._stream.read(4))
        return value

    def _readU32(self):
        """Read an unsigned, 32-bit value from the stream."""
        (value,) = struct.unpack("I", self._stream.read(4))
        return value

    def _readFloat(self):
        """Read a single-precision floating point value from the stream."""
        (value,) = struct.unpack("f", self._stream.read(4))
        return value

    def _readDouble(self):
        """Read a double-precision floating point value from the stream."""
        (value,) = struct.unpack("d", self._stream.read(8))
        return value

    def _readLength(self):
        """Read a variable-length value from our stream.

        This is the inverse of _packLength: the 7-bit groups arrive least
        significant first, so each one is shifted into place by its position
        in the sequence."""
        length = 0
        shift = 0
        while True:
            (x,) = struct.unpack("B", self._stream.read(1))
            length |= (x&0x7f) << shift
            shift += 7
            if x&0x80==0:
                return length

    def _readString(self):
        """Read a string from our stream.

        Returns the bytes exactly as read, without decoding."""
        length = self._readLength()
        return self._stream.read(length)

#-------------------------------------------------------------------------------

class MultiBuffer(object):
    """Buffer for querying or setting multi-dataref values.

    execute() relies on _executeRegistered() and _executeUnregistered(),
    which are not defined in this class and must be provided by
    subclasses."""

    @staticmethod
    def _getDefault(type, length):
        """Get the default value for the given type.

        Scalars default to zero, arrays to a list of length zeros (an empty
        list if length is not positive). Unknown types yield None."""
        if type==TYPE_INT:
            return 0
        elif type==TYPE_FLOAT:
            return float(0.0)
        elif type==TYPE_DOUBLE:
            return 0.0
        elif length<=0:
            return []
        elif type==TYPE_FLOAT_ARRAY:
            return [float(0.0)] * length
        elif type==TYPE_INT_ARRAY or type==TYPE_BYTE_ARRAY:
            return [0] * length

    def __init__(self, xplane, registerCommand, unregisterCommand):
        """Construct the buffer for the given XPlane instance and with the
        given register/unregister command values."""
        self._xplane = xplane
        self._registerCommand = registerCommand
        self._unregisterCommand = unregisterCommand

        # One (name, type, length, offset) tuple per dataref added.
        self._dataRefs = []
        # None until finalized, and again after each newly added dataref.
        self._values = None
        self._registeredID = None

    @property
    def values(self):
        """Query the values as a list."""
        if self._values is None:
            self.finalize()
        return self._values

    def addInt(self, name):
        """Add an integer to the buffer with the given name

        Returns an ID (or index) of the dataref."""
        return self._add(name, TYPE_INT)

    def addFloat(self, name):
        """Add a float to the buffer with the given name

        Returns an ID (or index) of the dataref."""
        return self._add(name, TYPE_FLOAT)

    def addDouble(self, name):
        """Add a double to the buffer with the given name

        Returns an ID (or index) of the dataref."""
        return self._add(name, TYPE_DOUBLE)

    def addFloatArray(self, name, length = -1, offset = 0):
        """Add a floating point array to the buffer with the given name.

        Returns an ID (or index) of the dataref."""
        return self._add(name, TYPE_FLOAT_ARRAY, length, offset)

    def addIntArray(self, name, length = -1, offset = 0):
        """Add an integer array to the buffer with the given name.

        Returns an ID (or index) of the dataref."""
        return self._add(name, TYPE_INT_ARRAY, length, offset)

    def addByteArray(self, name, length = -1, offset = 0):
        """Add a byte array to the buffer with the given name.

        Returns an ID (or index) of the dataref."""
        return self._add(name, TYPE_BYTE_ARRAY, length, offset)

    def finalize(self):
        """Finalize the buffer, if not finalized yet.

        It initializes the array of values with some defaults.

        Returns whether there is any data in the buffer."""
        if self._values is None:
            self._values = [self._getDefault(type, length)
                            for (_, type, length, _) in self._dataRefs]
        return self._values!=[]

    def register(self):
        """Register the buffer in X-Plane.

        Nothing happens if the buffer is empty or already registered."""
        if self.finalize() and self._registeredID is None:
            self._writeSpec(self._registerCommand)
            self._registeredID = self._xplane._readU32()

    def unregister(self):
        """Unregister the buffer from X-Plane."""
        if self._registeredID is not None:
            self._xplane._writeU8(self._unregisterCommand)
            self._xplane._writeU32(self._registeredID)
            self._xplane._flush()
            self._xplane._checkResult()
            self._registeredID = None

    def unregisterSafely(self):
        """Unregister the buffer from X-Plane, ignoring exceptions.

        The registered ID is forgotten in any case.

        Returns True if the unregistration succeeded, False otherwise.
        """
        try:
            self.unregister()
            return True
        except Exception:
            # Best effort only. KeyboardInterrupt and SystemExit are
            # deliberately let through.
            return False
        finally:
            self._registeredID = None

    def execute(self):
        """Perform the querying or the setting of the values.

        It first checks if the buffer is finalized. If not, it will be
        finalized. However, if it is not finalized but also registered, it will
        first be unregistered and then re-registered after finalizing."""
        if self._values is None and self._registeredID is not None:
            self.unregister()
            self.register()
        else:
            self.finalize()

        if self._registeredID is None:
            self._executeUnregistered()
        else:
            self._executeRegistered()

    def getString(self, id):
        """Get the value of the dataref with the given ID as a string.

        The dataref should be of type byte array, otherwise TypeError is
        raised. The string ends before the first 0 byte, if there is one."""
        if self._dataRefs[id][1]!=TYPE_BYTE_ARRAY:
            raise TypeError("xplra.MultiBuffer.getString: only byte arrays can be converted to strings")
        if self._values is None:
            self.finalize()

        chars = []
        for c in self._values[id]:
            if c==0: break
            chars.append(chr(c))
        return "".join(chars)

    def _reregister(self):
        """Re-register the buffer in X-Plane, if it has been registered earlier

        If it has not been registered, nothing is done. Otherwise the buffer
        gets registered, and the old ID is forgotten. This function is meant to
        be used by the XPlane object when it creates a new connection. If the
        registration fails, the original ID is restored, so the _reregister()
        could be called again
        """
        if self._registeredID is not None:
            origRegisteredID = self._registeredID
            try:
                self._registeredID = None
                self.register()
            except BaseException:
                self._registeredID = origRegisteredID
                raise

    def _add(self, name, type, length = None, offset = None):
        """Add a dataref to the buffer with the given name and type.

        Adding invalidates the values, so the buffer is finalized again on
        next use. Returns the index of the new dataref."""
        index = len(self._dataRefs)
        self._values = None
        self._dataRefs.append( (name, type, length, offset) )
        return index

    def _writeSpec(self, command):
        """Write the specification preceded by the given command and check for
        the result.

        The specification is basically the list of the datarefs."""
        self._xplane._writeU8(command)
        self._xplane._writeU32(len(self._dataRefs))

        for (name, type, length, offset) in self._dataRefs:
            self._xplane._writeString(name)
            self._xplane._writeU8(type)
            if length is not None and offset is not None:
                self._xplane._writeS32(length)
                self._xplane._writeS32(offset)

        self._xplane._flush()
        self._xplane._checkResult(multi = True)

    def __len__(self):
        """Get the number of value items in the buffer."""
        if self._values is None:
            self.finalize()
        return len(self._values)

    def __getitem__(self, id):
        """Get the item with the given ID."""
        if self._values is None:
            self.finalize()
        return self._values[id]

    def __setitem__(self, id, value):
        """Set the item with the given ID.

        The value is converted according to the type of the dataref. Float
        and integer arrays must have exactly the length the dataref was added
        with, otherwise ValueError is raised."""
        if self._values is None:
            self.finalize()
        type = self._dataRefs[id][1]
        length = self._dataRefs[id][2]
        if type==TYPE_INT:
            self._values[id] = int(value)
        elif type==TYPE_FLOAT or type==TYPE_DOUBLE:
            self._values[id] = float(value)
        elif type==TYPE_FLOAT_ARRAY:
            if len(value)!=length:
                raise ValueError("xplra.MultiBuffer: expected a list of length %d" % (length,))
            self._values[id] = [float(x) for x in value]
        elif type==TYPE_INT_ARRAY:
            if len(value)!=length:
                raise ValueError("xplra.MultiBuffer: expected a list of length %d" % (length,))
            self._values[id] = [int(x) for x in value]
        elif type==TYPE_BYTE_ARRAY:
            # NOTE(review): the available source of this file ends inside
            # this branch, right after "if len(lst)". The rest of the branch
            # is a reconstruction: strings are padded with 0's as in
            # XPlane.setString, and other sequences are treated like integer
            # arrays. Confirm against the original before relying on it.
            if isinstance(value, str):
                lst = [ord(x) for x in value[:length]]
                if len(lst)<length:
                    lst += [0] * (length - len(lst))
                self._values[id] = lst
            else:
                if len(value)!=length:
                    raise ValueError("xplra.MultiBuffer: expected a list of length %d" % (length,))
                self._values[id] = [int(x) for x in value]

# NOTE(review): the available source ends here. MultiGetter and MultiSetter,
# which XPlane.createMultiGetter and XPlane.createMultiSetter instantiate, and
# the _executeRegistered/_executeUnregistered methods used by
# MultiBuffer.execute are not part of it and still have to be restored.