Библиотека Redis вынесена в отдельный проект
This commit is contained in:
parent
b41f17a9b2
commit
94616ba6bc
@ -44,13 +44,11 @@ add_subdirectory(src/myx/base)
|
||||
add_subdirectory(src/myx/filesystem)
|
||||
add_subdirectory(src/myx/qt)
|
||||
add_subdirectory(src/myx/math)
|
||||
add_subdirectory(src/myx/redis)
|
||||
|
||||
# Примеры
|
||||
if(MYXLIB_BUILD_EXAMPLES OR MYXLIB_BUILD_EXAMPLES_HO)
|
||||
add_subdirectory(examples/filesystem)
|
||||
add_subdirectory(examples/qt)
|
||||
add_subdirectory(examples/redis)
|
||||
endif()
|
||||
|
||||
# Документация
|
||||
|
@ -1,92 +0,0 @@
|
||||
# Название основной цели в текущем каталоге
|
||||
set(TRGT example-redis-client)
|
||||
|
||||
# Список файлов исходных текстов
|
||||
set(TRGT_cpp ${CMAKE_CURRENT_SOURCE_DIR}/client.cpp)
|
||||
|
||||
set(TRGT_moc_hpp ${CMAKE_CURRENT_SOURCE_DIR}/client.hpp)
|
||||
|
||||
qt5_wrap_cpp(TRGT_moc_cpp ${TRGT_moc_hpp})
|
||||
|
||||
if(MYXLIB_BUILD_EXAMPLES)
|
||||
|
||||
# Путь поиска библиотек внутри проекта
|
||||
link_directories(${CMAKE_BINARY_DIR}/${CMAKE_INSTALL_LIBDIR})
|
||||
|
||||
# Цель для создания исполняемого файла
|
||||
add_executable(${TRGT} ${TRGT_cpp} ${TRGT_moc_cpp} ${TRGT_moc_hpp})
|
||||
common_target_properties(${TRGT})
|
||||
|
||||
# Создание цели для проверки утилитой clang-tidy
|
||||
add_clang_tidy_check(${TRGT} ${TRGT_cpp})
|
||||
|
||||
# Создание цели для проверки утилитой clang-analyze
|
||||
add_clang_analyze_check(${TRGT} ${TRGT_cpp})
|
||||
|
||||
# Создание цели для проверки утилитой clazy
|
||||
add_clazy_check(${TRGT} ${TRGT_cpp})
|
||||
|
||||
# Создание цели для проверки утилитой pvs-studio
|
||||
add_pvs_check(${TRGT})
|
||||
|
||||
# Создание цели для автоматического форматирования кода
|
||||
add_format_sources(${TRGT} ${TRGT_cpp} ${TRGT_moc_hpp})
|
||||
|
||||
# Qt5
|
||||
target_include_directories(${TRGT} PRIVATE ${CMAKE_SOURCE_DIR}/src)
|
||||
target_include_directories(${TRGT} SYSTEM PUBLIC ${Qt5Core_INCLUDE_DIRS})
|
||||
target_include_directories(${TRGT} SYSTEM PUBLIC ${Qt5Network_INCLUDE_DIRS})
|
||||
|
||||
target_include_directories(${TRGT} SYSTEM PRIVATE ${CMAKE_SOURCE_DIR}/src)
|
||||
add_dependencies(${TRGT} base redis)
|
||||
|
||||
target_link_libraries(${TRGT} base_static redis_static)
|
||||
target_link_libraries(${TRGT} Qt5::Core Qt5::Network)
|
||||
target_link_libraries(${TRGT} Threads::Threads)
|
||||
|
||||
# Имя выходного файла для цели
|
||||
set_target_properties(${TRGT} PROPERTIES OUTPUT_NAME redis-client)
|
||||
|
||||
add_sanitizers(${TRGT})
|
||||
|
||||
cotire(${TRGT})
|
||||
|
||||
add_dependencies(${TRGT} create_auxilary_symlinks)
|
||||
|
||||
# Правила для установки
|
||||
install(TARGETS ${TRGT} COMPONENT examples RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})
|
||||
endif()
|
||||
|
||||
if(MYXLIB_BUILD_EXAMPLES_HO)
|
||||
set(REDIS_LIB_DIR ${CMAKE_SOURCE_DIR}/src/myx/redis)
|
||||
|
||||
set(REDIS_moc_hpp ${REDIS_LIB_DIR}/base.hpp
|
||||
${REDIS_LIB_DIR}/containers.hpp
|
||||
${REDIS_LIB_DIR}/pubsub.hpp)
|
||||
|
||||
qt5_wrap_cpp(REDIS_moc_cpp ${REDIS_moc_hpp})
|
||||
|
||||
# Цель для создания исполняемого файла
|
||||
add_executable(${TRGT}-ho ${TRGT_cpp} ${TRGT_moc_cpp} ${REDIS_moc_cpp} ${TRGT_moc_hpp})
|
||||
common_target_properties(${TRGT}-ho)
|
||||
|
||||
target_include_directories(${TRGT}-ho PRIVATE ${CMAKE_SOURCE_DIR}/src)
|
||||
target_include_directories(${TRGT}-ho SYSTEM PUBLIC ${Qt5Core_INCLUDE_DIRS})
|
||||
target_include_directories(${TRGT}-ho SYSTEM PUBLIC ${Qt5Network_INCLUDE_DIRS})
|
||||
|
||||
add_dependencies(${TRGT}-ho base-header-only redis-header-only)
|
||||
|
||||
target_link_libraries(${TRGT}-ho Qt5::Core Qt5::Network)
|
||||
target_link_libraries(${TRGT}-ho Threads::Threads)
|
||||
|
||||
# Имя выходного файла для цели
|
||||
set_target_properties(${TRGT}-ho PROPERTIES OUTPUT_NAME redis-translators-ho)
|
||||
|
||||
add_sanitizers(${TRGT}-ho)
|
||||
cotire(${TRGT}-ho)
|
||||
|
||||
add_dependencies(${TRGT}-ho create_auxilary_symlinks)
|
||||
|
||||
# Правила для установки
|
||||
install(TARGETS ${TRGT}-ho COMPONENT examples RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})
|
||||
endif()
|
@ -1,21 +0,0 @@
|
||||
#include "client.hpp"
|
||||
|
||||
#include <myx/redis/pubsub.hpp>
|
||||
#include <myx/redis/client.hpp>
|
||||
|
||||
#include <QCoreApplication>
|
||||
#include <QDebug>
|
||||
|
||||
namespace MR = myx::redis;
|
||||
|
||||
int main( int argc, char** argv )
|
||||
{
|
||||
QCoreApplication app( argc, argv );
|
||||
|
||||
MR::PubSub a;
|
||||
MR::Client b;
|
||||
RedisClient c( &a );
|
||||
a.start();
|
||||
|
||||
return( QCoreApplication::exec() );
|
||||
} // main
|
@ -1,32 +0,0 @@
|
||||
#include <QObject>
|
||||
#include <QDebug>
|
||||
|
||||
#include <myx/redis/pubsub.hpp>
|
||||
|
||||
namespace MR = myx::redis;
|
||||
|
||||
class RedisClient : public QObject
|
||||
{
|
||||
Q_OBJECT
|
||||
|
||||
public:
|
||||
explicit RedisClient( MR::PubSub* p, QObject* o = nullptr ) :
|
||||
QObject( o )
|
||||
{
|
||||
p->subscribe( "test", this, "readChannelTest" );
|
||||
}
|
||||
|
||||
|
||||
private:
|
||||
Q_INVOKABLE void readChannelTest( const QString& channel, const QByteArray& message )
|
||||
{
|
||||
qDebug() << channel;
|
||||
qDebug() << message;
|
||||
|
||||
if ( message == "quit" )
|
||||
{
|
||||
qCritical() << message;
|
||||
qApp->quit();
|
||||
}
|
||||
}
|
||||
}; // class RedisClient
|
@ -1 +0,0 @@
|
||||
add_subdirectory(01_client)
|
@ -1,71 +0,0 @@
|
||||
# Название основной цели и имя библиотеки в текущем каталоге
|
||||
set(TRGT redis)
|
||||
|
||||
# cmake-format: off
|
||||
# Список файлов исходных текстов
|
||||
set(TRGT_cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/base.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/client.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/pubsub.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/containers.cpp)
|
||||
|
||||
# Список заголовочных файлов
|
||||
set(TRGT_moc_hpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/base.hpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/client.hpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/pubsub.hpp)
|
||||
|
||||
set(TRGT_hpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/base-inl.hpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/client-inl.hpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/containers.hpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/containers-inl.hpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/pubsub-inl.hpp)
|
||||
|
||||
set(TRGT_headers ${TRGT_moc_hpp} ${TRGT_hpp})
|
||||
# cmake-format: on
|
||||
|
||||
qt5_wrap_cpp(TRGT_moc_cpp ${TRGT_moc_hpp})
|
||||
|
||||
add_library(${TRGT}-header-only INTERFACE)
|
||||
target_include_directories(
|
||||
${TRGT}-header-only SYSTEM INTERFACE "$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include>"
|
||||
"$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>")
|
||||
|
||||
if(MYXLIB_BUILD_LIBRARIES)
|
||||
add_common_library(${TRGT} OUTPUT_NAME myx-${TRGT} SOURCES ${TRGT_moc_cpp} ${TRGT_cpp} ${TRGT_headers})
|
||||
common_target_properties(${TRGT})
|
||||
|
||||
# Создание цели для проверки утилитой clang-tidy
|
||||
add_clang_tidy_check(${TRGT} ${TRGT_cpp} ${TRGT_headers})
|
||||
|
||||
# Создание цели для проверки утилитой clang-analyze
|
||||
add_clang_analyze_check(${TRGT} ${TRGT_cpp} ${TRGT_headers})
|
||||
|
||||
# Создание цели для проверки утилитой clazy
|
||||
add_clazy_check(${TRGT} ${TRGT_cpp} ${TRGT_headers})
|
||||
|
||||
# Создание цели для проверки утилитой pvs-studio
|
||||
add_pvs_check(${TRGT})
|
||||
|
||||
# Создание цели для автоматического форматирования кода
|
||||
add_format_sources(${TRGT} ${TRGT_cpp} ${TRGT_headers})
|
||||
|
||||
target_compile_definitions(${TRGT} PUBLIC MYXLIB_BUILD_LIBRARIES)
|
||||
target_include_directories(${TRGT} SYSTEM PUBLIC ${Qt5Core_INCLUDE_DIRS})
|
||||
target_include_directories(${TRGT} SYSTEM PUBLIC ${Qt5Network_INCLUDE_DIRS})
|
||||
target_include_directories(${TRGT} SYSTEM PRIVATE ${CMAKE_SOURCE_DIR}/src)
|
||||
|
||||
cotire(${TRGT})
|
||||
install(TARGETS ${TRGT}_static COMPONENT libs-dev ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR})
|
||||
if(BUILD_SHARED_LIBS)
|
||||
install(TARGETS ${TRGT}_shared COMPONENT main LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR})
|
||||
endif()
|
||||
endif()
|
||||
|
||||
generate_pkgconfig(myx-${TRGT} COMPONENT base-dev INSTALL_LIBRARY ${MYXLIB_BUILD_LIBRARIES})
|
||||
install(FILES ${TRGT_headers} COMPONENT base-dev DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/${PROJECT_NAME}/${TRGT})
|
||||
|
||||
# Цель, используемая только для установки заголовочных файлов без компиляции проекта
|
||||
add_custom_target(${TRGT}-install-headers COMMAND ${CMAKE_COMMAND} -DCMAKE_INSTALL_COMPONENT=base-dev -P
|
||||
"${CMAKE_BINARY_DIR}/cmake_install.cmake")
|
@ -1,146 +0,0 @@
|
||||
#ifndef MYX_REDIS_BASE_INL_HPP_
|
||||
#define MYX_REDIS_BASE_INL_HPP_
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifndef MYXLIB_HEADER_ONLY
|
||||
#include <myx/redis/base.hpp>
|
||||
#endif
|
||||
|
||||
#include <myx/base/config.hpp>
|
||||
|
||||
#include <QTimer>
|
||||
|
||||
namespace myx {
|
||||
|
||||
namespace redis {
|
||||
|
||||
/*!
|
||||
* \brief Создаёт объект класса
|
||||
* \param host Имя хоста сервера
|
||||
* \param port Номер порта сервера
|
||||
* \param parent Родительский объект
|
||||
*/
|
||||
MYXLIB_INLINE Base::Base( QObject* parent ) :
|
||||
QObject { parent },
|
||||
m_readTimer ( new QTimer( this ) ),
|
||||
m_writeTimer( new QTimer( this ) )
|
||||
{
|
||||
m_readTimer->setSingleShot( true );
|
||||
m_writeTimer->setSingleShot( true );
|
||||
m_readTimer->setInterval( rwTimeout() );
|
||||
m_writeTimer->setInterval( rwTimeout() );
|
||||
connect( m_readTimer, &QTimer::timeout, this, [this]() {
|
||||
if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kRead ) != 0 ) )
|
||||
{
|
||||
m_connectionFlags -= kRead;
|
||||
Q_EMIT connectionStateChanged( m_connectionFlags );
|
||||
}
|
||||
} );
|
||||
connect( m_writeTimer, &QTimer::timeout, this, [this]() {
|
||||
if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kWrite ) != 0 ) )
|
||||
{
|
||||
m_connectionFlags -= kWrite;
|
||||
Q_EMIT connectionStateChanged( m_connectionFlags );
|
||||
}
|
||||
} );
|
||||
}
|
||||
|
||||
|
||||
MYXLIB_INLINE int Base::rwTimeout() const
|
||||
{
|
||||
return( kRWTimeout );
|
||||
}
|
||||
|
||||
|
||||
MYXLIB_INLINE QString Base::host() const
|
||||
{
|
||||
return( m_host );
|
||||
}
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Задаёт адрес хоста
|
||||
* \param host Адрес
|
||||
*/
|
||||
MYXLIB_INLINE void Base::setHost( const QString& host )
|
||||
{
|
||||
m_host = host;
|
||||
}
|
||||
|
||||
|
||||
MYXLIB_INLINE quint16 Base::port() const
|
||||
{
|
||||
return( m_port );
|
||||
}
|
||||
|
||||
|
||||
MYXLIB_INLINE void Base::setPort( quint16 port )
|
||||
{
|
||||
m_port = port;
|
||||
}
|
||||
|
||||
|
||||
MYXLIB_INLINE int Base::connectionFlags() const
|
||||
{
|
||||
return( m_connectionFlags );
|
||||
}
|
||||
|
||||
|
||||
MYXLIB_INLINE void Base::setConnectionFlags( int connectionFlags )
|
||||
{
|
||||
m_connectionFlags = connectionFlags;
|
||||
}
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Возвращает признак установленного соединения
|
||||
* \return Признак установленного соединения
|
||||
*/
|
||||
MYXLIB_INLINE bool Base::isConnected() const
|
||||
{
|
||||
return( ( m_connectionFlags & kConnected ) != 0 );
|
||||
}
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Добавляет к состоянию флаг Read
|
||||
*/
|
||||
MYXLIB_INLINE void Base::onReadyRead()
|
||||
{
|
||||
if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kRead ) == 0 ) )
|
||||
{
|
||||
m_connectionFlags += kRead;
|
||||
Q_EMIT connectionStateChanged( m_connectionFlags );
|
||||
}
|
||||
m_readTimer->start();
|
||||
}
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Добавляет к состоянию флаг Write
|
||||
*/
|
||||
MYXLIB_INLINE void Base::onBytesWritten()
|
||||
{
|
||||
if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kWrite ) == 0 ) )
|
||||
{
|
||||
m_connectionFlags += kWrite;
|
||||
Q_EMIT connectionStateChanged( m_connectionFlags );
|
||||
}
|
||||
m_writeTimer->start();
|
||||
}
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Выставляет флаг ошибки соединения
|
||||
*/
|
||||
MYXLIB_INLINE void Base::onSocketError()
|
||||
{
|
||||
Q_EMIT connectionStateChanged( m_connectionFlags = kError );
|
||||
}
|
||||
|
||||
} // namespace redis
|
||||
|
||||
} // namespace myx
|
||||
|
||||
#endif // ifndef MYX_REDIS_BASE_INL_HPP_
|
@ -1,5 +0,0 @@
|
||||
#ifndef MYXLIB_BUILD_LIBRARIES
|
||||
#error Define MYXLIB_BUILD_LIBRARIES to compile this file.
|
||||
#endif
|
||||
|
||||
#include <myx/redis/base-inl.hpp>
|
@ -1,83 +0,0 @@
|
||||
#ifndef MYX_REDIS_BASE_HPP_
|
||||
#define MYX_REDIS_BASE_HPP_
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <myx/base/config.hpp>
|
||||
|
||||
#include <QHash>
|
||||
#include <QObject>
|
||||
|
||||
QT_FORWARD_DECLARE_CLASS( QTimer )
|
||||
QT_FORWARD_DECLARE_CLASS( QTcpSocket )
|
||||
|
||||
namespace myx {
|
||||
|
||||
namespace redis {
|
||||
|
||||
/*!
|
||||
* \brief Базовый класс для объектов взаимодействия с redis
|
||||
*/
|
||||
class Base : public QObject
|
||||
{
|
||||
Q_OBJECT
|
||||
|
||||
public:
|
||||
/*!
|
||||
* \brief Состояние соединения
|
||||
*/
|
||||
enum ConnectionStateFlags
|
||||
{
|
||||
kDisconnected = 1, //!< Соединение не установлено
|
||||
kConnecting = 2, //!< Соединение устанавливается
|
||||
kConnected = 4, //!< Соединение установлено
|
||||
kRead = 8, //!< Есть данные для чтения
|
||||
kWrite = 16, //!< Есть данные для записи
|
||||
kError = 32, //!< Ошибка соединения
|
||||
};
|
||||
|
||||
explicit Base( QObject* parent = nullptr );
|
||||
|
||||
/*!
|
||||
* \brief Сигнализирует об изменении состояния соединения
|
||||
* \param flags Флаги состояния
|
||||
*/
|
||||
Q_SIGNAL void connectionStateChanged( int flags );
|
||||
|
||||
bool isConnected() const;
|
||||
|
||||
private:
|
||||
static constexpr int kRWTimeout { 200 }; //!< Таймаут на операции чтения и записи
|
||||
int m_connectionFlags { kDisconnected }; //!< Флаги состояния соединения
|
||||
quint16 m_port { 6379 }; //!< Номер порта подключения к БД
|
||||
QString m_host { QStringLiteral( "127.0.0.1" ) }; //!< Имя хоста подключения к БД
|
||||
QTimer* m_readTimer; //!< Таймер чтения данных
|
||||
QTimer* m_writeTimer; //!< Таймер записи данных
|
||||
|
||||
protected:
|
||||
|
||||
int rwTimeout() const;
|
||||
|
||||
QString host() const;
|
||||
void setHost( const QString& host );
|
||||
|
||||
quint16 port() const;
|
||||
void setPort( quint16 port );
|
||||
|
||||
int connectionFlags() const;
|
||||
void setConnectionFlags( int connectionFlags );
|
||||
|
||||
Q_SLOT void onReadyRead();
|
||||
Q_SLOT void onBytesWritten();
|
||||
Q_SLOT void onSocketError();
|
||||
}; // class Base
|
||||
|
||||
} // namespace redis
|
||||
|
||||
} // namespace myx
|
||||
|
||||
#ifdef MYXLIB_HEADER_ONLY
|
||||
#include "base-inl.hpp"
|
||||
#endif
|
||||
|
||||
#endif // ifndef MYX_REDIS_BASE_HPP_
|
File diff suppressed because it is too large
Load Diff
@ -1,5 +0,0 @@
|
||||
#ifndef MYXLIB_BUILD_LIBRARIES
|
||||
#error Define MYXLIB_BUILD_LIBRARIES to compile this file.
|
||||
#endif
|
||||
|
||||
#include <myx/redis/client-inl.hpp>
|
@ -1,157 +0,0 @@
|
||||
#ifndef MYX_REDIS_CLIENT_HPP_
|
||||
#define MYX_REDIS_CLIENT_HPP_
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <myx/redis/base.hpp>
|
||||
|
||||
#include <QVariant>
|
||||
#include <QJsonArray>
|
||||
#include <QJsonObject>
|
||||
#include <unordered_set>
|
||||
|
||||
// QT_FORWARD_DECLARE_CLASS( QTcpSocket )
|
||||
|
||||
namespace myx {
|
||||
|
||||
namespace redis {
|
||||
|
||||
/*!
|
||||
* \brief Реализует клиентскую часть в соответствии с RESP
|
||||
*/
|
||||
class Client : public Base
|
||||
{
|
||||
Q_OBJECT
|
||||
public:
|
||||
Client( int database = 0, QObject* parent = nullptr );
|
||||
virtual ~Client();
|
||||
int database() const;
|
||||
int exists( const QStringList& keys );
|
||||
void select( int database );
|
||||
bool flushdb();
|
||||
bool del( const QStringList& keys );
|
||||
bool expire( const QString& key, int secs );
|
||||
bool pexpire( const QString& key, int msecs );
|
||||
bool persist( const QString& key );
|
||||
bool publish( const QString& channel, const QByteArray& message );
|
||||
static bool publish( const QString& channel, const QByteArray& message,
|
||||
const QString& host, quint16 port = 6379 );
|
||||
bool set( const QString& key, const QVariant& value );
|
||||
static bool set( const QString& key, const QVariant& value,
|
||||
int database, const QString& host = "127.0.0.1", quint16 port = 6379 );
|
||||
QByteArray scriptLoad( const QByteArray& script );
|
||||
QByteArray get( const QString& key );
|
||||
template< typename T >
|
||||
T get( const QString& key );
|
||||
static QByteArray get( const QString& key, int database,
|
||||
const QString& host = "127.0.0.1", quint16 port = 6379 );
|
||||
template< typename T >
|
||||
static T get( const QString& key, int database,
|
||||
const QString& host = "127.0.0.1", quint16 port = 6379 );
|
||||
bool hdel( const QString& redisKey, const QStringList& hashKeys );
|
||||
bool hset( const QString& key, const QString& field, const QVariant& value );
|
||||
bool hmset( const QString& key, const QVariantHash& hash );
|
||||
static bool hmset( const QString& key, const QVariantHash& hash, int database,
|
||||
const QString& host = "127.0.0.1", quint16 port = 6379 );
|
||||
QVariantHash hgetall( const QString& key );
|
||||
static QVariantHash hgetall( const QString& key, int database,
|
||||
const QString& host = "127.0.0.1", quint16 port = 6379 );
|
||||
QByteArray hget( const QString& redisKey, const QString& hashKey );
|
||||
static QByteArray hget( const QString& redisKey, const QString& hashKey,
|
||||
int database, const QString& host = "127.0.0.1", quint16 port = 6379 );
|
||||
bool rpush( const QString& key, const QVariantList& values );
|
||||
bool lpush( const QString& key, const QVariantList& values );
|
||||
bool lset( const QString& key, int index, const QVariant& value );
|
||||
bool lrem( const QString& key, const QVariant& value, int count = 1 );
|
||||
bool ltrim( const QString& key, int from, int to );
|
||||
int llen( const QString& key );
|
||||
QVariantList lrange( const QString& key, int from, int to );
|
||||
template< typename T >
|
||||
std::vector< T > lrange( const QString& key );
|
||||
static QByteArrayList lrange( const QString& key, int from, int to, int database,
|
||||
const QString& host = "127.0.0.1", quint16 port = 6379 );
|
||||
template< typename T >
|
||||
static std::vector< T > lrange( const QString& key, int from, int to, int database,
|
||||
const QString& host = "127.0.0.1", quint16 port = 6379 );
|
||||
bool sadd( const QString& key, const QVariantList& values );
|
||||
bool srem( const QString& key, const QVariantList& values );
|
||||
bool sismember( const QString& key, const QVariant& value );
|
||||
static bool sismember( const QString& key, const QVariant& value, int database,
|
||||
const QString& host = "127.0.0.1", quint16 port = 6379 );
|
||||
int scard( const QString& key );
|
||||
QVariantList smembers( const QString& key );
|
||||
template< typename T >
|
||||
std::vector< T > smembers( const QString& key );
|
||||
QByteArray rpop( const QString& key );
|
||||
QByteArray lindex( const QString& key, int index );
|
||||
QStringList keys( const QString& pattern = {} );
|
||||
static QStringList keys( const QString& pattern, int database,
|
||||
const QString& host, quint16 port = 6379 );
|
||||
template< typename T >
|
||||
T evalSha( const QByteArray& sha, const QVariantList& keys, const QVariantList& argv );
|
||||
template< typename T >
|
||||
static T evalSha( const QByteArray& sha, const QVariantList& keys, const QVariantList& argv,
|
||||
int database, const QString& host, quint16 port = 6379 );
|
||||
QByteArrayList evalSha( const QByteArray& sha, const QVariantList& keys,
|
||||
const QVariantList& argv );
|
||||
static QByteArrayList evalSha( const QByteArray& sha, const QVariantList& keys,
|
||||
const QVariantList& argv,
|
||||
int database, const QString& host, quint16 port = 6379 );
|
||||
|
||||
private:
|
||||
/*!
|
||||
* \brief Класс для синхронизации порядка чтения/записи в poolSocket;
|
||||
*/
|
||||
struct Session
|
||||
{
|
||||
Session( uint& count );
|
||||
~Session();
|
||||
|
||||
private:
|
||||
uint& _count; //!< Счетчик открытых сессий
|
||||
};
|
||||
|
||||
int _database; //!< Номер БД в redis
|
||||
uint _sessionCount { 0 }; //!< Счетчик открытых сессий
|
||||
std::vector< QTcpSocket* > _session; //!< Сокеты для синхронных сессий чтения/записи
|
||||
/*!
|
||||
* \brief Содержит TCP соединения, связанные с БД, отличной от текущей
|
||||
*/
|
||||
std::unordered_set< QTcpSocket* > _outOfDatabase;
|
||||
|
||||
static bool setupSocket( QTcpSocket* socket, int database, const QString& host, quint16 port );
|
||||
static bool select( QTcpSocket* socket, int database );
|
||||
QTcpSocket* session( int timeout );
|
||||
}; // class Client
|
||||
|
||||
template<>
|
||||
QVariant Client::evalSha< QVariant >( const QByteArray& sha, const QVariantList& keys,
|
||||
const QVariantList& argv );
|
||||
template<>
|
||||
QVariantList Client::evalSha< QVariantList >( const QByteArray& sha, const QVariantList& keys,
|
||||
const QVariantList& argv );
|
||||
template<>
|
||||
QVariantHash Client::evalSha< QVariantHash >( const QByteArray& sha, const QVariantList& keys,
|
||||
const QVariantList& argv );
|
||||
template<>
|
||||
QJsonArray Client::evalSha< QJsonArray >( const QByteArray& sha, const QVariantList& keys,
|
||||
const QVariantList& argv );
|
||||
template<>
|
||||
QJsonArray Client::evalSha< QJsonArray >( const QByteArray& sha, const QVariantList& keys,
|
||||
const QVariantList& argv,
|
||||
int database, const QString& host, quint16 port );
|
||||
template<>
|
||||
std::vector< QJsonObject > Client::lrange< QJsonObject >( const QString & key, int from, int to,
|
||||
int database, const QString & host, quint16 );
|
||||
|
||||
} // namespace redis
|
||||
|
||||
} // namespace myx
|
||||
|
||||
#ifdef MYXLIB_HEADER_ONLY
|
||||
#include "client-inl.hpp"
|
||||
#endif
|
||||
|
||||
// #include "client_redis.tcc"
|
||||
|
||||
#endif // ifndef MYX_REDIS_CLIENT_HPP_
|
@ -1,250 +0,0 @@
|
||||
#ifndef MYX_REDIS_CONTAINERS_INL_HPP_
|
||||
#define MYX_REDIS_CONTAINERS_INL_HPP_
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifndef MYXLIB_HEADER_ONLY
|
||||
#include <myx/redis/containers.hpp>
|
||||
#endif
|
||||
|
||||
#include <QDebug>
|
||||
#include <QHash>
|
||||
|
||||
namespace myx {
|
||||
|
||||
namespace redis {
|
||||
|
||||
namespace constants {
|
||||
|
||||
static const QByteArray Separator { "\r\n" }; //!< Строковый разделитель слов во фразе
|
||||
|
||||
} // namespace constants
|
||||
|
||||
namespace C = constants;
|
||||
|
||||
/*!
|
||||
* \brief Создаёт часть команды
|
||||
* \param value Значение для создании части команды
|
||||
* \return Часть команды
|
||||
*/
|
||||
MYXLIB_INLINE QByteArray part( const QVariant& value )
|
||||
{
|
||||
auto bytes = value.toByteArray();
|
||||
return( "$" + QByteArray::number( bytes.length() ) + C::Separator + bytes );
|
||||
}
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Составляет RESP массив
|
||||
* \param parts Части для составления массива
|
||||
* \return RESP массив
|
||||
*/
|
||||
MYXLIB_INLINE QByteArray array( const QVariantList& parts )
|
||||
{
|
||||
QByteArrayList data;
|
||||
std::transform( parts.cbegin(), parts.cend(), std::back_inserter( data ),
|
||||
[]( const QVariant& iter ) { return( part( iter ) ); } );
|
||||
data.prepend( "*" + QByteArray::number( data.size() ) );
|
||||
return( data.join( C::Separator ) + C::Separator );
|
||||
}
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Разбивает буфер данных на массив частей
|
||||
* \param buffer Входной буфер данных
|
||||
* \param splitLength Длина разобранной части буфера
|
||||
* \return Массив частей-строк
|
||||
*/
|
||||
MYXLIB_INLINE QByteArrayList split( const QByteArray& buffer, int* splitLength )
|
||||
{
|
||||
enum Token
|
||||
{
|
||||
kUndefined,
|
||||
kFetchNumber,
|
||||
kFetchPart,
|
||||
};
|
||||
|
||||
if ( ( buffer == "$-1\r\n" ) || ( buffer == "$*0\r\n" ) || buffer.isEmpty() )
|
||||
{
|
||||
if ( splitLength != nullptr )
|
||||
{
|
||||
*splitLength = buffer.length();
|
||||
}
|
||||
return {};
|
||||
}
|
||||
if ( buffer.startsWith( "-WRONG" ) )
|
||||
{
|
||||
if ( splitLength != nullptr )
|
||||
{
|
||||
*splitLength = buffer.endsWith( "\r\n" ) ? buffer.length() : 0;
|
||||
}
|
||||
return {};
|
||||
}
|
||||
if ( splitLength != nullptr )
|
||||
{
|
||||
*splitLength = 0;
|
||||
}
|
||||
if ( !buffer.endsWith( C::Separator ) )
|
||||
{
|
||||
return {};
|
||||
}
|
||||
int size { 1 };
|
||||
int pos { 0 };
|
||||
QByteArray partLength;
|
||||
if ( buffer[pos] == '*' )
|
||||
{
|
||||
pos++;
|
||||
QByteArray text;
|
||||
while ( QChar( buffer[pos] ).isNumber() && pos < buffer.size() )
|
||||
{
|
||||
text += buffer[pos++];
|
||||
}
|
||||
if ( pos == buffer.size() )
|
||||
{
|
||||
return {};
|
||||
}
|
||||
size = text.toInt();
|
||||
pos += C::Separator.length();
|
||||
}
|
||||
QByteArrayList parts;
|
||||
Token token { kUndefined };
|
||||
while ( parts.size() < size && pos < buffer.length() )
|
||||
{
|
||||
switch ( token )
|
||||
{
|
||||
case kUndefined:
|
||||
switch ( buffer[pos] )
|
||||
{
|
||||
case '$':
|
||||
partLength.clear();
|
||||
token = kFetchPart;
|
||||
pos++;
|
||||
break;
|
||||
case ':':
|
||||
partLength.clear();
|
||||
token = kFetchNumber;
|
||||
pos++;
|
||||
break;
|
||||
default:
|
||||
return {};
|
||||
}
|
||||
break;
|
||||
case kFetchNumber:
|
||||
if ( QChar( buffer[pos] ).isNumber() || ( buffer[pos] == '-' ) )
|
||||
{
|
||||
partLength += buffer[pos];
|
||||
pos++;
|
||||
}
|
||||
else
|
||||
{
|
||||
parts.push_back( partLength );
|
||||
token = kUndefined;
|
||||
pos += C::Separator.length();
|
||||
}
|
||||
break;
|
||||
case kFetchPart:
|
||||
{
|
||||
if ( QChar( buffer[pos] ).isNumber() || ( buffer[pos] == '-' ) )
|
||||
{
|
||||
partLength += buffer[pos];
|
||||
pos++;
|
||||
}
|
||||
else
|
||||
{
|
||||
int length = partLength.toInt();
|
||||
if ( length < 0 )
|
||||
{
|
||||
parts.push_back( "" );
|
||||
pos += C::Separator.length();
|
||||
}
|
||||
else
|
||||
{
|
||||
pos += C::Separator.length();
|
||||
if ( pos + length > buffer.length() )
|
||||
{
|
||||
return {};
|
||||
}
|
||||
parts.push_back( buffer.mid( pos, length ) );
|
||||
pos += length + C::Separator.length();
|
||||
}
|
||||
token = kUndefined;
|
||||
}
|
||||
break;
|
||||
}
|
||||
} // switch
|
||||
}
|
||||
// может быть ситуация, когда в буфер не дочитан последний \r\n,
|
||||
// но все части были вычитаны
|
||||
if ( ( parts.size() == size ) && ( pos <= buffer.length() ) )
|
||||
{
|
||||
if ( splitLength != nullptr )
|
||||
{
|
||||
*splitLength = pos;
|
||||
}
|
||||
return( parts );
|
||||
}
|
||||
return {};
|
||||
} // split
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Запаковывает сообщение для передачи через систему pubsub
|
||||
* \param hash Таблица значений
|
||||
* \return Запакованное сообщение
|
||||
*/
|
||||
MYXLIB_INLINE QByteArray pack_hash( const QVariantHash& hash )
|
||||
{
|
||||
QVariantList parts;
|
||||
for ( auto iter = hash.cbegin(); iter != hash.cend(); ++iter )
|
||||
{
|
||||
parts << iter.key() << iter.value();
|
||||
}
|
||||
return( array( parts ) );
|
||||
}
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Распаковывает кодограмму в таблицу значений
|
||||
* \param message Сообщение
|
||||
* \return Таблица значений
|
||||
*/
|
||||
MYXLIB_INLINE QVariantHash unpack_hash( const QByteArray& message )
|
||||
{
|
||||
auto parts = split( message );
|
||||
QVariantHash hash;
|
||||
for ( int i = 1; i < parts.size(); i += 2 )
|
||||
{
|
||||
hash[parts[i - 1]] = parts[i];
|
||||
}
|
||||
return( hash );
|
||||
}
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Запаковывает сообщение для передачи через систему pubsub
|
||||
* \param hash Список значений
|
||||
* \return Запакованное сообщение
|
||||
*/
|
||||
MYXLIB_INLINE QByteArray pack_list( const QVariantList& list )
|
||||
{
|
||||
return( array( list ) );
|
||||
}
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Распаковывает кодограмму в список значений
|
||||
* \param message Сообщение
|
||||
* \return Список значений
|
||||
*/
|
||||
MYXLIB_INLINE QVariantList unpack_list( const QByteArray& message )
|
||||
{
|
||||
auto data = split( message );
|
||||
QVariantList list;
|
||||
std::copy( data.begin(), data.end(), std::back_inserter( list ) );
|
||||
return( list );
|
||||
}
|
||||
|
||||
} // namespace redis
|
||||
|
||||
} // namespace myx
|
||||
#endif // ifndef MYX_REDIS_CONTAINERS_INL_HPP_
|
@ -1,5 +0,0 @@
|
||||
#ifndef MYXLIB_BUILD_LIBRARIES
|
||||
#error Define MYXLIB_BUILD_LIBRARIES to compile this file.
|
||||
#endif
|
||||
|
||||
#include <myx/redis/containers-inl.hpp>
|
@ -1,100 +0,0 @@
|
||||
#ifndef MYX_REDIS_CONTAINERS_HPP_
|
||||
#define MYX_REDIS_CONTAINERS_HPP_
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <myx/base/config.hpp>
|
||||
|
||||
#include <QByteArrayList>
|
||||
#include <QVariantList>
|
||||
|
||||
namespace myx {
|
||||
|
||||
namespace redis {
|
||||
|
||||
QByteArray array( const QVariantList& parts );
|
||||
QByteArrayList split( const QByteArray& buffer, int* splitLength = nullptr );
|
||||
// QByteArray part( const QVariant& value );
|
||||
// QByteArray packHash( const QVariantHash& hash );
|
||||
// QVariantHash unpackHash( const QByteArray& message );
|
||||
// QByteArray packList( QByteArrayList list );
|
||||
// QByteArray packList( const QVariantList& list );
|
||||
QVariantList unpack_list( const QByteArray& message );
|
||||
// template< typename T >
|
||||
// QByteArray packPair( T first, T second );
|
||||
// template< typename T1, typename T2 >
|
||||
// QByteArray packPair( T1 first, T2 second );
|
||||
// template< typename T >
|
||||
// std::pair< T, T > unpackPair( const QByteArray& message );
|
||||
// template< typename T1, typename T2 >
|
||||
// std::pair< T1, T2 > unpackPair( const QByteArray& message );
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Запаковывает пару значений в сообщение
|
||||
* \param first Первое значение
|
||||
* \param second Второе значение
|
||||
* \return Текстовое сообщение
|
||||
*/
|
||||
template< typename T >
|
||||
QByteArray pack_pair( T first, T second )
|
||||
{
|
||||
return( packList( { QVariant::fromValue( first ), QVariant::fromValue( second ) } ) );
|
||||
}
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Запаковывает пару значений в сообщение
|
||||
* \param first Первое значение
|
||||
* \param second Второе значение
|
||||
* \return Текстовое сообщение
|
||||
*/
|
||||
template< typename T1, typename T2 >
|
||||
QByteArray pack_pair( T1 first, T2 second )
|
||||
{
|
||||
return( packList( { QVariant::fromValue( first ), QVariant::fromValue( second ) } ) );
|
||||
}
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Распаковывает сообщение в пару однотипных значений
|
||||
* \param pair Текст сообщения
|
||||
* \return Пара значений
|
||||
*/
|
||||
template< typename T >
|
||||
std::pair< T, T > unpack_pair( const QByteArray& message )
|
||||
{
|
||||
auto list = unpack_list( message );
|
||||
if ( list.size() == 2 )
|
||||
{
|
||||
return { QVariant( list.first() ).value< T >(), QVariant( list.last() ).value< T >() };
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Распаковывает сообщение в пару однотипных значений
|
||||
* \param pair Текст сообщения
|
||||
* \return Пара значений
|
||||
*/
|
||||
template< typename T1, typename T2 >
|
||||
std::pair< T1, T2 > unpack_pair( const QByteArray& message )
|
||||
{
|
||||
auto list = unpack_list( message );
|
||||
if ( list.size() == 2 )
|
||||
{
|
||||
return { QVariant( list.first() ).value< T1 >(), QVariant( list.last() ).value< T2 >() };
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
} // namespace redis
|
||||
|
||||
} // namespace myx
|
||||
|
||||
#ifdef MYXLIB_HEADER_ONLY
|
||||
#include "containers-inl.hpp"
|
||||
#endif
|
||||
|
||||
#endif // ifndef MYX_REDIS_CONTAINERS_HPP_
|
@ -1,580 +0,0 @@
|
||||
#ifndef MYX_REDIS_PUBSUB_INL_HPP_
|
||||
#define MYX_REDIS_PUBSUB_INL_HPP_
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifndef MYXLIB_HEADER_ONLY
|
||||
#include <myx/redis/pubsub.hpp>
|
||||
#endif
|
||||
|
||||
#include <myx/redis/containers.hpp>
|
||||
|
||||
#include <QCoreApplication>
|
||||
#include <QDateTime>
|
||||
#include <QTcpSocket>
|
||||
#include <QThread>
|
||||
#include <QTimer>
|
||||
|
||||
namespace myx {
|
||||
|
||||
namespace redis {
|
||||
|
||||
/*!
|
||||
* \brief Конструктор класса
|
||||
* \param parent Родительский объект для m_socket
|
||||
*/
|
||||
MYXLIB_INLINE PubSub::PubSub( QObject* parent ) :
|
||||
Base { parent },
|
||||
m_socket ( new QTcpSocket( this ) ),
|
||||
m_connectionTimer( new QTimer( this ) )
|
||||
|
||||
{
|
||||
connect( m_socket, &QAbstractSocket::stateChanged, this, &PubSub::onSocketStateChanged );
|
||||
connect( m_socket, &QIODevice::readyRead, this, &PubSub::read );
|
||||
connect( m_socket, &QIODevice::readyRead, this, &PubSub::onReadyRead );
|
||||
connect( m_socket, &QIODevice::bytesWritten, this, &PubSub::onBytesWritten );
|
||||
|
||||
connect( m_socket, static_cast< void ( QAbstractSocket::* )( QAbstractSocket::SocketError ) >
|
||||
( &QAbstractSocket::error ), this, [this]() {
|
||||
qDebug() << QDateTime::currentDateTimeUtc().toString( QStringLiteral( "hh:mm:ss:zzz" ) )
|
||||
<< QByteArray( "redis pubsub connection error" ) << m_socket->errorString();
|
||||
} );
|
||||
|
||||
m_connectionTimer->setInterval( connectionTimeout() );
|
||||
m_connectionTimer->setSingleShot( true );
|
||||
connect( m_connectionTimer, &QTimer::timeout, this, [this]() {
|
||||
m_socket->connectToHost( host(), port() );
|
||||
} );
|
||||
}
|
||||
|
||||
|
||||
MYXLIB_INLINE int PubSub::connectionTimeout() const
|
||||
{
|
||||
return( kConnectionTimeout );
|
||||
}
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Инициирует работу с БД
|
||||
*/
|
||||
MYXLIB_INLINE void PubSub::start()
|
||||
{
|
||||
Q_EMIT disconnected();
|
||||
Q_EMIT connectionStateChanged( connectionFlags() );
|
||||
m_socket->connectToHost( host(), port() );
|
||||
}
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Завершает работу с БД
|
||||
*/
|
||||
MYXLIB_INLINE void PubSub::stop()
|
||||
{
|
||||
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||
{
|
||||
m_socket->disconnectFromHost();
|
||||
while ( m_socket->state() != QAbstractSocket::UnconnectedState )
|
||||
{
|
||||
QCoreApplication::processEvents();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Создает подписку на канал сообщений
|
||||
* \param channel Канал подписки
|
||||
* \param subscriber Объект-подписчик
|
||||
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
||||
*/
|
||||
MYXLIB_INLINE void PubSub::subscribe( const QString& channel, QObject* subscriber, const char* method )
|
||||
{
|
||||
if ( QThread::currentThread() != thread() )
|
||||
{
|
||||
qWarning() << "no redis connection for caller thread";
|
||||
return;
|
||||
}
|
||||
auto iter = m_subscribers.find( channel );
|
||||
if ( iter == m_subscribers.end() )
|
||||
{
|
||||
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||
{
|
||||
m_socket->write( array( { "SUBSCRIBE", channel } ) );
|
||||
}
|
||||
m_subscribers[channel].emplace_back( subscriber, method );
|
||||
}
|
||||
else
|
||||
{
|
||||
auto value = iter.value();
|
||||
auto it = std::find_if( value.begin(), value.end(),
|
||||
[subscriber, method]( std::pair< QObject*, const char* > p ) {
|
||||
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
|
||||
} );
|
||||
if ( it == iter.value().end() )
|
||||
{
|
||||
iter.value().emplace_back( subscriber, method );
|
||||
connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed,
|
||||
Qt::UniqueConnection );
|
||||
}
|
||||
}
|
||||
} // PubSub::subscribe
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Создает подписку на канал изменения данных
|
||||
* \param channel Канал подписки
|
||||
* \param subscriber Объект-подписчик
|
||||
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
||||
* \param database Номер БД
|
||||
*/
|
||||
MYXLIB_INLINE void PubSub::subscribe( QString channel, QObject* subscriber, const char* method, int database )
|
||||
{
|
||||
if ( QThread::currentThread() != thread() )
|
||||
{
|
||||
qWarning() << "no redis connection for caller thread";
|
||||
return;
|
||||
}
|
||||
auto iter = m_subscribers.find( channel );
|
||||
if ( iter == m_subscribers.end() )
|
||||
{
|
||||
channel.prepend( C::KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
||||
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||
{
|
||||
m_socket->write( array( { "SUBSCRIBE", channel } ) );
|
||||
}
|
||||
m_subscribers[channel].emplace_back( subscriber, method );
|
||||
}
|
||||
else
|
||||
{
|
||||
auto value = iter.value();
|
||||
auto it = std::find_if( value.begin(), value.end(),
|
||||
[subscriber, method]( std::pair< QObject*, const char* > p ) {
|
||||
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
|
||||
} );
|
||||
if ( it == iter.value().end() )
|
||||
{
|
||||
iter.value().emplace_back( subscriber, method );
|
||||
connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed,
|
||||
Qt::UniqueConnection );
|
||||
}
|
||||
}
|
||||
} // PubSub::subscribe
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Создает подписку на шаблон канала сообщений
|
||||
* \param channel Шаблон канала подписки
|
||||
* \param subscriber Объект-подписчик
|
||||
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
||||
*/
|
||||
MYXLIB_INLINE void PubSub::psubscribe( const QString& channel, QObject* subscriber, const char* method )
|
||||
{
|
||||
if ( QThread::currentThread() != thread() )
|
||||
{
|
||||
qWarning() << "no redis connection for caller thread";
|
||||
return;
|
||||
}
|
||||
auto iter = m_psubscribers.find( channel );
|
||||
if ( iter == m_psubscribers.end() )
|
||||
{
|
||||
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||
{
|
||||
m_socket->write( array( { "PSUBSCRIBE", channel } ) );
|
||||
}
|
||||
m_psubscribers[channel].emplace_back( subscriber, method );
|
||||
}
|
||||
else
|
||||
{
|
||||
auto value = iter.value();
|
||||
auto it = std::find_if( value.begin(), value.end(),
|
||||
[subscriber, method]( std::pair< QObject*, const char* > p ) {
|
||||
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
|
||||
} );
|
||||
if ( it == iter.value().end() )
|
||||
{
|
||||
iter.value().emplace_back( subscriber, method );
|
||||
connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed,
|
||||
Qt::UniqueConnection );
|
||||
}
|
||||
}
|
||||
} // PubSub::psubscribe
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Создает подписку на шаблон канала изменения данных
|
||||
* \param channel Шаблон канала подписки
|
||||
* \param subscriber Объект-подписчик
|
||||
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
||||
* \param database Номер БД (-1 для всех)
|
||||
*/
|
||||
MYXLIB_INLINE void PubSub::psubscribe( QString channel, QObject* subscriber, const char* method, int database )
|
||||
{
|
||||
if ( QThread::currentThread() != thread() )
|
||||
{
|
||||
qWarning() << "no redis connection for caller thread";
|
||||
return;
|
||||
}
|
||||
auto iter = m_psubscribers.find( channel );
|
||||
if ( iter == m_psubscribers.end() )
|
||||
{
|
||||
if ( database == -1 )
|
||||
{
|
||||
channel.prepend( C::KeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() );
|
||||
}
|
||||
else
|
||||
{
|
||||
channel.prepend( C::KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
||||
}
|
||||
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||
{
|
||||
m_socket->write( array( { "PSUBSCRIBE", channel } ) );
|
||||
}
|
||||
m_psubscribers[channel].emplace_back( subscriber, method );
|
||||
}
|
||||
else
|
||||
{
|
||||
auto value = iter.value();
|
||||
auto it = std::find_if( value.begin(), value.end(),
|
||||
[subscriber, method]( std::pair< QObject*, const char* > p ) {
|
||||
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
|
||||
} );
|
||||
if ( it == iter.value().end() )
|
||||
{
|
||||
iter.value().emplace_back( subscriber, method );
|
||||
connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed,
|
||||
Qt::UniqueConnection );
|
||||
}
|
||||
}
|
||||
} // PubSub::psubscribe
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Отписывает получателя от канала
|
||||
* \param channel Имя канала
|
||||
* \param subscriber Подписчик на канал
|
||||
* \param method Метод обработки данных
|
||||
* \param keyEvents Признак подписки на изменение данных
|
||||
*/
|
||||
MYXLIB_INLINE void PubSub::unsubscribe( const QString& channel, QObject* subscriber, const char* method )
|
||||
{
|
||||
if ( QThread::currentThread() != thread() )
|
||||
{
|
||||
qWarning() << "no redis connection for caller thread";
|
||||
return;
|
||||
}
|
||||
auto iter = m_subscribers.find( channel );
|
||||
if ( iter != m_subscribers.end() )
|
||||
{
|
||||
auto ctx = std::find_if( iter->begin(), iter->end(),
|
||||
[subscriber, method]( std::pair< QObject*, const char* > value ) {
|
||||
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
|
||||
} );
|
||||
if ( ctx != iter->end() )
|
||||
{
|
||||
iter->erase( ctx );
|
||||
}
|
||||
if ( iter->empty() )
|
||||
{
|
||||
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||
{
|
||||
m_socket->write( array( { "UNSUBSCRIBE", channel } ) );
|
||||
}
|
||||
m_subscribers.remove( channel );
|
||||
}
|
||||
}
|
||||
} // PubSub::unsubscribe
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Отписывает получателя от канала
|
||||
* \param channel Имя канала
|
||||
* \param subscriber Подписчик на канал
|
||||
* \param method Метод обработки данных
|
||||
* \param keyEvents Признак подписки на изменение данных
|
||||
*/
|
||||
MYXLIB_INLINE void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* method, int database )
|
||||
{
|
||||
if ( QThread::currentThread() != thread() )
|
||||
{
|
||||
qWarning() << "no redis connection for caller thread";
|
||||
return;
|
||||
}
|
||||
if ( database == -1 )
|
||||
{
|
||||
channel.prepend( C::KeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() );
|
||||
}
|
||||
else
|
||||
{
|
||||
channel.prepend( C::KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
||||
}
|
||||
auto iter = m_subscribers.find( channel );
|
||||
if ( iter != m_subscribers.end() )
|
||||
{
|
||||
auto ctx = std::find_if( iter->begin(), iter->end(),
|
||||
[subscriber, method]( std::pair< QObject*, const char* > value ) {
|
||||
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
|
||||
} );
|
||||
if ( ctx != iter->end() )
|
||||
{
|
||||
iter->erase( ctx );
|
||||
}
|
||||
if ( iter->empty() )
|
||||
{
|
||||
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||
{
|
||||
m_socket->write( array( { "UNSUBSCRIBE", channel } ) );
|
||||
}
|
||||
m_subscribers.remove( channel );
|
||||
}
|
||||
}
|
||||
} // PubSub::unsubscribe
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Отписывает получателя от шаблона канала
|
||||
* \param channel Шаблон канала
|
||||
* \param subscriber Подписчик на канал
|
||||
* \param method Метод обработки данных
|
||||
* \param keyEvents Признак подписки на изменение данных
|
||||
*/
|
||||
MYXLIB_INLINE void PubSub::punsubscribe( const QString& channel, QObject* subscriber,
|
||||
const char* method )
|
||||
{
|
||||
if ( QThread::currentThread() != thread() )
|
||||
{
|
||||
qWarning() << "no redis connection for caller thread";
|
||||
return;
|
||||
}
|
||||
auto iter = m_psubscribers.find( channel );
|
||||
if ( iter != m_psubscribers.end() )
|
||||
{
|
||||
auto ctx = std::find_if( iter->begin(), iter->end(),
|
||||
[subscriber, method]( std::pair< QObject*, const char* > value ) {
|
||||
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
|
||||
} );
|
||||
if ( ctx != iter->end() )
|
||||
{
|
||||
iter->erase( ctx );
|
||||
}
|
||||
if ( iter->empty() )
|
||||
{
|
||||
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||
{
|
||||
m_socket->write( array( { "PUNSUBSCRIBE", channel } ) );
|
||||
}
|
||||
m_psubscribers.remove( channel );
|
||||
}
|
||||
}
|
||||
} // PubSub::punsubscribe
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Отписывает получателя от шаблона канала
|
||||
* \param channel Шаблон канала
|
||||
* \param subscriber Подписчик на канал
|
||||
* \param method Метод обработки данных
|
||||
* \param keyEvents Признак подписки на изменение данных
|
||||
*/
|
||||
MYXLIB_INLINE void PubSub::punsubscribe( QString channel, QObject* subscriber,
|
||||
const char* method, int database )
|
||||
{
|
||||
if ( QThread::currentThread() != thread() )
|
||||
{
|
||||
qWarning() << "no redis connection for caller thread";
|
||||
return;
|
||||
}
|
||||
if ( database == -1 )
|
||||
{
|
||||
channel.prepend( C::KeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() );
|
||||
}
|
||||
else
|
||||
{
|
||||
channel.prepend( C::KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
||||
}
|
||||
auto iter = m_psubscribers.find( channel );
|
||||
if ( iter != m_psubscribers.end() )
|
||||
{
|
||||
auto ctx = std::find_if( iter->begin(), iter->end(),
|
||||
[subscriber, method]( std::pair< QObject*, const char* > value ) {
|
||||
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
|
||||
} );
|
||||
if ( ctx != iter->end() )
|
||||
{
|
||||
iter->erase( ctx );
|
||||
}
|
||||
if ( iter->empty() )
|
||||
{
|
||||
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||
{
|
||||
m_socket->write( array( { "PUNSUBSCRIBE", channel } ) );
|
||||
}
|
||||
m_psubscribers.remove( channel );
|
||||
}
|
||||
}
|
||||
} // PubSub::punsubscribe
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Разбирает поступившие от сервера уведомления и вызывает требуемые
|
||||
* метаметоды обработки
|
||||
*/
|
||||
MYXLIB_INLINE void PubSub::read()
|
||||
{
|
||||
int splitLength = 0;
|
||||
while ( m_socket->bytesAvailable() > 0 )
|
||||
{
|
||||
m_buffer += m_socket->readAll();
|
||||
while ( !m_buffer.isEmpty() )
|
||||
{
|
||||
auto parts = split( m_buffer, &splitLength );
|
||||
if ( parts.empty() )
|
||||
{
|
||||
break;
|
||||
}
|
||||
if ( ( parts[0] == "pmessage" ) && ( parts.size() == 4 ) )
|
||||
{
|
||||
auto iter = m_psubscribers.find( parts[1] );
|
||||
if ( iter != m_psubscribers.end() )
|
||||
{
|
||||
for ( const auto& i: iter.value() )
|
||||
{
|
||||
QMetaObject::invokeMethod( i.first, i.second,
|
||||
Q_ARG( QString, parts[2] ),
|
||||
Q_ARG( QByteArray, parts[3] ) );
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if ( ( parts[0] == "message" ) && ( parts.size() == 3 ) )
|
||||
{
|
||||
auto iter = m_subscribers.find( parts[1] );
|
||||
if ( iter != m_subscribers.end() )
|
||||
{
|
||||
for ( const auto& i: iter.value() )
|
||||
{
|
||||
QMetaObject::invokeMethod( i.first, i.second,
|
||||
Q_ARG( QString, parts[1] ),
|
||||
Q_ARG( QByteArray, parts[2] ) );
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
m_buffer.remove( 0, splitLength );
|
||||
}
|
||||
}
|
||||
} // PubSub::read
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Обработка изменения состояния сокета
|
||||
*/
|
||||
MYXLIB_INLINE void PubSub::onSocketStateChanged()
|
||||
{
|
||||
switch ( m_socket->state() )
|
||||
{
|
||||
case QAbstractSocket::ConnectedState:
|
||||
for ( auto iter = m_psubscribers.cbegin(); iter != m_psubscribers.cend(); ++iter )
|
||||
{
|
||||
m_socket->write( array( { "PSUBSCRIBE", iter.key() } ) );
|
||||
}
|
||||
for ( auto iter = m_subscribers.cbegin(); iter != m_subscribers.cend(); ++iter )
|
||||
{
|
||||
m_socket->write( array( { "SUBSCRIBE", iter.key() } ) );
|
||||
}
|
||||
setConnectionFlags( kConnected );
|
||||
Q_EMIT connectionStateChanged( connectionFlags() );
|
||||
Q_EMIT connected();
|
||||
break;
|
||||
case QAbstractSocket::UnconnectedState:
|
||||
m_buffer.clear();
|
||||
m_connectionTimer->start();
|
||||
if ( !isConnected() )
|
||||
{
|
||||
Q_EMIT disconnected();
|
||||
}
|
||||
setConnectionFlags( kDisconnected );
|
||||
Q_EMIT connectionStateChanged( connectionFlags() );
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
} // switch
|
||||
} // PubSub::onSocketStateChanged
|
||||
|
||||
|
||||
/*!
|
||||
* \brief При удалении подписчика уничтожает все его подписки
|
||||
* \param subscriber Удаленный подписчик
|
||||
*/
|
||||
MYXLIB_INLINE void PubSub::onSubscriberDestroyed( QObject* subscriber )
|
||||
{
|
||||
for ( auto i = m_subscribers.begin(); i != m_subscribers.end(); )
|
||||
{
|
||||
for ( auto j = i.value().begin(); j != i.value().end(); )
|
||||
{
|
||||
if ( j->first == subscriber )
|
||||
{
|
||||
j = i.value().erase( j );
|
||||
}
|
||||
else
|
||||
{
|
||||
j++;
|
||||
}
|
||||
}
|
||||
if ( i.value().empty() )
|
||||
{
|
||||
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||
{
|
||||
m_socket->write( array( { "UNSUBSCRIBE", i.key() } ) );
|
||||
}
|
||||
i = m_subscribers.erase( i );
|
||||
}
|
||||
else
|
||||
{
|
||||
i++;
|
||||
}
|
||||
}
|
||||
|
||||
for ( auto i = m_psubscribers.begin(); i != m_psubscribers.end(); )
|
||||
{
|
||||
for ( auto j = i.value().begin(); j != i.value().end(); )
|
||||
{
|
||||
if ( j->first == subscriber )
|
||||
{
|
||||
j = i.value().erase( j );
|
||||
}
|
||||
else
|
||||
{
|
||||
j++;
|
||||
}
|
||||
}
|
||||
if ( i.value().empty() )
|
||||
{
|
||||
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||
{
|
||||
m_socket->write( array( { "PUNSUBSCRIBE", i.key() } ) );
|
||||
}
|
||||
i = m_psubscribers.erase( i );
|
||||
}
|
||||
else
|
||||
{
|
||||
i++;
|
||||
}
|
||||
}
|
||||
} // PubSub::onSubscriberDestroyed
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Возвращает сокет
|
||||
* \return Сокет
|
||||
*/
|
||||
MYXLIB_INLINE QTcpSocket* PubSub::socket() const
|
||||
{
|
||||
return( m_socket );
|
||||
}
|
||||
|
||||
} // namespace redis
|
||||
|
||||
} // namespace myx
|
||||
|
||||
#endif // ifndef MYX_REDIS_PUBSUB_INL_HPP_
|
@ -1,5 +0,0 @@
|
||||
#ifndef MYXLIB_BUILD_LIBRARIES
|
||||
#error Define MYXLIB_BUILD_LIBRARIES to compile this file.
|
||||
#endif
|
||||
|
||||
#include <myx/redis/pubsub-inl.hpp>
|
@ -1,87 +0,0 @@
|
||||
#ifndef MYX_REDIS_PUBSUB_HPP_
|
||||
#define MYX_REDIS_PUBSUB_HPP_
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <myx/redis/base.hpp>
|
||||
|
||||
#include <QObject>
|
||||
|
||||
QT_FORWARD_DECLARE_CLASS( QTcpSocket )
|
||||
|
||||
namespace myx {
|
||||
|
||||
namespace redis {
|
||||
|
||||
namespace constants {
|
||||
|
||||
/// @brief Приставка для наблюдения за данными
|
||||
static const QString KeySpacePrefix { QStringLiteral( "__key%1__:" ) };
|
||||
|
||||
} // namespace constants
|
||||
|
||||
namespace C = constants;
|
||||
|
||||
/*!
|
||||
* \brief Управляет подпиской и публикацией сообщений
|
||||
*/
|
||||
class PubSub : public Base
|
||||
{
|
||||
Q_OBJECT
|
||||
|
||||
public:
|
||||
explicit PubSub( QObject* parent = nullptr );
|
||||
QTcpSocket* socket() const;
|
||||
int connectionTimeout() const;
|
||||
|
||||
Q_SLOT void start();
|
||||
Q_SLOT void stop();
|
||||
Q_INVOKABLE void subscribe( const QString& channel, QObject* subscriber, const char* method );
|
||||
Q_INVOKABLE void subscribe( QString channel, QObject* subscriber,
|
||||
const char* method, int database );
|
||||
Q_INVOKABLE void psubscribe( const QString& channel, QObject* subscriber, const char* method );
|
||||
Q_INVOKABLE void psubscribe( QString channel, QObject* subscriber,
|
||||
const char* method, int database );
|
||||
Q_INVOKABLE void unsubscribe( const QString& channel, QObject* subscriber, const char* method );
|
||||
Q_INVOKABLE void unsubscribe( QString channel, QObject* subscriber,
|
||||
const char* method, int database );
|
||||
Q_INVOKABLE void punsubscribe( const QString& channel, QObject* subscriber, const char* method );
|
||||
Q_INVOKABLE void punsubscribe( QString channel, QObject* subscriber,
|
||||
const char* method, int database );
|
||||
/*!
|
||||
* \brief Сигнализирует об установке соединения
|
||||
*/
|
||||
Q_SIGNAL void connected();
|
||||
/*!
|
||||
* \brief Сигнализирует о разрыве соединения
|
||||
*/
|
||||
Q_SIGNAL void disconnected();
|
||||
|
||||
private:
|
||||
using Sub = QHash< QString, std::vector< std::pair< QObject*, const char* > > >;
|
||||
|
||||
static constexpr int kConnectionTimeout { 2000 }; //!< Таймаут на сетевые операции
|
||||
QTcpSocket* m_socket;
|
||||
QTimer* m_connectionTimer; //!< Таймер установления сетевого соединения
|
||||
|
||||
/*!
|
||||
* \brief Таблица подписчиков
|
||||
*/
|
||||
QByteArray m_buffer; //!< Буффер прочитанных данных
|
||||
Sub m_subscribers; //!< Обработчики сообщений по каналу подписки
|
||||
Sub m_psubscribers; //!< Обработчики сообщений по шаблону канала подписки
|
||||
|
||||
Q_SLOT void read();
|
||||
Q_SLOT void onSocketStateChanged();
|
||||
Q_SLOT void onSubscriberDestroyed( QObject* subscriber );
|
||||
}; // class PubSub
|
||||
|
||||
} // namespace redis
|
||||
|
||||
} // namespace myx
|
||||
|
||||
#ifdef MYXLIB_HEADER_ONLY
|
||||
#include "pubsub-inl.hpp"
|
||||
#endif
|
||||
|
||||
#endif // ifndef MYX_REDIS_PUBSUB_HPP_
|
Loading…
Reference in New Issue
Block a user