pool-stats/jobs/ingestSharesJob.go
Pijus Kamandulis 4ddd9abd2e Restructure
2025-06-23 19:38:02 +03:00

109 lines
2.4 KiB
Go

package jobs
import (
"encoding/json"
"log"
"os"
"path/filepath"
"sort"
"strings"
"time"
"github.com/ostafen/clover/v2"
"github.com/ostafen/clover/v2/document"
"pool-stats/constants"
"pool-stats/database"
"pool-stats/models"
)
type IngestSharesJob struct {
db *clover.DB
logPath string
}
func NewIngestSharesJob(db *clover.DB, path string) *IngestSharesJob {
return &IngestSharesJob{db: db, logPath: path}
}
func (this *IngestSharesJob) WatchAndIngest() {
ticker := time.NewTicker(constants.IngestSharesJobInterval)
defer ticker.Stop()
for {
<-ticker.C
this.ingestClosedBlocks()
}
}
func (this *IngestSharesJob) ingestClosedBlocks() {
entries, err := os.ReadDir(this.logPath)
if err != nil {
log.Println("Error reading logsDir:", err)
return
}
// Find block-like directories (starts with 000)
blockDirs := []os.DirEntry{}
for _, entry := range entries {
if entry.IsDir() && strings.HasPrefix(entry.Name(), "000") {
blockDirs = append(blockDirs, entry)
}
}
if len(blockDirs) <= 1 {
return // Nothing to ingest
}
// Sort dirs alphabetically (ascending), last is newest
sort.Slice(blockDirs, func(i, j int) bool {
return blockDirs[i].Name() < blockDirs[j].Name()
})
// Ingest all except last (current block dir)
for _, dir := range blockDirs[:len(blockDirs)-1] {
this.ingestBlockDir(this.db, filepath.Join(this.logPath, dir.Name()))
_ = os.RemoveAll(filepath.Join(this.logPath, dir.Name()))
}
}
func (this *IngestSharesJob) ingestBlockDir(db *clover.DB, dirPath string) {
files, err := os.ReadDir(dirPath)
if err != nil {
log.Printf("Failed to read block dir %s: %v", dirPath, err)
return
}
for _, f := range files {
if !strings.HasSuffix(f.Name(), ".sharelog") {
continue
}
data, err := os.ReadFile(filepath.Join(dirPath, f.Name()))
if err != nil {
log.Println("Failed to read file:", err)
continue
}
lines := strings.Split(string(data), "\n")
for _, line := range lines {
if strings.TrimSpace(line) == "" {
continue
}
var share models.ShareLog
if err := json.Unmarshal([]byte(line), &share); err != nil {
log.Println("JSON parse error:", err)
continue
}
doc := document.NewDocumentOf(&share)
if _, err := db.InsertOne(database.CollectionName, doc); err != nil {
log.Println("DB insert error:", err)
}
}
}
log.Printf("Ingested and deleted %s", dirPath)
}