From 5936bf96f00e2e4941620d348c53903a81d62ea2 Mon Sep 17 00:00:00 2001 From: Andrey Astafyev Date: Sat, 25 Apr 2020 11:41:19 +0300 Subject: [PATCH] =?UTF-8?q?=D0=9D=D0=BE=D0=B2=D0=B0=D1=8F=20=D0=B1=D0=B8?= =?UTF-8?q?=D0=B1=D0=BB=D0=B8=D0=BE=D1=82=D0=B5=D0=BA=D0=B0=20=D0=B4=D0=BB?= =?UTF-8?q?=D1=8F=20Redis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .clang-tidy | 2 +- src/myx/redis/CMakeLists.txt | 1 - src/myx/redis/base-inl.hpp | 152 ++++++++++------- src/myx/redis/base.hpp | 48 ++++-- src/myx/redis/containers-inl.hpp | 53 +++--- src/myx/redis/containers.hpp | 64 +++++++- src/myx/redis/containers.tpp | 68 -------- src/myx/redis/pubsub-inl.hpp | 270 ++++++++++++++++--------------- src/myx/redis/pubsub.hpp | 23 +-- 9 files changed, 369 insertions(+), 312 deletions(-) delete mode 100644 src/myx/redis/containers.tpp diff --git a/.clang-tidy b/.clang-tidy index c8c0ac7..44dc142 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -223,7 +223,7 @@ CheckOptions: - key: readability-identifier-naming.ProtectedMemberCase value: camelBack - key: readability-identifier-naming.ProtectedMemberPrefix - value: '' + value: 'm_' - key: readability-identifier-naming.ProtectedMemberSuffix value: '' - key: readability-identifier-naming.ProtectedMethodCase diff --git a/src/myx/redis/CMakeLists.txt b/src/myx/redis/CMakeLists.txt index ea6fed6..6aaba4c 100644 --- a/src/myx/redis/CMakeLists.txt +++ b/src/myx/redis/CMakeLists.txt @@ -17,7 +17,6 @@ set(TRGT_hpp ${CMAKE_CURRENT_SOURCE_DIR}/base-inl.hpp ${CMAKE_CURRENT_SOURCE_DIR}/containers.hpp ${CMAKE_CURRENT_SOURCE_DIR}/containers-inl.hpp - ${CMAKE_CURRENT_SOURCE_DIR}/containers.tpp ${CMAKE_CURRENT_SOURCE_DIR}/pubsub-inl.hpp) set(TRGT_headers ${TRGT_moc_hpp} ${TRGT_hpp}) diff --git a/src/myx/redis/base-inl.hpp b/src/myx/redis/base-inl.hpp index 92c5552..b3a452b 100644 --- a/src/myx/redis/base-inl.hpp +++ b/src/myx/redis/base-inl.hpp @@ -23,107 +23,147 @@ static const QByteArray Separator { "\r\n" }; //!< Строковый разде * \param port Номер порта сервера * \param parent Родительский объект */ -Base::Base( QObject* parent ) : - QObject{ parent } +inline Base::Base( QObject* parent ) : + QObject { parent }, + m_readTimer ( new QTimer( this ) ), + m_writeTimer( new QTimer( this ) ) { - _readTimer = new QTimer { this }; - _writeTimer = new QTimer { this }; - _readTimer->setSingleShot( true ); - _writeTimer->setSingleShot( true ); - _readTimer->setInterval( 100 ); - _writeTimer->setInterval( 100 ); - connect( _readTimer, &QTimer::timeout, [this]() { - if ( _connectionFlags & Connected && _connectionFlags & Read ) + m_readTimer->setSingleShot( true ); + m_writeTimer->setSingleShot( true ); + m_readTimer->setInterval( rwTimeout() ); + m_writeTimer->setInterval( rwTimeout() ); + connect( m_readTimer, &QTimer::timeout, this, [this]() { + if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kRead ) != 0 ) ) { - _connectionFlags -= Read; - emit connectionStateChanged( _connectionFlags ); + m_connectionFlags -= kRead; + Q_EMIT connectionStateChanged( m_connectionFlags ); } } ); - connect( _writeTimer, &QTimer::timeout, [this]() { - if ( _connectionFlags & Connected && _connectionFlags & Write ) + connect( m_writeTimer, &QTimer::timeout, this, [this]() { + if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kWrite ) != 0 ) ) { - _connectionFlags -= Write; - emit connectionStateChanged( _connectionFlags ); + m_connectionFlags -= kWrite; + Q_EMIT connectionStateChanged( m_connectionFlags ); } } ); } +inline QString Base::host() const +{ + return( m_host ); +} + + /*! * \brief Задает адрес хоста * \param host Адрес */ -void Base::setHost( const QString& host ) +inline void Base::setHost( const QString& host ) { - _host = host; + m_host = host; +} + + +inline quint16 Base::port() const +{ + return( m_port ); +} + + +inline void Base::setPort( quint16 port ) +{ + m_port = port; +} + + +inline int Base::connectionFlags() const +{ + return( m_connectionFlags ); +} + + +inline void Base::setConnectionFlags( int connectionFlags ) +{ + m_connectionFlags = connectionFlags; } /*! - * \brief Извлекает длину токена из буфера данных - * \param pos Позиция в буфере - * \param buffer Буфер данных + * \brief Возвращает признак установленного соединения + * \return Признак установленного соединения */ -int Base::fetchLength( QByteArray& buffer, int& pos ) +inline bool Base::isConnected() const { - if ( ( buffer[pos] != '$' ) && ( buffer[pos] != '*' ) && ( buffer[pos] != ':' ) ) - { - buffer.clear(); - pos = 0; - return( -1 ); - } - pos++; - QByteArray text; - while ( pos < buffer.length() ) - { - if ( QChar( buffer[pos] ).isNumber() ) - { - text += buffer[pos++]; - } - else - { - pos += Separator.length(); - return( text.toInt() ); - } - } - return( -1 ); -} // Base::fetchLength + return( ( m_connectionFlags & kConnected ) != 0 ); +} + + +/// *! +// * \brief Извлекает длину токена из буфера данных +// * \param pos Позиция в буфере +// * \param buffer Буфер данных +// */ +// inline int Base::fetchLength( QByteArray& buffer, int& pos ) +// { +// if ( ( buffer[pos] != '$' ) && ( buffer[pos] != '*' ) && ( buffer[pos] != ':' ) ) +// { +// buffer.clear(); +// pos = 0; +// return( -1 ); +// } +// pos++; +// QByteArray text; +// while ( pos < buffer.length() ) +// { +// if ( QChar( buffer[pos] ).isNumber() ) +// { +// text += buffer[pos++]; +// } +// else +// { +// pos += Separator.length(); +// return( text.toInt() ); +// } +// } +// return( -1 ); +// } // Base::fetchLength /*! * \brief Добавляет к состоянию флаг Read */ -void Base::onReadyRead() +inline void Base::onReadyRead() { - if ( _connectionFlags & Connected && !( _connectionFlags & Read ) ) + if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kRead ) == 0 ) ) { - _connectionFlags += Read; - emit connectionStateChanged( _connectionFlags ); + m_connectionFlags += kRead; + Q_EMIT connectionStateChanged( m_connectionFlags ); } - _readTimer->start(); + m_readTimer->start(); } /*! * \brief Добавляет к состоянию флаг Read */ -void Base::onBytesWritten() +inline void Base::onBytesWritten() { - if ( _connectionFlags & Connected && !( _connectionFlags & Write ) ) + if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kWrite ) == 0 ) ) { - _connectionFlags += Write; - emit connectionStateChanged( _connectionFlags ); + m_connectionFlags += kWrite; + Q_EMIT connectionStateChanged( m_connectionFlags ); } - _writeTimer->start(); + m_writeTimer->start(); } /*! * \brief Выставляет флаг ошибки соединения */ -void Base::onSocketError() +inline void Base::onSocketError() { - emit connectionStateChanged( _connectionFlags = Error ); + Q_EMIT connectionStateChanged( m_connectionFlags = kError ); } } // namespace redis diff --git a/src/myx/redis/base.hpp b/src/myx/redis/base.hpp index b4e879d..0f1b002 100644 --- a/src/myx/redis/base.hpp +++ b/src/myx/redis/base.hpp @@ -28,31 +28,47 @@ public: */ enum ConnectionStateFlags { - Unconnected = 1, //!< Соединение не установлено - Connecting = 2, //!< Соединение устанавливается - Connected = 4, //!< Соединение установлено - Read = 8, //!< Есть данные для чтения - Write = 16, //!< Есть данные для записи - Error = 32, //!< Ошибка соединения + kDisconnected = 1, //!< Соединение не установлено + kConnecting = 2, //!< Соединение устанавливается + kConnected = 4, //!< Соединение установлено + kRead = 8, //!< Есть данные для чтения + kWrite = 16, //!< Есть данные для записи + kError = 32, //!< Ошибка соединения }; - Base( QObject* parent = nullptr ); - void setHost( const QString& host ); + explicit Base( QObject* parent = nullptr ); + /*! * \brief Сигнализирует об изменении состояния соединения * \param flags Флаги состояния */ Q_SIGNAL void connectionStateChanged( int flags ); -protected: - static constexpr int Timeout { 2000 }; //!< Таймаут на сетевые операции - int _connectionFlags { Unconnected }; //!< Флаги состояния соединения - quint16 _port { 6379 }; //!< Номер порта подключения к БД - QString _host { "127.0.0.1" }; //!< Имя хоста подключения к БД - QTimer* _readTimer; //!< Таймер чтения данных - QTimer* _writeTimer; //!< Таймер записи данных + bool isConnected() const; - int fetchLength( QByteArray& buffer, int& pos ); +private: + static constexpr int k_RwTimeout { 200 }; //!< Таймаут на операции чтения и записи + int m_connectionFlags { kDisconnected }; //!< Флаги состояния соединения + quint16 m_port { 6379 }; //!< Номер порта подключения к БД + QString m_host { QStringLiteral( "127.0.0.1" ) }; //!< Имя хоста подключения к БД + QTimer* m_readTimer; //!< Таймер чтения данных + QTimer* m_writeTimer; //!< Таймер записи данных + +protected: + + int connectionTimeout() const; + int rwTimeout() const; + + QString host() const; + void setHost( const QString& host ); + + quint16 port() const; + void setPort( quint16 port ); + + int connectionFlags() const; + void setConnectionFlags( int connectionFlags ); + +// int fetchLength( QByteArray& buffer, int& pos ); Q_SLOT void onReadyRead(); Q_SLOT void onBytesWritten(); Q_SLOT void onSocketError(); diff --git a/src/myx/redis/containers-inl.hpp b/src/myx/redis/containers-inl.hpp index 585839b..e6f0278 100644 --- a/src/myx/redis/containers-inl.hpp +++ b/src/myx/redis/containers-inl.hpp @@ -7,8 +7,8 @@ #include #endif -#include #include +#include namespace myx { @@ -19,7 +19,7 @@ namespace redis { * \param value Значение для создани части команды * \return Часть команды */ -QByteArray part( const QVariant& value ) +inline QByteArray part( const QVariant& value ) { auto bytes = value.toByteArray(); return( "$" + QByteArray::number( bytes.length() ) + Separator + bytes ); @@ -31,7 +31,7 @@ QByteArray part( const QVariant& value ) * \param parts Части для составления массива * \return RESP массив */ -QByteArray array( const QVariantList& parts ) +inline QByteArray array( const QVariantList& parts ) { QByteArrayList data; std::transform( parts.cbegin(), parts.cend(), std::back_inserter( data ), @@ -47,13 +47,13 @@ QByteArray array( const QVariantList& parts ) * \param splitLength Длина разобранной части буффера * \return Массив частей-строк */ -QByteArrayList split( const QByteArray& buffer, int* splitLength ) +inline QByteArrayList split( const QByteArray& buffer, int* splitLength ) { enum Token { - Undefined, - FetchNumber, - FetchPart, + kUndefined, + kFetchNumber, + kFetchPart, }; if ( ( buffer == "$-1\r\n" ) || ( buffer == "$*0\r\n" ) || buffer.isEmpty() ) @@ -87,7 +87,10 @@ QByteArrayList split( const QByteArray& buffer, int* splitLength ) { pos++; QByteArray text; - while ( QChar( buffer[pos] ).isNumber() && pos < buffer.size() ) text += buffer[pos++]; + while ( QChar( buffer[pos] ).isNumber() && pos < buffer.size() ) + { + text += buffer[pos++]; + } if ( pos == buffer.size() ) { return {}; @@ -96,29 +99,29 @@ QByteArrayList split( const QByteArray& buffer, int* splitLength ) pos += Separator.length(); } QByteArrayList parts; - Token token { Undefined }; + Token token { kUndefined }; while ( parts.size() < size && pos < buffer.length() ) { switch ( token ) { - case Undefined: + case kUndefined: switch ( buffer[pos] ) { case '$': partLength.clear(); - token = FetchPart; + token = kFetchPart; pos++; break; case ':': partLength.clear(); - token = FetchNumber; + token = kFetchNumber; pos++; break; default: return {}; } break; - case FetchNumber: + case kFetchNumber: if ( QChar( buffer[pos] ).isNumber() || ( buffer[pos] == '-' ) ) { partLength += buffer[pos]; @@ -127,11 +130,11 @@ QByteArrayList split( const QByteArray& buffer, int* splitLength ) else { parts.push_back( partLength ); - token = Undefined; + token = kUndefined; pos += Separator.length(); } break; - case FetchPart: + case kFetchPart: { if ( QChar( buffer[pos] ).isNumber() || ( buffer[pos] == '-' ) ) { @@ -156,7 +159,7 @@ QByteArrayList split( const QByteArray& buffer, int* splitLength ) parts.push_back( buffer.mid( pos, length ) ); pos += length + Separator.length(); } - token = Undefined; + token = kUndefined; } break; } @@ -181,10 +184,13 @@ QByteArrayList split( const QByteArray& buffer, int* splitLength ) * \param hash Таблица значений * \return Запакованное сообщение */ -QByteArray packHash( const QVariantHash& hash ) +inline QByteArray pack_hash( const QVariantHash& hash ) { QVariantList parts; - for ( auto iter = hash.cbegin(); iter != hash.cend(); ++iter ) parts << iter.key() << iter.value(); + for ( auto iter = hash.cbegin(); iter != hash.cend(); ++iter ) + { + parts << iter.key() << iter.value(); + } return( array( parts ) ); } @@ -194,11 +200,14 @@ QByteArray packHash( const QVariantHash& hash ) * \param message Сообщение * \return Таблица значений */ -QVariantHash unpackHash( const QByteArray& message ) +inline QVariantHash unpack_hash( const QByteArray& message ) { auto parts = split( message ); QVariantHash hash; - for ( int i = 1; i < parts.size(); i += 2 ) hash[parts[i - 1]] = parts[i]; + for ( int i = 1; i < parts.size(); i += 2 ) + { + hash[parts[i - 1]] = parts[i]; + } return( hash ); } @@ -208,7 +217,7 @@ QVariantHash unpackHash( const QByteArray& message ) * \param hash Список значений * \return Запакованное сообщение */ -QByteArray packList( const QVariantList& list ) +inline QByteArray pack_list( const QVariantList& list ) { return( array( list ) ); } @@ -219,7 +228,7 @@ QByteArray packList( const QVariantList& list ) * \param message Сообщение * \return Список значений */ -QVariantList unpackList( const QByteArray& message ) +inline QVariantList unpack_list( const QByteArray& message ) { auto data = split( message ); QVariantList list; diff --git a/src/myx/redis/containers.hpp b/src/myx/redis/containers.hpp index d6ecdbc..bb0d2fe 100644 --- a/src/myx/redis/containers.hpp +++ b/src/myx/redis/containers.hpp @@ -21,7 +21,7 @@ QByteArrayList split( const QByteArray& buffer, int* splitLength = nullptr ); // QVariantHash unpackHash( const QByteArray& message ); // QByteArray packList( QByteArrayList list ); // QByteArray packList( const QVariantList& list ); -QVariantList unpackList( const QByteArray& message ); +QVariantList unpack_list( const QByteArray& message ); // template< typename T > // QByteArray packPair( T first, T second ); // template< typename T1, typename T2 > @@ -31,12 +31,70 @@ QVariantList unpackList( const QByteArray& message ); // template< typename T1, typename T2 > // std::pair< T1, T2 > unpackPair( const QByteArray& message ); + +/*! + * \brief Запаковывает пару значений в сообщение + * \param first Первое значение + * \param second Второе значение + * \return Текстовое сообщение + */ +template< typename T > +QByteArray pack_pair( T first, T second ) +{ + return( packList( { QVariant::fromValue( first ), QVariant::fromValue( second ) } ) ); +} + + +/*! + * \brief Запаковывает пару значений в сообщение + * \param first Первое значение + * \param second Второе значение + * \return Текстовое сообщение + */ +template< typename T1, typename T2 > +QByteArray pack_pair( T1 first, T2 second ) +{ + return( packList( { QVariant::fromValue( first ), QVariant::fromValue( second ) } ) ); +} + + +/*! + * \brief Распаковывает сообщение в пару однотипных значений + * \param pair Текст сообщения + * \return Пара значений + */ +template< typename T > +std::pair< T, T > unpack_pair( const QByteArray& message ) +{ + auto list = unpack_list( message ); + if ( list.size() == 2 ) + { + return { QVariant( list.first() ).value< T >(), QVariant( list.last() ).value< T >() }; + } + return {}; +} + + +/*! + * \brief Распаковывает сообщение в пару однотипных значений + * \param pair Текст сообщения + * \return Пара значений + */ +template< typename T1, typename T2 > +std::pair< T1, T2 > unpack_pair( const QByteArray& message ) +{ + auto list = unpack_list( message ); + if ( list.size() == 2 ) + { + return { QVariant( list.first() ).value< T1 >(), QVariant( list.last() ).value< T2 >() }; + } + return {}; +} + } // namespace redis } // namespace myx -#include - #ifdef MYXLIB_HEADER_ONLY #include "containers-inl.hpp" #endif diff --git a/src/myx/redis/containers.tpp b/src/myx/redis/containers.tpp deleted file mode 100644 index 7b3fc7c..0000000 --- a/src/myx/redis/containers.tpp +++ /dev/null @@ -1,68 +0,0 @@ -#include - -namespace myx { - -namespace redis { - -/*! - * \brief Запаковывает пару значений в сообщение - * \param first Первое значение - * \param second Второе значение - * \return Текстовое сообщение - */ -template< typename T > -QByteArray packPair( T first, T second ) -{ - return( packList( { QVariant::fromValue( first ), QVariant::fromValue( second ) } ) ); -} - - -/*! - * \brief Запаковывает пару значений в сообщение - * \param first Первое значение - * \param second Второе значение - * \return Текстовое сообщение - */ -template< typename T1, typename T2 > -QByteArray packPair( T1 first, T2 second ) -{ - return( packList( { QVariant::fromValue( first ), QVariant::fromValue( second ) } ) ); -} - - -/*! - * \brief Распаковывает сообщение в пару однотипных значений - * \param pair Текст сообщения - * \return Пара значений - */ -template< typename T > -std::pair< T, T > unpackPair( const QByteArray& message ) -{ - auto list = unpackList( message ); - if ( list.size() == 2 ) - { - return { QVariant( list.first() ).value< T >(), QVariant( list.last() ).value< T >() }; - } - return {}; -} - - -/*! - * \brief Распаковывает сообщение в пару однотипных значений - * \param pair Текст сообщения - * \return Пара значений - */ -template< typename T1, typename T2 > -std::pair< T1, T2 > unpackPair( const QByteArray& message ) -{ - auto list = unpackList( message ); - if ( list.size() == 2 ) - { - return { QVariant( list.first() ).value< T1 >(), QVariant( list.last() ).value< T2 >() }; - } - return {}; -} - -} // namespace redis - -} // namespace myx diff --git a/src/myx/redis/pubsub-inl.hpp b/src/myx/redis/pubsub-inl.hpp index f56722f..fceef6b 100644 --- a/src/myx/redis/pubsub-inl.hpp +++ b/src/myx/redis/pubsub-inl.hpp @@ -9,59 +9,69 @@ #include -#include -#include +#include #include #include -#include +#include +#include namespace myx { namespace redis { -const QString PubSub::KeySpacePrefix { "__key%1__:" }; +const QString PubSub::k_KeySpacePrefix { QStringLiteral( "__key%1__:" ) }; /*! * \brief Конструктор класса - * \param parent Родительский объект для _socket + * \param parent Родительский объект для m_socket */ -PubSub::PubSub( QObject* parent ) : - Base{ parent } +inline PubSub::PubSub( QObject* parent ) : + Base { parent }, + m_socket ( new QTcpSocket( this ) ), + m_connectionTimer( new QTimer( this ) ) + { - _socket = new QTcpSocket { this }; - QObject::connect( _socket, &QAbstractSocket::stateChanged, - this, &PubSub::onSocketStateChanged ); - connect( _socket, &QIODevice::readyRead, this, &PubSub::onReadyRead ); - connect( _socket, &QIODevice::bytesWritten, this, &PubSub::onBytesWritten ); - QObject::connect( _socket, static_cast< void ( QAbstractSocket::* )( QAbstractSocket::SocketError ) > - ( &QAbstractSocket::error ), [this]() { - qDebug() << QDateTime::currentDateTimeUtc().toString( "hh:mm:ss:zzz" ) - << QByteArray( "redis pubsub connection error" ) << _socket->errorString(); + connect( m_socket, &QAbstractSocket::stateChanged, this, &PubSub::onSocketStateChanged ); + connect( m_socket, &QIODevice::readyRead, this, &PubSub::onReadyRead ); + connect( m_socket, &QIODevice::bytesWritten, this, &PubSub::onBytesWritten ); + connect( m_socket, static_cast< void ( QAbstractSocket::* )( QAbstractSocket::SocketError ) > + ( &QAbstractSocket::error ), this, [this]() { + qDebug() << QDateTime::currentDateTimeUtc().toString( QStringLiteral( "hh:mm:ss:zzz" ) ) + << QByteArray( "redis pubsub connection error" ) << m_socket->errorString(); + } ); + connect( m_socket, &QIODevice::readyRead, this, &PubSub::read ); + + m_connectionTimer->setInterval( connectionTimeout() ); + m_connectionTimer->setSingleShot( true ); + connect( m_connectionTimer, &QTimer::timeout, this, [this]() { + m_socket->connectToHost( host(), port() ); } ); - QObject::connect( _socket, &QIODevice::readyRead, this, &PubSub::read ); } /*! * \brief Инициирует работу с БД */ -void PubSub::start() +inline void PubSub::start() { - emit disconnected(); - emit connectionStateChanged( _connectionFlags ); - _socket->connectToHost( _host, _port ); + Q_EMIT disconnected(); + Q_EMIT connectionStateChanged( connectionFlags() ); + m_socket->connectToHost( host(), port() ); } /*! * \brief Завершает работу с БД */ -void PubSub::stop() +inline void PubSub::stop() { - if ( _socket->state() == QAbstractSocket::ConnectedState ) + if ( m_socket->state() == QAbstractSocket::ConnectedState ) { - _socket->disconnectFromHost(); - while ( _socket->state() != QAbstractSocket::UnconnectedState ) QCoreApplication::processEvents(); + m_socket->disconnectFromHost(); + while ( m_socket->state() != QAbstractSocket::UnconnectedState ) + { + QCoreApplication::processEvents(); + } } } @@ -72,32 +82,32 @@ void PubSub::stop() * \param subscriber Объект-подписчик * \param method Имя метода, вызываемого при изменении получении серверного сообщения */ -void PubSub::subscribe( QString channel, QObject* subscriber, const char* method ) +inline void PubSub::subscribe( const QString& channel, QObject* subscriber, const char* method ) { if ( QThread::currentThread() != thread() ) { qWarning() << "no redis connection for caller thread"; return; } - auto iter = _subscribers.find( channel ); - if ( iter == _subscribers.end() ) + auto iter = m_subscribers.find( channel ); + if ( iter == m_subscribers.end() ) { - if ( _socket->state() == QAbstractSocket::ConnectedState ) + if ( m_socket->state() == QAbstractSocket::ConnectedState ) { - _socket->write( array( { "SUBSCIBE", channel } ) ); + m_socket->write( array( { "SUBSCIBE", channel } ) ); } - _subscribers[channel].push_back( { subscriber, method } ); + m_subscribers[channel].emplace_back( subscriber, method ); } else { auto value = iter.value(); auto it = std::find_if( value.begin(), value.end(), - [subscriber, method]( const std::pair< QObject*, const char* >& p ) { + [subscriber, method]( std::pair< QObject*, const char* > p ) { return( p.first == subscriber && strcmp( p.second, method ) == 0 ); } ); if ( it == iter.value().end() ) { - iter.value().push_back( { subscriber, method } ); + iter.value().emplace_back( subscriber, method ); } } } // PubSub::subscribe @@ -110,33 +120,33 @@ void PubSub::subscribe( QString channel, QObject* subscriber, const char* method * \param method Имя метода, вызываемого при изменении получении серверного сообщения * \param database Номер БД */ -void PubSub::subscribe( QString channel, QObject* subscriber, const char* method, int database ) +inline void PubSub::subscribe( QString channel, QObject* subscriber, const char* method, int database ) { if ( QThread::currentThread() != thread() ) { qWarning() << "no redis connection for caller thread"; return; } - auto iter = _subscribers.find( channel ); - if ( iter == _subscribers.end() ) + auto iter = m_subscribers.find( channel ); + if ( iter == m_subscribers.end() ) { - channel.prepend( KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() ); - if ( _socket->state() == QAbstractSocket::ConnectedState ) + channel.prepend( k_KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() ); + if ( m_socket->state() == QAbstractSocket::ConnectedState ) { - _socket->write( array( { "SUBSCRIBE", channel } ) ); + m_socket->write( array( { "SUBSCRIBE", channel } ) ); } - _subscribers[channel].push_back( { subscriber, method } ); + m_subscribers[channel].emplace_back( subscriber, method ); } else { auto value = iter.value(); auto it = std::find_if( value.begin(), value.end(), - [subscriber, method]( const std::pair< QObject*, const char* >& p ) { + [subscriber, method]( std::pair< QObject*, const char* > p ) { return( p.first == subscriber && strcmp( p.second, method ) == 0 ); } ); if ( it == iter.value().end() ) { - iter.value().push_back( { subscriber, method } ); + iter.value().emplace_back( subscriber, method ); } } } // PubSub::subscribe @@ -148,32 +158,32 @@ void PubSub::subscribe( QString channel, QObject* subscriber, const char* method * \param subscriber Объект-подписчик * \param method Имя метода, вызываемого при изменении получении серверного сообщения */ -void PubSub::psubscribe( QString channel, QObject* subscriber, const char* method ) +inline void PubSub::psubscribe( const QString& channel, QObject* subscriber, const char* method ) { if ( QThread::currentThread() != thread() ) { qWarning() << "no redis connection for caller thread"; return; } - auto iter = _psubscribers.find( channel ); - if ( iter == _psubscribers.end() ) + auto iter = m_psubscribers.find( channel ); + if ( iter == m_psubscribers.end() ) { - if ( _socket->state() == QAbstractSocket::ConnectedState ) + if ( m_socket->state() == QAbstractSocket::ConnectedState ) { - _socket->write( array( { "PSUBSCRIBE", channel } ) ); + m_socket->write( array( { "PSUBSCRIBE", channel } ) ); } - _psubscribers[channel].push_back( { subscriber, method } ); + m_psubscribers[channel].emplace_back( subscriber, method ); } else { auto value = iter.value(); auto it = std::find_if( value.begin(), value.end(), - [subscriber, method]( const std::pair< QObject*, const char* >& p ) { + [subscriber, method]( std::pair< QObject*, const char* > p ) { return( p.first == subscriber && strcmp( p.second, method ) == 0 ); } ); if ( it == iter.value().end() ) { - iter.value().push_back( { subscriber, method } ); + iter.value().emplace_back( subscriber, method ); } } } // PubSub::psubscribe @@ -186,40 +196,40 @@ void PubSub::psubscribe( QString channel, QObject* subscriber, const char* metho * \param method Имя метода, вызываемого при изменении получении серверного сообщения * \param database Номер БД (-1 для всех) */ -void PubSub::psubscribe( QString channel, QObject* subscriber, const char* method, int database ) +inline void PubSub::psubscribe( QString channel, QObject* subscriber, const char* method, int database ) { if ( QThread::currentThread() != thread() ) { qWarning() << "no redis connection for caller thread"; return; } - auto iter = _psubscribers.find( channel ); - if ( iter == _psubscribers.end() ) + auto iter = m_psubscribers.find( channel ); + if ( iter == m_psubscribers.end() ) { if ( database == -1 ) { - channel.prepend( KeySpacePrefix.arg( "*" ).toLocal8Bit() ); + channel.prepend( k_KeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() ); } else { - channel.prepend( KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() ); + channel.prepend( k_KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() ); } - if ( _socket->state() == QAbstractSocket::ConnectedState ) + if ( m_socket->state() == QAbstractSocket::ConnectedState ) { - _socket->write( array( { "PSUBSCRIBE", channel } ) ); + m_socket->write( array( { "PSUBSCRIBE", channel } ) ); } - _psubscribers[channel].push_back( { subscriber, method } ); + m_psubscribers[channel].emplace_back( subscriber, method ); } else { auto value = iter.value(); auto it = std::find_if( value.begin(), value.end(), - [subscriber, method]( const std::pair< QObject*, const char* >& p ) { + [subscriber, method]( std::pair< QObject*, const char* > p ) { return( p.first == subscriber && strcmp( p.second, method ) == 0 ); } ); if ( it == iter.value().end() ) { - iter.value().push_back( { subscriber, method } ); + iter.value().emplace_back( subscriber, method ); } } } // PubSub::psubscribe @@ -232,18 +242,18 @@ void PubSub::psubscribe( QString channel, QObject* subscriber, const char* metho * \param method Метод обработки данных * \param keyEvents Признак подписки на изменение данных */ -void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* method ) +inline void PubSub::unsubscribe( const QString& channel, QObject* subscriber, const char* method ) { if ( QThread::currentThread() != thread() ) { qWarning() << "no redis connection for caller thread"; return; } - auto iter = _subscribers.find( channel ); - if ( iter != _subscribers.end() ) + auto iter = m_subscribers.find( channel ); + if ( iter != m_subscribers.end() ) { auto ctx = std::find_if( iter->begin(), iter->end(), - [subscriber, method]( const std::pair< QObject*, const char* >& value ) { + [subscriber, method]( std::pair< QObject*, const char* > value ) { return( subscriber == value.first && strcmp( method, value.second ) == 0 ); } ); if ( ctx != iter->end() ) @@ -252,11 +262,11 @@ void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* meth } if ( iter->empty() ) { - if ( _socket->state() == QAbstractSocket::ConnectedState ) + if ( m_socket->state() == QAbstractSocket::ConnectedState ) { - _socket->write( array( { "UNSUBSCRIBE", channel } ) ); + m_socket->write( array( { "UNSUBSCRIBE", channel } ) ); } - _subscribers.remove( channel ); + m_subscribers.remove( channel ); } } } // PubSub::unsubscribe @@ -269,7 +279,7 @@ void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* meth * \param method Метод обработки данных * \param keyEvents Признак подписки на изменение данных */ -void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* method, int database ) +inline void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* method, int database ) { if ( QThread::currentThread() != thread() ) { @@ -278,17 +288,17 @@ void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* meth } if ( database == -1 ) { - channel.prepend( KeySpacePrefix.arg( "*" ).toLocal8Bit() ); + channel.prepend( k_KeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() ); } else { - channel.prepend( KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() ); + channel.prepend( k_KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() ); } - auto iter = _subscribers.find( channel ); - if ( iter != _subscribers.end() ) + auto iter = m_subscribers.find( channel ); + if ( iter != m_subscribers.end() ) { auto ctx = std::find_if( iter->begin(), iter->end(), - [subscriber, method]( const std::pair< QObject*, const char* >& value ) { + [subscriber, method]( std::pair< QObject*, const char* > value ) { return( subscriber == value.first && strcmp( method, value.second ) == 0 ); } ); if ( ctx != iter->end() ) @@ -297,11 +307,11 @@ void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* meth } if ( iter->empty() ) { - if ( _socket->state() == QAbstractSocket::ConnectedState ) + if ( m_socket->state() == QAbstractSocket::ConnectedState ) { - _socket->write( array( { "UNSUBSCRIBE", channel } ) ); + m_socket->write( array( { "UNSUBSCRIBE", channel } ) ); } - _subscribers.remove( channel ); + m_subscribers.remove( channel ); } } } // PubSub::unsubscribe @@ -314,19 +324,19 @@ void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* meth * \param method Метод обработки данных * \param keyEvents Признак подписки на изменение данных */ -void PubSub::punsubscribe( QString channel, QObject* subscriber, - const char* method ) +inline void PubSub::punsubscribe( const QString& channel, QObject* subscriber, + const char* method ) { if ( QThread::currentThread() != thread() ) { qWarning() << "no redis connection for caller thread"; return; } - auto iter = _psubscribers.find( channel ); - if ( iter != _psubscribers.end() ) + auto iter = m_psubscribers.find( channel ); + if ( iter != m_psubscribers.end() ) { auto ctx = std::find_if( iter->begin(), iter->end(), - [subscriber, method]( const std::pair< QObject*, const char* >& value ) { + [subscriber, method]( std::pair< QObject*, const char* > value ) { return( subscriber == value.first && strcmp( method, value.second ) == 0 ); } ); if ( ctx != iter->end() ) @@ -335,11 +345,11 @@ void PubSub::punsubscribe( QString channel, QObject* subscriber, } if ( iter->empty() ) { - if ( _socket->state() == QAbstractSocket::ConnectedState ) + if ( m_socket->state() == QAbstractSocket::ConnectedState ) { - _socket->write( array( { "PUNSUBSCRIBE", channel } ) ); + m_socket->write( array( { "PUNSUBSCRIBE", channel } ) ); } - _psubscribers.remove( channel ); + m_psubscribers.remove( channel ); } } } // PubSub::punsubscribe @@ -352,8 +362,8 @@ void PubSub::punsubscribe( QString channel, QObject* subscriber, * \param method Метод обработки данных * \param keyEvents Признак подписки на изменение данных */ -void PubSub::punsubscribe( QString channel, QObject* subscriber, - const char* method, int database ) +inline void PubSub::punsubscribe( QString channel, QObject* subscriber, + const char* method, int database ) { if ( QThread::currentThread() != thread() ) { @@ -362,17 +372,17 @@ void PubSub::punsubscribe( QString channel, QObject* subscriber, } if ( database == -1 ) { - channel.prepend( KeySpacePrefix.arg( "*" ).toLocal8Bit() ); + channel.prepend( k_KeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() ); } else { - channel.prepend( KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() ); + channel.prepend( k_KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() ); } - auto iter = _psubscribers.find( channel ); - if ( iter != _psubscribers.end() ) + auto iter = m_psubscribers.find( channel ); + if ( iter != m_psubscribers.end() ) { auto ctx = std::find_if( iter->begin(), iter->end(), - [subscriber, method]( const std::pair< QObject*, const char* >& value ) { + [subscriber, method]( std::pair< QObject*, const char* > value ) { return( subscriber == value.first && strcmp( method, value.second ) == 0 ); } ); if ( ctx != iter->end() ) @@ -381,11 +391,11 @@ void PubSub::punsubscribe( QString channel, QObject* subscriber, } if ( iter->empty() ) { - if ( _socket->state() == QAbstractSocket::ConnectedState ) + if ( m_socket->state() == QAbstractSocket::ConnectedState ) { - _socket->write( array( { "PUNSUBSCRIBE", channel } ) ); + m_socket->write( array( { "PUNSUBSCRIBE", channel } ) ); } - _psubscribers.remove( channel ); + m_psubscribers.remove( channel ); } } } // PubSub::punsubscribe @@ -395,23 +405,23 @@ void PubSub::punsubscribe( QString channel, QObject* subscriber, * \brief Разбирает поступившие от сервера уведомления и вызывает требуемые * метаметоды обработки */ -void PubSub::read() +inline void PubSub::read() { - int splitLength; - while ( _socket->bytesAvailable() > 0 ) + int splitLength = 0; + while ( m_socket->bytesAvailable() > 0 ) { - _buffer += _socket->readAll(); - while ( !_buffer.isEmpty() ) + m_buffer += m_socket->readAll(); + while ( !m_buffer.isEmpty() ) { - auto parts = split( _buffer, &splitLength ); + auto parts = split( m_buffer, &splitLength ); if ( parts.empty() ) { break; } if ( ( parts[0] == "pmessage" ) && ( parts.size() == 4 ) ) { - auto iter = _psubscribers.find( parts[1] ); - if ( iter != _psubscribers.end() ) + auto iter = m_psubscribers.find( parts[1] ); + if ( iter != m_psubscribers.end() ) { for ( const auto& i: iter.value() ) { @@ -425,8 +435,8 @@ void PubSub::read() { if ( ( parts[0] == "message" ) && ( parts.size() == 3 ) ) { - auto iter = _subscribers.find( parts[1] ); - if ( iter != _subscribers.end() ) + auto iter = m_subscribers.find( parts[1] ); + if ( iter != m_subscribers.end() ) { for ( const auto& i: iter.value() ) { @@ -437,7 +447,7 @@ void PubSub::read() } } } - _buffer.remove( 0, splitLength ); + m_buffer.remove( 0, splitLength ); } } } // PubSub::read @@ -446,30 +456,32 @@ void PubSub::read() /*! * \brief Обработка изменения состояния сокета */ -void PubSub::onSocketStateChanged() +inline void PubSub::onSocketStateChanged() { - switch ( _socket->state() ) + switch ( m_socket->state() ) { case QAbstractSocket::ConnectedState: - for ( auto iter = _psubscribers.cbegin(); iter != _psubscribers.cend(); ++iter ) _socket->write( array( { "PSUBSCRIBE", iter.key() } ) ); - for ( auto iter = _subscribers.cbegin(); iter != _subscribers.cend(); ++iter ) _socket->write( array( { "SUBSCRIBE", iter.key() } ) ); - qInfo() << QDateTime::currentDateTimeUtc().toString( "hh:mm:ss:zzz" ) << - QString( "redis at %1:%2 is connected" ).arg( _host ).arg( _port ); - emit connectionStateChanged( _connectionFlags = Connected ); - emit connected(); + for ( auto iter = m_psubscribers.cbegin(); iter != m_psubscribers.cend(); ++iter ) + { + m_socket->write( array( { "PSUBSCRIBE", iter.key() } ) ); + } + for ( auto iter = m_subscribers.cbegin(); iter != m_subscribers.cend(); ++iter ) + { + m_socket->write( array( { "SUBSCRIBE", iter.key() } ) ); + } + setConnectionFlags( kConnected ); + Q_EMIT connectionStateChanged( connectionFlags() ); + Q_EMIT connected(); break; case QAbstractSocket::UnconnectedState: - _buffer.clear(); - QTimer::singleShot( Timeout, [this]() { - _socket->connectToHost( _host, _port ); - } ); - qInfo() << QDateTime::currentDateTimeUtc().toString( "hh:mm:ss:zzz" ) << - QString( "redis at %1:%2 is unconnected" ).arg( _host ).arg( _port ); - if ( _connectionFlags & Connected ) + m_buffer.clear(); + m_connectionTimer->start(); + if ( !isConnected() ) { - emit disconnected(); + Q_EMIT disconnected(); } - emit connectionStateChanged( _connectionFlags = Unconnected ); + setConnectionFlags( kDisconnected ); + Q_EMIT connectionStateChanged( connectionFlags() ); break; default: break; @@ -477,23 +489,13 @@ void PubSub::onSocketStateChanged() } // PubSub::onSocketStateChanged -/*! - * \brief Возвращает признак установленного соединения - * \return Признак установленного соединения - */ -bool PubSub::isConnected() const -{ - return ( _connectionFlags & Connected ); -} - - /*! * \brief Возвращает сокет * \return Сокет */ -QTcpSocket* PubSub::socket() const +inline QTcpSocket* PubSub::socket() const { - return( _socket ); + return( m_socket ); } } // namespace redis diff --git a/src/myx/redis/pubsub.hpp b/src/myx/redis/pubsub.hpp index be62bcf..7f415b4 100644 --- a/src/myx/redis/pubsub.hpp +++ b/src/myx/redis/pubsub.hpp @@ -22,21 +22,20 @@ class PubSub : public Base Q_OBJECT public: - PubSub( QObject* parent = nullptr ); - bool isConnected() const; + explicit PubSub( QObject* parent = nullptr ); QTcpSocket* socket() const; Q_SLOT void start(); Q_SLOT void stop(); - Q_INVOKABLE void subscribe( QString channel, QObject* subscriber, const char* method ); + Q_INVOKABLE void subscribe( const QString& channel, QObject* subscriber, const char* method ); Q_INVOKABLE void subscribe( QString channel, QObject* subscriber, const char* method, int database ); - Q_INVOKABLE void psubscribe( QString channel, QObject* subscriber, const char* method ); + Q_INVOKABLE void psubscribe( const QString& channel, QObject* subscriber, const char* method ); Q_INVOKABLE void psubscribe( QString channel, QObject* subscriber, const char* method, int database ); - Q_INVOKABLE void unsubscribe( QString channel, QObject* subscriber, const char* method ); + Q_INVOKABLE void unsubscribe( const QString& channel, QObject* subscriber, const char* method ); Q_INVOKABLE void unsubscribe( QString channel, QObject* subscriber, const char* method, int database ); - Q_INVOKABLE void punsubscribe( QString channel, QObject* subscriber, const char* method ); + Q_INVOKABLE void punsubscribe( const QString& channel, QObject* subscriber, const char* method ); Q_INVOKABLE void punsubscribe( QString channel, QObject* subscriber, const char* method, int database ); /*! @@ -51,14 +50,16 @@ public: private: using Sub = QHash< QString, std::vector< std::pair< QObject*, const char* > > >; - static const QString KeySpacePrefix; //!< Приставка для наблюдения за данными - QTcpSocket* _socket; + static const QString k_KeySpacePrefix; //!< Приставка для наблюдения за данными + QTcpSocket* m_socket; + QTimer* m_connectionTimer; //!< Таймер установления сетевого соединения + static constexpr int k_ConnectionTimeout { 2000 }; //!< Таймаут на сетевые операции /*! * \brief Таблица подписчиков */ - QByteArray _buffer; //!< Буффер прочитанных данных - Sub _subscribers; //!< Обработчики сообщений по каналу подписки - Sub _psubscribers; //!< Обработчики сообщений по шаблону канала подписки + QByteArray m_buffer; //!< Буффер прочитанных данных + Sub m_subscribers; //!< Обработчики сообщений по каналу подписки + Sub m_psubscribers; //!< Обработчики сообщений по шаблону канала подписки Q_SLOT void read(); Q_SLOT void onSocketStateChanged();