Hey there,
I am using reduct-cpp to upload ROS2 MCAP files. This MCAP files are created dynamically from the subscriptions available. I tested it with different data and it works fine for small sizes like (50KB, 300KB) except for mcap files that are bigger (1MB).
The full log on the reduct side:
2025-11-18 08:32:14.053 (64512) \[INFO\] – reductstore/main.rs:40 ReductStore 1.17.2 \[5dd7ec6 at 2025-11-11T21:42:05Z\]
2025-11-18 08:32:14.054 (64512) \[INFO\] – reductstore/cfg.rs:159 Configuration:
RS_API_BASE_PATH = / (default)
RS_HOST = 0.0.0.0 (default)
RS_PORT = 8383 (default)
RS_PUBLIC_URL = http://0.0.0.0:8383/ (default)
RS_LOG_LEVEL = TRACE
RS_DATA_PATH = /data (default)
RS_API_TOKEN = \*\*\*\*\*\*\*\*\*\*\*\*\*
RS_IO_BATCH_MAX_METADATA_SIZE = 97.7 KiB
2025-11-18 08:32:14.054 (64512) \[INFO\] – reductstore/cfg.rs:170 License: BUSL-1.1 \[ https://github.com/reductstore/reductstore/blob/5dd7ec67b63872aed9cff6befcde817d135fa258/LICENSE \]
2025-11-18 08:32:14.054 (64512) \[INFO\] – reductstore/main.rs:82 Public URL: http://0.0.0.0:8383/
2025-11-18 08:32:14.054 (42592) \[TRACE\] – reductstore/core/thread_pool.rs:333 Spawn unique child task 'data/rolling-buffer/default: restore entry
2025-11-18 08:32:14.054 (90944) \[TRACE\] – reductstore/core/thread_pool.rs:430 Task ‘data/rolling-buffer/default’ started: restore entry
2025-11-18 08:32:14.054 (90944) \[DEBUG\] – reductstore/storage/entry/entry_loader.rs:75 Restored entry `default` in 0ms: size=112.3 MiB, records=4
2025-11-18 08:32:14.054 (90944) \[TRACE\] – reductstore/core/thread_pool.rs:436 Task ‘data/rolling-buffer/default’ completed: restore entry
2025-11-18 08:32:14.054 (42592) \[INFO\] – reductstore/storage/engine.rs:70 Load 1 bucket(s) in 552.188µs
2025-11-18 08:32:14.054 (42592) \[DEBUG\] – reductstore/auth/token_repository.rs:216 Loading token repository from /data/.auth
2025-11-18 08:32:14.054 (42592) \[INFO\] – reductstore/cfg.rs:326 Load Web Console
2025-11-18 08:32:14.055 (42592) \[TRACE\] – reductstore/asset/asset_manager.rs:61 Extracting zip archive to “/tmp/.tmpl7geCF”
2025-11-18 08:32:14.055 (42592) \[DEBUG\] – reductstore/asset/asset_manager.rs:73 Extracting file to “/tmp/.tmpl7geCF/asset-manifest.json”
2025-11-18 08:32:14.055 (42592) \[DEBUG\] – reductstore/asset/asset_manager.rs:73 Extracting file to “/tmp/.tmpl7geCF/favicon.ico”
2025-11-18 08:32:14.055 (42592) \[DEBUG\] – reductstore/asset/asset_manager.rs:73 Extracting file to “/tmp/.tmpl7geCF/index.html”
2025-11-18 08:32:14.055 (42592) \[DEBUG\] – reductstore/asset/asset_manager.rs:73 Extracting file to “/tmp/.tmpl7geCF/manifest.json”
2025-11-18 08:32:14.055 (42592) \[DEBUG\] – reductstore/asset/asset_manager.rs:73 Extracting file to “/tmp/.tmpl7geCF/robots.txt”
2025-11-18 08:32:14.055 (42592) \[DEBUG\] – reductstore/asset/asset_manager.rs:73 Extracting file to “/tmp/.tmpl7geCF/static/media/main_logo.8e0fec4b275a78f63cc9.png”
2025-11-18 08:32:14.055 (42592) \[DEBUG\] – reductstore/asset/asset_manager.rs:73 Extracting file to “/tmp/.tmpl7geCF/static/css/main.97ff3307.css.map”
2025-11-18 08:32:14.055 (42592) \[DEBUG\] – reductstore/asset/asset_manager.rs:73 Extracting file to “/tmp/.tmpl7geCF/static/css/main.97ff3307.css”
2025-11-18 08:32:14.055 (42592) \[DEBUG\] – reductstore/asset/asset_manager.rs:73 Extracting file to “/tmp/.tmpl7geCF/static/js/453.9cfd7767.chunk.js”
2025-11-18 08:32:14.055 (42592) \[DEBUG\] – reductstore/asset/asset_manager.rs:73 Extracting file to “/tmp/.tmpl7geCF/static/js/main.4dbb56ea.js.LICENSE.txt”
2025-11-18 08:32:14.055 (42592) \[DEBUG\] – reductstore/asset/asset_manager.rs:73 Extracting file to “/tmp/.tmpl7geCF/static/js/main.4dbb56ea.js.map”
2025-11-18 08:32:14.073 (42592) \[DEBUG\] – reductstore/asset/asset_manager.rs:73 Extracting file to “/tmp/.tmpl7geCF/static/js/main.4dbb56ea.js”
2025-11-18 08:32:14.078 (42592) \[DEBUG\] – reductstore/asset/asset_manager.rs:73 Extracting file to “/tmp/.tmpl7geCF/static/js/453.9cfd7767.chunk.js.map”
2025-11-18 08:32:14.078 (42592) \[INFO\] – reductstore/cfg.rs:338 Load Reduct Select Extension
2025-11-18 08:32:14.078 (42592) \[TRACE\] – reductstore/asset/asset_manager.rs:61 Extracting zip archive to “/tmp/.tmpgvB67A”
2025-11-18 08:32:14.078 (42592) \[DEBUG\] – reductstore/asset/asset_manager.rs:73 Extracting file to “/tmp/.tmpgvB67A/libselect_ext.so”
2025-11-18 08:32:14.100 (42592) \[INFO\] – reductstore/cfg.rs:350 Load Reduct ROS Extension
2025-11-18 08:32:14.100 (42592) \[TRACE\] – reductstore/asset/asset_manager.rs:61 Extracting zip archive to “/tmp/.tmpdzGnKe”
2025-11-18 08:32:14.100 (42592) \[DEBUG\] – reductstore/asset/asset_manager.rs:73 Extracting file to “/tmp/.tmpdzGnKe/libros_ext.so”
2025-11-18 08:32:14.128 (42592) \[DEBUG\] – reductstore/replication/replication_repository.rs:239 Reading replication repository from /data/.replications
2025-11-18 08:32:14.129 (42592) \[WARN\] – select_ext/ext.rs:54 License not found: disabling select extension
2025-11-18 08:32:14.129 (42592) \[INFO\] – reductstore/ext/ext_repository/load.rs:56 Load extension: IoExtensionInfo { name: “select”, version: “0.6.0” }
2025-11-18 08:32:14.129 (42592) \[WARN\] – ros_ext/ext.rs:54 License not found: disabling select extension
2025-11-18 08:32:14.129 (42592) \[INFO\] – reductstore/ext/ext_repository/load.rs:56 Load extension: IoExtensionInfo { name: “ros”, version: “0.4.0” }
2025-11-18 08:32:25.640 (56128) \[TRACE\] – reductstore/core/thread_pool.rs:316 Spawn unique task 'data/rolling-buffer/default: begin write
2025-11-18 08:32:25.640 (89696) \[TRACE\] – reductstore/core/thread_pool.rs:430 Task ‘data/rolling-buffer/default’ started: begin write
2025-11-18 08:32:25.640 (89696) \[TRACE\] – reductstore/core/thread_pool.rs:406 Spawn isolated child task 'data/rolling-buffer/default/1763454638854389: write record content
2025-11-18 08:32:25.640 (89696) \[TRACE\] – reductstore/core/thread_pool.rs:436 Task ‘data/rolling-buffer/default’ completed: begin write
2025-11-18 08:32:25.640 ( 3456) \[TRACE\] – reductstore/core/thread_pool.rs:430 Task ‘data/rolling-buffer/default/1763454638854389’ started: write record content
2025-11-18 08:32:25.640 (56128) \[ERROR\] – reductstore/api/entry/write_single.rs:95 Error while receiving data: error reading a body from connection
2025-11-18 08:32:25.640 ( 3456) \[ERROR\] – reductstore/storage/entry/io/record_writer.rs:163 Failed to write record rolling-buffer/default/1763454744815370: \[BadRequest\] error reading a body from connection
2025-11-18 08:32:25.640 ( 3456) \[DEBUG\] – reductstore/storage/block_manager.rs:511 Finished writing record 1763454744815370 to block rolling-buffer/default/1763454638854389.meta with state Errored
2025-11-18 08:32:25.640 ( 3456) \[TRACE\] – reductstore/core/thread_pool.rs:436 Task ‘data/rolling-buffer/default/1763454638854389’ completed: write record content
2025-11-18 08:32:25.640 (57376) \[DEBUG\] – reductstore/api/middleware.rs:57 POST /api/v1/b/rolling-buffer/default?ts=1763454744815370 \[400 Bad Request\] error reading a body from connection
2025-11-18 08:32:25.761 ( 4480) \[DEBUG\] – reductstore/backend/file.rs:100 File /data/rolling-buffer/default/1763454638854389.blk synced to storage backend
2025-11-18 08:32:25.763 ( 4480) \[DEBUG\] – reductstore/backend/file.rs:100 File /data/rolling-buffer/default/wal/1763454638854389.wal synced to storage backend
2025-11-18 08:32:28.847 (57376) \[TRACE\] – reductstore/core/thread_pool.rs:316 Spawn unique task 'data/rolling-buffer/default: begin write
2025-11-18 08:32:28.847 (88448) \[TRACE\] – reductstore/core/thread_pool.rs:430 Task ‘data/rolling-buffer/default’ started: begin write
2025-11-18 08:32:28.847 (88448) \[DEBUG\] – reductstore/storage/entry/write_record.rs:118 Creating a new block
2025-11-18 08:32:28.847 (88448) \[DEBUG\] – reductstore/backend/file.rs:100 File /data/rolling-buffer/default/1763454638854389.blk synced to storage backend
2025-11-18 08:32:28.849 (88448) \[TRACE\] – reductstore/core/thread_pool.rs:406 Spawn isolated child task 'data/rolling-buffer/default/1763454745655942: write record content
2025-11-18 08:32:28.849 (88448) \[TRACE\] – reductstore/core/thread_pool.rs:436 Task ‘data/rolling-buffer/default’ completed: begin write
2025-11-18 08:32:28.850 ( 3456) \[TRACE\] – reductstore/core/thread_pool.rs:430 Task ‘data/rolling-buffer/default/1763454745655942’ started: write record content
2025-11-18 08:32:28.850 (42592) \[ERROR\] – reductstore/api/entry/write_single.rs:95 Error while receiving data: error reading a body from connection
2025-11-18 08:32:28.850 ( 3456) \[ERROR\] – reductstore/storage/entry/io/record_writer.rs:163 Failed to write record rolling-buffer/default/1763454745655942: \[BadRequest\] error reading a body from connection
2025-11-18 08:32:28.850 ( 3456) \[DEBUG\] – reductstore/storage/block_manager.rs:511 Finished writing record 1763454745655942 to block rolling-buffer/default/1763454745655942.meta with state Errored
2025-11-18 08:32:28.850 ( 3456) \[TRACE\] – reductstore/core/thread_pool.rs:436 Task ‘data/rolling-buffer/default/1763454745655942’ completed: write record content
2025-11-18 08:32:28.850 (56128) \[DEBUG\] – reductstore/api/middleware.rs:57 POST /api/v1/b/rolling-buffer/default?ts=1763454745655942 \[400 Bad Request\] error reading a body from connection
2025-11-18 08:32:28.966 ( 4480) \[DEBUG\] – reductstore/backend/file.rs:100 File /data/rolling-buffer/default/wal/1763454745655942.wal synced to storage backend
2025-11-18 08:32:28.968 ( 4480) \[DEBUG\] – reductstore/backend/file.rs:100 File /data/rolling-buffer/default/1763454745655942.blk synced to storage backend
Simplified code snippet that you can see how I write to the reduct store
void McapRecordingManager::upload_mcap_task(std::string pipeline_name, std::shared_ptr<WriterData> writer_data, uint64_t file_index, PipelineState *state_ptr, std::map<std::string, std::string> static_labels)
{
try
{
if (!bucket_)
{
std::cerr << "[UploadThread " << pipeline_name << "] Bucket not initialized. Aborting upload." << std::endl;
state_ptr->upload_in_progress = false;
return;
}
writer_data->buffer->clear();
// Get content length (matches: content_length = buffer.tell())
writer_data->buffer->seekg(0, std::ios::end);
size_t content_length = static_cast<size_t>(writer_data->buffer->tellg());
// Reset to beginning (matches: buffer.seek(0))
writer_data->buffer->seekg(0, std::ios::beg);
if (content_length == 0)
{
std::cerr << "[UploadThread " << pipeline_name << "] Empty MCAP buffer, skipping upload." << std::endl;
return;
}
reduct::IBucket::WriteOptions write_opts;
write_opts.content_type = "application/mcap";
write_opts.labels = static_labels; // Use labels captured from the calling thread
write_opts.timestamp = std::chrono::time_point<std::chrono::system_clock>(
std::chrono::microseconds(file_index));
auto write_err = bucket_->Write(pipeline_name, write_opts, [&](auto rec)
{
rec->Write(content_length, [&](size_t offset, size_t size) -> std::pair<bool, std::string>
{
writer_data->buffer->seekg(static_cast<std::streamsize>(offset), std::ios::beg);
size_t to_read = std::min(size, content_length - offset);
std::vector<char> buf(to_read);
writer_data->buffer->read(buf.data(), static_cast<std::streamsize>(to_read));
std::streamsize read_count = writer_data->buffer->gcount();
if (read_count <= 0)
{
// No data read - we're done or there's an error
return {true, ""};
}
bool is_last = (offset + static_cast<size_t>(read_count)) >= content_length;
return {is_last, std::string(buf.data(), static_cast<size_t>(read_count))};
}); });
if (write_err != reduct::Error::kOk)
{
std::cerr << "[UploadThread " << pipeline_name << "] Upload failed: " << write_err.message << std::endl;
}
else
{
std::cout << "[UploadThread " << pipeline_name << "] Upload complete (" << content_length << " bytes)." << std::endl;
}
}
catch (const std::exception &e)
{
std::cerr << "[UploadThread " << pipeline_name << "] Upload exception: " << e.what() << std::endl;
}
}
Any help is really appreciated.
Thank you