Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions r/src/filesystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,23 @@ std::string fs___SubTreeFileSystem__base_path(
return file_system->base_path();
}

// Forward declaration - defined in the ARROW_R_WITH_S3 block below.
#if defined(ARROW_R_WITH_S3)
void EnsureS3InitializedWithSigpipeHandler();
#endif

// [[arrow::export]]
cpp11::writable::list fs___FileSystemFromUri(const std::string& path) {
using cpp11::literals::operator""_nm;

#if defined(ARROW_R_WITH_S3)
// Initialize S3 before FileSystemFromUri so our options (with SIGPIPE handler)
// take effect before the C++ library's internal EnsureS3Initialized() call.
if (path.substr(0, 5) == "s3://") {
EnsureS3InitializedWithSigpipeHandler();
}
#endif

std::string out_path;
auto io_context = MainRThread::GetInstance().CancellableIOContext();
return cpp11::writable::list({"fs"_nm = cpp11::to_r6(ValueOrStop(
Expand All @@ -281,6 +294,20 @@ void fs___CopyFiles(const std::shared_ptr<fs::FileSystem>& source_fs,

#include <arrow/filesystem/s3fs.h>

// Initialize S3 with the SIGPIPE handler enabled. Without it, stale connections
// in the SDK's connection pool can trigger SIGPIPE during Aws::ShutdownAPI(),
// which causes R's signal handler to longjmp out of the teardown and segfault
// (GH-50009, GH-32026).
void EnsureS3InitializedWithSigpipeHandler() {
fs::S3GlobalOptions options = fs::S3GlobalOptions::Defaults();
options.install_sigpipe_handler = true;
auto status = fs::InitializeS3(options);
// InitializeS3 returns Invalid if already initialized - that's fine
if (!status.ok() && !fs::IsS3Initialized()) {
StopIfNotOk(status);
}
}

// [[s3::export]]
std::shared_ptr<fs::S3FileSystem> fs___S3FileSystem__create(
bool anonymous = false, std::string access_key = "", std::string secret_key = "",
Expand All @@ -291,9 +318,7 @@ std::shared_ptr<fs::S3FileSystem> fs___S3FileSystem__create(
bool allow_bucket_creation = false, bool allow_bucket_deletion = false,
bool check_directory_existence_before_creation = false, double connect_timeout = -1,
double request_timeout = -1) {
// We need to ensure that S3 is initialized before we start messing with the
// options
StopIfNotOk(fs::EnsureS3Initialized());
EnsureS3InitializedWithSigpipeHandler();
fs::S3Options s3_opts;
// Handle auth (anonymous, keys, default)
// (validation/internal coherence handled in R)
Expand Down
Loading