Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions changelog/unreleased/parallelizebackups.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
title: Parallelize Backup and Restore File Operations
type: changed
authors:
- name: Samuel Verstraete
nick: elangelo
links:
- name: PR#4023
url: https://github.com/apache/solr/pull/4023
145 changes: 115 additions & 30 deletions solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,25 @@
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.math3.util.Precision;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Directory;
import org.apache.solr.client.api.model.SolrJerseyResponse;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.EnvUtils;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.SolrCore;
Expand All @@ -52,6 +62,21 @@
*/
public class IncrementalShardBackup {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

/**
* Maximum number of files to upload in parallel during backup. Can be configured via the system
* property {@code solr.backup.maxparalleluploads} or environment variable {@code
* SOLR_BACKUP_MAXPARALLELUPLOADS}.
*/
private static final int DEFAULT_MAX_PARALLEL_UPLOADS =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should just be MAX_PARALLEL_UPLOADS and drop the default prefix

EnvUtils.getPropertyAsInteger("solr.backup.maxparalleluploads", 1);

private static final ExecutorService BACKUP_EXECUTOR =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as this is used for "uploads" (in the context of backups) and not for backups themselves, I think the name should reflect that, like BACKUP_UPLOAD_EXECUTOR (and similarly for the thread name)

ExecutorUtil.newMDCAwareCachedThreadPool(
Math.max(1, DEFAULT_MAX_PARALLEL_UPLOADS),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't bother with the 'max'; someone configuring this should know what they are doing. Heck, if they put -1 thinking it might disable it, they'll now probably get the error they should get.

Integer.MAX_VALUE,
new SolrNamedThreadFactory("IncrementalBackupExecutor"));

private SolrCore solrCore;

private BackupFilePaths incBackupFiles;
Expand Down Expand Up @@ -154,8 +179,8 @@ protected IncrementalShardSnapshotResponse backup(final IndexCommit indexCommit)
solrCore.getSolrConfig().indexConfig.lockType);
try {
BackupStats stats = incrementalCopy(files, dir);
details.indexFileCount = stats.fileCount;
details.uploadedIndexFileCount = stats.uploadedFileCount;
details.indexFileCount = stats.fileCount.get();
Comment thread
epugh marked this conversation as resolved.
details.uploadedIndexFileCount = stats.uploadedFileCount.get();
details.indexSizeMB = stats.getIndexSizeMB();
details.uploadedIndexFileMB = stats.getTotalUploadedMB();
} finally {
Expand Down Expand Up @@ -191,55 +216,115 @@ private BackupStats incrementalCopy(Collection<String> indexFiles, Directory dir
URI indexDir = incBackupFiles.getIndexDir();
BackupStats backupStats = new BackupStats();

for (String fileName : indexFiles) {
Optional<ShardBackupMetadata.BackedFile> opBackedFile = oldBackupPoint.getFile(fileName);
Checksum originalFileCS = backupRepo.checksum(dir, fileName);

if (opBackedFile.isPresent()) {
ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
Checksum existedFileCS = backedFile.fileChecksum;
if (existedFileCS.equals(originalFileCS)) {
currentBackupPoint.addBackedFile(opBackedFile.get());
backupStats.skippedUploadingFile(existedFileCS);
continue;
}
ExecutorService executor = BACKUP_EXECUTOR;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just use BACKUP_EXECUTOR instead of doing this.


List<Future<?>> uploadFutures = new ArrayList<>();

try {
Comment on lines +221 to +223
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation queues a Future for every index file and holds them in uploadFutures until the end. For large indexes this can create significant memory overhead and delays error reporting. Consider processing completed tasks as they finish (e.g., ExecutorCompletionService) and/or limiting in-flight submissions to maxParallelUploads.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This really doesn't hold. We need to wait for all futures anyway, so storing them in a list is what we need to do here.

for (String fileName : indexFiles) {
// Capture variable for lambda
final String fileNameFinal = fileName;

Runnable uploadTask =
() -> {
try {
// Calculate checksum and check if file already exists in previous backup
Optional<ShardBackupMetadata.BackedFile> opBackedFile =
oldBackupPoint.getFile(fileNameFinal);
Checksum originalFileCS = backupRepo.checksum(dir, fileNameFinal);

if (opBackedFile.isPresent()) {
ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
Checksum existedFileCS = backedFile.fileChecksum;
if (existedFileCS.equals(originalFileCS)) {
synchronized (currentBackupPoint) {
currentBackupPoint.addBackedFile(opBackedFile.get());
}
backupStats.skippedUploadingFile(existedFileCS);
return;
}
}

// File doesn't exist or has changed - upload it
String backedFileName = UUID.randomUUID().toString();
backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir, backedFileName);

synchronized (currentBackupPoint) {
currentBackupPoint.addBackedFile(backedFileName, fileNameFinal, originalFileCS);
}
backupStats.uploadedFile(originalFileCS);
} catch (IOException e) {
throw new RuntimeException("Failed to process file: " + fileNameFinal, e);
}
};
Comment on lines +228 to +259
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: For these lambdas, can we put them in a private method?


uploadFutures.add(executor.submit(uploadTask));
}

String backedFileName = UUID.randomUUID().toString();
backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
// Wait for ALL futures before throwing - currentBackupPoint must reflect every
// successfully uploaded file before it is written, even when an error occurs.
Throwable firstError = null;
for (Future<?> future : uploadFutures) {
try {
future.get();
} catch (ExecutionException e) {
if (firstError == null) {
firstError = e.getCause();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If any of them fail, you need to cancel the existing futures here with future.cancel(true) to stop executing the rest of the jobs in the queue. I believe all this does is interrupt the calling thread.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. I'd actually flip the try and for nesting so we don't continue loop iterations on an error condition. Don't bother propagating interruption status to this thread as we're going to end things expeditiously and report the error.

Looking forward to Structured Concurrency some day to make this overall easier/better. We'd have to --enable-preview since it's still incubating :-/

if (firstError == null) {
firstError = e;
}
}
}

currentBackupPoint.addBackedFile(backedFileName, fileName, originalFileCS);
backupStats.uploadedFile(originalFileCS);
if (firstError != null) {
if (firstError instanceof Error) {
// Rethrow Errors (like OutOfMemoryError) - don't try to recover
throw (Error) firstError;
} else if (firstError instanceof IOException) {
throw (IOException) firstError;
} else if (firstError instanceof RuntimeException) {
throw (RuntimeException) firstError;
} else if (firstError instanceof InterruptedException) {
throw new IOException("Backup interrupted", firstError);
} else {
throw new IOException("Error during parallel backup upload", firstError);
Comment on lines +291 to +293
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You threw a SolrException in RestoreCore but throw IOException here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At most API layers in Solr, SolrException is best.

}
Comment on lines +283 to +294
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Java 21 you can use a switch case here

}
} finally {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just remove this finally since it does nothing now.

// BACKUP_EXECUTOR is a shared static pool; do not shut it down here
}

currentBackupPoint.store(backupRepo, incBackupFiles.getShardBackupMetadataDir(), shardBackupId);
return backupStats;
}

private static class BackupStats {
private int fileCount;
private int uploadedFileCount;
private long indexSize;
private long totalUploadedBytes;
private final AtomicInteger fileCount = new AtomicInteger();
private final AtomicInteger uploadedFileCount = new AtomicInteger();
private final AtomicLong indexSize = new AtomicLong();
private final AtomicLong totalUploadedBytes = new AtomicLong();

public void uploadedFile(Checksum file) {
fileCount++;
uploadedFileCount++;
indexSize += file.size;
totalUploadedBytes += file.size;
fileCount.incrementAndGet();
uploadedFileCount.incrementAndGet();
indexSize.addAndGet(file.size);
totalUploadedBytes.addAndGet(file.size);
}

public void skippedUploadingFile(Checksum existedFile) {
fileCount++;
indexSize += existedFile.size;
fileCount.incrementAndGet();
indexSize.addAndGet(existedFile.size);
}

public double getIndexSizeMB() {
return Precision.round(indexSize / (1024.0 * 1024), 3);
return Precision.round(indexSize.get() / (1024.0 * 1024), 3);
}

public double getTotalUploadedMB() {
return Precision.round(totalUploadedBytes / (1024.0 * 1024), 3);
return Precision.round(totalUploadedBytes.get() / (1024.0 * 1024), 3);
}
}

Expand Down
130 changes: 106 additions & 24 deletions solr/core/src/java/org/apache/solr/handler/RestoreCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,26 @@
import java.lang.reflect.Array;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.EnvUtils;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.backup.BackupFilePaths;
Expand All @@ -48,6 +55,20 @@ public class RestoreCore implements Callable<Boolean> {

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

/**
* Maximum number of files to download in parallel during restore. Can be configured via the
* system property {@code solr.backup.maxparalleldownloads} or environment variable {@code
* SOLR_BACKUP_MAXPARALLELDOWNLOADS}.
*/
private static final int DEFAULT_MAX_PARALLEL_DOWNLOADS =
EnvUtils.getPropertyAsInteger("solr.backup.maxparalleldownloads", 1);

private static final ExecutorService RESTORE_EXECUTOR =
ExecutorUtil.newMDCAwareCachedThreadPool(
Math.max(1, DEFAULT_MAX_PARALLEL_DOWNLOADS),
Integer.MAX_VALUE,
new SolrNamedThreadFactory("RestoreCoreExecutor"));

private final SolrCore core;
private RestoreRepository repository;

Expand Down Expand Up @@ -107,35 +128,96 @@ public boolean doRestore() throws Exception {
DirectoryFactory.DirContext.DEFAULT,
core.getSolrConfig().indexConfig.lockType);
Set<String> indexDirFiles = new HashSet<>(Arrays.asList(indexDir.listAll()));
// Move all files from backupDir to restoreIndexDir
for (String filename : repository.listAllFiles()) {
checkInterrupted();
try {
if (indexDirFiles.contains(filename)) {
Checksum cs = repository.checksum(filename);
IndexFetcher.CompareResult compareResult;
if (cs == null) {
compareResult = new IndexFetcher.CompareResult();
compareResult.equal = false;
} else {
compareResult = IndexFetcher.compareFile(indexDir, filename, cs.size, cs.checksum);

// Capture directories as final for lambda access
final Directory finalIndexDir = indexDir;
final Directory finalRestoreIndexDir = restoreIndexDir;

ExecutorService executor = RESTORE_EXECUTOR;

List<Future<?>> downloadFutures = new ArrayList<>();

Comment on lines +138 to +139
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This submits one task per index file and retains every Future in downloadFutures until the end. For large collections with many segment files, that can add substantial memory/GC overhead and delays surfacing failures until all tasks are submitted. Consider processing completions incrementally (e.g., ExecutorCompletionService) and/or bounding the number of in-flight tasks to maxParallelDownloads.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see above

try {
// Move all files from backupDir to restoreIndexDir
for (String filename : repository.listAllFiles()) {
checkInterrupted();

// Capture variables for lambda
final String filenameFinal = filename;
final boolean fileExistsLocally = indexDirFiles.contains(filename);

Runnable downloadTask =
() -> {
try {
if (fileExistsLocally) {
Checksum cs = repository.checksum(filenameFinal);
IndexFetcher.CompareResult compareResult;
if (cs == null) {
compareResult = new IndexFetcher.CompareResult();
compareResult.equal = false;
} else {
compareResult =
IndexFetcher.compareFile(
finalIndexDir, filenameFinal, cs.size, cs.checksum);
}
if (!compareResult.equal
|| (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
filenameFinal, cs.size, compareResult))) {
repository.repoCopy(filenameFinal, finalRestoreIndexDir);
} else {
// prefer local copy
repository.localCopy(finalIndexDir, filenameFinal, finalRestoreIndexDir);
}
} else {
repository.repoCopy(filenameFinal, finalRestoreIndexDir);
}
} catch (Exception e) {
log.warn("Exception while restoring the backup index ", e);
throw new RuntimeException(
"Exception while restoring the backup index for file: " + filenameFinal, e);
}
};

downloadFutures.add(executor.submit(downloadTask));
}

// Wait for ALL futures to ensure all files are processed
Throwable firstError = null;
for (Future<?> future : downloadFutures) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same code-review feedback applies to this class.

try {
future.get();
} catch (ExecutionException e) {
if (firstError == null) {
firstError = e.getCause();
}
if (!compareResult.equal
|| (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
filename, cs.size, compareResult))) {
repository.repoCopy(filename, restoreIndexDir);
} else {
// prefer local copy
repository.localCopy(indexDir, filename, restoreIndexDir);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (firstError == null) {
firstError = e;
}
}
}

if (firstError != null) {
if (firstError instanceof Error) {
// Rethrow Errors (like OutOfMemoryError) - don't try to recover
throw (Error) firstError;
} else if (firstError instanceof IOException) {
throw (IOException) firstError;
} else if (firstError instanceof RuntimeException) {
throw (RuntimeException) firstError;
} else if (firstError instanceof InterruptedException) {
throw new SolrException(
SolrException.ErrorCode.UNKNOWN, "Restore interrupted", firstError);
} else {
repository.repoCopy(filename, restoreIndexDir);
throw new SolrException(
SolrException.ErrorCode.UNKNOWN,
"Error during parallel restore download",
firstError);
}
} catch (Exception e) {
log.warn("Exception while restoring the backup index ", e);
throw new SolrException(
SolrException.ErrorCode.UNKNOWN, "Exception while restoring the backup index", e);
}
} finally {
// RESTORE_EXECUTOR is a shared static pool; do not shut it down here
}
log.debug("Switching directories");
core.modifyIndexProps(restoreIndexName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public class GCSIncrementalBackupTest extends AbstractIncrementalBackupTest {

@BeforeClass
public static void setupClass() throws Exception {
// Enable parallel backup/restore for cloud storage tests
System.setProperty("solr.backup.maxparalleluploads", "2");
System.setProperty("solr.backup.maxparalleldownloads", "2");

configureCluster(NUM_NODES) // nodes
.addConfig("conf1", getFile("conf/solrconfig.xml").getParent())
Expand Down
Loading
Loading