myxlib/src/myx/redis/pubsub-inl.hpp

580 lines
17 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#ifndef MYX_REDIS_PUBSUB_INL_HPP_
#define MYX_REDIS_PUBSUB_INL_HPP_
#pragma once
#ifndef MYXLIB_HEADER_ONLY
#include <myx/redis/pubsub.hpp>
#endif
#include <myx/redis/containers.hpp>
#include <QCoreApplication>
#include <QDateTime>
#include <QTcpSocket>
#include <QThread>
#include <QTimer>
namespace myx {
namespace redis {
/*!
* \brief Конструктор класса
* \param parent Родительский объект для m_socket
*/
MYXLIB_INLINE PubSub::PubSub( QObject* parent ) :
Base { parent },
m_socket ( new QTcpSocket( this ) ),
m_connectionTimer( new QTimer( this ) )
{
connect( m_socket, &QAbstractSocket::stateChanged, this, &PubSub::onSocketStateChanged );
connect( m_socket, &QIODevice::readyRead, this, &PubSub::onReadyRead );
connect( m_socket, &QIODevice::bytesWritten, this, &PubSub::onBytesWritten );
connect( m_socket, static_cast< void ( QAbstractSocket::* )( QAbstractSocket::SocketError ) >
( &QAbstractSocket::error ), this, [this]() {
qDebug() << QDateTime::currentDateTimeUtc().toString( QStringLiteral( "hh:mm:ss:zzz" ) )
<< QByteArray( "redis pubsub connection error" ) << m_socket->errorString();
} );
connect( m_socket, &QIODevice::readyRead, this, &PubSub::read );
m_connectionTimer->setInterval( connectionTimeout() );
m_connectionTimer->setSingleShot( true );
connect( m_connectionTimer, &QTimer::timeout, this, [this]() {
m_socket->connectToHost( host(), port() );
} );
}
MYXLIB_INLINE int PubSub::connectionTimeout() const
{
return( kConnectionTimeout );
}
/*!
* \brief Инициирует работу с БД
*/
MYXLIB_INLINE void PubSub::start()
{
Q_EMIT disconnected();
Q_EMIT connectionStateChanged( connectionFlags() );
m_socket->connectToHost( host(), port() );
}
/*!
* \brief Завершает работу с БД
*/
MYXLIB_INLINE void PubSub::stop()
{
if ( m_socket->state() == QAbstractSocket::ConnectedState )
{
m_socket->disconnectFromHost();
while ( m_socket->state() != QAbstractSocket::UnconnectedState )
{
QCoreApplication::processEvents();
}
}
}
/*!
* \brief Создает подписку на канал сообщений
* \param channel Rанал подписки
* \param subscriber Объект-подписчик
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
*/
MYXLIB_INLINE void PubSub::subscribe( const QString& channel, QObject* subscriber, const char* method )
{
if ( QThread::currentThread() != thread() )
{
qWarning() << "no redis connection for caller thread";
return;
}
auto iter = m_subscribers.find( channel );
if ( iter == m_subscribers.end() )
{
if ( m_socket->state() == QAbstractSocket::ConnectedState )
{
m_socket->write( array( { "SUBSCRIBE", channel } ) );
}
m_subscribers[channel].emplace_back( subscriber, method );
}
else
{
auto value = iter.value();
auto it = std::find_if( value.begin(), value.end(),
[subscriber, method]( std::pair< QObject*, const char* > p ) {
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
} );
if ( it == iter.value().end() )
{
iter.value().emplace_back( subscriber, method );
connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed,
Qt::UniqueConnection );
}
}
} // PubSub::subscribe
/*!
* \brief Создает подписку на канал изменения данных
* \param channel Rанал подписки
* \param subscriber Объект-подписчик
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
* \param database Номер БД
*/
MYXLIB_INLINE void PubSub::subscribe( QString channel, QObject* subscriber, const char* method, int database )
{
if ( QThread::currentThread() != thread() )
{
qWarning() << "no redis connection for caller thread";
return;
}
auto iter = m_subscribers.find( channel );
if ( iter == m_subscribers.end() )
{
channel.prepend( C::KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
if ( m_socket->state() == QAbstractSocket::ConnectedState )
{
m_socket->write( array( { "SUBSCRIBE", channel } ) );
}
m_subscribers[channel].emplace_back( subscriber, method );
}
else
{
auto value = iter.value();
auto it = std::find_if( value.begin(), value.end(),
[subscriber, method]( std::pair< QObject*, const char* > p ) {
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
} );
if ( it == iter.value().end() )
{
iter.value().emplace_back( subscriber, method );
connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed,
Qt::UniqueConnection );
}
}
} // PubSub::subscribe
/*!
* \brief Создает подписку на шаблон канала сообщений
* \param channel Шаблон канала подписки
* \param subscriber Объект-подписчик
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
*/
MYXLIB_INLINE void PubSub::psubscribe( const QString& channel, QObject* subscriber, const char* method )
{
if ( QThread::currentThread() != thread() )
{
qWarning() << "no redis connection for caller thread";
return;
}
auto iter = m_psubscribers.find( channel );
if ( iter == m_psubscribers.end() )
{
if ( m_socket->state() == QAbstractSocket::ConnectedState )
{
m_socket->write( array( { "PSUBSCRIBE", channel } ) );
}
m_psubscribers[channel].emplace_back( subscriber, method );
}
else
{
auto value = iter.value();
auto it = std::find_if( value.begin(), value.end(),
[subscriber, method]( std::pair< QObject*, const char* > p ) {
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
} );
if ( it == iter.value().end() )
{
iter.value().emplace_back( subscriber, method );
connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed,
Qt::UniqueConnection );
}
}
} // PubSub::psubscribe
/*!
* \brief Создает подписку на шаблон канала изменения данных
* \param channel Шаблон канала подписки
* \param subscriber Объект-подписчик
* \param method Имя метода, вызываемого при изменении получении серверного сообщения
* \param database Номер БД (-1 для всех)
*/
MYXLIB_INLINE void PubSub::psubscribe( QString channel, QObject* subscriber, const char* method, int database )
{
if ( QThread::currentThread() != thread() )
{
qWarning() << "no redis connection for caller thread";
return;
}
auto iter = m_psubscribers.find( channel );
if ( iter == m_psubscribers.end() )
{
if ( database == -1 )
{
channel.prepend( C::KeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() );
}
else
{
channel.prepend( C::KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
}
if ( m_socket->state() == QAbstractSocket::ConnectedState )
{
m_socket->write( array( { "PSUBSCRIBE", channel } ) );
}
m_psubscribers[channel].emplace_back( subscriber, method );
}
else
{
auto value = iter.value();
auto it = std::find_if( value.begin(), value.end(),
[subscriber, method]( std::pair< QObject*, const char* > p ) {
return( p.first == subscriber && strcmp( p.second, method ) == 0 );
} );
if ( it == iter.value().end() )
{
iter.value().emplace_back( subscriber, method );
connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed,
Qt::UniqueConnection );
}
}
} // PubSub::psubscribe
/*!
* \brief Отписывает получателя от канала
* \param channel Имя канала
* \param subscriber Подписчик на канал
* \param method Метод обработки данных
* \param keyEvents Признак подписки на изменение данных
*/
MYXLIB_INLINE void PubSub::unsubscribe( const QString& channel, QObject* subscriber, const char* method )
{
if ( QThread::currentThread() != thread() )
{
qWarning() << "no redis connection for caller thread";
return;
}
auto iter = m_subscribers.find( channel );
if ( iter != m_subscribers.end() )
{
auto ctx = std::find_if( iter->begin(), iter->end(),
[subscriber, method]( std::pair< QObject*, const char* > value ) {
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
} );
if ( ctx != iter->end() )
{
iter->erase( ctx );
}
if ( iter->empty() )
{
if ( m_socket->state() == QAbstractSocket::ConnectedState )
{
m_socket->write( array( { "UNSUBSCRIBE", channel } ) );
}
m_subscribers.remove( channel );
}
}
} // PubSub::unsubscribe
/*!
* \brief Отписывает получателя от канала
* \param channel Имя канала
* \param subscriber Подписчик на канал
* \param method Метод обработки данных
* \param keyEvents Признак подписки на изменение данных
*/
MYXLIB_INLINE void PubSub::unsubscribe( QString channel, QObject* subscriber, const char* method, int database )
{
if ( QThread::currentThread() != thread() )
{
qWarning() << "no redis connection for caller thread";
return;
}
if ( database == -1 )
{
channel.prepend( C::KeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() );
}
else
{
channel.prepend( C::KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
}
auto iter = m_subscribers.find( channel );
if ( iter != m_subscribers.end() )
{
auto ctx = std::find_if( iter->begin(), iter->end(),
[subscriber, method]( std::pair< QObject*, const char* > value ) {
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
} );
if ( ctx != iter->end() )
{
iter->erase( ctx );
}
if ( iter->empty() )
{
if ( m_socket->state() == QAbstractSocket::ConnectedState )
{
m_socket->write( array( { "UNSUBSCRIBE", channel } ) );
}
m_subscribers.remove( channel );
}
}
} // PubSub::unsubscribe
/*!
* \brief Отписывает получателя от шаблона канала
* \param channel Шаблон канала
* \param subscriber Подписчик на канал
* \param method Метод обработки данных
* \param keyEvents Признак подписки на изменение данных
*/
MYXLIB_INLINE void PubSub::punsubscribe( const QString& channel, QObject* subscriber,
const char* method )
{
if ( QThread::currentThread() != thread() )
{
qWarning() << "no redis connection for caller thread";
return;
}
auto iter = m_psubscribers.find( channel );
if ( iter != m_psubscribers.end() )
{
auto ctx = std::find_if( iter->begin(), iter->end(),
[subscriber, method]( std::pair< QObject*, const char* > value ) {
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
} );
if ( ctx != iter->end() )
{
iter->erase( ctx );
}
if ( iter->empty() )
{
if ( m_socket->state() == QAbstractSocket::ConnectedState )
{
m_socket->write( array( { "PUNSUBSCRIBE", channel } ) );
}
m_psubscribers.remove( channel );
}
}
} // PubSub::punsubscribe
/*!
* \brief Отписывает получателя от шаблона канала
* \param channel Шаблон канала
* \param subscriber Подписчик на канал
* \param method Метод обработки данных
* \param keyEvents Признак подписки на изменение данных
*/
MYXLIB_INLINE void PubSub::punsubscribe( QString channel, QObject* subscriber,
const char* method, int database )
{
if ( QThread::currentThread() != thread() )
{
qWarning() << "no redis connection for caller thread";
return;
}
if ( database == -1 )
{
channel.prepend( C::KeySpacePrefix.arg( QStringLiteral( "*" ) ).toLocal8Bit() );
}
else
{
channel.prepend( C::KeySpacePrefix.arg( "space@" + QString::number( database ) ).toLocal8Bit() );
}
auto iter = m_psubscribers.find( channel );
if ( iter != m_psubscribers.end() )
{
auto ctx = std::find_if( iter->begin(), iter->end(),
[subscriber, method]( std::pair< QObject*, const char* > value ) {
return( subscriber == value.first && strcmp( method, value.second ) == 0 );
} );
if ( ctx != iter->end() )
{
iter->erase( ctx );
}
if ( iter->empty() )
{
if ( m_socket->state() == QAbstractSocket::ConnectedState )
{
m_socket->write( array( { "PUNSUBSCRIBE", channel } ) );
}
m_psubscribers.remove( channel );
}
}
} // PubSub::punsubscribe
/*!
* \brief Разбирает поступившие от сервера уведомления и вызывает требуемые
* метаметоды обработки
*/
MYXLIB_INLINE void PubSub::read()
{
int splitLength = 0;
while ( m_socket->bytesAvailable() > 0 )
{
m_buffer += m_socket->readAll();
while ( !m_buffer.isEmpty() )
{
auto parts = split( m_buffer, &splitLength );
if ( parts.empty() )
{
break;
}
if ( ( parts[0] == "pmessage" ) && ( parts.size() == 4 ) )
{
auto iter = m_psubscribers.find( parts[1] );
if ( iter != m_psubscribers.end() )
{
for ( const auto& i: iter.value() )
{
QMetaObject::invokeMethod( i.first, i.second,
Q_ARG( QString, parts[2] ),
Q_ARG( QByteArray, parts[3] ) );
}
}
}
else
{
if ( ( parts[0] == "message" ) && ( parts.size() == 3 ) )
{
auto iter = m_subscribers.find( parts[1] );
if ( iter != m_subscribers.end() )
{
for ( const auto& i: iter.value() )
{
QMetaObject::invokeMethod( i.first, i.second,
Q_ARG( QString, parts[1] ),
Q_ARG( QByteArray, parts[2] ) );
}
}
}
}
m_buffer.remove( 0, splitLength );
}
}
} // PubSub::read
/*!
* \brief Обработка изменения состояния сокета
*/
MYXLIB_INLINE void PubSub::onSocketStateChanged()
{
switch ( m_socket->state() )
{
case QAbstractSocket::ConnectedState:
for ( auto iter = m_psubscribers.cbegin(); iter != m_psubscribers.cend(); ++iter )
{
m_socket->write( array( { "PSUBSCRIBE", iter.key() } ) );
}
for ( auto iter = m_subscribers.cbegin(); iter != m_subscribers.cend(); ++iter )
{
m_socket->write( array( { "SUBSCRIBE", iter.key() } ) );
}
setConnectionFlags( kConnected );
Q_EMIT connectionStateChanged( connectionFlags() );
Q_EMIT connected();
break;
case QAbstractSocket::UnconnectedState:
m_buffer.clear();
m_connectionTimer->start();
if ( !isConnected() )
{
Q_EMIT disconnected();
}
setConnectionFlags( kDisconnected );
Q_EMIT connectionStateChanged( connectionFlags() );
break;
default:
break;
} // switch
} // PubSub::onSocketStateChanged
/*!
* \brief При удалении подписчика уничтожает все его подписки
* \param subscriber Удаленный подписчик
*/
MYXLIB_INLINE void PubSub::onSubscriberDestroyed( QObject* subscriber )
{
for ( auto i = m_subscribers.begin(); i != m_subscribers.end(); )
{
for ( auto j = i.value().begin(); j != i.value().end(); )
{
if ( j->first == subscriber )
{
j = i.value().erase( j );
}
else
{
j++;
}
}
if ( i.value().empty() )
{
if ( m_socket->state() == QAbstractSocket::ConnectedState )
{
m_socket->write( array( { "UNSUBSCRIBE", i.key() } ) );
}
i = m_subscribers.erase( i );
}
else
{
i++;
}
}
for ( auto i = m_psubscribers.begin(); i != m_psubscribers.end(); )
{
for ( auto j = i.value().begin(); j != i.value().end(); )
{
if ( j->first == subscriber )
{
j = i.value().erase( j );
}
else
{
j++;
}
}
if ( i.value().empty() )
{
if ( m_socket->state() == QAbstractSocket::ConnectedState )
{
m_socket->write( array( { "PUNSUBSCRIBE", i.key() } ) );
}
i = m_psubscribers.erase( i );
}
else
{
i++;
}
}
} // PubSub::onSubscriberDestroyed
/*!
* \brief Возвращает сокет
* \return Сокет
*/
MYXLIB_INLINE QTcpSocket* PubSub::socket() const
{
return( m_socket );
}
} // namespace redis
} // namespace myx
#endif // ifndef MYX_REDIS_PUBSUB_INL_HPP_