Commit
send patch vm requests to channel (#8420)
Signed-off-by: Fan Shang Xiang <[email protected]>
Co-authored-by: Fan Shang Xiang <[email protected]>
1 parent 27f963a commit 75685bc
Showing 4 changed files with 516 additions and 71 deletions.
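In outline, the commit replaces the synchronous per-VM Update calls (coordinated with an errgroup) by asynchronous BeginUpdate (PATCH) requests: each request's poller is handed to a consumer goroutine over a channel, and failures are funneled back through a buffered error channel that the caller drains. A minimal, self-contained Go sketch of that producer/poller/error-channel shape — all names here are illustrative, not code from this repository:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// fakePoller stands in for an SDK poller; it sleeps briefly, then reports.
type fakePoller struct{ id string }

func (p *fakePoller) PollUntilDone(ctx context.Context) error {
	select {
	case <-time.After(50 * time.Millisecond):
		if p.id == "2" {
			return errors.New("update rejected for instance " + p.id)
		}
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// updateInBatch mirrors the commit's shape: bounded fan-out of "begin update"
// requests, pollers consumed from one channel, errors collected on another.
func updateInBatch(ctx context.Context, ids []string, batchSize int) <-chan error {
	errChan := make(chan error, len(ids)) // buffered: sends never block
	pollerChan := make(chan *fakePoller, len(ids))

	var pollerGroup sync.WaitGroup
	pollerGroup.Add(1)
	go func() { // consumer: poll each operation to completion
		defer pollerGroup.Done()
		for poller := range pollerChan {
			poller := poller
			pollerGroup.Add(1)
			go func() {
				defer pollerGroup.Done()
				if err := poller.PollUntilDone(ctx); err != nil {
					errChan <- err
				}
			}()
		}
	}()

	sem := make(chan struct{}, batchSize) // semaphore caps in-flight requests
	var requestGroup sync.WaitGroup
	for _, id := range ids {
		id := id
		sem <- struct{}{}
		requestGroup.Add(1)
		go func() {
			defer func() { requestGroup.Done(); <-sem }()
			pollerChan <- &fakePoller{id: id} // "BeginUpdate" succeeded
		}()
	}
	requestGroup.Wait()
	close(pollerChan)
	pollerGroup.Wait() // all pollers finished; safe to close errChan
	close(errChan)
	return errChan
}

func main() {
	for err := range updateInBatch(context.Background(), []string{"0", "1", "2"}, 2) {
		fmt.Println("error:", err)
	}
}

Closing errChan only after pollerGroup.Wait guarantees that the caller's range loop terminates once every operation has been polled to completion.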
143 changes: 108 additions & 35 deletions pkg/provider/azure_vmss.go
@@ -25,13 +25,13 @@ import (
 	"strings"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
 	"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v6"
 	"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v6"
 	"github.com/samber/lo"
-	"golang.org/x/sync/errgroup"
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
@@ -1452,40 +1452,29 @@ func (ss *ScaleSet) ensureHostsInPool(ctx context.Context, service *v1.Service,
 		meta := meta
 		update := update
 		hostUpdates = append(hostUpdates, func() error {
-			ctx, cancel := getContextWithCancel()
-			defer cancel()
-
 			logFields := []interface{}{
 				"operation", "EnsureHostsInPool UpdateVMSSVMs",
 				"vmssName", meta.vmssName,
 				"resourceGroup", meta.resourceGroup,
 				"backendPoolID", backendPoolID,
 			}
 
+			logger := klog.LoggerWithValues(klog.FromContext(ctx), logFields...)
 			batchSize, err := ss.VMSSBatchSize(ctx, meta.vmssName)
 			if err != nil {
-				klog.ErrorS(err, "Failed to get vmss batch size", logFields...)
+				logger.Error(err, "Failed to get vmss batch size")
 				return err
 			}
+			ctx = klog.NewContext(ctx, logger)
 
-			klog.V(2).InfoS("Begin to update VMs for VMSS with new backendPoolID", logFields...)
-			grp, ctx := errgroup.WithContext(ctx)
-			grp.SetLimit(batchSize)
-			for instanceID, vm := range update {
-				instanceID := instanceID
-				vm := vm
-				grp.Go(func() error {
-					_, rerr := ss.ComputeClientFactory.GetVirtualMachineScaleSetVMClient().Update(ctx, meta.resourceGroup, meta.vmssName, instanceID, vm)
-					return rerr
-				})
-			}
-			rerr := grp.Wait()
-			if rerr != nil {
-				klog.ErrorS(err, "Failed to update VMs for VMSS", logFields...)
-				return rerr
-			}
+			errChan := ss.UpdateVMSSVMsInBatch(ctx, meta, update, batchSize)
 
-			return nil
+			errs := make([]error, 0)
+			for err := range errChan {
+				if err != nil {
+					errs = append(errs, err)
+				}
+			}
+			return utilerrors.NewAggregate(errs)
 		})
 	}
 	errs := utilerrors.AggregateGoroutines(hostUpdates...)
@@ -1955,18 +1944,15 @@ func (ss *ScaleSet) ensureBackendPoolDeleted(ctx context.Context, service *v1.Service,
 			klog.ErrorS(err, "Failed to get vmss batch size", logFields...)
 			return err
 		}
-		grp, ctx := errgroup.WithContext(ctx)
-		grp.SetLimit(batchSize)
-		for instanceID, vm := range update {
-			instanceID := instanceID
-			vm := vm
-			grp.Go(func() error {
-				klog.V(2).InfoS("Begin to update VMs for VMSS with new backendPoolID", logFields...)
-				_, rerr := ss.ComputeClientFactory.GetVirtualMachineScaleSetVMClient().Update(ctx, meta.resourceGroup, meta.vmssName, instanceID, vm)
-				return rerr
-			})
-
+		errChan := ss.UpdateVMSSVMsInBatch(ctx, meta, update, batchSize)
+		errs := make([]error, 0)
+		for err := range errChan {
+			if err != nil {
+				errs = append(errs, err)
+			}
 		}
-		err = grp.Wait()
+		err = utilerrors.NewAggregate(errs)
 		if err != nil {
 			klog.ErrorS(err, "Failed to update VMs for VMSS", logFields...)
 			return err
@@ -2359,3 +2345,90 @@ func (ss *ScaleSet) VMSSBatchSize(ctx context.Context, vmssName string) (int, error) {
 	klog.V(2).InfoS("Fetch VMSS batch size", "vmss", vmssName, "size", batchSize)
 	return batchSize, nil
 }
+
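+// UpdateVMSSVMsInBatch issues asynchronous PATCH (BeginUpdate) requests for the
+// given VMSS VMs, polls each resulting operation concurrently, and streams any
+// failures into the returned channel, which is closed once all updates finish.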
+func (ss *ScaleSet) UpdateVMSSVMsInBatch(ctx context.Context, meta vmssMetaInfo, update map[string]armcompute.VirtualMachineScaleSetVM, batchSize int) <-chan error {
+	logger := klog.FromContext(ctx)
+	patchVMFn := func(ctx context.Context, instanceID string, vm *armcompute.VirtualMachineScaleSetVM) (*runtime.Poller[armcompute.VirtualMachineScaleSetVMsClientUpdateResponse], error) {
+		return ss.ComputeClientFactory.GetVirtualMachineScaleSetVMClient().BeginUpdate(ctx, meta.resourceGroup, meta.vmssName, instanceID, *vm, &armcompute.VirtualMachineScaleSetVMsClientBeginUpdateOptions{
+			IfMatch: vm.Etag,
+		})
+	}
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
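+	// Both channels are buffered to len(update), so neither the request
+	// goroutines nor the pollers ever block on send; the deferred cancel fires
+	// only after pollerGroup.Wait below, once every poller has finished.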
+	errChan := make(chan error, len(update))
+	pollerChannel := make(chan *runtime.Poller[armcompute.VirtualMachineScaleSetVMsClientUpdateResponse], len(update))
+	var pollerGroup sync.WaitGroup
+	pollerGroup.Add(1)
+	go func() {
+		defer pollerGroup.Done()
+		for {
+			select {
+			case poller, ok := <-pollerChannel:
+				if !ok {
+					// pollerChannel is closed
+					return
+				}
+				if poller == nil {
+					continue
+				}
+				pollerGroup.Add(1)
+				go func() {
+					defer pollerGroup.Done()
+					_, err := poller.PollUntilDone(ctx, &runtime.PollUntilDoneOptions{
+						Frequency: 10 * time.Second,
+					})
+					if err != nil {
+						logger.Error(err, "Failed to update VMs for VMSS with new vm config")
+						errChan <- err
+					}
+				}()
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+	// logger.Info("Begin to update VMs for VMSS with new vm config")
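+	// Fan out the PATCH requests; a buffered channel serves as a semaphore that
+	// caps the number of in-flight requests at batchSize. With batchSize <= 1,
+	// requests are issued sequentially.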
+	if batchSize > 1 {
+		concurrentLock := make(chan struct{}, batchSize)
+		var requestGroup sync.WaitGroup
+		for instanceID, vm := range update {
+			instanceID := instanceID
+			vm := vm
+			concurrentLock <- struct{}{}
+			requestGroup.Add(1)
+			go func() {
+				defer func() {
+					requestGroup.Done()
+					<-concurrentLock
+				}()
+				poller, rerr := patchVMFn(ctx, instanceID, &vm)
+				if rerr != nil {
+					errChan <- rerr
+					return
+				}
+				if poller != nil {
+					pollerChannel <- poller
+				}
+			}()
+		}
+		requestGroup.Wait()
+		close(concurrentLock)
+	} else {
+		for instanceID, vm := range update {
+			instanceID := instanceID
+			vm := vm
+			poller, rerr := patchVMFn(ctx, instanceID, &vm)
+			if rerr != nil {
+				errChan <- rerr
+				continue
+			}
+			if poller != nil {
+				pollerChannel <- poller
+			}
+		}
+	}
+	close(pollerChannel)
+	pollerGroup.Wait()
+	close(errChan)
+	return errChan
+}
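The IfMatch: vm.Etag option makes each PATCH conditional: the service applies the update only if the VM's Etag still matches, so a concurrent modification surfaces as an error (HTTP 412 Precondition Failed) rather than being silently overwritten. A standalone sketch of driving one such conditional update with the v6 SDK — subscription, resource group, VMSS name, and instance ID are placeholders, and it assumes the Get response carries the VM's current Etag:

package main

import (
	"context"
	"log"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v6"
)

func main() {
	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		log.Fatal(err)
	}
	// Placeholder identifiers; substitute real ones.
	client, err := armcompute.NewVirtualMachineScaleSetVMsClient("<subscriptionID>", cred, nil)
	if err != nil {
		log.Fatal(err)
	}
	ctx := context.Background()

	// Read the VM first to obtain its current Etag.
	got, err := client.Get(ctx, "<resourceGroup>", "<vmssName>", "0", nil)
	if err != nil {
		log.Fatal(err)
	}
	vm := got.VirtualMachineScaleSetVM

	// PATCH conditionally: applied only if the Etag still matches.
	poller, err := client.BeginUpdate(ctx, "<resourceGroup>", "<vmssName>", "0", vm,
		&armcompute.VirtualMachineScaleSetVMsClientBeginUpdateOptions{IfMatch: vm.Etag})
	if err != nil {
		log.Fatal(err)
	}
	if _, err = poller.PollUntilDone(ctx, &runtime.PollUntilDoneOptions{Frequency: 10 * time.Second}); err != nil {
		log.Fatal(err) // e.g. 412 Precondition Failed if the VM changed meanwhile
	}
}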