diff --git a/changelog/unreleased/parallelizebackups.yml b/changelog/unreleased/parallelizebackups.yml
new file mode 100644
index 000000000000..d3a898d11064
--- /dev/null
+++ b/changelog/unreleased/parallelizebackups.yml
@@ -0,0 +1,9 @@
+# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
+title: Parallelize Backup and Restore File Operations
+type: changed
+authors:
+  - name: Samuel Verstraete
+    nick: elangelo
+links:
+  - name: PR#4023
+    url: https://github.com/apache/solr/pull/4023
\ No newline at end of file
diff --git a/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java b/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
index 0e07ac0ca280..3a916ab68c40 100644
--- a/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
+++ b/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
@@ -23,15 +23,25 @@
 import java.lang.invoke.MethodHandles;
 import java.net.URI;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.math3.util.Precision;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.store.Directory;
 import org.apache.solr.client.api.model.SolrJerseyResponse;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.EnvUtils;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.IndexDeletionPolicyWrapper;
 import org.apache.solr.core.SolrCore;
@@ -52,6 +62,21 @@
  */
 public class IncrementalShardBackup {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Maximum number of files to upload in parallel during backup. Can be configured via the system
+   * property {@code solr.backup.maxparalleluploads} or environment variable {@code
+   * SOLR_BACKUP_MAXPARALLELUPLOADS}.
+   */
+  private static final int MAX_PARALLEL_UPLOADS =
+      EnvUtils.getPropertyAsInteger("solr.backup.maxparalleluploads", 1);
+
+  private static final ExecutorService BACKUP_UPLOAD_EXECUTOR =
+      ExecutorUtil.newMDCAwareCachedThreadPool(
+          MAX_PARALLEL_UPLOADS,
+          Integer.MAX_VALUE,
+          new SolrNamedThreadFactory("BackupUploadExecutor"));
+
   private SolrCore solrCore;
 
   private BackupFilePaths incBackupFiles;
@@ -154,8 +179,8 @@ protected IncrementalShardSnapshotResponse backup(final IndexCommit indexCommit)
             solrCore.getSolrConfig().indexConfig.lockType);
     try {
       BackupStats stats = incrementalCopy(files, dir);
-      details.indexFileCount = stats.fileCount;
-      details.uploadedIndexFileCount = stats.uploadedFileCount;
+      details.indexFileCount = stats.fileCount.get();
+      details.uploadedIndexFileCount = stats.uploadedFileCount.get();
       details.indexSizeMB = stats.getIndexSizeMB();
       details.uploadedIndexFileMB = stats.getTotalUploadedMB();
     } finally {
@@ -191,25 +216,65 @@ private BackupStats incrementalCopy(Collection<String> indexFiles, Directory dir)
     URI indexDir = incBackupFiles.getIndexDir();
     BackupStats backupStats = new BackupStats();
 
-    for (String fileName : indexFiles) {
-      Optional<ShardBackupMetadata.BackedFile> opBackedFile = oldBackupPoint.getFile(fileName);
-      Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
-      if (opBackedFile.isPresent()) {
-        ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
-        Checksum existedFileCS = backedFile.fileChecksum;
-        if (existedFileCS.equals(originalFileCS)) {
-          currentBackupPoint.addBackedFile(opBackedFile.get());
-          backupStats.skippedUploadingFile(existedFileCS);
-          continue;
-        }
-      }
+    List<Future<?>> uploadFutures = new ArrayList<>();
 
-      String backedFileName = UUID.randomUUID().toString();
-      backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
+    for (String fileName : indexFiles) {
+      // Capture variable for lambda
+      final String fileNameFinal = fileName;
+
+      Runnable uploadTask =
+          () -> {
+            try {
+              // Calculate checksum and check if file already exists in previous backup
+              Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+                  oldBackupPoint.getFile(fileNameFinal);
+              Checksum originalFileCS = backupRepo.checksum(dir, fileNameFinal);
+
+              if (opBackedFile.isPresent()) {
+                ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
+                Checksum existedFileCS = backedFile.fileChecksum;
+                if (existedFileCS.equals(originalFileCS)) {
+                  synchronized (currentBackupPoint) {
+                    currentBackupPoint.addBackedFile(opBackedFile.get());
+                  }
+                  backupStats.skippedUploadingFile(existedFileCS);
+                  return;
+                }
+              }
+
+              // File doesn't exist or has changed - upload it
+              String backedFileName = UUID.randomUUID().toString();
+              backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir, backedFileName);
+
+              synchronized (currentBackupPoint) {
+                currentBackupPoint.addBackedFile(backedFileName, fileNameFinal, originalFileCS);
+              }
+              backupStats.uploadedFile(originalFileCS);
+            } catch (IOException e) {
+              throw new RuntimeException("Failed to process file: " + fileNameFinal, e);
+            }
+          };
+
+      uploadFutures.add(BACKUP_UPLOAD_EXECUTOR.submit(uploadTask));
+    }
 
-      currentBackupPoint.addBackedFile(backedFileName, fileName, originalFileCS);
-      backupStats.uploadedFile(originalFileCS);
+    try {
+      for (Future<?> future : uploadFutures) {
+        future.get();
+      }
+    } catch (ExecutionException e) {
+      uploadFutures.forEach(f -> f.cancel(true));
+      Throwable cause = e.getCause();
+      switch (cause) {
+        case Error err -> throw err;
+        case IOException ioe -> throw ioe;
+        case RuntimeException re -> throw re;
+        default -> throw new SolrException(
+            SolrException.ErrorCode.UNKNOWN, "Error during parallel backup upload", cause);
+      }
+    } catch (InterruptedException e) {
+      uploadFutures.forEach(f -> f.cancel(true));
+      throw new SolrException(SolrException.ErrorCode.UNKNOWN, "Backup interrupted", e);
     }
 
     currentBackupPoint.store(backupRepo, incBackupFiles.getShardBackupMetadataDir(), shardBackupId);
@@ -217,29 +282,29 @@ private BackupStats incrementalCopy(Collection<String> indexFiles, Directory dir)
   }
 
   private static class BackupStats {
-    private int fileCount;
-    private int uploadedFileCount;
-    private long indexSize;
-    private long totalUploadedBytes;
+    private final AtomicInteger fileCount = new AtomicInteger();
+    private final AtomicInteger uploadedFileCount = new AtomicInteger();
+    private final AtomicLong indexSize = new AtomicLong();
+    private final AtomicLong totalUploadedBytes = new AtomicLong();
 
     public void uploadedFile(Checksum file) {
-      fileCount++;
-      uploadedFileCount++;
-      indexSize += file.size;
-      totalUploadedBytes += file.size;
+      fileCount.incrementAndGet();
+      uploadedFileCount.incrementAndGet();
+      indexSize.addAndGet(file.size);
+      totalUploadedBytes.addAndGet(file.size);
     }
 
     public void skippedUploadingFile(Checksum existedFile) {
-      fileCount++;
-      indexSize += existedFile.size;
+      fileCount.incrementAndGet();
+      indexSize.addAndGet(existedFile.size);
     }
 
     public double getIndexSizeMB() {
-      return Precision.round(indexSize / (1024.0 * 1024), 3);
+      return Precision.round(indexSize.get() / (1024.0 * 1024), 3);
     }
 
     public double getTotalUploadedMB() {
-      return Precision.round(totalUploadedBytes / (1024.0 * 1024), 3);
+      return Precision.round(totalUploadedBytes.get() / (1024.0 * 1024), 3);
     }
   }
diff --git a/solr/core/src/java/org/apache/solr/handler/RestoreCore.java b/solr/core/src/java/org/apache/solr/handler/RestoreCore.java
index 7f4abc18ffdb..f90011209798 100644
--- a/solr/core/src/java/org/apache/solr/handler/RestoreCore.java
+++ b/solr/core/src/java/org/apache/solr/handler/RestoreCore.java
@@ -21,19 +21,26 @@
 import java.lang.reflect.Array;
 import java.net.URI;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Locale;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.EnvUtils;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.backup.BackupFilePaths;
@@ -48,6 +55,20 @@
 public class RestoreCore implements Callable<Boolean> {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  /**
+   * Maximum number of files to download in parallel during restore. Can be configured via the
+   * system property {@code solr.backup.maxparalleldownloads} or environment variable {@code
+   * SOLR_BACKUP_MAXPARALLELDOWNLOADS}.
+   */
+  private static final int MAX_PARALLEL_DOWNLOADS =
+      EnvUtils.getPropertyAsInteger("solr.backup.maxparalleldownloads", 1);
+
+  private static final ExecutorService RESTORE_DOWNLOAD_EXECUTOR =
+      ExecutorUtil.newMDCAwareCachedThreadPool(
+          MAX_PARALLEL_DOWNLOADS,
+          Integer.MAX_VALUE,
+          new SolrNamedThreadFactory("RestoreDownloadExecutor"));
+
   private final SolrCore core;
 
   private RestoreRepository repository;
@@ -107,35 +128,73 @@ public boolean doRestore() throws Exception {
             DirectoryFactory.DirContext.DEFAULT,
             core.getSolrConfig().indexConfig.lockType);
       Set<String> indexDirFiles = new HashSet<>(Arrays.asList(indexDir.listAll()));
+
+      // Capture directories as final for lambda access
+      final Directory finalIndexDir = indexDir;
+      final Directory finalRestoreIndexDir = restoreIndexDir;
+
+      List<Future<?>> downloadFutures = new ArrayList<>();
+
       // Move all files from backupDir to restoreIndexDir
       for (String filename : repository.listAllFiles()) {
         checkInterrupted();
-        try {
-          if (indexDirFiles.contains(filename)) {
-            Checksum cs = repository.checksum(filename);
-            IndexFetcher.CompareResult compareResult;
-            if (cs == null) {
-              compareResult = new IndexFetcher.CompareResult();
-              compareResult.equal = false;
-            } else {
-              compareResult = IndexFetcher.compareFile(indexDir, filename, cs.size, cs.checksum);
-            }
-            if (!compareResult.equal
-                || (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
-                    filename, cs.size, compareResult))) {
-              repository.repoCopy(filename, restoreIndexDir);
-            } else {
-              // prefer local copy
-              repository.localCopy(indexDir, filename, restoreIndexDir);
-            }
-          } else {
-            repository.repoCopy(filename, restoreIndexDir);
-          }
-        } catch (Exception e) {
-          log.warn("Exception while restoring the backup index ", e);
-          throw new SolrException(
-              SolrException.ErrorCode.UNKNOWN, "Exception while restoring the backup index", e);
+
+        // Capture variables for lambda
+        final String filenameFinal = filename;
+        final boolean fileExistsLocally = indexDirFiles.contains(filename);
+
+        Runnable downloadTask =
+            () -> {
+              try {
+                if (fileExistsLocally) {
+                  Checksum cs = repository.checksum(filenameFinal);
+                  IndexFetcher.CompareResult compareResult;
+                  if (cs == null) {
+                    compareResult = new IndexFetcher.CompareResult();
+                    compareResult.equal = false;
+                  } else {
+                    compareResult =
+                        IndexFetcher.compareFile(
+                            finalIndexDir, filenameFinal, cs.size, cs.checksum);
+                  }
+                  if (!compareResult.equal
+                      || (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
+                          filenameFinal, cs.size, compareResult))) {
+                    repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+                  } else {
+                    // prefer local copy
+                    repository.localCopy(finalIndexDir, filenameFinal, finalRestoreIndexDir);
+                  }
+                } else {
+                  repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+                }
+              } catch (Exception e) {
+                log.warn("Exception while restoring the backup index ", e);
+                throw new RuntimeException(
+                    "Exception while restoring the backup index for file: " + filenameFinal, e);
+              }
+            };
+
+        downloadFutures.add(RESTORE_DOWNLOAD_EXECUTOR.submit(downloadTask));
+      }
+
+      try {
+        for (Future<?> future : downloadFutures) {
+          future.get();
+        }
+      } catch (ExecutionException e) {
+        downloadFutures.forEach(f -> f.cancel(true));
+        Throwable cause = e.getCause();
+        switch (cause) {
+          case Error err -> throw err;
+          case IOException ioe -> throw ioe;
+          case RuntimeException re -> throw re;
+          default -> throw new SolrException(
+              SolrException.ErrorCode.UNKNOWN, "Error during parallel restore download", cause);
        }
+      } catch (InterruptedException e) {
+        downloadFutures.forEach(f -> f.cancel(true));
+        throw new SolrException(SolrException.ErrorCode.UNKNOWN, "Restore interrupted", e);
       }
       log.debug("Switching directories");
       core.modifyIndexProps(restoreIndexName);
diff --git a/solr/modules/gcs-repository/src/test/org/apache/solr/gcs/GCSIncrementalBackupTest.java b/solr/modules/gcs-repository/src/test/org/apache/solr/gcs/GCSIncrementalBackupTest.java
index 846563b929f5..ab3de1a75c2f 100644
--- a/solr/modules/gcs-repository/src/test/org/apache/solr/gcs/GCSIncrementalBackupTest.java
+++ b/solr/modules/gcs-repository/src/test/org/apache/solr/gcs/GCSIncrementalBackupTest.java
@@ -70,6 +70,9 @@
 public class GCSIncrementalBackupTest extends AbstractIncrementalBackupTest {
 
   @BeforeClass
   public static void setupClass() throws Exception {
+    // Enable parallel backup/restore for cloud storage tests
+    System.setProperty("solr.backup.maxparalleluploads", "2");
+    System.setProperty("solr.backup.maxparalleldownloads", "2");
     configureCluster(NUM_NODES) // nodes
         .addConfig("conf1", getFile("conf/solrconfig.xml").getParent())
diff --git a/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3IncrementalBackupTest.java b/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3IncrementalBackupTest.java
index 80c5207505b1..fc57d3350813 100644
--- a/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3IncrementalBackupTest.java
+++ b/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3IncrementalBackupTest.java
@@ -87,6 +87,9 @@ public static void ensureCompatibleLocale() {
   public static void setupClass() throws Exception {
     System.setProperty("aws.accessKeyId", "foo");
     System.setProperty("aws.secretAccessKey", "bar");
+    // Enable parallel backup/restore for cloud storage tests
+    System.setProperty("solr.backup.maxparalleluploads", "2");
+    System.setProperty("solr.backup.maxparalleldownloads", "2");
     String retryMode;
     switch (random().nextInt(3)) {
       case 0:
diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/backup-restore.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/backup-restore.adoc
index e6fa3e4d4039..10a0093fed94 100644
--- a/solr/solr-ref-guide/modules/deployment-guide/pages/backup-restore.adoc
+++ b/solr/solr-ref-guide/modules/deployment-guide/pages/backup-restore.adoc
@@ -396,6 +396,38 @@ Any children under the `<repository>` tag are passed as additional configuration
 Information on each of the repository implementations provided with Solr is provided below.
 
+=== Parallel File Transfers
+
+Backup and restore operations can transfer multiple index files in parallel to improve throughput, especially when using cloud storage repositories like S3 or GCS, where latency is higher.
+The parallelism is controlled via system properties or environment variables:
+
+`solr.backup.maxparalleluploads`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `1`
+|===
++
+Maximum number of index files to upload in parallel during backup operations.
+Can also be set via the `SOLR_BACKUP_MAXPARALLELUPLOADS` environment variable.
+
+`solr.backup.maxparalleldownloads`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `1`
+|===
++
+Maximum number of index files to download in parallel during restore operations.
+Can also be set via the `SOLR_BACKUP_MAXPARALLELDOWNLOADS` environment variable.
+
+TIP: These are two independent thread pools, one for backup uploads and one for restore downloads.
+Increasing either value can significantly improve throughput when using cloud storage (S3, GCS), but too high a value will increase IOPS and bandwidth pressure on your cluster.
+Start small and increase based on observed throughput and available resources.
+The configured limit applies across all concurrent backup or restore operations on the node.
+
+=== Checksum Verification
+
 By default, all the repository implementations verify the integrity of the index files before they are copied to the destination.
 However, it is possible to disable this integrity check by setting the optional configuration property `verifyChecksum`.
 
 `verifyChecksum`::
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrIgnoredThreadsFilter.java b/solr/test-framework/src/java/org/apache/solr/SolrIgnoredThreadsFilter.java
index 298b89d38da7..5ce975a61d32 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrIgnoredThreadsFilter.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrIgnoredThreadsFilter.java
@@ -88,6 +88,12 @@ public boolean reject(Thread t) {
       return true;
     }
 
+    // Static backup/restore thread pools - stateless, no core references, threads expire on idle
+    if (threadName.startsWith("BackupUploadExecutor-")
+        || threadName.startsWith("RestoreDownloadExecutor-")) {
+      return true;
+    }
+
     if (threadName.startsWith("closeThreadPool")) {
       return true;
     }
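
For illustration only (not part of the patch above): one way to raise these limits on a node is through the standard `SOLR_OPTS` hook in `solr.in.sh`. The value `4` below is an arbitrary example to be tuned against available IOPS and bandwidth; the same settings can instead be supplied as the `SOLR_BACKUP_MAXPARALLELUPLOADS` and `SOLR_BACKUP_MAXPARALLELDOWNLOADS` environment variables described in the ref-guide section of this change.

    # solr.in.sh: example values only, allowing up to 4 concurrent file transfers per node
    SOLR_OPTS="$SOLR_OPTS -Dsolr.backup.maxparalleluploads=4"
    SOLR_OPTS="$SOLR_OPTS -Dsolr.backup.maxparalleldownloads=4"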