This commit is contained in:
Horis 2023-04-23 16:40:04 +08:00
parent faee44c605
commit e1b84f2521
4 changed files with 169 additions and 56 deletions

View File

@ -9,12 +9,18 @@ import okhttp3.EventListener
import okhttp3.MediaType.Companion.toMediaTypeOrNull import okhttp3.MediaType.Companion.toMediaTypeOrNull
import okhttp3.ResponseBody.Companion.asResponseBody import okhttp3.ResponseBody.Companion.asResponseBody
import okio.Buffer import okio.Buffer
import okio.Source
import okio.Timeout
import okio.buffer
import org.chromium.net.CronetException import org.chromium.net.CronetException
import org.chromium.net.UrlRequest import org.chromium.net.UrlRequest
import org.chromium.net.UrlResponseInfo import org.chromium.net.UrlResponseInfo
import java.io.IOException import java.io.IOException
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.util.* import java.util.*
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
@Keep @Keep
abstract class AbsCallBack( abstract class AbsCallBack(
@ -22,13 +28,14 @@ abstract class AbsCallBack(
val mCall: Call, val mCall: Call,
private val eventListener: EventListener? = null, private val eventListener: EventListener? = null,
private val responseCallback: Callback? = null private val responseCallback: Callback? = null
) : UrlRequest.Callback(), AutoCloseable { ) : UrlRequest.Callback() {
val buffer = Buffer()
var mResponse: Response var mResponse: Response
private var followCount = 0 private var followCount = 0
private var request: UrlRequest? = null
private var finished = AtomicBoolean(false)
private val callbackResults = ArrayBlockingQueue<CallbackResult>(2)
private val urlResponseInfoChain = arrayListOf<UrlResponseInfo>()
@Throws(IOException::class) @Throws(IOException::class)
@ -63,6 +70,7 @@ abstract class AbsCallBack(
return return
} }
followCount += 1 followCount += 1
urlResponseInfoChain.add(info)
val client = okHttpClient val client = okHttpClient
if (originalRequest.url.isHttps && newLocationUrl.startsWith("http://") && client.followSslRedirects) { if (originalRequest.url.isHttps && newLocationUrl.startsWith("http://") && client.followSslRedirects) {
request.followRedirect() request.followRedirect()
@ -78,14 +86,31 @@ abstract class AbsCallBack(
override fun onResponseStarted(request: UrlRequest, info: UrlResponseInfo) { override fun onResponseStarted(request: UrlRequest, info: UrlResponseInfo) {
this.mResponse = responseFromResponse(this.mResponse, info) this.request = request
val contentLength = info.allHeaders["Content-Length"]?.lastOrNull()?.toLongOrNull() ?: -1
val contentType = (info.allHeaders["content-type"]?.lastOrNull()
?: "text/plain; charset=\"utf-8\"").toMediaTypeOrNull()
val responseBody = CronetBodySource().buffer().asResponseBody(contentType, contentLength)
val newRequest = originalRequest.newBuilder().url(info.url).build()
val response = createResponse(originalRequest, info)
.request(newRequest)
.body(responseBody)
.priorResponse(buildPriorResponse(originalRequest, urlResponseInfoChain, info.urlChain))
.build()
mResponse = response
onSuccess(response)
//打印协议,用于调试 //打印协议,用于调试
DebugLog.i(javaClass.simpleName, "start[${info.negotiatedProtocol}]${info.url}") DebugLog.i(javaClass.simpleName, "start[${info.negotiatedProtocol}]${info.url}")
if (eventListener != null) { if (eventListener != null) {
eventListener.responseHeadersEnd(mCall, this.mResponse) eventListener.responseHeadersEnd(mCall, response)
eventListener.responseBodyStart(mCall) eventListener.responseBodyStart(mCall)
} }
request.read(ByteBuffer.allocateDirect(32 * 1024)) try {
responseCallback?.onResponse(mCall, response)
} catch (e: IOException) {
// Pass?
}
} }
@ -95,60 +120,31 @@ abstract class AbsCallBack(
info: UrlResponseInfo, info: UrlResponseInfo,
byteBuffer: ByteBuffer byteBuffer: ByteBuffer
) { ) {
callbackResults.add(CallbackResult(CallbackStep.ON_READ_COMPLETED, byteBuffer))
if (mCall.isCanceled()) {
request.cancel()
onError(IOException("Request Canceled"))
}
byteBuffer.flip()
try {
buffer.write(byteBuffer)
} catch (e: IOException) {
DebugLog.e(javaClass.name, "IOException during ByteBuffer read. Details: ", e)
onError(IOException("IOException during ByteBuffer read. Details:", e))
return
}
byteBuffer.clear()
request.read(byteBuffer)
} }
override fun onSucceeded(request: UrlRequest, info: UrlResponseInfo) { override fun onSucceeded(request: UrlRequest, info: UrlResponseInfo) {
callbackResults.add(CallbackResult(CallbackStep.ON_SUCCESS))
eventListener?.responseBodyEnd(mCall, info.receivedByteCount) eventListener?.responseBodyEnd(mCall, info.receivedByteCount)
val contentType: MediaType? = (this.mResponse.header("content-type")
?: "text/plain; charset=\"utf-8\"").toMediaTypeOrNull()
val responseBody: ResponseBody =
buffer.asResponseBody(contentType)
val newRequest = originalRequest.newBuilder().url(info.url).build()
this.mResponse = this.mResponse.newBuilder().body(responseBody).request(newRequest).build()
onSuccess(this.mResponse)
//DebugLog.i(javaClass.simpleName, "end[${info.negotiatedProtocol}]${info.url}") //DebugLog.i(javaClass.simpleName, "end[${info.negotiatedProtocol}]${info.url}")
eventListener?.callEnd(mCall) eventListener?.callEnd(mCall)
if (responseCallback != null) {
try {
responseCallback.onResponse(mCall, this.mResponse)
} catch (e: IOException) {
// Pass?
}
}
} }
//UrlResponseInfo可能为null //UrlResponseInfo可能为null
override fun onFailed(request: UrlRequest, info: UrlResponseInfo?, error: CronetException) { override fun onFailed(request: UrlRequest, info: UrlResponseInfo?, error: CronetException) {
callbackResults.add(CallbackResult(CallbackStep.ON_FAILED, null, error))
DebugLog.e(javaClass.name, error.message.toString()) DebugLog.e(javaClass.name, error.message.toString())
onError(error.asIOException()) onError(error.asIOException())
this.eventListener?.callFailed(mCall, error) eventListener?.callFailed(mCall, error)
responseCallback?.onFailure(mCall, error) responseCallback?.onFailure(mCall, error)
} }
override fun onCanceled(request: UrlRequest?, info: UrlResponseInfo?) { override fun onCanceled(request: UrlRequest?, info: UrlResponseInfo?) {
super.onCanceled(request, info) callbackResults.add(CallbackResult(CallbackStep.ON_CANCELED))
this.eventListener?.callEnd(mCall) eventListener?.callEnd(mCall)
//onError(IOException("Cronet Request Canceled")) //onError(IOException("Cronet Request Canceled"))
} }
@ -169,21 +165,26 @@ abstract class AbsCallBack(
val negotiatedProtocol = responseInfo.negotiatedProtocol.lowercase(Locale.getDefault()) val negotiatedProtocol = responseInfo.negotiatedProtocol.lowercase(Locale.getDefault())
return when { return when {
negotiatedProtocol.contains("h3") -> { negotiatedProtocol.contains("h3") -> {
return Protocol.QUIC Protocol.QUIC
} }
negotiatedProtocol.contains("quic") -> { negotiatedProtocol.contains("quic") -> {
Protocol.QUIC Protocol.QUIC
} }
negotiatedProtocol.contains("spdy") -> { negotiatedProtocol.contains("spdy") -> {
@Suppress("DEPRECATION") @Suppress("DEPRECATION")
Protocol.SPDY_3 Protocol.SPDY_3
} }
negotiatedProtocol.contains("h2") -> { negotiatedProtocol.contains("h2") -> {
Protocol.HTTP_2 Protocol.HTTP_2
} }
negotiatedProtocol.contains("1.1") -> { negotiatedProtocol.contains("1.1") -> {
Protocol.HTTP_1_1 Protocol.HTTP_1_1
} }
else -> { else -> {
Protocol.HTTP_1_0 Protocol.HTTP_1_0
} }
@ -211,23 +212,114 @@ abstract class AbsCallBack(
} }
private fun responseFromResponse( private fun createResponse(
response: Response, request: Request,
responseInfo: UrlResponseInfo responseInfo: UrlResponseInfo
): Response { ): Response.Builder {
val protocol = protocolFromNegotiatedProtocol(responseInfo) val protocol = protocolFromNegotiatedProtocol(responseInfo)
val headers = headersFromResponse(responseInfo) val headers = headersFromResponse(responseInfo)
return response.newBuilder() return Response.Builder()
.request(request)
.receivedResponseAtMillis(System.currentTimeMillis()) .receivedResponseAtMillis(System.currentTimeMillis())
.protocol(protocol) .protocol(protocol)
.code(responseInfo.httpStatusCode) .code(responseInfo.httpStatusCode)
.message(responseInfo.httpStatusText) .message(responseInfo.httpStatusText)
.headers(headers) .headers(headers)
}
private fun buildPriorResponse(
request: Request,
redirectResponseInfos: List<UrlResponseInfo>,
urlChain: List<String>
): Response? {
var priorResponse: Response? = null
if (redirectResponseInfos.isNotEmpty()) {
check(urlChain.size == redirectResponseInfos.size + 1) {
"The number of redirects should be consistent across URLs and headers!"
}
for (i in redirectResponseInfos.indices) {
val redirectedRequest = request.newBuilder().url(urlChain[i]).build()
priorResponse = createResponse(redirectedRequest, redirectResponseInfos[i])
.priorResponse(priorResponse)
.build() .build()
} }
}
return priorResponse
}
} }
inner class CronetBodySource : Source {
private var buffer = ByteBuffer.allocateDirect(32 * 1024)
private var closed = false
private val timeout = mCall.timeout().timeoutNanos()
override fun close() { override fun close() {
buffer.clear() if (closed) {
return
}
closed = true
if (!finished.get()) {
request?.cancel()
}
}
@Suppress("NULLABILITY_MISMATCH_BASED_ON_JAVA_ANNOTATIONS")
override fun read(sink: Buffer, byteCount: Long): Long {
if (mCall.isCanceled()) {
throw IOException("Request Canceled")
}
if (closed) {
throw IOException("Source Closed")
}
if (finished.get()) {
return -1
}
if (byteCount < buffer.limit()) {
buffer.limit(byteCount.toInt())
}
request?.read(buffer)
val result = callbackResults.poll(timeout, TimeUnit.NANOSECONDS)
if (result == null) {
request?.cancel()
throw IOException("Request Timeout")
}
return when (result.callbackStep) {
CallbackStep.ON_FAILED -> {
finished.set(true)
buffer = null
throw IOException(result.exception)
}
CallbackStep.ON_SUCCESS -> {
finished.set(true)
buffer = null
-1
}
CallbackStep.ON_CANCELED -> {
buffer = null
throw IOException("Request Canceled")
}
CallbackStep.ON_READ_COMPLETED -> {
result.buffer!!.flip()
val bytesWritten = sink.write(result.buffer)
result.buffer.clear()
bytesWritten.toLong()
}
}
}
override fun timeout(): Timeout {
return mCall.timeout()
}
} }
} }

View File

@ -0,0 +1,12 @@
package io.legado.app.lib.cronet
import org.chromium.net.CronetException
import java.nio.ByteBuffer
data class CallbackResult(
val callbackStep: CallbackStep,
val buffer: ByteBuffer? = null,
val exception: CronetException? = null
)

View File

@ -0,0 +1,8 @@
package io.legado.app.lib.cronet
enum class CallbackStep {
ON_READ_COMPLETED,
ON_SUCCESS,
ON_FAILED,
ON_CANCELED
}

View File

@ -11,6 +11,8 @@ import io.legado.app.help.coroutine.Coroutine
import io.legado.app.service.WebService import io.legado.app.service.WebService
import io.legado.app.utils.* import io.legado.app.utils.*
import io.legado.app.web.utils.AssetsWeb import io.legado.app.web.utils.AssetsWeb
import okio.Pipe
import okio.buffer
import java.io.* import java.io.*
class HttpServer(port: Int) : NanoHTTPD(port) { class HttpServer(port: Int) : NanoHTTPD(port) {
@ -101,11 +103,10 @@ class HttpServer(port: Int) : NanoHTTPD(port) {
) )
} else { } else {
val data = returnData.data val data = returnData.data
if (data is List<*> && data.size > 1000) { if (data is List<*> && data.size > 3000) {
val pis = PipedInputStream(1024 * 1024) val pipe = Pipe(16 * 1024)
Coroutine.async { Coroutine.async {
@Suppress("BlockingMethodInNonBlockingContext") pipe.sink.buffer().outputStream().use { out ->
PipedOutputStream(pis).use { out ->
BufferedWriter(OutputStreamWriter(out, "UTF-8")).use { BufferedWriter(OutputStreamWriter(out, "UTF-8")).use {
GSON.toJson(returnData, it) GSON.toJson(returnData, it)
} }
@ -114,7 +115,7 @@ class HttpServer(port: Int) : NanoHTTPD(port) {
newChunkedResponse( newChunkedResponse(
Response.Status.OK, Response.Status.OK,
"application/json", "application/json",
pis pipe.source.buffer().inputStream()
) )
} else { } else {
newFixedLengthResponse(GSON.toJson(returnData)) newFixedLengthResponse(GSON.toJson(returnData))