// Copyright 2017 Google Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package finder import ( "bufio" "bytes" "encoding/json" "errors" "fmt" "io" "os" "path/filepath" "runtime" "sort" "strings" "sync" "sync/atomic" "time" "android/soong/finder/fs" ) // This file provides a Finder struct that can quickly search for files satisfying // certain criteria. // This Finder gets its speed partially from parallelism and partially from caching. // If a Stat call returns the same result as last time, then it means Finder // can skip the ReadDir call for that dir. // The primary data structure used by the finder is the field Finder.nodes , // which is a tree of nodes of type *pathMap . // Each node represents a directory on disk, along with its stats, subdirectories, // and contained files. // The common use case for the Finder is that the caller creates a Finder and gives // it the same query that was given to it in the previous execution. // In this situation, the major events that take place are: // 1. The Finder begins to load its db // 2. The Finder begins to stat the directories mentioned in its db (using multiple threads) // Calling Stat on each of these directories is generally a large fraction of the total time // 3. The Finder begins to construct a separate tree of nodes in each of its threads // 4. The Finder merges the individual node trees into the main node tree // 5. The Finder may call ReadDir a few times if there are a few directories that are out-of-date // These ReadDir calls might prompt additional Stat calls, etc // 6. The Finder waits for all loading to complete // 7. The Finder searches the cache for files matching the user's query (using multiple threads) // These are the invariants regarding concurrency: // 1. The public methods of Finder are threadsafe. // The public methods are only performance-optimized for one caller at a time, however. // For the moment, multiple concurrent callers shouldn't expect any better performance than // multiple serial callers. // 2. While building the node tree, only one thread may ever access the collection of a // *pathMap at once. // a) The thread that accesses the collection is the thread that discovers the // children (by reading them from the cache or by having received a response to ReadDir). // 1) Consequently, the thread that discovers the children also spawns requests to stat // subdirs. // b) Consequently, while building the node tree, no thread may do a lookup of its // *pathMap via filepath because another thread may be adding children to the // collection of an ancestor node. Additionally, in rare cases, another thread // may be removing children from an ancestor node if the children were only discovered to // be irrelevant after calling ReadDir (which happens if a prune-file was just added). // 3. No query will begin to be serviced until all loading (both reading the db // and scanning the filesystem) is complete. // Tests indicate that it only takes about 10% as long to search the in-memory cache as to // generate it, making this not a huge loss in performance. // 4. The parsing of the db and the initial setup of the pathMap tree must complete before // beginning to call listDirSync (because listDirSync can create new entries in the pathMap) // see cmd/finder.go or finder_test.go for usage examples // Update versionString whenever making a backwards-incompatible change to the cache file format const versionString = "Android finder version 1" // a CacheParams specifies which files and directories the user wishes be scanned and // potentially added to the cache type CacheParams struct { // WorkingDirectory is used as a base for any relative file paths given to the Finder WorkingDirectory string // RootDirs are the root directories used to initiate the search RootDirs []string // ExcludeDirs are directory names that if encountered are removed from the search ExcludeDirs []string // PruneFiles are file names that if encountered prune their entire directory // (including siblings) PruneFiles []string // IncludeFiles are file names to include as matches IncludeFiles []string } // a cacheConfig stores the inputs that determine what should be included in the cache type cacheConfig struct { CacheParams // FilesystemView is a unique identifier telling which parts of which file systems // are readable by the Finder. In practice its value is essentially username@hostname. // FilesystemView is set to ensure that a cache file copied to another host or // found by another user doesn't inadvertently get reused. FilesystemView string } func (p *cacheConfig) Dump() ([]byte, error) { bytes, err := json.Marshal(p) return bytes, err } // a cacheMetadata stores version information about the cache type cacheMetadata struct { // The Version enables the Finder to determine whether it can even parse the file // If the version changes, the entire cache file must be regenerated Version string // The CacheParams enables the Finder to determine whether the parameters match // If the CacheParams change, the Finder can choose how much of the cache file to reuse // (although in practice, the Finder will probably choose to ignore the entire file anyway) Config cacheConfig } type Logger interface { Output(calldepth int, s string) error } // the Finder is the main struct that callers will want to use type Finder struct { // configuration DbPath string numDbLoadingThreads int numSearchingThreads int cacheMetadata cacheMetadata logger Logger filesystem fs.FileSystem // temporary state threadPool *threadPool mutex sync.Mutex fsErrs []fsErr errlock sync.Mutex shutdownWaitgroup sync.WaitGroup // non-temporary state modifiedFlag int32 nodes pathMap } var defaultNumThreads = runtime.NumCPU() * 2 // New creates a new Finder for use func New(cacheParams CacheParams, filesystem fs.FileSystem, logger Logger, dbPath string) (f *Finder, err error) { return newImpl(cacheParams, filesystem, logger, dbPath, defaultNumThreads) } // newImpl is like New but accepts more params func newImpl(cacheParams CacheParams, filesystem fs.FileSystem, logger Logger, dbPath string, numThreads int) (f *Finder, err error) { numDbLoadingThreads := numThreads numSearchingThreads := numThreads metadata := cacheMetadata{ Version: versionString, Config: cacheConfig{ CacheParams: cacheParams, FilesystemView: filesystem.ViewId(), }, } f = &Finder{ numDbLoadingThreads: numDbLoadingThreads, numSearchingThreads: numSearchingThreads, cacheMetadata: metadata, logger: logger, filesystem: filesystem, nodes: *newPathMap("/"), DbPath: dbPath, shutdownWaitgroup: sync.WaitGroup{}, } f.loadFromFilesystem() // check for any filesystem errors err = f.getErr() if err != nil { return nil, err } // confirm that every path mentioned in the CacheConfig exists for _, path := range cacheParams.RootDirs { if !filepath.IsAbs(path) { path = filepath.Join(f.cacheMetadata.Config.WorkingDirectory, path) } node := f.nodes.GetNode(filepath.Clean(path), false) if node == nil || node.ModTime == 0 { return nil, fmt.Errorf("path %v was specified to be included in the cache but does not exist\n", path) } } return f, nil } // FindNamed searches for every cached file func (f *Finder) FindAll() []string { return f.FindAt("/") } // FindNamed searches for every cached file under func (f *Finder) FindAt(rootDir string) []string { filter := func(entries DirEntries) (dirNames []string, fileNames []string) { return entries.DirNames, entries.FileNames } return f.FindMatching(rootDir, filter) } // FindNamed searches for every cached file named func (f *Finder) FindNamed(fileName string) []string { return f.FindNamedAt("/", fileName) } // FindNamedAt searches under for every file named // The reason a caller might use FindNamedAt instead of FindNamed is if they want // to limit their search to a subset of the cache func (f *Finder) FindNamedAt(rootPath string, fileName string) []string { filter := func(entries DirEntries) (dirNames []string, fileNames []string) { matches := []string{} for _, foundName := range entries.FileNames { if foundName == fileName { matches = append(matches, foundName) } } return entries.DirNames, matches } return f.FindMatching(rootPath, filter) } // FindFirstNamed searches for every file named // Whenever it finds a match, it stops search subdirectories func (f *Finder) FindFirstNamed(fileName string) []string { return f.FindFirstNamedAt("/", fileName) } // FindFirstNamedAt searches for every file named // Whenever it finds a match, it stops search subdirectories func (f *Finder) FindFirstNamedAt(rootPath string, fileName string) []string { filter := func(entries DirEntries) (dirNames []string, fileNames []string) { matches := []string{} for _, foundName := range entries.FileNames { if foundName == fileName { matches = append(matches, foundName) } } if len(matches) > 0 { return []string{}, matches } return entries.DirNames, matches } return f.FindMatching(rootPath, filter) } // FindMatching is the most general exported function for searching for files in the cache // The WalkFunc will be invoked repeatedly and is expected to modify the provided DirEntries // in place, removing file paths and directories as desired. // WalkFunc will be invoked potentially many times in parallel, and must be threadsafe. func (f *Finder) FindMatching(rootPath string, filter WalkFunc) []string { // set up some parameters scanStart := time.Now() var isRel bool workingDir := f.cacheMetadata.Config.WorkingDirectory isRel = !filepath.IsAbs(rootPath) if isRel { rootPath = filepath.Join(workingDir, rootPath) } rootPath = filepath.Clean(rootPath) // ensure nothing else is using the Finder f.verbosef("FindMatching waiting for finder to be idle\n") f.lock() defer f.unlock() node := f.nodes.GetNode(rootPath, false) if node == nil { f.verbosef("No data for path %v ; apparently not included in cache params: %v\n", rootPath, f.cacheMetadata.Config.CacheParams) // path is not found; don't do a search return []string{} } // search for matching files f.verbosef("Finder finding %v using cache\n", rootPath) results := f.findInCacheMultithreaded(node, filter, f.numSearchingThreads) // format and return results if isRel { for i := 0; i < len(results); i++ { results[i] = strings.Replace(results[i], workingDir+"/", "", 1) } } sort.Strings(results) f.verbosef("Found %v files under %v in %v using cache\n", len(results), rootPath, time.Since(scanStart)) return results } // Shutdown declares that the finder is no longer needed and waits for its cleanup to complete // Currently, that only entails waiting for the database dump to complete. func (f *Finder) Shutdown() { f.WaitForDbDump() } // WaitForDbDump returns once the database has been written to f.DbPath. func (f *Finder) WaitForDbDump() { f.shutdownWaitgroup.Wait() } // End of public api func (f *Finder) goDumpDb() { if f.wasModified() { f.shutdownWaitgroup.Add(1) go func() { err := f.dumpDb() if err != nil { f.verbosef("%v\n", err) } f.shutdownWaitgroup.Done() }() } else { f.verbosef("Skipping dumping unmodified db\n") } } // joinCleanPaths is like filepath.Join but is faster because // joinCleanPaths doesn't have to support paths ending in "/" or containing ".." func joinCleanPaths(base string, leaf string) string { if base == "" { return leaf } if base == "/" { return base + leaf } if leaf == "" { return base } return base + "/" + leaf } func (f *Finder) verbosef(format string, args ...interface{}) { f.logger.Output(2, fmt.Sprintf(format, args...)) } // loadFromFilesystem populates the in-memory cache based on the contents of the filesystem func (f *Finder) loadFromFilesystem() { f.threadPool = newThreadPool(f.numDbLoadingThreads) err := f.startFromExternalCache() if err != nil { f.startWithoutExternalCache() } f.goDumpDb() f.threadPool = nil } func (f *Finder) startFind(path string) { if !filepath.IsAbs(path) { path = filepath.Join(f.cacheMetadata.Config.WorkingDirectory, path) } node := f.nodes.GetNode(path, true) f.statDirAsync(node) } func (f *Finder) lock() { f.mutex.Lock() } func (f *Finder) unlock() { f.mutex.Unlock() } // a statResponse is the relevant portion of the response from the filesystem to a Stat call type statResponse struct { ModTime int64 Inode uint64 Device uint64 } // a pathAndStats stores a path and its stats type pathAndStats struct { statResponse Path string } // a dirFullInfo stores all of the relevant information we know about a directory type dirFullInfo struct { pathAndStats FileNames []string } // a PersistedDirInfo is the information about a dir that we save to our cache on disk type PersistedDirInfo struct { // These field names are short because they are repeated many times in the output json file P string // path T int64 // modification time I uint64 // inode number F []string // relevant filenames contained } // a PersistedDirs is the information that we persist for a group of dirs type PersistedDirs struct { // the device on which each directory is stored Device uint64 // the common root path to which all contained dirs are relative Root string // the directories themselves Dirs []PersistedDirInfo } // a CacheEntry is the smallest unit that can be read and parsed from the cache (on disk) at a time type CacheEntry []PersistedDirs // a DirEntries lists the files and directories contained directly within a specific directory type DirEntries struct { Path string // elements of DirNames are just the dir names; they don't include any '/' character DirNames []string // elements of FileNames are just the file names; they don't include '/' character FileNames []string } // a WalkFunc is the type that is passed into various Find functions for determining which // directories the caller wishes be walked. The WalkFunc is expected to decide which // directories to walk and which files to consider as matches to the original query. type WalkFunc func(DirEntries) (dirs []string, files []string) // a mapNode stores the relevant stats about a directory to be stored in a pathMap type mapNode struct { statResponse FileNames []string } // a pathMap implements the directory tree structure of nodes type pathMap struct { mapNode path string children map[string]*pathMap // number of descendent nodes, including self approximateNumDescendents int } func newPathMap(path string) *pathMap { result := &pathMap{path: path, children: make(map[string]*pathMap, 4), approximateNumDescendents: 1} return result } // GetNode returns the node at func (m *pathMap) GetNode(path string, createIfNotFound bool) *pathMap { if len(path) > 0 && path[0] == '/' { path = path[1:] } node := m for { if path == "" { return node } index := strings.Index(path, "/") var firstComponent string if index >= 0 { firstComponent = path[:index] path = path[index+1:] } else { firstComponent = path path = "" } child, found := node.children[firstComponent] if !found { if createIfNotFound { child = node.newChild(firstComponent) } else { return nil } } node = child } } func (m *pathMap) newChild(name string) (child *pathMap) { path := joinCleanPaths(m.path, name) newChild := newPathMap(path) m.children[name] = newChild return m.children[name] } func (m *pathMap) UpdateNumDescendents() int { count := 1 for _, child := range m.children { count += child.approximateNumDescendents } m.approximateNumDescendents = count return count } func (m *pathMap) UpdateNumDescendentsRecursive() { for _, child := range m.children { child.UpdateNumDescendentsRecursive() } m.UpdateNumDescendents() } func (m *pathMap) MergeIn(other *pathMap) { for key, theirs := range other.children { ours, found := m.children[key] if found { ours.MergeIn(theirs) } else { m.children[key] = theirs } } if other.ModTime != 0 { m.mapNode = other.mapNode } m.UpdateNumDescendents() } func (m *pathMap) DumpAll() []dirFullInfo { results := []dirFullInfo{} m.dumpInto("", &results) return results } func (m *pathMap) dumpInto(path string, results *[]dirFullInfo) { *results = append(*results, dirFullInfo{ pathAndStats{statResponse: m.statResponse, Path: path}, m.FileNames}, ) for key, child := range m.children { childPath := joinCleanPaths(path, key) if len(childPath) == 0 || childPath[0] != '/' { childPath = "/" + childPath } child.dumpInto(childPath, results) } } // a semaphore can be locked by up to callers at once type semaphore struct { pool chan bool } func newSemaphore(capacity int) *semaphore { return &semaphore{pool: make(chan bool, capacity)} } func (l *semaphore) Lock() { l.pool <- true } func (l *semaphore) Unlock() { <-l.pool } // A threadPool runs goroutines and supports throttling and waiting. // Without throttling, Go may exhaust the maximum number of various resources, such as // threads or file descriptors, and crash the program. type threadPool struct { receivedRequests sync.WaitGroup activeRequests semaphore } func newThreadPool(maxNumConcurrentThreads int) *threadPool { return &threadPool{ receivedRequests: sync.WaitGroup{}, activeRequests: *newSemaphore(maxNumConcurrentThreads), } } // Run requests to run the given function in its own goroutine func (p *threadPool) Run(function func()) { p.receivedRequests.Add(1) // If Run() was called from within a goroutine spawned by this threadPool, // then we may need to return from Run() before having capacity to actually // run . // // It's possible that the body of contains a statement (such as a syscall) // that will cause Go to pin it to a thread, or will contain a statement that uses // another resource that is in short supply (such as a file descriptor), so we can't // actually run until we have capacity. // // However, the semaphore used for synchronization is implemented via a channel and // shouldn't require a new thread for each access. go func() { p.activeRequests.Lock() function() p.activeRequests.Unlock() p.receivedRequests.Done() }() } // Wait waits until all goroutines are done, just like sync.WaitGroup's Wait func (p *threadPool) Wait() { p.receivedRequests.Wait() } type fsErr struct { path string err error } func (e fsErr) String() string { return e.path + ": " + e.err.Error() } func (f *Finder) serializeCacheEntry(dirInfos []dirFullInfo) ([]byte, error) { // group each dirFullInfo by its Device, to avoid having to repeat it in the output dirsByDevice := map[uint64][]PersistedDirInfo{} for _, entry := range dirInfos { _, found := dirsByDevice[entry.Device] if !found { dirsByDevice[entry.Device] = []PersistedDirInfo{} } dirsByDevice[entry.Device] = append(dirsByDevice[entry.Device], PersistedDirInfo{P: entry.Path, T: entry.ModTime, I: entry.Inode, F: entry.FileNames}) } cacheEntry := CacheEntry{} for device, infos := range dirsByDevice { // find common prefix prefix := "" if len(infos) > 0 { prefix = infos[0].P } for _, info := range infos { for !strings.HasPrefix(info.P+"/", prefix+"/") { prefix = filepath.Dir(prefix) if prefix == "/" { break } } } // remove common prefix for i := range infos { suffix := strings.Replace(infos[i].P, prefix, "", 1) if len(suffix) > 0 && suffix[0] == '/' { suffix = suffix[1:] } infos[i].P = suffix } // turn the map (keyed by device) into a list of structs with labeled fields // this is to improve readability of the output cacheEntry = append(cacheEntry, PersistedDirs{Device: device, Root: prefix, Dirs: infos}) } // convert to json. // it would save some space to use a different format than json for the db file, // but the space and time savings are small, and json is easy for humans to read bytes, err := json.Marshal(cacheEntry) return bytes, err } func (f *Finder) parseCacheEntry(bytes []byte) ([]dirFullInfo, error) { var cacheEntry CacheEntry err := json.Unmarshal(bytes, &cacheEntry) if err != nil { return nil, err } // convert from a CacheEntry to a []dirFullInfo (by copying a few fields) capacity := 0 for _, element := range cacheEntry { capacity += len(element.Dirs) } nodes := make([]dirFullInfo, capacity) count := 0 for _, element := range cacheEntry { for _, dir := range element.Dirs { path := joinCleanPaths(element.Root, dir.P) nodes[count] = dirFullInfo{ pathAndStats: pathAndStats{ statResponse: statResponse{ ModTime: dir.T, Inode: dir.I, Device: element.Device, }, Path: path}, FileNames: dir.F} count++ } } return nodes, nil } // We use the following separator byte to distinguish individually parseable blocks of json // because we know this separator won't appear in the json that we're parsing. // // The newline byte can only appear in a UTF-8 stream if the newline character appears, because: // - The newline character is encoded as "0000 1010" in binary ("0a" in hex) // - UTF-8 dictates that bytes beginning with a "0" bit are never emitted as part of a multibyte // character. // // We know that the newline character will never appear in our json string, because: // - If a newline character appears as part of a data string, then json encoding will // emit two characters instead: '\' and 'n'. // - The json encoder that we use doesn't emit the optional newlines between any of its // other outputs. const lineSeparator = byte('\n') func (f *Finder) readLine(reader *bufio.Reader) ([]byte, error) { return reader.ReadBytes(lineSeparator) } // validateCacheHeader reads the cache header from cacheReader and tells whether the cache is compatible with this Finder func (f *Finder) validateCacheHeader(cacheReader *bufio.Reader) bool { cacheVersionBytes, err := f.readLine(cacheReader) if err != nil { f.verbosef("Failed to read database header; database is invalid\n") return false } if len(cacheVersionBytes) > 0 && cacheVersionBytes[len(cacheVersionBytes)-1] == lineSeparator { cacheVersionBytes = cacheVersionBytes[:len(cacheVersionBytes)-1] } cacheVersionString := string(cacheVersionBytes) currentVersion := f.cacheMetadata.Version if cacheVersionString != currentVersion { f.verbosef("Version changed from %q to %q, database is not applicable\n", cacheVersionString, currentVersion) return false } cacheParamBytes, err := f.readLine(cacheReader) if err != nil { f.verbosef("Failed to read database search params; database is invalid\n") return false } if len(cacheParamBytes) > 0 && cacheParamBytes[len(cacheParamBytes)-1] == lineSeparator { cacheParamBytes = cacheParamBytes[:len(cacheParamBytes)-1] } currentParamBytes, err := f.cacheMetadata.Config.Dump() if err != nil { panic("Finder failed to serialize its parameters") } cacheParamString := string(cacheParamBytes) currentParamString := string(currentParamBytes) if cacheParamString != currentParamString { f.verbosef("Params changed from %q to %q, database is not applicable\n", cacheParamString, currentParamString) return false } return true } // loadBytes compares the cache info in to the state of the filesystem // loadBytes returns a map representing and also a slice of dirs that need to be re-walked func (f *Finder) loadBytes(id int, data []byte) (m *pathMap, dirsToWalk []string, err error) { helperStartTime := time.Now() cachedNodes, err := f.parseCacheEntry(data) if err != nil { return nil, nil, fmt.Errorf("Failed to parse block %v: %v\n", id, err.Error()) } unmarshalDate := time.Now() f.verbosef("Unmarshaled %v objects for %v in %v\n", len(cachedNodes), id, unmarshalDate.Sub(helperStartTime)) tempMap := newPathMap("/") stats := make([]statResponse, len(cachedNodes)) for i, node := range cachedNodes { // check the file system for an updated timestamp stats[i] = f.statDirSync(node.Path) } dirsToWalk = []string{} for i, cachedNode := range cachedNodes { updated := stats[i] // save the cached value container := tempMap.GetNode(cachedNode.Path, true) container.mapNode = mapNode{statResponse: updated} // if the metadata changed and the directory still exists, then // make a note to walk it later if !f.isInfoUpToDate(cachedNode.statResponse, updated) && updated.ModTime != 0 { f.setModified() // make a note that the directory needs to be walked dirsToWalk = append(dirsToWalk, cachedNode.Path) } else { container.mapNode.FileNames = cachedNode.FileNames } } // count the number of nodes to improve our understanding of the shape of the tree, // thereby improving parallelism of subsequent searches tempMap.UpdateNumDescendentsRecursive() f.verbosef("Statted inodes of block %v in %v\n", id, time.Now().Sub(unmarshalDate)) return tempMap, dirsToWalk, nil } // startFromExternalCache loads the cache database from disk // startFromExternalCache waits to return until the load of the cache db is complete, but // startFromExternalCache does not wait for all every listDir() or statDir() request to complete func (f *Finder) startFromExternalCache() (err error) { startTime := time.Now() dbPath := f.DbPath // open cache file and validate its header reader, err := f.filesystem.Open(dbPath) if err != nil { return errors.New("No data to load from database\n") } bufferedReader := bufio.NewReader(reader) if !f.validateCacheHeader(bufferedReader) { return errors.New("Cache header does not match") } f.verbosef("Database header matches, will attempt to use database %v\n", f.DbPath) // read the file and spawn threads to process it nodesToWalk := [][]*pathMap{} mainTree := newPathMap("/") // read the blocks and stream them into type dataBlock struct { id int err error data []byte } blockChannel := make(chan dataBlock, f.numDbLoadingThreads) readBlocks := func() { index := 0 for { // It takes some time to unmarshal the input from json, so we want // to unmarshal it in parallel. In order to find valid places to // break the input, we scan for the line separators that we inserted // (for this purpose) when we dumped the database. data, err := f.readLine(bufferedReader) var response dataBlock done := false if err != nil && err != io.EOF { response = dataBlock{id: index, err: err, data: nil} done = true } else { done = (err == io.EOF) response = dataBlock{id: index, err: nil, data: data} } blockChannel <- response index++ duration := time.Since(startTime) f.verbosef("Read block %v after %v\n", index, duration) if done { f.verbosef("Read %v blocks in %v\n", index, duration) close(blockChannel) return } } } go readBlocks() // Read from and stream the responses into . type workResponse struct { id int err error tree *pathMap updatedDirs []string } resultChannel := make(chan workResponse) processBlocks := func() { numProcessed := 0 threadPool := newThreadPool(f.numDbLoadingThreads) for { // get a block to process block, received := <-blockChannel if !received { break } if block.err != nil { resultChannel <- workResponse{err: block.err} break } numProcessed++ // wait until there is CPU available to process it threadPool.Run( func() { processStartTime := time.Now() f.verbosef("Starting to process block %v after %v\n", block.id, processStartTime.Sub(startTime)) tempMap, updatedDirs, err := f.loadBytes(block.id, block.data) var response workResponse if err != nil { f.verbosef( "Block %v failed to parse with error %v\n", block.id, err) response = workResponse{err: err} } else { response = workResponse{ id: block.id, err: nil, tree: tempMap, updatedDirs: updatedDirs, } } f.verbosef("Processed block %v in %v\n", block.id, time.Since(processStartTime), ) resultChannel <- response }, ) } threadPool.Wait() f.verbosef("Finished processing %v blocks in %v\n", numProcessed, time.Since(startTime)) close(resultChannel) } go processBlocks() // Read from and use the results combineResults := func() (err error) { for { result, received := <-resultChannel if !received { break } if err != nil { // In case of an error, wait for work to complete before // returning the error. This ensures that any subsequent // work doesn't need to compete for resources (and possibly // fail due to, for example, a filesystem limit on the number of // concurrently open files) with past work. continue } if result.err != nil { err = result.err continue } // update main tree mainTree.MergeIn(result.tree) // record any new directories that we will need to Stat() updatedNodes := make([]*pathMap, len(result.updatedDirs)) for j, dir := range result.updatedDirs { node := mainTree.GetNode(dir, false) updatedNodes[j] = node } nodesToWalk = append(nodesToWalk, updatedNodes) } return err } err = combineResults() if err != nil { return err } f.nodes = *mainTree // after having loaded the entire db and therefore created entries for // the directories we know of, now it's safe to start calling ReadDir on // any updated directories for i := range nodesToWalk { f.listDirsAsync(nodesToWalk[i]) } f.verbosef("Loaded db and statted known dirs in %v\n", time.Since(startTime)) f.threadPool.Wait() f.verbosef("Loaded db and statted all dirs in %v\n", time.Now().Sub(startTime)) return err } // startWithoutExternalCache starts scanning the filesystem according to the cache config // startWithoutExternalCache should be called if startFromExternalCache is not applicable func (f *Finder) startWithoutExternalCache() { startTime := time.Now() configDirs := f.cacheMetadata.Config.RootDirs // clean paths candidates := make([]string, len(configDirs)) for i, dir := range configDirs { candidates[i] = filepath.Clean(dir) } // remove duplicates dirsToScan := make([]string, 0, len(configDirs)) for _, candidate := range candidates { include := true for _, included := range dirsToScan { if included == "/" || strings.HasPrefix(candidate+"/", included+"/") { include = false break } } if include { dirsToScan = append(dirsToScan, candidate) } } // start searching finally for _, path := range dirsToScan { f.verbosef("Starting find of %v\n", path) f.startFind(path) } f.threadPool.Wait() f.verbosef("Scanned filesystem (not using cache) in %v\n", time.Now().Sub(startTime)) } // isInfoUpToDate tells whether can confirm that results computed at are still valid func (f *Finder) isInfoUpToDate(old statResponse, new statResponse) (equal bool) { if old.Inode != new.Inode { return false } if old.ModTime != new.ModTime { return false } if old.Device != new.Device { return false } return true } func (f *Finder) wasModified() bool { return atomic.LoadInt32(&f.modifiedFlag) > 0 } func (f *Finder) setModified() { var newVal int32 newVal = 1 atomic.StoreInt32(&f.modifiedFlag, newVal) } // sortedDirEntries exports directory entries to facilitate dumping them to the external cache func (f *Finder) sortedDirEntries() []dirFullInfo { startTime := time.Now() nodes := make([]dirFullInfo, 0) for _, node := range f.nodes.DumpAll() { if node.ModTime != 0 { nodes = append(nodes, node) } } discoveryDate := time.Now() f.verbosef("Generated %v cache entries in %v\n", len(nodes), discoveryDate.Sub(startTime)) less := func(i int, j int) bool { return nodes[i].Path < nodes[j].Path } sort.Slice(nodes, less) sortDate := time.Now() f.verbosef("Sorted %v cache entries in %v\n", len(nodes), sortDate.Sub(discoveryDate)) return nodes } // serializeDb converts the cache database into a form to save to disk func (f *Finder) serializeDb() ([]byte, error) { // sort dir entries var entryList = f.sortedDirEntries() // Generate an output file that can be conveniently loaded using the same number of threads // as were used in this execution (because presumably that will be the number of threads // used in the next execution too) // generate header header := []byte{} header = append(header, []byte(f.cacheMetadata.Version)...) header = append(header, lineSeparator) configDump, err := f.cacheMetadata.Config.Dump() if err != nil { return nil, err } header = append(header, configDump...) // serialize individual blocks in parallel numBlocks := f.numDbLoadingThreads if numBlocks > len(entryList) { numBlocks = len(entryList) } blocks := make([][]byte, 1+numBlocks) blocks[0] = header blockMin := 0 wg := sync.WaitGroup{} var errLock sync.Mutex for i := 1; i <= numBlocks; i++ { // identify next block blockMax := len(entryList) * i / numBlocks block := entryList[blockMin:blockMax] // process block wg.Add(1) go func(index int, block []dirFullInfo) { byteBlock, subErr := f.serializeCacheEntry(block) f.verbosef("Serialized block %v into %v bytes\n", index, len(byteBlock)) if subErr != nil { f.verbosef("%v\n", subErr.Error()) errLock.Lock() err = subErr errLock.Unlock() } else { blocks[index] = byteBlock } wg.Done() }(i, block) blockMin = blockMax } wg.Wait() if err != nil { return nil, err } content := bytes.Join(blocks, []byte{lineSeparator}) return content, nil } // dumpDb saves the cache database to disk func (f *Finder) dumpDb() error { startTime := time.Now() f.verbosef("Dumping db\n") tempPath := f.DbPath + ".tmp" bytes, err := f.serializeDb() if err != nil { return err } serializeDate := time.Now() f.verbosef("Serialized db in %v\n", serializeDate.Sub(startTime)) // dump file and atomically move err = f.filesystem.WriteFile(tempPath, bytes, 0777) if err != nil { return err } err = f.filesystem.Rename(tempPath, f.DbPath) if err != nil { return err } f.verbosef("Wrote db in %v\n", time.Now().Sub(serializeDate)) return nil } // canIgnoreFsErr checks for certain classes of filesystem errors that are safe to ignore func (f *Finder) canIgnoreFsErr(err error) bool { pathErr, isPathErr := err.(*os.PathError) if !isPathErr { // Don't recognize this error return false } if os.IsPermission(pathErr) { // Permission errors are ignored: // https://issuetracker.google.com/37553659 // https://github.com/google/kati/pull/116 return true } if pathErr.Err == os.ErrNotExist { // If a directory doesn't exist, that generally means the cache is out-of-date return true } // Don't recognize this error return false } // onFsError should be called whenever a potentially fatal error is returned from a filesystem call func (f *Finder) onFsError(path string, err error) { if !f.canIgnoreFsErr(err) { // We could send the errors through a channel instead, although that would cause this call // to block unless we preallocated a sufficient buffer or spawned a reader thread. // Although it wouldn't be too complicated to spawn a reader thread, it's still slightly // more convenient to use a lock. Only in an unusual situation should this code be // invoked anyway. f.errlock.Lock() f.fsErrs = append(f.fsErrs, fsErr{path: path, err: err}) f.errlock.Unlock() } } // discardErrsForPrunedPaths removes any errors for paths that are no longer included in the cache func (f *Finder) discardErrsForPrunedPaths() { // This function could be somewhat inefficient due to being single-threaded, // but the length of f.fsErrs should be approximately 0, so it shouldn't take long anyway. relevantErrs := make([]fsErr, 0, len(f.fsErrs)) for _, fsErr := range f.fsErrs { path := fsErr.path node := f.nodes.GetNode(path, false) if node != nil { // The path in question wasn't pruned due to a failure to process a parent directory. // So, the failure to process this path is important relevantErrs = append(relevantErrs, fsErr) } } f.fsErrs = relevantErrs } // getErr returns an error based on previous calls to onFsErr, if any func (f *Finder) getErr() error { f.discardErrsForPrunedPaths() numErrs := len(f.fsErrs) if numErrs < 1 { return nil } maxNumErrsToInclude := 10 message := "" if numErrs > maxNumErrsToInclude { message = fmt.Sprintf("finder encountered %v errors: %v...", numErrs, f.fsErrs[:maxNumErrsToInclude]) } else { message = fmt.Sprintf("finder encountered %v errors: %v", numErrs, f.fsErrs) } return errors.New(message) } func (f *Finder) statDirAsync(dir *pathMap) { node := dir path := dir.path f.threadPool.Run( func() { updatedStats := f.statDirSync(path) if !f.isInfoUpToDate(node.statResponse, updatedStats) { node.mapNode = mapNode{ statResponse: updatedStats, FileNames: []string{}, } f.setModified() if node.statResponse.ModTime != 0 { // modification time was updated, so re-scan for // child directories f.listDirAsync(dir) } } }, ) } func (f *Finder) statDirSync(path string) statResponse { fileInfo, err := f.filesystem.Lstat(path) var stats statResponse if err != nil { // possibly record this error f.onFsError(path, err) // in case of a failure to stat the directory, treat the directory as missing (modTime = 0) return stats } modTime := fileInfo.ModTime() stats = statResponse{} inode, err := f.filesystem.InodeNumber(fileInfo) if err != nil { panic(fmt.Sprintf("Could not get inode number of %v: %v\n", path, err.Error())) } stats.Inode = inode device, err := f.filesystem.DeviceNumber(fileInfo) if err != nil { panic(fmt.Sprintf("Could not get device number of %v: %v\n", path, err.Error())) } stats.Device = device permissionsChangeTime, err := f.filesystem.PermTime(fileInfo) if err != nil { panic(fmt.Sprintf("Could not get permissions modification time (CTime) of %v: %v\n", path, err.Error())) } // We're only interested in knowing whether anything about the directory // has changed since last check, so we use the latest of the two // modification times (content modification (mtime) and // permission modification (ctime)) if permissionsChangeTime.After(modTime) { modTime = permissionsChangeTime } stats.ModTime = modTime.UnixNano() return stats } // pruneCacheCandidates removes the items that we don't want to include in our persistent cache func (f *Finder) pruneCacheCandidates(items *DirEntries) { for _, fileName := range items.FileNames { for _, abortedName := range f.cacheMetadata.Config.PruneFiles { if fileName == abortedName { items.FileNames = []string{} items.DirNames = []string{} return } } } // remove any files that aren't the ones we want to include writeIndex := 0 for _, fileName := range items.FileNames { // include only these files for _, includedName := range f.cacheMetadata.Config.IncludeFiles { if fileName == includedName { items.FileNames[writeIndex] = fileName writeIndex++ break } } } // resize items.FileNames = items.FileNames[:writeIndex] writeIndex = 0 for _, dirName := range items.DirNames { items.DirNames[writeIndex] = dirName // ignore other dirs that are known to not be inputs to the build process include := true for _, excludedName := range f.cacheMetadata.Config.ExcludeDirs { if dirName == excludedName { // don't include include = false break } } if include { writeIndex++ } } // resize items.DirNames = items.DirNames[:writeIndex] } func (f *Finder) listDirsAsync(nodes []*pathMap) { f.threadPool.Run( func() { for i := range nodes { f.listDirSync(nodes[i]) } }, ) } func (f *Finder) listDirAsync(node *pathMap) { f.threadPool.Run( func() { f.listDirSync(node) }, ) } func (f *Finder) listDirSync(dir *pathMap) { path := dir.path children, err := f.filesystem.ReadDir(path) if err != nil { // possibly record this error f.onFsError(path, err) // if listing the contents of the directory fails (presumably due to // permission denied), then treat the directory as empty children = nil } var subdirs []string var subfiles []string for _, child := range children { linkBits := child.Mode() & os.ModeSymlink isLink := linkBits != 0 if isLink { childPath := filepath.Join(path, child.Name()) childStat, err := f.filesystem.Stat(childPath) if err != nil { // If stat fails this is probably a broken or dangling symlink, treat it as a file. subfiles = append(subfiles, child.Name()) } else if childStat.IsDir() { // Skip symlink dirs. // We don't have to support symlink dirs because // that would cause duplicates. } else { // We do have to support symlink files because the link name might be // different than the target name // (for example, Android.bp -> build/soong/root.bp) subfiles = append(subfiles, child.Name()) } } else if child.IsDir() { subdirs = append(subdirs, child.Name()) } else { subfiles = append(subfiles, child.Name()) } } parentNode := dir entry := &DirEntries{Path: path, DirNames: subdirs, FileNames: subfiles} f.pruneCacheCandidates(entry) // create a pathMap node for each relevant subdirectory relevantChildren := map[string]*pathMap{} for _, subdirName := range entry.DirNames { childNode, found := parentNode.children[subdirName] // if we already knew of this directory, then we already have a request pending to Stat it // if we didn't already know of this directory, then we must Stat it now if !found { childNode = parentNode.newChild(subdirName) f.statDirAsync(childNode) } relevantChildren[subdirName] = childNode } // Note that in rare cases, it's possible that we're reducing the set of // children via this statement, if these are all true: // 1. we previously had a cache that knew about subdirectories of parentNode // 2. the user created a prune-file (described in pruneCacheCandidates) // inside , which specifies that the contents of parentNode // are to be ignored. // The fact that it's possible to remove children here means that *pathMap structs // must not be looked up from f.nodes by filepath (and instead must be accessed by // direct pointer) until after every listDirSync completes parentNode.FileNames = entry.FileNames parentNode.children = relevantChildren } // listMatches takes a node and a function that specifies which subdirectories and // files to include, and listMatches returns the matches func (f *Finder) listMatches(node *pathMap, filter WalkFunc) (subDirs []*pathMap, filePaths []string) { entries := DirEntries{ FileNames: node.FileNames, } entries.DirNames = make([]string, 0, len(node.children)) for childName := range node.children { entries.DirNames = append(entries.DirNames, childName) } dirNames, fileNames := filter(entries) subDirs = []*pathMap{} filePaths = make([]string, 0, len(fileNames)) for _, fileName := range fileNames { filePaths = append(filePaths, joinCleanPaths(node.path, fileName)) } subDirs = make([]*pathMap, 0, len(dirNames)) for _, childName := range dirNames { child, ok := node.children[childName] if ok { subDirs = append(subDirs, child) } } return subDirs, filePaths } // findInCacheMultithreaded spawns potentially multiple goroutines with which to search the cache. func (f *Finder) findInCacheMultithreaded(node *pathMap, filter WalkFunc, approxNumThreads int) []string { if approxNumThreads < 2 { // Done spawning threads; process remaining directories return f.findInCacheSinglethreaded(node, filter) } totalWork := 0 for _, child := range node.children { totalWork += child.approximateNumDescendents } childrenResults := make(chan []string, len(node.children)) subDirs, filePaths := f.listMatches(node, filter) // process child directories for _, child := range subDirs { numChildThreads := approxNumThreads * child.approximateNumDescendents / totalWork childProcessor := func(child *pathMap) { childResults := f.findInCacheMultithreaded(child, filter, numChildThreads) childrenResults <- childResults } // If we're allowed to use more than 1 thread to process this directory, // then instead we use 1 thread for each subdirectory. // It would be strange to spawn threads for only some subdirectories. go childProcessor(child) } // collect results for i := 0; i < len(subDirs); i++ { childResults := <-childrenResults filePaths = append(filePaths, childResults...) } close(childrenResults) return filePaths } // findInCacheSinglethreaded synchronously searches the cache for all matching file paths // note findInCacheSinglethreaded runs 2X to 4X as fast by being iterative rather than recursive func (f *Finder) findInCacheSinglethreaded(node *pathMap, filter WalkFunc) []string { if node == nil { return []string{} } nodes := []*pathMap{node} matches := []string{} for len(nodes) > 0 { currentNode := nodes[0] nodes = nodes[1:] subDirs, filePaths := f.listMatches(currentNode, filter) nodes = append(nodes, subDirs...) matches = append(matches, filePaths...) } return matches }