Skip to content

Commit

Permalink
FS-1252; Streamline fleet-scheduler inventory task threading by remov…
Browse files Browse the repository at this point in the history
…ing threading (#36)
  • Loading branch information
jakeschuurmans committed Mar 11, 2024
1 parent cc7e077 commit c58b43d
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 181 deletions.
6 changes: 2 additions & 4 deletions cmd/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ import (
)

var (
pageSize int
inFlightPages int
pageSize int
)

var cmdInventory = &cobra.Command{
Expand All @@ -31,7 +30,6 @@ var cmdInventory = &cobra.Command{

func init() {
rootCmd.PersistentFlags().IntVar(&pageSize, "page-size", 4, "Define how many servers to query per request")
rootCmd.PersistentFlags().IntVar(&inFlightPages, "inflight-pages", 1, "Define how many server pages to queue up before waiting for the previous to finish creating the condition")
rootCmd.AddCommand(cmdInventory)
}

Expand Down Expand Up @@ -66,7 +64,7 @@ func inventory(ctx context.Context) error {
return err
}

err = newClient.CreateConditionInventoryForAllServers(pageSize, inFlightPages)
err = newClient.CreateConditionInventoryForAllServers(pageSize)
if err != nil {
return err
}
Expand Down
82 changes: 0 additions & 82 deletions cmd/test.go

This file was deleted.

71 changes: 0 additions & 71 deletions internal/client/fleetdb.go
Original file line number Diff line number Diff line change
@@ -1,81 +1,10 @@
package client

import (
"time"

"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"

fleetdbapi "github.com/metal-toolbox/fleetdb/pkg/api/v1"
fleetDBRivets "github.com/metal-toolbox/rivets/serverservice"
)

func (c *Client) gatherServers(pageSize int, serverCh chan *fleetdbapi.Server, concLimiter *semaphore.Weighted) {
// signal to receiver that we are done
defer close(serverCh)

// First page, use the response from it to figure out how many pages we have to loop through
// Dont change page size
servers, response, err := c.getServerPage(pageSize, 1)
if err != nil {
c.log.WithFields(logrus.Fields{
"pageSize": pageSize,
"pageIndex": 1,
}).Logger.Errorf("Failed to get list of servers: %s", err.Error())
return
}
totalPages := response.TotalPages

if !concLimiter.TryAcquire(int64(response.PageSize)) {
c.log.Error("Failed to acquire semaphore! Going to attempt to continue.")
}

// send first page of servers to the channel
for i := range servers {
serverCh <- &servers[i]
}

c.log.WithFields(logrus.Fields{
"index": 1,
"iterations": totalPages,
"got": len(servers),
}).Trace("Got server page")

// Start the second page, and loop through rest the pages
for i := 2; i <= totalPages; i++ {
servers, response, err = c.getServerPage(pageSize, i)
if err != nil {
c.log.WithFields(logrus.Fields{
"pageSize": pageSize,
"pageIndex": i,
}).Logger.Errorf("Failed to get page of servers, attempting to continue: %s", err.Error())

continue
}

c.log.WithFields(logrus.Fields{
"index": i,
"iterations": totalPages,
"got": len(servers),
}).Trace("Got server page")

// throttle this loop
// Doing a spinlock to prevent a permanent lock if the ctx gets canceled
for !concLimiter.TryAcquire(int64(response.PageSize)) && c.ctx.Err() == nil {
time.Sleep(time.Second)
}

if c.ctx.Err() != nil {
c.log.Warn("Context canceled, stopping server gathering")
return
}

for i := range servers {
serverCh <- &servers[i]
}
}
}

func (c *Client) getServerPage(pageSize, page int) ([]fleetdbapi.Server, *fleetdbapi.ServerResponse, error) {
params := &fleetdbapi.ServerListParams{
FacilityCode: c.cfg.FacilityCode,
Expand Down
67 changes: 43 additions & 24 deletions internal/client/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,60 @@ package client

import (
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"

// "github.com/sirupsen/logrus"
fleetdbapi "github.com/metal-toolbox/fleetdb/pkg/api/v1"
)

func (c *Client) CreateConditionInventoryForAllServers(pageSize, inFlightPages int) error {
// Start thread to start collecting servers
serverCh, concLimiter, err := c.GatherServersNonBlocking(pageSize, inFlightPages)
func (c *Client) CreateConditionInventoryForAllServers(pageSize int) error {
// First page, use the response from it to figure out how many pages we have to loop through
// Dont change page size
servers, response, err := c.getServerPage(pageSize, 1)
if err != nil {
c.log.WithFields(logrus.Fields{
"pageSize": pageSize,
"pageIndex": 1,
}).Logger.Errorf("Failed to get list of servers: %s", err.Error())
return err
}
totalPages := response.TotalPages

// Loop through servers and create conditions
for server := range serverCh {
err := c.CreateConditionInventory(server.UUID)
// send first page of servers to the channel
for i := range servers {
err = c.CreateConditionInventory(servers[i].UUID)
if err != nil {
c.log.WithFields(logrus.Fields{
"server": server.UUID,
}).Logger.Error("Failed to create condition")
return err
}

concLimiter.Release(1)
}

return nil
}
c.log.WithFields(logrus.Fields{
"index": 1,
"iterations": totalPages,
"got": len(servers),
}).Trace("Got server page")

// Start the second page, and loop through rest the pages
for i := 2; i <= totalPages; i++ {
servers, _, err = c.getServerPage(pageSize, i)
if err != nil {
c.log.WithFields(logrus.Fields{
"pageSize": pageSize,
"pageIndex": i,
}).Logger.Errorf("Failed to get page of servers, attempting to continue: %s", err.Error())

func (c *Client) GatherServersNonBlocking(pageSize, inFlightPages int) (chan *fleetdbapi.Server, *semaphore.Weighted, error) {
serverCh := make(chan *fleetdbapi.Server)
concLimiter := semaphore.NewWeighted(int64(inFlightPages * pageSize))
continue
}

go func() {
c.gatherServers(pageSize, serverCh, concLimiter)
}()
c.log.WithFields(logrus.Fields{
"index": i,
"iterations": totalPages,
"got": len(servers),
}).Trace("Got server page")

for i := range servers {
err = c.CreateConditionInventory(servers[i].UUID)
if err != nil {
return err
}
}
}

return serverCh, concLimiter, nil
return nil
}

0 comments on commit c58b43d

Please sign in to comment.