Merge pull request #42 from QuasarApp/sheduller

Sheduller
This commit is contained in:
Andrei Yankovich 2021-10-12 12:10:32 +03:00 committed by GitHub
commit 32284bcaee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 552 additions and 3 deletions

View File

@ -0,0 +1,105 @@
//#
//# Copyright (C) 2021-2021 QuasarApp.
//# Distributed under the lgplv3 software license, see the accompanying
//# Everyone is permitted to copy and distribute verbatim copies
//# of this license document, but changing it is not allowed.
//#
#include "taskscheduler.h"
#include <QDateTime>
namespace QH {
TaskScheduler::TaskScheduler() {
_timer = new QTimer();
connect(_timer, &QTimer::timeout, this, &TaskScheduler::handleTimeOut);
}
TaskScheduler::~TaskScheduler() {
_timer->stop();
delete _timer;
}
bool TaskScheduler::shedule(const QSharedPointer<AbstractTask> &task) {
quint64 currentTime = QDateTime::currentMSecsSinceEpoch();
quint64 invokeTime = 0;
switch (task->mode()) {
case SheduleMode::SingleWork: {
invokeTime = currentTime + task->time();
break;
}
case SheduleMode::Repeat: {
invokeTime = currentTime + task->time();
break;
}
case SheduleMode::TimePoint: {
invokeTime = task->time();
if (invokeTime < currentTime)
return false;
break;
}
}
_taskPool[task->taskId()] = task;
_taskQueue.insert(invokeTime, task->taskId());
int top = _taskQueue.begin().key();
int timeout = top - currentTime;
if (timeout < 0)
timeout = 0;
_timer->start(timeout);
return true;
}
bool TaskScheduler::remove(const QSharedPointer<AbstractTask> &task) {
return remove(task->taskId());
}
bool TaskScheduler::remove(int task) {
auto val = _taskQueue.key(task);
_taskQueue.remove(val);
_taskPool.remove(task);
return true;
}
int TaskScheduler::taskCount() const {
return _taskPool.size();
}
void TaskScheduler::handleTimeOut() {
auto top = _taskQueue.begin();
if (top == _taskQueue.end()) {
_timer->stop();
return;
}
auto toWork = _taskQueue.values(top.key());
for (int key: toWork) {
auto task = _taskPool.value(key);
if (!task) {
_taskPool.remove(key);
continue;
}
remove(task);
if (task->mode() == SheduleMode::Repeat) {
shedule(task);
}
emit sigPushWork(task);
}
}
}

View File

@ -0,0 +1,74 @@
//#
//# Copyright (C) 2021-2021 QuasarApp.
//# Distributed under the lgplv3 software license, see the accompanying
//# Everyone is permitted to copy and distribute verbatim copies
//# of this license document, but changing it is not allowed.
//#
#ifndef TASKSCHEDULER_H
#define TASKSCHEDULER_H
#include "abstracttask.h"
#include <QHash>
#include <QMap>
#include <QSharedPointer>
#include <QTimer>
namespace QH {
/**
* @brief The TaskScheduler class This class contains queue of all shedule tasks.
* @see AbstractTask
*/
class TaskScheduler: public QObject
{
Q_OBJECT
public:
TaskScheduler();
~TaskScheduler();
/**
* @brief shedule This method shedule new task in this node.
* @param task This is sharedpointe to taskObject.
* @return true if the task sheduled successful. If the task alredy sheduled the return false.
*/
bool shedule(const QSharedPointer<AbstractTask>& task);
/**
* @brief remove This method remove the @a task from a tasks queue.
* @param task This is removed task.
* @return true if the task remove successful
*/
bool remove(const QSharedPointer<AbstractTask>& task);
/**
* @brief remove This method remove the task with @a task id.
* @param task This is id of the removed task.
* @return true if the method removed successful
*/
bool remove(int task);
/**
* @brief taskCount This method return tasks count.
* @return tasks count
*/
int taskCount() const;
signals:
/**
* @brief sigPushWork This signal emited when the task @a work neet to execute.
* @param work This is needed to execute task.
*/
void sigPushWork(QSharedPointer<QH::AbstractTask> work);
private slots:
void handleTimeOut();
private:
QMultiMap<quint64, int> _taskQueue;
QHash<int, QSharedPointer<AbstractTask>> _taskPool;
QTimer *_timer = nullptr;
};
}
#endif // TASKSCHEDULER_H

View File

@ -30,8 +30,10 @@
#include "tcpsocket.h"
#include "asynclauncher.h"
#include "receivedata.h"
#include "abstracttask.h"
#include <bigdatamanager.h>
#include <taskscheduler.h>
namespace QH {
@ -51,6 +53,12 @@ AbstractNode::AbstractNode( QObject *ptr):
_dataSender = new DataSender(_senderThread);
_socketWorker = new AsyncLauncher(_senderThread);
_bigdatamanager = new BigDataManager(this);
_tasksheduller = new TaskScheduler();
qRegisterMetaType<QSharedPointer<QH::AbstractTask>>();
connect(_tasksheduller, &TaskScheduler::sigPushWork,
this, &AbstractNode::handleBeginWork);
registerPackageType<Ping>();
registerPackageType<BadRequest>();
@ -72,12 +80,13 @@ AbstractNode::~AbstractNode() {
for (auto it: qAsConst(_receiveData)) {
delete it;
}
_receiveData.clear();
_receiveData.clear();
delete _dataSender;
delete _senderThread;
delete _socketWorker;
delete _tasksheduller;
}
bool AbstractNode::run(const QString &addres, unsigned short port) {
@ -948,6 +957,7 @@ void AbstractNode::handleWorkerStoped() {
auto senderObject = dynamic_cast<QFutureWatcher <bool>*>(sender());
if (senderObject) {
_workers.remove(senderObject);
delete senderObject;
}
@ -960,6 +970,27 @@ void AbstractNode::handleForceRemoveNode(HostAddress node) {
}
}
void AbstractNode::handleBeginWork(QSharedPointer<QH::AbstractTask> work) {
auto executeObject = [this, work]() -> bool {
if (!work)
return false;
return work->execute(this);
};
QMutexLocker locer(&_threadPoolMutex);
if (_threadPool) {
auto worker = new QFutureWatcher <bool>();
worker->setFuture(QtConcurrent::run(_threadPool, executeObject));
_workers.insert(worker);
connect(worker, &QFutureWatcher<bool>::finished,
this, &AbstractNode::handleWorkerStoped);
}
}
bool AbstractNode::listen(const HostAddress &address) {
return QTcpServer::listen(address, address.port());
}
@ -1007,6 +1038,18 @@ QList<HostAddress> AbstractNode::activeConnectionsList() const {
return result;
}
void AbstractNode::sheduleTask(const QSharedPointer<AbstractTask> &task) {
_tasksheduller->shedule(task);
}
void AbstractNode::removeTask(int taskId) {
_tasksheduller->remove(taskId);
}
int AbstractNode::sheduledTaskCount() const {
return _tasksheduller->taskCount();
}
void AbstractNode::newWork(const Package &pkg, AbstractNodeInfo *sender,
const HostAddress& id) {

View File

@ -40,6 +40,8 @@ class ReceiveData;
class SocketFactory;
class AsyncLauncher;
class BigDataManager;
class TaskScheduler;
class AbstractTask;
namespace PKG {
class ErrorData;
@ -250,6 +252,33 @@ public:
*/
static QThread *mainThreadID();
/**
* @brief sheduleTask This method shedule execute task on this node.
* @param task This is task that will be sheduled.
* @see AbstractNode::removeTask
* @see AbstractNode::sheduledTaskCount
*/
void sheduleTask(const QSharedPointer<AbstractTask>& task);
/**
* @brief removeTask This method remove task from sheduler.
* @param taskId This is task id that will be removed.
* @see AbstractNode::sheduleTask
* @see AbstractNode::sheduledTaskCount
*/
void removeTask(int taskId);
/**
* @brief sheduledTaskCount This method return count of sheduled tasks.
* @return count of sheduled tasks.
* @see AbstractNode::sheduleTask
* @see AbstractNode::removeTask
*/
int sheduledTaskCount() const;
signals:
/**
* @brief requestError This signal emited when client or node received from remoute server or node the BadRequest package.
@ -259,7 +288,6 @@ signals:
void requestError(unsigned char code, QString msg);
protected:
#ifdef HEART_SSL
@ -582,6 +610,7 @@ protected:
*/
QList<HostAddress> activeConnectionsList() const;
/**
* @brief commandHandler This method it is simple wrapper for the handle pacakges in the AbstractNode::parsePackage method.
* Exmaple of use :
@ -667,6 +696,12 @@ private slots:
*/
void handleForceRemoveNode(HostAddress node);
/**
* @brief handleBeginWork This method run task on new thread.
* @param work This is new work task
*/
void handleBeginWork(QSharedPointer<QH::AbstractTask> work);
private:
/**
@ -714,6 +749,7 @@ private:
AsyncLauncher * _socketWorker = nullptr;
QThread *_senderThread = nullptr;
BigDataManager *_bigdatamanager = nullptr;
TaskScheduler *_tasksheduller = nullptr;
QSet<QFutureWatcher <bool>*> _workers;

View File

@ -0,0 +1,50 @@
//#
//# Copyright (C) 2021-2021 QuasarApp.
//# Distributed under the lgplv3 software license, see the accompanying
//# Everyone is permitted to copy and distribute verbatim copies
//# of this license document, but changing it is not allowed.
//#
#include "abstracttask.h"
#include <QHash>
#include <QByteArray>
namespace QH {
SheduleMode AbstractTask::mode() const {
return _mode;
}
void AbstractTask::setMode(SheduleMode newMode) {
_mode = newMode;
idGen();
}
quint64 AbstractTask::time() const {
return _time;
}
void AbstractTask::setTime(quint64 newTime) {
_time = newTime;
idGen();
}
int AbstractTask::taskId() const {
return _taskId;
}
void AbstractTask::idGen() {
QByteArray data;
data.insert(0, reinterpret_cast<char*>(&_time), sizeof (_time));
data.insert(0, reinterpret_cast<char*>(&_mode), sizeof (_mode));
int code = typeid(this).hash_code();
data.insert(0, reinterpret_cast<char*>(&code), sizeof (code));
_taskId = qHash(data);
}
}

View File

@ -0,0 +1,93 @@
//#
//# Copyright (C) 2021-2021 QuasarApp.
//# Distributed under the lgplv3 software license, see the accompanying
//# Everyone is permitted to copy and distribute verbatim copies
//# of this license document, but changing it is not allowed.
//#
#ifndef ABSTRACTTASK_H
#define ABSTRACTTASK_H
#include <QSharedPointer>
#include "heart_global.h"
namespace QH {
class AbstractNode;
/**
* @brief The SheduleMode enum contails list of the shedule modes.
*/
enum class SheduleMode: int{
/// In this mode AbstractTask will be executed after **time** msecunds from the moment of adding this task.
SingleWork,
/// In this mode AbstractTask will be executed task every **time** from the moment of adding this task.
Repeat,
/// In this mode AbstractTask will be executed int **time** msecunds by Unix time.
TimePoint
};
/**
* @brief The AbstractTask class. All tasks executed on separate thread.
*/
class HEARTSHARED_EXPORT AbstractTask
{
public:
AbstractTask() = default;
virtual ~AbstractTask() = default;
/**
* @brief mode This method retunr current mode of this task.
* @return current mode of this task.
* @see AbstractTask::setMode
*/
SheduleMode mode() const;
/**
* @brief setMode This method sets new mode of this task.
* @param newMode new mode of this task.
* @see AbstractTask::mode
*/
void setMode(SheduleMode newMode);
/**
* @brief time This is universal property.
* This property has its own meaning for each AbstractTask::mode.
* For more information see the SheduleMode enum.
* @return time property.
* @see AbstractTask::setTime
*/
quint64 time() const;
/**
* @brief setTime This method sets new value for the AbstractTask::time property.
* @param newTime This is new value of the time propertye.
* @see AbstractTask::time
*/
void setTime(quint64 newTime);
/**
* @brief execute This method will be invoked when task be executed.
* @param node This is pointer to node object.
* @return true if the work winished successfull
*/
virtual bool execute(AbstractNode *node) const = 0;
/**
* @brief taskId This method return id of this task.
* @return id of this task.
*/
int taskId() const;
private:
void idGen();
SheduleMode _mode = SheduleMode::SingleWork;
quint64 _time = 0;
int _taskId = 0;
};
}
Q_DECLARE_METATYPE(QSharedPointer<QH::AbstractTask>)
#endif // ABSTRACTTASK_H

View File

@ -0,0 +1,116 @@
//#
//# Copyright (C) 2021-2021 QuasarApp.
//# Distributed under the lgplv3 software license, see the accompanying
//# Everyone is permitted to copy and distribute verbatim copies
//# of this license document, but changing it is not allowed.
//#
#include "shedullertest.h"
#include "abstractnode.h"
#include "abstracttask.h"
#include "ctime"
#include <QTest>
#include <QDateTime>
#include <cmath>
class ShedullerestNode: public QH::AbstractNode {
public:
quint64 executedTime = 0;
};
class TestTask: public QH::AbstractTask {
// AbstractTask interface
public:
bool execute(QH::AbstractNode *node) const override {
static_cast<ShedullerestNode*>(node)->executedTime = QDateTime::currentMSecsSinceEpoch();
return true;
};
};
ShedullerTest::ShedullerTest() {
}
void ShedullerTest::test() {
testSingleMode();
testRepeatMode();
testTimePointMode();
}
void ShedullerTest::testSingleMode() {
ShedullerestNode *node = new ShedullerestNode();
auto task = QSharedPointer<TestTask>::create();
task->setMode(QH::SheduleMode::SingleWork);
task->setTime(2000);
quint64 ct = QDateTime::currentMSecsSinceEpoch();
node->sheduleTask(task);
QVERIFY(wait([&node](){return node->executedTime;}, WAIT_TIME));
int diff = std::abs(static_cast<long long>(node->executedTime - (ct + 2000)));
QVERIFY(diff < 1000);
node->executedTime = 0;
QVERIFY(node->sheduledTaskCount() == 0);
node->softDelete();
}
void ShedullerTest::testRepeatMode() {
ShedullerestNode *node = new ShedullerestNode();
auto task = QSharedPointer<TestTask>::create();
task->setTime(2000);
task->setMode(QH::SheduleMode::Repeat);
quint64 ct = QDateTime::currentMSecsSinceEpoch();
node->sheduleTask(task);
QVERIFY(wait([&node](){return node->executedTime;}, WAIT_TIME));
int diff = std::abs(static_cast<long long>(node->executedTime - (ct + 2000)));
QVERIFY(diff < 1000);
node->executedTime = 0;
QVERIFY(node->sheduledTaskCount() == 1);
QVERIFY(wait([&node](){return node->executedTime;}, WAIT_TIME));
diff = std::abs(static_cast<long long>(node->executedTime - (ct + 4000)));
QVERIFY(diff < 1000);
QVERIFY(node->sheduledTaskCount() == 1);
node->removeTask(task->taskId());
QVERIFY(node->sheduledTaskCount() == 0);
node->softDelete();
}
void ShedullerTest::testTimePointMode() {
ShedullerestNode *node = new ShedullerestNode();
auto task = QSharedPointer<TestTask>::create();
task->setMode(QH::SheduleMode::TimePoint);
quint64 requestTime = QDateTime::currentMSecsSinceEpoch() + 5000;
task->setTime(requestTime);
node->sheduleTask(task);
QVERIFY(wait([&node](){return node->executedTime;}, WAIT_TIME));
int diff = std::abs(static_cast<long long>(node->executedTime - requestTime));
QVERIFY(diff < 1000);
QVERIFY(node->sheduledTaskCount() == 0);
node->softDelete();
}

View File

@ -0,0 +1,30 @@
//#
//# Copyright (C) 2021-2021 QuasarApp.
//# Distributed under the lgplv3 software license, see the accompanying
//# Everyone is permitted to copy and distribute verbatim copies
//# of this license document, but changing it is not allowed.
//#
#ifndef SHEDULLERTEST_H
#define SHEDULLERTEST_H
#include "test.h"
#include "testutils.h"
#include <QtTest>
class ShedullerTest: public Test, protected TestUtils
{
public:
ShedullerTest();
void test() override;
private:
void testSingleMode();
void testRepeatMode();
void testTimePointMode();
};
#endif // SHEDULLERTEST_H

View File

@ -10,10 +10,11 @@
#if HEART_BUILD_LVL >= 0
#include "abstractnodetest.h"
#include <shedullertest.h>
#include <bigdatatest.h>
#endif
#if HEART_BUILD_LVL >= 1
#include <basenodetest.h>
#include <bigdatatest.h>
#include <singleservertest.h>
#include <upgradedatabasetest.h>
#endif
@ -40,6 +41,7 @@ private slots:
TestCase(abstractNodeTest, AbstractNodeTest)
TestCase(bigDataTest, BigDataTest);
TestCase(shedullerTest, ShedullerTest);
#endif
#if HEART_BUILD_LVL >= 1
TestCase(baseNodeTest, BaseNodeTest)