Unreliable Termination of Worker Routines Producing Jobs

206 Views Asked by At

In an effort to learn more about Golang, I am attempting to fetch match data from an online 5v5 video game.

Based on a given player, I'll request his matchlist and based on the gameIDs within this matchlist I am requesting the metadata for each of these individual matches. The same should be done for all participants in these matches.

In worker-paradigm-terms: working on 1 Job can create up to 9 additional Jobs.

In doing so, my way of parallelizing these efforts invoking worker goroutines seem to be unreliable in the aspect of telling workers when to stop.

For this, I am using a termination criteria and have one goroutine check it regularly. If the criterion (e.g. 100 matches, or 10 players to fetch all matches from) has been met, a cancellation signal should be forwarded, telling the workers to stop.

The issue is that with the current implementation the stoppage is very unreliable. Usually, I end up with having more matches or players than defined. On other occassions the worker routines will simply sleep and never finish.

How can I achieve a reliable way of stopping all goroutines when the termination criteria has been met without overshooting?


"QueuePlayers" stores all received participants into a datastructure, tells other goroutines which player to crawl games from and regularly checks whether the Termination Criteria has been met.

"Crawl Players" fetches matchlists of a given player, fetches all of its matches and forwards the participants of each match "QueuePlayers". After having worked on a player, signalizes to that it's ready to take the next player to work on.

func (c *Crawler) Start() {
    playerChan := make(chan string, c.concurrency)
    participants := make(chan []string, c.concurrency)
    ready := make(chan struct{}{}, c.concurrency)
    defer close(ready)
    defer close(playerChan)
    defer close(participants)
    var wg sync.WaitGroup

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    sp, err := c.GetPlayerByName(c.startPlayer)
    if err != nil {
        log.Infof("Erronous Start Player given!")
        return
    }

    // spawn worker and do work
    for i := 0; i < c.concurrency; i++ {
        go c.CrawlPlayer(ctx, i+1, playerChan, ready, participants, &wg)
        wg.Add(1)
    }

    // Store Participants and queue next Players to crawl Matches from.
    // Keep track of the number of matches and players fetched and terminate everything
    // when the termination criterion has been met
    go c.QueuePlayers(ctx, cancel, ready, participants, playerChan, &wg)

    c.store.AddToQueue(sp.Puuid)
    ready <- struct{}{}

    wg.Wait()
    log.Infof("Finished")
}




 func (c *Crawler) QueuePlayers(ctx context.Context, cancel func(), ready <-chan Void, participants chan []string, player chan<- string, wg *sync.WaitGroup) {
        for {
            select {
            case p := <-participants:
                for _, player := range p {
                    if c.store.IsPlayerKnown(player) {
                        log.Infof("Player %s already known", player)
                        continue
                    }
                    c.store.AddToQueue(player)
                    log.Infof("New Player %s pushed into Queue", player)
                }
            case <-ready:
                // Termination Crteria
                if !(c.store.NumMatches() >= c.totalNumberOfMatches || c.store.NumPlayers() >= c.TotalNumberOfPlayers) {
                    // In case the other workers are idling, specifically at the very beginning where only one player can be worked on
                    for i := 0; i < (c.concurrency - len(c.playerChan)); i++ {
                        next, err := c.store.NextPlayer()
                        if err != nil {
                            log.Error(err)
                            continue
                        }
                        player <- next
                        log.Infof("Next player in line: %s", next)
                    }
                } else {
                    cancel()
                    return
                }
            }
        }
    }

    
func (c *Crawler) CrawlPlayer(ctx context.Context, workerID int, playerChan <-chan string, ready chan<- Void, participants chan<- []string, wg *sync.WaitGroup) {
    log.Printf("[WorkerID:%v]:Goroutine started", workerID)
OUTER:
    for {
        select {
        case <-ctx.Done():
            log.Printf("[WorkerID:%v]: Goroutine finished", workerID)
            wg.Done()
            return
        case player := <-playerChan:
            ml, err := c.GetMatchList(player)
            if err != nil {
                log.Infof("Error occured for Player", player)
                continue
            }
            // 2. Process each match
        INNER:
            for _, m := range *ml {
                // Termination Criteria
                if !(c.store.NumMatches() >= c.totalNumberOfMatches || c.store.NumPlayers() >= c.TotalNumberOfPlayers)  {
                    match, err := c.GetMatch(m)
                    if err != nil {
                        log.Errorf("Error fetching match %v", m)
                        continue INNER
                    }
                    // Handle Match
                    c.dbm.InsertMatch(*match)
                    c.store.ConfirmMatch(match.MetaData.MatchID)
                    participants <- match.MetaData.Participants
                } else {
                    ready <- struct{}{}
                    continue OUTER
                }
            }
            // Finish up current player and get next player zu process
            summoner, err := c.GetPlayerByID(player)
            c.dbm.InsertPlayer(*summoner)
            c.store.ConfirmPlayer(player)
            ready <- struct{}{}
        }
    }
}

In order to regularly check the Termination criterion, I have attempted set up a thread-safe struct that entails a map and added it as a member of the Crawler struct.

type CacheType struct {
    mux   sync.RWMutex
    Cache map[string]struct{}
}

type Store struct {
    Match       *CacheType
    PlayerKnown *CacheType
    mux         sync.Mutex
    PlayerQueue *list.List
}


// AddToQueue inserts a player at the back of the queue of players to process
func (s *Store) AddToQueue(player string) {
    s.mux.Lock()
    defer s.mux.Unlock()
    s.PlayerQueue.PushBack(player)
}


func (s *Store) NumMatches() int {
    s.mux.Lock()
    defer s.mux.Unlock()
    return len(s.Match.Cache)
}

// NumPlayers returns the players of Matches crawled
func (s *Store) NumPlayers() int {
    s.mux.Lock()
    defer s.mux.Unlock()
    return len(s.PlayerKnown.Cache)
}

// ConfirmMatch inserts GameId into Sink
func (s *Store) ConfirmMatch(id string) {
    s.mux.Lock()
    defer s.mux.Unlock()
    s.Match.Cache[id] = struct{}{}
}

// ConfirmPlayer inserts IDs of players into SInk
func (s *Store) ConfirmPlayer(id string) {
    s.mux.Lock()
    defer s.mux.Unlock()
    s.PlayerKnown.Cache[id] = struct{}{}
}

// MatchExists checks if a match had been inserted to the sink
func (s *Store) MatchExists(id string) bool {
    s.mux.Lock()
    defer s.mux.Unlock()
    _, ok := s.Match.Cache[id]
    return ok
}

// IsPlayerKnown checks if a match had been inserted to the sink
func (s *Store) IsPlayerKnown(id string) bool {
    s.mux.Lock()
    defer s.mux.Unlock()
    _, ok := s.PlayerKnown.Cache[id]
    return ok
}
0

There are 0 best solutions below