mirror of https://github.com/qt/qtgrpc.git
QGrpcHttp2Channel: Implement dataframe decompression
Implement the missing decompression handling. We were advertising an
accepted encoding of GrpcAcceptEncodingValue("identity,deflate,gzip")
but only ever handled identity (i.e. no compression) !
We now check the negotiated content-encoding in the initialMetadata to
set up the appropriate decompression. Since compression can vary per
message, dynamic processing is required.
[ChangeLog][QGrpcHttp2Channel] Added missing decompression handling for
'deflate' and 'gzip'.
Fixes: QTBUG-129286
Pick-to: 6.10 6.9 6.8
Change-Id: I3ed4af7b21b51c52bccc3e6c314ae166e80c94a8
Reviewed-by: Alexey Edelev <alexey.edelev@qt.io>
This commit is contained in:
parent
dbf2b504c7
commit
efaba18e4a
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
#include <QtNetwork/private/hpack_p.h>
|
||||
#include <QtNetwork/private/http2protocol_p.h>
|
||||
#include <QtNetwork/private/qdecompresshelper_p.h>
|
||||
#include <QtNetwork/private/qhttp2connection_p.h>
|
||||
#if QT_CONFIG(localserver)
|
||||
# include <QtNetwork/qlocalsocket.h>
|
||||
|
|
@ -178,6 +179,7 @@ const QByteArray GrpcStatusMessageHeader("grpc-message");
|
|||
const QByteArray DefaultContentType("application/grpc");
|
||||
const QByteArray GrpcStatusDetailsHeader("grpc-status-details-bin");
|
||||
const QByteArray GrpcAcceptEncodingHeader("grpc-accept-encoding");
|
||||
const QByteArray GrpcAcceptEncodingValue("identity,deflate,gzip");
|
||||
const QByteArray GrpcEncodingHeader("grpc-encoding");
|
||||
constexpr qsizetype GrpcMessageSizeHeaderSize = 5;
|
||||
|
||||
|
|
@ -362,6 +364,8 @@ private:
|
|||
QQueue<QByteArray> m_queue;
|
||||
QPointer<QHttp2Stream> m_stream;
|
||||
GrpcDataParser m_grpcDataParser;
|
||||
QByteArray m_negotiatedEncoding;
|
||||
std::unique_ptr<QDecompressHelper> m_decompressor;
|
||||
State m_state = State::Idle;
|
||||
const bool m_endStreamAtFirstData;
|
||||
bool m_writesDoneSent = false;
|
||||
|
|
@ -537,13 +541,41 @@ void Http2Handler::attachStream(QHttp2Stream *stream_)
|
|||
|
||||
m_grpcDataParser.feed(data);
|
||||
while (auto frame = m_grpcDataParser.parseNextFrame()) {
|
||||
QByteArray finalPayload;
|
||||
|
||||
if (frame->isCompressed) {
|
||||
if (!m_decompressor || m_negotiatedEncoding.isEmpty()) {
|
||||
finish({ QtGrpc::StatusCode::Internal,
|
||||
"Protocol error: received compressed message "
|
||||
"but no encoding was negotiated." });
|
||||
return;
|
||||
}
|
||||
m_decompressor->feed(std::move(frame->payload));
|
||||
// Read all decompressed data for this single message.
|
||||
while (m_decompressor->hasData()) {
|
||||
char buffer[4096];
|
||||
qsizetype bytesRead = m_decompressor->read(buffer, sizeof(buffer));
|
||||
if (bytesRead < 0) {
|
||||
finish({ QtGrpc::StatusCode::Internal,
|
||||
"Decompression failed: %1"_L1
|
||||
.arg(m_decompressor->errorString()) });
|
||||
return;
|
||||
}
|
||||
finalPayload.append(buffer, bytesRead);
|
||||
}
|
||||
m_decompressor->clear();
|
||||
m_decompressor->setEncoding(m_negotiatedEncoding);
|
||||
} else {
|
||||
finalPayload = std::move(frame->payload);
|
||||
}
|
||||
|
||||
qCDebug(lcStream,
|
||||
"[%p] Processed gRPC message (compressed=%s, "
|
||||
"payloadSize=%" PRIdQSIZETYPE ", bufferRemaining=%" PRIdQSIZETYPE ")",
|
||||
this, frame->isCompressed ? "true" : "false", frame->payload.size(),
|
||||
this, frame->isCompressed ? "true" : "false", finalPayload.size(),
|
||||
m_grpcDataParser.bytesAvailable());
|
||||
|
||||
emit m_context->messageReceived(frame->payload);
|
||||
emit m_context->messageReceived(finalPayload);
|
||||
}
|
||||
|
||||
if (endStream) {
|
||||
|
|
@ -573,7 +605,6 @@ HPack::HttpHeader Http2Handler::constructInitialHeaders() const
|
|||
const static QByteArray TEHeader("te");
|
||||
const static QByteArray TEValue("trailers");
|
||||
const static QByteArray GrpcServiceNameHeader("service-name");
|
||||
const static QByteArray GrpcAcceptEncodingValue("identity,deflate,gzip");
|
||||
const static QByteArray UserAgentHeader("user-agent");
|
||||
const static QByteArray UserAgentValue("grpc-c++-qtgrpc/"_ba + QT_VERSION_STR + " ("_ba
|
||||
+ QSysInfo::productType().toUtf8() + '/'
|
||||
|
|
@ -832,10 +863,28 @@ void Http2Handler::handleHeaders(const HPack::HttpHeader &headers, HeaderPhase p
|
|||
} else if (validation.requireGrpcStatus && k == GrpcStatusDetailsHeader) {
|
||||
// Allowed optional headers
|
||||
// TODO: Implement status-details - QTBUG-138362
|
||||
} else if (phase == HeaderPhase::Initial
|
||||
&& (k == GrpcEncodingHeader || k == GrpcAcceptEncodingHeader)) {
|
||||
} else if (phase == HeaderPhase::Initial && k == GrpcEncodingHeader) {
|
||||
// Allowed optional headers
|
||||
// TODO: Implement compression handling - QTBUG-129286
|
||||
if (v == "identity"_ba)
|
||||
continue;
|
||||
if (!GrpcAcceptEncodingValue.contains(v)
|
||||
|| !QDecompressHelper::isSupportedEncoding(v)) {
|
||||
finish({ StatusCode::Internal,
|
||||
"Server responded with an unsupported compression algorithm: %1"_L1
|
||||
.arg(v) });
|
||||
return;
|
||||
}
|
||||
// Create and configure the decompressor for this stream.
|
||||
m_decompressor = std::make_unique<QDecompressHelper>();
|
||||
if (!m_decompressor->setEncoding(v)) {
|
||||
finish({ StatusCode::Internal,
|
||||
"Failed to initialize decompressor for algorithm: %1"_L1.arg(v) });
|
||||
return;
|
||||
}
|
||||
m_negotiatedEncoding = v;
|
||||
} else if (phase == HeaderPhase::Initial && k == GrpcAcceptEncodingHeader) {
|
||||
// Allowed optional headers
|
||||
// TODO: Implement client-side (request) compression handling - QTBUG-140235
|
||||
} else if (k.startsWith(':')) {
|
||||
qCWarning(lcStream,
|
||||
"[%p] Received unhandled HTTP/2 pseudo-header: { key: '%s', value: '%s' } "
|
||||
|
|
|
|||
|
|
@ -13,3 +13,7 @@ message Event {
|
|||
string name = 2;
|
||||
uint64 number = 3;
|
||||
}
|
||||
|
||||
message EventList {
|
||||
repeated Event events = 1;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ message None {}
|
|||
service EventHub {
|
||||
rpc Push(Event) returns (None) {}
|
||||
rpc Subscribe(None) returns (stream Event) {}
|
||||
rpc SubscribeList(None) returns (stream EventList) {}
|
||||
rpc Notify(stream Event) returns (None) {}
|
||||
rpc Exchange(stream Event) returns (stream Event) {}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -79,6 +79,9 @@ private Q_SLOTS:
|
|||
|
||||
void bidiStreamsInOrder();
|
||||
|
||||
void clientHandlesCompression_data() const;
|
||||
void clientHandlesCompression();
|
||||
|
||||
private:
|
||||
static std::shared_ptr<grpc::ServerCredentials> serverSslCredentials()
|
||||
{
|
||||
|
|
@ -494,6 +497,93 @@ void QtGrpcClientEnd2EndTest::bidiStreamsInOrder()
|
|||
QVERIFY(finishedSpy.wait());
|
||||
}
|
||||
|
||||
void QtGrpcClientEnd2EndTest::clientHandlesCompression_data() const
|
||||
{
|
||||
QTest::addColumn<grpc_compression_algorithm>("compressionAlgo");
|
||||
QTest::addRow("compress(None)") << GRPC_COMPRESS_NONE;
|
||||
QTest::addRow("compress(Deflate)") << GRPC_COMPRESS_DEFLATE;
|
||||
QTest::addRow("compress(Gzip)") << GRPC_COMPRESS_GZIP;
|
||||
}
|
||||
|
||||
void QtGrpcClientEnd2EndTest::clientHandlesCompression()
|
||||
{
|
||||
QFETCH(const grpc_compression_algorithm, compressionAlgo);
|
||||
|
||||
class SubscribeListHandler : public AbstractRpcTag
|
||||
{
|
||||
public:
|
||||
SubscribeListHandler(EventHub::AsyncService &service_,
|
||||
const grpc_compression_algorithm compressionAlgo_)
|
||||
: op(&context()), service(service_), compressionAlgo(compressionAlgo_)
|
||||
{
|
||||
context().set_compression_algorithm(compressionAlgo);
|
||||
context().set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
|
||||
// create some 'compressable' data. Try to make it more complex
|
||||
// as compression is not guaranteed to actually be applied.
|
||||
for (size_t i = 0; i < 100; ++i) {
|
||||
const auto v = i % 10;
|
||||
Event ev;
|
||||
ev.set_name("server;server;" + std::to_string(v));
|
||||
ev.set_number(v);
|
||||
response.mutable_events()->Add(std::move(ev));
|
||||
}
|
||||
}
|
||||
void start(grpc::ServerCompletionQueue *cq) override
|
||||
{
|
||||
service.RequestSubscribeList(&context(), &request, &op, cq, cq, this);
|
||||
}
|
||||
void process(bool ok) override
|
||||
{
|
||||
QVERIFY(ok);
|
||||
if (index >= responseCount) {
|
||||
op.Finish(grpc::Status::OK, new VoidTag());
|
||||
return;
|
||||
}
|
||||
grpc::WriteOptions wopts;
|
||||
// Enable and disable the compression per-message
|
||||
if (index % 2 == 0)
|
||||
wopts.set_no_compression();
|
||||
op.Write(response, wopts, this);
|
||||
++index;
|
||||
}
|
||||
|
||||
grpc::ServerAsyncWriter<EventList> op;
|
||||
EventHub::AsyncService &service;
|
||||
None request;
|
||||
EventList response;
|
||||
|
||||
size_t index = 0;
|
||||
const grpc_compression_algorithm compressionAlgo;
|
||||
const size_t responseCount = 20;
|
||||
};
|
||||
|
||||
SubscribeListHandler handler(*m_service, compressionAlgo);
|
||||
m_server->startRpcTag(handler);
|
||||
|
||||
auto call = m_client->SubscribeList(qt::None{});
|
||||
QVERIFY(call);
|
||||
|
||||
connect(call.get(), &QGrpcOperation::finished, this,
|
||||
[&](const QGrpcStatus &status) { QCOMPARE(status.code(), QtGrpc::StatusCode::Ok); });
|
||||
connect(call.get(), &QGrpcServerStream::messageReceived, this, [&] {
|
||||
auto response = call->read<qt::EventList>();
|
||||
QVERIFY(response);
|
||||
QCOMPARE_EQ(response->events().size(), handler.response.events().size());
|
||||
for (int i = 0; i < response->events().size(); ++i) {
|
||||
const auto &next = response->events().at(i);
|
||||
const auto &baseline = handler.response.events().at(i);
|
||||
QCOMPARE_EQ(next.name(), QString::fromStdString(baseline.name()));
|
||||
QCOMPARE_EQ(next.number(), baseline.number());
|
||||
}
|
||||
});
|
||||
|
||||
QVERIFY(m_server->startAsyncProcessing());
|
||||
|
||||
QSignalSpy finishedSpy(call.get(), &QGrpcOperation::finished);
|
||||
QVERIFY(finishedSpy.isValid());
|
||||
QVERIFY(finishedSpy.wait());
|
||||
}
|
||||
|
||||
QTEST_MAIN(QtGrpcClientEnd2EndTest)
|
||||
|
||||
#include "tst_grpc_client_end2end.moc"
|
||||
|
|
|
|||
Loading…
Reference in New Issue