From 3fea4be230ac431d499d28b314a72f038f995e70 Mon Sep 17 00:00:00 2001 From: Dennis Oberst Date: Mon, 21 Jul 2025 21:23:03 +0200 Subject: [PATCH] QGrpcHttp2Channel: Improve the lifetime management of Http2Handler Our lifetime management was extremely flawed, as there were many points where Http2Handler instances were not properly deleted, resulting in inactive zombie handlers that would never be cleaned up. In a running event loop, there should be no active or pending handlers left at the time of channel destruction. If we had previously asserted this condition: QGrpcHttp2ChannelPrivate::~QGrpcHttp2ChannelPrivate() { Q_ASSERT(children().isEmpty()); } and then ran our tests, many cases would reveal that we were effectively leaking memory due to inproper lifetime management on our side. This is fixed by applying the following: When the finished signal is emitted, the corresponding Http2Handler should be deleted. It no longer makes sense to keep it alive beyond that point, as this aligns with our documented lifetime for client-side RPC handlers. Transform the operation context into a QPointer to not steady convert into shared_ptr's. Connect to the destroyed signal to keep track when the user-side handler gets deleted. We definitely don't want to take part in sharing the ownership (and therefore lifetime) of the operationContext. Pass down a pointer very early on so that no mistakes happen in the future (as to take a copy of the shared_ptr in a lambda). This is streamlined by introducing the finish and asyncFinish functions. Task-number: QTBUG-128338 Fixes: QTBUG-129160 Pick-to: 6.10 6.9 6.8 Change-Id: I8e17f7832659fb348e7d05aeabe164b16b6ff283 Reviewed-by: Alexey Edelev --- src/grpc/qgrpchttp2channel.cpp | 217 +++++++++++++++------------------ 1 file changed, 99 insertions(+), 118 deletions(-) diff --git a/src/grpc/qgrpchttp2channel.cpp b/src/grpc/qgrpchttp2channel.cpp index b0cf2040..6967b001 100644 --- a/src/grpc/qgrpchttp2channel.cpp +++ b/src/grpc/qgrpchttp2channel.cpp @@ -237,16 +237,6 @@ constexpr StatusCode http2StatusToStatusCode(const int status) } } -// Sends the errorOccured and finished signals asynchronously to make sure user -// connections work correctly. -void operationContextAsyncError(QGrpcOperationContext *operationContext, const QGrpcStatus &status) -{ - Q_ASSERT_X(operationContext != nullptr, "QGrpcHttp2ChannelPrivate::operationContextAsyncError", - "operationContext is null"); - QTimer::singleShot(0, operationContext, - [operationContext, status]() { emit operationContext->finished(status); }); -} - } // namespace struct ExpectedData @@ -286,16 +276,18 @@ public: }; Q_ENUM(State); - explicit Http2Handler(const std::shared_ptr &operation, - QGrpcHttp2ChannelPrivate *parent, bool endStream); + explicit Http2Handler(QGrpcOperationContext *operation, QGrpcHttp2ChannelPrivate *parent, + bool endStream); ~Http2Handler() override; void sendInitialRequest(); void attachStream(QHttp2Stream *stream_); void processQueue(); - [[nodiscard]] QGrpcOperationContext *operation() const; - [[nodiscard]] bool expired() const { return m_operation.expired(); } + void finish(const QGrpcStatus &status); + void asyncFinish(const QGrpcStatus &status); + + [[nodiscard]] bool expired() const { return !m_operation; } [[nodiscard]] bool isStreamClosedForSending() const { @@ -318,8 +310,9 @@ private: [[nodiscard]] HPack::HttpHeader constructInitialHeaders() const; [[nodiscard]] QGrpcHttp2ChannelPrivate *channelPriv() const; [[nodiscard]] QGrpcHttp2Channel *channel() const; + [[nodiscard]] bool handleContextExpired(); - std::weak_ptr m_operation; + QPointer m_operation; HPack::HttpHeader m_initialHeaders; QQueue m_queue; QPointer m_stream; @@ -339,8 +332,7 @@ public: explicit QGrpcHttp2ChannelPrivate(const QUrl &uri, QGrpcHttp2Channel *q); ~QGrpcHttp2ChannelPrivate() override = default; - void processOperation(const std::shared_ptr &operationContext, - bool endStream = false); + void processOperation(QGrpcOperationContext *operationContext, bool endStream = false); [[nodiscard]] bool isLocalSocket() const { @@ -363,10 +355,10 @@ private: enum ConnectionState { Connecting = 0, Connected, SettingsReceived, Error }; template - void connectErrorHandler(T *socket, QGrpcOperationContext *operationContext) + void connectErrorHandler(T *socket, Http2Handler *handler) { - QObject::connect(socket, &T::errorOccurred, operationContext, - [operationContextPtr = QPointer(operationContext), this](auto error) { + QObject::connect(socket, &T::errorOccurred, handler, + [this, handler](auto error) { if (m_isInsideSocketErrorOccurred) { qGrpcCritical("Socket errorOccurred signal triggered while " "already handling an error"); @@ -376,10 +368,8 @@ private: auto reset = qScopeGuard([this]() { m_isInsideSocketErrorOccurred = false; }); - emit operationContextPtr->finished(QGrpcStatus{ - StatusCode::Unavailable, - QGrpcHttp2ChannelPrivate::tr("Network error occurred: %1") - .arg(error) }); + emit handler->finish({ StatusCode::Unavailable, + tr("Network error occurred: %1").arg(error) }); }); } @@ -437,28 +427,31 @@ private: /// ## Http2Handler Implementations /// -Http2Handler::Http2Handler(const std::shared_ptr &operation, - QGrpcHttp2ChannelPrivate *parent, bool endStream) +Http2Handler::Http2Handler(QGrpcOperationContext *operation, QGrpcHttp2ChannelPrivate *parent, + bool endStream) : QObject(parent), m_operation(operation), m_initialHeaders(constructInitialHeaders()), m_endStreamAtFirstData(endStream) { - auto *channelOpPtr = operation.get(); - QObject::connect(channelOpPtr, &QGrpcOperationContext::cancelRequested, this, [this] { + // If the context (lifetime bound to the user) is destroyed, this handler + // can no longer perform any meaningful work. We allow it to be deleted; + // QHttp2Stream will handle any outstanding cancellations appropriately. + QObject::connect(operation, &QGrpcOperationContext::destroyed, this, + &Http2Handler::deleteLater); + QObject::connect(operation, &QGrpcOperationContext::cancelRequested, this, [this] { cancel(); deleteLater(); }); - QObject::connect(channelOpPtr, &QGrpcOperationContext::writesDoneRequested, this, + QObject::connect(operation, &QGrpcOperationContext::writesDoneRequested, this, &Http2Handler::writesDone); if (!m_endStreamAtFirstData) { - QObject::connect(channelOpPtr, &QGrpcOperationContext::writeMessageRequested, this, + QObject::connect(operation, &QGrpcOperationContext::writeMessageRequested, this, &Http2Handler::writeMessage); } - QObject::connect(channelOpPtr, &QGrpcOperationContext::finished, &m_deadlineTimer, - &QTimer::stop); + QObject::connect(operation, &QGrpcOperationContext::finished, &m_deadlineTimer, &QTimer::stop); m_deadlineTimer.setSingleShot(true); - writeMessage(channelOpPtr->argument()); + writeMessage(operation->argument()); } Http2Handler::~Http2Handler() @@ -478,10 +471,9 @@ void Http2Handler::attachStream(QHttp2Stream *stream_) Q_ASSERT(m_stream == nullptr); Q_ASSERT(stream_ != nullptr); - auto *channelOpPtr = operation(); m_stream = stream_; - QObject::connect(m_stream.get(), &QHttp2Stream::headersReceived, channelOpPtr, + QObject::connect(m_stream.get(), &QHttp2Stream::headersReceived, this, [this](const HPack::HttpHeader &headers, bool endStream) mutable { if (m_state >= State::Cancelled) { // In case we are Cancelled or Finished, a @@ -511,19 +503,14 @@ void Http2Handler::attachStream(QHttp2Stream *stream_) }); QObject::connect( - m_stream.get(), &QHttp2Stream::errorOccurred, channelPriv(), + m_stream.get(), &QHttp2Stream::errorOccurred, this, [this](quint32 http2ErrorCode, const QString &errorString) { - if (!m_operation.expired()) { - auto channelOp = m_operation.lock(); - emit channelOp->finished(QGrpcStatus{ http2ErrorToStatusCode(http2ErrorCode), - errorString }); - } - deleteLater(); + finish({ http2ErrorToStatusCode(http2ErrorCode), errorString }); }, Qt::SingleShotConnection); - QObject::connect(m_stream.get(), &QHttp2Stream::dataReceived, channelOpPtr, - [channelOpPtr, this](const QByteArray &data, bool endStream) { + QObject::connect(m_stream.get(), &QHttp2Stream::dataReceived, m_operation.get(), + [this](const QByteArray &data, bool endStream) { if (m_state != State::Cancelled) { m_expectedData.container.append(data); @@ -535,7 +522,7 @@ void Http2Handler::attachStream(QHttp2Stream *stream_) qGrpcDebug() << "Full data received:" << data.size() << "dataContainer:" << m_expectedData.container.size() << "capacity:" << m_expectedData.expectedSize; - emit channelOpPtr + emit m_operation ->messageReceived(m_expectedData.container .mid(GrpcMessageSizeHeaderSize, m_expectedData.expectedSize @@ -545,11 +532,9 @@ void Http2Handler::attachStream(QHttp2Stream *stream_) if (!m_expectedData.updateExpectedSize()) return; } - if (endStream) { - m_state = State::Finished; - emit channelOpPtr->finished({}); - deleteLater(); - } + + if (endStream) + finish({}); } }); @@ -558,13 +543,6 @@ void Http2Handler::attachStream(QHttp2Stream *stream_) } -QGrpcOperationContext *Http2Handler::operation() const -{ - Q_ASSERT(!m_operation.expired()); - - return m_operation.lock().get(); -} - // Builds HTTP/2 headers for the initial gRPC request. // The headers are sent once the HTTP/2 connection is established. HPack::HttpHeader Http2Handler::constructInitialHeaders() const @@ -585,11 +563,10 @@ HPack::HttpHeader Http2Handler::constructInitialHeaders() const + QSysInfo::productVersion().toUtf8() + ')'); const auto &channelOptions = channel()->channelOptions(); - const auto *operationContext = operation(); const auto *channel = channelPriv(); - QByteArray service{ operationContext->service() }; - QByteArray method{ operationContext->method() }; + QByteArray service{ m_operation->service() }; + QByteArray method{ m_operation->method() }; auto headers = HPack::HttpHeader{ { AuthorityHeader, channel->authorityHeader() }, { MethodHeader, MethodValue }, @@ -614,7 +591,7 @@ HPack::HttpHeader Http2Handler::constructInitialHeaders() const }; iterateMetadata(channelOptions.metadata(QtGrpc::MultiValue)); - iterateMetadata(operationContext->callOptions().metadata(QtGrpc::MultiValue)); + iterateMetadata(m_operation->callOptions().metadata(QtGrpc::MultiValue)); return headers; } @@ -628,6 +605,15 @@ QGrpcHttp2Channel *Http2Handler::channel() const return channelPriv()->q_ptr; } +bool Http2Handler::handleContextExpired() +{ + if (m_operation) + return false; + m_state = State::Cancelled; + deleteLater(); // m_stream will sendRST_STREAM on destruction + return true; +} + // Slot to enqueue a writeMessage request, either from the initial message // or from the user in client/bidirectional streaming RPCs. void Http2Handler::writeMessage(QByteArrayView data) @@ -661,18 +647,16 @@ void Http2Handler::sendInitialRequest() Q_ASSERT(m_state == State::Idle); if (!m_stream->sendHEADERS(m_initialHeaders, false)) { - m_state = State::Finished; - operationContextAsyncError(operation(), - QGrpcStatus{ - StatusCode::Unavailable, - tr("Unable to send initial headers to an HTTP/2 stream") }); + asyncFinish({ StatusCode::Unavailable, + tr("Unable to send initial headers to an HTTP/2 stream") }); return; } m_state = State::RequestHeadersSent; m_initialHeaders.clear(); processQueue(); - std::optional deadline = operation()->callOptions().deadlineTimeout(); + std::optional deadline = m_operation->callOptions() + .deadlineTimeout(); if (!deadline) deadline = channel()->channelOptions().deadlineTimeout(); if (deadline) { @@ -702,6 +686,24 @@ void Http2Handler::processQueue() m_stream->sendDATA(nextMessage, closeStream); } +void Http2Handler::finish(const QGrpcStatus &status) +{ + if (handleContextExpired()) + return; + if (m_state == State::Finished) + return; + if (m_state != State::Cancelled) // don't overwrite the Cancelled state + m_state = State::Finished; + emit m_operation->finished(status); + deleteLater(); +} +void Http2Handler::asyncFinish(const QGrpcStatus &status) +{ + if (handleContextExpired()) + return; + QTimer::singleShot(0, m_operation.get(), [this, status]() { finish(status); }); +} + void Http2Handler::cancel() { if (m_state >= State::Cancelled) @@ -736,13 +738,8 @@ void Http2Handler::deadlineTimeout() { Q_ASSERT_X(m_stream, "onDeadlineTimeout", "stream is not available"); - if (m_operation.expired()) { - qGrpcWarning("Operation expired on deadline timeout"); - return; - } cancel(); - emit m_operation.lock()->finished({ StatusCode::DeadlineExceeded, "Deadline Exceeded" }); - deleteLater(); + finish({ StatusCode::DeadlineExceeded, "Deadline Exceeded" }); } void Http2Handler::handleHeaders(const HPack::HttpHeader &headers, HeaderPhase phase) @@ -766,8 +763,7 @@ void Http2Handler::handleHeaders(const HPack::HttpHeader &headers, HeaderPhase p bool hasGrpcStatus : 1; }; - auto ctx = m_operation.lock(); - if (!ctx) + if (handleContextExpired()) return; HeaderValidation validation{ @@ -786,16 +782,13 @@ void Http2Handler::handleHeaders(const HPack::HttpHeader &headers, HeaderPhase p for (const auto &[k, v] : headers) { if (validation.requireHttpStatus && k == HttpStatusHeader) { if (const auto status = v.toInt(); status != 200) { - ctx->finished({ http2StatusToStatusCode(status), - "Received HTTP/2 status: %1"_L1.arg(v) }); - deleteLater(); + finish({ http2StatusToStatusCode(status), "Received HTTP/2 status: %1"_L1.arg(v) }); return; } validation.hasHttpStatus = true; } else if (validation.requireContentType && k == ContentTypeHeader) { if (!v.toLower().startsWith(DefaultContentType)) { - ctx->finished({ StatusCode::Internal, "Unexpected content-type: %1"_L1.arg(v) }); - deleteLater(); + finish({ StatusCode::Internal, "Unexpected content-type: %1"_L1.arg(v) }); return; } validation.hasContentType = true; @@ -803,9 +796,7 @@ void Http2Handler::handleHeaders(const HPack::HttpHeader &headers, HeaderPhase p bool ok; const auto parsed = v.toShort(&ok); if (!ok) { - ctx->finished({ StatusCode::Internal, - "Failed to parse gRPC-status: %1"_L1.arg(v) }); - deleteLater(); + finish({ StatusCode::Internal, "Failed to parse gRPC-status: %1"_L1.arg(v) }); return; } statusCode = static_cast(parsed); @@ -834,37 +825,31 @@ void Http2Handler::handleHeaders(const HPack::HttpHeader &headers, HeaderPhase p } if (validation.requireHttpStatus && !validation.hasHttpStatus) { - ctx->finished({ StatusCode::Internal, - "Missing valid '%1' header"_L1.arg(HttpStatusHeader) }); - deleteLater(); + finish({ StatusCode::Internal, "Missing valid '%1' header"_L1.arg(HttpStatusHeader) }); return; } if (validation.requireContentType && !validation.hasContentType) { - ctx->finished({ StatusCode::Internal, - "Missing valid '%1' header"_L1.arg(ContentTypeHeader) }); - deleteLater(); + finish({ StatusCode::Internal, "Missing valid '%1' header"_L1.arg(ContentTypeHeader) }); return; } if (validation.requireGrpcStatus && !validation.hasGrpcStatus) { - ctx->finished({ StatusCode::Internal, "Missing status code in trailers"_L1 }); - deleteLater(); + finish({ StatusCode::Internal, "Missing status code in trailers"_L1 }); return; } switch (phase) { case HeaderPhase::Initial: - ctx->setServerMetadata(std::move(metadata)); + m_operation->setServerMetadata(std::move(metadata)); break; case HeaderPhase::TrailersOnly: [[fallthrough]]; case HeaderPhase::Trailers: { - auto md = ctx->serverMetadata(); + auto md = m_operation->serverMetadata(); md.insert(metadata); - ctx->setServerMetadata(std::move(md)); - ctx->finished({ *statusCode, statusMessage }); - deleteLater(); + m_operation->setServerMetadata(std::move(md)); + finish({ *statusCode, statusMessage }); } break; default: Q_UNREACHABLE(); @@ -1022,34 +1007,33 @@ QGrpcHttp2ChannelPrivate::QGrpcHttp2ChannelPrivate(const QUrl &uri, QGrpcHttp2Ch m_reconnectFunction(); } -void QGrpcHttp2ChannelPrivate::processOperation(const std::shared_ptr - &operationContext, +void QGrpcHttp2ChannelPrivate::processOperation(QGrpcOperationContext *operationContext, bool endStream) { - auto *operationContextPtr = operationContext.get(); - Q_ASSERT_X(operationContextPtr != nullptr, "QGrpcHttp2ChannelPrivate::processOperation", + Q_ASSERT_X(operationContext != nullptr, "QGrpcHttp2ChannelPrivate::processOperation", "operation context is nullptr."); + // Send the finished signals asynchronously, so user connections work correctly. if (!m_socket->isWritable() && m_state == ConnectionState::Connected) { - operationContextAsyncError(operationContextPtr, - QGrpcStatus{ StatusCode::Unavailable, - m_socket->errorString() }); + QTimer::singleShot(0, operationContext, + [operationContext, err = m_socket->errorString()]() { + emit operationContext->finished({ StatusCode::Unavailable, err }); + }); return; } + auto *handler = new Http2Handler(operationContext, this, endStream); + #if QT_CONFIG(localserver) if (m_isLocalSocket) { - connectErrorHandler(static_cast(m_socket.get()), - operationContextPtr); + connectErrorHandler(static_cast(m_socket.get()), handler); } else #endif { connectErrorHandler(static_cast(m_socket.get()), - operationContextPtr); + handler); } - auto *handler = new Http2Handler(operationContext, this, endStream); - if (m_connection && !createHttp2Stream(handler)) return; @@ -1126,14 +1110,11 @@ bool QGrpcHttp2ChannelPrivate::createHttp2Stream(Http2Handler *handler) Q_ASSERT(handler != nullptr); Q_ASSERT(m_connection); - auto *channelOpPtr = handler->operation(); const auto streamAttempt = m_connection->createStream(); if (!streamAttempt.ok()) { - operationContextAsyncError(channelOpPtr, - QGrpcStatus{ - StatusCode::Unavailable, - tr("Unable to create an HTTP/2 stream (%1)") - .arg(QDebug::toString(streamAttempt.error())) }); + handler->asyncFinish({ StatusCode::Unavailable, + tr("Unable to create an HTTP/2 stream (%1)") + .arg(QDebug::toString(streamAttempt.error())) }); return false; } handler->attachStream(streamAttempt.unwrap()); @@ -1182,7 +1163,7 @@ QUrl QGrpcHttp2Channel::hostUri() const */ void QGrpcHttp2Channel::call(std::shared_ptr operationContext) { - d_ptr->processOperation(operationContext, true); + d_ptr->processOperation(operationContext.get(), true); } /*! @@ -1191,7 +1172,7 @@ void QGrpcHttp2Channel::call(std::shared_ptr operationCon */ void QGrpcHttp2Channel::serverStream(std::shared_ptr operationContext) { - d_ptr->processOperation(operationContext, true); + d_ptr->processOperation(operationContext.get(), true); } /*! @@ -1200,7 +1181,7 @@ void QGrpcHttp2Channel::serverStream(std::shared_ptr oper */ void QGrpcHttp2Channel::clientStream(std::shared_ptr operationContext) { - d_ptr->processOperation(operationContext); + d_ptr->processOperation(operationContext.get()); } /*! @@ -1209,7 +1190,7 @@ void QGrpcHttp2Channel::clientStream(std::shared_ptr oper */ void QGrpcHttp2Channel::bidiStream(std::shared_ptr operationContext) { - d_ptr->processOperation(operationContext); + d_ptr->processOperation(operationContext.get()); } /*!