# X-Plane Remote Access client module #------------------------------------------------------------------------------- import os import struct import array #------------------------------------------------------------------------------- COMMAND_GET_SINGLE = 0x01 COMMAND_SET_SINGLE = 0x02 COMMAND_GET_MULTI = 0x03 COMMAND_SET_MULTI = 0x04 COMMAND_REGISTER_GET_MULTI = 0x11 COMMAND_UNREGISTER_GET_MULTI = 0x12 COMMAND_EXECUTE_GET_MULTI = 0x13 COMMAND_REGISTER_SET_MULTI = 0x21 COMMAND_UNREGISTER_SET_MULTI = 0x22 COMMAND_EXECUTE_SET_MULTI = 0x23 TYPE_INT = 0x01 TYPE_FLOAT = 0x02 TYPE_DOUBLE = 0x03 TYPE_FLOAT_ARRAY = 0x11 TYPE_INT_ARRAY = 0x12 TYPE_BYTE_ARRAY = 0x13 RESULT_OK = 0x00 RESULT_INVALID_COMMAND = 0x01 RESULT_UNKNOWN_DATAREF = 0x02 RESULT_INVALID_TYPE = 0x03 RESULT_INVALID_LENGTH = 0x04 RESULT_INVALID_OFFSET = 0x05 RESULT_INVALID_COUNT = 0x06 RESULT_INVALID_ID = 0x07 RESULT_OTHER_ERROR = 0xff #------------------------------------------------------------------------------- class ProtocolException(Exception): """Exception to signify protocol errors.""" _message = { RESULT_INVALID_COMMAND : "invalid command", RESULT_UNKNOWN_DATAREF : "unknown dataref", RESULT_INVALID_TYPE : "invalid type", RESULT_INVALID_LENGTH : "invalid length", RESULT_INVALID_OFFSET : "invalid offset", RESULT_INVALID_COUNT : "invalid count", RESULT_INVALID_ID : "invalid ID", RESULT_OTHER_ERROR : "other error" } @staticmethod def getMessage(resultCode): """Get the message for the given result code.""" if resultCode in ProtocolException._message: return ProtocolException._message[resultCode] else: return "unknown error code " + `resultCode` def __init__(self, resultCode): super(ProtocolException, self).__init__("xplra.ProtocolException: " + self.getMessage(resultCode)) self.resultCode = resultCode #------------------------------------------------------------------------------- class XPlane(object): """The main class representing the connection to X-Plane.""" @staticmethod def _packLength(x): """Pack the given integer as a variable-length value.""" s = "" while True: y = x&0x7f x >>= 7 if x>0: y |= 0x80 s += struct.pack("B", y) if x==0: return s @staticmethod def _packString(s): """Pack the given string.""" return XPlane._packLength(len(s)) + s def __init__(self): """Construct the object.""" self._stream = None def connect(self): """Try to connect to X-Plane.""" if self._stream is not None: return if os.name=="nt": self._stream = open(r'\\.\pipe\\xplra', "b") else: import socket s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) s.connect("/tmp/xplra-" + os.environ["LOGNAME"]) self._stream = s.makefile() @property def isConnected(self): """Determine if we are connected to the simulator.""" return self._stream is not None def getInt(self, name): """Get the value of the integer dataref with the given name.""" return self._getSingle(name, TYPE_INT) def getFloat(self, name): """Get the value of the float dataref with the given name.""" return self._getSingle(name, TYPE_FLOAT) def getDouble(self, name): """Get the value of the double dataref with the given name.""" return self._getSingle(name, TYPE_DOUBLE) def getFloatArray(self, name, length = -1, offset = 0): """Get the value of the float array dataref with the given name.""" return self._getSingle(name, TYPE_FLOAT_ARRAY, length, offset) def getIntArray(self, name, length = -1, offset = 0): """Get the value of the integer array dataref with the given name.""" return self._getSingle(name, TYPE_INT_ARRAY, length, offset) def getByteArray(self, name, length = -1, offset = 0): """Get the value of the byte array dataref with the given name.""" return self._getSingle(name, TYPE_BYTE_ARRAY, length, offset) def getString(self, name, offset = 0): """Get the value of the byte array dataref with the given name as a string.""" s = "" for b in self.getByteArray(name, offset = offset): if b==0: break s += chr(b) return s def setInt(self, name, value): """Set the value of the integer dataref with the given name.""" self._setSingle(name, TYPE_INT, value) def setFloat(self, name, value): """Set the value of the float dataref with the given name.""" self._setSingle(name, TYPE_FLOAT, value) def setDouble(self, name, value): """Set the value of the double dataref with the given name.""" self._setSingle(name, TYPE_DOUBLE, value) def setFloatArray(self, name, value, offset = 0): """Set the value of the float array dataref with the given name.""" self._setSingle(name, TYPE_FLOAT_ARRAY, value, offset = offset) def setIntArray(self, name, value, offset = 0): """Set the value of the integer array dataref with the given name.""" self._setSingle(name, TYPE_INT_ARRAY, value, offset = offset) def setByteArray(self, name, value, offset = 0): """Set the value of the byte array dataref with the given name.""" self._setSingle(name, TYPE_BYTE_ARRAY, value, offset = offset) def setString(self, name, value, length, offset = 0): """Set the value of the byte array dataref with the given name from a string. The string will be padded with 0's so that its length is the given one.""" value = [ord(c) for c in value[:length]] value += [0] * (length - len(value)) self.setByteArray(name, value, offset) def disconnect(self): """Disconnect from X-Plane.""" if self._stream is not None: self._stream.close() self._stream = None def _checkResult(self, resultCode = None): """Check the given result code. If it is None, it will be read first. If it is not RESULT_OK, a ProtocolException is thrown.""" if resultCode is None: resultCode = self._readU8() if resultCode!=RESULT_OK: raise ProtocolException(resultCode) def _getSingle(self, name, type, length = None, offset = None): """Get the single value of the given name and type.""" self._writeU8(COMMAND_GET_SINGLE) self._writeString(name) self._writeU8(type) if length is not None and offset is not None: self._writeS32(length) self._writeS32(offset) self._flush() self._checkResult() return self._readValue(type) def _readValue(self, type): """Read the value of the given type from the stream.""" if type==TYPE_INT: return self._readS32() elif type==TYPE_FLOAT: return self._readFloat() elif type==TYPE_DOUBLE: return self._readDouble() else: length = self._readS32() arr = None elementSize = 4 if type==TYPE_FLOAT_ARRAY: arr = array.array("f") elif type==TYPE_INT_ARRAY: arr = array.array("i") elif type==TYPE_BYTE_ARRAY: arr = array.array("B") elementSize = 1 if arr is not None and length>0: arr.fromstring(self._stream.read(length*elementSize)) return arr def _setSingle(self, name, type, value, offset = None): """Set the single value of the given name and type.""" self._writeU8(COMMAND_SET_SINGLE) self._writeString(name) self._writeU8(type) if offset is not None: self._writeS32(len(value)) self._writeS32(offset) self._writeValue(type, value) self._flush() self._checkResult() def _writeValue(self, type, value): """Write a value of the given type.""" if type==TYPE_INT: self._writeS32(int(value)) elif type==TYPE_FLOAT: self._writeFloat(float(value)) elif type==TYPE_DOUBLE: self._writeDouble(float(value)) else: arr = None if type==TYPE_FLOAT_ARRAY: arr = array.array("f") elif type==TYPE_INT_ARRAY: arr = array.array("i") elif type==TYPE_BYTE_ARRAY: arr = array.array("B") arr.fromlist(value) self._stream.write(arr.tostring()) def _writeU8(self, x): """Write the given value as an unsigned, 8-bit value.""" self._stream.write(struct.pack("B", x)) def _writeS32(self, x): """Write the given value as a signed, 32-bit value.""" self._stream.write(struct.pack("i", x)) def _writeU32(self, x): """Write the given value as an unsigned, 32-bit value.""" self._stream.write(struct.pack("I", x)) def _writeFloat(self, x): """Write the given value as a single-precision floating point.""" self._stream.write(struct.pack("f", x)) def _writeDouble(self, x): """Write the given value as a double-precision floating point.""" self._stream.write(struct.pack("d", x)) def _writeLength(self, length): """Write the given value is a variable-length value into our stream.""" self._stream.write(self._packLength(length)) def _writeString(self, str): """Write the given string into the stream.""" self._stream.write(self._packString(str)) def _flush(self): """Flush our stream.""" self._stream.flush() def _readU8(self): """Read an unsigned, 8-bit value from the stream.""" (value,) = struct.unpack("B", self._stream.read(1)) return value def _readS32(self): """Read a signed, 32-bit value from the stream.""" (value,) = struct.unpack("i", self._stream.read(4)) return value def _readU32(self): """Read an unsigned, 32-bit value from the stream.""" (value,) = struct.unpack("I", self._stream.read(4)) return value def _readFloat(self): """Read a single-precision floating point value from the stream.""" (value,) = struct.unpack("f", self._stream.read(4)) return value def _readDouble(self): """Read a double-precision floating point value from the stream.""" (value,) = struct.unpack("d", self._stream.read(8)) return value def _readLength(self): """Read a variable-length value from our stream.""" length = 0 while True: (x,) = struct.unpack("B", self._stream.read(1)) length <<= 7 length |= x&0x7f if x&0x80==0: return length def _readString(self): """Read a string from our stream.""" length = self._readLength() return self._stream.read(length) #-------------------------------------------------------------------------------