summaryrefslogtreecommitdiff
path: root/daemon/uploadmanager.cpp
diff options
context:
space:
mode:
authorJavier <dev.git@javispedro.com>2014-12-07 23:39:29 +0100
committerJavier <dev.git@javispedro.com>2014-12-07 23:39:29 +0100
commita60c1cb3c4afd6dfd305115ec4c52e993172fa7d (patch)
tree1a12cfaed45b923ed511de388d556ccc705c9e6a /daemon/uploadmanager.cpp
parent49c20eb7e2933ae6e9e4337fc0fe9b49a39efde8 (diff)
ability to upload apps
Diffstat (limited to 'daemon/uploadmanager.cpp')
-rw-r--r--daemon/uploadmanager.cpp266
1 files changed, 266 insertions, 0 deletions
diff --git a/daemon/uploadmanager.cpp b/daemon/uploadmanager.cpp
new file mode 100644
index 0000000..89d70f7
--- /dev/null
+++ b/daemon/uploadmanager.cpp
@@ -0,0 +1,266 @@
+#include "uploadmanager.h"
+#include "unpacker.h"
+#include "packer.h"
+#include "stm32crc.h"
+
+static const int CHUNK_SIZE = 2000;
+using std::function;
+
+UploadManager::UploadManager(WatchConnector *watch, QObject *parent) :
+ QObject(parent), watch(watch), _lastUploadId(0), _state(StateNotStarted)
+{
+ watch->setEndpointHandler(WatchConnector::watchPUTBYTES,
+ [this](const QByteArray &msg) {
+ if (_pending.empty()) {
+ logger()->warn() << "putbytes message, but queue is empty!";
+ return false;
+ }
+ handleMessage(msg);
+ return true;
+ });
+}
+
+uint UploadManager::upload(WatchConnector::UploadType type, int index, QIODevice *device, int size,
+ function<void()> successCallback, function<void(int)> errorCallback)
+{
+ PendingUpload upload;
+ upload.id = ++_lastUploadId;
+ upload.type = type;
+ upload.index = index;
+ upload.device = device;
+ if (size < 0) {
+ upload.remaining = device->size();
+ } else {
+ upload.remaining = size;
+ }
+ upload.successCallback = successCallback;
+ upload.errorCallback = errorCallback;
+
+ if (upload.remaining <= 0) {
+ logger()->warn() << "upload is empty";
+ if (errorCallback) {
+ errorCallback(-1);
+ return -1;
+ }
+ }
+
+ _pending.enqueue(upload);
+
+ if (_pending.size() == 1) {
+ startNextUpload();
+ }
+
+ return upload.id;
+}
+
+void UploadManager::cancel(uint id, int code)
+{
+ if (_pending.empty()) {
+ logger()->warn() << "cannot cancel, empty queue";
+ return;
+ }
+
+ if (id == _pending.head().id) {
+ PendingUpload upload = _pending.dequeue();
+ logger()->debug() << "aborting current upload" << id << "(code:" << code << ")";
+
+ if (_state != StateNotStarted && _state != StateWaitForToken && _state != StateComplete) {
+ QByteArray msg;
+ Packer p(&msg);
+ p.write<quint8>(WatchConnector::putbytesABORT);
+ p.write<quint32>(_token);
+
+ logger()->debug() << "sending abort for upload" << id;
+
+ watch->sendMessage(WatchConnector::watchPUTBYTES, msg);
+ }
+
+ _state = StateNotStarted;
+ _token = 0;
+
+ if (upload.errorCallback) {
+ upload.errorCallback(code);
+ }
+
+ if (!_pending.empty()) {
+ startNextUpload();
+ }
+ } else {
+ for (int i = 1; i < _pending.size(); ++i) {
+ if (_pending[i].id == id) {
+ logger()->debug() << "cancelling upload" << id << "(code:" << code << ")";
+ if (_pending[i].errorCallback) {
+ _pending[i].errorCallback(code);
+ }
+ _pending.removeAt(i);
+ return;
+ }
+ }
+ logger()->warn() << "cannot cancel, id" << id << "not found";
+ }
+}
+
+void UploadManager::startNextUpload()
+{
+ Q_ASSERT(!_pending.empty());
+ Q_ASSERT(_state == StateNotStarted);
+
+ PendingUpload &upload = _pending.head();
+ QByteArray msg;
+ Packer p(&msg);
+ p.write<quint8>(WatchConnector::putbytesINIT);
+ p.write<quint32>(upload.remaining);
+ p.write<quint8>(upload.type);
+ p.write<quint8>(upload.index);
+
+ _state = StateWaitForToken;
+ watch->sendMessage(WatchConnector::watchPUTBYTES, msg);
+}
+
+void UploadManager::handleMessage(const QByteArray &msg)
+{
+ Q_ASSERT(!_pending.empty());
+ PendingUpload &upload = _pending.head();
+
+ logger()->debug() << "get message" << msg.toHex();
+
+ Unpacker u(msg);
+ int status = u.read<quint8>();
+
+ if (u.bad() || status != 1) {
+ logger()->warn() << "upload" << upload.id << "got error code=" << status;
+ cancel(upload.id, status);
+ return;
+ }
+
+ quint32 recv_token = u.read<quint32>();
+
+ if (u.bad()) {
+ logger()->warn() << "upload" << upload.id << ": could not read the token";
+ cancel(upload.id, -1);
+ return;
+ }
+
+ if (_state != StateNotStarted && _state != StateWaitForToken && _state != StateComplete) {
+ if (recv_token != _token) {
+ logger()->warn() << "upload" << upload.id << ": invalid token";
+ cancel(upload.id, -1);
+ return;
+ }
+ }
+
+ switch (_state) {
+ case StateNotStarted:
+ logger()->warn() << "got packet when upload is not started";
+ break;
+ case StateWaitForToken:
+ logger()->debug() << "token received";
+ _token = recv_token;
+ _state = StateInProgress;
+
+ /* fallthrough */
+ case StateInProgress:
+ logger()->debug() << "moving to the next chunk";
+ if (upload.remaining > 0) {
+ if (!uploadNextChunk(upload)) {
+ cancel(upload.id, -1);
+ return;
+ }
+ } else {
+ logger()->debug() << "no additional chunks, commit";
+ _state = StateCommit;
+ if (!commit(upload)) {
+ cancel(upload.id, -1);
+ return;
+ }
+ }
+ break;
+ case StateCommit:
+ logger()->debug() << "commited succesfully";
+ _state = StateComplete;
+ if (!complete(upload)) {
+ cancel(upload.id, -1);
+ return;
+ }
+ break;
+ case StateComplete:
+ logger()->debug() << "upload" << upload.id << "succesful, invoking callback";
+ if (upload.successCallback) {
+ upload.successCallback();
+ }
+ _pending.dequeue();
+ _token = 0;
+ _state = StateNotStarted;
+ if (!_pending.empty()) {
+ startNextUpload();
+ }
+ break;
+ }
+}
+
+bool UploadManager::uploadNextChunk(PendingUpload &upload)
+{
+ QByteArray chunk = upload.device->read(qMin<int>(upload.remaining, CHUNK_SIZE));
+
+ if (upload.remaining < CHUNK_SIZE && chunk.size() < upload.remaining) {
+ // Short read!
+ logger()->warn() << "short read during upload" << upload.id;
+ return false;
+ }
+
+ Q_ASSERT(!chunk.isEmpty());
+ Q_ASSERT(_state = StateInProgress);
+
+ QByteArray msg;
+ Packer p(&msg);
+ p.write<quint8>(WatchConnector::putbytesSEND);
+ p.write<quint32>(_token);
+ p.write<quint32>(chunk.size());
+ msg.append(chunk);
+
+ logger()->debug() << "sending a chunk of" << chunk.size() << "bytes";
+
+ watch->sendMessage(WatchConnector::watchPUTBYTES, msg);
+
+ upload.remaining -= chunk.size();
+ upload.crc.addData(chunk);
+
+ logger()->debug() << "remaining" << upload.remaining << "bytes";
+
+ return true;
+}
+
+bool UploadManager::commit(PendingUpload &upload)
+{
+ Q_ASSERT(_state == StateCommit);
+ Q_ASSERT(upload.remaining == 0);
+
+ QByteArray msg;
+ Packer p(&msg);
+ p.write<quint8>(WatchConnector::putbytesCOMMIT);
+ p.write<quint32>(_token);
+ p.write<quint32>(upload.crc.result());
+
+ logger()->debug() << "commiting upload" << upload.id
+ << "with crc" << qPrintable(QString("0x%1").arg(upload.crc.result(), 0, 16));
+
+ watch->sendMessage(WatchConnector::watchPUTBYTES, msg);
+
+ return true;
+}
+
+bool UploadManager::complete(PendingUpload &upload)
+{
+ Q_ASSERT(_state == StateComplete);
+
+ QByteArray msg;
+ Packer p(&msg);
+ p.write<quint8>(WatchConnector::putbytesCOMPLETE);
+ p.write<quint32>(_token);
+
+ logger()->debug() << "completing upload" << upload.id;
+
+ watch->sendMessage(WatchConnector::watchPUTBYTES, msg);
+
+ return true;
+}