Compare commits
No commits in common. "c1740b2223635d7500e5e034d47a3ca9178d1c5a" and "c321076f9656dd0b263e9a3fce7f394015726d8d" have entirely different histories.
c1740b2223
...
c321076f96
@ -50,7 +50,7 @@ add_subdirectory(src/myx/redis)
|
|||||||
if(MYXLIB_BUILD_EXAMPLES OR MYXLIB_BUILD_EXAMPLES_HO)
|
if(MYXLIB_BUILD_EXAMPLES OR MYXLIB_BUILD_EXAMPLES_HO)
|
||||||
add_subdirectory(examples/filesystem)
|
add_subdirectory(examples/filesystem)
|
||||||
add_subdirectory(examples/qt)
|
add_subdirectory(examples/qt)
|
||||||
add_subdirectory(examples/redis)
|
# add_subdirectory(examples/redis)
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
# Документация
|
# Документация
|
||||||
|
@ -60,9 +60,8 @@ endif()
|
|||||||
if(MYXLIB_BUILD_EXAMPLES_HO)
|
if(MYXLIB_BUILD_EXAMPLES_HO)
|
||||||
set(REDIS_LIB_DIR ${CMAKE_SOURCE_DIR}/src/myx/redis)
|
set(REDIS_LIB_DIR ${CMAKE_SOURCE_DIR}/src/myx/redis)
|
||||||
|
|
||||||
set(REDIS_moc_hpp ${REDIS_LIB_DIR}/base.hpp
|
set(REDIS_moc_hpp ${REDIS_LIB_DIR}/client.hpp ${REDIS_LIB_DIR}/lexer.hpp ${REDIS_LIB_DIR}/parser.hpp
|
||||||
${REDIS_LIB_DIR}/containers.hpp
|
${REDIS_LIB_DIR}/request.hpp)
|
||||||
${REDIS_LIB_DIR}/pubsub.hpp)
|
|
||||||
|
|
||||||
qt5_wrap_cpp(REDIS_moc_cpp ${REDIS_moc_hpp})
|
qt5_wrap_cpp(REDIS_moc_cpp ${REDIS_moc_hpp})
|
||||||
|
|
@ -1,19 +1,12 @@
|
|||||||
#include "client.hpp"
|
#include "client.hpp"
|
||||||
|
|
||||||
#include <myx/redis/pubsub.hpp>
|
|
||||||
|
|
||||||
#include <QCoreApplication>
|
#include <QCoreApplication>
|
||||||
#include <QDebug>
|
#include <QDebug>
|
||||||
|
|
||||||
namespace MR = myx::redis;
|
|
||||||
|
|
||||||
int main( int argc, char** argv )
|
int main( int argc, char** argv )
|
||||||
{
|
{
|
||||||
QCoreApplication app( argc, argv );
|
QCoreApplication app( argc, argv );
|
||||||
|
RedisClient client;
|
||||||
MR::PubSub a;
|
|
||||||
RedisClient c( &a );
|
|
||||||
a.start();
|
|
||||||
|
|
||||||
return( QCoreApplication::exec() );
|
return( QCoreApplication::exec() );
|
||||||
} // main
|
} // main
|
89
examples/redis.old/01_client/client.hpp
Normal file
89
examples/redis.old/01_client/client.hpp
Normal file
@ -0,0 +1,89 @@
|
|||||||
|
#include <QObject>
|
||||||
|
#include <QDebug>
|
||||||
|
|
||||||
|
#include <myx/redis/client.hpp>
|
||||||
|
|
||||||
|
namespace MR = myx::redis;
|
||||||
|
|
||||||
|
class RedisClient : public QObject
|
||||||
|
{
|
||||||
|
Q_OBJECT
|
||||||
|
|
||||||
|
MR::Client m_client;
|
||||||
|
MR::Client m_subscribe;
|
||||||
|
MR::Request* m_request;
|
||||||
|
MR::Request* m_channel;
|
||||||
|
|
||||||
|
public:
|
||||||
|
RedisClient( QObject* parent = nullptr ) :
|
||||||
|
QObject( parent )
|
||||||
|
{
|
||||||
|
connect( &m_client, &MR::Client::connected, this, &RedisClient::slotConnected );
|
||||||
|
connect( &m_subscribe, &MR::Client::connected, this, &RedisClient::slotStartSubscribe );
|
||||||
|
m_client.connectToHost( "127.0.0.1" );
|
||||||
|
m_subscribe.connectToHost( "127.0.0.1" );
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
virtual ~RedisClient() {}
|
||||||
|
|
||||||
|
Q_SLOT void slotStartSubscribe()
|
||||||
|
{
|
||||||
|
m_channel = m_subscribe.subscribeToChannel( "test" );
|
||||||
|
connect( m_channel, &MR::Request::reply, this, &RedisClient::slotSubscribeTest );
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Q_SLOT void slotSubscribeTest( MR::Reply reply )
|
||||||
|
{
|
||||||
|
qDebug() << static_cast< int >( reply.type() );
|
||||||
|
auto& v = reply.value();
|
||||||
|
if ( !v.canConvert< QVariantList >() ) { return; }
|
||||||
|
|
||||||
|
auto l = v.toList();
|
||||||
|
for ( auto& a: l )
|
||||||
|
{
|
||||||
|
qDebug() << a.value< MR::Reply >().value().toByteArray();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Q_SLOT void slotConnected()
|
||||||
|
{
|
||||||
|
m_request = m_client.sendCommand( "PING" );
|
||||||
|
connect( m_request, &MR::Request::reply, this, &RedisClient::slotPong );
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Q_SLOT void slotPong( MR::Reply reply )
|
||||||
|
{
|
||||||
|
qDebug() << static_cast< int >( reply.type() );
|
||||||
|
qDebug() << reply.value().toString();
|
||||||
|
m_request->disconnect();
|
||||||
|
m_request->deleteLater();
|
||||||
|
|
||||||
|
m_request = m_client.sendCommand( "SET value 10" );
|
||||||
|
connect( m_request, &MR::Request::reply, this, &RedisClient::slotSetValue );
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Q_SLOT void slotSetValue( MR::Reply reply )
|
||||||
|
{
|
||||||
|
qDebug() << static_cast< int >( reply.type() );
|
||||||
|
qDebug() << reply.value().toString();
|
||||||
|
m_request->disconnect();
|
||||||
|
m_request->deleteLater();
|
||||||
|
|
||||||
|
m_request = m_client.sendCommand( "GET value" );
|
||||||
|
connect( m_request, &MR::Request::reply, this, &RedisClient::slotGetValue );
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Q_SLOT void slotGetValue( MR::Reply reply )
|
||||||
|
{
|
||||||
|
qDebug() << static_cast< int >( reply.type() );
|
||||||
|
qDebug() << reply.value().toByteArray();
|
||||||
|
m_request->disconnect();
|
||||||
|
m_request->deleteLater();
|
||||||
|
}
|
||||||
|
}; // class RedisClient
|
@ -1,32 +0,0 @@
|
|||||||
#include <QObject>
|
|
||||||
#include <QDebug>
|
|
||||||
|
|
||||||
#include <myx/redis/pubsub.hpp>
|
|
||||||
|
|
||||||
namespace MR = myx::redis;
|
|
||||||
|
|
||||||
class RedisClient : public QObject
|
|
||||||
{
|
|
||||||
Q_OBJECT
|
|
||||||
|
|
||||||
public:
|
|
||||||
explicit RedisClient( MR::PubSub* p, QObject* o = nullptr ) :
|
|
||||||
QObject( o )
|
|
||||||
{
|
|
||||||
p->subscribe( "test", this, "readChannelTest" );
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private:
|
|
||||||
Q_INVOKABLE void readChannelTest( const QString& channel, const QByteArray& message )
|
|
||||||
{
|
|
||||||
qDebug() << channel;
|
|
||||||
qDebug() << message;
|
|
||||||
|
|
||||||
if ( message == "quit" )
|
|
||||||
{
|
|
||||||
qCritical() << message;
|
|
||||||
qApp->quit();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}; // class RedisClient
|
|
@ -15,9 +15,11 @@
|
|||||||
#else
|
#else
|
||||||
#define MYXLIB_API
|
#define MYXLIB_API
|
||||||
#endif
|
#endif
|
||||||
#define MYXLIB_INLINE
|
#define MYXLIB_INLINE inline // NOLINT
|
||||||
#else
|
#else
|
||||||
|
#ifndef MYXLIB_HEADER_ONLY
|
||||||
#define MYXLIB_HEADER_ONLY
|
#define MYXLIB_HEADER_ONLY
|
||||||
|
#endif
|
||||||
#define MYXLIB_API
|
#define MYXLIB_API
|
||||||
#define MYXLIB_INLINE inline
|
#define MYXLIB_INLINE inline
|
||||||
#endif // ifdef MYXLIB_BUILD_LIBRARIES
|
#endif // ifdef MYXLIB_BUILD_LIBRARIES
|
||||||
|
@ -16,12 +16,12 @@ namespace myx {
|
|||||||
namespace redis {
|
namespace redis {
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Создаёт объект класса
|
* \brief Создает объект класса
|
||||||
* \param host Имя хоста сервера
|
* \param host Имя хоста сервера
|
||||||
* \param port Номер порта сервера
|
* \param port Номер порта сервера
|
||||||
* \param parent Родительский объект
|
* \param parent Родительский объект
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE Base::Base( QObject* parent ) :
|
inline Base::Base( QObject* parent ) :
|
||||||
QObject { parent },
|
QObject { parent },
|
||||||
m_readTimer ( new QTimer( this ) ),
|
m_readTimer ( new QTimer( this ) ),
|
||||||
m_writeTimer( new QTimer( this ) )
|
m_writeTimer( new QTimer( this ) )
|
||||||
@ -47,47 +47,47 @@ MYXLIB_INLINE Base::Base( QObject* parent ) :
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
MYXLIB_INLINE int Base::rwTimeout() const
|
inline int Base::rwTimeout() const
|
||||||
{
|
{
|
||||||
return( kRWTimeout );
|
return( kRWTimeout );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
MYXLIB_INLINE QString Base::host() const
|
inline QString Base::host() const
|
||||||
{
|
{
|
||||||
return( m_host );
|
return( m_host );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Задаёт адрес хоста
|
* \brief Задает адрес хоста
|
||||||
* \param host Адрес
|
* \param host Адрес
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE void Base::setHost( const QString& host )
|
inline void Base::setHost( const QString& host )
|
||||||
{
|
{
|
||||||
m_host = host;
|
m_host = host;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
MYXLIB_INLINE quint16 Base::port() const
|
inline quint16 Base::port() const
|
||||||
{
|
{
|
||||||
return( m_port );
|
return( m_port );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
MYXLIB_INLINE void Base::setPort( quint16 port )
|
inline void Base::setPort( quint16 port )
|
||||||
{
|
{
|
||||||
m_port = port;
|
m_port = port;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
MYXLIB_INLINE int Base::connectionFlags() const
|
inline int Base::connectionFlags() const
|
||||||
{
|
{
|
||||||
return( m_connectionFlags );
|
return( m_connectionFlags );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
MYXLIB_INLINE void Base::setConnectionFlags( int connectionFlags )
|
inline void Base::setConnectionFlags( int connectionFlags )
|
||||||
{
|
{
|
||||||
m_connectionFlags = connectionFlags;
|
m_connectionFlags = connectionFlags;
|
||||||
}
|
}
|
||||||
@ -97,7 +97,7 @@ MYXLIB_INLINE void Base::setConnectionFlags( int connectionFlags )
|
|||||||
* \brief Возвращает признак установленного соединения
|
* \brief Возвращает признак установленного соединения
|
||||||
* \return Признак установленного соединения
|
* \return Признак установленного соединения
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE bool Base::isConnected() const
|
inline bool Base::isConnected() const
|
||||||
{
|
{
|
||||||
return( ( m_connectionFlags & kConnected ) != 0 );
|
return( ( m_connectionFlags & kConnected ) != 0 );
|
||||||
}
|
}
|
||||||
@ -106,7 +106,7 @@ MYXLIB_INLINE bool Base::isConnected() const
|
|||||||
/*!
|
/*!
|
||||||
* \brief Добавляет к состоянию флаг Read
|
* \brief Добавляет к состоянию флаг Read
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE void Base::onReadyRead()
|
inline void Base::onReadyRead()
|
||||||
{
|
{
|
||||||
if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kRead ) == 0 ) )
|
if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kRead ) == 0 ) )
|
||||||
{
|
{
|
||||||
@ -120,7 +120,7 @@ MYXLIB_INLINE void Base::onReadyRead()
|
|||||||
/*!
|
/*!
|
||||||
* \brief Добавляет к состоянию флаг Read
|
* \brief Добавляет к состоянию флаг Read
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE void Base::onBytesWritten()
|
inline void Base::onBytesWritten()
|
||||||
{
|
{
|
||||||
if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kWrite ) == 0 ) )
|
if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kWrite ) == 0 ) )
|
||||||
{
|
{
|
||||||
@ -134,7 +134,7 @@ MYXLIB_INLINE void Base::onBytesWritten()
|
|||||||
/*!
|
/*!
|
||||||
* \brief Выставляет флаг ошибки соединения
|
* \brief Выставляет флаг ошибки соединения
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE void Base::onSocketError()
|
inline void Base::onSocketError()
|
||||||
{
|
{
|
||||||
Q_EMIT connectionStateChanged( m_connectionFlags = kError );
|
Q_EMIT connectionStateChanged( m_connectionFlags = kError );
|
||||||
}
|
}
|
||||||
|
@ -23,11 +23,11 @@ static const QByteArray Separator { "\r\n" }; //!< Строковый разде
|
|||||||
namespace C = constants;
|
namespace C = constants;
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Создаёт часть команды
|
* \brief Создает часть команды
|
||||||
* \param value Значение для создании части команды
|
* \param value Значение для создани части команды
|
||||||
* \return Часть команды
|
* \return Часть команды
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE QByteArray part( const QVariant& value )
|
inline QByteArray part( const QVariant& value )
|
||||||
{
|
{
|
||||||
auto bytes = value.toByteArray();
|
auto bytes = value.toByteArray();
|
||||||
return( "$" + QByteArray::number( bytes.length() ) + C::Separator + bytes );
|
return( "$" + QByteArray::number( bytes.length() ) + C::Separator + bytes );
|
||||||
@ -39,7 +39,7 @@ MYXLIB_INLINE QByteArray part( const QVariant& value )
|
|||||||
* \param parts Части для составления массива
|
* \param parts Части для составления массива
|
||||||
* \return RESP массив
|
* \return RESP массив
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE QByteArray array( const QVariantList& parts )
|
inline QByteArray array( const QVariantList& parts )
|
||||||
{
|
{
|
||||||
QByteArrayList data;
|
QByteArrayList data;
|
||||||
std::transform( parts.cbegin(), parts.cend(), std::back_inserter( data ),
|
std::transform( parts.cbegin(), parts.cend(), std::back_inserter( data ),
|
||||||
@ -52,10 +52,10 @@ MYXLIB_INLINE QByteArray array( const QVariantList& parts )
|
|||||||
/*!
|
/*!
|
||||||
* \brief Разбивает буфер данных на массив частей
|
* \brief Разбивает буфер данных на массив частей
|
||||||
* \param buffer Входной буфер данных
|
* \param buffer Входной буфер данных
|
||||||
* \param splitLength Длина разобранной части буфера
|
* \param splitLength Длина разобранной части буффера
|
||||||
* \return Массив частей-строк
|
* \return Массив частей-строк
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE QByteArrayList split( const QByteArray& buffer, int* splitLength )
|
inline QByteArrayList split( const QByteArray& buffer, int* splitLength )
|
||||||
{
|
{
|
||||||
enum Token
|
enum Token
|
||||||
{
|
{
|
||||||
@ -173,7 +173,7 @@ MYXLIB_INLINE QByteArrayList split( const QByteArray& buffer, int* splitLength )
|
|||||||
}
|
}
|
||||||
} // switch
|
} // switch
|
||||||
}
|
}
|
||||||
// может быть ситуация, когда в буфер не дочитан последний \r\n,
|
// может быть ситуация, когда в буфер недочитан последний \r\n,
|
||||||
// но все части были вычитаны
|
// но все части были вычитаны
|
||||||
if ( ( parts.size() == size ) && ( pos <= buffer.length() ) )
|
if ( ( parts.size() == size ) && ( pos <= buffer.length() ) )
|
||||||
{
|
{
|
||||||
@ -192,7 +192,7 @@ MYXLIB_INLINE QByteArrayList split( const QByteArray& buffer, int* splitLength )
|
|||||||
* \param hash Таблица значений
|
* \param hash Таблица значений
|
||||||
* \return Запакованное сообщение
|
* \return Запакованное сообщение
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE QByteArray pack_hash( const QVariantHash& hash )
|
inline QByteArray pack_hash( const QVariantHash& hash )
|
||||||
{
|
{
|
||||||
QVariantList parts;
|
QVariantList parts;
|
||||||
for ( auto iter = hash.cbegin(); iter != hash.cend(); ++iter )
|
for ( auto iter = hash.cbegin(); iter != hash.cend(); ++iter )
|
||||||
@ -208,7 +208,7 @@ MYXLIB_INLINE QByteArray pack_hash( const QVariantHash& hash )
|
|||||||
* \param message Сообщение
|
* \param message Сообщение
|
||||||
* \return Таблица значений
|
* \return Таблица значений
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE QVariantHash unpack_hash( const QByteArray& message )
|
inline QVariantHash unpack_hash( const QByteArray& message )
|
||||||
{
|
{
|
||||||
auto parts = split( message );
|
auto parts = split( message );
|
||||||
QVariantHash hash;
|
QVariantHash hash;
|
||||||
@ -225,7 +225,7 @@ MYXLIB_INLINE QVariantHash unpack_hash( const QByteArray& message )
|
|||||||
* \param hash Список значений
|
* \param hash Список значений
|
||||||
* \return Запакованное сообщение
|
* \return Запакованное сообщение
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE QByteArray pack_list( const QVariantList& list )
|
inline QByteArray pack_list( const QVariantList& list )
|
||||||
{
|
{
|
||||||
return( array( list ) );
|
return( array( list ) );
|
||||||
}
|
}
|
||||||
@ -236,7 +236,7 @@ MYXLIB_INLINE QByteArray pack_list( const QVariantList& list )
|
|||||||
* \param message Сообщение
|
* \param message Сообщение
|
||||||
* \return Список значений
|
* \return Список значений
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE QVariantList unpack_list( const QByteArray& message )
|
inline QVariantList unpack_list( const QByteArray& message )
|
||||||
{
|
{
|
||||||
auto data = split( message );
|
auto data = split( message );
|
||||||
QVariantList list;
|
QVariantList list;
|
||||||
|
@ -19,11 +19,13 @@ namespace myx {
|
|||||||
|
|
||||||
namespace redis {
|
namespace redis {
|
||||||
|
|
||||||
|
const QString PubSub::kKeySpacePrefix { QStringLiteral( "__key%1__:" ) };
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Конструктор класса
|
* \brief Конструктор класса
|
||||||
* \param parent Родительский объект для m_socket
|
* \param parent Родительский объект для m_socket
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE PubSub::PubSub( QObject* parent ) :
|
inline PubSub::PubSub( QObject* parent ) :
|
||||||
Base { parent },
|
Base { parent },
|
||||||
m_socket ( new QTcpSocket( this ) ),
|
m_socket ( new QTcpSocket( this ) ),
|
||||||
m_connectionTimer( new QTimer( this ) )
|
m_connectionTimer( new QTimer( this ) )
|
||||||
@ -47,7 +49,7 @@ MYXLIB_INLINE PubSub::PubSub( QObject* parent ) :
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
MYXLIB_INLINE int PubSub::connectionTimeout() const
|
inline int PubSub::connectionTimeout() const
|
||||||
{
|
{
|
||||||
return( kConnectionTimeout );
|
return( kConnectionTimeout );
|
||||||
}
|
}
|
||||||
@ -56,7 +58,7 @@ MYXLIB_INLINE int PubSub::connectionTimeout() const
|
|||||||
/*!
|
/*!
|
||||||
* \brief Инициирует работу с БД
|
* \brief Инициирует работу с БД
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE void PubSub::start()
|
inline void PubSub::start()
|
||||||
{
|
{
|
||||||
Q_EMIT disconnected();
|
Q_EMIT disconnected();
|
||||||
Q_EMIT connectionStateChanged( connectionFlags() );
|
Q_EMIT connectionStateChanged( connectionFlags() );
|
||||||
@ -67,7 +69,7 @@ MYXLIB_INLINE void PubSub::start()
|
|||||||
/*!
|
/*!
|
||||||
* \brief Завершает работу с БД
|
* \brief Завершает работу с БД
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE void PubSub::stop()
|
inline void PubSub::stop()
|
||||||
{
|
{
|
||||||
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||||
{
|
{
|
||||||
@ -81,12 +83,12 @@ MYXLIB_INLINE void PubSub::stop()
|
|||||||
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Создаёт подписку на канал сообщений
|
* \brief Создает подписку на канал сообщений
|
||||||
* \param channel Канал подписки
|
* \param channel Rанал подписки
|
||||||
* \param subscriber Объект-подписчик
|
* \param subscriber Объект-подписчик
|
||||||
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE void PubSub::subscribe( const QString& channel, QObject* subscriber, const char* method )
|
inline void PubSub::subscribe( const QString& channel, QObject* subscriber, const char* method )
|
||||||
{
|
{
|
||||||
if ( QThread::currentThread() != thread() )
|
if ( QThread::currentThread() != thread() )
|
||||||
{
|
{
|
||||||
@ -120,13 +122,13 @@ MYXLIB_INLINE void PubSub::subscribe( const QString& channel, QObject* subscribe
|
|||||||
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Создаёт подписку на канал изменения данных
|
* \brief Создает подписку на канал изменения данных
|
||||||
* \param channel Канал подписки
|
* \param channel Rанал подписки
|
||||||
* \param subscriber Объект-подписчик
|
* \param subscriber Объект-подписчик
|
||||||
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
||||||
* \param database Номер БД
|
* \param database Номер БД
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE void PubSub::subscribe( QString channel, QObject* subscriber, const char* method, int database )
|
inline void PubSub::subscribe( QString channel, QObject* subscriber, const char* method, int database )
|
||||||
{
|
{
|
||||||
if ( QThread::currentThread() != thread() )
|
if ( QThread::currentThread() != thread() )
|
||||||
{
|
{
|
||||||
@ -136,7 +138,7 @@ MYXLIB_INLINE void PubSub::subscribe( QString channel, QObject* subscriber, cons
|
|||||||
auto iter = m_subscribers.find( channel );
|
auto iter = m_subscribers.find( channel );
|
||||||
if ( iter == m_subscribers.end() )
|
if ( iter == m_subscribers.end() )
|
||||||
{
|
{
|
||||||
channel.prepend( C::KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
channel.prepend( kKeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
||||||
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||||
{
|
{
|
||||||
m_socket->write( array( { "SUBSCRIBE", channel } ) );
|
m_socket->write( array( { "SUBSCRIBE", channel } ) );
|
||||||
@ -161,12 +163,12 @@ MYXLIB_INLINE void PubSub::subscribe( QString channel, QObject* subscriber, cons
|
|||||||
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Создаёт подписку на шаблон канала сообщений
|
* \brief Создает подписку на шаблон канала сообщений
|
||||||
* \param channel Шаблон канала подписки
|
* \param channel Шаблон канала подписки
|
||||||
* \param subscriber Объект-подписчик
|
* \param subscriber Объект-подписчик
|
||||||
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE void PubSub::psubscribe( const QString& channel, QObject* subscriber, const char* method )
|
inline void PubSub::psubscribe( const QString& channel, QObject* subscriber, const char* method )
|
||||||
{
|
{
|
||||||
if ( QThread::currentThread() != thread() )
|
if ( QThread::currentThread() != thread() )
|
||||||
{
|
{
|
||||||
@ -200,13 +202,13 @@ MYXLIB_INLINE void PubSub::psubscribe( const QString& channel, QObject* subscrib
|
|||||||
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Создаёт подписку на шаблон канала изменения данных
|
* \brief Создает подписку на шаблон канала изменения данных
|
||||||
* \param channel Шаблон канала подписки
|
* \param channel Шаблон канала подписки
|
||||||
* \param subscriber Объект-подписчик
|
* \param subscriber Объект-подписчик
|
||||||
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
||||||
* \param database Номер БД (-1 для всех)
|
* \param database Номер БД (-1 для всех)
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE void PubSub::psubscribe( QString channel, QObject* subscriber, const char* method, int database )
|
inline void PubSub::psubscribe( QString channel, QObject* subscriber, const char* method, int database )
|
||||||
{
|
{
|
||||||
if ( QThread::currentThread() != thread() )
|
if ( QThread::currentThread() != thread() )
|
||||||
{
|
{
|
||||||
@ -218,11 +220,11 @@ MYXLIB_INLINE void PubSub::psubscribe( QString channel, QObject* subscriber, con
|
|||||||
{
|
{
|
||||||
if ( database == -1 )
|
if ( database == -1 )
|
||||||
{
|
{
|
||||||
channel.prepend( C::KeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() );
|
channel.prepend( kKeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() );
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
channel.prepend( C::KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
channel.prepend( kKeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
||||||
}
|
}
|
||||||
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||||
{
|
{
|
||||||
@ -254,7 +256,7 @@ MYXLIB_INLINE void PubSub::psubscribe( QString channel, QObject* subscriber, con
|
|||||||
* \param method Метод обработки данных
|
* \param method Метод обработки данных
|
||||||
* \param keyEvents Признак подписки на изменение данных
|
* \param keyEvents Признак подписки на изменение данных
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE void PubSub::unsubscribe( const QString& channel, QObject* subscriber, const char* method )
|
inline void PubSub::unsubscribe( const QString& channel, QObject* subscriber, const char* method )
|
||||||
{
|
{
|
||||||
if ( QThread::currentThread() != thread() )
|
if ( QThread::currentThread() != thread() )
|
||||||
{
|
{
|
||||||
@ -291,7 +293,7 @@ MYXLIB_INLINE void PubSub::unsubscribe( const QString& channel, QObject* subscri
|
|||||||
* \param method Метод обработки данных
|
* \param method Метод обработки данных
|
||||||
* \param keyEvents Признак подписки на изменение данных
|
* \param keyEvents Признак подписки на изменение данных
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* method, int database )
|
inline void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* method, int database )
|
||||||
{
|
{
|
||||||
if ( QThread::currentThread() != thread() )
|
if ( QThread::currentThread() != thread() )
|
||||||
{
|
{
|
||||||
@ -300,11 +302,11 @@ MYXLIB_INLINE void PubSub::unsubscribe( QString channel, QObject* subscriber, co
|
|||||||
}
|
}
|
||||||
if ( database == -1 )
|
if ( database == -1 )
|
||||||
{
|
{
|
||||||
channel.prepend( C::KeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() );
|
channel.prepend( kKeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() );
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
channel.prepend( C::KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
channel.prepend( kKeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
||||||
}
|
}
|
||||||
auto iter = m_subscribers.find( channel );
|
auto iter = m_subscribers.find( channel );
|
||||||
if ( iter != m_subscribers.end() )
|
if ( iter != m_subscribers.end() )
|
||||||
@ -336,8 +338,8 @@ MYXLIB_INLINE void PubSub::unsubscribe( QString channel, QObject* subscriber, co
|
|||||||
* \param method Метод обработки данных
|
* \param method Метод обработки данных
|
||||||
* \param keyEvents Признак подписки на изменение данных
|
* \param keyEvents Признак подписки на изменение данных
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE void PubSub::punsubscribe( const QString& channel, QObject* subscriber,
|
inline void PubSub::punsubscribe( const QString& channel, QObject* subscriber,
|
||||||
const char* method )
|
const char* method )
|
||||||
{
|
{
|
||||||
if ( QThread::currentThread() != thread() )
|
if ( QThread::currentThread() != thread() )
|
||||||
{
|
{
|
||||||
@ -374,8 +376,8 @@ MYXLIB_INLINE void PubSub::punsubscribe( const QString& channel, QObject* subscr
|
|||||||
* \param method Метод обработки данных
|
* \param method Метод обработки данных
|
||||||
* \param keyEvents Признак подписки на изменение данных
|
* \param keyEvents Признак подписки на изменение данных
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE void PubSub::punsubscribe( QString channel, QObject* subscriber,
|
inline void PubSub::punsubscribe( QString channel, QObject* subscriber,
|
||||||
const char* method, int database )
|
const char* method, int database )
|
||||||
{
|
{
|
||||||
if ( QThread::currentThread() != thread() )
|
if ( QThread::currentThread() != thread() )
|
||||||
{
|
{
|
||||||
@ -384,11 +386,11 @@ MYXLIB_INLINE void PubSub::punsubscribe( QString channel, QObject* subscriber,
|
|||||||
}
|
}
|
||||||
if ( database == -1 )
|
if ( database == -1 )
|
||||||
{
|
{
|
||||||
channel.prepend( C::KeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() );
|
channel.prepend( kKeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() );
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
channel.prepend( C::KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
channel.prepend( kKeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
||||||
}
|
}
|
||||||
auto iter = m_psubscribers.find( channel );
|
auto iter = m_psubscribers.find( channel );
|
||||||
if ( iter != m_psubscribers.end() )
|
if ( iter != m_psubscribers.end() )
|
||||||
@ -417,7 +419,7 @@ MYXLIB_INLINE void PubSub::punsubscribe( QString channel, QObject* subscriber,
|
|||||||
* \brief Разбирает поступившие от сервера уведомления и вызывает требуемые
|
* \brief Разбирает поступившие от сервера уведомления и вызывает требуемые
|
||||||
* метаметоды обработки
|
* метаметоды обработки
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE void PubSub::read()
|
inline void PubSub::read()
|
||||||
{
|
{
|
||||||
int splitLength = 0;
|
int splitLength = 0;
|
||||||
while ( m_socket->bytesAvailable() > 0 )
|
while ( m_socket->bytesAvailable() > 0 )
|
||||||
@ -468,7 +470,7 @@ MYXLIB_INLINE void PubSub::read()
|
|||||||
/*!
|
/*!
|
||||||
* \brief Обработка изменения состояния сокета
|
* \brief Обработка изменения состояния сокета
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE void PubSub::onSocketStateChanged()
|
inline void PubSub::onSocketStateChanged()
|
||||||
{
|
{
|
||||||
switch ( m_socket->state() )
|
switch ( m_socket->state() )
|
||||||
{
|
{
|
||||||
@ -503,9 +505,9 @@ MYXLIB_INLINE void PubSub::onSocketStateChanged()
|
|||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief При удалении подписчика уничтожает все его подписки
|
* \brief При удалении подписчика уничтожает все его подписки
|
||||||
* \param subscriber Удалённый подписчик
|
* \param subscriber Удаленный подписчик
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE void PubSub::onSubscriberDestroyed( QObject* subscriber )
|
inline void PubSub::onSubscriberDestroyed( QObject* subscriber )
|
||||||
{
|
{
|
||||||
for ( auto i = m_subscribers.begin(); i != m_subscribers.end(); )
|
for ( auto i = m_subscribers.begin(); i != m_subscribers.end(); )
|
||||||
{
|
{
|
||||||
@ -567,7 +569,7 @@ MYXLIB_INLINE void PubSub::onSubscriberDestroyed( QObject* subscriber )
|
|||||||
* \brief Возвращает сокет
|
* \brief Возвращает сокет
|
||||||
* \return Сокет
|
* \return Сокет
|
||||||
*/
|
*/
|
||||||
MYXLIB_INLINE QTcpSocket* PubSub::socket() const
|
inline QTcpSocket* PubSub::socket() const
|
||||||
{
|
{
|
||||||
return( m_socket );
|
return( m_socket );
|
||||||
}
|
}
|
||||||
@ -576,4 +578,5 @@ MYXLIB_INLINE QTcpSocket* PubSub::socket() const
|
|||||||
|
|
||||||
} // namespace myx
|
} // namespace myx
|
||||||
|
|
||||||
|
|
||||||
#endif // ifndef MYX_REDIS_PUBSUB_INL_HPP_
|
#endif // ifndef MYX_REDIS_PUBSUB_INL_HPP_
|
||||||
|
@ -13,18 +13,10 @@ namespace myx {
|
|||||||
|
|
||||||
namespace redis {
|
namespace redis {
|
||||||
|
|
||||||
namespace constants {
|
|
||||||
|
|
||||||
/// @brief Приставка для наблюдения за данными
|
|
||||||
static const QString KeySpacePrefix { QStringLiteral( "__key%1__:" ) };
|
|
||||||
|
|
||||||
} // namespace constants
|
|
||||||
|
|
||||||
namespace C = constants;
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Управляет подпиской и публикацией сообщений
|
* \brief Управляет подпиской и публикацией сообщений
|
||||||
*/
|
*/
|
||||||
|
|
||||||
class PubSub : public Base
|
class PubSub : public Base
|
||||||
{
|
{
|
||||||
Q_OBJECT
|
Q_OBJECT
|
||||||
@ -60,6 +52,7 @@ public:
|
|||||||
private:
|
private:
|
||||||
using Sub = QHash< QString, std::vector< std::pair< QObject*, const char* > > >;
|
using Sub = QHash< QString, std::vector< std::pair< QObject*, const char* > > >;
|
||||||
|
|
||||||
|
static const QString kKeySpacePrefix; //!< Приставка для наблюдения за данными
|
||||||
static constexpr int kConnectionTimeout { 2000 }; //!< Таймаут на сетевые операции
|
static constexpr int kConnectionTimeout { 2000 }; //!< Таймаут на сетевые операции
|
||||||
QTcpSocket* m_socket;
|
QTcpSocket* m_socket;
|
||||||
QTimer* m_connectionTimer; //!< Таймер установления сетевого соединения
|
QTimer* m_connectionTimer; //!< Таймер установления сетевого соединения
|
||||||
|
Loading…
Reference in New Issue
Block a user