diff --git a/pkg/azurefile/azurefile.go b/pkg/azurefile/azurefile.go index e1add4f2a6..0f29769c5e 100644 --- a/pkg/azurefile/azurefile.go +++ b/pkg/azurefile/azurefile.go @@ -321,7 +321,7 @@ func NewDriver(options *DriverOptions) *Driver { driver.volLockMap = newLockMap() driver.subnetLockMap = newLockMap() driver.volumeLocks = newVolumeLocks() - driver.azcopy = &fileutil.Azcopy{} + driver.azcopy = &fileutil.Azcopy{ExecCmd: &fileutil.ExecCommand{}} driver.kubeconfig = options.KubeConfig driver.endpoint = options.Endpoint driver.resolver = new(NetResolver) diff --git a/pkg/azurefile/controllerserver.go b/pkg/azurefile/controllerserver.go index 7c2b7ca1b4..76889d35c6 100644 --- a/pkg/azurefile/controllerserver.go +++ b/pkg/azurefile/controllerserver.go @@ -26,7 +26,7 @@ import ( "strings" "time" - volumehelper "sigs.k8s.io/azurefile-csi-driver/pkg/util" + "sigs.k8s.io/azurefile-csi-driver/pkg/util" "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/sas" @@ -96,7 +96,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) } capacityBytes := req.GetCapacityRange().GetRequiredBytes() - requestGiB := volumehelper.RoundUpGiB(capacityBytes) + requestGiB := util.RoundUpGiB(capacityBytes) if requestGiB == 0 { requestGiB = defaultAzureFileQuota klog.Warningf("no quota specified, set as default value(%d GiB)", defaultAzureFileQuota) @@ -634,7 +634,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) // use uuid as vhd disk name if file share specified diskName = uuid.NewString() + vhdSuffix } - diskSizeBytes := volumehelper.GiBToBytes(requestGiB) + diskSizeBytes := util.GiBToBytes(requestGiB) klog.V(2).Infof("begin to create vhd file(%s) size(%d) on share(%s) on account(%s) type(%s) rg(%s) location(%s)", diskName, diskSizeBytes, validFileShareName, account, sku, resourceGroup, location) if err := createDisk(ctx, accountName, accountKey, d.getStorageEndPointSuffix(), validFileShareName, diskName, diskSizeBytes); err != nil { @@ -906,7 +906,7 @@ func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequ klog.V(2).Infof("snapshot(%s) already exists", snapshotName) return &csi.CreateSnapshotResponse{ Snapshot: &csi.Snapshot{ - SizeBytes: volumehelper.GiBToBytes(int64(itemSnapshotQuota)), + SizeBytes: util.GiBToBytes(int64(itemSnapshotQuota)), SnapshotId: sourceVolumeID + "#" + itemSnapshot, SourceVolumeId: sourceVolumeID, CreationTime: timestamppb.New(itemSnapshotTime), @@ -977,7 +977,7 @@ func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequ createResp := &csi.CreateSnapshotResponse{ Snapshot: &csi.Snapshot{ - SizeBytes: volumehelper.GiBToBytes(int64(itemSnapshotQuota)), + SizeBytes: util.GiBToBytes(int64(itemSnapshotQuota)), SnapshotId: sourceVolumeID + "#" + itemSnapshot, SourceVolumeId: sourceVolumeID, CreationTime: timestamppb.New(itemSnapshotTime), @@ -1108,21 +1108,21 @@ func (d *Driver) copyFileShareByAzcopy(srcFileShareName, dstFileShareName, srcPa klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) switch jobState { - case volumehelper.AzcopyJobError, volumehelper.AzcopyJobCompleted: + case util.AzcopyJobError, util.AzcopyJobCompleted, util.AzcopyJobCompletedWithErrors, util.AzcopyJobCompletedWithSkipped, util.AzcopyJobCompletedWithErrorsAndSkipped: return err - case volumehelper.AzcopyJobRunning: + case util.AzcopyJobRunning: err = wait.PollImmediate(20*time.Second, time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, func() (bool, error) { jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv) klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) if err != nil { return false, err } - if jobState == volumehelper.AzcopyJobRunning { + if jobState == util.AzcopyJobRunning { return false, nil } return true, nil }) - case volumehelper.AzcopyJobNotFound: + case util.AzcopyJobNotFound: klog.V(2).Infof("copy fileshare %s:%s to %s:%s", srcAccountName, srcFileShareName, dstAccountName, dstFileShareName) execAzcopyJob := func() error { if out, err := d.execAzcopyCopy(srcPathAuth, dstPath, azcopyCopyOptions, authAzcopyEnv); err != nil { @@ -1134,13 +1134,16 @@ func (d *Driver) copyFileShareByAzcopy(srcFileShareName, dstFileShareName, srcPa jobState, percent, _ := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv) return fmt.Errorf("azcopy job status: %s, timeout waiting for copy fileshare %s:%s to %s:%s complete, current copy percent: %s%%", jobState, srcAccountName, srcFileShareName, dstAccountName, dstFileShareName, percent) } - err = volumehelper.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execAzcopyJob, timeoutFunc) + err = util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execAzcopyJob, timeoutFunc) } if err != nil { klog.Warningf("CopyFileShare(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstFileShareName, err) } else { klog.V(2).Infof("copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName) + if out, err := d.azcopy.CleanJobs(); err != nil { + klog.Warningf("clean azcopy jobs failed with error: %v, output: %s", err, string(out)) + } } return err } @@ -1165,7 +1168,7 @@ func (d *Driver) ControllerExpandVolume(ctx context.Context, req *csi.Controller if capacityBytes == 0 { return nil, status.Error(codes.InvalidArgument, "volume capacity range missing in request") } - requestGiB := volumehelper.RoundUpGiB(capacityBytes) + requestGiB := util.RoundUpGiB(capacityBytes) if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil { return nil, status.Errorf(codes.InvalidArgument, "invalid expand volume request: %v", req) } diff --git a/pkg/util/util.go b/pkg/util/util.go index 3531d2efa9..e84466d0ca 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -34,10 +34,13 @@ const ( type AzcopyJobState string const ( - AzcopyJobError AzcopyJobState = "Error" - AzcopyJobNotFound AzcopyJobState = "NotFound" - AzcopyJobRunning AzcopyJobState = "Running" - AzcopyJobCompleted AzcopyJobState = "Completed" + AzcopyJobError AzcopyJobState = "Error" + AzcopyJobNotFound AzcopyJobState = "NotFound" + AzcopyJobRunning AzcopyJobState = "Running" + AzcopyJobCompleted AzcopyJobState = "Completed" + AzcopyJobCompletedWithErrors AzcopyJobState = "CompletedWithErrors" + AzcopyJobCompletedWithSkipped AzcopyJobState = "CompletedWithSkipped" + AzcopyJobCompletedWithErrorsAndSkipped AzcopyJobState = "CompletedWithErrorsAndSkipped" ) // control the number of concurrent powershell commands running on Windows node @@ -122,9 +125,6 @@ func (ac *Azcopy) GetAzcopyJob(dstFileshare string, authAzcopyEnv []string) (Azc // Start Time: Wednesday, 09-Aug-23 09:09:03 UTC // Status: Cancelled // Command: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false - if ac.ExecCmd == nil { - ac.ExecCmd = &ExecCommand{} - } out, err := ac.ExecCmd.RunCommand(cmdStr, authAzcopyEnv) // if grep command returns nothing, the exec will return exit status 1 error, so filter this error if err != nil && err.Error() != "exit status 1" { @@ -158,13 +158,8 @@ func (ac *Azcopy) GetAzcopyJob(dstFileshare string, authAzcopyEnv []string) (Azc return jobState, percent, nil } -// TestListJobs test azcopy jobs list command with authAzcopyEnv -func (ac *Azcopy) TestListJobs(accountName, storageEndpointSuffix string, authAzcopyEnv []string) (string, error) { - cmdStr := fmt.Sprintf("azcopy list %s", fmt.Sprintf("https://%s.file.%s", accountName, storageEndpointSuffix)) - if ac.ExecCmd == nil { - ac.ExecCmd = &ExecCommand{} - } - return ac.ExecCmd.RunCommand(cmdStr, authAzcopyEnv) +func (ac *Azcopy) CleanJobs() (string, error) { + return ac.ExecCmd.RunCommand("azcopy jobs clean", nil) } // parseAzcopyJobList parse command azcopy jobs list, get jobid and state from joblist diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 07b4372769..6380f17751 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -161,8 +161,7 @@ func TestGetAzcopyJob(t *testing.T) { m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstFileshare -B 3"), []string{}).Return(test.showStr, test.showErr) } - azcopyFunc := &Azcopy{} - azcopyFunc.ExecCmd = m + azcopyFunc := &Azcopy{ExecCmd: m} jobState, percent, err := azcopyFunc.GetAzcopyJob(dstFileshare, []string{}) if jobState != test.expectedJobState || percent != test.expectedPercent || !reflect.DeepEqual(err, test.expectedErr) { t.Errorf("test[%s]: unexpected jobState: %v, percent: %v, err: %v, expected jobState: %v, percent: %v, err: %v", test.desc, jobState, percent, err, test.expectedJobState, test.expectedPercent, test.expectedErr) @@ -170,6 +169,41 @@ func TestGetAzcopyJob(t *testing.T) { } } +func TestCleanJobs(t *testing.T) { + tests := []struct { + desc string + execStr string + execErr error + expectedErr error + }{ + { + desc: "run exec get error", + execStr: "", + execErr: fmt.Errorf("error"), + expectedErr: fmt.Errorf("error"), + }, + { + desc: "run exec succeed", + execStr: "cleaned", + execErr: nil, + expectedErr: nil, + }, + } + for _, test := range tests { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + m := NewMockEXEC(ctrl) + m.EXPECT().RunCommand(gomock.Eq("azcopy jobs clean"), nil).Return(test.execStr, test.execErr) + + azcopyFunc := &Azcopy{ExecCmd: m} + _, err := azcopyFunc.CleanJobs() + if !reflect.DeepEqual(err, test.expectedErr) { + t.Errorf("test[%s]: unexpected err: %v, expected err: %v", test.desc, err, test.expectedErr) + } + } +} + func TestParseAzcopyJobList(t *testing.T) { tests := []struct { desc string