Heart 1.3.845.21d07c2
Heart is the base back-end library for your C++ Qt projects.
abstractnode.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2018-2025 QuasarApp.
3 * Distributed under the lgplv3 software license, see the accompanying
4 * Everyone is permitted to copy and distribute verbatim copies
5 * of this license document, but changing it is not allowed.
6*/
7
8#include "abstractnode.h"
9#include "datasender.h"
10#include "ping.h"
11#include "workstate.h"
12#include <QHostInfo>
13
14#include <badrequest.h>
15#include <quasarapp.h>
16#include <QTcpServer>
17
18#ifdef USE_HEART_SSL
19
20#include <sslsocket.h>
21#include <easyssl/x509.h>
22#include <easyssl/rsassl.h>
23
24#include <QSslConfiguration>
25#include <QSslCertificate>
26#include <QSslKey>
27#include <QSslSocket>
28
29#endif
30
31#include <QMetaObject>
32#include <QtConcurrent>
33#include <closeconnection.h>
34#include "tcpsocket.h"
35#include "asynclauncher.h"
36#include "receivedata.h"
37#include "abstracttask.h"
38
39#include <apiversion.h>
40#include <versionisreceived.h>
41#include <taskscheduler.h>
42#include <qaglobalutils.h>
43#include <bigdatawraper.h>
44#include <bigdataparser.h>
45#include <abstractnodeparser.h>
46#include <apiversionparser.h>
47
48namespace QH {
49
50using namespace PKG;
51
53 QTcpServer(ptr) {
54
55 initThreadId();
56
57 _senderThread = new QThread();
58 _senderThread->setObjectName("Sender");
59
60 _senderThread->start();
61
62 // This object moving to _senderThread.
63 _dataSender = new DataSender(_senderThread);
64 _socketWorker = new AsyncLauncher(_senderThread);
65 _tasksheduller = new TaskScheduler();
66 _apiVersionParser = new APIVersionParser(this);
67
68 addApiParser<BigDataParser>();
69
70 auto abstractNodeParser = addApiParserNative<AbstractNodeParser>();
71 connect(abstractNodeParser.data(), &AbstractNodeParser::sigPingReceived,
72 this, &AbstractNode::receivePing, Qt::DirectConnection);
73
74 connect(_apiVersionParser, &APIVersionParser::sigNoLongerSupport,
75 this, &AbstractNode::sigNoLongerSupport, Qt::DirectConnection);
76
77 qRegisterMetaType<QSharedPointer<QH::AbstractTask>>();
78#ifdef USE_HEART_SSL
79 qRegisterMetaType<QList<QSslError>>();
80#endif
81
82 connect(_tasksheduller, &TaskScheduler::sigPushWork,
83 this, &AbstractNode::handleBeginWork);
84
85
86 initThreadPool();
87
88}
89
91
92 _senderThread->quit();
93 _senderThread->wait();
94
95 for (auto it: std::as_const(_receiveData)) {
96 delete it;
97 }
98
99 _receiveData.clear();
100
101 delete _dataSender;
102 delete _senderThread;
103 delete _socketWorker;
104 delete _tasksheduller;
105 delete _apiVersionParser;
106}
107
108bool AbstractNode::run(const QString &addres, unsigned short port) {
109
110 if (!port)
111 return false;
112
113 HostAddress adr(addres, port);
114 if (addres.isEmpty()) {
115 adr = HostAddress{QHostAddress::Any, port};
116 }
117
118 if (!listen(adr)) {
119
120 qCritical() << "Run fail " << this->errorString();
121 qCritical() << "Address:: " << adr.toString();
122
123 return false;
124 }
125
126 initThreadPool();
127
128 return true;
129}
130
131QSharedPointer<iParser> AbstractNode::selectParser(unsigned short cmd,
132 AbstractNodeInfo *sender) const {
133 return _apiVersionParser->selectParser(cmd, sender);
134}
135
136QSharedPointer<iParser> AbstractNode::selectParser(const QString &type,
137 AbstractNodeInfo *sender) const {
138 return _apiVersionParser->selectParser(type, sender);
139}
140
141QSharedPointer<iParser> AbstractNode::selectParser(const QString &type, int version) const {
142 return _apiVersionParser->selectParser(type, version);
143}
144
146 close();
147
148 _connectionsMutex.lock();
149 for (const auto &i : std::as_const(_connections)) {
150 i->disconnect();
151 }
152 _connectionsMutex.unlock();
153
154 _workersMutex.lock();
155 for (auto it: std::as_const(_workers)) {
156 if (!it->isFinished()) {
157 it->cancel();
158 it->waitForFinished();
159 }
160 }
161 _workersMutex.unlock();
162
163 deinitThreadPool();
164}
165
167 QMutexLocker locer(&_connectionsMutex);
168
169 if (!_connections.contains(id)) {
170 return nullptr;
171 }
172
173 return dynamic_cast<AbstractNodeInfo*>(_connections[id]);
174}
175
// FIXME: if getInfoPtr is invoked from another thread and the main thread then removes this object,
// this method returns an invalid pointer and the application crashes.
179 QMutexLocker locer(&_connectionsMutex);
180
181 if (!_connections.contains(id)) {
182 return nullptr;
183 }
184
185 return dynamic_cast<AbstractNodeInfo*>(_connections[id]);
186}
187
188void AbstractNode::ban(const HostAddress &target) {
189 auto info = getInfoPtr(target);
190 if (info)
191 info->ban();
192
193}
194
195void AbstractNode::unBan(const HostAddress &target) {
196 QMutexLocker locer(&_connectionsMutex);
197
198 if (!_connections.contains(target) || _connections[target]) {
199 return;
200 }
201
202 _connections[target]->unBan();
203}
204
205bool AbstractNode::addNode(const HostAddress &address) {
206
207 AsyncLauncher::Job action = [this, address]() -> bool {
208 QAbstractSocket *socket;
209#ifdef USE_HEART_SSL
210 if (_mode == SslMode::NoSSL) {
211 socket = new TcpSocket(nullptr);
212 } else {
213 socket = new SslSocket(nullptr);
214 }
215
216#else
217 socket = new TcpSocket(nullptr);
218#endif
219
220 if (!registerSocket(socket, &address)) {
222 delete socket;
223 return false;
224 }
225
226 socket->connectToHost(address, address.port());
227
228 return true;
229 };
230
231 return _socketWorker->run(action);
232}
233
235 const std::function<void (QH::AbstractNodeInfo *)> &action,
236 NodeCoonectionStatus status) {
237
238 auto peer = getInfoPtr(address);
239
240 if (action && (!peer || peer->status() < status)) {
241 auto &actionsList = _connectActions[status];
242 actionsList[address] = action;
243 }
244
245 return addNode(address);
246}
247
248bool AbstractNode::addNode(const QString &domain, unsigned short port,
249 const std::function<void (QH::AbstractNodeInfo *)> &action,
250 NodeCoonectionStatus status) {
251
252 HostAddress address{domain, port};
253 if (address.isNull()) {
254
255 QHostInfo::lookupHost(domain, [this, port, domain, action, status](QHostInfo info) {
256
257 if (info.error() != QHostInfo::NoError) {
258
259 qCritical() << "The domain name :" + domain +
260 " has error: " + info.errorString();
261 addNodeFailed(AddNodeError::HostNotFound);
262 return;
263 }
264
265 auto addresses = info.addresses();
266
267 if (addresses.size() > 1) {
268 qWarning() << "The domain name :" + domain +
269 " has more 1 ip addresses.";
270 }
271
272 if (action) {
273 addNode(HostAddress{addresses.first(), port}, action, status);
274 } else {
275 addNode(HostAddress{addresses.first(), port});
276 }
277
278 });
279
280
281 return true;
282 }
283
284
285 return addNode(address, action);
286}
287
288bool AbstractNode::removeNode(const HostAddress &nodeAdderess) {
289
290 if (AbstractNodeInfo *ptr = getInfoPtr(nodeAdderess)) {
291 return removeNode(ptr);
292 }
293
294 return false;
295}
296
298 if (!(node)) {
299 return false;
300 }
301
302 if (!node->isConnected())
303 return true;
304
305 if (node->isLocal()) {
306 node->removeSocket();
307 return true;
308 }
309
310 QTimer::singleShot(WAIT_CONFIRM_TIME, this,
311 std::bind(&AbstractNode::handleForceRemoveNode,
312 this, node->networkAddress()));
313
314 CloseConnection close;
315 return sendData(&close, node);
316}
317
319 return HostAddress{serverAddress(), serverPort()};
320}
321
322#ifdef USE_HEART_SSL
323QSslConfiguration AbstractNode::getSslConfig() const {
324 return _ssl;
325}
326
327QSslConfiguration AbstractNode::selfSignedSslConfiguration(const EasySSL::SslSrtData &sslData) {
328 QSslConfiguration res = QSslConfiguration::defaultConfiguration();
329
330 QSslKey pkey;
331 QSslCertificate crt;
332
333 EasySSL::X509 generator(QSharedPointer<EasySSL::RSASSL>::create());
334 EasySSL::SelfSignedSertificate certificate = generator.create(sslData);
335 res.setPrivateKey(certificate.key);
336 res.setLocalCertificate(certificate.crt);
337 res.setPeerVerifyMode(QSslSocket::VerifyNone);
338
339 return res;
340}
341
342bool AbstractNode::configureSslSocket(AbstractNodeInfo *node, bool fServer) {
343
344 if (!node)
345 return false;
346
347 auto socket = dynamic_cast<SslSocket*>(node->sct());
348 if (!socket) {
349
350 QuasarAppUtils::Params::log("Invalid ssl socket!! Connection not secure",
351 QuasarAppUtils::Error);
352 return false;
353 }
354
355 socket->setSslConfiguration(_ssl);
356
357 auto address = node->networkAddress();
358 connect(socket, &QSslSocket::encrypted, this ,[this, address]() {
359 handleEncrypted(getInfoPtr(address));
360 });
361
362 connect(socket, &SslSocket::sslErrorsOcurred,
363 this, &AbstractNode::handleSslErrorOcurredPrivate, Qt::DirectConnection);
364
365
366 AsyncLauncher::Job action = [socket, fServer]() -> bool {
367 if (fServer)
368 socket->startServerEncryption();
369 else
370 socket->startClientEncryption();
371
372 return true;
373 };
374
375 return _socketWorker->run(action);
376}
377
378const QList<QSslError> &AbstractNode::ignoreSslErrors() const {
379 return _ignoreSslErrors;
380}
381
382void AbstractNode::setIgnoreSslErrors(const QList<QSslError> &newIgnoreSslErrors) {
383 _ignoreSslErrors = newIgnoreSslErrors;
384};
385
386bool AbstractNode::useSelfSignedSslConfiguration(const EasySSL::SslSrtData &crtData) {
387
388 if (isListening()) {
389 return false;
390 }
391
392 _ssl = selfSignedSslConfiguration(crtData);
393 _mode = SslMode::InitSelfSigned;
394
395 if(!_ignoreSslErrors.contains(QSslError{QSslError::SelfSignedCertificate}))
396 _ignoreSslErrors.push_back(QSslError{QSslError::SelfSignedCertificate});
397
398 if(!_ignoreSslErrors.contains(QSslError{QSslError::SelfSignedCertificateInChain}))
399 _ignoreSslErrors.push_back(QSslError{QSslError::SelfSignedCertificateInChain});
400
401 return !_ssl.isNull();
402}
403
404bool AbstractNode::useSystemSslConfiguration(QSslConfiguration config) {
405 if (isListening()) {
406 return false;
407 }
408
409 _ssl = config;
410 _mode = SslMode::InitFromSystem;
411
412 return !_ssl.isNull();
413}
414
415bool AbstractNode::disableSSL() {
416 if (isListening()) {
417 return false;
418 }
419
420 _mode = SslMode::NoSSL;
421
422 return true;
423}
424
425void AbstractNode::handleEncrypted(AbstractNodeInfo *node) {
426 handleNodeStatusChanged(node, NodeCoonectionStatus::Connected);
427}
428
429void AbstractNode::handleSslErrorOcurredPrivate(SslSocket * sslScocket, const QList<QSslError> &errors) {
430
431 QList<QSslError> ignore;
432 for (auto &error : errors) {
433
434 if (!_ignoreSslErrors.contains(QSslError{error.error()})) {
435 handleSslErrorOcurred(sslScocket, error);
436 } else {
437 ignore += error;
438 }
439 }
440
441 if (ignore.isEmpty())
442 return;
443
444 if (sslScocket) {
445 sslScocket->ignoreSslErrors(ignore);
446 }
447}
448
449
450void AbstractNode::handleSslErrorOcurred(SslSocket *scket,
451 const QSslError &error) {
452 QuasarAppUtils::Params::log(scket->peerAddress().toString() + " : " + error.errorString(),
453 QuasarAppUtils::Error);
454
455 QuasarAppUtils::Params::log("Error code: " + QString::number(error.error()),
456 QuasarAppUtils::Error);
457}
458
459#endif
460
462 return _closeConnectionAfterBadRequest;
463}
464
465void AbstractNode::setCloseConnectionAfterBadRequest(bool newCloseConnectionAfterBadRequest) {
466 _closeConnectionAfterBadRequest = newCloseConnectionAfterBadRequest;
467}
468
470 return _sendBadRequestErrors;
471}
472
474 _sendBadRequestErrors = val;
475}
476
477const QSharedPointer<iParser> &
478AbstractNode::addApiParserImpl(const QSharedPointer<iParser> &parserObject) {
479 return _apiVersionParser->addApiParser(parserObject);
480}
481
483 const HostAddress* clientAddress) const {
484 return new AbstractNodeInfo(socket, clientAddress);
485}
486
487bool AbstractNode::registerSocket(QAbstractSocket *socket, const HostAddress* clientAddress) {
488
489 if (connectionsCount() >= maxPendingConnections()) {
490 return false;
491 }
492
493 HostAddress cliAddress;
494 if (clientAddress)
495 cliAddress = *clientAddress;
496 else
497 cliAddress = HostAddress{socket->peerAddress(), socket->peerPort()};
498
499
500 _connectionsMutex.lock();
501
502 if (_connections.contains(cliAddress)) {
503 auto info =_connections.value(cliAddress);
504 info->setSct(socket);
505 info->setIsLocal(clientAddress);
506
507 _connectionsMutex.unlock();
508
509 if (!info->isValid()) {
510 return false;
511 }
512
513 nodeAddedSucessful(info);
514 return true;
515 }
516
517 auto info = createNodeInfo(socket, &cliAddress);
518
519 info->setIsLocal(clientAddress);
520
521 _connections[cliAddress] = info;
522
523 _connectionsMutex.unlock();
524
525 connect(info, &AbstractNodeInfo::sigReadyRead,
526 this, &AbstractNode::avelableBytes, Qt::DirectConnection);
527
528 // using direct connection because socket clear all data of ip and port after disconnected.
529 connect(info, &AbstractNodeInfo::statusChaned,
530 this, &AbstractNode::handleNodeStatusChanged,
531 Qt::QueuedConnection);
532
533 if (info->status() != NodeCoonectionStatus::NotConnected) {
534 handleNodeStatusChanged(info, info->status());
535 }
536
537 // using direct connection because socket clear all data of ip and port after disconnected.
540 Qt::QueuedConnection);
541
542 // check node confirmed
543 QTimer::singleShot(WAIT_TIME, this, [this, cliAddress]() {
544 checkConfirmendOfNode(getInfoPtr(cliAddress));
545 });
546
547 nodeAddedSucessful(info);
548
549 return true;
550}
551
552ParserResult AbstractNode::parsePackage(const QSharedPointer<AbstractData> &pkg,
553 const Header &pkgHeader,
554 AbstractNodeInfo *sender) {
555 return _apiVersionParser->parsePackage(pkg, pkgHeader, sender);
556}
557
558QSharedPointer<AbstractData> AbstractNode::genPackage(unsigned short cmd,
559 AbstractNodeInfo *sender) const {
560 if (_apiVersionParser)
561 return _apiVersionParser->searchPackage(cmd, sender);
562
563 return nullptr;
564}
565
566bool AbstractNode::sendPackage(const Package &pkg, QAbstractSocket *target) const {
567 if (!pkg.isValid()) {
568 return false;
569 }
570
571 if (!target || !target->isValid()) {
572
573 qCritical() << "Destination server not valid!";
574 return false;
575 }
576
577 if (!target->waitForConnected()) {
578 qCritical() << "no connected to server! " + target->errorString();
579 return false;
580 }
581
582 return _dataSender->sendData(pkg.toBytes(), target, true);
583}
584
585unsigned int AbstractNode::sendData(const AbstractData *resp,
586 const HostAddress &addere,
587 const Header *req) {
588 return sendData(resp, getInfoPtr(addere), req);
589}
590
592 const AbstractNodeInfo *node,
593 const Header *req) {
594
595 if (!node) {
596 qDebug() << "Response not sent because client == null";
597 return 0;
598 }
599
600 if (!resp) {
601 return 0;
602 }
603
604 Package pkg;
605 bool convert = false;
606 if (req && req->isValid()) {
607 convert = resp->toPackage(pkg, node->multiVersionPackages().value(resp->cmd()),req->hash);
608 } else {
609 convert = resp->toPackage(pkg, node->multiVersionPackages().value(resp->cmd()));
610 }
611
612 if (!convert) {
613
614 if (static_cast<unsigned int>(pkg.data.size()) > Package::maximumSize()) {
615 // big data
616
617 auto wrap = QSharedPointer<BigDataWraper>::create();
618 wrap->setData(resp);
619 if ( parsePackage(wrap, {}, getInfoPtr(node->networkAddress())) != ParserResult::Processed) {
620 return 0;
621 }
622
623 return BIG_DATA_HASH_ID;
624 }
625 qCritical() << "Response not sent because dont create package from object";
626 return 0;
627 }
628
629 if (!sendPackage(pkg, node->sct())) {
630 qCritical() << "Response not sent!";
631 return 0;
632 }
633
634 return pkg.hdr.hash;
635}
636
637void AbstractNode::badRequest(const HostAddress &address, const Header &req,
638 const ErrorData &err, qint8 diff) {
639
640 if (!changeTrust(address, diff)) {
641
642
643 qCritical() << "Bad request detected, bud response command not sent!"
644 " because trust not changed";
645
646
647 qCritical() << "SECURITY LOG: Force block the " + address.toString() +
648 " because trust defined";
649
650 ban(address);
651
652 return;
653 }
654
655 auto node = getInfoPtr(address);
656 if (!isBanned(node)) {
657 auto bad = BadRequest(err);
658 if (!sendData(&bad, address, &req)) {
659 return;
660 }
661
662 qInfo() << "Bad request sendet to adderess: " + address.toString();
663
665 removeNode(node);
666 }
667 }
668}
669
671 WorkState state;
672
675 state.setMaxConnectionCount(maxPendingConnections());
676 state.setBanedList(banedList());
677 state.setIsRun(isListening());
678
679 return state;
680
681}
682
684 if (isListening()) {
685 if (connectionsCount() >= maxPendingConnections())
686 return "overload";
687 else {
688 return "Work";
689 }
690 }
691
692 return "Not running";
693}
694
696 return QString("%0 / %1").arg(connectionsCount()).arg(maxPendingConnections());
697}
698
699QList<HostAddress> AbstractNode::banedList() const {
700 QList<HostAddress> list = {};
701
702 QMutexLocker locer(&_connectionsMutex);
703
704 for (auto i = _connections.begin(); i != _connections.end(); ++i) {
705 if (i.value()->isBanned()) {
706 list.push_back(i.key());
707 }
708 }
709
710 return list;
711}
712
714 int count = 0;
715
716 QMutexLocker locer(&_connectionsMutex);
717
718 for (auto i : _connections) {
719 if (i->isConnected()) {
720 count++;
721 }
722 }
723 return count;
724}
725
727 int count = 0;
728
729 QMutexLocker locer(&_connectionsMutex);
730
731 for (auto i : _connections) {
732 if (i->status() == NodeCoonectionStatus::Confirmed) {
733 count++;
734 }
735 }
736 return count;
737}
738
739bool AbstractNode::ping(const HostAddress &address) {
740 Ping cmd;
741 return sendData(&cmd, address);
742
743}
744
746 if (!(info && info->isValid())) {
747 return false;
748 }
749
750 return info->isBanned();
751}
752
754 AsyncLauncher::Job action = [this, handle]() -> bool {
755 QAbstractSocket* socket = nullptr;
756
757#ifdef USE_HEART_SSL
758 if (_mode == SslMode::NoSSL) {
759 socket = new TcpSocket(nullptr);
760 } else {
761 socket = new SslSocket(nullptr);
762 }
763
764#else
765 socket = new TcpSocket(nullptr);
766#endif
767
768 socket->setSocketDescriptor(handle);
769
770 if (isBanned(getInfoPtr(HostAddress{socket->peerAddress(), socket->peerPort()}))) {
771 qCritical() << "Income connection from banned address";
772
773 delete socket;
774 return false;
775 }
776
777 if (!registerSocket(socket)) {
778
779 qCritical() << "Failed to register new socket";
780
781 delete socket;
782 return false;
783 }
784
785 return true;
786
787 };
788
789 _socketWorker->run(action);
790}
791
792bool AbstractNode::changeTrust(const HostAddress &id, int diff) {
793 auto ptr = getInfoPtr(id);
794 if (!ptr) {
795 return false;
796 }
797
798 auto objTrust = ptr->trust();
799
800 if (objTrust >= static_cast<int>(TrustNode::Undefined)) {
801 return false;
802 }
803
804 if (objTrust <= static_cast<int>(TrustNode::Baned)) {
805 return false;
806 }
807
808 ptr->setTrust(objTrust + diff);
809 return true;
810}
811
812void AbstractNode::avelableBytes(AbstractNodeInfo *sender) {
813
814 if (!sender) {
815 return;
816 }
817
818 auto id = sender->networkAddress();
819
820 if (!_connections.contains(id)) {
821 return;
822 }
823
824 if (!_receiveData.contains(id)) {
825 _receiveData.insert(id, new ReceiveData());
826 }
827
828 auto &pkg = _receiveData[id]->_pkg;
829 auto &hdrArray = _receiveData[id]->_hdrArray;
830
831 int workIndex = 0;
832 const int headerSize = sizeof(Header);
833
834 auto socket = sender->sct();
835 if (!socket) {
836 pkg.reset();
837 hdrArray.clear();
838 return;
839 }
840
841 // concat with old data of header.
842 auto array = hdrArray;
843 while (socket->bytesAvailable() > 0) {
844 array += socket->readAll();
845 };
846
847 const int arraySize = array.size();
848 hdrArray.clear();
849
850 while (arraySize > workIndex) {
851
852 int offset = arraySize - workIndex;
853
854 if (pkg.hdr.isValid()) {
855 // CASE 1: The Package data is still not collected, but the header is already collected. performs full or partial filling of packet data.
856
857 int dataLength = std::min(static_cast<int>(pkg.hdr.size - pkg.data.size()),
858 arraySize - workIndex);
859 pkg.data.append(array.mid(workIndex, dataLength));
860
861 workIndex += dataLength;
862
863
864 } else if (offset >= headerSize) {
865
866 // CASE 2: The header and package still do not exist and the amount of data allows you to create a new header. A header is created and will fill in all or part of the package data.
867
868 pkg.reset();
869
870 memcpy(&pkg.hdr,
871 array.data() + workIndex, headerSize);
872
873 if (!pkg.hdr.isValid())
874 return;
875
876 int dataLength = std::min(static_cast<int>(pkg.hdr.size),
877 arraySize - headerSize - workIndex);
878
879 pkg.data.append(array.mid(workIndex + headerSize, dataLength));
880
881 workIndex += headerSize + dataLength;
882
883 } else {
884 // CASE 3: There is not enough data to initialize the header. The data will be placed in temporary storage and will be processed the next time the data is received.
885
886 unsigned char dataLength = static_cast<unsigned char>(arraySize - workIndex);
887 hdrArray += array.mid(workIndex, dataLength);
888 workIndex += dataLength;
889 }
890
891 if (pkg.isValid()) {
892 newWork(pkg, sender, id);
893 pkg.reset();
894 hdrArray.clear();
895 } else if (static_cast<unsigned int>(pkg.data.size()) >= pkg.hdr.size) {
896 qWarning() << "Invalid Package received." + pkg.toString();
897 pkg.reset();
898 hdrArray.clear();
900
901 }
902 }
903}
904
905void AbstractNode::handleWorkerStoped() {
906 auto senderObject = dynamic_cast<QFutureWatcher <bool>*>(sender());
907
908 if (senderObject) {
909
910 _workersMutex.lock();
911 _workers.remove(senderObject);
912 _workersMutex.unlock();
913
914 delete senderObject;
915 }
916}
917
918void AbstractNode::handleForceRemoveNode(HostAddress node) {
919 AbstractNodeInfo* info = getInfoPtr(node);
920 if (info) {
921 info->removeSocket();
922 }
923}
924
925void AbstractNode::handleBeginWork(QSharedPointer<QH::AbstractTask> work) {
926
927 auto executeObject = [this, work]() -> bool {
928 if (!work)
929 return false;
930
931 return work->execute(this);
932 };
933
934 QMutexLocker locer(&_threadPoolMutex);
935
936 if (_threadPool) {
937 auto worker = new QFutureWatcher <bool>();
938 worker->setFuture(QtConcurrent::run(_threadPool, executeObject));
939
940 _workersMutex.lock();
941 _workers.insert(worker);
942 _workersMutex.unlock();
943
944
945 connect(worker, &QFutureWatcher<bool>::finished,
946 this, &AbstractNode::handleWorkerStoped);
947 }
948}
949
950bool AbstractNode::listen(const HostAddress &address) {
951 return QTcpServer::listen(address, address.port());
952}
953
954QSharedPointer<AbstractData>
956 AbstractNodeInfo *sender) const {
957
958 auto value = _apiVersionParser->searchPackage(pkg.hdr.command, sender);
959 if (!value) {
960 qDebug() << "Package type not registered!"
961 " Please use the registerPackageType method before parsing."
962 " Example invoke registerPackageType<MyData>() into constructor of you client and server nodes.";
963
964 return nullptr;
965 }
966
967 value->fromPakcage(pkg);
968 return value;
969}
970
971QList<HostAddress> AbstractNode::connectionsList() const {
972 QMutexLocker locer(&_connectionsMutex);
973
974 return _connections.keys();
975}
976
977QList<HostAddress> AbstractNode::activeConnectionsList() const {
978
979 QList<HostAddress> result;
980
981 QMutexLocker locer(&_connectionsMutex);
982 for (auto i : _connections) {
983 if (i->isConnected()) {
984 result.push_back(i->networkAddress());
985 }
986 }
987
988 return result;
989}
990
992
993 switch (error) {
995 qCritical() << "The remote host not found or dns server not responce.";
996 break;
997
998 }
999
1001 qCritical() << "The remote node is banned or serve is overload.";
1002 break;
1003
1004 }
1005 default: {
1006 qCritical() << "The unknown error ocurred.";
1007 }
1008 }
1009
1010}
1011
1015
1016bool AbstractNode::addApiParser(const QSharedPointer<iParser> &parser) {
1017
1018 if (!parser || parser->node() != this)
1019 return false;
1020
1021 return !addApiParserImpl(parser).isNull();
1022}
1023
1024void AbstractNode::receivePing(const QSharedPointer<PKG::Ping> &) {};
1025
1026bool AbstractNode::sheduleTask(const QSharedPointer<AbstractTask> &task) {
1027 return _tasksheduller->shedule(task);
1028}
1029
1031 _tasksheduller->remove(taskId);
1032}
1033
1035 return _tasksheduller->taskCount();
1036}
1037
1038void AbstractNode::newWork(const Package &pkg, AbstractNodeInfo *sender,
1039 const HostAddress& id) {
1040
1041 if (!sender)
1042 return;
1043
1044 auto executeObject = [pkg, sender, id, this]() {
1045
1046 auto data = prepareData(pkg, sender);
1047 if (!data)
1048 return false;
1049
1050 ParserResult parseResult = parsePackage(data, pkg.hdr, sender);
1051
1052#ifdef HEART_PRINT_PACKAGES
1053 QuasarAppUtils::Params::log(QString("Package received! %0").arg(data->toString()), QuasarAppUtils::Info);
1054#endif
1055
1056 if (parseResult != ParserResult::Processed) {
1057
1058 auto message = QString("Package not parsed! %0 \nresult: %1. \n%2").
1059 arg(pkg.toString(), iParser::pareseResultToString(parseResult), data->toString());
1060
1061 qInfo() << message;
1062
1063 if (parseResult == ParserResult::Error) {
1064
1065 if (fSendBadRequestErrors()) {
1066 badRequest(id, pkg.hdr, ErrorData{1, "Your send the invalid request."}, REQUEST_ERROR);
1067 } else {
1069 }
1070 }
1071
1072 if (parseResult == ParserResult::NotProcessed) {
1073 if (fSendBadRequestErrors()) {
1074 badRequest(id, pkg.hdr, ErrorData{2, "Your send the not supported command."}, NOTSUPPORT_ERROR);
1075 } else {
1077 }
1078
1079 }
1080
1081#ifdef QT_DEBUG
1082 qInfo() << _apiVersionParser->toString();
1083#endif
1084
1085 return false;
1086 }
1087
1088 _confirmNodeMutex.lock();
1089 sender->updateConfirmStatus();
1090 _confirmNodeMutex.unlock();
1091
1092
1093 return true;
1094 };
1095
1096
1097 QMutexLocker locer(&_threadPoolMutex);
1098
1099 if (_threadPool) {
1100 auto worker = new QFutureWatcher <bool>();
1101 worker->setFuture(QtConcurrent::run(_threadPool, executeObject));
1102
1103 _workersMutex.lock();
1104 _workers.insert(worker);
1105 _workersMutex.unlock();
1106
1107 connect(worker, &QFutureWatcher<bool>::finished,
1108 this, &AbstractNode::handleWorkerStoped);
1109 }
1110}
1111
1113 return _mode;
1114}
1115
1116QHash<HostAddress, AbstractNodeInfo *> AbstractNode::connections() const {
1117 return _connections;
1118}
1119
1123
1124void AbstractNode::handleNodeStatusChanged(AbstractNodeInfo *node, NodeCoonectionStatus status) {
1125
1126 if (status == NodeCoonectionStatus::NotConnected) {
1127 nodeDisconnected(node);
1128 } else if (status == NodeCoonectionStatus::Connected) {
1129
1130#ifdef USE_HEART_SSL
1131 if (_mode != SslMode::NoSSL) {
1132
1133 auto socket = dynamic_cast<SslSocket*>(node->sct());
1134
1135 if (!socket) {
1136 QuasarAppUtils::Params::log("Failed to preparet to configure ssl socket.",
1137 QuasarAppUtils::Error);
1138
1139 removeNode(node);
1140 return;
1141 }
1142
1143 if (!socket->isEncrypted()) {
1144 if (!configureSslSocket(node, !node->isLocal())) {
1145 QuasarAppUtils::Params::log("Failed to configure ssl socket.",
1146 QuasarAppUtils::Error);
1147 }
1148
1149 return;
1150 }
1151 }
1152#endif
1153
1154 nodeConnected(node);
1155 } else if (status == NodeCoonectionStatus::Confirmed) {
1156 nodeConfirmend(node);
1157 }
1158}
1159
1161
1162 auto &actions = _connectActions[NodeCoonectionStatus::Confirmed];
1163 auto action = actions.take(node->networkAddress());
1164 if (action)
1165 action(node);
1166}
1167
1169 if (!_apiVersionParser->sendSupportedAPI(node)) {
1170 qCritical() << "Failed to sent version information to dist node";
1171 }
1172
1173 auto &actions = _connectActions[NodeCoonectionStatus::Connected];
1174 auto action = actions.take(node->networkAddress());
1175 if (action)
1176 action(node);
1177}
1178
1180 Q_UNUSED(node)
1181}
1182
1184 QAbstractSocket::SocketError errorCode,
1185 QString errorString) {
1186 Q_UNUSED(errorCode)
1187
1188 QString message("Network error occured on the %0 node. Message: %1");
1189 qCritical() << message.arg(nodeInfo->networkAddress().toString(), errorString);
1190
1191 auto &actions = _connectActions[NodeCoonectionStatus::Connected];
1192 actions.remove(nodeInfo->networkAddress());
1193}
1194
1195void AbstractNode::checkConfirmendOfNode(AbstractNodeInfo *info) {
1196
1197 if(!info)
1198 return;
1199
1200 if (info->status() != NodeCoonectionStatus::Confirmed) {
1201 removeNode(info);
1202 }
1203}
1204
1205void AbstractNode::initThreadId() const {
1206 mainThreadID();
1207}
1208
1209void AbstractNode::initThreadPool() {
1210 deinitThreadPool();
1211
1212 QMutexLocker lock(&_threadPoolMutex);
1213 _threadPool = new QThreadPool();
1214 _threadPool->setObjectName("PackageWorker");
1215 _threadPool->setMaxThreadCount(QThread::idealThreadCount());
1216
1217}
1218
1219void AbstractNode::deinitThreadPool() {
1220 QMutexLocker lock(&_threadPoolMutex);
1221
1222 if (!_threadPool) {
1223 return;
1224 }
1225
1226 _threadPool->waitForDone(WAIT_TIME);
1227 delete _threadPool;
1228 _threadPool = nullptr;
1229}
1230
1232 static auto thread = QThread::currentThread();
1233
1234 return thread;
1235}
1236
1237} // namespace QH
#define REQUEST_ERROR
#define BIG_DATA_HASH_ID
#define NOTSUPPORT_ERROR
#define CRITICAL_ERROOR
The APIVersionParser class. This is the main parser for the main command. This parser works only with the A...
QSharedPointer< PKG::AbstractData > searchPackage(unsigned short cmd, AbstractNodeInfo *sender) const
searchPackage This method searches for a package recursively in all registered parsers. Searching will be in ...
QHash< QString, QSharedPointer< QH::iParser > > selectParser(const VersionData &distVersion) const
selectParser This method selects the API parser between nodes.
const QSharedPointer< QH::iParser > & addApiParser(const QSharedPointer< QH::iParser > &parserObject)
addApiParser This method add new Api parser for this node.
ParserResult parsePackage(const QSharedPointer< PKG::AbstractData > &pkg, const Header &pkgHeader, AbstractNodeInfo *sender) override
parsePackage This is main method of all childs classes of an AbstractNode class. This method work on ...
void sigNoLongerSupport(const QString &ApiKey, unsigned short version)
sigNoLongerSupport This signal will be emit when node receive incomplite versions.
QString toString() const override
toString This method show all supported commands and them names.
bool sendSupportedAPI(AbstractNodeInfo *dist) const
sendSupportedAPI This method sends all information about the supported API.
The AbstractNodeInfo class contains information about client or server connection and tcp socket of n...
const PackagesVersionData & multiVersionPackages() const
multiVersionPackages This is the list of packages of one API package that support multiple versions.
bool isLocal() const
isLocal return true if connection opened on this node.
NodeCoonectionStatus status() const
status This method return status of the node connection.
void sigReadyRead(QH::AbstractNodeInfo *thisNode)
sigReadyRead This is wrapper signal for the QAbstractSocket::readyRead signal.
HostAddress networkAddress() const
networkAddress This method return network address of current node or client.
void statusChaned(QH::AbstractNodeInfo *thisNode, QH::NodeCoonectionStatus status)
statusChaned This signal emitted when nodes status is changed.
virtual void removeSocket()
removeSocket This method use for remove socket. You can override this method for handle this event.
virtual bool isValid() const
isValid - Check valid of node. This method check connect status of socket.
QAbstractSocket * sct() const
sct This method return socket of connection.
virtual bool isBanned() const
isBanned - check node which banned.
void sigErrorOccurred(QH::AbstractNodeInfo *thisNode, QAbstractSocket::SocketError socketError, QString message)
sigErrorOccurred This is wrapper signal for the QAbstractSocket::errorOccurred signal.
virtual bool isConnected() const
isConnected - Check of node connect status.
void sigPingReceived(const QSharedPointer< QH::PKG::Ping > &ping)
sigPingReceived This method emited
int sheduledTaskCount() const
sheduledTaskCount This method return count of sheduled tasks.
QHash< HostAddress, AbstractNodeInfo * > connections() const
connections - Return hash map of all connections of this node.
QSharedPointer< PKG::AbstractData > prepareData(const Package &pkg, AbstractNodeInfo *sender) const
prepareData This is private method for preparing package from the byteArray.
QList< HostAddress > connectionsList() const
connectionsList This method return list of all node connections
bool fSendBadRequestErrors() const
fSendBadRequestErrors This property enable or disable sending feedback to connected node when them se...
virtual void nodeErrorOccured(QH::AbstractNodeInfo *nodeInfo, QAbstractSocket::SocketError errorCode, QString errorString)
nodeErrorOccured This slot is invoked when an error occurred in the nodeInfo.
virtual void addNodeFailed(AddNodeError error)
addNodeFailed This method will be invoked when an attempt to add a new node has failed. So override this me...
virtual void nodeAddedSucessful(AbstractNodeInfo *node)
nodeAddedSucessful This method will be invoked when new node added successful.
QSharedPointer< QH::iParser > selectParser(unsigned short cmd, AbstractNodeInfo *sender) const
selectParser This method select parser by command and sender.
bool removeNode(const HostAddress &nodeAdderess)
removeNode - Removes the node and disconnects from the node (server).
virtual void stop()
stop - Stopped this node and close all network connections.
virtual bool changeTrust(const HostAddress &id, int diff)
changeTrust This method change trust of connected node.
virtual void nodeConnected(AbstractNodeInfo *node)
nodeConnected This method is invoked when the node status changed to "connected" default implementatio ...
QList< HostAddress > banedList() const
banedList This method returns the list of banned clients of nodes.
virtual QString connectionState() const
connectionState This method returns a string value describing the connection state.
virtual bool sendPackage(const Package &pkg, QAbstractSocket *target) const
sendPackage This method prepare and send to target address a package.
bool sheduleTask(const QSharedPointer< AbstractTask > &task)
sheduleTask This method schedules the execution of a task on this node.
virtual bool isBanned(const AbstractNodeInfo *socket) const
isBanned This method checks if the node is banned.
static QThread * mainThreadID()
mainThreadID This method return the pointer to main thread
virtual AbstractNodeInfo * createNodeInfo(QAbstractSocket *socket, const HostAddress *clientAddress=nullptr) const
createNodeInfo This method creates a nodeInfo object. Override this method to create your own nodeInf...
bool addApiParser(const QSharedPointer< iParser > &parser)
addApiParser This method adds a new API parser for this node.
bool addNode(const HostAddress &address)
addNode - Connect to node (server) with address.
void removeTask(int taskId)
removeTask This method removes the task from the scheduler.
~AbstractNode() override
virtual void ban(const HostAddress &target)
ban - This method sets the trust property of the target connection to 0, and the target connection will be abo...
HostAddress address() const
address - This method returns the own network address of the current node (server).
virtual void badRequest(const HostAddress &address, const Header &req, const PKG::ErrorData &err, qint8 diff=REQUEST_ERROR)
badRequest This method is send data about error of request.
SslMode getMode() const
getMode - This method returns the SSL mode of the current node (server).
void prepareForDelete() override
prepareForDelete This method must prepare the object for deletion. Override this for working main functi...
AbstractNode(QObject *ptr=nullptr)
AbstractNode - Base constructor of node.
virtual WorkState getWorkState() const
getWorkState - This method collect general information about this server. For more information about ...
void incomingConnection(qintptr handle) override final
incomingConnection This is an overridden method of QTcpServer.
QList< HostAddress > activeConnectionsList() const
activeConnectionsList This method returns the list of active node connections.
virtual AbstractNodeInfo * getInfoPtr(const HostAddress &id)
getInfoPtr - This method returns a pointer to the information class about the network connection....
virtual bool registerSocket(QAbstractSocket *socket, const HostAddress *address=nullptr)
registerSocket This method registers a new socket object.
void sigNoLongerSupport(const QString &ApiKey, unsigned short version)
sigNoLongerSupport is the same as APIVersionParser::sigNoLongerSupport. This signal just retranslates th...
virtual void nodeConfirmend(AbstractNodeInfo *node)
nodeConfirmend This method is invoked when the node status changed to "confirmend" default implementati...
virtual QString getWorkStateString() const
getWorkStateString This method generate string about work state of server.
void setSendBadRequestErrors(bool value)
setSendBadRequestErrors This method enable or disable the fSendBadRequestErrors property.
virtual bool run(const QString &addres, unsigned short port)
run This method implements the deployment of a network node (server) on the selected address.
virtual void unBan(const HostAddress &target)
unBan - This method sets the trust property of the target connection to 100.
bool ping(const HostAddress &address)
ping This method send ping package to address for testing connection.
virtual void nodeDisconnected(AbstractNodeInfo *node)
nodeDisconnected This method is invoked when the node status changed to "disconnected" default implementat...
void setCloseConnectionAfterBadRequest(bool value)
setCloseConnectionAfterBadRequest This method enables or disables the fCloseConnectionAfterBadRequest property.
int confirmendCount() const
confirmendCount - Returns the count of nodes with status confirmend.
bool fCloseConnectionAfterBadRequest() const
fCloseConnectionAfterBadRequest This property enables or disables dropping the connection after bad requests....
int connectionsCount() const
connectionsCount - Returns the count of connections (connections with status connected).
virtual unsigned int sendData(const PKG::AbstractData *resp, const HostAddress &address, const Header *req=nullptr)
sendData This method sends a data object to another node.
virtual void receivePing(const QSharedPointer< QH::PKG::Ping > &ping)
receivePing This method invoked when node receive new ping object.
The AsyncLauncher class is a wrapper of the Async class for support moving invokes to thread of the curr...
bool run(const Job &action, bool wait=false)
run This method run the action function in the work thread of this object.
std::function< bool()> Job
Definition async.h:35
The DataSender class creates a queue for data sent to the network.
Definition datasender.h:23
bool sendData(const QByteArray &array, void *target, bool await=false) const
sendPackagePrivate This slot move send package to a main thread.
The HostAddress class is a wrapper of QHostAddress. It contains the network address and network port.
Definition hostaddress.h:22
unsigned short port() const
The port method return port of node.
QString toString() const
toString this method convert the Host Address value to string value.
The AbstractData class provides base functions for transporting data over the network. To create your own pac...
virtual bool toPackage(Package &package, const DistVersion &reqVersion, unsigned int triggerHash=0) const
toPackage This method convert this class object to the package. For more info see Package class.
virtual unsigned short cmd() const =0
cmd - This is the command of this object (to generate the cmd use the QH_PACKAGE macro).
The BadRequest class send response about error to client.
Definition badrequest.h:35
The CloseConnection class - This command is a request to close the connection on the parent node of connectio...
The Ping class - test class for translate data on network.
Definition ping.h:22
The Package struct. This is base structure for transporting data by network between QH nodes....
Definition package.h:23
QByteArray data
data This is source data of package.
Definition package.h:33
virtual bool isValid() const
isValid This method validates the current package. The default implementation checks the header and comp...
Definition package.cpp:18
Header hdr
hdr This is header of package. For more information see the Header struct.
Definition package.h:29
QString toString() const
toString This method convert a package information to a string label.
Definition package.cpp:38
static unsigned int maximumSize()
maximumSize This method returns the maximum size of a package. If the package is larger than the maximum size then package...
Definition package.cpp:49
QByteArray toBytes() const
toBytes This method convert a current object to bytes array.
The TaskScheduler class contains the queue of all scheduled tasks.
void sigPushWork(QSharedPointer< QH::AbstractTask > work)
sigPushWork This signal is emitted when the task work needs to be executed.
bool shedule(const QSharedPointer< AbstractTask > &task)
shedule This method schedules a new task in this node.
int taskCount() const
taskCount This method return tasks count.
bool remove(const QSharedPointer< AbstractTask > &task)
remove This method remove the task from a tasks queue.
The AbstractSocket class is a wrapper of the QAbstractSocket with slot implementation of the...
Definition tcpsocket.h:19
The WorkState class is simple class with data of work state of node.
Definition workstate.h:20
void setActiveConnections(const QList< HostAddress > &newActiveConnections)
setActiveConnections this method sets new list of active connections.
Definition workstate.cpp:34
void setMaxConnectionCount(int value)
setMaxConnectionCount this method set a new value of limit of connections.
Definition workstate.cpp:87
void setIsRun(bool value)
setIsRun This method set new value for run flag.
Definition workstate.cpp:18
void setBanedList(const QList< HostAddress > &banedList)
setBanedList -this method set banned list for this object.
Definition workstate.cpp:95
void setConnections(const QList< HostAddress > &connections)
setConnections This method sets connections list.
Definition workstate.cpp:26
static QString pareseResultToString(const ParserResult &parseResult)
pareseResultToString This method converts a ParserResult value to a string.
Definition iparser.cpp:22
#define WAIT_CONFIRM_TIME
Definition config.h:23
#define WAIT_TIME
Definition config.h:13
The QH namespace - QuasarApp Heart namespace. This namespace contains all classes of the Heart librar...
Definition heart.cpp:13
ParserResult
The ParserResult enum. Error - parser detect a errorob package. NotProcessed - the parser does not kn...
Definition iparser.h:35
@ NotProcessed
the parser does not know what to do with the package or has not finished processing it.
@ Error
the parser detected an error in the package.
@ Processed
the parser finished processing correctly.
NodeCoonectionStatus
The AbstractNodeState enum - This is status of the known nodes or clients.
@ Connected
The node with this status has socket status is connected.
@ NotConnected
This node has not sent data about its environment and the status of the node socket is disconnected.
AddNodeError
The AddNodeError enum contains error codes that can occur after invoking the AbstractNode::addNode ...
@ RegisterSocketFailed
This error occurs when you try to add a banned node or the server is overloaded.
@ HostNotFound
This error occurs when the DNS server does not respond to the node or the node can't find the server IP address by h...
@ Baned
Node with this trust value is forbidden.
@ Undefined
Undefined node.
SslMode
The SslMode enum contains options for setting the SSL mode of a node (server). For more information ...
@ NoSSL
This is a non-secure connection without SSL encryption. It is the default value of any new node, see Abstrac...
The Header struct 32 bytes.
Definition header.h:19
bool isValid() const
isValid This method check header size and compare commands.
Definition header.cpp:18
unsigned int hash
hash This is unique id of a package. id calc with CRC32 function for Qt implementation.
Definition header.h:38
unsigned short command
command of package for more information see the AbstractData::toPackage method.
Definition header.h:23
The ErrorData struct is simple structure for contains data of the error.
Definition badrequest.h:20