Restructure
This commit is contained in:
108
jobs/ingestSharesJob.go
Normal file
108
jobs/ingestSharesJob.go
Normal file
@@ -0,0 +1,108 @@
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ostafen/clover/v2"
|
||||
"github.com/ostafen/clover/v2/document"
|
||||
|
||||
"pool-stats/constants"
|
||||
"pool-stats/database"
|
||||
"pool-stats/models"
|
||||
)
|
||||
|
||||
type IngestSharesJob struct {
|
||||
db *clover.DB
|
||||
logPath string
|
||||
}
|
||||
|
||||
func NewIngestSharesJob(db *clover.DB, path string) *IngestSharesJob {
|
||||
return &IngestSharesJob{db: db, logPath: path}
|
||||
}
|
||||
|
||||
func (this *IngestSharesJob) WatchAndIngest() {
|
||||
ticker := time.NewTicker(constants.IngestSharesJobInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
<-ticker.C
|
||||
this.ingestClosedBlocks()
|
||||
}
|
||||
}
|
||||
|
||||
func (this *IngestSharesJob) ingestClosedBlocks() {
|
||||
entries, err := os.ReadDir(this.logPath)
|
||||
if err != nil {
|
||||
log.Println("Error reading logsDir:", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Find block-like directories (starts with 000)
|
||||
blockDirs := []os.DirEntry{}
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() && strings.HasPrefix(entry.Name(), "000") {
|
||||
blockDirs = append(blockDirs, entry)
|
||||
}
|
||||
}
|
||||
|
||||
if len(blockDirs) <= 1 {
|
||||
return // Nothing to ingest
|
||||
}
|
||||
|
||||
// Sort dirs alphabetically (ascending), last is newest
|
||||
sort.Slice(blockDirs, func(i, j int) bool {
|
||||
return blockDirs[i].Name() < blockDirs[j].Name()
|
||||
})
|
||||
|
||||
// Ingest all except last (current block dir)
|
||||
for _, dir := range blockDirs[:len(blockDirs)-1] {
|
||||
this.ingestBlockDir(this.db, filepath.Join(this.logPath, dir.Name()))
|
||||
_ = os.RemoveAll(filepath.Join(this.logPath, dir.Name()))
|
||||
}
|
||||
}
|
||||
|
||||
func (this *IngestSharesJob) ingestBlockDir(db *clover.DB, dirPath string) {
|
||||
files, err := os.ReadDir(dirPath)
|
||||
if err != nil {
|
||||
log.Printf("Failed to read block dir %s: %v", dirPath, err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, f := range files {
|
||||
if !strings.HasSuffix(f.Name(), ".sharelog") {
|
||||
continue
|
||||
}
|
||||
|
||||
data, err := os.ReadFile(filepath.Join(dirPath, f.Name()))
|
||||
if err != nil {
|
||||
log.Println("Failed to read file:", err)
|
||||
continue
|
||||
}
|
||||
|
||||
lines := strings.Split(string(data), "\n")
|
||||
for _, line := range lines {
|
||||
if strings.TrimSpace(line) == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
var share models.ShareLog
|
||||
if err := json.Unmarshal([]byte(line), &share); err != nil {
|
||||
log.Println("JSON parse error:", err)
|
||||
continue
|
||||
}
|
||||
|
||||
doc := document.NewDocumentOf(&share)
|
||||
if _, err := db.InsertOne(database.CollectionName, doc); err != nil {
|
||||
log.Println("DB insert error:", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("Ingested and deleted %s", dirPath)
|
||||
}
|
||||
@@ -31,19 +31,18 @@ func (job *RecalculateTopSharesJob) Run() error {
|
||||
}
|
||||
|
||||
func (job *RecalculateTopSharesJob) recalculateTopShares() {
|
||||
topSharesAmount := database.TopSharesAmount
|
||||
currentTopShares := database.ListTopShares(job.DB)
|
||||
|
||||
var newTopShares []models.ShareLog
|
||||
if currentTopShares == nil || len(currentTopShares) < topSharesAmount {
|
||||
newTopShares, _ = database.GetHighestSharesInRange(job.DB, database.CollectionName, time.Unix(0, 0), topSharesAmount)
|
||||
if currentTopShares == nil || len(currentTopShares) < constants.TopSharesAmount {
|
||||
newTopShares, _ = database.GetHighestSharesInRange(job.DB, database.CollectionName, time.Unix(0, 0), constants.TopSharesAmount)
|
||||
} else {
|
||||
sort.Slice(currentTopShares, func(i, j int) bool {
|
||||
return currentTopShares[i].CreateDate > currentTopShares[j].CreateDate
|
||||
})
|
||||
lastTopShareDate := currentTopShares[0].CreateDate
|
||||
lastTopShareDateTime := helpers.ParseCreateDate(lastTopShareDate)
|
||||
newTopShares, _ = database.GetHighestSharesInRange(job.DB, database.CollectionName, lastTopShareDateTime, topSharesAmount)
|
||||
newTopShares, _ = database.GetHighestSharesInRange(job.DB, database.CollectionName, lastTopShareDateTime, constants.TopSharesAmount)
|
||||
}
|
||||
|
||||
newTopShares = append(newTopShares, currentTopShares...)
|
||||
@@ -53,8 +52,8 @@ func (job *RecalculateTopSharesJob) recalculateTopShares() {
|
||||
newTopShares = notlinq.UniqueBy(newTopShares, func(s models.ShareLog) string {
|
||||
return s.Hash
|
||||
})
|
||||
if len(newTopShares) > topSharesAmount {
|
||||
newTopShares = newTopShares[:topSharesAmount]
|
||||
if len(newTopShares) > constants.TopSharesAmount {
|
||||
newTopShares = newTopShares[:constants.TopSharesAmount]
|
||||
}
|
||||
|
||||
database.ReplaceTopShares(job.DB, newTopShares)
|
||||
|
||||
Reference in New Issue
Block a user