Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/azurefile/azurefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func NewDriver(options *DriverOptions) *Driver {
driver.volLockMap = newLockMap()
driver.subnetLockMap = newLockMap()
driver.volumeLocks = newVolumeLocks()
driver.azcopy = &fileutil.Azcopy{}
driver.azcopy = &fileutil.Azcopy{ExecCmd: &fileutil.ExecCommand{}}
driver.kubeconfig = options.KubeConfig
driver.endpoint = options.Endpoint
driver.resolver = new(NetResolver)
Expand Down
25 changes: 14 additions & 11 deletions pkg/azurefile/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"strings"
"time"

volumehelper "sigs.k8s.io/azurefile-csi-driver/pkg/util"
"sigs.k8s.io/azurefile-csi-driver/pkg/util"

"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/sas"
Expand Down Expand Up @@ -96,7 +96,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}

capacityBytes := req.GetCapacityRange().GetRequiredBytes()
requestGiB := volumehelper.RoundUpGiB(capacityBytes)
requestGiB := util.RoundUpGiB(capacityBytes)
if requestGiB == 0 {
requestGiB = defaultAzureFileQuota
klog.Warningf("no quota specified, set as default value(%d GiB)", defaultAzureFileQuota)
Expand Down Expand Up @@ -634,7 +634,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
// use uuid as vhd disk name if file share specified
diskName = uuid.NewString() + vhdSuffix
}
diskSizeBytes := volumehelper.GiBToBytes(requestGiB)
diskSizeBytes := util.GiBToBytes(requestGiB)
klog.V(2).Infof("begin to create vhd file(%s) size(%d) on share(%s) on account(%s) type(%s) rg(%s) location(%s)",
diskName, diskSizeBytes, validFileShareName, account, sku, resourceGroup, location)
if err := createDisk(ctx, accountName, accountKey, d.getStorageEndPointSuffix(), validFileShareName, diskName, diskSizeBytes); err != nil {
Expand Down Expand Up @@ -906,7 +906,7 @@ func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequ
klog.V(2).Infof("snapshot(%s) already exists", snapshotName)
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: volumehelper.GiBToBytes(int64(itemSnapshotQuota)),
SizeBytes: util.GiBToBytes(int64(itemSnapshotQuota)),
SnapshotId: sourceVolumeID + "#" + itemSnapshot,
SourceVolumeId: sourceVolumeID,
CreationTime: timestamppb.New(itemSnapshotTime),
Expand Down Expand Up @@ -977,7 +977,7 @@ func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequ

createResp := &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: volumehelper.GiBToBytes(int64(itemSnapshotQuota)),
SizeBytes: util.GiBToBytes(int64(itemSnapshotQuota)),
SnapshotId: sourceVolumeID + "#" + itemSnapshot,
SourceVolumeId: sourceVolumeID,
CreationTime: timestamppb.New(itemSnapshotTime),
Expand Down Expand Up @@ -1108,21 +1108,21 @@ func (d *Driver) copyFileShareByAzcopy(srcFileShareName, dstFileShareName, srcPa
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)

switch jobState {
case volumehelper.AzcopyJobError, volumehelper.AzcopyJobCompleted:
case util.AzcopyJobError, util.AzcopyJobCompleted, util.AzcopyJobCompletedWithErrors, util.AzcopyJobCompletedWithSkipped, util.AzcopyJobCompletedWithErrorsAndSkipped:
return err
case volumehelper.AzcopyJobRunning:
case util.AzcopyJobRunning:
err = wait.PollImmediate(20*time.Second, time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, func() (bool, error) {
jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
if err != nil {
return false, err
}
if jobState == volumehelper.AzcopyJobRunning {
if jobState == util.AzcopyJobRunning {
return false, nil
}
return true, nil
})
case volumehelper.AzcopyJobNotFound:
case util.AzcopyJobNotFound:
klog.V(2).Infof("copy fileshare %s:%s to %s:%s", srcAccountName, srcFileShareName, dstAccountName, dstFileShareName)
execAzcopyJob := func() error {
if out, err := d.execAzcopyCopy(srcPathAuth, dstPath, azcopyCopyOptions, authAzcopyEnv); err != nil {
Expand All @@ -1134,13 +1134,16 @@ func (d *Driver) copyFileShareByAzcopy(srcFileShareName, dstFileShareName, srcPa
jobState, percent, _ := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
return fmt.Errorf("azcopy job status: %s, timeout waiting for copy fileshare %s:%s to %s:%s complete, current copy percent: %s%%", jobState, srcAccountName, srcFileShareName, dstAccountName, dstFileShareName, percent)
}
err = volumehelper.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execAzcopyJob, timeoutFunc)
err = util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execAzcopyJob, timeoutFunc)
}

if err != nil {
klog.Warningf("CopyFileShare(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstFileShareName, err)
} else {
klog.V(2).Infof("copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName)
if out, err := d.azcopy.CleanJobs(); err != nil {
klog.Warningf("clean azcopy jobs failed with error: %v, output: %s", err, string(out))
}
}
return err
}
Expand All @@ -1165,7 +1168,7 @@ func (d *Driver) ControllerExpandVolume(ctx context.Context, req *csi.Controller
if capacityBytes == 0 {
return nil, status.Error(codes.InvalidArgument, "volume capacity range missing in request")
}
requestGiB := volumehelper.RoundUpGiB(capacityBytes)
requestGiB := util.RoundUpGiB(capacityBytes)
if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid expand volume request: %v", req)
}
Expand Down
23 changes: 9 additions & 14 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@ const (
type AzcopyJobState string

const (
AzcopyJobError AzcopyJobState = "Error"
AzcopyJobNotFound AzcopyJobState = "NotFound"
AzcopyJobRunning AzcopyJobState = "Running"
AzcopyJobCompleted AzcopyJobState = "Completed"
AzcopyJobError AzcopyJobState = "Error"
AzcopyJobNotFound AzcopyJobState = "NotFound"
AzcopyJobRunning AzcopyJobState = "Running"
AzcopyJobCompleted AzcopyJobState = "Completed"
AzcopyJobCompletedWithErrors AzcopyJobState = "CompletedWithErrors"
AzcopyJobCompletedWithSkipped AzcopyJobState = "CompletedWithSkipped"
AzcopyJobCompletedWithErrorsAndSkipped AzcopyJobState = "CompletedWithErrorsAndSkipped"
)

// control the number of concurrent powershell commands running on Windows node
Expand Down Expand Up @@ -122,9 +125,6 @@ func (ac *Azcopy) GetAzcopyJob(dstFileshare string, authAzcopyEnv []string) (Azc
// Start Time: Wednesday, 09-Aug-23 09:09:03 UTC
// Status: Cancelled
// Command: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false
if ac.ExecCmd == nil {
ac.ExecCmd = &ExecCommand{}
}
out, err := ac.ExecCmd.RunCommand(cmdStr, authAzcopyEnv)
// if grep command returns nothing, the exec will return exit status 1 error, so filter this error
if err != nil && err.Error() != "exit status 1" {
Expand Down Expand Up @@ -158,13 +158,8 @@ func (ac *Azcopy) GetAzcopyJob(dstFileshare string, authAzcopyEnv []string) (Azc
return jobState, percent, nil
}

// TestListJobs test azcopy jobs list command with authAzcopyEnv
func (ac *Azcopy) TestListJobs(accountName, storageEndpointSuffix string, authAzcopyEnv []string) (string, error) {
cmdStr := fmt.Sprintf("azcopy list %s", fmt.Sprintf("https://%s.file.%s", accountName, storageEndpointSuffix))
if ac.ExecCmd == nil {
ac.ExecCmd = &ExecCommand{}
}
return ac.ExecCmd.RunCommand(cmdStr, authAzcopyEnv)
func (ac *Azcopy) CleanJobs() (string, error) {
return ac.ExecCmd.RunCommand("azcopy jobs clean", nil)
}

// parseAzcopyJobList parse command azcopy jobs list, get jobid and state from joblist
Expand Down
38 changes: 36 additions & 2 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,49 @@ func TestGetAzcopyJob(t *testing.T) {
m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstFileshare -B 3"), []string{}).Return(test.showStr, test.showErr)
}

azcopyFunc := &Azcopy{}
azcopyFunc.ExecCmd = m
azcopyFunc := &Azcopy{ExecCmd: m}
jobState, percent, err := azcopyFunc.GetAzcopyJob(dstFileshare, []string{})
if jobState != test.expectedJobState || percent != test.expectedPercent || !reflect.DeepEqual(err, test.expectedErr) {
t.Errorf("test[%s]: unexpected jobState: %v, percent: %v, err: %v, expected jobState: %v, percent: %v, err: %v", test.desc, jobState, percent, err, test.expectedJobState, test.expectedPercent, test.expectedErr)
}
}
}

func TestCleanJobs(t *testing.T) {
tests := []struct {
desc string
execStr string
execErr error
expectedErr error
}{
{
desc: "run exec get error",
execStr: "",
execErr: fmt.Errorf("error"),
expectedErr: fmt.Errorf("error"),
},
{
desc: "run exec succeed",
execStr: "cleaned",
execErr: nil,
expectedErr: nil,
},
}
for _, test := range tests {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

m := NewMockEXEC(ctrl)
m.EXPECT().RunCommand(gomock.Eq("azcopy jobs clean"), nil).Return(test.execStr, test.execErr)

azcopyFunc := &Azcopy{ExecCmd: m}
_, err := azcopyFunc.CleanJobs()
if !reflect.DeepEqual(err, test.expectedErr) {
t.Errorf("test[%s]: unexpected err: %v, expected err: %v", test.desc, err, test.expectedErr)
}
}
}

func TestParseAzcopyJobList(t *testing.T) {
tests := []struct {
desc string
Expand Down
Loading