diff --git a/src/grpc/qgrpchttp2channel.cpp b/src/grpc/qgrpchttp2channel.cpp index b0cf2040..6967b001 100644 --- a/src/grpc/qgrpchttp2channel.cpp +++ b/src/grpc/qgrpchttp2channel.cpp @@ -237,16 +237,6 @@ constexpr StatusCode http2StatusToStatusCode(const int status) } } -// Sends the errorOccured and finished signals asynchronously to make sure user -// connections work correctly. -void operationContextAsyncError(QGrpcOperationContext *operationContext, const QGrpcStatus &status) -{ - Q_ASSERT_X(operationContext != nullptr, "QGrpcHttp2ChannelPrivate::operationContextAsyncError", - "operationContext is null"); - QTimer::singleShot(0, operationContext, - [operationContext, status]() { emit operationContext->finished(status); }); -} - } // namespace struct ExpectedData @@ -286,16 +276,18 @@ public: }; Q_ENUM(State); - explicit Http2Handler(const std::shared_ptr &operation, - QGrpcHttp2ChannelPrivate *parent, bool endStream); + explicit Http2Handler(QGrpcOperationContext *operation, QGrpcHttp2ChannelPrivate *parent, + bool endStream); ~Http2Handler() override; void sendInitialRequest(); void attachStream(QHttp2Stream *stream_); void processQueue(); - [[nodiscard]] QGrpcOperationContext *operation() const; - [[nodiscard]] bool expired() const { return m_operation.expired(); } + void finish(const QGrpcStatus &status); + void asyncFinish(const QGrpcStatus &status); + + [[nodiscard]] bool expired() const { return !m_operation; } [[nodiscard]] bool isStreamClosedForSending() const { @@ -318,8 +310,9 @@ private: [[nodiscard]] HPack::HttpHeader constructInitialHeaders() const; [[nodiscard]] QGrpcHttp2ChannelPrivate *channelPriv() const; [[nodiscard]] QGrpcHttp2Channel *channel() const; + [[nodiscard]] bool handleContextExpired(); - std::weak_ptr m_operation; + QPointer m_operation; HPack::HttpHeader m_initialHeaders; QQueue m_queue; QPointer m_stream; @@ -339,8 +332,7 @@ public: explicit QGrpcHttp2ChannelPrivate(const QUrl &uri, QGrpcHttp2Channel *q); ~QGrpcHttp2ChannelPrivate() override = default; - void processOperation(const std::shared_ptr &operationContext, - bool endStream = false); + void processOperation(QGrpcOperationContext *operationContext, bool endStream = false); [[nodiscard]] bool isLocalSocket() const { @@ -363,10 +355,10 @@ private: enum ConnectionState { Connecting = 0, Connected, SettingsReceived, Error }; template - void connectErrorHandler(T *socket, QGrpcOperationContext *operationContext) + void connectErrorHandler(T *socket, Http2Handler *handler) { - QObject::connect(socket, &T::errorOccurred, operationContext, - [operationContextPtr = QPointer(operationContext), this](auto error) { + QObject::connect(socket, &T::errorOccurred, handler, + [this, handler](auto error) { if (m_isInsideSocketErrorOccurred) { qGrpcCritical("Socket errorOccurred signal triggered while " "already handling an error"); @@ -376,10 +368,8 @@ private: auto reset = qScopeGuard([this]() { m_isInsideSocketErrorOccurred = false; }); - emit operationContextPtr->finished(QGrpcStatus{ - StatusCode::Unavailable, - QGrpcHttp2ChannelPrivate::tr("Network error occurred: %1") - .arg(error) }); + emit handler->finish({ StatusCode::Unavailable, + tr("Network error occurred: %1").arg(error) }); }); } @@ -437,28 +427,31 @@ private: /// ## Http2Handler Implementations /// -Http2Handler::Http2Handler(const std::shared_ptr &operation, - QGrpcHttp2ChannelPrivate *parent, bool endStream) +Http2Handler::Http2Handler(QGrpcOperationContext *operation, QGrpcHttp2ChannelPrivate *parent, + bool endStream) : QObject(parent), m_operation(operation), m_initialHeaders(constructInitialHeaders()), m_endStreamAtFirstData(endStream) { - auto *channelOpPtr = operation.get(); - QObject::connect(channelOpPtr, &QGrpcOperationContext::cancelRequested, this, [this] { + // If the context (lifetime bound to the user) is destroyed, this handler + // can no longer perform any meaningful work. We allow it to be deleted; + // QHttp2Stream will handle any outstanding cancellations appropriately. + QObject::connect(operation, &QGrpcOperationContext::destroyed, this, + &Http2Handler::deleteLater); + QObject::connect(operation, &QGrpcOperationContext::cancelRequested, this, [this] { cancel(); deleteLater(); }); - QObject::connect(channelOpPtr, &QGrpcOperationContext::writesDoneRequested, this, + QObject::connect(operation, &QGrpcOperationContext::writesDoneRequested, this, &Http2Handler::writesDone); if (!m_endStreamAtFirstData) { - QObject::connect(channelOpPtr, &QGrpcOperationContext::writeMessageRequested, this, + QObject::connect(operation, &QGrpcOperationContext::writeMessageRequested, this, &Http2Handler::writeMessage); } - QObject::connect(channelOpPtr, &QGrpcOperationContext::finished, &m_deadlineTimer, - &QTimer::stop); + QObject::connect(operation, &QGrpcOperationContext::finished, &m_deadlineTimer, &QTimer::stop); m_deadlineTimer.setSingleShot(true); - writeMessage(channelOpPtr->argument()); + writeMessage(operation->argument()); } Http2Handler::~Http2Handler() @@ -478,10 +471,9 @@ void Http2Handler::attachStream(QHttp2Stream *stream_) Q_ASSERT(m_stream == nullptr); Q_ASSERT(stream_ != nullptr); - auto *channelOpPtr = operation(); m_stream = stream_; - QObject::connect(m_stream.get(), &QHttp2Stream::headersReceived, channelOpPtr, + QObject::connect(m_stream.get(), &QHttp2Stream::headersReceived, this, [this](const HPack::HttpHeader &headers, bool endStream) mutable { if (m_state >= State::Cancelled) { // In case we are Cancelled or Finished, a @@ -511,19 +503,14 @@ void Http2Handler::attachStream(QHttp2Stream *stream_) }); QObject::connect( - m_stream.get(), &QHttp2Stream::errorOccurred, channelPriv(), + m_stream.get(), &QHttp2Stream::errorOccurred, this, [this](quint32 http2ErrorCode, const QString &errorString) { - if (!m_operation.expired()) { - auto channelOp = m_operation.lock(); - emit channelOp->finished(QGrpcStatus{ http2ErrorToStatusCode(http2ErrorCode), - errorString }); - } - deleteLater(); + finish({ http2ErrorToStatusCode(http2ErrorCode), errorString }); }, Qt::SingleShotConnection); - QObject::connect(m_stream.get(), &QHttp2Stream::dataReceived, channelOpPtr, - [channelOpPtr, this](const QByteArray &data, bool endStream) { + QObject::connect(m_stream.get(), &QHttp2Stream::dataReceived, m_operation.get(), + [this](const QByteArray &data, bool endStream) { if (m_state != State::Cancelled) { m_expectedData.container.append(data); @@ -535,7 +522,7 @@ void Http2Handler::attachStream(QHttp2Stream *stream_) qGrpcDebug() << "Full data received:" << data.size() << "dataContainer:" << m_expectedData.container.size() << "capacity:" << m_expectedData.expectedSize; - emit channelOpPtr + emit m_operation ->messageReceived(m_expectedData.container .mid(GrpcMessageSizeHeaderSize, m_expectedData.expectedSize @@ -545,11 +532,9 @@ void Http2Handler::attachStream(QHttp2Stream *stream_) if (!m_expectedData.updateExpectedSize()) return; } - if (endStream) { - m_state = State::Finished; - emit channelOpPtr->finished({}); - deleteLater(); - } + + if (endStream) + finish({}); } }); @@ -558,13 +543,6 @@ void Http2Handler::attachStream(QHttp2Stream *stream_) } -QGrpcOperationContext *Http2Handler::operation() const -{ - Q_ASSERT(!m_operation.expired()); - - return m_operation.lock().get(); -} - // Builds HTTP/2 headers for the initial gRPC request. // The headers are sent once the HTTP/2 connection is established. HPack::HttpHeader Http2Handler::constructInitialHeaders() const @@ -585,11 +563,10 @@ HPack::HttpHeader Http2Handler::constructInitialHeaders() const + QSysInfo::productVersion().toUtf8() + ')'); const auto &channelOptions = channel()->channelOptions(); - const auto *operationContext = operation(); const auto *channel = channelPriv(); - QByteArray service{ operationContext->service() }; - QByteArray method{ operationContext->method() }; + QByteArray service{ m_operation->service() }; + QByteArray method{ m_operation->method() }; auto headers = HPack::HttpHeader{ { AuthorityHeader, channel->authorityHeader() }, { MethodHeader, MethodValue }, @@ -614,7 +591,7 @@ HPack::HttpHeader Http2Handler::constructInitialHeaders() const }; iterateMetadata(channelOptions.metadata(QtGrpc::MultiValue)); - iterateMetadata(operationContext->callOptions().metadata(QtGrpc::MultiValue)); + iterateMetadata(m_operation->callOptions().metadata(QtGrpc::MultiValue)); return headers; } @@ -628,6 +605,15 @@ QGrpcHttp2Channel *Http2Handler::channel() const return channelPriv()->q_ptr; } +bool Http2Handler::handleContextExpired() +{ + if (m_operation) + return false; + m_state = State::Cancelled; + deleteLater(); // m_stream will sendRST_STREAM on destruction + return true; +} + // Slot to enqueue a writeMessage request, either from the initial message // or from the user in client/bidirectional streaming RPCs. void Http2Handler::writeMessage(QByteArrayView data) @@ -661,18 +647,16 @@ void Http2Handler::sendInitialRequest() Q_ASSERT(m_state == State::Idle); if (!m_stream->sendHEADERS(m_initialHeaders, false)) { - m_state = State::Finished; - operationContextAsyncError(operation(), - QGrpcStatus{ - StatusCode::Unavailable, - tr("Unable to send initial headers to an HTTP/2 stream") }); + asyncFinish({ StatusCode::Unavailable, + tr("Unable to send initial headers to an HTTP/2 stream") }); return; } m_state = State::RequestHeadersSent; m_initialHeaders.clear(); processQueue(); - std::optional deadline = operation()->callOptions().deadlineTimeout(); + std::optional deadline = m_operation->callOptions() + .deadlineTimeout(); if (!deadline) deadline = channel()->channelOptions().deadlineTimeout(); if (deadline) { @@ -702,6 +686,24 @@ void Http2Handler::processQueue() m_stream->sendDATA(nextMessage, closeStream); } +void Http2Handler::finish(const QGrpcStatus &status) +{ + if (handleContextExpired()) + return; + if (m_state == State::Finished) + return; + if (m_state != State::Cancelled) // don't overwrite the Cancelled state + m_state = State::Finished; + emit m_operation->finished(status); + deleteLater(); +} +void Http2Handler::asyncFinish(const QGrpcStatus &status) +{ + if (handleContextExpired()) + return; + QTimer::singleShot(0, m_operation.get(), [this, status]() { finish(status); }); +} + void Http2Handler::cancel() { if (m_state >= State::Cancelled) @@ -736,13 +738,8 @@ void Http2Handler::deadlineTimeout() { Q_ASSERT_X(m_stream, "onDeadlineTimeout", "stream is not available"); - if (m_operation.expired()) { - qGrpcWarning("Operation expired on deadline timeout"); - return; - } cancel(); - emit m_operation.lock()->finished({ StatusCode::DeadlineExceeded, "Deadline Exceeded" }); - deleteLater(); + finish({ StatusCode::DeadlineExceeded, "Deadline Exceeded" }); } void Http2Handler::handleHeaders(const HPack::HttpHeader &headers, HeaderPhase phase) @@ -766,8 +763,7 @@ void Http2Handler::handleHeaders(const HPack::HttpHeader &headers, HeaderPhase p bool hasGrpcStatus : 1; }; - auto ctx = m_operation.lock(); - if (!ctx) + if (handleContextExpired()) return; HeaderValidation validation{ @@ -786,16 +782,13 @@ void Http2Handler::handleHeaders(const HPack::HttpHeader &headers, HeaderPhase p for (const auto &[k, v] : headers) { if (validation.requireHttpStatus && k == HttpStatusHeader) { if (const auto status = v.toInt(); status != 200) { - ctx->finished({ http2StatusToStatusCode(status), - "Received HTTP/2 status: %1"_L1.arg(v) }); - deleteLater(); + finish({ http2StatusToStatusCode(status), "Received HTTP/2 status: %1"_L1.arg(v) }); return; } validation.hasHttpStatus = true; } else if (validation.requireContentType && k == ContentTypeHeader) { if (!v.toLower().startsWith(DefaultContentType)) { - ctx->finished({ StatusCode::Internal, "Unexpected content-type: %1"_L1.arg(v) }); - deleteLater(); + finish({ StatusCode::Internal, "Unexpected content-type: %1"_L1.arg(v) }); return; } validation.hasContentType = true; @@ -803,9 +796,7 @@ void Http2Handler::handleHeaders(const HPack::HttpHeader &headers, HeaderPhase p bool ok; const auto parsed = v.toShort(&ok); if (!ok) { - ctx->finished({ StatusCode::Internal, - "Failed to parse gRPC-status: %1"_L1.arg(v) }); - deleteLater(); + finish({ StatusCode::Internal, "Failed to parse gRPC-status: %1"_L1.arg(v) }); return; } statusCode = static_cast(parsed); @@ -834,37 +825,31 @@ void Http2Handler::handleHeaders(const HPack::HttpHeader &headers, HeaderPhase p } if (validation.requireHttpStatus && !validation.hasHttpStatus) { - ctx->finished({ StatusCode::Internal, - "Missing valid '%1' header"_L1.arg(HttpStatusHeader) }); - deleteLater(); + finish({ StatusCode::Internal, "Missing valid '%1' header"_L1.arg(HttpStatusHeader) }); return; } if (validation.requireContentType && !validation.hasContentType) { - ctx->finished({ StatusCode::Internal, - "Missing valid '%1' header"_L1.arg(ContentTypeHeader) }); - deleteLater(); + finish({ StatusCode::Internal, "Missing valid '%1' header"_L1.arg(ContentTypeHeader) }); return; } if (validation.requireGrpcStatus && !validation.hasGrpcStatus) { - ctx->finished({ StatusCode::Internal, "Missing status code in trailers"_L1 }); - deleteLater(); + finish({ StatusCode::Internal, "Missing status code in trailers"_L1 }); return; } switch (phase) { case HeaderPhase::Initial: - ctx->setServerMetadata(std::move(metadata)); + m_operation->setServerMetadata(std::move(metadata)); break; case HeaderPhase::TrailersOnly: [[fallthrough]]; case HeaderPhase::Trailers: { - auto md = ctx->serverMetadata(); + auto md = m_operation->serverMetadata(); md.insert(metadata); - ctx->setServerMetadata(std::move(md)); - ctx->finished({ *statusCode, statusMessage }); - deleteLater(); + m_operation->setServerMetadata(std::move(md)); + finish({ *statusCode, statusMessage }); } break; default: Q_UNREACHABLE(); @@ -1022,34 +1007,33 @@ QGrpcHttp2ChannelPrivate::QGrpcHttp2ChannelPrivate(const QUrl &uri, QGrpcHttp2Ch m_reconnectFunction(); } -void QGrpcHttp2ChannelPrivate::processOperation(const std::shared_ptr - &operationContext, +void QGrpcHttp2ChannelPrivate::processOperation(QGrpcOperationContext *operationContext, bool endStream) { - auto *operationContextPtr = operationContext.get(); - Q_ASSERT_X(operationContextPtr != nullptr, "QGrpcHttp2ChannelPrivate::processOperation", + Q_ASSERT_X(operationContext != nullptr, "QGrpcHttp2ChannelPrivate::processOperation", "operation context is nullptr."); + // Send the finished signals asynchronously, so user connections work correctly. if (!m_socket->isWritable() && m_state == ConnectionState::Connected) { - operationContextAsyncError(operationContextPtr, - QGrpcStatus{ StatusCode::Unavailable, - m_socket->errorString() }); + QTimer::singleShot(0, operationContext, + [operationContext, err = m_socket->errorString()]() { + emit operationContext->finished({ StatusCode::Unavailable, err }); + }); return; } + auto *handler = new Http2Handler(operationContext, this, endStream); + #if QT_CONFIG(localserver) if (m_isLocalSocket) { - connectErrorHandler(static_cast(m_socket.get()), - operationContextPtr); + connectErrorHandler(static_cast(m_socket.get()), handler); } else #endif { connectErrorHandler(static_cast(m_socket.get()), - operationContextPtr); + handler); } - auto *handler = new Http2Handler(operationContext, this, endStream); - if (m_connection && !createHttp2Stream(handler)) return; @@ -1126,14 +1110,11 @@ bool QGrpcHttp2ChannelPrivate::createHttp2Stream(Http2Handler *handler) Q_ASSERT(handler != nullptr); Q_ASSERT(m_connection); - auto *channelOpPtr = handler->operation(); const auto streamAttempt = m_connection->createStream(); if (!streamAttempt.ok()) { - operationContextAsyncError(channelOpPtr, - QGrpcStatus{ - StatusCode::Unavailable, - tr("Unable to create an HTTP/2 stream (%1)") - .arg(QDebug::toString(streamAttempt.error())) }); + handler->asyncFinish({ StatusCode::Unavailable, + tr("Unable to create an HTTP/2 stream (%1)") + .arg(QDebug::toString(streamAttempt.error())) }); return false; } handler->attachStream(streamAttempt.unwrap()); @@ -1182,7 +1163,7 @@ QUrl QGrpcHttp2Channel::hostUri() const */ void QGrpcHttp2Channel::call(std::shared_ptr operationContext) { - d_ptr->processOperation(operationContext, true); + d_ptr->processOperation(operationContext.get(), true); } /*! @@ -1191,7 +1172,7 @@ void QGrpcHttp2Channel::call(std::shared_ptr operationCon */ void QGrpcHttp2Channel::serverStream(std::shared_ptr operationContext) { - d_ptr->processOperation(operationContext, true); + d_ptr->processOperation(operationContext.get(), true); } /*! @@ -1200,7 +1181,7 @@ void QGrpcHttp2Channel::serverStream(std::shared_ptr oper */ void QGrpcHttp2Channel::clientStream(std::shared_ptr operationContext) { - d_ptr->processOperation(operationContext); + d_ptr->processOperation(operationContext.get()); } /*! @@ -1209,7 +1190,7 @@ void QGrpcHttp2Channel::clientStream(std::shared_ptr oper */ void QGrpcHttp2Channel::bidiStream(std::shared_ptr operationContext) { - d_ptr->processOperation(operationContext); + d_ptr->processOperation(operationContext.get()); } /*!