How to get notifications from BLE Device using pygatt in python?

Alican Uzun · Jul 7, 2017

I am developing a Linux application in Python that connects to my BLE device and receives data through a notify characteristic. I am using pygatt for BLE communication. I can successfully connect and bond to the device, and I can read from and write to characteristics. I can even subscribe to the notify characteristic.

The problem is this: my BLE device is a custom machine with 4 counters inside it. Every time one of the counters changes, the device sets the corresponding notification flag, so in an onDataChanged-like callback I should be able to read the counter's data from the read characteristic. In Python using pygatt, I subscribe to the notify characteristic with:

class_name.device.subscribe(uuid.UUID(notify_characteristic),callback=notifyBle)

and the notifyBle is:

def notifyBle(self,handle,data):
    read_data = class_name.device.char_read(uuid.UUID(read_characteristic))
    print(read_data)

When I run the program, I first scan for devices, connect to my device and bond with it, then discover the characteristics and list them. All of this succeeds. After listing the characteristics, I write to the write characteristic to clear the notification flags, which also succeeds. Finally, I subscribe to the notify characteristic, and that succeeds as well.

After all these steps, I increase my device's counters physically (there are buttons on the device for this). When I press a button, the program enters the notifyBle method and raises this error:

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/gatttool.py", line 137, in run
    event["callback"](event)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/gatttool.py", line 479, in _handle_notification_string
    self._connected_device.receive_notification(handle, values)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/device.py", line 226, in receive_notification
    callback(handle, value)
  File "/home/acd/Masaüstü/python_workspace/ble.py", line 54, in notifyBle
    read_data = bleFunctions.dev.char_read(uuid.UUID(bleFunctions.read_characteristic))
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/device.py", line 17, in wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/device.py", line 40, in char_read
    return self._backend.char_read(self, uuid, *args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/gatttool.py", line 53, in wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/gatttool.py", line 519, in char_read
    self.sendline('char-read-uuid %s' % uuid)
  File "/usr/lib/python3.5/contextlib.py", line 66, in __exit__
    next(self.gen)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/gatttool.py", line 180, in event
    self.wait(event, timeout)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/gatttool.py", line 154, in wait
    raise NotificationTimeout()
pygatt.exceptions.NotificationTimeout

Any help would be appreciated.

PS: I wrote the exact same program for Android and for Windows UWP. With Python, I am aiming to run this on a Raspberry Pi 3.

PPS: I am using a Raspberry Pi 3 with Ubuntu MATE installed to develop this program in Python.

Answer

emre.sahin · Jul 18, 2017

First, create an event class like the one below (saved as eventclass.py, to match the import used in the next step):

class Event:
    def __init__(self):
        self.handlers = set()

    def handle(self, handler):
        self.handlers.add(handler)
        return self

    def unhandle(self, handler):
        try:
            self.handlers.remove(handler)
        except KeyError:
            # set.remove raises KeyError when the handler was never registered
            raise ValueError("Handler is not handling this event, so cannot unhandle it.")
        return self

    def fire(self, *args, **kargs):
        for handler in self.handlers:
            handler(*args, **kargs)

    def getHandlerCount(self):
        return len(self.handlers)

    __iadd__ = handle
    __isub__ = unhandle
    __call__ = fire
    __len__  = getHandlerCount
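
As a quick illustration of how the operator aliases behave, here is a minimal sketch. It uses a condensed copy of the class so that it runs on its own; the names received, on_change and changed are made up for the example.

```python
class Event:
    """Condensed copy of the Event class, so this snippet runs on its own."""

    def __init__(self):
        self.handlers = set()

    def __iadd__(self, handler):
        self.handlers.add(handler)
        return self

    def __isub__(self, handler):
        self.handlers.remove(handler)
        return self

    def __call__(self, *args, **kwargs):
        for handler in self.handlers:
            handler(*args, **kwargs)

    def __len__(self):
        return len(self.handlers)


received = []


def on_change(value):
    # Hypothetical handler: just record what the event delivered
    received.append(value)


changed = Event()
changed += on_change   # register the handler
changed(True)          # fire: on_change(True) is called
changed -= on_change   # unregister the handler
changed(False)         # no handlers left, so nothing is recorded
print(received, len(changed))
```

Because `__iadd__` and `__isub__` return the event itself, `changed` still refers to the same Event object after `+=` and `-=`.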

Then create a BLE class:

import binascii
import uuid

import pygatt
from eventclass import Event

class myBle:
    ADDRESS_TYPE = pygatt.BLEAddressType.random
    read_characteristic = "0000xxxx-0000-1000-8000-00805f9b34fb"
    write_characteristic = "0000xxxx-0000-1000-8000-00805f9b34fb"
    notify_characteristic = "0000xxxxx-0000-1000-8000-00805f9b34fb"

    def __init__(self, device):
        self.device = device
        self.valueChanged = Event()
        self.checkdata = False

    def alert(self):
        # Pass the current flag value to every registered handler
        self.valueChanged(self.checkdata)

    def write(self, data):
        # data is a hex string such as "1000"; pygatt's write method is char_write
        self.device.char_write(self.write_characteristic, bytearray(binascii.unhexlify(data)))

    def notify(self, handle, data):
        # Notification callback: only set the flag, do no BLE I/O here
        self.checkdata = True

    def read(self):
        # Read only when a notification has arrived, then clear the flags
        if self.checkdata:
            self.read_data = self.device.char_read(uuid.UUID(self.read_characteristic))
            self.write("1000")
            self.checkdata = False
            return self.read_data

    def discover(self):
        return self.device.discover_characteristics().keys()
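
The write method passes its argument through binascii.unhexlify, which expects a string of hex digits rather than raw bytes. A small check of that conversion, independent of pygatt (the value "1000" is only an example payload):

```python
import binascii

# Hex text is converted to the raw bytes that get sent to the device
payload = binascii.unhexlify("1000")
print(payload == b"\x10\x00")

# Raw bytes are not hex digits, so unhexlify rejects them
try:
    binascii.unhexlify(b"\x10\x00")
    rejected = False
except binascii.Error:
    rejected = True
print(rejected)
```

So callers of write should pass hex text such as "1000", not an already-decoded bytearray.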

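When a notification arrives, the notify method sets the boolean flag to True, and the alert method passes the flag's current value to every registered handler. The callback does no BLE I/O itself; in the question's traceback, it is the char_read issued from inside the callback, on pygatt's own thread, that timed out. You listen for the alert with: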

def triggerEvent(checkdata):
   print(str(checkdata))

ble = myBle(device)
ble.valueChanged += triggerEvent
ble.alert()

You can call the read method from triggerEvent to get the characteristic's value.
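
To show the idea end to end without hardware, here is a rough sketch of deferring the read to the main thread. It swaps the boolean flag for a standard-library queue, which is a different technique from the one above. FakeDevice, NotificationPump, press_button and the placeholder UUID strings are all hypothetical. With a real device you would pass the object returned by pygatt's connect call instead of FakeDevice.

```python
import queue


class FakeDevice:
    """Hypothetical stand-in for a connected pygatt device (no hardware needed)."""

    def __init__(self):
        self.callback = None
        self.counter = 0

    def subscribe(self, uuid, callback=None):
        # Remember the callback, as a real device does on subscribe
        self.callback = callback

    def char_read(self, uuid):
        # Return the current counter value as a single byte
        return bytearray([self.counter])

    def press_button(self):
        # Simulate the physical button: bump the counter, then notify
        self.counter += 1
        self.callback(0x0E, bytearray(b"\x01"))


class NotificationPump:
    """Defers characteristic reads from the callback to the caller's thread."""

    def __init__(self, device, notify_uuid, read_uuid):
        self.device = device
        self.read_uuid = read_uuid
        self.pending = queue.Queue()
        device.subscribe(notify_uuid, callback=self.on_notify)

    def on_notify(self, handle, data):
        # Called by the BLE backend: record the event only, no BLE I/O here
        self.pending.put((handle, bytes(data)))

    def poll(self, timeout=1.0):
        # Called from the main thread: wait for a notification, then read
        try:
            self.pending.get(timeout=timeout)
        except queue.Empty:
            return None
        return self.device.char_read(self.read_uuid)


device = FakeDevice()
pump = NotificationPump(device, "notify-uuid", "read-uuid")
device.press_button()
value = pump.poll()
print(value)
```

In a real program, poll would be called in a loop from the main thread, so the read never runs inside the notification callback.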