In an effort to learn more about Golang, I am attempting to fetch match data from an online 5v5 video game.
Starting from a given player, I request their match list, and for each game ID in that list I request the metadata of that individual match. The same should then be done for all participants of those matches.
In worker-paradigm terms: working on 1 job can create up to 9 additional jobs.
My way of parallelizing this work across worker goroutines has turned out to be unreliable when it comes to telling the workers when to stop.
For this, I use a termination criterion and have one goroutine check it regularly. Once the criterion (e.g. 100 matches fetched, or 10 players whose matches have all been crawled) is met, a cancellation signal should be forwarded that tells the workers to stop.
The issue is that with the current implementation stopping is very unreliable. Usually I end up with more matches or players than defined; on other occasions the worker goroutines simply block and never finish.
How can I reliably stop all goroutines once the termination criterion has been met, without overshooting?
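To make "without overshooting" concrete: if all the work went through a single shared counter, stopping exactly at the limit would be straightforward. A toy sketch (not the crawler, just the claim-a-slot-before-working idea):

package main

import (
    "context"
    "fmt"
    "sync"
    "sync/atomic"
)

// Toy sketch, not the crawler: stop after exactly `limit` units of work.
func main() {
    const limit = 100
    const workers = 4

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    var started, completed int64
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                default:
                }
                // Claim a slot BEFORE doing the work, so at most
                // limit units of work are ever started.
                if atomic.AddInt64(&started, 1) > limit {
                    cancel()
                    return
                }
                // ... do one unit of work here (e.g. fetch one match) ...
                atomic.AddInt64(&completed, 1)
            }
        }()
    }
    wg.Wait()
    fmt.Println("completed:", atomic.LoadInt64(&completed)) // always exactly 100
}

My difficulty is that in the crawler the work arrives over channels, one job fans out into up to 9 new jobs, and the counts live inside the store, so it is unclear to me where the equivalent of that single check belongs.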
"QueuePlayers" stores all received participants into a datastructure, tells other goroutines which player to crawl games from and regularly checks whether the Termination Criteria has been met.
"Crawl Players" fetches matchlists of a given player, fetches all of its matches and forwards the participants of each match "QueuePlayers". After having worked on a player, signalizes to that it's ready to take the next player to work on.
func (c *Crawler) Start() {
    playerChan := make(chan string, c.concurrency)
    participants := make(chan []string, c.concurrency)
    ready := make(chan struct{}, c.concurrency)
    defer close(ready)
    defer close(playerChan)
    defer close(participants)

    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    sp, err := c.GetPlayerByName(c.startPlayer)
    if err != nil {
        log.Infof("Erroneous start player given!")
        return
    }

    // Spawn the workers and let them do the work
    for i := 0; i < c.concurrency; i++ {
        wg.Add(1)
        go c.CrawlPlayer(ctx, i+1, playerChan, ready, participants, &wg)
    }

    // Store participants and queue the next players to crawl matches from.
    // Keep track of the number of matches and players fetched and terminate everything
    // when the termination criterion has been met.
    go c.QueuePlayers(ctx, cancel, ready, participants, playerChan, &wg)

    c.store.AddToQueue(sp.Puuid)
    ready <- struct{}{}

    wg.Wait()
    log.Infof("Finished")
}
func (c *Crawler) QueuePlayers(ctx context.Context, cancel func(), ready <-chan Void, participants chan []string, playerChan chan<- string, wg *sync.WaitGroup) {
    for {
        select {
        case p := <-participants:
            for _, player := range p {
                if c.store.IsPlayerKnown(player) {
                    log.Infof("Player %s already known", player)
                    continue
                }
                c.store.AddToQueue(player)
                log.Infof("New Player %s pushed into Queue", player)
            }
        case <-ready:
            // Termination criterion
            if !(c.store.NumMatches() >= c.totalNumberOfMatches || c.store.NumPlayers() >= c.TotalNumberOfPlayers) {
                // In case the other workers are idling, specifically at the very beginning
                // where only one player can be worked on
                for i := 0; i < (c.concurrency - len(playerChan)); i++ {
                    next, err := c.store.NextPlayer()
                    if err != nil {
                        log.Error(err)
                        continue
                    }
                    playerChan <- next
                    log.Infof("Next player in line: %s", next)
                }
            } else {
                cancel()
                return
            }
        }
    }
}
func (c *Crawler) CrawlPlayer(ctx context.Context, workerID int, playerChan <-chan string, ready chan<- Void, participants chan<- []string, wg *sync.WaitGroup) {
    log.Printf("[WorkerID:%v]: Goroutine started", workerID)
OUTER:
    for {
        select {
        case <-ctx.Done():
            log.Printf("[WorkerID:%v]: Goroutine finished", workerID)
            wg.Done()
            return
        case player := <-playerChan:
            // 1. Fetch the player's match list
            ml, err := c.GetMatchList(player)
            if err != nil {
                log.Infof("Error occurred for player %s", player)
                continue
            }
            // 2. Process each match
        INNER:
            for _, m := range *ml {
                // Termination criterion
                if !(c.store.NumMatches() >= c.totalNumberOfMatches || c.store.NumPlayers() >= c.TotalNumberOfPlayers) {
                    match, err := c.GetMatch(m)
                    if err != nil {
                        log.Errorf("Error fetching match %v", m)
                        continue INNER
                    }
                    // Handle the match
                    c.dbm.InsertMatch(*match)
                    c.store.ConfirmMatch(match.MetaData.MatchID)
                    participants <- match.MetaData.Participants
                } else {
                    ready <- struct{}{}
                    continue OUTER
                }
            }
            // Finish up the current player and signal readiness for the next one
            summoner, err := c.GetPlayerByID(player)
            if err != nil {
                log.Errorf("Error fetching player %s", player)
                ready <- struct{}{}
                continue
            }
            c.dbm.InsertPlayer(*summoner)
            c.store.ConfirmPlayer(player)
            ready <- struct{}{}
        }
    }
}
In order to check the termination criterion regularly, I have set up a thread-safe store that wraps maps and a queue, and added it as a member of the Crawler struct.
type CacheType struct {
    mux   sync.RWMutex
    Cache map[string]struct{}
}

type Store struct {
    Match       *CacheType
    PlayerKnown *CacheType
    mux         sync.Mutex
    PlayerQueue *list.List
}

// AddToQueue inserts a player at the back of the queue of players to process
func (s *Store) AddToQueue(player string) {
    s.mux.Lock()
    defer s.mux.Unlock()
    s.PlayerQueue.PushBack(player)
}

// NumMatches returns the number of matches crawled so far
func (s *Store) NumMatches() int {
    s.mux.Lock()
    defer s.mux.Unlock()
    return len(s.Match.Cache)
}

// NumPlayers returns the number of players encountered so far
func (s *Store) NumPlayers() int {
    s.mux.Lock()
    defer s.mux.Unlock()
    return len(s.PlayerKnown.Cache)
}

// ConfirmMatch inserts a game ID into the sink
func (s *Store) ConfirmMatch(id string) {
    s.mux.Lock()
    defer s.mux.Unlock()
    s.Match.Cache[id] = struct{}{}
}

// ConfirmPlayer inserts a player ID into the sink
func (s *Store) ConfirmPlayer(id string) {
    s.mux.Lock()
    defer s.mux.Unlock()
    s.PlayerKnown.Cache[id] = struct{}{}
}

// MatchExists checks whether a match has been inserted into the sink
func (s *Store) MatchExists(id string) bool {
    s.mux.Lock()
    defer s.mux.Unlock()
    _, ok := s.Match.Cache[id]
    return ok
}

// IsPlayerKnown checks whether a player has been inserted into the sink
func (s *Store) IsPlayerKnown(id string) bool {
    s.mux.Lock()
    defer s.mux.Unlock()
    _, ok := s.PlayerKnown.Cache[id]
    return ok
}