mirror of https://github.com/qt/qtgrpc.git
QGrpcHttp2Channel: Replace ExpectedData with GrpcDataParser
We are missing functionality! We are omitting any compressed messages. This patch doesn't add the compression handling but a more structured and sane way of encapsulating the data parsing. Added the compression flag extraction to the parsing. Task-number: QTBUG-129286 Pick-to: 6.10 6.9 6.8 Change-Id: I699fdb6fb0279453a367930e950c2af3e992063d Reviewed-by: Mårten Nordheim <marten.nordheim@qt.io>
This commit is contained in:
parent
be670fea5b
commit
88b73b67b0
|
|
@ -252,21 +252,48 @@ bool hasSslConfiguration(const QGrpcChannelOptions &opts)
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
struct ExpectedData
|
class GrpcDataParser
|
||||||
{
|
{
|
||||||
qsizetype expectedSize = 0;
|
public:
|
||||||
QByteArray container;
|
struct Frame
|
||||||
|
|
||||||
bool updateExpectedSize()
|
|
||||||
{
|
{
|
||||||
if (expectedSize == 0) {
|
Frame(QByteArray &&payload, bool isCompressed)
|
||||||
if (container.size() < GrpcMessageSizeHeaderSize)
|
: payload(std::move(payload)), isCompressed(isCompressed)
|
||||||
return false;
|
{
|
||||||
expectedSize = qFromBigEndian<quint32>(container.data() + 1)
|
|
||||||
+ GrpcMessageSizeHeaderSize;
|
|
||||||
}
|
}
|
||||||
return true;
|
QByteArray payload;
|
||||||
|
bool isCompressed = false;
|
||||||
|
};
|
||||||
|
// Parses the next complete gRPC frame from the buffer. Removes the frame
|
||||||
|
// on success, or returns std::nullopt if incomplete.
|
||||||
|
std::optional<Frame> parseNextFrame()
|
||||||
|
{
|
||||||
|
static constexpr qsizetype FlagOffset = 0;
|
||||||
|
static constexpr qsizetype LengthOffset = 1;
|
||||||
|
|
||||||
|
std::optional<Frame> out;
|
||||||
|
if (container.size() < GrpcMessageSizeHeaderSize)
|
||||||
|
return out;
|
||||||
|
|
||||||
|
// Parse length (big endian, 4 bytes after flag)
|
||||||
|
const auto messageLength = qFromBigEndian<
|
||||||
|
quint32>(reinterpret_cast<const uchar *>(container.constData() + LengthOffset));
|
||||||
|
const qsizetype frameSize = GrpcMessageSizeHeaderSize + messageLength;
|
||||||
|
|
||||||
|
if (container.size() < frameSize)
|
||||||
|
return out; // Incomplete frame in buffer. Wait for more data
|
||||||
|
|
||||||
|
out.emplace(container.mid(GrpcMessageSizeHeaderSize, messageLength),
|
||||||
|
container.at(FlagOffset) != 0);
|
||||||
|
container.remove(0, frameSize);
|
||||||
|
return out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void feed(const QByteArray &data) { container.append(data); }
|
||||||
|
qsizetype bytesAvailable() const { return container.size(); }
|
||||||
|
|
||||||
|
private:
|
||||||
|
QByteArray container;
|
||||||
};
|
};
|
||||||
|
|
||||||
// The Http2Handler manages an individual RPC over the HTTP/2 channel.
|
// The Http2Handler manages an individual RPC over the HTTP/2 channel.
|
||||||
|
|
@ -334,7 +361,7 @@ private:
|
||||||
HPack::HttpHeader m_initialHeaders;
|
HPack::HttpHeader m_initialHeaders;
|
||||||
QQueue<QByteArray> m_queue;
|
QQueue<QByteArray> m_queue;
|
||||||
QPointer<QHttp2Stream> m_stream;
|
QPointer<QHttp2Stream> m_stream;
|
||||||
ExpectedData m_expectedData;
|
GrpcDataParser m_grpcDataParser;
|
||||||
State m_state = State::Idle;
|
State m_state = State::Idle;
|
||||||
const bool m_endStreamAtFirstData;
|
const bool m_endStreamAtFirstData;
|
||||||
bool m_writesDoneSent = false;
|
bool m_writesDoneSent = false;
|
||||||
|
|
@ -508,25 +535,15 @@ void Http2Handler::attachStream(QHttp2Stream *stream_)
|
||||||
if (m_state == State::Cancelled)
|
if (m_state == State::Cancelled)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
m_expectedData.container.append(data);
|
m_grpcDataParser.feed(data);
|
||||||
|
while (auto frame = m_grpcDataParser.parseNextFrame()) {
|
||||||
if (!m_expectedData.updateExpectedSize())
|
|
||||||
return;
|
|
||||||
|
|
||||||
while (m_expectedData.container.size() >= m_expectedData.expectedSize) {
|
|
||||||
qCDebug(lcStream,
|
qCDebug(lcStream,
|
||||||
"[%p] About to process message (receivedSize=%" PRIdQSIZETYPE ", "
|
"[%p] Processed gRPC message (compressed=%s, "
|
||||||
"expectedSize=%" PRIdQSIZETYPE ", containerSize=%" PRIdQSIZETYPE ")",
|
"payloadSize=%" PRIdQSIZETYPE ", bufferRemaining=%" PRIdQSIZETYPE ")",
|
||||||
this, data.size(), m_expectedData.expectedSize,
|
this, frame->isCompressed ? "true" : "false", frame->payload.size(),
|
||||||
m_expectedData.container.size());
|
m_grpcDataParser.bytesAvailable());
|
||||||
const auto len = m_expectedData.expectedSize - GrpcMessageSizeHeaderSize;
|
|
||||||
const auto msg = m_expectedData.container.mid(GrpcMessageSizeHeaderSize, len);
|
|
||||||
emit m_context->messageReceived(msg);
|
|
||||||
|
|
||||||
m_expectedData.container.remove(0, m_expectedData.expectedSize);
|
emit m_context->messageReceived(frame->payload);
|
||||||
m_expectedData.expectedSize = 0;
|
|
||||||
if (!m_expectedData.updateExpectedSize())
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (endStream)
|
if (endStream)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue