package jobs

import (
	"encoding/json"
	"log"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"github.com/ostafen/clover/v2"
	"github.com/ostafen/clover/v2/document"

	"pool-stats/constants"
	"pool-stats/database"
	"pool-stats/models"
)

// IngestSharesJob periodically scans logPath for block directories, inserts
// the share-log records they contain into db, and removes each directory
// once it has been processed.
type IngestSharesJob struct {
	db      *clover.DB
	logPath string
}

// NewIngestSharesJob returns a job that ingests the share logs found under
// path into db.
func NewIngestSharesJob(db *clover.DB, path string) *IngestSharesJob {
	return &IngestSharesJob{db: db, logPath: path}
}

// WatchAndIngest runs one ingestion pass every
// constants.IngestSharesJobInterval. It never returns, so callers that need
// to keep working should start it in its own goroutine.
func (j *IngestSharesJob) WatchAndIngest() {
	ticker := time.NewTicker(constants.IngestSharesJobInterval)
	defer ticker.Stop()

	for range ticker.C {
		j.ingestClosedBlocks()
	}
}

// ingestClosedBlocks ingests and then deletes every block directory under
// logPath except the newest one, which is treated as the block still being
// written. A directory is deleted only after it could be read; otherwise it
// is left in place so a later pass can retry it.
func (j *IngestSharesJob) ingestClosedBlocks() {
	entries, err := os.ReadDir(j.logPath)
	if err != nil {
		log.Println("Error reading logsDir:", err)
		return
	}

	// Block-like directories are recognised by their "000" name prefix.
	var blockDirs []os.DirEntry
	for _, entry := range entries {
		if entry.IsDir() && strings.HasPrefix(entry.Name(), "000") {
			blockDirs = append(blockDirs, entry)
		}
	}

	// With zero or one directory there is no closed block to ingest.
	if len(blockDirs) <= 1 {
		return
	}

	// Sort ascending by name so the last entry is the newest directory.
	sort.Slice(blockDirs, func(a, b int) bool {
		return blockDirs[a].Name() < blockDirs[b].Name()
	})

	// Everything except the last (current) directory is closed.
	for _, dir := range blockDirs[:len(blockDirs)-1] {
		dirPath := filepath.Join(j.logPath, dir.Name())

		if err := j.ingestBlockDir(j.db, dirPath); err != nil {
			// Nothing was ingested from this directory: keep it on disk
			// instead of throwing its share logs away.
			log.Printf("Failed to read block dir %s: %v", dirPath, err)
			continue
		}

		if err := os.RemoveAll(dirPath); err != nil {
			log.Printf("Failed to remove block dir %s: %v", dirPath, err)
			continue
		}
		log.Printf("Ingested and deleted %s", dirPath)
	}
}

// ingestBlockDir inserts every share found in the ".sharelog" files of
// dirPath into db. Each non-blank line of a file is decoded as one JSON
// models.ShareLog and inserted into database.CollectionName.
//
// It returns a non-nil error only when the directory itself cannot be read,
// in which case nothing has been ingested. Failures on individual files,
// lines, or inserts are logged and skipped (best effort): retrying the whole
// directory because of them would insert the already-stored shares again.
func (j *IngestSharesJob) ingestBlockDir(db *clover.DB, dirPath string) error {
	files, err := os.ReadDir(dirPath)
	if err != nil {
		return err
	}

	for _, f := range files {
		if !strings.HasSuffix(f.Name(), ".sharelog") {
			continue
		}

		data, err := os.ReadFile(filepath.Join(dirPath, f.Name()))
		if err != nil {
			log.Println("Failed to read file:", err)
			continue
		}

		for _, line := range strings.Split(string(data), "\n") {
			if strings.TrimSpace(line) == "" {
				continue
			}

			var share models.ShareLog
			if err := json.Unmarshal([]byte(line), &share); err != nil {
				log.Println("JSON parse error:", err)
				continue
			}

			doc := document.NewDocumentOf(&share)
			if _, err := db.InsertOne(database.CollectionName, doc); err != nil {
				log.Println("DB insert error:", err)
			}
		}
	}

	return nil
}