From 37fd015311f4ecacaeb9d17f66d90216f160f882 Mon Sep 17 00:00:00 2001 From: Andrey Astafyev Date: Sat, 25 Apr 2020 08:45:05 +0300 Subject: [PATCH] =?UTF-8?q?=D0=9F=D0=B5=D1=80=D0=B2=D0=BE=D0=B5=20=D0=BF?= =?UTF-8?q?=D1=80=D0=B8=D0=B1=D0=BB=D0=B8=D0=B6=D0=B5=D0=BD=D0=B8=D0=B5=20?= =?UTF-8?q?=D0=B4=D0=BB=D1=8F=20=D0=BD=D0=BE=D0=B2=D0=BE=D0=B9=20=D0=B1?= =?UTF-8?q?=D0=B8=D0=B1=D0=BB=D0=B8=D0=BE=D1=82=D0=B5=D0=BA=D0=B8=20Redis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 2 +- cmake/cmlib | 2 +- .../01_client/CMakeLists.txt | 0 .../{redis => redis.old}/01_client/client.cpp | 0 .../{redis => redis.old}/01_client/client.hpp | 0 examples/{redis => redis.old}/CMakeLists.txt | 0 src/myx/redis.old/CMakeLists.txt | 72 +++ src/myx/{redis => redis.old}/client-inl.hpp | 0 src/myx/{redis => redis.old}/client.cpp | 0 src/myx/{redis => redis.old}/client.hpp | 0 src/myx/{redis => redis.old}/lexer-inl.hpp | 0 src/myx/{redis => redis.old}/lexer.cpp | 0 src/myx/{redis => redis.old}/lexer.hpp | 0 src/myx/{redis => redis.old}/parser-inl.hpp | 0 src/myx/{redis => redis.old}/parser.cpp | 0 src/myx/{redis => redis.old}/parser.hpp | 0 src/myx/{redis => redis.old}/reply.hpp | 0 src/myx/{redis => redis.old}/request-inl.hpp | 0 src/myx/{redis => redis.old}/request.cpp | 0 src/myx/{redis => redis.old}/request.hpp | 0 src/myx/redis/CMakeLists.txt | 25 +- src/myx/redis/base-inl.hpp | 133 +++++ src/myx/redis/base.cpp | 5 + src/myx/redis/base.hpp | 69 +++ src/myx/redis/containers-inl.hpp | 233 ++++++++ src/myx/redis/containers.cpp | 5 + src/myx/redis/containers.hpp | 44 ++ src/myx/redis/containers.tpp | 68 +++ src/myx/redis/pubsub-inl.hpp | 504 ++++++++++++++++++ src/myx/redis/pubsub.cpp | 5 + src/myx/redis/pubsub.hpp | 75 +++ 31 files changed, 1226 insertions(+), 16 deletions(-) rename examples/{redis => redis.old}/01_client/CMakeLists.txt (100%) rename examples/{redis => redis.old}/01_client/client.cpp (100%) rename examples/{redis => redis.old}/01_client/client.hpp (100%) rename examples/{redis => redis.old}/CMakeLists.txt (100%) create mode 100644 src/myx/redis.old/CMakeLists.txt rename src/myx/{redis => redis.old}/client-inl.hpp (100%) rename src/myx/{redis => redis.old}/client.cpp (100%) rename src/myx/{redis => redis.old}/client.hpp (100%) rename src/myx/{redis => redis.old}/lexer-inl.hpp (100%) rename src/myx/{redis => redis.old}/lexer.cpp (100%) rename src/myx/{redis => redis.old}/lexer.hpp (100%) rename src/myx/{redis => redis.old}/parser-inl.hpp (100%) rename src/myx/{redis => redis.old}/parser.cpp (100%) rename src/myx/{redis => redis.old}/parser.hpp (100%) rename src/myx/{redis => redis.old}/reply.hpp (100%) rename src/myx/{redis => redis.old}/request-inl.hpp (100%) rename src/myx/{redis => redis.old}/request.cpp (100%) rename src/myx/{redis => redis.old}/request.hpp (100%) create mode 100644 src/myx/redis/base-inl.hpp create mode 100644 src/myx/redis/base.cpp create mode 100644 src/myx/redis/base.hpp create mode 100644 src/myx/redis/containers-inl.hpp create mode 100644 src/myx/redis/containers.cpp create mode 100644 src/myx/redis/containers.hpp create mode 100644 src/myx/redis/containers.tpp create mode 100644 src/myx/redis/pubsub-inl.hpp create mode 100644 src/myx/redis/pubsub.cpp create mode 100644 src/myx/redis/pubsub.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index a7996a6..65f3340 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -50,7 +50,7 @@ add_subdirectory(src/myx/redis) if(MYXLIB_BUILD_EXAMPLES OR MYXLIB_BUILD_EXAMPLES_HO) add_subdirectory(examples/filesystem) add_subdirectory(examples/qt) - add_subdirectory(examples/redis) +# add_subdirectory(examples/redis) endif() # Документация diff --git a/cmake/cmlib b/cmake/cmlib index 1eaded9..2ea38c3 160000 --- a/cmake/cmlib +++ b/cmake/cmlib @@ -1 +1 @@ -Subproject commit 1eaded9031726f539cd5fd4a44d874d7bd8d458d +Subproject commit 2ea38c32c8d6aa7ccdbdf84cff1d0b0d50cc2fb3 diff --git a/examples/redis/01_client/CMakeLists.txt b/examples/redis.old/01_client/CMakeLists.txt similarity index 100% rename from examples/redis/01_client/CMakeLists.txt rename to examples/redis.old/01_client/CMakeLists.txt diff --git a/examples/redis/01_client/client.cpp b/examples/redis.old/01_client/client.cpp similarity index 100% rename from examples/redis/01_client/client.cpp rename to examples/redis.old/01_client/client.cpp diff --git a/examples/redis/01_client/client.hpp b/examples/redis.old/01_client/client.hpp similarity index 100% rename from examples/redis/01_client/client.hpp rename to examples/redis.old/01_client/client.hpp diff --git a/examples/redis/CMakeLists.txt b/examples/redis.old/CMakeLists.txt similarity index 100% rename from examples/redis/CMakeLists.txt rename to examples/redis.old/CMakeLists.txt diff --git a/src/myx/redis.old/CMakeLists.txt b/src/myx/redis.old/CMakeLists.txt new file mode 100644 index 0000000..a8fc417 --- /dev/null +++ b/src/myx/redis.old/CMakeLists.txt @@ -0,0 +1,72 @@ +# Название основной цели и имя библиотеки в текущем каталоге +set(TRGT redis) + +# cmake-format: off +# Список файлов исходных текстов +set(TRGT_cpp + ${CMAKE_CURRENT_SOURCE_DIR}/client.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/lexer.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/parser.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/request.cpp) + +# Список заголовочных файлов (используется для установки) +set(TRGT_moc_hpp + ${CMAKE_CURRENT_SOURCE_DIR}/client.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/lexer.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/parser.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/request.hpp) + +set(TRGT_hpp + ${CMAKE_CURRENT_SOURCE_DIR}/reply.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/client-inl.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/lexer-inl.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/parser-inl.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/request-inl.hpp) + +set(TRGT_headers ${TRGT_moc_hpp} ${TRGT_hpp}) +# cmake-format: on + +qt5_wrap_cpp(TRGT_moc_cpp ${TRGT_moc_hpp}) + +add_library(${TRGT}-header-only INTERFACE) +target_include_directories( + ${TRGT}-header-only SYSTEM INTERFACE "$" + "$") + +if(MYXLIB_BUILD_LIBRARIES) + add_common_library(${TRGT} OUTPUT_NAME myx-${TRGT} SOURCES ${TRGT_moc_cpp} ${TRGT_cpp} ${TRGT_headers}) + common_target_properties(${TRGT}) + + # Создание цели для проверки утилитой clang-tidy + add_clang_tidy_check(${TRGT} ${TRGT_cpp} ${TRGT_headers}) + + # Создание цели для проверки утилитой clang-analyze + add_clang_analyze_check(${TRGT} ${TRGT_cpp} ${TRGT_headers}) + + # Создание цели для проверки утилитой clazy + add_clazy_check(${TRGT} ${TRGT_cpp} ${TRGT_headers}) + + # Создание цели для проверки утилитой pvs-studio + add_pvs_check(${TRGT}) + + # Создание цели для автоматического форматирования кода + add_format_sources(${TRGT} ${TRGT_cpp} ${TRGT_headers}) + + target_compile_definitions(${TRGT} PUBLIC MYXLIB_BUILD_LIBRARIES) + target_include_directories(${TRGT} SYSTEM PUBLIC ${Qt5Core_INCLUDE_DIRS}) + target_include_directories(${TRGT} SYSTEM PUBLIC ${Qt5Network_INCLUDE_DIRS}) + target_include_directories(${TRGT} SYSTEM PRIVATE ${CMAKE_SOURCE_DIR}/src) + + cotire(${TRGT}) + install(TARGETS ${TRGT}_static COMPONENT libs-dev ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}) + if(BUILD_SHARED_LIBS) + install(TARGETS ${TRGT}_shared COMPONENT main LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}) + endif() +endif() + +generate_pkgconfig(myx-${TRGT} COMPONENT base-dev INSTALL_LIBRARY ${MYXLIB_BUILD_LIBRARIES}) +install(FILES ${TRGT_headers} COMPONENT base-dev DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/${PROJECT_NAME}/${TRGT}) + +# Цель, используемая только для установки заголовочных файлов без компиляции проекта +add_custom_target(${TRGT}-install-headers COMMAND ${CMAKE_COMMAND} -DCMAKE_INSTALL_COMPONENT=base-dev -P + "${CMAKE_BINARY_DIR}/cmake_install.cmake") diff --git a/src/myx/redis/client-inl.hpp b/src/myx/redis.old/client-inl.hpp similarity index 100% rename from src/myx/redis/client-inl.hpp rename to src/myx/redis.old/client-inl.hpp diff --git a/src/myx/redis/client.cpp b/src/myx/redis.old/client.cpp similarity index 100% rename from src/myx/redis/client.cpp rename to src/myx/redis.old/client.cpp diff --git a/src/myx/redis/client.hpp b/src/myx/redis.old/client.hpp similarity index 100% rename from src/myx/redis/client.hpp rename to src/myx/redis.old/client.hpp diff --git a/src/myx/redis/lexer-inl.hpp b/src/myx/redis.old/lexer-inl.hpp similarity index 100% rename from src/myx/redis/lexer-inl.hpp rename to src/myx/redis.old/lexer-inl.hpp diff --git a/src/myx/redis/lexer.cpp b/src/myx/redis.old/lexer.cpp similarity index 100% rename from src/myx/redis/lexer.cpp rename to src/myx/redis.old/lexer.cpp diff --git a/src/myx/redis/lexer.hpp b/src/myx/redis.old/lexer.hpp similarity index 100% rename from src/myx/redis/lexer.hpp rename to src/myx/redis.old/lexer.hpp diff --git a/src/myx/redis/parser-inl.hpp b/src/myx/redis.old/parser-inl.hpp similarity index 100% rename from src/myx/redis/parser-inl.hpp rename to src/myx/redis.old/parser-inl.hpp diff --git a/src/myx/redis/parser.cpp b/src/myx/redis.old/parser.cpp similarity index 100% rename from src/myx/redis/parser.cpp rename to src/myx/redis.old/parser.cpp diff --git a/src/myx/redis/parser.hpp b/src/myx/redis.old/parser.hpp similarity index 100% rename from src/myx/redis/parser.hpp rename to src/myx/redis.old/parser.hpp diff --git a/src/myx/redis/reply.hpp b/src/myx/redis.old/reply.hpp similarity index 100% rename from src/myx/redis/reply.hpp rename to src/myx/redis.old/reply.hpp diff --git a/src/myx/redis/request-inl.hpp b/src/myx/redis.old/request-inl.hpp similarity index 100% rename from src/myx/redis/request-inl.hpp rename to src/myx/redis.old/request-inl.hpp diff --git a/src/myx/redis/request.cpp b/src/myx/redis.old/request.cpp similarity index 100% rename from src/myx/redis/request.cpp rename to src/myx/redis.old/request.cpp diff --git a/src/myx/redis/request.hpp b/src/myx/redis.old/request.hpp similarity index 100% rename from src/myx/redis/request.hpp rename to src/myx/redis.old/request.hpp diff --git a/src/myx/redis/CMakeLists.txt b/src/myx/redis/CMakeLists.txt index a8fc417..ea6fed6 100644 --- a/src/myx/redis/CMakeLists.txt +++ b/src/myx/redis/CMakeLists.txt @@ -4,24 +4,21 @@ set(TRGT redis) # cmake-format: off # Список файлов исходных текстов set(TRGT_cpp - ${CMAKE_CURRENT_SOURCE_DIR}/client.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/lexer.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/parser.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/request.cpp) + ${CMAKE_CURRENT_SOURCE_DIR}/base.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/pubsub.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/containers.cpp) -# Список заголовочных файлов (используется для установки) +# Список заголовочных файлов set(TRGT_moc_hpp - ${CMAKE_CURRENT_SOURCE_DIR}/client.hpp - ${CMAKE_CURRENT_SOURCE_DIR}/lexer.hpp - ${CMAKE_CURRENT_SOURCE_DIR}/parser.hpp - ${CMAKE_CURRENT_SOURCE_DIR}/request.hpp) + ${CMAKE_CURRENT_SOURCE_DIR}/base.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/pubsub.hpp) set(TRGT_hpp - ${CMAKE_CURRENT_SOURCE_DIR}/reply.hpp - ${CMAKE_CURRENT_SOURCE_DIR}/client-inl.hpp - ${CMAKE_CURRENT_SOURCE_DIR}/lexer-inl.hpp - ${CMAKE_CURRENT_SOURCE_DIR}/parser-inl.hpp - ${CMAKE_CURRENT_SOURCE_DIR}/request-inl.hpp) + ${CMAKE_CURRENT_SOURCE_DIR}/base-inl.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/containers.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/containers-inl.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/containers.tpp + ${CMAKE_CURRENT_SOURCE_DIR}/pubsub-inl.hpp) set(TRGT_headers ${TRGT_moc_hpp} ${TRGT_hpp}) # cmake-format: on diff --git a/src/myx/redis/base-inl.hpp b/src/myx/redis/base-inl.hpp new file mode 100644 index 0000000..92c5552 --- /dev/null +++ b/src/myx/redis/base-inl.hpp @@ -0,0 +1,133 @@ +#ifndef MYX_REDIS_BASE_INL_HPP_ +#define MYX_REDIS_BASE_INL_HPP_ + +#pragma once + +#ifndef MYXLIB_HEADER_ONLY +#include +#endif + +#include + +#include + +namespace myx { + +namespace redis { + +static const QByteArray Separator { "\r\n" }; //!< Строковый разделитель слов во фразе + +/*! + * \brief Создает объект класса + * \param host Имя хоста сервера + * \param port Номер порта сервера + * \param parent Родительский объект + */ +Base::Base( QObject* parent ) : + QObject{ parent } +{ + _readTimer = new QTimer { this }; + _writeTimer = new QTimer { this }; + _readTimer->setSingleShot( true ); + _writeTimer->setSingleShot( true ); + _readTimer->setInterval( 100 ); + _writeTimer->setInterval( 100 ); + connect( _readTimer, &QTimer::timeout, [this]() { + if ( _connectionFlags & Connected && _connectionFlags & Read ) + { + _connectionFlags -= Read; + emit connectionStateChanged( _connectionFlags ); + } + } ); + connect( _writeTimer, &QTimer::timeout, [this]() { + if ( _connectionFlags & Connected && _connectionFlags & Write ) + { + _connectionFlags -= Write; + emit connectionStateChanged( _connectionFlags ); + } + } ); +} + + +/*! + * \brief Задает адрес хоста + * \param host Адрес + */ +void Base::setHost( const QString& host ) +{ + _host = host; +} + + +/*! + * \brief Извлекает длину токена из буфера данных + * \param pos Позиция в буфере + * \param buffer Буфер данных + */ +int Base::fetchLength( QByteArray& buffer, int& pos ) +{ + if ( ( buffer[pos] != '$' ) && ( buffer[pos] != '*' ) && ( buffer[pos] != ':' ) ) + { + buffer.clear(); + pos = 0; + return( -1 ); + } + pos++; + QByteArray text; + while ( pos < buffer.length() ) + { + if ( QChar( buffer[pos] ).isNumber() ) + { + text += buffer[pos++]; + } + else + { + pos += Separator.length(); + return( text.toInt() ); + } + } + return( -1 ); +} // Base::fetchLength + + +/*! + * \brief Добавляет к состоянию флаг Read + */ +void Base::onReadyRead() +{ + if ( _connectionFlags & Connected && !( _connectionFlags & Read ) ) + { + _connectionFlags += Read; + emit connectionStateChanged( _connectionFlags ); + } + _readTimer->start(); +} + + +/*! + * \brief Добавляет к состоянию флаг Read + */ +void Base::onBytesWritten() +{ + if ( _connectionFlags & Connected && !( _connectionFlags & Write ) ) + { + _connectionFlags += Write; + emit connectionStateChanged( _connectionFlags ); + } + _writeTimer->start(); +} + + +/*! + * \brief Выставляет флаг ошибки соединения + */ +void Base::onSocketError() +{ + emit connectionStateChanged( _connectionFlags = Error ); +} + +} // namespace redis + +} // namespace myx + +#endif // ifndef MYX_REDIS_BASE_INL_HPP_ diff --git a/src/myx/redis/base.cpp b/src/myx/redis/base.cpp new file mode 100644 index 0000000..0ac1478 --- /dev/null +++ b/src/myx/redis/base.cpp @@ -0,0 +1,5 @@ +#ifndef MYXLIB_BUILD_LIBRARIES +#error Define MYXLIB_BUILD_LIBRARIES to compile this file. +#endif + +#include diff --git a/src/myx/redis/base.hpp b/src/myx/redis/base.hpp new file mode 100644 index 0000000..b4e879d --- /dev/null +++ b/src/myx/redis/base.hpp @@ -0,0 +1,69 @@ +#ifndef MYX_REDIS_BASE_HPP_ +#define MYX_REDIS_BASE_HPP_ + +#pragma once + +#include + +#include +#include + +QT_FORWARD_DECLARE_CLASS( QTimer ) +QT_FORWARD_DECLARE_CLASS( QTcpSocket ) + +namespace myx { + +namespace redis { + +/*! + * \brief Базовый класс для объектов взаимодействия с redis + */ +class Base : public QObject +{ + Q_OBJECT + +public: + /*! + * \brief Состояние соединения + */ + enum ConnectionStateFlags + { + Unconnected = 1, //!< Соединение не установлено + Connecting = 2, //!< Соединение устанавливается + Connected = 4, //!< Соединение установлено + Read = 8, //!< Есть данные для чтения + Write = 16, //!< Есть данные для записи + Error = 32, //!< Ошибка соединения + }; + + Base( QObject* parent = nullptr ); + void setHost( const QString& host ); + /*! + * \brief Сигнализирует об изменении состояния соединения + * \param flags Флаги состояния + */ + Q_SIGNAL void connectionStateChanged( int flags ); + +protected: + static constexpr int Timeout { 2000 }; //!< Таймаут на сетевые операции + int _connectionFlags { Unconnected }; //!< Флаги состояния соединения + quint16 _port { 6379 }; //!< Номер порта подключения к БД + QString _host { "127.0.0.1" }; //!< Имя хоста подключения к БД + QTimer* _readTimer; //!< Таймер чтения данных + QTimer* _writeTimer; //!< Таймер записи данных + + int fetchLength( QByteArray& buffer, int& pos ); + Q_SLOT void onReadyRead(); + Q_SLOT void onBytesWritten(); + Q_SLOT void onSocketError(); +}; // class Base + +} // namespace redis + +} // namespace myx + +#ifdef MYXLIB_HEADER_ONLY +#include "base-inl.hpp" +#endif + +#endif // ifndef MYX_REDIS_BASE_HPP_ diff --git a/src/myx/redis/containers-inl.hpp b/src/myx/redis/containers-inl.hpp new file mode 100644 index 0000000..585839b --- /dev/null +++ b/src/myx/redis/containers-inl.hpp @@ -0,0 +1,233 @@ +#ifndef MYX_REDIS_CONTAINERS_INL_HPP_ +#define MYX_REDIS_CONTAINERS_INL_HPP_ + +#pragma once + +#ifndef MYXLIB_HEADER_ONLY +#include +#endif + +#include +#include + +namespace myx { + +namespace redis { + +/*! + * \brief Создает часть команды + * \param value Значение для создани части команды + * \return Часть команды + */ +QByteArray part( const QVariant& value ) +{ + auto bytes = value.toByteArray(); + return( "$" + QByteArray::number( bytes.length() ) + Separator + bytes ); +} + + +/*! + * \brief Составляет RESP массив + * \param parts Части для составления массива + * \return RESP массив + */ +QByteArray array( const QVariantList& parts ) +{ + QByteArrayList data; + std::transform( parts.cbegin(), parts.cend(), std::back_inserter( data ), + []( const QVariant& iter ) { return( part( iter ) ); } ); + data.prepend( "*" + QByteArray::number( data.size() ) ); + return( data.join( Separator ) + Separator ); +} + + +/*! + * \brief Разбивает буфер данных на массив частей + * \param buffer Входной буфер данных + * \param splitLength Длина разобранной части буффера + * \return Массив частей-строк + */ +QByteArrayList split( const QByteArray& buffer, int* splitLength ) +{ + enum Token + { + Undefined, + FetchNumber, + FetchPart, + }; + + if ( ( buffer == "$-1\r\n" ) || ( buffer == "$*0\r\n" ) || buffer.isEmpty() ) + { + if ( splitLength != nullptr ) + { + *splitLength = buffer.length(); + } + return {}; + } + if ( buffer.startsWith( "-WRONG" ) ) + { + if ( splitLength != nullptr ) + { + *splitLength = buffer.endsWith( "\r\n" ) ? buffer.length() : 0; + } + return {}; + } + if ( splitLength != nullptr ) + { + *splitLength = 0; + } + if ( !buffer.endsWith( Separator ) ) + { + return {}; + } + int size { 1 }; + int pos { 0 }; + QByteArray partLength; + if ( buffer[pos] == '*' ) + { + pos++; + QByteArray text; + while ( QChar( buffer[pos] ).isNumber() && pos < buffer.size() ) text += buffer[pos++]; + if ( pos == buffer.size() ) + { + return {}; + } + size = text.toInt(); + pos += Separator.length(); + } + QByteArrayList parts; + Token token { Undefined }; + while ( parts.size() < size && pos < buffer.length() ) + { + switch ( token ) + { + case Undefined: + switch ( buffer[pos] ) + { + case '$': + partLength.clear(); + token = FetchPart; + pos++; + break; + case ':': + partLength.clear(); + token = FetchNumber; + pos++; + break; + default: + return {}; + } + break; + case FetchNumber: + if ( QChar( buffer[pos] ).isNumber() || ( buffer[pos] == '-' ) ) + { + partLength += buffer[pos]; + pos++; + } + else + { + parts.push_back( partLength ); + token = Undefined; + pos += Separator.length(); + } + break; + case FetchPart: + { + if ( QChar( buffer[pos] ).isNumber() || ( buffer[pos] == '-' ) ) + { + partLength += buffer[pos]; + pos++; + } + else + { + int length = partLength.toInt(); + if ( length < 0 ) + { + parts.push_back( "" ); + pos += Separator.length(); + } + else + { + pos += Separator.length(); + if ( pos + length > buffer.length() ) + { + return {}; + } + parts.push_back( buffer.mid( pos, length ) ); + pos += length + Separator.length(); + } + token = Undefined; + } + break; + } + } // switch + } + // может быть ситуация, когда в буфер недочитан последний \r\n, + // но все части были вычитаны + if ( ( parts.size() == size ) && ( pos <= buffer.length() ) ) + { + if ( splitLength != nullptr ) + { + *splitLength = pos; + } + return( parts ); + } + return {}; +} // split + + +/*! + * \brief Запаковывает сообщение для передачи через систему pubsub + * \param hash Таблица значений + * \return Запакованное сообщение + */ +QByteArray packHash( const QVariantHash& hash ) +{ + QVariantList parts; + for ( auto iter = hash.cbegin(); iter != hash.cend(); ++iter ) parts << iter.key() << iter.value(); + return( array( parts ) ); +} + + +/*! + * \brief Распаковывает кодограмму в таблицу значений + * \param message Сообщение + * \return Таблица значений + */ +QVariantHash unpackHash( const QByteArray& message ) +{ + auto parts = split( message ); + QVariantHash hash; + for ( int i = 1; i < parts.size(); i += 2 ) hash[parts[i - 1]] = parts[i]; + return( hash ); +} + + +/*! + * \brief Запаковывает сообщение для передачи через систему pubsub + * \param hash Список значений + * \return Запакованное сообщение + */ +QByteArray packList( const QVariantList& list ) +{ + return( array( list ) ); +} + + +/*! + * \brief Распаковывает кодограмму в список значений + * \param message Сообщение + * \return Список значений + */ +QVariantList unpackList( const QByteArray& message ) +{ + auto data = split( message ); + QVariantList list; + std::copy( data.begin(), data.end(), std::back_inserter( list ) ); + return( list ); +} + +} // namespace redis + +} // namespace myx +#endif // ifndef MYX_REDIS_CONTAINERS_INL_HPP_ diff --git a/src/myx/redis/containers.cpp b/src/myx/redis/containers.cpp new file mode 100644 index 0000000..1a0358d --- /dev/null +++ b/src/myx/redis/containers.cpp @@ -0,0 +1,5 @@ +#ifndef MYXLIB_BUILD_LIBRARIES +#error Define MYXLIB_BUILD_LIBRARIES to compile this file. +#endif + +#include diff --git a/src/myx/redis/containers.hpp b/src/myx/redis/containers.hpp new file mode 100644 index 0000000..d6ecdbc --- /dev/null +++ b/src/myx/redis/containers.hpp @@ -0,0 +1,44 @@ +#ifndef MYX_REDIS_CONTAINERS_HPP_ +#define MYX_REDIS_CONTAINERS_HPP_ + +#pragma once + +#include + +#include +#include + +namespace myx { + +namespace redis { + +static const QByteArray Separator { "\r\n" }; //!< Строковый разделитель слов во фразе + +QByteArray array( const QVariantList& parts ); +QByteArrayList split( const QByteArray& buffer, int* splitLength = nullptr ); +// QByteArray part( const QVariant& value ); +// QByteArray packHash( const QVariantHash& hash ); +// QVariantHash unpackHash( const QByteArray& message ); +// QByteArray packList( QByteArrayList list ); +// QByteArray packList( const QVariantList& list ); +QVariantList unpackList( const QByteArray& message ); +// template< typename T > +// QByteArray packPair( T first, T second ); +// template< typename T1, typename T2 > +// QByteArray packPair( T1 first, T2 second ); +// template< typename T > +// std::pair< T, T > unpackPair( const QByteArray& message ); +// template< typename T1, typename T2 > +// std::pair< T1, T2 > unpackPair( const QByteArray& message ); + +} // namespace redis + +} // namespace myx + +#include + +#ifdef MYXLIB_HEADER_ONLY +#include "containers-inl.hpp" +#endif + +#endif // ifndef MYX_REDIS_CONTAINERS_HPP_ diff --git a/src/myx/redis/containers.tpp b/src/myx/redis/containers.tpp new file mode 100644 index 0000000..7b3fc7c --- /dev/null +++ b/src/myx/redis/containers.tpp @@ -0,0 +1,68 @@ +#include + +namespace myx { + +namespace redis { + +/*! + * \brief Запаковывает пару значений в сообщение + * \param first Первое значение + * \param second Второе значение + * \return Текстовое сообщение + */ +template< typename T > +QByteArray packPair( T first, T second ) +{ + return( packList( { QVariant::fromValue( first ), QVariant::fromValue( second ) } ) ); +} + + +/*! + * \brief Запаковывает пару значений в сообщение + * \param first Первое значение + * \param second Второе значение + * \return Текстовое сообщение + */ +template< typename T1, typename T2 > +QByteArray packPair( T1 first, T2 second ) +{ + return( packList( { QVariant::fromValue( first ), QVariant::fromValue( second ) } ) ); +} + + +/*! + * \brief Распаковывает сообщение в пару однотипных значений + * \param pair Текст сообщения + * \return Пара значений + */ +template< typename T > +std::pair< T, T > unpackPair( const QByteArray& message ) +{ + auto list = unpackList( message ); + if ( list.size() == 2 ) + { + return { QVariant( list.first() ).value< T >(), QVariant( list.last() ).value< T >() }; + } + return {}; +} + + +/*! + * \brief Распаковывает сообщение в пару однотипных значений + * \param pair Текст сообщения + * \return Пара значений + */ +template< typename T1, typename T2 > +std::pair< T1, T2 > unpackPair( const QByteArray& message ) +{ + auto list = unpackList( message ); + if ( list.size() == 2 ) + { + return { QVariant( list.first() ).value< T1 >(), QVariant( list.last() ).value< T2 >() }; + } + return {}; +} + +} // namespace redis + +} // namespace myx diff --git a/src/myx/redis/pubsub-inl.hpp b/src/myx/redis/pubsub-inl.hpp new file mode 100644 index 0000000..f56722f --- /dev/null +++ b/src/myx/redis/pubsub-inl.hpp @@ -0,0 +1,504 @@ +#ifndef MYX_REDIS_PUBSUB_INL_HPP_ +#define MYX_REDIS_PUBSUB_INL_HPP_ + +#pragma once + +#ifndef MYXLIB_HEADER_ONLY +#include +#endif + +#include + +#include +#include +#include +#include +#include + +namespace myx { + +namespace redis { + +const QString PubSub::KeySpacePrefix { "__key%1__:" }; + +/*! + * \brief Конструктор класса + * \param parent Родительский объект для _socket + */ +PubSub::PubSub( QObject* parent ) : + Base{ parent } +{ + _socket = new QTcpSocket { this }; + QObject::connect( _socket, &QAbstractSocket::stateChanged, + this, &PubSub::onSocketStateChanged ); + connect( _socket, &QIODevice::readyRead, this, &PubSub::onReadyRead ); + connect( _socket, &QIODevice::bytesWritten, this, &PubSub::onBytesWritten ); + QObject::connect( _socket, static_cast< void ( QAbstractSocket::* )( QAbstractSocket::SocketError ) > + ( &QAbstractSocket::error ), [this]() { + qDebug() << QDateTime::currentDateTimeUtc().toString( "hh:mm:ss:zzz" ) + << QByteArray( "redis pubsub connection error" ) << _socket->errorString(); + } ); + QObject::connect( _socket, &QIODevice::readyRead, this, &PubSub::read ); +} + + +/*! + * \brief Инициирует работу с БД + */ +void PubSub::start() +{ + emit disconnected(); + emit connectionStateChanged( _connectionFlags ); + _socket->connectToHost( _host, _port ); +} + + +/*! + * \brief Завершает работу с БД + */ +void PubSub::stop() +{ + if ( _socket->state() == QAbstractSocket::ConnectedState ) + { + _socket->disconnectFromHost(); + while ( _socket->state() != QAbstractSocket::UnconnectedState ) QCoreApplication::processEvents(); + } +} + + +/*! + * \brief Создает подписку на канал сообщений + * \param channel Rанал подписки + * \param subscriber Объект-подписчик + * \param method Имя метода, вызываемого при изменении получении серверного сообщения + */ +void PubSub::subscribe( QString channel, QObject* subscriber, const char* method ) +{ + if ( QThread::currentThread() != thread() ) + { + qWarning() << "no redis connection for caller thread"; + return; + } + auto iter = _subscribers.find( channel ); + if ( iter == _subscribers.end() ) + { + if ( _socket->state() == QAbstractSocket::ConnectedState ) + { + _socket->write( array( { "SUBSCIBE", channel } ) ); + } + _subscribers[channel].push_back( { subscriber, method } ); + } + else + { + auto value = iter.value(); + auto it = std::find_if( value.begin(), value.end(), + [subscriber, method]( const std::pair< QObject*, const char* >& p ) { + return( p.first == subscriber && strcmp( p.second, method ) == 0 ); + } ); + if ( it == iter.value().end() ) + { + iter.value().push_back( { subscriber, method } ); + } + } +} // PubSub::subscribe + + +/*! + * \brief Создает подписку на канал изменения данных + * \param channel Rанал подписки + * \param subscriber Объект-подписчик + * \param method Имя метода, вызываемого при изменении получении серверного сообщения + * \param database Номер БД + */ +void PubSub::subscribe( QString channel, QObject* subscriber, const char* method, int database ) +{ + if ( QThread::currentThread() != thread() ) + { + qWarning() << "no redis connection for caller thread"; + return; + } + auto iter = _subscribers.find( channel ); + if ( iter == _subscribers.end() ) + { + channel.prepend( KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() ); + if ( _socket->state() == QAbstractSocket::ConnectedState ) + { + _socket->write( array( { "SUBSCRIBE", channel } ) ); + } + _subscribers[channel].push_back( { subscriber, method } ); + } + else + { + auto value = iter.value(); + auto it = std::find_if( value.begin(), value.end(), + [subscriber, method]( const std::pair< QObject*, const char* >& p ) { + return( p.first == subscriber && strcmp( p.second, method ) == 0 ); + } ); + if ( it == iter.value().end() ) + { + iter.value().push_back( { subscriber, method } ); + } + } +} // PubSub::subscribe + + +/*! + * \brief Создает подписку на шаблон канала сообщений + * \param channel Шаблон канала подписки + * \param subscriber Объект-подписчик + * \param method Имя метода, вызываемого при изменении получении серверного сообщения + */ +void PubSub::psubscribe( QString channel, QObject* subscriber, const char* method ) +{ + if ( QThread::currentThread() != thread() ) + { + qWarning() << "no redis connection for caller thread"; + return; + } + auto iter = _psubscribers.find( channel ); + if ( iter == _psubscribers.end() ) + { + if ( _socket->state() == QAbstractSocket::ConnectedState ) + { + _socket->write( array( { "PSUBSCRIBE", channel } ) ); + } + _psubscribers[channel].push_back( { subscriber, method } ); + } + else + { + auto value = iter.value(); + auto it = std::find_if( value.begin(), value.end(), + [subscriber, method]( const std::pair< QObject*, const char* >& p ) { + return( p.first == subscriber && strcmp( p.second, method ) == 0 ); + } ); + if ( it == iter.value().end() ) + { + iter.value().push_back( { subscriber, method } ); + } + } +} // PubSub::psubscribe + + +/*! + * \brief Создает подписку на шаблон канала изменения данных + * \param channel Шаблон канала подписки + * \param subscriber Объект-подписчик + * \param method Имя метода, вызываемого при изменении получении серверного сообщения + * \param database Номер БД (-1 для всех) + */ +void PubSub::psubscribe( QString channel, QObject* subscriber, const char* method, int database ) +{ + if ( QThread::currentThread() != thread() ) + { + qWarning() << "no redis connection for caller thread"; + return; + } + auto iter = _psubscribers.find( channel ); + if ( iter == _psubscribers.end() ) + { + if ( database == -1 ) + { + channel.prepend( KeySpacePrefix.arg( "*" ).toLocal8Bit() ); + } + else + { + channel.prepend( KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() ); + } + if ( _socket->state() == QAbstractSocket::ConnectedState ) + { + _socket->write( array( { "PSUBSCRIBE", channel } ) ); + } + _psubscribers[channel].push_back( { subscriber, method } ); + } + else + { + auto value = iter.value(); + auto it = std::find_if( value.begin(), value.end(), + [subscriber, method]( const std::pair< QObject*, const char* >& p ) { + return( p.first == subscriber && strcmp( p.second, method ) == 0 ); + } ); + if ( it == iter.value().end() ) + { + iter.value().push_back( { subscriber, method } ); + } + } +} // PubSub::psubscribe + + +/*! + * \brief Отписывает получателя от канала + * \param channel Имя канала + * \param subscriber Подписчик на канал + * \param method Метод обработки данных + * \param keyEvents Признак подписки на изменение данных + */ +void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* method ) +{ + if ( QThread::currentThread() != thread() ) + { + qWarning() << "no redis connection for caller thread"; + return; + } + auto iter = _subscribers.find( channel ); + if ( iter != _subscribers.end() ) + { + auto ctx = std::find_if( iter->begin(), iter->end(), + [subscriber, method]( const std::pair< QObject*, const char* >& value ) { + return( subscriber == value.first && strcmp( method, value.second ) == 0 ); + } ); + if ( ctx != iter->end() ) + { + iter->erase( ctx ); + } + if ( iter->empty() ) + { + if ( _socket->state() == QAbstractSocket::ConnectedState ) + { + _socket->write( array( { "UNSUBSCRIBE", channel } ) ); + } + _subscribers.remove( channel ); + } + } +} // PubSub::unsubscribe + + +/*! + * \brief Отписывает получателя от канала + * \param channel Имя канала + * \param subscriber Подписчик на канал + * \param method Метод обработки данных + * \param keyEvents Признак подписки на изменение данных + */ +void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* method, int database ) +{ + if ( QThread::currentThread() != thread() ) + { + qWarning() << "no redis connection for caller thread"; + return; + } + if ( database == -1 ) + { + channel.prepend( KeySpacePrefix.arg( "*" ).toLocal8Bit() ); + } + else + { + channel.prepend( KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() ); + } + auto iter = _subscribers.find( channel ); + if ( iter != _subscribers.end() ) + { + auto ctx = std::find_if( iter->begin(), iter->end(), + [subscriber, method]( const std::pair< QObject*, const char* >& value ) { + return( subscriber == value.first && strcmp( method, value.second ) == 0 ); + } ); + if ( ctx != iter->end() ) + { + iter->erase( ctx ); + } + if ( iter->empty() ) + { + if ( _socket->state() == QAbstractSocket::ConnectedState ) + { + _socket->write( array( { "UNSUBSCRIBE", channel } ) ); + } + _subscribers.remove( channel ); + } + } +} // PubSub::unsubscribe + + +/*! + * \brief Отписывает получателя от шаблона канала + * \param channel Шаблон канала + * \param subscriber Подписчик на канал + * \param method Метод обработки данных + * \param keyEvents Признак подписки на изменение данных + */ +void PubSub::punsubscribe( QString channel, QObject* subscriber, + const char* method ) +{ + if ( QThread::currentThread() != thread() ) + { + qWarning() << "no redis connection for caller thread"; + return; + } + auto iter = _psubscribers.find( channel ); + if ( iter != _psubscribers.end() ) + { + auto ctx = std::find_if( iter->begin(), iter->end(), + [subscriber, method]( const std::pair< QObject*, const char* >& value ) { + return( subscriber == value.first && strcmp( method, value.second ) == 0 ); + } ); + if ( ctx != iter->end() ) + { + iter->erase( ctx ); + } + if ( iter->empty() ) + { + if ( _socket->state() == QAbstractSocket::ConnectedState ) + { + _socket->write( array( { "PUNSUBSCRIBE", channel } ) ); + } + _psubscribers.remove( channel ); + } + } +} // PubSub::punsubscribe + + +/*! + * \brief Отписывает получателя от шаблона канала + * \param channel Шаблон канала + * \param subscriber Подписчик на канал + * \param method Метод обработки данных + * \param keyEvents Признак подписки на изменение данных + */ +void PubSub::punsubscribe( QString channel, QObject* subscriber, + const char* method, int database ) +{ + if ( QThread::currentThread() != thread() ) + { + qWarning() << "no redis connection for caller thread"; + return; + } + if ( database == -1 ) + { + channel.prepend( KeySpacePrefix.arg( "*" ).toLocal8Bit() ); + } + else + { + channel.prepend( KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() ); + } + auto iter = _psubscribers.find( channel ); + if ( iter != _psubscribers.end() ) + { + auto ctx = std::find_if( iter->begin(), iter->end(), + [subscriber, method]( const std::pair< QObject*, const char* >& value ) { + return( subscriber == value.first && strcmp( method, value.second ) == 0 ); + } ); + if ( ctx != iter->end() ) + { + iter->erase( ctx ); + } + if ( iter->empty() ) + { + if ( _socket->state() == QAbstractSocket::ConnectedState ) + { + _socket->write( array( { "PUNSUBSCRIBE", channel } ) ); + } + _psubscribers.remove( channel ); + } + } +} // PubSub::punsubscribe + + +/*! + * \brief Разбирает поступившие от сервера уведомления и вызывает требуемые + * метаметоды обработки + */ +void PubSub::read() +{ + int splitLength; + while ( _socket->bytesAvailable() > 0 ) + { + _buffer += _socket->readAll(); + while ( !_buffer.isEmpty() ) + { + auto parts = split( _buffer, &splitLength ); + if ( parts.empty() ) + { + break; + } + if ( ( parts[0] == "pmessage" ) && ( parts.size() == 4 ) ) + { + auto iter = _psubscribers.find( parts[1] ); + if ( iter != _psubscribers.end() ) + { + for ( const auto& i: iter.value() ) + { + QMetaObject::invokeMethod( i.first, i.second, + Q_ARG( QString, parts[2] ), + Q_ARG( QByteArray, parts[3] ) ); + } + } + } + else + { + if ( ( parts[0] == "message" ) && ( parts.size() == 3 ) ) + { + auto iter = _subscribers.find( parts[1] ); + if ( iter != _subscribers.end() ) + { + for ( const auto& i: iter.value() ) + { + QMetaObject::invokeMethod( i.first, i.second, + Q_ARG( QString, parts[1] ), + Q_ARG( QByteArray, parts[2] ) ); + } + } + } + } + _buffer.remove( 0, splitLength ); + } + } +} // PubSub::read + + +/*! + * \brief Обработка изменения состояния сокета + */ +void PubSub::onSocketStateChanged() +{ + switch ( _socket->state() ) + { + case QAbstractSocket::ConnectedState: + for ( auto iter = _psubscribers.cbegin(); iter != _psubscribers.cend(); ++iter ) _socket->write( array( { "PSUBSCRIBE", iter.key() } ) ); + for ( auto iter = _subscribers.cbegin(); iter != _subscribers.cend(); ++iter ) _socket->write( array( { "SUBSCRIBE", iter.key() } ) ); + qInfo() << QDateTime::currentDateTimeUtc().toString( "hh:mm:ss:zzz" ) << + QString( "redis at %1:%2 is connected" ).arg( _host ).arg( _port ); + emit connectionStateChanged( _connectionFlags = Connected ); + emit connected(); + break; + case QAbstractSocket::UnconnectedState: + _buffer.clear(); + QTimer::singleShot( Timeout, [this]() { + _socket->connectToHost( _host, _port ); + } ); + qInfo() << QDateTime::currentDateTimeUtc().toString( "hh:mm:ss:zzz" ) << + QString( "redis at %1:%2 is unconnected" ).arg( _host ).arg( _port ); + if ( _connectionFlags & Connected ) + { + emit disconnected(); + } + emit connectionStateChanged( _connectionFlags = Unconnected ); + break; + default: + break; + } // switch +} // PubSub::onSocketStateChanged + + +/*! + * \brief Возвращает признак установленного соединения + * \return Признак установленного соединения + */ +bool PubSub::isConnected() const +{ + return ( _connectionFlags & Connected ); +} + + +/*! + * \brief Возвращает сокет + * \return Сокет + */ +QTcpSocket* PubSub::socket() const +{ + return( _socket ); +} + +} // namespace redis + +} // namespace myx + + +#endif // ifndef MYX_REDIS_PUBSUB_INL_HPP_ diff --git a/src/myx/redis/pubsub.cpp b/src/myx/redis/pubsub.cpp new file mode 100644 index 0000000..527683c --- /dev/null +++ b/src/myx/redis/pubsub.cpp @@ -0,0 +1,5 @@ +#ifndef MYXLIB_BUILD_LIBRARIES +#error Define MYXLIB_BUILD_LIBRARIES to compile this file. +#endif + +#include diff --git a/src/myx/redis/pubsub.hpp b/src/myx/redis/pubsub.hpp new file mode 100644 index 0000000..be62bcf --- /dev/null +++ b/src/myx/redis/pubsub.hpp @@ -0,0 +1,75 @@ +#ifndef MYX_REDIS_PUBSUB_HPP_ +#define MYX_REDIS_PUBSUB_HPP_ + +#pragma once + +#include + +#include + +QT_FORWARD_DECLARE_CLASS( QTcpSocket ) + +namespace myx { + +namespace redis { + +/*! + * \brief УПравляет подпиской и публикацией сообщений + */ + +class PubSub : public Base +{ + Q_OBJECT + +public: + PubSub( QObject* parent = nullptr ); + bool isConnected() const; + QTcpSocket* socket() const; + Q_SLOT void start(); + Q_SLOT void stop(); + Q_INVOKABLE void subscribe( QString channel, QObject* subscriber, const char* method ); + Q_INVOKABLE void subscribe( QString channel, QObject* subscriber, + const char* method, int database ); + Q_INVOKABLE void psubscribe( QString channel, QObject* subscriber, const char* method ); + Q_INVOKABLE void psubscribe( QString channel, QObject* subscriber, + const char* method, int database ); + Q_INVOKABLE void unsubscribe( QString channel, QObject* subscriber, const char* method ); + Q_INVOKABLE void unsubscribe( QString channel, QObject* subscriber, + const char* method, int database ); + Q_INVOKABLE void punsubscribe( QString channel, QObject* subscriber, const char* method ); + Q_INVOKABLE void punsubscribe( QString channel, QObject* subscriber, + const char* method, int database ); + /*! + * \brief Сигнализирует об установке соединения + */ + Q_SIGNAL void connected(); + /*! + * \brief Сигнализирует о разрыве соединения + */ + Q_SIGNAL void disconnected(); + +private: + using Sub = QHash< QString, std::vector< std::pair< QObject*, const char* > > >; + + static const QString KeySpacePrefix; //!< Приставка для наблюдения за данными + QTcpSocket* _socket; + /*! + * \brief Таблица подписчиков + */ + QByteArray _buffer; //!< Буффер прочитанных данных + Sub _subscribers; //!< Обработчики сообщений по каналу подписки + Sub _psubscribers; //!< Обработчики сообщений по шаблону канала подписки + + Q_SLOT void read(); + Q_SLOT void onSocketStateChanged(); +}; // class PubSub + +} // namespace redis + +} // namespace myx + +#ifdef MYXLIB_HEADER_ONLY +#include "pubsub-inl.hpp" +#endif + +#endif // ifndef MYX_REDIS_PUBSUB_HPP_