diff --git a/CMakeLists.txt b/CMakeLists.txt index a7996a6..f21927c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -44,13 +44,11 @@ add_subdirectory(src/myx/base) add_subdirectory(src/myx/filesystem) add_subdirectory(src/myx/qt) add_subdirectory(src/myx/math) -add_subdirectory(src/myx/redis) # Примеры if(MYXLIB_BUILD_EXAMPLES OR MYXLIB_BUILD_EXAMPLES_HO) add_subdirectory(examples/filesystem) add_subdirectory(examples/qt) - add_subdirectory(examples/redis) endif() # Документация diff --git a/examples/redis/01_client/CMakeLists.txt b/examples/redis/01_client/CMakeLists.txt deleted file mode 100644 index bc9f579..0000000 --- a/examples/redis/01_client/CMakeLists.txt +++ /dev/null @@ -1,92 +0,0 @@ -# Название основной цели в текущем каталоге -set(TRGT example-redis-client) - -# Список файлов исходных текстов -set(TRGT_cpp ${CMAKE_CURRENT_SOURCE_DIR}/client.cpp) - -set(TRGT_moc_hpp ${CMAKE_CURRENT_SOURCE_DIR}/client.hpp) - -qt5_wrap_cpp(TRGT_moc_cpp ${TRGT_moc_hpp}) - -if(MYXLIB_BUILD_EXAMPLES) - - # Путь поиска библиотек внутри проекта - link_directories(${CMAKE_BINARY_DIR}/${CMAKE_INSTALL_LIBDIR}) - - # Цель для создания исполняемого файла - add_executable(${TRGT} ${TRGT_cpp} ${TRGT_moc_cpp} ${TRGT_moc_hpp}) - common_target_properties(${TRGT}) - - # Создание цели для проверки утилитой clang-tidy - add_clang_tidy_check(${TRGT} ${TRGT_cpp}) - - # Создание цели для проверки утилитой clang-analyze - add_clang_analyze_check(${TRGT} ${TRGT_cpp}) - - # Создание цели для проверки утилитой clazy - add_clazy_check(${TRGT} ${TRGT_cpp}) - - # Создание цели для проверки утилитой pvs-studio - add_pvs_check(${TRGT}) - - # Создание цели для автоматического форматирования кода - add_format_sources(${TRGT} ${TRGT_cpp} ${TRGT_moc_hpp}) - - # Qt5 - target_include_directories(${TRGT} PRIVATE ${CMAKE_SOURCE_DIR}/src) - target_include_directories(${TRGT} SYSTEM PUBLIC ${Qt5Core_INCLUDE_DIRS}) - target_include_directories(${TRGT} SYSTEM PUBLIC ${Qt5Network_INCLUDE_DIRS}) - - target_include_directories(${TRGT} SYSTEM PRIVATE ${CMAKE_SOURCE_DIR}/src) - add_dependencies(${TRGT} base redis) - - target_link_libraries(${TRGT} base_static redis_static) - target_link_libraries(${TRGT} Qt5::Core Qt5::Network) - target_link_libraries(${TRGT} Threads::Threads) - - # Имя выходного файла для цели - set_target_properties(${TRGT} PROPERTIES OUTPUT_NAME redis-client) - - add_sanitizers(${TRGT}) - - cotire(${TRGT}) - - add_dependencies(${TRGT} create_auxilary_symlinks) - - # Правила для установки - install(TARGETS ${TRGT} COMPONENT examples RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}) -endif() - -if(MYXLIB_BUILD_EXAMPLES_HO) - set(REDIS_LIB_DIR ${CMAKE_SOURCE_DIR}/src/myx/redis) - - set(REDIS_moc_hpp ${REDIS_LIB_DIR}/base.hpp - ${REDIS_LIB_DIR}/containers.hpp - ${REDIS_LIB_DIR}/pubsub.hpp) - - qt5_wrap_cpp(REDIS_moc_cpp ${REDIS_moc_hpp}) - - # Цель для создания исполняемого файла - add_executable(${TRGT}-ho ${TRGT_cpp} ${TRGT_moc_cpp} ${REDIS_moc_cpp} ${TRGT_moc_hpp}) - common_target_properties(${TRGT}-ho) - - target_include_directories(${TRGT}-ho PRIVATE ${CMAKE_SOURCE_DIR}/src) - target_include_directories(${TRGT}-ho SYSTEM PUBLIC ${Qt5Core_INCLUDE_DIRS}) - target_include_directories(${TRGT}-ho SYSTEM PUBLIC ${Qt5Network_INCLUDE_DIRS}) - - add_dependencies(${TRGT}-ho base-header-only redis-header-only) - - target_link_libraries(${TRGT}-ho Qt5::Core Qt5::Network) - target_link_libraries(${TRGT}-ho Threads::Threads) - - # Имя выходного файла для цели - set_target_properties(${TRGT}-ho PROPERTIES OUTPUT_NAME redis-translators-ho) - - add_sanitizers(${TRGT}-ho) - cotire(${TRGT}-ho) - - add_dependencies(${TRGT}-ho create_auxilary_symlinks) - - # Правила для установки - install(TARGETS ${TRGT}-ho COMPONENT examples RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}) -endif() diff --git a/examples/redis/01_client/client.cpp b/examples/redis/01_client/client.cpp deleted file mode 100644 index 9700faf..0000000 --- a/examples/redis/01_client/client.cpp +++ /dev/null @@ -1,21 +0,0 @@ -#include "client.hpp" - -#include -#include - -#include -#include - -namespace MR = myx::redis; - -int main( int argc, char** argv ) -{ - QCoreApplication app( argc, argv ); - - MR::PubSub a; - MR::Client b; - RedisClient c( &a ); - a.start(); - - return( QCoreApplication::exec() ); -} // main diff --git a/examples/redis/01_client/client.hpp b/examples/redis/01_client/client.hpp deleted file mode 100644 index 432a015..0000000 --- a/examples/redis/01_client/client.hpp +++ /dev/null @@ -1,32 +0,0 @@ -#include -#include - - #include - -namespace MR = myx::redis; - -class RedisClient : public QObject -{ - Q_OBJECT - -public: - explicit RedisClient( MR::PubSub* p, QObject* o = nullptr ) : - QObject( o ) - { - p->subscribe( "test", this, "readChannelTest" ); - } - - -private: - Q_INVOKABLE void readChannelTest( const QString& channel, const QByteArray& message ) - { - qDebug() << channel; - qDebug() << message; - - if ( message == "quit" ) - { - qCritical() << message; - qApp->quit(); - } - } -}; // class RedisClient diff --git a/examples/redis/CMakeLists.txt b/examples/redis/CMakeLists.txt deleted file mode 100644 index 49cbfc7..0000000 --- a/examples/redis/CMakeLists.txt +++ /dev/null @@ -1 +0,0 @@ -add_subdirectory(01_client) diff --git a/src/myx/redis/CMakeLists.txt b/src/myx/redis/CMakeLists.txt deleted file mode 100644 index 6e833ec..0000000 --- a/src/myx/redis/CMakeLists.txt +++ /dev/null @@ -1,71 +0,0 @@ -# Название основной цели и имя библиотеки в текущем каталоге -set(TRGT redis) - -# cmake-format: off -# Список файлов исходных текстов -set(TRGT_cpp - ${CMAKE_CURRENT_SOURCE_DIR}/base.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/client.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/pubsub.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/containers.cpp) - -# Список заголовочных файлов -set(TRGT_moc_hpp - ${CMAKE_CURRENT_SOURCE_DIR}/base.hpp - ${CMAKE_CURRENT_SOURCE_DIR}/client.hpp - ${CMAKE_CURRENT_SOURCE_DIR}/pubsub.hpp) - -set(TRGT_hpp - ${CMAKE_CURRENT_SOURCE_DIR}/base-inl.hpp - ${CMAKE_CURRENT_SOURCE_DIR}/client-inl.hpp - ${CMAKE_CURRENT_SOURCE_DIR}/containers.hpp - ${CMAKE_CURRENT_SOURCE_DIR}/containers-inl.hpp - ${CMAKE_CURRENT_SOURCE_DIR}/pubsub-inl.hpp) - -set(TRGT_headers ${TRGT_moc_hpp} ${TRGT_hpp}) -# cmake-format: on - -qt5_wrap_cpp(TRGT_moc_cpp ${TRGT_moc_hpp}) - -add_library(${TRGT}-header-only INTERFACE) -target_include_directories( - ${TRGT}-header-only SYSTEM INTERFACE "$" - "$") - -if(MYXLIB_BUILD_LIBRARIES) - add_common_library(${TRGT} OUTPUT_NAME myx-${TRGT} SOURCES ${TRGT_moc_cpp} ${TRGT_cpp} ${TRGT_headers}) - common_target_properties(${TRGT}) - - # Создание цели для проверки утилитой clang-tidy - add_clang_tidy_check(${TRGT} ${TRGT_cpp} ${TRGT_headers}) - - # Создание цели для проверки утилитой clang-analyze - add_clang_analyze_check(${TRGT} ${TRGT_cpp} ${TRGT_headers}) - - # Создание цели для проверки утилитой clazy - add_clazy_check(${TRGT} ${TRGT_cpp} ${TRGT_headers}) - - # Создание цели для проверки утилитой pvs-studio - add_pvs_check(${TRGT}) - - # Создание цели для автоматического форматирования кода - add_format_sources(${TRGT} ${TRGT_cpp} ${TRGT_headers}) - - target_compile_definitions(${TRGT} PUBLIC MYXLIB_BUILD_LIBRARIES) - target_include_directories(${TRGT} SYSTEM PUBLIC ${Qt5Core_INCLUDE_DIRS}) - target_include_directories(${TRGT} SYSTEM PUBLIC ${Qt5Network_INCLUDE_DIRS}) - target_include_directories(${TRGT} SYSTEM PRIVATE ${CMAKE_SOURCE_DIR}/src) - - cotire(${TRGT}) - install(TARGETS ${TRGT}_static COMPONENT libs-dev ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}) - if(BUILD_SHARED_LIBS) - install(TARGETS ${TRGT}_shared COMPONENT main LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}) - endif() -endif() - -generate_pkgconfig(myx-${TRGT} COMPONENT base-dev INSTALL_LIBRARY ${MYXLIB_BUILD_LIBRARIES}) -install(FILES ${TRGT_headers} COMPONENT base-dev DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/${PROJECT_NAME}/${TRGT}) - -# Цель, используемая только для установки заголовочных файлов без компиляции проекта -add_custom_target(${TRGT}-install-headers COMMAND ${CMAKE_COMMAND} -DCMAKE_INSTALL_COMPONENT=base-dev -P - "${CMAKE_BINARY_DIR}/cmake_install.cmake") diff --git a/src/myx/redis/base-inl.hpp b/src/myx/redis/base-inl.hpp deleted file mode 100644 index 1817554..0000000 --- a/src/myx/redis/base-inl.hpp +++ /dev/null @@ -1,146 +0,0 @@ -#ifndef MYX_REDIS_BASE_INL_HPP_ -#define MYX_REDIS_BASE_INL_HPP_ - -#pragma once - -#ifndef MYXLIB_HEADER_ONLY -#include -#endif - -#include - -#include - -namespace myx { - -namespace redis { - -/*! - * \brief Создаёт объект класса - * \param host Имя хоста сервера - * \param port Номер порта сервера - * \param parent Родительский объект - */ -MYXLIB_INLINE Base::Base( QObject* parent ) : - QObject { parent }, - m_readTimer ( new QTimer( this ) ), - m_writeTimer( new QTimer( this ) ) -{ - m_readTimer->setSingleShot( true ); - m_writeTimer->setSingleShot( true ); - m_readTimer->setInterval( rwTimeout() ); - m_writeTimer->setInterval( rwTimeout() ); - connect( m_readTimer, &QTimer::timeout, this, [this]() { - if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kRead ) != 0 ) ) - { - m_connectionFlags -= kRead; - Q_EMIT connectionStateChanged( m_connectionFlags ); - } - } ); - connect( m_writeTimer, &QTimer::timeout, this, [this]() { - if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kWrite ) != 0 ) ) - { - m_connectionFlags -= kWrite; - Q_EMIT connectionStateChanged( m_connectionFlags ); - } - } ); -} - - -MYXLIB_INLINE int Base::rwTimeout() const -{ - return( kRWTimeout ); -} - - -MYXLIB_INLINE QString Base::host() const -{ - return( m_host ); -} - - -/*! - * \brief Задаёт адрес хоста - * \param host Адрес - */ -MYXLIB_INLINE void Base::setHost( const QString& host ) -{ - m_host = host; -} - - -MYXLIB_INLINE quint16 Base::port() const -{ - return( m_port ); -} - - -MYXLIB_INLINE void Base::setPort( quint16 port ) -{ - m_port = port; -} - - -MYXLIB_INLINE int Base::connectionFlags() const -{ - return( m_connectionFlags ); -} - - -MYXLIB_INLINE void Base::setConnectionFlags( int connectionFlags ) -{ - m_connectionFlags = connectionFlags; -} - - -/*! - * \brief Возвращает признак установленного соединения - * \return Признак установленного соединения - */ -MYXLIB_INLINE bool Base::isConnected() const -{ - return( ( m_connectionFlags & kConnected ) != 0 ); -} - - -/*! - * \brief Добавляет к состоянию флаг Read - */ -MYXLIB_INLINE void Base::onReadyRead() -{ - if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kRead ) == 0 ) ) - { - m_connectionFlags += kRead; - Q_EMIT connectionStateChanged( m_connectionFlags ); - } - m_readTimer->start(); -} - - -/*! - * \brief Добавляет к состоянию флаг Write - */ -MYXLIB_INLINE void Base::onBytesWritten() -{ - if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kWrite ) == 0 ) ) - { - m_connectionFlags += kWrite; - Q_EMIT connectionStateChanged( m_connectionFlags ); - } - m_writeTimer->start(); -} - - -/*! - * \brief Выставляет флаг ошибки соединения - */ -MYXLIB_INLINE void Base::onSocketError() -{ - Q_EMIT connectionStateChanged( m_connectionFlags = kError ); -} - -} // namespace redis - -} // namespace myx - -#endif // ifndef MYX_REDIS_BASE_INL_HPP_ diff --git a/src/myx/redis/base.cpp b/src/myx/redis/base.cpp deleted file mode 100644 index 0ac1478..0000000 --- a/src/myx/redis/base.cpp +++ /dev/null @@ -1,5 +0,0 @@ -#ifndef MYXLIB_BUILD_LIBRARIES -#error Define MYXLIB_BUILD_LIBRARIES to compile this file. -#endif - -#include diff --git a/src/myx/redis/base.hpp b/src/myx/redis/base.hpp deleted file mode 100644 index 107d1b9..0000000 --- a/src/myx/redis/base.hpp +++ /dev/null @@ -1,83 +0,0 @@ -#ifndef MYX_REDIS_BASE_HPP_ -#define MYX_REDIS_BASE_HPP_ - -#pragma once - -#include - -#include -#include - -QT_FORWARD_DECLARE_CLASS( QTimer ) -QT_FORWARD_DECLARE_CLASS( QTcpSocket ) - -namespace myx { - -namespace redis { - -/*! - * \brief Базовый класс для объектов взаимодействия с redis - */ -class Base : public QObject -{ - Q_OBJECT - -public: - /*! - * \brief Состояние соединения - */ - enum ConnectionStateFlags - { - kDisconnected = 1, //!< Соединение не установлено - kConnecting = 2, //!< Соединение устанавливается - kConnected = 4, //!< Соединение установлено - kRead = 8, //!< Есть данные для чтения - kWrite = 16, //!< Есть данные для записи - kError = 32, //!< Ошибка соединения - }; - - explicit Base( QObject* parent = nullptr ); - - /*! - * \brief Сигнализирует об изменении состояния соединения - * \param flags Флаги состояния - */ - Q_SIGNAL void connectionStateChanged( int flags ); - - bool isConnected() const; - -private: - static constexpr int kRWTimeout { 200 }; //!< Таймаут на операции чтения и записи - int m_connectionFlags { kDisconnected }; //!< Флаги состояния соединения - quint16 m_port { 6379 }; //!< Номер порта подключения к БД - QString m_host { QStringLiteral( "127.0.0.1" ) }; //!< Имя хоста подключения к БД - QTimer* m_readTimer; //!< Таймер чтения данных - QTimer* m_writeTimer; //!< Таймер записи данных - -protected: - - int rwTimeout() const; - - QString host() const; - void setHost( const QString& host ); - - quint16 port() const; - void setPort( quint16 port ); - - int connectionFlags() const; - void setConnectionFlags( int connectionFlags ); - - Q_SLOT void onReadyRead(); - Q_SLOT void onBytesWritten(); - Q_SLOT void onSocketError(); -}; // class Base - -} // namespace redis - -} // namespace myx - -#ifdef MYXLIB_HEADER_ONLY -#include "base-inl.hpp" -#endif - -#endif // ifndef MYX_REDIS_BASE_HPP_ diff --git a/src/myx/redis/client-inl.hpp b/src/myx/redis/client-inl.hpp deleted file mode 100644 index 6893d82..0000000 --- a/src/myx/redis/client-inl.hpp +++ /dev/null @@ -1,2280 +0,0 @@ -#ifndef MYX_REDIS_CLIENT_INL_HPP_ -#define MYX_REDIS_CLIENT_INL_HPP_ - -#pragma once - -#ifndef MYXLIB_HEADER_ONLY -#include -#endif - -#include - -#include -#include -#include -#include -#include -#include - - -namespace myx { - -namespace redis { - -namespace constants { - -extern const QByteArray Separator; // { "\r\n" }; //!< Строковый разделитель слов во фразе - -} // namespace constants - -namespace C = constants; - -/*! - * \brief Создает объект класса - * \param parent Родительский объект - * \param database Идентификатор БД - */ -Client::Client( int database, QObject* parent ) : - Base{ parent } -{ - _database = database; -} - - -/*! - * \brief Удаляет все созданные соединения - */ -Client::~Client() -{ - for ( auto iter: _session ) - { - if ( iter->state() == QTcpSocket::ConnectedState ) - { - iter->disconnectFromHost(); - if ( iter->state() != QTcpSocket::UnconnectedState ) - { - iter->waitForDisconnected(); - } - } - iter->deleteLater(); - } -} - - -/*! - * \brief Возвращает идентификатор БД - * \return Идентификатор БД - */ -int Client::database() const -{ - return( _database ); -} - - -/*! - * \brief Выполняет переключение текущей БД - * \param database Номер новой БД, к которой будет организовано подключение - */ -void Client::select( int database ) -{ - if ( _database == database ) - { - return; - } - _database = database; - _outOfDatabase = { _session.begin(), _session.end() }; -} - - -/*! - * \brief Определяет сколько из переданных ключей существует в redis - * \param keys Rk.xb - * \return Количество существующих ключей - */ -int Client::exists( const QStringList& keys ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( 0 ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( 0 ); - } - socket->write( array( QVariantList { "EXISTS" } + QVariant( keys ).toList() ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( 0 ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - if ( QRegExp( ":\\d+\\r\\n" ).exactMatch( buffer ) ) - { - QRegExp rx { "\\d+" }; - rx.indexIn( buffer ); - return( rx.cap().toInt() ); - } - return( -1 ); -} // Client::exists - - -/*! - * \brief Очищает БД - */ -bool Client::flushdb() -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( false ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( false ); - } - socket->write( array( { "$7", "FLUSHDB" } ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( buffer == "+OK\r\n" ); -} // Client::flushdb - - -/*! - * \brief Удаляет данные по списку ключей - * \param keys Список ключей для удаления - * \return Признак успешной операции - */ -bool Client::del( const QStringList& keys ) -{ - if ( keys.empty() ) - { - return( true ); - } - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( false ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( false ); - } - socket->write( array( QVariantList { "DEL" } + QVariant( keys ).toList() ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( QRegExp( ":\\d+\\r\\n" ).exactMatch( buffer ) ); -} // Client::del - - -/*! - * \brief Устанавливает таймаут на значение в секундах - * \param key Ключ значения - * \param secs Таймаут в секундах - * \return Признак успешной установки - */ -bool Client::expire( const QString& key, int secs ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( false ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( false ); - } - socket->write( array( { "EXPIRE", key, secs } ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( true ); -} // Client::expire - - -/*! - * \brief Устанавливает таймаут на значение в милисекундах - * \param key Ключ значения - * \param secs Таймаут в милисекундах - * \return Признак успешной установки - */ -bool Client::pexpire( const QString& key, int msecs ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( false ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( false ); - } - socket->write( array( { "PEXPIRE", key, msecs } ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( true ); -} // Client::pexpire - - -/*! - * \brief Удаляет таймаут со значения, делаю его persistent - * \param key Ключ значения - * \return Признак успешного удаления таймаута - */ -bool Client::persist( const QString& key ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( false ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( false ); - } - socket->write( array( { "PERSIST", key } ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( true ); -} // Client::persist - - -/*! - * \brief Публикует сообщение - * \param channel Канал сообщений - * \param message Текст сообщений - * \return Признак удачной публикации - */ -bool Client::publish( const QString& channel, const QByteArray& message ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( false ); - } - if ( message.isEmpty() ) - { - return( false ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( false ); - } - socket->write( array( { "PUBLISH", channel, message } ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( true ); -} // Client::publish - - -/*! - * \brief Получает данные из redis без создания постоянного подключения - * - * Метод отличается более низкой производительностью по сравнению с нестатической реализацией - * \param host Хост для установки соединения с сервером redis - * \param port Порт для установки соединения с сервером redis - * \param channel Канал сообщений - * \param message Текст сообщений - * \return Признак удачной публикации - */ -bool Client::publish( const QString& channel, const QByteArray& message, - const QString& host, quint16 port ) -{ - if ( message.isEmpty() ) - { - return( false ); - } - QTcpSocket socket; - do { - if ( socket.state() == QAbstractSocket::UnconnectedState ) - { - socket.connectToHost( host, port ); - } - QCoreApplication::processEvents(); - } while( socket.state() != QAbstractSocket::ConnectedState ); - socket.write( array( { "PUBLISH", channel, message } ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket.state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket.bytesAvailable() > 0 ) - { - buffer.append( socket.readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( true ); -} // Client::publish - - -/*! - * \brief Выполняет команду SET протокола RESP - * \param key Ключ-строка - * \param value Значение по ключу - * \return Признак удачной установки значения - */ -bool Client::set( const QString& key, const QVariant& value ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( false ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( false ); - } - socket->write( array( { "SET", key, value } ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( buffer == "+OK\r\n" ); -} // Client::set - - -/*! - * \brief Получает данные из redis без создания постоянного подключения - * - * Метод отличается более низкой производительностью по сравнению с нестатической реализацией - * \param key Ключ данных - * \param host Хост для установки соединения с сервером redis - * \param port Порт для установки соединения с сервером redis - * \param database Идентификатор БД - * \return Строка данных - */ -bool Client::set( const QString& key, const QVariant& value, - int database, const QString& host, quint16 port ) -{ - QTcpSocket socket; - if ( !setupSocket( &socket, database, host, port ) ) - { - return {}; - } - socket.write( array( { "SET", key, value } ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket.state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket.bytesAvailable() > 0 ) - { - buffer.append( socket.readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( buffer == "+OK\r\n" ); -} // Client::set - - -/*! - * \brief Загружает lua скрипт на сервер - * \param script Код скрипта - * \return SHA загруженного скрипта (пустая SHA в случае возникновения ошибки) - */ -QByteArray Client::scriptLoad( const QByteArray& script ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return {}; - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return {}; - } - socket->write( array( { "SCRIPT", "LOAD", script } ) ); - QByteArray buffer; - while ( socket->state() == QAbstractSocket::ConnectedState ) - { - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - if ( !buffer.endsWith( C::Separator ) ) - { - continue; - } - if ( buffer.startsWith( "-ERR" ) ) - { - qWarning() << "loading redis script error:" << buffer; - return {}; - } - int splitPos; - auto parts = split( buffer, &splitPos ); - if ( splitPos < buffer.length() ) - { - continue; - } - if ( parts.size() == 1 ) - { - return( parts.front() ); - } - return {}; - } - else - { - QCoreApplication::processEvents(); - } - } - return {}; -} // Client::scriptLoad - - -/*! - * \brief Синхронный метод получения значения - * \param key Ключ - * \return Строка данных - */ -QByteArray Client::get( const QString& key ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return {}; - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return {}; - } - socket->write( array( { "GET", key } ) ); - QByteArray buffer; - while ( socket->state() == QAbstractSocket::ConnectedState ) - { - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - if ( !buffer.endsWith( C::Separator ) ) - { - continue; - } - if ( buffer.startsWith( "-WRONG" ) ) - { - return {}; - } - int splitPos; - auto parts = split( buffer, &splitPos ); - if ( splitPos < buffer.length() ) - { - continue; - } - if ( parts.size() == 1 ) - { - return( parts.front() ); - } - return {}; - } - else - { - QCoreApplication::processEvents(); - } - } - return {}; -} // Client::get - - -/*! - * \brief Получает данные из redis без создания постоянного подключения - * - * Метод отличается более низкой производительностью по сравнению с нестатической реализацией - * \param key Ключ данных - * \param host Хост для установки соединения с сервером redis - * \param port Порт для установки соединения с сервером redis - * \param database Идентификатор БД - * \return Строка данных - */ -QByteArray Client::get( const QString& key, int database, const QString& host, quint16 port ) -{ - QTcpSocket socket; - if ( !setupSocket( &socket, database, host, port ) ) - { - return {}; - } - socket.write( array( { "GET", key } ) ); - QByteArray buffer; - QByteArray value; - while ( socket.state() == QAbstractSocket::ConnectedState ) - { - if ( socket.bytesAvailable() > 0 ) - { - buffer.append( socket.readAll() ); - if ( !buffer.endsWith( C::Separator ) ) - { - continue; - } - int splitPos; - auto parts = split( buffer, &splitPos ); - if ( splitPos < buffer.length() ) - { - continue; - } - if ( parts.size() == 1 ) - { - value = parts.front(); - } - break; - } - else - { - QCoreApplication::processEvents(); - } - } - if ( socket.state() == QAbstractSocket::ConnectedState ) - { - socket.disconnectFromHost(); - } - while ( socket.state() != QAbstractSocket::UnconnectedState ) QCoreApplication::processEvents(); - return( value ); -} // Client::get - - -/*! - * \brief Задает значение, хранящееся в хэш-таблице - * \param key Ключ таблицы - * \param field Ключ значения - * \param value Значение - * \return Признак успешной операции - */ -bool Client::hset( const QString& key, const QString& field, const QVariant& value ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( false ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( false ); - } - socket->write( array( { "HSET", key, field, value } ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( QRegExp( ":\\d+\\r\\n" ).exactMatch( buffer ) ); -} // Client::hset - - -/*! - * \brief Заполняет hash по ключу - * \param key Ключ - * \param hash Hash таблица со значениями - * \return Признак корректного заполения - */ -bool Client::hmset( const QString& key, const QVariantHash& hash ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( false ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( false ); - } - QVariantList parts { "HMSET", key }; - for ( auto iter = hash.cbegin(); iter != hash.cend(); iter++ ) parts << iter.key() << iter.value(); - socket->write( array( parts ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( buffer == "+OK\r\n" ); -} // Client::hmset - - -/*! - * \brief Заполняет удаляет данные из хэш-таблицы redis - * \param key Ключ таблицы для redis - * \param hashKeys Ключи удаляемых данных - * \return Признак корректного заполения - */ -bool Client::hdel( const QString& redisKey, const QStringList& hashKeys ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( false ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( false ); - } - socket->write( array( QVariantList { "HDEL", redisKey } + QVariant( hashKeys ).toList() ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( QRegExp( ":\\d+\\r\\n" ).exactMatch( buffer ) ); -} // Client::hdel - - -/*! - * \brief Заполняет hash по ключу - * - * Метод отличается более низкой производительностью по сравнению с нестатической реализацией - * \param key Ключ - * \param hash Hash таблица со значениями - * \param host Хост для установки соединения с сервером redis - * \param port Порт для установки соединения с сервером redis - * \param database Идентификатор БД - * \return Признак корректного заполения - */ -bool Client::hmset( const QString& key, const QVariantHash& hash, int database, - const QString& host, quint16 port ) -{ - QTcpSocket socket; - if ( !setupSocket( &socket, database, host, port ) ) - { - return {}; - } - QVariantList parts { "HMSET", key }; - for ( auto iter = hash.cbegin(); iter != hash.cend(); iter++ ) parts << iter.key() << iter.value(); - socket.write( array( parts ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket.state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket.bytesAvailable() > 0 ) - { - buffer.append( socket.readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - if ( socket.state() == QAbstractSocket::ConnectedState ) - { - socket.disconnectFromHost(); - } - while ( socket.state() != QAbstractSocket::UnconnectedState ) QCoreApplication::processEvents(); - return( buffer == "+OK\r\n" ); -} // Client::hmset - - -/*! - * \brief Синхронный метод хэш-таблицы - * \param key Ключ - * \return Таблица данных - */ -QVariantHash Client::hgetall( const QString& key ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return {}; - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return {}; - } - socket->write( array( { "HGETALL", key } ) ); - QByteArray buffer; - QVariantHash hash; - while ( socket->state() == QAbstractSocket::ConnectedState ) - { - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - if ( !buffer.endsWith( C::Separator ) ) - { - continue; - } - if ( buffer.startsWith( "-WRONG" ) ) - { - return {}; - } - int splitPos; - auto parts = split( buffer, &splitPos ); - if ( splitPos < buffer.length() ) - { - continue; - } - for ( int i = 1; i < parts.size(); i += 2 ) hash[parts[i - 1]] = parts[i]; - return( hash ); - } - else - { - QCoreApplication::processEvents(); - } - } - return {}; -} // Client::hgetall - - -/*! - * \brief Получает данные из redis без создания постоянного подключения - * - * Метод отличается более низкой производительностью по сравнению с нестатической реализацией - * \param key Ключ данных - * \param host Хост для установки соединения с сервером redis - * \param port Порт для установки соединения с сервером redis - * \param database Идентификатор БД - * \return Таблица данных - */ -QVariantHash Client::hgetall( const QString& key, int database, - const QString& host, quint16 port ) -{ - QTcpSocket socket; - if ( !setupSocket( &socket, database, host, port ) ) - { - return {}; - } - socket.write( array( { "HGETALL", key } ) ); - QByteArray buffer; - QVariantHash hash; - while ( socket.state() == QAbstractSocket::ConnectedState ) - { - if ( socket.bytesAvailable() > 0 ) - { - buffer.append( socket.readAll() ); - if ( !buffer.endsWith( C::Separator ) ) - { - continue; - } - if ( ( buffer == "$-1\r\n" ) || ( buffer == "*0\r\n" ) ) - { - break; - } - int splitPos; - auto parts = split( buffer, &splitPos ); - if ( splitPos < buffer.length() ) - { - continue; - } - for ( int i = 1; i < parts.size(); i += 2 ) hash[parts[i - 1]] = parts[i]; - break; - } - else - { - QCoreApplication::processEvents(); - } - } - if ( socket.state() == QAbstractSocket::ConnectedState ) - { - socket.disconnectFromHost(); - } - while ( socket.state() != QAbstractSocket::UnconnectedState ) QCoreApplication::processEvents(); - return( hash ); -} // Client::hgetall - - -/*! - * \brief Получает данные из redis без создания постоянного подключения - * \param redisKey Ключ данных для redis - * \param hashKey Ключ внутри запрашиваемой таблицы - * \return Значение из таблицы - */ -QByteArray Client::hget( const QString& redisKey, const QString& hashKey ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return {}; - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return {}; - } - socket->write( array( { "HGET", redisKey, hashKey } ) ); - QByteArray value; - QByteArray buffer; - while ( socket->state() == QAbstractSocket::ConnectedState ) - { - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - if ( !buffer.endsWith( C::Separator ) ) - { - continue; - } - int splitPos; - auto parts = split( buffer, &splitPos ); - if ( splitPos < buffer.length() ) - { - continue; - } - if ( parts.size() == 1 ) - { - return( parts.front() ); - } - return {}; - } - else - { - QCoreApplication::processEvents(); - } - } - return {}; -} // Client::hget - - -/*! - * \brief Получает данные из redis без создания постоянного подключения - * - * Метод отличается более низкой производительностью по сравнению с нестатической реализацией - * \param redisKey Ключ данных для redis - * \param hashKey Ключ внутри запрашиваемой таблицы - * \param host Хост для установки соединения с сервером redis - * \param port Порт для установки соединения с сервером redis - * \param database Идентификатор БД - * \return Значение из таблицы - */ -QByteArray Client::hget( const QString& redisKey, const QString& hashKey, - int database, const QString& host, quint16 port ) -{ - QTcpSocket socket; - if ( !setupSocket( &socket, database, host, port ) ) - { - return {}; - } - socket.write( array( { "HGET", redisKey, hashKey } ) ); - QByteArray value; - QByteArray buffer; - while ( socket.state() == QAbstractSocket::ConnectedState ) - { - if ( socket.bytesAvailable() > 0 ) - { - buffer.append( socket.readAll() ); - if ( !buffer.endsWith( C::Separator ) ) - { - continue; - } - int splitPos; - auto parts = split( buffer, &splitPos ); - if ( splitPos < buffer.length() ) - { - continue; - } - if ( parts.size() == 1 ) - { - value = parts.front(); - } - break; - } - else - { - QCoreApplication::processEvents(); - } - } - if ( socket.state() == QAbstractSocket::ConnectedState ) - { - socket.disconnectFromHost(); - } - while ( socket.state() != QAbstractSocket::UnconnectedState ) QCoreApplication::processEvents(); - return( value ); -} // Client::hget - - -/*! - * \brief Добавляет элементы в конец списка - * \param key Ключ списка - * \param values Добавляемые элементы - * \return Признак успешной операции - */ -bool Client::rpush( const QString& key, const QVariantList& values ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( false ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( false ); - } - socket->write( array( QVariantList { "RPUSH", key } + values ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( QRegExp( ":\\d+\\r\\n" ).exactMatch( buffer ) ); -} // Client::rpush - - -/*! - * \brief Добавляет элементы в начало списка - * \param key Ключ списка - * \param values Добавляемые элементы - * \return Признак успешной операции - */ -bool Client::lpush( const QString& key, const QVariantList& values ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( false ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( false ); - } - socket->write( array( QVariantList { "LPUSH", key } + values ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( QRegExp( ":\\d+\\r\\n" ).exactMatch( buffer ) ); -} // Client::lpush - - -/*! - * \brief Добавляет задает значение элемента по его индексу - * \param key Ключ списка - * \param index Индекс элемента - * \param index Значение элемента - * \return Признак успешной операции - */ -bool Client::lset( const QString& key, int index, const QVariant& value ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( false ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( false ); - } - auto indexString = QByteArray::number( index ); - auto valueString = value.toString(); - socket->write( array( { "LSET", key, index, value } ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( buffer == "+OK\r\n" ); -} // Client::lset - - -/*! - * \brief Оставляет только элементы списка внутри диапазона from...to - * \param key Ключ списка - * \param from Начало диапазона (0-based) - * > 0: от начала списка - * < 0: от конца списка - * \param to Конец диапазона (0-based) - * > 0: от начала списка - * < 0: от конца списка - * \return Признак успешной операции - */ -bool Client::ltrim( const QString& key, int from, int to ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return {}; - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return {}; - } - socket->write( array( { "LTRIM", key, from, to } ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( buffer == "+OK\r\n" ); -} // Client::ltrim - - -/*! - * \brief Удаляет count первых элементов списка - * \param key Ключ списка - * \param count Количество удаляемых элементов - * count > 0 от начала списка - * count < 0 от конца списка - * count = 0 удаляет все элементы - * \param value Значение удаляемых элементов - * \return Признак успешной операции - */ -bool Client::lrem( const QString& key, const QVariant& value, int count ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( false ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( false ); - } - socket->write( array( { "LREM", key, count, value } ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( QRegExp( ":\\d+\\r\\n" ).exactMatch( buffer ) ); -} // Client::lrem - - -/*! - * \brief Возвращает длину списка - * \param key Ключ списка - * \return Длина списка или -1 в случае ошибки операции - */ -int Client::llen( const QString& key ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( -1 ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( -1 ); - } - socket->write( array( { "LLEN", key } ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( -1 ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - if ( QRegExp( ":\\d+\\r\\n" ).exactMatch( buffer ) ) - { - QRegExp rx { "\\d+" }; - rx.indexIn( buffer ); - return( rx.cap().toInt() ); - } - return( -1 ); -} // Client::llen - - -/*! - * \brief Добавляет элементы в начало сета - * - * O(N), N - количество добавляемых элементов - * \param key Ключ сета - * \param values Добавляемые элементы - * \return Признак успешной операции - */ -bool Client::sadd( const QString& key, const QVariantList& values ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( false ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( false ); - } - socket->write( array( QVariantList { "SADD", key } + values ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( QRegExp( ":\\d+\\r\\n" ).exactMatch( buffer ) ); -} // Client::sadd - - -/*! - * \brief Удаляет значения из сета - * - * O(N), N - количество добавляемых элементов - * \param key Ключ сета - * \param values Добавляемые элементы - * \return Признак успешной операции - */ -bool Client::srem( const QString& key, const QVariantList& values ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( false ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( false ); - } - socket->write( array( QVariantList { "SREM", key } + values ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( QRegExp( ":\\d+\\r\\n" ).exactMatch( buffer ) ); -} // Client::srem - - -/*! - * \brief Проверяет является ли значение частью набора - * \param key Ключ набора - * \param value Проверяемое значение - * \return Признак того, что значение входит в набор - */ -bool Client::sismember( const QString& key, const QVariant& value ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( false ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( false ); - } - socket->write( array( { "SISMEMBER", key, value } ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( buffer == ":1\r\n" ); -} // Client::sismember - - -/*! - * \brief Проверяет является ли значение частью набора без создания постоянного подключения - * - * Метод отличается более низкой производительностью по сравнению с нестатической реализацией - * \param key Ключ набора - * \param value Проверяемое значение - * \param host Хост для установки соединения с сервером redis - * \param port Порт для установки соединения с сервером redis - * \param database Идентификатор БД - * \return Признак того, что значение входит в набор - */ -bool Client::sismember( const QString& key, const QVariant& value, - int database, const QString& host, quint16 port ) -{ - QTcpSocket socket; - if ( !setupSocket( &socket, database, host, port ) ) - { - return {}; - } - socket.write( array( { "SISMEMBER", key, value } ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket.state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( socket.bytesAvailable() > 0 ) - { - buffer.append( socket.readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - if ( socket.state() == QAbstractSocket::ConnectedState ) - { - socket.disconnectFromHost(); - } - while ( socket.state() != QAbstractSocket::UnconnectedState ) QCoreApplication::processEvents(); - return( buffer == ":1\r\n" ); -} // Client::sismember - - -/*! - * \brief Возвращает количество элементов в наборе - * \param key Ключ набора - * \return КОличество элементов - */ -int Client::scard( const QString& key ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return( -1 ); - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return( -1 ); - } - socket->write( array( { "SCARD", key } ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( -1 ); - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - } - else - { - QCoreApplication::processEvents(); - } - } - if ( QRegExp( ":\\d+\\r\\n" ).exactMatch( buffer ) ) - { - QRegExp rx { "\\d+" }; - rx.indexIn( buffer ); - return( rx.cap().toInt() ); - } - return( -1 ); -} // Client::scard - - -/*! - * \brief Возвращает элементы сета - * \param key Ключ сета - * \return Строковые значения сета - */ -QVariantList Client::smembers( const QString& key ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return {}; - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return {}; - } - socket->write( array( { "SMEMBERS", key } ) ); - QByteArray buffer; - while ( socket->state() == QAbstractSocket::ConnectedState ) - { - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - if ( !buffer.endsWith( C::Separator ) ) - { - continue; - } - if ( ( buffer == "$-1\r\n" ) || ( buffer == "*0\r\n" ) ) - { - return {}; - } - int splitPos; - auto parts = split( buffer, &splitPos ); - if ( splitPos < buffer.length() ) - { - continue; - } - QVariantList list; - std::copy( parts.begin(), parts.end(), std::back_inserter( list ) ); - return( list ); - } - else - { - QCoreApplication::processEvents(); - } - } - return {}; -} // Client::smembers - - -/*! - * \brief Удаляет и возвращает последний элемент списка - * \param key Ключ списка - * \return Последний элемент - */ -QByteArray Client::rpop( const QString& key ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return {}; - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return {}; - } - socket->write( array( { "RPOP", key } ) ); - QByteArray buffer; - while ( !buffer.endsWith( C::Separator ) ) - { - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return {}; - } - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - int splitPos; - auto parts = split( buffer, &splitPos ); - if ( splitPos < buffer.length() ) - { - continue; - } - return( parts.size() == 1 ? parts.front() : "" ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( "" ); -} // Client::rpop - - -/*! - * \brief Возвращает элемент по его позиции - * \param key Ключ списка - * \param index Позиция элемента - * \return Текстовое представление элемента - */ -QByteArray Client::lindex( const QString& key, int index ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return {}; - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return {}; - } - socket->write( array( { "LINDEX", key, index } ) ); - QByteArray buffer; - while ( socket->state() == QAbstractSocket::ConnectedState ) - { - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - if ( !buffer.endsWith( C::Separator ) ) - { - continue; - } - if ( buffer == "$-1" + C::Separator ) - { - return {}; - } - buffer.append( socket->readAll() ); - int splitPos; - auto parts = split( buffer, &splitPos ); - if ( splitPos < buffer.length() ) - { - continue; - } - if ( parts.size() == 1 ) - { - return( parts.front() ); - } - return {}; - } - else - { - QCoreApplication::processEvents(); - } - } - return {}; -} // Client::lindex - - -/*! - * \brief Реализует метод redis KEYS - * \param pattern Шпблон для поиска ключей - * \return Список ключей, соответствующих шиблону - */ -QStringList Client::keys( const QString& pattern ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return {}; - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return {}; - } - socket->write( array( pattern.isEmpty() ? QVariantList { "KEYS" } : - QVariantList { "KEYS", pattern } ) ); - QByteArray buffer; - while ( socket->state() == QAbstractSocket::ConnectedState ) - { - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - if ( !buffer.endsWith( C::Separator ) ) - { - continue; - } - if ( ( buffer == "$-1\r\n" ) || ( buffer == "*0\r\n" ) || buffer.startsWith( "-WRONG" ) ) - { - return {}; - } - int splitPos; - auto parts = split( buffer, &splitPos ); - if ( splitPos < buffer.length() ) - { - continue; - } - QStringList list; - std::copy( parts.begin(), parts.end(), std::back_inserter( list ) ); - return( list ); - } - else - { - QCoreApplication::processEvents(); - } - } - return {}; -} // Client::keys - - -/*! - * \brief Проверяет является ли значение частью набора без создания постоянного подключения - * - * Метод отличается более низкой производительностью по сравнению с нестатической реализацией - * \param pattern Шаблон ключей - * \param host Хост для установки соединения с сервером redis - * \param port Порт для установки соединения с сервером redis - * \param database Идентификатор БД - * \return Признак того, что значение входит в набор - */ -QStringList Client::keys( const QString& pattern, int database, const QString& host, quint16 port ) -{ - QTcpSocket socket; - if ( !setupSocket( &socket, database, host, port ) ) - { - return {}; - } - socket.write( array( pattern.isEmpty() ? QVariantList { "KEYS" } : - QVariantList { "KEYS", pattern } ) ); - QByteArray buffer; - while ( socket.state() == QAbstractSocket::ConnectedState ) - { - if ( socket.bytesAvailable() > 0 ) - { - buffer.append( socket.readAll() ); - if ( !buffer.endsWith( C::Separator ) ) - { - continue; - } - if ( ( buffer == "$-1\r\n" ) || ( buffer == "*0\r\n" ) || buffer.startsWith( "-WRONG" ) ) - { - return {}; - } - int splitPos; - auto parts = split( buffer, &splitPos ); - if ( splitPos < buffer.length() ) - { - continue; - } - QStringList list; - std::copy( parts.begin(), parts.end(), std::back_inserter( list ) ); - return( list ); - } - else - { - QCoreApplication::processEvents(); - } - } - return {}; -} // Client::keys - - -/*! - * \brief Возвращает элементы хранимого списка - * \param key Ключ списка - * \param from Начальный индекс - * \param to Конечный индекс - * \return Строковые значения списка - */ -QVariantList Client::lrange( const QString& key, int from, int to ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return {}; - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return {}; - } - socket->write( array( { "LRANGE", key, from, to } ) ); - QByteArray buffer; - while ( socket->state() == QAbstractSocket::ConnectedState ) - { - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - if ( !buffer.endsWith( C::Separator ) ) - { - continue; - } - if ( ( buffer == "$-1\r\n" ) || ( buffer == "*0\r\n" ) || buffer.startsWith( "-WRONG" ) ) - { - return {}; - } - int splitPos; - auto parts = split( buffer, &splitPos ); - if ( splitPos < buffer.length() ) - { - continue; - } - QVariantList list; - std::copy( parts.begin(), parts.end(), std::back_inserter( list ) ); - return( list ); - } - else - { - QCoreApplication::processEvents(); - } - } - return {}; -} // Client::lrange - - -/*! - * \brief Извлекает из базы вектор и преобразует его к типу вектор json - * - * Специализация - * \param key Ключ для измлечения данных - * \param database Идентификатор БД - * \param port Номер порта redis - * \param from Начало интервала - * \param to Конец интервала - * \param Хост redis - * \return Вектор значений - */ -template<> -std::vector< QJsonObject > Client::lrange< QJsonObject >( const QString& key, int from, int to, - int database, - const QString& host, quint16 port ) -{ - auto list = lrange( key, from, to, database, host, port ); - std::vector< QJsonObject > data; -// TODO -// std::transform( list.begin(), list.end(), std::back_inserter( data ), -// []( const auto& iter ) { return( QJsonDocument::fromJson( iter ).object() ); } ); - return( data ); -} - - -/*! - * \brief Возвращает элементы хранимого списка - * - * Метод отличается более низкой производительностью по сравнению с нестатической реализацией - * \param key Ключ списка - * \param from Начальный индекс - * \param to Конечный индекс - * \param port Порт для установки соединения с сервером redis - * \param database Идентификатор БД - * \return Строковые значения списка - */ -QByteArrayList Client::lrange( const QString& key, int from, int to, int database, - const QString& host, quint16 port ) -{ - QTcpSocket socket; - if ( !setupSocket( &socket, database, host, port ) ) - { - return {}; - } - socket.write( array( { "LRANGE", key, from, to } ) ); - QByteArray buffer; - QByteArrayList data; - while ( socket.state() == QAbstractSocket::ConnectedState ) - { - if ( socket.bytesAvailable() > 0 ) - { - buffer.append( socket.readAll() ); - if ( !buffer.endsWith( C::Separator ) ) - { - continue; - } - if ( ( buffer == "$-1\r\n" ) || ( buffer == "*0\r\n" ) || buffer.startsWith( "-WRONG" ) ) - { - break; - } - int splitPos; - data = split( buffer, &splitPos ); - if ( splitPos < buffer.length() ) - { - continue; - } - break; - } - else - { - QCoreApplication::processEvents(); - } - } - if ( socket.state() == QAbstractSocket::ConnectedState ) - { - socket.disconnectFromHost(); - } - while ( socket.state() != QAbstractSocket::UnconnectedState ) QCoreApplication::processEvents(); - return( data ); -} // Client::lrange - - -/*! - * \brief Переключает сокет на работу с определенной БД - * \param socket Сокет - * \param database Номер БД - * \return Признак успешной операции - */ -bool Client::select( QTcpSocket* socket, int database ) -{ - socket->write( array( { "SELECT", database } ) ); - QByteArray buffer; - while ( socket->state() == QAbstractSocket::ConnectedState ) - { - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - if ( !buffer.endsWith( C::Separator ) ) - { - continue; - } - return( buffer == "+OK\r\n" ); - } - else - { - QCoreApplication::processEvents(); - } - } - return( false ); -} // Client::select - - -/*! - * \brief Настраивает сокет для подключения к БД - * \param socket Сокет TCP - * \param database База днных - * \param host Адрес хоста redis - * \param port Номер порта redis - * \return Признак удачной настройки - */ -bool Client::setupSocket( QTcpSocket* socket, int database, const QString& host, quint16 port ) -{ - static constexpr int timeout { 3000 }; - QElapsedTimer timer; - timer.start(); - do { - if ( socket->state() == QAbstractSocket::UnconnectedState ) - { - socket->connectToHost( host, port ); - } - QCoreApplication::processEvents(); - } while( socket->state() != QAbstractSocket::ConnectedState && timer.elapsed() < timeout ); - if ( socket->state() != QAbstractSocket::ConnectedState ) - { - return( false ); - } - if ( ( database != 0 ) && !select( socket, database ) ) - { - return( false ); - } - return( true ); -} // Client::setupSocket - - -/*! - * \brief Запускает на выполнение сохраненный на стороне сервера lua скрипт - * \param sha SHA сохраненного скрипта - * \param keys Ключи для выполнения скрипта - * \param argv Аргументы для выполнения скрипта - * \return Части результата выполнения скрипта (возвращенного значения) - */ -QByteArrayList Client::evalSha( const QByteArray& sha, const QVariantList& keys, - const QVariantList& argv ) -{ - if ( QThread::currentThread() != thread() ) - { - qDebug() << "no redis connection for caller thread"; - return {}; - } - Session instance { _sessionCount }; - auto socket = session( rwTimeout() ); - if ( socket == nullptr ) - { - return {}; - } - socket->write( array( QVariantList { "EVALSHA", sha, keys.size() } + keys + argv ) ); - QByteArray buffer; - while ( socket->state() == QAbstractSocket::ConnectedState ) - { - if ( socket->bytesAvailable() > 0 ) - { - buffer.append( socket->readAll() ); - if ( !buffer.endsWith( C::Separator ) ) - { - continue; - } - if ( buffer.startsWith( "-ERR" ) || buffer.startsWith( "-NOSCRIPT" ) ) - { - qWarning() << "evaluating redis script error:" << buffer; - return {}; - } - int splitPos; - auto parts = split( buffer, &splitPos ); - if ( splitPos < buffer.length() ) - { - continue; - } - return( parts ); - } - else - { - QCoreApplication::processEvents(); - } - } - return {}; -} // Client::evalSha - - -/*! - * \brief Запускает на выполнение сохраненный на стороне сервера lua скрипт - * - * Статический вариант - * \param sha SHA сохраненного скрипта - * \param keys Ключи для выполнения скрипта - * \param argv Аргументы для выполнения скрипта - * \param database Идентификатор БД - * \param host Адрес сервера redis - * \param port Номер порта - * \return Части результата выполнения скрипта (возвращенного значения) - */ -QByteArrayList Client::evalSha( const QByteArray& sha, const QVariantList& keys, - const QVariantList& argv, - int database, const QString& host, quint16 port ) -{ - QTcpSocket socket; - if ( !setupSocket( &socket, database, host, port ) ) - { - return {}; - } - socket.write( array( QVariantList { "EVALSHA", sha, keys.size() } + keys + argv ) ); - QByteArray buffer; - while ( socket.state() == QAbstractSocket::ConnectedState ) - { - if ( socket.bytesAvailable() > 0 ) - { - buffer.append( socket.readAll() ); - if ( !buffer.endsWith( C::Separator ) ) - { - continue; - } - if ( buffer.startsWith( "-ERR" ) || buffer.startsWith( "-NOSCRIPT" ) ) - { - qWarning() << "evaluating redis script error:" << buffer; - return {}; - } - int splitPos; - auto parts = split( buffer, &splitPos ); - if ( splitPos < buffer.length() ) - { - continue; - } - return( parts ); - } - else - { - QCoreApplication::processEvents(); - } - } - return {}; -} // Client::evalSha - - -/*! - * \brief Запускает на выполнение сохраненный на стороне сервера lua скрипт - * - * Специализация для типа QJsonArray - * \param sha SHA сохраненного скрипта - * \param keys Ключи для выполнения скрипта - * \param argv Аргументы для выполнения скрипта - * \return Части результата выполнения скрипта (возвращенного значения) - */ -template<> -QJsonArray Client::evalSha< QJsonArray >( const QByteArray& sha, const QVariantList& keys, - const QVariantList& argv ) -{ - auto parts = evalSha( sha, keys, argv ); - return( parts.empty() ? QJsonArray {} : QJsonDocument::fromJson( parts.front() ).array() ); -} - - -/*! - * \brief Запускает на выполнение сохраненный на стороне сервера lua скрипт - * - * Специализация для типа QJsonArray, static вариант - * \param sha SHA сохраненного скрипта - * \param keys Ключи для выполнения скрипта - * \param argv Аргументы для выполнения скрипта - * \param database Идентификатор БД - * \param host Адрес сервера - * \param port Номер порта - * \return Части результата выполнения скрипта (возвращенного значения) - */ -template<> -QJsonArray Client::evalSha< QJsonArray >( const QByteArray& sha, const QVariantList& keys, - const QVariantList& argv, - int database, const QString& host, quint16 port ) -{ - auto parts = evalSha( sha, keys, argv, database, host, port ); - return( parts.empty() ? QJsonArray {} : QJsonDocument::fromJson( parts.front() ).array() ); -} - - -/*! - * \brief Запускает на выполнение сохраненный на стороне сервера lua скрипт - * - * Специализация для типа QVariant - * \param sha SHA сохраненного скрипта - * \param keys Ключи для выполнения скрипта - * \param argv Аргументы для выполнения скрипта - * \return Части результата выполнения скрипта (возвращенного значения) - */ -template<> -QVariant Client::evalSha< QVariant >( const QByteArray& sha, const QVariantList& keys, - const QVariantList& argv ) -{ - auto parts = evalSha( sha, keys, argv ); - return( parts.empty() ? QVariant {} : parts.front() ); -} - - -/*! - * \brief Запускает на выполнение сохраненный на стороне сервера lua скрипт - * - * Специализация для типа QVariantList - * \param sha SHA сохраненного скрипта - * \param keys Ключи для выполнения скрипта - * \param argv Аргументы для выполнения скрипта - * \return Части результата выполнения скрипта (возвращенного значения) - */ -template<> -QVariantList Client::evalSha< QVariantList >( const QByteArray& sha, const QVariantList& keys, - const QVariantList& argv ) -{ - auto parts = evalSha( sha, keys, argv ); - QVariantList data; - std::copy( parts.begin(), parts.end(), std::back_inserter( data ) ); - return( data ); -} - - -/*! - * \brief Запускает на выполнение сохраненный на стороне сервера lua скрипт - * - * Специализация для типа QVariantHash - * \param sha SHA сохраненного скрипта - * \param keys Ключи для выполнения скрипта - * \param argv Аргументы для выполнения скрипта - * \return Части результата выполнения скрипта (возвращенного значения) - */ -template<> -QVariantHash Client::evalSha< QVariantHash >( const QByteArray& sha, const QVariantList& keys, - const QVariantList& argv ) -{ - auto parts = evalSha( sha, keys, argv ); - QVariantHash data; - for ( int i = 1; i < parts.size(); i += 2 ) data[parts[i - 1]] = parts[i]; - return( data ); -} - - -/*! - * \brief Создает сокет для неоткрытой сессии - * \param Таймаут на установку соединения в мс - * \return Созданный сокет - */ -QTcpSocket* Client::session( int timeout ) -{ - if ( _sessionCount == 0 ) - { - return( nullptr ); - } - while ( _sessionCount > _session.size() ) - { - _session.push_back( new QTcpSocket { this } ); - connect( _session.back(), &QIODevice::readyRead, this, &Client::onReadyRead ); - connect( _session.back(), &QIODevice::bytesWritten, this, &Client::onBytesWritten ); - } - auto socket = _session[_sessionCount - 1]; - QElapsedTimer timer; - timer.start(); - while ( socket->state() != QAbstractSocket::ConnectedState ) - { - if ( connectionFlags() != kConnecting ) - { - setConnectionFlags( kConnecting ); - emit connectionStateChanged( connectionFlags() ); - } - if ( timer.elapsed() > timeout ) - { - setConnectionFlags( kDisconnected ); - emit connectionStateChanged( connectionFlags() ); - return( nullptr ); - } - if ( socket->state() == QAbstractSocket::UnconnectedState ) - { - socket->connectToHost( host(), port() ); - } - QCoreApplication::processEvents(); - if ( ( _database != 0 ) && !select( socket, _database ) ) - { - setConnectionFlags( kDisconnected ); - emit connectionStateChanged( connectionFlags() ); - return( nullptr ); - } - } - auto iter = _outOfDatabase.find( socket ); - if ( iter != _outOfDatabase.end() ) - { - if ( select( socket, _database ) ) - { - _outOfDatabase.erase( iter ); - } - else - { - setConnectionFlags( kDisconnected ); - emit connectionStateChanged( connectionFlags() ); - return( nullptr ); - } - } - if ( !( isConnected() ) ) - { - setConnectionFlags( kConnected ); - emit connectionStateChanged( connectionFlags() ); - } - return( socket ); -} // Client::session - - -/*! - * \brief Конструктор класса - * \param count Счетчик открытых сессий - */ -Client::Session::Session( uint& count ) : - _count{ count } -{ - _count++; -} - - -/*! - * \brief Деструктор класса - */ -Client::Session::~Session() -{ - _count--; -} - -} // namespace redis - -} // namespace myx - -#endif // ifndef MYX_REDIS_CLIENT_INL_HPP_ diff --git a/src/myx/redis/client.cpp b/src/myx/redis/client.cpp deleted file mode 100644 index e825f87..0000000 --- a/src/myx/redis/client.cpp +++ /dev/null @@ -1,5 +0,0 @@ -#ifndef MYXLIB_BUILD_LIBRARIES -#error Define MYXLIB_BUILD_LIBRARIES to compile this file. -#endif - -#include diff --git a/src/myx/redis/client.hpp b/src/myx/redis/client.hpp deleted file mode 100644 index 38399eb..0000000 --- a/src/myx/redis/client.hpp +++ /dev/null @@ -1,157 +0,0 @@ -#ifndef MYX_REDIS_CLIENT_HPP_ -#define MYX_REDIS_CLIENT_HPP_ - -#pragma once - -#include - -#include -#include -#include -#include - -// QT_FORWARD_DECLARE_CLASS( QTcpSocket ) - -namespace myx { - -namespace redis { - -/*! - * \brief Реализует клиентскую часть в соответствии с RESP - */ -class Client : public Base -{ - Q_OBJECT -public: - Client( int database = 0, QObject* parent = nullptr ); - virtual ~Client(); - int database() const; - int exists( const QStringList& keys ); - void select( int database ); - bool flushdb(); - bool del( const QStringList& keys ); - bool expire( const QString& key, int secs ); - bool pexpire( const QString& key, int msecs ); - bool persist( const QString& key ); - bool publish( const QString& channel, const QByteArray& message ); - static bool publish( const QString& channel, const QByteArray& message, - const QString& host, quint16 port = 6379 ); - bool set( const QString& key, const QVariant& value ); - static bool set( const QString& key, const QVariant& value, - int database, const QString& host = "127.0.0.1", quint16 port = 6379 ); - QByteArray scriptLoad( const QByteArray& script ); - QByteArray get( const QString& key ); - template< typename T > - T get( const QString& key ); - static QByteArray get( const QString& key, int database, - const QString& host = "127.0.0.1", quint16 port = 6379 ); - template< typename T > - static T get( const QString& key, int database, - const QString& host = "127.0.0.1", quint16 port = 6379 ); - bool hdel( const QString& redisKey, const QStringList& hashKeys ); - bool hset( const QString& key, const QString& field, const QVariant& value ); - bool hmset( const QString& key, const QVariantHash& hash ); - static bool hmset( const QString& key, const QVariantHash& hash, int database, - const QString& host = "127.0.0.1", quint16 port = 6379 ); - QVariantHash hgetall( const QString& key ); - static QVariantHash hgetall( const QString& key, int database, - const QString& host = "127.0.0.1", quint16 port = 6379 ); - QByteArray hget( const QString& redisKey, const QString& hashKey ); - static QByteArray hget( const QString& redisKey, const QString& hashKey, - int database, const QString& host = "127.0.0.1", quint16 port = 6379 ); - bool rpush( const QString& key, const QVariantList& values ); - bool lpush( const QString& key, const QVariantList& values ); - bool lset( const QString& key, int index, const QVariant& value ); - bool lrem( const QString& key, const QVariant& value, int count = 1 ); - bool ltrim( const QString& key, int from, int to ); - int llen( const QString& key ); - QVariantList lrange( const QString& key, int from, int to ); - template< typename T > - std::vector< T > lrange( const QString& key ); - static QByteArrayList lrange( const QString& key, int from, int to, int database, - const QString& host = "127.0.0.1", quint16 port = 6379 ); - template< typename T > - static std::vector< T > lrange( const QString& key, int from, int to, int database, - const QString& host = "127.0.0.1", quint16 port = 6379 ); - bool sadd( const QString& key, const QVariantList& values ); - bool srem( const QString& key, const QVariantList& values ); - bool sismember( const QString& key, const QVariant& value ); - static bool sismember( const QString& key, const QVariant& value, int database, - const QString& host = "127.0.0.1", quint16 port = 6379 ); - int scard( const QString& key ); - QVariantList smembers( const QString& key ); - template< typename T > - std::vector< T > smembers( const QString& key ); - QByteArray rpop( const QString& key ); - QByteArray lindex( const QString& key, int index ); - QStringList keys( const QString& pattern = {} ); - static QStringList keys( const QString& pattern, int database, - const QString& host, quint16 port = 6379 ); - template< typename T > - T evalSha( const QByteArray& sha, const QVariantList& keys, const QVariantList& argv ); - template< typename T > - static T evalSha( const QByteArray& sha, const QVariantList& keys, const QVariantList& argv, - int database, const QString& host, quint16 port = 6379 ); - QByteArrayList evalSha( const QByteArray& sha, const QVariantList& keys, - const QVariantList& argv ); - static QByteArrayList evalSha( const QByteArray& sha, const QVariantList& keys, - const QVariantList& argv, - int database, const QString& host, quint16 port = 6379 ); - -private: - /*! - * \brief Класс для синхронизации порядка чтения/записи в poolSocket; - */ - struct Session - { - Session( uint& count ); - ~Session(); - -private: - uint& _count; //!< Счетчик открытых сессий - }; - - int _database; //!< Номер БД в redis - uint _sessionCount { 0 }; //!< Счетчик открытых сессий - std::vector< QTcpSocket* > _session; //!< Сокеты для синхронных сессий чтения/записи - /*! - * \brief Содержит TCP соединения, связанные с БД, отличной от текущей - */ - std::unordered_set< QTcpSocket* > _outOfDatabase; - - static bool setupSocket( QTcpSocket* socket, int database, const QString& host, quint16 port ); - static bool select( QTcpSocket* socket, int database ); - QTcpSocket* session( int timeout ); -}; // class Client - -template<> -QVariant Client::evalSha< QVariant >( const QByteArray& sha, const QVariantList& keys, - const QVariantList& argv ); -template<> -QVariantList Client::evalSha< QVariantList >( const QByteArray& sha, const QVariantList& keys, - const QVariantList& argv ); -template<> -QVariantHash Client::evalSha< QVariantHash >( const QByteArray& sha, const QVariantList& keys, - const QVariantList& argv ); -template<> -QJsonArray Client::evalSha< QJsonArray >( const QByteArray& sha, const QVariantList& keys, - const QVariantList& argv ); -template<> -QJsonArray Client::evalSha< QJsonArray >( const QByteArray& sha, const QVariantList& keys, - const QVariantList& argv, - int database, const QString& host, quint16 port ); -template<> -std::vector< QJsonObject > Client::lrange< QJsonObject >( const QString & key, int from, int to, - int database, const QString & host, quint16 ); - -} // namespace redis - -} // namespace myx - -#ifdef MYXLIB_HEADER_ONLY -#include "client-inl.hpp" -#endif - -// #include "client_redis.tcc" - -#endif // ifndef MYX_REDIS_CLIENT_HPP_ diff --git a/src/myx/redis/containers-inl.hpp b/src/myx/redis/containers-inl.hpp deleted file mode 100644 index ca15727..0000000 --- a/src/myx/redis/containers-inl.hpp +++ /dev/null @@ -1,250 +0,0 @@ -#ifndef MYX_REDIS_CONTAINERS_INL_HPP_ -#define MYX_REDIS_CONTAINERS_INL_HPP_ - -#pragma once - -#ifndef MYXLIB_HEADER_ONLY -#include -#endif - -#include -#include - -namespace myx { - -namespace redis { - -namespace constants { - -static const QByteArray Separator { "\r\n" }; //!< Строковый разделитель слов во фразе - -} // namespace constants - -namespace C = constants; - -/*! - * \brief Создаёт часть команды - * \param value Значение для создании части команды - * \return Часть команды - */ -MYXLIB_INLINE QByteArray part( const QVariant& value ) -{ - auto bytes = value.toByteArray(); - return( "$" + QByteArray::number( bytes.length() ) + C::Separator + bytes ); -} - - -/*! - * \brief Составляет RESP массив - * \param parts Части для составления массива - * \return RESP массив - */ -MYXLIB_INLINE QByteArray array( const QVariantList& parts ) -{ - QByteArrayList data; - std::transform( parts.cbegin(), parts.cend(), std::back_inserter( data ), - []( const QVariant& iter ) { return( part( iter ) ); } ); - data.prepend( "*" + QByteArray::number( data.size() ) ); - return( data.join( C::Separator ) + C::Separator ); -} - - -/*! - * \brief Разбивает буфер данных на массив частей - * \param buffer Входной буфер данных - * \param splitLength Длина разобранной части буфера - * \return Массив частей-строк - */ -MYXLIB_INLINE QByteArrayList split( const QByteArray& buffer, int* splitLength ) -{ - enum Token - { - kUndefined, - kFetchNumber, - kFetchPart, - }; - - if ( ( buffer == "$-1\r\n" ) || ( buffer == "$*0\r\n" ) || buffer.isEmpty() ) - { - if ( splitLength != nullptr ) - { - *splitLength = buffer.length(); - } - return {}; - } - if ( buffer.startsWith( "-WRONG" ) ) - { - if ( splitLength != nullptr ) - { - *splitLength = buffer.endsWith( "\r\n" ) ? buffer.length() : 0; - } - return {}; - } - if ( splitLength != nullptr ) - { - *splitLength = 0; - } - if ( !buffer.endsWith( C::Separator ) ) - { - return {}; - } - int size { 1 }; - int pos { 0 }; - QByteArray partLength; - if ( buffer[pos] == '*' ) - { - pos++; - QByteArray text; - while ( QChar( buffer[pos] ).isNumber() && pos < buffer.size() ) - { - text += buffer[pos++]; - } - if ( pos == buffer.size() ) - { - return {}; - } - size = text.toInt(); - pos += C::Separator.length(); - } - QByteArrayList parts; - Token token { kUndefined }; - while ( parts.size() < size && pos < buffer.length() ) - { - switch ( token ) - { - case kUndefined: - switch ( buffer[pos] ) - { - case '$': - partLength.clear(); - token = kFetchPart; - pos++; - break; - case ':': - partLength.clear(); - token = kFetchNumber; - pos++; - break; - default: - return {}; - } - break; - case kFetchNumber: - if ( QChar( buffer[pos] ).isNumber() || ( buffer[pos] == '-' ) ) - { - partLength += buffer[pos]; - pos++; - } - else - { - parts.push_back( partLength ); - token = kUndefined; - pos += C::Separator.length(); - } - break; - case kFetchPart: - { - if ( QChar( buffer[pos] ).isNumber() || ( buffer[pos] == '-' ) ) - { - partLength += buffer[pos]; - pos++; - } - else - { - int length = partLength.toInt(); - if ( length < 0 ) - { - parts.push_back( "" ); - pos += C::Separator.length(); - } - else - { - pos += C::Separator.length(); - if ( pos + length > buffer.length() ) - { - return {}; - } - parts.push_back( buffer.mid( pos, length ) ); - pos += length + C::Separator.length(); - } - token = kUndefined; - } - break; - } - } // switch - } - // может быть ситуация, когда в буфер не дочитан последний \r\n, - // но все части были вычитаны - if ( ( parts.size() == size ) && ( pos <= buffer.length() ) ) - { - if ( splitLength != nullptr ) - { - *splitLength = pos; - } - return( parts ); - } - return {}; -} // split - - -/*! - * \brief Запаковывает сообщение для передачи через систему pubsub - * \param hash Таблица значений - * \return Запакованное сообщение - */ -MYXLIB_INLINE QByteArray pack_hash( const QVariantHash& hash ) -{ - QVariantList parts; - for ( auto iter = hash.cbegin(); iter != hash.cend(); ++iter ) - { - parts << iter.key() << iter.value(); - } - return( array( parts ) ); -} - - -/*! - * \brief Распаковывает кодограмму в таблицу значений - * \param message Сообщение - * \return Таблица значений - */ -MYXLIB_INLINE QVariantHash unpack_hash( const QByteArray& message ) -{ - auto parts = split( message ); - QVariantHash hash; - for ( int i = 1; i < parts.size(); i += 2 ) - { - hash[parts[i - 1]] = parts[i]; - } - return( hash ); -} - - -/*! - * \brief Запаковывает сообщение для передачи через систему pubsub - * \param hash Список значений - * \return Запакованное сообщение - */ -MYXLIB_INLINE QByteArray pack_list( const QVariantList& list ) -{ - return( array( list ) ); -} - - -/*! - * \brief Распаковывает кодограмму в список значений - * \param message Сообщение - * \return Список значений - */ -MYXLIB_INLINE QVariantList unpack_list( const QByteArray& message ) -{ - auto data = split( message ); - QVariantList list; - std::copy( data.begin(), data.end(), std::back_inserter( list ) ); - return( list ); -} - -} // namespace redis - -} // namespace myx -#endif // ifndef MYX_REDIS_CONTAINERS_INL_HPP_ diff --git a/src/myx/redis/containers.cpp b/src/myx/redis/containers.cpp deleted file mode 100644 index 1a0358d..0000000 --- a/src/myx/redis/containers.cpp +++ /dev/null @@ -1,5 +0,0 @@ -#ifndef MYXLIB_BUILD_LIBRARIES -#error Define MYXLIB_BUILD_LIBRARIES to compile this file. -#endif - -#include diff --git a/src/myx/redis/containers.hpp b/src/myx/redis/containers.hpp deleted file mode 100644 index fd462e7..0000000 --- a/src/myx/redis/containers.hpp +++ /dev/null @@ -1,100 +0,0 @@ -#ifndef MYX_REDIS_CONTAINERS_HPP_ -#define MYX_REDIS_CONTAINERS_HPP_ - -#pragma once - -#include - -#include -#include - -namespace myx { - -namespace redis { - -QByteArray array( const QVariantList& parts ); -QByteArrayList split( const QByteArray& buffer, int* splitLength = nullptr ); -// QByteArray part( const QVariant& value ); -// QByteArray packHash( const QVariantHash& hash ); -// QVariantHash unpackHash( const QByteArray& message ); -// QByteArray packList( QByteArrayList list ); -// QByteArray packList( const QVariantList& list ); -QVariantList unpack_list( const QByteArray& message ); -// template< typename T > -// QByteArray packPair( T first, T second ); -// template< typename T1, typename T2 > -// QByteArray packPair( T1 first, T2 second ); -// template< typename T > -// std::pair< T, T > unpackPair( const QByteArray& message ); -// template< typename T1, typename T2 > -// std::pair< T1, T2 > unpackPair( const QByteArray& message ); - - -/*! - * \brief Запаковывает пару значений в сообщение - * \param first Первое значение - * \param second Второе значение - * \return Текстовое сообщение - */ -template< typename T > -QByteArray pack_pair( T first, T second ) -{ - return( packList( { QVariant::fromValue( first ), QVariant::fromValue( second ) } ) ); -} - - -/*! - * \brief Запаковывает пару значений в сообщение - * \param first Первое значение - * \param second Второе значение - * \return Текстовое сообщение - */ -template< typename T1, typename T2 > -QByteArray pack_pair( T1 first, T2 second ) -{ - return( packList( { QVariant::fromValue( first ), QVariant::fromValue( second ) } ) ); -} - - -/*! - * \brief Распаковывает сообщение в пару однотипных значений - * \param pair Текст сообщения - * \return Пара значений - */ -template< typename T > -std::pair< T, T > unpack_pair( const QByteArray& message ) -{ - auto list = unpack_list( message ); - if ( list.size() == 2 ) - { - return { QVariant( list.first() ).value< T >(), QVariant( list.last() ).value< T >() }; - } - return {}; -} - - -/*! - * \brief Распаковывает сообщение в пару однотипных значений - * \param pair Текст сообщения - * \return Пара значений - */ -template< typename T1, typename T2 > -std::pair< T1, T2 > unpack_pair( const QByteArray& message ) -{ - auto list = unpack_list( message ); - if ( list.size() == 2 ) - { - return { QVariant( list.first() ).value< T1 >(), QVariant( list.last() ).value< T2 >() }; - } - return {}; -} - -} // namespace redis - -} // namespace myx - -#ifdef MYXLIB_HEADER_ONLY -#include "containers-inl.hpp" -#endif - -#endif // ifndef MYX_REDIS_CONTAINERS_HPP_ diff --git a/src/myx/redis/pubsub-inl.hpp b/src/myx/redis/pubsub-inl.hpp deleted file mode 100644 index 5692045..0000000 --- a/src/myx/redis/pubsub-inl.hpp +++ /dev/null @@ -1,580 +0,0 @@ -#ifndef MYX_REDIS_PUBSUB_INL_HPP_ -#define MYX_REDIS_PUBSUB_INL_HPP_ - -#pragma once - -#ifndef MYXLIB_HEADER_ONLY -#include -#endif - -#include - -#include -#include -#include -#include -#include - -namespace myx { - -namespace redis { - -/*! - * \brief Конструктор класса - * \param parent Родительский объект для m_socket - */ -MYXLIB_INLINE PubSub::PubSub( QObject* parent ) : - Base { parent }, - m_socket ( new QTcpSocket( this ) ), - m_connectionTimer( new QTimer( this ) ) - -{ - connect( m_socket, &QAbstractSocket::stateChanged, this, &PubSub::onSocketStateChanged ); - connect( m_socket, &QIODevice::readyRead, this, &PubSub::read ); - connect( m_socket, &QIODevice::readyRead, this, &PubSub::onReadyRead ); - connect( m_socket, &QIODevice::bytesWritten, this, &PubSub::onBytesWritten ); - - connect( m_socket, static_cast< void ( QAbstractSocket::* )( QAbstractSocket::SocketError ) > - ( &QAbstractSocket::error ), this, [this]() { - qDebug() << QDateTime::currentDateTimeUtc().toString( QStringLiteral( "hh:mm:ss:zzz" ) ) - << QByteArray( "redis pubsub connection error" ) << m_socket->errorString(); - } ); - - m_connectionTimer->setInterval( connectionTimeout() ); - m_connectionTimer->setSingleShot( true ); - connect( m_connectionTimer, &QTimer::timeout, this, [this]() { - m_socket->connectToHost( host(), port() ); - } ); -} - - -MYXLIB_INLINE int PubSub::connectionTimeout() const -{ - return( kConnectionTimeout ); -} - - -/*! - * \brief Инициирует работу с БД - */ -MYXLIB_INLINE void PubSub::start() -{ - Q_EMIT disconnected(); - Q_EMIT connectionStateChanged( connectionFlags() ); - m_socket->connectToHost( host(), port() ); -} - - -/*! - * \brief Завершает работу с БД - */ -MYXLIB_INLINE void PubSub::stop() -{ - if ( m_socket->state() == QAbstractSocket::ConnectedState ) - { - m_socket->disconnectFromHost(); - while ( m_socket->state() != QAbstractSocket::UnconnectedState ) - { - QCoreApplication::processEvents(); - } - } -} - - -/*! - * \brief Создает подписку на канал сообщений - * \param channel Канал подписки - * \param subscriber Объект-подписчик - * \param method Имя метода, вызываемого при изменении получении серверного сообщения - */ -MYXLIB_INLINE void PubSub::subscribe( const QString& channel, QObject* subscriber, const char* method ) -{ - if ( QThread::currentThread() != thread() ) - { - qWarning() << "no redis connection for caller thread"; - return; - } - auto iter = m_subscribers.find( channel ); - if ( iter == m_subscribers.end() ) - { - if ( m_socket->state() == QAbstractSocket::ConnectedState ) - { - m_socket->write( array( { "SUBSCRIBE", channel } ) ); - } - m_subscribers[channel].emplace_back( subscriber, method ); - } - else - { - auto value = iter.value(); - auto it = std::find_if( value.begin(), value.end(), - [subscriber, method]( std::pair< QObject*, const char* > p ) { - return( p.first == subscriber && strcmp( p.second, method ) == 0 ); - } ); - if ( it == iter.value().end() ) - { - iter.value().emplace_back( subscriber, method ); - connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed, - Qt::UniqueConnection ); - } - } -} // PubSub::subscribe - - -/*! - * \brief Создает подписку на канал изменения данных - * \param channel Канал подписки - * \param subscriber Объект-подписчик - * \param method Имя метода, вызываемого при изменении получении серверного сообщения - * \param database Номер БД - */ -MYXLIB_INLINE void PubSub::subscribe( QString channel, QObject* subscriber, const char* method, int database ) -{ - if ( QThread::currentThread() != thread() ) - { - qWarning() << "no redis connection for caller thread"; - return; - } - auto iter = m_subscribers.find( channel ); - if ( iter == m_subscribers.end() ) - { - channel.prepend( C::KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() ); - if ( m_socket->state() == QAbstractSocket::ConnectedState ) - { - m_socket->write( array( { "SUBSCRIBE", channel } ) ); - } - m_subscribers[channel].emplace_back( subscriber, method ); - } - else - { - auto value = iter.value(); - auto it = std::find_if( value.begin(), value.end(), - [subscriber, method]( std::pair< QObject*, const char* > p ) { - return( p.first == subscriber && strcmp( p.second, method ) == 0 ); - } ); - if ( it == iter.value().end() ) - { - iter.value().emplace_back( subscriber, method ); - connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed, - Qt::UniqueConnection ); - } - } -} // PubSub::subscribe - - -/*! - * \brief Создает подписку на шаблон канала сообщений - * \param channel Шаблон канала подписки - * \param subscriber Объект-подписчик - * \param method Имя метода, вызываемого при изменении получении серверного сообщения - */ -MYXLIB_INLINE void PubSub::psubscribe( const QString& channel, QObject* subscriber, const char* method ) -{ - if ( QThread::currentThread() != thread() ) - { - qWarning() << "no redis connection for caller thread"; - return; - } - auto iter = m_psubscribers.find( channel ); - if ( iter == m_psubscribers.end() ) - { - if ( m_socket->state() == QAbstractSocket::ConnectedState ) - { - m_socket->write( array( { "PSUBSCRIBE", channel } ) ); - } - m_psubscribers[channel].emplace_back( subscriber, method ); - } - else - { - auto value = iter.value(); - auto it = std::find_if( value.begin(), value.end(), - [subscriber, method]( std::pair< QObject*, const char* > p ) { - return( p.first == subscriber && strcmp( p.second, method ) == 0 ); - } ); - if ( it == iter.value().end() ) - { - iter.value().emplace_back( subscriber, method ); - connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed, - Qt::UniqueConnection ); - } - } -} // PubSub::psubscribe - - -/*! - * \brief Создает подписку на шаблон канала изменения данных - * \param channel Шаблон канала подписки - * \param subscriber Объект-подписчик - * \param method Имя метода, вызываемого при изменении получении серверного сообщения - * \param database Номер БД (-1 для всех) - */ -MYXLIB_INLINE void PubSub::psubscribe( QString channel, QObject* subscriber, const char* method, int database ) -{ - if ( QThread::currentThread() != thread() ) - { - qWarning() << "no redis connection for caller thread"; - return; - } - auto iter = m_psubscribers.find( channel ); - if ( iter == m_psubscribers.end() ) - { - if ( database == -1 ) - { - channel.prepend( C::KeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() ); - } - else - { - channel.prepend( C::KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() ); - } - if ( m_socket->state() == QAbstractSocket::ConnectedState ) - { - m_socket->write( array( { "PSUBSCRIBE", channel } ) ); - } - m_psubscribers[channel].emplace_back( subscriber, method ); - } - else - { - auto value = iter.value(); - auto it = std::find_if( value.begin(), value.end(), - [subscriber, method]( std::pair< QObject*, const char* > p ) { - return( p.first == subscriber && strcmp( p.second, method ) == 0 ); - } ); - if ( it == iter.value().end() ) - { - iter.value().emplace_back( subscriber, method ); - connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed, - Qt::UniqueConnection ); - } - } -} // PubSub::psubscribe - - -/*! - * \brief Отписывает получателя от канала - * \param channel Имя канала - * \param subscriber Подписчик на канал - * \param method Метод обработки данных - * \param keyEvents Признак подписки на изменение данных - */ -MYXLIB_INLINE void PubSub::unsubscribe( const QString& channel, QObject* subscriber, const char* method ) -{ - if ( QThread::currentThread() != thread() ) - { - qWarning() << "no redis connection for caller thread"; - return; - } - auto iter = m_subscribers.find( channel ); - if ( iter != m_subscribers.end() ) - { - auto ctx = std::find_if( iter->begin(), iter->end(), - [subscriber, method]( std::pair< QObject*, const char* > value ) { - return( subscriber == value.first && strcmp( method, value.second ) == 0 ); - } ); - if ( ctx != iter->end() ) - { - iter->erase( ctx ); - } - if ( iter->empty() ) - { - if ( m_socket->state() == QAbstractSocket::ConnectedState ) - { - m_socket->write( array( { "UNSUBSCRIBE", channel } ) ); - } - m_subscribers.remove( channel ); - } - } -} // PubSub::unsubscribe - - -/*! - * \brief Отписывает получателя от канала - * \param channel Имя канала - * \param subscriber Подписчик на канал - * \param method Метод обработки данных - * \param keyEvents Признак подписки на изменение данных - */ -MYXLIB_INLINE void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* method, int database ) -{ - if ( QThread::currentThread() != thread() ) - { - qWarning() << "no redis connection for caller thread"; - return; - } - if ( database == -1 ) - { - channel.prepend( C::KeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() ); - } - else - { - channel.prepend( C::KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() ); - } - auto iter = m_subscribers.find( channel ); - if ( iter != m_subscribers.end() ) - { - auto ctx = std::find_if( iter->begin(), iter->end(), - [subscriber, method]( std::pair< QObject*, const char* > value ) { - return( subscriber == value.first && strcmp( method, value.second ) == 0 ); - } ); - if ( ctx != iter->end() ) - { - iter->erase( ctx ); - } - if ( iter->empty() ) - { - if ( m_socket->state() == QAbstractSocket::ConnectedState ) - { - m_socket->write( array( { "UNSUBSCRIBE", channel } ) ); - } - m_subscribers.remove( channel ); - } - } -} // PubSub::unsubscribe - - -/*! - * \brief Отписывает получателя от шаблона канала - * \param channel Шаблон канала - * \param subscriber Подписчик на канал - * \param method Метод обработки данных - * \param keyEvents Признак подписки на изменение данных - */ -MYXLIB_INLINE void PubSub::punsubscribe( const QString& channel, QObject* subscriber, - const char* method ) -{ - if ( QThread::currentThread() != thread() ) - { - qWarning() << "no redis connection for caller thread"; - return; - } - auto iter = m_psubscribers.find( channel ); - if ( iter != m_psubscribers.end() ) - { - auto ctx = std::find_if( iter->begin(), iter->end(), - [subscriber, method]( std::pair< QObject*, const char* > value ) { - return( subscriber == value.first && strcmp( method, value.second ) == 0 ); - } ); - if ( ctx != iter->end() ) - { - iter->erase( ctx ); - } - if ( iter->empty() ) - { - if ( m_socket->state() == QAbstractSocket::ConnectedState ) - { - m_socket->write( array( { "PUNSUBSCRIBE", channel } ) ); - } - m_psubscribers.remove( channel ); - } - } -} // PubSub::punsubscribe - - -/*! - * \brief Отписывает получателя от шаблона канала - * \param channel Шаблон канала - * \param subscriber Подписчик на канал - * \param method Метод обработки данных - * \param keyEvents Признак подписки на изменение данных - */ -MYXLIB_INLINE void PubSub::punsubscribe( QString channel, QObject* subscriber, - const char* method, int database ) -{ - if ( QThread::currentThread() != thread() ) - { - qWarning() << "no redis connection for caller thread"; - return; - } - if ( database == -1 ) - { - channel.prepend( C::KeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() ); - } - else - { - channel.prepend( C::KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() ); - } - auto iter = m_psubscribers.find( channel ); - if ( iter != m_psubscribers.end() ) - { - auto ctx = std::find_if( iter->begin(), iter->end(), - [subscriber, method]( std::pair< QObject*, const char* > value ) { - return( subscriber == value.first && strcmp( method, value.second ) == 0 ); - } ); - if ( ctx != iter->end() ) - { - iter->erase( ctx ); - } - if ( iter->empty() ) - { - if ( m_socket->state() == QAbstractSocket::ConnectedState ) - { - m_socket->write( array( { "PUNSUBSCRIBE", channel } ) ); - } - m_psubscribers.remove( channel ); - } - } -} // PubSub::punsubscribe - - -/*! - * \brief Разбирает поступившие от сервера уведомления и вызывает требуемые - * метаметоды обработки - */ -MYXLIB_INLINE void PubSub::read() -{ - int splitLength = 0; - while ( m_socket->bytesAvailable() > 0 ) - { - m_buffer += m_socket->readAll(); - while ( !m_buffer.isEmpty() ) - { - auto parts = split( m_buffer, &splitLength ); - if ( parts.empty() ) - { - break; - } - if ( ( parts[0] == "pmessage" ) && ( parts.size() == 4 ) ) - { - auto iter = m_psubscribers.find( parts[1] ); - if ( iter != m_psubscribers.end() ) - { - for ( const auto& i: iter.value() ) - { - QMetaObject::invokeMethod( i.first, i.second, - Q_ARG( QString, parts[2] ), - Q_ARG( QByteArray, parts[3] ) ); - } - } - } - else - { - if ( ( parts[0] == "message" ) && ( parts.size() == 3 ) ) - { - auto iter = m_subscribers.find( parts[1] ); - if ( iter != m_subscribers.end() ) - { - for ( const auto& i: iter.value() ) - { - QMetaObject::invokeMethod( i.first, i.second, - Q_ARG( QString, parts[1] ), - Q_ARG( QByteArray, parts[2] ) ); - } - } - } - } - m_buffer.remove( 0, splitLength ); - } - } -} // PubSub::read - - -/*! - * \brief Обработка изменения состояния сокета - */ -MYXLIB_INLINE void PubSub::onSocketStateChanged() -{ - switch ( m_socket->state() ) - { - case QAbstractSocket::ConnectedState: - for ( auto iter = m_psubscribers.cbegin(); iter != m_psubscribers.cend(); ++iter ) - { - m_socket->write( array( { "PSUBSCRIBE", iter.key() } ) ); - } - for ( auto iter = m_subscribers.cbegin(); iter != m_subscribers.cend(); ++iter ) - { - m_socket->write( array( { "SUBSCRIBE", iter.key() } ) ); - } - setConnectionFlags( kConnected ); - Q_EMIT connectionStateChanged( connectionFlags() ); - Q_EMIT connected(); - break; - case QAbstractSocket::UnconnectedState: - m_buffer.clear(); - m_connectionTimer->start(); - if ( !isConnected() ) - { - Q_EMIT disconnected(); - } - setConnectionFlags( kDisconnected ); - Q_EMIT connectionStateChanged( connectionFlags() ); - break; - default: - break; - } // switch -} // PubSub::onSocketStateChanged - - -/*! - * \brief При удалении подписчика уничтожает все его подписки - * \param subscriber Удаленный подписчик - */ -MYXLIB_INLINE void PubSub::onSubscriberDestroyed( QObject* subscriber ) -{ - for ( auto i = m_subscribers.begin(); i != m_subscribers.end(); ) - { - for ( auto j = i.value().begin(); j != i.value().end(); ) - { - if ( j->first == subscriber ) - { - j = i.value().erase( j ); - } - else - { - j++; - } - } - if ( i.value().empty() ) - { - if ( m_socket->state() == QAbstractSocket::ConnectedState ) - { - m_socket->write( array( { "UNSUBSCRIBE", i.key() } ) ); - } - i = m_subscribers.erase( i ); - } - else - { - i++; - } - } - - for ( auto i = m_psubscribers.begin(); i != m_psubscribers.end(); ) - { - for ( auto j = i.value().begin(); j != i.value().end(); ) - { - if ( j->first == subscriber ) - { - j = i.value().erase( j ); - } - else - { - j++; - } - } - if ( i.value().empty() ) - { - if ( m_socket->state() == QAbstractSocket::ConnectedState ) - { - m_socket->write( array( { "PUNSUBSCRIBE", i.key() } ) ); - } - i = m_psubscribers.erase( i ); - } - else - { - i++; - } - } -} // PubSub::onSubscriberDestroyed - - -/*! - * \brief Возвращает сокет - * \return Сокет - */ -MYXLIB_INLINE QTcpSocket* PubSub::socket() const -{ - return( m_socket ); -} - -} // namespace redis - -} // namespace myx - -#endif // ifndef MYX_REDIS_PUBSUB_INL_HPP_ diff --git a/src/myx/redis/pubsub.cpp b/src/myx/redis/pubsub.cpp deleted file mode 100644 index 527683c..0000000 --- a/src/myx/redis/pubsub.cpp +++ /dev/null @@ -1,5 +0,0 @@ -#ifndef MYXLIB_BUILD_LIBRARIES -#error Define MYXLIB_BUILD_LIBRARIES to compile this file. -#endif - -#include diff --git a/src/myx/redis/pubsub.hpp b/src/myx/redis/pubsub.hpp deleted file mode 100644 index bdff020..0000000 --- a/src/myx/redis/pubsub.hpp +++ /dev/null @@ -1,87 +0,0 @@ -#ifndef MYX_REDIS_PUBSUB_HPP_ -#define MYX_REDIS_PUBSUB_HPP_ - -#pragma once - -#include - -#include - -QT_FORWARD_DECLARE_CLASS( QTcpSocket ) - -namespace myx { - -namespace redis { - -namespace constants { - -/// @brief Приставка для наблюдения за данными -static const QString KeySpacePrefix { QStringLiteral( "__key%1__:" ) }; - -} // namespace constants - -namespace C = constants; - -/*! - * \brief Управляет подпиской и публикацией сообщений - */ -class PubSub : public Base -{ - Q_OBJECT - -public: - explicit PubSub( QObject* parent = nullptr ); - QTcpSocket* socket() const; - int connectionTimeout() const; - - Q_SLOT void start(); - Q_SLOT void stop(); - Q_INVOKABLE void subscribe( const QString& channel, QObject* subscriber, const char* method ); - Q_INVOKABLE void subscribe( QString channel, QObject* subscriber, - const char* method, int database ); - Q_INVOKABLE void psubscribe( const QString& channel, QObject* subscriber, const char* method ); - Q_INVOKABLE void psubscribe( QString channel, QObject* subscriber, - const char* method, int database ); - Q_INVOKABLE void unsubscribe( const QString& channel, QObject* subscriber, const char* method ); - Q_INVOKABLE void unsubscribe( QString channel, QObject* subscriber, - const char* method, int database ); - Q_INVOKABLE void punsubscribe( const QString& channel, QObject* subscriber, const char* method ); - Q_INVOKABLE void punsubscribe( QString channel, QObject* subscriber, - const char* method, int database ); - /*! - * \brief Сигнализирует об установке соединения - */ - Q_SIGNAL void connected(); - /*! - * \brief Сигнализирует о разрыве соединения - */ - Q_SIGNAL void disconnected(); - -private: - using Sub = QHash< QString, std::vector< std::pair< QObject*, const char* > > >; - - static constexpr int kConnectionTimeout { 2000 }; //!< Таймаут на сетевые операции - QTcpSocket* m_socket; - QTimer* m_connectionTimer; //!< Таймер установления сетевого соединения - - /*! - * \brief Таблица подписчиков - */ - QByteArray m_buffer; //!< Буффер прочитанных данных - Sub m_subscribers; //!< Обработчики сообщений по каналу подписки - Sub m_psubscribers; //!< Обработчики сообщений по шаблону канала подписки - - Q_SLOT void read(); - Q_SLOT void onSocketStateChanged(); - Q_SLOT void onSubscriberDestroyed( QObject* subscriber ); -}; // class PubSub - -} // namespace redis - -} // namespace myx - -#ifdef MYXLIB_HEADER_ONLY -#include "pubsub-inl.hpp" -#endif - -#endif // ifndef MYX_REDIS_PUBSUB_HPP_