# X-Plane Remote Access client module #------------------------------------------------------------------------------- import os import struct import array #------------------------------------------------------------------------------- ## @package xplra # # Python client module for the X-Plane Remote Access plugin #------------------------------------------------------------------------------- ## Protocol command: query the value of a single dataref COMMAND_GET_SINGLE = 0x01 ## Protocol command: set the value of a single dataref COMMAND_SET_SINGLE = 0x02 ## Protocol command: query the value of several datarefs COMMAND_GET_MULTI = 0x03 ## Protocol command: set the value of several datarefs COMMAND_SET_MULTI = 0x04 ## Protocol command: register a multi-dataref getter COMMAND_REGISTER_GET_MULTI = 0x11 ## Protocol command: unregister a multi-dataref getter COMMAND_UNREGISTER_GET_MULTI = 0x12 ## Protocol command: execute a registered multi-dataref getter COMMAND_EXECUTE_GET_MULTI = 0x13 ## Protocol command: register a multi-dataref setter COMMAND_REGISTER_SET_MULTI = 0x21 ## Protocol command: unregister a multi-dataref setter COMMAND_UNREGISTER_SET_MULTI = 0x22 ## Protocol command: execute a registered multi-dataref setter COMMAND_EXECUTE_SET_MULTI = 0x23 ## Protocol command: get the versions of X-Plane, XPLM and XPLRA COMMAND_GET_VERSIONS = 0x31 ## Protocol command: reload all plugins COMMAND_RELOAD_PLUGINS = 0x32 ## Protocol command: show a message to the pilot COMMAND_SHOW_MESSAGE = 0x41 ## Protocol command: register hotkeys COMMAND_REGISTER_HOTKEYS = 0x51 ## Protocol command: query the status of registered hotkeys COMMAND_QUERY_HOTKEYS = 0x52 ## Protocol command: unregister hotkeys COMMAND_UNREGISTER_HOTKEYS = 0x53 ## Protocol type constant: integer TYPE_INT = 0x01 ## Protocol type constant: single-precision floating point TYPE_FLOAT = 0x02 ## Protocol type constant: double-precision floating point TYPE_DOUBLE = 0x03 ## Protocol type constant: array of single-precision floating point values TYPE_FLOAT_ARRAY = 0x11 ## Protocol type constant: array of integers TYPE_INT_ARRAY = 0x12 ## Protocol type constant: array of bytes TYPE_BYTE_ARRAY = 0x13 ## Protocol result: OK RESULT_OK = 0x00 ## Protocol result: an invalid command was sent RESULT_INVALID_COMMAND = 0x01 ## Protocol result: an unknown dataref was attempted to query or set RESULT_UNKNOWN_DATAREF = 0x02 ## Protocol result: invalid type RESULT_INVALID_TYPE = 0x03 ## Protocol result: invalid length RESULT_INVALID_LENGTH = 0x04 ## Protocol result: invalid offset RESULT_INVALID_OFFSET = 0x05 ## Protocol result: invalid count RESULT_INVALID_COUNT = 0x06 ## Protocol result: invalid ID RESULT_INVALID_ID = 0x07 ## Protocol result: invalid duration RESULT_INVALID_DURATION = 0x08 ## Protocol result: other error RESULT_OTHER_ERROR = 0xff ## Hotkey modifier: Shift HOTKEY_MODIFIER_SHIFT = 0x0100 ## Hotkey modifier: Control HOTKEY_MODIFIER_CONTROL = 0x0200 #------------------------------------------------------------------------------- class ProtocolException(Exception): """Exception to signify protocol errors.""" ## mapping from result codes to their string representation _message = { RESULT_INVALID_COMMAND : "invalid command", RESULT_UNKNOWN_DATAREF : "unknown dataref", RESULT_INVALID_TYPE : "invalid type", RESULT_INVALID_LENGTH : "invalid length", RESULT_INVALID_OFFSET : "invalid offset", RESULT_INVALID_COUNT : "invalid count", RESULT_INVALID_ID : "invalid ID", RESULT_INVALID_DURATION : "invalid duration", RESULT_OTHER_ERROR : "other error" } ## @var resultCode # the result code, one of the RESULT_XXX constants ## @var parameter # an optional parameter for the result @staticmethod def getMessage(resultCode): """Get the message for the given result code.""" if resultCode in ProtocolException._message: return ProtocolException._message[resultCode] else: return "unknown error code " + `resultCode` def __init__(self, resultCode, parameter = None): """Construct the exception.""" message = "xplra.ProtocolException: " + self.getMessage(resultCode) if parameter is not None: if resultCode==RESULT_UNKNOWN_DATAREF: message += " (# %d)" % (parameter,) super(ProtocolException, self).__init__(message) self.resultCode = resultCode self.parameter = parameter #------------------------------------------------------------------------------- if os.name=="nt": import win32file import io class Win32NamedPipe(io.RawIOBase): """A stream object to represent a Win32 named pipe.""" ## @var _handle # the Windows file handle for the pipe def __init__(self, name): """Construct the pipe with the given name.""" self._handle = win32file.CreateFile(name, win32file.GENERIC_READ | win32file.GENERIC_WRITE, 0, None, win32file.OPEN_EXISTING, 0, None) def close(self): """Close the pipe, if not closed already.""" if self._handle is not None: win32file.CloseHandle(self._handle) self._handle = None @property def closed(self): """Determine if the pipe is closed or not.""" return self._handle is None def fileno(self): """Get the file number, which is impossible.""" raise IOError("Win32NamedPipe.fileno: not supported") def flush(self): """Flush the stream.""" pass def isatty(self): """Determine if the stream is a terminal, which it is not.""" return False def read(self, n = -1): """Read at most the given number of bytes from the stream.""" if n==-1: return self.readall() else: (error, data) = win32file.ReadFile(self._handle, n) return data def readable(self): """Determine if the stream is readable, which it is.""" return self._handle is not None def readall(self): """Read all bytes from the stream, which is not supported.""" raise IOError("Win32NamedPipe.readall: not supported") def readinto(self, buffer): """Read into the given buffer.""" (error, data) = win32file.ReadFile(self._handle, len(buffer)) length = len(data) buffer[:length] = data return length def readline(self, limit = -1): """Read a line, which is currently not supported.""" raise IOError("Win32NamedPipe.readline: not supported") def readlines(self, hint = -1): """Read lines, which is currently not supported.""" raise IOError("Win32NamedPipe.readlines: not supported") def seek(self, offset, whence = io.SEEK_SET): """Seek in the stream, which is not supported.""" raise IOError("Win32NamedPipe.seek: not supported") def seekable(self): """Determine if the stream is seekable, which it is not.""" return False def tell(self): """Get the current position, which is not supported.""" raise IOError("Win32NamedPipe.tell: not supported") def truncate(self, size = None): """Truncate the stream, which is not supported.""" raise IOError("Win32NamedPipe.truncate: not supported") def writable(self): """Determine if the stream is writable, which it is.""" return self._handle is not None def write(self, buffer): """Write the given buffer into the stream.""" (error, written) = win32file.WriteFile(self._handle, buffer.tobytes()) return written def writelines(self, lines): """Write the given lines, which is not supported.""" raise IOError("Win32NamedPipe.writelines: not supported") #------------------------------------------------------------------------------- class XPlane(object): """The main class representing the connection to X-Plane.""" ## @var _stream # the data stream used to communicate with the plugin ## @var _multiBuffers # the list of multi-dataref buffers belonging to this object @staticmethod def _packLength(x): """Pack the given integer as a variable-length value.""" s = "" while True: y = x&0x7f x >>= 7 if x>0: y |= 0x80 s += struct.pack("B", y) if x==0: return s @staticmethod def _packString(s): """Pack the given string.""" return XPlane._packLength(len(s)) + s def __init__(self): """Construct the object.""" self._stream = None self._multiBuffers = [] def connect(self): """Try to connect to X-Plane.""" if self._stream is not None: return if os.name=="nt": pipe = Win32NamedPipe(r'\\.\pipe\\xplra') self._stream = io.BufferedRWPair(pipe, pipe) else: import socket s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) s.connect("/tmp/xplra-" + os.environ["LOGNAME"]) self._stream = s.makefile() for multiBuffer in self._multiBuffers: multiBuffer._reregister() @property def isConnected(self): """Determine if we are connected to the simulator.""" return self._stream is not None def createMultiGetter(self): """Create a new multi-dataref getter for this X-Plane object.""" multiGetter = MultiGetter(self) self._multiBuffers.append(multiGetter) return multiGetter def createMultiSetter(self): """Create a new multi-dataref setter for this X-Plane object.""" multiSetter = MultiSetter(self) self._multiBuffers.append(multiSetter) return multiSetter def destroyMultiBuffer(self, multiBuffer): """Destroy the given multi-dataref buffer. It actually removes it from the list of multiple buffers and unregisters it safely. Returns the result of the safe unregistration if the buffer was found, None otherwise. """ if multiBuffer in self._multiBuffers: self._multiBuffers.remove(multiBuffer) return multiBuffer.unregisterSafely() else: return None def getVersions(self): """Get the versions of X-Plane, XPLM and XPLRA as a tuple.""" self._writeU8(COMMAND_GET_VERSIONS) self._flush() self._checkResult() return (self._readS32(), self._readS32(), self._readS32()) def reloadPlugins(self): """Reload the plugins in X-Plane. After this, this connection becomes invalid.""" self._writeU8(COMMAND_RELOAD_PLUGINS) self._flush() self._checkResult(); def getInt(self, name): """Get the value of the integer dataref with the given name.""" return self._getSingle(name, TYPE_INT) def getFloat(self, name): """Get the value of the float dataref with the given name.""" return self._getSingle(name, TYPE_FLOAT) def getDouble(self, name): """Get the value of the double dataref with the given name.""" return self._getSingle(name, TYPE_DOUBLE) def getFloatArray(self, name, length = -1, offset = 0): """Get the value of the float array dataref with the given name.""" return self._getSingle(name, TYPE_FLOAT_ARRAY, length, offset) def getIntArray(self, name, length = -1, offset = 0): """Get the value of the integer array dataref with the given name.""" return self._getSingle(name, TYPE_INT_ARRAY, length, offset) def getByteArray(self, name, length = -1, offset = 0): """Get the value of the byte array dataref with the given name.""" return self._getSingle(name, TYPE_BYTE_ARRAY, length, offset) def getString(self, name, offset = 0): """Get the value of the byte array dataref with the given name as a string.""" s = "" for b in self.getByteArray(name, offset = offset): if b==0: break s += chr(b) return s def setInt(self, name, value): """Set the value of the integer dataref with the given name.""" self._setSingle(name, TYPE_INT, value) def setFloat(self, name, value): """Set the value of the float dataref with the given name.""" self._setSingle(name, TYPE_FLOAT, value) def setDouble(self, name, value): """Set the value of the double dataref with the given name.""" self._setSingle(name, TYPE_DOUBLE, value) def setFloatArray(self, name, value, offset = 0): """Set the value of the float array dataref with the given name.""" self._setSingle(name, TYPE_FLOAT_ARRAY, value, offset = offset) def setIntArray(self, name, value, offset = 0): """Set the value of the integer array dataref with the given name.""" self._setSingle(name, TYPE_INT_ARRAY, value, offset = offset) def setByteArray(self, name, value, offset = 0): """Set the value of the byte array dataref with the given name.""" self._setSingle(name, TYPE_BYTE_ARRAY, value, offset = offset) def setString(self, name, value, length, offset = 0): """Set the value of the byte array dataref with the given name from a string. The string will be padded with 0's so that its length is the given one.""" value = [ord(c) for c in value[:length]] value += [0] * (length - len(value)) self.setByteArray(name, value, offset) def showMessage(self, message, duration): """Show a message in the simulator window for the given duration. The duration is a floating-point number denoting seconds.""" self._writeU8(COMMAND_SHOW_MESSAGE) self._writeString(message) self._writeFloat(duration) self._flush() self._checkResult() def registerHotkeys(self, hotkeyCodes): """Register the given hotkey codes for watching.""" self._writeU8(COMMAND_REGISTER_HOTKEYS) self._writeU32(len(hotkeyCodes)) for hotkeyCode in hotkeyCodes: self._writeU16(hotkeyCode) self._flush() self._checkResult() def queryHotkeys(self): """Query the state of the hotkeys registered previously""" self._writeU8(COMMAND_QUERY_HOTKEYS) self._flush() self._checkResult() length = self._readU32() states = [] for i in range(0, length): states.append(self._readU8()!=0) return states def unregisterHotkeys(self): """Unregister the previously registered hotkeys.""" self._writeU8(COMMAND_UNREGISTER_HOTKEYS) self._flush() self._checkResult() def disconnect(self): """Disconnect from X-Plane.""" if self._stream is not None: try: self._stream.close() finally: self._stream = None def _checkResult(self, resultCode = None, parameter = None, multi = False): """Check the given result code. If it is None, it will be read first. If it is not RESULT_OK, a ProtocolException is thrown.""" if resultCode is None: resultCode = self._readU8() if multi and resultCode==RESULT_UNKNOWN_DATAREF: parameter = self._readU32() if resultCode!=RESULT_OK: raise ProtocolException(resultCode, parameter) def _getSingle(self, name, type, length = None, offset = None): """Get the single value of the given name and type.""" self._writeU8(COMMAND_GET_SINGLE) self._writeString(name) self._writeU8(type) if length is not None and offset is not None: self._writeS32(length) self._writeS32(offset) self._flush() self._checkResult() return self._readValue(type) def _readValue(self, type): """Read the value of the given type from the stream.""" if type==TYPE_INT: return self._readS32() elif type==TYPE_FLOAT: return self._readFloat() elif type==TYPE_DOUBLE: return self._readDouble() else: length = self._readS32() arr = None elementSize = 4 if type==TYPE_FLOAT_ARRAY: arr = array.array("f") elif type==TYPE_INT_ARRAY: arr = array.array("i") elif type==TYPE_BYTE_ARRAY: arr = array.array("B") elementSize = 1 if arr is not None and length>0: arr.fromstring(self._stream.read(length*elementSize)) return None if arr is None else arr.tolist() def _setSingle(self, name, type, value, offset = None): """Set the single value of the given name and type.""" self._writeU8(COMMAND_SET_SINGLE) self._writeString(name) self._writeU8(type) if offset is not None: self._writeS32(len(value)) self._writeS32(offset) self._writeValue(type, value) self._flush() self._checkResult() def _writeValue(self, type, value): """Write a value of the given type.""" if type==TYPE_INT: self._writeS32(int(value)) elif type==TYPE_FLOAT: self._writeFloat(float(value)) elif type==TYPE_DOUBLE: self._writeDouble(float(value)) else: arr = None if type==TYPE_FLOAT_ARRAY: arr = array.array("f") elif type==TYPE_INT_ARRAY: arr = array.array("i") elif type==TYPE_BYTE_ARRAY: arr = array.array("B") arr.fromlist(value) self._stream.write(arr.tostring()) def _writeU8(self, x): """Write the given value as an unsigned, 8-bit value.""" self._stream.write(struct.pack("B", x)) def _writeU16(self, x): """Write the given value as an unsigned, 16-bit value.""" self._stream.write(struct.pack("H", x)) def _writeS32(self, x): """Write the given value as a signed, 32-bit value.""" self._stream.write(struct.pack("i", x)) def _writeU32(self, x): """Write the given value as an unsigned, 32-bit value.""" self._stream.write(struct.pack("I", x)) def _writeFloat(self, x): """Write the given value as a single-precision floating point.""" self._stream.write(struct.pack("f", x)) def _writeDouble(self, x): """Write the given value as a double-precision floating point.""" self._stream.write(struct.pack("d", x)) def _writeLength(self, length): """Write the given value is a variable-length value into our stream.""" self._stream.write(self._packLength(length)) def _writeString(self, str): """Write the given string into the stream.""" self._stream.write(self._packString(str)) def _flush(self): """Flush our stream.""" self._stream.flush() def _readU8(self): """Read an unsigned, 8-bit value from the stream.""" (value,) = struct.unpack("B", self._stream.read(1)) return value def _readS32(self): """Read a signed, 32-bit value from the stream.""" (value,) = struct.unpack("i", self._stream.read(4)) return value def _readU32(self): """Read an unsigned, 32-bit value from the stream.""" (value,) = struct.unpack("I", self._stream.read(4)) return value def _readFloat(self): """Read a single-precision floating point value from the stream.""" (value,) = struct.unpack("f", self._stream.read(4)) return value def _readDouble(self): """Read a double-precision floating point value from the stream.""" (value,) = struct.unpack("d", self._stream.read(8)) return value def _readLength(self): """Read a variable-length value from our stream.""" length = 0 while True: (x,) = struct.unpack("B", self._stream.read(1)) length <<= 7 length |= x&0x7f if x&0x80==0: return length def _readString(self): """Read a string from our stream.""" length = self._readLength() return self._stream.read(length) #------------------------------------------------------------------------------- class MultiBuffer(object): """Buffer for querying or setting multi-dataref values.""" ## @var _xplane # the \ref XPlane object this buffer belongs to ## @var _registerCommand # the command used to perform the registration of this buffer # (\ref COMMAND_REGISTER_GET_MULTI or \ref COMMAND_REGISTER_SET_MULTI) ## @var _unregisterCommand # the command used to perform the registration of this buffer # (\ref COMMAND_UNREGISTER_GET_MULTI or \ref COMMAND_UNREGISTER_SET_MULTI) ## @var _dataRefs # the datarefs belonging to the buffer ## @var _values # the value of the datarefs before setting or after querying ## @var _registeredID # the ID with which the buffer is registered in X-Plane @staticmethod def _getDefault(type, length): """Get the default value for the given type.""" if type==TYPE_INT: return 0 elif type==TYPE_FLOAT: return float(0.0) elif type==TYPE_DOUBLE: return 0.0 elif length<=0: return [] elif type==TYPE_FLOAT_ARRAY: return [float(0.0)] * length elif type==TYPE_INT_ARRAY or type==TYPE_BYTE_ARRAY: return [0] * length def __init__(self, xplane, registerCommand, unregisterCommand): """Construct the buffer for the given XPlane instance and with the given register/unregister command values.""" self._xplane = xplane self._registerCommand = registerCommand self._unregisterCommand = unregisterCommand self._dataRefs = [] self._values = None self._registeredID = None @property def values(self): """Query the values as a list.""" if self._values is None: self.finalize() return self._values def addInt(self, name): """Add an integer to the buffer with the given name Returns an ID (or index) of the dataref.""" return self._add(name, TYPE_INT) def addFloat(self, name): """Add a float to the buffer with the given name Returns an ID (or index) of the dataref.""" return self._add(name, TYPE_FLOAT) def addDouble(self, name): """Add a double to the buffer with the given name Returns an ID (or index) of the dataref.""" return self._add(name, TYPE_DOUBLE) def addFloatArray(self, name, length = -1, offset = 0): """Add a floating point array to the buffer with the given name. Returns an ID (or index) of the dataref.""" return self._add(name, TYPE_FLOAT_ARRAY, length, offset) def addIntArray(self, name, length = -1, offset = 0): """Add an integer array to the buffer with the given name. Returns an ID (or index) of the dataref.""" return self._add(name, TYPE_INT_ARRAY, length, offset) def addByteArray(self, name, length = -1, offset = 0): """Add a byte array to the buffer with the given name. Returns an ID (or index) of the dataref.""" return self._add(name, TYPE_BYTE_ARRAY, length, offset) def finalize(self): """Finalize the buffer, if not finalized yet. It initializes the array of values with some defaults. Returns whether there is any data in the buffer.""" if self._values is None: self._values = [self._getDefault(type, length) for (_, type, length, _) in self._dataRefs] return self._values!=[] def register(self): """Register the buffer in X-Plane.""" if self.finalize() and self._registeredID is None: self._writeSpec(self._registerCommand) self._registeredID = self._xplane._readU32() def unregister(self): """Unregister the buffer from X-Plane.""" if self._registeredID is not None: self._xplane._writeU8(self._unregisterCommand) self._xplane._writeU32(self._registeredID) self._xplane._flush() self._xplane._checkResult() self._registeredID = None def unregisterSafely(self): """Unregister the buffer from X-Plane, ignoring exceptions. Returns True if the unregistration succeeded, False otherwise. """ try: self.unregister() return True except: return False finally: self._registeredID = None def execute(self): """Perform the querying or the setting of the values. It first checks if the buffer is finalized. If not, it will be finalized. However, if it is not finalized but also registered, it will first be unregistered and the re-registered after finalizing.""" if self._values is None and self._registeredID is not None: self.unregister() self.register() else: self.finalize() if self._registeredID is None: self._executeUnregistered() else: self._executeRegistered() def getString(self, id): """Get the value of the dataref with the given ID as a string. The dataref should be of type byte array.""" if self._dataRefs[id][1]!=TYPE_BYTE_ARRAY: raise TypeError("xplra.MultiBuffer.getString: only byte arrays can be converted to strings") if self._values is None: self.finalize() s = "" for c in self._values[id]: if c==0: break s += chr(c) return s def _reregister(self): """Re-register the buffer in X-Plane, if it has been registered earlier If it has not been registered, nothing is done. Otherwise the buffer gets registered, and the old ID is forgotten. This function is meant to be used by the XPlane object when it creates a new connection. If the registration fails, the original ID is restored, so the _reregister() could be called again """ if self._registeredID is not None: origRegisteredID = self._registeredID try: self._registeredID = None self.register() except: self._registeredID = origRegisteredID raise def _add(self, name, type, length = None, offset = None): """Add a scalar to the buffer with the given name and type""" index = len(self._dataRefs) self._values = None self._dataRefs.append( (name, type, length, offset) ) return index def _writeSpec(self, command): """Write the specification preceded by the given command and check for the result. The specification is basically the list of the datarefs.""" self._xplane._writeU8(command) self._xplane._writeU32(len(self._dataRefs)) for (name, type, length, offset) in self._dataRefs: self._xplane._writeString(name) self._xplane._writeU8(type) if length is not None and offset is not None: self._xplane._writeS32(length) self._xplane._writeS32(offset) self._xplane._flush() self._xplane._checkResult(multi = True) def __len__(self): """Get the number of value items in the buffer.""" if self._values is None: self.finalize() return len(self._values) def __getitem__(self, id): """Get the item with the given ID.""" if self._values is None: self.finalize() return self._values[id] def __setitem__(self, id, value): """Set the item with the given ID.""" if self._values is None: self.finalize() type = self._dataRefs[id][1] length = self._dataRefs[id][2] if type==TYPE_INT: self._values[id] = int(value) elif type==TYPE_FLOAT or type==TYPE_DOUBLE: self._values[id] = float(value) elif type==TYPE_FLOAT_ARRAY: if len(value)!=length: raise ValueError("xplra.MultiBuffer: expected a list of length %d" % (length,)) self._values[id] = [float(x) for x in value] elif type==TYPE_INT_ARRAY: if len(value)!=length: raise ValueError("xplra.MultiBuffer: expected a list of length %d" % (length,)) self._values[id] = [int(x) for x in value] elif type==TYPE_BYTE_ARRAY: if isinstance(value, str): lst = [ord(x) for x in value[:length]] if len(lst)