Skip to content

Commit 4ef9a74

Browse files
authored
Implement retry for snapshots/deployment upload/status check (#244)
1 parent 1cce394 commit 4ef9a74

2 files changed

Lines changed: 146 additions & 79 deletions

File tree

nmcp-tasks/src/main/kotlin/nmcp/internal/task/nmcpPublishWithPublisherApi.kt

Lines changed: 35 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import kotlin.time.TimeSource.Monotonic.markNow
1111
import kotlinx.serialization.json.Json
1212
import kotlinx.serialization.json.JsonObject
1313
import kotlinx.serialization.json.JsonPrimitive
14+
import nmcp.transport.Success
15+
import nmcp.transport.executeWithRetries
1416
import nmcp.transport.nmcpClient
1517
import okhttp3.MediaType.Companion.toMediaType
1618
import okhttp3.MultipartBody
@@ -60,20 +62,17 @@ internal fun nmcpPublishWithPublisherApi(
6062
val url = baseUrl + "api/v1/publisher/upload?publishingType=$publishingType"
6163

6264
logger.lifecycle("Uploading deployment to '$url'")
63-
val deploymentId = Request.Builder()
65+
val request = Request.Builder()
6466
.post(body)
6567
.addHeader("Authorization", "Bearer $token")
6668
.url(url)
6769
.build()
68-
.let {
69-
nmcpClient.newCall(it).execute()
70-
}.use {
71-
if (!it.isSuccessful) {
72-
error("Cannot deploy to maven central (status='${it.code}'): ${it.body.string()}")
73-
}
70+
val result = executeWithRetries(logger, nmcpClient, request)
7471

75-
it.body.string()
76-
}
72+
if (result !is Success) {
73+
error("Cannot upload deployment to maven central: ($result)}")
74+
}
75+
val deploymentId = result.body.use { it.readUtf8() }
7776

7877
logger.lifecycle("Nmcp: deployment bundle '$deploymentId' uploaded.")
7978

@@ -118,6 +117,7 @@ private fun waitFor(
118117
}
119118

120119
val status = verifyStatus(
120+
logger = logger,
121121
deploymentId = deploymentId,
122122
baseUrl = baseUrl,
123123
token = token,
@@ -137,9 +137,6 @@ private fun waitFor(
137137

138138
private sealed interface Status
139139

140-
// A deployment has successfully been uploaded to Maven Central
141-
private data object UNKNOWN_QUERY_LATER : Status
142-
143140
// A deployment is uploaded and waiting for processing by the validation service
144141
private data object PENDING : Status
145142

@@ -159,47 +156,42 @@ private data object PUBLISHED : Status
159156
private class FAILED(val error: String) : Status
160157

161158
private fun verifyStatus(
159+
logger: GLogger,
162160
deploymentId: String,
163161
baseUrl: String,
164162
token: String,
165163
): Status {
166-
Request.Builder()
164+
val request = Request.Builder()
167165
.post(ByteString.EMPTY.toRequestBody())
168166
.addHeader("Authorization", "Bearer $token")
169167
.url(baseUrl + "api/v1/publisher/status?id=$deploymentId")
170168
.build()
171-
.let {
172-
try {
173-
nmcpClient.newCall(it).execute()
174-
} catch (_: SocketTimeoutException) {
175-
return UNKNOWN_QUERY_LATER
176-
}
177-
}.use {
178-
if (!it.isSuccessful) {
179-
error("Cannot verify deployment $deploymentId status (HTTP status='${it.code}'): ${it.body.string()}")
180-
}
169+
val result = executeWithRetries(logger, nmcpClient, request)
170+
if (result !is Success) {
171+
error("Cannot verify deployment $deploymentId status ($result)")
172+
}
181173

182-
val str = it.body.string()
183-
val element = Json.parseToJsonElement(str)
184-
check(element is JsonObject) {
185-
"Nmcp: unexpected status response for deployment $deploymentId: $str"
186-
}
174+
val str = result.body.use { it.readUtf8() }
175+
val element = Json.parseToJsonElement(str)
176+
check(element is JsonObject) {
177+
"Nmcp: unexpected status response for deployment $deploymentId: $str"
178+
}
187179

188-
val state = element["deploymentState"]
189-
check(state is JsonPrimitive && state.isString) {
190-
"Nmcp: unexpected deploymentState for deployment $deploymentId: $state"
191-
}
180+
val state = element["deploymentState"]
181+
check(state is JsonPrimitive && state.isString) {
182+
"Nmcp: unexpected deploymentState for deployment $deploymentId: $state"
183+
}
192184

193-
return when (state.content) {
194-
"PENDING" -> PENDING
195-
"VALIDATING" -> VALIDATING
196-
"VALIDATED" -> VALIDATED
197-
"PUBLISHING" -> PUBLISHING
198-
"PUBLISHED" -> PUBLISHED
199-
"FAILED" -> {
200-
FAILED(element["errors"].toString())
201-
}
202-
else -> error("Nmcp: unexpected deploymentState for deployment $deploymentId: $state")
203-
}
185+
return when (state.content) {
186+
"PENDING" -> PENDING
187+
"VALIDATING" -> VALIDATING
188+
"VALIDATED" -> VALIDATED
189+
"PUBLISHING" -> PUBLISHING
190+
"PUBLISHED" -> PUBLISHED
191+
"FAILED" -> {
192+
FAILED(element["errors"].toString())
204193
}
194+
else -> error("Nmcp: unexpected deploymentState for deployment $deploymentId: $state")
195+
}
196+
205197
}

nmcp-tasks/src/main/kotlin/nmcp/transport/transport.kt

Lines changed: 111 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@ package nmcp.transport
22

33
import gratatouille.tasks.GLogger
44
import java.io.File
5+
import kotlin.math.pow
56
import okhttp3.HttpUrl.Companion.toHttpUrl
67
import okhttp3.MediaType
78
import okhttp3.MediaType.Companion.toMediaType
9+
import okhttp3.OkHttpClient
810
import okhttp3.Request
911
import okhttp3.RequestBody
1012
import okio.BufferedSink
1113
import okio.BufferedSource
14+
import okio.IOException
1215
import okio.buffer
1316
import okio.sink
1417
import okio.source
@@ -88,25 +91,21 @@ internal class HttpTransport(
8891

8992
logger.info("Nmcp: get '$url'")
9093

91-
val response = Request.Builder()
94+
val request = Request.Builder()
9295
.get()
9396
.url(url)
9497
.maybeAddAuthorization(getAuthorization)
9598
.build()
96-
.let {
97-
client.newCall(it).execute()
98-
}
9999

100-
if (response.code == 404) {
101-
response.close()
100+
val result = executeWithRetries(logger, client, request)
101+
if (result is HttpError && result.code == 404) {
102102
return null
103103
}
104-
if (!response.isSuccessful) {
105-
response.close()
106-
error("Nmcp: cannot GET '$url' (statusCode=${response.code}):\n${response.body.string()}")
104+
if (result !is Success) {
105+
error("Nmcp: cannot GET '$url' (${result})")
107106
}
108107

109-
return response.body.source()
108+
return result.body
110109
}
111110

112111
override fun put(path: String, body: Content) {
@@ -116,41 +115,117 @@ internal class HttpTransport(
116115

117116
logger.info("Nmcp: put '$url'")
118117

119-
Request.Builder()
118+
val request = Request.Builder()
120119
.put(body.toRequestBody())
121120
.url(url)
122121
.maybeAddAuthorization(putAuthorization)
123122
.build()
124-
.let {
125-
client.newCall(it).execute()
126-
}.use { response ->
127-
check(response.isSuccessful) {
128-
buildString {
129-
appendLine("Nmcp: cannot PUT '$url' (statusCode=${response.code}).")
130-
appendLine("Response body: '${response.body.string()}'")
131-
when (response.code) {
132-
400 -> {
133-
appendLine("Things to double check:")
134-
appendLine("Your artifacts have proper extensions (.jar, .pom, ...).")
135-
appendLine("If publishing a XML file, the XML version is 1.0.")
136-
appendLine("If publishing a snapshot, the artifacts version is ending with `-SNAPSHOT`.")
137-
}
138-
401 -> {
139-
appendLine("Check your credentials")
140-
appendLine("If publishing a snapshot, make sure you enabled snapshots on your namespace at https://central.sonatype.com/publishing/namespaces.")
141-
}
142-
403 -> {
143-
appendLine("Check that you are publishing to the correct groupId.")
144-
}
145-
429 -> {
146-
appendLine("Too many requests, try again later")
147-
}
148-
}
123+
124+
val result = executeWithRetries(logger, client, request)
125+
if (result is Success) {
126+
result.body.close()
127+
return
128+
}
129+
130+
val error = buildString {
131+
appendLine("Nmcp: cannot PUT '$url'")
132+
appendLine("$result")
133+
if (result is HttpError) {
134+
when (result.code) {
135+
400 -> {
136+
appendLine("Things to double check:")
137+
appendLine("Your artifacts have proper extensions (.jar, .pom, ...).")
138+
appendLine("If publishing a XML file, the XML version is 1.0.")
139+
appendLine("If publishing a snapshot, the artifacts version is ending with `-SNAPSHOT`.")
140+
}
141+
401 -> {
142+
appendLine("Check your credentials")
143+
appendLine("If publishing a snapshot, make sure you enabled snapshots on your namespace at https://central.sonatype.com/publishing/namespaces.")
144+
}
145+
403 -> {
146+
appendLine("Check that you are publishing to the correct groupId.")
147+
}
148+
429 -> {
149+
appendLine("Too many requests, try again later")
149150
}
150151
}
151152
}
153+
}
154+
error(error)
155+
}
156+
}
157+
158+
/**
159+
* In some cases, 401 is actually retryable.
160+
* This is the case for:
161+
* - PUT on htps://central.sonatype.com/repository/maven-snapshots/
162+
* - verification of a deployment
163+
*
164+
* This is quite unexpected, and we code defensively here to be robust to those cases.
165+
* We also retry other errors.
166+
*
167+
* Example of transient 401:
168+
* ```
169+
* Execution failed for task ':nmcpPublishAggregationToCentralPortal'.
170+
* > A failure occurred while executing nmcp.internal.task.NmcpPublishWithPublisherApiWorkAction
171+
* > Cannot verify deployment fbed2636-e25d-4538-be7d-7693d475595d status (HTTP status='401'): {"error":{"message":"Invalid token"}}
172+
* ```
173+
*
174+
* TODO:
175+
* - rework this to not block the thread.
176+
* - move the logic to some upper, sonatype-only layer
177+
* - fine tune the retry logic. Do we want to retry everything like we do here? Or are some HTTP errors actually
178+
* not retryable?
179+
*
180+
* @return the result. If the result is a success, the caller MUST close its body.
181+
*/
182+
internal fun executeWithRetries(logger: GLogger, client: OkHttpClient, request: Request): Result {
183+
var attempt = 0
184+
val attemptCount = 3
185+
while(true) {
186+
val result = executeInternal(client, request)
187+
if (result is Success) {
188+
return result
189+
}
190+
if (result is HttpError && result.code == 404) {
191+
// 404 is not retryable
192+
return result
193+
}
194+
if (attempt == attemptCount - 1) {
195+
return result
196+
}
197+
198+
logger.lifecycle("Nmcp: put '${request.url}' failed (${result}), retrying... (attempt ${attempt + 1}/${attemptCount})")
199+
Thread.sleep(2.0.pow(attempt.toDouble()).toLong() * 1_000)
200+
attempt++
201+
}
202+
}
203+
204+
internal fun executeInternal(client: OkHttpClient, request: Request): Result {
205+
return try {
206+
val response = client.newCall(request).execute()
207+
if (response.isSuccessful) {
208+
return Success(response.body.source())
209+
}
210+
211+
HttpError(response.code, response.body.string())
212+
} catch (e: IOException) {
213+
NetworkError(e)
214+
}
215+
}
216+
217+
internal sealed interface Result
218+
internal class NetworkError(val exception: IOException) : Result {
219+
override fun toString(): String {
220+
return "NetworkError: ${exception.message}"
221+
}
222+
}
223+
internal class HttpError(val code: Int, val body: String): Result {
224+
override fun toString(): String {
225+
return "HTTP error $code: '$body'"
152226
}
153227
}
228+
internal class Success(val body: BufferedSource) : Result
154229

155230
fun Content.toRequestBody(): RequestBody {
156231
return object : RequestBody() {

0 commit comments

Comments
 (0)