diff --git a/cmd/list.go b/cmd/list.go index 9c10239..5f6a2da 100644 --- a/cmd/list.go +++ b/cmd/list.go @@ -28,10 +28,11 @@ var listCmd = &cobra.Command{ Long: `Lists all the TODOs and annotations in the project`, Run: func(cmd *cobra.Command, args []string) { ch := make(chan string, 10) + output := make(chan string, state.NumberOfWorkers*10) state.WaitGroup.Add(2) go core.FindAllFiles(state, ch) - go core.ProcessFiles(state, ch) + go core.ProcessFiles(state, ch, output) go state.LogErrors() state.WaitGroup.Wait() diff --git a/pkg/core/blocks.go b/pkg/core/blocks.go index 4e202a1..3c60808 100644 --- a/pkg/core/blocks.go +++ b/pkg/core/blocks.go @@ -28,6 +28,8 @@ import ( "time" "strings" + + log "github.com/sirupsen/logrus" ) type Location struct { @@ -86,75 +88,6 @@ func (b *RawCommentBlock) empty() bool { return len(b.Lines) == 0 } -func FindCommentBlocks(s *State, file string, output chan FileRawBlock, w *sync.WaitGroup) { - fullPath := path.Join(s.ProjectRoot, file) - readFile, err := os.Open(fullPath) - - if err != nil { - s.ErrChannel <- err - close(output) - w.Done() - - return - } - - ext := filepath.Ext(file) - lang := s.GetLangForExt(ext) - - if lang == nil { - close(output) - w.Done() - // log.Warnf("don't know about the language for extension '%s'", ext) - return - } - - scanner := bufio.NewScanner(readFile) - - scanner.Split(bufio.ScanLines) - - curFile := File{ - Path: file, - Blocks: []RawCommentBlock{}, - Lines: []string{}, - } - - var line = 1 - block := RawCommentBlock{0, []string{}, &curFile} - - for scanner.Scan() { - rawText := scanner.Text() - curFile.Lines = append(curFile.Lines, rawText) - - txt := strings.TrimSpace(rawText) - - if strings.HasPrefix(txt, *lang.CommentBlockMarker) { - if block.empty() { - block.LineNo = line - } - block.append(txt) - } else { - if !block.empty() { - curFile.Blocks = append(curFile.Blocks, block) - output <- FileRawBlock{block, lang} - } - - block = RawCommentBlock{0, []string{}, &curFile} - } - line++ - } - - if err = readFile.Close(); err != nil { - - close(output) - w.Done() - s.ErrChannel <- err - return - } - - w.Done() - close(output) -} - func getBlockFirstLine(lang *Lang, block RawCommentBlock) *string { txt := strings.TrimLeft(block.Lines[0], *lang.CommentBlockMarker) txt = strings.TrimSpace(txt) @@ -183,9 +116,6 @@ func JoinComments(state *State, commentMarker string, b RawCommentBlock) *string } func processCommentBlocks(s *State, file chan FileRawBlock, output chan Block, w *sync.WaitGroup) { - defer close(output) - defer w.Done() - // To keep track of opening and closing tags var stack CommentStack @@ -211,12 +141,12 @@ func processCommentBlocks(s *State, file chan FileRawBlock, output chan Block, w lastOpenBlock := stack.Pop() if lastOpenBlock == nil { - // e := fmt.Errorf( - // "closing comment with no Opening at '%s:%d'", - // block.File.Path, block.LineNo, - // ) - // s.ErrChannel <- e - return + e := fmt.Errorf( + "closing comment with no Opening at '%s:%d'", + block.File.Path, block.LineNo, + ) + s.ErrChannel <- e + continue } if type_ == &lastOpenBlock.BlockType { @@ -246,34 +176,31 @@ func processCommentBlocks(s *State, file chan FileRawBlock, output chan Block, w stack.Push(&processedBlock) output <- processedBlock } + log.Debug("Terminating the block processor") + close(output) + w.Done() + } -func PrintResult(state *State, res chan Block, w *sync.WaitGroup) { - for b := range res { +func formatter(state *State, input chan Block, output chan string, w *sync.WaitGroup) { + for b := range input { tags := strings.Join(b.Tags, ",") - fmt.Printf("* %s at %s:%d | Tags: %s\n%s\n%s\n\n", + output <- fmt.Sprintf("* %s at %s:%d | Tags: %s\n%s\n%s\n\n", b.BlockType, *b.FilePath, b.Loc.Start, tags, b.Desc, b.WrappedCode) } + log.Debug("Terminating the formatter") w.Done() } -func ProcessFiles(state *State, files chan string) error { +func ProcessFiles(state *State, files chan string, output chan string) { var workers sync.WaitGroup - for file := range files { - fileBlocks := make(chan FileRawBlock, 10) - processed := make(chan Block, 100) - - if file == "utils/bazel/zlib.bzl" { - println("file: ", file) - } - workers.Add(3) - go FindCommentBlocks(state, file, fileBlocks, &workers) - go processCommentBlocks(state, fileBlocks, processed, &workers) - go PrintResult(state, processed, &workers) + for i := 0; i < state.NumberOfWorkers; i++ { + workers.Add(1) + go Worker(state, files, output, i, &workers) } controller := make(chan bool) @@ -284,11 +211,86 @@ func ProcessFiles(state *State, files chan string) error { select { case <-controller: - println("Wait done") + log.Debug("Workers are terminated correctly.") case <-time.After(time.Second * 20): - println("timeout") + log.Fatal(`Workers timout! This is a bug! +This is probably due to the buffer size difference between +formatter and the printer. Please file a bug.`) } + close(output) state.WaitGroup.Done() - return nil - +} + +func Worker(state *State, input chan string, output chan string, workerNumber int, w *sync.WaitGroup) { + log.Debugf("Spawning worker '%d'", workerNumber) + + fileBlocks := make(chan FileRawBlock, state.NumberOfWorkers) + processed := make(chan Block, state.NumberOfWorkers) + + w.Add(2) + go processCommentBlocks(state, fileBlocks, processed, w) + go formatter(state, processed, output, w) + + for file := range input { + fullPath := path.Join(state.ProjectRoot, file) + readFile, err := os.Open(fullPath) + + if err != nil { + state.ErrChannel <- err + continue + } + + ext := filepath.Ext(file) + lang := state.GetLangForExt(ext) + + if lang == nil { + err := fmt.Errorf("don't know about the language for extension '%s'", ext) + state.ErrChannel <- err + continue + } + + scanner := bufio.NewScanner(readFile) + scanner.Split(bufio.ScanLines) + + curFile := File{ + Path: file, + Blocks: []RawCommentBlock{}, + Lines: []string{}, + } + + var line = 1 + block := RawCommentBlock{0, []string{}, &curFile} + + for scanner.Scan() { + rawText := scanner.Text() + curFile.Lines = append(curFile.Lines, rawText) + + txt := strings.TrimSpace(rawText) + + if strings.HasPrefix(txt, *lang.CommentBlockMarker) { + if block.empty() { + block.LineNo = line + } + block.append(txt) + } else { + if !block.empty() { + curFile.Blocks = append(curFile.Blocks, block) + fileBlocks <- FileRawBlock{block, lang} + } + + block = RawCommentBlock{0, []string{}, &curFile} + } + line++ + } + + if err = readFile.Close(); err != nil { + state.ErrChannel <- err + continue + } + + } + close(fileBlocks) + log.Debugf("Worker '%d' terminated", workerNumber) + + w.Done() } diff --git a/pkg/core/core.go b/pkg/core/core.go index 998fdb2..33f92bc 100644 --- a/pkg/core/core.go +++ b/pkg/core/core.go @@ -28,8 +28,6 @@ func FindAllFiles(state *State, files chan string) { if state == nil { panic("state is nil") } - defer state.WaitGroup.Done() - defer close(files) repo := state.Repo @@ -42,7 +40,6 @@ func FindAllFiles(state *State, files chan string) { tree, err := commit.Tree() cobra.CheckErr(err) - mm := 0 filesiter := tree.Files() for { f, err := filesiter.Next() @@ -66,13 +63,13 @@ func FindAllFiles(state *State, files chan string) { } if !isbin && f.Mode.IsFile() { - mm++ files <- f.Name - } } cobra.CheckErr(err) - println("closing files", mm) + println("closing files") + state.WaitGroup.Done() + close(files) } diff --git a/pkg/core/state.go b/pkg/core/state.go index aa43574..e1d1b54 100644 --- a/pkg/core/state.go +++ b/pkg/core/state.go @@ -41,6 +41,7 @@ type State struct { DB *DB WaitGroup sync.WaitGroup ErrChannel chan error + NumberOfWorkers int } func CreateState(projectRoot string, debug bool) (*State, error) { @@ -80,7 +81,9 @@ func CreateState(projectRoot string, debug bool) (*State, error) { state.FirstLineContent = regexp.MustCompile(`[A-Z]+:\s+(?:\[[^]]+\])*\s?(?P.*)`) state.EnabledLangs = ExtsToLang - state.ErrChannel = make(chan error, 10000) + state.ErrChannel = make(chan error, 10) + state.NumberOfWorkers = 2 + return &state, nil }