diff --git a/Heart/AbstractSpace/Private/taskscheduler.cpp b/Heart/AbstractSpace/Private/taskscheduler.cpp new file mode 100644 index 0000000..fa76913 --- /dev/null +++ b/Heart/AbstractSpace/Private/taskscheduler.cpp @@ -0,0 +1,92 @@ +//# +//# Copyright (C) 2021-2021 QuasarApp. +//# Distributed under the lgplv3 software license, see the accompanying +//# Everyone is permitted to copy and distribute verbatim copies +//# of this license document, but changing it is not allowed. +//# + + +#include "taskscheduler.h" + +namespace QH { + +TaskScheduler::TaskScheduler() { + _timer = new QTimer(this); + + connect(_timer, &QTimer::timeout, this, &TaskScheduler::handleTimeOut); +} + +bool TaskScheduler::shedule(const QSharedPointer<AbstractTask> &task) { + int currentTime = time(0); + int invokeTime = 0; + + switch (task->mode()) { + case SheduleMode::SingleWork: { + invokeTime = currentTime + task->time(); + break; + } + case SheduleMode::Repeat: { + invokeTime = currentTime + task->time(); + break; + } + case SheduleMode::TimePoint: { + invokeTime = task->time(); + + if (invokeTime < currentTime) + return false; + break; + } + } + + _taskPool[task->taskId()] = task; + _taskQueue.insert(invokeTime, task->taskId()); + + + int top = _taskQueue.begin().key(); + + _timer->start(top * 1000); + + return true; +} + +bool TaskScheduler::remove(const QSharedPointer<AbstractTask> &task) { + return remove(task->taskId()); +} + +bool TaskScheduler::remove(int task) { + auto val = _taskQueue.key(task); + + _taskQueue.remove(val); + _taskPool.remove(task); + + return true; +} + +void TaskScheduler::handleTimeOut() { + auto top = _taskQueue.begin(); + + if (top == _taskQueue.end()) { + _timer->stop(); + return; + } + + auto toWork = _taskQueue.values(top.key()); + + for (int key: toWork) { + auto task = _taskPool.value(key); + + if (!task) { + _taskPool.remove(key); + continue; + } + + remove(task); + + if (task->mode() == SheduleMode::Repeat) { + shedule(task); + } + + emit sigPushWork(task); + } +} +} diff --git a/Heart/AbstractSpace/Private/taskscheduler.h b/Heart/AbstractSpace/Private/taskscheduler.h new file mode 100644 index 0000000..96807b1 --- /dev/null +++ b/Heart/AbstractSpace/Private/taskscheduler.h @@ -0,0 +1,66 @@ +//# +//# Copyright (C) 2021-2021 QuasarApp. +//# Distributed under the lgplv3 software license, see the accompanying +//# Everyone is permitted to copy and distribute verbatim copies +//# of this license document, but changing it is not allowed. +//# + +#ifndef TASKSCHEDULER_H +#define TASKSCHEDULER_H + +#include "abstracttask.h" + +#include <QMap> +#include <QSharedPointer> +#include <QTimer> + +namespace QH { + +/** + * @brief The TaskScheduler class This class contains queue of all shedule tasks. + * @see AbstractTask + */ +class TaskScheduler: public QObject +{ + Q_OBJECT +public: + TaskScheduler(); + + /** + * @brief shedule This method shedule new task in this node. + * @param task This is sharedpointe to taskObject. + * @return true if the task sheduled successful. If the task alredy sheduled the return false. + */ + bool shedule(const QSharedPointer<AbstractTask>& task); + + /** + * @brief remove This method remove the @a task from a tasks queue. + * @param task This is removed task. + * @return true if the task remove successful + */ + bool remove(const QSharedPointer<AbstractTask>& task); + + /** + * @brief remove This method remove the task with @a task id. + * @param task This is id of the removed task. + * @return true if the method removed successful + */ + bool remove(int task); + +signals: + /** + * @brief sigPushWork This signal emited when the task @a work neet to execute. + * @param work This is needed to execute task. + */ + void sigPushWork(QSharedPointer<QH::AbstractTask> work); + +private slots: + void handleTimeOut(); + +private: + QMultiMap<int, int> _taskQueue; + QHash<int, QSharedPointer<AbstractTask>> _taskPool; + QTimer *_timer = nullptr; +}; +} +#endif // TASKSCHEDULER_H diff --git a/Heart/AbstractSpace/abstractnode.cpp b/Heart/AbstractSpace/abstractnode.cpp index 343dc56..f139284 100644 --- a/Heart/AbstractSpace/abstractnode.cpp +++ b/Heart/AbstractSpace/abstractnode.cpp @@ -30,8 +30,10 @@ #include "tcpsocket.h" #include "asynclauncher.h" #include "receivedata.h" +#include "abstracttask.h" #include <bigdatamanager.h> +#include <taskscheduler.h> namespace QH { @@ -51,6 +53,12 @@ AbstractNode::AbstractNode( QObject *ptr): _dataSender = new DataSender(_senderThread); _socketWorker = new AsyncLauncher(_senderThread); _bigdatamanager = new BigDataManager(this); + _tasksheduller = new TaskScheduler(); + + qRegisterMetaType<QSharedPointer<QH::AbstractTask>>(); + + connect(_tasksheduller, &TaskScheduler::sigPushWork, + this, &AbstractNode::handleBeginWork); registerPackageType<Ping>(); registerPackageType<BadRequest>(); @@ -72,12 +80,13 @@ AbstractNode::~AbstractNode() { for (auto it: qAsConst(_receiveData)) { delete it; } - _receiveData.clear(); + _receiveData.clear(); delete _dataSender; delete _senderThread; delete _socketWorker; + delete _tasksheduller; } bool AbstractNode::run(const QString &addres, unsigned short port) { @@ -948,6 +957,12 @@ void AbstractNode::handleWorkerStoped() { auto senderObject = dynamic_cast<QFutureWatcher <bool>*>(sender()); if (senderObject) { + + if (!senderObject->result()) { + QuasarAppUtils::Params::log("Work finished with an error.", + QuasarAppUtils::Error); + } + _workers.remove(senderObject); delete senderObject; } @@ -960,6 +975,27 @@ void AbstractNode::handleForceRemoveNode(HostAddress node) { } } +void AbstractNode::handleBeginWork(QSharedPointer<QH::AbstractTask> work) { + + auto executeObject = [this, work]() -> bool { + if (!work) + return false; + + return work->execute(this); + }; + + QMutexLocker locer(&_threadPoolMutex); + + if (_threadPool) { + auto worker = new QFutureWatcher <bool>(); + worker->setFuture(QtConcurrent::run(_threadPool, executeObject)); + _workers.insert(worker); + + connect(worker, &QFutureWatcher<bool>::finished, + this, &AbstractNode::handleWorkerStoped); + } +} + bool AbstractNode::listen(const HostAddress &address) { return QTcpServer::listen(address, address.port()); } @@ -1007,6 +1043,14 @@ QList<HostAddress> AbstractNode::activeConnectionsList() const { return result; } +void AbstractNode::sheduleTask(const QSharedPointer<AbstractTask> &task) { + _tasksheduller->shedule(task); +} + +void AbstractNode::removeTask(int taskId) { + _tasksheduller->remove(taskId); +} + void AbstractNode::newWork(const Package &pkg, AbstractNodeInfo *sender, const HostAddress& id) { diff --git a/Heart/AbstractSpace/abstractnode.h b/Heart/AbstractSpace/abstractnode.h index b328c92..1f0d200 100644 --- a/Heart/AbstractSpace/abstractnode.h +++ b/Heart/AbstractSpace/abstractnode.h @@ -40,6 +40,8 @@ class ReceiveData; class SocketFactory; class AsyncLauncher; class BigDataManager; +class TaskScheduler; +class AbstractTask; namespace PKG { class ErrorData; @@ -582,6 +584,18 @@ protected: */ QList<HostAddress> activeConnectionsList() const; + /** + * @brief sheduleTask This method shedule execute task on this node. + * @param task This is task that will be sheduled. + */ + void sheduleTask(const QSharedPointer<AbstractTask>& task); + + /** + * @brief removeTask This method remove task from sheduler. + * @param taskId This is task id that will be removed. + */ + void removeTask(int taskId); + /** * @brief commandHandler This method it is simple wrapper for the handle pacakges in the AbstractNode::parsePackage method. * Exmaple of use : @@ -667,6 +681,12 @@ private slots: */ void handleForceRemoveNode(HostAddress node); + /** + * @brief handleBeginWork This method run task on new thread. + * @param work This is new work task + */ + void handleBeginWork(QSharedPointer<QH::AbstractTask> work); + private: /** @@ -714,6 +734,7 @@ private: AsyncLauncher * _socketWorker = nullptr; QThread *_senderThread = nullptr; BigDataManager *_bigdatamanager = nullptr; + TaskScheduler *_tasksheduller = nullptr; QSet<QFutureWatcher <bool>*> _workers; diff --git a/Heart/AbstractSpace/abstracttask.cpp b/Heart/AbstractSpace/abstracttask.cpp new file mode 100644 index 0000000..b50f26d --- /dev/null +++ b/Heart/AbstractSpace/abstracttask.cpp @@ -0,0 +1,48 @@ +//# +//# Copyright (C) 2021-2021 QuasarApp. +//# Distributed under the lgplv3 software license, see the accompanying +//# Everyone is permitted to copy and distribute verbatim copies +//# of this license document, but changing it is not allowed. +//# + + + +#include "abstracttask.h" +#include <QHash> +#include <QByteArray> + +namespace QH { + + +SheduleMode AbstractTask::mode() const { + return _mode; +} + +void AbstractTask::setMode(SheduleMode newMode) { + _mode = newMode; + idGen(); +} + +int AbstractTask::time() const { + return _time; +} + +void AbstractTask::setTime(int newTime) { + _time = newTime; + idGen(); +} + +int AbstractTask::taskId() const { + return _taskId; +} + +void AbstractTask::idGen() { + QByteArray data; + data.push_back(_time); + data.push_back(static_cast<int>(_mode)); + data.push_back(typeid(this).hash_code()); + + _taskId = qHash(data); +} + +} diff --git a/Heart/AbstractSpace/abstracttask.h b/Heart/AbstractSpace/abstracttask.h new file mode 100644 index 0000000..9c5c2de --- /dev/null +++ b/Heart/AbstractSpace/abstracttask.h @@ -0,0 +1,91 @@ +//# +//# Copyright (C) 2021-2021 QuasarApp. +//# Distributed under the lgplv3 software license, see the accompanying +//# Everyone is permitted to copy and distribute verbatim copies +//# of this license document, but changing it is not allowed. +//# + +#ifndef ABSTRACTTASK_H +#define ABSTRACTTASK_H +#include <QSharedPointer> + +namespace QH { + +class AbstractNode; +/** + * @brief The SheduleMode enum contails list of the shedule modes. + */ +enum class SheduleMode { + /// In this mode AbstractTask will be executed after **time** secunds from the moment of adding this task. + SingleWork, + /// In this mode AbstractTask will be executed task every **time** from the moment of adding this task. + Repeat, + /// In this mode AbstractTask will be executed int **time** secunds by Unix time. + TimePoint +}; + +/** + * @brief The AbstractTask class. All tasks executed on separate thread. + */ +class AbstractTask +{ +public: + AbstractTask() = default; + virtual ~AbstractTask() = default; + + /** + * @brief mode This method retunr current mode of this task. + * @return current mode of this task. + * @see AbstractTask::setMode + */ + SheduleMode mode() const; + + /** + * @brief setMode This method sets new mode of this task. + * @param newMode new mode of this task. + * @see AbstractTask::mode + */ + void setMode(SheduleMode newMode); + + /** + * @brief time This is universal property. + * This property has its own meaning for each AbstractTask::mode. + * For more information see the SheduleMode enum. + * @return time property. + * @see AbstractTask::setTime + */ + int time() const; + + /** + * @brief setTime This method sets new value for the AbstractTask::time property. + * @param newTime This is new value of the time propertye. + * @see AbstractTask::time + */ + void setTime(int newTime); + + /** + * @brief execute This method will be invoked when task be executed. + * @param node This is pointer to node object. + * @return true if the work winished successfull + */ + virtual bool execute(AbstractNode *node) const = 0; + + /** + * @brief taskId This method return id of this task. + * @return id of this task. + */ + int taskId() const; + +private: + void idGen(); + SheduleMode _mode = SheduleMode::SingleWork; + int _time = 0; + int _taskId = 0; +}; + +} + +Q_DECLARE_METATYPE(QSharedPointer<QH::AbstractTask>) + + +#endif // ABSTRACTTASK_H