mirror of https://github.com/qt/qtgrpc.git
Test mockserver: Add explicit Tag completion tracking
Add tag tracking functionality through the explicit EmptyGuard class. Every tag requires a reference of this tracker, so that we can guarantee that after leaving the scope of a testcase, all inserted Tags actually finished. I would like to have stack based Data management but unfortunately the windows implementation on the CI for grpc 1.51 is using the context after finishing, so we still have to delete our data with the Finished operation. Fixes: QTBUG-141780 Pick-to: 6.10 6.8 Change-Id: I1ed0d8e999a7693b85b3080acc29170a775de8d4 Reviewed-by: Alexey Edelev <alexey.edelev@qt.io>
This commit is contained in:
parent
d5414487f7
commit
d9e979f643
|
|
@ -151,7 +151,6 @@ void QtGrpcClientEnd2EndTest::init()
|
|||
void QtGrpcClientEnd2EndTest::cleanup()
|
||||
{
|
||||
m_client.reset();
|
||||
QVERIFY(m_server->stopAsyncProcessing());
|
||||
}
|
||||
|
||||
void QtGrpcClientEnd2EndTest::clientMetadataReceived_data() const
|
||||
|
|
@ -179,6 +178,7 @@ void QtGrpcClientEnd2EndTest::clientMetadataReceived()
|
|||
QFETCH(const MultiHash, channelMetadata);
|
||||
|
||||
// Setup Server-side handling
|
||||
auto processor = m_server->createProcessor();
|
||||
struct ServerData
|
||||
{
|
||||
grpc::ServerAsyncResponseWriter<None> op{ &ctx };
|
||||
|
|
@ -187,37 +187,43 @@ void QtGrpcClientEnd2EndTest::clientMetadataReceived()
|
|||
Event request;
|
||||
None response;
|
||||
};
|
||||
auto *data = new ServerData;
|
||||
ServerData *data = new ServerData;
|
||||
|
||||
CallbackTag *callHandler = new CallbackTag([&](bool ok) {
|
||||
QVERIFY(ok);
|
||||
CallbackTag *callHandler = new CallbackTag(
|
||||
[&](bool ok) {
|
||||
QVERIFY(ok);
|
||||
|
||||
const std::multimap<grpc::string_ref, grpc::string_ref>
|
||||
&receivedMd = data->ctx.client_metadata();
|
||||
auto mergedMd = channelMetadata;
|
||||
mergedMd.unite(callMetadata);
|
||||
const std::multimap<grpc::string_ref, grpc::string_ref>
|
||||
&receivedMd = data->ctx.client_metadata();
|
||||
auto mergedMd = channelMetadata;
|
||||
mergedMd.unite(callMetadata);
|
||||
|
||||
for (auto it = mergedMd.cbegin(); it != mergedMd.cend(); ++it) {
|
||||
// Check that each key-value pair sent by the client exists on the server
|
||||
auto serverRange = receivedMd.equal_range(it.key().toStdString());
|
||||
auto clientRange = mergedMd.equal_range(it.key());
|
||||
for (auto it = mergedMd.cbegin(); it != mergedMd.cend(); ++it) {
|
||||
// Check that each key-value pair sent by the client exists on the server
|
||||
auto serverRange = receivedMd.equal_range(it.key().toStdString());
|
||||
auto clientRange = mergedMd.equal_range(it.key());
|
||||
|
||||
QCOMPARE_EQ(std::distance(serverRange.first, serverRange.second),
|
||||
std::distance(clientRange.first, clientRange.second));
|
||||
while (clientRange.first != clientRange.second) {
|
||||
// Look for the exact entry in the server range. The order may
|
||||
// be changed but it must be present.
|
||||
const auto it = std::find_if(serverRange.first, serverRange.second, [&](auto it) {
|
||||
return it.first == clientRange.first.key().toStdString()
|
||||
&& it.second == clientRange.first.value().toStdString();
|
||||
});
|
||||
QVERIFY(it != serverRange.second);
|
||||
std::advance(clientRange.first, 1);
|
||||
QCOMPARE_EQ(std::distance(serverRange.first, serverRange.second),
|
||||
std::distance(clientRange.first, clientRange.second));
|
||||
while (clientRange.first != clientRange.second) {
|
||||
// Look for the exact entry in the server range. The order may
|
||||
// be changed but it must be present.
|
||||
const auto it = std::find_if(serverRange.first, serverRange.second,
|
||||
[&](auto it) {
|
||||
return it.first
|
||||
== clientRange.first.key().toStdString()
|
||||
&& it.second
|
||||
== clientRange.first.value().toStdString();
|
||||
});
|
||||
QVERIFY(it != serverRange.second);
|
||||
std::advance(clientRange.first, 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
data->op.Finish(data->response, grpc::Status::OK, new DeleteTag<ServerData>(data));
|
||||
return CallbackTag::Delete;
|
||||
});
|
||||
data->op.Finish(data->response, grpc::Status::OK,
|
||||
new DeleteTag<ServerData>(data, processor.get()));
|
||||
return CallbackTag::Delete;
|
||||
},
|
||||
processor.get());
|
||||
m_service->RequestPush(&data->ctx, &data->request, &data->op, m_server->cq(), m_server->cq(),
|
||||
callHandler);
|
||||
|
||||
|
|
@ -232,8 +238,6 @@ void QtGrpcClientEnd2EndTest::clientMetadataReceived()
|
|||
QVERIFY(response.has_value());
|
||||
});
|
||||
|
||||
QVERIFY(m_server->startAsyncProcessing());
|
||||
|
||||
QSignalSpy finishedSpy(call.get(), &QGrpcOperation::finished);
|
||||
QVERIFY(finishedSpy.isValid());
|
||||
QVERIFY(finishedSpy.wait());
|
||||
|
|
@ -267,6 +271,7 @@ void QtGrpcClientEnd2EndTest::serverMetadataReceived()
|
|||
QFETCH(const MultiHash, expectedTrailingMd);
|
||||
|
||||
// Setup Server-side handling
|
||||
auto processor = m_server->createProcessor();
|
||||
struct ServerData
|
||||
{
|
||||
grpc::ServerAsyncResponseWriter<None> op{ &ctx };
|
||||
|
|
@ -275,18 +280,21 @@ void QtGrpcClientEnd2EndTest::serverMetadataReceived()
|
|||
Event request;
|
||||
None response;
|
||||
};
|
||||
auto *data = new ServerData;
|
||||
ServerData *data = new ServerData;
|
||||
|
||||
for (auto it = expectedInitialMd.cbegin(); it != expectedInitialMd.cend(); ++it)
|
||||
data->ctx.AddInitialMetadata(it.key().toStdString(), it.value().toStdString());
|
||||
for (auto it = expectedTrailingMd.cbegin(); it != expectedTrailingMd.cend(); ++it)
|
||||
data->ctx.AddTrailingMetadata(it.key().toStdString(), it.value().toStdString());
|
||||
|
||||
CallbackTag *callHandler = new CallbackTag([&](bool ok) {
|
||||
QVERIFY(ok);
|
||||
data->op.Finish(data->response, grpc::Status::OK, new DeleteTag<ServerData>(data));
|
||||
return CallbackTag::Delete;
|
||||
});
|
||||
CallbackTag *callHandler = new CallbackTag(
|
||||
[&](bool ok) {
|
||||
QVERIFY(ok);
|
||||
data->op.Finish(data->response, grpc::Status::OK,
|
||||
new DeleteTag<ServerData>(data, processor.get()));
|
||||
return CallbackTag::Delete;
|
||||
},
|
||||
processor.get());
|
||||
m_service->RequestPush(&data->ctx, &data->request, &data->op, m_server->cq(), m_server->cq(),
|
||||
callHandler);
|
||||
|
||||
|
|
@ -317,7 +325,6 @@ void QtGrpcClientEnd2EndTest::serverMetadataReceived()
|
|||
QVERIFY(trailingMd.contains(it.key(), it.value()));
|
||||
}
|
||||
});
|
||||
QVERIFY(m_server->startAsyncProcessing());
|
||||
|
||||
QSignalSpy finishedSpy(call.get(), &QGrpcOperation::finished);
|
||||
QVERIFY(finishedSpy.isValid());
|
||||
|
|
@ -327,6 +334,7 @@ void QtGrpcClientEnd2EndTest::serverMetadataReceived()
|
|||
void QtGrpcClientEnd2EndTest::serverInitialMetadataEmitted()
|
||||
{
|
||||
// Setup Server-side handling
|
||||
auto processor = m_server->createProcessor();
|
||||
struct ServerData
|
||||
{
|
||||
grpc::ServerAsyncResponseWriter<None> op{ &ctx };
|
||||
|
|
@ -335,21 +343,26 @@ void QtGrpcClientEnd2EndTest::serverInitialMetadataEmitted()
|
|||
Event request;
|
||||
None response;
|
||||
};
|
||||
auto *data = new ServerData;
|
||||
ServerData *data = new ServerData;
|
||||
data->ctx.AddInitialMetadata("initial", "value");
|
||||
data->ctx.AddTrailingMetadata("trailing", "value");
|
||||
|
||||
CallbackTag *callHandler = new CallbackTag([&](bool ok) {
|
||||
QVERIFY(ok);
|
||||
data->op.SendInitialMetadata(new CallbackTag([&](bool ok) {
|
||||
CallbackTag *callHandler = new CallbackTag(
|
||||
[&](bool ok) {
|
||||
QVERIFY(ok);
|
||||
// Wait one second before emitting finished.
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
data->op.Finish(data->response, grpc::Status::OK, new DeleteTag<ServerData>(data));
|
||||
data->op.SendInitialMetadata(new CallbackTag(
|
||||
[&](bool ok) {
|
||||
QVERIFY(ok);
|
||||
// Wait one second before emitting finished.
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
data->op.Finish(data->response, grpc::Status::OK,
|
||||
new DeleteTag<ServerData>(data, processor.get()));
|
||||
return CallbackTag::Delete;
|
||||
},
|
||||
processor.get()));
|
||||
return CallbackTag::Delete;
|
||||
}));
|
||||
return CallbackTag::Delete;
|
||||
});
|
||||
},
|
||||
processor.get());
|
||||
m_service->RequestPush(&data->ctx, &data->request, &data->op, m_server->cq(), m_server->cq(),
|
||||
callHandler);
|
||||
|
||||
|
|
@ -368,7 +381,6 @@ void QtGrpcClientEnd2EndTest::serverInitialMetadataEmitted()
|
|||
initialMetadataTime = QDateTime::currentDateTime();
|
||||
QCOMPARE_EQ(call->serverInitialMetadata().size(), 1);
|
||||
});
|
||||
QVERIFY(m_server->startAsyncProcessing());
|
||||
|
||||
QSignalSpy finishedSpy(call.get(), &QGrpcOperation::finished);
|
||||
QVERIFY(finishedSpy.isValid());
|
||||
|
|
@ -386,6 +398,7 @@ void QtGrpcClientEnd2EndTest::bidiStreamsInOrder()
|
|||
constexpr auto SleepTime = std::chrono::milliseconds(5);
|
||||
|
||||
// Setup Server-side handling
|
||||
auto processor = m_server->createProcessor();
|
||||
struct ServerData
|
||||
{
|
||||
grpc::ServerAsyncReaderWriter<Event, Event> op{ &ctx };
|
||||
|
|
@ -404,51 +417,59 @@ void QtGrpcClientEnd2EndTest::bidiStreamsInOrder()
|
|||
response.set_name("server-" + std::to_string(response.number()));
|
||||
}
|
||||
};
|
||||
auto *data = new ServerData;
|
||||
ServerData *data = new ServerData;
|
||||
|
||||
CallbackTag *reader = new CallbackTag(
|
||||
[&, current = 1u](bool ok) mutable {
|
||||
if (!ok) {
|
||||
data->readerDone = true;
|
||||
if (data->writerDone)
|
||||
data->op.Finish(grpc::Status::OK,
|
||||
new DeleteTag<ServerData>(data, processor.get()));
|
||||
return CallbackTag::Delete;
|
||||
}
|
||||
QCOMPARE_EQ(data->request.type(), Event::CLIENT);
|
||||
QCOMPARE_EQ(data->request.number(), current);
|
||||
std::string name = "client-" + std::to_string(current);
|
||||
QCOMPARE_EQ(data->request.name(), name);
|
||||
++current;
|
||||
|
||||
data->op.Read(&data->request, reader);
|
||||
return CallbackTag::Proceed;
|
||||
},
|
||||
processor.get());
|
||||
CallbackTag *writer = new CallbackTag(
|
||||
[&](bool ok) {
|
||||
QVERIFY(ok);
|
||||
if (data->response.number() >= data->count) {
|
||||
data->writerDone = true;
|
||||
if (data->readerDone)
|
||||
data->op.Finish(grpc::Status::OK,
|
||||
new DeleteTag<ServerData>(data, processor.get()));
|
||||
return CallbackTag::Delete;
|
||||
}
|
||||
std::this_thread::sleep_for(SleepTime);
|
||||
data->updateResponse();
|
||||
data->op.Write(data->response, writer);
|
||||
return CallbackTag::Proceed;
|
||||
},
|
||||
processor.get());
|
||||
CallbackTag *callHandler = new CallbackTag(
|
||||
[&](bool ok) {
|
||||
QVERIFY(ok);
|
||||
const auto &md = data->ctx.client_metadata();
|
||||
const auto countIt = md.find("call-count");
|
||||
QVERIFY(countIt != md.cend());
|
||||
data->count = std::stoul(std::string(countIt->second.data(), countIt->second.length()));
|
||||
QCOMPARE_GT(data->count, 0);
|
||||
|
||||
data->op.Read(&data->request, reader);
|
||||
data->updateResponse();
|
||||
data->op.Write(data->response, writer);
|
||||
|
||||
CallbackTag *reader = new CallbackTag([&, current = 1u](bool ok) mutable {
|
||||
if (!ok) {
|
||||
data->readerDone = true;
|
||||
if (data->writerDone)
|
||||
data->op.Finish(grpc::Status::OK, new DeleteTag<ServerData>(data));
|
||||
return CallbackTag::Delete;
|
||||
}
|
||||
QCOMPARE_EQ(data->request.type(), Event::CLIENT);
|
||||
QCOMPARE_EQ(data->request.number(), current);
|
||||
std::string name = "client-" + std::to_string(current);
|
||||
QCOMPARE_EQ(data->request.name(), name);
|
||||
++current;
|
||||
|
||||
data->op.Read(&data->request, reader);
|
||||
return CallbackTag::Proceed;
|
||||
});
|
||||
CallbackTag *writer = new CallbackTag([&](bool ok) {
|
||||
QVERIFY(ok);
|
||||
if (data->response.number() >= data->count) {
|
||||
data->writerDone = true;
|
||||
if (data->readerDone)
|
||||
data->op.Finish(grpc::Status::OK, new DeleteTag<ServerData>(data));
|
||||
return CallbackTag::Delete;
|
||||
}
|
||||
std::this_thread::sleep_for(SleepTime);
|
||||
data->updateResponse();
|
||||
data->op.Write(data->response, writer);
|
||||
return CallbackTag::Proceed;
|
||||
});
|
||||
CallbackTag *callHandler = new CallbackTag([&](bool ok) {
|
||||
QVERIFY(ok);
|
||||
const auto &md = data->ctx.client_metadata();
|
||||
const auto countIt = md.find("call-count");
|
||||
QVERIFY(countIt != md.cend());
|
||||
data->count = std::stoul(std::string(countIt->second.data(), countIt->second.length()));
|
||||
QCOMPARE_GT(data->count, 0);
|
||||
|
||||
data->op.Read(&data->request, reader);
|
||||
data->updateResponse();
|
||||
data->op.Write(data->response, writer);
|
||||
|
||||
return CallbackTag::Delete;
|
||||
});
|
||||
},
|
||||
processor.get());
|
||||
m_service->RequestExchange(&data->ctx, &data->op, m_server->cq(), m_server->cq(), callHandler);
|
||||
|
||||
// Client bidi stream
|
||||
|
|
@ -490,8 +511,6 @@ void QtGrpcClientEnd2EndTest::bidiStreamsInOrder()
|
|||
});
|
||||
delayedWriter.start(SleepTime);
|
||||
|
||||
QVERIFY(m_server->startAsyncProcessing());
|
||||
|
||||
QSignalSpy finishedSpy(stream.get(), &QGrpcOperation::finished);
|
||||
QVERIFY(finishedSpy.isValid());
|
||||
QVERIFY(finishedSpy.wait());
|
||||
|
|
@ -508,13 +527,16 @@ void QtGrpcClientEnd2EndTest::clientHandlesCompression_data() const
|
|||
void QtGrpcClientEnd2EndTest::clientHandlesCompression()
|
||||
{
|
||||
QFETCH(const grpc_compression_algorithm, compressionAlgo);
|
||||
EventList serverResponses;
|
||||
|
||||
class SubscribeListHandler : public AbstractRpcTag
|
||||
{
|
||||
public:
|
||||
SubscribeListHandler(EventHub::AsyncService &service_,
|
||||
const grpc_compression_algorithm compressionAlgo_)
|
||||
: op(&context()), service(service_), compressionAlgo(compressionAlgo_)
|
||||
SubscribeListHandler(EventList &responses_, EventHub::AsyncService &service_,
|
||||
const grpc_compression_algorithm compressionAlgo_,
|
||||
TagProcessor *processor)
|
||||
: AbstractRpcTag(processor), op(&context()), service(service_), responses(responses_),
|
||||
compressionAlgo(compressionAlgo_)
|
||||
{
|
||||
context().set_compression_algorithm(compressionAlgo);
|
||||
context().set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
|
||||
|
|
@ -525,7 +547,7 @@ void QtGrpcClientEnd2EndTest::clientHandlesCompression()
|
|||
Event ev;
|
||||
ev.set_name("server;server;" + std::to_string(v));
|
||||
ev.set_number(v);
|
||||
response.mutable_events()->Add(std::move(ev));
|
||||
responses.mutable_events()->Add(std::move(ev));
|
||||
}
|
||||
}
|
||||
void start(grpc::ServerCompletionQueue *cq) override
|
||||
|
|
@ -536,28 +558,32 @@ void QtGrpcClientEnd2EndTest::clientHandlesCompression()
|
|||
{
|
||||
QVERIFY(ok);
|
||||
if (index >= responseCount) {
|
||||
op.Finish(grpc::Status::OK, new VoidTag());
|
||||
op.Finish(grpc::Status::OK, new DeleteTag<SubscribeListHandler>(this, processor));
|
||||
return;
|
||||
}
|
||||
|
||||
grpc::WriteOptions wopts;
|
||||
// Enable and disable the compression per-message
|
||||
if (index % 2 == 0)
|
||||
wopts.set_no_compression();
|
||||
op.Write(response, wopts, this);
|
||||
op.Write(responses, wopts, this);
|
||||
++index;
|
||||
}
|
||||
|
||||
grpc::ServerAsyncWriter<EventList> op;
|
||||
EventHub::AsyncService &service;
|
||||
|
||||
None request;
|
||||
EventList response;
|
||||
EventList &responses;
|
||||
|
||||
size_t index = 0;
|
||||
const grpc_compression_algorithm compressionAlgo;
|
||||
const size_t responseCount = 20;
|
||||
};
|
||||
|
||||
SubscribeListHandler handler(*m_service, compressionAlgo);
|
||||
auto processor = m_server->createProcessor();
|
||||
SubscribeListHandler *handler = new SubscribeListHandler(serverResponses, *m_service,
|
||||
compressionAlgo, processor.get());
|
||||
m_server->startRpcTag(handler);
|
||||
|
||||
auto call = m_client->SubscribeList(qt::None{});
|
||||
|
|
@ -568,17 +594,15 @@ void QtGrpcClientEnd2EndTest::clientHandlesCompression()
|
|||
connect(call.get(), &QGrpcServerStream::messageReceived, this, [&] {
|
||||
auto response = call->read<qt::EventList>();
|
||||
QVERIFY(response);
|
||||
QCOMPARE_EQ(response->events().size(), handler.response.events().size());
|
||||
QCOMPARE_EQ(response->events().size(), serverResponses.events().size());
|
||||
for (int i = 0; i < response->events().size(); ++i) {
|
||||
const auto &next = response->events().at(i);
|
||||
const auto &baseline = handler.response.events().at(i);
|
||||
const auto &baseline = serverResponses.events().at(i);
|
||||
QCOMPARE_EQ(next.name(), QString::fromStdString(baseline.name()));
|
||||
QCOMPARE_EQ(next.number(), baseline.number());
|
||||
}
|
||||
});
|
||||
|
||||
QVERIFY(m_server->startAsyncProcessing());
|
||||
|
||||
QSignalSpy finishedSpy(call.get(), &QGrpcOperation::finished);
|
||||
QVERIFY(finishedSpy.isValid());
|
||||
QVERIFY(finishedSpy.wait());
|
||||
|
|
|
|||
|
|
@ -37,5 +37,6 @@ target_link_libraries(grpc_mock_server
|
|||
PUBLIC
|
||||
protobuf::libprotobuf
|
||||
gRPC::grpc++
|
||||
Qt::Test
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -2,10 +2,85 @@
|
|||
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only
|
||||
|
||||
#include "mockserver.h"
|
||||
#include "tags.h"
|
||||
|
||||
#include <QtTest/qtest.h>
|
||||
|
||||
#include <grpcpp/alarm.h>
|
||||
#include <grpcpp/server_builder.h>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
TagProcessor::TagProcessor(MockServer *server)
|
||||
: mServer(server)
|
||||
{
|
||||
QVERIFY(mServer);
|
||||
QVERIFY(mServer->cq());
|
||||
mThread = std::thread(&TagProcessor::processLoop, this);
|
||||
}
|
||||
|
||||
TagProcessor::~TagProcessor()
|
||||
{
|
||||
QVERIFY(waitForTagCompletion(5s));
|
||||
|
||||
mRunning.store(false);
|
||||
wakeCQ();
|
||||
|
||||
if (mThread.joinable())
|
||||
mThread.join();
|
||||
|
||||
drainTags();
|
||||
}
|
||||
|
||||
void TagProcessor::processLoop()
|
||||
{
|
||||
while (mRunning.load(std::memory_order_acquire)) {
|
||||
mServer->processTag();
|
||||
}
|
||||
}
|
||||
|
||||
void TagProcessor::registerTag(AbstractTag *tag)
|
||||
{
|
||||
std::scoped_lock lock(mMutex);
|
||||
const auto it = mActiveTags.insert(tag);
|
||||
QVERIFY(it.second);
|
||||
}
|
||||
|
||||
void TagProcessor::unregisterTag(AbstractTag *tag)
|
||||
{
|
||||
std::scoped_lock lock(mMutex);
|
||||
const auto erased = mActiveTags.erase(tag);
|
||||
QCOMPARE_EQ(erased, 1);
|
||||
if (mActiveTags.empty())
|
||||
mCv.notify_all();
|
||||
}
|
||||
|
||||
size_t TagProcessor::activeTagCount() const noexcept
|
||||
{
|
||||
std::scoped_lock lock(mMutex);
|
||||
return mActiveTags.size();
|
||||
}
|
||||
|
||||
bool TagProcessor::waitForTagCompletion(std::chrono::milliseconds deadline) noexcept
|
||||
{
|
||||
std::unique_lock lock(mMutex);
|
||||
return mCv.wait_for(lock, deadline, [this] { return mActiveTags.empty(); });
|
||||
}
|
||||
|
||||
void TagProcessor::wakeCQ()
|
||||
{
|
||||
auto *tag = new VoidTag(this);
|
||||
auto *alarm = new grpc::Alarm();
|
||||
alarm->Set(mServer->cq(), gpr_now(GPR_CLOCK_REALTIME), tag);
|
||||
}
|
||||
|
||||
void TagProcessor::drainTags()
|
||||
{
|
||||
QVERIFY(!mRunning);
|
||||
while (mServer->processTag(50))
|
||||
;
|
||||
}
|
||||
|
||||
MockServer::MockServer() = default;
|
||||
MockServer::~MockServer()
|
||||
{
|
||||
|
|
@ -35,10 +110,7 @@ bool MockServer::start(std::vector<ListeningPort> ports, std::vector<grpc::Servi
|
|||
bool MockServer::stop()
|
||||
{
|
||||
State currentState = mState.load();
|
||||
if (currentState == State::Processing)
|
||||
stopAsyncProcessing();
|
||||
|
||||
if (currentState != State::Started && currentState != State::Processing)
|
||||
if (currentState != State::Started && currentState != State::ShuttingDown)
|
||||
return currentState == State::Stopped;
|
||||
|
||||
mState = State::ShuttingDown;
|
||||
|
|
@ -63,9 +135,8 @@ bool MockServer::stop()
|
|||
bool MockServer::processTag(int timeoutMs)
|
||||
{
|
||||
State currentState = mState.load();
|
||||
if (currentState != State::Processing && currentState != State::Started) {
|
||||
if (currentState != State::Started)
|
||||
return false;
|
||||
}
|
||||
|
||||
void *rawTag = nullptr;
|
||||
bool ok = false;
|
||||
|
|
@ -82,33 +153,6 @@ bool MockServer::processTag(int timeoutMs)
|
|||
return false;
|
||||
}
|
||||
|
||||
bool MockServer::startAsyncProcessing(int timeoutMs)
|
||||
{
|
||||
if (!transitionState(State::Started, State::Processing))
|
||||
return false;
|
||||
|
||||
mProcessingThread = std::thread([this, timeoutMs] {
|
||||
while (mState.load(std::memory_order_acquire) == State::Processing) {
|
||||
processTag(timeoutMs);
|
||||
}
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
bool MockServer::stopAsyncProcessing()
|
||||
{
|
||||
if (!transitionState(State::Processing, State::Started))
|
||||
return false;
|
||||
|
||||
if (mProcessingThread.joinable()) {
|
||||
grpc::Alarm alarm;
|
||||
// Trigger an event so that the processing loop detects the change.
|
||||
alarm.Set(mCQ.get(), gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME), new VoidTag());
|
||||
mProcessingThread.join();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
MockServer &MockServer::step(int timeoutMs)
|
||||
{
|
||||
mFutures.emplace_back(std::async(std::launch::async,
|
||||
|
|
@ -126,6 +170,17 @@ bool MockServer::waitForAllSteps()
|
|||
return true;
|
||||
}
|
||||
|
||||
void MockServer::startRpcTag(AbstractRpcTag *tag)
|
||||
{
|
||||
QVERIFY(tag);
|
||||
tag->start(mCQ.get());
|
||||
}
|
||||
|
||||
std::unique_ptr<TagProcessor> MockServer::createProcessor()
|
||||
{
|
||||
return std::make_unique<TagProcessor>(this);
|
||||
}
|
||||
|
||||
bool MockServer::transitionState(State from, State to)
|
||||
{
|
||||
State expected = from;
|
||||
|
|
|
|||
|
|
@ -3,17 +3,17 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include "tags.h"
|
||||
#include "certificates.h"
|
||||
|
||||
#include <grpcpp/grpcpp.h>
|
||||
#include <grpcpp/security/server_credentials.h>
|
||||
|
||||
#include <future>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <thread>
|
||||
#include <future>
|
||||
#include <unordered_set>
|
||||
#include <vector>
|
||||
|
||||
struct ListeningPort
|
||||
{
|
||||
|
|
@ -22,6 +22,39 @@ struct ListeningPort
|
|||
int selectedPort = -1;
|
||||
};
|
||||
|
||||
class MockServer;
|
||||
class AbstractTag;
|
||||
class AbstractRpcTag;
|
||||
|
||||
class TagProcessor
|
||||
{
|
||||
public:
|
||||
explicit TagProcessor(MockServer *server);
|
||||
~TagProcessor();
|
||||
|
||||
TagProcessor(const TagProcessor &) = delete;
|
||||
TagProcessor &operator=(const TagProcessor &) = delete;
|
||||
|
||||
void registerTag(AbstractTag *tag);
|
||||
void unregisterTag(AbstractTag *tag);
|
||||
|
||||
[[nodiscard]] size_t activeTagCount() const noexcept;
|
||||
[[nodiscard]] bool waitForTagCompletion(std::chrono::milliseconds deadline) noexcept;
|
||||
|
||||
private:
|
||||
void processLoop();
|
||||
void wakeCQ();
|
||||
void drainTags();
|
||||
|
||||
MockServer *mServer;
|
||||
std::atomic_bool mRunning = true;
|
||||
std::thread mThread;
|
||||
|
||||
mutable std::mutex mMutex;
|
||||
std::condition_variable mCv;
|
||||
std::unordered_set<AbstractTag *> mActiveTags;
|
||||
};
|
||||
|
||||
class MockServer
|
||||
{
|
||||
public:
|
||||
|
|
@ -29,7 +62,6 @@ public:
|
|||
Stopped,
|
||||
Starting,
|
||||
Started,
|
||||
Processing,
|
||||
ShuttingDown,
|
||||
};
|
||||
|
||||
|
|
@ -43,14 +75,14 @@ public:
|
|||
bool stop();
|
||||
|
||||
bool processTag(int timeoutMs = -1);
|
||||
bool startAsyncProcessing(int timeoutMs = -1);
|
||||
bool stopAsyncProcessing();
|
||||
|
||||
void startRpcTag(AbstractRpcTag &tag) { tag.start(mCQ.get()); }
|
||||
void startRpcTag(AbstractRpcTag *tag);
|
||||
|
||||
MockServer &step(int timeoutMs = -1);
|
||||
bool waitForAllSteps();
|
||||
|
||||
std::unique_ptr<TagProcessor> createProcessor();
|
||||
|
||||
private:
|
||||
bool transitionState(State from, State to);
|
||||
|
||||
|
|
@ -58,6 +90,7 @@ private:
|
|||
std::unique_ptr<grpc::Server> mServer;
|
||||
std::unique_ptr<grpc::ServerCompletionQueue> mCQ;
|
||||
std::vector<std::future<bool>> mFutures;
|
||||
std::thread mProcessingThread;
|
||||
std::atomic<State> mState = State::Stopped;
|
||||
};
|
||||
|
||||
#include "tags.h"
|
||||
|
|
|
|||
|
|
@ -3,6 +3,8 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include "mockserver.h"
|
||||
|
||||
#include <grpcpp/completion_queue.h>
|
||||
#include <grpcpp/server_context.h>
|
||||
|
||||
|
|
@ -12,16 +14,22 @@
|
|||
class AbstractTag
|
||||
{
|
||||
public:
|
||||
explicit AbstractTag() = default;
|
||||
virtual ~AbstractTag() = default;
|
||||
explicit AbstractTag(TagProcessor *processor_) : processor(processor_)
|
||||
{
|
||||
processor->registerTag(this);
|
||||
}
|
||||
virtual ~AbstractTag() { processor->unregisterTag(this); }
|
||||
|
||||
AbstractTag(const AbstractTag &) = delete;
|
||||
AbstractTag &operator=(const AbstractTag &) = delete;
|
||||
|
||||
AbstractTag(AbstractTag &&) = default;
|
||||
AbstractTag &operator=(AbstractTag &&) = default;
|
||||
AbstractTag(AbstractTag &&) = delete;
|
||||
AbstractTag &operator=(AbstractTag &&) = delete;
|
||||
|
||||
virtual void process(bool ok) = 0;
|
||||
|
||||
protected:
|
||||
TagProcessor *processor;
|
||||
};
|
||||
|
||||
class CallbackTag : public AbstractTag
|
||||
|
|
@ -29,7 +37,11 @@ class CallbackTag : public AbstractTag
|
|||
public:
|
||||
enum Operation { Proceed, Delete };
|
||||
using Function = std::function<Operation(bool)>;
|
||||
explicit CallbackTag(Function fn) : mFn(std::move(fn)) { }
|
||||
|
||||
explicit CallbackTag(Function fn, TagProcessor *processor_)
|
||||
: AbstractTag(processor_), mFn(std::move(fn))
|
||||
{
|
||||
}
|
||||
void process(bool ok) override
|
||||
{
|
||||
if (mFn(ok) == Delete)
|
||||
|
|
@ -43,18 +55,27 @@ private:
|
|||
class VoidTag final : public CallbackTag
|
||||
{
|
||||
public:
|
||||
VoidTag() : CallbackTag([](bool) { return Delete; }) { }
|
||||
VoidTag(TagProcessor *processor_) : CallbackTag([](bool) { return Delete; }, processor_) { }
|
||||
};
|
||||
|
||||
// TODO: gRPC 1.50.1 on windows has faulty lifetime management and still uses
|
||||
// the context post completion in the interceptors. This should be fixed in
|
||||
// newer versions. Remove this when upgrading to favor stack based lifetime
|
||||
// management in testcases.
|
||||
template <typename Data>
|
||||
class DeleteTag final : public CallbackTag
|
||||
{
|
||||
public:
|
||||
explicit DeleteTag(Data *data) : CallbackTag([](bool) { return Delete; }), data(data)
|
||||
explicit DeleteTag(Data *data, TagProcessor *processor_)
|
||||
: CallbackTag([](bool) { return Delete; }, processor_), data(data)
|
||||
{
|
||||
assert(this->data);
|
||||
}
|
||||
~DeleteTag() { delete data; }
|
||||
~DeleteTag()
|
||||
{
|
||||
delete data;
|
||||
data = nullptr;
|
||||
}
|
||||
|
||||
private:
|
||||
Data *data;
|
||||
|
|
@ -63,13 +84,15 @@ private:
|
|||
class AbstractRpcTag : public AbstractTag
|
||||
{
|
||||
public:
|
||||
AbstractRpcTag()
|
||||
AbstractRpcTag(TagProcessor *processor_) : AbstractTag(processor_)
|
||||
{
|
||||
mContext.AsyncNotifyWhenDone(new CallbackTag([this](bool ok) {
|
||||
if (ok && mContext.IsCancelled())
|
||||
mIsCancelled = true;
|
||||
return CallbackTag::Delete;
|
||||
}));
|
||||
mContext.AsyncNotifyWhenDone(new CallbackTag(
|
||||
[this](bool ok) {
|
||||
if (ok && mContext.IsCancelled())
|
||||
mIsCancelled = true;
|
||||
return CallbackTag::Delete;
|
||||
},
|
||||
processor));
|
||||
}
|
||||
|
||||
virtual void start(grpc::ServerCompletionQueue *cq) = 0;
|
||||
|
|
|
|||
Loading…
Reference in New Issue