-
Notifications
You must be signed in to change notification settings - Fork 824
SOLR-17208: Parallelize backup and restore file transfers #4023
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
64619f8
34a9000
f83b6e8
3e17ce5
a19ebce
3fcf874
23d5561
5ed0f17
a417d32
d8bb9b4
a893b64
f7dd1a9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| # See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc | ||
| title: Parallelize Backup and Restore File Operations | ||
| type: changed | ||
| authors: | ||
| - name: Samuel Verstraete | ||
| nick: elangelo | ||
| links: | ||
| - name: PR#4023 | ||
| url: https://github.com/apache/solr/pull/4023 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,15 +23,25 @@ | |
| import java.lang.invoke.MethodHandles; | ||
| import java.net.URI; | ||
| import java.time.Instant; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.List; | ||
| import java.util.Optional; | ||
| import java.util.UUID; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
| import org.apache.commons.math3.util.Precision; | ||
| import org.apache.lucene.index.IndexCommit; | ||
| import org.apache.lucene.store.Directory; | ||
| import org.apache.solr.client.api.model.SolrJerseyResponse; | ||
| import org.apache.solr.cloud.CloudDescriptor; | ||
| import org.apache.solr.common.SolrException; | ||
| import org.apache.solr.common.util.EnvUtils; | ||
| import org.apache.solr.common.util.ExecutorUtil; | ||
| import org.apache.solr.common.util.SolrNamedThreadFactory; | ||
| import org.apache.solr.core.DirectoryFactory; | ||
| import org.apache.solr.core.IndexDeletionPolicyWrapper; | ||
| import org.apache.solr.core.SolrCore; | ||
|
|
@@ -52,6 +62,21 @@ | |
| */ | ||
| public class IncrementalShardBackup { | ||
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); | ||
|
|
||
| /** | ||
| * Maximum number of files to upload in parallel during backup. Can be configured via the system | ||
| * property {@code solr.backup.maxparalleluploads} or environment variable {@code | ||
| * SOLR_BACKUP_MAXPARALLELUPLOADS}. | ||
| */ | ||
| private static final int DEFAULT_MAX_PARALLEL_UPLOADS = | ||
| EnvUtils.getPropertyAsInteger("solr.backup.maxparalleluploads", 1); | ||
|
|
||
| private static final ExecutorService BACKUP_EXECUTOR = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As this is used for "uploads" (in the context of backups) and not for backups themselves, I think the name should reflect that, like BACKUP_UPLOAD_EXECUTOR (and similarly for the thread name) |
||
| ExecutorUtil.newMDCAwareCachedThreadPool( | ||
| Math.max(1, DEFAULT_MAX_PARALLEL_UPLOADS), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I wouldn't bother with the 'max'; someone configuring this should know what they are doing. Heck, if they put -1 thinking it might disable, they'll now probably get the error they should get. |
||
| Integer.MAX_VALUE, | ||
| new SolrNamedThreadFactory("IncrementalBackupExecutor")); | ||
|
|
||
| private SolrCore solrCore; | ||
|
|
||
| private BackupFilePaths incBackupFiles; | ||
|
|
@@ -154,8 +179,8 @@ protected IncrementalShardSnapshotResponse backup(final IndexCommit indexCommit) | |
| solrCore.getSolrConfig().indexConfig.lockType); | ||
| try { | ||
| BackupStats stats = incrementalCopy(files, dir); | ||
| details.indexFileCount = stats.fileCount; | ||
| details.uploadedIndexFileCount = stats.uploadedFileCount; | ||
| details.indexFileCount = stats.fileCount.get(); | ||
|
epugh marked this conversation as resolved.
|
||
| details.uploadedIndexFileCount = stats.uploadedFileCount.get(); | ||
| details.indexSizeMB = stats.getIndexSizeMB(); | ||
| details.uploadedIndexFileMB = stats.getTotalUploadedMB(); | ||
| } finally { | ||
|
|
@@ -191,55 +216,115 @@ private BackupStats incrementalCopy(Collection<String> indexFiles, Directory dir | |
| URI indexDir = incBackupFiles.getIndexDir(); | ||
| BackupStats backupStats = new BackupStats(); | ||
|
|
||
| for (String fileName : indexFiles) { | ||
| Optional<ShardBackupMetadata.BackedFile> opBackedFile = oldBackupPoint.getFile(fileName); | ||
| Checksum originalFileCS = backupRepo.checksum(dir, fileName); | ||
|
|
||
| if (opBackedFile.isPresent()) { | ||
| ShardBackupMetadata.BackedFile backedFile = opBackedFile.get(); | ||
| Checksum existedFileCS = backedFile.fileChecksum; | ||
| if (existedFileCS.equals(originalFileCS)) { | ||
| currentBackupPoint.addBackedFile(opBackedFile.get()); | ||
| backupStats.skippedUploadingFile(existedFileCS); | ||
| continue; | ||
| } | ||
| ExecutorService executor = BACKUP_EXECUTOR; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Just use |
||
|
|
||
| List<Future<?>> uploadFutures = new ArrayList<>(); | ||
|
|
||
| try { | ||
|
Comment on lines
+221
to
+223
|
||
| for (String fileName : indexFiles) { | ||
| // Capture variable for lambda | ||
| final String fileNameFinal = fileName; | ||
|
|
||
| Runnable uploadTask = | ||
| () -> { | ||
| try { | ||
| // Calculate checksum and check if file already exists in previous backup | ||
| Optional<ShardBackupMetadata.BackedFile> opBackedFile = | ||
| oldBackupPoint.getFile(fileNameFinal); | ||
| Checksum originalFileCS = backupRepo.checksum(dir, fileNameFinal); | ||
|
|
||
| if (opBackedFile.isPresent()) { | ||
| ShardBackupMetadata.BackedFile backedFile = opBackedFile.get(); | ||
| Checksum existedFileCS = backedFile.fileChecksum; | ||
| if (existedFileCS.equals(originalFileCS)) { | ||
| synchronized (currentBackupPoint) { | ||
| currentBackupPoint.addBackedFile(opBackedFile.get()); | ||
| } | ||
| backupStats.skippedUploadingFile(existedFileCS); | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| // File doesn't exist or has changed - upload it | ||
| String backedFileName = UUID.randomUUID().toString(); | ||
| backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir, backedFileName); | ||
|
|
||
| synchronized (currentBackupPoint) { | ||
| currentBackupPoint.addBackedFile(backedFileName, fileNameFinal, originalFileCS); | ||
| } | ||
| backupStats.uploadedFile(originalFileCS); | ||
| } catch (IOException e) { | ||
| throw new RuntimeException("Failed to process file: " + fileNameFinal, e); | ||
| } | ||
| }; | ||
|
Comment on lines
+228
to
+259
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: for these lambdas, can we put them in a private method? |
||
|
|
||
| uploadFutures.add(executor.submit(uploadTask)); | ||
| } | ||
|
|
||
| String backedFileName = UUID.randomUUID().toString(); | ||
| backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName); | ||
| // Wait for ALL futures before throwing - currentBackupPoint must reflect every | ||
| // successfully uploaded file before it is written, even when an error occurs. | ||
| Throwable firstError = null; | ||
| for (Future<?> future : uploadFutures) { | ||
| try { | ||
| future.get(); | ||
| } catch (ExecutionException e) { | ||
| if (firstError == null) { | ||
| firstError = e.getCause(); | ||
| } | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If any of them fail, you need to cancel the existing futures here with
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right. I'd actually flip the Looking forward to Structured Concurrency some day to make this overall easier/better. We'd have to |
||
| if (firstError == null) { | ||
| firstError = e; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| currentBackupPoint.addBackedFile(backedFileName, fileName, originalFileCS); | ||
| backupStats.uploadedFile(originalFileCS); | ||
| if (firstError != null) { | ||
| if (firstError instanceof Error) { | ||
| // Rethrow Errors (like OutOfMemoryError) - don't try to recover | ||
| throw (Error) firstError; | ||
| } else if (firstError instanceof IOException) { | ||
| throw (IOException) firstError; | ||
| } else if (firstError instanceof RuntimeException) { | ||
| throw (RuntimeException) firstError; | ||
| } else if (firstError instanceof InterruptedException) { | ||
| throw new IOException("Backup interrupted", firstError); | ||
| } else { | ||
| throw new IOException("Error during parallel backup upload", firstError); | ||
|
Comment on lines
+291
to
+293
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You threw a
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. At most API layers in Solr, SolrException is best. |
||
| } | ||
|
Comment on lines
+283
to
+294
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In Java 21 you can use a switch case here |
||
| } | ||
| } finally { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Just remove this finally, since it does nothing now. |
||
| // BACKUP_EXECUTOR is a shared static pool; do not shut it down here | ||
| } | ||
|
|
||
| currentBackupPoint.store(backupRepo, incBackupFiles.getShardBackupMetadataDir(), shardBackupId); | ||
| return backupStats; | ||
| } | ||
|
|
||
| private static class BackupStats { | ||
| private int fileCount; | ||
| private int uploadedFileCount; | ||
| private long indexSize; | ||
| private long totalUploadedBytes; | ||
| private final AtomicInteger fileCount = new AtomicInteger(); | ||
| private final AtomicInteger uploadedFileCount = new AtomicInteger(); | ||
| private final AtomicLong indexSize = new AtomicLong(); | ||
| private final AtomicLong totalUploadedBytes = new AtomicLong(); | ||
|
|
||
| public void uploadedFile(Checksum file) { | ||
| fileCount++; | ||
| uploadedFileCount++; | ||
| indexSize += file.size; | ||
| totalUploadedBytes += file.size; | ||
| fileCount.incrementAndGet(); | ||
| uploadedFileCount.incrementAndGet(); | ||
| indexSize.addAndGet(file.size); | ||
| totalUploadedBytes.addAndGet(file.size); | ||
| } | ||
|
|
||
| public void skippedUploadingFile(Checksum existedFile) { | ||
| fileCount++; | ||
| indexSize += existedFile.size; | ||
| fileCount.incrementAndGet(); | ||
| indexSize.addAndGet(existedFile.size); | ||
| } | ||
|
|
||
| public double getIndexSizeMB() { | ||
| return Precision.round(indexSize / (1024.0 * 1024), 3); | ||
| return Precision.round(indexSize.get() / (1024.0 * 1024), 3); | ||
| } | ||
|
|
||
| public double getTotalUploadedMB() { | ||
| return Precision.round(totalUploadedBytes / (1024.0 * 1024), 3); | ||
| return Precision.round(totalUploadedBytes.get() / (1024.0 * 1024), 3); | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,19 +21,26 @@ | |
| import java.lang.reflect.Array; | ||
| import java.net.URI; | ||
| import java.text.SimpleDateFormat; | ||
| import java.util.ArrayList; | ||
| import java.util.Arrays; | ||
| import java.util.Date; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Locale; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
| import java.util.concurrent.Callable; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Future; | ||
| import org.apache.lucene.codecs.CodecUtil; | ||
| import org.apache.lucene.store.Directory; | ||
| import org.apache.lucene.store.IOContext; | ||
| import org.apache.lucene.store.IndexInput; | ||
| import org.apache.solr.common.SolrException; | ||
| import org.apache.solr.common.util.EnvUtils; | ||
| import org.apache.solr.common.util.ExecutorUtil; | ||
| import org.apache.solr.common.util.SolrNamedThreadFactory; | ||
| import org.apache.solr.core.DirectoryFactory; | ||
| import org.apache.solr.core.SolrCore; | ||
| import org.apache.solr.core.backup.BackupFilePaths; | ||
|
|
@@ -48,6 +55,20 @@ public class RestoreCore implements Callable<Boolean> { | |
|
|
||
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); | ||
|
|
||
| /** | ||
| * Maximum number of files to download in parallel during restore. Can be configured via the | ||
| * system property {@code solr.backup.maxparalleldownloads} or environment variable {@code | ||
| * SOLR_BACKUP_MAXPARALLELDOWNLOADS}. | ||
| */ | ||
| private static final int DEFAULT_MAX_PARALLEL_DOWNLOADS = | ||
| EnvUtils.getPropertyAsInteger("solr.backup.maxparalleldownloads", 1); | ||
|
|
||
| private static final ExecutorService RESTORE_EXECUTOR = | ||
| ExecutorUtil.newMDCAwareCachedThreadPool( | ||
| Math.max(1, DEFAULT_MAX_PARALLEL_DOWNLOADS), | ||
| Integer.MAX_VALUE, | ||
| new SolrNamedThreadFactory("RestoreCoreExecutor")); | ||
|
|
||
| private final SolrCore core; | ||
| private RestoreRepository repository; | ||
|
|
||
|
|
@@ -107,35 +128,96 @@ public boolean doRestore() throws Exception { | |
| DirectoryFactory.DirContext.DEFAULT, | ||
| core.getSolrConfig().indexConfig.lockType); | ||
| Set<String> indexDirFiles = new HashSet<>(Arrays.asList(indexDir.listAll())); | ||
| // Move all files from backupDir to restoreIndexDir | ||
| for (String filename : repository.listAllFiles()) { | ||
| checkInterrupted(); | ||
| try { | ||
| if (indexDirFiles.contains(filename)) { | ||
| Checksum cs = repository.checksum(filename); | ||
| IndexFetcher.CompareResult compareResult; | ||
| if (cs == null) { | ||
| compareResult = new IndexFetcher.CompareResult(); | ||
| compareResult.equal = false; | ||
| } else { | ||
| compareResult = IndexFetcher.compareFile(indexDir, filename, cs.size, cs.checksum); | ||
|
|
||
| // Capture directories as final for lambda access | ||
| final Directory finalIndexDir = indexDir; | ||
| final Directory finalRestoreIndexDir = restoreIndexDir; | ||
|
|
||
| ExecutorService executor = RESTORE_EXECUTOR; | ||
|
|
||
| List<Future<?>> downloadFutures = new ArrayList<>(); | ||
|
|
||
|
Comment on lines
+138
to
+139
|
||
| try { | ||
| // Move all files from backupDir to restoreIndexDir | ||
| for (String filename : repository.listAllFiles()) { | ||
| checkInterrupted(); | ||
|
|
||
| // Capture variables for lambda | ||
| final String filenameFinal = filename; | ||
| final boolean fileExistsLocally = indexDirFiles.contains(filename); | ||
|
|
||
| Runnable downloadTask = | ||
| () -> { | ||
| try { | ||
| if (fileExistsLocally) { | ||
| Checksum cs = repository.checksum(filenameFinal); | ||
| IndexFetcher.CompareResult compareResult; | ||
| if (cs == null) { | ||
| compareResult = new IndexFetcher.CompareResult(); | ||
| compareResult.equal = false; | ||
| } else { | ||
| compareResult = | ||
| IndexFetcher.compareFile( | ||
| finalIndexDir, filenameFinal, cs.size, cs.checksum); | ||
| } | ||
| if (!compareResult.equal | ||
| || (IndexFetcher.filesToAlwaysDownloadIfNoChecksums( | ||
| filenameFinal, cs.size, compareResult))) { | ||
| repository.repoCopy(filenameFinal, finalRestoreIndexDir); | ||
| } else { | ||
| // prefer local copy | ||
| repository.localCopy(finalIndexDir, filenameFinal, finalRestoreIndexDir); | ||
| } | ||
| } else { | ||
| repository.repoCopy(filenameFinal, finalRestoreIndexDir); | ||
| } | ||
| } catch (Exception e) { | ||
| log.warn("Exception while restoring the backup index ", e); | ||
| throw new RuntimeException( | ||
| "Exception while restoring the backup index for file: " + filenameFinal, e); | ||
| } | ||
| }; | ||
|
|
||
| downloadFutures.add(executor.submit(downloadTask)); | ||
| } | ||
|
|
||
| // Wait for ALL futures to ensure all files are processed | ||
| Throwable firstError = null; | ||
| for (Future<?> future : downloadFutures) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The same code-review feedback applies to this class |
||
| try { | ||
| future.get(); | ||
| } catch (ExecutionException e) { | ||
| if (firstError == null) { | ||
| firstError = e.getCause(); | ||
| } | ||
| if (!compareResult.equal | ||
| || (IndexFetcher.filesToAlwaysDownloadIfNoChecksums( | ||
| filename, cs.size, compareResult))) { | ||
| repository.repoCopy(filename, restoreIndexDir); | ||
| } else { | ||
| // prefer local copy | ||
| repository.localCopy(indexDir, filename, restoreIndexDir); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| if (firstError == null) { | ||
| firstError = e; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if (firstError != null) { | ||
| if (firstError instanceof Error) { | ||
| // Rethrow Errors (like OutOfMemoryError) - don't try to recover | ||
| throw (Error) firstError; | ||
| } else if (firstError instanceof IOException) { | ||
| throw (IOException) firstError; | ||
| } else if (firstError instanceof RuntimeException) { | ||
| throw (RuntimeException) firstError; | ||
| } else if (firstError instanceof InterruptedException) { | ||
| throw new SolrException( | ||
| SolrException.ErrorCode.UNKNOWN, "Restore interrupted", firstError); | ||
| } else { | ||
| repository.repoCopy(filename, restoreIndexDir); | ||
| throw new SolrException( | ||
| SolrException.ErrorCode.UNKNOWN, | ||
| "Error during parallel restore download", | ||
| firstError); | ||
| } | ||
| } catch (Exception e) { | ||
| log.warn("Exception while restoring the backup index ", e); | ||
| throw new SolrException( | ||
| SolrException.ErrorCode.UNKNOWN, "Exception while restoring the backup index", e); | ||
| } | ||
| } finally { | ||
| // RESTORE_EXECUTOR is a shared static pool; do not shut it down here | ||
| } | ||
| log.debug("Switching directories"); | ||
| core.modifyIndexProps(restoreIndexName); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should just be `MAX_PARALLEL_UPLOADS` — drop the `default` prefix.