Add QRandomAccessAsyncFile::flush() method

The flush() operation would normally return bool, indicating if it was
successful or not. As a result, we can use a base QIOOperation to
represent the async call, and just check its error code.

The new tests are designed with the assumption that flush() would act
as a barrier operation, making sure that all read/write operations
before it will finish.

Task-number: QTBUG-136763
Change-Id: I4119eb3218da1985a63fe808da7be754caf4e9d7
Reviewed-by: Fabian Kosmale <fabian.kosmale@qt.io>
This commit is contained in:
Ivan Solovev 2025-08-05 14:07:48 +02:00
parent 121106ae19
commit b9bb896755
6 changed files with 200 additions and 22 deletions

View File

@ -36,6 +36,7 @@ public:
IncorrectOffset,
Read,
Write,
Flush,
Aborted,
};
Q_ENUM(Error)
@ -45,6 +46,7 @@ public:
Unknown,
Read,
Write,
Flush,
};
Q_ENUM(Type)

View File

@ -36,6 +36,19 @@ qint64 QRandomAccessAsyncFile::size() const
return d->size();
}
/*!
\internal
Flushes any buffered data to the file.
\include qrandomaccessasyncfile.cpp returns-qiooperation
*/
QIOOperation *QRandomAccessAsyncFile::flush()
{
Q_D(QRandomAccessAsyncFile);
return d->flush();
}
/*!
\internal

View File

@ -36,6 +36,8 @@ public:
void close();
qint64 size() const;
[[nodiscard]] QIOOperation *flush();
// owning APIs: we are responsible for storing the data
[[nodiscard]] QIOReadOperation *read(qint64 offset, qint64 maxSize);
[[nodiscard]] QIOWriteOperation *write(qint64 offset, const QByteArray &data);

View File

@ -52,6 +52,8 @@ public:
void close();
qint64 size() const;
[[nodiscard]] QIOOperation *flush();
[[nodiscard]] QIOReadOperation *read(qint64 offset, qint64 maxSize);
[[nodiscard]] QIOWriteOperation *write(qint64 offset, const QByteArray &data);
[[nodiscard]] QIOWriteOperation *write(qint64 offset, QByteArray &&data);
@ -86,6 +88,7 @@ private:
void executeNextOperation();
void processBufferAt(qsizetype idx);
void processFlush();
void operationComplete();
#endif
};

View File

@ -142,6 +142,19 @@ qint64 QRandomAccessAsyncFilePrivate::size() const
return -1;
}
QIOOperation *QRandomAccessAsyncFilePrivate::flush()
{
auto *dataStorage = new QtPrivate::QIOOperationDataStorage();
auto *priv = new QIOOperationPrivate(dataStorage);
priv->type = QIOOperation::Type::Flush;
auto *op = new QIOOperation(*priv, q_ptr);
m_operations.append(op);
executeNextOperation();
return op;
}
QIOReadOperation *QRandomAccessAsyncFilePrivate::read(qint64 offset, qint64 maxSize)
{
QByteArray array;
@ -301,8 +314,22 @@ void QRandomAccessAsyncFilePrivate::executeNextOperation()
// start next
if (!m_operations.isEmpty()) {
m_currentOperation = m_operations.takeFirst();
numProcessedBuffers = 0;
processBufferAt(numProcessedBuffers);
switch (m_currentOperation->type()) {
case QIOOperation::Type::Read:
case QIOOperation::Type::Write:
numProcessedBuffers = 0;
processBufferAt(numProcessedBuffers);
break;
case QIOOperation::Type::Flush:
processFlush();
break;
case QIOOperation::Type::Unknown:
Q_ASSERT_X(false, "executeNextOperation", "Operation of type Unknown!");
// For release builds - directly complete the operation
m_watcher.setFuture(QtFuture::makeReadyValueFuture(OperationResult{}));
operationComplete();
break;
}
}
}
}
@ -373,17 +400,49 @@ void QRandomAccessAsyncFilePrivate::processBufferAt(qsizetype idx)
}
}
void QRandomAccessAsyncFilePrivate::processFlush()
{
Q_ASSERT(!m_currentOperation.isNull());
auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
auto &dataStorage = priv->dataStorage;
Q_ASSERT(dataStorage->isEmpty());
QBasicMutex *mutexPtr = &m_engineMutex;
auto op = [engine = m_engine.get(), mutexPtr] {
QMutexLocker locker(mutexPtr);
QRandomAccessAsyncFilePrivate::OperationResult result{0, QIOOperation::Error::None};
if (engine) {
if (!engine->flush())
result.error = QIOOperation::Error::Flush;
} else {
result.error = QIOOperation::Error::FileNotOpen;
}
return result;
};
QFuture<OperationResult> f =
QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), op);
m_watcher.setFuture(f);
}
void QRandomAccessAsyncFilePrivate::operationComplete()
{
// TODO: if one of the buffers was read/written with an error,
// stop processing immediately
auto scheduleNextOperation = qScopeGuard([this]{
m_currentOperation = nullptr;
executeNextOperation();
});
if (m_currentOperation && !m_watcher.isCanceled()) {
OperationResult res = m_watcher.future().result();
auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
auto &dataStorage = priv->dataStorage;
qsizetype expectedBuffersCount = 1;
bool needProcessNext = false;
if (priv->type == QIOOperation::Type::Read) {
switch (priv->type) {
case QIOOperation::Type::Read: {
qsizetype expectedBuffersCount = 1;
if (dataStorage->containsReadSpans()) {
auto &readBuffers = dataStorage->getReadSpans();
expectedBuffersCount = readBuffers.size();
@ -398,27 +457,38 @@ void QRandomAccessAsyncFilePrivate::operationComplete()
array.resize(res.bytesProcessed);
}
priv->appendBytesProcessed(res.bytesProcessed);
needProcessNext = (++numProcessedBuffers < expectedBuffersCount);
if (!needProcessNext)
if (++numProcessedBuffers < expectedBuffersCount) {
// keep executing this command
processBufferAt(numProcessedBuffers);
scheduleNextOperation.dismiss();
} else {
priv->operationComplete(res.error);
} else if (priv->type == QIOOperation::Type::Write) {
}
break;
}
case QIOOperation::Type::Write: {
qsizetype expectedBuffersCount = 1;
if (dataStorage->containsWriteSpans())
expectedBuffersCount = dataStorage->getWriteSpans().size();
Q_ASSERT(numProcessedBuffers < expectedBuffersCount);
needProcessNext = (++numProcessedBuffers < expectedBuffersCount);
priv->appendBytesProcessed(res.bytesProcessed);
if (!needProcessNext)
if (++numProcessedBuffers < expectedBuffersCount) {
// keep executing this command
processBufferAt(numProcessedBuffers);
scheduleNextOperation.dismiss();
} else {
priv->operationComplete(res.error);
}
break;
}
if (needProcessNext) {
// keep executing this command
processBufferAt(numProcessedBuffers);
return;
} else {
m_currentOperation = nullptr;
case QIOOperation::Type::Flush:
priv->operationComplete(res.error);
break;
case QIOOperation::Type::Unknown:
priv->setError(QIOOperation::Error::Aborted);
break;
}
}
executeNextOperation();
}
QT_END_NAMESPACE

View File

@ -30,6 +30,7 @@ private Q_SLOTS:
void roundtripNonOwning();
void roundtripVectored();
void readLessThanMax();
void flushIsBarrier();
void errorHandling_data();
void errorHandling();
void fileClosedInProgress_data();
@ -45,7 +46,7 @@ private:
Owning,
NonOwning,
};
void generateReadWriteOperationColumns();
void generateOperationColumns();
// Write 100 Mb of random data to the file.
// We use such a large amount, because some of the backends will report
@ -347,6 +348,80 @@ void tst_QRandomAccessAsyncFile::readLessThanMax()
}
}
void tst_QRandomAccessAsyncFile::flushIsBarrier()
{
QRandomAccessAsyncFile file;
QVERIFY(file.open(m_file.fileName(), QIODevice::ReadWrite));
// All operations will be deleted together with the file
// Write some data into the file
const qsizetype offset = 1024 * 1024;
const qsizetype sizeA = 10 * 1024 * 1024;
const QByteArray dataToWrite(sizeA, 'a');
const qsizetype sizeB = 5 * 1024 * 1024;
const QByteArray otherDataToWrite(sizeB, 'b');
// This test tries to verify that flush() acts like a barrier.
// The logic is as follows:
// 1. submit a write() operation of 10Mb of a's followed by 5Mb of b's.
// 2. submit a flush().
// 3. submit another write() of 10 c's that overlap a's and b's.
// 4. submit another flush().
// 5. submit a read() of 20 elements from the overlapping region.
// 6. wait until the read() is completed. If flush() works as expected,
// we should get "aaaaaccccccccccbbbbb".
// First write()
QIOVectoredWriteOperation *write1 =
file.writeFrom(offset, { as_bytes(QSpan{dataToWrite}),
as_bytes(QSpan{otherDataToWrite}) });
// First flush()
QIOOperation *flush1 = file.flush();
// Second write()
const qsizetype offset2 = offset + sizeA - 5;
const qsizetype sizeC = 10;
QIOWriteOperation *write2 = file.write(offset2, QByteArray(sizeC, 'c'));
// Second flush()
QIOOperation *flush2 = file.flush();
// Read
const qsizetype readOffset = offset2 - 5;
const qsizetype readSize = 20;
QIOReadOperation *read = file.read(readOffset, readSize);
QSignalSpy readSpy(read, &QIOOperation::finished);
// Wait until the read() operation completes
QTRY_COMPARE_EQ(readSpy.size(), 1);
// Make sure that all operations have successfully finished.
QCOMPARE_EQ(write1->isFinished(), true);
QCOMPARE_EQ(write1->error(), QIOOperation::Error::None);
QCOMPARE_EQ(write1->numBytesProcessed(), sizeA + sizeB);
QCOMPARE_EQ(flush1->isFinished(), true);
QCOMPARE_EQ(flush1->error(), QIOOperation::Error::None);
QCOMPARE_EQ(write2->isFinished(), true);
QCOMPARE_EQ(write2->error(), QIOOperation::Error::None);
QCOMPARE_EQ(write2->numBytesProcessed(), sizeC);
QCOMPARE_EQ(flush2->isFinished(), true);
QCOMPARE_EQ(flush2->error(), QIOOperation::Error::None);
const QByteArray expectedReadResult = "aaaaaccccccccccbbbbb";
QCOMPARE_EQ(read->isFinished(), true);
QCOMPARE_EQ(read->error(), QIOOperation::Error::None);
QCOMPARE_EQ(read->numBytesProcessed(), expectedReadResult.size());
QCOMPARE_EQ(read->data(), expectedReadResult);
}
void tst_QRandomAccessAsyncFile::errorHandling_data()
{
QTest::addColumn<QIOOperation::Type>("operation");
@ -381,6 +456,10 @@ void tst_QRandomAccessAsyncFile::errorHandling_data()
// QTest::newRow("write_past_the_end")
// << QIOOperationBase::Type::Write << QIODeviceBase::ReadWrite
// << qint64(FileSize + 1) << QIOOperationBase::Error::IncorrectOffset;
QTest::newRow("flush_not_open")
<< QIOOperation::Type::Flush << QIODeviceBase::ReadWrite
<< qint64(0) << QIOOperation::Error::FileNotOpen;
}
void tst_QRandomAccessAsyncFile::errorHandling()
@ -399,6 +478,8 @@ void tst_QRandomAccessAsyncFile::errorHandling()
op = file.read(offset, 100);
else if (operation == QIOOperation::Type::Write)
op = file.write(offset, QByteArray(100, 'c'));
else if (operation == QIOOperation::Type::Flush)
op = file.flush();
QVERIFY(op);
@ -415,7 +496,7 @@ void tst_QRandomAccessAsyncFile::errorHandling()
void tst_QRandomAccessAsyncFile::fileClosedInProgress_data()
{
generateReadWriteOperationColumns();
generateOperationColumns();
}
void tst_QRandomAccessAsyncFile::fileClosedInProgress()
@ -447,6 +528,8 @@ void tst_QRandomAccessAsyncFile::fileClosedInProgress()
buffers[i] = QByteArray(OneMb, 'd');
op = file.writeFrom(offset, as_bytes(QSpan{buffers[i]}));
}
} else if (operation == QIOOperation::Type::Flush) {
op = file.flush();
}
QVERIFY(op);
operations[i] = op;
@ -465,7 +548,7 @@ void tst_QRandomAccessAsyncFile::fileClosedInProgress()
void tst_QRandomAccessAsyncFile::fileRemovedInProgress_data()
{
generateReadWriteOperationColumns();
generateOperationColumns();
}
void tst_QRandomAccessAsyncFile::fileRemovedInProgress()
@ -498,6 +581,8 @@ void tst_QRandomAccessAsyncFile::fileRemovedInProgress()
buffers[i] = QByteArray(OneMb, 'd');
op = file.writeFrom(offset, as_bytes(QSpan{buffers[i]}));
}
} else if (operation == QIOOperation::Type::Flush) {
op = file.flush();
}
QVERIFY(op);
operations[i] = op;
@ -509,7 +594,7 @@ void tst_QRandomAccessAsyncFile::fileRemovedInProgress()
void tst_QRandomAccessAsyncFile::operationsDeletedInProgress_data()
{
generateReadWriteOperationColumns();
generateOperationColumns();
}
void tst_QRandomAccessAsyncFile::operationsDeletedInProgress()
@ -541,6 +626,8 @@ void tst_QRandomAccessAsyncFile::operationsDeletedInProgress()
buffers[i] = QByteArray(OneMb, 'd');
op = file.writeFrom(offset, as_bytes(QSpan{buffers[i]}));
}
} else if (operation == QIOOperation::Type::Flush) {
op = file.flush();
}
QVERIFY(op);
operations[i] = op;
@ -554,7 +641,7 @@ void tst_QRandomAccessAsyncFile::operationsDeletedInProgress()
delete op;
}
void tst_QRandomAccessAsyncFile::generateReadWriteOperationColumns()
void tst_QRandomAccessAsyncFile::generateOperationColumns()
{
QTest::addColumn<Ownership>("ownership");
QTest::addColumn<QIOOperation::Type>("operation");
@ -571,6 +658,7 @@ void tst_QRandomAccessAsyncFile::generateReadWriteOperationColumns()
QTest::addRow("read_%s", v.name) << v.own << QIOOperation::Type::Read;
QTest::addRow("write_%s", v.name) << v.own << QIOOperation::Type::Write;
}
QTest::newRow("flush") << Ownership::NonOwning /* ignored */ << QIOOperation::Type::Flush;
}
QTEST_MAIN(tst_QRandomAccessAsyncFile)