importserialimportserial.tools.list_portsclassCommunication():
def__init__(self,com,bps,timeout):
self.port=comself.bps=bpsself.timeout=timeoutglobalRettry:
self.main_engine=serial.Serial(self.port,self.bps,timeout=self.timeout)
if (self.main_engine.is_open):
Ret=TrueexceptExceptionase:
print("---异常---:", e)
defPrint_Name(self):
print(self.main_engine.name) print(self.main_engine.port)print(self.main_engine.baudrate)print(self.main_engine.bytesize)print(self.main_engine.parity)print(self.main_engine.stopbits)print(self.main_engine.timeout)print(self.main_engine.writeTimeout)print(self.main_engine.xonxoff)print(self.main_engine.rtscts)print(self.main_engine.dsrdtr)print(self.main_engine.interCharTimeout)defOpen_Engine(self):
self.main_engine.open()
defClose_Engine(self):
self.main_engine.close()
print(self.main_engine.is_open) @staticmethoddefPrint_Used_Com():
port_list=list(serial.tools.list_ports.comports())
print(port_list)
defRead_Size(self,size):
returnself.main_engine.read(size=size)
defRead_Line(self):
returnself.main_engine.readline()
defSend_data(self,data):
self.main_engine.write(data)
defRecive_data(self,way):
print("开始接收数据:")
whileTrue:
try:
ifself.main_engine.in_waiting:
if(way==0):
foriinrange(self.main_engine.in_waiting):
print("接收ascii数据:"+str(self.Read_Size(1)))
data1=self.Read_Size(1).hex()data2=int(data1,16)if(way==1):
data=self.main_engine.read_all()exceptExceptionase:
print("异常报错:",e)
Communication.Print_Used_Com()
Ret=FalseEngine1=Communication("com12",115200,0.5)
if (Ret):
Engine1.Recive_data(0)