4
1
mirror of https://github.com/QuasarApp/qTbot.git synced 2025-05-08 19:29:35 +00:00

added parallel connection limit

This commit is contained in:
Andrei Yankovich 2024-12-14 18:39:06 +01:00
parent 5390532c5d
commit f2cf4b0442
3 changed files with 110 additions and 50 deletions
src
example
qTbot/src/public/qTbot

@ -24,6 +24,7 @@ int main(int argc, char *argv[]) {
QCoreApplication app(argc, argv);
qTbot::TelegramRestBot bot;
bot.setReqestLimitPerSecond(10);
srand(time(0));
@ -62,30 +63,38 @@ int main(int argc, char *argv[]) {
});;
}
bot.sendSpecificMessageWithKeyboard(qTbot::TelegramArgs{tmsg->chatId(), "I see it", tmsg->messageId()},
{{{"test_button", [tmsg, &bot](const QString& queryId, const QVariant& msgId){
static int index = 0;
if (tmsg->text() == "spam") {
for (int i = 0 ; i < 1000; i++) {
bot.sendMessage(tmsg->chatId(), QString(" message N %0").arg(i));
}
} else {
bot.sendSpecificMessageWithKeyboard(qTbot::TelegramArgs{tmsg->chatId(), "I see it", tmsg->messageId()},
{{{"test_button", [tmsg, &bot](const QString& queryId, const QVariant& msgId){
static int index = 0;
auto&& args = qTbot::TelegramArgs{tmsg->chatId(),
"I see it. Presed count: " + QString::number(index++),
tmsg->messageId(),
"",
false,
queryId};
auto&& args = qTbot::TelegramArgs{tmsg->chatId(),
"I see it. Presed count: " + QString::number(index++),
tmsg->messageId(),
"",
false,
queryId};
auto&& keyboard = qTbot::KeyboardOnMessage{
{{"test_button", [](auto , auto ){}},
{"test_button 2", [](auto , auto ){}}}};
auto&& keyboard = qTbot::KeyboardOnMessage{
{{"test_button", [](auto , auto ){}},
{"test_button 2", [](auto , auto ){}}}};
bot.editSpecificMessageWithKeyboard(msgId,
args,
keyboard
);
}}}});
bot.sendSpecificMessageWithKeyboard(qTbot::TelegramArgs{tmsg->chatId(), "I see it", tmsg->messageId()},
{{{"test_button"},
{"test_button"},}}, true, true);
}
bot.editSpecificMessageWithKeyboard(msgId,
args,
keyboard
);
}}}});
bot.sendSpecificMessageWithKeyboard(qTbot::TelegramArgs{tmsg->chatId(), "I see it", tmsg->messageId()},
{{{"test_button"},
{"test_button"},}}, true, true);
}
}

@ -17,6 +17,10 @@ namespace qTbot {
IBot::IBot() {
_manager = new QNetworkAccessManager();
_manager->setAutoDeleteReplies(true);
_requestExecutor = new QTimer(this);
_requestExecutor->setInterval(1000 / 20); // 20 times per second.
connect(_requestExecutor, &QTimer::timeout, this , &IBot::handleEcxecuteRequest);
}
IBot::~IBot() {
@ -92,12 +96,25 @@ QNetworkReply* IBot::sendRquestImpl(const QSharedPointer<iRequest> &rquest) {
return networkReplay;
}
int IBot::parallelActiveNetworkThreads() const {
return _parallelActiveNetworkThreads;
}
void IBot::setParallelActiveNetworkThreads(int newParallelActiveNetworkThreads) {
_parallelActiveNetworkThreads = newParallelActiveNetworkThreads;
}
void IBot::setCurrentParallelActiveNetworkThreads(int newParallelActiveNetworkThreads) {
_currentParallelActiveNetworkThreads = newParallelActiveNetworkThreads;
qDebug () << "current network active requests count : " << _currentParallelActiveNetworkThreads;
}
int IBot::reqestLimitPerSecond() const {
return _reqestLimitPerSecond;
return _requestExecutor->interval() * 1000;
}
void IBot::setReqestLimitPerSecond(int newReqestLimitPerSecond) {
_reqestLimitPerSecond = newReqestLimitPerSecond;
_requestExecutor->setInterval(1000 / newReqestLimitPerSecond);
}
QFuture<QByteArray>
@ -106,6 +123,8 @@ IBot::sendRequest(const QSharedPointer<iRequest> &rquest) {
responce->start();
_requestQueue.push_back(RequestData{rquest, "", responce});
_requestExecutor->start();
return responce->future();
}
@ -116,6 +135,8 @@ IBot::sendRequest(const QSharedPointer<iRequest> &rquest,
responce->start();
_requestQueue.push_back(RequestData{rquest, pathToResult, responce});
_requestExecutor->start();
return responce->future();
}
@ -140,17 +161,37 @@ void IBot::handleIncomeNewUpdate(const QSharedPointer<iUpdate> & message) {
emit sigReceiveUpdate(message);
}
QFuture<QByteArray> IBot::sendRequestPrivate(const QSharedPointer<iRequest> &rquest) {
void IBot::handleEcxecuteRequest() {
if (!_requestQueue.size()) {
_requestExecutor->stop();
return;
}
if (_currentParallelActiveNetworkThreads > _parallelActiveNetworkThreads) {
return;
}
auto&& requestData = _requestQueue.takeFirst();
if (requestData.responceFilePath.size()) {
sendRequestPrivate(requestData.request, requestData.responceFilePath, requestData.responce);
return;
}
sendRequestPrivate(requestData.request, requestData.responce);
}
void IBot::sendRequestPrivate(const QSharedPointer<iRequest> &rquest,
const QSharedPointer<QPromise<QByteArray> > &promise) {
QNetworkReply* networkReplay = sendRquestImpl(rquest);
if (!networkReplay) {
return {};
return;
}
auto&& promise = QSharedPointer<QPromise<QByteArray>>::create();
promise->start();
setParallelActiveNetworkThreads(_currentParallelActiveNetworkThreads + 1);
networkReplay->connect(networkReplay, &QNetworkReply::finished, [networkReplay, promise](){
connect(networkReplay, &QNetworkReply::finished, [this, networkReplay, promise](){
if (networkReplay->error() == QNetworkReply::NoError) {
promise->addResult(networkReplay->readAll());
promise->finish();
@ -158,6 +199,9 @@ QFuture<QByteArray> IBot::sendRequestPrivate(const QSharedPointer<iRequest> &rqu
} else {
promise->setException(HttpException(networkReplay->error(), networkReplay->errorString().toLatin1() + networkReplay->readAll()));
}
setParallelActiveNetworkThreads(_currentParallelActiveNetworkThreads - 1);
});
auto && setProggress = [promise](qint64 bytesCurrent, qint64 bytesTotal){
@ -168,30 +212,27 @@ QFuture<QByteArray> IBot::sendRequestPrivate(const QSharedPointer<iRequest> &rqu
promise->setProgressValue(bytesCurrent);
};
networkReplay->connect(networkReplay, &QNetworkReply::downloadProgress, setProggress);
networkReplay->connect(networkReplay, &QNetworkReply::uploadProgress, setProggress);
return promise->future();
connect(networkReplay, &QNetworkReply::downloadProgress, setProggress);
connect(networkReplay, &QNetworkReply::uploadProgress, setProggress);
}
QFuture<QByteArray> IBot::sendRequestPrivate(const QSharedPointer<iRequest> &rquest,
const QString &pathToResult) {
void IBot::sendRequestPrivate(const QSharedPointer<iRequest> &rquest,
const QString &pathToResult,
const QSharedPointer<QPromise<QByteArray>> & promise) {
auto&& file = QSharedPointer<QFile>::create(pathToResult);
if (!file->open(QIODeviceBase::WriteOnly | QIODevice::Truncate)) {
qCritical() << "Fail to wrote data into " << pathToResult;
return {};
return;
}
QNetworkReply* networkReplay = sendRquestImpl(rquest);
if (!networkReplay) {
return {};
return;
}
auto&& promise = QSharedPointer<QPromise<QByteArray>>::create();
promise->start();
networkReplay->connect(networkReplay, &QNetworkReply::finished, [promise, networkReplay, pathToResult](){
setParallelActiveNetworkThreads(_currentParallelActiveNetworkThreads + 1);
connect(networkReplay, &QNetworkReply::finished, [this, promise, networkReplay, pathToResult](){
if (networkReplay->error() == QNetworkReply::NoError) {
promise->setException(HttpException(networkReplay->error(), networkReplay->errorString().toLatin1()));
@ -199,10 +240,10 @@ QFuture<QByteArray> IBot::sendRequestPrivate(const QSharedPointer<iRequest> &rqu
promise->addResult(pathToResult.toUtf8()); // wil not work with UTF 8 path names
promise->finish();
}
setParallelActiveNetworkThreads(_currentParallelActiveNetworkThreads - 1);
});
networkReplay->connect(networkReplay, &QNetworkReply::readyRead, [networkReplay, promise, pathToResult, file](){
connect(networkReplay, &QNetworkReply::readyRead, [networkReplay, promise, pathToResult, file](){
if (networkReplay->error() == QNetworkReply::NoError) {
file->write(networkReplay->readAll());
}
@ -217,10 +258,9 @@ QFuture<QByteArray> IBot::sendRequestPrivate(const QSharedPointer<iRequest> &rqu
promise->setProgressValue(bytesCurrent);
};
networkReplay->connect(networkReplay, &QNetworkReply::downloadProgress, setProggress);
networkReplay->connect(networkReplay, &QNetworkReply::uploadProgress, setProggress);
connect(networkReplay, &QNetworkReply::downloadProgress, setProggress);
connect(networkReplay, &QNetworkReply::uploadProgress, setProggress);
return promise->future();
}
QSet<unsigned long long> IBot::processed() const {

@ -17,6 +17,7 @@
#include <QHash>
#include <QSet>
#include <QFileInfo>
#include <QTimer>
#include <QNetworkReply>
#include <QObject>
@ -178,6 +179,9 @@ public:
*/
void setReqestLimitPerSecond(int newReqestLimitPerSecond);
int parallelActiveNetworkThreads() const;
void setParallelActiveNetworkThreads(int newParallelActiveNetworkThreads);
protected:
/**
@ -286,13 +290,17 @@ signals:
*/
void sigStopRequire();
private slots:
void handleEcxecuteRequest();
private:
QFuture<QByteArray>
sendRequestPrivate(const QSharedPointer<iRequest>& rquest);
void setCurrentParallelActiveNetworkThreads(int newParallelActiveNetworkThreads);
QFuture<QByteArray>
sendRequestPrivate(const QSharedPointer<iRequest>& rquest,
const QString& pathToResult);
void sendRequestPrivate(const QSharedPointer<iRequest>& rquest,
const QSharedPointer<QPromise<QByteArray>> & promiseResult);
void sendRequestPrivate(const QSharedPointer<iRequest>& rquest,
const QString& pathToResult,
const QSharedPointer<QPromise<QByteArray> > &promiseResult);
QNetworkReply *sendRquestImpl(const QSharedPointer<iRequest> &rquest);
@ -301,8 +309,11 @@ private:
QMap<unsigned long long, QSharedPointer<iUpdate>> _notProcessedUpdates;
QSet<unsigned long long> _processed;
QNetworkAccessManager *_manager = nullptr;
int _reqestLimitPerSecond = 20;
QTimer* _requestExecutor = nullptr;
QList<RequestData> _requestQueue;
int _currentParallelActiveNetworkThreads = 0;
int _parallelActiveNetworkThreads = 10;
};