From 88b73b67b098ab99d33c01fa3c347d6c0dec7ef1 Mon Sep 17 00:00:00 2001 From: Dennis Oberst Date: Wed, 17 Sep 2025 18:04:43 +0200 Subject: [PATCH] QGrpcHttp2Channel: Replace ExpectedData with GrpcDataParser MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We are missing functionality! We are omitting any compressed messages. This patch doesn't add the compression handling but a more structured and sane way of encapsulating the data parsing. Added the compression flag extraction to the parsing. Task-number: QTBUG-129286 Pick-to: 6.10 6.9 6.8 Change-Id: I699fdb6fb0279453a367930e950c2af3e992063d Reviewed-by: MÃ¥rten Nordheim --- src/grpc/qgrpchttp2channel.cpp | 75 +++++++++++++++++++++------------- 1 file changed, 46 insertions(+), 29 deletions(-) diff --git a/src/grpc/qgrpchttp2channel.cpp b/src/grpc/qgrpchttp2channel.cpp index ce7dc03d..37dd3c3a 100644 --- a/src/grpc/qgrpchttp2channel.cpp +++ b/src/grpc/qgrpchttp2channel.cpp @@ -252,21 +252,48 @@ bool hasSslConfiguration(const QGrpcChannelOptions &opts) } // namespace -struct ExpectedData +class GrpcDataParser { - qsizetype expectedSize = 0; - QByteArray container; - - bool updateExpectedSize() +public: + struct Frame { - if (expectedSize == 0) { - if (container.size() < GrpcMessageSizeHeaderSize) - return false; - expectedSize = qFromBigEndian(container.data() + 1) - + GrpcMessageSizeHeaderSize; + Frame(QByteArray &&payload, bool isCompressed) + : payload(std::move(payload)), isCompressed(isCompressed) + { } - return true; + QByteArray payload; + bool isCompressed = false; + }; + // Parses the next complete gRPC frame from the buffer. Removes the frame + // on success, or returns std::nullopt if incomplete. + std::optional parseNextFrame() + { + static constexpr qsizetype FlagOffset = 0; + static constexpr qsizetype LengthOffset = 1; + + std::optional out; + if (container.size() < GrpcMessageSizeHeaderSize) + return out; + + // Parse length (big endian, 4 bytes after flag) + const auto messageLength = qFromBigEndian< + quint32>(reinterpret_cast(container.constData() + LengthOffset)); + const qsizetype frameSize = GrpcMessageSizeHeaderSize + messageLength; + + if (container.size() < frameSize) + return out; // Incomplete frame in buffer. Wait for more data + + out.emplace(container.mid(GrpcMessageSizeHeaderSize, messageLength), + container.at(FlagOffset) != 0); + container.remove(0, frameSize); + return out; } + + void feed(const QByteArray &data) { container.append(data); } + qsizetype bytesAvailable() const { return container.size(); } + +private: + QByteArray container; }; // The Http2Handler manages an individual RPC over the HTTP/2 channel. @@ -334,7 +361,7 @@ private: HPack::HttpHeader m_initialHeaders; QQueue m_queue; QPointer m_stream; - ExpectedData m_expectedData; + GrpcDataParser m_grpcDataParser; State m_state = State::Idle; const bool m_endStreamAtFirstData; bool m_writesDoneSent = false; @@ -508,25 +535,15 @@ void Http2Handler::attachStream(QHttp2Stream *stream_) if (m_state == State::Cancelled) return; - m_expectedData.container.append(data); - - if (!m_expectedData.updateExpectedSize()) - return; - - while (m_expectedData.container.size() >= m_expectedData.expectedSize) { + m_grpcDataParser.feed(data); + while (auto frame = m_grpcDataParser.parseNextFrame()) { qCDebug(lcStream, - "[%p] About to process message (receivedSize=%" PRIdQSIZETYPE ", " - "expectedSize=%" PRIdQSIZETYPE ", containerSize=%" PRIdQSIZETYPE ")", - this, data.size(), m_expectedData.expectedSize, - m_expectedData.container.size()); - const auto len = m_expectedData.expectedSize - GrpcMessageSizeHeaderSize; - const auto msg = m_expectedData.container.mid(GrpcMessageSizeHeaderSize, len); - emit m_context->messageReceived(msg); + "[%p] Processed gRPC message (compressed=%s, " + "payloadSize=%" PRIdQSIZETYPE ", bufferRemaining=%" PRIdQSIZETYPE ")", + this, frame->isCompressed ? "true" : "false", frame->payload.size(), + m_grpcDataParser.bytesAvailable()); - m_expectedData.container.remove(0, m_expectedData.expectedSize); - m_expectedData.expectedSize = 0; - if (!m_expectedData.updateExpectedSize()) - return; + emit m_context->messageReceived(frame->payload); } if (endStream)