From 477cebf235cb02ad3ff4ebacc5d967942154d681 Mon Sep 17 00:00:00 2001 From: Andrey Astafyev Date: Sat, 25 Apr 2020 14:51:04 +0300 Subject: [PATCH] =?UTF-8?q?=D0=9E=D0=B1=D0=BD=D0=BE=D0=B2=D0=BB=D0=B5?= =?UTF-8?q?=D0=BD=D0=B8=D0=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/myx/redis/pubsub-inl.hpp | 80 +++++++++++++++++++++++++++++++++--- src/myx/redis/pubsub.hpp | 13 +++--- 2 files changed, 82 insertions(+), 11 deletions(-) diff --git a/src/myx/redis/pubsub-inl.hpp b/src/myx/redis/pubsub-inl.hpp index fceef6b..4ee72b4 100644 --- a/src/myx/redis/pubsub-inl.hpp +++ b/src/myx/redis/pubsub-inl.hpp @@ -94,7 +94,7 @@ inline void PubSub::subscribe( const QString& channel, QObject* subscriber, cons { if ( m_socket->state() == QAbstractSocket::ConnectedState ) { - m_socket->write( array( { "SUBSCIBE", channel } ) ); + m_socket->write( array( { "SUBSCRIBE", channel } ) ); } m_subscribers[channel].emplace_back( subscriber, method ); } @@ -107,7 +107,9 @@ inline void PubSub::subscribe( const QString& channel, QObject* subscriber, cons } ); if ( it == iter.value().end() ) { - iter.value().emplace_back( subscriber, method ); + iter.value().push_back( { subscriber, method } ); + connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed, + Qt::UniqueConnection ); } } } // PubSub::subscribe @@ -146,7 +148,9 @@ inline void PubSub::subscribe( QString channel, QObject* subscriber, const char* } ); if ( it == iter.value().end() ) { - iter.value().emplace_back( subscriber, method ); + iter.value().push_back( { subscriber, method } ); + connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed, + Qt::UniqueConnection ); } } } // PubSub::subscribe @@ -183,7 +187,9 @@ inline void PubSub::psubscribe( const QString& channel, QObject* subscriber, con } ); if ( it == iter.value().end() ) { - iter.value().emplace_back( subscriber, method ); + iter.value().push_back( { subscriber, method } ); + connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed, + Qt::UniqueConnection ); } } } // PubSub::psubscribe @@ -229,7 +235,9 @@ inline void PubSub::psubscribe( QString channel, QObject* subscriber, const char } ); if ( it == iter.value().end() ) { - iter.value().emplace_back( subscriber, method ); + iter.value().push_back( { subscriber, method } ); + connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed, + Qt::UniqueConnection ); } } } // PubSub::psubscribe @@ -489,6 +497,68 @@ inline void PubSub::onSocketStateChanged() } // PubSub::onSocketStateChanged +/*! + * \brief При удалении подписчика уничтожает все его подписки + * \param subscriber Удаленный подписчик + */ +void PubSub::onSubscriberDestroyed( QObject* subscriber ) +{ + for ( auto i = m_subscribers.begin(); i != m_subscribers.end(); ) + { + for ( auto j = i.value().begin(); j != i.value().end(); ) + { + if ( j->first == subscriber ) + { + j = i.value().erase( j ); + } + else + { + j++; + } + } + if ( i.value().empty() ) + { + if ( m_socket->state() == QAbstractSocket::ConnectedState ) + { + m_socket->write( array( { "UNSUBSCRIBE", i.key() } ) ); + } + i = m_subscribers.erase( i ); + } + else + { + i++; + } + } + + for ( auto i = m_psubscribers.begin(); i != m_psubscribers.end(); ) + { + for ( auto j = i.value().begin(); j != i.value().end(); ) + { + if ( j->first == subscriber ) + { + j = i.value().erase( j ); + } + else + { + j++; + } + } + if ( i.value().empty() ) + { + if ( m_socket->state() == QAbstractSocket::ConnectedState ) + { + m_socket->write( array( { "PUNSUBSCRIBE", i.key() } ) ); + } + i = m_psubscribers.erase( i ); + } + else + { + i++; + } + } +} // PubSub::onSubscriberDestroyed + + /*! * \brief Возвращает сокет * \return Сокет diff --git a/src/myx/redis/pubsub.hpp b/src/myx/redis/pubsub.hpp index 7f415b4..7c3b2f3 100644 --- a/src/myx/redis/pubsub.hpp +++ b/src/myx/redis/pubsub.hpp @@ -50,19 +50,20 @@ public: private: using Sub = QHash< QString, std::vector< std::pair< QObject*, const char* > > >; - static const QString k_KeySpacePrefix; //!< Приставка для наблюдения за данными + static const QString k_KeySpacePrefix; //!< Приставка для наблюдения за данными QTcpSocket* m_socket; - QTimer* m_connectionTimer; //!< Таймер установления сетевого соединения - static constexpr int k_ConnectionTimeout { 2000 }; //!< Таймаут на сетевые операции + QTimer* m_connectionTimer; //!< Таймер установления сетевого соединения + static constexpr int ConnectionTimeout { 2000 }; //!< Таймаут на сетевые операции /*! * \brief Таблица подписчиков */ - QByteArray m_buffer; //!< Буффер прочитанных данных - Sub m_subscribers; //!< Обработчики сообщений по каналу подписки - Sub m_psubscribers; //!< Обработчики сообщений по шаблону канала подписки + QByteArray m_buffer; //!< Буффер прочитанных данных + Sub m_subscribers; //!< Обработчики сообщений по каналу подписки + Sub m_psubscribers; //!< Обработчики сообщений по шаблону канала подписки Q_SLOT void read(); Q_SLOT void onSocketStateChanged(); + Q_SLOT void onSubscriberDestroyed( QObject* subscriber ); }; // class PubSub } // namespace redis