diff --git a/cmd/dataselect-opendata-proxy/README.md b/cmd/dataselect-opendata-proxy/README.md new file mode 100644 index 0000000..350a806 --- /dev/null +++ b/cmd/dataselect-opendata-proxy/README.md @@ -0,0 +1,38 @@ +# dataselect-opendata-proxy + +An FDSN dataselect web service that serves miniSEED waveform data by using [GeoNet open data waveform data](https://www.geonet.org.nz/data/access/aws) as the backend. + +This service is intended to run on the client's local computer or premises, as a middleman between the end user and the GeoNet Open Data bucket. **The end user simply runs this service, then points their FDSN client at localhost, or at the on-premises host.** + +Currently, this proxy service removes all the request restrictions that are set on the GeoNet FDSN service. Users of this proxy service won't be rejected when requesting a large amount of data in one shot. + +## How it works + +The service implements the [FDSN Web Services dataselect specification](http://www.fdsn.org/webservices/FDSN-WS-Specifications-1.1.pdf) (v1.1). + +For each request it: + +1. Parses the network/station/location/channel parameters and time range. +2. Enumerates calendar days covering the query window (starting one day before the requested start time to catch files whose data crosses the day boundary). +3. Lists matching S3 keys under the open data waveform prefix using the path pattern: + ``` + waveforms/miniseed/{yyyy}/{yyyy}.{doy}/{station}.{network}/{yyyy}.{doy}.{station}.{location}-{channel}.{network}.D + ``` +4. Fetches each matching file in sorted key order and streams miniSEED records that fall within the requested time window directly to the client. + +The GeoNet Open Data S3 bucket is public — no AWS credentials are required. + +## Running + +``` +go build ./cmd/dataselect-opendata-proxy +./dataselect-opendata-proxy +``` + +The server listens on `:8080`. 
+ +### Environment variables + +| Variable | Description | +|----------|-------------| +| `LOG_EXTRA` | Set to `true` to log POST request bodies | diff --git a/cmd/dataselect-opendata-proxy/dataselect.go b/cmd/dataselect-opendata-proxy/dataselect.go new file mode 100644 index 0000000..b72f0c2 --- /dev/null +++ b/cmd/dataselect-opendata-proxy/dataselect.go @@ -0,0 +1,445 @@ +package main + +import ( + "bytes" + "context" + "fmt" + "io" + "log" + "net/http" + "os" + "regexp" + "sort" + "time" + + "github.com/GeoNet/fdsn/internal/fdsn" + "github.com/GeoNet/kit/aws/s3" + ms "github.com/GeoNet/kit/seis/ms" + "github.com/GeoNet/kit/weft" + "github.com/aws/aws-sdk-go-v2/aws" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + awss3 "github.com/aws/aws-sdk-go-v2/service/s3" +) + +const ( + // miniSEED record length + RECORDLEN int = 512 +) + +const s3Region = "ap-southeast-2" +const s3Bucket = "geonet-open-data" +const s3Prefix = "waveforms/miniseed" + +var ( + s3Client *s3.S3 // kit client — used for ListAll + s3RawClient *awss3.Client // raw client — used for streaming GetObject +) + +// initS3Client creates S3 clients using anonymous credentials. +// geonet-open-data is a public bucket; no AWS credentials are required. +func initS3Client() error { + anonOpt := func(o *awss3.Options) { + o.Credentials = aws.AnonymousCredentials{} + } + + // Build raw AWS config for the streaming client. + cfg, err := awsconfig.LoadDefaultConfig(context.Background(), + awsconfig.WithRegion(s3Region), + awsconfig.WithCredentialsProvider(aws.AnonymousCredentials{}), + ) + if err != nil { + return fmt.Errorf("loading AWS config: %w", err) + } + s3RawClient = awss3.NewFromConfig(cfg) + + // Kit's getConfig() requires AWS_REGION to be set as an env var. + os.Setenv("AWS_REGION", s3Region) + + // Build kit client for listing. 
+ c, err := s3.NewWithOptions(anonOpt) + if err != nil { + return fmt.Errorf("creating S3 client: %w", err) + } + s3Client = &c + + return nil +} + +// fdsnDataselectV1Handler handles FDSN dataselect queries. +// It derives S3 object keys from the NSLC parameters and time range, +// fetches the miniSEED files in sorted order, and streams matching records +// directly to the client as they are found. +func fdsnDataselectV1Handler(r *http.Request, w http.ResponseWriter) (int64, error) { + var params []fdsn.DataSelect + + tm := time.Now() + + switch r.Method { + case "POST": + defer func() { _ = r.Body.Close() }() + if err := fdsn.ParseDataSelectPost(r.Body, ¶ms); err != nil { + return 0, fdsnError{StatusError: weft.StatusError{Code: http.StatusBadRequest, Err: err}, url: r.URL.String(), timestamp: tm} + } + if len(params) == 0 { + return 0, fdsnError{StatusError: weft.StatusError{Code: http.StatusBadRequest, Err: fmt.Errorf("unable to parse post request")}, url: r.URL.String(), timestamp: tm} + } + case "GET": + d, err := fdsn.ParseDataSelectGet(r.URL.Query()) + if err != nil { + return 0, fdsnError{StatusError: weft.StatusError{Code: http.StatusBadRequest, Err: err}, url: r.URL.String(), timestamp: tm} + } + params = append(params, d) + default: + return 0, fdsnError{StatusError: weft.StatusError{Code: http.StatusMethodNotAllowed}, url: r.URL.String(), timestamp: tm} + } + + if r.Method == "POST" && LOG_EXTRA { + log.Printf("About to execute the following query params: %+v\n", params) + } + + type dataSelect struct { + d fdsn.DataSearch + keys []string + } + + var request []dataSelect + var totalFiles int + + for _, v := range params { + d, err := v.Regexp() + if err != nil { + return 0, fdsnError{StatusError: weft.StatusError{Code: http.StatusBadRequest, Err: err}, url: r.URL.String(), timestamp: tm} + } + if !d.End.After(d.Start) { + return 0, fdsnError{StatusError: weft.StatusError{Code: http.StatusBadRequest, Err: fmt.Errorf("endtime must be after starttime")}, 
url: r.URL.String(), timestamp: tm} + } + + keys, err := s3KeysForSearch(d) + if err != nil { + return 0, fdsnError{StatusError: weft.StatusError{Code: http.StatusInternalServerError, Err: err}, url: r.URL.String(), timestamp: tm} + } + + totalFiles += len(keys) + request = append(request, dataSelect{d: d, keys: keys}) + } + + if totalFiles == 0 { + return 0, fdsnError{StatusError: weft.StatusError{Code: params[0].NoData, Err: fmt.Errorf("no results for specified query")}, url: r.URL.String(), timestamp: tm} + } + + record := make([]byte, RECORDLEN) + w.Header().Set("Content-Type", "application/vnd.fdsn.mseed") + + flusher, canFlush := w.(http.Flusher) + var written int + + for _, v := range request { + for _, k := range v.keys { + // Check file time span via ranged header fetches before downloading the full file. + overlaps, err := fileOverlapsWindow(k, v.d.Start, v.d.End) + if err != nil { + log.Printf("skipping key=%s: header check failed: %v", k, err) + continue + } + if !overlaps { + log.Printf("skipping key=%s: no overlap with query window", k) + continue + } + + log.Printf("fetching key=%s start=%s end=%s", k, v.d.Start.Format(time.RFC3339), v.d.End.Format(time.RFC3339)) + + result, err := s3RawClient.GetObject(context.Background(), &awss3.GetObjectInput{ + Bucket: aws.String(s3Bucket), + Key: aws.String(k), + }) + if err != nil { + // Key not found — this day/NSLC file simply doesn't exist. 
+ log.Printf("miniSEED file not found, key: %s (%v)", k, err) + continue + } + + loop: + for { + _, err = io.ReadFull(result.Body, record) + switch { + case err == io.EOF: + break loop + case err != nil: + result.Body.Close() + return 0, fdsnError{StatusError: weft.StatusError{Code: http.StatusInternalServerError, Err: err}, url: r.URL.String(), timestamp: tm} + } + + msr, err := ms.NewRecord(record) + if err != nil { + result.Body.Close() + return 0, fdsnError{StatusError: weft.StatusError{Code: http.StatusInternalServerError, Err: err}, url: r.URL.String(), timestamp: tm} + } + + if msr.StartTime().Before(v.d.End) && msr.EndTime().After(v.d.Start) { + n, err := w.Write(record) + if err != nil { + result.Body.Close() + return 0, fdsnError{StatusError: weft.StatusError{Code: http.StatusInternalServerError, Err: err}, url: r.URL.String(), timestamp: tm} + } + written += n + if canFlush { + flusher.Flush() + } + } + } + result.Body.Close() + } + } + + if written == 0 { + return 0, fdsnError{StatusError: weft.StatusError{Code: params[0].NoData, Err: fmt.Errorf("no results for specified query")}, url: r.URL.String(), timestamp: tm} + } + + return int64(written), nil +} + +// fileOverlapsWindow fetches only the first and last 512-byte miniSEED records +// from the S3 file using ranged GET requests, and reports whether the file's +// time span overlaps [start, end). This avoids downloading the full file when +// it falls entirely outside the query window. +func fileOverlapsWindow(key string, start, end time.Time) (bool, error) { + // Check the file size first: a file smaller than two records (2*RECORDLEN = 1024 bytes) is reported as overlapping without probing its first and last records. + head, err := s3RawClient.HeadObject(context.Background(), &awss3.HeadObjectInput{ + Bucket: aws.String(s3Bucket), + Key: aws.String(key), + }) + if err != nil { + return false, fmt.Errorf("head object: %w", err) + } + if head.ContentLength != nil && *head.ContentLength < int64(2*RECORDLEN) { + return true, nil + } + + // First record → file start time. 
+ buf := &bytes.Buffer{} + if err := s3Client.GetByteRange(s3Bucket, key, "", "bytes=0-511", buf); err != nil { + return false, fmt.Errorf("range fetch first record: %w", err) + } + firstRec, err := ms.NewRecord(buf.Bytes()) + if err != nil { + return false, fmt.Errorf("parsing first record: %w", err) + } + fileStart := firstRec.StartTime() + + // Last record → file end time. + buf.Reset() + if err := s3Client.GetByteRange(s3Bucket, key, "", "bytes=-512", buf); err != nil { + return false, fmt.Errorf("range fetch last record: %w", err) + } + lastRec, err := ms.NewRecord(buf.Bytes()) + if err != nil { + return false, fmt.Errorf("parsing last record: %w", err) + } + fileEnd := lastRec.EndTime() + + return fileStart.Before(end) && fileEnd.After(start), nil +} + +// s3KeysForSearch returns all S3 keys matching the DataSearch parameters, +// sorted by key path so files are fetched in chronological order. +// +// The S3 path structure is: +// +// waveforms/miniseed/{yyyy}/{yyyy}.{doy}/{station}.{network}/{yyyy}.{doy}.{station}.{location}-{channel}.{network}.D +func s3KeysForSearch(d fdsn.DataSearch) ([]string, error) { + netRe, err := regexp.Compile(d.Network) + if err != nil { + return nil, fmt.Errorf("invalid network pattern: %w", err) + } + staRe, err := regexp.Compile(d.Station) + if err != nil { + return nil, fmt.Errorf("invalid station pattern: %w", err) + } + locRe, err := regexp.Compile(d.Location) + if err != nil { + return nil, fmt.Errorf("invalid location pattern: %w", err) + } + chRe, err := regexp.Compile(d.Channel) + if err != nil { + return nil, fmt.Errorf("invalid channel pattern: %w", err) + } + + seen := make(map[string]struct{}) + var keys []string + + // Start one day before the query window to capture records whose miniSEED + // file is dated the previous day but whose data overlaps the query start + // (matching the -24h lookback used by fdsn-ws when querying the holdings DB). 
+ startDay := d.Start.UTC().Truncate(24*time.Hour).AddDate(0, 0, -1) + // End at the day that contains d.End. If d.End falls exactly on midnight, + // that day's files start at or after d.End and cannot overlap the window, + // so we stop at the previous day. + endDay := d.End.UTC().Truncate(24 * time.Hour) + if d.End.UTC().Equal(endDay) { + endDay = endDay.AddDate(0, 0, -1) + } + + for t := startDay; !t.After(endDay); t = t.AddDate(0, 0, 1) { + yyyy := t.Format("2006") + doy := fmt.Sprintf("%03d", t.YearDay()) + dayPrefix := fmt.Sprintf("%s/%s/%s.%s/", s3Prefix, yyyy, yyyy, doy) + + candidates, err := s3Client.ListAll(s3Bucket, dayPrefix) + if err != nil { + return nil, fmt.Errorf("S3 list error for prefix %s: %w", dayPrefix, err) + } + + for _, key := range candidates { + net, sta, loc, ch, ok := parseS3Key(key) + if !ok { + continue + } + if !netRe.MatchString(net) || !staRe.MatchString(sta) || !locRe.MatchString(loc) || !chRe.MatchString(ch) { + continue + } + if _, dup := seen[key]; !dup { + seen[key] = struct{}{} + keys = append(keys, key) + } + } + } + + sort.Strings(keys) + return keys, nil +} + +// s3KeyRegexp matches the open-data S3 key format and captures NSLC components. +// +// Path format: +// +// waveforms/miniseed/{yyyy}/{yyyy}.{doy}/{station}.{network}/{yyyy}.{doy}.{station}.{location}-{channel}.{network}.D +// +// Capture groups: [1]=yyyy [2]=yyyy(dir) [3]=doy(dir) [4]=sta(dir) [5]=net(dir) +// +// [6]=yyyy(file) [7]=doy(file) [8]=station [9]=location [10]=channel [11]=network +var s3KeyRegexp = regexp.MustCompile( + `^waveforms/miniseed/` + + `(\d{4})/` + + `(\d{4})\.(\d{3})/` + + `([^/]+)\.([A-Z0-9]+)/` + + `(\d{4})\.(\d{3})\.([^.]+)\.([^-]*)-([^.]+)\.([A-Z0-9]+)\.D$`) + +// parseS3Key extracts network, station, location, and channel from an S3 key. +// Returns ok=false if the key does not match the expected pattern. 
+func parseS3Key(key string) (network, station, location, channel string, ok bool) { + m := s3KeyRegexp.FindStringSubmatch(key) + if m == nil { + return "", "", "", "", false + } + // m[8]=station, m[9]=location, m[10]=channel, m[11]=network (all from filename segment) + return m[11], m[8], m[9], m[10], true +} + +func fdsnDataselectV1Index(r *http.Request, h http.Header, b *bytes.Buffer) error { + if err := weft.CheckQuery(r, []string{"GET"}, []string{}, []string{}); err != nil { + return err + } + h.Set("Content-Type", "text/html") + b.Write(dataselectIndexHTML) + return nil +} + +func fdsnDataselectVersion(r *http.Request, h http.Header, b *bytes.Buffer) error { + if err := weft.CheckQuery(r, []string{"GET"}, []string{}, []string{}); err != nil { + return err + } + h.Set("Content-Type", "text/plain") + b.WriteString(dataselectVersion) + return nil +} + +func fdsnDataselectWadl(r *http.Request, h http.Header, b *bytes.Buffer) error { + if err := weft.CheckQuery(r, []string{"GET"}, []string{}, []string{}); err != nil { + return err + } + h.Set("Content-Type", "application/xml") + b.Write(dataselectWADL) + return nil +} + +var dataselectIndexHTML = []byte(` + + + + FDSNWS - Dataselect (Open Data Proxy) + + +

FDSNWS Dataselect Web Service (Open Data Proxy)

+

The dataselect Web service provides access to waveform data in +miniseed format +sourced from the GeoNet open data S3 bucket.

+ +

Available URLs

+ + +

Feature Notes

+ + +`) + +var dataselectWADL = []byte(` + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +`) diff --git a/cmd/dataselect-opendata-proxy/dataselect_test.go b/cmd/dataselect-opendata-proxy/dataselect_test.go new file mode 100644 index 0000000..bbfda74 --- /dev/null +++ b/cmd/dataselect-opendata-proxy/dataselect_test.go @@ -0,0 +1,56 @@ +package main + +import ( + "crypto/sha256" + "io" + "log" + "net/http" + "net/http/httptest" + "os" + "testing" +) + +const query = "/fdsnws/dataselect/1/query?network=NZ&station=AKCZ&channel=*&location=10&start=2026-04-01T22:00:00Z&end=2026-04-02T02:00:00Z" +const testDataFile = "testdata/AKCZ.D" + +// TestDataselectSample starts the proxy server and sends the same +// query that was used to produce testdata/AKCZ.D, then compares the binary +func TestDataselectSample(t *testing.T) { + if err := initS3Client(); err != nil { + t.Fatalf("initS3Client: %v", err) + } + + initRoutes() + + if !testing.Verbose() { + log.SetOutput(io.Discard) + } + + ts := httptest.NewServer(mux) + defer ts.Close() + + resp, err := http.Get(ts.URL + query) + if err != nil { + t.Fatalf("GET: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("expected 200, got %d: %s", resp.StatusCode, body) + } + + got, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("reading response: %v", err) + } + + want, err := os.ReadFile(testDataFile) + if err != nil { + t.Fatalf("reading testdata: %v", err) + } + + if sha256.Sum256(got) != sha256.Sum256(want) { + t.Fatalf("sha256 mismatch") + } +} diff --git a/cmd/dataselect-opendata-proxy/main.go b/cmd/dataselect-opendata-proxy/main.go new file mode 100644 index 0000000..7b4a9d9 --- /dev/null +++ b/cmd/dataselect-opendata-proxy/main.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "fmt" + "log" + "net/http" + "os" + "time" + + "github.com/GeoNet/kit/health" + "github.com/GeoNet/kit/weft" +) + 
+const servicePort = ":8080" + +var ( + LOG_EXTRA bool +) + +const dataselectVersion = "1.1" + +func main() { + if health.RunningHealthCheck() { + healthCheck() + } + + LOG_EXTRA = false + if logExtra := os.Getenv("LOG_EXTRA"); logExtra == "true" { + LOG_EXTRA = true + } + + if err := initS3Client(); err != nil { + log.Fatalf("error creating S3 client: %s", err) + } + + initRoutes() + + log.Println("starting dataselect-opendata-proxy server") + server := &http.Server{ + Addr: servicePort, + Handler: mux, + ReadTimeout: 1 * time.Minute, + WriteTimeout: 10 * time.Minute, + } + log.Fatal(server.ListenAndServe()) +} + +func healthCheck() { + timeout := 30 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + msg, err := health.Check(ctx, fmt.Sprintf("%s/soh", servicePort), timeout) + if err != nil { + log.Printf("status: %v", err) + os.Exit(1) + } + log.Printf("status: %s", string(msg)) + os.Exit(0) +} + +func init() { + logger := log.New(os.Stderr, "", log.LstdFlags) + weft.SetLogger(logger) + weft.EnableLogRequest(true) + weft.EnableLogPostBody(true) +} diff --git a/cmd/dataselect-opendata-proxy/prompt.txt b/cmd/dataselect-opendata-proxy/prompt.txt new file mode 100644 index 0000000..a6e2d37 --- /dev/null +++ b/cmd/dataselect-opendata-proxy/prompt.txt @@ -0,0 +1,16 @@ +We're creating a proxy service which: + +- golang application +- runs FDSN dataselect service, the request and response be the same as what's in the dataselect handler in ../fdsn-ws/. 
+- we're going to change the backend by + - the request being analyzed by the service to become network/station/location/channel sets (we call it NSLC here) + - with NSLCs and the time range, the service to fetch files from s3://geonet-open-data/waveforms/miniseed/{yyyy}/{yyyy}.{doy}/{station-4-char-code}.{network-2-char-code}/{yyyy}.{doy}.{station-4-char-code}.{location}-{channel}.{network}.D + - process the fetched files by: + - unpacking the miniseed file + - find the records enclosed in the parameter specified + - for files having size > 1024 bytes, before downloading the full file, fetch the miniseed file's header first (download from S3 partially), to check if this miniseed file contains the records the user requested. same approach applies to check the bottom of file first. + - response to the client, same as in the original fdsn-ws service. +- the files fetched from S3 are ordered by their pathnames. Once the first record meets user's request range, the service starts responding immediately and streaming the rest of data along the way. +- should be using a similar http service pattern to fdsn-ws +- create test cases: run test server and request "/fdsnws/dataselect/1/query?network=NZ&station=AKCZ&channel=EHE&location=10&start=2026-04-01T00:22:00Z&end=2026-04-02T02:00:00Z" comparing the output of the request vs the file testdata/AKCZ.D. 
+- create a README.md in explaining this service diff --git a/cmd/dataselect-opendata-proxy/routes.go b/cmd/dataselect-opendata-proxy/routes.go new file mode 100644 index 0000000..6fdbc42 --- /dev/null +++ b/cmd/dataselect-opendata-proxy/routes.go @@ -0,0 +1,69 @@ +package main + +import ( + "bytes" + "fmt" + "net/http" + "strings" + "time" + + "github.com/GeoNet/kit/weft" +) + +var mux *http.ServeMux + +func initRoutes() { + mux = http.NewServeMux() + + mux.HandleFunc("/", weft.MakeHandler(weft.NoMatch, weft.TextError)) + mux.HandleFunc("/soh/up", weft.MakeHandler(weft.Up, weft.TextError)) + mux.HandleFunc("/soh", weft.MakeHandler(soh, weft.UseError)) + + mux.HandleFunc("/fdsnws/dataselect/1/", weft.MakeHandler(fdsnDataselectV1Index, weft.TextError)) + mux.HandleFunc("/fdsnws/dataselect/1/query", weft.MakeDirectHandler(fdsnDataselectV1Handler, fdsnErrorHandler)) + mux.HandleFunc("/fdsnws/dataselect/1/version", weft.MakeHandler(fdsnDataselectVersion, weft.TextError)) + mux.HandleFunc("/fdsnws/dataselect/1/application.wadl", weft.MakeHandler(fdsnDataselectWadl, weft.TextError)) +} + +func soh(r *http.Request, h http.Header, b *bytes.Buffer) error { + err := weft.CheckQuery(r, []string{"GET"}, []string{}, []string{}) + if err != nil { + return err + } + b.WriteString("ok") + return nil +} + +const FDSN_ERR_FORMAT = `Error %03d: %s +%s +Usage details are available from https://www.geonet.org.nz/data/tools/FDSN +Request: +%s +Request Submitted: +%s +Service version: +%s` + +type fdsnError struct { + weft.StatusError + url string + timestamp time.Time +} + +func fdsnErrorHandler(err error, h http.Header, b *bytes.Buffer, nounce string) error { + switch e := err.(type) { + case fdsnError: + h.Set("Content-Type", "text/plain; charset=utf-8") + if e.Code != http.StatusNoContent && e.Code != http.StatusNotFound { + var ver string + if strings.HasPrefix(e.url, "/fdsnws/dataselect/") { + ver = dataselectVersion + } + msg := fmt.Sprintf(FDSN_ERR_FORMAT, e.Code, 
http.StatusText(e.Code), e.Err, e.url, e.timestamp.Format(time.RFC3339), ver) + b.WriteString(msg) + } + return nil + } + + return weft.TextError(err, h, b, nounce) +} diff --git a/cmd/dataselect-opendata-proxy/testdata/AKCZ.D b/cmd/dataselect-opendata-proxy/testdata/AKCZ.D new file mode 100644 index 0000000..bee87c3 Binary files /dev/null and b/cmd/dataselect-opendata-proxy/testdata/AKCZ.D differ