174 lines
4.2 KiB
Go
174 lines
4.2 KiB
Go
package app
|
||
|
||
import (
|
||
"archive/zip"
|
||
"book-tools/internal/config"
|
||
"book-tools/pkg/reaper"
|
||
"fmt"
|
||
"log"
|
||
"os"
|
||
"path/filepath"
|
||
"strings"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"gorm.io/gorm"
|
||
)
|
||
|
||
type App struct {
|
||
cfg *config.Config
|
||
db *gorm.DB
|
||
}
|
||
|
||
func NewApp(cfg *config.Config, db *gorm.DB) *App {
|
||
return &App{cfg: cfg, db: db}
|
||
}
|
||
|
||
// totalAddedBooks is the package-wide running total of books the DB writer
// reported as newly added (reaper.FB2toDB returned an ID > 0). One goroutine
// writes it while parsing workers read it, so every access should go through
// sync/atomic. Nothing in this file resets it, so it accumulates across Run calls.
var (
	totalAddedBooks uint64
)
|
||
|
||
type BookJob struct {
|
||
FB2 reaper.FB2
|
||
}
|
||
|
||
func (a *App) Run() error {
|
||
// fmt.Printf("Работаем с папкой: %s\n", a.cfg.BaseDir)
|
||
// fmt.Printf("Используем базу: %s\n", a.cfg.DBPath)
|
||
|
||
zipFiles := findZipFiles(a.cfg.BaseDir)
|
||
totalZips := len(zipFiles)
|
||
fmt.Printf("Found %d archives\n", totalZips)
|
||
|
||
jobChan := make(chan BookJob, 1000)
|
||
var dbWg sync.WaitGroup
|
||
var processedCount uint64 = 0
|
||
|
||
// 🧠 Писатель в БД, добавляем пачками по 50 книг
|
||
dbWg.Add(1)
|
||
go func() {
|
||
defer dbWg.Done()
|
||
|
||
batchSize := 500
|
||
batch := make([]BookJob, 0, batchSize)
|
||
flush := func() {
|
||
addedBooks := 0
|
||
if len(batch) == 0 {
|
||
return
|
||
}
|
||
tx := a.db.Begin()
|
||
for _, job := range batch {
|
||
bookID := reaper.FB2toDB(tx, job.FB2)
|
||
if tx.Error != nil {
|
||
tx.Rollback()
|
||
log.Printf("Failed add book to transaction: %v", tx.Error)
|
||
return
|
||
}
|
||
if bookID > 0 {
|
||
addedBooks++
|
||
}
|
||
}
|
||
tx.Commit()
|
||
atomic.AddUint64(&processedCount, uint64(len(batch)))
|
||
atomic.AddUint64(&totalAddedBooks, uint64(addedBooks))
|
||
batch = batch[:0]
|
||
}
|
||
|
||
for job := range jobChan {
|
||
batch = append(batch, job)
|
||
if len(batch) >= batchSize {
|
||
flush()
|
||
}
|
||
}
|
||
flush()
|
||
}()
|
||
|
||
// Обработка ZIP в параллели
|
||
processZipFilesParallel(a.cfg.BaseDir, zipFiles, jobChan, &processedCount, totalZips)
|
||
|
||
close(jobChan)
|
||
dbWg.Wait()
|
||
|
||
fmt.Printf("\nAll done. Added %d books.\n", totalAddedBooks)
|
||
return nil
|
||
}
|
||
|
||
// findZipFiles walks basePath recursively and returns the paths of all
// non-directory entries whose name ends in ".zip" (compared case-insensitively),
// in filepath.Walk's lexical order. Paths are built by filepath.Walk, so they
// start with basePath.
//
// The walk is best-effort: an unreadable path (including a missing basePath)
// is logged and skipped instead of aborting the scan.
func findZipFiles(basePath string) []string {
	var zips []string
	// The callback never returns an error, so Walk's own result is always nil.
	_ = filepath.Walk(basePath, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			// info may be nil here (e.g. Lstat of a missing root failed).
			log.Printf("Skipping unreadable path: %v", err)
			return nil
		}
		if !info.IsDir() && strings.HasSuffix(strings.ToLower(info.Name()), ".zip") {
			zips = append(zips, path)
		}
		return nil
	})
	return zips
}
|
||
|
||
func processZipFilesParallel(basePath string, zipFiles []string, jobChan chan<- BookJob, processedCount *uint64, totalZips int) {
|
||
const workers = 16
|
||
|
||
var wg sync.WaitGroup
|
||
tasks := make(chan string, workers)
|
||
|
||
// Для прогресса - можно сделать так: считаем обработанные zip файлы
|
||
var zipProcessed uint64 = 0
|
||
|
||
// Запускаем воркеров
|
||
for i := 0; i < workers; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
for zipPath := range tasks {
|
||
start := time.Now()
|
||
processZip(basePath, zipPath, jobChan)
|
||
duration := time.Since(start)
|
||
|
||
atomic.AddUint64(&zipProcessed, 1)
|
||
// Вывод прогресса в одну строку, перезаписывая её
|
||
percentage := float64(atomic.LoadUint64(&zipProcessed)) / float64(totalZips) * 100
|
||
fmt.Printf("\r[Worker %d] Processed archives: %d/%d (%.1f%%) — %s (%.2fs) (%d books)", id, atomic.LoadUint64(&zipProcessed), totalZips, percentage, filepath.Base(zipPath), duration.Seconds(), totalAddedBooks)
|
||
}
|
||
}(i + 1)
|
||
}
|
||
|
||
for _, z := range zipFiles {
|
||
tasks <- z
|
||
}
|
||
close(tasks)
|
||
wg.Wait()
|
||
}
|
||
|
||
func processZip(basePath string, zipPath string, jobChan chan<- BookJob) {
|
||
r, err := zip.OpenReader(zipPath)
|
||
if err != nil {
|
||
log.Printf("Failed open zip: %v, %s\n", err, zipPath)
|
||
return
|
||
}
|
||
bookcase, err := filepath.Rel(basePath, zipPath)
|
||
if err != nil {
|
||
log.Printf("Error rel path: %v\n", err)
|
||
return
|
||
}
|
||
defer r.Close()
|
||
|
||
for _, f := range r.File {
|
||
if strings.HasSuffix(strings.ToLower(f.Name), ".fb2") {
|
||
rc, err := f.Open()
|
||
if err != nil {
|
||
log.Printf("Unable read file from archive: %v\n", err)
|
||
return
|
||
}
|
||
rawFB2 := reaper.Parse(rc)
|
||
_ = rc.Close()
|
||
|
||
if rawFB2 == nil {
|
||
// log.Printf("Не удалось распарсить: %s\n", f.Name)
|
||
return
|
||
}
|
||
fb2 := reaper.RawToFB2(*rawFB2, f.FileInfo().Name(), &bookcase, f.UncompressedSize64, nil)
|
||
// total := atomic.LoadUint64(&totalAddedBooks)
|
||
// fmt.Printf("\rpaersed book %s %d", "total", total)
|
||
jobChan <- BookJob{FB2: fb2}
|
||
}
|
||
}
|
||
}
|