2022-10-20 11:49:27 +00:00
|
|
|
// Copyright (C) 2022 The Qt Company Ltd.
|
|
|
|
// Copyright (C) 2019 Alexey Edelev <semlanik@gmail.com>
|
|
|
|
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only
|
|
|
|
|
|
|
|
#include <QtCore/QThread>
|
|
|
|
#include <QtCore/QTimer>
|
|
|
|
#include <QtGrpc/qgrpccallreply.h>
|
|
|
|
#include <QtGrpc/qgrpcstream.h>
|
|
|
|
#include <QtProtobuf/qprotobufserializer.h>
|
|
|
|
#include <qtgrpcglobal_p.h>
|
|
|
|
|
2023-01-02 15:06:11 +00:00
|
|
|
#include <private/qobject_p.h>
|
|
|
|
|
2022-10-20 11:49:27 +00:00
|
|
|
#include "qabstractgrpcclient.h"
|
|
|
|
|
|
|
|
QT_BEGIN_NAMESPACE
|
|
|
|
|
|
|
|
namespace {
|
|
|
|
static QString threadSafetyWarning(QLatin1StringView methodName)
|
|
|
|
{
|
2022-12-08 12:53:08 +00:00
|
|
|
return QLatin1StringView("%1 is called from a different thread.\n"
|
2022-12-13 11:28:02 +00:00
|
|
|
"Qt GRPC doesn't guarantee thread safety on the channel level.\n"
|
2022-10-20 11:49:27 +00:00
|
|
|
"You have to be confident that channel routines are working in "
|
2022-12-08 12:53:08 +00:00
|
|
|
"the same thread as QAbstractGrpcClient.")
|
2022-10-20 11:49:27 +00:00
|
|
|
.arg(methodName);
|
|
|
|
}
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
/*!
|
|
|
|
\class QAbstractGrpcClient
|
2022-12-13 11:28:02 +00:00
|
|
|
\inmodule QtGRPC
|
2022-12-08 12:53:08 +00:00
|
|
|
\brief The QAbstractGrpcClient class is a bridge between gRPC clients
|
|
|
|
and channels.
|
2022-10-20 11:49:27 +00:00
|
|
|
|
2022-12-08 12:53:08 +00:00
|
|
|
QAbstractGrpcClient provides a set of functions for client classes
|
|
|
|
generated out of protobuf services.
|
|
|
|
QAbstractGrpcClient enforces thread safety for stream() and call() methods
|
|
|
|
of generated clients.
|
|
|
|
*/
|
2022-10-20 11:49:27 +00:00
|
|
|
|
|
|
|
/*!
|
2022-12-08 12:53:08 +00:00
|
|
|
\fn template <typename ParamType> QGrpcStatus QAbstractGrpcClient::call(const QString &method,
|
|
|
|
const QProtobufMessage &arg);
|
|
|
|
\internal
|
2022-10-20 11:49:27 +00:00
|
|
|
|
2022-12-08 12:53:08 +00:00
|
|
|
Synchronously calls the given \a method of this service client,
|
|
|
|
with argument \a arg.
|
|
|
|
*/
|
2022-10-20 11:49:27 +00:00
|
|
|
|
|
|
|
/*!
|
2022-12-08 12:53:08 +00:00
|
|
|
\fn template <typename ParamType, typename ReturnType> QGrpcStatus
|
|
|
|
QAbstractGrpcClient::call(const QString &method, const QProtobufMessage &arg,
|
|
|
|
QWeakPointer<ReturnType> ret);
|
|
|
|
\internal
|
2022-10-20 11:49:27 +00:00
|
|
|
|
2022-12-08 12:53:08 +00:00
|
|
|
Synchronously calls the given \a method of this service client,
|
|
|
|
with argument \a arg and fills \a ret with gRPC reply.
|
|
|
|
*/
|
2022-10-20 11:49:27 +00:00
|
|
|
|
|
|
|
/*!
|
2022-12-08 12:53:08 +00:00
|
|
|
\fn template <typename ParamType> QSharedPointer<QGrpcStream> QAbstractGrpcClient::stream(const
|
|
|
|
QString &method, const QProtobufMessage &arg);
|
|
|
|
\internal
|
2022-10-20 11:49:27 +00:00
|
|
|
|
2022-12-08 12:53:08 +00:00
|
|
|
Streams messages from the server stream \a method with the message
|
|
|
|
argument \a arg to the attached channel.
|
|
|
|
*/
|
2022-10-20 11:49:27 +00:00
|
|
|
|
|
|
|
/*!
|
2022-12-08 12:53:08 +00:00
|
|
|
\fn template <typename ParamType, typename ReturnType> QSharedPointer<QGrpcStream>
|
|
|
|
QAbstractGrpcClient::stream(const QString &method, const QProtobufMessage &arg,
|
|
|
|
QWeakPointer<ReturnType> ret);
|
|
|
|
\internal
|
2022-10-20 11:49:27 +00:00
|
|
|
|
2022-12-08 12:53:08 +00:00
|
|
|
Streams messages from the server stream \a method with the message
|
|
|
|
argument \a arg to the attached channel.
|
2022-10-20 11:49:27 +00:00
|
|
|
|
2022-12-08 12:53:08 +00:00
|
|
|
Makes \a ret argument point to allocated return-message structure.
|
|
|
|
The return-message structure will be updated each time a message
|
|
|
|
is received from the server-stream.
|
2022-10-20 11:49:27 +00:00
|
|
|
|
2022-12-08 12:53:08 +00:00
|
|
|
\note If \a ret is used as a property field in another object,
the property's NOTIFY signal won't be emitted when an updated
message is received from the server stream.
|
2022-10-20 11:49:27 +00:00
|
|
|
*/
|
|
|
|
|
2023-01-02 15:06:11 +00:00
|
|
|
class QAbstractGrpcClientPrivate : public QObjectPrivate
|
2022-10-20 11:49:27 +00:00
|
|
|
{
|
2023-01-02 15:06:11 +00:00
|
|
|
Q_DECLARE_PUBLIC(QAbstractGrpcClient)
|
2022-10-20 11:49:27 +00:00
|
|
|
public:
|
|
|
|
QAbstractGrpcClientPrivate(const QString &service) : service(service) { }
|
|
|
|
|
2022-12-09 13:03:47 +00:00
|
|
|
std::shared_ptr<QAbstractGrpcChannel> channel;
|
2022-10-20 11:49:27 +00:00
|
|
|
const QString service;
|
2022-12-09 13:03:47 +00:00
|
|
|
std::vector<std::shared_ptr<QGrpcStream>> activeStreams;
|
2022-10-20 11:49:27 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
// Constructs the client with a freshly allocated private object that stores
// \a service, and passes \a parent on to the QObject base.
QAbstractGrpcClient::QAbstractGrpcClient(const QString &service, QObject *parent)
    : QObject(*new QAbstractGrpcClientPrivate(service), parent)
{
    // Nothing else to initialize; no channel is attached at this point.
}
|
|
|
|
|
2022-12-09 12:03:04 +00:00
|
|
|
// Destructor is defaulted here, out of line, in the implementation file.
QAbstractGrpcClient::~QAbstractGrpcClient() = default;
|
2022-10-20 11:49:27 +00:00
|
|
|
|
|
|
|
/*!
|
|
|
|
Attaches \a channel to client as transport layer for gRPC.
|
2022-12-08 12:53:08 +00:00
|
|
|
|
|
|
|
Parameters and return values will be serialized to the channel
|
|
|
|
in a format it supports.
|
2022-10-20 11:49:27 +00:00
|
|
|
|
2022-12-13 11:28:02 +00:00
|
|
|
\note \b Warning: Qt GRPC doesn't guarantee thread safety on the channel level.
|
2022-12-08 12:53:08 +00:00
|
|
|
You have to invoke the channel-related functions on the same thread as
|
|
|
|
QAbstractGrpcClient.
|
2022-10-20 11:49:27 +00:00
|
|
|
*/
|
2022-12-09 13:03:47 +00:00
|
|
|
void QAbstractGrpcClient::attachChannel(const std::shared_ptr<QAbstractGrpcChannel> &channel)
|
2022-10-20 11:49:27 +00:00
|
|
|
{
|
|
|
|
if (channel->thread() != QThread::currentThread()) {
|
|
|
|
const QString status = threadSafetyWarning(
|
|
|
|
QLatin1StringView("QAbstractGrpcClient::attachChannel"));
|
|
|
|
logError(status);
|
|
|
|
errorOccurred({ QGrpcStatus::Unknown, status });
|
|
|
|
return;
|
|
|
|
}
|
2023-01-02 15:06:11 +00:00
|
|
|
Q_D(QAbstractGrpcClient);
|
|
|
|
for (auto &stream : d->activeStreams)
|
2022-10-20 11:49:27 +00:00
|
|
|
stream->abort();
|
2022-12-08 12:53:08 +00:00
|
|
|
|
2023-01-02 15:06:11 +00:00
|
|
|
d->channel = channel;
|
|
|
|
for (auto &stream : d->activeStreams)
|
2022-10-20 11:49:27 +00:00
|
|
|
stream->abort();
|
|
|
|
}
|
|
|
|
|
|
|
|
// Synchronous call: forwards \a method and the serialized argument \a arg to
// the attached channel, which fills \a ret. Returns the resulting status and
// emits errorOccurred() for every status other than Ok.
QGrpcStatus QAbstractGrpcClient::call(const QString &method, const QByteArray &arg, QByteArray &ret)
{
    // Only the thread this client lives in may issue calls.
    if (QThread::currentThread() != thread()) {
        const QGrpcStatus threadError{
            QGrpcStatus::Unknown,
            threadSafetyWarning(QLatin1StringView("QAbstractGrpcClient::call"))
        };
        logError(threadError.message());
        errorOccurred(threadError);
        return threadError;
    }

    Q_D(QAbstractGrpcClient);

    // Without a channel there is nothing to dispatch to; report that instead.
    QGrpcStatus result = d->channel
            ? d->channel->call(method, d->service, arg, ret)
            : QGrpcStatus{ QGrpcStatus::Unknown, QLatin1StringView("No channel(s) attached.") };

    if (result != QGrpcStatus::Ok)
        errorOccurred(result);

    return result;
}
|
|
|
|
|
2022-12-09 13:03:47 +00:00
|
|
|
std::shared_ptr<QGrpcCallReply> QAbstractGrpcClient::call(const QString &method,
|
|
|
|
const QByteArray &arg)
|
2022-10-20 11:49:27 +00:00
|
|
|
{
|
2022-12-09 13:03:47 +00:00
|
|
|
std::shared_ptr<QGrpcCallReply> reply;
|
2022-10-20 11:49:27 +00:00
|
|
|
if (thread() != QThread::currentThread()) {
|
|
|
|
const QGrpcStatus status(
|
|
|
|
{ QGrpcStatus::Unknown,
|
|
|
|
threadSafetyWarning(QLatin1StringView("QAbstractGrpcClient::call")) });
|
|
|
|
logError(status.message());
|
|
|
|
errorOccurred(status);
|
|
|
|
return reply;
|
|
|
|
}
|
2023-01-02 15:06:11 +00:00
|
|
|
Q_D(QAbstractGrpcClient);
|
|
|
|
|
|
|
|
if (d->channel) {
|
2022-10-20 11:49:27 +00:00
|
|
|
reply.reset(new QGrpcCallReply(this), [](QGrpcCallReply *reply) { reply->deleteLater(); });
|
|
|
|
|
2022-12-09 13:03:47 +00:00
|
|
|
auto errorConnection = std::make_shared<QMetaObject::Connection>();
|
|
|
|
auto finishedConnection = std::make_shared<QMetaObject::Connection>();
|
2022-10-20 11:49:27 +00:00
|
|
|
*errorConnection = connect(reply.get(), &QGrpcCallReply::errorOccurred, this,
|
|
|
|
[this, reply, errorConnection,
|
|
|
|
finishedConnection](const QGrpcStatus &status) mutable {
|
|
|
|
errorOccurred(status);
|
|
|
|
QObject::disconnect(*finishedConnection);
|
|
|
|
QObject::disconnect(*errorConnection);
|
|
|
|
reply.reset();
|
|
|
|
});
|
|
|
|
|
2023-01-03 14:26:20 +00:00
|
|
|
*finishedConnection = connect(reply.get(), &QGrpcCallReply::finished, this,
|
2022-10-20 11:49:27 +00:00
|
|
|
[this, reply, errorConnection, finishedConnection]() mutable {
|
2022-12-08 12:53:08 +00:00
|
|
|
// The usage of 'QObject::disconnect' requires the
|
|
|
|
// compiler to capture 'this', but some compilers
|
|
|
|
// think that 'this' is unused. That's why we use
|
|
|
|
// explicit call to disconnect().
|
2022-10-20 11:49:27 +00:00
|
|
|
this->disconnect(*finishedConnection);
|
|
|
|
this->disconnect(*errorConnection);
|
|
|
|
reply.reset();
|
|
|
|
});
|
|
|
|
|
2023-01-02 15:06:11 +00:00
|
|
|
d->channel->call(method, d->service, arg, reply.get());
|
2022-10-20 11:49:27 +00:00
|
|
|
} else {
|
|
|
|
errorOccurred({ QGrpcStatus::Unknown, QLatin1StringView("No channel(s) attached.") });
|
|
|
|
}
|
|
|
|
|
|
|
|
return reply;
|
|
|
|
}
|
|
|
|
|
2022-12-09 13:03:47 +00:00
|
|
|
std::shared_ptr<QGrpcStream> QAbstractGrpcClient::stream(const QString &method,
|
|
|
|
const QByteArray &arg,
|
|
|
|
const StreamHandler &handler)
|
2022-10-20 11:49:27 +00:00
|
|
|
{
|
2022-12-09 13:03:47 +00:00
|
|
|
std::shared_ptr<QGrpcStream> grpcStream;
|
2022-10-20 11:49:27 +00:00
|
|
|
|
|
|
|
if (thread() != QThread::currentThread()) {
|
|
|
|
const QGrpcStatus status(
|
|
|
|
{ QGrpcStatus::Unknown,
|
|
|
|
threadSafetyWarning(QLatin1StringView("QAbstractGrpcClient::stream")) });
|
|
|
|
logError(status.message());
|
|
|
|
errorOccurred(status);
|
|
|
|
return grpcStream;
|
|
|
|
}
|
2023-01-02 15:06:11 +00:00
|
|
|
Q_D(QAbstractGrpcClient);
|
|
|
|
|
|
|
|
if (d->channel) {
|
2022-10-20 11:49:27 +00:00
|
|
|
grpcStream.reset(new QGrpcStream(method, arg, handler, this),
|
|
|
|
[](QGrpcStream *stream) { stream->deleteLater(); });
|
|
|
|
|
2023-01-02 15:06:11 +00:00
|
|
|
auto it = std::find_if(d->activeStreams.begin(), d->activeStreams.end(),
|
2022-12-09 13:03:47 +00:00
|
|
|
[grpcStream](const std::shared_ptr<QGrpcStream> &activeStream) {
|
2022-10-20 11:49:27 +00:00
|
|
|
return *activeStream == *grpcStream;
|
|
|
|
});
|
|
|
|
|
2023-01-02 15:06:11 +00:00
|
|
|
if (it != d->activeStreams.end()) {
|
2022-10-20 11:49:27 +00:00
|
|
|
// TODO:
|
|
|
|
// This mechanism is not something that is specified by gRPC standard. From the server
|
|
|
|
// perspective each stream request supposed to create a new connection with own scope.
|
|
|
|
// Caching and reusing streams lead potential security risks, since we cannot
|
|
|
|
// guarantee that the stream sharing is intentional.
|
2022-12-08 12:53:08 +00:00
|
|
|
// This feature should have an explicit switch that controls its usage.
|
2022-10-20 11:49:27 +00:00
|
|
|
(*it)->addHandler(handler);
|
|
|
|
return *it; // If stream already exists return it for handling
|
|
|
|
}
|
|
|
|
|
2022-12-09 13:03:47 +00:00
|
|
|
auto errorConnection = std::make_shared<QMetaObject::Connection>();
|
2022-10-20 11:49:27 +00:00
|
|
|
*errorConnection = connect(
|
|
|
|
grpcStream.get(), &QGrpcStream::errorOccurred, this,
|
|
|
|
[this, grpcStream](const QGrpcStatus &status) {
|
2023-01-02 15:06:11 +00:00
|
|
|
Q_D(QAbstractGrpcClient);
|
|
|
|
qGrpcWarning() << grpcStream->method() << "call" << d->service
|
2022-10-20 11:49:27 +00:00
|
|
|
<< "stream error: " << status.message();
|
|
|
|
errorOccurred(status);
|
2022-12-09 13:03:47 +00:00
|
|
|
std::weak_ptr<QGrpcStream> weakStream(grpcStream);
|
2022-10-20 11:49:27 +00:00
|
|
|
// TODO: Make timeout configurable from channel settings
|
|
|
|
QTimer::singleShot(1000, this,
|
|
|
|
[this, weakStream, method = grpcStream->method()] {
|
2023-01-02 15:06:11 +00:00
|
|
|
Q_D(QAbstractGrpcClient);
|
2022-10-20 11:49:27 +00:00
|
|
|
auto stream = weakStream.lock();
|
|
|
|
if (stream) {
|
2023-01-03 14:47:29 +00:00
|
|
|
d->channel->stream(stream.get(), d->service);
|
2022-10-20 11:49:27 +00:00
|
|
|
} else {
|
2023-01-02 15:06:11 +00:00
|
|
|
qGrpcDebug() << "Stream for" << d->service
|
2022-10-20 11:49:27 +00:00
|
|
|
<< "method" << method
|
2022-12-08 12:53:08 +00:00
|
|
|
<< "will not be restored by timeout.";
|
2022-10-20 11:49:27 +00:00
|
|
|
}
|
|
|
|
});
|
|
|
|
});
|
|
|
|
|
2022-12-09 13:03:47 +00:00
|
|
|
auto finishedConnection = std::make_shared<QMetaObject::Connection>();
|
2022-10-20 11:49:27 +00:00
|
|
|
*finishedConnection = connect(
|
|
|
|
grpcStream.get(), &QGrpcStream::finished, this,
|
|
|
|
[this, grpcStream, errorConnection, finishedConnection]() mutable {
|
2023-01-02 15:06:11 +00:00
|
|
|
Q_D(QAbstractGrpcClient);
|
2023-01-03 14:47:29 +00:00
|
|
|
qGrpcWarning()
|
|
|
|
<< grpcStream->method() << "call" << d->service << "stream finished.";
|
2023-01-02 15:06:11 +00:00
|
|
|
auto it = std::find_if(d->activeStreams.begin(), d->activeStreams.end(),
|
2022-12-09 13:03:47 +00:00
|
|
|
[grpcStream](std::shared_ptr<QGrpcStream> activeStream) {
|
2022-10-20 11:49:27 +00:00
|
|
|
return *activeStream == *grpcStream;
|
|
|
|
});
|
|
|
|
|
2023-01-02 15:06:11 +00:00
|
|
|
if (it != d->activeStreams.end())
|
|
|
|
d->activeStreams.erase(it);
|
2022-12-08 12:53:08 +00:00
|
|
|
|
2022-10-20 11:49:27 +00:00
|
|
|
QObject::disconnect(*errorConnection);
|
|
|
|
QObject::disconnect(*finishedConnection);
|
|
|
|
grpcStream.reset();
|
|
|
|
});
|
|
|
|
|
2023-01-03 14:47:29 +00:00
|
|
|
d->channel->stream(grpcStream.get(), d->service);
|
2023-01-02 15:06:11 +00:00
|
|
|
d->activeStreams.push_back(grpcStream);
|
2022-10-20 11:49:27 +00:00
|
|
|
} else {
|
|
|
|
errorOccurred({ QGrpcStatus::Unknown, QLatin1StringView("No channel(s) attached.") });
|
|
|
|
}
|
|
|
|
return grpcStream;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*!
|
|
|
|
\internal
|
|
|
|
|
|
|
|
Provides the serializer assigned to the client.
Returns a pointer to the serializer owned by QProtobufSerializerRegistry.
|
2022-12-08 12:53:08 +00:00
|
|
|
*/
|
2022-12-09 13:03:47 +00:00
|
|
|
std::shared_ptr<QAbstractProtobufSerializer> QAbstractGrpcClient::serializer() const
|
2022-10-20 11:49:27 +00:00
|
|
|
{
|
2023-01-02 15:06:11 +00:00
|
|
|
Q_D(const QAbstractGrpcClient);
|
|
|
|
if (const auto &c = d->channel)
|
2022-12-08 12:53:08 +00:00
|
|
|
return c->serializer();
|
|
|
|
return nullptr;
|
2022-10-20 11:49:27 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void QAbstractGrpcClient::logError(const QString &str) const
|
|
|
|
{
|
|
|
|
qGrpcCritical() << str;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Translates the deserialization error \a err into a QGrpcStatus, logs the
// matching message, emits errorOccurred() with that status and returns it.
// NoError and any unlisted value are reported as a failure for which no
// error was set.
QGrpcStatus QAbstractGrpcClient::handleDeserializationError(
        const QAbstractProtobufSerializer::DeserializationError &err) const
{
    // Every branch except the truncated-buffer one reports InvalidArgument.
    auto code = QGrpcStatus::InvalidArgument;
    QLatin1StringView text;

    switch (err) {
    case QAbstractProtobufSerializer::InvalidHeaderError:
        text = QLatin1StringView("Response deserialization failed: invalid field found.");
        break;
    case QAbstractProtobufSerializer::NoDeserializerError:
        text = QLatin1StringView("No deserializer was found for a given type.");
        break;
    case QAbstractProtobufSerializer::UnexpectedEndOfStreamError:
        code = QGrpcStatus::OutOfRange;
        text = QLatin1StringView("Invalid size of received buffer.");
        break;
    case QAbstractProtobufSerializer::NoError:
        Q_FALLTHROUGH();
    default:
        text = QLatin1StringView("Deserializing failed, but no error was set.");
        break;
    }

    // Shared tail: build the status, log the text, notify listeners.
    QGrpcStatus status{ code, text };
    logError(text);
    errorOccurred(status);
    return status;
}
|
|
|
|
|
|
|
|
QT_END_NAMESPACE
|
2022-12-09 12:03:04 +00:00
|
|
|
|
|
|
|
#include "moc_qabstractgrpcclient.cpp"
|