How to Compress Slot Calls When Using Queued Connection in Qt?

Michel Feinstein picture Michel Feinstein · Jan 1, 2014 · Viewed 8.4k times · Source

After reading some articles like this about Qt Signal-Slot communications I still have a question concerning the queued connection.

If I have some threads sending signals all the time to each other and lets say one thread_slowis running a slow method in it's event loop and another thread_fast is running a fast one that sends multiple signals while the other thread is still running it's slow method.....when the slow method from thread_slow returns to the event loop, will it process all the signals that were sent before by thread_fastor just the last one (all the signals are the same type)?

If it will process all the signals, is it there a way to make the thread_slow only process the last one? (Considering "the last one" in a multithread application might be vague, let's consider the last signal before the thread asked for the last signal, for the sake of simplicity, so new ones being sent while the thread looks for the last might be lost).

(I am asking this because I have multiple threads receiving data from multiple threads, and I dont want them to process old data, just the last one that was sent)

I have run some tests, and it appears that Qt will process all the signals. I made one thread do:

while(true)
{
    QThread::msleep(500);
    emit testQueue(test);
    test++;
}

and a slot in another will do:

void test::testQueue(int test)
{
    test.store(private_test.load() + test);
    emit testText(QString("Test Queue: ") + QString::number(private_test.load()));
}

and the thread will run:

while(true)
{
    QThread::msleep(3000);
    QCoreApplication::processEvents();
    private_test.store(private_test.load() + 1000);
}

I am sending a signal from one thread to the other every 500 milliseconds, and the other thread sleeps for 3000 milliseconds (3 seconds) and then wakes up and increment an internal variable by 100. every time the slot is executed it emits a text with the value received + the internal variable. The result I am having is that every time QCoreApplication::processEvents(); is called, all signals are executed....(I edited this part because I found a bug in my previous code)

Answer

QCoreApplication QMetaCallEvent Compression

Every queued slot call ends up in the posting of a QMetaCallEvent to the target object. The event contains the sender object, the signal id, the slot index, and packaged call parameters. On Qt 5, the signal id generally doesn't equal the value returned by QMetaObject::signalIndex(): it is an index computed as if the object only had signal methods and no other methods.

The objective is to compress such calls so that only one unique call exists in the event queue for a given tuple of (sender object, sender signal, receiver object, receiver slot).

This is the only sane way to do it, without having to make changes to source or target objects, and while maintaining minimal overhead. The event-loop-recursing methods in my other answers have serious stack overhead per each event, on the order of 1kbyte when Qt is built for 64-bit-pointer architectures.

The event queue can be accessed when new events are posted to an object that has one or more events already posted to it. In such case, QCoreApplication::postEvent calls QCoreApplication::compressEvent. compressEvent is not called when the first event is posted to an object. In a reimplementation of this method, the contents of a QMetaCallEvent posted to the target object can be checked for a call to your slot, and the obsolete duplicate has to be deleted. Private Qt headers have to be included to obtain the definitions of QMetaCallEvent, QPostEvent and QPostEventList.

Pros: Neither the sender nor the receiver objects have to be aware of anything. Signals and slots work as-is, including the method-pointer calls in Qt 5. Qt itself uses this way of compressing events.

Cons: Requires inclusion of private Qt headers and forcible clearing of QEvent::posted flag.

Instead of hacking the QEvent::posted flag, the events to be deleted can be queued in a separate list and deleted outside of the compressEvent call, when a zero-duration timer is fired. This has the overhead of an extra list of events, and each event deletion iterating through the posted event list.

Other Approaches

The point of doing it some other way is not to use Qt's internals.

L1 The first limitation is not having access to the contents of privately defined QMetaCallEvent. It can be dealt with as follows:

  1. A proxy object with signals and slots of same signatures as those of the target can be connected between the source and target objects.

  2. Running the QMetaCallEvent on a proxy object allows extraction of the call type, the called slot id, and the arguments.

  3. In lieu of signal-slot connections, events can be explicitly posted to the target object. The target object, or an event filter, must explicitly re-synthesize the slot call from event's data.

  4. A custom compressedConnect implementation can be used in lieu of QObject::connect. This fully exposes the details of the signal and slot. A proxy object can be used to perform a compression-friendly equivalent of queued_activate on the side of the sender object.

L2 The second limitation is not being able to completely reimplement QCoreApplication::compressEvent, since the event list is defined privately. We still have access to the event being compressed, and we can still decide whether to delete it or not, but there's no way to iterate the event list. Thus:

  1. The event queue can be accessed implicitly by recursively calling sendPostedEvents from within notify (thus also from eventFilter(), event() or from the slots). This doesn't cause a deadlock, since QCoreApplication::sendPostedEvents can't (and doesn't) hold an event loop mutex while the event is delivered via sendEvent. Events can be filtered as follows:

    • globally in a reimplemented QCoreApplication::notify,
    • globally by registering a QInternal::EventNotifyCallback,
    • locally by attaching an event filter to the objects,
    • explicitly locally by reimplementing QObject::event() in the target class.

    The duplicate events are still posted to the event queue. The recursive calls to notify from within sendPostedEvents consume quite a bit of stack space (budget 1kb on 64-bit-pointer architectures).

  2. The events already present can be removed by calling QCoreApplication::removePostedEvents before posting a new event to an object. Unfortunately, doing this within QCoreApplication::compressEvent causes a deadlock as the event queue mutex is already held.

    A custom event class that includes the pointer to the receiver object can automatically call removePostedEvents in the constructor.

  3. Existing compressed events, such as QEvent::Exit, can be reappropriated.

    The set of those events is an implementation detail and could change. Qt doesn't discriminate among those events other than by the receiver QObject pointer. An implementation requires the overhead of a proxy QObject per each (event type, receiver object) tuple.

Implementation

The code below works on both Qt 4 and Qt 5. On the latter, make sure to add QT += core-private to your qmake project file, so that private Qt headers get included.

The implementations not using Qt internal headers are given in other answers:

There are two event-removing code paths, selected by if (true). The enabled code path retains the most recent event and makes most sense, typically. Alternatively, you could want to retain the oldest event - that's what the disabled code path does.

screenshot

#include <QApplication>
#include <QMap>
#include <QSet>
#include <QMetaMethod>
#include <QMetaObject>
#include <private/qcoreapplication_p.h>
#include <private/qthread_p.h>
#include <private/qobject_p.h>

#include <QWidget>
#include <QPushButton>
#include <QPlainTextEdit>
#include <QSpinBox>
#include <QFormLayout>

// Works on both Qt 4 and Qt 5.

//
// Common Code

/*! Keeps a list of singal indices for one or more meatobject classes.
 * The indices are signal indices as given by QMetaCallEvent.signalId.
 * On Qt 5, those do *not* match QMetaObject::methodIndex since they
 * exclude non-signal methods. */
class SignalList {
    Q_DISABLE_COPY(SignalList)
    typedef QMap<const QMetaObject *, QSet<int> > T;
    T m_data;
    /*! Returns a signal index that is can be compared to QMetaCallEvent.signalId. */
    static int signalIndex(const QMetaMethod & method) {
        Q_ASSERT(method.methodType() == QMetaMethod::Signal);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
        int index = -1;
        const QMetaObject * mobj = method.enclosingMetaObject();
        for (int i = 0; i <= method.methodIndex(); ++i) {
            if (mobj->method(i).methodType() != QMetaMethod::Signal) continue;
            ++ index;
        }
        return index;
#else
        return method.methodIndex();
#endif
    }
public:
    SignalList() {}
    void add(const QMetaMethod & method) {
        m_data[method.enclosingMetaObject()].insert(signalIndex(method));
    }
    void remove(const QMetaMethod & method) {
        T::iterator it = m_data.find(method.enclosingMetaObject());
        if (it != m_data.end()) {
            it->remove(signalIndex(method));
            if (it->empty()) m_data.erase(it);
        }
    }
    bool contains(const QMetaObject * metaObject, int signalId) {
        T::const_iterator it = m_data.find(metaObject);
        return it != m_data.end() && it.value().contains(signalId);
    }
};

//
// Implementation Using Event Compression With Access to Private Qt Headers

struct EventHelper : private QEvent {
    static void clearPostedFlag(QEvent * ev) {
        (&static_cast<EventHelper*>(ev)->t)[1] &= ~0x8001; // Hack to clear QEvent::posted
    }
};

template <class Base> class CompressorApplication : public Base {
    SignalList m_compressedSignals;
public:
    CompressorApplication(int & argc, char ** argv) : Base(argc, argv) {}
    void addCompressedSignal(const QMetaMethod & method) { m_compressedSignals.add(method); }
    void removeCompressedSignal(const QMetaMethod & method) { m_compressedSignals.remove(method); }
protected:
    bool compressEvent(QEvent *event, QObject *receiver, QPostEventList *postedEvents) {
        if (event->type() != QEvent::MetaCall)
            return Base::compressEvent(event, receiver, postedEvents);

        QMetaCallEvent *mce = static_cast<QMetaCallEvent*>(event);
        if (! m_compressedSignals.contains(mce->sender()->metaObject(), mce->signalId())) return false;
        for (QPostEventList::iterator it = postedEvents->begin(); it != postedEvents->end(); ++it) {
            QPostEvent &cur = *it;
            if (cur.receiver != receiver || cur.event == 0 || cur.event->type() != event->type())
                continue;
            QMetaCallEvent *cur_mce = static_cast<QMetaCallEvent*>(cur.event);
            if (cur_mce->sender() != mce->sender() || cur_mce->signalId() != mce->signalId() ||
                    cur_mce->id() != mce->id())
                continue;
            if (true) {
              /* Keep The Newest Call */              
              // We can't merely qSwap the existing posted event with the new one, since QEvent
              // keeps track of whether it has been posted. Deletion of a formerly posted event
              // takes the posted event list mutex and does a useless search of the posted event
              // list upon deletion. We thus clear the QEvent::posted flag before deletion.
              EventHelper::clearPostedFlag(cur.event);
              delete cur.event;
              cur.event = event;
            } else {
              /* Keep the Oldest Call */
              delete event;
            }
            return true;
        }
        return false;
    }
};

//
// Demo GUI

class Signaller : public QObject {
    Q_OBJECT
public:
    Q_SIGNAL void emptySignal();
    Q_SIGNAL void dataSignal(int);
};

class Widget : public QWidget {
    Q_OBJECT
    QPlainTextEdit * m_edit;
    QSpinBox * m_count;
    Signaller m_signaller;
    Q_SLOT void emptySlot() {
        m_edit->appendPlainText("emptySlot invoked");
    }
    Q_SLOT void dataSlot(int n) {
        m_edit->appendPlainText(QString("dataSlot(%1) invoked").arg(n));
    }
    Q_SLOT void sendSignals() {
        m_edit->appendPlainText(QString("\nEmitting %1 signals").arg(m_count->value()));
        for (int i = 0; i < m_count->value(); ++ i) {
            emit m_signaller.emptySignal();
            emit m_signaller.dataSignal(i + 1);
        }
    }
public:
    Widget(QWidget * parent = 0) : QWidget(parent),
        m_edit(new QPlainTextEdit), m_count(new QSpinBox)
    {
        QFormLayout * l = new QFormLayout(this);
        QPushButton * invoke = new QPushButton("Invoke");
        m_edit->setReadOnly(true);
        m_count->setRange(1, 1000);
        l->addRow("Number of slot invocations", m_count);
        l->addRow(invoke);
        l->addRow(m_edit);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
        connect(invoke, &QPushButton::clicked, this, &Widget::sendSignals);
        connect(&m_signaller, &Signaller::emptySignal, this, &Widget::emptySlot, Qt::QueuedConnection);
        connect(&m_signaller, &Signaller::dataSignal, this, &Widget::dataSlot, Qt::QueuedConnection);
#else
        connect(invoke, SIGNAL(clicked()), SLOT(sendSignals()));
        connect(&m_signaller, SIGNAL(emptySignal()), SLOT(emptySlot()), Qt::QueuedConnection);
        connect(&m_signaller, SIGNAL(dataSignal(int)), SLOT(dataSlot(int)), Qt::QueuedConnection);
#endif
    }
};

int main(int argc, char *argv[])
{
    CompressorApplication<QApplication> a(argc, argv);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
    a.addCompressedSignal(QMetaMethod::fromSignal(&Signaller::emptySignal));
    a.addCompressedSignal(QMetaMethod::fromSignal(&Signaller::dataSignal));
#else
    a.addCompressedSignal(Signaller::staticMetaObject.method(Signaller::staticMetaObject.indexOfSignal("emptySignal()")));
    a.addCompressedSignal(Signaller::staticMetaObject.method(Signaller::staticMetaObject.indexOfSignal("dataSignal(int)")));
#endif
    Widget w;
    w.show();
    return a.exec();
}

#include "main.moc"