Heart 1.3.844.0629079
Heart is base back end library for your c++ Qt projects.
abstractnode.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2018-2025 QuasarApp.
3 * Distributed under the lgplv3 software license, see the accompanying
4 * Everyone is permitted to copy and distribute verbatim copies
5 * of this license document, but changing it is not allowed.
6*/
7
8#include "abstractnode.h"
9#include "datasender.h"
10#include "ping.h"
11#include "workstate.h"
12#include <QHostInfo>
13
14#include <badrequest.h>
15#include <quasarapp.h>
16#include <QTcpServer>
17
18#ifdef USE_HEART_SSL
19
20#include <sslsocket.h>
21#include <easyssl/x509.h>
22#include <easyssl/rsassl.h>
23
24#include <QSslConfiguration>
25#include <QSslCertificate>
26#include <QSslKey>
27#include <QSslSocket>
28
29#endif
30
31#include <QMetaObject>
32#include <QtConcurrent>
33#include <closeconnection.h>
34#include "tcpsocket.h"
35#include "asynclauncher.h"
36#include "receivedata.h"
37#include "abstracttask.h"
38
39#include <apiversion.h>
40#include <versionisreceived.h>
41#include <taskscheduler.h>
42#include <qaglobalutils.h>
43#include <bigdatawraper.h>
44#include <bigdataparser.h>
45#include <abstractnodeparser.h>
46#include <apiversionparser.h>
47
48namespace QH {
49
50using namespace PKG;
51
53 QTcpServer(ptr) {
54
55 initThreadId();
56
57 _senderThread = new QThread();
58 _senderThread->setObjectName("Sender");
59
60 _senderThread->start();
61
62 // This object moving to _senderThread.
63 _dataSender = new DataSender(_senderThread);
64 _socketWorker = new AsyncLauncher(_senderThread);
65 _tasksheduller = new TaskScheduler();
66 _apiVersionParser = new APIVersionParser(this);
67
68 addApiParser<BigDataParser>();
69
70 auto abstractNodeParser = addApiParserNative<AbstractNodeParser>();
71 connect(abstractNodeParser.data(), &AbstractNodeParser::sigPingReceived,
72 this, &AbstractNode::receivePing, Qt::DirectConnection);
73
74 connect(_apiVersionParser, &APIVersionParser::sigNoLongerSupport,
75 this, &AbstractNode::sigNoLongerSupport, Qt::DirectConnection);
76
77 qRegisterMetaType<QSharedPointer<QH::AbstractTask>>();
78#ifdef USE_HEART_SSL
79 qRegisterMetaType<QList<QSslError>>();
80#endif
81
82 connect(_tasksheduller, &TaskScheduler::sigPushWork,
83 this, &AbstractNode::handleBeginWork);
84
85
86 initThreadPool();
87
88}
89
91
92 _senderThread->quit();
93 _senderThread->wait();
94
95 for (auto it: std::as_const(_receiveData)) {
96 delete it;
97 }
98
99 _receiveData.clear();
100
101 delete _dataSender;
102 delete _senderThread;
103 delete _socketWorker;
104 delete _tasksheduller;
105 delete _apiVersionParser;
106}
107
108bool AbstractNode::run(const QString &addres, unsigned short port) {
109
110 if (!port)
111 return false;
112
113 HostAddress adr(addres, port);
114 if (addres.isEmpty()) {
115 adr = HostAddress{QHostAddress::Any, port};
116 }
117
118 if (!listen(adr)) {
119 QuasarAppUtils::Params::log("Run fail " + this->errorString(),
120 QuasarAppUtils::Error);
121
122 QuasarAppUtils::Params::log("Address:: " + adr.toString(),
123 QuasarAppUtils::Error);
124 return false;
125 }
126
127 initThreadPool();
128
129 return true;
130}
131
132QSharedPointer<iParser> AbstractNode::selectParser(unsigned short cmd,
133 AbstractNodeInfo *sender) const {
134 return _apiVersionParser->selectParser(cmd, sender);
135}
136
137QSharedPointer<iParser> AbstractNode::selectParser(const QString &type,
138 AbstractNodeInfo *sender) const {
139 return _apiVersionParser->selectParser(type, sender);
140}
141
142QSharedPointer<iParser> AbstractNode::selectParser(const QString &type, int version) const {
143 return _apiVersionParser->selectParser(type, version);
144}
145
147 close();
148
149 _connectionsMutex.lock();
150 for (const auto &i : std::as_const(_connections)) {
151 i->disconnect();
152 }
153 _connectionsMutex.unlock();
154
155 _workersMutex.lock();
156 for (auto it: std::as_const(_workers)) {
157 if (!it->isFinished()) {
158 it->cancel();
159 it->waitForFinished();
160 }
161 }
162 _workersMutex.unlock();
163
164 deinitThreadPool();
165}
166
168 QMutexLocker locer(&_connectionsMutex);
169
170 if (!_connections.contains(id)) {
171 return nullptr;
172 }
173
174 return dynamic_cast<AbstractNodeInfo*>(_connections[id]);
175}
176
177// fix me, if getInfoPtr invoced in an another thread and in some time main thread remove this object then
178// this method return invalid object and application crashed.
180 QMutexLocker locer(&_connectionsMutex);
181
182 if (!_connections.contains(id)) {
183 return nullptr;
184 }
185
186 return dynamic_cast<AbstractNodeInfo*>(_connections[id]);
187}
188
189void AbstractNode::ban(const HostAddress &target) {
190 auto info = getInfoPtr(target);
191 if (info)
192 info->ban();
193
194}
195
196void AbstractNode::unBan(const HostAddress &target) {
197 QMutexLocker locer(&_connectionsMutex);
198
199 if (!_connections.contains(target) || _connections[target]) {
200 return;
201 }
202
203 _connections[target]->unBan();
204}
205
206bool AbstractNode::addNode(const HostAddress &address) {
207
208 AsyncLauncher::Job action = [this, address]() -> bool {
209 QAbstractSocket *socket;
210#ifdef USE_HEART_SSL
211 if (_mode == SslMode::NoSSL) {
212 socket = new TcpSocket(nullptr);
213 } else {
214 socket = new SslSocket(nullptr);
215 }
216
217#else
218 socket = new TcpSocket(nullptr);
219#endif
220
221 if (!registerSocket(socket, &address)) {
223 delete socket;
224 return false;
225 }
226
227 socket->connectToHost(address, address.port());
228
229 return true;
230 };
231
232 return _socketWorker->run(action);
233}
234
236 const std::function<void (QH::AbstractNodeInfo *)> &action,
237 NodeCoonectionStatus status) {
238
239 auto peer = getInfoPtr(address);
240
241 if (action && (!peer || peer->status() < status)) {
242 auto &actionsList = _connectActions[status];
243 actionsList[address] = action;
244 }
245
246 return addNode(address);
247}
248
249bool AbstractNode::addNode(const QString &domain, unsigned short port,
250 const std::function<void (QH::AbstractNodeInfo *)> &action,
251 NodeCoonectionStatus status) {
252
253 HostAddress address{domain, port};
254 if (address.isNull()) {
255
256 QHostInfo::lookupHost(domain, [this, port, domain, action, status](QHostInfo info) {
257
258 if (info.error() != QHostInfo::NoError) {
259 QuasarAppUtils::Params::log("The domain name :" + domain +
260 " has error: " + info.errorString(),
261 QuasarAppUtils::Error);
262 addNodeFailed(AddNodeError::HostNotFound);
263 return;
264 }
265
266 auto addresses = info.addresses();
267
268 if (addresses.size() > 1) {
269 QuasarAppUtils::Params::log("The domain name :" + domain +
270 " has more 1 ip addresses.",
271 QuasarAppUtils::Warning);
272 }
273
274 if (action) {
275 addNode(HostAddress{addresses.first(), port}, action, status);
276 } else {
277 addNode(HostAddress{addresses.first(), port});
278 }
279
280 });
281
282
283 return true;
284 }
285
286
287 return addNode(address, action);
288}
289
290bool AbstractNode::removeNode(const HostAddress &nodeAdderess) {
291
292 if (AbstractNodeInfo *ptr = getInfoPtr(nodeAdderess)) {
293 return removeNode(ptr);
294 }
295
296 return false;
297}
298
300 if (!(node)) {
301 return false;
302 }
303
304 if (!node->isConnected())
305 return true;
306
307 if (node->isLocal()) {
308 node->removeSocket();
309 return true;
310 }
311
312 QTimer::singleShot(WAIT_CONFIRM_TIME, this,
313 std::bind(&AbstractNode::handleForceRemoveNode,
314 this, node->networkAddress()));
315
316 CloseConnection close;
317 return sendData(&close, node);
318}
319
321 return HostAddress{serverAddress(), serverPort()};
322}
323
324#ifdef USE_HEART_SSL
325QSslConfiguration AbstractNode::getSslConfig() const {
326 return _ssl;
327}
328
329QSslConfiguration AbstractNode::selfSignedSslConfiguration(const EasySSL::SslSrtData &sslData) {
330 QSslConfiguration res = QSslConfiguration::defaultConfiguration();
331
332 QSslKey pkey;
333 QSslCertificate crt;
334
335 EasySSL::X509 generator(QSharedPointer<EasySSL::RSASSL>::create());
336 EasySSL::SelfSignedSertificate certificate = generator.create(sslData);
337 res.setPrivateKey(certificate.key);
338 res.setLocalCertificate(certificate.crt);
339 res.setPeerVerifyMode(QSslSocket::VerifyNone);
340
341 return res;
342}
343
344bool AbstractNode::configureSslSocket(AbstractNodeInfo *node, bool fServer) {
345
346 if (!node)
347 return false;
348
349 auto socket = dynamic_cast<SslSocket*>(node->sct());
350 if (!socket) {
351
352 QuasarAppUtils::Params::log("Invalid ssl socket!! Connection not secure",
353 QuasarAppUtils::Error);
354 return false;
355 }
356
357 socket->setSslConfiguration(_ssl);
358
359 auto address = node->networkAddress();
360 connect(socket, &QSslSocket::encrypted, this ,[this, address]() {
361 handleEncrypted(getInfoPtr(address));
362 });
363
364 connect(socket, &SslSocket::sslErrorsOcurred,
365 this, &AbstractNode::handleSslErrorOcurredPrivate, Qt::DirectConnection);
366
367
368 AsyncLauncher::Job action = [socket, fServer]() -> bool {
369 if (fServer)
370 socket->startServerEncryption();
371 else
372 socket->startClientEncryption();
373
374 return true;
375 };
376
377 return _socketWorker->run(action);
378}
379
380const QList<QSslError> &AbstractNode::ignoreSslErrors() const {
381 return _ignoreSslErrors;
382}
383
384void AbstractNode::setIgnoreSslErrors(const QList<QSslError> &newIgnoreSslErrors) {
385 _ignoreSslErrors = newIgnoreSslErrors;
386};
387
388bool AbstractNode::useSelfSignedSslConfiguration(const EasySSL::SslSrtData &crtData) {
389
390 if (isListening()) {
391 return false;
392 }
393
394 _ssl = selfSignedSslConfiguration(crtData);
395 _mode = SslMode::InitSelfSigned;
396
397 if(!_ignoreSslErrors.contains(QSslError{QSslError::SelfSignedCertificate}))
398 _ignoreSslErrors.push_back(QSslError{QSslError::SelfSignedCertificate});
399
400 if(!_ignoreSslErrors.contains(QSslError{QSslError::SelfSignedCertificateInChain}))
401 _ignoreSslErrors.push_back(QSslError{QSslError::SelfSignedCertificateInChain});
402
403 return !_ssl.isNull();
404}
405
406bool AbstractNode::useSystemSslConfiguration(QSslConfiguration config) {
407 if (isListening()) {
408 return false;
409 }
410
411 _ssl = config;
412 _mode = SslMode::InitFromSystem;
413
414 return !_ssl.isNull();
415}
416
417bool AbstractNode::disableSSL() {
418 if (isListening()) {
419 return false;
420 }
421
422 _mode = SslMode::NoSSL;
423
424 return true;
425}
426
427void AbstractNode::handleEncrypted(AbstractNodeInfo *node) {
428 handleNodeStatusChanged(node, NodeCoonectionStatus::Connected);
429}
430
431void AbstractNode::handleSslErrorOcurredPrivate(SslSocket * sslScocket, const QList<QSslError> &errors) {
432
433 QList<QSslError> ignore;
434 for (auto &error : errors) {
435
436 if (!_ignoreSslErrors.contains(QSslError{error.error()})) {
437 handleSslErrorOcurred(sslScocket, error);
438 } else {
439 ignore += error;
440 }
441 }
442
443 if (ignore.isEmpty())
444 return;
445
446 if (sslScocket) {
447 sslScocket->ignoreSslErrors(ignore);
448 }
449}
450
451
452void AbstractNode::handleSslErrorOcurred(SslSocket *scket,
453 const QSslError &error) {
454 QuasarAppUtils::Params::log(scket->peerAddress().toString() + " : " + error.errorString(),
455 QuasarAppUtils::Error);
456
457 QuasarAppUtils::Params::log("Error code: " + QString::number(error.error()),
458 QuasarAppUtils::Error);
459}
460
461#endif
462
464 return _closeConnectionAfterBadRequest;
465}
466
467void AbstractNode::setCloseConnectionAfterBadRequest(bool newCloseConnectionAfterBadRequest) {
468 _closeConnectionAfterBadRequest = newCloseConnectionAfterBadRequest;
469}
470
472 return _sendBadRequestErrors;
473}
474
476 _sendBadRequestErrors = val;
477}
478
479const QSharedPointer<iParser> &
480AbstractNode::addApiParserImpl(const QSharedPointer<iParser> &parserObject) {
481 return _apiVersionParser->addApiParser(parserObject);
482}
483
485 const HostAddress* clientAddress) const {
486 return new AbstractNodeInfo(socket, clientAddress);
487}
488
489bool AbstractNode::registerSocket(QAbstractSocket *socket, const HostAddress* clientAddress) {
490
491 if (connectionsCount() >= maxPendingConnections()) {
492 return false;
493 }
494
495 HostAddress cliAddress;
496 if (clientAddress)
497 cliAddress = *clientAddress;
498 else
499 cliAddress = HostAddress{socket->peerAddress(), socket->peerPort()};
500
501
502 _connectionsMutex.lock();
503
504 if (_connections.contains(cliAddress)) {
505 auto info =_connections.value(cliAddress);
506 info->setSct(socket);
507 info->setIsLocal(clientAddress);
508
509 _connectionsMutex.unlock();
510
511 if (!info->isValid()) {
512 return false;
513 }
514
515 nodeAddedSucessful(info);
516 return true;
517 }
518
519 auto info = createNodeInfo(socket, &cliAddress);
520
521 info->setIsLocal(clientAddress);
522
523 _connections[cliAddress] = info;
524
525 _connectionsMutex.unlock();
526
527 connect(info, &AbstractNodeInfo::sigReadyRead,
528 this, &AbstractNode::avelableBytes, Qt::DirectConnection);
529
530 // using direct connection because socket clear all data of ip and port after disconnected.
531 connect(info, &AbstractNodeInfo::statusChaned,
532 this, &AbstractNode::handleNodeStatusChanged,
533 Qt::QueuedConnection);
534
535 if (info->status() != NodeCoonectionStatus::NotConnected) {
536 handleNodeStatusChanged(info, info->status());
537 }
538
539 // using direct connection because socket clear all data of ip and port after disconnected.
542 Qt::QueuedConnection);
543
544 // check node confirmed
545 QTimer::singleShot(WAIT_TIME, this, [this, cliAddress]() {
546 checkConfirmendOfNode(getInfoPtr(cliAddress));
547 });
548
549 nodeAddedSucessful(info);
550
551 return true;
552}
553
554ParserResult AbstractNode::parsePackage(const QSharedPointer<AbstractData> &pkg,
555 const Header &pkgHeader,
556 AbstractNodeInfo *sender) {
557 return _apiVersionParser->parsePackage(pkg, pkgHeader, sender);
558}
559
560QSharedPointer<AbstractData> AbstractNode::genPackage(unsigned short cmd,
561 AbstractNodeInfo *sender) const {
562 if (_apiVersionParser)
563 return _apiVersionParser->searchPackage(cmd, sender);
564
565 return nullptr;
566}
567
568bool AbstractNode::sendPackage(const Package &pkg, QAbstractSocket *target) const {
569 if (!pkg.isValid()) {
570 return false;
571 }
572
573 if (!target || !target->isValid()) {
574 QuasarAppUtils::Params::log("destination server not valid!",
575 QuasarAppUtils::Error);
576 return false;
577 }
578
579 if (!target->waitForConnected()) {
580 QuasarAppUtils::Params::log("no connected to server! " + target->errorString(),
581 QuasarAppUtils::Error);
582 return false;
583 }
584
585 return _dataSender->sendData(pkg.toBytes(), target, true);
586}
587
588unsigned int AbstractNode::sendData(const AbstractData *resp,
589 const HostAddress &addere,
590 const Header *req) {
591 return sendData(resp, getInfoPtr(addere), req);
592}
593
595 const AbstractNodeInfo *node,
596 const Header *req) {
597
598 if (!node) {
599 QuasarAppUtils::Params::log("Response not sent because client == null");
600 return 0;
601 }
602
603 if (!resp) {
604 return 0;
605 }
606
607 Package pkg;
608 bool convert = false;
609 if (req && req->isValid()) {
610 convert = resp->toPackage(pkg, node->multiVersionPackages().value(resp->cmd()),req->hash);
611 } else {
612 convert = resp->toPackage(pkg, node->multiVersionPackages().value(resp->cmd()));
613 }
614
615 if (!convert) {
616
617 if (static_cast<unsigned int>(pkg.data.size()) > Package::maximumSize()) {
618 // big data
619
620 auto wrap = QSharedPointer<BigDataWraper>::create();
621 wrap->setData(resp);
622 if ( parsePackage(wrap, {}, getInfoPtr(node->networkAddress())) != ParserResult::Processed) {
623 return 0;
624 }
625
626 return BIG_DATA_HASH_ID;
627 }
628
629 QuasarAppUtils::Params::log("Response not sent because dont create package from object",
630 QuasarAppUtils::Error);
631 return 0;
632 }
633
634 if (!sendPackage(pkg, node->sct())) {
635 QuasarAppUtils::Params::log("Response not sent!",
636 QuasarAppUtils::Error);
637 return 0;
638 }
639
640 return pkg.hdr.hash;
641}
642
643void AbstractNode::badRequest(const HostAddress &address, const Header &req,
644 const ErrorData &err, qint8 diff) {
645
646 if (!changeTrust(address, diff)) {
647
648 QuasarAppUtils::Params::log("Bad request detected, bud response command not sent!"
649 " because trust not changed",
650 QuasarAppUtils::Error);
651
652 QuasarAppUtils::Params::log("SECURITY LOG: Force block the " + address.toString() +
653 " because trust defined",
654 QuasarAppUtils::Error);
655
656 ban(address);
657
658 return;
659 }
660
661 auto node = getInfoPtr(address);
662 if (!isBanned(node)) {
663 auto bad = BadRequest(err);
664 if (!sendData(&bad, address, &req)) {
665 return;
666 }
667
668 QuasarAppUtils::Params::log("Bad request sendet to adderess: " +
670 QuasarAppUtils::Info);
671
673 removeNode(node);
674 }
675 }
676}
677
679 WorkState state;
680
683 state.setMaxConnectionCount(maxPendingConnections());
684 state.setBanedList(banedList());
685 state.setIsRun(isListening());
686
687 return state;
688
689}
690
692 if (isListening()) {
693 if (connectionsCount() >= maxPendingConnections())
694 return "overload";
695 else {
696 return "Work";
697 }
698 }
699
700 return "Not running";
701}
702
704 return QString("%0 / %1").arg(connectionsCount()).arg(maxPendingConnections());
705}
706
707QList<HostAddress> AbstractNode::banedList() const {
708 QList<HostAddress> list = {};
709
710 QMutexLocker locer(&_connectionsMutex);
711
712 for (auto i = _connections.begin(); i != _connections.end(); ++i) {
713 if (i.value()->isBanned()) {
714 list.push_back(i.key());
715 }
716 }
717
718 return list;
719}
720
722 int count = 0;
723
724 QMutexLocker locer(&_connectionsMutex);
725
726 for (auto i : _connections) {
727 if (i->isConnected()) {
728 count++;
729 }
730 }
731 return count;
732}
733
735 int count = 0;
736
737 QMutexLocker locer(&_connectionsMutex);
738
739 for (auto i : _connections) {
740 if (i->status() == NodeCoonectionStatus::Confirmed) {
741 count++;
742 }
743 }
744 return count;
745}
746
747bool AbstractNode::ping(const HostAddress &address) {
748 Ping cmd;
749 return sendData(&cmd, address);
750
751}
752
754 if (!(info && info->isValid())) {
755 return false;
756 }
757
758 return info->isBanned();
759}
760
762 AsyncLauncher::Job action = [this, handle]() -> bool {
763 QAbstractSocket* socket = nullptr;
764
765#ifdef USE_HEART_SSL
766 if (_mode == SslMode::NoSSL) {
767 socket = new TcpSocket(nullptr);
768 } else {
769 socket = new SslSocket(nullptr);
770 }
771
772#else
773 socket = new TcpSocket(nullptr);
774#endif
775
776 socket->setSocketDescriptor(handle);
777
778 if (isBanned(getInfoPtr(HostAddress{socket->peerAddress(), socket->peerPort()}))) {
779 QuasarAppUtils::Params::log("Income connection from banned address",
780 QuasarAppUtils::Error);
781
782 delete socket;
783 return false;
784 }
785
786 if (!registerSocket(socket)) {
787
788 QuasarAppUtils::Params::log("Failed to register new socket",
789 QuasarAppUtils::Error);
790
791 delete socket;
792 return false;
793 }
794
795 return true;
796
797 };
798
799 _socketWorker->run(action);
800}
801
802bool AbstractNode::changeTrust(const HostAddress &id, int diff) {
803 auto ptr = getInfoPtr(id);
804 if (!ptr) {
805 return false;
806 }
807
808 auto objTrust = ptr->trust();
809
810 if (objTrust >= static_cast<int>(TrustNode::Undefined)) {
811 return false;
812 }
813
814 if (objTrust <= static_cast<int>(TrustNode::Baned)) {
815 return false;
816 }
817
818 ptr->setTrust(objTrust + diff);
819 return true;
820}
821
822void AbstractNode::avelableBytes(AbstractNodeInfo *sender) {
823
824 if (!sender) {
825 return;
826 }
827
828 auto id = sender->networkAddress();
829
830 if (!_connections.contains(id)) {
831 return;
832 }
833
834 if (!_receiveData.contains(id)) {
835 _receiveData.insert(id, new ReceiveData());
836 }
837
838 auto &pkg = _receiveData[id]->_pkg;
839 auto &hdrArray = _receiveData[id]->_hdrArray;
840
841 int workIndex = 0;
842 const int headerSize = sizeof(Header);
843
844 auto socket = sender->sct();
845 if (!socket) {
846 pkg.reset();
847 hdrArray.clear();
848 return;
849 }
850
851 // concat with old data of header.
852 auto array = hdrArray;
853 while (socket->bytesAvailable() > 0) {
854 array += socket->readAll();
855 };
856
857 const int arraySize = array.size();
858 hdrArray.clear();
859
860 while (arraySize > workIndex) {
861
862 int offset = arraySize - workIndex;
863
864 if (pkg.hdr.isValid()) {
865 // CASE 1: The Package data is still not collected, but the header is already collected. performs full or partial filling of packet data.
866
867 int dataLength = std::min(static_cast<int>(pkg.hdr.size - pkg.data.size()),
868 arraySize - workIndex);
869 pkg.data.append(array.mid(workIndex, dataLength));
870
871 workIndex += dataLength;
872
873
874 } else if (offset >= headerSize) {
875
876 // CASE 2: The header and package still do not exist and the amount of data allows you to create a new header. A header is created and will fill in all or part of the package data.
877
878 pkg.reset();
879
880 memcpy(&pkg.hdr,
881 array.data() + workIndex, headerSize);
882
883 if (!pkg.hdr.isValid())
884 return;
885
886 int dataLength = std::min(static_cast<int>(pkg.hdr.size),
887 arraySize - headerSize - workIndex);
888
889 pkg.data.append(array.mid(workIndex + headerSize, dataLength));
890
891 workIndex += headerSize + dataLength;
892
893 } else {
894 // CASE 3: There is not enough data to initialize the header. The data will be placed in temporary storage and will be processed the next time the data is received.
895
896 unsigned char dataLength = static_cast<unsigned char>(arraySize - workIndex);
897 hdrArray += array.mid(workIndex, dataLength);
898 workIndex += dataLength;
899 }
900
901 if (pkg.isValid()) {
902 newWork(pkg, sender, id);
903 pkg.reset();
904 hdrArray.clear();
905 } else if (static_cast<unsigned int>(pkg.data.size()) >= pkg.hdr.size) {
906 QuasarAppUtils::Params::log("Invalid Package received." + pkg.toString(),
907 QuasarAppUtils::Warning);
908 pkg.reset();
909 hdrArray.clear();
911
912 }
913 }
914}
915
916void AbstractNode::handleWorkerStoped() {
917 auto senderObject = dynamic_cast<QFutureWatcher <bool>*>(sender());
918
919 if (senderObject) {
920
921 _workersMutex.lock();
922 _workers.remove(senderObject);
923 _workersMutex.unlock();
924
925 delete senderObject;
926 }
927}
928
929void AbstractNode::handleForceRemoveNode(HostAddress node) {
930 AbstractNodeInfo* info = getInfoPtr(node);
931 if (info) {
932 info->removeSocket();
933 }
934}
935
936void AbstractNode::handleBeginWork(QSharedPointer<QH::AbstractTask> work) {
937
938 auto executeObject = [this, work]() -> bool {
939 if (!work)
940 return false;
941
942 return work->execute(this);
943 };
944
945 QMutexLocker locer(&_threadPoolMutex);
946
947 if (_threadPool) {
948 auto worker = new QFutureWatcher <bool>();
949 worker->setFuture(QtConcurrent::run(_threadPool, executeObject));
950
951 _workersMutex.lock();
952 _workers.insert(worker);
953 _workersMutex.unlock();
954
955
956 connect(worker, &QFutureWatcher<bool>::finished,
957 this, &AbstractNode::handleWorkerStoped);
958 }
959}
960
961bool AbstractNode::listen(const HostAddress &address) {
962 return QTcpServer::listen(address, address.port());
963}
964
965QSharedPointer<AbstractData>
967 AbstractNodeInfo *sender) const {
968
969 auto value = _apiVersionParser->searchPackage(pkg.hdr.command, sender);
970 if (!value) {
971 QuasarAppUtils::Params::log("You try parse not registered package type."
972 " Plese use the registerPackageType method befor parsing."
973 " Example invoke registerPackageType<MyData>() into constructor of you client and server nodes.");
974
975 return nullptr;
976 }
977
978 value->fromPakcage(pkg);
979 return value;
980}
981
982QList<HostAddress> AbstractNode::connectionsList() const {
983 QMutexLocker locer(&_connectionsMutex);
984
985 return _connections.keys();
986}
987
988QList<HostAddress> AbstractNode::activeConnectionsList() const {
989
990 QList<HostAddress> result;
991
992 QMutexLocker locer(&_connectionsMutex);
993 for (auto i : _connections) {
994 if (i->isConnected()) {
995 result.push_back(i->networkAddress());
996 }
997 }
998
999 return result;
1000}
1001
1003
1004 switch (error) {
1006 QuasarAppUtils::Params::log("The remote host not found or dns server not responce.",
1007 QuasarAppUtils::Error);
1008 break;
1009
1010 }
1011
1013 QuasarAppUtils::Params::log("The remote node is banned or serve is overload.",
1014 QuasarAppUtils::Error);
1015 break;
1016
1017 }
1018 default: {
1019 QuasarAppUtils::Params::log("The unknown error ocurred.",
1020 QuasarAppUtils::Error);
1021 }
1022 }
1023
1024}
1025
1029
1030bool AbstractNode::addApiParser(const QSharedPointer<iParser> &parser) {
1031
1032 if (!parser || parser->node() != this)
1033 return false;
1034
1035 return !addApiParserImpl(parser).isNull();
1036}
1037
1038void AbstractNode::receivePing(const QSharedPointer<PKG::Ping> &) {};
1039
1040bool AbstractNode::sheduleTask(const QSharedPointer<AbstractTask> &task) {
1041 return _tasksheduller->shedule(task);
1042}
1043
1045 _tasksheduller->remove(taskId);
1046}
1047
1049 return _tasksheduller->taskCount();
1050}
1051
1052void AbstractNode::newWork(const Package &pkg, AbstractNodeInfo *sender,
1053 const HostAddress& id) {
1054
1055 if (!sender)
1056 return;
1057
1058 auto executeObject = [pkg, sender, id, this]() {
1059
1060 auto data = prepareData(pkg, sender);
1061 if (!data)
1062 return false;
1063
1064 ParserResult parseResult = parsePackage(data, pkg.hdr, sender);
1065
1066#ifdef HEART_PRINT_PACKAGES
1067 QuasarAppUtils::Params::log(QString("Package received! %0").arg(data->toString()), QuasarAppUtils::Info);
1068#endif
1069
1070 if (parseResult != ParserResult::Processed) {
1071
1072 auto message = QString("Package not parsed! %0 \nresult: %1. \n%2").
1073 arg(pkg.toString(), iParser::pareseResultToString(parseResult), data->toString());
1074
1075 QuasarAppUtils::Params::log(message, QuasarAppUtils::Info);
1076
1077 if (parseResult == ParserResult::Error) {
1078
1079 if (fSendBadRequestErrors()) {
1080 badRequest(id, pkg.hdr, ErrorData{1, "Your send the invalid request."}, REQUEST_ERROR);
1081 } else {
1083 }
1084 }
1085
1086 if (parseResult == ParserResult::NotProcessed) {
1087 if (fSendBadRequestErrors()) {
1088 badRequest(id, pkg.hdr, ErrorData{2, "Your send the not supported command."}, NOTSUPPORT_ERROR);
1089 } else {
1091 }
1092
1093 }
1094
1095#ifdef QT_DEBUG
1096 QuasarAppUtils::Params::log(_apiVersionParser->toString(), QuasarAppUtils::Info);
1097#endif
1098
1099 return false;
1100 }
1101
1102 _confirmNodeMutex.lock();
1103 sender->updateConfirmStatus();
1104 _confirmNodeMutex.unlock();
1105
1106
1107 return true;
1108 };
1109
1110
1111 QMutexLocker locer(&_threadPoolMutex);
1112
1113 if (_threadPool) {
1114 auto worker = new QFutureWatcher <bool>();
1115 worker->setFuture(QtConcurrent::run(_threadPool, executeObject));
1116
1117 _workersMutex.lock();
1118 _workers.insert(worker);
1119 _workersMutex.unlock();
1120
1121 connect(worker, &QFutureWatcher<bool>::finished,
1122 this, &AbstractNode::handleWorkerStoped);
1123 }
1124}
1125
1127 return _mode;
1128}
1129
1130QHash<HostAddress, AbstractNodeInfo *> AbstractNode::connections() const {
1131 return _connections;
1132}
1133
1137
1138void AbstractNode::handleNodeStatusChanged(AbstractNodeInfo *node, NodeCoonectionStatus status) {
1139
1140 if (status == NodeCoonectionStatus::NotConnected) {
1141 nodeDisconnected(node);
1142 } else if (status == NodeCoonectionStatus::Connected) {
1143
1144#ifdef USE_HEART_SSL
1145 if (_mode != SslMode::NoSSL) {
1146
1147 auto socket = dynamic_cast<SslSocket*>(node->sct());
1148
1149 if (!socket) {
1150 QuasarAppUtils::Params::log("Failed to preparet to configure ssl socket.",
1151 QuasarAppUtils::Error);
1152
1153 removeNode(node);
1154 return;
1155 }
1156
1157 if (!socket->isEncrypted()) {
1158 if (!configureSslSocket(node, !node->isLocal())) {
1159 QuasarAppUtils::Params::log("Failed to configure ssl socket.",
1160 QuasarAppUtils::Error);
1161 }
1162
1163 return;
1164 }
1165 }
1166#endif
1167
1168 nodeConnected(node);
1169 } else if (status == NodeCoonectionStatus::Confirmed) {
1170 nodeConfirmend(node);
1171 }
1172}
1173
1175
1176 auto &actions = _connectActions[NodeCoonectionStatus::Confirmed];
1177 auto action = actions.take(node->networkAddress());
1178 if (action)
1179 action(node);
1180}
1181
1183 if (!_apiVersionParser->sendSupportedAPI(node)) {
1184 QuasarAppUtils::Params::log("Failed to sent version information to dist node",
1185 QuasarAppUtils::Error);
1186 }
1187
1188 auto &actions = _connectActions[NodeCoonectionStatus::Connected];
1189 auto action = actions.take(node->networkAddress());
1190 if (action)
1191 action(node);
1192}
1193
1195 Q_UNUSED(node)
1196}
1197
1199 QAbstractSocket::SocketError errorCode,
1200 QString errorString) {
1201 Q_UNUSED(errorCode)
1202
1203 QString message("Network error occured on the %0 node. Message: %1");
1204 QuasarAppUtils::Params::log(
1205 message.arg(nodeInfo->networkAddress().toString(), errorString),
1206 QuasarAppUtils::Error);
1207
1208 auto &actions = _connectActions[NodeCoonectionStatus::Connected];
1209 actions.remove(nodeInfo->networkAddress());
1210}
1211
1212void AbstractNode::checkConfirmendOfNode(AbstractNodeInfo *info) {
1213
1214 if(!info)
1215 return;
1216
1217 if (info->status() != NodeCoonectionStatus::Confirmed) {
1218 removeNode(info);
1219 }
1220}
1221
1222void AbstractNode::initThreadId() const {
1223 mainThreadID();
1224}
1225
1226void AbstractNode::initThreadPool() {
1227 deinitThreadPool();
1228
1229 QMutexLocker lock(&_threadPoolMutex);
1230 _threadPool = new QThreadPool();
1231 _threadPool->setObjectName("PackageWorker");
1232 _threadPool->setMaxThreadCount(QThread::idealThreadCount());
1233
1234}
1235
1236void AbstractNode::deinitThreadPool() {
1237 QMutexLocker lock(&_threadPoolMutex);
1238
1239 if (!_threadPool) {
1240 return;
1241 }
1242
1243 _threadPool->waitForDone(WAIT_TIME);
1244 delete _threadPool;
1245 _threadPool = nullptr;
1246}
1247
1249 static auto thread = QThread::currentThread();
1250
1251 return thread;
1252}
1253
1254} // namespace QH
#define REQUEST_ERROR
#define BIG_DATA_HASH_ID
#define NOTSUPPORT_ERROR
#define CRITICAL_ERROOR
The APIVersionParser class This is main parser forthe main command. This parsers work only with the A...
QSharedPointer< PKG::AbstractData > searchPackage(unsigned short cmd, AbstractNodeInfo *sender) const
searchPackage This method search package recursive in all registered pararsers. Searching will be in ...
QHash< QString, QSharedPointer< QH::iParser > > selectParser(const VersionData &distVersion) const
selectParser This method select api parser betwin nodes.
const QSharedPointer< QH::iParser > & addApiParser(const QSharedPointer< QH::iParser > &parserObject)
addApiParser This method add new Api parser for this node.
ParserResult parsePackage(const QSharedPointer< PKG::AbstractData > &pkg, const Header &pkgHeader, AbstractNodeInfo *sender) override
parsePackage This is main method of all childs classes of an AbstractNode class. This method work on ...
void sigNoLongerSupport(const QString &ApiKey, unsigned short version)
sigNoLongerSupport This signal will be emit when node receive incomplite versions.
QString toString() const override
toString This method show all supported commands and them names.
bool sendSupportedAPI(AbstractNodeInfo *dist) const
sendSupportedAPI This method sents all ainformation about suppported api.
The AbstractNodeInfo class contains information about client or server connection and tcp socket of n...
const PackagesVersionData & multiVersionPackages() const
multiVersionPackages This is list of packages of one api package tah support multiple versions.
bool isLocal() const
isLocal return true if connection opened on this node.
NodeCoonectionStatus status() const
status This method return status of the node connection.
void sigReadyRead(QH::AbstractNodeInfo *thisNode)
sigReadyRead This is wrapper signal for the QAbstractSocket::readyRead signal.
HostAddress networkAddress() const
networkAddress This method return network address of current node or client.
void statusChaned(QH::AbstractNodeInfo *thisNode, QH::NodeCoonectionStatus status)
statusChaned This signal emitted when nodes status is changed.
virtual void removeSocket()
removeSocket This method use for remove socket. You can override this method for handle this event.
virtual bool isValid() const
isValid - Check valid of node. This method check connect status of socket.
QAbstractSocket * sct() const
sct This method return socket of connection.
virtual bool isBanned() const
isBanned - check node which banned.
void sigErrorOccurred(QH::AbstractNodeInfo *thisNode, QAbstractSocket::SocketError socketError, QString message)
sigErrorOccurred This is wrapper signal for the QAbstractSocket::errorOccurred signal.
virtual bool isConnected() const
isConnected - Check of node connect status.
void sigPingReceived(const QSharedPointer< QH::PKG::Ping > &ping)
sigPingReceived This method emited
int sheduledTaskCount() const
sheduledTaskCount This method return count of sheduled tasks.
QHash< HostAddress, AbstractNodeInfo * > connections() const
connections - Return hash map of all connections of this node.
QSharedPointer< PKG::AbstractData > prepareData(const Package &pkg, AbstractNodeInfo *sender) const
prepareData This is private method for preparing package from the byteArray.
QList< HostAddress > connectionsList() const
connectionsList This method return list of all node connections
bool fSendBadRequestErrors() const
fSendBadRequestErrors This property enable or disable sending feedback to connected node when them se...
virtual void nodeErrorOccured(QH::AbstractNodeInfo *nodeInfo, QAbstractSocket::SocketError errorCode, QString errorString)
nodeErrorOccured This slot invoked when error ocured in the nodeInfo.
virtual void addNodeFailed(AddNodeError error)
addNodeFailed This method will be invoked when trying to add new node are failed. So override this me...
virtual void nodeAddedSucessful(AbstractNodeInfo *node)
nodeAddedSucessful This method will be invoked when new node added successful.
QSharedPointer< QH::iParser > selectParser(unsigned short cmd, AbstractNodeInfo *sender) const
selectParser This method select parser by command and sender.
bool removeNode(const HostAddress &nodeAdderess)
removeNode - Remove node and disconnected forom node (server).
virtual void stop()
stop - Stopped this node and close all network connections.
virtual bool changeTrust(const HostAddress &id, int diff)
changeTrust This method change trust of connected node.
virtual void nodeConnected(AbstractNodeInfo *node)
nodeConnected This method invocked when the node status changed to "connected" default implementatio ...
QList< HostAddress > banedList() const
banedList This method retrun list of banned clients of nodes.
virtual QString connectionState() const
connectionState This method return string value about the cocction state.
virtual bool sendPackage(const Package &pkg, QAbstractSocket *target) const
sendPackage This method prepare and send to target address a package.
bool sheduleTask(const QSharedPointer< AbstractTask > &task)
sheduleTask This method shedule execute task on this node.
virtual bool isBanned(const AbstractNodeInfo *socket) const
isBanned This method checks if the node is banned.
static QThread * mainThreadID()
mainThreadID This method return the pointer to main thread
virtual AbstractNodeInfo * createNodeInfo(QAbstractSocket *socket, const HostAddress *clientAddress=nullptr) const
createNodeInfo This method create a nodeInfo object. override this method for create your own nodeInf...
bool addApiParser(const QSharedPointer< iParser > &parser)
addApiParser This method add new Api parser for this node.
bool addNode(const HostAddress &address)
addNode - Connect to node (server) with address.
void removeTask(int taskId)
removeTask This method remove task from sheduler.
~AbstractNode() override
virtual void ban(const HostAddress &target)
ban - This method set for target connection a trust property to 0 and target connection will been abo...
HostAddress address() const
address - Thim method return own network address of current node (server).
virtual void badRequest(const HostAddress &address, const Header &req, const PKG::ErrorData &err, qint8 diff=REQUEST_ERROR)
badRequest This method is send data about error of request.
SslMode getMode() const
getMode - This method return SSL mode of corrent node (server).
void prepareForDelete() override
prepareForDelete This method must be prepare object for delete. Override this for working main functi...
AbstractNode(QObject *ptr=nullptr)
AbstractNode - Base constructor of node.
virtual WorkState getWorkState() const
getWorkState - This method collect general information about this server. For more information about ...
void incomingConnection(qintptr handle) override final
incomingConnection This is ovverided method of QTCPServer.
QList< HostAddress > activeConnectionsList() const
activeConnectionsList This method return list of actived nodes connections
virtual AbstractNodeInfo * getInfoPtr(const HostAddress &id)
getInfoPtr - This method return information class pointer about netwok connection....
virtual bool registerSocket(QAbstractSocket *socket, const HostAddress *address=nullptr)
registerSocket This method registration new socket object.
void sigNoLongerSupport(const QString &ApiKey, unsigned short version)
sigNoLongerSupport is some as a APIVersionParser::sigNoLongerSupport. This signal just retronslate th...
virtual void nodeConfirmend(AbstractNodeInfo *node)
nodeConfirmend This method invocked when the node status changed to "confirmend" default implementati...
virtual QString getWorkStateString() const
getWorkStateString This method generate string about work state of server.
void setSendBadRequestErrors(bool value)
setSendBadRequestErrors This method enable or disable the fSendBadRequestErrors property.
virtual bool run(const QString &addres, unsigned short port)
run This method implement deployment a network node (server) on selected address.
virtual void unBan(const HostAddress &target)
unBan - This method set for target connection a trust property to 100.
bool ping(const HostAddress &address)
ping This method send ping package to address for testing connection.
virtual void nodeDisconnected(AbstractNodeInfo *node)
nodeConnected This method invocked when the node status changed to "disconnected" default implementat...
void setCloseConnectionAfterBadRequest(bool value)
setSendBadRequestErrors This method enable or disable the fcloseConnectionAfterBadRequest property.
int confirmendCount() const
connectionsCount - Return count of nodes with status confirmend.
bool fCloseConnectionAfterBadRequest() const
fCloseConnectionAfterBadRequest This propery enable or disable droping connection after badRequests....
int connectionsCount() const
connectionsCount - Return count fo connections (connections with status connected)
virtual unsigned int sendData(const PKG::AbstractData *resp, const HostAddress &address, const Header *req=nullptr)
sendData This method send data object another to node
virtual void receivePing(const QSharedPointer< QH::PKG::Ping > &ping)
receivePing This method invoked when node receive new ping object.
The AsyncLauncher class is wraper of the Async class for support moving invokes to thread of the curr...
bool run(const Job &action, bool wait=false)
run This method run the action function in the work thread of this object.
std::function< bool()> Job
Definition async.h:35
The DataSender class this class create a queue for sendet data to network.
Definition datasender.h:23
bool sendData(const QByteArray &array, void *target, bool await=false) const
sendPackagePrivate This slot move send package to a main thread.
The Host Address class this is wrapper of QHostAddress. Contains the NetworkAddress and network port.
Definition hostaddress.h:22
unsigned short port() const
The port method return port of node.
QString toString() const
toString this method convert the Host Address value to string value.
The AbstractData class is provide base functions for transport data by network For create you own pac...
virtual bool toPackage(Package &package, const DistVersion &reqVersion, unsigned int triggerHash=0) const
toPackage This method convert this class object to the package. For more info see Package class.
virtual unsigned short cmd() const =0
cmd - This is command of this object, (for generate cmd use macross QH_PACKAGE)
The BadRequest class send response about error to client.
Definition badrequest.h:35
The CloseConnection class - This commanad is request for close connection on parent node of connectio...
The Ping class - test class for translate data on network.
Definition ping.h:22
The Package struct. This is base structure for transporting data by network between QH nodes....
Definition package.h:23
QByteArray data
data This is source data of package.
Definition package.h:33
virtual bool isValid() const
isValid This method validation a current package. Default implementation is checked a header and comp...
Definition package.cpp:18
Header hdr
hdr This is header of package. For more information see the Header struct.
Definition package.h:29
QString toString() const
toString This method convert a package information to a string label.
Definition package.cpp:38
static unsigned int maximumSize()
maximumSize This method return maximu size of pacakge. If pacakge large the maximum size then package...
Definition package.cpp:49
QByteArray toBytes() const
toBytes This method convert a current object to bytes array.
The TaskScheduler class This class contains queue of all shedule tasks.
void sigPushWork(QSharedPointer< QH::AbstractTask > work)
sigPushWork This signal emited when the task work neet to execute.
bool shedule(const QSharedPointer< AbstractTask > &task)
shedule This method shedule new task in this node.
int taskCount() const
taskCount This method return tasks count.
bool remove(const QSharedPointer< AbstractTask > &task)
remove This method remove the task from a tasks queue.
The AbstractSocket class This class is wraper of the QAbstract socket with slot implementation of the...
Definition tcpsocket.h:19
The WorkState class is simple class with data of work state of node.
Definition workstate.h:20
void setActiveConnections(const QList< HostAddress > &newActiveConnections)
setActiveConnections this method sets new list of active connections.
Definition workstate.cpp:34
void setMaxConnectionCount(int value)
setMaxConnectionCount this method set a new value of limit of connections.
Definition workstate.cpp:87
void setIsRun(bool value)
setIsRun This method set new value for run flag.
Definition workstate.cpp:18
void setBanedList(const QList< HostAddress > &banedList)
setBanedList -this method set banned list for this object.
Definition workstate.cpp:95
void setConnections(const QList< HostAddress > &connections)
setConnections This method sets connections list.
Definition workstate.cpp:26
static QString pareseResultToString(const ParserResult &parseResult)
pareseResultToString This method convert ParserResult value to string.
Definition iparser.cpp:22
#define WAIT_CONFIRM_TIME
Definition config.h:23
#define WAIT_TIME
Definition config.h:13
The QH namespace - QuasarApp Heart namespace. This namespace contains all classes of the Heart librar...
Definition heart.cpp:13
ParserResult
The ParserResult enum. Error - parser detect a errorob package. NotProcessed - the parser does not kn...
Definition iparser.h:35
@ NotProcessed
the parser does not know what to do with the package or has not finished processing it.
@ Error
parser detect a errorob package.
@ Processed
the parser finished processing correctly.
NodeCoonectionStatus
The AbstractNodeState enum - This is status of the known nodes or clients.
@ Connected
The node with this status has socket status is connected.
@ NotConnected
This node not sent data about its envirement and status of node socket is disconnected.
AddNodeError
The AddNodeError enum contains error code that can be occured after invoke the AbstractNode::addNode ...
@ RegisterSocketFailed
This error ocurred when you try add baned node or server is overrload.
@ HostNotFound
This error ocurred when DNS server not responce to node or node can't find the server ip address by h...
@ Baned
Node with this trust value is forbidden.
@ Undefined
Undefined node.
SslMode
The SslMode enum This enum contatins options for set ssl mode of node (server). For more information ...
@ NoSSL
This is not secure connection without ssl encription. It is default value of new any node see Abstrac...
The Header struct 32 bytes.
Definition header.h:19
bool isValid() const
isValid This method check header size and compare commands.
Definition header.cpp:18
unsigned int hash
hash This is unique id of a package. id calc with CRC32 function for Qt implementation.
Definition header.h:38
unsigned short command
command of package for more information see the AbstractData::toPackage method.
Definition header.h:23
The ErrorData struct is simple structure for contains data of the error.
Definition badrequest.h:20