How do I recover from a serialException using pySerial

Stephen picture Stephen · Jan 25, 2013 · Viewed 9.8k times · Source

I have an application that reads and transmits data to a device connected via USB. I'm using pySerial to facilitate this communication. Everything works fine until the USB cable is unplugged from the PC and an exception is thrown. Once the cable is plugged back in, I can't seem to recover and reconnect to my device. The only way for me to recover is to close down the application and unplug and plug the cable in again. Any help in understanding what's going on would be much appreciated.

This is basic test code that I'm useing to help me understand the process.

# Class used to communicate with USB Dongle

import serial
import time
import sys

class LPort:
  def __init__(self, port=0):
    "initialize the LPort class"
    self.error = ""
    self.traffic = ""
    self.dest = None
    if port == None:
        self.simulation = True
    else:
        self.simulation = False
        self.port = port # serial port we should use
    self.reset()
    self.time = time.time()

def reInit(self):
    self.close()

def reset(self):
    "flush port, reset the LPort, initialize LPort"
    if self.simulation:
        r = "LPort simulator"
    else:
        self.port.flushInput()
        self.port.flushOutput()           
        self.fail = False
        self.command("/H1")
        self.dest = None
        r = "reset"
    self.error = ""
    self.traffic = ""
    return r  

def status(self):
    "return accumulated status info, reset collection"
    s = self.error
    self.error = ""
    return s 

def data(self):
    "return accumulated traffic data, reset collection"
    s = self.traffic
    self.traffic = ""
    return s   

def set_dest(self, addr):
    "set the destination address (if necessary)"
    if addr != self.dest:
        self.dest = addr
        self.command("/O")
        r = self.command("/D%02X" % addr)
        if r != "*":
            self.dest = None
            self.error += r
        else:
            r = True
    return r 

def checksum(self, bytes):
    "calculate the CRC-8 checksum for the given packet"
    crc_table = [
            # this table is taken from the CP rectifier code
            0x00,0x07,0x0E,0x09,0x1C,0x1B,0x12,0x15,0x38,0x3F,
            0x36,0x31,0x24,0x23,0x2A,0x2D,0x70,0x77,0x7E,0x79,
            0x6C,0x6B,0x62,0x65,0x48,0x4F,0x46,0x41,0x54,0x53,
            0x5A,0x5D,0xE0,0xE7,0xEE,0xE9,0xFC,0xFB,0xF2,0xF5,
            0xD8,0xDF,0xD6,0xD1,0xC4,0xC3,0xCA,0xCD,0x90,0x97,
            0x9E,0x99,0x8C,0x8B,0x82,0x85,0xA8,0xAF,0xA6,0xA1,
            0xB4,0xB3,0xBA,0xBD,0xC7,0xC0,0xC9,0xCE,0xDB,0xDC,
            0xD5,0xD2,0xFF,0xF8,0xF1,0xF6,0xE3,0xE4,0xED,0xEA,
            0xB7,0xB0,0xB9,0xBE,0xAB,0xAC,0xA5,0xA2,0x8F,0x88,
            0x81,0x86,0x93,0x94,0x9D,0x9A,0x27,0x20,0x29,0x2E,
            0x3B,0x3C,0x35,0x32,0x1F,0x18,0x11,0x16,0x03,0x04,
            0x0D,0x0A,0x57,0x50,0x59,0x5E,0x4B,0x4C,0x45,0x42,
            0x6F,0x68,0x61,0x66,0x73,0x74,0x7D,0x7A,0x89,0x8E,
            0x87,0x80,0x95,0x92,0x9B,0x9C,0xB1,0xB6,0xBF,0xB8,
            0xAD,0xAA,0xA3,0xA4,0xF9,0xFE,0xF7,0xF0,0xE5,0xE2,
            0xEB,0xEC,0xC1,0xC6,0xCF,0xC8,0xDD,0xDA,0xD3,0xD4,
            0x69,0x6E,0x67,0x60,0x75,0x72,0x7B,0x7C,0x51,0x56,
            0x5F,0x58,0x4D,0x4A,0x43,0x44,0x19,0x1E,0x17,0x10,
            0x05,0x02,0x0B,0x0C,0x21,0x26,0x2F,0x28,0x3D,0x3A,
            0x33,0x34,0x4E,0x49,0x40,0x47,0x52,0x55,0x5C,0x5B,
            0x76,0x71,0x78,0x7F,0x6A,0x6D,0x64,0x63,0x3E,0x39,
            0x30,0x37,0x22,0x25,0x2C,0x2B,0x06,0x01,0x08,0x0F,
            0x1A,0x1D,0x14,0x13,0xAE,0xA9,0xA0,0xA7,0xB2,0xB5,
            0xBC,0xBB,0x96,0x91,0x98,0x9F,0x8A,0x8D,0x84,0x83,
            0xDE,0xD9,0xD0,0xD7,0xC2,0xC5,0xCC,0xCB,0xE6,0xE1,
            0xE8,0xEF,0xFA,0xFD,0xF4,0xF3]
    for i in range(len(bytes)):
        b = int(bytes[i])
        if i == 0: chksum = crc_table[b]
        else: chksum = crc_table[chksum ^ b]
    return chksum  

def command(self, cmd):
    "transmit distinct commands to unit, and accept response"
    if self.simulation:
        r = "*"
    else:
        try:
            self.port.write(cmd + chr(13))
        except serial.serialutil.SerialTimeoutException:
            r = "/TO"
            return r              
        except:
            print "Unexpected error:", sys.exc_info()[0]
            r = "/Unknown"
            return r
        r = ""
        eol = False
        while True:
            c = self.port.read(1)
            if not c:
                r = "/FAIL " + r + " " + cmd
                self.error = r
                break
            else:
                r += c
                ordc = ord(c)
                if ordc == 13 or ordc == 42:
                    break           
    return r

def checkRawDataForErrors(self, raw, errors = []):

    errorCodes = {'/SNA':'Slave Not Acknowledging',
                    '/I81':'Busy, Command Ignored',
                    '/I88':'Connection Not Open',
                    '/I89':'Invalid Command Argument',
                    '/I8A':'Transmit Not Active',
                    '/I8F':'Invalid Command',
                    '/I90':'Buffer Overflow',
                    '/DAT':'Data Error',
                    '/BADPEC':'Bad PEC Value',
                    '/NO_MRC':'No Master Read Complete Signal',
                    '/FAIL':'General Failure',
                    '/LEN':'Data Length Error'}

    for ekey, eval in errorCodes.items():
        if ekey in raw:
            errors.append(eval)

    return errors        
# self-testing module
if __name__ == "__main__":

  com = serial.Serial(port=4, baudrate=115200, timeout=1, xonxoff=0)

  if com:
    port = LPort(com)
    print port
    time.sleep(5)

    port = LPort(com)

    print "/V =", port.command("/V")
    print "/V", port.data(), port.status()
    print "/O =", port.command("/O")
    print "/O", port.data(), port.status()
    print "/A =", port.command("/A")
    print "/A", port.data(), port.status()
    print "/L =", port.command("/L")
    print "/L", port.data(), port.status()
    com.close()
else:
    print "cannot open com port"

UPDATE: The following is the code around the creatfile() in serialwin32.py which returns the following message: serial.serialutil.SerialException: could not open port COM5: [Error 2] The system cannot find the file specified.

    self.hComPort = win32.CreateFile(port,
           win32.GENERIC_READ | win32.GENERIC_WRITE,
           0, # exclusive access
           None, # no security
           win32.OPEN_EXISTING,
           win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED,
           0)
    if self.hComPort == win32.INVALID_HANDLE_VALUE:
        self.hComPort = None    # 'cause __del__ is called anyway
        raise SerialException("could not open port %s: %s" % (self.portstr,  ctypes.WinError())) 

Answer

Dima Tisnek picture Dima Tisnek · Oct 1, 2014

Assuming your device is well-behaved, all you must do is this:

  • close your serial port (serial.Serial instance)
  • find the COMX name of your port again
  • open the serial port

The 2nd part is problematic because Windows tries to be clever. In your case the following happens:

  • USB device is connected and is assigned name COM2
  • Your program opens the device
  • USB disconnects
  • USB reconnects quickly before your program noticed that device died
  • Windows sees that COM2 is busy and assigns a different name to this USB device
  • (optional) your program closes the device
  • your program tries to open COM2 again, but there's no hardware at that name

The are way to get around Windows being clever -- you can specifically assign fixed COMX name to this device in Device Manager, COM ports, your port, advanced options.

Another option is to detect device dying very fast and closing the file handle. If you are lucky then by the time device reconnects original COM2 is free again.

Yet another option is to use a USB-serial converter from another manufacturer that uses another driver. Somehow COMX letter assignment is driver-specific. Better drivers may give you a stable name.