Skip to content

Commit 8521392

Browse files
committed
fix bugs
1 parent 45c40c1 commit 8521392

File tree

8 files changed

+59
-296
lines changed

8 files changed

+59
-296
lines changed

.dockerignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
target/
2+
!target/universal/stage

build.sbt

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,6 @@
11
val scala3Version = "3.7.1"
22
import Dependencies.*
33

4-
Universal / packageName := "app"
5-
6-
// delete all bin except server
7-
Universal / mappings := {
8-
val universalMappings = (Universal / mappings).value
9-
universalMappings filter { case (_, name) =>
10-
!(name.startsWith("bin/") && name != "bin/app")
11-
}
12-
}
13-
144
lazy val root = project
155
.in(file("."))
166
.settings(

src/main/resources/reference.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
pekko.http.server.remote-address-header = on
2+
pekko.loglevel = "ERROR"

src/main/scala/Main.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,10 @@ object Main {
6161
println(s"websocket server close")
6262
}
6363
}
64-
logWebViewer.call2
64+
logWebViewer.call
6565
case _ => (_: Any, data: String) => println(data)
6666
}
6767

68-
69-
//val proxy = ReverseProxy(config.jks, HttpRequestFormat(func))
7068
val proxy = PekkoReverseProxy(config.jks, PekkoHttpRequestFormat(func))
7169
import actorSystem.dispatcher
7270

@@ -84,7 +82,7 @@ object Main {
8482
buf.append(s"dns resolver:\nSystem DNS Resolver\n")
8583
}
8684
buf.append(
87-
s"Attention: request header Accept-Encoding would override to gzip, deflate (pekko does not support brotli), response header Content-Encoding would drop for https encode/decode\n"
85+
s"Attention: request header Accept-Encoding would override to gzip, deflate (pekko does not support brotli)\n"
8886
)
8987
println(buf.toString)
9088
v.whenTerminated

src/main/scala/com/timzaak/proxy/HttpRequestFormat.scala

Lines changed: 0 additions & 136 deletions
This file was deleted.

src/main/scala/com/timzaak/proxy/LogWebViewer.scala

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,7 @@ class LogWebViewer(certPath: Option[String])(using system: ActorSystem) {
2626
.orElse(request.header("Remote-Address").map(v => v.take(v.lastIndexOf(':'))))
2727
.getOrElse(request.connectionInfo.remote.map(_.getAddress.getHostAddress).getOrElse(""))
2828

29-
def call(request: ServerRequest, data: String) = {
30-
val ip = extractClientIP(request)
31-
manager ! ip -> data
32-
}
33-
34-
def call2(request:HttpRequest, data:String) = {
35-
29+
def call(request:HttpRequest, data:String) = {
3630
val ip = request.header[XForwardedFor].map(_.value)
3731
.orElse(request.header[org.apache.pekko.http.javadsl.model.headers.RemoteAddress].map(v => v.value.take(v.value.lastIndexOf(':'))))
3832
.orElse(request.attribute[RemoteAddress](AttributeKeys.remoteAddress).flatMap(_.toIP.map(_.ip.getHostAddress)))

src/main/scala/com/timzaak/proxy/PekkoHttpRequestFormat.scala

Lines changed: 53 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@ case class Record2(
2121
actorSystem: ActorSystem
2222
) {
2323
import actorSystem.dispatcher
24+
given Materializer = Materializer(actorSystem)
2425

2526
private val dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
2627
private val startTime: LocalDateTime = LocalDateTime.now()
2728
private val buf = StringBuffer()
2829

30+
31+
2932
def requestBody(req: HttpRequest): HttpRequest = {
3033
val headerDesc = req.headers
3134
.collect { case header =>
@@ -35,22 +38,45 @@ case class Record2(
3538
buf.append(s"${startTime.format(dateTimeFormatter)} [$id] ${req.method} ${req.uri} ##TIME##\n")
3639
if (headerDesc.nonEmpty) {
3740
buf.append(headerDesc)
38-
buf.append("\nBody:\n")
41+
buf.append(s"\nContentType: ${req.entity.contentType}\nBody:\n")
3942
} else {
40-
buf.append("Body:\n")
43+
buf.append(s"ContentType: ${req.entity.contentType}\nBody:\n")
4144
}
4245

46+
4347
req.method match {
44-
case HttpMethods.POST | HttpMethods.PUT | HttpMethods.PATCH if req.entity.contentLengthOption.getOrElse(0L) > 0 =>
48+
case v if v != HttpMethods.GET || v!= HttpMethods.OPTIONS =>
4549
req.entity.contentType.mediaType match {
50+
case mediaType if mediaType.isMultipart =>
51+
Option(mediaType.getParams.get("boundary")) match {
52+
case Some(boundary) =>
53+
req.withEntity(
54+
req.entity.transformDataBytes(
55+
Flow[ByteString].alsoTo(
56+
parseMultipartStreamed(boundary, buf)
57+
)
58+
)
59+
)
60+
case _ =>
61+
req.withEntity(
62+
req.entity.transformDataBytes(
63+
Flow[ByteString].alsoTo(
64+
Sink.ignore.mapMaterializedValue(_.onComplete(_ => buf.append("[request body can not parser]\n")))
65+
)
66+
)
67+
)
68+
}
69+
4670
case mediaType
4771
if mediaType.isText || mediaType.isApplication || mediaType.isMessage || mediaType.isMultipart =>
4872
req.withEntity(
4973
req.entity.transformDataBytes(
5074
Flow[ByteString].alsoTo(
5175
Sink
5276
.foreach[ByteString](v => buf.append(v.utf8String))
53-
.mapMaterializedValue(_.onComplete(_ => buf.append("\n")))
77+
.mapMaterializedValue(_.onComplete{_ =>
78+
buf.append("\n")
79+
})
5480
)
5581
)
5682
)
@@ -77,9 +103,9 @@ case class Record2(
77103
buf.append(s"Response: ${resp.status}\n")
78104
if (headerDesc.nonEmpty) {
79105
buf.append(headerDesc)
80-
buf.append("\nBody:\n")
106+
buf.append(s"\nContentType: ${resp.entity.contentType}\nBody:\n")
81107
} else {
82-
buf.append("Body:\n")
108+
buf.append(s"ContentType: ${resp.entity.contentType}\nBody:\n")
83109
}
84110

85111
resp.entity.contentType.mediaType match {
@@ -90,7 +116,6 @@ case class Record2(
90116
Sink
91117
.foreach[ByteString](v => buf.append(v.utf8String))
92118
.mapMaterializedValue(_.onComplete(_ => {
93-
buf.append(s"[response body can not parser]\n")
94119
output()
95120
}))
96121
)
@@ -116,7 +141,6 @@ case class Record2(
116141
log(serverRequest, buf.toString.replaceFirst("##TIME##", s"${duration.toMillis}ms"))
117142
}
118143

119-
120144
private def parseMultipartStreamed(boundary:String, buf: StringBuffer)(using ec: ExecutionContext, mat: Materializer) = {
121145
val delimiter = s"--$boundary"
122146
val closeDelimiter = s"--$boundary--"
@@ -134,30 +158,29 @@ case class Record2(
134158
Flow[ByteString]
135159
.via(Framing.delimiter(ByteString("\r\n"), maximumFrameLength = 8192, allowTruncation = true))
136160
.drop(1)
137-
.mapAsync(1) { part =>
138-
val partCleaned = if (part.startsWith(ByteString("\r\n"))) {
161+
.statefulMapConcat { () =>
162+
var needSkip = false
163+
var isFile = false
164+
(line:ByteString) => {
139165
buf.append("\r\n")
140-
part.drop(2)
141-
} else {
142-
part
166+
if(needSkip) {
167+
buf.append("[file content skip log]")
168+
needSkip = false
169+
} else if(line.isEmpty) {
170+
if(isFile){
171+
needSkip = true
172+
isFile = false
173+
}
174+
buf.append(line.utf8String)
175+
} else {
176+
if(line.utf8String.contains("filename=")) {
177+
isFile = true
178+
}
179+
buf.append(line.utf8String)
180+
}
143181
}
144-
145-
// 拆解 header 和 body
146-
val (headerBytes, bodyBytes, idx) = splitHeaderAndBody(partCleaned)
147-
148-
val headersStr = headerBytes.utf8String
149-
val isFilePart = headersStr.contains("filename=")
150-
151-
if (!isFilePart) {
152-
buf.append(partCleaned.utf8String)
153-
} else {
154-
buf.append(partCleaned.take(idx).utf8String)
155-
buf.append("[file body, skip log]\n")
156-
}
157-
Future.successful(())
158-
}
159-
160-
182+
List.empty
183+
}.to(Sink.ignore.mapMaterializedValue(_.onComplete(_ => buf.append("\n"))))
161184
}
162185

163186

0 commit comments

Comments
 (0)