diff --git a/src/grpc/qgrpchttp2channel.cpp b/src/grpc/qgrpchttp2channel.cpp index 669ab6fb..698d97a0 100644 --- a/src/grpc/qgrpchttp2channel.cpp +++ b/src/grpc/qgrpchttp2channel.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #if QT_CONFIG(localserver) # include @@ -178,6 +179,7 @@ const QByteArray GrpcStatusMessageHeader("grpc-message"); const QByteArray DefaultContentType("application/grpc"); const QByteArray GrpcStatusDetailsHeader("grpc-status-details-bin"); const QByteArray GrpcAcceptEncodingHeader("grpc-accept-encoding"); +const QByteArray GrpcAcceptEncodingValue("identity,deflate,gzip"); const QByteArray GrpcEncodingHeader("grpc-encoding"); constexpr qsizetype GrpcMessageSizeHeaderSize = 5; @@ -362,6 +364,8 @@ private: QQueue m_queue; QPointer m_stream; GrpcDataParser m_grpcDataParser; + QByteArray m_negotiatedEncoding; + std::unique_ptr m_decompressor; State m_state = State::Idle; const bool m_endStreamAtFirstData; bool m_writesDoneSent = false; @@ -537,13 +541,41 @@ void Http2Handler::attachStream(QHttp2Stream *stream_) m_grpcDataParser.feed(data); while (auto frame = m_grpcDataParser.parseNextFrame()) { + QByteArray finalPayload; + + if (frame->isCompressed) { + if (!m_decompressor || m_negotiatedEncoding.isEmpty()) { + finish({ QtGrpc::StatusCode::Internal, + "Protocol error: received compressed message " + "but no encoding was negotiated." }); + return; + } + m_decompressor->feed(std::move(frame->payload)); + // Read all decompressed data for this single message. + while (m_decompressor->hasData()) { + char buffer[4096]; + qsizetype bytesRead = m_decompressor->read(buffer, sizeof(buffer)); + if (bytesRead < 0) { + finish({ QtGrpc::StatusCode::Internal, + "Decompression failed: %1"_L1 + .arg(m_decompressor->errorString()) }); + return; + } + finalPayload.append(buffer, bytesRead); + } + m_decompressor->clear(); + m_decompressor->setEncoding(m_negotiatedEncoding); + } else { + finalPayload = std::move(frame->payload); + } + qCDebug(lcStream, "[%p] Processed gRPC message (compressed=%s, " "payloadSize=%" PRIdQSIZETYPE ", bufferRemaining=%" PRIdQSIZETYPE ")", - this, frame->isCompressed ? "true" : "false", frame->payload.size(), + this, frame->isCompressed ? "true" : "false", finalPayload.size(), m_grpcDataParser.bytesAvailable()); - emit m_context->messageReceived(frame->payload); + emit m_context->messageReceived(finalPayload); } if (endStream) { @@ -573,7 +605,6 @@ HPack::HttpHeader Http2Handler::constructInitialHeaders() const const static QByteArray TEHeader("te"); const static QByteArray TEValue("trailers"); const static QByteArray GrpcServiceNameHeader("service-name"); - const static QByteArray GrpcAcceptEncodingValue("identity,deflate,gzip"); const static QByteArray UserAgentHeader("user-agent"); const static QByteArray UserAgentValue("grpc-c++-qtgrpc/"_ba + QT_VERSION_STR + " ("_ba + QSysInfo::productType().toUtf8() + '/' @@ -832,10 +863,28 @@ void Http2Handler::handleHeaders(const HPack::HttpHeader &headers, HeaderPhase p } else if (validation.requireGrpcStatus && k == GrpcStatusDetailsHeader) { // Allowed optional headers // TODO: Implement status-details - QTBUG-138362 - } else if (phase == HeaderPhase::Initial - && (k == GrpcEncodingHeader || k == GrpcAcceptEncodingHeader)) { + } else if (phase == HeaderPhase::Initial && k == GrpcEncodingHeader) { // Allowed optional headers - // TODO: Implement compression handling - QTBUG-129286 + if (v == "identity"_ba) + continue; + if (!GrpcAcceptEncodingValue.contains(v) + || !QDecompressHelper::isSupportedEncoding(v)) { + finish({ StatusCode::Internal, + "Server responded with an unsupported compression algorithm: %1"_L1 + .arg(v) }); + return; + } + // Create and configure the decompressor for this stream. + m_decompressor = std::make_unique(); + if (!m_decompressor->setEncoding(v)) { + finish({ StatusCode::Internal, + "Failed to initialize decompressor for algorithm: %1"_L1.arg(v) }); + return; + } + m_negotiatedEncoding = v; + } else if (phase == HeaderPhase::Initial && k == GrpcAcceptEncodingHeader) { + // Allowed optional headers + // TODO: Implement client-side (request) compression handling - QTBUG-140235 } else if (k.startsWith(':')) { qCWarning(lcStream, "[%p] Received unhandled HTTP/2 pseudo-header: { key: '%s', value: '%s' } " diff --git a/tests/auto/grpc/client/end2end/event.proto b/tests/auto/grpc/client/end2end/event.proto index 499bfc3e..7d09d928 100644 --- a/tests/auto/grpc/client/end2end/event.proto +++ b/tests/auto/grpc/client/end2end/event.proto @@ -13,3 +13,7 @@ message Event { string name = 2; uint64 number = 3; } + +message EventList { + repeated Event events = 1; +} diff --git a/tests/auto/grpc/client/end2end/eventhub.proto b/tests/auto/grpc/client/end2end/eventhub.proto index 8ef30cab..4dd925d1 100644 --- a/tests/auto/grpc/client/end2end/eventhub.proto +++ b/tests/auto/grpc/client/end2end/eventhub.proto @@ -10,6 +10,7 @@ message None {} service EventHub { rpc Push(Event) returns (None) {} rpc Subscribe(None) returns (stream Event) {} + rpc SubscribeList(None) returns (stream EventList) {} rpc Notify(stream Event) returns (None) {} rpc Exchange(stream Event) returns (stream Event) {} } diff --git a/tests/auto/grpc/client/end2end/tst_grpc_client_end2end.cpp b/tests/auto/grpc/client/end2end/tst_grpc_client_end2end.cpp index 94fe0aaa..f0b07f1a 100644 --- a/tests/auto/grpc/client/end2end/tst_grpc_client_end2end.cpp +++ b/tests/auto/grpc/client/end2end/tst_grpc_client_end2end.cpp @@ -79,6 +79,9 @@ private Q_SLOTS: void bidiStreamsInOrder(); + void clientHandlesCompression_data() const; + void clientHandlesCompression(); + private: static std::shared_ptr serverSslCredentials() { @@ -494,6 +497,93 @@ void QtGrpcClientEnd2EndTest::bidiStreamsInOrder() QVERIFY(finishedSpy.wait()); } +void QtGrpcClientEnd2EndTest::clientHandlesCompression_data() const +{ + QTest::addColumn("compressionAlgo"); + QTest::addRow("compress(None)") << GRPC_COMPRESS_NONE; + QTest::addRow("compress(Deflate)") << GRPC_COMPRESS_DEFLATE; + QTest::addRow("compress(Gzip)") << GRPC_COMPRESS_GZIP; +} + +void QtGrpcClientEnd2EndTest::clientHandlesCompression() +{ + QFETCH(const grpc_compression_algorithm, compressionAlgo); + + class SubscribeListHandler : public AbstractRpcTag + { + public: + SubscribeListHandler(EventHub::AsyncService &service_, + const grpc_compression_algorithm compressionAlgo_) + : op(&context()), service(service_), compressionAlgo(compressionAlgo_) + { + context().set_compression_algorithm(compressionAlgo); + context().set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); + // create some 'compressable' data. Try to make it more complex + // as compression is not guaranteed to actually be applied. + for (size_t i = 0; i < 100; ++i) { + const auto v = i % 10; + Event ev; + ev.set_name("server;server;" + std::to_string(v)); + ev.set_number(v); + response.mutable_events()->Add(std::move(ev)); + } + } + void start(grpc::ServerCompletionQueue *cq) override + { + service.RequestSubscribeList(&context(), &request, &op, cq, cq, this); + } + void process(bool ok) override + { + QVERIFY(ok); + if (index >= responseCount) { + op.Finish(grpc::Status::OK, new VoidTag()); + return; + } + grpc::WriteOptions wopts; + // Enable and disable the compression per-message + if (index % 2 == 0) + wopts.set_no_compression(); + op.Write(response, wopts, this); + ++index; + } + + grpc::ServerAsyncWriter op; + EventHub::AsyncService &service; + None request; + EventList response; + + size_t index = 0; + const grpc_compression_algorithm compressionAlgo; + const size_t responseCount = 20; + }; + + SubscribeListHandler handler(*m_service, compressionAlgo); + m_server->startRpcTag(handler); + + auto call = m_client->SubscribeList(qt::None{}); + QVERIFY(call); + + connect(call.get(), &QGrpcOperation::finished, this, + [&](const QGrpcStatus &status) { QCOMPARE(status.code(), QtGrpc::StatusCode::Ok); }); + connect(call.get(), &QGrpcServerStream::messageReceived, this, [&] { + auto response = call->read(); + QVERIFY(response); + QCOMPARE_EQ(response->events().size(), handler.response.events().size()); + for (int i = 0; i < response->events().size(); ++i) { + const auto &next = response->events().at(i); + const auto &baseline = handler.response.events().at(i); + QCOMPARE_EQ(next.name(), QString::fromStdString(baseline.name())); + QCOMPARE_EQ(next.number(), baseline.number()); + } + }); + + QVERIFY(m_server->startAsyncProcessing()); + + QSignalSpy finishedSpy(call.get(), &QGrpcOperation::finished); + QVERIFY(finishedSpy.isValid()); + QVERIFY(finishedSpy.wait()); +} + QTEST_MAIN(QtGrpcClientEnd2EndTest) #include "tst_grpc_client_end2end.moc"