Новая библиотека для Redis
This commit is contained in:
parent
37fd015311
commit
5936bf96f0
@ -223,7 +223,7 @@ CheckOptions:
|
|||||||
- key: readability-identifier-naming.ProtectedMemberCase
|
- key: readability-identifier-naming.ProtectedMemberCase
|
||||||
value: camelBack
|
value: camelBack
|
||||||
- key: readability-identifier-naming.ProtectedMemberPrefix
|
- key: readability-identifier-naming.ProtectedMemberPrefix
|
||||||
value: ''
|
value: 'm_'
|
||||||
- key: readability-identifier-naming.ProtectedMemberSuffix
|
- key: readability-identifier-naming.ProtectedMemberSuffix
|
||||||
value: ''
|
value: ''
|
||||||
- key: readability-identifier-naming.ProtectedMethodCase
|
- key: readability-identifier-naming.ProtectedMethodCase
|
||||||
|
@ -17,7 +17,6 @@ set(TRGT_hpp
|
|||||||
${CMAKE_CURRENT_SOURCE_DIR}/base-inl.hpp
|
${CMAKE_CURRENT_SOURCE_DIR}/base-inl.hpp
|
||||||
${CMAKE_CURRENT_SOURCE_DIR}/containers.hpp
|
${CMAKE_CURRENT_SOURCE_DIR}/containers.hpp
|
||||||
${CMAKE_CURRENT_SOURCE_DIR}/containers-inl.hpp
|
${CMAKE_CURRENT_SOURCE_DIR}/containers-inl.hpp
|
||||||
${CMAKE_CURRENT_SOURCE_DIR}/containers.tpp
|
|
||||||
${CMAKE_CURRENT_SOURCE_DIR}/pubsub-inl.hpp)
|
${CMAKE_CURRENT_SOURCE_DIR}/pubsub-inl.hpp)
|
||||||
|
|
||||||
set(TRGT_headers ${TRGT_moc_hpp} ${TRGT_hpp})
|
set(TRGT_headers ${TRGT_moc_hpp} ${TRGT_hpp})
|
||||||
|
@ -23,107 +23,147 @@ static const QByteArray Separator { "\r\n" }; //!< Строковый разде
|
|||||||
* \param port Номер порта сервера
|
* \param port Номер порта сервера
|
||||||
* \param parent Родительский объект
|
* \param parent Родительский объект
|
||||||
*/
|
*/
|
||||||
Base::Base( QObject* parent ) :
|
inline Base::Base( QObject* parent ) :
|
||||||
QObject{ parent }
|
QObject { parent },
|
||||||
|
m_readTimer ( new QTimer( this ) ),
|
||||||
|
m_writeTimer( new QTimer( this ) )
|
||||||
{
|
{
|
||||||
_readTimer = new QTimer { this };
|
m_readTimer->setSingleShot( true );
|
||||||
_writeTimer = new QTimer { this };
|
m_writeTimer->setSingleShot( true );
|
||||||
_readTimer->setSingleShot( true );
|
m_readTimer->setInterval( rwTimeout() );
|
||||||
_writeTimer->setSingleShot( true );
|
m_writeTimer->setInterval( rwTimeout() );
|
||||||
_readTimer->setInterval( 100 );
|
connect( m_readTimer, &QTimer::timeout, this, [this]() {
|
||||||
_writeTimer->setInterval( 100 );
|
if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kRead ) != 0 ) )
|
||||||
connect( _readTimer, &QTimer::timeout, [this]() {
|
|
||||||
if ( _connectionFlags & Connected && _connectionFlags & Read )
|
|
||||||
{
|
{
|
||||||
_connectionFlags -= Read;
|
m_connectionFlags -= kRead;
|
||||||
emit connectionStateChanged( _connectionFlags );
|
Q_EMIT connectionStateChanged( m_connectionFlags );
|
||||||
}
|
}
|
||||||
} );
|
} );
|
||||||
connect( _writeTimer, &QTimer::timeout, [this]() {
|
connect( m_writeTimer, &QTimer::timeout, this, [this]() {
|
||||||
if ( _connectionFlags & Connected && _connectionFlags & Write )
|
if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kWrite ) != 0 ) )
|
||||||
{
|
{
|
||||||
_connectionFlags -= Write;
|
m_connectionFlags -= kWrite;
|
||||||
emit connectionStateChanged( _connectionFlags );
|
Q_EMIT connectionStateChanged( m_connectionFlags );
|
||||||
}
|
}
|
||||||
} );
|
} );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
inline QString Base::host() const
|
||||||
|
{
|
||||||
|
return( m_host );
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Задает адрес хоста
|
* \brief Задает адрес хоста
|
||||||
* \param host Адрес
|
* \param host Адрес
|
||||||
*/
|
*/
|
||||||
void Base::setHost( const QString& host )
|
inline void Base::setHost( const QString& host )
|
||||||
{
|
{
|
||||||
_host = host;
|
m_host = host;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
inline quint16 Base::port() const
|
||||||
|
{
|
||||||
|
return( m_port );
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
inline void Base::setPort( quint16 port )
|
||||||
|
{
|
||||||
|
m_port = port;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
inline int Base::connectionFlags() const
|
||||||
|
{
|
||||||
|
return( m_connectionFlags );
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
inline void Base::setConnectionFlags( int connectionFlags )
|
||||||
|
{
|
||||||
|
m_connectionFlags = connectionFlags;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Извлекает длину токена из буфера данных
|
* \brief Возвращает признак установленного соединения
|
||||||
* \param pos Позиция в буфере
|
* \return Признак установленного соединения
|
||||||
* \param buffer Буфер данных
|
|
||||||
*/
|
*/
|
||||||
int Base::fetchLength( QByteArray& buffer, int& pos )
|
inline bool Base::isConnected() const
|
||||||
{
|
{
|
||||||
if ( ( buffer[pos] != '$' ) && ( buffer[pos] != '*' ) && ( buffer[pos] != ':' ) )
|
return( ( m_connectionFlags & kConnected ) != 0 );
|
||||||
{
|
}
|
||||||
buffer.clear();
|
|
||||||
pos = 0;
|
|
||||||
return( -1 );
|
/// *!
|
||||||
}
|
// * \brief Извлекает длину токена из буфера данных
|
||||||
pos++;
|
// * \param pos Позиция в буфере
|
||||||
QByteArray text;
|
// * \param buffer Буфер данных
|
||||||
while ( pos < buffer.length() )
|
// */
|
||||||
{
|
// inline int Base::fetchLength( QByteArray& buffer, int& pos )
|
||||||
if ( QChar( buffer[pos] ).isNumber() )
|
// {
|
||||||
{
|
// if ( ( buffer[pos] != '$' ) && ( buffer[pos] != '*' ) && ( buffer[pos] != ':' ) )
|
||||||
text += buffer[pos++];
|
// {
|
||||||
}
|
// buffer.clear();
|
||||||
else
|
// pos = 0;
|
||||||
{
|
// return( -1 );
|
||||||
pos += Separator.length();
|
// }
|
||||||
return( text.toInt() );
|
// pos++;
|
||||||
}
|
// QByteArray text;
|
||||||
}
|
// while ( pos < buffer.length() )
|
||||||
return( -1 );
|
// {
|
||||||
} // Base::fetchLength
|
// if ( QChar( buffer[pos] ).isNumber() )
|
||||||
|
// {
|
||||||
|
// text += buffer[pos++];
|
||||||
|
// }
|
||||||
|
// else
|
||||||
|
// {
|
||||||
|
// pos += Separator.length();
|
||||||
|
// return( text.toInt() );
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// return( -1 );
|
||||||
|
// } // Base::fetchLength
|
||||||
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Добавляет к состоянию флаг Read
|
* \brief Добавляет к состоянию флаг Read
|
||||||
*/
|
*/
|
||||||
void Base::onReadyRead()
|
inline void Base::onReadyRead()
|
||||||
{
|
{
|
||||||
if ( _connectionFlags & Connected && !( _connectionFlags & Read ) )
|
if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kRead ) == 0 ) )
|
||||||
{
|
{
|
||||||
_connectionFlags += Read;
|
m_connectionFlags += kRead;
|
||||||
emit connectionStateChanged( _connectionFlags );
|
Q_EMIT connectionStateChanged( m_connectionFlags );
|
||||||
}
|
}
|
||||||
_readTimer->start();
|
m_readTimer->start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Добавляет к состоянию флаг Read
|
* \brief Добавляет к состоянию флаг Read
|
||||||
*/
|
*/
|
||||||
void Base::onBytesWritten()
|
inline void Base::onBytesWritten()
|
||||||
{
|
{
|
||||||
if ( _connectionFlags & Connected && !( _connectionFlags & Write ) )
|
if ( ( ( m_connectionFlags & kConnected ) != 0 ) && ( ( m_connectionFlags & kWrite ) == 0 ) )
|
||||||
{
|
{
|
||||||
_connectionFlags += Write;
|
m_connectionFlags += kWrite;
|
||||||
emit connectionStateChanged( _connectionFlags );
|
Q_EMIT connectionStateChanged( m_connectionFlags );
|
||||||
}
|
}
|
||||||
_writeTimer->start();
|
m_writeTimer->start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Выставляет флаг ошибки соединения
|
* \brief Выставляет флаг ошибки соединения
|
||||||
*/
|
*/
|
||||||
void Base::onSocketError()
|
inline void Base::onSocketError()
|
||||||
{
|
{
|
||||||
emit connectionStateChanged( _connectionFlags = Error );
|
Q_EMIT connectionStateChanged( m_connectionFlags = kError );
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace redis
|
} // namespace redis
|
||||||
|
@ -28,31 +28,47 @@ public:
|
|||||||
*/
|
*/
|
||||||
enum ConnectionStateFlags
|
enum ConnectionStateFlags
|
||||||
{
|
{
|
||||||
Unconnected = 1, //!< Соединение не установлено
|
kDisconnected = 1, //!< Соединение не установлено
|
||||||
Connecting = 2, //!< Соединение устанавливается
|
kConnecting = 2, //!< Соединение устанавливается
|
||||||
Connected = 4, //!< Соединение установлено
|
kConnected = 4, //!< Соединение установлено
|
||||||
Read = 8, //!< Есть данные для чтения
|
kRead = 8, //!< Есть данные для чтения
|
||||||
Write = 16, //!< Есть данные для записи
|
kWrite = 16, //!< Есть данные для записи
|
||||||
Error = 32, //!< Ошибка соединения
|
kError = 32, //!< Ошибка соединения
|
||||||
};
|
};
|
||||||
|
|
||||||
Base( QObject* parent = nullptr );
|
explicit Base( QObject* parent = nullptr );
|
||||||
void setHost( const QString& host );
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Сигнализирует об изменении состояния соединения
|
* \brief Сигнализирует об изменении состояния соединения
|
||||||
* \param flags Флаги состояния
|
* \param flags Флаги состояния
|
||||||
*/
|
*/
|
||||||
Q_SIGNAL void connectionStateChanged( int flags );
|
Q_SIGNAL void connectionStateChanged( int flags );
|
||||||
|
|
||||||
protected:
|
bool isConnected() const;
|
||||||
static constexpr int Timeout { 2000 }; //!< Таймаут на сетевые операции
|
|
||||||
int _connectionFlags { Unconnected }; //!< Флаги состояния соединения
|
|
||||||
quint16 _port { 6379 }; //!< Номер порта подключения к БД
|
|
||||||
QString _host { "127.0.0.1" }; //!< Имя хоста подключения к БД
|
|
||||||
QTimer* _readTimer; //!< Таймер чтения данных
|
|
||||||
QTimer* _writeTimer; //!< Таймер записи данных
|
|
||||||
|
|
||||||
int fetchLength( QByteArray& buffer, int& pos );
|
private:
|
||||||
|
static constexpr int k_RwTimeout { 200 }; //!< Таймаут на операции чтения и записи
|
||||||
|
int m_connectionFlags { kDisconnected }; //!< Флаги состояния соединения
|
||||||
|
quint16 m_port { 6379 }; //!< Номер порта подключения к БД
|
||||||
|
QString m_host { QStringLiteral( "127.0.0.1" ) }; //!< Имя хоста подключения к БД
|
||||||
|
QTimer* m_readTimer; //!< Таймер чтения данных
|
||||||
|
QTimer* m_writeTimer; //!< Таймер записи данных
|
||||||
|
|
||||||
|
protected:
|
||||||
|
|
||||||
|
int connectionTimeout() const;
|
||||||
|
int rwTimeout() const;
|
||||||
|
|
||||||
|
QString host() const;
|
||||||
|
void setHost( const QString& host );
|
||||||
|
|
||||||
|
quint16 port() const;
|
||||||
|
void setPort( quint16 port );
|
||||||
|
|
||||||
|
int connectionFlags() const;
|
||||||
|
void setConnectionFlags( int connectionFlags );
|
||||||
|
|
||||||
|
// int fetchLength( QByteArray& buffer, int& pos );
|
||||||
Q_SLOT void onReadyRead();
|
Q_SLOT void onReadyRead();
|
||||||
Q_SLOT void onBytesWritten();
|
Q_SLOT void onBytesWritten();
|
||||||
Q_SLOT void onSocketError();
|
Q_SLOT void onSocketError();
|
||||||
|
@ -7,8 +7,8 @@
|
|||||||
#include <myx/redis/containers.hpp>
|
#include <myx/redis/containers.hpp>
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include <QHash>
|
|
||||||
#include <QDebug>
|
#include <QDebug>
|
||||||
|
#include <QHash>
|
||||||
|
|
||||||
namespace myx {
|
namespace myx {
|
||||||
|
|
||||||
@ -19,7 +19,7 @@ namespace redis {
|
|||||||
* \param value Значение для создани части команды
|
* \param value Значение для создани части команды
|
||||||
* \return Часть команды
|
* \return Часть команды
|
||||||
*/
|
*/
|
||||||
QByteArray part( const QVariant& value )
|
inline QByteArray part( const QVariant& value )
|
||||||
{
|
{
|
||||||
auto bytes = value.toByteArray();
|
auto bytes = value.toByteArray();
|
||||||
return( "$" + QByteArray::number( bytes.length() ) + Separator + bytes );
|
return( "$" + QByteArray::number( bytes.length() ) + Separator + bytes );
|
||||||
@ -31,7 +31,7 @@ QByteArray part( const QVariant& value )
|
|||||||
* \param parts Части для составления массива
|
* \param parts Части для составления массива
|
||||||
* \return RESP массив
|
* \return RESP массив
|
||||||
*/
|
*/
|
||||||
QByteArray array( const QVariantList& parts )
|
inline QByteArray array( const QVariantList& parts )
|
||||||
{
|
{
|
||||||
QByteArrayList data;
|
QByteArrayList data;
|
||||||
std::transform( parts.cbegin(), parts.cend(), std::back_inserter( data ),
|
std::transform( parts.cbegin(), parts.cend(), std::back_inserter( data ),
|
||||||
@ -47,13 +47,13 @@ QByteArray array( const QVariantList& parts )
|
|||||||
* \param splitLength Длина разобранной части буффера
|
* \param splitLength Длина разобранной части буффера
|
||||||
* \return Массив частей-строк
|
* \return Массив частей-строк
|
||||||
*/
|
*/
|
||||||
QByteArrayList split( const QByteArray& buffer, int* splitLength )
|
inline QByteArrayList split( const QByteArray& buffer, int* splitLength )
|
||||||
{
|
{
|
||||||
enum Token
|
enum Token
|
||||||
{
|
{
|
||||||
Undefined,
|
kUndefined,
|
||||||
FetchNumber,
|
kFetchNumber,
|
||||||
FetchPart,
|
kFetchPart,
|
||||||
};
|
};
|
||||||
|
|
||||||
if ( ( buffer == "$-1\r\n" ) || ( buffer == "$*0\r\n" ) || buffer.isEmpty() )
|
if ( ( buffer == "$-1\r\n" ) || ( buffer == "$*0\r\n" ) || buffer.isEmpty() )
|
||||||
@ -87,7 +87,10 @@ QByteArrayList split( const QByteArray& buffer, int* splitLength )
|
|||||||
{
|
{
|
||||||
pos++;
|
pos++;
|
||||||
QByteArray text;
|
QByteArray text;
|
||||||
while ( QChar( buffer[pos] ).isNumber() && pos < buffer.size() ) text += buffer[pos++];
|
while ( QChar( buffer[pos] ).isNumber() && pos < buffer.size() )
|
||||||
|
{
|
||||||
|
text += buffer[pos++];
|
||||||
|
}
|
||||||
if ( pos == buffer.size() )
|
if ( pos == buffer.size() )
|
||||||
{
|
{
|
||||||
return {};
|
return {};
|
||||||
@ -96,29 +99,29 @@ QByteArrayList split( const QByteArray& buffer, int* splitLength )
|
|||||||
pos += Separator.length();
|
pos += Separator.length();
|
||||||
}
|
}
|
||||||
QByteArrayList parts;
|
QByteArrayList parts;
|
||||||
Token token { Undefined };
|
Token token { kUndefined };
|
||||||
while ( parts.size() < size && pos < buffer.length() )
|
while ( parts.size() < size && pos < buffer.length() )
|
||||||
{
|
{
|
||||||
switch ( token )
|
switch ( token )
|
||||||
{
|
{
|
||||||
case Undefined:
|
case kUndefined:
|
||||||
switch ( buffer[pos] )
|
switch ( buffer[pos] )
|
||||||
{
|
{
|
||||||
case '$':
|
case '$':
|
||||||
partLength.clear();
|
partLength.clear();
|
||||||
token = FetchPart;
|
token = kFetchPart;
|
||||||
pos++;
|
pos++;
|
||||||
break;
|
break;
|
||||||
case ':':
|
case ':':
|
||||||
partLength.clear();
|
partLength.clear();
|
||||||
token = FetchNumber;
|
token = kFetchNumber;
|
||||||
pos++;
|
pos++;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case FetchNumber:
|
case kFetchNumber:
|
||||||
if ( QChar( buffer[pos] ).isNumber() || ( buffer[pos] == '-' ) )
|
if ( QChar( buffer[pos] ).isNumber() || ( buffer[pos] == '-' ) )
|
||||||
{
|
{
|
||||||
partLength += buffer[pos];
|
partLength += buffer[pos];
|
||||||
@ -127,11 +130,11 @@ QByteArrayList split( const QByteArray& buffer, int* splitLength )
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
parts.push_back( partLength );
|
parts.push_back( partLength );
|
||||||
token = Undefined;
|
token = kUndefined;
|
||||||
pos += Separator.length();
|
pos += Separator.length();
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case FetchPart:
|
case kFetchPart:
|
||||||
{
|
{
|
||||||
if ( QChar( buffer[pos] ).isNumber() || ( buffer[pos] == '-' ) )
|
if ( QChar( buffer[pos] ).isNumber() || ( buffer[pos] == '-' ) )
|
||||||
{
|
{
|
||||||
@ -156,7 +159,7 @@ QByteArrayList split( const QByteArray& buffer, int* splitLength )
|
|||||||
parts.push_back( buffer.mid( pos, length ) );
|
parts.push_back( buffer.mid( pos, length ) );
|
||||||
pos += length + Separator.length();
|
pos += length + Separator.length();
|
||||||
}
|
}
|
||||||
token = Undefined;
|
token = kUndefined;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -181,10 +184,13 @@ QByteArrayList split( const QByteArray& buffer, int* splitLength )
|
|||||||
* \param hash Таблица значений
|
* \param hash Таблица значений
|
||||||
* \return Запакованное сообщение
|
* \return Запакованное сообщение
|
||||||
*/
|
*/
|
||||||
QByteArray packHash( const QVariantHash& hash )
|
inline QByteArray pack_hash( const QVariantHash& hash )
|
||||||
{
|
{
|
||||||
QVariantList parts;
|
QVariantList parts;
|
||||||
for ( auto iter = hash.cbegin(); iter != hash.cend(); ++iter ) parts << iter.key() << iter.value();
|
for ( auto iter = hash.cbegin(); iter != hash.cend(); ++iter )
|
||||||
|
{
|
||||||
|
parts << iter.key() << iter.value();
|
||||||
|
}
|
||||||
return( array( parts ) );
|
return( array( parts ) );
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -194,11 +200,14 @@ QByteArray packHash( const QVariantHash& hash )
|
|||||||
* \param message Сообщение
|
* \param message Сообщение
|
||||||
* \return Таблица значений
|
* \return Таблица значений
|
||||||
*/
|
*/
|
||||||
QVariantHash unpackHash( const QByteArray& message )
|
inline QVariantHash unpack_hash( const QByteArray& message )
|
||||||
{
|
{
|
||||||
auto parts = split( message );
|
auto parts = split( message );
|
||||||
QVariantHash hash;
|
QVariantHash hash;
|
||||||
for ( int i = 1; i < parts.size(); i += 2 ) hash[parts[i - 1]] = parts[i];
|
for ( int i = 1; i < parts.size(); i += 2 )
|
||||||
|
{
|
||||||
|
hash[parts[i - 1]] = parts[i];
|
||||||
|
}
|
||||||
return( hash );
|
return( hash );
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -208,7 +217,7 @@ QVariantHash unpackHash( const QByteArray& message )
|
|||||||
* \param hash Список значений
|
* \param hash Список значений
|
||||||
* \return Запакованное сообщение
|
* \return Запакованное сообщение
|
||||||
*/
|
*/
|
||||||
QByteArray packList( const QVariantList& list )
|
inline QByteArray pack_list( const QVariantList& list )
|
||||||
{
|
{
|
||||||
return( array( list ) );
|
return( array( list ) );
|
||||||
}
|
}
|
||||||
@ -219,7 +228,7 @@ QByteArray packList( const QVariantList& list )
|
|||||||
* \param message Сообщение
|
* \param message Сообщение
|
||||||
* \return Список значений
|
* \return Список значений
|
||||||
*/
|
*/
|
||||||
QVariantList unpackList( const QByteArray& message )
|
inline QVariantList unpack_list( const QByteArray& message )
|
||||||
{
|
{
|
||||||
auto data = split( message );
|
auto data = split( message );
|
||||||
QVariantList list;
|
QVariantList list;
|
||||||
|
@ -21,7 +21,7 @@ QByteArrayList split( const QByteArray& buffer, int* splitLength = nullptr );
|
|||||||
// QVariantHash unpackHash( const QByteArray& message );
|
// QVariantHash unpackHash( const QByteArray& message );
|
||||||
// QByteArray packList( QByteArrayList list );
|
// QByteArray packList( QByteArrayList list );
|
||||||
// QByteArray packList( const QVariantList& list );
|
// QByteArray packList( const QVariantList& list );
|
||||||
QVariantList unpackList( const QByteArray& message );
|
QVariantList unpack_list( const QByteArray& message );
|
||||||
// template< typename T >
|
// template< typename T >
|
||||||
// QByteArray packPair( T first, T second );
|
// QByteArray packPair( T first, T second );
|
||||||
// template< typename T1, typename T2 >
|
// template< typename T1, typename T2 >
|
||||||
@ -31,12 +31,70 @@ QVariantList unpackList( const QByteArray& message );
|
|||||||
// template< typename T1, typename T2 >
|
// template< typename T1, typename T2 >
|
||||||
// std::pair< T1, T2 > unpackPair( const QByteArray& message );
|
// std::pair< T1, T2 > unpackPair( const QByteArray& message );
|
||||||
|
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* \brief Запаковывает пару значений в сообщение
|
||||||
|
* \param first Первое значение
|
||||||
|
* \param second Второе значение
|
||||||
|
* \return Текстовое сообщение
|
||||||
|
*/
|
||||||
|
template< typename T >
|
||||||
|
QByteArray pack_pair( T first, T second )
|
||||||
|
{
|
||||||
|
return( packList( { QVariant::fromValue( first ), QVariant::fromValue( second ) } ) );
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* \brief Запаковывает пару значений в сообщение
|
||||||
|
* \param first Первое значение
|
||||||
|
* \param second Второе значение
|
||||||
|
* \return Текстовое сообщение
|
||||||
|
*/
|
||||||
|
template< typename T1, typename T2 >
|
||||||
|
QByteArray pack_pair( T1 first, T2 second )
|
||||||
|
{
|
||||||
|
return( packList( { QVariant::fromValue( first ), QVariant::fromValue( second ) } ) );
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* \brief Распаковывает сообщение в пару однотипных значений
|
||||||
|
* \param pair Текст сообщения
|
||||||
|
* \return Пара значений
|
||||||
|
*/
|
||||||
|
template< typename T >
|
||||||
|
std::pair< T, T > unpack_pair( const QByteArray& message )
|
||||||
|
{
|
||||||
|
auto list = unpack_list( message );
|
||||||
|
if ( list.size() == 2 )
|
||||||
|
{
|
||||||
|
return { QVariant( list.first() ).value< T >(), QVariant( list.last() ).value< T >() };
|
||||||
|
}
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* \brief Распаковывает сообщение в пару однотипных значений
|
||||||
|
* \param pair Текст сообщения
|
||||||
|
* \return Пара значений
|
||||||
|
*/
|
||||||
|
template< typename T1, typename T2 >
|
||||||
|
std::pair< T1, T2 > unpack_pair( const QByteArray& message )
|
||||||
|
{
|
||||||
|
auto list = unpack_list( message );
|
||||||
|
if ( list.size() == 2 )
|
||||||
|
{
|
||||||
|
return { QVariant( list.first() ).value< T1 >(), QVariant( list.last() ).value< T2 >() };
|
||||||
|
}
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace redis
|
} // namespace redis
|
||||||
|
|
||||||
} // namespace myx
|
} // namespace myx
|
||||||
|
|
||||||
#include <myx/redis/containers.tpp>
|
|
||||||
|
|
||||||
#ifdef MYXLIB_HEADER_ONLY
|
#ifdef MYXLIB_HEADER_ONLY
|
||||||
#include "containers-inl.hpp"
|
#include "containers-inl.hpp"
|
||||||
#endif
|
#endif
|
||||||
|
@ -1,68 +0,0 @@
|
|||||||
#include <QVariant>
|
|
||||||
|
|
||||||
namespace myx {
|
|
||||||
|
|
||||||
namespace redis {
|
|
||||||
|
|
||||||
/*!
|
|
||||||
* \brief Запаковывает пару значений в сообщение
|
|
||||||
* \param first Первое значение
|
|
||||||
* \param second Второе значение
|
|
||||||
* \return Текстовое сообщение
|
|
||||||
*/
|
|
||||||
template< typename T >
|
|
||||||
QByteArray packPair( T first, T second )
|
|
||||||
{
|
|
||||||
return( packList( { QVariant::fromValue( first ), QVariant::fromValue( second ) } ) );
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*!
|
|
||||||
* \brief Запаковывает пару значений в сообщение
|
|
||||||
* \param first Первое значение
|
|
||||||
* \param second Второе значение
|
|
||||||
* \return Текстовое сообщение
|
|
||||||
*/
|
|
||||||
template< typename T1, typename T2 >
|
|
||||||
QByteArray packPair( T1 first, T2 second )
|
|
||||||
{
|
|
||||||
return( packList( { QVariant::fromValue( first ), QVariant::fromValue( second ) } ) );
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*!
|
|
||||||
* \brief Распаковывает сообщение в пару однотипных значений
|
|
||||||
* \param pair Текст сообщения
|
|
||||||
* \return Пара значений
|
|
||||||
*/
|
|
||||||
template< typename T >
|
|
||||||
std::pair< T, T > unpackPair( const QByteArray& message )
|
|
||||||
{
|
|
||||||
auto list = unpackList( message );
|
|
||||||
if ( list.size() == 2 )
|
|
||||||
{
|
|
||||||
return { QVariant( list.first() ).value< T >(), QVariant( list.last() ).value< T >() };
|
|
||||||
}
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*!
|
|
||||||
* \brief Распаковывает сообщение в пару однотипных значений
|
|
||||||
* \param pair Текст сообщения
|
|
||||||
* \return Пара значений
|
|
||||||
*/
|
|
||||||
template< typename T1, typename T2 >
|
|
||||||
std::pair< T1, T2 > unpackPair( const QByteArray& message )
|
|
||||||
{
|
|
||||||
auto list = unpackList( message );
|
|
||||||
if ( list.size() == 2 )
|
|
||||||
{
|
|
||||||
return { QVariant( list.first() ).value< T1 >(), QVariant( list.last() ).value< T2 >() };
|
|
||||||
}
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
} // namespace redis
|
|
||||||
|
|
||||||
} // namespace myx
|
|
@ -9,59 +9,69 @@
|
|||||||
|
|
||||||
#include <myx/redis/containers.hpp>
|
#include <myx/redis/containers.hpp>
|
||||||
|
|
||||||
#include <QTimer>
|
#include <QCoreApplication>
|
||||||
#include <QThread>
|
|
||||||
#include <QDateTime>
|
#include <QDateTime>
|
||||||
#include <QTcpSocket>
|
#include <QTcpSocket>
|
||||||
#include <QCoreApplication>
|
#include <QThread>
|
||||||
|
#include <QTimer>
|
||||||
|
|
||||||
namespace myx {
|
namespace myx {
|
||||||
|
|
||||||
namespace redis {
|
namespace redis {
|
||||||
|
|
||||||
const QString PubSub::KeySpacePrefix { "__key%1__:" };
|
const QString PubSub::k_KeySpacePrefix { QStringLiteral( "__key%1__:" ) };
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Конструктор класса
|
* \brief Конструктор класса
|
||||||
* \param parent Родительский объект для _socket
|
* \param parent Родительский объект для m_socket
|
||||||
*/
|
*/
|
||||||
PubSub::PubSub( QObject* parent ) :
|
inline PubSub::PubSub( QObject* parent ) :
|
||||||
Base{ parent }
|
Base { parent },
|
||||||
|
m_socket ( new QTcpSocket( this ) ),
|
||||||
|
m_connectionTimer( new QTimer( this ) )
|
||||||
|
|
||||||
{
|
{
|
||||||
_socket = new QTcpSocket { this };
|
connect( m_socket, &QAbstractSocket::stateChanged, this, &PubSub::onSocketStateChanged );
|
||||||
QObject::connect( _socket, &QAbstractSocket::stateChanged,
|
connect( m_socket, &QIODevice::readyRead, this, &PubSub::onReadyRead );
|
||||||
this, &PubSub::onSocketStateChanged );
|
connect( m_socket, &QIODevice::bytesWritten, this, &PubSub::onBytesWritten );
|
||||||
connect( _socket, &QIODevice::readyRead, this, &PubSub::onReadyRead );
|
connect( m_socket, static_cast< void ( QAbstractSocket::* )( QAbstractSocket::SocketError ) >
|
||||||
connect( _socket, &QIODevice::bytesWritten, this, &PubSub::onBytesWritten );
|
( &QAbstractSocket::error ), this, [this]() {
|
||||||
QObject::connect( _socket, static_cast< void ( QAbstractSocket::* )( QAbstractSocket::SocketError ) >
|
qDebug() << QDateTime::currentDateTimeUtc().toString( QStringLiteral( "hh:mm:ss:zzz" ) )
|
||||||
( &QAbstractSocket::error ), [this]() {
|
<< QByteArray( "redis pubsub connection error" ) << m_socket->errorString();
|
||||||
qDebug() << QDateTime::currentDateTimeUtc().toString( "hh:mm:ss:zzz" )
|
} );
|
||||||
<< QByteArray( "redis pubsub connection error" ) << _socket->errorString();
|
connect( m_socket, &QIODevice::readyRead, this, &PubSub::read );
|
||||||
|
|
||||||
|
m_connectionTimer->setInterval( connectionTimeout() );
|
||||||
|
m_connectionTimer->setSingleShot( true );
|
||||||
|
connect( m_connectionTimer, &QTimer::timeout, this, [this]() {
|
||||||
|
m_socket->connectToHost( host(), port() );
|
||||||
} );
|
} );
|
||||||
QObject::connect( _socket, &QIODevice::readyRead, this, &PubSub::read );
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Инициирует работу с БД
|
* \brief Инициирует работу с БД
|
||||||
*/
|
*/
|
||||||
void PubSub::start()
|
inline void PubSub::start()
|
||||||
{
|
{
|
||||||
emit disconnected();
|
Q_EMIT disconnected();
|
||||||
emit connectionStateChanged( _connectionFlags );
|
Q_EMIT connectionStateChanged( connectionFlags() );
|
||||||
_socket->connectToHost( _host, _port );
|
m_socket->connectToHost( host(), port() );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Завершает работу с БД
|
* \brief Завершает работу с БД
|
||||||
*/
|
*/
|
||||||
void PubSub::stop()
|
inline void PubSub::stop()
|
||||||
{
|
{
|
||||||
if ( _socket->state() == QAbstractSocket::ConnectedState )
|
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||||
{
|
{
|
||||||
_socket->disconnectFromHost();
|
m_socket->disconnectFromHost();
|
||||||
while ( _socket->state() != QAbstractSocket::UnconnectedState ) QCoreApplication::processEvents();
|
while ( m_socket->state() != QAbstractSocket::UnconnectedState )
|
||||||
|
{
|
||||||
|
QCoreApplication::processEvents();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -72,32 +82,32 @@ void PubSub::stop()
|
|||||||
* \param subscriber Объект-подписчик
|
* \param subscriber Объект-подписчик
|
||||||
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
||||||
*/
|
*/
|
||||||
void PubSub::subscribe( QString channel, QObject* subscriber, const char* method )
|
inline void PubSub::subscribe( const QString& channel, QObject* subscriber, const char* method )
|
||||||
{
|
{
|
||||||
if ( QThread::currentThread() != thread() )
|
if ( QThread::currentThread() != thread() )
|
||||||
{
|
{
|
||||||
qWarning() << "no redis connection for caller thread";
|
qWarning() << "no redis connection for caller thread";
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
auto iter = _subscribers.find( channel );
|
auto iter = m_subscribers.find( channel );
|
||||||
if ( iter == _subscribers.end() )
|
if ( iter == m_subscribers.end() )
|
||||||
{
|
{
|
||||||
if ( _socket->state() == QAbstractSocket::ConnectedState )
|
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||||
{
|
{
|
||||||
_socket->write( array( { "SUBSCIBE", channel } ) );
|
m_socket->write( array( { "SUBSCIBE", channel } ) );
|
||||||
}
|
}
|
||||||
_subscribers[channel].push_back( { subscriber, method } );
|
m_subscribers[channel].emplace_back( subscriber, method );
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
auto value = iter.value();
|
auto value = iter.value();
|
||||||
auto it = std::find_if( value.begin(), value.end(),
|
auto it = std::find_if( value.begin(), value.end(),
|
||||||
[subscriber, method]( const std::pair< QObject*, const char* >& p ) {
|
[subscriber, method]( std::pair< QObject*, const char* > p ) {
|
||||||
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
|
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
|
||||||
} );
|
} );
|
||||||
if ( it == iter.value().end() )
|
if ( it == iter.value().end() )
|
||||||
{
|
{
|
||||||
iter.value().push_back( { subscriber, method } );
|
iter.value().emplace_back( subscriber, method );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} // PubSub::subscribe
|
} // PubSub::subscribe
|
||||||
@ -110,33 +120,33 @@ void PubSub::subscribe( QString channel, QObject* subscriber, const char* method
|
|||||||
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
||||||
* \param database Номер БД
|
* \param database Номер БД
|
||||||
*/
|
*/
|
||||||
void PubSub::subscribe( QString channel, QObject* subscriber, const char* method, int database )
|
inline void PubSub::subscribe( QString channel, QObject* subscriber, const char* method, int database )
|
||||||
{
|
{
|
||||||
if ( QThread::currentThread() != thread() )
|
if ( QThread::currentThread() != thread() )
|
||||||
{
|
{
|
||||||
qWarning() << "no redis connection for caller thread";
|
qWarning() << "no redis connection for caller thread";
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
auto iter = _subscribers.find( channel );
|
auto iter = m_subscribers.find( channel );
|
||||||
if ( iter == _subscribers.end() )
|
if ( iter == m_subscribers.end() )
|
||||||
{
|
{
|
||||||
channel.prepend( KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
channel.prepend( k_KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
||||||
if ( _socket->state() == QAbstractSocket::ConnectedState )
|
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||||
{
|
{
|
||||||
_socket->write( array( { "SUBSCRIBE", channel } ) );
|
m_socket->write( array( { "SUBSCRIBE", channel } ) );
|
||||||
}
|
}
|
||||||
_subscribers[channel].push_back( { subscriber, method } );
|
m_subscribers[channel].emplace_back( subscriber, method );
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
auto value = iter.value();
|
auto value = iter.value();
|
||||||
auto it = std::find_if( value.begin(), value.end(),
|
auto it = std::find_if( value.begin(), value.end(),
|
||||||
[subscriber, method]( const std::pair< QObject*, const char* >& p ) {
|
[subscriber, method]( std::pair< QObject*, const char* > p ) {
|
||||||
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
|
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
|
||||||
} );
|
} );
|
||||||
if ( it == iter.value().end() )
|
if ( it == iter.value().end() )
|
||||||
{
|
{
|
||||||
iter.value().push_back( { subscriber, method } );
|
iter.value().emplace_back( subscriber, method );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} // PubSub::subscribe
|
} // PubSub::subscribe
|
||||||
@ -148,32 +158,32 @@ void PubSub::subscribe( QString channel, QObject* subscriber, const char* method
|
|||||||
* \param subscriber Объект-подписчик
|
* \param subscriber Объект-подписчик
|
||||||
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
||||||
*/
|
*/
|
||||||
void PubSub::psubscribe( QString channel, QObject* subscriber, const char* method )
|
inline void PubSub::psubscribe( const QString& channel, QObject* subscriber, const char* method )
|
||||||
{
|
{
|
||||||
if ( QThread::currentThread() != thread() )
|
if ( QThread::currentThread() != thread() )
|
||||||
{
|
{
|
||||||
qWarning() << "no redis connection for caller thread";
|
qWarning() << "no redis connection for caller thread";
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
auto iter = _psubscribers.find( channel );
|
auto iter = m_psubscribers.find( channel );
|
||||||
if ( iter == _psubscribers.end() )
|
if ( iter == m_psubscribers.end() )
|
||||||
{
|
{
|
||||||
if ( _socket->state() == QAbstractSocket::ConnectedState )
|
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||||
{
|
{
|
||||||
_socket->write( array( { "PSUBSCRIBE", channel } ) );
|
m_socket->write( array( { "PSUBSCRIBE", channel } ) );
|
||||||
}
|
}
|
||||||
_psubscribers[channel].push_back( { subscriber, method } );
|
m_psubscribers[channel].emplace_back( subscriber, method );
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
auto value = iter.value();
|
auto value = iter.value();
|
||||||
auto it = std::find_if( value.begin(), value.end(),
|
auto it = std::find_if( value.begin(), value.end(),
|
||||||
[subscriber, method]( const std::pair< QObject*, const char* >& p ) {
|
[subscriber, method]( std::pair< QObject*, const char* > p ) {
|
||||||
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
|
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
|
||||||
} );
|
} );
|
||||||
if ( it == iter.value().end() )
|
if ( it == iter.value().end() )
|
||||||
{
|
{
|
||||||
iter.value().push_back( { subscriber, method } );
|
iter.value().emplace_back( subscriber, method );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} // PubSub::psubscribe
|
} // PubSub::psubscribe
|
||||||
@ -186,40 +196,40 @@ void PubSub::psubscribe( QString channel, QObject* subscriber, const char* metho
|
|||||||
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
|
||||||
* \param database Номер БД (-1 для всех)
|
* \param database Номер БД (-1 для всех)
|
||||||
*/
|
*/
|
||||||
void PubSub::psubscribe( QString channel, QObject* subscriber, const char* method, int database )
|
inline void PubSub::psubscribe( QString channel, QObject* subscriber, const char* method, int database )
|
||||||
{
|
{
|
||||||
if ( QThread::currentThread() != thread() )
|
if ( QThread::currentThread() != thread() )
|
||||||
{
|
{
|
||||||
qWarning() << "no redis connection for caller thread";
|
qWarning() << "no redis connection for caller thread";
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
auto iter = _psubscribers.find( channel );
|
auto iter = m_psubscribers.find( channel );
|
||||||
if ( iter == _psubscribers.end() )
|
if ( iter == m_psubscribers.end() )
|
||||||
{
|
{
|
||||||
if ( database == -1 )
|
if ( database == -1 )
|
||||||
{
|
{
|
||||||
channel.prepend( KeySpacePrefix.arg( "*" ).toLocal8Bit() );
|
channel.prepend( k_KeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() );
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
channel.prepend( KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
channel.prepend( k_KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
||||||
}
|
}
|
||||||
if ( _socket->state() == QAbstractSocket::ConnectedState )
|
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||||
{
|
{
|
||||||
_socket->write( array( { "PSUBSCRIBE", channel } ) );
|
m_socket->write( array( { "PSUBSCRIBE", channel } ) );
|
||||||
}
|
}
|
||||||
_psubscribers[channel].push_back( { subscriber, method } );
|
m_psubscribers[channel].emplace_back( subscriber, method );
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
auto value = iter.value();
|
auto value = iter.value();
|
||||||
auto it = std::find_if( value.begin(), value.end(),
|
auto it = std::find_if( value.begin(), value.end(),
|
||||||
[subscriber, method]( const std::pair< QObject*, const char* >& p ) {
|
[subscriber, method]( std::pair< QObject*, const char* > p ) {
|
||||||
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
|
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
|
||||||
} );
|
} );
|
||||||
if ( it == iter.value().end() )
|
if ( it == iter.value().end() )
|
||||||
{
|
{
|
||||||
iter.value().push_back( { subscriber, method } );
|
iter.value().emplace_back( subscriber, method );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} // PubSub::psubscribe
|
} // PubSub::psubscribe
|
||||||
@ -232,18 +242,18 @@ void PubSub::psubscribe( QString channel, QObject* subscriber, const char* metho
|
|||||||
* \param method Метод обработки данных
|
* \param method Метод обработки данных
|
||||||
* \param keyEvents Признак подписки на изменение данных
|
* \param keyEvents Признак подписки на изменение данных
|
||||||
*/
|
*/
|
||||||
void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* method )
|
inline void PubSub::unsubscribe( const QString& channel, QObject* subscriber, const char* method )
|
||||||
{
|
{
|
||||||
if ( QThread::currentThread() != thread() )
|
if ( QThread::currentThread() != thread() )
|
||||||
{
|
{
|
||||||
qWarning() << "no redis connection for caller thread";
|
qWarning() << "no redis connection for caller thread";
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
auto iter = _subscribers.find( channel );
|
auto iter = m_subscribers.find( channel );
|
||||||
if ( iter != _subscribers.end() )
|
if ( iter != m_subscribers.end() )
|
||||||
{
|
{
|
||||||
auto ctx = std::find_if( iter->begin(), iter->end(),
|
auto ctx = std::find_if( iter->begin(), iter->end(),
|
||||||
[subscriber, method]( const std::pair< QObject*, const char* >& value ) {
|
[subscriber, method]( std::pair< QObject*, const char* > value ) {
|
||||||
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
|
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
|
||||||
} );
|
} );
|
||||||
if ( ctx != iter->end() )
|
if ( ctx != iter->end() )
|
||||||
@ -252,11 +262,11 @@ void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* meth
|
|||||||
}
|
}
|
||||||
if ( iter->empty() )
|
if ( iter->empty() )
|
||||||
{
|
{
|
||||||
if ( _socket->state() == QAbstractSocket::ConnectedState )
|
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||||
{
|
{
|
||||||
_socket->write( array( { "UNSUBSCRIBE", channel } ) );
|
m_socket->write( array( { "UNSUBSCRIBE", channel } ) );
|
||||||
}
|
}
|
||||||
_subscribers.remove( channel );
|
m_subscribers.remove( channel );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} // PubSub::unsubscribe
|
} // PubSub::unsubscribe
|
||||||
@ -269,7 +279,7 @@ void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* meth
|
|||||||
* \param method Метод обработки данных
|
* \param method Метод обработки данных
|
||||||
* \param keyEvents Признак подписки на изменение данных
|
* \param keyEvents Признак подписки на изменение данных
|
||||||
*/
|
*/
|
||||||
void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* method, int database )
|
inline void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* method, int database )
|
||||||
{
|
{
|
||||||
if ( QThread::currentThread() != thread() )
|
if ( QThread::currentThread() != thread() )
|
||||||
{
|
{
|
||||||
@ -278,17 +288,17 @@ void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* meth
|
|||||||
}
|
}
|
||||||
if ( database == -1 )
|
if ( database == -1 )
|
||||||
{
|
{
|
||||||
channel.prepend( KeySpacePrefix.arg( "*" ).toLocal8Bit() );
|
channel.prepend( k_KeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() );
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
channel.prepend( KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
channel.prepend( k_KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
||||||
}
|
}
|
||||||
auto iter = _subscribers.find( channel );
|
auto iter = m_subscribers.find( channel );
|
||||||
if ( iter != _subscribers.end() )
|
if ( iter != m_subscribers.end() )
|
||||||
{
|
{
|
||||||
auto ctx = std::find_if( iter->begin(), iter->end(),
|
auto ctx = std::find_if( iter->begin(), iter->end(),
|
||||||
[subscriber, method]( const std::pair< QObject*, const char* >& value ) {
|
[subscriber, method]( std::pair< QObject*, const char* > value ) {
|
||||||
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
|
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
|
||||||
} );
|
} );
|
||||||
if ( ctx != iter->end() )
|
if ( ctx != iter->end() )
|
||||||
@ -297,11 +307,11 @@ void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* meth
|
|||||||
}
|
}
|
||||||
if ( iter->empty() )
|
if ( iter->empty() )
|
||||||
{
|
{
|
||||||
if ( _socket->state() == QAbstractSocket::ConnectedState )
|
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||||
{
|
{
|
||||||
_socket->write( array( { "UNSUBSCRIBE", channel } ) );
|
m_socket->write( array( { "UNSUBSCRIBE", channel } ) );
|
||||||
}
|
}
|
||||||
_subscribers.remove( channel );
|
m_subscribers.remove( channel );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} // PubSub::unsubscribe
|
} // PubSub::unsubscribe
|
||||||
@ -314,7 +324,7 @@ void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* meth
|
|||||||
* \param method Метод обработки данных
|
* \param method Метод обработки данных
|
||||||
* \param keyEvents Признак подписки на изменение данных
|
* \param keyEvents Признак подписки на изменение данных
|
||||||
*/
|
*/
|
||||||
void PubSub::punsubscribe( QString channel, QObject* subscriber,
|
inline void PubSub::punsubscribe( const QString& channel, QObject* subscriber,
|
||||||
const char* method )
|
const char* method )
|
||||||
{
|
{
|
||||||
if ( QThread::currentThread() != thread() )
|
if ( QThread::currentThread() != thread() )
|
||||||
@ -322,11 +332,11 @@ void PubSub::punsubscribe( QString channel, QObject* subscriber,
|
|||||||
qWarning() << "no redis connection for caller thread";
|
qWarning() << "no redis connection for caller thread";
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
auto iter = _psubscribers.find( channel );
|
auto iter = m_psubscribers.find( channel );
|
||||||
if ( iter != _psubscribers.end() )
|
if ( iter != m_psubscribers.end() )
|
||||||
{
|
{
|
||||||
auto ctx = std::find_if( iter->begin(), iter->end(),
|
auto ctx = std::find_if( iter->begin(), iter->end(),
|
||||||
[subscriber, method]( const std::pair< QObject*, const char* >& value ) {
|
[subscriber, method]( std::pair< QObject*, const char* > value ) {
|
||||||
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
|
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
|
||||||
} );
|
} );
|
||||||
if ( ctx != iter->end() )
|
if ( ctx != iter->end() )
|
||||||
@ -335,11 +345,11 @@ void PubSub::punsubscribe( QString channel, QObject* subscriber,
|
|||||||
}
|
}
|
||||||
if ( iter->empty() )
|
if ( iter->empty() )
|
||||||
{
|
{
|
||||||
if ( _socket->state() == QAbstractSocket::ConnectedState )
|
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||||
{
|
{
|
||||||
_socket->write( array( { "PUNSUBSCRIBE", channel } ) );
|
m_socket->write( array( { "PUNSUBSCRIBE", channel } ) );
|
||||||
}
|
}
|
||||||
_psubscribers.remove( channel );
|
m_psubscribers.remove( channel );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} // PubSub::punsubscribe
|
} // PubSub::punsubscribe
|
||||||
@ -352,7 +362,7 @@ void PubSub::punsubscribe( QString channel, QObject* subscriber,
|
|||||||
* \param method Метод обработки данных
|
* \param method Метод обработки данных
|
||||||
* \param keyEvents Признак подписки на изменение данных
|
* \param keyEvents Признак подписки на изменение данных
|
||||||
*/
|
*/
|
||||||
void PubSub::punsubscribe( QString channel, QObject* subscriber,
|
inline void PubSub::punsubscribe( QString channel, QObject* subscriber,
|
||||||
const char* method, int database )
|
const char* method, int database )
|
||||||
{
|
{
|
||||||
if ( QThread::currentThread() != thread() )
|
if ( QThread::currentThread() != thread() )
|
||||||
@ -362,17 +372,17 @@ void PubSub::punsubscribe( QString channel, QObject* subscriber,
|
|||||||
}
|
}
|
||||||
if ( database == -1 )
|
if ( database == -1 )
|
||||||
{
|
{
|
||||||
channel.prepend( KeySpacePrefix.arg( "*" ).toLocal8Bit() );
|
channel.prepend( k_KeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() );
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
channel.prepend( KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
channel.prepend( k_KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
|
||||||
}
|
}
|
||||||
auto iter = _psubscribers.find( channel );
|
auto iter = m_psubscribers.find( channel );
|
||||||
if ( iter != _psubscribers.end() )
|
if ( iter != m_psubscribers.end() )
|
||||||
{
|
{
|
||||||
auto ctx = std::find_if( iter->begin(), iter->end(),
|
auto ctx = std::find_if( iter->begin(), iter->end(),
|
||||||
[subscriber, method]( const std::pair< QObject*, const char* >& value ) {
|
[subscriber, method]( std::pair< QObject*, const char* > value ) {
|
||||||
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
|
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
|
||||||
} );
|
} );
|
||||||
if ( ctx != iter->end() )
|
if ( ctx != iter->end() )
|
||||||
@ -381,11 +391,11 @@ void PubSub::punsubscribe( QString channel, QObject* subscriber,
|
|||||||
}
|
}
|
||||||
if ( iter->empty() )
|
if ( iter->empty() )
|
||||||
{
|
{
|
||||||
if ( _socket->state() == QAbstractSocket::ConnectedState )
|
if ( m_socket->state() == QAbstractSocket::ConnectedState )
|
||||||
{
|
{
|
||||||
_socket->write( array( { "PUNSUBSCRIBE", channel } ) );
|
m_socket->write( array( { "PUNSUBSCRIBE", channel } ) );
|
||||||
}
|
}
|
||||||
_psubscribers.remove( channel );
|
m_psubscribers.remove( channel );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} // PubSub::punsubscribe
|
} // PubSub::punsubscribe
|
||||||
@ -395,23 +405,23 @@ void PubSub::punsubscribe( QString channel, QObject* subscriber,
|
|||||||
* \brief Разбирает поступившие от сервера уведомления и вызывает требуемые
|
* \brief Разбирает поступившие от сервера уведомления и вызывает требуемые
|
||||||
* метаметоды обработки
|
* метаметоды обработки
|
||||||
*/
|
*/
|
||||||
void PubSub::read()
|
inline void PubSub::read()
|
||||||
{
|
{
|
||||||
int splitLength;
|
int splitLength = 0;
|
||||||
while ( _socket->bytesAvailable() > 0 )
|
while ( m_socket->bytesAvailable() > 0 )
|
||||||
{
|
{
|
||||||
_buffer += _socket->readAll();
|
m_buffer += m_socket->readAll();
|
||||||
while ( !_buffer.isEmpty() )
|
while ( !m_buffer.isEmpty() )
|
||||||
{
|
{
|
||||||
auto parts = split( _buffer, &splitLength );
|
auto parts = split( m_buffer, &splitLength );
|
||||||
if ( parts.empty() )
|
if ( parts.empty() )
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if ( ( parts[0] == "pmessage" ) && ( parts.size() == 4 ) )
|
if ( ( parts[0] == "pmessage" ) && ( parts.size() == 4 ) )
|
||||||
{
|
{
|
||||||
auto iter = _psubscribers.find( parts[1] );
|
auto iter = m_psubscribers.find( parts[1] );
|
||||||
if ( iter != _psubscribers.end() )
|
if ( iter != m_psubscribers.end() )
|
||||||
{
|
{
|
||||||
for ( const auto& i: iter.value() )
|
for ( const auto& i: iter.value() )
|
||||||
{
|
{
|
||||||
@ -425,8 +435,8 @@ void PubSub::read()
|
|||||||
{
|
{
|
||||||
if ( ( parts[0] == "message" ) && ( parts.size() == 3 ) )
|
if ( ( parts[0] == "message" ) && ( parts.size() == 3 ) )
|
||||||
{
|
{
|
||||||
auto iter = _subscribers.find( parts[1] );
|
auto iter = m_subscribers.find( parts[1] );
|
||||||
if ( iter != _subscribers.end() )
|
if ( iter != m_subscribers.end() )
|
||||||
{
|
{
|
||||||
for ( const auto& i: iter.value() )
|
for ( const auto& i: iter.value() )
|
||||||
{
|
{
|
||||||
@ -437,7 +447,7 @@ void PubSub::read()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_buffer.remove( 0, splitLength );
|
m_buffer.remove( 0, splitLength );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} // PubSub::read
|
} // PubSub::read
|
||||||
@ -446,30 +456,32 @@ void PubSub::read()
|
|||||||
/*!
|
/*!
|
||||||
* \brief Обработка изменения состояния сокета
|
* \brief Обработка изменения состояния сокета
|
||||||
*/
|
*/
|
||||||
void PubSub::onSocketStateChanged()
|
inline void PubSub::onSocketStateChanged()
|
||||||
{
|
{
|
||||||
switch ( _socket->state() )
|
switch ( m_socket->state() )
|
||||||
{
|
{
|
||||||
case QAbstractSocket::ConnectedState:
|
case QAbstractSocket::ConnectedState:
|
||||||
for ( auto iter = _psubscribers.cbegin(); iter != _psubscribers.cend(); ++iter ) _socket->write( array( { "PSUBSCRIBE", iter.key() } ) );
|
for ( auto iter = m_psubscribers.cbegin(); iter != m_psubscribers.cend(); ++iter )
|
||||||
for ( auto iter = _subscribers.cbegin(); iter != _subscribers.cend(); ++iter ) _socket->write( array( { "SUBSCRIBE", iter.key() } ) );
|
{
|
||||||
qInfo() << QDateTime::currentDateTimeUtc().toString( "hh:mm:ss:zzz" ) <<
|
m_socket->write( array( { "PSUBSCRIBE", iter.key() } ) );
|
||||||
QString( "redis at %1:%2 is connected" ).arg( _host ).arg( _port );
|
}
|
||||||
emit connectionStateChanged( _connectionFlags = Connected );
|
for ( auto iter = m_subscribers.cbegin(); iter != m_subscribers.cend(); ++iter )
|
||||||
emit connected();
|
{
|
||||||
|
m_socket->write( array( { "SUBSCRIBE", iter.key() } ) );
|
||||||
|
}
|
||||||
|
setConnectionFlags( kConnected );
|
||||||
|
Q_EMIT connectionStateChanged( connectionFlags() );
|
||||||
|
Q_EMIT connected();
|
||||||
break;
|
break;
|
||||||
case QAbstractSocket::UnconnectedState:
|
case QAbstractSocket::UnconnectedState:
|
||||||
_buffer.clear();
|
m_buffer.clear();
|
||||||
QTimer::singleShot( Timeout, [this]() {
|
m_connectionTimer->start();
|
||||||
_socket->connectToHost( _host, _port );
|
if ( !isConnected() )
|
||||||
} );
|
|
||||||
qInfo() << QDateTime::currentDateTimeUtc().toString( "hh:mm:ss:zzz" ) <<
|
|
||||||
QString( "redis at %1:%2 is unconnected" ).arg( _host ).arg( _port );
|
|
||||||
if ( _connectionFlags & Connected )
|
|
||||||
{
|
{
|
||||||
emit disconnected();
|
Q_EMIT disconnected();
|
||||||
}
|
}
|
||||||
emit connectionStateChanged( _connectionFlags = Unconnected );
|
setConnectionFlags( kDisconnected );
|
||||||
|
Q_EMIT connectionStateChanged( connectionFlags() );
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
@ -477,23 +489,13 @@ void PubSub::onSocketStateChanged()
|
|||||||
} // PubSub::onSocketStateChanged
|
} // PubSub::onSocketStateChanged
|
||||||
|
|
||||||
|
|
||||||
/*!
|
|
||||||
* \brief Возвращает признак установленного соединения
|
|
||||||
* \return Признак установленного соединения
|
|
||||||
*/
|
|
||||||
bool PubSub::isConnected() const
|
|
||||||
{
|
|
||||||
return ( _connectionFlags & Connected );
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Возвращает сокет
|
* \brief Возвращает сокет
|
||||||
* \return Сокет
|
* \return Сокет
|
||||||
*/
|
*/
|
||||||
QTcpSocket* PubSub::socket() const
|
inline QTcpSocket* PubSub::socket() const
|
||||||
{
|
{
|
||||||
return( _socket );
|
return( m_socket );
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace redis
|
} // namespace redis
|
||||||
|
@ -22,21 +22,20 @@ class PubSub : public Base
|
|||||||
Q_OBJECT
|
Q_OBJECT
|
||||||
|
|
||||||
public:
|
public:
|
||||||
PubSub( QObject* parent = nullptr );
|
explicit PubSub( QObject* parent = nullptr );
|
||||||
bool isConnected() const;
|
|
||||||
QTcpSocket* socket() const;
|
QTcpSocket* socket() const;
|
||||||
Q_SLOT void start();
|
Q_SLOT void start();
|
||||||
Q_SLOT void stop();
|
Q_SLOT void stop();
|
||||||
Q_INVOKABLE void subscribe( QString channel, QObject* subscriber, const char* method );
|
Q_INVOKABLE void subscribe( const QString& channel, QObject* subscriber, const char* method );
|
||||||
Q_INVOKABLE void subscribe( QString channel, QObject* subscriber,
|
Q_INVOKABLE void subscribe( QString channel, QObject* subscriber,
|
||||||
const char* method, int database );
|
const char* method, int database );
|
||||||
Q_INVOKABLE void psubscribe( QString channel, QObject* subscriber, const char* method );
|
Q_INVOKABLE void psubscribe( const QString& channel, QObject* subscriber, const char* method );
|
||||||
Q_INVOKABLE void psubscribe( QString channel, QObject* subscriber,
|
Q_INVOKABLE void psubscribe( QString channel, QObject* subscriber,
|
||||||
const char* method, int database );
|
const char* method, int database );
|
||||||
Q_INVOKABLE void unsubscribe( QString channel, QObject* subscriber, const char* method );
|
Q_INVOKABLE void unsubscribe( const QString& channel, QObject* subscriber, const char* method );
|
||||||
Q_INVOKABLE void unsubscribe( QString channel, QObject* subscriber,
|
Q_INVOKABLE void unsubscribe( QString channel, QObject* subscriber,
|
||||||
const char* method, int database );
|
const char* method, int database );
|
||||||
Q_INVOKABLE void punsubscribe( QString channel, QObject* subscriber, const char* method );
|
Q_INVOKABLE void punsubscribe( const QString& channel, QObject* subscriber, const char* method );
|
||||||
Q_INVOKABLE void punsubscribe( QString channel, QObject* subscriber,
|
Q_INVOKABLE void punsubscribe( QString channel, QObject* subscriber,
|
||||||
const char* method, int database );
|
const char* method, int database );
|
||||||
/*!
|
/*!
|
||||||
@ -51,14 +50,16 @@ public:
|
|||||||
private:
|
private:
|
||||||
using Sub = QHash< QString, std::vector< std::pair< QObject*, const char* > > >;
|
using Sub = QHash< QString, std::vector< std::pair< QObject*, const char* > > >;
|
||||||
|
|
||||||
static const QString KeySpacePrefix; //!< Приставка для наблюдения за данными
|
static const QString k_KeySpacePrefix; //!< Приставка для наблюдения за данными
|
||||||
QTcpSocket* _socket;
|
QTcpSocket* m_socket;
|
||||||
|
QTimer* m_connectionTimer; //!< Таймер установления сетевого соединения
|
||||||
|
static constexpr int k_ConnectionTimeout { 2000 }; //!< Таймаут на сетевые операции
|
||||||
/*!
|
/*!
|
||||||
* \brief Таблица подписчиков
|
* \brief Таблица подписчиков
|
||||||
*/
|
*/
|
||||||
QByteArray _buffer; //!< Буффер прочитанных данных
|
QByteArray m_buffer; //!< Буффер прочитанных данных
|
||||||
Sub _subscribers; //!< Обработчики сообщений по каналу подписки
|
Sub m_subscribers; //!< Обработчики сообщений по каналу подписки
|
||||||
Sub _psubscribers; //!< Обработчики сообщений по шаблону канала подписки
|
Sub m_psubscribers; //!< Обработчики сообщений по шаблону канала подписки
|
||||||
|
|
||||||
Q_SLOT void read();
|
Q_SLOT void read();
|
||||||
Q_SLOT void onSocketStateChanged();
|
Q_SLOT void onSocketStateChanged();
|
||||||
|
Loading…
Reference in New Issue
Block a user