Source code for pyrcareworld.utils.rfuniverse_communicator

import struct
import socket
import threading
from sys import platform
import numpy as np
from pyrcareworld.utils.locker import Locker


class RFUniverseCommunicator(threading.Thread):
    """Length-prefixed TCP channel that exchanges tagged objects with a peer.

    Wire format
    -----------
    Every frame is a 4-byte little-endian unsigned length followed by that
    many payload bytes.  A payload is a 4-byte object count followed by the
    objects, each introduced by a length-prefixed UTF-8 type tag
    (``"int"``, ``"float"``, ``"string"``, ``"bool"``, ``"bytes"``,
    ``"array"``, ``"list"``, ``"dict"``, ``"tuple"``, ``"none"``, ...).

    The instance is also a ``threading.Thread``; ``run`` is the thread body
    and forwards every decoded frame to ``receive_data_callback``.

    Args:
        port: TCP port to listen on (first candidate when probing).
        receive_data_callback: Callable invoked with the list of decoded
            objects of each received frame.  May be ``None``.
        proc_type: ``"editor"`` keeps ``port`` as given; ``"release"`` probes
            for a bindable port starting at ``port``.

    Raises:
        ValueError: If ``proc_type`` is neither ``"editor"`` nor ``"release"``.
        OSError: If ``proc_type == "release"`` and no port can be bound.
    """

    def __init__(
        self,
        port: int = 5004,
        receive_data_callback=None,
        proc_type="editor",
    ):
        super().__init__()
        self.server = None
        self.client = None
        self.connected = False
        # Cursor into the payload currently being decoded by the read_* methods.
        self.read_offset = 0
        self.on_receive_data = receive_data_callback
        self.port = port
        if proc_type == "editor":
            # The given port is used as-is; the socket is created in online().
            pass
        elif proc_type == "release":
            self._get_port()
        else:
            raise ValueError(f"Unknown proc_type: {proc_type}")

    def _get_port(self):
        """Advance ``self.port`` in steps of 256 until it can be bound.

        The probe socket is always closed before returning or raising, so no
        descriptor leaks when the search fails.

        Raises:
            OSError: If no candidate below 65536 can be bound.
        """
        # NOTE(review): Locker is defined elsewhere; presumably it serialises
        # port probing between concurrent processes -- confirm.
        with Locker("port"):
            self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                while self.port < 65536:
                    try:
                        self.server.bind(("localhost", self.port))
                        return
                    except OSError:
                        self.port += 256
            finally:
                self.server.close()
            raise OSError("No available port")

    def online(self):
        """Listen on ``self.port``, accept one client and wait for "StepEnd".

        Blocks until a client connects and until the first ``receive_step``
        completes.
        """
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind(("localhost", self.port))
        print(f"Waiting for connections on port: {self.port}...")
        self.server.listen(1)
        self.client, _ = self.server.accept()
        print("Connected successfully")
        self.connected = True
        self.client.settimeout(None)
        # Frames are small and latency sensitive: disable Nagle's algorithm.
        self.client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self.receive_step()

    def close(self):
        """Close the client and server sockets; safe if never connected."""
        self.connected = False
        for sock in (self.client, self.server):
            if sock is not None:
                sock.close()

    def run(self):
        """Thread body: decode incoming frames forever and dispatch them."""
        while True:
            data = self.receive_bytes()
            if not data:
                # Zero-length frame: nothing to decode.
                continue
            objs = self.receive_object(data)
            if self.on_receive_data is not None:
                self.on_receive_data(objs)

    def sync_step(self):
        """Send "StepStart" and block until the peer answers "StepEnd"."""
        self.send_object("StepStart")
        self.receive_step()

    def receive_step(self):
        """Dispatch incoming frames until one starts with ``"StepEnd"``.

        Raises:
            ConnectionError: If the communicator is not connected, or the
                peer closes the socket while a frame is being read.
        """
        while True:
            if not self.connected:
                raise ConnectionError("Connection closed")
            data = self.receive_bytes()
            if not data:
                continue
            objs = self.receive_object(data)
            if len(objs) > 0 and objs[0] == "StepEnd":
                break
            # The callback is optional (defaults to None), as in run().
            if self.on_receive_data is not None:
                self.on_receive_data(objs)

    def _recv_exact(self, size: int) -> bytearray:
        """Read exactly ``size`` bytes from the client socket."""
        buffer = bytearray()
        while len(buffer) < size:
            chunk = self.client.recv(size - len(buffer))
            if not chunk:
                # recv() returning b"" means the peer shut the connection down.
                raise ConnectionError("Connection closed by peer")
            buffer.extend(chunk)
        return buffer

    def receive_bytes(self):
        """Read one frame from the client socket.

        Returns:
            The payload as a ``bytearray``, or ``None`` for a zero-length
            frame.

        Raises:
            ConnectionError: If the peer closes the socket mid-frame.
        """
        header = self._recv_exact(4)
        length = int.from_bytes(header, byteorder="little", signed=False)
        if length == 0:
            return None
        return self._recv_exact(length)

    def send_bytes(self, data: bytes):
        """Send ``data`` as one frame; a no-op while not connected."""
        if not self.connected:
            return
        header = len(data).to_bytes(4, byteorder="little", signed=False)
        # sendall() retries until everything is written; send() may stop early.
        self.client.sendall(header + data)
        if platform == 'linux':
            self.client.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)

    def receive_object(self, data: bytes) -> list:
        """Decode a payload into the list of objects it contains."""
        self.read_offset = 0
        count = self.read_int(data)
        return [self.read_object(data) for _ in range(count)]

    def read_object(self, datas: bytes) -> object:
        """Decode the tagged object at the current read offset.

        Raises:
            ValueError: If the type tag is not supported.
        """
        data_type = self.read_string(datas)
        if data_type == "int":
            return self.read_int(datas)
        if data_type == "float":
            return self.read_float(datas)
        if data_type == "string":
            return self.read_string(datas)
        if data_type == "bool":
            return self.read_bool(datas)
        if data_type == "bytes":
            return self.read_bytes(datas)
        if data_type in ("vector3", "quaternion", "matrix"):
            # The value of these tags is itself a tagged object.
            return self.read_object(datas)
        if data_type == "rect":
            return [self.read_float(datas) for _ in range(4)]
        if data_type == "array":
            rank = self.read_int(datas)
            shape = [self.read_int(datas) for _ in range(rank)]
            return self._read_float_array(datas, shape)
        if data_type == "list":
            count = self.read_int(datas)
            return [self.read_object(datas) for _ in range(count)]
        if data_type == "dict":
            count = self.read_int(datas)
            result = {}
            for _ in range(count):
                key = self.read_object(datas)
                result[key] = self.read_object(datas)
            return result
        if data_type == "tuple":
            count = self.read_int(datas)
            return tuple(self.read_object(datas) for _ in range(count))
        if data_type == "null" or data_type == "none":
            return None
        raise ValueError(f"This type is unsupported: {data_type}")

    def _read_float_array(self, datas: bytes, shape: list) -> np.ndarray:
        """Decode ``prod(shape)`` float32 values into a writable array."""
        count = 1
        for dim in shape:
            if dim < 0:
                raise ValueError("negative dimensions are not allowed")
            count *= dim
        if count == 0:
            return np.empty(shape, dtype=np.float32)
        # One buffer view instead of a per-element Python loop; np.float32 is
        # the same native 4-byte layout that struct's "f" format uses.
        flat = np.frombuffer(
            datas, dtype=np.float32, count=count, offset=self.read_offset
        )
        self.read_offset += 4 * count
        # copy() detaches the result from the (possibly read-only) payload.
        return flat.reshape(shape).copy()

    def read_string(self, datas: bytes) -> str:
        """Decode a length-prefixed UTF-8 string.

        Raises:
            AssertionError: If the declared length is negative or exceeds
                the remaining payload.
            UnicodeDecodeError: If the bytes are not valid UTF-8.
        """
        count = self.read_int(datas)
        if count < 0 or count > len(datas) - self.read_offset:
            raise AssertionError(
                f"count: {count}, len(datas): {len(datas)}, self.read_offset: {self.read_offset}"
            )
        start = self.read_offset
        end = start + count
        self.read_offset = end
        raw = bytes(datas[start:end])
        try:
            return raw.decode("utf-8")
        except UnicodeDecodeError as err:
            # UnicodeDecodeError takes five arguments; the reason carries the
            # payload offsets to help locate the corrupt field.
            raise UnicodeDecodeError(
                err.encoding,
                raw,
                err.start,
                err.end,
                f"{err.reason} (read_start: {start}, read_end: {end}, count: {count})",
            ) from err

    def read_int(self, datas: bytes) -> int:
        """Decode a 4-byte little-endian signed integer."""
        start = self.read_offset
        self.read_offset = start + 4
        return int.from_bytes(
            datas[start: self.read_offset],
            byteorder="little",
            signed=True,
        )

    def read_float(self, datas: bytes) -> float:
        """Decode a 4-byte float (struct format ``"f"``)."""
        start = self.read_offset
        self.read_offset = start + 4
        return struct.unpack("f", datas[start: self.read_offset])[0]

    def read_bool(self, datas: bytes) -> bool:
        """Decode a single byte as a boolean (non-zero is ``True``)."""
        start = self.read_offset
        self.read_offset = start + 1
        return bool(
            int.from_bytes(datas[start: self.read_offset], byteorder="little")
        )

    def read_bytes(self, datas: bytes) -> bytes:
        """Decode a length-prefixed byte string (a slice of ``datas``)."""
        count = self.read_int(datas)
        self.read_offset += count
        return datas[self.read_offset - count: self.read_offset]

    def send_object(self, *args):
        """Encode ``args`` as one payload and send it as a single frame."""
        datas = bytearray()
        self.write_int(datas, len(args))
        for obj in args:
            self.write_object(datas, obj)
        self.send_bytes(bytes(datas))

    def write_object(self, datas: bytearray, obj):
        """Append ``obj`` to ``datas`` as a type tag followed by its value.

        Unsupported types are reported on stdout and encoded as ``"null"``.
        """
        if obj is None:
            self.write_string(datas, "none")
        # bool must be tested before int: bool is a subclass of int.
        elif isinstance(obj, (bool, np.bool_)):
            self.write_string(datas, "bool")
            self.write_bool(datas, obj)
        elif isinstance(obj, (int, np.integer)):
            self.write_string(datas, "int")
            self.write_int(datas, obj)
        elif isinstance(obj, (float, np.floating)):
            self.write_string(datas, "float")
            self.write_float(datas, obj)
        elif isinstance(obj, str):
            self.write_string(datas, "string")
            self.write_string(datas, obj)
        elif isinstance(obj, (bytes, bytearray)):
            self.write_string(datas, "bytes")
            self.write_bytes(datas, bytes(obj))
        elif isinstance(obj, list):
            self.write_string(datas, "list")
            self.write_int(datas, len(obj))
            for item in obj:
                self.write_object(datas, item)
        elif isinstance(obj, dict):
            self.write_string(datas, "dict")
            self.write_int(datas, len(obj))
            for key, value in obj.items():
                self.write_object(datas, key)
                self.write_object(datas, value)
        elif isinstance(obj, np.ndarray):
            self.write_string(datas, "array")
            self.write_int(datas, obj.ndim)
            for dim in obj.shape:
                self.write_int(datas, dim)
            # Row-major float32 bytes, identical to packing each element
            # with struct format "f".
            datas.extend(obj.astype(np.float32).tobytes())
        elif isinstance(obj, tuple):
            self.write_string(datas, "tuple")
            self.write_int(datas, len(obj))
            for item in obj:
                self.write_object(datas, item)
        else:
            print(f"dont support this type: {type(obj)}")
            self.write_string(datas, "null")

    def write_string(self, datas: bytearray, s: str):
        """Append ``s`` as a length-prefixed UTF-8 string."""
        s_byte = s.encode("utf-8")
        self.write_int(datas, len(s_byte))
        datas.extend(s_byte)

    def write_int(self, datas: bytearray, i: int):
        """Append ``i`` as 4 little-endian bytes.

        Negative values are written in two's complement so that ``read_int``
        (which decodes signed) round-trips them.  Values up to ``2**32 - 1``
        are still accepted for compatibility with the previous unsigned
        encoding.

        Raises:
            OverflowError: If ``i`` does not fit in 32 bits.
        """
        # int() also accepts NumPy integer scalars, which lack to_bytes().
        value = int(i)
        if not -0x80000000 <= value <= 0xFFFFFFFF:
            raise OverflowError(f"int too big to convert: {value}")
        datas.extend((value & 0xFFFFFFFF).to_bytes(4, byteorder="little"))

    def write_float(self, datas: bytearray, f: float):
        """Append ``f`` as a 4-byte float (struct format ``"f"``)."""
        datas.extend(struct.pack("f", f))

    def write_bool(self, datas: bytearray, b: bool):
        """Append ``b`` as a single byte (0 or 1)."""
        datas.extend(int(b).to_bytes(1, byteorder="little"))

    def write_bytes(self, datas: bytearray, b: bytes):
        """Append ``b`` as a length-prefixed byte string."""
        self.write_int(datas, len(b))
        datas.extend(b)