Merge pull request #1489 from damienr74/generic-crawler
tooling performance improvements, better structure
monopole authored Sep 3, 2019
2 parents 2677f4c + c2d6f09 commit 143c5dd
Showing 13 changed files with 1,250 additions and 98 deletions.
170 changes: 165 additions & 5 deletions internal/tools/crawler/crawler.go
@@ -6,11 +6,19 @@ package crawler
import (
"context"
"fmt"
"log"
"os"
"sync"

_ "github.com/gomodule/redigo/redis"

"sigs.k8s.io/kustomize/internal/tools/doc"
)

var (
logger = log.New(os.Stdout, "Crawler: ", log.LstdFlags|log.LUTC|log.Llongfile)
)

// Crawler forwards documents from source repositories to index and store them
// for searching. Each crawler is responsible for querying its source of
// information, and forwarding files that have not been seen before or that need
@@ -19,7 +27,152 @@ type Crawler interface {
// Crawl returns when it is done processing. This method does not take
// ownership of the channel. The channel is write only, and it
// designates where the crawler should forward the documents.
Crawl(ctx context.Context, output chan<- *doc.KustomizationDocument) error
Crawl(ctx context.Context, output chan<- CrawlerDocument) error

// FetchDocument gets the document data given the FilePath, Repo, and
// Ref/Tag/Branch.
FetchDocument(context.Context, *doc.Document) error
// SetCreated records the creation time on the document.
SetCreated(context.Context, *doc.Document) error

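// Match reports whether this crawler is responsible for the given document.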
Match(*doc.Document) bool
}

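// CrawlerDocument is a crawled document plus the metadata a crawler needs in
// order to index it and walk its resources.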
type CrawlerDocument interface {
ID() string
GetDocument() *doc.Document
GetResources() ([]*doc.Document, error)
WasCached() bool
}

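// CrawlerSeed is the list of documents already known to the index; it is used
// to bootstrap CrawlFromSeed.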
type CrawlerSeed []*doc.Document

type IndexFunc func(CrawlerDocument, Crawler) error
type Converter func(*doc.Document) (CrawlerDocument, error)
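
// Illustration only, not part of this change: a minimal stub satisfying the
// Crawler interface, to show how an implementation plugs in. It assumes a
// RepositoryURL string field on doc.Document, and would need "strings" added
// to the imports.
type stubCrawler struct{ prefix string }

func (s stubCrawler) Crawl(
    ctx context.Context, output chan<- CrawlerDocument) error {
    // A real crawler would query its source (e.g. a code search API) and
    // send each discovered document on output; this stub finds nothing.
    return nil
}

func (s stubCrawler) FetchDocument(ctx context.Context, d *doc.Document) error {
    // A real crawler would fill in the document's contents from its
    // FilePath and Ref; this stub leaves it untouched.
    return nil
}

func (s stubCrawler) SetCreated(ctx context.Context, d *doc.Document) error {
    return nil
}

func (s stubCrawler) Match(d *doc.Document) bool {
    return strings.HasPrefix(d.RepositoryURL, s.prefix)
}
// End of illustration.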

// CrawlFromSeed is a cleaner, more efficient, and more extensible crawler
// implementation. The seed must include the IDs of each document in the index.
func CrawlFromSeed(ctx context.Context, seed CrawlerSeed,
crawlers []Crawler, conv Converter, indx IndexFunc) {

seen := make(map[string]struct{})

logIfErr := func(err error) {
if err == nil {
return
}
logger.Println("error: ", err)
}

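// stack holds documents waiting to be fetched; dependencies discovered by
// addBranches are pushed here and drained LIFO by doCrawl below.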
stack := make(CrawlerSeed, 0)

findMatch := func(d *doc.Document) Crawler {
for _, crawl := range crawlers {
if crawl.Match(d) {
return crawl
}
}

return nil
}

addBranches := func(cdoc CrawlerDocument, match Crawler) {
if _, ok := seen[cdoc.ID()]; ok {
return
}

seen[cdoc.ID()] = struct{}{}
// Insert into index
err := indx(cdoc, match)
logIfErr(err)
if err != nil {
return
}

deps, err := cdoc.GetResources()
logIfErr(err)
if err != nil {
return
}
for _, dep := range deps {
if _, ok := seen[dep.ID()]; ok {
continue
}
stack = append(stack, dep)
}
}

doCrawl := func(docsPtr *CrawlerSeed) {
for len(*docsPtr) > 0 {
back := len(*docsPtr) - 1
next := (*docsPtr)[back]
*docsPtr = (*docsPtr)[:back]

match := findMatch(next)
if match == nil {
logIfErr(fmt.Errorf(
"%v could not match any crawler", next))
continue
}

err := match.FetchDocument(ctx, next)
logIfErr(err)
// If there was no change or there is an error, we don't have
// to branch out, since the dependencies are already in the
// index, or we cannot find the document.
if err != nil || next.WasCached() {
continue
}

cdoc, err := conv(next)
logIfErr(err)
if err != nil {
continue
}

addBranches(cdoc, match)
}
}
// Use the seed to update the bulk of the corpus.
logger.Printf("updating %d documents from seed\n", len(seed))
doCrawl(&seed)
// Traverse any new links added while updating corpus.
logger.Printf("crawling %d new documents found in the seed\n", len(stack))
doCrawl(&stack)

ch := make(chan CrawlerDocument, 1<<10)
wg := sync.WaitGroup{}

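// Fan-in consumer: index each document the crawlers discover and queue its
// dependencies on the stack for the final doCrawl pass.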
wg.Add(1)
go func() {
defer wg.Done()
for cdoc := range ch {
if _, ok := seen[cdoc.ID()]; ok {
continue
}
match := findMatch(cdoc.GetDocument())
if match == nil {
logIfErr(fmt.Errorf(
"%v could not match any crawler", cdoc))
continue
}
addBranches(cdoc, match)
}
}()

// Exploration through APIs.
errs := CrawlerRunner(ctx, ch, crawlers)
if errs != nil {
for _, err := range errs {
logIfErr(err)
}
}
close(ch)
logger.Println("Processing the new documents from the crawlers' exploration.")
wg.Wait()
// Handle deps of newly discovered documents.
logger.Printf("crawling the %d new documents from the crawlers' exploration.",
len(stack))
doCrawl(&stack)
}
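
// Illustration only, not part of this change: a minimal CrawlerDocument
// wrapper around doc.Document, showing the shape a Converter might return.
type stubCrawlerDoc struct {
    document *doc.Document
    cached   bool
}

func (c stubCrawlerDoc) ID() string                 { return c.document.ID() }
func (c stubCrawlerDoc) GetDocument() *doc.Document { return c.document }
func (c stubCrawlerDoc) WasCached() bool            { return c.cached }

func (c stubCrawlerDoc) GetResources() ([]*doc.Document, error) {
    // A real implementation would parse the document (e.g. a
    // kustomization.yaml) and return the documents it references.
    return nil, nil
}
// End of illustration.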

// CrawlerRunner is a blocking function and only returns once all of the
@@ -32,21 +185,28 @@ type Crawler interface {
// The return value is an array of errors in which each index represents the
// index of the crawler that emitted the error. Although the errors themselves
// can be nil, the array will always be exactly the size of the crawlers array.
//
// CrawlerRunner takes in a seed, which represents the documents stored in an
// index somewhere. The document data is not required to be populated. If there
// are many documents, leaving the data unpopulated is preferable. The order of
// iteration over the seed is not guaranteed, but CrawlerRunner does guarantee
// that every element from the seed will be processed before any other
// documents from the crawlers.
func CrawlerRunner(ctx context.Context,
output chan<- *doc.KustomizationDocument, crawlers []Crawler) []error {
output chan<- CrawlerDocument, crawlers []Crawler) []error {

errs := make([]error, len(crawlers))
wg := sync.WaitGroup{}

for i, crawler := range crawlers {
// Crawler implementations get their own channels to prevent a
// crawler from closing the main output channel.
docs := make(chan *doc.KustomizationDocument)
docs := make(chan CrawlerDocument)
wg.Add(2)

// Forward all of the documents from this crawler's channel to
// the main output channel.
go func(docs <-chan *doc.KustomizationDocument) {
go func(docs <-chan CrawlerDocument) {
defer wg.Done()
for doc := range docs {
output <- doc
@@ -55,7 +215,7 @@ func CrawlerRunner(ctx context.Context,

// Run this crawler and capture its returned error.
go func(idx int, crawler Crawler,
docs chan<- *doc.KustomizationDocument) {
docs chan<- CrawlerDocument) {

defer func() {
wg.Done()
…

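Putting the pieces together: the sketch below shows, hypothetically, how a caller in this package might wire CrawlFromSeed, reusing the stub types from the illustrations above. The real crawlers, converter, and index integration are defined elsewhere in the tooling; this is not code from the PR.

func runCrawl(ctx context.Context, seed CrawlerSeed) {
    crawlers := []Crawler{stubCrawler{prefix: "https://github.com/"}}

    conv := func(d *doc.Document) (CrawlerDocument, error) {
        return stubCrawlerDoc{document: d}, nil
    }

    indx := func(cd CrawlerDocument, c Crawler) error {
        // A real IndexFunc would upsert cd into the search index, and
        // could use c.SetCreated to backfill creation times.
        logger.Printf("indexing %s\n", cd.ID())
        return nil
    }

    // Seeded documents are updated first; anything new the crawlers find
    // is indexed afterwards.
    CrawlFromSeed(ctx, seed, crawlers, conv, indx)
}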