This commit is contained in:
Horis 2024-01-31 18:13:20 +08:00
parent 0f0b25f78d
commit 8e7e82b213
4 changed files with 187 additions and 97 deletions

View File

@ -10,6 +10,7 @@ import io.legado.app.data.appDb
import io.legado.app.data.entities.Book
import io.legado.app.data.entities.BookChapter
import io.legado.app.data.entities.BookSource
import io.legado.app.help.config.AppConfig
import io.legado.app.model.analyzeRule.AnalyzeUrl
import io.legado.app.model.localBook.LocalBook
import io.legado.app.utils.ArchiveUtils
@ -24,12 +25,13 @@ import io.legado.app.utils.exists
import io.legado.app.utils.externalFiles
import io.legado.app.utils.getFile
import io.legado.app.utils.isContentScheme
import io.legado.app.utils.onEachParallel
import io.legado.app.utils.postEvent
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.withContext
import org.apache.commons.text.similarity.JaccardSimilarity
import splitties.init.appCtx
@ -151,19 +153,16 @@ object BookHelp {
bookChapter: BookChapter,
content: String
) = coroutineScope {
val awaitList = arrayListOf<Deferred<Unit>>()
val matcher = AppPattern.imgPattern.matcher(content)
while (matcher.find()) {
matcher.group(1)?.let { src ->
flow {
val matcher = AppPattern.imgPattern.matcher(content)
while (matcher.find()) {
val src = matcher.group(1) ?: continue
val mSrc = NetworkUtils.getAbsoluteURL(bookChapter.url, src)
awaitList.add(async {
saveImage(bookSource, book, mSrc, bookChapter)
})
emit(mSrc)
}
}
awaitList.forEach {
it.await()
}
}.onEachParallel(AppConfig.threadCount) { mSrc ->
saveImage(bookSource, book, mSrc, bookChapter)
}.collect()
}
suspend fun saveImage(

View File

@ -11,13 +11,15 @@ import io.legado.app.data.entities.rule.TocRule
import io.legado.app.exception.NoStackTraceException
import io.legado.app.exception.TocEmptyException
import io.legado.app.help.book.ContentProcessor
import io.legado.app.help.config.AppConfig
import io.legado.app.model.Debug
import io.legado.app.model.analyzeRule.AnalyzeRule
import io.legado.app.model.analyzeRule.AnalyzeUrl
import io.legado.app.utils.isTrue
import io.legado.app.utils.mapAsync
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.async
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.withContext
import splitties.init.appCtx
import kotlin.coroutines.coroutineContext
@ -87,23 +89,23 @@ object BookChapterList {
"◇并发解析目录,总页数:${chapterData.second.size}"
)
withContext(IO) {
val asyncArray = Array(chapterData.second.size) {
async(IO) {
val urlStr = chapterData.second[it]
val res = AnalyzeUrl(
mUrl = urlStr,
source = bookSource,
ruleData = book,
headerMapF = bookSource.getHeaderMap()
).getStrResponseAwait() //控制并发访问
analyzeChapterList(
book, urlStr, res.url,
res.body!!, tocRule, listRule, bookSource, false
).first
flow {
for (urlStr in chapterData.second) {
emit(urlStr)
}
}
asyncArray.forEach { coroutine ->
chapterList.addAll(coroutine.await())
}.mapAsync(AppConfig.threadCount) { urlStr ->
val res = AnalyzeUrl(
mUrl = urlStr,
source = bookSource,
ruleData = book,
headerMapF = bookSource.getHeaderMap()
).getStrResponseAwait() //控制并发访问
analyzeChapterList(
book, urlStr, res.url,
res.body!!, tocRule, listRule, bookSource, false
).first
}.collect {
chapterList.addAll(it)
}
}
}
@ -142,7 +144,8 @@ object BookChapterList {
}
}
val replaceRules = ContentProcessor.get(book.name, book.origin).getTitleReplaceRules()
book.latestChapterTitle = list.last().getDisplayTitle(replaceRules, book.getUseReplaceRule())
book.latestChapterTitle =
list.last().getDisplayTitle(replaceRules, book.getUseReplaceRule())
book.durChapterTitle = list.getOrElse(book.durChapterIndex) { list.last() }
.getDisplayTitle(replaceRules, book.getUseReplaceRule())
if (book.totalChapterNum < list.size) {

View File

@ -42,6 +42,8 @@ import io.legado.app.utils.activityPendingIntent
import io.legado.app.utils.cnCompare
import io.legado.app.utils.createFolderIfNotExist
import io.legado.app.utils.isContentScheme
import io.legado.app.utils.mapAsync
import io.legado.app.utils.mapAsyncIndexed
import io.legado.app.utils.outputStream
import io.legado.app.utils.postEvent
import io.legado.app.utils.readBytes
@ -49,20 +51,14 @@ import io.legado.app.utils.readText
import io.legado.app.utils.servicePendingIntent
import io.legado.app.utils.toastOnUi
import io.legado.app.utils.writeBytes
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers.Default
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collectIndexed
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.withIndex
import kotlinx.coroutines.launch
import me.ag2s.epublib.domain.Author
import me.ag2s.epublib.domain.Date
@ -327,23 +323,19 @@ class ExportBookService : BaseService() {
val threads = if (AppConfig.parallelExportBook) {
AppConst.MAX_THREAD
} else {
0
1
}
flow {
appDb.bookChapterDao.getChapterList(book.bookUrl).forEach { chapter ->
val task = async(Default, start = CoroutineStart.LAZY) {
getExportData(book, chapter, contentProcessor, useReplace)
}
emit(task)
}
}.onEach { it.start() }
.buffer(threads)
.map { it.await() }
.collectIndexed { index, result ->
postEvent(EventBus.EXPORT_BOOK, book.bookUrl)
exportProgress[book.bookUrl] = index
append.invoke(result.first, result.second)
emit(chapter)
}
}.mapAsync(threads) { chapter ->
getExportData(book, chapter, contentProcessor, useReplace)
}.collectIndexed { index, result ->
postEvent(EventBus.EXPORT_BOOK, book.bookUrl)
exportProgress[book.bookUrl] = index
append.invoke(result.first, result.second)
}
}
@ -641,56 +633,52 @@ class ExportBookService : BaseService() {
val threads = if (AppConfig.parallelExportBook) {
AppConst.MAX_THREAD
} else {
0
1
}
flow {
appDb.bookChapterDao.getChapterList(book.bookUrl).forEachIndexed { index, chapter ->
val task = async(Default, start = CoroutineStart.LAZY) {
val content = BookHelp.getContent(book, chapter)
val (contentFix, resources) = fixPic(
book,
content ?: if (chapter.isVolume) "" else "null",
chapter
)
// 不导出vip标识
chapter.isVip = false
val content1 = contentProcessor
.getContent(
book,
chapter,
contentFix,
includeTitle = false,
useReplace = useReplace,
chineseConvert = false,
reSegment = false
).toString()
val title = chapter.run {
// 不导出vip标识
isVip = false
getDisplayTitle(
contentProcessor.getTitleReplaceRules(),
useReplace = useReplace
)
}
val chapterResource = ResourceUtil.createChapterResource(
title.replace("\uD83D\uDD12", ""),
content1,
contentModel,
"Text/chapter_${index}.html"
)
ExportChapter(title, chapterResource, resources)
}
emit(task)
appDb.bookChapterDao.getChapterList(book.bookUrl).forEach { chapter ->
emit(chapter)
}
}.onEach { it.start() }
.buffer(threads)
.map { it.await() }
.collectIndexed { index, exportChapter ->
postEvent(EventBus.EXPORT_BOOK, book.bookUrl)
exportProgress[book.bookUrl] = index
epubBook.resources.addAll(exportChapter.resources)
epubBook.addSection(exportChapter.title, exportChapter.chapterResource)
}.mapAsyncIndexed(threads) { index, chapter ->
val content = BookHelp.getContent(book, chapter)
val (contentFix, resources) = fixPic(
book,
content ?: if (chapter.isVolume) "" else "null",
chapter
)
// 不导出vip标识
chapter.isVip = false
val content1 = contentProcessor
.getContent(
book,
chapter,
contentFix,
includeTitle = false,
useReplace = useReplace,
chineseConvert = false,
reSegment = false
).toString()
val title = chapter.run {
// 不导出vip标识
isVip = false
getDisplayTitle(
contentProcessor.getTitleReplaceRules(),
useReplace = useReplace
)
}
val chapterResource = ResourceUtil.createChapterResource(
title.replace("\uD83D\uDD12", ""),
content1,
contentModel,
"Text/chapter_${index}.html"
)
ExportChapter(title, chapterResource, resources)
}.collectIndexed { index, exportChapter ->
postEvent(EventBus.EXPORT_BOOK, book.bookUrl)
exportProgress[book.bookUrl] = index
epubBook.resources.addAll(exportChapter.resources)
epubBook.addSection(exportChapter.title, exportChapter.chapterResource)
}
}
data class ExportChapter(

View File

@ -1,17 +1,22 @@
package io.legado.app.utils
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.sync.Semaphore
@OptIn(ExperimentalCoroutinesApi::class)
inline fun <T> Flow<T>.onEachParallel(
concurrency: Int,
crossinline action: suspend (T) -> Unit
): Flow<T> = flatMapMerge(concurrency) { value ->
return@flatMapMerge flow {
flow {
action(value)
emit(value)
}
@ -37,3 +42,98 @@ inline fun <T> Flow<T>.onEachIndexed(
emit(value)
}
}
inline fun <T, R> Flow<T>.mapIndexed(
crossinline action: suspend (index: Int, T) -> R,
): Flow<R> = flow {
var index = 0
collect { value ->
emit(action(index++, value))
}
}
inline fun <T, R> Flow<T>.mapAsync(
concurrency: Int,
crossinline transform: suspend (T) -> R
): Flow<R> = if (concurrency == 1) {
map { transform(it) }
} else {
Semaphore(concurrency).let { semaphore ->
channelFlow {
collect {
semaphore.acquire()
send(async { transform(it) })
}
}.map {
it.await()
}.onEach { semaphore.release() }
}
}
inline fun <T, R> Flow<T>.mapAsyncIndexed(
concurrency: Int,
crossinline transform: suspend (index: Int, T) -> R
): Flow<R> = if (concurrency == 1) {
mapIndexed { index, value ->
transform(index, value)
}
} else {
Semaphore(concurrency).let { semaphore ->
channelFlow {
var index = 0
collect {
semaphore.acquire()
val i = index++
send(async { transform(i, it) })
}
}.map {
it.await()
}.onEach { semaphore.release() }
}
}
inline fun <T> Flow<T>.onEachAsync(
concurrency: Int,
crossinline action: suspend (T) -> Unit
): Flow<T> = if (concurrency == 1) {
onEach { action(it) }
} else {
Semaphore(concurrency).let { semaphore ->
channelFlow {
collect {
semaphore.acquire()
send(async {
action(it)
it
})
}
}.map {
it.await()
}.onEach { semaphore.release() }
}
}
inline fun <T> Flow<T>.onEachAsyncIndexed(
concurrency: Int,
crossinline action: suspend (index: Int, T) -> Unit
): Flow<T> = if (concurrency == 1) {
onEachIndexed { index, value ->
action(index, value)
}
} else {
Semaphore(concurrency).let { semaphore ->
channelFlow {
var index = 0
collect {
semaphore.acquire()
val i = index++
send(async {
action(i, it)
it
})
}
}.map {
it.await()
}.onEach { semaphore.release() }
}
}