added shedduler

This commit is contained in:
Andrei Yankovich 2021-10-11 20:42:13 +03:00
parent 38366cfa32
commit e939c5152b
6 changed files with 363 additions and 1 deletions

View File

@ -0,0 +1,92 @@
//#
//# Copyright (C) 2021-2021 QuasarApp.
//# Distributed under the lgplv3 software license, see the accompanying
//# Everyone is permitted to copy and distribute verbatim copies
//# of this license document, but changing it is not allowed.
//#
#include "taskscheduler.h"
namespace QH {
TaskScheduler::TaskScheduler() {
_timer = new QTimer(this);
connect(_timer, &QTimer::timeout, this, &TaskScheduler::handleTimeOut);
}
bool TaskScheduler::shedule(const QSharedPointer<AbstractTask> &task) {
int currentTime = time(0);
int invokeTime = 0;
switch (task->mode()) {
case SheduleMode::SingleWork: {
invokeTime = currentTime + task->time();
break;
}
case SheduleMode::Repeat: {
invokeTime = currentTime + task->time();
break;
}
case SheduleMode::TimePoint: {
invokeTime = task->time();
if (invokeTime < currentTime)
return false;
break;
}
}
_taskPool[task->taskId()] = task;
_taskQueue.insert(invokeTime, task->taskId());
int top = _taskQueue.begin().key();
_timer->start(top * 1000);
return true;
}
bool TaskScheduler::remove(const QSharedPointer<AbstractTask> &task) {
return remove(task->taskId());
}
bool TaskScheduler::remove(int task) {
auto val = _taskQueue.key(task);
_taskQueue.remove(val);
_taskPool.remove(task);
return true;
}
void TaskScheduler::handleTimeOut() {
auto top = _taskQueue.begin();
if (top == _taskQueue.end()) {
_timer->stop();
return;
}
auto toWork = _taskQueue.values(top.key());
for (int key: toWork) {
auto task = _taskPool.value(key);
if (!task) {
_taskPool.remove(key);
continue;
}
remove(task);
if (task->mode() == SheduleMode::Repeat) {
shedule(task);
}
emit sigPushWork(task);
}
}
}

View File

@ -0,0 +1,66 @@
//#
//# Copyright (C) 2021-2021 QuasarApp.
//# Distributed under the lgplv3 software license, see the accompanying
//# Everyone is permitted to copy and distribute verbatim copies
//# of this license document, but changing it is not allowed.
//#
#ifndef TASKSCHEDULER_H
#define TASKSCHEDULER_H
#include "abstracttask.h"
#include <QMap>
#include <QSharedPointer>
#include <QTimer>
namespace QH {
/**
* @brief The TaskScheduler class This class contains queue of all shedule tasks.
* @see AbstractTask
*/
class TaskScheduler: public QObject
{
Q_OBJECT
public:
TaskScheduler();
/**
* @brief shedule This method shedule new task in this node.
* @param task This is sharedpointe to taskObject.
* @return true if the task sheduled successful. If the task alredy sheduled the return false.
*/
bool shedule(const QSharedPointer<AbstractTask>& task);
/**
* @brief remove This method remove the @a task from a tasks queue.
* @param task This is removed task.
* @return true if the task remove successful
*/
bool remove(const QSharedPointer<AbstractTask>& task);
/**
* @brief remove This method remove the task with @a task id.
* @param task This is id of the removed task.
* @return true if the method removed successful
*/
bool remove(int task);
signals:
/**
* @brief sigPushWork This signal emited when the task @a work neet to execute.
* @param work This is needed to execute task.
*/
void sigPushWork(QSharedPointer<QH::AbstractTask> work);
private slots:
void handleTimeOut();
private:
QMultiMap<int, int> _taskQueue;
QHash<int, QSharedPointer<AbstractTask>> _taskPool;
QTimer *_timer = nullptr;
};
}
#endif // TASKSCHEDULER_H

View File

@ -30,8 +30,10 @@
#include "tcpsocket.h"
#include "asynclauncher.h"
#include "receivedata.h"
#include "abstracttask.h"
#include <bigdatamanager.h>
#include <taskscheduler.h>
namespace QH {
@ -51,6 +53,12 @@ AbstractNode::AbstractNode( QObject *ptr):
_dataSender = new DataSender(_senderThread);
_socketWorker = new AsyncLauncher(_senderThread);
_bigdatamanager = new BigDataManager(this);
_tasksheduller = new TaskScheduler();
qRegisterMetaType<QSharedPointer<QH::AbstractTask>>();
connect(_tasksheduller, &TaskScheduler::sigPushWork,
this, &AbstractNode::handleBeginWork);
registerPackageType<Ping>();
registerPackageType<BadRequest>();
@ -72,12 +80,13 @@ AbstractNode::~AbstractNode() {
for (auto it: qAsConst(_receiveData)) {
delete it;
}
_receiveData.clear();
_receiveData.clear();
delete _dataSender;
delete _senderThread;
delete _socketWorker;
delete _tasksheduller;
}
bool AbstractNode::run(const QString &addres, unsigned short port) {
@ -948,6 +957,12 @@ void AbstractNode::handleWorkerStoped() {
auto senderObject = dynamic_cast<QFutureWatcher <bool>*>(sender());
if (senderObject) {
if (!senderObject->result()) {
QuasarAppUtils::Params::log("Work finished with an error.",
QuasarAppUtils::Error);
}
_workers.remove(senderObject);
delete senderObject;
}
@ -960,6 +975,27 @@ void AbstractNode::handleForceRemoveNode(HostAddress node) {
}
}
void AbstractNode::handleBeginWork(QSharedPointer<QH::AbstractTask> work) {
auto executeObject = [this, work]() -> bool {
if (!work)
return false;
return work->execute(this);
};
QMutexLocker locer(&_threadPoolMutex);
if (_threadPool) {
auto worker = new QFutureWatcher <bool>();
worker->setFuture(QtConcurrent::run(_threadPool, executeObject));
_workers.insert(worker);
connect(worker, &QFutureWatcher<bool>::finished,
this, &AbstractNode::handleWorkerStoped);
}
}
bool AbstractNode::listen(const HostAddress &address) {
return QTcpServer::listen(address, address.port());
}
@ -1007,6 +1043,14 @@ QList<HostAddress> AbstractNode::activeConnectionsList() const {
return result;
}
void AbstractNode::sheduleTask(const QSharedPointer<AbstractTask> &task) {
_tasksheduller->shedule(task);
}
void AbstractNode::removeTask(int taskId) {
_tasksheduller->remove(taskId);
}
void AbstractNode::newWork(const Package &pkg, AbstractNodeInfo *sender,
const HostAddress& id) {

View File

@ -40,6 +40,8 @@ class ReceiveData;
class SocketFactory;
class AsyncLauncher;
class BigDataManager;
class TaskScheduler;
class AbstractTask;
namespace PKG {
class ErrorData;
@ -582,6 +584,18 @@ protected:
*/
QList<HostAddress> activeConnectionsList() const;
/**
* @brief sheduleTask This method shedule execute task on this node.
* @param task This is task that will be sheduled.
*/
void sheduleTask(const QSharedPointer<AbstractTask>& task);
/**
* @brief removeTask This method remove task from sheduler.
* @param taskId This is task id that will be removed.
*/
void removeTask(int taskId);
/**
* @brief commandHandler This method it is simple wrapper for the handle pacakges in the AbstractNode::parsePackage method.
* Exmaple of use :
@ -667,6 +681,12 @@ private slots:
*/
void handleForceRemoveNode(HostAddress node);
/**
* @brief handleBeginWork This method run task on new thread.
* @param work This is new work task
*/
void handleBeginWork(QSharedPointer<QH::AbstractTask> work);
private:
/**
@ -714,6 +734,7 @@ private:
AsyncLauncher * _socketWorker = nullptr;
QThread *_senderThread = nullptr;
BigDataManager *_bigdatamanager = nullptr;
TaskScheduler *_tasksheduller = nullptr;
QSet<QFutureWatcher <bool>*> _workers;

View File

@ -0,0 +1,48 @@
//#
//# Copyright (C) 2021-2021 QuasarApp.
//# Distributed under the lgplv3 software license, see the accompanying
//# Everyone is permitted to copy and distribute verbatim copies
//# of this license document, but changing it is not allowed.
//#
#include "abstracttask.h"
#include <QHash>
#include <QByteArray>
namespace QH {
SheduleMode AbstractTask::mode() const {
return _mode;
}
void AbstractTask::setMode(SheduleMode newMode) {
_mode = newMode;
idGen();
}
int AbstractTask::time() const {
return _time;
}
void AbstractTask::setTime(int newTime) {
_time = newTime;
idGen();
}
int AbstractTask::taskId() const {
return _taskId;
}
void AbstractTask::idGen() {
QByteArray data;
data.push_back(_time);
data.push_back(static_cast<int>(_mode));
data.push_back(typeid(this).hash_code());
_taskId = qHash(data);
}
}

View File

@ -0,0 +1,91 @@
//#
//# Copyright (C) 2021-2021 QuasarApp.
//# Distributed under the lgplv3 software license, see the accompanying
//# Everyone is permitted to copy and distribute verbatim copies
//# of this license document, but changing it is not allowed.
//#
#ifndef ABSTRACTTASK_H
#define ABSTRACTTASK_H
#include <QSharedPointer>
namespace QH {
class AbstractNode;
/**
* @brief The SheduleMode enum contails list of the shedule modes.
*/
enum class SheduleMode {
/// In this mode AbstractTask will be executed after **time** secunds from the moment of adding this task.
SingleWork,
/// In this mode AbstractTask will be executed task every **time** from the moment of adding this task.
Repeat,
/// In this mode AbstractTask will be executed int **time** secunds by Unix time.
TimePoint
};
/**
* @brief The AbstractTask class. All tasks executed on separate thread.
*/
class AbstractTask
{
public:
AbstractTask() = default;
virtual ~AbstractTask() = default;
/**
* @brief mode This method retunr current mode of this task.
* @return current mode of this task.
* @see AbstractTask::setMode
*/
SheduleMode mode() const;
/**
* @brief setMode This method sets new mode of this task.
* @param newMode new mode of this task.
* @see AbstractTask::mode
*/
void setMode(SheduleMode newMode);
/**
* @brief time This is universal property.
* This property has its own meaning for each AbstractTask::mode.
* For more information see the SheduleMode enum.
* @return time property.
* @see AbstractTask::setTime
*/
int time() const;
/**
* @brief setTime This method sets new value for the AbstractTask::time property.
* @param newTime This is new value of the time propertye.
* @see AbstractTask::time
*/
void setTime(int newTime);
/**
* @brief execute This method will be invoked when task be executed.
* @param node This is pointer to node object.
* @return true if the work winished successfull
*/
virtual bool execute(AbstractNode *node) const = 0;
/**
* @brief taskId This method return id of this task.
* @return id of this task.
*/
int taskId() const;
private:
void idGen();
SheduleMode _mode = SheduleMode::SingleWork;
int _time = 0;
int _taskId = 0;
};
}
Q_DECLARE_METATYPE(QSharedPointer<QH::AbstractTask>)
#endif // ABSTRACTTASK_H