From 813b9faeaa465efb925e04558c28030c1a623b40 Mon Sep 17 00:00:00 2001 From: Pijus Kamandulis Date: Wed, 12 Mar 2025 21:06:10 +0200 Subject: [PATCH] Added support for Badger as an alternative storage backend --- api/config/config.go | 15 + api/config/models.go | 2 + api/handlers/documents.go | 1 + api/tests/collections_test.go | 31 +- api/tests/config_test.go | 77 +- api/tests/databases_test.go | 17 +- api/tests/documents_test.go | 815 +++++++++--------- api/tests/documents_trailingslash_test.go | 3 +- cmd/server/server.go | 19 +- go.mod | 12 + go.sum | 31 +- .../badger_datastore/badger_datastore.go | 36 + .../datastore/badger_datastore/collections.go | 103 +++ .../datastore/badger_datastore/databases.go | 80 ++ .../badger_datastore/db_abstractions.go | 207 +++++ .../badger_datastore/document_iterator.go | 60 ++ .../datastore/badger_datastore/documents.go | 127 +++ .../badger_datastore/partition_key_ranges.go | 53 ++ .../badger_datastore/stored_procedures.go | 107 +++ .../datastore/badger_datastore/triggers.go | 107 +++ .../user_defined_functions.go | 107 +++ internal/datastore/datastore.go | 1 + .../map_datastore/array_document_iterator.go | 4 + internal/datastore/models.go | 1 + sharedlibrary/sharedlibrary.go | 16 +- 25 files changed, 1574 insertions(+), 458 deletions(-) create mode 100644 internal/datastore/badger_datastore/badger_datastore.go create mode 100644 internal/datastore/badger_datastore/collections.go create mode 100644 internal/datastore/badger_datastore/databases.go create mode 100644 internal/datastore/badger_datastore/db_abstractions.go create mode 100644 internal/datastore/badger_datastore/document_iterator.go create mode 100644 internal/datastore/badger_datastore/documents.go create mode 100644 internal/datastore/badger_datastore/partition_key_ranges.go create mode 100644 internal/datastore/badger_datastore/stored_procedures.go create mode 100644 internal/datastore/badger_datastore/triggers.go create mode 100644 internal/datastore/badger_datastore/user_defined_functions.go diff --git a/api/config/config.go b/api/config/config.go index 676fac1..984ea41 100644 --- a/api/config/config.go +++ b/api/config/config.go @@ -15,6 +15,11 @@ const ( ExplorerBaseUrlLocation = "/_explorer" ) +const ( + DataStoreMap = "map" + DataStoreBadger = "badger" +) + func ParseFlags() ServerConfig { host := flag.String("Host", "localhost", "Hostname") port := flag.Int("Port", 8081, "Listen port") @@ -28,6 +33,8 @@ func ParseFlags() ServerConfig { persistDataPath := flag.String("Persist", "", "Saves data to given path on application exit") logLevel := NewEnumValue("info", []string{"debug", "info", "error", "silent"}) flag.Var(logLevel, "LogLevel", fmt.Sprintf("Sets the logging level %s", logLevel.AllowedValuesList())) + dataStore := NewEnumValue("map", []string{DataStoreMap, DataStoreBadger}) + flag.Var(dataStore, "DataStore", fmt.Sprintf("Sets the data store %s, (badger is currently in the experimental phase)", dataStore.AllowedValuesList())) flag.Parse() setFlagsFromEnvironment() @@ -44,6 +51,7 @@ func ParseFlags() ServerConfig { config.DisableTls = *disableTls config.AccountKey = *accountKey config.LogLevel = logLevel.value + config.DataStore = dataStore.value config.PopulateCalculatedFields() @@ -68,6 +76,13 @@ func (c *ServerConfig) PopulateCalculatedFields() { default: logger.SetLogLevel(logger.LogLevelInfo) } + + if c.DataStore == DataStoreBadger && + (c.InitialDataFilePath != "" || c.PersistDataFilePath != "") { + logger.ErrorLn("InitialData and Persist options are currently not supported with Badger data store") + c.InitialDataFilePath = "" + c.PersistDataFilePath = "" + } } func (c *ServerConfig) ApplyDefaultsToEmptyFields() { diff --git a/api/config/models.go b/api/config/models.go index 9322e3f..4613160 100644 --- a/api/config/models.go +++ b/api/config/models.go @@ -17,4 +17,6 @@ type ServerConfig struct { DisableTls bool `json:"disableTls"` LogLevel string `json:"logLevel"` ExplorerBaseUrlLocation string `json:"explorerBaseUrlLocation"` + + DataStore string `json:"dataStore"` } diff --git a/api/handlers/documents.go b/api/handlers/documents.go index 10bc0bc..54c0151 100644 --- a/api/handlers/documents.go +++ b/api/handlers/documents.go @@ -383,6 +383,7 @@ func (h *Handlers) executeQueryDocuments(databaseId string, collectionId string, if status != datastore.StatusOk { return nil, status } + defer allDocumentsIterator.Close() rowsIterator := converters.NewDocumentToRowTypeIterator(allDocumentsIterator) diff --git a/api/tests/collections_test.go b/api/tests/collections_test.go index 845ab36..251ed57 100644 --- a/api/tests/collections_test.go +++ b/api/tests/collections_test.go @@ -3,32 +3,29 @@ package tests_test import ( "context" "errors" - "fmt" "net/http" "testing" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos" - "github.com/pikami/cosmium/api/config" "github.com/pikami/cosmium/internal/datastore" "github.com/stretchr/testify/assert" ) func Test_Collections(t *testing.T) { - ts := runTestServer() - defer ts.Server.Close() + presets := []testPreset{PresetMapStore, PresetBadgerStore} - client, err := azcosmos.NewClientFromConnectionString( - fmt.Sprintf("AccountEndpoint=%s;AccountKey=%s", ts.URL, config.DefaultAccountKey), - &azcosmos.ClientOptions{}, - ) - assert.Nil(t, err) + setUp := func(ts *TestServer, client *azcosmos.Client) *azcosmos.DatabaseClient { + ts.DataStore.CreateDatabase(datastore.Database{ID: testDatabaseName}) + databaseClient, err := client.NewDatabase(testDatabaseName) + assert.Nil(t, err) - ts.DataStore.CreateDatabase(datastore.Database{ID: testDatabaseName}) - databaseClient, err := client.NewDatabase(testDatabaseName) - assert.Nil(t, err) + return databaseClient + } + + runTestsWithPresets(t, "Collection Create", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) { + databaseClient := setUp(ts, client) - t.Run("Collection Create", func(t *testing.T) { t.Run("Should create collection", func(t *testing.T) { createResponse, err := databaseClient.CreateContainer(context.TODO(), azcosmos.ContainerProperties{ ID: testCollectionName, @@ -57,7 +54,9 @@ func Test_Collections(t *testing.T) { }) }) - t.Run("Collection Read", func(t *testing.T) { + runTestsWithPresets(t, "Collection Read", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) { + databaseClient := setUp(ts, client) + t.Run("Should read collection", func(t *testing.T) { ts.DataStore.CreateCollection(testDatabaseName, datastore.Collection{ ID: testCollectionName, @@ -90,7 +89,9 @@ func Test_Collections(t *testing.T) { }) }) - t.Run("Collection Delete", func(t *testing.T) { + runTestsWithPresets(t, "Collection Delete", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) { + databaseClient := setUp(ts, client) + t.Run("Should delete collection", func(t *testing.T) { ts.DataStore.CreateCollection(testDatabaseName, datastore.Collection{ ID: testCollectionName, diff --git a/api/tests/config_test.go b/api/tests/config_test.go index d54b4c9..cbc172b 100644 --- a/api/tests/config_test.go +++ b/api/tests/config_test.go @@ -1,13 +1,18 @@ package tests_test import ( + "fmt" "net/http/httptest" + "testing" + "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos" "github.com/pikami/cosmium/api" "github.com/pikami/cosmium/api/config" "github.com/pikami/cosmium/internal/datastore" + badgerdatastore "github.com/pikami/cosmium/internal/datastore/badger_datastore" mapdatastore "github.com/pikami/cosmium/internal/datastore/map_datastore" "github.com/pikami/cosmium/internal/logger" + "github.com/stretchr/testify/assert" ) type TestServer struct { @@ -16,14 +21,29 @@ type TestServer struct { URL string } -func runTestServerCustomConfig(config *config.ServerConfig) *TestServer { - dataStore := mapdatastore.NewMapDataStore(mapdatastore.MapDataStoreOptions{}) +func getDefaultTestServerConfig() *config.ServerConfig { + return &config.ServerConfig{ + AccountKey: config.DefaultAccountKey, + ExplorerPath: "/tmp/nothing", + ExplorerBaseUrlLocation: config.ExplorerBaseUrlLocation, + DataStore: "map", + } +} - api := api.NewApiServer(dataStore, config) +func runTestServerCustomConfig(configuration *config.ServerConfig) *TestServer { + var dataStore datastore.DataStore + switch configuration.DataStore { + case config.DataStoreBadger: + dataStore = badgerdatastore.NewBadgerDataStore() + default: + dataStore = mapdatastore.NewMapDataStore(mapdatastore.MapDataStoreOptions{}) + } + + api := api.NewApiServer(dataStore, configuration) server := httptest.NewServer(api.GetRouter()) - config.DatabaseEndpoint = server.URL + configuration.DatabaseEndpoint = server.URL return &TestServer{ Server: server, @@ -33,11 +53,7 @@ func runTestServerCustomConfig(config *config.ServerConfig) *TestServer { } func runTestServer() *TestServer { - config := &config.ServerConfig{ - AccountKey: config.DefaultAccountKey, - ExplorerPath: "/tmp/nothing", - ExplorerBaseUrlLocation: config.ExplorerBaseUrlLocation, - } + config := getDefaultTestServerConfig() config.LogLevel = "debug" logger.SetLogLevel(logger.LogLevelDebug) @@ -50,3 +66,46 @@ const ( testDatabaseName = "test-db" testCollectionName = "test-coll" ) + +type testFunc func(t *testing.T, ts *TestServer, cosmosClient *azcosmos.Client) +type testPreset string + +const ( + PresetMapStore testPreset = "MapDS" + PresetBadgerStore testPreset = "BadgerDS" +) + +func runTestsWithPreset(t *testing.T, name string, testPreset testPreset, f testFunc) { + serverConfig := getDefaultTestServerConfig() + + serverConfig.LogLevel = "debug" + logger.SetLogLevel(logger.LogLevelDebug) + + switch testPreset { + case PresetBadgerStore: + serverConfig.DataStore = config.DataStoreBadger + case PresetMapStore: + serverConfig.DataStore = config.DataStoreMap + } + + ts := runTestServerCustomConfig(serverConfig) + defer ts.Server.Close() + + client, err := azcosmos.NewClientFromConnectionString( + fmt.Sprintf("AccountEndpoint=%s;AccountKey=%s", ts.URL, config.DefaultAccountKey), + &azcosmos.ClientOptions{}, + ) + assert.Nil(t, err) + + testName := fmt.Sprintf("%s_%s", testPreset, name) + + t.Run(testName, func(t *testing.T) { + f(t, ts, client) + }) +} + +func runTestsWithPresets(t *testing.T, name string, testPresets []testPreset, f testFunc) { + for _, testPreset := range testPresets { + runTestsWithPreset(t, name, testPreset, f) + } +} diff --git a/api/tests/databases_test.go b/api/tests/databases_test.go index 79257a1..323e4da 100644 --- a/api/tests/databases_test.go +++ b/api/tests/databases_test.go @@ -3,28 +3,19 @@ package tests_test import ( "context" "errors" - "fmt" "net/http" "testing" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos" - "github.com/pikami/cosmium/api/config" "github.com/pikami/cosmium/internal/datastore" "github.com/stretchr/testify/assert" ) func Test_Databases(t *testing.T) { - ts := runTestServer() - defer ts.Server.Close() + presets := []testPreset{PresetMapStore, PresetBadgerStore} - client, err := azcosmos.NewClientFromConnectionString( - fmt.Sprintf("AccountEndpoint=%s;AccountKey=%s", ts.URL, config.DefaultAccountKey), - &azcosmos.ClientOptions{}, - ) - assert.Nil(t, err) - - t.Run("Database Create", func(t *testing.T) { + runTestsWithPresets(t, "Database Create", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) { t.Run("Should create database", func(t *testing.T) { ts.DataStore.DeleteDatabase(testDatabaseName) @@ -55,7 +46,7 @@ func Test_Databases(t *testing.T) { }) }) - t.Run("Database Read", func(t *testing.T) { + runTestsWithPresets(t, "Database Read", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) { t.Run("Should read database", func(t *testing.T) { ts.DataStore.CreateDatabase(datastore.Database{ ID: testDatabaseName, @@ -88,7 +79,7 @@ func Test_Databases(t *testing.T) { }) }) - t.Run("Database Delete", func(t *testing.T) { + runTestsWithPresets(t, "Database Delete", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) { t.Run("Should delete database", func(t *testing.T) { ts.DataStore.CreateDatabase(datastore.Database{ ID: testDatabaseName, diff --git a/api/tests/documents_test.go b/api/tests/documents_test.go index 95fc0a0..e3988f0 100644 --- a/api/tests/documents_test.go +++ b/api/tests/documents_test.go @@ -53,9 +53,7 @@ func testCosmosQuery(t *testing.T, } } -func documents_InitializeDb(t *testing.T) (*TestServer, *azcosmos.ContainerClient) { - ts := runTestServer() - +func documents_InitializeDb(t *testing.T, ts *TestServer) *azcosmos.ContainerClient { ts.DataStore.CreateDatabase(datastore.Database{ID: testDatabaseName}) ts.DataStore.CreateCollection(testDatabaseName, datastore.Collection{ ID: testCollectionName, @@ -79,438 +77,439 @@ func documents_InitializeDb(t *testing.T) (*TestServer, *azcosmos.ContainerClien collectionClient, err := client.NewContainer(testDatabaseName, testCollectionName) assert.Nil(t, err) - return ts, collectionClient + return collectionClient } func Test_Documents(t *testing.T) { - ts, collectionClient := documents_InitializeDb(t) - defer ts.Server.Close() + presets := []testPreset{PresetMapStore, PresetBadgerStore} - t.Run("Should query document", func(t *testing.T) { - testCosmosQuery(t, collectionClient, - "SELECT c.id, c[\"pk\"] FROM c ORDER BY c.id", - nil, - []interface{}{ - map[string]interface{}{"id": "12345", "pk": "123"}, - map[string]interface{}{"id": "67890", "pk": "456"}, - }, - ) + runTestsWithPresets(t, "Test_Documents", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) { + collectionClient := documents_InitializeDb(t, ts) + + t.Run("Should query document", func(t *testing.T) { + testCosmosQuery(t, collectionClient, + "SELECT c.id, c[\"pk\"] FROM c ORDER BY c.id", + nil, + []interface{}{ + map[string]interface{}{"id": "12345", "pk": "123"}, + map[string]interface{}{"id": "67890", "pk": "456"}, + }, + ) + }) + + t.Run("Should query VALUE array", func(t *testing.T) { + testCosmosQuery(t, collectionClient, + "SELECT VALUE [c.id, c[\"pk\"]] FROM c ORDER BY c.id", + nil, + []interface{}{ + []interface{}{"12345", "123"}, + []interface{}{"67890", "456"}, + }, + ) + }) + + t.Run("Should query VALUE object", func(t *testing.T) { + testCosmosQuery(t, collectionClient, + "SELECT VALUE { id: c.id, _pk: c.pk } FROM c ORDER BY c.id", + nil, + []interface{}{ + map[string]interface{}{"id": "12345", "_pk": "123"}, + map[string]interface{}{"id": "67890", "_pk": "456"}, + }, + ) + }) + + t.Run("Should query document with single WHERE condition", func(t *testing.T) { + testCosmosQuery(t, collectionClient, + `select c.id + FROM c + WHERE c.isCool=true + ORDER BY c.id`, + nil, + []interface{}{ + map[string]interface{}{"id": "67890"}, + }, + ) + }) + + t.Run("Should query document with query parameters", func(t *testing.T) { + testCosmosQuery(t, collectionClient, + `select c.id + FROM c + WHERE c.id=@param_id + ORDER BY c.id`, + []azcosmos.QueryParameter{ + {Name: "@param_id", Value: "67890"}, + }, + []interface{}{ + map[string]interface{}{"id": "67890"}, + }, + ) + }) + + t.Run("Should query document with query parameters as accessor", func(t *testing.T) { + testCosmosQuery(t, collectionClient, + `select c.id + FROM c + WHERE c[@param]="67890" + ORDER BY c.id`, + []azcosmos.QueryParameter{ + {Name: "@param", Value: "id"}, + }, + []interface{}{ + map[string]interface{}{"id": "67890"}, + }, + ) + }) + + t.Run("Should query array accessor", func(t *testing.T) { + testCosmosQuery(t, collectionClient, + `SELECT c.id, + c["arr"][0] AS arr0, + c["arr"][1] AS arr1, + c["arr"][2] AS arr2, + c["arr"][3] AS arr3 + FROM c ORDER BY c.id`, + nil, + []interface{}{ + map[string]interface{}{"id": "12345", "arr0": 1.0, "arr1": 2.0, "arr2": 3.0, "arr3": nil}, + map[string]interface{}{"id": "67890", "arr0": 6.0, "arr1": 7.0, "arr2": 8.0, "arr3": nil}, + }, + ) + }) + + t.Run("Should handle parallel writes", func(t *testing.T) { + var wg sync.WaitGroup + rutineCount := 100 + results := make(chan error, rutineCount) + + createCall := func(i int) { + defer wg.Done() + item := map[string]interface{}{ + "id": fmt.Sprintf("id-%d", i), + "pk": fmt.Sprintf("pk-%d", i), + "val": i, + } + bytes, err := json.Marshal(item) + if err != nil { + results <- err + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + _, err = collectionClient.CreateItem( + ctx, + azcosmos.PartitionKey{}, + bytes, + &azcosmos.ItemOptions{ + EnableContentResponseOnWrite: false, + }, + ) + results <- err + + collectionClient.ReadItem(ctx, azcosmos.PartitionKey{}, fmt.Sprintf("id-%d", i), nil) + collectionClient.DeleteItem(ctx, azcosmos.PartitionKey{}, fmt.Sprintf("id-%d", i), nil) + } + + for i := 0; i < rutineCount; i++ { + wg.Add(1) + go createCall(i) + } + + wg.Wait() + close(results) + + for err := range results { + if err != nil { + t.Errorf("Error creating item: %v", err) + } + } + }) }) - t.Run("Should query VALUE array", func(t *testing.T) { - testCosmosQuery(t, collectionClient, - "SELECT VALUE [c.id, c[\"pk\"]] FROM c ORDER BY c.id", - nil, - []interface{}{ - []interface{}{"12345", "123"}, - []interface{}{"67890", "456"}, - }, - ) - }) + runTestsWithPresets(t, "Test_Documents_Patch", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) { + collectionClient := documents_InitializeDb(t, ts) - t.Run("Should query VALUE object", func(t *testing.T) { - testCosmosQuery(t, collectionClient, - "SELECT VALUE { id: c.id, _pk: c.pk } FROM c ORDER BY c.id", - nil, - []interface{}{ - map[string]interface{}{"id": "12345", "_pk": "123"}, - map[string]interface{}{"id": "67890", "_pk": "456"}, - }, - ) - }) + t.Run("Should PATCH document", func(t *testing.T) { + context := context.TODO() + expectedData := map[string]interface{}{"id": "67890", "pk": "666", "newField": "newValue", "incr": 15., "setted": "isSet"} - t.Run("Should query document with single WHERE condition", func(t *testing.T) { - testCosmosQuery(t, collectionClient, - `select c.id - FROM c - WHERE c.isCool=true - ORDER BY c.id`, - nil, - []interface{}{ - map[string]interface{}{"id": "67890"}, - }, - ) - }) + patch := azcosmos.PatchOperations{} + patch.AppendAdd("/newField", "newValue") + patch.AppendIncrement("/incr", 15) + patch.AppendRemove("/isCool") + patch.AppendReplace("/pk", "666") + patch.AppendSet("/setted", "isSet") - t.Run("Should query document with query parameters", func(t *testing.T) { - testCosmosQuery(t, collectionClient, - `select c.id - FROM c - WHERE c.id=@param_id - ORDER BY c.id`, - []azcosmos.QueryParameter{ - {Name: "@param_id", Value: "67890"}, - }, - []interface{}{ - map[string]interface{}{"id": "67890"}, - }, - ) - }) + itemResponse, err := collectionClient.PatchItem( + context, + azcosmos.PartitionKey{}, + "67890", + patch, + &azcosmos.ItemOptions{ + EnableContentResponseOnWrite: false, + }, + ) + assert.Nil(t, err) - t.Run("Should query document with query parameters as accessor", func(t *testing.T) { - testCosmosQuery(t, collectionClient, - `select c.id - FROM c - WHERE c[@param]="67890" - ORDER BY c.id`, - []azcosmos.QueryParameter{ - {Name: "@param", Value: "id"}, - }, - []interface{}{ - map[string]interface{}{"id": "67890"}, - }, - ) - }) + var itemResponseBody map[string]interface{} + json.Unmarshal(itemResponse.Value, &itemResponseBody) - t.Run("Should query array accessor", func(t *testing.T) { - testCosmosQuery(t, collectionClient, - `SELECT c.id, - c["arr"][0] AS arr0, - c["arr"][1] AS arr1, - c["arr"][2] AS arr2, - c["arr"][3] AS arr3 - FROM c ORDER BY c.id`, - nil, - []interface{}{ - map[string]interface{}{"id": "12345", "arr0": 1.0, "arr1": 2.0, "arr2": 3.0, "arr3": nil}, - map[string]interface{}{"id": "67890", "arr0": 6.0, "arr1": 7.0, "arr2": 8.0, "arr3": nil}, - }, - ) - }) + assert.Equal(t, expectedData["id"], itemResponseBody["id"]) + assert.Equal(t, expectedData["pk"], itemResponseBody["pk"]) + assert.Empty(t, itemResponseBody["isCool"]) + assert.Equal(t, expectedData["newField"], itemResponseBody["newField"]) + assert.Equal(t, expectedData["incr"], itemResponseBody["incr"]) + assert.Equal(t, expectedData["setted"], itemResponseBody["setted"]) + }) - t.Run("Should handle parallel writes", func(t *testing.T) { - var wg sync.WaitGroup - rutineCount := 100 - results := make(chan error, rutineCount) + t.Run("Should not allow to PATCH document ID", func(t *testing.T) { + context := context.TODO() + + patch := azcosmos.PatchOperations{} + patch.AppendReplace("/id", "newValue") + + _, err := collectionClient.PatchItem( + context, + azcosmos.PartitionKey{}, + "67890", + patch, + &azcosmos.ItemOptions{ + EnableContentResponseOnWrite: false, + }, + ) + assert.NotNil(t, err) + + var respErr *azcore.ResponseError + if errors.As(err, &respErr) { + assert.Equal(t, http.StatusUnprocessableEntity, respErr.StatusCode) + } else { + panic(err) + } + }) + + t.Run("CreateItem", func(t *testing.T) { + context := context.TODO() - createCall := func(i int) { - defer wg.Done() item := map[string]interface{}{ - "id": fmt.Sprintf("id-%d", i), - "pk": fmt.Sprintf("pk-%d", i), - "val": i, + "Id": "6789011", + "pk": "456", + "newField": "newValue2", } bytes, err := json.Marshal(item) - if err != nil { - results <- err - return - } + assert.Nil(t, err) - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - - _, err = collectionClient.CreateItem( - ctx, + r, err2 := collectionClient.CreateItem( + context, azcosmos.PartitionKey{}, bytes, &azcosmos.ItemOptions{ EnableContentResponseOnWrite: false, }, ) - results <- err + assert.NotNil(t, r) + assert.Nil(t, err2) + }) - collectionClient.ReadItem(ctx, azcosmos.PartitionKey{}, fmt.Sprintf("id-%d", i), nil) - collectionClient.DeleteItem(ctx, azcosmos.PartitionKey{}, fmt.Sprintf("id-%d", i), nil) - } + t.Run("CreateItem that already exists", func(t *testing.T) { + context := context.TODO() - for i := 0; i < rutineCount; i++ { - wg.Add(1) - go createCall(i) - } + item := map[string]interface{}{"id": "12345", "pk": "123", "isCool": false, "arr": []int{1, 2, 3}} + bytes, err := json.Marshal(item) + assert.Nil(t, err) - wg.Wait() - close(results) + r, err := collectionClient.CreateItem( + context, + azcosmos.PartitionKey{}, + bytes, + &azcosmos.ItemOptions{ + EnableContentResponseOnWrite: false, + }, + ) + assert.NotNil(t, r) + assert.NotNil(t, err) - for err := range results { - if err != nil { - t.Errorf("Error creating item: %v", err) + var respErr *azcore.ResponseError + if errors.As(err, &respErr) { + assert.Equal(t, http.StatusConflict, respErr.StatusCode) + } else { + panic(err) } - } - }) -} - -func Test_Documents_Patch(t *testing.T) { - ts, collectionClient := documents_InitializeDb(t) - defer ts.Server.Close() - - t.Run("Should PATCH document", func(t *testing.T) { - context := context.TODO() - expectedData := map[string]interface{}{"id": "67890", "pk": "666", "newField": "newValue", "incr": 15., "setted": "isSet"} - - patch := azcosmos.PatchOperations{} - patch.AppendAdd("/newField", "newValue") - patch.AppendIncrement("/incr", 15) - patch.AppendRemove("/isCool") - patch.AppendReplace("/pk", "666") - patch.AppendSet("/setted", "isSet") - - itemResponse, err := collectionClient.PatchItem( - context, - azcosmos.PartitionKey{}, - "67890", - patch, - &azcosmos.ItemOptions{ - EnableContentResponseOnWrite: false, - }, - ) - assert.Nil(t, err) - - var itemResponseBody map[string]interface{} - json.Unmarshal(itemResponse.Value, &itemResponseBody) - - assert.Equal(t, expectedData["id"], itemResponseBody["id"]) - assert.Equal(t, expectedData["pk"], itemResponseBody["pk"]) - assert.Empty(t, itemResponseBody["isCool"]) - assert.Equal(t, expectedData["newField"], itemResponseBody["newField"]) - assert.Equal(t, expectedData["incr"], itemResponseBody["incr"]) - assert.Equal(t, expectedData["setted"], itemResponseBody["setted"]) - }) - - t.Run("Should not allow to PATCH document ID", func(t *testing.T) { - context := context.TODO() - - patch := azcosmos.PatchOperations{} - patch.AppendReplace("/id", "newValue") - - _, err := collectionClient.PatchItem( - context, - azcosmos.PartitionKey{}, - "67890", - patch, - &azcosmos.ItemOptions{ - EnableContentResponseOnWrite: false, - }, - ) - assert.NotNil(t, err) - - var respErr *azcore.ResponseError - if errors.As(err, &respErr) { - assert.Equal(t, http.StatusUnprocessableEntity, respErr.StatusCode) - } else { - panic(err) - } - }) - - t.Run("CreateItem", func(t *testing.T) { - context := context.TODO() - - item := map[string]interface{}{ - "Id": "6789011", - "pk": "456", - "newField": "newValue2", - } - bytes, err := json.Marshal(item) - assert.Nil(t, err) - - r, err2 := collectionClient.CreateItem( - context, - azcosmos.PartitionKey{}, - bytes, - &azcosmos.ItemOptions{ - EnableContentResponseOnWrite: false, - }, - ) - assert.NotNil(t, r) - assert.Nil(t, err2) - }) - - t.Run("CreateItem that already exists", func(t *testing.T) { - context := context.TODO() - - item := map[string]interface{}{"id": "12345", "pk": "123", "isCool": false, "arr": []int{1, 2, 3}} - bytes, err := json.Marshal(item) - assert.Nil(t, err) - - r, err := collectionClient.CreateItem( - context, - azcosmos.PartitionKey{}, - bytes, - &azcosmos.ItemOptions{ - EnableContentResponseOnWrite: false, - }, - ) - assert.NotNil(t, r) - assert.NotNil(t, err) - - var respErr *azcore.ResponseError - if errors.As(err, &respErr) { - assert.Equal(t, http.StatusConflict, respErr.StatusCode) - } else { - panic(err) - } - }) - - t.Run("UpsertItem new", func(t *testing.T) { - context := context.TODO() - - item := map[string]interface{}{"id": "123456", "pk": "1234", "isCool": false, "arr": []int{1, 2, 3}} - bytes, err := json.Marshal(item) - assert.Nil(t, err) - - r, err2 := collectionClient.UpsertItem( - context, - azcosmos.PartitionKey{}, - bytes, - &azcosmos.ItemOptions{ - EnableContentResponseOnWrite: false, - }, - ) - assert.NotNil(t, r) - assert.Nil(t, err2) - }) - - t.Run("UpsertItem that already exists", func(t *testing.T) { - context := context.TODO() - - item := map[string]interface{}{"id": "12345", "pk": "123", "isCool": false, "arr": []int{1, 2, 3, 4}} - bytes, err := json.Marshal(item) - assert.Nil(t, err) - - r, err2 := collectionClient.UpsertItem( - context, - azcosmos.PartitionKey{}, - bytes, - &azcosmos.ItemOptions{ - EnableContentResponseOnWrite: false, - }, - ) - assert.NotNil(t, r) - assert.Nil(t, err2) - }) -} - -func Test_Documents_TransactionalBatch(t *testing.T) { - ts, collectionClient := documents_InitializeDb(t) - defer ts.Server.Close() - - t.Run("Should execute CREATE transactional batch", func(t *testing.T) { - context := context.TODO() - batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk")) - - newItem := map[string]interface{}{ - "id": "678901", - } - bytes, err := json.Marshal(newItem) - assert.Nil(t, err) - - batch.CreateItem(bytes, nil) - response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{}) - assert.Nil(t, err) - assert.True(t, response.Success) - assert.Equal(t, 1, len(response.OperationResults)) - - operationResponse := response.OperationResults[0] - assert.NotNil(t, operationResponse) - assert.NotNil(t, operationResponse.ResourceBody) - assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode) - - var itemResponseBody map[string]interface{} - json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody) - assert.Equal(t, newItem["id"], itemResponseBody["id"]) - - createdDoc, _ := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string)) - assert.Equal(t, newItem["id"], createdDoc["id"]) - }) - - t.Run("Should execute DELETE transactional batch", func(t *testing.T) { - context := context.TODO() - batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk")) - - batch.DeleteItem("12345", nil) - response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{}) - assert.Nil(t, err) - assert.True(t, response.Success) - assert.Equal(t, 1, len(response.OperationResults)) - - operationResponse := response.OperationResults[0] - assert.NotNil(t, operationResponse) - assert.Equal(t, int32(http.StatusNoContent), operationResponse.StatusCode) - - _, status := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, "12345") - assert.Equal(t, datastore.StatusNotFound, int(status)) - }) - - t.Run("Should execute REPLACE transactional batch", func(t *testing.T) { - context := context.TODO() - batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk")) - - newItem := map[string]interface{}{ - "id": "67890", - "pk": "666", - } - bytes, err := json.Marshal(newItem) - assert.Nil(t, err) - - batch.ReplaceItem("67890", bytes, nil) - response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{}) - assert.Nil(t, err) - assert.True(t, response.Success) - assert.Equal(t, 1, len(response.OperationResults)) - - operationResponse := response.OperationResults[0] - assert.NotNil(t, operationResponse) - assert.NotNil(t, operationResponse.ResourceBody) - assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode) - - var itemResponseBody map[string]interface{} - json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody) - assert.Equal(t, newItem["id"], itemResponseBody["id"]) - assert.Equal(t, newItem["pk"], itemResponseBody["pk"]) - - updatedDoc, _ := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string)) - assert.Equal(t, newItem["id"], updatedDoc["id"]) - assert.Equal(t, newItem["pk"], updatedDoc["pk"]) - }) - - t.Run("Should execute UPSERT transactional batch", func(t *testing.T) { - context := context.TODO() - batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk")) - - newItem := map[string]interface{}{ - "id": "678901", - "pk": "666", - } - bytes, err := json.Marshal(newItem) - assert.Nil(t, err) - - batch.UpsertItem(bytes, nil) - response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{}) - assert.Nil(t, err) - assert.True(t, response.Success) - assert.Equal(t, 1, len(response.OperationResults)) - - operationResponse := response.OperationResults[0] - assert.NotNil(t, operationResponse) - assert.NotNil(t, operationResponse.ResourceBody) - assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode) - - var itemResponseBody map[string]interface{} - json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody) - assert.Equal(t, newItem["id"], itemResponseBody["id"]) - assert.Equal(t, newItem["pk"], itemResponseBody["pk"]) - - updatedDoc, _ := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string)) - assert.Equal(t, newItem["id"], updatedDoc["id"]) - assert.Equal(t, newItem["pk"], updatedDoc["pk"]) - }) - - t.Run("Should execute READ transactional batch", func(t *testing.T) { - context := context.TODO() - batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk")) - - batch.ReadItem("67890", nil) - response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{}) - assert.Nil(t, err) - assert.True(t, response.Success) - assert.Equal(t, 1, len(response.OperationResults)) - - operationResponse := response.OperationResults[0] - assert.NotNil(t, operationResponse) - assert.NotNil(t, operationResponse.ResourceBody) - assert.Equal(t, int32(http.StatusOK), operationResponse.StatusCode) - - var itemResponseBody map[string]interface{} - json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody) - assert.Equal(t, "67890", itemResponseBody["id"]) + }) + + t.Run("UpsertItem new", func(t *testing.T) { + context := context.TODO() + + item := map[string]interface{}{"id": "123456", "pk": "1234", "isCool": false, "arr": []int{1, 2, 3}} + bytes, err := json.Marshal(item) + assert.Nil(t, err) + + r, err2 := collectionClient.UpsertItem( + context, + azcosmos.PartitionKey{}, + bytes, + &azcosmos.ItemOptions{ + EnableContentResponseOnWrite: false, + }, + ) + assert.NotNil(t, r) + assert.Nil(t, err2) + }) + + t.Run("UpsertItem that already exists", func(t *testing.T) { + context := context.TODO() + + item := map[string]interface{}{"id": "12345", "pk": "123", "isCool": false, "arr": []int{1, 2, 3, 4}} + bytes, err := json.Marshal(item) + assert.Nil(t, err) + + r, err2 := collectionClient.UpsertItem( + context, + azcosmos.PartitionKey{}, + bytes, + &azcosmos.ItemOptions{ + EnableContentResponseOnWrite: false, + }, + ) + assert.NotNil(t, r) + assert.Nil(t, err2) + }) + }) + + runTestsWithPresets(t, "Test_Documents_TransactionalBatch", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) { + collectionClient := documents_InitializeDb(t, ts) + + t.Run("Should execute CREATE transactional batch", func(t *testing.T) { + context := context.TODO() + batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk")) + + newItem := map[string]interface{}{ + "id": "678901", + } + bytes, err := json.Marshal(newItem) + assert.Nil(t, err) + + batch.CreateItem(bytes, nil) + response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{}) + assert.Nil(t, err) + assert.True(t, response.Success) + assert.Equal(t, 1, len(response.OperationResults)) + + operationResponse := response.OperationResults[0] + assert.NotNil(t, operationResponse) + assert.NotNil(t, operationResponse.ResourceBody) + assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode) + + var itemResponseBody map[string]interface{} + json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody) + assert.Equal(t, newItem["id"], itemResponseBody["id"]) + + createdDoc, _ := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string)) + assert.Equal(t, newItem["id"], createdDoc["id"]) + }) + + t.Run("Should execute DELETE transactional batch", func(t *testing.T) { + context := context.TODO() + batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk")) + + batch.DeleteItem("12345", nil) + response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{}) + assert.Nil(t, err) + assert.True(t, response.Success) + assert.Equal(t, 1, len(response.OperationResults)) + + operationResponse := response.OperationResults[0] + assert.NotNil(t, operationResponse) + assert.Equal(t, int32(http.StatusNoContent), operationResponse.StatusCode) + + _, status := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, "12345") + assert.Equal(t, datastore.StatusNotFound, int(status)) + }) + + t.Run("Should execute REPLACE transactional batch", func(t *testing.T) { + context := context.TODO() + batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk")) + + newItem := map[string]interface{}{ + "id": "67890", + "pk": "666", + } + bytes, err := json.Marshal(newItem) + assert.Nil(t, err) + + batch.ReplaceItem("67890", bytes, nil) + response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{}) + assert.Nil(t, err) + assert.True(t, response.Success) + assert.Equal(t, 1, len(response.OperationResults)) + + operationResponse := response.OperationResults[0] + assert.NotNil(t, operationResponse) + assert.NotNil(t, operationResponse.ResourceBody) + assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode) + + var itemResponseBody map[string]interface{} + json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody) + assert.Equal(t, newItem["id"], itemResponseBody["id"]) + assert.Equal(t, newItem["pk"], itemResponseBody["pk"]) + + updatedDoc, _ := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string)) + assert.Equal(t, newItem["id"], updatedDoc["id"]) + assert.Equal(t, newItem["pk"], updatedDoc["pk"]) + }) + + t.Run("Should execute UPSERT transactional batch", func(t *testing.T) { + context := context.TODO() + batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk")) + + newItem := map[string]interface{}{ + "id": "678901", + "pk": "666", + } + bytes, err := json.Marshal(newItem) + assert.Nil(t, err) + + batch.UpsertItem(bytes, nil) + response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{}) + assert.Nil(t, err) + assert.True(t, response.Success) + assert.Equal(t, 1, len(response.OperationResults)) + + operationResponse := response.OperationResults[0] + assert.NotNil(t, operationResponse) + assert.NotNil(t, operationResponse.ResourceBody) + assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode) + + var itemResponseBody map[string]interface{} + json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody) + assert.Equal(t, newItem["id"], itemResponseBody["id"]) + assert.Equal(t, newItem["pk"], itemResponseBody["pk"]) + + updatedDoc, _ := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string)) + assert.Equal(t, newItem["id"], updatedDoc["id"]) + assert.Equal(t, newItem["pk"], updatedDoc["pk"]) + }) + + t.Run("Should execute READ transactional batch", func(t *testing.T) { + context := context.TODO() + batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk")) + + batch.ReadItem("67890", nil) + response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{}) + assert.Nil(t, err) + assert.True(t, response.Success) + assert.Equal(t, 1, len(response.OperationResults)) + + operationResponse := response.OperationResults[0] + assert.NotNil(t, operationResponse) + assert.NotNil(t, operationResponse.ResourceBody) + assert.Equal(t, int32(http.StatusOK), operationResponse.StatusCode) + + var itemResponseBody map[string]interface{} + json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody) + assert.Equal(t, "67890", itemResponseBody["id"]) + }) }) } diff --git a/api/tests/documents_trailingslash_test.go b/api/tests/documents_trailingslash_test.go index a82acde..7e7fa26 100644 --- a/api/tests/documents_trailingslash_test.go +++ b/api/tests/documents_trailingslash_test.go @@ -14,7 +14,8 @@ import ( // Request document with trailing slash like python cosmosdb client does. func Test_Documents_Read_Trailing_Slash(t *testing.T) { - ts, _ := documents_InitializeDb(t) + ts := runTestServer() + documents_InitializeDb(t, ts) defer ts.Server.Close() t.Run("Read doc with client that appends slash to path", func(t *testing.T) { diff --git a/cmd/server/server.go b/cmd/server/server.go index 5f1383c..6d6c9ab 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -8,16 +8,23 @@ import ( "github.com/pikami/cosmium/api" "github.com/pikami/cosmium/api/config" "github.com/pikami/cosmium/internal/datastore" + badgerdatastore "github.com/pikami/cosmium/internal/datastore/badger_datastore" mapdatastore "github.com/pikami/cosmium/internal/datastore/map_datastore" ) func main() { configuration := config.ParseFlags() - var dataStore datastore.DataStore = mapdatastore.NewMapDataStore(mapdatastore.MapDataStoreOptions{ - InitialDataFilePath: configuration.InitialDataFilePath, - PersistDataFilePath: configuration.PersistDataFilePath, - }) + var dataStore datastore.DataStore + switch configuration.DataStore { + case config.DataStoreBadger: + dataStore = badgerdatastore.NewBadgerDataStore() + default: + dataStore = mapdatastore.NewMapDataStore(mapdatastore.MapDataStoreOptions{ + InitialDataFilePath: configuration.InitialDataFilePath, + PersistDataFilePath: configuration.PersistDataFilePath, + }) + } server := api.NewApiServer(dataStore, &configuration) err := server.Start() @@ -25,10 +32,10 @@ func main() { panic(err) } - waitForExit(server, dataStore, configuration) + waitForExit(server, dataStore) } -func waitForExit(server *api.ApiServer, dataStore datastore.DataStore, config config.ServerConfig) { +func waitForExit(server *api.ApiServer, dataStore datastore.DataStore) { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) diff --git a/go.mod b/go.mod index 0042136..02004e9 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0 github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v1.3.0 github.com/cosmiumdev/json-patch/v5 v5.9.3 + github.com/dgraph-io/badger/v4 v4.6.0 github.com/gin-gonic/gin v1.10.0 github.com/google/uuid v1.6.0 github.com/stretchr/testify v1.10.0 @@ -17,15 +18,22 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect github.com/bytedance/sonic v1.12.9 // indirect github.com/bytedance/sonic/loader v0.2.3 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudwego/base64x v0.1.5 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgraph-io/ristretto/v2 v2.1.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/gabriel-vasile/mimetype v1.4.8 // indirect github.com/gin-contrib/sse v1.0.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.25.0 // indirect github.com/goccy/go-json v0.10.5 // indirect + github.com/google/flatbuffers v25.2.10+incompatible // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.2.10 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect @@ -36,6 +44,10 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel v1.34.0 // indirect + go.opentelemetry.io/otel/metric v1.34.0 // indirect + go.opentelemetry.io/otel/trace v1.34.0 // indirect golang.org/x/arch v0.14.0 // indirect golang.org/x/crypto v0.35.0 // indirect golang.org/x/net v0.35.0 // indirect diff --git a/go.sum b/go.sum index e9d6394..959b151 100644 --- a/go.sum +++ b/go.sum @@ -15,6 +15,8 @@ github.com/bytedance/sonic v1.12.9/go.mod h1:uVvFidNmlt9+wa31S1urfwwthTWteBgG0hW github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= github.com/bytedance/sonic/loader v0.2.3 h1:yctD0Q3v2NOGfSWPLPvG2ggA2kV6TS6s4wioyEqssH0= github.com/bytedance/sonic/loader v0.2.3/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4= github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= @@ -23,12 +25,25 @@ github.com/cosmiumdev/json-patch/v5 v5.9.3/go.mod h1:WzSTCdia0WrlZtjnL19P4RiwWtf github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgraph-io/badger/v4 v4.6.0 h1:acOwfOOZ4p1dPRnYzvkVm7rUk2Y21TgPVepCy5dJdFQ= +github.com/dgraph-io/badger/v4 v4.6.0/go.mod h1:KSJ5VTuZNC3Sd+YhvVjk2nYua9UZnnTr/SkXvdtiPgI= +github.com/dgraph-io/ristretto/v2 v2.1.0 h1:59LjpOJLNDULHh8MC4UaegN52lC4JnO2dITsie/Pa8I= +github.com/dgraph-io/ristretto/v2 v2.1.0/go.mod h1:uejeqfYXpUomfse0+lO+13ATz4TypQYLJZzBSAemuB4= +github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y= +github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM= github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8= github.com/gin-contrib/sse v1.0.0 h1:y3bT1mUWUxDpW4JLQg/HnTqV4rozuW4tC9eFKTxYI9E= github.com/gin-contrib/sse v1.0.0/go.mod h1:zNuFdwarAygJBht0NTKiSi3jRf6RbqeILZ9Sp6Slhe0= github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU= github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= @@ -41,6 +56,8 @@ github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/google/flatbuffers v25.2.10+incompatible h1:F3vclr7C3HpB1k9mxCGRMXq6FdUalZ6H/pNX4FP1v0Q= +github.com/google/flatbuffers v25.2.10+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -48,6 +65,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= @@ -75,8 +94,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= -github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -93,6 +112,14 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= +go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= +go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= +go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= +go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= +go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= golang.org/x/arch v0.14.0 h1:z9JUEZWr8x4rR0OU6c4/4t6E6jOZ8/QBS2bBYBm4tx4= golang.org/x/arch v0.14.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs= diff --git a/internal/datastore/badger_datastore/badger_datastore.go b/internal/datastore/badger_datastore/badger_datastore.go new file mode 100644 index 0000000..17292e4 --- /dev/null +++ b/internal/datastore/badger_datastore/badger_datastore.go @@ -0,0 +1,36 @@ +package badgerdatastore + +import ( + "encoding/gob" + + "github.com/dgraph-io/badger/v4" + "github.com/pikami/cosmium/internal/logger" +) + +type BadgerDataStore struct { + db *badger.DB +} + +func NewBadgerDataStore() *BadgerDataStore { + gob.Register([]interface{}{}) + + badgerOpts := badger.DefaultOptions("").WithInMemory(true) + + db, err := badger.Open(badgerOpts) + if err != nil { + panic(err) + } + + return &BadgerDataStore{ + db: db, + } +} + +func (r *BadgerDataStore) Close() { + r.db.Close() +} + +func (r *BadgerDataStore) DumpToJson() (string, error) { + logger.ErrorLn("Badger datastore does not support state export currently.") + return "{}", nil +} diff --git a/internal/datastore/badger_datastore/collections.go b/internal/datastore/badger_datastore/collections.go new file mode 100644 index 0000000..5c4b433 --- /dev/null +++ b/internal/datastore/badger_datastore/collections.go @@ -0,0 +1,103 @@ +package badgerdatastore + +import ( + "fmt" + "time" + + "github.com/google/uuid" + "github.com/pikami/cosmium/internal/datastore" + "github.com/pikami/cosmium/internal/logger" + "github.com/pikami/cosmium/internal/resourceid" + structhidrators "github.com/pikami/cosmium/internal/struct_hidrators" +) + +func (r *BadgerDataStore) GetAllCollections(databaseId string) ([]datastore.Collection, datastore.DataStoreStatus) { + exists, err := keyExists(r.db.NewTransaction(false), generateDatabaseKey(databaseId)) + if err != nil { + logger.ErrorLn("Error while checking if database exists:", err) + return nil, datastore.Unknown + } + + if !exists { + return nil, datastore.StatusNotFound + } + + colls, status := listByPrefix[datastore.Collection](r.db, generateKey(resourceid.ResourceTypeCollection, databaseId, "", "")) + if status == datastore.StatusOk { + return colls, datastore.StatusOk + } + + return nil, status +} + +func (r *BadgerDataStore) GetCollection(databaseId string, collectionId string) (datastore.Collection, datastore.DataStoreStatus) { + collectionKey := generateCollectionKey(databaseId, collectionId) + + txn := r.db.NewTransaction(false) + defer txn.Discard() + + var collection datastore.Collection + status := getKey(txn, collectionKey, &collection) + + return collection, status +} + +func (r *BadgerDataStore) DeleteCollection(databaseId string, collectionId string) datastore.DataStoreStatus { + collectionKey := generateCollectionKey(databaseId, collectionId) + + txn := r.db.NewTransaction(true) + defer txn.Discard() + + prefixes := []string{ + generateKey(resourceid.ResourceTypeDocument, databaseId, collectionId, ""), + generateKey(resourceid.ResourceTypeTrigger, databaseId, collectionId, ""), + generateKey(resourceid.ResourceTypeStoredProcedure, databaseId, collectionId, ""), + generateKey(resourceid.ResourceTypeUserDefinedFunction, databaseId, collectionId, ""), + collectionKey, + } + for _, prefix := range prefixes { + if err := deleteKeysByPrefix(txn, prefix); err != nil { + return datastore.Unknown + } + } + + err := txn.Commit() + if err != nil { + logger.ErrorLn("Error while committing transaction:", err) + return datastore.Unknown + } + + return datastore.StatusOk +} + +func (r *BadgerDataStore) CreateCollection(databaseId string, newCollection datastore.Collection) (datastore.Collection, datastore.DataStoreStatus) { + collectionKey := generateCollectionKey(databaseId, newCollection.ID) + + txn := r.db.NewTransaction(true) + defer txn.Discard() + + collectionExists, err := keyExists(txn, collectionKey) + if err != nil || collectionExists { + return datastore.Collection{}, datastore.Conflict + } + + var database datastore.Database + status := getKey(txn, generateDatabaseKey(databaseId), &database) + if status != datastore.StatusOk { + return datastore.Collection{}, status + } + + newCollection = structhidrators.Hidrate(newCollection).(datastore.Collection) + + newCollection.TimeStamp = time.Now().Unix() + newCollection.ResourceID = resourceid.NewCombined(database.ResourceID, resourceid.New(resourceid.ResourceTypeCollection)) + newCollection.ETag = fmt.Sprintf("\"%s\"", uuid.New()) + newCollection.Self = fmt.Sprintf("dbs/%s/colls/%s/", database.ResourceID, newCollection.ResourceID) + + status = insertKey(txn, collectionKey, newCollection) + if status != datastore.StatusOk { + return datastore.Collection{}, status + } + + return newCollection, datastore.StatusOk +} diff --git a/internal/datastore/badger_datastore/databases.go b/internal/datastore/badger_datastore/databases.go new file mode 100644 index 0000000..5fff5ad --- /dev/null +++ b/internal/datastore/badger_datastore/databases.go @@ -0,0 +1,80 @@ +package badgerdatastore + +import ( + "fmt" + "time" + + "github.com/google/uuid" + "github.com/pikami/cosmium/internal/datastore" + "github.com/pikami/cosmium/internal/logger" + "github.com/pikami/cosmium/internal/resourceid" +) + +func (r *BadgerDataStore) GetAllDatabases() ([]datastore.Database, datastore.DataStoreStatus) { + dbs, status := listByPrefix[datastore.Database](r.db, DatabaseKeyPrefix) + if status == datastore.StatusOk { + return dbs, datastore.StatusOk + } + + return nil, status +} + +func (r *BadgerDataStore) GetDatabase(id string) (datastore.Database, datastore.DataStoreStatus) { + databaseKey := generateDatabaseKey(id) + + txn := r.db.NewTransaction(false) + defer txn.Discard() + + var database datastore.Database + status := getKey(txn, databaseKey, &database) + + return database, status +} + +func (r *BadgerDataStore) DeleteDatabase(id string) datastore.DataStoreStatus { + databaseKey := generateDatabaseKey(id) + + txn := r.db.NewTransaction(true) + defer txn.Discard() + + prefixes := []string{ + generateKey(resourceid.ResourceTypeCollection, id, "", ""), + generateKey(resourceid.ResourceTypeDocument, id, "", ""), + generateKey(resourceid.ResourceTypeTrigger, id, "", ""), + generateKey(resourceid.ResourceTypeStoredProcedure, id, "", ""), + generateKey(resourceid.ResourceTypeUserDefinedFunction, id, "", ""), + databaseKey, + } + for _, prefix := range prefixes { + if err := deleteKeysByPrefix(txn, prefix); err != nil { + return datastore.Unknown + } + } + + err := txn.Commit() + if err != nil { + logger.ErrorLn("Error while committing transaction:", err) + return datastore.Unknown + } + + return datastore.StatusOk +} + +func (r *BadgerDataStore) CreateDatabase(newDatabase datastore.Database) (datastore.Database, datastore.DataStoreStatus) { + databaseKey := generateDatabaseKey(newDatabase.ID) + + txn := r.db.NewTransaction(true) + defer txn.Discard() + + newDatabase.TimeStamp = time.Now().Unix() + newDatabase.ResourceID = resourceid.New(resourceid.ResourceTypeDatabase) + newDatabase.ETag = fmt.Sprintf("\"%s\"", uuid.New()) + newDatabase.Self = fmt.Sprintf("dbs/%s/", newDatabase.ResourceID) + + status := insertKey(txn, databaseKey, newDatabase) + if status != datastore.StatusOk { + return datastore.Database{}, status + } + + return newDatabase, datastore.StatusOk +} diff --git a/internal/datastore/badger_datastore/db_abstractions.go b/internal/datastore/badger_datastore/db_abstractions.go new file mode 100644 index 0000000..fdc5aa6 --- /dev/null +++ b/internal/datastore/badger_datastore/db_abstractions.go @@ -0,0 +1,207 @@ +package badgerdatastore + +import ( + "bytes" + "encoding/gob" + + "github.com/dgraph-io/badger/v4" + "github.com/pikami/cosmium/internal/datastore" + "github.com/pikami/cosmium/internal/logger" + "github.com/pikami/cosmium/internal/resourceid" +) + +const ( + DatabaseKeyPrefix = "DB:" + CollectionKeyPrefix = "COL:" + DocumentKeyPrefix = "DOC:" + TriggerKeyPrefix = "TRG:" + StoredProcedureKeyPrefix = "SP:" + UserDefinedFunctionKeyPrefix = "UDF:" +) + +func generateKey( + resourceType resourceid.ResourceType, + databaseId string, + collectionId string, + resourceId string, +) string { + result := "" + + switch resourceType { + case resourceid.ResourceTypeDatabase: + result += DatabaseKeyPrefix + case resourceid.ResourceTypeCollection: + result += CollectionKeyPrefix + case resourceid.ResourceTypeDocument: + result += DocumentKeyPrefix + case resourceid.ResourceTypeTrigger: + result += TriggerKeyPrefix + case resourceid.ResourceTypeStoredProcedure: + result += StoredProcedureKeyPrefix + case resourceid.ResourceTypeUserDefinedFunction: + result += UserDefinedFunctionKeyPrefix + } + + if databaseId != "" { + result += databaseId + } + + if collectionId != "" { + result += "/colls/" + collectionId + } + + if resourceId != "" { + result += "/" + resourceId + } + + return result +} + +func generateDatabaseKey(databaseId string) string { + return generateKey(resourceid.ResourceTypeDatabase, databaseId, "", "") +} + +func generateCollectionKey(databaseId string, collectionId string) string { + return generateKey(resourceid.ResourceTypeCollection, databaseId, collectionId, "") +} + +func generateDocumentKey(databaseId string, collectionId string, documentId string) string { + return generateKey(resourceid.ResourceTypeDocument, databaseId, collectionId, documentId) +} + +func generateTriggerKey(databaseId string, collectionId string, triggerId string) string { + return generateKey(resourceid.ResourceTypeTrigger, databaseId, collectionId, triggerId) +} + +func generateStoredProcedureKey(databaseId string, collectionId string, storedProcedureId string) string { + return generateKey(resourceid.ResourceTypeStoredProcedure, databaseId, collectionId, storedProcedureId) +} + +func generateUserDefinedFunctionKey(databaseId string, collectionId string, udfId string) string { + return generateKey(resourceid.ResourceTypeUserDefinedFunction, databaseId, collectionId, udfId) +} + +func insertKey(txn *badger.Txn, key string, value interface{}) datastore.DataStoreStatus { + _, err := txn.Get([]byte(key)) + if err == nil { + return datastore.Conflict + } + + if err != badger.ErrKeyNotFound { + logger.ErrorLn("Error while checking if key exists:", err) + return datastore.Unknown + } + + var buf bytes.Buffer + err = gob.NewEncoder(&buf).Encode(value) + if err != nil { + logger.ErrorLn("Error while encoding value:", err) + return datastore.Unknown + } + + err = txn.Set([]byte(key), buf.Bytes()) + if err != nil { + logger.ErrorLn("Error while setting key:", err) + return datastore.Unknown + } + + err = txn.Commit() + if err != nil { + logger.ErrorLn("Error while committing transaction:", err) + return datastore.Unknown + } + + return datastore.StatusOk +} + +func getKey(txn *badger.Txn, key string, value interface{}) datastore.DataStoreStatus { + item, err := txn.Get([]byte(key)) + if err != nil { + if err == badger.ErrKeyNotFound { + return datastore.StatusNotFound + } + logger.ErrorLn("Error while getting key:", err) + return datastore.Unknown + } + + val, err := item.ValueCopy(nil) + if err != nil { + logger.ErrorLn("Error while copying value:", err) + return datastore.Unknown + } + + if value == nil { + logger.ErrorLn("getKey called with nil value") + return datastore.Unknown + } + + err = gob.NewDecoder(bytes.NewReader(val)).Decode(value) + if err != nil { + logger.ErrorLn("Error while decoding value:", err) + return datastore.Unknown + } + + return datastore.StatusOk +} + +func keyExists(txn *badger.Txn, key string) (bool, error) { + _, err := txn.Get([]byte(key)) + if err == nil { + return true, nil + } + + if err == badger.ErrKeyNotFound { + return false, nil + } + + return false, err +} + +func listByPrefix[T any](db *badger.DB, prefix string) ([]T, datastore.DataStoreStatus) { + var results []T + + err := db.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.Prefix = []byte(prefix) + it := txn.NewIterator(opts) + defer it.Close() + + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + var entry T + + status := getKey(txn, string(item.Key()), &entry) + if status != datastore.StatusOk { + logger.ErrorLn("Failed to retrieve entry:", string(item.Key())) + continue + } + + results = append(results, entry) + } + return nil + }) + + if err != nil { + logger.ErrorLn("Error while listing entries:", err) + return nil, datastore.Unknown + } + + return results, datastore.StatusOk +} + +func deleteKeysByPrefix(txn *badger.Txn, prefix string) error { + opts := badger.DefaultIteratorOptions + opts.Prefix = []byte(prefix) + it := txn.NewIterator(opts) + defer it.Close() + + for it.Rewind(); it.Valid(); it.Next() { + key := it.Item().KeyCopy(nil) + if err := txn.Delete(key); err != nil { + logger.ErrorLn("Failed to delete key:", string(key), "Error:", err) + return err + } + } + + return nil +} diff --git a/internal/datastore/badger_datastore/document_iterator.go b/internal/datastore/badger_datastore/document_iterator.go new file mode 100644 index 0000000..6fe9172 --- /dev/null +++ b/internal/datastore/badger_datastore/document_iterator.go @@ -0,0 +1,60 @@ +package badgerdatastore + +import ( + "bytes" + "encoding/gob" + + "github.com/dgraph-io/badger/v4" + "github.com/pikami/cosmium/internal/datastore" + "github.com/pikami/cosmium/internal/logger" +) + +type BadgerDocumentIterator struct { + txn *badger.Txn + it *badger.Iterator + prefix string +} + +func NewBadgerDocumentIterator(txn *badger.Txn, prefix string) *BadgerDocumentIterator { + opts := badger.DefaultIteratorOptions + opts.Prefix = []byte(prefix) + + it := txn.NewIterator(opts) + it.Rewind() + + return &BadgerDocumentIterator{ + txn: txn, + it: it, + prefix: prefix, + } +} + +func (i *BadgerDocumentIterator) Next() (datastore.Document, datastore.DataStoreStatus) { + if !i.it.Valid() { + i.it.Close() + return datastore.Document{}, datastore.IterEOF + } + + item := i.it.Item() + val, err := item.ValueCopy(nil) + if err != nil { + logger.ErrorLn("Error while copying value:", err) + return datastore.Document{}, datastore.Unknown + } + + current := &datastore.Document{} + err = gob.NewDecoder(bytes.NewReader(val)).Decode(current) + if err != nil { + logger.ErrorLn("Error while decoding value:", err) + return datastore.Document{}, datastore.Unknown + } + + i.it.Next() + + return *current, datastore.StatusOk +} + +func (i *BadgerDocumentIterator) Close() { + i.it.Close() + i.txn.Discard() +} diff --git a/internal/datastore/badger_datastore/documents.go b/internal/datastore/badger_datastore/documents.go new file mode 100644 index 0000000..f55f571 --- /dev/null +++ b/internal/datastore/badger_datastore/documents.go @@ -0,0 +1,127 @@ +package badgerdatastore + +import ( + "fmt" + "time" + + "github.com/google/uuid" + "github.com/pikami/cosmium/internal/datastore" + "github.com/pikami/cosmium/internal/logger" + "github.com/pikami/cosmium/internal/resourceid" +) + +func (r *BadgerDataStore) GetAllDocuments(databaseId string, collectionId string) ([]datastore.Document, datastore.DataStoreStatus) { + txn := r.db.NewTransaction(false) + defer txn.Discard() + + dbExists, err := keyExists(txn, generateDatabaseKey(databaseId)) + if err != nil || !dbExists { + return nil, datastore.StatusNotFound + } + + collExists, err := keyExists(txn, generateCollectionKey(databaseId, collectionId)) + if err != nil || !collExists { + return nil, datastore.StatusNotFound + } + + docs, status := listByPrefix[datastore.Document](r.db, generateKey(resourceid.ResourceTypeDocument, databaseId, collectionId, "")) + if status == datastore.StatusOk { + return docs, datastore.StatusOk + } + + return nil, status +} + +func (r *BadgerDataStore) GetDocumentIterator(databaseId string, collectionId string) (datastore.DocumentIterator, datastore.DataStoreStatus) { + txn := r.db.NewTransaction(false) + + dbExists, err := keyExists(txn, generateDatabaseKey(databaseId)) + if err != nil || !dbExists { + return nil, datastore.StatusNotFound + } + + collExists, err := keyExists(txn, generateCollectionKey(databaseId, collectionId)) + if err != nil || !collExists { + return nil, datastore.StatusNotFound + } + + iter := NewBadgerDocumentIterator(txn, generateKey(resourceid.ResourceTypeDocument, databaseId, collectionId, "")) + return iter, datastore.StatusOk +} + +func (r *BadgerDataStore) GetDocument(databaseId string, collectionId string, documentId string) (datastore.Document, datastore.DataStoreStatus) { + documentKey := generateDocumentKey(databaseId, collectionId, documentId) + + txn := r.db.NewTransaction(false) + defer txn.Discard() + + var document datastore.Document + status := getKey(txn, documentKey, &document) + + return document, status +} + +func (r *BadgerDataStore) DeleteDocument(databaseId string, collectionId string, documentId string) datastore.DataStoreStatus { + documentKey := generateDocumentKey(databaseId, collectionId, documentId) + + txn := r.db.NewTransaction(true) + defer txn.Discard() + + exists, err := keyExists(txn, documentKey) + if err != nil { + return datastore.Unknown + } + if !exists { + return datastore.StatusNotFound + } + + err = txn.Delete([]byte(documentKey)) + if err != nil { + logger.ErrorLn("Error while deleting document:", err) + return datastore.Unknown + } + + err = txn.Commit() + if err != nil { + logger.ErrorLn("Error while committing transaction:", err) + return datastore.Unknown + } + + return datastore.StatusOk +} + +func (r *BadgerDataStore) CreateDocument(databaseId string, collectionId string, document map[string]interface{}) (datastore.Document, datastore.DataStoreStatus) { + txn := r.db.NewTransaction(true) + defer txn.Discard() + + var database datastore.Database + status := getKey(txn, generateDatabaseKey(databaseId), &database) + if status != datastore.StatusOk { + return datastore.Document{}, status + } + + var collection datastore.Collection + status = getKey(txn, generateCollectionKey(databaseId, collectionId), &collection) + if status != datastore.StatusOk { + return datastore.Document{}, status + } + + var ok bool + var documentId string + if documentId, ok = document["id"].(string); !ok || documentId == "" { + documentId = fmt.Sprint(uuid.New()) + document["id"] = documentId + } + + document["_ts"] = time.Now().Unix() + document["_rid"] = resourceid.NewCombined(collection.ResourceID, resourceid.New(resourceid.ResourceTypeDocument)) + document["_etag"] = fmt.Sprintf("\"%s\"", uuid.New()) + document["_self"] = fmt.Sprintf("dbs/%s/colls/%s/docs/%s/", database.ResourceID, collection.ResourceID, document["_rid"]) + + status = insertKey(txn, generateDocumentKey(databaseId, collectionId, documentId), document) + if status != datastore.StatusOk { + return datastore.Document{}, status + } + + return document, datastore.StatusOk +} diff --git a/internal/datastore/badger_datastore/partition_key_ranges.go b/internal/datastore/badger_datastore/partition_key_ranges.go new file mode 100644 index 0000000..bc0cb2e --- /dev/null +++ b/internal/datastore/badger_datastore/partition_key_ranges.go @@ -0,0 +1,53 @@ +package badgerdatastore + +import ( + "fmt" + + "github.com/google/uuid" + "github.com/pikami/cosmium/internal/datastore" + "github.com/pikami/cosmium/internal/resourceid" +) + +// I have no idea what this is tbh +func (r *BadgerDataStore) GetPartitionKeyRanges(databaseId string, collectionId string) ([]datastore.PartitionKeyRange, datastore.DataStoreStatus) { + databaseRid := databaseId + collectionRid := collectionId + var timestamp int64 = 0 + + txn := r.db.NewTransaction(false) + defer txn.Discard() + + var database datastore.Database + status := getKey(txn, generateDatabaseKey(databaseId), &database) + if status != datastore.StatusOk { + databaseRid = database.ResourceID + } + + var collection datastore.Collection + status = getKey(txn, generateCollectionKey(databaseId, collectionId), &collection) + if status != datastore.StatusOk { + collectionRid = collection.ResourceID + timestamp = collection.TimeStamp + } + + pkrResourceId := resourceid.NewCombined(collectionRid, resourceid.New(resourceid.ResourceTypePartitionKeyRange)) + pkrSelf := fmt.Sprintf("dbs/%s/colls/%s/pkranges/%s/", databaseRid, collectionRid, pkrResourceId) + etag := fmt.Sprintf("\"%s\"", uuid.New()) + + return []datastore.PartitionKeyRange{ + { + ResourceID: pkrResourceId, + ID: "0", + Etag: etag, + MinInclusive: "", + MaxExclusive: "FF", + RidPrefix: 0, + Self: pkrSelf, + ThroughputFraction: 1, + Status: "online", + Parents: []interface{}{}, + TimeStamp: timestamp, + Lsn: 17, + }, + }, datastore.StatusOk +} diff --git a/internal/datastore/badger_datastore/stored_procedures.go b/internal/datastore/badger_datastore/stored_procedures.go new file mode 100644 index 0000000..2559527 --- /dev/null +++ b/internal/datastore/badger_datastore/stored_procedures.go @@ -0,0 +1,107 @@ +package badgerdatastore + +import ( + "fmt" + "time" + + "github.com/google/uuid" + "github.com/pikami/cosmium/internal/datastore" + "github.com/pikami/cosmium/internal/logger" + "github.com/pikami/cosmium/internal/resourceid" +) + +func (r *BadgerDataStore) GetAllStoredProcedures(databaseId string, collectionId string) ([]datastore.StoredProcedure, datastore.DataStoreStatus) { + txn := r.db.NewTransaction(false) + defer txn.Discard() + + dbExists, err := keyExists(txn, generateDatabaseKey(databaseId)) + if err != nil || !dbExists { + return nil, datastore.StatusNotFound + } + + collExists, err := keyExists(txn, generateCollectionKey(databaseId, collectionId)) + if err != nil || !collExists { + return nil, datastore.StatusNotFound + } + + storedProcedures, status := listByPrefix[datastore.StoredProcedure](r.db, generateKey(resourceid.ResourceTypeStoredProcedure, databaseId, collectionId, "")) + if status == datastore.StatusOk { + return storedProcedures, datastore.StatusOk + } + + return nil, status +} + +func (r *BadgerDataStore) GetStoredProcedure(databaseId string, collectionId string, storedProcedureId string) (datastore.StoredProcedure, datastore.DataStoreStatus) { + storedProcedureKey := generateStoredProcedureKey(databaseId, collectionId, storedProcedureId) + + txn := r.db.NewTransaction(false) + defer txn.Discard() + + var storedProcedure datastore.StoredProcedure + status := getKey(txn, storedProcedureKey, &storedProcedure) + + return storedProcedure, status +} + +func (r *BadgerDataStore) DeleteStoredProcedure(databaseId string, collectionId string, storedProcedureId string) datastore.DataStoreStatus { + storedProcedureKey := generateStoredProcedureKey(databaseId, collectionId, storedProcedureId) + + txn := r.db.NewTransaction(true) + defer txn.Discard() + + exists, err := keyExists(txn, storedProcedureKey) + if err != nil { + return datastore.Unknown + } + if !exists { + return datastore.StatusNotFound + } + + err = txn.Delete([]byte(storedProcedureKey)) + if err != nil { + logger.ErrorLn("Error while deleting stored procedure:", err) + return datastore.Unknown + } + + err = txn.Commit() + if err != nil { + logger.ErrorLn("Error while committing transaction:", err) + return datastore.Unknown + } + + return datastore.StatusOk +} + +func (r *BadgerDataStore) CreateStoredProcedure(databaseId string, collectionId string, storedProcedure datastore.StoredProcedure) (datastore.StoredProcedure, datastore.DataStoreStatus) { + txn := r.db.NewTransaction(true) + defer txn.Discard() + + if storedProcedure.ID == "" { + return datastore.StoredProcedure{}, datastore.BadRequest + } + + var database datastore.Database + status := getKey(txn, generateDatabaseKey(databaseId), &database) + if status != datastore.StatusOk { + return datastore.StoredProcedure{}, status + } + + var collection datastore.Collection + status = getKey(txn, generateCollectionKey(databaseId, collectionId), &collection) + if status != datastore.StatusOk { + return datastore.StoredProcedure{}, status + } + + storedProcedure.TimeStamp = time.Now().Unix() + storedProcedure.ResourceID = resourceid.NewCombined(collection.ResourceID, resourceid.New(resourceid.ResourceTypeStoredProcedure)) + storedProcedure.ETag = fmt.Sprintf("\"%s\"", uuid.New()) + storedProcedure.Self = fmt.Sprintf("dbs/%s/colls/%s/sprocs/%s/", database.ResourceID, collection.ResourceID, storedProcedure.ResourceID) + + status = insertKey(txn, generateStoredProcedureKey(databaseId, collectionId, storedProcedure.ID), storedProcedure) + if status != datastore.StatusOk { + return datastore.StoredProcedure{}, status + } + + return storedProcedure, datastore.StatusOk +} diff --git a/internal/datastore/badger_datastore/triggers.go b/internal/datastore/badger_datastore/triggers.go new file mode 100644 index 0000000..fc03f36 --- /dev/null +++ b/internal/datastore/badger_datastore/triggers.go @@ -0,0 +1,107 @@ +package badgerdatastore + +import ( + "fmt" + "time" + + "github.com/google/uuid" + "github.com/pikami/cosmium/internal/datastore" + "github.com/pikami/cosmium/internal/logger" + "github.com/pikami/cosmium/internal/resourceid" +) + +func (r *BadgerDataStore) GetAllTriggers(databaseId string, collectionId string) ([]datastore.Trigger, datastore.DataStoreStatus) { + txn := r.db.NewTransaction(false) + defer txn.Discard() + + dbExists, err := keyExists(txn, generateDatabaseKey(databaseId)) + if err != nil || !dbExists { + return nil, datastore.StatusNotFound + } + + collExists, err := keyExists(txn, generateCollectionKey(databaseId, collectionId)) + if err != nil || !collExists { + return nil, datastore.StatusNotFound + } + + triggers, status := listByPrefix[datastore.Trigger](r.db, generateKey(resourceid.ResourceTypeTrigger, databaseId, collectionId, "")) + if status == datastore.StatusOk { + return triggers, datastore.StatusOk + } + + return nil, status +} + +func (r *BadgerDataStore) GetTrigger(databaseId string, collectionId string, triggerId string) (datastore.Trigger, datastore.DataStoreStatus) { + triggerKey := generateTriggerKey(databaseId, collectionId, triggerId) + + txn := r.db.NewTransaction(false) + defer txn.Discard() + + var trigger datastore.Trigger + status := getKey(txn, triggerKey, &trigger) + + return trigger, status +} + +func (r *BadgerDataStore) DeleteTrigger(databaseId string, collectionId string, triggerId string) datastore.DataStoreStatus { + triggerKey := generateTriggerKey(databaseId, collectionId, triggerId) + + txn := r.db.NewTransaction(true) + defer txn.Discard() + + exists, err := keyExists(txn, triggerKey) + if err != nil { + return datastore.Unknown + } + if !exists { + return datastore.StatusNotFound + } + + err = txn.Delete([]byte(triggerKey)) + if err != nil { + logger.ErrorLn("Error while deleting trigger:", err) + return datastore.Unknown + } + + err = txn.Commit() + if err != nil { + logger.ErrorLn("Error while committing transaction:", err) + return datastore.Unknown + } + + return datastore.StatusOk +} + +func (r *BadgerDataStore) CreateTrigger(databaseId string, collectionId string, trigger datastore.Trigger) (datastore.Trigger, datastore.DataStoreStatus) { + txn := r.db.NewTransaction(true) + defer txn.Discard() + + if trigger.ID == "" { + return datastore.Trigger{}, datastore.BadRequest + } + + var database datastore.Database + status := getKey(txn, generateDatabaseKey(databaseId), &database) + if status != datastore.StatusOk { + return datastore.Trigger{}, status + } + + var collection datastore.Collection + status = getKey(txn, generateCollectionKey(databaseId, collectionId), &collection) + if status != datastore.StatusOk { + return datastore.Trigger{}, status + } + + trigger.TimeStamp = time.Now().Unix() + trigger.ResourceID = resourceid.NewCombined(collection.ResourceID, resourceid.New(resourceid.ResourceTypeTrigger)) + trigger.ETag = fmt.Sprintf("\"%s\"", uuid.New()) + trigger.Self = fmt.Sprintf("dbs/%s/colls/%s/triggers/%s/", database.ResourceID, collection.ResourceID, trigger.ResourceID) + + status = insertKey(txn, generateTriggerKey(databaseId, collectionId, trigger.ID), trigger) + if status != datastore.StatusOk { + return datastore.Trigger{}, status + } + + return trigger, datastore.StatusOk +} diff --git a/internal/datastore/badger_datastore/user_defined_functions.go b/internal/datastore/badger_datastore/user_defined_functions.go new file mode 100644 index 0000000..dabf644 --- /dev/null +++ b/internal/datastore/badger_datastore/user_defined_functions.go @@ -0,0 +1,107 @@ +package badgerdatastore + +import ( + "fmt" + "time" + + "github.com/google/uuid" + "github.com/pikami/cosmium/internal/datastore" + "github.com/pikami/cosmium/internal/logger" + "github.com/pikami/cosmium/internal/resourceid" +) + +func (r *BadgerDataStore) GetAllUserDefinedFunctions(databaseId string, collectionId string) ([]datastore.UserDefinedFunction, datastore.DataStoreStatus) { + txn := r.db.NewTransaction(false) + defer txn.Discard() + + dbExists, err := keyExists(txn, generateDatabaseKey(databaseId)) + if err != nil || !dbExists { + return nil, datastore.StatusNotFound + } + + collExists, err := keyExists(txn, generateCollectionKey(databaseId, collectionId)) + if err != nil || !collExists { + return nil, datastore.StatusNotFound + } + + udfs, status := listByPrefix[datastore.UserDefinedFunction](r.db, generateKey(resourceid.ResourceTypeUserDefinedFunction, databaseId, collectionId, "")) + if status == datastore.StatusOk { + return udfs, datastore.StatusOk + } + + return nil, status +} + +func (r *BadgerDataStore) GetUserDefinedFunction(databaseId string, collectionId string, udfId string) (datastore.UserDefinedFunction, datastore.DataStoreStatus) { + udfKey := generateUserDefinedFunctionKey(databaseId, collectionId, udfId) + + txn := r.db.NewTransaction(false) + defer txn.Discard() + + var udf datastore.UserDefinedFunction + status := getKey(txn, udfKey, &udf) + + return udf, status +} + +func (r *BadgerDataStore) DeleteUserDefinedFunction(databaseId string, collectionId string, udfId string) datastore.DataStoreStatus { + udfKey := generateUserDefinedFunctionKey(databaseId, collectionId, udfId) + + txn := r.db.NewTransaction(true) + defer txn.Discard() + + exists, err := keyExists(txn, udfKey) + if err != nil { + return datastore.Unknown + } + if !exists { + return datastore.StatusNotFound + } + + err = txn.Delete([]byte(udfKey)) + if err != nil { + logger.ErrorLn("Error while deleting user defined function:", err) + return datastore.Unknown + } + + err = txn.Commit() + if err != nil { + logger.ErrorLn("Error while committing transaction:", err) + return datastore.Unknown + } + + return datastore.StatusOk +} + +func (r *BadgerDataStore) CreateUserDefinedFunction(databaseId string, collectionId string, udf datastore.UserDefinedFunction) (datastore.UserDefinedFunction, datastore.DataStoreStatus) { + txn := r.db.NewTransaction(true) + defer txn.Discard() + + if udf.ID == "" { + return datastore.UserDefinedFunction{}, datastore.BadRequest + } + + var database datastore.Database + status := getKey(txn, generateDatabaseKey(databaseId), &database) + if status != datastore.StatusOk { + return datastore.UserDefinedFunction{}, status + } + + var collection datastore.Collection + status = getKey(txn, generateCollectionKey(databaseId, collectionId), &collection) + if status != datastore.StatusOk { + return datastore.UserDefinedFunction{}, status + } + + udf.TimeStamp = time.Now().Unix() + udf.ResourceID = resourceid.NewCombined(collection.ResourceID, resourceid.New(resourceid.ResourceTypeUserDefinedFunction)) + udf.ETag = fmt.Sprintf("\"%s\"", uuid.New()) + udf.Self = fmt.Sprintf("dbs/%s/colls/%s/udfs/%s/", database.ResourceID, collection.ResourceID, udf.ResourceID) + + status = insertKey(txn, generateUserDefinedFunctionKey(databaseId, collectionId, udf.ID), udf) + if status != datastore.StatusOk { + return datastore.UserDefinedFunction{}, status + } + + return udf, datastore.StatusOk +} diff --git a/internal/datastore/datastore.go b/internal/datastore/datastore.go index 8645b7a..cbd3594 100644 --- a/internal/datastore/datastore.go +++ b/internal/datastore/datastore.go @@ -40,4 +40,5 @@ type DataStore interface { type DocumentIterator interface { Next() (Document, DataStoreStatus) + Close() } diff --git a/internal/datastore/map_datastore/array_document_iterator.go b/internal/datastore/map_datastore/array_document_iterator.go index 060998d..7796609 100644 --- a/internal/datastore/map_datastore/array_document_iterator.go +++ b/internal/datastore/map_datastore/array_document_iterator.go @@ -15,3 +15,7 @@ func (i *ArrayDocumentIterator) Next() (datastore.Document, datastore.DataStoreS return i.documents[i.index], datastore.StatusOk } + +func (i *ArrayDocumentIterator) Close() { + i.documents = []datastore.Document{} +} diff --git a/internal/datastore/models.go b/internal/datastore/models.go index 4d8a5cd..a591e60 100644 --- a/internal/datastore/models.go +++ b/internal/datastore/models.go @@ -16,6 +16,7 @@ const ( Conflict = 3 BadRequest = 4 IterEOF = 5 + Unknown = 6 ) type TriggerOperation string diff --git a/sharedlibrary/sharedlibrary.go b/sharedlibrary/sharedlibrary.go index f4a111c..d3edd62 100644 --- a/sharedlibrary/sharedlibrary.go +++ b/sharedlibrary/sharedlibrary.go @@ -11,6 +11,8 @@ import ( "github.com/pikami/cosmium/api" "github.com/pikami/cosmium/api/config" + "github.com/pikami/cosmium/internal/datastore" + badgerdatastore "github.com/pikami/cosmium/internal/datastore/badger_datastore" mapdatastore "github.com/pikami/cosmium/internal/datastore/map_datastore" ) @@ -32,10 +34,16 @@ func CreateServerInstance(serverName *C.char, configurationJSON *C.char) int { configuration.ApplyDefaultsToEmptyFields() configuration.PopulateCalculatedFields() - dataStore := mapdatastore.NewMapDataStore(mapdatastore.MapDataStoreOptions{ - InitialDataFilePath: configuration.InitialDataFilePath, - PersistDataFilePath: configuration.PersistDataFilePath, - }) + var dataStore datastore.DataStore + switch configuration.DataStore { + case config.DataStoreBadger: + dataStore = badgerdatastore.NewBadgerDataStore() + default: + dataStore = mapdatastore.NewMapDataStore(mapdatastore.MapDataStoreOptions{ + InitialDataFilePath: configuration.InitialDataFilePath, + PersistDataFilePath: configuration.PersistDataFilePath, + }) + } server := api.NewApiServer(dataStore, &configuration) err = server.Start()