summaryrefslogtreecommitdiff
path: root/daemon/appmsgmanager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'daemon/appmsgmanager.cpp')
-rw-r--r--daemon/appmsgmanager.cpp288
1 files changed, 189 insertions, 99 deletions
diff --git a/daemon/appmsgmanager.cpp b/daemon/appmsgmanager.cpp
index 12971ce..d5f527e 100644
--- a/daemon/appmsgmanager.cpp
+++ b/daemon/appmsgmanager.cpp
@@ -1,3 +1,5 @@
+#include <QTimer>
+
#include "appmsgmanager.h"
#include "unpacker.h"
#include "packer.h"
@@ -5,46 +7,22 @@
// TODO D-Bus server for non JS kit apps!!!!
AppMsgManager::AppMsgManager(AppManager *apps, WatchConnector *watch, QObject *parent)
- : QObject(parent), apps(apps), watch(watch), lastTransactionId(0)
+ : QObject(parent), apps(apps), watch(watch), lastTransactionId(0), timeout(new QTimer(this))
{
+ connect(watch, &WatchConnector::connectedChanged,
+ this, &AppMsgManager::handleWatchConnectedChanged);
+
+ timeout->setSingleShot(true);
+ timeout->setInterval(3000);
+ connect(timeout, &QTimer::timeout,
+ this, &AppMsgManager::handleTimeout);
+
watch->setEndpointHandler(WatchConnector::watchLAUNCHER,
[this](const QByteArray &data) {
- if (data.at(0) == WatchConnector::appmsgPUSH) {
- uint transaction;
- QUuid uuid;
- WatchConnector::Dict dict;
-
- if (!unpackPushMessage(data, &transaction, &uuid, &dict)) {
- // Failed to parse!
- // Since we're the only one handling this endpoint,
- // all messages must be accepted
- logger()->warn() << "Failed to parser LAUNCHER PUSH message";
- return true;
- }
- if (!dict.contains(1)) {
- logger()->warn() << "LAUNCHER message has no item in dict";
- return true;
- }
-
- switch (dict.value(1).toInt()) {
- case WatchConnector::launcherSTARTED:
- logger()->debug() << "App starting in watch:" << uuid;
- this->watch->sendMessage(WatchConnector::watchLAUNCHER,
- buildAckMessage(transaction));
- emit appStarted(uuid);
- break;
- case WatchConnector::launcherSTOPPED:
- logger()->debug() << "App stopping in watch:" << uuid;
- this->watch->sendMessage(WatchConnector::watchLAUNCHER,
- buildAckMessage(transaction));
- emit appStopped(uuid);
- break;
- default:
- logger()->warn() << "LAUNCHER pushed unknown message:" << uuid << dict;
- this->watch->sendMessage(WatchConnector::watchLAUNCHER,
- buildNackMessage(transaction));
- break;
- }
+ switch (data.at(0)) {
+ case WatchConnector::appmsgPUSH:
+ handleLauncherPushMessage(data);
+ break;
}
return true;
@@ -53,27 +31,14 @@ AppMsgManager::AppMsgManager(AppManager *apps, WatchConnector *watch, QObject *p
watch->setEndpointHandler(WatchConnector::watchAPPLICATION_MESSAGE,
[this](const QByteArray &data) {
switch (data.at(0)) {
- case WatchConnector::appmsgPUSH: {
- uint transaction;
- QUuid uuid;
- WatchConnector::Dict dict;
-
- if (!unpackPushMessage(data, &transaction, &uuid, &dict)) {
- logger()->warn() << "Failed to parse APP_MSG PUSH";
- return true;
- }
-
- logger()->debug() << "Received appmsg PUSH from" << uuid << "with" << dict;
-
- QVariantMap data = mapAppKeys(uuid, dict);
- logger()->debug() << "Mapped dict" << data;
-
- emit messageReceived(uuid, data);
+ case WatchConnector::appmsgPUSH:
+ handlePushMessage(data);
break;
- }
case WatchConnector::appmsgACK:
+ handleAckMessage(data, true);
+ break;
case WatchConnector::appmsgNACK:
- logger()->info() << "appmsg endpoint handler received an unwanted ack/nack";
+ handleAckMessage(data, false);
break;
default:
logger()->warn() << "Unknown application message type:" << int(data.at(0));
@@ -86,47 +51,22 @@ AppMsgManager::AppMsgManager(AppManager *apps, WatchConnector *watch, QObject *p
void AppMsgManager::send(const QUuid &uuid, const QVariantMap &data, const std::function<void ()> &ackCallback, const std::function<void ()> &nackCallback)
{
- WatchConnector::Dict dict = mapAppKeys(uuid, data);
- quint8 transaction = ++lastTransactionId;
- QByteArray msg = buildPushMessage(transaction, uuid, dict);
-
- logger()->debug() << "Sending appmsg" << transaction << "to" << uuid << "with" << dict;
-
-#if 0 /* Try to unpack what we just packed. */
- WatchConnector::Dict t_dict;
- QUuid t_uuid;
- uint t_trans;
- if (unpackPushMessage(msg, &t_trans, &t_uuid, &t_dict)) {
- logger()->debug() << t_trans << t_uuid << t_dict;
- } else {
- logger()->error() << "not able to unpack my own dict";
+ PendingTransaction trans;
+ trans.uuid = uuid;
+ trans.transactionId = ++lastTransactionId;
+ trans.dict = mapAppKeys(uuid, data);
+ trans.ackCallback = ackCallback;
+ trans.nackCallback = nackCallback;
+
+ logger()->debug() << "Queueing appmsg" << trans.transactionId << "to" << trans.uuid
+ << "with dict" << trans.dict;
+
+ pending.enqueue(trans);
+ if (pending.size() == 1) {
+ // This is the only transaction on the queue
+ // Therefore, we were idle before: we can submit this transaction right now.
+ transmitNextPendingTransaction();
}
-#endif
-
- watch->sendMessage(WatchConnector::watchAPPLICATION_MESSAGE, msg,
- [this, ackCallback, nackCallback, transaction](const QByteArray &reply) {
- if (reply.size() < 2) return false;
-
- quint8 type = reply[0];
- quint8 recv_transaction = reply[1];
-
- if (recv_transaction != transaction) return false;
-
- logger()->debug() << "Got response to transaction" << transaction;
-
- switch (type) {
- case WatchConnector::appmsgACK:
- logger()->debug() << "Got ACK to transaction" << transaction;
- if (ackCallback) ackCallback();
- return true;
- case WatchConnector::appmsgNACK:
- logger()->info() << "Got NACK to transaction" << transaction;
- if (nackCallback) nackCallback();
- return true;
- default:
- return false;
- }
- });
}
void AppMsgManager::send(const QUuid &uuid, const QVariantMap &data)
@@ -194,7 +134,7 @@ QVariantMap AppMsgManager::mapAppKeys(const QUuid &uuid, const WatchConnector::D
return data;
}
-bool AppMsgManager::unpackPushMessage(const QByteArray &msg, uint *transaction, QUuid *uuid, WatchConnector::Dict *dict)
+bool AppMsgManager::unpackPushMessage(const QByteArray &msg, quint8 *transaction, QUuid *uuid, WatchConnector::Dict *dict)
{
Unpacker u(msg);
quint8 code = u.read<quint8>();
@@ -211,7 +151,7 @@ bool AppMsgManager::unpackPushMessage(const QByteArray &msg, uint *transaction,
return true;
}
-QByteArray AppMsgManager::buildPushMessage(uint transaction, const QUuid &uuid, const WatchConnector::Dict &dict)
+QByteArray AppMsgManager::buildPushMessage(quint8 transaction, const QUuid &uuid, const WatchConnector::Dict &dict)
{
QByteArray ba;
Packer p(&ba);
@@ -223,7 +163,7 @@ QByteArray AppMsgManager::buildPushMessage(uint transaction, const QUuid &uuid,
return ba;
}
-QByteArray AppMsgManager::buildAckMessage(uint transaction)
+QByteArray AppMsgManager::buildAckMessage(quint8 transaction)
{
QByteArray ba(2, Qt::Uninitialized);
ba[0] = WatchConnector::appmsgACK;
@@ -231,10 +171,160 @@ QByteArray AppMsgManager::buildAckMessage(uint transaction)
return ba;
}
-QByteArray AppMsgManager::buildNackMessage(uint transaction)
+QByteArray AppMsgManager::buildNackMessage(quint8 transaction)
{
QByteArray ba(2, Qt::Uninitialized);
ba[0] = WatchConnector::appmsgNACK;
ba[1] = transaction;
return ba;
}
+
+void AppMsgManager::handleLauncherPushMessage(const QByteArray &data)
+{
+ quint8 transaction;
+ QUuid uuid;
+ WatchConnector::Dict dict;
+
+ if (!unpackPushMessage(data, &transaction, &uuid, &dict)) {
+ // Failed to parse!
+ // Since we're the only one handling this endpoint,
+ // all messages must be accepted
+ logger()->warn() << "Failed to parser LAUNCHER PUSH message";
+ return;
+ }
+ if (!dict.contains(1)) {
+ logger()->warn() << "LAUNCHER message has no item in dict";
+ return;
+ }
+
+ switch (dict.value(1).toInt()) {
+ case WatchConnector::launcherSTARTED:
+ logger()->debug() << "App starting in watch:" << uuid;
+ this->watch->sendMessage(WatchConnector::watchLAUNCHER,
+ buildAckMessage(transaction));
+ emit appStarted(uuid);
+ break;
+ case WatchConnector::launcherSTOPPED:
+ logger()->debug() << "App stopping in watch:" << uuid;
+ this->watch->sendMessage(WatchConnector::watchLAUNCHER,
+ buildAckMessage(transaction));
+ emit appStopped(uuid);
+ break;
+ default:
+ logger()->warn() << "LAUNCHER pushed unknown message:" << uuid << dict;
+ this->watch->sendMessage(WatchConnector::watchLAUNCHER,
+ buildNackMessage(transaction));
+ break;
+ }
+}
+
+void AppMsgManager::handlePushMessage(const QByteArray &data)
+{
+ quint8 transaction;
+ QUuid uuid;
+ WatchConnector::Dict dict;
+
+ if (!unpackPushMessage(data, &transaction, &uuid, &dict)) {
+ logger()->warn() << "Failed to parse APP_MSG PUSH";
+ return;
+ }
+
+ logger()->debug() << "Received appmsg PUSH from" << uuid << "with" << dict;
+
+ QVariantMap msg = mapAppKeys(uuid, dict);
+ logger()->debug() << "Mapped dict" << msg;
+
+ emit messageReceived(uuid, msg);
+}
+
+void AppMsgManager::handleAckMessage(const QByteArray &data, bool ack)
+{
+ if (data.size() < 2) {
+ logger()->warn() << "invalid ack/nack message size";
+ return;
+ }
+
+ if (pending.empty()) {
+ logger()->warn() << "received an ack/nack but no active transaction";
+ }
+
+ const quint8 type = data[0];
+ const quint8 recv_transaction = data[1];
+
+ Q_ASSERT(type == WatchConnector::appmsgACK || type == WatchConnector::appmsgNACK);
+
+ PendingTransaction &trans = pending.head();
+ if (trans.transactionId != recv_transaction) {
+ logger()->warn() << "received an ack/nack but for the wrong transaction";
+ }
+
+ logger()->debug() << "Got " << (ack ? "ACK" : "NACK") << " to transaction" << trans.transactionId;
+
+ timeout->stop();
+
+ if (ack) {
+ if (trans.ackCallback) {
+ trans.ackCallback();
+ }
+ } else {
+ if (trans.nackCallback) {
+ trans.nackCallback();
+ }
+ }
+
+ pending.dequeue();
+
+ if (!pending.empty()) {
+ transmitNextPendingTransaction();
+ }
+}
+
+void AppMsgManager::handleWatchConnectedChanged()
+{
+ // If the watch is disconnected, everything breaks loose
+ // TODO In the future we may want to avoid doing the following.
+ if (!watch->isConnected()) {
+ abortPendingTransactions();
+ }
+}
+
+void AppMsgManager::handleTimeout()
+{
+ // Abort the first transaction
+ Q_ASSERT(!pending.empty());
+ PendingTransaction trans = pending.dequeue();
+
+ logger()->warn() << "timeout on appmsg transaction" << trans.transactionId;
+
+ if (trans.nackCallback) {
+ trans.nackCallback();
+ }
+
+ if (!pending.empty()) {
+ transmitNextPendingTransaction();
+ }
+}
+
+void AppMsgManager::transmitNextPendingTransaction()
+{
+ Q_ASSERT(!pending.empty());
+ PendingTransaction &trans = pending.head();
+
+ QByteArray msg = buildPushMessage(trans.transactionId, trans.uuid, trans.dict);
+
+ watch->sendMessage(WatchConnector::watchAPPLICATION_MESSAGE, msg);
+
+ timeout->start();
+}
+
+void AppMsgManager::abortPendingTransactions()
+{
+ // Invoke all the NACK callbacks in the pending queue, then drop them.
+ Q_FOREACH(const PendingTransaction &trans, pending) {
+ if (trans.nackCallback) {
+ trans.nackCallback();
+ }
+ }
+
+ pending.clear();
+}