diff --git a/vulnfeeds/cmd/combine-to-osv/main.go b/vulnfeeds/cmd/combine-to-osv/main.go index c0f0b38b56f..06a6ada272d 100644 --- a/vulnfeeds/cmd/combine-to-osv/main.go +++ b/vulnfeeds/cmd/combine-to-osv/main.go @@ -16,8 +16,8 @@ import ( "cloud.google.com/go/storage" "github.com/google/osv/vulnfeeds/conversion" + "github.com/google/osv/vulnfeeds/conversion/writer" "github.com/google/osv/vulnfeeds/models" - "github.com/google/osv/vulnfeeds/upload" "github.com/google/osv/vulnfeeds/utility/logger" "github.com/ossf/osv-schema/bindings/go/osvschema" "google.golang.org/api/iterator" @@ -92,7 +92,7 @@ func main() { vulnerabilities = append(vulnerabilities, v) } - upload.Upload(ctx, "OSV files", *uploadToGCS, *outputBucketName, *overridesBucketName, *numWorkers, *osvOutputPath, vulnerabilities, *syncDeletions) + writer.UploadVulnsToGCS(ctx, "OSV files", *uploadToGCS, *outputBucketName, *overridesBucketName, *numWorkers, *osvOutputPath, vulnerabilities, *syncDeletions) } // extractCVEName extracts the CVE name from a given filename and prefix. 
diff --git a/vulnfeeds/cmd/converters/alpine/main.go b/vulnfeeds/cmd/converters/alpine/main.go index b90d5a8810d..720664d8f3a 100644 --- a/vulnfeeds/cmd/converters/alpine/main.go +++ b/vulnfeeds/cmd/converters/alpine/main.go @@ -15,8 +15,8 @@ import ( "strings" "time" + "github.com/google/osv/vulnfeeds/conversion/writer" "github.com/google/osv/vulnfeeds/models" - "github.com/google/osv/vulnfeeds/upload" "github.com/google/osv/vulnfeeds/utility/logger" "github.com/google/osv/vulnfeeds/vulns" "github.com/ossf/osv-schema/bindings/go/osvschema" @@ -72,7 +72,7 @@ func main() { } ctx := context.Background() - upload.Upload(ctx, "Alpine CVEs", *uploadToGCS, *outputBucketName, "", *numWorkers, *alpineOutputPath, vulnerabilities, *syncDeletions) + writer.UploadVulnsToGCS(ctx, "Alpine CVEs", *uploadToGCS, *outputBucketName, "", *numWorkers, *alpineOutputPath, vulnerabilities, *syncDeletions) logger.Info("Alpine CVE conversion succeeded.") } diff --git a/vulnfeeds/cmd/converters/cve/cve5/bulk-converter/main.go b/vulnfeeds/cmd/converters/cve/cve5/bulk-converter/main.go index a466b36b8be..8be87eb896c 100644 --- a/vulnfeeds/cmd/converters/cve/cve5/bulk-converter/main.go +++ b/vulnfeeds/cmd/converters/cve/cve5/bulk-converter/main.go @@ -2,6 +2,8 @@ package main import ( + "bytes" + "context" _ "embed" "encoding/json" "flag" @@ -14,8 +16,9 @@ import ( "sync" "time" - "github.com/google/osv/vulnfeeds/conversion" "github.com/google/osv/vulnfeeds/conversion/cve5" + "github.com/google/osv/vulnfeeds/conversion/writer" + "github.com/google/osv/vulnfeeds/gcs-tools" "github.com/google/osv/vulnfeeds/models" "github.com/google/osv/vulnfeeds/utility/logger" ) @@ -24,9 +27,13 @@ var ( repoDir = flag.String("cve5-repo", "cvelistV5", "CVEListV5 directory path") localOutputDir = flag.String("out-dir", "cve5", "Path to output results.") startYear = flag.String("start-year", "2022", "The first in scope year to process.") - workers = flag.Int("workers", 30, "The number of concurrent workers to use 
for processing CVEs.") + workers = flag.Int("workers", 10, "The number of concurrent workers to use for processing CVEs.") + gcsWorkers = flag.Int("gcs-workers", 30, "The number of concurrent workers to use for GCS uploads.") cnaDenyList = flag.String("cna-denylist", "", "A comma-separated list of CNAs to skip. If not provided, defaults to cna_denylist.txt.") rejectFailed = flag.Bool("reject-failed", false, "If set, OSV records with a failed conversion outcome will not be generated.") + uploadToGCS = flag.Bool("upload-to-gcs", false, "If true, upload to GCS bucket instead of writing to local disk.") + outputBucket = flag.String("output-bucket", "osv-test-cve-osv-conversion", "The GCS bucket to write to.") + gcsPrefix = flag.String("gcs-prefix", "cve5-osv", "The prefix within the GCS bucket.") ) //go:embed cna_denylist.txt @@ -56,10 +63,22 @@ func main() { } } + var gcsHelper *gcs.Helper + ctx := context.Background() + if *uploadToGCS { + var err error + gcsHelper, err = gcs.InitUploadPool(ctx, *gcsWorkers, *outputBucket) + if err != nil { + logger.Fatal("Failed to initialize GCS upload pool", slog.Any("err", err)) + } + defer gcsHelper.CloseAndWait() + logger.Info("GCS Upload Pool initialized", slog.String("bucket", *outputBucket)) + } + // Start the worker pool. for range *workers { wg.Add(1) - go worker(&wg, jobs, *localOutputDir, cnaList, *rejectFailed) + go worker(&wg, jobs, gcsHelper, *localOutputDir, cnaList, *rejectFailed) } // Discover files and send them to the workers. @@ -98,7 +117,7 @@ func main() { } // worker is a function that processes CVE files from the jobs channel. 
-func worker(wg *sync.WaitGroup, jobs <-chan string, outDir string, cnas []string, rejectFailed bool) { +func worker(wg *sync.WaitGroup, jobs <-chan string, gcsHelper *gcs.Helper, outDir string, cnas []string, rejectFailed bool) { defer wg.Done() for path := range jobs { data, err := os.ReadFile(path) @@ -119,12 +138,6 @@ func worker(wg *sync.WaitGroup, jobs <-chan string, outDir string, cnas []string cveID := cve.Metadata.CVEID logger.Info("Processing "+string(cveID), slog.String("cve", string(cveID))) - osvFile, errCVE := conversion.CreateOSVFile(cveID, outDir) - metricsFile, errMetrics := conversion.CreateMetricsFile(cveID, outDir) - if errCVE != nil || errMetrics != nil { - logger.Fatal("File failed to be created for CVE", slog.String("cve", string(cveID))) - } - sourceLink := "" baseDirCVEList := "cves/" // The base folder for the CVEListV5 repository. idx := strings.Index(path, baseDirCVEList) @@ -133,21 +146,56 @@ func worker(wg *sync.WaitGroup, jobs <-chan string, outDir string, cnas []string sourceLink = "https://github.com/CVEProject/cvelistV5/tree/main/" + relPath } - // Perform the conversion and export the results. 
- metrics, err := cve5.ConvertAndExportCVEToOSV(cve, osvFile, metricsFile, sourceLink) - if err != nil { - logger.Warn("Failed to generate an OSV record", slog.String("cve", string(cveID)), slog.Any("err", err)) + if gcsHelper != nil { + var vulnBuf, metricsBuf bytes.Buffer + metrics, err := cve5.ConvertAndExportCVEToOSV(cve, &vulnBuf, &metricsBuf, sourceLink) + if err != nil { + logger.Warn("Failed to generate an OSV record", slog.String("cve", string(cveID)), slog.Any("err", err)) + } else { + if rejectFailed && metrics.Outcome != models.Successful { + logger.Info("Rejecting failed OSV record", slog.String("cve", string(cveID)), slog.String("outcome", metrics.Outcome.String())) + } else { + logger.Info("Queueing OSV record for "+string(cveID), slog.String("cve", string(cveID))) + objectName := filepath.Join(*gcsPrefix, string(cveID)+".json") + gcsHelper.Upload(objectName, bytes.NewReader(vulnBuf.Bytes()), "", "application/json") + + metricsObjectName := filepath.Join(*gcsPrefix, string(cveID)+".metrics.json") + gcsHelper.Upload(metricsObjectName, bytes.NewReader(metricsBuf.Bytes()), "", "application/json") + } + } + + // Always write metrics locally for outcomes CSV auditing + metricsFile, err := writer.CreateMetricsFile(cveID, outDir) + if err == nil { + err = writer.WriteMetricsFile(metrics, metricsFile) + if err != nil { + logger.Error("Failed to write metrics file", slog.String("cve", string(cveID)), slog.Any("err", err)) + } + metricsFile.Close() + } } else { - if rejectFailed && metrics.Outcome != models.Successful { - logger.Info("Rejecting failed OSV record", slog.String("cve", string(cveID)), slog.String("outcome", metrics.Outcome.String())) - osvFile.Close() - os.Remove(osvFile.Name()) + osvFile, errCVE := writer.CreateOSVFile(cveID, outDir) + metricsFile, errMetrics := writer.CreateMetricsFile(cveID, outDir) + if errCVE != nil || errMetrics != nil { + logger.Fatal("File failed to be created for CVE", slog.String("cve", string(cveID))) + } + + // 
Perform the conversion and export the results. + metrics, err := cve5.ConvertAndExportCVEToOSV(cve, osvFile, metricsFile, sourceLink) + if err != nil { + logger.Warn("Failed to generate an OSV record", slog.String("cve", string(cveID)), slog.Any("err", err)) } else { - logger.Info("Generated OSV record for "+string(cveID), slog.String("cve", string(cveID)), slog.String("cna", cve.Metadata.AssignerShortName), slog.String("outcome", metrics.Outcome.String())) + if rejectFailed && metrics.Outcome != models.Successful { + logger.Info("Rejecting failed OSV record", slog.String("cve", string(cveID)), slog.String("outcome", metrics.Outcome.String())) + osvFile.Close() + os.Remove(osvFile.Name()) + } else { + logger.Info("Generated OSV record for "+string(cveID), slog.String("cve", string(cveID)), slog.String("cna", cve.Metadata.AssignerShortName), slog.String("outcome", metrics.Outcome.String())) + } } - } - metricsFile.Close() - osvFile.Close() + metricsFile.Close() + osvFile.Close() + } } } diff --git a/vulnfeeds/cmd/converters/cve/cve5/bulk-converter/run-cvelist-converter.sh b/vulnfeeds/cmd/converters/cve/cve5/bulk-converter/run-cvelist-converter.sh index 4450f834452..f4c04d24c8a 100755 --- a/vulnfeeds/cmd/converters/cve/cve5/bulk-converter/run-cvelist-converter.sh +++ b/vulnfeeds/cmd/converters/cve/cve5/bulk-converter/run-cvelist-converter.sh @@ -32,7 +32,8 @@ set -u echo "Commencing cvelist conversion run" -NUM_WORKERS="${NUM_WORKERS:=30}" +NUM_WORKERS="${NUM_WORKERS:=10}" +GCS_WORKERS="${GCS_WORKERS:=30}" OUTPUT_BUCKET="${OUTPUT_BUCKET:=osv-test-cve-osv-conversion}" OSV_OUTPUT_PATH="cve5" @@ -53,30 +54,12 @@ fi # Convert CVEList records to OSV. echo "Commence CVEList bulk conversion run" ./cve-bulk-converter \ - --start-year="2022" \ - --out-dir="${LOCAL_OUT_DIR}/${OSV_OUTPUT_PATH}" \ - --workers="${NUM_WORKERS}" - -# Copy results to staging area. 
-echo "Copying CVEList records successfully converted to OSV to aggregated staging" -find "${LOCAL_OUT_DIR}/${OSV_OUTPUT_PATH}" -type f -name \*.json \ - -exec cp '{}' "${LOCAL_OUT_DIR}/gcs_stage/" \; - -# Copy (and remove any missing) results to GCS bucket, with some sanity -# checking. -objs_present=$(gcloud storage ls "${OSV_OUTPUT_GCS_PATH}" | wc -l) -objs_deleted=$(gcloud storage rsync --checksums-only --dry-run --delete-unmatched-destination-objects "${LOCAL_OUT_DIR}/gcs_stage" "${OSV_OUTPUT_GCS_PATH}" 2>&1 | grep "Would remove" | wc -l) - -threshold=$(echo "scale=2; ${objs_present} * (${SAFETY_THRESHOLD_PCT:-2} / 100)" | bc) - -# # Bash can't deal with floats -if (( $(echo "${objs_deleted} > ${threshold}" | bc -l) )); then - echo "Aborting. Unexpectedly high (${objs_deleted}) number of CVE records would be deleted!" >> /dev/stderr - gcloud storage rsync --checksums-only --dry-run --delete-unmatched-destination-objects "${LOCAL_OUT_DIR}/gcs_stage" "${OSV_OUTPUT_GCS_PATH}" 2>&1 | grep "Would remove" >> /dev/stderr - exit 1 -fi - -echo "Copying CVEList records successfully converted to GCS bucket" -gcloud storage rsync --checksums-only --delete-unmatched-destination-objects "${LOCAL_OUT_DIR}/gcs_stage" "${OSV_OUTPUT_GCS_PATH}" + --start-year="2022" \ + --out-dir="${LOCAL_OUT_DIR}/${OSV_OUTPUT_PATH}" \ + --workers="${NUM_WORKERS}" \ + --gcs-workers="${GCS_WORKERS}" \ + --upload-to-gcs=true \ + --output-bucket="${OUTPUT_BUCKET}" \ + --gcs-prefix="${OSV_OUTPUT_PATH}" echo "Conversion run complete" diff --git a/vulnfeeds/cmd/converters/cve/cve5/single-converter/main.go b/vulnfeeds/cmd/converters/cve/cve5/single-converter/main.go index 3dc45a05c8a..70eaf67fed0 100644 --- a/vulnfeeds/cmd/converters/cve/cve5/single-converter/main.go +++ b/vulnfeeds/cmd/converters/cve/cve5/single-converter/main.go @@ -7,8 +7,8 @@ import ( "log/slog" "os" - "github.com/google/osv/vulnfeeds/conversion" "github.com/google/osv/vulnfeeds/conversion/cve5" + 
"github.com/google/osv/vulnfeeds/conversion/writer" "github.com/google/osv/vulnfeeds/models" "github.com/google/osv/vulnfeeds/utility/logger" ) @@ -46,8 +46,8 @@ func main() { } // create the files - osvFile, errCVE := conversion.CreateOSVFile(cveID, outDir) - metricsFile, errMetrics := conversion.CreateMetricsFile(cveID, outDir) + osvFile, errCVE := writer.CreateOSVFile(cveID, outDir) + metricsFile, errMetrics := writer.CreateMetricsFile(cveID, outDir) if errCVE != nil || errMetrics != nil { logger.Fatal("File failed to be created for CVE", slog.String("cve", string(cveID))) } diff --git a/vulnfeeds/cmd/converters/cve/nvd-cve-osv/main.go b/vulnfeeds/cmd/converters/cve/nvd-cve-osv/main.go index 1f0a9867e38..81a05c189cb 100644 --- a/vulnfeeds/cmd/converters/cve/nvd-cve-osv/main.go +++ b/vulnfeeds/cmd/converters/cve/nvd-cve-osv/main.go @@ -2,6 +2,7 @@ package main import ( + "context" "encoding/json" "flag" "fmt" @@ -12,24 +13,40 @@ import ( "regexp" "runtime/pprof" "slices" + "strconv" "sync" + "sync/atomic" c "github.com/google/osv/vulnfeeds/conversion" "github.com/google/osv/vulnfeeds/conversion/nvd" + "github.com/google/osv/vulnfeeds/conversion/writer" + "github.com/google/osv/vulnfeeds/gcs-tools" "github.com/google/osv/vulnfeeds/git" "github.com/google/osv/vulnfeeds/models" "github.com/google/osv/vulnfeeds/utility/logger" + "github.com/google/osv/vulnfeeds/vulns" ) var ( jsonPath = flag.String("nvd-json", "", "Path to NVD CVE JSON to examine.") + jsonDir = flag.String("nvd-json-dir", "", "Path to directory containing NVD CVE JSON files to examine.") parsedCPEDictionary = flag.String("cpe-repos", "", "Path to JSON mapping of CPEs to repos generated by cpe-repo-gen") outDir = flag.String("out-dir", "", "Path to output results.") outFormat = flag.String("out-format", "OSV", "Format to output {OSV,PackageInfo}") - workers = flag.Int("workers", 30, "The number of concurrent workers to use for processing CVEs.") + workers = flag.Int("workers", 10, "The number of 
concurrent workers to use for processing CVEs.") + gcsWorkers = flag.Int("gcs-workers", 30, "The number of concurrent workers to use for GCS uploads.") rejectFailed = flag.Bool("reject-failed", false, "If set, OSV records with a failed conversion outcome will not be generated.") outputMetrics = flag.Bool("output-metrics", true, "If true, output the metrics information about the conversion") cpuProfile = flag.String("cpuprofile", "", "Path to write cpu profile to file (default = no output)") + uploadToGCS = flag.Bool("upload-to-gcs", false, "If true, upload to GCS bucket instead of writing to local disk.") + outputBucket = flag.String("output-bucket", "osv-test-cve-osv-conversion", "The GCS bucket to write to.") + gcsPrefix = flag.String("gcs-prefix", "nvd-osv", "The prefix within the GCS bucket.") + startYear = flag.Int("start-year", 2016, "The first in scope year to process. If 0, process all years.") +) + +var ( + totalConversionsCount atomic.Uint64 + successfulConversionsCount atomic.Uint64 ) func main() { @@ -39,6 +56,12 @@ func main() { os.Exit(1) } + if *outDir != "" { + if err := os.MkdirAll(*outDir, 0755); err != nil { + logger.Fatal("Failed to create output directory", slog.Any("err", err)) + } + } + if *cpuProfile != "" { f, err := os.Create(*cpuProfile) if err != nil { @@ -54,20 +77,49 @@ func main() { logger.InitGlobalLogger() defer logger.Close() - data, err := os.ReadFile(*jsonPath) - if err != nil { - logger.Fatal("Failed to open file", slog.Any("err", err)) // double check this is best practice output + var files []string + if *jsonDir != "" { + matches, err := filepath.Glob(filepath.Join(*jsonDir, "nvdcve-2.0-*.json")) + if err != nil { + logger.Fatal("Failed to glob NVD JSON directory", slog.Any("err", err)) + } + for _, file := range matches { + filename := filepath.Base(file) + re := regexp.MustCompile(`nvdcve-2\.0-([0-9]{4})\.json`) + submatches := re.FindStringSubmatch(filename) + if len(submatches) >= 2 { + yearInt, _ := 
strconv.Atoi(submatches[1]) + if *startYear > 0 && yearInt < *startYear { + continue + } + } + files = append(files, file) + } + } else if *jsonPath != "" { + files = []string{*jsonPath} + } else { + logger.Fatal("Either --nvd-json or --nvd-json-dir must be provided") } - var parsed models.CVEAPIJSON20Schema - err = json.Unmarshal(data, &parsed) - if err != nil { - logger.Fatal("Failed to parse NVD CVE JSON", slog.Any("err", err)) + if len(files) == 0 { + logger.Fatal("No NVD JSON files found to process") } + // Process newest years first (reverse chronological order) + slices.SortFunc(files, func(a, b string) int { + if a > b { + return -1 + } + if a < b { + return 1 + } + + return 0 + }) + vpRepoCache := c.NewVPRepoCache() if *parsedCPEDictionary != "" { - err = c.LoadCPEDictionary(vpRepoCache, *parsedCPEDictionary) + err := c.LoadCPEDictionary(vpRepoCache, *parsedCPEDictionary) if err != nil { logger.Fatal("Failed to load parsed CPE dictionary", slog.Any("err", err)) } @@ -76,38 +128,64 @@ func main() { repoTagsCache := &git.RepoTagsCache{} - jobs := make(chan models.NVDCVE) + var gcsHelper *gcs.Helper + ctx := context.Background() + if *uploadToGCS { + var err error + gcsHelper, err = gcs.InitUploadPool(ctx, *gcsWorkers, *outputBucket) + if err != nil { + logger.Fatal("Failed to initialize GCS upload pool", slog.Any("err", err)) + } + defer gcsHelper.CloseAndWait() + logger.Info("GCS Upload Pool initialized", slog.String("bucket", *outputBucket)) + } + + jobs := make(chan models.NVDCVE, *workers) var wg sync.WaitGroup for range *workers { wg.Add(1) - go worker(&wg, jobs, *outDir, vpRepoCache, repoTagsCache) + go worker(&wg, jobs, gcsHelper, *outDir, vpRepoCache, repoTagsCache) } - for _, cve := range parsed.Vulnerabilities { - jobs <- cve.CVE + for _, file := range files { + logger.Info("Parsing NVD CVE file", slog.String("file", file)) + data, err := os.ReadFile(file) + if err != nil { + logger.Error("Failed to read NVD JSON file, skipping", 
slog.String("file", file), slog.Any("err", err)) + continue + } + + var parsed models.CVEAPIJSON20Schema + err = json.Unmarshal(data, &parsed) + if err != nil { + logger.Error("Failed to parse NVD JSON file, skipping", slog.String("file", file), slog.Any("err", err)) + continue + } + + for _, cve := range parsed.Vulnerabilities { + jobs <- cve.CVE + } } close(jobs) wg.Wait() + if gcsHelper != nil { + gcsHelper.CloseAndWait() + } + logger.Info("Conversion Stats", + slog.Uint64("total_processed", totalConversionsCount.Load()), + slog.Uint64("successful_conversions", successfulConversionsCount.Load()), + ) logger.Info("NVD Conversion run complete") // Conduct analysis on the outcome of the converted files and output to a csv if *outputMetrics { - // Try to extract year from path, otherwise use "xxxx" filler - filename := filepath.Base(*jsonPath) - re := regexp.MustCompile(`nvdcve-2\.0-([0-9]{4})\.json`) - matches := re.FindStringSubmatch(filename) - if len(matches) >= 2 { - year := matches[1] - c.ConductAnalysis(year, *outDir) - } else { - c.ConductAnalysis("xxxx", *outDir) - } + c.ConductAnalysis("all", *outDir) } } -func processCVE(cve models.NVDCVE, vpRepoCache *c.VPRepoCache, repoTagsCache *git.RepoTagsCache) models.ConversionOutcome { +func processCVE(cve models.NVDCVE, vpRepoCache *c.VPRepoCache, repoTagsCache *git.RepoTagsCache) (*vulns.Vulnerability, *models.ConversionMetrics, models.ConversionOutcome) { metrics := &models.ConversionMetrics{ CVEID: cve.ID, CNA: "nvd", @@ -116,24 +194,91 @@ func processCVE(cve models.NVDCVE, vpRepoCache *c.VPRepoCache, repoTagsCache *gi metrics.Repos = repos var outcome models.ConversionOutcome + var vuln *vulns.Vulnerability + var finalMetrics *models.ConversionMetrics switch *outFormat { case "OSV": - outcome = nvd.CVEToOSV(cve, repos, repoTagsCache, *outDir, metrics, *rejectFailed, *outputMetrics) + vuln, finalMetrics, outcome = nvd.CVEToOSV(cve, repos, repoTagsCache, metrics) case "PackageInfo": outcome = 
nvd.CVEToPackageInfo(cve, repos, repoTagsCache, *outDir, metrics) + finalMetrics = metrics } - return outcome + return vuln, finalMetrics, outcome } -func worker(wg *sync.WaitGroup, jobs <-chan models.NVDCVE, _ string, vpRepoCache *c.VPRepoCache, repoTagsCache *git.RepoTagsCache) { +func worker(wg *sync.WaitGroup, jobs <-chan models.NVDCVE, gcsHelper *gcs.Helper, outDir string, vpRepoCache *c.VPRepoCache, repoTagsCache *git.RepoTagsCache) { defer wg.Done() for cve := range jobs { - outcome := processCVE(cve, vpRepoCache, repoTagsCache) + vuln, metrics, outcome := processCVE(cve, vpRepoCache, repoTagsCache) + totalConversionsCount.Add(1) + cveID := string(cve.ID) + if outcome == models.Error { + logger.Error("Error generating OSV record", slog.String("cve", cveID), slog.String("outcome", outcome.String())) + continue // Don't attempt to output files if there was an error + } + if outcome != models.Successful { - logger.Info("Failed to generate an OSV record", slog.String("cve", string(cve.ID)), slog.String("outcome", outcome.String())) + logger.Info("Failed to generate a successful OSV record", slog.String("cve", cveID), slog.String("outcome", outcome.String())) + if *rejectFailed { + continue // Skip outputting OSV file + } } else { - logger.Info("Generated OSV record for "+string(cve.ID), slog.String("cve", string(cve.ID))) + logger.Info("Generated OSV record for "+cveID, slog.String("cve", cveID)) + successfulConversionsCount.Add(1) + } + + // Extract year from CVE ID to organize local outputs into subfolders + re := regexp.MustCompile(`CVE-([0-9]{4})-[0-9]+`) + matches := re.FindStringSubmatch(cveID) + year := "xxxx" + if len(matches) >= 2 { + year = matches[1] + } + cveOutDir := outDir + if outDir != "" { + cveOutDir = filepath.Join(outDir, year) + if err := os.MkdirAll(cveOutDir, 0755); err != nil { + logger.Error("Failed to create year directory", slog.String("dir", cveOutDir), slog.Any("err", err)) + continue + } + } + + if *uploadToGCS && gcsHelper != nil { 
+ if vuln != nil { + if err := writer.UploadVulnIfChangedAsync(gcsHelper, *gcsPrefix, vuln.Vulnerability); err != nil { + logger.Error("Failed to queue vulnerability upload", slog.String("cve", vuln.Id), slog.Any("err", err)) + } + } + if *outputMetrics && metrics != nil { + if err := writer.UploadMetricsToGCSAsync(gcsHelper, *gcsPrefix, models.CVEID(cveID), metrics); err != nil { + logger.Error("Failed to queue metrics upload", slog.String("cve", cveID), slog.Any("err", err)) + } + } + } else if vuln != nil { + osvFile, err := writer.CreateOSVFile(models.CVEID(vuln.Id), cveOutDir) + if err != nil { + logger.Error("Failed to create OSV file locally", slog.String("cve", vuln.Id), slog.Any("err", err)) + } else { + if err := vuln.ToJSON(osvFile); err != nil { + logger.Error("Failed to write OSV file locally", slog.String("cve", vuln.Id), slog.Any("err", err)) + } + osvFile.Close() + } + } + + // Always write metrics locally if requested, even if uploading to GCS + // so that we are able to analyse the outcomes in a csv file. 
+ if *outputMetrics && metrics != nil { + metricsFile, err := writer.CreateMetricsFile(models.CVEID(cveID), cveOutDir) + if err != nil { + logger.Error("Failed to create metrics file locally", slog.String("cve", cveID), slog.Any("err", err)) + } else { + if err := writer.WriteMetricsFile(metrics, metricsFile); err != nil { + logger.Error("Failed to write metrics file locally", slog.String("cve", cveID), slog.Any("err", err)) + } + metricsFile.Close() + } } } } diff --git a/vulnfeeds/cmd/converters/cve/nvd-cve-osv/run_cve_to_osv_generation.sh b/vulnfeeds/cmd/converters/cve/nvd-cve-osv/run_cve_to_osv_generation.sh index 9a77e11b49c..31cce7a9a4c 100755 --- a/vulnfeeds/cmd/converters/cve/nvd-cve-osv/run_cve_to_osv_generation.sh +++ b/vulnfeeds/cmd/converters/cve/nvd-cve-osv/run_cve_to_osv_generation.sh @@ -41,43 +41,31 @@ gcloud --no-user-output-enabled storage -q cp "${NVD_GCS_PATH}/*-????.json" "${W echo "Downloading latest CPE Git repository map" gcloud --no-user-output-enabled storage -q cp "${CPEREPO_GCS_PATH}" "${WORK_DIR}" -mkdir -p "${WORK_DIR}/nvd2osv/gcs_stage" - NUM_WORKERS="${NUM_WORKERS:=10}" +GCS_WORKERS="${GCS_WORKERS:=30}" + +# Extract GCS bucket and prefix from OSV_OUTPUT_GCS_PATH. +gcs_path="${OSV_OUTPUT_GCS_PATH#gs://}" +OSV_OUTPUT_GCS_BUCKET="${gcs_path%%/*}" +if [[ "${gcs_path}" == *"/"* ]]; then + OSV_OUTPUT_GCS_PREFIX="${gcs_path#*/}" +else + OSV_OUTPUT_GCS_PREFIX="" +fi # Convert NVD CVE records to OSV. -for (( YEAR = $(date +%Y) ; YEAR >= ${FIRST_INSCOPE_YEAR} ; YEAR-- )); do - # Run OSV record generation. - echo "Converting NVD CVE records from ${YEAR} to OSV" - /usr/local/bin/nvd-cve-osv \ - --cpe-repos "${WORK_DIR}/cpe_product_to_repo.json" \ - --nvd-json "${WORK_DIR}/nvd/nvdcve-2.0-${YEAR}.json" \ - --out-dir "${WORK_DIR}/nvd2osv/${YEAR}" \ - --out-format OSV \ - --workers "${NUM_WORKERS}" - - # Copy results to staging area. 
- echo "Copying NVD CVE records from ${YEAR} successfully converted to OSV to aggregated staging" - find "${WORK_DIR}/nvd2osv/${YEAR}" -type f -name \*.json \ - -exec cp '{}' "${WORK_DIR}/nvd2osv/gcs_stage/" \; -done - -# Copy (and remove any missing) results to GCS bucket, with some sanity -# checking. -objs_present=$(gcloud storage ls "${OSV_OUTPUT_GCS_PATH}" | wc -l) -objs_deleted=$(gcloud storage rsync --checksums-only --dry-run --delete-unmatched-destination-objects "${WORK_DIR}/nvd2osv/gcs_stage" "${OSV_OUTPUT_GCS_PATH}" 2>&1 | grep "Would remove" | wc -l) - -threshold=$(echo "scale=2; ${objs_present} * (${SAFETY_THRESHOLD_PCT:-2} / 100)" | bc) - -# Bash can't deal with floats -if (( $(echo "${objs_deleted} > ${threshold}" | bc -l) )); then - echo "Warning. Unexpectedly high (${objs_deleted}) number of CVE records would be deleted!" >> /dev/stderr - gcloud storage rsync --checksums-only --dry-run --delete-unmatched-destination-objects "${WORK_DIR}/nvd2osv/gcs_stage" "${OSV_OUTPUT_GCS_PATH}" 2>&1 | grep "Would remove" >> /dev/stderr - # TODO: add back in once nvd-mirror issue fixed: exit 1 -fi - -echo "Copying NVD CVE records successfully converted to GCS bucket" -gcloud storage rsync --quiet --checksums-only "${WORK_DIR}/nvd2osv/gcs_stage" "${OSV_OUTPUT_GCS_PATH}" +echo "Converting NVD CVE records to OSV" +/usr/local/bin/nvd-cve-osv \ + --cpe-repos "${WORK_DIR}/cpe_product_to_repo.json" \ + --nvd-json-dir "${WORK_DIR}/nvd" \ + --start-year="${FIRST_INSCOPE_YEAR}" \ + --out-dir "${WORK_DIR}/nvd2osv" \ + --out-format OSV \ + --workers "${NUM_WORKERS}" \ + --gcs-workers "${GCS_WORKERS}" \ + --upload-to-gcs=true \ + --output-bucket="${OSV_OUTPUT_GCS_BUCKET}" \ + --gcs-prefix="${OSV_OUTPUT_GCS_PREFIX}" echo "Conversion run complete" diff --git a/vulnfeeds/cmd/converters/debian/main.go b/vulnfeeds/cmd/converters/debian/main.go index 804039d7366..33b21bfc883 100644 --- a/vulnfeeds/cmd/converters/debian/main.go +++ b/vulnfeeds/cmd/converters/debian/main.go @@ -14,9 
+14,9 @@ import ( "strconv" "strings" + "github.com/google/osv/vulnfeeds/conversion/writer" "github.com/google/osv/vulnfeeds/faulttolerant" "github.com/google/osv/vulnfeeds/models" - "github.com/google/osv/vulnfeeds/upload" "github.com/google/osv/vulnfeeds/utility/logger" "github.com/google/osv/vulnfeeds/vulns" "github.com/ossf/osv-schema/bindings/go/osvschema" @@ -79,7 +79,7 @@ func main() { } ctx := context.Background() - upload.Upload(ctx, "Debian CVEs", *uploadToGCS, *outputBucketName, "", *numWorkers, *debianOutputPath, vulnerabilities, *syncDeletions) + writer.UploadVulnsToGCS(ctx, "Debian CVEs", *uploadToGCS, *outputBucketName, "", *numWorkers, *debianOutputPath, vulnerabilities, *syncDeletions) logger.Info("Debian CVE conversion succeeded.") } diff --git a/vulnfeeds/cmd/converters/dsa-dla-dtsa/main.go b/vulnfeeds/cmd/converters/dsa-dla-dtsa/main.go index 48902e19f15..34f9907248e 100644 --- a/vulnfeeds/cmd/converters/dsa-dla-dtsa/main.go +++ b/vulnfeeds/cmd/converters/dsa-dla-dtsa/main.go @@ -20,7 +20,7 @@ import ( "time" htmltomarkdown "github.com/JohannesKaufmann/html-to-markdown/v2" - "github.com/google/osv/vulnfeeds/upload" + "github.com/google/osv/vulnfeeds/conversion/writer" "github.com/google/osv/vulnfeeds/utility/logger" "github.com/ossf/osv-schema/bindings/go/osvschema" "golang.org/x/text/encoding/charmap" @@ -572,7 +572,7 @@ func run(webwmlRepo, securityTrackerRepo, outputDir, outputBucket string, upload if uploadToGCS { logger.Info("Uploading to GCS", "bucket", outputBucket) ctx := context.Background() - upload.Upload(ctx, "debian-osv", uploadToGCS, outputBucket, "", numWorkers, outputDir, allVulnerabilities, doDeletions) + writer.UploadVulnsToGCS(ctx, "debian-osv", uploadToGCS, outputBucket, "", numWorkers, outputDir, allVulnerabilities, doDeletions) } else { logger.Info("Skipping GCS upload") } diff --git a/vulnfeeds/conversion/common.go b/vulnfeeds/conversion/common.go index 340dbbb9825..6c5970fd86e 100644 --- a/vulnfeeds/conversion/common.go 
+++ b/vulnfeeds/conversion/common.go @@ -85,6 +85,11 @@ func ConductAnalysis(year string, dir string) { // get the current time in minutes currentTime := time.Now().Format("2006-01-02T15:04") outcomesCSV := "nvd-conversion-outcomes-" + year + "-" + currentTime + ".csv" + + if err := os.MkdirAll(dir, 0755); err != nil { + logger.Fatal("Failed to create output directory for analysis CSV file", slog.Any("err", err)) + } + csvFile, err := os.Create(filepath.Join(dir, outcomesCSV)) if err != nil { logger.Fatal("Failed to create analysis CSV file", slog.Any("err", err)) @@ -133,49 +138,6 @@ func ConductAnalysis(year string, dir string) { } } -// CreateMetricsFile creates the initial file for the metrics record. -func CreateMetricsFile(id models.CVEID, vulnDir string) (*os.File, error) { - metricsFile := filepath.Join(vulnDir, string(id)+".metrics"+models.Extension) - f, err := os.Create(metricsFile) - if err != nil { - logger.Info("Failed to open for writing "+metricsFile, slog.String("cve", string(id)), slog.String("path", metricsFile), slog.Any("err", err)) - return nil, err - } - - return f, nil -} - -// CreateOSVFile creates the initial file for the OSV record. 
-func CreateOSVFile(id models.CVEID, vulnDir string) (*os.File, error) { - outputFile := filepath.Join(vulnDir, string(id)+models.Extension) - - f, err := os.Create(outputFile) - if err != nil { - logger.Info("Failed to open for writing "+outputFile, slog.String("cve", string(id)), slog.String("path", outputFile), slog.Any("err", err)) - return nil, err - } - - return f, err -} - -func WriteMetricsFile(metrics *models.ConversionMetrics, metricsFile *os.File) error { - marshalledMetrics, err := json.MarshalIndent(&metrics, "", " ") - if err != nil { - logger.Info("Failed to marshal", slog.Any("err", err)) - return err - } - - _, err = metricsFile.Write(marshalledMetrics) - if err != nil { - logger.Warn("Failed to write", slog.String("path", metricsFile.Name()), slog.Any("err", err)) - return fmt.Errorf("failed to write %s: %w", metricsFile.Name(), err) - } - - metricsFile.Close() - - return nil -} - // GitVersionsToCommits examines repos and tries to convert versions to commits by treating them as Git tags. // Returns the resolved ranges, unresolved ranges, and successful repos involved. 
func GitVersionsToCommits(versionRanges []models.RangeWithMetadata, repos []string, metrics *models.ConversionMetrics, cache *git.RepoTagsCache) ([]models.RangeWithMetadata, []models.RangeWithMetadata, []string) { diff --git a/vulnfeeds/conversion/nvd/converter.go b/vulnfeeds/conversion/nvd/converter.go index 79b98304d73..4041ef227a7 100644 --- a/vulnfeeds/conversion/nvd/converter.go +++ b/vulnfeeds/conversion/nvd/converter.go @@ -2,6 +2,7 @@ package nvd import ( + "cmp" "encoding/json" "errors" "log/slog" @@ -12,33 +13,31 @@ import ( "slices" c "github.com/google/osv/vulnfeeds/conversion" + "github.com/google/osv/vulnfeeds/conversion/writer" "github.com/google/osv/vulnfeeds/git" "github.com/google/osv/vulnfeeds/models" "github.com/google/osv/vulnfeeds/utility" "github.com/google/osv/vulnfeeds/utility/logger" "github.com/google/osv/vulnfeeds/vulns" + "github.com/ossf/osv-schema/bindings/go/osvschema" ) var ErrNoRanges = errors.New("no ranges") var ErrUnresolvedFix = errors.New("fixes not resolved to commits") -// CVEToOSV Takes an NVD CVE record and outputs an OSV file in the specified directory. -func CVEToOSV(cve models.NVDCVE, repos []string, cache *git.RepoTagsCache, directory string, metrics *models.ConversionMetrics, rejectFailed bool, outputMetrics bool) models.ConversionOutcome { +// CVEToOSV Takes an NVD CVE record and returns an OSV Vulnerability object, ConversionMetrics, and the outcome. +func CVEToOSV(cve models.NVDCVE, repos []string, cache *git.RepoTagsCache, metrics *models.ConversionMetrics) (*vulns.Vulnerability, *models.ConversionMetrics, models.ConversionOutcome) { CPEs := c.CPEs(cve) metrics.CPEs = CPEs refs := c.DeduplicateRefs(cve.References) // The vendor name and product name are used to construct the output `vulnDir` below, so need to be set to *something* to keep the output tidy. - maybeVendorName := "ENOCPE" - maybeProductName := "ENOCPE" if len(CPEs) > 0 { - CPE, err := c.ParseCPE(CPEs[0]) // For naming the subdirectory used for output. 
- maybeVendorName = CPE.Vendor - maybeProductName = CPE.Product + _, err := c.ParseCPE(CPEs[0]) // For naming the subdirectory used for output. if err != nil { metrics.AddNote("Can't generate an OSV record without valid CPE data") - return models.ConversionUnknown + return nil, metrics, models.ConversionUnknown } } @@ -58,9 +57,7 @@ func CVEToOSV(cve models.NVDCVE, repos []string, cache *git.RepoTagsCache, direc // If there are no repos, there are no commits from the refs either if len(cpeRanges) == 0 && len(repos) == 0 { metrics.SetOutcome(models.NoRepos) - outputFiles(v, directory, maybeVendorName, maybeProductName, metrics, rejectFailed, outputMetrics) - - return models.NoRepos + return v, metrics, models.NoRepos } successfulRepos := make(map[string]bool) @@ -80,15 +77,13 @@ func CVEToOSV(cve models.NVDCVE, repos []string, cache *git.RepoTagsCache, direc } // Exit early - outputFiles(v, directory, maybeVendorName, maybeProductName, metrics, rejectFailed, outputMetrics) - - return models.NoRepos + return v, metrics, models.NoRepos } // If we have ranges, try to resolve them r, un, sR := c.ProcessRanges(cpeRanges, repos, metrics, cache, models.VersionSourceCPE) if metrics.Outcome == models.Error { - return models.Error + return nil, metrics, models.Error } resolvedRanges = append(resolvedRanges, r...) unresolvedRanges = append(unresolvedRanges, un...) @@ -118,7 +113,7 @@ func CVEToOSV(cve models.NVDCVE, repos []string, cache *git.RepoTagsCache, direc } r, un, sR := c.ProcessRanges(textRanges, repos, metrics, cache, models.VersionSourceDescription) if metrics.Outcome == models.Error { - return models.Error + return nil, metrics, models.Error } resolvedRanges = append(resolvedRanges, r...) unresolvedRanges = append(unresolvedRanges, un...) 
@@ -128,7 +123,7 @@ func CVEToOSV(cve models.NVDCVE, repos []string, cache *git.RepoTagsCache, direc } if len(resolvedRanges) == 0 && len(commits) == 0 { - metrics.AddNote("No ranges detected for %q", maybeProductName) + metrics.AddNote("No ranges detected") metrics.SetOutcome(models.NoRanges) } @@ -136,8 +131,25 @@ func CVEToOSV(cve models.NVDCVE, repos []string, cache *git.RepoTagsCache, direc keys := slices.Collect(maps.Keys(successfulRepos)) groupedRanges := c.GroupRanges(resolvedRanges) affected := c.MergeRangesAndCreateAffected(groupedRanges, commits, keys, metrics) + if metrics.Outcome == models.Error { + return nil, metrics, metrics.Outcome + } + v.Affected = append(v.Affected, affected...) + // sort affected by repository name alphabetically to ensure deterministic output and caching hashes + slices.SortFunc(v.Affected, func(a, b *osvschema.Affected) int { + var repoA, repoB string + if len(a.GetRanges()) > 0 { + repoA = a.GetRanges()[0].GetRepo() + } + if len(b.GetRanges()) > 0 { + repoB = b.GetRanges()[0].GetRepo() + } + + return cmp.Compare(repoA, repoB) + }) + unresolvedRangesList := c.CreateUnresolvedRanges(unresolvedRanges) if unresolvedRangesList != nil { if err := c.AddFieldToDatabaseSpecific(v.DatabaseSpecific, "unresolved_ranges", unresolvedRangesList); err != nil { @@ -145,9 +157,7 @@ func CVEToOSV(cve models.NVDCVE, repos []string, cache *git.RepoTagsCache, direc } } - outputFiles(v, directory, maybeVendorName, maybeProductName, metrics, rejectFailed, outputMetrics) - - return metrics.Outcome + return v, metrics, metrics.Outcome } // CVEToPackageInfo takes an NVD CVE record and outputs a PackageInfo struct in a file in the specified directory. 
@@ -215,6 +225,10 @@ func CVEToPackageInfo(cve models.NVDCVE, repos []string, cache *git.RepoTagsCach slices.SortStableFunc(versions.AffectedCommits, models.AffectedCommitCompare) + if metrics.Outcome == models.Error { + return metrics.Outcome + } + var pkgInfos []vulns.PackageInfo pi := vulns.PackageInfo{VersionInfo: versions} pkgInfos = append(pkgInfos, pi) // combine-to-osv expects a serialised *array* of PackageInfo @@ -242,11 +256,11 @@ func CVEToPackageInfo(cve models.NVDCVE, repos []string, cache *git.RepoTagsCach logger.Info("Generated PackageInfo record", slog.String("cve", string(cve.ID)), slog.String("product", maybeProductName)) - metricsFile, err := c.CreateMetricsFile(cve.ID, vulnDir) + metricsFile, err := writer.CreateMetricsFile(cve.ID, vulnDir) if err != nil { logger.Warn("Failed to create metrics file", slog.String("path", metricsFile.Name()), slog.Any("err", err)) } - err = c.WriteMetricsFile(metrics, metricsFile) + err = writer.WriteMetricsFile(metrics, metricsFile) if err != nil { logger.Warn("Failed to write metrics file", slog.String("path", metricsFile.Name()), slog.Any("err", err)) } @@ -331,45 +345,3 @@ func FindRepos(cve models.NVDCVE, vpRepoCache *c.VPRepoCache, repoTagsCache *git return reposForCVE } - -// outputFiles writes the OSV vulnerability record and conversion metrics to files in the specified directory. -// It creates the necessary subdirectories based on the vendor and product names and handles whether or not -// the files should be written based on the rejectFailed and outputMetrics flags. -// -// Arguments: -// - v: The OSV Vulnerability object to be written to a file. -// - dir: The base directory where the output files should be created. -// - vendor: The vendor name used to create the subdirectory. -// - product: The product name used to create the subdirectory. -// - metrics: A pointer to ConversionMetrics to be written to a metrics file. 
-// - rejectFailed: A boolean indicating whether to skip writing the OSV file if the conversion was not successful. -// - outputMetrics: A boolean indicating whether to write the metrics file. -func outputFiles(v *vulns.Vulnerability, dir string, vendor string, product string, metrics *models.ConversionMetrics, rejectFailed bool, outputMetrics bool) { - cveID := v.Id - vulnDir := filepath.Join(dir, vendor, product) - - if err := os.MkdirAll(vulnDir, 0755); err != nil { - logger.Info("Failed to create directory "+vulnDir, slog.String("cve", cveID), slog.String("path", vulnDir), slog.Any("err", err)) - } - - if metrics.Outcome != models.Error && (!rejectFailed || metrics.Outcome == models.Successful) { - osvFile, errCVE := c.CreateOSVFile(models.CVEID(cveID), vulnDir) - if errCVE != nil { - logger.Fatal("File failed to be created for CVE", slog.String("cve", cveID)) - } - if err := v.ToJSON(osvFile); err != nil { - logger.Error("Failed to write", slog.Any("err", err)) - } - osvFile.Close() - } - if outputMetrics { - metricsFile, errMetrics := c.CreateMetricsFile(models.CVEID(cveID), vulnDir) - if errMetrics != nil { - logger.Fatal("File failed to be created for CVE", slog.String("cve", cveID)) - } - if err := c.WriteMetricsFile(metrics, metricsFile); err != nil { - logger.Error("Failed to write metrics", slog.Any("err", err)) - } - metricsFile.Close() - } -} diff --git a/vulnfeeds/conversion/nvd/converter_test.go b/vulnfeeds/conversion/nvd/converter_test.go index a9ab2559fae..1158d24fb58 100644 --- a/vulnfeeds/conversion/nvd/converter_test.go +++ b/vulnfeeds/conversion/nvd/converter_test.go @@ -1,22 +1,17 @@ package nvd import ( - "encoding/json" "net/http" "os" "path/filepath" - "sort" "testing" - "github.com/gkampitakis/go-snaps/snaps" "github.com/go-git/go-git/v5/plumbing/transport/client" githttp "github.com/go-git/go-git/v5/plumbing/transport/http" "github.com/google/go-cmp/cmp" - "github.com/google/osv/vulnfeeds/conversion" "github.com/google/osv/vulnfeeds/git" 
"github.com/google/osv/vulnfeeds/models" "github.com/ossf/osv-schema/bindings/go/osvschema" - "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/testing/protocmp" ) @@ -73,7 +68,7 @@ func TestCVEToOSV_429(t *testing.T) { cache := &git.RepoTagsCache{} outDir := t.TempDir() - outcome := CVEToOSV(cve, []string{"https://github.com/foo/bar"}, cache, outDir, metrics, false, false) + _, _, outcome := CVEToOSV(cve, []string{"https://github.com/foo/bar"}, cache, metrics) // It should fail because of the 429 error causing unresolved fixes if outcome != models.Error { @@ -97,67 +92,6 @@ func TestCVEToOSV_429(t *testing.T) { } } -func TestNVDSnapshot(t *testing.T) { - testPath := "test.json" - //TODO: individually test records - file, err := os.Open(testPath) - - if err != nil { - t.Fatalf("Failed to open test data from %s: %v", testPath, err) - } - defer file.Close() - - var nvd models.CVEAPIJSON20Schema - err = json.NewDecoder(file).Decode(&nvd) - if err != nil { - t.Fatalf("Failed to decode %s: %v", testPath, err) - } - - cpeData := "cpe_testdata.json" - vpcache := conversion.NewVPRepoCache() - err = conversion.LoadCPEDictionary(vpcache, cpeData) - if err != nil { - t.Fatalf("Failed to decode %s: %v", cpeData, err) - } - - outDir := t.TempDir() - metrics := &models.ConversionMetrics{} - cache := &git.RepoTagsCache{} - - for _, vuln := range nvd.Vulnerabilities { - CVEToOSV(vuln.CVE, []string{}, cache, outDir, metrics, false, false) - } - - var fileContents []string - err = filepath.Walk(outDir, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if !info.IsDir() && filepath.Ext(path) == ".json" { - content, err := os.ReadFile(path) - if err != nil { - return err - } - fileContents = append(fileContents, string(content)) - } - - return nil - }) - if err != nil { - t.Fatalf("Failed to walk outDir: %v", err) - } - - // To make snapshot deterministic - sort.Strings(fileContents) - - keys := make([]any, 0, 
len(fileContents)) - for _, c := range fileContents { - keys = append(keys, c) - } - - snaps.MatchSnapshot(t, keys...) -} - func TestCVEToOSV_ReferencesDeterminism(t *testing.T) { cve := models.NVDCVE{ ID: "CVE-2025-12345", @@ -172,37 +106,13 @@ func TestCVEToOSV_ReferencesDeterminism(t *testing.T) { Metrics: &models.CVEItemMetrics{}, } metrics := &models.ConversionMetrics{} - outDir := t.TempDir() var firstResult []*osvschema.Reference for i := range 10 { cache := &git.RepoTagsCache{} - CVEToOSV(cve, nil, cache, outDir, metrics, false, false) - - var b []byte - err := filepath.Walk(outDir, func(path string, info os.FileInfo, _ error) error { - if !info.IsDir() && filepath.Ext(path) == ".json" { - var fileErr error - b, fileErr = os.ReadFile(path) - if fileErr != nil { - return fileErr - } - } - - return nil - }) - if err != nil { - t.Fatalf("Failed to walk or read OSV file: %v", err) - } - - if len(b) == 0 { - t.Fatalf("Failed to find OSV file") - } - - var vuln osvschema.Vulnerability - err = protojson.Unmarshal(b, &vuln) - if err != nil { - t.Fatalf("Failed to unmarshal OSV: %v", err) + vuln, _, _ := CVEToOSV(cve, nil, cache, metrics) + if vuln == nil { + t.Fatalf("Iteration %d produced nil vulnerability", i) } if i == 0 { diff --git a/vulnfeeds/conversion/writer/writer.go b/vulnfeeds/conversion/writer/writer.go new file mode 100644 index 00000000000..f30fed98e7f --- /dev/null +++ b/vulnfeeds/conversion/writer/writer.go @@ -0,0 +1,443 @@ +// Package writer handles allocating workers to intelligently uploading OSV records to a bucket +package writer + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "os" + "path" + "sync" + "sync/atomic" + "time" + + "cloud.google.com/go/storage" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/google/osv/vulnfeeds/gcs-tools" + "github.com/google/osv/vulnfeeds/models" + 
"github.com/google/osv/vulnfeeds/utility/logger" + "github.com/google/osv/vulnfeeds/vulns" + "github.com/ossf/osv-schema/bindings/go/osvschema" +) + +const ( + // hashMetadataKey is the key for the sha256 hash in the GCS object metadata. + hashMetadataKey = "sha256-hash" + overrideFolder = "osv-output-overrides" // location of overrides within bucket +) + +// ErrUploadSkipped indicates that an upload was intentionally skipped because +// the vulnerability payload is unchanged. +var ErrUploadSkipped = errors.New("upload skipped") + +// writeToDisk writes the vulnerability to a local file. +// It returns an error if the file could not be written. +func writeToDisk(v *osvschema.Vulnerability, preModifiedBuf []byte, outputPrefix string) error { + filename := v.GetId() + ".json" + filePath := path.Join(outputPrefix, filename) + err := os.WriteFile(filePath, preModifiedBuf, 0600) + if err != nil { + return fmt.Errorf("failed to write OSV file at %s: %w", filePath, err) + } + + return nil +} + +// prepareVulnUpload marshals a vulnerability record, calculates its SHA256 hash (excluding the Modified time), updates its Modified time, and returns the hash and the updated payload. 
+func prepareVulnUpload(vuln *osvschema.Vulnerability) (hexHash string, postModifiedBuf []byte, err error) { + if vuln == nil || vuln.GetId() == "" { + return "", nil, errors.New("invalid vulnerability provided") + } + + var buf bytes.Buffer + v := vulns.Vulnerability{Vulnerability: vuln} + if err := v.ToJSON(&buf); err != nil { + return "", nil, fmt.Errorf("failed to marshal vulnerability %s: %w", vuln.GetId(), err) + } + preModifiedBuf := buf.Bytes() + hash := sha256.Sum256(preModifiedBuf) + hexHash = hex.EncodeToString(hash[:]) + + vuln.Modified = timestamppb.New(time.Now().UTC()) + var postBuf bytes.Buffer + vPost := vulns.Vulnerability{Vulnerability: vuln} + if err := vPost.ToJSON(&postBuf); err != nil { + return "", nil, fmt.Errorf("failed to marshal vulnerability with modified time for %s: %w", vuln.GetId(), err) + } + + return hexHash, postBuf.Bytes(), nil +} + +// uploadIfChanged uploads the vulnerability to a GCS bucket if it has changed. +// It returns an error if the upload failed, or ErrUploadSkipped if the upload +// was intentionally avoided (e.g. because the GCS object has a matching hash). +func uploadIfChanged(ctx context.Context, v *osvschema.Vulnerability, hexHash string, postModifiedBuf []byte, outBkt *storage.BucketHandle, outputPrefix string) error { + vulnID := v.GetId() + filename := vulnID + ".json" + + objName := path.Join(outputPrefix, filename) + obj := outBkt.Object(objName) + + // Check if object exists and if hash matches. + attrs, err := obj.Attrs(ctx) + if err == nil { + // Object exists, check hash. + if attrs.Metadata != nil && attrs.Metadata[hashMetadataKey] == hexHash { + logger.Info("Skipping GCS upload, hash matches", slog.String("id", vulnID), slog.String("object", objName)) + return ErrUploadSkipped + } + } else if !errors.Is(err, storage.ErrObjectNotExist) { + return fmt.Errorf("failed to get object attributes for %s: %w", vulnID, err) + } + + // Object does not exist or hash differs, upload. 
+ logger.Info("Uploading vulnerability record to GCS", slog.String("id", vulnID), slog.String("object", objName)) + + wc := obj.NewWriter(ctx) + wc.Metadata = map[string]string{ + hashMetadataKey: hexHash, + } + wc.ContentType = "application/json" + + if _, err := wc.Write(postModifiedBuf); err != nil { + // Try to close writer even if write failed. + if closeErr := wc.Close(); closeErr != nil { + logger.Error("failed to close GCS writer after write error", slog.String("id", vulnID), slog.Any("err", closeErr)) + } + + return fmt.Errorf("failed to write to GCS object for %s: %w", vulnID, err) + } + + if err := wc.Close(); err != nil { + return fmt.Errorf("failed to close GCS writer for %s: %w", vulnID, err) + } + + return nil +} + +// handleOverride checks for and applies a vulnerability override if it exists. +// It returns the vulnerability to process, a pre-marshalled buffer if an override was used, +// and an error if a critical failure occurred. +func handleOverride(ctx context.Context, v *osvschema.Vulnerability, overridesBkt *storage.BucketHandle) (*osvschema.Vulnerability, []byte, error) { + filename := v.GetId() + ".json" + overrideObj := overridesBkt.Object(path.Join(overrideFolder, filename)) + if _, err := overrideObj.Attrs(ctx); err != nil { + if errors.Is(err, storage.ErrObjectNotExist) { + // No override found. + return v, nil, nil + } + // For any other error, we can't know if an override exists, so we return an error. + logger.Error("failed to check for override object", slog.String("id", v.GetId()), slog.Any("err", err)) + + return nil, nil, err + } + + // Override exists, read it and replace original vulnerability. 
+ logger.Info("Using override", slog.String("id", v.GetId())) + rc, err := overrideObj.NewReader(ctx) + if err != nil { + logger.Error("failed to get reader for override object", slog.String("id", v.GetId()), slog.Any("err", err)) + return nil, nil, err + } + defer rc.Close() + + overrideBuf, err := io.ReadAll(rc) + if err != nil { + logger.Error("failed to read override object", slog.String("id", v.GetId()), slog.Any("err", err)) + return nil, nil, err + } + + var overrideV osvschema.Vulnerability + if err := protojson.Unmarshal(overrideBuf, &overrideV); err != nil { + logger.Error("failed to unmarshal override object", slog.String("id", v.GetId()), slog.Any("err", err)) + return nil, nil, err + } + + return &overrideV, overrideBuf, nil +} + +// VulnWorker is a generic worker that processes OSV vulnerabilities from a channel. +// It can upload them to a GCS bucket or write them to disk. +// It supports checking for overrides in a separate GCS bucket location if overridesBkt is not nil. +// For GCS uploads, it calculates a hash of the vulnerability (excluding the modified time) and compares it +// with the existing object's hash. The vulnerability is uploaded only if the hashes differ, with the +// modified time updated. This prevents updating the modified time for vulnerabilities with no content changes. 
+func VulnWorker(ctx context.Context, vulnChan <-chan *osvschema.Vulnerability, outBkt, overridesBkt *storage.BucketHandle, gcsHelper *gcs.Helper, outputPrefix string, counter *atomic.Uint64) { + for v := range vulnChan { + vulnID := v.GetId() + if len(v.GetAffected()) == 0 { + logger.Warn("Skipping OSV record as no affected versions found.", slog.String("id", vulnID)) + continue + } + vulnToProcess := v + var preModifiedBuf []byte + var err error + + if overridesBkt != nil { + vulnToProcess, preModifiedBuf, err = handleOverride(ctx, v, overridesBkt) + if err != nil { + logger.Error("Failed to use override", slog.Any("error", err)) + continue + } + } + + var writeErr error + if outBkt == nil && gcsHelper == nil { + // Write to local disk + if preModifiedBuf == nil { + // Marshal before setting modified time to generate hash. + vuln := vulns.Vulnerability{Vulnerability: v} + var buf bytes.Buffer + if err := vuln.ToJSON(&buf); err != nil { + logger.Error("failed to marshal vulnerability", slog.String("id", vulnID), slog.Any("err", err)) + continue + } + preModifiedBuf = buf.Bytes() + } + writeErr = writeToDisk(vulnToProcess, preModifiedBuf, outputPrefix) + } else if gcsHelper != nil { + // Upload to GCS asynchronously using pool + writeErr = UploadVulnIfChangedAsync(gcsHelper, outputPrefix, vulnToProcess) + } else { + // Upload to GCS synchronously + hexHash, postModifiedBuf, err := prepareVulnUpload(vulnToProcess) + if err != nil { + writeErr = err + } else { + writeErr = uploadIfChanged(ctx, vulnToProcess, hexHash, postModifiedBuf, outBkt, outputPrefix) + } + } + + if writeErr == nil { + logger.Info("Uploaded successfully", slog.String("id", vulnID)) + if counter != nil { + counter.Add(1) + } + } else if errors.Is(writeErr, ErrUploadSkipped) { + logger.Info("Skipping upload, hash matches", slog.String("id", vulnID)) + } else { + logger.Error("Failed to upload/write", slog.String("id", vulnID), slog.Any("err", writeErr)) + } + } +} + +// UploadVulnsToGCS delegates 
workers to upload vulnerabilities to the buckets. +func UploadVulnsToGCS( + ctx context.Context, + jobName string, + uploadToGCS bool, + outputBucketName string, + overridesBucketName string, + numWorkers int, + osvOutputPath string, + vulnerabilities []*osvschema.Vulnerability, + doDeletions bool, +) { + var outBkt, overridesBkt *storage.BucketHandle + var gcsHelper *gcs.Helper + if uploadToGCS { + storageClient, err := storage.NewClient(ctx) + if err != nil { + logger.Fatal("Failed to create storage client", slog.Any("err", err)) + } + defer storageClient.Close() + + outBkt = storageClient.Bucket(outputBucketName) + if overridesBucketName != "" { + overridesBkt = storageClient.Bucket(overridesBucketName) + } + + if doDeletions { + HandleDeletion(ctx, outBkt, osvOutputPath, vulnerabilities) + } + + gcsHelper, err = gcs.InitUploadPool(ctx, numWorkers, outputBucketName) + if err != nil { + logger.Fatal("Failed to initialize GCS upload pool", slog.Any("err", err)) + } + defer gcsHelper.CloseAndWait() + } + var wg sync.WaitGroup + var successCount atomic.Uint64 + vulnChan := make(chan *osvschema.Vulnerability, numWorkers) + + for range numWorkers { + wg.Add(1) + go func() { + defer wg.Done() + VulnWorker(ctx, vulnChan, outBkt, overridesBkt, gcsHelper, osvOutputPath, &successCount) + }() + } + + for _, v := range vulnerabilities { + vulnChan <- v + } + + close(vulnChan) + wg.Wait() + if gcsHelper != nil { + gcsHelper.CloseAndWait() + } + logger.Info("Successfully processed "+jobName, slog.Int("count", len(vulnerabilities))) + if outBkt == nil && gcsHelper == nil { + logger.Info("Successfully wrote records to disk", slog.Uint64("count", successCount.Load())) + } +} + +func HandleDeletion(ctx context.Context, outBkt *storage.BucketHandle, osvOutputPath string, vulnerabilities []*osvschema.Vulnerability) { + // Check if any need to be deleted + bucketObjects, err := gcs.ListBucketObjects(ctx, outBkt, osvOutputPath) + if err != nil { + logger.Error("Failed to list bucket 
objects for deletion check, skipping deletion.", slog.Any("err", err)) + return + } + vulnFilenames := make(map[string]bool) + for _, v := range vulnerabilities { + filename := v.GetId() + ".json" + filePath := path.Join(osvOutputPath, filename) + vulnFilenames[filePath] = true + } + for _, objName := range bucketObjects { + if !vulnFilenames[objName] { + logger.Info("Deleting stale object from bucket", slog.String("name", objName)) + obj := outBkt.Object(objName) + if err := obj.Delete(ctx); err != nil { + logger.Error("Failed to delete object", slog.String("name", objName), slog.Any("err", err)) + } + } + } +} + +// UploadVulnToGCS marshals a single OSV Vulnerability to JSON and unconditionally uploads it to GCS. +func UploadVulnToGCS(ctx context.Context, bkt *storage.BucketHandle, prefix string, vuln *osvschema.Vulnerability) error { + if vuln == nil || vuln.GetId() == "" { + return errors.New("invalid vulnerability provided") + } + + data, err := protojson.MarshalOptions{Indent: " "}.Marshal(vuln) + if err != nil { + return fmt.Errorf("failed to marshal vulnerability %s: %w", vuln.GetId(), err) + } + + objectName := path.Join(prefix, vuln.GetId()+".json") + logger.Info("Uploading vulnerability record to GCS", slog.String("id", vuln.GetId()), slog.String("object", objectName)) + reader := bytes.NewReader(data) + + return gcs.UploadToGCS(ctx, bkt, objectName, reader, "application/json", nil) +} + +// UploadVulnIfChanged marshals a single OSV Vulnerability to JSON and uploads it to GCS if it has changed. +func UploadVulnIfChanged(ctx context.Context, bkt *storage.BucketHandle, prefix string, vuln *osvschema.Vulnerability) error { + hexHash, postModifiedBuf, err := prepareVulnUpload(vuln) + if err != nil { + return err + } + + err = uploadIfChanged(ctx, vuln, hexHash, postModifiedBuf, bkt, prefix) + if errors.Is(err, ErrUploadSkipped) { + return nil + } + + return err +} + +// UploadMetricsToGCS marshals ConversionMetrics to JSON and uploads it to GCS. 
+func UploadMetricsToGCS(ctx context.Context, bkt *storage.BucketHandle, prefix string, cveID models.CVEID, metrics *models.ConversionMetrics) error { + if metrics == nil || cveID == "" { + return errors.New("invalid metrics or CVE ID provided") + } + + data, err := json.MarshalIndent(metrics, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal metrics for %s: %w", cveID, err) + } + + objectName := path.Join(prefix, string(cveID)+".metrics.json") + logger.Debug("Uploading conversion metrics record to GCS", slog.String("id", string(cveID)), slog.String("object", objectName)) + reader := bytes.NewReader(data) + + return gcs.UploadToGCS(ctx, bkt, objectName, reader, "application/json", nil) +} + +// UploadVulnIfChangedAsync marshals a single OSV Vulnerability to JSON and schedules it for upload via the Helper pool if it has changed. +func UploadVulnIfChangedAsync(gcsHelper *gcs.Helper, prefix string, vuln *osvschema.Vulnerability) error { + hexHash, postModifiedBuf, err := prepareVulnUpload(vuln) + if err != nil { + return err + } + + objectName := path.Join(prefix, vuln.GetId()+".json") + gcsHelper.Upload(objectName, bytes.NewReader(postModifiedBuf), hexHash, "application/json") + + return nil +} + +// UploadMetricsToGCSAsync marshals ConversionMetrics to JSON and schedules it for upload via the Helper pool. +func UploadMetricsToGCSAsync(gcsHelper *gcs.Helper, prefix string, cveID models.CVEID, metrics *models.ConversionMetrics) error { + if metrics == nil || cveID == "" { + return errors.New("invalid metrics or CVE ID provided") + } + + data, err := json.MarshalIndent(metrics, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal metrics for %s: %w", cveID, err) + } + + objectName := path.Join(prefix, string(cveID)+".metrics.json") + reader := bytes.NewReader(data) + + gcsHelper.Upload(objectName, reader, "", "application/json") + + return nil +} + +// CreateMetricsFile creates the initial file for the metrics record. 
+func CreateMetricsFile(id models.CVEID, vulnDir string) (*os.File, error) { + metricsFile := path.Join(vulnDir, string(id)+".metrics"+models.Extension) + f, err := os.Create(metricsFile) + if err != nil { + logger.Info("Failed to open for writing "+metricsFile, slog.String("cve", string(id)), slog.String("path", metricsFile), slog.Any("err", err)) + return nil, err + } + + return f, nil +} + +// CreateOSVFile creates the initial file for the OSV record. +func CreateOSVFile(id models.CVEID, vulnDir string) (*os.File, error) { + outputFile := path.Join(vulnDir, string(id)+models.Extension) + + f, err := os.Create(outputFile) + if err != nil { + logger.Info("Failed to open for writing "+outputFile, slog.String("cve", string(id)), slog.String("path", outputFile), slog.Any("err", err)) + return nil, err + } + + return f, err +} + +func WriteMetricsFile(metrics *models.ConversionMetrics, metricsFile *os.File) error { + marshalledMetrics, err := json.MarshalIndent(&metrics, "", " ") + if err != nil { + logger.Info("Failed to marshal", slog.Any("err", err)) + return err + } + + _, err = metricsFile.Write(marshalledMetrics) + if err != nil { + logger.Warn("Failed to write", slog.String("path", metricsFile.Name()), slog.Any("err", err)) + return fmt.Errorf("failed to write %s: %w", metricsFile.Name(), err) + } + + metricsFile.Close() + + return nil +} diff --git a/vulnfeeds/upload/cveworker_test.go b/vulnfeeds/conversion/writer/writer_test.go similarity index 76% rename from vulnfeeds/upload/cveworker_test.go rename to vulnfeeds/conversion/writer/writer_test.go index adaa84132f3..72fb09c4ca1 100644 --- a/vulnfeeds/upload/cveworker_test.go +++ b/vulnfeeds/conversion/writer/writer_test.go @@ -1,4 +1,4 @@ -package upload +package writer import ( "bytes" @@ -15,6 +15,8 @@ import ( "time" "github.com/fsouza/fake-gcs-server/fakestorage" + gcs "github.com/google/osv/vulnfeeds/gcs-tools" + "github.com/google/osv/vulnfeeds/models" "github.com/ossf/osv-schema/bindings/go/osvschema" 
"google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/timestamppb" @@ -59,7 +61,9 @@ func TestUploadToGCS(t *testing.T) { preModifiedBuf := []byte(`{"id":"CVE-2023-1234"}`) t.Run("Upload new object", func(t *testing.T) { - err := uploadToGCS(ctx, v, preModifiedBuf, bkt, "") + hash := sha256.Sum256(preModifiedBuf) + hexHash := hex.EncodeToString(hash[:]) + err := uploadIfChanged(ctx, v, hexHash, preModifiedBuf, bkt, "") if err != nil { t.Errorf("Expected uploadToGCS to return nil for new object, got %v", err) } @@ -70,9 +74,6 @@ func TestUploadToGCS(t *testing.T) { t.Fatalf("Failed to get object attrs: %v", err) } - hash := sha256.Sum256(preModifiedBuf) - hexHash := hex.EncodeToString(hash[:]) - if attrs.Metadata[hashMetadataKey] != hexHash { t.Errorf("Expected hash %s, got %s", hexHash, attrs.Metadata[hashMetadataKey]) } @@ -81,7 +82,9 @@ func TestUploadToGCS(t *testing.T) { t.Run("Skip upload if hash matches", func(t *testing.T) { // Modify the vulnerability to simulate a change in modified time but not content v.Modified = timestamppb.New(time.Now().Add(1 * time.Hour)) - err := uploadToGCS(ctx, v, preModifiedBuf, bkt, "") + hash := sha256.Sum256(preModifiedBuf) + hexHash := hex.EncodeToString(hash[:]) + err := uploadIfChanged(ctx, v, hexHash, preModifiedBuf, bkt, "") if !errors.Is(err, ErrUploadSkipped) { t.Errorf("Expected uploadToGCS to return ErrUploadSkipped when hash matches, got %v", err) } @@ -101,7 +104,9 @@ func TestUploadToGCS(t *testing.T) { t.Run("Upload if hash differs", func(t *testing.T) { preModifiedBuf2 := []byte(`{"id":"CVE-2023-1234", "summary": "updated"}`) - err := uploadToGCS(ctx, v, preModifiedBuf2, bkt, "") + hash2 := sha256.Sum256(preModifiedBuf2) + hexHash2 := hex.EncodeToString(hash2[:]) + err := uploadIfChanged(ctx, v, hexHash2, preModifiedBuf2, bkt, "") if err != nil { t.Errorf("Expected uploadToGCS to return nil when hash differs, got %v", err) } @@ -112,9 +117,6 @@ func TestUploadToGCS(t *testing.T) 
func TestUploadVulnIfChangedAsync(t *testing.T) {
	// In-process fake GCS server: no real credentials or network required.
	server, err := fakestorage.NewServerWithOptions(fakestorage.Options{
		Scheme: "http",
	})
	if err != nil {
		t.Fatalf("Failed to create fake storage server: %v", err)
	}
	defer server.Stop()

	// Point the real GCS client library at the fake server.
	t.Setenv("STORAGE_EMULATOR_HOST", server.URL())

	ctx := context.Background()
	bucketName := "test-out-bucket"
	server.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: bucketName})

	gcsHelper, err := gcs.InitUploadPool(ctx, 2, bucketName)
	if err != nil {
		t.Fatalf("Failed to init upload pool: %v", err)
	}

	v := &osvschema.Vulnerability{
		Id: "CVE-2023-9999",
		Affected: []*osvschema.Affected{
			{Package: &osvschema.Package{Name: "test-pkg"}},
		},
	}

	t.Run("Async upload new object", func(t *testing.T) {
		err := UploadVulnIfChangedAsync(gcsHelper, "nvd-prefix", v)
		if err != nil {
			t.Fatalf("Expected UploadVulnIfChangedAsync to succeed, got %v", err)
		}

		// Drain the pool so the object is guaranteed to have landed in the bucket.
		gcsHelper.CloseAndWait()

		client := server.Client()
		bkt := client.Bucket(bucketName)
		objName := "nvd-prefix/CVE-2023-9999.json"
		obj := bkt.Object(objName)
		attrs, err := obj.Attrs(ctx)
		if err != nil {
			t.Fatalf("Expected object %q to exist on GCS, got error: %v", objName, err)
		}

		// The content hash must be recorded so later runs can skip unchanged records.
		if attrs.Metadata[hashMetadataKey] == "" {
			t.Errorf("Expected hash metadata to be set on GCS object")
		}
	})
}

func TestUploadMetricsToGCSAsync(t *testing.T) {
	// In-process fake GCS server: no real credentials or network required.
	server, err := fakestorage.NewServerWithOptions(fakestorage.Options{
		Scheme: "http",
	})
	if err != nil {
		t.Fatalf("Failed to create fake storage server: %v", err)
	}
	defer server.Stop()

	t.Setenv("STORAGE_EMULATOR_HOST", server.URL())

	ctx := context.Background()
	bucketName := "test-out-bucket"
	server.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: bucketName})

	gcsHelper, err := gcs.InitUploadPool(ctx, 2, bucketName)
	if err != nil {
		t.Fatalf("Failed to init upload pool: %v", err)
	}

	metrics := &models.ConversionMetrics{
		CVEID: "CVE-2023-9999",
		CNA:   "nvd",
	}

	err = UploadMetricsToGCSAsync(gcsHelper, "nvd-prefix", "CVE-2023-9999", metrics)
	if err != nil {
		t.Fatalf("Expected UploadMetricsToGCSAsync to succeed, got %v", err)
	}

	// Drain the pool before asserting on bucket contents.
	gcsHelper.CloseAndWait()

	client := server.Client()
	bkt := client.Bucket(bucketName)
	objName := "nvd-prefix/CVE-2023-9999.metrics.json"
	_, err = bkt.Object(objName).Attrs(ctx)
	if err != nil {
		t.Fatalf("Expected metrics object %q to exist on GCS, got error: %v", objName, err)
	}
}
+1,229 @@ +// Package gcs provides utilities for working with Google Cloud Storage. +package gcs + +import ( + "context" + "errors" + "fmt" + "io" + "log/slog" + "os" + "path/filepath" + "strings" + "sync" + + "cloud.google.com/go/storage" + "github.com/google/osv/vulnfeeds/utility/logger" + "golang.org/x/sync/errgroup" + "google.golang.org/api/iterator" +) + +const ( + hashMetadataKey = "sha256-hash" // hashMetadataKey is the key for the sha256 hash in the GCS object metadata. +) + +type Helper struct { + wg sync.WaitGroup + bus chan *uploadMsg + bkt *storage.BucketHandle + client *storage.Client + once sync.Once +} + +type uploadMsg struct { + objectName string + data io.Reader + contentType string + hash string // if hash is empty, always upload +} + +func InitUploadPool(ctx context.Context, workers int, bktName string) (*Helper, error) { + client, err := storage.NewClient(ctx) + if err != nil { + return nil, fmt.Errorf("storage.NewClient: %w", err) + } + + helper := &Helper{ + bus: make(chan *uploadMsg, workers), + bkt: client.Bucket(bktName), + client: client, + } + + for range workers { + helper.wg.Add(1) + go bucketWorker(ctx, helper) + } + + return helper, nil +} + +func bucketWorker(ctx context.Context, gcsHelper *Helper) { + defer gcsHelper.wg.Done() + for msg := range gcsHelper.bus { + func() { + if closer, ok := msg.data.(io.Closer); ok { + defer closer.Close() + } + if msg.hash != "" { + attrs, err := gcsHelper.bkt.Object(msg.objectName).Attrs(ctx) + if err == nil { + if attrs.Metadata != nil && attrs.Metadata[hashMetadataKey] == msg.hash { + logger.Info("Skipping GCS upload, hash matches", slog.String("id", msg.objectName)) + return + } + } else if !errors.Is(err, storage.ErrObjectNotExist) { + logger.Info("Failed to get object attributes", slog.String("object", msg.objectName), slog.String("error", err.Error())) + return + } + } + var metadata map[string]string + if msg.hash != "" { + metadata = map[string]string{hashMetadataKey: msg.hash} + } + if 
err := UploadToGCS(ctx, gcsHelper.bkt, msg.objectName, msg.data, msg.contentType, metadata); err != nil { + logger.Info("Failed to upload object", slog.String("object", msg.objectName), slog.String("error", err.Error())) + } + }() + } +} + +func (g *Helper) Upload(objectName string, data io.Reader, hash string, contentType string) { + g.bus <- &uploadMsg{ + objectName: objectName, + data: data, + hash: hash, + contentType: contentType, + } +} + +func (g *Helper) CloseAndWait() { + g.once.Do(func() { + close(g.bus) + g.wg.Wait() + if g.client != nil { + g.client.Close() + } + }) +} + +// UploadToGCS uploads data from an io.Reader to a GCS bucket. +func UploadToGCS(ctx context.Context, bkt *storage.BucketHandle, objectName string, data io.Reader, contentType string, metadata map[string]string) error { + obj := bkt.Object(objectName) + wc := obj.NewWriter(ctx) + if contentType != "" { + wc.ContentType = contentType + } + if metadata != nil { + wc.Metadata = metadata + } + + if _, err := io.Copy(wc, data); err != nil { + if closeErr := wc.Close(); closeErr != nil { + return fmt.Errorf("failed to write to GCS object %q: %w (also failed to close writer: %w)", objectName, err, closeErr) + } + + return fmt.Errorf("failed to write to GCS object %q: %w", objectName, err) + } + + if err := wc.Close(); err != nil { + return fmt.Errorf("failed to close GCS writer for object %q: %w", objectName, err) + } + + return nil +} + +// UploadFile uploads a local file to a GCS bucket. +func UploadFile(ctx context.Context, bkt *storage.BucketHandle, objectName string, filePath string) error { + f, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("os.Open: %w", err) + } + defer f.Close() + + return UploadToGCS(ctx, bkt, objectName, f, "", nil) +} + +// DownloadBucket downloads all objects from a GCS bucket to a local directory. 
+func DownloadBucket(ctx context.Context, bkt *storage.BucketHandle, prefix string, destDir string) error { + it := bkt.Objects(ctx, &storage.Query{Prefix: prefix}) + + g, ctx := errgroup.WithContext(ctx) + // Limit concurrency to avoid running out of file descriptors or overwhelming the network + g.SetLimit(10) + + for { + if err := ctx.Err(); err != nil { + return err + } + + attrs, err := it.Next() + if errors.Is(err, iterator.Done) { + break + } + if err != nil { + return fmt.Errorf("bucket.Objects: %w", err) + } + + // Skip directories + if strings.HasSuffix(attrs.Name, "/") { + continue + } + + destPath := filepath.Join(destDir, attrs.Name) + if !strings.HasPrefix(destPath, filepath.Clean(destDir)+string(os.PathSeparator)) { + return fmt.Errorf("invalid object name %q: path traversal attempt", attrs.Name) + } + + // Capture loop variable for the goroutine + objName := attrs.Name + + g.Go(func() error { + if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil { + return fmt.Errorf("os.MkdirAll: %w", err) + } + + f, err := os.Create(destPath) + if err != nil { + return fmt.Errorf("os.Create: %w", err) + } + defer f.Close() + + rc, err := bkt.Object(objName).NewReader(ctx) + if err != nil { + return fmt.Errorf("Object(%q).NewReader: %w", objName, err) + } + defer rc.Close() + + if _, err := io.Copy(f, rc); err != nil { + return fmt.Errorf("io.Copy: %w", err) + } + + return nil + }) + } + + if err := g.Wait(); err != nil { + return err + } + + return nil +} + +// listBucketObjects lists the names of all objects in a Google Cloud Storage bucket. +// It does not download the file contents. +func ListBucketObjects(ctx context.Context, bucket *storage.BucketHandle, prefix string) ([]string, error) { + it := bucket.Objects(ctx, &storage.Query{Prefix: prefix}) + var filenames []string + for { + attrs, err := it.Next() + if errors.Is(err, iterator.Done) { + break // All objects have been listed. 
+ } + if err != nil { + return nil, fmt.Errorf("bucket.Objects: %w", err) + } + filenames = append(filenames, attrs.Name) + } + + return filenames, nil +} diff --git a/vulnfeeds/gcs-tools/gcs_test.go b/vulnfeeds/gcs-tools/gcs_test.go new file mode 100644 index 00000000000..af8a2c7f94f --- /dev/null +++ b/vulnfeeds/gcs-tools/gcs_test.go @@ -0,0 +1,210 @@ +package gcs + +import ( + "bytes" + "context" + "os" + "path/filepath" + "testing" + + "github.com/fsouza/fake-gcs-server/fakestorage" +) + +func TestUploadToGCS(t *testing.T) { + server := fakestorage.NewServer([]fakestorage.Object{}) + t.Cleanup(server.Stop) + + client := server.Client() + bkt := client.Bucket("test-bucket") + if err := bkt.Create(context.Background(), "project", nil); err != nil { + t.Fatalf("failed to create bucket: %v", err) + } + + content := []byte("test content") + err := UploadToGCS(context.Background(), bkt, "test-object.txt", bytes.NewReader(content), "text/plain", nil) + if err != nil { + t.Fatalf("UploadToGCS failed: %v", err) + } + + obj, err := server.GetObject("test-bucket", "test-object.txt") + if err != nil { + t.Fatalf("failed to get object: %v", err) + } + + if !bytes.Equal(obj.Content, content) { + t.Errorf("expected content %q, got %q", content, obj.Content) + } + if obj.ContentType != "text/plain" { + t.Errorf("expected content type %q, got %q", "text/plain", obj.ContentType) + } +} + +func TestUploadFile(t *testing.T) { + server := fakestorage.NewServer([]fakestorage.Object{}) + t.Cleanup(server.Stop) + + client := server.Client() + bkt := client.Bucket("test-bucket") + if err := bkt.Create(context.Background(), "project", nil); err != nil { + t.Fatalf("failed to create bucket: %v", err) + } + + tmpFile, err := os.CreateTemp(t.TempDir(), "test-upload-*.txt") + if err != nil { + t.Fatalf("failed to create temp file: %v", err) + } + + content := []byte("file content") + if _, err := tmpFile.Write(content); err != nil { + t.Fatalf("failed to write to temp file: %v", err) + } + 
tmpFile.Close() + + err = UploadFile(context.Background(), bkt, "uploaded-file.txt", tmpFile.Name()) + if err != nil { + t.Fatalf("UploadFile failed: %v", err) + } + + obj, err := server.GetObject("test-bucket", "uploaded-file.txt") + if err != nil { + t.Fatalf("failed to get object: %v", err) + } + + if !bytes.Equal(obj.Content, content) { + t.Errorf("expected content %q, got %q", content, obj.Content) + } +} + +func TestDownloadBucket(t *testing.T) { + t.Run("success", func(t *testing.T) { + objects := []fakestorage.Object{ + { + ObjectAttrs: fakestorage.ObjectAttrs{ + BucketName: "test-bucket", + Name: "folder/file1.txt", + }, + Content: []byte("content 1"), + }, + { + ObjectAttrs: fakestorage.ObjectAttrs{ + BucketName: "test-bucket", + Name: "folder/file2.txt", + }, + Content: []byte("content 2"), + }, + { + ObjectAttrs: fakestorage.ObjectAttrs{ + BucketName: "test-bucket", + Name: "folder/subfolder/", // Should be skipped + }, + Content: []byte(""), + }, + { + ObjectAttrs: fakestorage.ObjectAttrs{ + BucketName: "test-bucket", + Name: "other-folder/file3.txt", + }, + Content: []byte("content 3"), + }, + } + + server := fakestorage.NewServer(objects) + t.Cleanup(server.Stop) + + client := server.Client() + bkt := client.Bucket("test-bucket") + + tmpDir := t.TempDir() + + err := DownloadBucket(context.Background(), bkt, "folder/", tmpDir) + if err != nil { + t.Fatalf("DownloadBucket failed: %v", err) + } + + // Verify file1.txt + content1, err := os.ReadFile(filepath.Join(tmpDir, "folder/file1.txt")) + if err != nil { + t.Fatalf("failed to read downloaded file1: %v", err) + } + if !bytes.Equal(content1, []byte("content 1")) { + t.Errorf("expected content 1, got %q", content1) + } + + // Verify file2.txt + content2, err := os.ReadFile(filepath.Join(tmpDir, "folder/file2.txt")) + if err != nil { + t.Fatalf("failed to read downloaded file2: %v", err) + } + if !bytes.Equal(content2, []byte("content 2")) { + t.Errorf("expected content 2, got %q", content2) + } + + // 
Verify file3.txt is NOT downloaded because of the prefix + if _, err := os.Stat(filepath.Join(tmpDir, "other-folder/file3.txt")); !os.IsNotExist(err) { + t.Errorf("expected file3.txt to not exist, but it does") + } + }) + + t.Run("path traversal", func(t *testing.T) { + objects := []fakestorage.Object{ + { + ObjectAttrs: fakestorage.ObjectAttrs{ + BucketName: "test-bucket", + Name: "../malicious.txt", + }, + Content: []byte("malicious content"), + }, + } + + server := fakestorage.NewServer(objects) + t.Cleanup(server.Stop) + + client := server.Client() + bkt := client.Bucket("test-bucket") + + tmpDir := t.TempDir() + + err := DownloadBucket(context.Background(), bkt, "", tmpDir) + if err == nil { + t.Fatalf("expected path traversal error, got nil") + } + if err.Error() != "invalid object name \"../malicious.txt\": path traversal attempt" { + t.Errorf("unexpected error message: %v", err) + } + }) + + t.Run("relative dest dir", func(t *testing.T) { + objects := []fakestorage.Object{ + { + ObjectAttrs: fakestorage.ObjectAttrs{ + BucketName: "test-bucket", + Name: "file.txt", + }, + Content: []byte("content"), + }, + } + + server := fakestorage.NewServer(objects) + t.Cleanup(server.Stop) + + client := server.Client() + bkt := client.Bucket("test-bucket") + + // Use a relative directory + destDir := "test-relative-dir" + t.Cleanup(func() { os.RemoveAll(destDir) }) + + err := DownloadBucket(context.Background(), bkt, "", destDir) + if err != nil { + t.Fatalf("DownloadBucket failed with relative dir: %v", err) + } + + content, err := os.ReadFile(filepath.Join(destDir, "file.txt")) + if err != nil { + t.Fatalf("failed to read downloaded file: %v", err) + } + if !bytes.Equal(content, []byte("content")) { + t.Errorf("expected content, got %q", content) + } + }) +} diff --git a/vulnfeeds/upload/cveworker.go b/vulnfeeds/upload/cveworker.go deleted file mode 100644 index 47ef47e13f3..00000000000 --- a/vulnfeeds/upload/cveworker.go +++ /dev/null @@ -1,296 +0,0 @@ -// Package 
upload handles allocating workers to intelligently uploading OSV records to a bucket -package upload - -import ( - "bytes" - "context" - "crypto/sha256" - "encoding/hex" - "errors" - "fmt" - "io" - "log/slog" - "os" - "path" - "sync" - "sync/atomic" - "time" - - "cloud.google.com/go/storage" - "google.golang.org/api/iterator" - "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/types/known/timestamppb" - - "github.com/google/osv/vulnfeeds/utility/logger" - "github.com/google/osv/vulnfeeds/vulns" - "github.com/ossf/osv-schema/bindings/go/osvschema" -) - -const ( - // hashMetadataKey is the key for the sha256 hash in the GCS object metadata. - hashMetadataKey = "sha256-hash" - overrideFolder = "osv-output-overrides" // location of overrides within bucket -) - -// ErrUploadSkipped indicates that an upload was intentionally skipped because -// the vulnerability payload is unchanged. -var ErrUploadSkipped = errors.New("upload skipped") - -// writeToDisk writes the vulnerability to a local file. -// It returns an error if the file could not be written. -func writeToDisk(v *osvschema.Vulnerability, preModifiedBuf []byte, outputPrefix string) error { - filename := v.GetId() + ".json" - filePath := path.Join(outputPrefix, filename) - err := os.WriteFile(filePath, preModifiedBuf, 0600) - if err != nil { - return fmt.Errorf("failed to write OSV file at %s: %w", filePath, err) - } - - return nil -} - -// uploadToGCS uploads the vulnerability to a GCS bucket. -// It returns an error if the upload failed, or ErrUploadSkipped if the upload -// was intentionally avoided (e.g. because the GCS object has a matching hash). 
-func uploadToGCS(ctx context.Context, v *osvschema.Vulnerability, preModifiedBuf []byte, outBkt *storage.BucketHandle, outputPrefix string) error { - vulnID := v.GetId() - filename := vulnID + ".json" - - hash := sha256.Sum256(preModifiedBuf) - hexHash := hex.EncodeToString(hash[:]) - - objName := path.Join(outputPrefix, filename) - obj := outBkt.Object(objName) - - // Check if object exists and if hash matches. - attrs, err := obj.Attrs(ctx) - if err == nil { - // Object exists, check hash. - if attrs.Metadata != nil && attrs.Metadata[hashMetadataKey] == hexHash { - return ErrUploadSkipped - } - } else if !errors.Is(err, storage.ErrObjectNotExist) { - return fmt.Errorf("failed to get object attributes for %s: %w", vulnID, err) - } - - // Object does not exist or hash differs, upload. - v.Modified = timestamppb.New(time.Now().UTC()) - vuln := vulns.Vulnerability{Vulnerability: v} - var buf bytes.Buffer - if err := vuln.ToJSON(&buf); err != nil { - return fmt.Errorf("failed to marshal vulnerability with modified time for %s: %w", vulnID, err) - } - postModifiedBuf := buf.Bytes() - - wc := obj.NewWriter(ctx) - wc.Metadata = map[string]string{ - hashMetadataKey: hexHash, - } - wc.ContentType = "application/json" - - if _, err := wc.Write(postModifiedBuf); err != nil { - // Try to close writer even if write failed. - if closeErr := wc.Close(); closeErr != nil { - logger.Error("failed to close GCS writer after write error", slog.String("id", vulnID), slog.Any("err", closeErr)) - } - - return fmt.Errorf("failed to write to GCS object for %s: %w", vulnID, err) - } - - if err := wc.Close(); err != nil { - return fmt.Errorf("failed to close GCS writer for %s: %w", vulnID, err) - } - - return nil -} - -// handleOverride checks for and applies a vulnerability override if it exists. -// It returns the vulnerability to process, a pre-marshalled buffer if an override was used, -// and an error if a critical failure occurred. 
-func handleOverride(ctx context.Context, v *osvschema.Vulnerability, overridesBkt *storage.BucketHandle) (*osvschema.Vulnerability, []byte, error) { - filename := v.GetId() + ".json" - overrideObj := overridesBkt.Object(path.Join(overrideFolder, filename)) - if _, err := overrideObj.Attrs(ctx); err != nil { - if errors.Is(err, storage.ErrObjectNotExist) { - // No override found. - return v, nil, nil - } - // For any other error, we can't know if an override exists, so we return an error. - logger.Error("failed to check for override object", slog.String("id", v.GetId()), slog.Any("err", err)) - - return nil, nil, err - } - - // Override exists, read it and replace original vulnerability. - logger.Info("Using override", slog.String("id", v.GetId())) - rc, err := overrideObj.NewReader(ctx) - if err != nil { - logger.Error("failed to get reader for override object", slog.String("id", v.GetId()), slog.Any("err", err)) - return nil, nil, err - } - defer rc.Close() - - overrideBuf, err := io.ReadAll(rc) - if err != nil { - logger.Error("failed to read override object", slog.String("id", v.GetId()), slog.Any("err", err)) - return nil, nil, err - } - - var overrideV osvschema.Vulnerability - if err := protojson.Unmarshal(overrideBuf, &overrideV); err != nil { - logger.Error("failed to unmarshal override object", slog.String("id", v.GetId()), slog.Any("err", err)) - return nil, nil, err - } - - return &overrideV, overrideBuf, nil -} - -// Worker is a generic worker that processes OSV vulnerabilities from a channel. -// It can upload them to a GCS bucket or write them to disk. -// It supports checking for overrides in a separate GCS bucket location if overridesBkt is not nil. -// For GCS uploads, it calculates a hash of the vulnerability (excluding the modified time) and compares it -// with the existing object's hash. The vulnerability is uploaded only if the hashes differ, with the -// modified time updated. 
This prevents updating the modified time for vulnerabilities with no content changes. -func Worker(ctx context.Context, vulnChan <-chan *osvschema.Vulnerability, outBkt, overridesBkt *storage.BucketHandle, outputPrefix string, counter *atomic.Uint64) { - for v := range vulnChan { - vulnID := v.GetId() - if len(v.GetAffected()) == 0 { - logger.Warn("Skipping OSV record as no affected versions found.", slog.String("id", vulnID)) - continue - } - vulnToProcess := v - var preModifiedBuf []byte - var err error - - if overridesBkt != nil { - vulnToProcess, preModifiedBuf, err = handleOverride(ctx, v, overridesBkt) - if err != nil { - logger.Error("Failed to use override", slog.Any("error", err)) - continue - } - } - - if preModifiedBuf == nil { - // Marshal before setting modified time to generate hash. - vuln := vulns.Vulnerability{Vulnerability: v} - var buf bytes.Buffer - if err := vuln.ToJSON(&buf); err != nil { - logger.Error("failed to marshal vulnerability", slog.String("id", vulnID), slog.Any("err", err)) - continue - } - preModifiedBuf = buf.Bytes() - } - - var writeErr error - if outBkt == nil { - // Write to local disk - writeErr = writeToDisk(vulnToProcess, preModifiedBuf, outputPrefix) - } else { - // Upload to GCS - writeErr = uploadToGCS(ctx, vulnToProcess, preModifiedBuf, outBkt, outputPrefix) - } - - if writeErr == nil { - logger.Info("Uploaded successfully", slog.String("id", vulnID)) - if counter != nil { - counter.Add(1) - } - } else if errors.Is(writeErr, ErrUploadSkipped) { - logger.Info("Skipping upload, hash matches", slog.String("id", vulnID)) - } else { - logger.Error("Failed to upload/write", slog.String("id", vulnID), slog.Any("err", writeErr)) - } - } -} - -// Upload delegates workers to upload vulnerabilities to the buckets. 
-func Upload( - ctx context.Context, - jobName string, - uploadToGCS bool, - outputBucketName string, - overridesBucketName string, - numWorkers int, - osvOutputPath string, - vulnerabilities []*osvschema.Vulnerability, - doDeletions bool, -) { - var outBkt, overridesBkt *storage.BucketHandle - if uploadToGCS { - storageClient, err := storage.NewClient(ctx) - if err != nil { - logger.Fatal("Failed to create storage client", slog.Any("err", err)) - } - outBkt = storageClient.Bucket(outputBucketName) - if overridesBucketName != "" { - overridesBkt = storageClient.Bucket(overridesBucketName) - } - - if doDeletions { - handleDeletion(ctx, outBkt, osvOutputPath, vulnerabilities) - } - } - var wg sync.WaitGroup - var successCount atomic.Uint64 - vulnChan := make(chan *osvschema.Vulnerability, numWorkers) - - for range numWorkers { - wg.Add(1) - go func() { - defer wg.Done() - Worker(ctx, vulnChan, outBkt, overridesBkt, osvOutputPath, &successCount) - }() - } - - for _, v := range vulnerabilities { - vulnChan <- v - } - - close(vulnChan) - wg.Wait() - logger.Info("Successfully processed "+jobName, slog.Int("count", len(vulnerabilities))) - logger.Info("Successfully uploaded records", slog.Uint64("count", successCount.Load())) -} - -func handleDeletion(ctx context.Context, outBkt *storage.BucketHandle, osvOutputPath string, vulnerabilities []*osvschema.Vulnerability) { - // Check if any need to be deleted - bucketObjects, err := listBucketObjects(ctx, outBkt, osvOutputPath) - if err != nil { - logger.Error("Failed to list bucket objects for deletion check, skipping deletion.", slog.Any("err", err)) - return - } - vulnFilenames := make(map[string]bool) - for _, v := range vulnerabilities { - filename := v.GetId() + ".json" - filePath := path.Join(osvOutputPath, filename) - vulnFilenames[filePath] = true - } - for _, objName := range bucketObjects { - if !vulnFilenames[objName] { - logger.Info("Deleting stale object from bucket", slog.String("name", objName)) - obj := 
outBkt.Object(objName) - if err := obj.Delete(ctx); err != nil { - logger.Error("Failed to delete object", slog.String("name", objName), slog.Any("err", err)) - } - } - } -} - -// listBucketObjects lists the names of all objects in a Google Cloud Storage bucket. -// It does not download the file contents. -func listBucketObjects(ctx context.Context, bucket *storage.BucketHandle, prefix string) ([]string, error) { - it := bucket.Objects(ctx, &storage.Query{Prefix: prefix}) - var filenames []string - for { - attrs, err := it.Next() - if errors.Is(err, iterator.Done) { - break // All objects have been listed. - } - if err != nil { - return nil, fmt.Errorf("bucket.Objects: %w", err) - } - filenames = append(filenames, attrs.Name) - } - - return filenames, nil -}