Problem when uploading MCAPs > 1MB with Reduct CPP

Hey there,

I am using reduct-cpp to upload ROS2 MCAP files. These MCAP files are created dynamically from the available subscriptions. I tested it with different data and it works fine for small files (50 KB, 300 KB), but it fails for MCAP files that are bigger (around 1 MB).

The full log on the ReductStore side:

2025-11-18 08:32:14.053 (64512) [INFO] -- reductstore/main.rs:40 ReductStore 1.17.2 [5dd7ec6 at 2025-11-11T21:42:05Z]
2025-11-18 08:32:14.054 (64512) [INFO] -- reductstore/cfg.rs:159 Configuration:
RS_API_BASE_PATH = / (default)
RS_HOST = 0.0.0.0 (default)
RS_PORT = 8383 (default)
RS_PUBLIC_URL = http://0.0.0.0:8383/ (default)
RS_LOG_LEVEL = TRACE
RS_DATA_PATH = /data (default)
RS_API_TOKEN = *************
RS_IO_BATCH_MAX_METADATA_SIZE = 97.7 KiB

2025-11-18 08:32:14.054 (64512) [INFO] -- reductstore/cfg.rs:170 License: BUSL-1.1 [ https://github.com/reductstore/reductstore/blob/5dd7ec67b63872aed9cff6befcde817d135fa258/LICENSE ]
2025-11-18 08:32:14.054 (64512) [INFO] -- reductstore/main.rs:82 Public URL: http://0.0.0.0:8383/
2025-11-18 08:32:14.054 (42592) [TRACE] -- reductstore/core/thread_pool.rs:333 Spawn unique child task 'data/rolling-buffer/default: restore entry
2025-11-18 08:32:14.054 (90944) [TRACE] -- reductstore/core/thread_pool.rs:430 Task 'data/rolling-buffer/default' started: restore entry
2025-11-18 08:32:14.054 (90944) [DEBUG] -- reductstore/storage/entry/entry_loader.rs:75 Restored entry `default` in 0ms: size=112.3 MiB, records=4
2025-11-18 08:32:14.054 (90944) [TRACE] -- reductstore/core/thread_pool.rs:436 Task 'data/rolling-buffer/default' completed: restore entry
2025-11-18 08:32:14.054 (42592) [INFO] -- reductstore/storage/engine.rs:70 Load 1 bucket(s) in 552.188µs
2025-11-18 08:32:14.054 (42592) [DEBUG] -- reductstore/auth/token_repository.rs:216 Loading token repository from /data/.auth
2025-11-18 08:32:14.054 (42592) [INFO] -- reductstore/cfg.rs:326 Load Web Console
2025-11-18 08:32:14.055 (42592) [TRACE] -- reductstore/asset/asset_manager.rs:61 Extracting zip archive to "/tmp/.tmpl7geCF"
2025-11-18 08:32:14.055 (42592) [DEBUG] -- reductstore/asset/asset_manager.rs:73 Extracting file to "/tmp/.tmpl7geCF/asset-manifest.json"
2025-11-18 08:32:14.055 (42592) [DEBUG] -- reductstore/asset/asset_manager.rs:73 Extracting file to "/tmp/.tmpl7geCF/favicon.ico"
2025-11-18 08:32:14.055 (42592) [DEBUG] -- reductstore/asset/asset_manager.rs:73 Extracting file to "/tmp/.tmpl7geCF/index.html"
2025-11-18 08:32:14.055 (42592) [DEBUG] -- reductstore/asset/asset_manager.rs:73 Extracting file to "/tmp/.tmpl7geCF/manifest.json"
2025-11-18 08:32:14.055 (42592) [DEBUG] -- reductstore/asset/asset_manager.rs:73 Extracting file to "/tmp/.tmpl7geCF/robots.txt"
2025-11-18 08:32:14.055 (42592) [DEBUG] -- reductstore/asset/asset_manager.rs:73 Extracting file to "/tmp/.tmpl7geCF/static/media/main_logo.8e0fec4b275a78f63cc9.png"
2025-11-18 08:32:14.055 (42592) [DEBUG] -- reductstore/asset/asset_manager.rs:73 Extracting file to "/tmp/.tmpl7geCF/static/css/main.97ff3307.css.map"
2025-11-18 08:32:14.055 (42592) [DEBUG] -- reductstore/asset/asset_manager.rs:73 Extracting file to "/tmp/.tmpl7geCF/static/css/main.97ff3307.css"
2025-11-18 08:32:14.055 (42592) [DEBUG] -- reductstore/asset/asset_manager.rs:73 Extracting file to "/tmp/.tmpl7geCF/static/js/453.9cfd7767.chunk.js"
2025-11-18 08:32:14.055 (42592) [DEBUG] -- reductstore/asset/asset_manager.rs:73 Extracting file to "/tmp/.tmpl7geCF/static/js/main.4dbb56ea.js.LICENSE.txt"
2025-11-18 08:32:14.055 (42592) [DEBUG] -- reductstore/asset/asset_manager.rs:73 Extracting file to "/tmp/.tmpl7geCF/static/js/main.4dbb56ea.js.map"
2025-11-18 08:32:14.073 (42592) [DEBUG] -- reductstore/asset/asset_manager.rs:73 Extracting file to "/tmp/.tmpl7geCF/static/js/main.4dbb56ea.js"
2025-11-18 08:32:14.078 (42592) [DEBUG] -- reductstore/asset/asset_manager.rs:73 Extracting file to "/tmp/.tmpl7geCF/static/js/453.9cfd7767.chunk.js.map"
2025-11-18 08:32:14.078 (42592) [INFO] -- reductstore/cfg.rs:338 Load Reduct Select Extension
2025-11-18 08:32:14.078 (42592) [TRACE] -- reductstore/asset/asset_manager.rs:61 Extracting zip archive to "/tmp/.tmpgvB67A"
2025-11-18 08:32:14.078 (42592) [DEBUG] -- reductstore/asset/asset_manager.rs:73 Extracting file to "/tmp/.tmpgvB67A/libselect_ext.so"
2025-11-18 08:32:14.100 (42592) [INFO] -- reductstore/cfg.rs:350 Load Reduct ROS Extension
2025-11-18 08:32:14.100 (42592) [TRACE] -- reductstore/asset/asset_manager.rs:61 Extracting zip archive to "/tmp/.tmpdzGnKe"
2025-11-18 08:32:14.100 (42592) [DEBUG] -- reductstore/asset/asset_manager.rs:73 Extracting file to "/tmp/.tmpdzGnKe/libros_ext.so"
2025-11-18 08:32:14.128 (42592) [DEBUG] -- reductstore/replication/replication_repository.rs:239 Reading replication repository from /data/.replications
2025-11-18 08:32:14.129 (42592) [WARN] -- select_ext/ext.rs:54 License not found: disabling select extension
2025-11-18 08:32:14.129 (42592) [INFO] -- reductstore/ext/ext_repository/load.rs:56 Load extension: IoExtensionInfo { name: "select", version: "0.6.0" }
2025-11-18 08:32:14.129 (42592) [WARN] -- ros_ext/ext.rs:54 License not found: disabling select extension
2025-11-18 08:32:14.129 (42592) [INFO] -- reductstore/ext/ext_repository/load.rs:56 Load extension: IoExtensionInfo { name: "ros", version: "0.4.0" }
2025-11-18 08:32:25.640 (56128) [TRACE] -- reductstore/core/thread_pool.rs:316 Spawn unique task 'data/rolling-buffer/default: begin write
2025-11-18 08:32:25.640 (89696) [TRACE] -- reductstore/core/thread_pool.rs:430 Task 'data/rolling-buffer/default' started: begin write
2025-11-18 08:32:25.640 (89696) [TRACE] -- reductstore/core/thread_pool.rs:406 Spawn isolated child task 'data/rolling-buffer/default/1763454638854389: write record content
2025-11-18 08:32:25.640 (89696) [TRACE] -- reductstore/core/thread_pool.rs:436 Task 'data/rolling-buffer/default' completed: begin write
2025-11-18 08:32:25.640 ( 3456) [TRACE] -- reductstore/core/thread_pool.rs:430 Task 'data/rolling-buffer/default/1763454638854389' started: write record content
2025-11-18 08:32:25.640 (56128) [ERROR] -- reductstore/api/entry/write_single.rs:95 Error while receiving data: error reading a body from connection
2025-11-18 08:32:25.640 ( 3456) [ERROR] -- reductstore/storage/entry/io/record_writer.rs:163 Failed to write record rolling-buffer/default/1763454744815370: [BadRequest] error reading a body from connection
2025-11-18 08:32:25.640 ( 3456) [DEBUG] -- reductstore/storage/block_manager.rs:511 Finished writing record 1763454744815370 to block rolling-buffer/default/1763454638854389.meta with state Errored
2025-11-18 08:32:25.640 ( 3456) [TRACE] -- reductstore/core/thread_pool.rs:436 Task 'data/rolling-buffer/default/1763454638854389' completed: write record content
2025-11-18 08:32:25.640 (57376) [DEBUG] -- reductstore/api/middleware.rs:57 POST /api/v1/b/rolling-buffer/default?ts=1763454744815370 [400 Bad Request] error reading a body from connection
2025-11-18 08:32:25.761 ( 4480) [DEBUG] -- reductstore/backend/file.rs:100 File /data/rolling-buffer/default/1763454638854389.blk synced to storage backend
2025-11-18 08:32:25.763 ( 4480) [DEBUG] -- reductstore/backend/file.rs:100 File /data/rolling-buffer/default/wal/1763454638854389.wal synced to storage backend
2025-11-18 08:32:28.847 (57376) [TRACE] -- reductstore/core/thread_pool.rs:316 Spawn unique task 'data/rolling-buffer/default: begin write
2025-11-18 08:32:28.847 (88448) [TRACE] -- reductstore/core/thread_pool.rs:430 Task 'data/rolling-buffer/default' started: begin write
2025-11-18 08:32:28.847 (88448) [DEBUG] -- reductstore/storage/entry/write_record.rs:118 Creating a new block
2025-11-18 08:32:28.847 (88448) [DEBUG] -- reductstore/backend/file.rs:100 File /data/rolling-buffer/default/1763454638854389.blk synced to storage backend
2025-11-18 08:32:28.849 (88448) [TRACE] -- reductstore/core/thread_pool.rs:406 Spawn isolated child task 'data/rolling-buffer/default/1763454745655942: write record content
2025-11-18 08:32:28.849 (88448) [TRACE] -- reductstore/core/thread_pool.rs:436 Task 'data/rolling-buffer/default' completed: begin write
2025-11-18 08:32:28.850 ( 3456) [TRACE] -- reductstore/core/thread_pool.rs:430 Task 'data/rolling-buffer/default/1763454745655942' started: write record content
2025-11-18 08:32:28.850 (42592) [ERROR] -- reductstore/api/entry/write_single.rs:95 Error while receiving data: error reading a body from connection
2025-11-18 08:32:28.850 ( 3456) [ERROR] -- reductstore/storage/entry/io/record_writer.rs:163 Failed to write record rolling-buffer/default/1763454745655942: [BadRequest] error reading a body from connection
2025-11-18 08:32:28.850 ( 3456) [DEBUG] -- reductstore/storage/block_manager.rs:511 Finished writing record 1763454745655942 to block rolling-buffer/default/1763454745655942.meta with state Errored
2025-11-18 08:32:28.850 ( 3456) [TRACE] -- reductstore/core/thread_pool.rs:436 Task 'data/rolling-buffer/default/1763454745655942' completed: write record content
2025-11-18 08:32:28.850 (56128) [DEBUG] -- reductstore/api/middleware.rs:57 POST /api/v1/b/rolling-buffer/default?ts=1763454745655942 [400 Bad Request] error reading a body from connection
2025-11-18 08:32:28.966 ( 4480) [DEBUG] -- reductstore/backend/file.rs:100 File /data/rolling-buffer/default/wal/1763454745655942.wal synced to storage backend
2025-11-18 08:32:28.968 ( 4480) [DEBUG] -- reductstore/backend/file.rs:100 File /data/rolling-buffer/default/1763454745655942.blk synced to storage backend

Here is a simplified code snippet that shows how I write to the ReductStore:

void McapRecordingManager::upload_mcap_task(std::string pipeline_name, std::shared_ptr<WriterData> writer_data, uint64_t file_index, PipelineState *state_ptr, std::map<std::string, std::string> static_labels)
{
  try
  {
    if (!bucket_)
    {
      std::cerr << "[UploadThread " << pipeline_name << "] Bucket not initialized. Aborting upload." << std::endl;
      state_ptr->upload_in_progress = false;
      return;
    }
    writer_data->buffer->clear();

    // Get content length (matches: content_length = buffer.tell())
    writer_data->buffer->seekg(0, std::ios::end);
    size_t content_length = static_cast<size_t>(writer_data->buffer->tellg());

    // Reset to beginning (matches: buffer.seek(0))
    writer_data->buffer->seekg(0, std::ios::beg);

    if (content_length == 0)
    {
      std::cerr << "[UploadThread " << pipeline_name << "] Empty MCAP buffer, skipping upload." << std::endl;
      return;
    }

    reduct::IBucket::WriteOptions write_opts;
    write_opts.content_type = "application/mcap";
    write_opts.labels = static_labels; // Use labels captured from the calling thread
    write_opts.timestamp = std::chrono::time_point<std::chrono::system_clock>(
        std::chrono::microseconds(file_index));

    auto write_err = bucket_->Write(pipeline_name, write_opts, [&](auto rec)
                                    {
      rec->Write(content_length, [&](size_t offset, size_t size) -> std::pair<bool, std::string>
      {
        writer_data->buffer->seekg(static_cast<std::streamsize>(offset), std::ios::beg);
        
        size_t to_read = std::min(size, content_length - offset);
        std::vector<char> buf(to_read);
        writer_data->buffer->read(buf.data(), static_cast<std::streamsize>(to_read));
        std::streamsize read_count = writer_data->buffer->gcount();
        
        if (read_count <= 0)
        {
          // No data read - we're done or there's an error
          return {true, ""};
        }
        
        bool is_last = (offset + static_cast<size_t>(read_count)) >= content_length;
        
        return {is_last, std::string(buf.data(), static_cast<size_t>(read_count))};
      }); });

    if (write_err != reduct::Error::kOk)
    {
      std::cerr << "[UploadThread " << pipeline_name << "] Upload failed: " << write_err.message << std::endl;
    }
    else
    {
      std::cout << "[UploadThread " << pipeline_name << "] Upload complete (" << content_length << " bytes)." << std::endl;
    }
  }
  catch (const std::exception &e)
  {
    std::cerr << "[UploadThread " << pipeline_name << "] Upload exception: " << e.what() << std::endl;
  }
}

Any help is really appreciated.

Thank you


Hi @vicmassy, thank you for the report. Judging by the logs, the client code stopped sending data earlier than expected, so the database could not receive the entire record. This is because you need to return true if you want to continue streaming. Please change your code accordingly:

    writer_data->buffer->seekg(static_cast<std::streamsize>(offset), std::ios::beg);

    size_t to_read = std::min(size, content_length - offset);
    std::vector<char> buf(to_read);
    writer_data->buffer->read(buf.data(), static_cast<std::streamsize>(to_read));
    std::streamsize read_count = writer_data->buffer->gcount();

    if (read_count <= 0)
    {
      // No data read - we're done or there's an error
      return {false, ""};
    }

    bool is_last = (offset + static_cast<size_t>(read_count)) >= content_length;

    return {!is_last, std::string(buf.data(), static_cast<size_t>(read_count))};
  })

I see that the documentation says the opposite. I'll correct it.

@atimin thanks for your quick reply.

When I return true, ReductStore returns this:

2025-11-18 11:41:50.427 (88576) [ERROR] -- reductstore/storage/entry/io/record_writer.rs:163 Failed to write record rolling-buffer/default/1763466110238752: [BadRequest] Content is smaller than in content-length
2025-11-18 11:41:50.442 (88576) [ERROR] -- reductstore/storage/entry/io/record_writer.rs:163 Failed to write record rolling-buffer/default/1763466110425776: [BadRequest] Content is smaller than in content-length
2025-11-18 11:41:50.479 (88576) [ERROR] -- reductstore/storage/entry/io/record_writer.rs:163 Failed to write record rolling-buffer/default/1763466110461725: [BadRequest] Content is smaller than in content-length
2025-11-18 11:41:50.551 (88576) [ERROR] -- reductstore/storage/entry/io/record_writer.rs:163 Failed to write record rolling-buffer/default/1763466110526542: [BadRequest] Content is smaller than in content-length
2025-11-18 11:41:50.573 (88576) [ERROR] -- reductstore/storage/entry/io/record_writer.rs:163 Failed to write record rolling-buffer/default/1763466110556855: [BadRequest] Content is smaller than in content-length
2025-11-18 11:41:50.610 (88576) [ERROR] -- reductstore/storage/entry/io/record_writer.rs:163 Failed to write record rolling-buffer/default/1763466110593074: [BadRequest] Content is smaller than in content-length
2025-11-18 11:41:50.640 (88576) [ERROR] -- reductstore/storage/entry/io/record_writer.rs:163 Failed to write record rolling-buffer/default/1763466110623067: [BadRequest] Content is smaller than in content-length

Now it does indeed seem to be related to early termination of the streaming. It looks like the previous version (returning false) was cutting the stream short.

I still think it's a miscalculation of the length or something like that. Could you remove all your checks and always return true, as in this test:


REQUIRE(bucket->Write("entry", ts, [&blob](auto rec) {
  rec->Write(blob.size(), [&](auto offset, auto size) {
    return std::pair{
        true,
        blob.substr(offset, size),
    };
  });
}) == Error::kOk);

The client won’t call the callback when all the data has been sent; it checks the content length on its own side. The flag was added to interrupt the streaming in case of an error.


@atimin thank you, it now works. Clarifying that you handle the content-length checks and stop calling the callback on your side greatly simplifies the code, and now it's solved. I was certainly miscalculating something about the length.

I leave the simplified code here for future reference, in case someone is interested:

auto write_err = bucket_->Write(pipeline_name, write_opts, [&](auto rec) {
  rec->Write(content_length, [&](size_t offset, size_t size) -> std::pair<bool, std::string> {
    writer_data->buffer->seekg(static_cast<std::streamoff>(offset), std::ios::beg);
    std::vector<char> buf(size);
    writer_data->buffer->read(buf.data(), static_cast<std::streamsize>(size));

    return {true, std::string(buf.data(), size)};
  });
});