diff options
| author | Javier <dev.git@javispedro.com> | 2014-12-04 00:17:28 +0100 |
|---|---|---|
| committer | Javier <dev.git@javispedro.com> | 2014-12-04 00:17:28 +0100 |
| commit | ddcc8ada42c186e980626ff617be038f45106145 (patch) | |
| tree | d96b326ab3d4eb4f49d09689cadcf414bf559982 /daemon/appmsgmanager.cpp | |
| parent | 8d98f990c9ed158d8c65befc154ab58a3c392646 (diff) | |
send appmessages one at a time
Diffstat (limited to 'daemon/appmsgmanager.cpp')
| -rw-r--r-- | daemon/appmsgmanager.cpp | 288 |
1 files changed, 189 insertions, 99 deletions
diff --git a/daemon/appmsgmanager.cpp b/daemon/appmsgmanager.cpp index 12971ce..d5f527e 100644 --- a/daemon/appmsgmanager.cpp +++ b/daemon/appmsgmanager.cpp @@ -1,3 +1,5 @@ +#include <QTimer> + #include "appmsgmanager.h" #include "unpacker.h" #include "packer.h" @@ -5,46 +7,22 @@ // TODO D-Bus server for non JS kit apps!!!! AppMsgManager::AppMsgManager(AppManager *apps, WatchConnector *watch, QObject *parent) - : QObject(parent), apps(apps), watch(watch), lastTransactionId(0) + : QObject(parent), apps(apps), watch(watch), lastTransactionId(0), timeout(new QTimer(this)) { + connect(watch, &WatchConnector::connectedChanged, + this, &AppMsgManager::handleWatchConnectedChanged); + + timeout->setSingleShot(true); + timeout->setInterval(3000); + connect(timeout, &QTimer::timeout, + this, &AppMsgManager::handleTimeout); + watch->setEndpointHandler(WatchConnector::watchLAUNCHER, [this](const QByteArray &data) { - if (data.at(0) == WatchConnector::appmsgPUSH) { - uint transaction; - QUuid uuid; - WatchConnector::Dict dict; - - if (!unpackPushMessage(data, &transaction, &uuid, &dict)) { - // Failed to parse! - // Since we're the only one handling this endpoint, - // all messages must be accepted - logger()->warn() << "Failed to parser LAUNCHER PUSH message"; - return true; - } - if (!dict.contains(1)) { - logger()->warn() << "LAUNCHER message has no item in dict"; - return true; - } - - switch (dict.value(1).toInt()) { - case WatchConnector::launcherSTARTED: - logger()->debug() << "App starting in watch:" << uuid; - this->watch->sendMessage(WatchConnector::watchLAUNCHER, - buildAckMessage(transaction)); - emit appStarted(uuid); - break; - case WatchConnector::launcherSTOPPED: - logger()->debug() << "App stopping in watch:" << uuid; - this->watch->sendMessage(WatchConnector::watchLAUNCHER, - buildAckMessage(transaction)); - emit appStopped(uuid); - break; - default: - logger()->warn() << "LAUNCHER pushed unknown message:" << uuid << dict; - this->watch->sendMessage(WatchConnector::watchLAUNCHER, - buildNackMessage(transaction)); - break; - } + switch (data.at(0)) { + case WatchConnector::appmsgPUSH: + handleLauncherPushMessage(data); + break; } return true; @@ -53,27 +31,14 @@ AppMsgManager::AppMsgManager(AppManager *apps, WatchConnector *watch, QObject *p watch->setEndpointHandler(WatchConnector::watchAPPLICATION_MESSAGE, [this](const QByteArray &data) { switch (data.at(0)) { - case WatchConnector::appmsgPUSH: { - uint transaction; - QUuid uuid; - WatchConnector::Dict dict; - - if (!unpackPushMessage(data, &transaction, &uuid, &dict)) { - logger()->warn() << "Failed to parse APP_MSG PUSH"; - return true; - } - - logger()->debug() << "Received appmsg PUSH from" << uuid << "with" << dict; - - QVariantMap data = mapAppKeys(uuid, dict); - logger()->debug() << "Mapped dict" << data; - - emit messageReceived(uuid, data); + case WatchConnector::appmsgPUSH: + handlePushMessage(data); break; - } case WatchConnector::appmsgACK: + handleAckMessage(data, true); + break; case WatchConnector::appmsgNACK: - logger()->info() << "appmsg endpoint handler received an unwanted ack/nack"; + handleAckMessage(data, false); break; default: logger()->warn() << "Unknown application message type:" << int(data.at(0)); @@ -86,47 +51,22 @@ AppMsgManager::AppMsgManager(AppManager *apps, WatchConnector *watch, QObject *p void AppMsgManager::send(const QUuid &uuid, const QVariantMap &data, const std::function<void ()> &ackCallback, const std::function<void ()> &nackCallback) { - WatchConnector::Dict dict = mapAppKeys(uuid, data); - quint8 transaction = ++lastTransactionId; - QByteArray msg = buildPushMessage(transaction, uuid, dict); - - logger()->debug() << "Sending appmsg" << transaction << "to" << uuid << "with" << dict; - -#if 0 /* Try to unpack what we just packed. */ - WatchConnector::Dict t_dict; - QUuid t_uuid; - uint t_trans; - if (unpackPushMessage(msg, &t_trans, &t_uuid, &t_dict)) { - logger()->debug() << t_trans << t_uuid << t_dict; - } else { - logger()->error() << "not able to unpack my own dict"; + PendingTransaction trans; + trans.uuid = uuid; + trans.transactionId = ++lastTransactionId; + trans.dict = mapAppKeys(uuid, data); + trans.ackCallback = ackCallback; + trans.nackCallback = nackCallback; + + logger()->debug() << "Queueing appmsg" << trans.transactionId << "to" << trans.uuid + << "with dict" << trans.dict; + + pending.enqueue(trans); + if (pending.size() == 1) { + // This is the only transaction on the queue + // Therefore, we were idle before: we can submit this transaction right now. + transmitNextPendingTransaction(); } -#endif - - watch->sendMessage(WatchConnector::watchAPPLICATION_MESSAGE, msg, - [this, ackCallback, nackCallback, transaction](const QByteArray &reply) { - if (reply.size() < 2) return false; - - quint8 type = reply[0]; - quint8 recv_transaction = reply[1]; - - if (recv_transaction != transaction) return false; - - logger()->debug() << "Got response to transaction" << transaction; - - switch (type) { - case WatchConnector::appmsgACK: - logger()->debug() << "Got ACK to transaction" << transaction; - if (ackCallback) ackCallback(); - return true; - case WatchConnector::appmsgNACK: - logger()->info() << "Got NACK to transaction" << transaction; - if (nackCallback) nackCallback(); - return true; - default: - return false; - } - }); } void AppMsgManager::send(const QUuid &uuid, const QVariantMap &data) @@ -194,7 +134,7 @@ QVariantMap AppMsgManager::mapAppKeys(const QUuid &uuid, const WatchConnector::D return data; } -bool AppMsgManager::unpackPushMessage(const QByteArray &msg, uint *transaction, QUuid *uuid, WatchConnector::Dict *dict) +bool AppMsgManager::unpackPushMessage(const QByteArray &msg, quint8 *transaction, QUuid *uuid, WatchConnector::Dict *dict) { Unpacker u(msg); quint8 code = u.read<quint8>(); @@ -211,7 +151,7 @@ bool AppMsgManager::unpackPushMessage(const QByteArray &msg, uint *transaction, return true; } -QByteArray AppMsgManager::buildPushMessage(uint transaction, const QUuid &uuid, const WatchConnector::Dict &dict) +QByteArray AppMsgManager::buildPushMessage(quint8 transaction, const QUuid &uuid, const WatchConnector::Dict &dict) { QByteArray ba; Packer p(&ba); @@ -223,7 +163,7 @@ QByteArray AppMsgManager::buildPushMessage(uint transaction, const QUuid &uuid, return ba; } -QByteArray AppMsgManager::buildAckMessage(uint transaction) +QByteArray AppMsgManager::buildAckMessage(quint8 transaction) { QByteArray ba(2, Qt::Uninitialized); ba[0] = WatchConnector::appmsgACK; @@ -231,10 +171,160 @@ QByteArray AppMsgManager::buildAckMessage(uint transaction) return ba; } -QByteArray AppMsgManager::buildNackMessage(uint transaction) +QByteArray AppMsgManager::buildNackMessage(quint8 transaction) { QByteArray ba(2, Qt::Uninitialized); ba[0] = WatchConnector::appmsgNACK; ba[1] = transaction; return ba; } + +void AppMsgManager::handleLauncherPushMessage(const QByteArray &data) +{ + quint8 transaction; + QUuid uuid; + WatchConnector::Dict dict; + + if (!unpackPushMessage(data, &transaction, &uuid, &dict)) { + // Failed to parse! + // Since we're the only one handling this endpoint, + // all messages must be accepted + logger()->warn() << "Failed to parser LAUNCHER PUSH message"; + return; + } + if (!dict.contains(1)) { + logger()->warn() << "LAUNCHER message has no item in dict"; + return; + } + + switch (dict.value(1).toInt()) { + case WatchConnector::launcherSTARTED: + logger()->debug() << "App starting in watch:" << uuid; + this->watch->sendMessage(WatchConnector::watchLAUNCHER, + buildAckMessage(transaction)); + emit appStarted(uuid); + break; + case WatchConnector::launcherSTOPPED: + logger()->debug() << "App stopping in watch:" << uuid; + this->watch->sendMessage(WatchConnector::watchLAUNCHER, + buildAckMessage(transaction)); + emit appStopped(uuid); + break; + default: + logger()->warn() << "LAUNCHER pushed unknown message:" << uuid << dict; + this->watch->sendMessage(WatchConnector::watchLAUNCHER, + buildNackMessage(transaction)); + break; + } +} + +void AppMsgManager::handlePushMessage(const QByteArray &data) +{ + quint8 transaction; + QUuid uuid; + WatchConnector::Dict dict; + + if (!unpackPushMessage(data, &transaction, &uuid, &dict)) { + logger()->warn() << "Failed to parse APP_MSG PUSH"; + return; + } + + logger()->debug() << "Received appmsg PUSH from" << uuid << "with" << dict; + + QVariantMap msg = mapAppKeys(uuid, dict); + logger()->debug() << "Mapped dict" << msg; + + emit messageReceived(uuid, msg); +} + +void AppMsgManager::handleAckMessage(const QByteArray &data, bool ack) +{ + if (data.size() < 2) { + logger()->warn() << "invalid ack/nack message size"; + return; + } + + if (pending.empty()) { + logger()->warn() << "received an ack/nack but no active transaction"; + } + + const quint8 type = data[0]; + const quint8 recv_transaction = data[1]; + + Q_ASSERT(type == WatchConnector::appmsgACK || type == WatchConnector::appmsgNACK); + + PendingTransaction &trans = pending.head(); + if (trans.transactionId != recv_transaction) { + logger()->warn() << "received an ack/nack but for the wrong transaction"; + } + + logger()->debug() << "Got " << (ack ? "ACK" : "NACK") << " to transaction" << trans.transactionId; + + timeout->stop(); + + if (ack) { + if (trans.ackCallback) { + trans.ackCallback(); + } + } else { + if (trans.nackCallback) { + trans.nackCallback(); + } + } + + pending.dequeue(); + + if (!pending.empty()) { + transmitNextPendingTransaction(); + } +} + +void AppMsgManager::handleWatchConnectedChanged() +{ + // If the watch is disconnected, everything breaks loose + // TODO In the future we may want to avoid doing the following. + if (!watch->isConnected()) { + abortPendingTransactions(); + } +} + +void AppMsgManager::handleTimeout() +{ + // Abort the first transaction + Q_ASSERT(!pending.empty()); + PendingTransaction trans = pending.dequeue(); + + logger()->warn() << "timeout on appmsg transaction" << trans.transactionId; + + if (trans.nackCallback) { + trans.nackCallback(); + } + + if (!pending.empty()) { + transmitNextPendingTransaction(); + } +} + +void AppMsgManager::transmitNextPendingTransaction() +{ + Q_ASSERT(!pending.empty()); + PendingTransaction &trans = pending.head(); + + QByteArray msg = buildPushMessage(trans.transactionId, trans.uuid, trans.dict); + + watch->sendMessage(WatchConnector::watchAPPLICATION_MESSAGE, msg); + + timeout->start(); +} + +void AppMsgManager::abortPendingTransactions() +{ + // Invoke all the NACK callbacks in the pending queue, then drop them. + Q_FOREACH(const PendingTransaction &trans, pending) { + if (trans.nackCallback) { + trans.nackCallback(); + } + } + + pending.clear(); +} |
