diff --git a/app/src/main/java/io/legado/app/ui/main/MainViewModel.kt b/app/src/main/java/io/legado/app/ui/main/MainViewModel.kt index 252dad72e..c86abcd5b 100644 --- a/app/src/main/java/io/legado/app/ui/main/MainViewModel.kt +++ b/app/src/main/java/io/legado/app/ui/main/MainViewModel.kt @@ -25,14 +25,19 @@ import io.legado.app.model.CacheBook import io.legado.app.model.ReadBook import io.legado.app.model.webBook.WebBook import io.legado.app.service.CacheBookService +import io.legado.app.utils.onEachParallel import io.legado.app.utils.postEvent import kotlinx.coroutines.Job import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import java.util.LinkedList -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors import kotlin.math.min @@ -41,7 +46,7 @@ class MainViewModel(application: Application) : BaseViewModel(application) { private var poolSize = min(threadCount, AppConst.MAX_THREAD) private var upTocPool = Executors.newFixedThreadPool(poolSize).asCoroutineDispatcher() private val waitUpTocBooks = LinkedList() - private val onUpTocBooks = ConcurrentHashMap.newKeySet() + private val onUpTocBooks = hashSetOf() val onUpBooksLiveData = MutableLiveData() private var upTocJob: Job? = null private var cacheBookJob: Job? = null @@ -101,86 +106,70 @@ class MainViewModel(application: Application) : BaseViewModel(application) { upPool() postUpBooksLiveData() upTocJob = viewModelScope.launch(upTocPool) { - while (isActive) { - when { - waitUpTocBooks.isEmpty() -> { - upTocJob?.cancel() - upTocJob = null - } - - onUpTocBooks.size < threadCount -> { - updateToc() - } - - else -> { - delay(500) - } + flow { + while (true) { + emit(waitUpTocBooks.poll() ?: break) } - } + }.onEach { + onUpTocBooks.add(it) + postEvent(EventBus.UP_BOOKSHELF, it) + }.onEachParallel(threadCount) { + updateToc(it) + }.onEach { + onUpTocBooks.remove(it) + postEvent(EventBus.UP_BOOKSHELF, it) + postUpBooksLiveData() + }.onCompletion { + upTocJob = null + if (it == null && cacheBookJob == null && !CacheBookService.isRun) { + //所有目录更新完再开始缓存章节 + cacheBook() + } + }.catch { + AppLog.put("更新目录出错\n${it.localizedMessage}", it) + }.collect() } } - @Synchronized - private fun updateToc() { - val bookUrl = waitUpTocBooks.poll() ?: return - if (onUpTocBooks.contains(bookUrl)) { - postUpBooksLiveData() - return - } - val book = appDb.bookDao.getBook(bookUrl) - if (book == null) { - postUpBooksLiveData() - return - } + private suspend fun updateToc(bookUrl: String) { + val book = appDb.bookDao.getBook(bookUrl) ?: return val source = appDb.bookSourceDao.getBookSource(book.origin) if (source == null) { if (!book.isUpError) { book.addType(BookType.updateError) appDb.bookDao.update(book) } - postUpBooksLiveData() return } - upTocAdd(bookUrl) - execute(context = upTocPool, executeContext = upTocPool) { - kotlin.runCatching { - val oldBook = book.copy() - WebBook.runPreUpdateJs(source, book) - if (book.tocUrl.isBlank()) { - WebBook.getBookInfoAwait(source, book) - } - val toc = WebBook.getChapterListAwait(source, book).getOrThrow() - book.sync(oldBook) - book.removeType(BookType.updateError) - if (book.bookUrl == bookUrl) { - appDb.bookDao.update(book) - } else { - upTocAdd(book.bookUrl) - appDb.bookDao.insert(book) - BookHelp.updateCacheFolder(oldBook, book) - } - appDb.bookChapterDao.delByBook(bookUrl) - appDb.bookChapterDao.insert(*toc.toTypedArray()) - if (book.isSameNameAuthor(ReadBook.book)) { - ReadBook.book = book - ReadBook.chapterSize = book.totalChapterNum - } - addDownload(source, book) - }.onFailure { - AppLog.put("${book.name} 更新目录失败\n${it.localizedMessage}", it) - //这里可能因为时间太长书籍信息已经更改,所以重新获取 - appDb.bookDao.getBook(book.bookUrl)?.let { book -> - book.addType(BookType.updateError) - appDb.bookDao.update(book) - } + kotlin.runCatching { + val oldBook = book.copy() + WebBook.runPreUpdateJs(source, book) + if (book.tocUrl.isBlank()) { + WebBook.getBookInfoAwait(source, book) + } + val toc = WebBook.getChapterListAwait(source, book).getOrThrow() + book.sync(oldBook) + book.removeType(BookType.updateError) + if (book.bookUrl == bookUrl) { + appDb.bookDao.update(book) + } else { + appDb.bookDao.insert(book) + BookHelp.updateCacheFolder(oldBook, book) + } + appDb.bookChapterDao.delByBook(bookUrl) + appDb.bookChapterDao.insert(*toc.toTypedArray()) + if (book.isSameNameAuthor(ReadBook.book)) { + ReadBook.book = book + ReadBook.chapterSize = book.totalChapterNum + } + addDownload(source, book) + }.onFailure { + AppLog.put("${book.name} 更新目录失败\n${it.localizedMessage}", it) + //这里可能因为时间太长书籍信息已经更改,所以重新获取 + appDb.bookDao.getBook(book.bookUrl)?.let { book -> + book.addType(BookType.updateError) + appDb.bookDao.update(book) } - }.onCancel { - upTocCancel(bookUrl) - upTocCancel(book.bookUrl) - }.onFinally { - upTocFinally(bookUrl) - upTocFinally(book.bookUrl) - postUpBooksLiveData() } } @@ -192,33 +181,6 @@ class MainViewModel(application: Application) : BaseViewModel(application) { } } - @Synchronized - private fun upTocAdd(bookUrl: String) { - onUpTocBooks.add(bookUrl) - postEvent(EventBus.UP_BOOKSHELF, bookUrl) - } - - @Synchronized - private fun upTocCancel(bookUrl: String) { - onUpTocBooks.remove(bookUrl) - waitUpTocBooks.add(bookUrl) - postEvent(EventBus.UP_BOOKSHELF, bookUrl) - } - - @Synchronized - private fun upTocFinally(bookUrl: String) { - onUpTocBooks.remove(bookUrl) - postEvent(EventBus.UP_BOOKSHELF, bookUrl) - if (waitUpTocBooks.isEmpty() - && onUpTocBooks.isEmpty() - && cacheBookJob == null - && !CacheBookService.isRun - ) { - //所有目录更新完再开始缓存章节 - cacheBook() - } - } - @Synchronized private fun addDownload(source: BookSource, book: Book) { if (AppConfig.preDownloadNum == 0) return