Add a worker pool to the process function

This commit is contained in:
Sameer Rahmani 2022-07-20 11:56:51 +01:00
parent 2f15bfdfd3
commit 3143efa5ce
4 changed files with 108 additions and 105 deletions

View File

@ -28,10 +28,11 @@ var listCmd = &cobra.Command{
Long: `Lists all the TODOs and annotations in the project`, Long: `Lists all the TODOs and annotations in the project`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
ch := make(chan string, 10) ch := make(chan string, 10)
output := make(chan string, state.NumberOfWorkers*10)
state.WaitGroup.Add(2) state.WaitGroup.Add(2)
go core.FindAllFiles(state, ch) go core.FindAllFiles(state, ch)
go core.ProcessFiles(state, ch) go core.ProcessFiles(state, ch, output)
go state.LogErrors() go state.LogErrors()
state.WaitGroup.Wait() state.WaitGroup.Wait()

View File

@ -28,6 +28,8 @@ import (
"time" "time"
"strings" "strings"
log "github.com/sirupsen/logrus"
) )
type Location struct { type Location struct {
@ -86,75 +88,6 @@ func (b *RawCommentBlock) empty() bool {
return len(b.Lines) == 0 return len(b.Lines) == 0
} }
// FindCommentBlocks scans one project file line by line and sends every run of
// consecutive comment lines to output as a FileRawBlock.
//
// file is joined onto s.ProjectRoot to locate the file on disk, and its
// extension selects the language through s.GetLangForExt. A line counts as a
// comment line when, after trimming surrounding whitespace, it starts with the
// language's CommentBlockMarker.
//
// Failures to open, read, or close the file are sent to s.ErrChannel. A file
// whose extension maps to no language is skipped without reporting anything.
// On every exit path output is closed and w.Done is called exactly once.
func FindCommentBlocks(s *State, file string, output chan FileRawBlock, w *sync.WaitGroup) {
	// Deferred calls run last-in first-out, so the file (registered below) is
	// closed first, then output is closed, and only then is the wait group
	// released. Any close error is therefore reported before w.Done.
	defer w.Done()
	defer close(output)

	readFile, err := os.Open(path.Join(s.ProjectRoot, file))
	if err != nil {
		s.ErrChannel <- err
		return
	}
	// Close on every path below, including the unknown-language early return
	// which previously left the descriptor open.
	defer func() {
		if cerr := readFile.Close(); cerr != nil {
			s.ErrChannel <- cerr
		}
	}()

	lang := s.GetLangForExt(filepath.Ext(file))
	if lang == nil {
		// log.Warnf("don't know about the language for extension '%s'", ext)
		return
	}

	// bufio.Scanner splits on lines by default.
	scanner := bufio.NewScanner(readFile)

	curFile := File{
		Path:   file,
		Blocks: []RawCommentBlock{},
		Lines:  []string{},
	}

	block := RawCommentBlock{0, []string{}, &curFile}
	// lineNo is 1-based and advances once per scanned line.
	for lineNo := 1; scanner.Scan(); lineNo++ {
		rawText := scanner.Text()
		curFile.Lines = append(curFile.Lines, rawText)

		txt := strings.TrimSpace(rawText)
		if strings.HasPrefix(txt, *lang.CommentBlockMarker) {
			if block.empty() {
				// First line of a new run: remember where it starts.
				block.LineNo = lineNo
			}
			block.append(txt)
			continue
		}

		// A non-comment line ends the current run, if there is one.
		if !block.empty() {
			curFile.Blocks = append(curFile.Blocks, block)
			output <- FileRawBlock{block, lang}
		}
		block = RawCommentBlock{0, []string{}, &curFile}
	}

	// Scan returns false both at EOF and on failure (for example a line longer
	// than the scanner's buffer); only Err tells the two apart.
	if err := scanner.Err(); err != nil {
		s.ErrChannel <- err
	}

	// NOTE(review): a comment run that reaches EOF with no following
	// non-comment line is never emitted. Left as is because the consumer's
	// handling of a block with no trailing code is not visible here; confirm
	// whether such blocks should be flushed.
}
func getBlockFirstLine(lang *Lang, block RawCommentBlock) *string { func getBlockFirstLine(lang *Lang, block RawCommentBlock) *string {
txt := strings.TrimLeft(block.Lines[0], *lang.CommentBlockMarker) txt := strings.TrimLeft(block.Lines[0], *lang.CommentBlockMarker)
txt = strings.TrimSpace(txt) txt = strings.TrimSpace(txt)
@ -183,9 +116,6 @@ func JoinComments(state *State, commentMarker string, b RawCommentBlock) *string
} }
func processCommentBlocks(s *State, file chan FileRawBlock, output chan Block, w *sync.WaitGroup) { func processCommentBlocks(s *State, file chan FileRawBlock, output chan Block, w *sync.WaitGroup) {
defer close(output)
defer w.Done()
// To keep track of opening and closing tags // To keep track of opening and closing tags
var stack CommentStack var stack CommentStack
@ -211,12 +141,12 @@ func processCommentBlocks(s *State, file chan FileRawBlock, output chan Block, w
lastOpenBlock := stack.Pop() lastOpenBlock := stack.Pop()
if lastOpenBlock == nil { if lastOpenBlock == nil {
// e := fmt.Errorf( e := fmt.Errorf(
// "closing comment with no Opening at '%s:%d'", "closing comment with no Opening at '%s:%d'",
// block.File.Path, block.LineNo, block.File.Path, block.LineNo,
// ) )
// s.ErrChannel <- e s.ErrChannel <- e
return continue
} }
if type_ == &lastOpenBlock.BlockType { if type_ == &lastOpenBlock.BlockType {
@ -246,34 +176,31 @@ func processCommentBlocks(s *State, file chan FileRawBlock, output chan Block, w
stack.Push(&processedBlock) stack.Push(&processedBlock)
output <- processedBlock output <- processedBlock
} }
log.Debug("Terminating the block processor")
close(output)
w.Done()
} }
func PrintResult(state *State, res chan Block, w *sync.WaitGroup) { func formatter(state *State, input chan Block, output chan string, w *sync.WaitGroup) {
for b := range res { for b := range input {
tags := strings.Join(b.Tags, ",") tags := strings.Join(b.Tags, ",")
fmt.Printf("* %s at %s:%d | Tags: %s\n%s\n%s\n\n", output <- fmt.Sprintf("* %s at %s:%d | Tags: %s\n%s\n%s\n\n",
b.BlockType, *b.FilePath, b.Loc.Start, b.BlockType, *b.FilePath, b.Loc.Start,
tags, tags,
b.Desc, b.Desc,
b.WrappedCode) b.WrappedCode)
} }
log.Debug("Terminating the formatter")
w.Done() w.Done()
} }
func ProcessFiles(state *State, files chan string) error { func ProcessFiles(state *State, files chan string, output chan string) {
var workers sync.WaitGroup var workers sync.WaitGroup
for file := range files { for i := 0; i < state.NumberOfWorkers; i++ {
fileBlocks := make(chan FileRawBlock, 10) workers.Add(1)
processed := make(chan Block, 100) go Worker(state, files, output, i, &workers)
if file == "utils/bazel/zlib.bzl" {
println("file: ", file)
}
workers.Add(3)
go FindCommentBlocks(state, file, fileBlocks, &workers)
go processCommentBlocks(state, fileBlocks, processed, &workers)
go PrintResult(state, processed, &workers)
} }
controller := make(chan bool) controller := make(chan bool)
@ -284,11 +211,86 @@ func ProcessFiles(state *State, files chan string) error {
select { select {
case <-controller: case <-controller:
println("Wait done") log.Debug("Workers are terminated correctly.")
case <-time.After(time.Second * 20): case <-time.After(time.Second * 20):
println("timeout") log.Fatal(`Workers timout! This is a bug!
This is probably due to the buffer size difference between
formatter and the printer. Please file a bug.`)
} }
close(output)
state.WaitGroup.Done() state.WaitGroup.Done()
return nil }
func Worker(state *State, input chan string, output chan string, workerNumber int, w *sync.WaitGroup) {
log.Debugf("Spawning worker '%d'", workerNumber)
fileBlocks := make(chan FileRawBlock, state.NumberOfWorkers)
processed := make(chan Block, state.NumberOfWorkers)
w.Add(2)
go processCommentBlocks(state, fileBlocks, processed, w)
go formatter(state, processed, output, w)
for file := range input {
fullPath := path.Join(state.ProjectRoot, file)
readFile, err := os.Open(fullPath)
if err != nil {
state.ErrChannel <- err
continue
}
ext := filepath.Ext(file)
lang := state.GetLangForExt(ext)
if lang == nil {
err := fmt.Errorf("don't know about the language for extension '%s'", ext)
state.ErrChannel <- err
continue
}
scanner := bufio.NewScanner(readFile)
scanner.Split(bufio.ScanLines)
curFile := File{
Path: file,
Blocks: []RawCommentBlock{},
Lines: []string{},
}
var line = 1
block := RawCommentBlock{0, []string{}, &curFile}
for scanner.Scan() {
rawText := scanner.Text()
curFile.Lines = append(curFile.Lines, rawText)
txt := strings.TrimSpace(rawText)
if strings.HasPrefix(txt, *lang.CommentBlockMarker) {
if block.empty() {
block.LineNo = line
}
block.append(txt)
} else {
if !block.empty() {
curFile.Blocks = append(curFile.Blocks, block)
fileBlocks <- FileRawBlock{block, lang}
}
block = RawCommentBlock{0, []string{}, &curFile}
}
line++
}
if err = readFile.Close(); err != nil {
state.ErrChannel <- err
continue
}
}
close(fileBlocks)
log.Debugf("Worker '%d' terminated", workerNumber)
w.Done()
} }

View File

@ -28,8 +28,6 @@ func FindAllFiles(state *State, files chan string) {
if state == nil { if state == nil {
panic("state is nil") panic("state is nil")
} }
defer state.WaitGroup.Done()
defer close(files)
repo := state.Repo repo := state.Repo
@ -42,7 +40,6 @@ func FindAllFiles(state *State, files chan string) {
tree, err := commit.Tree() tree, err := commit.Tree()
cobra.CheckErr(err) cobra.CheckErr(err)
mm := 0
filesiter := tree.Files() filesiter := tree.Files()
for { for {
f, err := filesiter.Next() f, err := filesiter.Next()
@ -66,13 +63,13 @@ func FindAllFiles(state *State, files chan string) {
} }
if !isbin && f.Mode.IsFile() { if !isbin && f.Mode.IsFile() {
mm++
files <- f.Name files <- f.Name
} }
} }
cobra.CheckErr(err) cobra.CheckErr(err)
println("closing files", mm) println("closing files")
state.WaitGroup.Done()
close(files)
} }

View File

@ -41,6 +41,7 @@ type State struct {
DB *DB DB *DB
WaitGroup sync.WaitGroup WaitGroup sync.WaitGroup
ErrChannel chan error ErrChannel chan error
NumberOfWorkers int
} }
func CreateState(projectRoot string, debug bool) (*State, error) { func CreateState(projectRoot string, debug bool) (*State, error) {
@ -80,7 +81,9 @@ func CreateState(projectRoot string, debug bool) (*State, error) {
state.FirstLineContent = regexp.MustCompile(`[A-Z]+:\s+(?:\[[^]]+\])*\s?(?P<text>.*)`) state.FirstLineContent = regexp.MustCompile(`[A-Z]+:\s+(?:\[[^]]+\])*\s?(?P<text>.*)`)
state.EnabledLangs = ExtsToLang state.EnabledLangs = ExtsToLang
state.ErrChannel = make(chan error, 10000) state.ErrChannel = make(chan error, 10)
state.NumberOfWorkers = 2
return &state, nil return &state, nil
} }