44 "context"
55 "encoding/json"
66 "errors"
7+ "io"
78 "net/http"
89 "reflect"
910 "sort"
2728 ErrInvalidVersionFormat = errors .New ("invalid version format" )
2829 ErrCurrentVersionCannotBeEmpty = errors .New ("current version field cannot be empty" )
2930 ErrNativeTypeMigration = errors .New ("cannot register migration for native Go type; use a custom type alias instead (e.g., 'type MyString string')" )
31+ ErrAlreadyBuilt = errors .New ("cannot register migrations after Build() has been called" )
32+ ErrNotBuilt = errors .New ("must call Build() before using RequestMigration" )
3033)
3134
3235type userVersionKey struct {}
@@ -73,11 +76,13 @@ type RequestMigration struct {
7376 metric * prometheus.HistogramVec
7477 iv string
7578
76- mu * sync.RWMutex
7779 migrations map [reflect.Type ]map [string ]TypeMigration // type -> version -> migration
7880
7981 graphBuilder * typeGraphBuilder
8082 graphCache sync.Map
83+
84+ built bool
85+ err error
8186}
8287
8388func NewRequestMigration (opts * RequestMigrationOptions ) (* RequestMigration , error ) {
@@ -110,7 +115,6 @@ func NewRequestMigration(opts *RequestMigrationOptions) (*RequestMigration, erro
110115 metric : me ,
111116 iv : iv ,
112117 versions : versions ,
113- mu : new (sync.RWMutex ),
114118 migrations : make (map [reflect.Type ]map [string ]TypeMigration ),
115119 }
116120
@@ -121,6 +125,10 @@ func NewRequestMigration(opts *RequestMigrationOptions) (*RequestMigration, erro
121125
122126// For creates a request-scoped Migrator for performing migrations.
123127func (rm * RequestMigration ) For (r * http.Request ) (* Migrator , error ) {
128+ if ! rm .built {
129+ return nil , ErrNotBuilt
130+ }
131+
124132 if r == nil {
125133 return nil , errors .New ("request cannot be nil" )
126134 }
@@ -168,9 +176,6 @@ func (rm *RequestMigration) WriteVersionHeader() func(next http.Handler) http.Ha
168176
169177// FindMigrationsForType returns all migrations applicable to a type from a given version forward.
170178func (rm * RequestMigration ) FindMigrationsForType (t reflect.Type , userVersion * Version ) []TypeMigration {
171- rm .mu .RLock ()
172- defer rm .mu .RUnlock ()
173-
174179 var applicableMigrations []TypeMigration
175180
176181 typeHistory , ok := rm .migrations [t ]
@@ -192,6 +197,57 @@ func (rm *RequestMigration) FindMigrationsForType(t reflect.Type, userVersion *V
192197 return applicableMigrations
193198}
194199
200+ // Register adds one or more type migrations. Returns rm for chaining.
201+ // Errors are accumulated and surfaced when Build is called.
202+ func (rm * RequestMigration ) Register (migrations ... VersionedTypeMigration ) * RequestMigration {
203+ if rm .err != nil {
204+ return rm
205+ }
206+
207+ if rm .built {
208+ rm .err = ErrAlreadyBuilt
209+ return rm
210+ }
211+
212+ for _ , entry := range migrations {
213+ if ! isValidMigrationType (entry .t ) {
214+ rm .err = ErrNativeTypeMigration
215+ return rm
216+ }
217+ rm .registerTypeMigration (entry .version , entry .t , entry .migration )
218+ }
219+
220+ return rm
221+ }
222+
223+ // Build sorts versions, eagerly builds type graphs, and marks the instance as
224+ // ready for use. Must be called after all Register calls and before For/Bind.
225+ func (rm * RequestMigration ) Build () error {
226+ if rm .err != nil {
227+ return rm .err
228+ }
229+
230+ if rm .built {
231+ return ErrAlreadyBuilt
232+ }
233+
234+ switch rm .opts .VersionFormat {
235+ case SemverFormat :
236+ sort .Slice (rm .versions , semVerSorter (rm .versions ))
237+ case DateFormat :
238+ sort .Slice (rm .versions , dateVersionSorter (rm .versions ))
239+ default :
240+ return ErrInvalidVersionFormat
241+ }
242+
243+ for t := range rm .migrations {
244+ rm .buildAndCacheGraphsForType (t , rm .versions )
245+ }
246+
247+ rm .built = true
248+ return nil
249+ }
250+
195251func (rm * RequestMigration ) getUserVersion (req * http.Request ) (* Version , error ) {
196252 var vh = req .Header .Get (rm .opts .VersionHeader )
197253
@@ -243,17 +299,11 @@ func (rm *RequestMigration) observeRequestLatency(from, to *Version, sT time.Tim
243299 h .Observe (latency .Seconds ())
244300}
245301
246- func (rm * RequestMigration ) registerTypeMigration (version string , t reflect.Type , m TypeMigration ) error {
247- // Copy versions for graph building (done outside the lock)
248- var versionsCopy []* Version
249-
250- rm .mu .Lock ()
251-
302+ func (rm * RequestMigration ) registerTypeMigration (version string , t reflect.Type , m TypeMigration ) {
252303 if rm .migrations == nil {
253304 rm .migrations = make (map [reflect.Type ]map [string ]TypeMigration )
254305 }
255306
256- // Check if this version is already known
257307 versionKnown := false
258308 for _ , v := range rm .versions {
259309 if v .Value == version {
@@ -264,39 +314,16 @@ func (rm *RequestMigration) registerTypeMigration(version string, t reflect.Type
264314
265315 if ! versionKnown {
266316 rm .versions = append (rm .versions , & Version {Format : rm .opts .VersionFormat , Value : version })
267-
268- switch rm .opts .VersionFormat {
269- case SemverFormat :
270- sort .Slice (rm .versions , semVerSorter (rm .versions ))
271- case DateFormat :
272- sort .Slice (rm .versions , dateVersionSorter (rm .versions ))
273- default :
274- rm .mu .Unlock ()
275- return ErrInvalidVersionFormat
276- }
277317 }
278318
279- // Internal Type-Centric Pivot: map[Type]map[Version]Migration
280319 if _ , ok := rm .migrations [t ]; ! ok {
281320 rm .migrations [t ] = make (map [string ]TypeMigration )
282321 }
283322 rm.migrations [t ][version ] = m
284-
285- // Copy versions for graph building outside the lock
286- versionsCopy = make ([]* Version , len (rm .versions ))
287- copy (versionsCopy , rm .versions )
288-
289- rm .mu .Unlock ()
290-
291- // Eagerly build and cache graphs for this type across all known versions
292- // This is done outside the write lock since building only needs read access
293- rm .buildAndCacheGraphsForType (t , versionsCopy )
294-
295- return nil
296323}
297324
298325// buildAndCacheGraphsForType builds and caches type graphs for all known versions.
299- // Called during registration to eagerly populate the cache.
326+ // Called during Build to eagerly populate the cache.
300327// Types with interface fields are skipped - they require runtime value inspection
301328// and will be built lazily via buildFromValue.
302329func (rm * RequestMigration ) buildAndCacheGraphsForType (t reflect.Type , versions []* Version ) {
@@ -344,16 +371,11 @@ func (m *Migrator) Marshal(v interface{}) ([]byte, error) {
344371
345372 currentVersion := m .rm .getCurrentVersion ()
346373
347- data , err := json . Marshal (v )
374+ intermediate , err := readBody (v )
348375 if err != nil {
349376 return nil , err
350377 }
351378
352- var intermediate any
353- if err := json .Unmarshal (data , & intermediate ); err != nil {
354- return nil , err
355- }
356-
357379 if err := graph .MigrateBackward (m .ctx , & intermediate ); err != nil {
358380 return nil , err
359381 }
@@ -408,12 +430,7 @@ func (m *Migrator) Unmarshal(data []byte, v interface{}) error {
408430 return err
409431 }
410432
411- data , err := json .Marshal (intermediate )
412- if err != nil {
413- return err
414- }
415-
416- if err := json .Unmarshal (data , v ); err != nil {
433+ if err := writeBody (intermediate , v ); err != nil {
417434 return err
418435 }
419436
@@ -702,12 +719,21 @@ func (b *typeGraphBuilder) walkValue(v reflect.Value, userVersion *Version, visi
702719 return graph , nil
703720}
704721
705- func Register [T any ](rm * RequestMigration , version string , m TypeMigration ) error {
706- t := reflect .TypeOf ((* T )(nil )).Elem ()
707- if ! isValidMigrationType (t ) {
708- return ErrNativeTypeMigration
722+ // VersionedTypeMigration pairs a type with a version and its migration logic.
723+ // Construct using the Migration generic helper.
724+ type VersionedTypeMigration struct {
725+ version string
726+ t reflect.Type
727+ migration TypeMigration
728+ }
729+
730+ // Migration creates a VersionedTypeMigration entry for type T.
731+ func Migration [T any ](version string , m TypeMigration ) VersionedTypeMigration {
732+ return VersionedTypeMigration {
733+ version : version ,
734+ t : reflect .TypeOf ((* T )(nil )).Elem (),
735+ migration : m ,
709736 }
710- return rm .registerTypeMigration (version , t , m )
711737}
712738
713739// isValidMigrationType returns true ONLY if the type is a user-defined named type.
@@ -730,3 +756,49 @@ func isValidMigrationType(t reflect.Type) bool {
730756
731757 return true
732758}
759+
760+ // readBody converts v to a generic JSON representation (map/slice/primitive)
761+ // by streaming the encoding directly into the decoder via an io.Pipe,
762+ // avoiding a full intermediate []byte allocation.
763+ func readBody (v any ) (any , error ) {
764+ pr , pw := io .Pipe ()
765+
766+ var result any
767+ errCh := make (chan error , 1 )
768+ go func () {
769+ errCh <- json .NewDecoder (pr ).Decode (& result )
770+ }()
771+
772+ if err := json .NewEncoder (pw ).Encode (v ); err != nil {
773+ pw .CloseWithError (err )
774+ <- errCh
775+ return nil , err
776+ }
777+ pw .Close ()
778+
779+ if err := <- errCh ; err != nil {
780+ return nil , err
781+ }
782+
783+ return result , nil
784+ }
785+
786+ // writeBody streams a generic JSON representation into the typed destination v,
787+ // avoiding a full intermediate []byte allocation.
788+ func writeBody (src , dst any ) error {
789+ pr , pw := io .Pipe ()
790+
791+ errCh := make (chan error , 1 )
792+ go func () {
793+ errCh <- json .NewDecoder (pr ).Decode (dst )
794+ }()
795+
796+ if err := json .NewEncoder (pw ).Encode (src ); err != nil {
797+ pw .CloseWithError (err )
798+ <- errCh
799+ return err
800+ }
801+ pw .Close ()
802+
803+ return <- errCh
804+ }
0 commit comments