Первое приближение для новой библиотеки Redis

This commit is contained in:
Andrei Astafev 2020-04-25 08:45:05 +03:00
parent e6253e777f
commit 37fd015311
31 changed files with 1226 additions and 16 deletions

View File

@ -50,7 +50,7 @@ add_subdirectory(src/myx/redis)
if(MYXLIB_BUILD_EXAMPLES OR MYXLIB_BUILD_EXAMPLES_HO) if(MYXLIB_BUILD_EXAMPLES OR MYXLIB_BUILD_EXAMPLES_HO)
add_subdirectory(examples/filesystem) add_subdirectory(examples/filesystem)
add_subdirectory(examples/qt) add_subdirectory(examples/qt)
add_subdirectory(examples/redis) # add_subdirectory(examples/redis)
endif() endif()
# Документация # Документация

@ -1 +1 @@
Subproject commit 1eaded9031726f539cd5fd4a44d874d7bd8d458d Subproject commit 2ea38c32c8d6aa7ccdbdf84cff1d0b0d50cc2fb3

View File

@ -0,0 +1,72 @@
# Название основной цели и имя библиотеки в текущем каталоге
set(TRGT redis)
# cmake-format: off
# Список файлов исходных текстов
set(TRGT_cpp
${CMAKE_CURRENT_SOURCE_DIR}/client.cpp
${CMAKE_CURRENT_SOURCE_DIR}/lexer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/parser.cpp
${CMAKE_CURRENT_SOURCE_DIR}/request.cpp)
# Список заголовочных файлов (используется для установки)
set(TRGT_moc_hpp
${CMAKE_CURRENT_SOURCE_DIR}/client.hpp
${CMAKE_CURRENT_SOURCE_DIR}/lexer.hpp
${CMAKE_CURRENT_SOURCE_DIR}/parser.hpp
${CMAKE_CURRENT_SOURCE_DIR}/request.hpp)
set(TRGT_hpp
${CMAKE_CURRENT_SOURCE_DIR}/reply.hpp
${CMAKE_CURRENT_SOURCE_DIR}/client-inl.hpp
${CMAKE_CURRENT_SOURCE_DIR}/lexer-inl.hpp
${CMAKE_CURRENT_SOURCE_DIR}/parser-inl.hpp
${CMAKE_CURRENT_SOURCE_DIR}/request-inl.hpp)
set(TRGT_headers ${TRGT_moc_hpp} ${TRGT_hpp})
# cmake-format: on
qt5_wrap_cpp(TRGT_moc_cpp ${TRGT_moc_hpp})
add_library(${TRGT}-header-only INTERFACE)
target_include_directories(
${TRGT}-header-only SYSTEM INTERFACE "$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include>"
"$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>")
if(MYXLIB_BUILD_LIBRARIES)
add_common_library(${TRGT} OUTPUT_NAME myx-${TRGT} SOURCES ${TRGT_moc_cpp} ${TRGT_cpp} ${TRGT_headers})
common_target_properties(${TRGT})
# Создание цели для проверки утилитой clang-tidy
add_clang_tidy_check(${TRGT} ${TRGT_cpp} ${TRGT_headers})
# Создание цели для проверки утилитой clang-analyze
add_clang_analyze_check(${TRGT} ${TRGT_cpp} ${TRGT_headers})
# Создание цели для проверки утилитой clazy
add_clazy_check(${TRGT} ${TRGT_cpp} ${TRGT_headers})
# Создание цели для проверки утилитой pvs-studio
add_pvs_check(${TRGT})
# Создание цели для автоматического форматирования кода
add_format_sources(${TRGT} ${TRGT_cpp} ${TRGT_headers})
target_compile_definitions(${TRGT} PUBLIC MYXLIB_BUILD_LIBRARIES)
target_include_directories(${TRGT} SYSTEM PUBLIC ${Qt5Core_INCLUDE_DIRS})
target_include_directories(${TRGT} SYSTEM PUBLIC ${Qt5Network_INCLUDE_DIRS})
target_include_directories(${TRGT} SYSTEM PRIVATE ${CMAKE_SOURCE_DIR}/src)
cotire(${TRGT})
install(TARGETS ${TRGT}_static COMPONENT libs-dev ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR})
if(BUILD_SHARED_LIBS)
install(TARGETS ${TRGT}_shared COMPONENT main LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR})
endif()
endif()
generate_pkgconfig(myx-${TRGT} COMPONENT base-dev INSTALL_LIBRARY ${MYXLIB_BUILD_LIBRARIES})
install(FILES ${TRGT_headers} COMPONENT base-dev DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/${PROJECT_NAME}/${TRGT})
# Цель, используемая только для установки заголовочных файлов без компиляции проекта
add_custom_target(${TRGT}-install-headers COMMAND ${CMAKE_COMMAND} -DCMAKE_INSTALL_COMPONENT=base-dev -P
"${CMAKE_BINARY_DIR}/cmake_install.cmake")

View File

@ -4,24 +4,21 @@ set(TRGT redis)
# cmake-format: off # cmake-format: off
# Список файлов исходных текстов # Список файлов исходных текстов
set(TRGT_cpp set(TRGT_cpp
${CMAKE_CURRENT_SOURCE_DIR}/client.cpp ${CMAKE_CURRENT_SOURCE_DIR}/base.cpp
${CMAKE_CURRENT_SOURCE_DIR}/lexer.cpp ${CMAKE_CURRENT_SOURCE_DIR}/pubsub.cpp
${CMAKE_CURRENT_SOURCE_DIR}/parser.cpp ${CMAKE_CURRENT_SOURCE_DIR}/containers.cpp)
${CMAKE_CURRENT_SOURCE_DIR}/request.cpp)
# Список заголовочных файлов (используется для установки) # Список заголовочных файлов
set(TRGT_moc_hpp set(TRGT_moc_hpp
${CMAKE_CURRENT_SOURCE_DIR}/client.hpp ${CMAKE_CURRENT_SOURCE_DIR}/base.hpp
${CMAKE_CURRENT_SOURCE_DIR}/lexer.hpp ${CMAKE_CURRENT_SOURCE_DIR}/pubsub.hpp)
${CMAKE_CURRENT_SOURCE_DIR}/parser.hpp
${CMAKE_CURRENT_SOURCE_DIR}/request.hpp)
set(TRGT_hpp set(TRGT_hpp
${CMAKE_CURRENT_SOURCE_DIR}/reply.hpp ${CMAKE_CURRENT_SOURCE_DIR}/base-inl.hpp
${CMAKE_CURRENT_SOURCE_DIR}/client-inl.hpp ${CMAKE_CURRENT_SOURCE_DIR}/containers.hpp
${CMAKE_CURRENT_SOURCE_DIR}/lexer-inl.hpp ${CMAKE_CURRENT_SOURCE_DIR}/containers-inl.hpp
${CMAKE_CURRENT_SOURCE_DIR}/parser-inl.hpp ${CMAKE_CURRENT_SOURCE_DIR}/containers.tpp
${CMAKE_CURRENT_SOURCE_DIR}/request-inl.hpp) ${CMAKE_CURRENT_SOURCE_DIR}/pubsub-inl.hpp)
set(TRGT_headers ${TRGT_moc_hpp} ${TRGT_hpp}) set(TRGT_headers ${TRGT_moc_hpp} ${TRGT_hpp})
# cmake-format: on # cmake-format: on

133
src/myx/redis/base-inl.hpp Normal file
View File

@ -0,0 +1,133 @@
#ifndef MYX_REDIS_BASE_INL_HPP_
#define MYX_REDIS_BASE_INL_HPP_
#pragma once
#ifndef MYXLIB_HEADER_ONLY
#include <myx/redis/base.hpp>
#endif
#include <myx/base/config.hpp>
#include <QTimer>
namespace myx {
namespace redis {
static const QByteArray Separator { "\r\n" }; //!< Строковый разделитель слов во фразе
/*!
* \brief Создает объект класса
* \param host Имя хоста сервера
* \param port Номер порта сервера
* \param parent Родительский объект
*/
Base::Base( QObject* parent ) :
QObject{ parent }
{
_readTimer = new QTimer { this };
_writeTimer = new QTimer { this };
_readTimer->setSingleShot( true );
_writeTimer->setSingleShot( true );
_readTimer->setInterval( 100 );
_writeTimer->setInterval( 100 );
connect( _readTimer, &QTimer::timeout, [this]() {
if ( _connectionFlags & Connected && _connectionFlags & Read )
{
_connectionFlags -= Read;
emit connectionStateChanged( _connectionFlags );
}
} );
connect( _writeTimer, &QTimer::timeout, [this]() {
if ( _connectionFlags & Connected && _connectionFlags & Write )
{
_connectionFlags -= Write;
emit connectionStateChanged( _connectionFlags );
}
} );
}
/*!
* \brief Задает адрес хоста
* \param host Адрес
*/
void Base::setHost( const QString& host )
{
_host = host;
}
/*!
* \brief Извлекает длину токена из буфера данных
* \param pos Позиция в буфере
* \param buffer Буфер данных
*/
int Base::fetchLength( QByteArray& buffer, int& pos )
{
if ( ( buffer[pos] != '$' ) && ( buffer[pos] != '*' ) && ( buffer[pos] != ':' ) )
{
buffer.clear();
pos = 0;
return( -1 );
}
pos++;
QByteArray text;
while ( pos < buffer.length() )
{
if ( QChar( buffer[pos] ).isNumber() )
{
text += buffer[pos++];
}
else
{
pos += Separator.length();
return( text.toInt() );
}
}
return( -1 );
} // Base::fetchLength
/*!
* \brief Добавляет к состоянию флаг Read
*/
void Base::onReadyRead()
{
if ( _connectionFlags & Connected && !( _connectionFlags & Read ) )
{
_connectionFlags += Read;
emit connectionStateChanged( _connectionFlags );
}
_readTimer->start();
}
/*!
* \brief Добавляет к состоянию флаг Read
*/
void Base::onBytesWritten()
{
if ( _connectionFlags & Connected && !( _connectionFlags & Write ) )
{
_connectionFlags += Write;
emit connectionStateChanged( _connectionFlags );
}
_writeTimer->start();
}
/*!
* \brief Выставляет флаг ошибки соединения
*/
void Base::onSocketError()
{
emit connectionStateChanged( _connectionFlags = Error );
}
} // namespace redis
} // namespace myx
#endif // ifndef MYX_REDIS_BASE_INL_HPP_

5
src/myx/redis/base.cpp Normal file
View File

@ -0,0 +1,5 @@
#ifndef MYXLIB_BUILD_LIBRARIES
#error Define MYXLIB_BUILD_LIBRARIES to compile this file.
#endif
#include <myx/redis/base-inl.hpp>

69
src/myx/redis/base.hpp Normal file
View File

@ -0,0 +1,69 @@
#ifndef MYX_REDIS_BASE_HPP_
#define MYX_REDIS_BASE_HPP_
#pragma once
#include <myx/base/config.hpp>
#include <QHash>
#include <QObject>
QT_FORWARD_DECLARE_CLASS( QTimer )
QT_FORWARD_DECLARE_CLASS( QTcpSocket )
namespace myx {
namespace redis {
/*!
* \brief Базовый класс для объектов взаимодействия с redis
*/
class Base : public QObject
{
Q_OBJECT
public:
/*!
* \brief Состояние соединения
*/
enum ConnectionStateFlags
{
Unconnected = 1, //!< Соединение не установлено
Connecting = 2, //!< Соединение устанавливается
Connected = 4, //!< Соединение установлено
Read = 8, //!< Есть данные для чтения
Write = 16, //!< Есть данные для записи
Error = 32, //!< Ошибка соединения
};
Base( QObject* parent = nullptr );
void setHost( const QString& host );
/*!
* \brief Сигнализирует об изменении состояния соединения
* \param flags Флаги состояния
*/
Q_SIGNAL void connectionStateChanged( int flags );
protected:
static constexpr int Timeout { 2000 }; //!< Таймаут на сетевые операции
int _connectionFlags { Unconnected }; //!< Флаги состояния соединения
quint16 _port { 6379 }; //!< Номер порта подключения к БД
QString _host { "127.0.0.1" }; //!< Имя хоста подключения к БД
QTimer* _readTimer; //!< Таймер чтения данных
QTimer* _writeTimer; //!< Таймер записи данных
int fetchLength( QByteArray& buffer, int& pos );
Q_SLOT void onReadyRead();
Q_SLOT void onBytesWritten();
Q_SLOT void onSocketError();
}; // class Base
} // namespace redis
} // namespace myx
#ifdef MYXLIB_HEADER_ONLY
#include "base-inl.hpp"
#endif
#endif // ifndef MYX_REDIS_BASE_HPP_

View File

@ -0,0 +1,233 @@
#ifndef MYX_REDIS_CONTAINERS_INL_HPP_
#define MYX_REDIS_CONTAINERS_INL_HPP_
#pragma once
#ifndef MYXLIB_HEADER_ONLY
#include <myx/redis/containers.hpp>
#endif
#include <QHash>
#include <QDebug>
namespace myx {
namespace redis {
/*!
* \brief Создает часть команды
* \param value Значение для создани части команды
* \return Часть команды
*/
QByteArray part( const QVariant& value )
{
auto bytes = value.toByteArray();
return( "$" + QByteArray::number( bytes.length() ) + Separator + bytes );
}
/*!
* \brief Составляет RESP массив
* \param parts Части для составления массива
* \return RESP массив
*/
QByteArray array( const QVariantList& parts )
{
QByteArrayList data;
std::transform( parts.cbegin(), parts.cend(), std::back_inserter( data ),
[]( const QVariant& iter ) { return( part( iter ) ); } );
data.prepend( "*" + QByteArray::number( data.size() ) );
return( data.join( Separator ) + Separator );
}
/*!
* \brief Разбивает буфер данных на массив частей
* \param buffer Входной буфер данных
* \param splitLength Длина разобранной части буффера
* \return Массив частей-строк
*/
QByteArrayList split( const QByteArray& buffer, int* splitLength )
{
enum Token
{
Undefined,
FetchNumber,
FetchPart,
};
if ( ( buffer == "$-1\r\n" ) || ( buffer == "$*0\r\n" ) || buffer.isEmpty() )
{
if ( splitLength != nullptr )
{
*splitLength = buffer.length();
}
return {};
}
if ( buffer.startsWith( "-WRONG" ) )
{
if ( splitLength != nullptr )
{
*splitLength = buffer.endsWith( "\r\n" ) ? buffer.length() : 0;
}
return {};
}
if ( splitLength != nullptr )
{
*splitLength = 0;
}
if ( !buffer.endsWith( Separator ) )
{
return {};
}
int size { 1 };
int pos { 0 };
QByteArray partLength;
if ( buffer[pos] == '*' )
{
pos++;
QByteArray text;
while ( QChar( buffer[pos] ).isNumber() && pos < buffer.size() ) text += buffer[pos++];
if ( pos == buffer.size() )
{
return {};
}
size = text.toInt();
pos += Separator.length();
}
QByteArrayList parts;
Token token { Undefined };
while ( parts.size() < size && pos < buffer.length() )
{
switch ( token )
{
case Undefined:
switch ( buffer[pos] )
{
case '$':
partLength.clear();
token = FetchPart;
pos++;
break;
case ':':
partLength.clear();
token = FetchNumber;
pos++;
break;
default:
return {};
}
break;
case FetchNumber:
if ( QChar( buffer[pos] ).isNumber() || ( buffer[pos] == '-' ) )
{
partLength += buffer[pos];
pos++;
}
else
{
parts.push_back( partLength );
token = Undefined;
pos += Separator.length();
}
break;
case FetchPart:
{
if ( QChar( buffer[pos] ).isNumber() || ( buffer[pos] == '-' ) )
{
partLength += buffer[pos];
pos++;
}
else
{
int length = partLength.toInt();
if ( length < 0 )
{
parts.push_back( "" );
pos += Separator.length();
}
else
{
pos += Separator.length();
if ( pos + length > buffer.length() )
{
return {};
}
parts.push_back( buffer.mid( pos, length ) );
pos += length + Separator.length();
}
token = Undefined;
}
break;
}
} // switch
}
// может быть ситуация, когда в буфер недочитан последний \r\n,
// но все части были вычитаны
if ( ( parts.size() == size ) && ( pos <= buffer.length() ) )
{
if ( splitLength != nullptr )
{
*splitLength = pos;
}
return( parts );
}
return {};
} // split
/*!
* \brief Запаковывает сообщение для передачи через систему pubsub
* \param hash Таблица значений
* \return Запакованное сообщение
*/
QByteArray packHash( const QVariantHash& hash )
{
QVariantList parts;
for ( auto iter = hash.cbegin(); iter != hash.cend(); ++iter ) parts << iter.key() << iter.value();
return( array( parts ) );
}
/*!
* \brief Распаковывает кодограмму в таблицу значений
* \param message Сообщение
* \return Таблица значений
*/
QVariantHash unpackHash( const QByteArray& message )
{
auto parts = split( message );
QVariantHash hash;
for ( int i = 1; i < parts.size(); i += 2 ) hash[parts[i - 1]] = parts[i];
return( hash );
}
/*!
* \brief Запаковывает сообщение для передачи через систему pubsub
* \param hash Список значений
* \return Запакованное сообщение
*/
QByteArray packList( const QVariantList& list )
{
return( array( list ) );
}
/*!
* \brief Распаковывает кодограмму в список значений
* \param message Сообщение
* \return Список значений
*/
QVariantList unpackList( const QByteArray& message )
{
auto data = split( message );
QVariantList list;
std::copy( data.begin(), data.end(), std::back_inserter( list ) );
return( list );
}
} // namespace redis
} // namespace myx
#endif // ifndef MYX_REDIS_CONTAINERS_INL_HPP_

View File

@ -0,0 +1,5 @@
#ifndef MYXLIB_BUILD_LIBRARIES
#error Define MYXLIB_BUILD_LIBRARIES to compile this file.
#endif
#include <myx/redis/containers-inl.hpp>

View File

@ -0,0 +1,44 @@
#ifndef MYX_REDIS_CONTAINERS_HPP_
#define MYX_REDIS_CONTAINERS_HPP_
#pragma once
#include <myx/base/config.hpp>
#include <QByteArrayList>
#include <QVariantList>
namespace myx {
namespace redis {
static const QByteArray Separator { "\r\n" }; //!< Строковый разделитель слов во фразе
QByteArray array( const QVariantList& parts );
QByteArrayList split( const QByteArray& buffer, int* splitLength = nullptr );
// QByteArray part( const QVariant& value );
// QByteArray packHash( const QVariantHash& hash );
// QVariantHash unpackHash( const QByteArray& message );
// QByteArray packList( QByteArrayList list );
// QByteArray packList( const QVariantList& list );
QVariantList unpackList( const QByteArray& message );
// template< typename T >
// QByteArray packPair( T first, T second );
// template< typename T1, typename T2 >
// QByteArray packPair( T1 first, T2 second );
// template< typename T >
// std::pair< T, T > unpackPair( const QByteArray& message );
// template< typename T1, typename T2 >
// std::pair< T1, T2 > unpackPair( const QByteArray& message );
} // namespace redis
} // namespace myx
#include <myx/redis/containers.tpp>
#ifdef MYXLIB_HEADER_ONLY
#include "containers-inl.hpp"
#endif
#endif // ifndef MYX_REDIS_CONTAINERS_HPP_

View File

@ -0,0 +1,68 @@
#include <QVariant>
namespace myx {
namespace redis {
/*!
* \brief Запаковывает пару значений в сообщение
* \param first Первое значение
* \param second Второе значение
* \return Текстовое сообщение
*/
template< typename T >
QByteArray packPair( T first, T second )
{
return( packList( { QVariant::fromValue( first ), QVariant::fromValue( second ) } ) );
}
/*!
* \brief Запаковывает пару значений в сообщение
* \param first Первое значение
* \param second Второе значение
* \return Текстовое сообщение
*/
template< typename T1, typename T2 >
QByteArray packPair( T1 first, T2 second )
{
return( packList( { QVariant::fromValue( first ), QVariant::fromValue( second ) } ) );
}
/*!
* \brief Распаковывает сообщение в пару однотипных значений
* \param pair Текст сообщения
* \return Пара значений
*/
template< typename T >
std::pair< T, T > unpackPair( const QByteArray& message )
{
auto list = unpackList( message );
if ( list.size() == 2 )
{
return { QVariant( list.first() ).value< T >(), QVariant( list.last() ).value< T >() };
}
return {};
}
/*!
* \brief Распаковывает сообщение в пару однотипных значений
* \param pair Текст сообщения
* \return Пара значений
*/
template< typename T1, typename T2 >
std::pair< T1, T2 > unpackPair( const QByteArray& message )
{
auto list = unpackList( message );
if ( list.size() == 2 )
{
return { QVariant( list.first() ).value< T1 >(), QVariant( list.last() ).value< T2 >() };
}
return {};
}
} // namespace redis
} // namespace myx

View File

@ -0,0 +1,504 @@
#ifndef MYX_REDIS_PUBSUB_INL_HPP_
#define MYX_REDIS_PUBSUB_INL_HPP_
#pragma once
#ifndef MYXLIB_HEADER_ONLY
#include <myx/redis/pubsub.hpp>
#endif
#include <myx/redis/containers.hpp>
#include <QTimer>
#include <QThread>
#include <QDateTime>
#include <QTcpSocket>
#include <QCoreApplication>
namespace myx {
namespace redis {
const QString PubSub::KeySpacePrefix { "__key%1__:" };
/*!
* \brief Конструктор класса
* \param parent Родительский объект для _socket
*/
PubSub::PubSub( QObject* parent ) :
Base{ parent }
{
_socket = new QTcpSocket { this };
QObject::connect( _socket, &QAbstractSocket::stateChanged,
this, &PubSub::onSocketStateChanged );
connect( _socket, &QIODevice::readyRead, this, &PubSub::onReadyRead );
connect( _socket, &QIODevice::bytesWritten, this, &PubSub::onBytesWritten );
QObject::connect( _socket, static_cast< void ( QAbstractSocket::* )( QAbstractSocket::SocketError ) >
( &QAbstractSocket::error ), [this]() {
qDebug() << QDateTime::currentDateTimeUtc().toString( "hh:mm:ss:zzz" )
<< QByteArray( "redis pubsub connection error" ) << _socket->errorString();
} );
QObject::connect( _socket, &QIODevice::readyRead, this, &PubSub::read );
}
/*!
* \brief Инициирует работу с БД
*/
void PubSub::start()
{
emit disconnected();
emit connectionStateChanged( _connectionFlags );
_socket->connectToHost( _host, _port );
}
/*!
* \brief Завершает работу с БД
*/
void PubSub::stop()
{
if ( _socket->state() == QAbstractSocket::ConnectedState )
{
_socket->disconnectFromHost();
while ( _socket->state() != QAbstractSocket::UnconnectedState ) QCoreApplication::processEvents();
}
}
/*!
* \brief Создает подписку на канал сообщений
* \param channel Rанал подписки
* \param subscriber Объект-подписчик
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
*/
void PubSub::subscribe( QString channel, QObject* subscriber, const char* method )
{
if ( QThread::currentThread() != thread() )
{
qWarning() << "no redis connection for caller thread";
return;
}
auto iter = _subscribers.find( channel );
if ( iter == _subscribers.end() )
{
if ( _socket->state() == QAbstractSocket::ConnectedState )
{
_socket->write( array( { "SUBSCIBE", channel } ) );
}
_subscribers[channel].push_back( { subscriber, method } );
}
else
{
auto value = iter.value();
auto it = std::find_if( value.begin(), value.end(),
[subscriber, method]( const std::pair< QObject*, const char* >& p ) {
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
} );
if ( it == iter.value().end() )
{
iter.value().push_back( { subscriber, method } );
}
}
} // PubSub::subscribe
/*!
* \brief Создает подписку на канал изменения данных
* \param channel Rанал подписки
* \param subscriber Объект-подписчик
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
* \param database Номер БД
*/
void PubSub::subscribe( QString channel, QObject* subscriber, const char* method, int database )
{
if ( QThread::currentThread() != thread() )
{
qWarning() << "no redis connection for caller thread";
return;
}
auto iter = _subscribers.find( channel );
if ( iter == _subscribers.end() )
{
channel.prepend( KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
if ( _socket->state() == QAbstractSocket::ConnectedState )
{
_socket->write( array( { "SUBSCRIBE", channel } ) );
}
_subscribers[channel].push_back( { subscriber, method } );
}
else
{
auto value = iter.value();
auto it = std::find_if( value.begin(), value.end(),
[subscriber, method]( const std::pair< QObject*, const char* >& p ) {
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
} );
if ( it == iter.value().end() )
{
iter.value().push_back( { subscriber, method } );
}
}
} // PubSub::subscribe
/*!
* \brief Создает подписку на шаблон канала сообщений
* \param channel Шаблон канала подписки
* \param subscriber Объект-подписчик
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
*/
void PubSub::psubscribe( QString channel, QObject* subscriber, const char* method )
{
if ( QThread::currentThread() != thread() )
{
qWarning() << "no redis connection for caller thread";
return;
}
auto iter = _psubscribers.find( channel );
if ( iter == _psubscribers.end() )
{
if ( _socket->state() == QAbstractSocket::ConnectedState )
{
_socket->write( array( { "PSUBSCRIBE", channel } ) );
}
_psubscribers[channel].push_back( { subscriber, method } );
}
else
{
auto value = iter.value();
auto it = std::find_if( value.begin(), value.end(),
[subscriber, method]( const std::pair< QObject*, const char* >& p ) {
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
} );
if ( it == iter.value().end() )
{
iter.value().push_back( { subscriber, method } );
}
}
} // PubSub::psubscribe
/*!
* \brief Создает подписку на шаблон канала изменения данных
* \param channel Шаблон канала подписки
* \param subscriber Объект-подписчик
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
* \param database Номер БД (-1 для всех)
*/
void PubSub::psubscribe( QString channel, QObject* subscriber, const char* method, int database )
{
if ( QThread::currentThread() != thread() )
{
qWarning() << "no redis connection for caller thread";
return;
}
auto iter = _psubscribers.find( channel );
if ( iter == _psubscribers.end() )
{
if ( database == -1 )
{
channel.prepend( KeySpacePrefix.arg( "*" ).toLocal8Bit() );
}
else
{
channel.prepend( KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
}
if ( _socket->state() == QAbstractSocket::ConnectedState )
{
_socket->write( array( { "PSUBSCRIBE", channel } ) );
}
_psubscribers[channel].push_back( { subscriber, method } );
}
else
{
auto value = iter.value();
auto it = std::find_if( value.begin(), value.end(),
[subscriber, method]( const std::pair< QObject*, const char* >& p ) {
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
} );
if ( it == iter.value().end() )
{
iter.value().push_back( { subscriber, method } );
}
}
} // PubSub::psubscribe
/*!
* \brief Отписывает получателя от канала
* \param channel Имя канала
* \param subscriber Подписчик на канал
* \param method Метод обработки данных
* \param keyEvents Признак подписки на изменение данных
*/
void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* method )
{
if ( QThread::currentThread() != thread() )
{
qWarning() << "no redis connection for caller thread";
return;
}
auto iter = _subscribers.find( channel );
if ( iter != _subscribers.end() )
{
auto ctx = std::find_if( iter->begin(), iter->end(),
[subscriber, method]( const std::pair< QObject*, const char* >& value ) {
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
} );
if ( ctx != iter->end() )
{
iter->erase( ctx );
}
if ( iter->empty() )
{
if ( _socket->state() == QAbstractSocket::ConnectedState )
{
_socket->write( array( { "UNSUBSCRIBE", channel } ) );
}
_subscribers.remove( channel );
}
}
} // PubSub::unsubscribe
/*!
* \brief Отписывает получателя от канала
* \param channel Имя канала
* \param subscriber Подписчик на канал
* \param method Метод обработки данных
* \param keyEvents Признак подписки на изменение данных
*/
void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* method, int database )
{
if ( QThread::currentThread() != thread() )
{
qWarning() << "no redis connection for caller thread";
return;
}
if ( database == -1 )
{
channel.prepend( KeySpacePrefix.arg( "*" ).toLocal8Bit() );
}
else
{
channel.prepend( KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
}
auto iter = _subscribers.find( channel );
if ( iter != _subscribers.end() )
{
auto ctx = std::find_if( iter->begin(), iter->end(),
[subscriber, method]( const std::pair< QObject*, const char* >& value ) {
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
} );
if ( ctx != iter->end() )
{
iter->erase( ctx );
}
if ( iter->empty() )
{
if ( _socket->state() == QAbstractSocket::ConnectedState )
{
_socket->write( array( { "UNSUBSCRIBE", channel } ) );
}
_subscribers.remove( channel );
}
}
} // PubSub::unsubscribe
/*!
* \brief Отписывает получателя от шаблона канала
* \param channel Шаблон канала
* \param subscriber Подписчик на канал
* \param method Метод обработки данных
* \param keyEvents Признак подписки на изменение данных
*/
void PubSub::punsubscribe( QString channel, QObject* subscriber,
const char* method )
{
if ( QThread::currentThread() != thread() )
{
qWarning() << "no redis connection for caller thread";
return;
}
auto iter = _psubscribers.find( channel );
if ( iter != _psubscribers.end() )
{
auto ctx = std::find_if( iter->begin(), iter->end(),
[subscriber, method]( const std::pair< QObject*, const char* >& value ) {
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
} );
if ( ctx != iter->end() )
{
iter->erase( ctx );
}
if ( iter->empty() )
{
if ( _socket->state() == QAbstractSocket::ConnectedState )
{
_socket->write( array( { "PUNSUBSCRIBE", channel } ) );
}
_psubscribers.remove( channel );
}
}
} // PubSub::punsubscribe
/*!
* \brief Отписывает получателя от шаблона канала
* \param channel Шаблон канала
* \param subscriber Подписчик на канал
* \param method Метод обработки данных
* \param keyEvents Признак подписки на изменение данных
*/
void PubSub::punsubscribe( QString channel, QObject* subscriber,
const char* method, int database )
{
if ( QThread::currentThread() != thread() )
{
qWarning() << "no redis connection for caller thread";
return;
}
if ( database == -1 )
{
channel.prepend( KeySpacePrefix.arg( "*" ).toLocal8Bit() );
}
else
{
channel.prepend( KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
}
auto iter = _psubscribers.find( channel );
if ( iter != _psubscribers.end() )
{
auto ctx = std::find_if( iter->begin(), iter->end(),
[subscriber, method]( const std::pair< QObject*, const char* >& value ) {
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
} );
if ( ctx != iter->end() )
{
iter->erase( ctx );
}
if ( iter->empty() )
{
if ( _socket->state() == QAbstractSocket::ConnectedState )
{
_socket->write( array( { "PUNSUBSCRIBE", channel } ) );
}
_psubscribers.remove( channel );
}
}
} // PubSub::punsubscribe
/*!
* \brief Разбирает поступившие от сервера уведомления и вызывает требуемые
* метаметоды обработки
*/
void PubSub::read()
{
int splitLength;
while ( _socket->bytesAvailable() > 0 )
{
_buffer += _socket->readAll();
while ( !_buffer.isEmpty() )
{
auto parts = split( _buffer, &splitLength );
if ( parts.empty() )
{
break;
}
if ( ( parts[0] == "pmessage" ) && ( parts.size() == 4 ) )
{
auto iter = _psubscribers.find( parts[1] );
if ( iter != _psubscribers.end() )
{
for ( const auto& i: iter.value() )
{
QMetaObject::invokeMethod( i.first, i.second,
Q_ARG( QString, parts[2] ),
Q_ARG( QByteArray, parts[3] ) );
}
}
}
else
{
if ( ( parts[0] == "message" ) && ( parts.size() == 3 ) )
{
auto iter = _subscribers.find( parts[1] );
if ( iter != _subscribers.end() )
{
for ( const auto& i: iter.value() )
{
QMetaObject::invokeMethod( i.first, i.second,
Q_ARG( QString, parts[1] ),
Q_ARG( QByteArray, parts[2] ) );
}
}
}
}
_buffer.remove( 0, splitLength );
}
}
} // PubSub::read
/*!
* \brief Обработка изменения состояния сокета
*/
void PubSub::onSocketStateChanged()
{
switch ( _socket->state() )
{
case QAbstractSocket::ConnectedState:
for ( auto iter = _psubscribers.cbegin(); iter != _psubscribers.cend(); ++iter ) _socket->write( array( { "PSUBSCRIBE", iter.key() } ) );
for ( auto iter = _subscribers.cbegin(); iter != _subscribers.cend(); ++iter ) _socket->write( array( { "SUBSCRIBE", iter.key() } ) );
qInfo() << QDateTime::currentDateTimeUtc().toString( "hh:mm:ss:zzz" ) <<
QString( "redis at %1:%2 is connected" ).arg( _host ).arg( _port );
emit connectionStateChanged( _connectionFlags = Connected );
emit connected();
break;
case QAbstractSocket::UnconnectedState:
_buffer.clear();
QTimer::singleShot( Timeout, [this]() {
_socket->connectToHost( _host, _port );
} );
qInfo() << QDateTime::currentDateTimeUtc().toString( "hh:mm:ss:zzz" ) <<
QString( "redis at %1:%2 is unconnected" ).arg( _host ).arg( _port );
if ( _connectionFlags & Connected )
{
emit disconnected();
}
emit connectionStateChanged( _connectionFlags = Unconnected );
break;
default:
break;
} // switch
} // PubSub::onSocketStateChanged
/*!
* \brief Возвращает признак установленного соединения
* \return Признак установленного соединения
*/
bool PubSub::isConnected() const
{
return ( _connectionFlags & Connected );
}
/*!
* \brief Возвращает сокет
* \return Сокет
*/
QTcpSocket* PubSub::socket() const
{
return( _socket );
}
} // namespace redis
} // namespace myx
#endif // ifndef MYX_REDIS_PUBSUB_INL_HPP_

5
src/myx/redis/pubsub.cpp Normal file
View File

@ -0,0 +1,5 @@
#ifndef MYXLIB_BUILD_LIBRARIES
#error Define MYXLIB_BUILD_LIBRARIES to compile this file.
#endif
#include <myx/redis/pubsub-inl.hpp>

75
src/myx/redis/pubsub.hpp Normal file
View File

@ -0,0 +1,75 @@
#ifndef MYX_REDIS_PUBSUB_HPP_
#define MYX_REDIS_PUBSUB_HPP_
#pragma once
#include <myx/redis/base.hpp>
#include <QObject>
QT_FORWARD_DECLARE_CLASS( QTcpSocket )
namespace myx {
namespace redis {
/*!
* \brief УПравляет подпиской и публикацией сообщений
*/
class PubSub : public Base
{
Q_OBJECT
public:
PubSub( QObject* parent = nullptr );
bool isConnected() const;
QTcpSocket* socket() const;
Q_SLOT void start();
Q_SLOT void stop();
Q_INVOKABLE void subscribe( QString channel, QObject* subscriber, const char* method );
Q_INVOKABLE void subscribe( QString channel, QObject* subscriber,
const char* method, int database );
Q_INVOKABLE void psubscribe( QString channel, QObject* subscriber, const char* method );
Q_INVOKABLE void psubscribe( QString channel, QObject* subscriber,
const char* method, int database );
Q_INVOKABLE void unsubscribe( QString channel, QObject* subscriber, const char* method );
Q_INVOKABLE void unsubscribe( QString channel, QObject* subscriber,
const char* method, int database );
Q_INVOKABLE void punsubscribe( QString channel, QObject* subscriber, const char* method );
Q_INVOKABLE void punsubscribe( QString channel, QObject* subscriber,
const char* method, int database );
/*!
* \brief Сигнализирует об установке соединения
*/
Q_SIGNAL void connected();
/*!
* \brief Сигнализирует о разрыве соединения
*/
Q_SIGNAL void disconnected();
private:
using Sub = QHash< QString, std::vector< std::pair< QObject*, const char* > > >;
static const QString KeySpacePrefix; //!< Приставка для наблюдения за данными
QTcpSocket* _socket;
/*!
* \brief Таблица подписчиков
*/
QByteArray _buffer; //!< Буффер прочитанных данных
Sub _subscribers; //!< Обработчики сообщений по каналу подписки
Sub _psubscribers; //!< Обработчики сообщений по шаблону канала подписки
Q_SLOT void read();
Q_SLOT void onSocketStateChanged();
}; // class PubSub
} // namespace redis
} // namespace myx
#ifdef MYXLIB_HEADER_ONLY
#include "pubsub-inl.hpp"
#endif
#endif // ifndef MYX_REDIS_PUBSUB_HPP_