Обновление

This commit is contained in:
Andrei Astafev 2020-04-25 14:51:04 +03:00
parent 5936bf96f0
commit 477cebf235
2 changed files with 82 additions and 11 deletions

View File

@ -94,7 +94,7 @@ inline void PubSub::subscribe( const QString& channel, QObject* subscriber, cons
{ {
if ( m_socket->state() == QAbstractSocket::ConnectedState ) if ( m_socket->state() == QAbstractSocket::ConnectedState )
{ {
m_socket->write( array( { "SUBSCIBE", channel } ) ); m_socket->write( array( { "SUBSCRIBE", channel } ) );
} }
m_subscribers[channel].emplace_back( subscriber, method ); m_subscribers[channel].emplace_back( subscriber, method );
} }
@ -107,7 +107,9 @@ inline void PubSub::subscribe( const QString& channel, QObject* subscriber, cons
} ); } );
if ( it == iter.value().end() ) if ( it == iter.value().end() )
{ {
iter.value().emplace_back( subscriber, method ); iter.value().push_back( { subscriber, method } );
connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed,
Qt::UniqueConnection );
} }
} }
} // PubSub::subscribe } // PubSub::subscribe
@ -146,7 +148,9 @@ inline void PubSub::subscribe( QString channel, QObject* subscriber, const char*
} ); } );
if ( it == iter.value().end() ) if ( it == iter.value().end() )
{ {
iter.value().emplace_back( subscriber, method ); iter.value().push_back( { subscriber, method } );
connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed,
Qt::UniqueConnection );
} }
} }
} // PubSub::subscribe } // PubSub::subscribe
@ -183,7 +187,9 @@ inline void PubSub::psubscribe( const QString& channel, QObject* subscriber, con
} ); } );
if ( it == iter.value().end() ) if ( it == iter.value().end() )
{ {
iter.value().emplace_back( subscriber, method ); iter.value().push_back( { subscriber, method } );
connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed,
Qt::UniqueConnection );
} }
} }
} // PubSub::psubscribe } // PubSub::psubscribe
@ -229,7 +235,9 @@ inline void PubSub::psubscribe( QString channel, QObject* subscriber, const char
} ); } );
if ( it == iter.value().end() ) if ( it == iter.value().end() )
{ {
iter.value().emplace_back( subscriber, method ); iter.value().push_back( { subscriber, method } );
connect( subscriber, &QObject::destroyed, this, &PubSub::onSubscriberDestroyed,
Qt::UniqueConnection );
} }
} }
} // PubSub::psubscribe } // PubSub::psubscribe
@ -489,6 +497,68 @@ inline void PubSub::onSocketStateChanged()
} // PubSub::onSocketStateChanged } // PubSub::onSocketStateChanged
/*!
* \brief При удалении подписчика уничтожает все его подписки
* \param subscriber Удаленный подписчик
*/
void PubSub::onSubscriberDestroyed( QObject* subscriber )
{
for ( auto i = m_subscribers.begin(); i != m_subscribers.end(); )
{
for ( auto j = i.value().begin(); j != i.value().end(); )
{
if ( j->first == subscriber )
{
j = i.value().erase( j );
}
else
{
j++;
}
}
if ( i.value().empty() )
{
if ( m_socket->state() == QAbstractSocket::ConnectedState )
{
m_socket->write( array( { "UNSUBSCRIBE", i.key() } ) );
}
i = m_subscribers.erase( i );
}
else
{
i++;
}
}
for ( auto i = m_psubscribers.begin(); i != m_psubscribers.end(); )
{
for ( auto j = i.value().begin(); j != i.value().end(); )
{
if ( j->first == subscriber )
{
j = i.value().erase( j );
}
else
{
j++;
}
}
if ( i.value().empty() )
{
if ( m_socket->state() == QAbstractSocket::ConnectedState )
{
m_socket->write( array( { "PUNSUBSCRIBE", i.key() } ) );
}
i = m_psubscribers.erase( i );
}
else
{
i++;
}
}
} // PubSub::onSubscriberDestroyed
/*! /*!
* \brief Возвращает сокет * \brief Возвращает сокет
* \return Сокет * \return Сокет

View File

@ -50,19 +50,20 @@ public:
private: private:
using Sub = QHash< QString, std::vector< std::pair< QObject*, const char* > > >; using Sub = QHash< QString, std::vector< std::pair< QObject*, const char* > > >;
static const QString k_KeySpacePrefix; //!< Приставка для наблюдения за данными static const QString k_KeySpacePrefix; //!< Приставка для наблюдения за данными
QTcpSocket* m_socket; QTcpSocket* m_socket;
QTimer* m_connectionTimer; //!< Таймер установления сетевого соединения QTimer* m_connectionTimer; //!< Таймер установления сетевого соединения
static constexpr int k_ConnectionTimeout { 2000 }; //!< Таймаут на сетевые операции static constexpr int ConnectionTimeout { 2000 }; //!< Таймаут на сетевые операции
/*! /*!
* \brief Таблица подписчиков * \brief Таблица подписчиков
*/ */
QByteArray m_buffer; //!< Буффер прочитанных данных QByteArray m_buffer; //!< Буффер прочитанных данных
Sub m_subscribers; //!< Обработчики сообщений по каналу подписки Sub m_subscribers; //!< Обработчики сообщений по каналу подписки
Sub m_psubscribers; //!< Обработчики сообщений по шаблону канала подписки Sub m_psubscribers; //!< Обработчики сообщений по шаблону канала подписки
Q_SLOT void read(); Q_SLOT void read();
Q_SLOT void onSocketStateChanged(); Q_SLOT void onSocketStateChanged();
Q_SLOT void onSubscriberDestroyed( QObject* subscriber );
}; // class PubSub }; // class PubSub
} // namespace redis } // namespace redis