mirror of
https://github.com/pikami/cosmium.git
synced 2025-03-13 05:15:52 +00:00
Added support for Badger as an alternative storage backend
This commit is contained in:
parent
e526b2269e
commit
813b9faeaa
@ -15,6 +15,11 @@ const (
|
||||
ExplorerBaseUrlLocation = "/_explorer"
|
||||
)
|
||||
|
||||
const (
|
||||
DataStoreMap = "map"
|
||||
DataStoreBadger = "badger"
|
||||
)
|
||||
|
||||
func ParseFlags() ServerConfig {
|
||||
host := flag.String("Host", "localhost", "Hostname")
|
||||
port := flag.Int("Port", 8081, "Listen port")
|
||||
@ -28,6 +33,8 @@ func ParseFlags() ServerConfig {
|
||||
persistDataPath := flag.String("Persist", "", "Saves data to given path on application exit")
|
||||
logLevel := NewEnumValue("info", []string{"debug", "info", "error", "silent"})
|
||||
flag.Var(logLevel, "LogLevel", fmt.Sprintf("Sets the logging level %s", logLevel.AllowedValuesList()))
|
||||
dataStore := NewEnumValue("map", []string{DataStoreMap, DataStoreBadger})
|
||||
flag.Var(dataStore, "DataStore", fmt.Sprintf("Sets the data store %s, (badger is currently in the experimental phase)", dataStore.AllowedValuesList()))
|
||||
|
||||
flag.Parse()
|
||||
setFlagsFromEnvironment()
|
||||
@ -44,6 +51,7 @@ func ParseFlags() ServerConfig {
|
||||
config.DisableTls = *disableTls
|
||||
config.AccountKey = *accountKey
|
||||
config.LogLevel = logLevel.value
|
||||
config.DataStore = dataStore.value
|
||||
|
||||
config.PopulateCalculatedFields()
|
||||
|
||||
@ -68,6 +76,13 @@ func (c *ServerConfig) PopulateCalculatedFields() {
|
||||
default:
|
||||
logger.SetLogLevel(logger.LogLevelInfo)
|
||||
}
|
||||
|
||||
if c.DataStore == DataStoreBadger &&
|
||||
(c.InitialDataFilePath != "" || c.PersistDataFilePath != "") {
|
||||
logger.ErrorLn("InitialData and Persist options are currently not supported with Badger data store")
|
||||
c.InitialDataFilePath = ""
|
||||
c.PersistDataFilePath = ""
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ServerConfig) ApplyDefaultsToEmptyFields() {
|
||||
|
@ -17,4 +17,6 @@ type ServerConfig struct {
|
||||
DisableTls bool `json:"disableTls"`
|
||||
LogLevel string `json:"logLevel"`
|
||||
ExplorerBaseUrlLocation string `json:"explorerBaseUrlLocation"`
|
||||
|
||||
DataStore string `json:"dataStore"`
|
||||
}
|
||||
|
@ -383,6 +383,7 @@ func (h *Handlers) executeQueryDocuments(databaseId string, collectionId string,
|
||||
if status != datastore.StatusOk {
|
||||
return nil, status
|
||||
}
|
||||
defer allDocumentsIterator.Close()
|
||||
|
||||
rowsIterator := converters.NewDocumentToRowTypeIterator(allDocumentsIterator)
|
||||
|
||||
|
@ -3,32 +3,29 @@ package tests_test
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
|
||||
"github.com/pikami/cosmium/api/config"
|
||||
"github.com/pikami/cosmium/internal/datastore"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func Test_Collections(t *testing.T) {
|
||||
ts := runTestServer()
|
||||
defer ts.Server.Close()
|
||||
presets := []testPreset{PresetMapStore, PresetBadgerStore}
|
||||
|
||||
client, err := azcosmos.NewClientFromConnectionString(
|
||||
fmt.Sprintf("AccountEndpoint=%s;AccountKey=%s", ts.URL, config.DefaultAccountKey),
|
||||
&azcosmos.ClientOptions{},
|
||||
)
|
||||
assert.Nil(t, err)
|
||||
setUp := func(ts *TestServer, client *azcosmos.Client) *azcosmos.DatabaseClient {
|
||||
ts.DataStore.CreateDatabase(datastore.Database{ID: testDatabaseName})
|
||||
databaseClient, err := client.NewDatabase(testDatabaseName)
|
||||
assert.Nil(t, err)
|
||||
|
||||
ts.DataStore.CreateDatabase(datastore.Database{ID: testDatabaseName})
|
||||
databaseClient, err := client.NewDatabase(testDatabaseName)
|
||||
assert.Nil(t, err)
|
||||
return databaseClient
|
||||
}
|
||||
|
||||
runTestsWithPresets(t, "Collection Create", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) {
|
||||
databaseClient := setUp(ts, client)
|
||||
|
||||
t.Run("Collection Create", func(t *testing.T) {
|
||||
t.Run("Should create collection", func(t *testing.T) {
|
||||
createResponse, err := databaseClient.CreateContainer(context.TODO(), azcosmos.ContainerProperties{
|
||||
ID: testCollectionName,
|
||||
@ -57,7 +54,9 @@ func Test_Collections(t *testing.T) {
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("Collection Read", func(t *testing.T) {
|
||||
runTestsWithPresets(t, "Collection Read", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) {
|
||||
databaseClient := setUp(ts, client)
|
||||
|
||||
t.Run("Should read collection", func(t *testing.T) {
|
||||
ts.DataStore.CreateCollection(testDatabaseName, datastore.Collection{
|
||||
ID: testCollectionName,
|
||||
@ -90,7 +89,9 @@ func Test_Collections(t *testing.T) {
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("Collection Delete", func(t *testing.T) {
|
||||
runTestsWithPresets(t, "Collection Delete", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) {
|
||||
databaseClient := setUp(ts, client)
|
||||
|
||||
t.Run("Should delete collection", func(t *testing.T) {
|
||||
ts.DataStore.CreateCollection(testDatabaseName, datastore.Collection{
|
||||
ID: testCollectionName,
|
||||
|
@ -1,13 +1,18 @@
|
||||
package tests_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
|
||||
"github.com/pikami/cosmium/api"
|
||||
"github.com/pikami/cosmium/api/config"
|
||||
"github.com/pikami/cosmium/internal/datastore"
|
||||
badgerdatastore "github.com/pikami/cosmium/internal/datastore/badger_datastore"
|
||||
mapdatastore "github.com/pikami/cosmium/internal/datastore/map_datastore"
|
||||
"github.com/pikami/cosmium/internal/logger"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
type TestServer struct {
|
||||
@ -16,14 +21,29 @@ type TestServer struct {
|
||||
URL string
|
||||
}
|
||||
|
||||
func runTestServerCustomConfig(config *config.ServerConfig) *TestServer {
|
||||
dataStore := mapdatastore.NewMapDataStore(mapdatastore.MapDataStoreOptions{})
|
||||
func getDefaultTestServerConfig() *config.ServerConfig {
|
||||
return &config.ServerConfig{
|
||||
AccountKey: config.DefaultAccountKey,
|
||||
ExplorerPath: "/tmp/nothing",
|
||||
ExplorerBaseUrlLocation: config.ExplorerBaseUrlLocation,
|
||||
DataStore: "map",
|
||||
}
|
||||
}
|
||||
|
||||
api := api.NewApiServer(dataStore, config)
|
||||
func runTestServerCustomConfig(configuration *config.ServerConfig) *TestServer {
|
||||
var dataStore datastore.DataStore
|
||||
switch configuration.DataStore {
|
||||
case config.DataStoreBadger:
|
||||
dataStore = badgerdatastore.NewBadgerDataStore()
|
||||
default:
|
||||
dataStore = mapdatastore.NewMapDataStore(mapdatastore.MapDataStoreOptions{})
|
||||
}
|
||||
|
||||
api := api.NewApiServer(dataStore, configuration)
|
||||
|
||||
server := httptest.NewServer(api.GetRouter())
|
||||
|
||||
config.DatabaseEndpoint = server.URL
|
||||
configuration.DatabaseEndpoint = server.URL
|
||||
|
||||
return &TestServer{
|
||||
Server: server,
|
||||
@ -33,11 +53,7 @@ func runTestServerCustomConfig(config *config.ServerConfig) *TestServer {
|
||||
}
|
||||
|
||||
func runTestServer() *TestServer {
|
||||
config := &config.ServerConfig{
|
||||
AccountKey: config.DefaultAccountKey,
|
||||
ExplorerPath: "/tmp/nothing",
|
||||
ExplorerBaseUrlLocation: config.ExplorerBaseUrlLocation,
|
||||
}
|
||||
config := getDefaultTestServerConfig()
|
||||
|
||||
config.LogLevel = "debug"
|
||||
logger.SetLogLevel(logger.LogLevelDebug)
|
||||
@ -50,3 +66,46 @@ const (
|
||||
testDatabaseName = "test-db"
|
||||
testCollectionName = "test-coll"
|
||||
)
|
||||
|
||||
type testFunc func(t *testing.T, ts *TestServer, cosmosClient *azcosmos.Client)
|
||||
type testPreset string
|
||||
|
||||
const (
|
||||
PresetMapStore testPreset = "MapDS"
|
||||
PresetBadgerStore testPreset = "BadgerDS"
|
||||
)
|
||||
|
||||
func runTestsWithPreset(t *testing.T, name string, testPreset testPreset, f testFunc) {
|
||||
serverConfig := getDefaultTestServerConfig()
|
||||
|
||||
serverConfig.LogLevel = "debug"
|
||||
logger.SetLogLevel(logger.LogLevelDebug)
|
||||
|
||||
switch testPreset {
|
||||
case PresetBadgerStore:
|
||||
serverConfig.DataStore = config.DataStoreBadger
|
||||
case PresetMapStore:
|
||||
serverConfig.DataStore = config.DataStoreMap
|
||||
}
|
||||
|
||||
ts := runTestServerCustomConfig(serverConfig)
|
||||
defer ts.Server.Close()
|
||||
|
||||
client, err := azcosmos.NewClientFromConnectionString(
|
||||
fmt.Sprintf("AccountEndpoint=%s;AccountKey=%s", ts.URL, config.DefaultAccountKey),
|
||||
&azcosmos.ClientOptions{},
|
||||
)
|
||||
assert.Nil(t, err)
|
||||
|
||||
testName := fmt.Sprintf("%s_%s", testPreset, name)
|
||||
|
||||
t.Run(testName, func(t *testing.T) {
|
||||
f(t, ts, client)
|
||||
})
|
||||
}
|
||||
|
||||
func runTestsWithPresets(t *testing.T, name string, testPresets []testPreset, f testFunc) {
|
||||
for _, testPreset := range testPresets {
|
||||
runTestsWithPreset(t, name, testPreset, f)
|
||||
}
|
||||
}
|
||||
|
@ -3,28 +3,19 @@ package tests_test
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
|
||||
"github.com/pikami/cosmium/api/config"
|
||||
"github.com/pikami/cosmium/internal/datastore"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func Test_Databases(t *testing.T) {
|
||||
ts := runTestServer()
|
||||
defer ts.Server.Close()
|
||||
presets := []testPreset{PresetMapStore, PresetBadgerStore}
|
||||
|
||||
client, err := azcosmos.NewClientFromConnectionString(
|
||||
fmt.Sprintf("AccountEndpoint=%s;AccountKey=%s", ts.URL, config.DefaultAccountKey),
|
||||
&azcosmos.ClientOptions{},
|
||||
)
|
||||
assert.Nil(t, err)
|
||||
|
||||
t.Run("Database Create", func(t *testing.T) {
|
||||
runTestsWithPresets(t, "Database Create", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) {
|
||||
t.Run("Should create database", func(t *testing.T) {
|
||||
ts.DataStore.DeleteDatabase(testDatabaseName)
|
||||
|
||||
@ -55,7 +46,7 @@ func Test_Databases(t *testing.T) {
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("Database Read", func(t *testing.T) {
|
||||
runTestsWithPresets(t, "Database Read", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) {
|
||||
t.Run("Should read database", func(t *testing.T) {
|
||||
ts.DataStore.CreateDatabase(datastore.Database{
|
||||
ID: testDatabaseName,
|
||||
@ -88,7 +79,7 @@ func Test_Databases(t *testing.T) {
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("Database Delete", func(t *testing.T) {
|
||||
runTestsWithPresets(t, "Database Delete", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) {
|
||||
t.Run("Should delete database", func(t *testing.T) {
|
||||
ts.DataStore.CreateDatabase(datastore.Database{
|
||||
ID: testDatabaseName,
|
||||
|
@ -53,9 +53,7 @@ func testCosmosQuery(t *testing.T,
|
||||
}
|
||||
}
|
||||
|
||||
func documents_InitializeDb(t *testing.T) (*TestServer, *azcosmos.ContainerClient) {
|
||||
ts := runTestServer()
|
||||
|
||||
func documents_InitializeDb(t *testing.T, ts *TestServer) *azcosmos.ContainerClient {
|
||||
ts.DataStore.CreateDatabase(datastore.Database{ID: testDatabaseName})
|
||||
ts.DataStore.CreateCollection(testDatabaseName, datastore.Collection{
|
||||
ID: testCollectionName,
|
||||
@ -79,438 +77,439 @@ func documents_InitializeDb(t *testing.T) (*TestServer, *azcosmos.ContainerClien
|
||||
collectionClient, err := client.NewContainer(testDatabaseName, testCollectionName)
|
||||
assert.Nil(t, err)
|
||||
|
||||
return ts, collectionClient
|
||||
return collectionClient
|
||||
}
|
||||
|
||||
func Test_Documents(t *testing.T) {
|
||||
ts, collectionClient := documents_InitializeDb(t)
|
||||
defer ts.Server.Close()
|
||||
presets := []testPreset{PresetMapStore, PresetBadgerStore}
|
||||
|
||||
t.Run("Should query document", func(t *testing.T) {
|
||||
testCosmosQuery(t, collectionClient,
|
||||
"SELECT c.id, c[\"pk\"] FROM c ORDER BY c.id",
|
||||
nil,
|
||||
[]interface{}{
|
||||
map[string]interface{}{"id": "12345", "pk": "123"},
|
||||
map[string]interface{}{"id": "67890", "pk": "456"},
|
||||
},
|
||||
)
|
||||
runTestsWithPresets(t, "Test_Documents", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) {
|
||||
collectionClient := documents_InitializeDb(t, ts)
|
||||
|
||||
t.Run("Should query document", func(t *testing.T) {
|
||||
testCosmosQuery(t, collectionClient,
|
||||
"SELECT c.id, c[\"pk\"] FROM c ORDER BY c.id",
|
||||
nil,
|
||||
[]interface{}{
|
||||
map[string]interface{}{"id": "12345", "pk": "123"},
|
||||
map[string]interface{}{"id": "67890", "pk": "456"},
|
||||
},
|
||||
)
|
||||
})
|
||||
|
||||
t.Run("Should query VALUE array", func(t *testing.T) {
|
||||
testCosmosQuery(t, collectionClient,
|
||||
"SELECT VALUE [c.id, c[\"pk\"]] FROM c ORDER BY c.id",
|
||||
nil,
|
||||
[]interface{}{
|
||||
[]interface{}{"12345", "123"},
|
||||
[]interface{}{"67890", "456"},
|
||||
},
|
||||
)
|
||||
})
|
||||
|
||||
t.Run("Should query VALUE object", func(t *testing.T) {
|
||||
testCosmosQuery(t, collectionClient,
|
||||
"SELECT VALUE { id: c.id, _pk: c.pk } FROM c ORDER BY c.id",
|
||||
nil,
|
||||
[]interface{}{
|
||||
map[string]interface{}{"id": "12345", "_pk": "123"},
|
||||
map[string]interface{}{"id": "67890", "_pk": "456"},
|
||||
},
|
||||
)
|
||||
})
|
||||
|
||||
t.Run("Should query document with single WHERE condition", func(t *testing.T) {
|
||||
testCosmosQuery(t, collectionClient,
|
||||
`select c.id
|
||||
FROM c
|
||||
WHERE c.isCool=true
|
||||
ORDER BY c.id`,
|
||||
nil,
|
||||
[]interface{}{
|
||||
map[string]interface{}{"id": "67890"},
|
||||
},
|
||||
)
|
||||
})
|
||||
|
||||
t.Run("Should query document with query parameters", func(t *testing.T) {
|
||||
testCosmosQuery(t, collectionClient,
|
||||
`select c.id
|
||||
FROM c
|
||||
WHERE c.id=@param_id
|
||||
ORDER BY c.id`,
|
||||
[]azcosmos.QueryParameter{
|
||||
{Name: "@param_id", Value: "67890"},
|
||||
},
|
||||
[]interface{}{
|
||||
map[string]interface{}{"id": "67890"},
|
||||
},
|
||||
)
|
||||
})
|
||||
|
||||
t.Run("Should query document with query parameters as accessor", func(t *testing.T) {
|
||||
testCosmosQuery(t, collectionClient,
|
||||
`select c.id
|
||||
FROM c
|
||||
WHERE c[@param]="67890"
|
||||
ORDER BY c.id`,
|
||||
[]azcosmos.QueryParameter{
|
||||
{Name: "@param", Value: "id"},
|
||||
},
|
||||
[]interface{}{
|
||||
map[string]interface{}{"id": "67890"},
|
||||
},
|
||||
)
|
||||
})
|
||||
|
||||
t.Run("Should query array accessor", func(t *testing.T) {
|
||||
testCosmosQuery(t, collectionClient,
|
||||
`SELECT c.id,
|
||||
c["arr"][0] AS arr0,
|
||||
c["arr"][1] AS arr1,
|
||||
c["arr"][2] AS arr2,
|
||||
c["arr"][3] AS arr3
|
||||
FROM c ORDER BY c.id`,
|
||||
nil,
|
||||
[]interface{}{
|
||||
map[string]interface{}{"id": "12345", "arr0": 1.0, "arr1": 2.0, "arr2": 3.0, "arr3": nil},
|
||||
map[string]interface{}{"id": "67890", "arr0": 6.0, "arr1": 7.0, "arr2": 8.0, "arr3": nil},
|
||||
},
|
||||
)
|
||||
})
|
||||
|
||||
t.Run("Should handle parallel writes", func(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
rutineCount := 100
|
||||
results := make(chan error, rutineCount)
|
||||
|
||||
createCall := func(i int) {
|
||||
defer wg.Done()
|
||||
item := map[string]interface{}{
|
||||
"id": fmt.Sprintf("id-%d", i),
|
||||
"pk": fmt.Sprintf("pk-%d", i),
|
||||
"val": i,
|
||||
}
|
||||
bytes, err := json.Marshal(item)
|
||||
if err != nil {
|
||||
results <- err
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
_, err = collectionClient.CreateItem(
|
||||
ctx,
|
||||
azcosmos.PartitionKey{},
|
||||
bytes,
|
||||
&azcosmos.ItemOptions{
|
||||
EnableContentResponseOnWrite: false,
|
||||
},
|
||||
)
|
||||
results <- err
|
||||
|
||||
collectionClient.ReadItem(ctx, azcosmos.PartitionKey{}, fmt.Sprintf("id-%d", i), nil)
|
||||
collectionClient.DeleteItem(ctx, azcosmos.PartitionKey{}, fmt.Sprintf("id-%d", i), nil)
|
||||
}
|
||||
|
||||
for i := 0; i < rutineCount; i++ {
|
||||
wg.Add(1)
|
||||
go createCall(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(results)
|
||||
|
||||
for err := range results {
|
||||
if err != nil {
|
||||
t.Errorf("Error creating item: %v", err)
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("Should query VALUE array", func(t *testing.T) {
|
||||
testCosmosQuery(t, collectionClient,
|
||||
"SELECT VALUE [c.id, c[\"pk\"]] FROM c ORDER BY c.id",
|
||||
nil,
|
||||
[]interface{}{
|
||||
[]interface{}{"12345", "123"},
|
||||
[]interface{}{"67890", "456"},
|
||||
},
|
||||
)
|
||||
})
|
||||
runTestsWithPresets(t, "Test_Documents_Patch", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) {
|
||||
collectionClient := documents_InitializeDb(t, ts)
|
||||
|
||||
t.Run("Should query VALUE object", func(t *testing.T) {
|
||||
testCosmosQuery(t, collectionClient,
|
||||
"SELECT VALUE { id: c.id, _pk: c.pk } FROM c ORDER BY c.id",
|
||||
nil,
|
||||
[]interface{}{
|
||||
map[string]interface{}{"id": "12345", "_pk": "123"},
|
||||
map[string]interface{}{"id": "67890", "_pk": "456"},
|
||||
},
|
||||
)
|
||||
})
|
||||
t.Run("Should PATCH document", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
expectedData := map[string]interface{}{"id": "67890", "pk": "666", "newField": "newValue", "incr": 15., "setted": "isSet"}
|
||||
|
||||
t.Run("Should query document with single WHERE condition", func(t *testing.T) {
|
||||
testCosmosQuery(t, collectionClient,
|
||||
`select c.id
|
||||
FROM c
|
||||
WHERE c.isCool=true
|
||||
ORDER BY c.id`,
|
||||
nil,
|
||||
[]interface{}{
|
||||
map[string]interface{}{"id": "67890"},
|
||||
},
|
||||
)
|
||||
})
|
||||
patch := azcosmos.PatchOperations{}
|
||||
patch.AppendAdd("/newField", "newValue")
|
||||
patch.AppendIncrement("/incr", 15)
|
||||
patch.AppendRemove("/isCool")
|
||||
patch.AppendReplace("/pk", "666")
|
||||
patch.AppendSet("/setted", "isSet")
|
||||
|
||||
t.Run("Should query document with query parameters", func(t *testing.T) {
|
||||
testCosmosQuery(t, collectionClient,
|
||||
`select c.id
|
||||
FROM c
|
||||
WHERE c.id=@param_id
|
||||
ORDER BY c.id`,
|
||||
[]azcosmos.QueryParameter{
|
||||
{Name: "@param_id", Value: "67890"},
|
||||
},
|
||||
[]interface{}{
|
||||
map[string]interface{}{"id": "67890"},
|
||||
},
|
||||
)
|
||||
})
|
||||
itemResponse, err := collectionClient.PatchItem(
|
||||
context,
|
||||
azcosmos.PartitionKey{},
|
||||
"67890",
|
||||
patch,
|
||||
&azcosmos.ItemOptions{
|
||||
EnableContentResponseOnWrite: false,
|
||||
},
|
||||
)
|
||||
assert.Nil(t, err)
|
||||
|
||||
t.Run("Should query document with query parameters as accessor", func(t *testing.T) {
|
||||
testCosmosQuery(t, collectionClient,
|
||||
`select c.id
|
||||
FROM c
|
||||
WHERE c[@param]="67890"
|
||||
ORDER BY c.id`,
|
||||
[]azcosmos.QueryParameter{
|
||||
{Name: "@param", Value: "id"},
|
||||
},
|
||||
[]interface{}{
|
||||
map[string]interface{}{"id": "67890"},
|
||||
},
|
||||
)
|
||||
})
|
||||
var itemResponseBody map[string]interface{}
|
||||
json.Unmarshal(itemResponse.Value, &itemResponseBody)
|
||||
|
||||
t.Run("Should query array accessor", func(t *testing.T) {
|
||||
testCosmosQuery(t, collectionClient,
|
||||
`SELECT c.id,
|
||||
c["arr"][0] AS arr0,
|
||||
c["arr"][1] AS arr1,
|
||||
c["arr"][2] AS arr2,
|
||||
c["arr"][3] AS arr3
|
||||
FROM c ORDER BY c.id`,
|
||||
nil,
|
||||
[]interface{}{
|
||||
map[string]interface{}{"id": "12345", "arr0": 1.0, "arr1": 2.0, "arr2": 3.0, "arr3": nil},
|
||||
map[string]interface{}{"id": "67890", "arr0": 6.0, "arr1": 7.0, "arr2": 8.0, "arr3": nil},
|
||||
},
|
||||
)
|
||||
})
|
||||
assert.Equal(t, expectedData["id"], itemResponseBody["id"])
|
||||
assert.Equal(t, expectedData["pk"], itemResponseBody["pk"])
|
||||
assert.Empty(t, itemResponseBody["isCool"])
|
||||
assert.Equal(t, expectedData["newField"], itemResponseBody["newField"])
|
||||
assert.Equal(t, expectedData["incr"], itemResponseBody["incr"])
|
||||
assert.Equal(t, expectedData["setted"], itemResponseBody["setted"])
|
||||
})
|
||||
|
||||
t.Run("Should handle parallel writes", func(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
rutineCount := 100
|
||||
results := make(chan error, rutineCount)
|
||||
t.Run("Should not allow to PATCH document ID", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
|
||||
patch := azcosmos.PatchOperations{}
|
||||
patch.AppendReplace("/id", "newValue")
|
||||
|
||||
_, err := collectionClient.PatchItem(
|
||||
context,
|
||||
azcosmos.PartitionKey{},
|
||||
"67890",
|
||||
patch,
|
||||
&azcosmos.ItemOptions{
|
||||
EnableContentResponseOnWrite: false,
|
||||
},
|
||||
)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
var respErr *azcore.ResponseError
|
||||
if errors.As(err, &respErr) {
|
||||
assert.Equal(t, http.StatusUnprocessableEntity, respErr.StatusCode)
|
||||
} else {
|
||||
panic(err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("CreateItem", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
|
||||
createCall := func(i int) {
|
||||
defer wg.Done()
|
||||
item := map[string]interface{}{
|
||||
"id": fmt.Sprintf("id-%d", i),
|
||||
"pk": fmt.Sprintf("pk-%d", i),
|
||||
"val": i,
|
||||
"Id": "6789011",
|
||||
"pk": "456",
|
||||
"newField": "newValue2",
|
||||
}
|
||||
bytes, err := json.Marshal(item)
|
||||
if err != nil {
|
||||
results <- err
|
||||
return
|
||||
}
|
||||
assert.Nil(t, err)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
_, err = collectionClient.CreateItem(
|
||||
ctx,
|
||||
r, err2 := collectionClient.CreateItem(
|
||||
context,
|
||||
azcosmos.PartitionKey{},
|
||||
bytes,
|
||||
&azcosmos.ItemOptions{
|
||||
EnableContentResponseOnWrite: false,
|
||||
},
|
||||
)
|
||||
results <- err
|
||||
assert.NotNil(t, r)
|
||||
assert.Nil(t, err2)
|
||||
})
|
||||
|
||||
collectionClient.ReadItem(ctx, azcosmos.PartitionKey{}, fmt.Sprintf("id-%d", i), nil)
|
||||
collectionClient.DeleteItem(ctx, azcosmos.PartitionKey{}, fmt.Sprintf("id-%d", i), nil)
|
||||
}
|
||||
t.Run("CreateItem that already exists", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
|
||||
for i := 0; i < rutineCount; i++ {
|
||||
wg.Add(1)
|
||||
go createCall(i)
|
||||
}
|
||||
item := map[string]interface{}{"id": "12345", "pk": "123", "isCool": false, "arr": []int{1, 2, 3}}
|
||||
bytes, err := json.Marshal(item)
|
||||
assert.Nil(t, err)
|
||||
|
||||
wg.Wait()
|
||||
close(results)
|
||||
r, err := collectionClient.CreateItem(
|
||||
context,
|
||||
azcosmos.PartitionKey{},
|
||||
bytes,
|
||||
&azcosmos.ItemOptions{
|
||||
EnableContentResponseOnWrite: false,
|
||||
},
|
||||
)
|
||||
assert.NotNil(t, r)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
for err := range results {
|
||||
if err != nil {
|
||||
t.Errorf("Error creating item: %v", err)
|
||||
var respErr *azcore.ResponseError
|
||||
if errors.As(err, &respErr) {
|
||||
assert.Equal(t, http.StatusConflict, respErr.StatusCode)
|
||||
} else {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func Test_Documents_Patch(t *testing.T) {
|
||||
ts, collectionClient := documents_InitializeDb(t)
|
||||
defer ts.Server.Close()
|
||||
|
||||
t.Run("Should PATCH document", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
expectedData := map[string]interface{}{"id": "67890", "pk": "666", "newField": "newValue", "incr": 15., "setted": "isSet"}
|
||||
|
||||
patch := azcosmos.PatchOperations{}
|
||||
patch.AppendAdd("/newField", "newValue")
|
||||
patch.AppendIncrement("/incr", 15)
|
||||
patch.AppendRemove("/isCool")
|
||||
patch.AppendReplace("/pk", "666")
|
||||
patch.AppendSet("/setted", "isSet")
|
||||
|
||||
itemResponse, err := collectionClient.PatchItem(
|
||||
context,
|
||||
azcosmos.PartitionKey{},
|
||||
"67890",
|
||||
patch,
|
||||
&azcosmos.ItemOptions{
|
||||
EnableContentResponseOnWrite: false,
|
||||
},
|
||||
)
|
||||
assert.Nil(t, err)
|
||||
|
||||
var itemResponseBody map[string]interface{}
|
||||
json.Unmarshal(itemResponse.Value, &itemResponseBody)
|
||||
|
||||
assert.Equal(t, expectedData["id"], itemResponseBody["id"])
|
||||
assert.Equal(t, expectedData["pk"], itemResponseBody["pk"])
|
||||
assert.Empty(t, itemResponseBody["isCool"])
|
||||
assert.Equal(t, expectedData["newField"], itemResponseBody["newField"])
|
||||
assert.Equal(t, expectedData["incr"], itemResponseBody["incr"])
|
||||
assert.Equal(t, expectedData["setted"], itemResponseBody["setted"])
|
||||
})
|
||||
|
||||
t.Run("Should not allow to PATCH document ID", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
|
||||
patch := azcosmos.PatchOperations{}
|
||||
patch.AppendReplace("/id", "newValue")
|
||||
|
||||
_, err := collectionClient.PatchItem(
|
||||
context,
|
||||
azcosmos.PartitionKey{},
|
||||
"67890",
|
||||
patch,
|
||||
&azcosmos.ItemOptions{
|
||||
EnableContentResponseOnWrite: false,
|
||||
},
|
||||
)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
var respErr *azcore.ResponseError
|
||||
if errors.As(err, &respErr) {
|
||||
assert.Equal(t, http.StatusUnprocessableEntity, respErr.StatusCode)
|
||||
} else {
|
||||
panic(err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("CreateItem", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
|
||||
item := map[string]interface{}{
|
||||
"Id": "6789011",
|
||||
"pk": "456",
|
||||
"newField": "newValue2",
|
||||
}
|
||||
bytes, err := json.Marshal(item)
|
||||
assert.Nil(t, err)
|
||||
|
||||
r, err2 := collectionClient.CreateItem(
|
||||
context,
|
||||
azcosmos.PartitionKey{},
|
||||
bytes,
|
||||
&azcosmos.ItemOptions{
|
||||
EnableContentResponseOnWrite: false,
|
||||
},
|
||||
)
|
||||
assert.NotNil(t, r)
|
||||
assert.Nil(t, err2)
|
||||
})
|
||||
|
||||
t.Run("CreateItem that already exists", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
|
||||
item := map[string]interface{}{"id": "12345", "pk": "123", "isCool": false, "arr": []int{1, 2, 3}}
|
||||
bytes, err := json.Marshal(item)
|
||||
assert.Nil(t, err)
|
||||
|
||||
r, err := collectionClient.CreateItem(
|
||||
context,
|
||||
azcosmos.PartitionKey{},
|
||||
bytes,
|
||||
&azcosmos.ItemOptions{
|
||||
EnableContentResponseOnWrite: false,
|
||||
},
|
||||
)
|
||||
assert.NotNil(t, r)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
var respErr *azcore.ResponseError
|
||||
if errors.As(err, &respErr) {
|
||||
assert.Equal(t, http.StatusConflict, respErr.StatusCode)
|
||||
} else {
|
||||
panic(err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("UpsertItem new", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
|
||||
item := map[string]interface{}{"id": "123456", "pk": "1234", "isCool": false, "arr": []int{1, 2, 3}}
|
||||
bytes, err := json.Marshal(item)
|
||||
assert.Nil(t, err)
|
||||
|
||||
r, err2 := collectionClient.UpsertItem(
|
||||
context,
|
||||
azcosmos.PartitionKey{},
|
||||
bytes,
|
||||
&azcosmos.ItemOptions{
|
||||
EnableContentResponseOnWrite: false,
|
||||
},
|
||||
)
|
||||
assert.NotNil(t, r)
|
||||
assert.Nil(t, err2)
|
||||
})
|
||||
|
||||
t.Run("UpsertItem that already exists", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
|
||||
item := map[string]interface{}{"id": "12345", "pk": "123", "isCool": false, "arr": []int{1, 2, 3, 4}}
|
||||
bytes, err := json.Marshal(item)
|
||||
assert.Nil(t, err)
|
||||
|
||||
r, err2 := collectionClient.UpsertItem(
|
||||
context,
|
||||
azcosmos.PartitionKey{},
|
||||
bytes,
|
||||
&azcosmos.ItemOptions{
|
||||
EnableContentResponseOnWrite: false,
|
||||
},
|
||||
)
|
||||
assert.NotNil(t, r)
|
||||
assert.Nil(t, err2)
|
||||
})
|
||||
}
|
||||
|
||||
func Test_Documents_TransactionalBatch(t *testing.T) {
|
||||
ts, collectionClient := documents_InitializeDb(t)
|
||||
defer ts.Server.Close()
|
||||
|
||||
t.Run("Should execute CREATE transactional batch", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||
|
||||
newItem := map[string]interface{}{
|
||||
"id": "678901",
|
||||
}
|
||||
bytes, err := json.Marshal(newItem)
|
||||
assert.Nil(t, err)
|
||||
|
||||
batch.CreateItem(bytes, nil)
|
||||
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, response.Success)
|
||||
assert.Equal(t, 1, len(response.OperationResults))
|
||||
|
||||
operationResponse := response.OperationResults[0]
|
||||
assert.NotNil(t, operationResponse)
|
||||
assert.NotNil(t, operationResponse.ResourceBody)
|
||||
assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode)
|
||||
|
||||
var itemResponseBody map[string]interface{}
|
||||
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
|
||||
assert.Equal(t, newItem["id"], itemResponseBody["id"])
|
||||
|
||||
createdDoc, _ := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string))
|
||||
assert.Equal(t, newItem["id"], createdDoc["id"])
|
||||
})
|
||||
|
||||
t.Run("Should execute DELETE transactional batch", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||
|
||||
batch.DeleteItem("12345", nil)
|
||||
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, response.Success)
|
||||
assert.Equal(t, 1, len(response.OperationResults))
|
||||
|
||||
operationResponse := response.OperationResults[0]
|
||||
assert.NotNil(t, operationResponse)
|
||||
assert.Equal(t, int32(http.StatusNoContent), operationResponse.StatusCode)
|
||||
|
||||
_, status := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, "12345")
|
||||
assert.Equal(t, datastore.StatusNotFound, int(status))
|
||||
})
|
||||
|
||||
t.Run("Should execute REPLACE transactional batch", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||
|
||||
newItem := map[string]interface{}{
|
||||
"id": "67890",
|
||||
"pk": "666",
|
||||
}
|
||||
bytes, err := json.Marshal(newItem)
|
||||
assert.Nil(t, err)
|
||||
|
||||
batch.ReplaceItem("67890", bytes, nil)
|
||||
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, response.Success)
|
||||
assert.Equal(t, 1, len(response.OperationResults))
|
||||
|
||||
operationResponse := response.OperationResults[0]
|
||||
assert.NotNil(t, operationResponse)
|
||||
assert.NotNil(t, operationResponse.ResourceBody)
|
||||
assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode)
|
||||
|
||||
var itemResponseBody map[string]interface{}
|
||||
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
|
||||
assert.Equal(t, newItem["id"], itemResponseBody["id"])
|
||||
assert.Equal(t, newItem["pk"], itemResponseBody["pk"])
|
||||
|
||||
updatedDoc, _ := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string))
|
||||
assert.Equal(t, newItem["id"], updatedDoc["id"])
|
||||
assert.Equal(t, newItem["pk"], updatedDoc["pk"])
|
||||
})
|
||||
|
||||
t.Run("Should execute UPSERT transactional batch", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||
|
||||
newItem := map[string]interface{}{
|
||||
"id": "678901",
|
||||
"pk": "666",
|
||||
}
|
||||
bytes, err := json.Marshal(newItem)
|
||||
assert.Nil(t, err)
|
||||
|
||||
batch.UpsertItem(bytes, nil)
|
||||
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, response.Success)
|
||||
assert.Equal(t, 1, len(response.OperationResults))
|
||||
|
||||
operationResponse := response.OperationResults[0]
|
||||
assert.NotNil(t, operationResponse)
|
||||
assert.NotNil(t, operationResponse.ResourceBody)
|
||||
assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode)
|
||||
|
||||
var itemResponseBody map[string]interface{}
|
||||
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
|
||||
assert.Equal(t, newItem["id"], itemResponseBody["id"])
|
||||
assert.Equal(t, newItem["pk"], itemResponseBody["pk"])
|
||||
|
||||
updatedDoc, _ := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string))
|
||||
assert.Equal(t, newItem["id"], updatedDoc["id"])
|
||||
assert.Equal(t, newItem["pk"], updatedDoc["pk"])
|
||||
})
|
||||
|
||||
t.Run("Should execute READ transactional batch", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||
|
||||
batch.ReadItem("67890", nil)
|
||||
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, response.Success)
|
||||
assert.Equal(t, 1, len(response.OperationResults))
|
||||
|
||||
operationResponse := response.OperationResults[0]
|
||||
assert.NotNil(t, operationResponse)
|
||||
assert.NotNil(t, operationResponse.ResourceBody)
|
||||
assert.Equal(t, int32(http.StatusOK), operationResponse.StatusCode)
|
||||
|
||||
var itemResponseBody map[string]interface{}
|
||||
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
|
||||
assert.Equal(t, "67890", itemResponseBody["id"])
|
||||
})
|
||||
|
||||
t.Run("UpsertItem new", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
|
||||
item := map[string]interface{}{"id": "123456", "pk": "1234", "isCool": false, "arr": []int{1, 2, 3}}
|
||||
bytes, err := json.Marshal(item)
|
||||
assert.Nil(t, err)
|
||||
|
||||
r, err2 := collectionClient.UpsertItem(
|
||||
context,
|
||||
azcosmos.PartitionKey{},
|
||||
bytes,
|
||||
&azcosmos.ItemOptions{
|
||||
EnableContentResponseOnWrite: false,
|
||||
},
|
||||
)
|
||||
assert.NotNil(t, r)
|
||||
assert.Nil(t, err2)
|
||||
})
|
||||
|
||||
t.Run("UpsertItem that already exists", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
|
||||
item := map[string]interface{}{"id": "12345", "pk": "123", "isCool": false, "arr": []int{1, 2, 3, 4}}
|
||||
bytes, err := json.Marshal(item)
|
||||
assert.Nil(t, err)
|
||||
|
||||
r, err2 := collectionClient.UpsertItem(
|
||||
context,
|
||||
azcosmos.PartitionKey{},
|
||||
bytes,
|
||||
&azcosmos.ItemOptions{
|
||||
EnableContentResponseOnWrite: false,
|
||||
},
|
||||
)
|
||||
assert.NotNil(t, r)
|
||||
assert.Nil(t, err2)
|
||||
})
|
||||
})
|
||||
|
||||
runTestsWithPresets(t, "Test_Documents_TransactionalBatch", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) {
|
||||
collectionClient := documents_InitializeDb(t, ts)
|
||||
|
||||
t.Run("Should execute CREATE transactional batch", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||
|
||||
newItem := map[string]interface{}{
|
||||
"id": "678901",
|
||||
}
|
||||
bytes, err := json.Marshal(newItem)
|
||||
assert.Nil(t, err)
|
||||
|
||||
batch.CreateItem(bytes, nil)
|
||||
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, response.Success)
|
||||
assert.Equal(t, 1, len(response.OperationResults))
|
||||
|
||||
operationResponse := response.OperationResults[0]
|
||||
assert.NotNil(t, operationResponse)
|
||||
assert.NotNil(t, operationResponse.ResourceBody)
|
||||
assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode)
|
||||
|
||||
var itemResponseBody map[string]interface{}
|
||||
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
|
||||
assert.Equal(t, newItem["id"], itemResponseBody["id"])
|
||||
|
||||
createdDoc, _ := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string))
|
||||
assert.Equal(t, newItem["id"], createdDoc["id"])
|
||||
})
|
||||
|
||||
t.Run("Should execute DELETE transactional batch", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||
|
||||
batch.DeleteItem("12345", nil)
|
||||
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, response.Success)
|
||||
assert.Equal(t, 1, len(response.OperationResults))
|
||||
|
||||
operationResponse := response.OperationResults[0]
|
||||
assert.NotNil(t, operationResponse)
|
||||
assert.Equal(t, int32(http.StatusNoContent), operationResponse.StatusCode)
|
||||
|
||||
_, status := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, "12345")
|
||||
assert.Equal(t, datastore.StatusNotFound, int(status))
|
||||
})
|
||||
|
||||
t.Run("Should execute REPLACE transactional batch", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||
|
||||
newItem := map[string]interface{}{
|
||||
"id": "67890",
|
||||
"pk": "666",
|
||||
}
|
||||
bytes, err := json.Marshal(newItem)
|
||||
assert.Nil(t, err)
|
||||
|
||||
batch.ReplaceItem("67890", bytes, nil)
|
||||
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, response.Success)
|
||||
assert.Equal(t, 1, len(response.OperationResults))
|
||||
|
||||
operationResponse := response.OperationResults[0]
|
||||
assert.NotNil(t, operationResponse)
|
||||
assert.NotNil(t, operationResponse.ResourceBody)
|
||||
assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode)
|
||||
|
||||
var itemResponseBody map[string]interface{}
|
||||
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
|
||||
assert.Equal(t, newItem["id"], itemResponseBody["id"])
|
||||
assert.Equal(t, newItem["pk"], itemResponseBody["pk"])
|
||||
|
||||
updatedDoc, _ := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string))
|
||||
assert.Equal(t, newItem["id"], updatedDoc["id"])
|
||||
assert.Equal(t, newItem["pk"], updatedDoc["pk"])
|
||||
})
|
||||
|
||||
t.Run("Should execute UPSERT transactional batch", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||
|
||||
newItem := map[string]interface{}{
|
||||
"id": "678901",
|
||||
"pk": "666",
|
||||
}
|
||||
bytes, err := json.Marshal(newItem)
|
||||
assert.Nil(t, err)
|
||||
|
||||
batch.UpsertItem(bytes, nil)
|
||||
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, response.Success)
|
||||
assert.Equal(t, 1, len(response.OperationResults))
|
||||
|
||||
operationResponse := response.OperationResults[0]
|
||||
assert.NotNil(t, operationResponse)
|
||||
assert.NotNil(t, operationResponse.ResourceBody)
|
||||
assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode)
|
||||
|
||||
var itemResponseBody map[string]interface{}
|
||||
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
|
||||
assert.Equal(t, newItem["id"], itemResponseBody["id"])
|
||||
assert.Equal(t, newItem["pk"], itemResponseBody["pk"])
|
||||
|
||||
updatedDoc, _ := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string))
|
||||
assert.Equal(t, newItem["id"], updatedDoc["id"])
|
||||
assert.Equal(t, newItem["pk"], updatedDoc["pk"])
|
||||
})
|
||||
|
||||
t.Run("Should execute READ transactional batch", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||
|
||||
batch.ReadItem("67890", nil)
|
||||
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, response.Success)
|
||||
assert.Equal(t, 1, len(response.OperationResults))
|
||||
|
||||
operationResponse := response.OperationResults[0]
|
||||
assert.NotNil(t, operationResponse)
|
||||
assert.NotNil(t, operationResponse.ResourceBody)
|
||||
assert.Equal(t, int32(http.StatusOK), operationResponse.StatusCode)
|
||||
|
||||
var itemResponseBody map[string]interface{}
|
||||
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
|
||||
assert.Equal(t, "67890", itemResponseBody["id"])
|
||||
})
|
||||
})
|
||||
}
|
||||
|
@ -14,7 +14,8 @@ import (
|
||||
|
||||
// Request document with trailing slash like python cosmosdb client does.
|
||||
func Test_Documents_Read_Trailing_Slash(t *testing.T) {
|
||||
ts, _ := documents_InitializeDb(t)
|
||||
ts := runTestServer()
|
||||
documents_InitializeDb(t, ts)
|
||||
defer ts.Server.Close()
|
||||
|
||||
t.Run("Read doc with client that appends slash to path", func(t *testing.T) {
|
||||
|
@ -8,16 +8,23 @@ import (
|
||||
"github.com/pikami/cosmium/api"
|
||||
"github.com/pikami/cosmium/api/config"
|
||||
"github.com/pikami/cosmium/internal/datastore"
|
||||
badgerdatastore "github.com/pikami/cosmium/internal/datastore/badger_datastore"
|
||||
mapdatastore "github.com/pikami/cosmium/internal/datastore/map_datastore"
|
||||
)
|
||||
|
||||
func main() {
|
||||
configuration := config.ParseFlags()
|
||||
|
||||
var dataStore datastore.DataStore = mapdatastore.NewMapDataStore(mapdatastore.MapDataStoreOptions{
|
||||
InitialDataFilePath: configuration.InitialDataFilePath,
|
||||
PersistDataFilePath: configuration.PersistDataFilePath,
|
||||
})
|
||||
var dataStore datastore.DataStore
|
||||
switch configuration.DataStore {
|
||||
case config.DataStoreBadger:
|
||||
dataStore = badgerdatastore.NewBadgerDataStore()
|
||||
default:
|
||||
dataStore = mapdatastore.NewMapDataStore(mapdatastore.MapDataStoreOptions{
|
||||
InitialDataFilePath: configuration.InitialDataFilePath,
|
||||
PersistDataFilePath: configuration.PersistDataFilePath,
|
||||
})
|
||||
}
|
||||
|
||||
server := api.NewApiServer(dataStore, &configuration)
|
||||
err := server.Start()
|
||||
@ -25,10 +32,10 @@ func main() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
waitForExit(server, dataStore, configuration)
|
||||
waitForExit(server, dataStore)
|
||||
}
|
||||
|
||||
func waitForExit(server *api.ApiServer, dataStore datastore.DataStore, config config.ServerConfig) {
|
||||
func waitForExit(server *api.ApiServer, dataStore datastore.DataStore) {
|
||||
sigs := make(chan os.Signal, 1)
|
||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
|
12
go.mod
12
go.mod
@ -6,6 +6,7 @@ require (
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0
|
||||
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v1.3.0
|
||||
github.com/cosmiumdev/json-patch/v5 v5.9.3
|
||||
github.com/dgraph-io/badger/v4 v4.6.0
|
||||
github.com/gin-gonic/gin v1.10.0
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/stretchr/testify v1.10.0
|
||||
@ -17,15 +18,22 @@ require (
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
|
||||
github.com/bytedance/sonic v1.12.9 // indirect
|
||||
github.com/bytedance/sonic/loader v0.2.3 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/cloudwego/base64x v0.1.5 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/dgraph-io/ristretto/v2 v2.1.0 // indirect
|
||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
|
||||
github.com/gin-contrib/sse v1.0.0 // indirect
|
||||
github.com/go-logr/logr v1.4.2 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/go-playground/locales v0.14.1 // indirect
|
||||
github.com/go-playground/universal-translator v0.18.1 // indirect
|
||||
github.com/go-playground/validator/v10 v10.25.0 // indirect
|
||||
github.com/goccy/go-json v0.10.5 // indirect
|
||||
github.com/google/flatbuffers v25.2.10+incompatible // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/klauspost/compress v1.18.0 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
|
||||
github.com/leodido/go-urn v1.4.0 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
@ -36,6 +44,10 @@ require (
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
|
||||
github.com/ugorji/go/codec v1.2.12 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
||||
go.opentelemetry.io/otel v1.34.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.34.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.34.0 // indirect
|
||||
golang.org/x/arch v0.14.0 // indirect
|
||||
golang.org/x/crypto v0.35.0 // indirect
|
||||
golang.org/x/net v0.35.0 // indirect
|
||||
|
31
go.sum
31
go.sum
@ -15,6 +15,8 @@ github.com/bytedance/sonic v1.12.9/go.mod h1:uVvFidNmlt9+wa31S1urfwwthTWteBgG0hW
|
||||
github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
|
||||
github.com/bytedance/sonic/loader v0.2.3 h1:yctD0Q3v2NOGfSWPLPvG2ggA2kV6TS6s4wioyEqssH0=
|
||||
github.com/bytedance/sonic/loader v0.2.3/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4=
|
||||
github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
|
||||
github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
|
||||
@ -23,12 +25,25 @@ github.com/cosmiumdev/json-patch/v5 v5.9.3/go.mod h1:WzSTCdia0WrlZtjnL19P4RiwWtf
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/dgraph-io/badger/v4 v4.6.0 h1:acOwfOOZ4p1dPRnYzvkVm7rUk2Y21TgPVepCy5dJdFQ=
|
||||
github.com/dgraph-io/badger/v4 v4.6.0/go.mod h1:KSJ5VTuZNC3Sd+YhvVjk2nYua9UZnnTr/SkXvdtiPgI=
|
||||
github.com/dgraph-io/ristretto/v2 v2.1.0 h1:59LjpOJLNDULHh8MC4UaegN52lC4JnO2dITsie/Pa8I=
|
||||
github.com/dgraph-io/ristretto/v2 v2.1.0/go.mod h1:uejeqfYXpUomfse0+lO+13ATz4TypQYLJZzBSAemuB4=
|
||||
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
|
||||
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
|
||||
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||
github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM=
|
||||
github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8=
|
||||
github.com/gin-contrib/sse v1.0.0 h1:y3bT1mUWUxDpW4JLQg/HnTqV4rozuW4tC9eFKTxYI9E=
|
||||
github.com/gin-contrib/sse v1.0.0/go.mod h1:zNuFdwarAygJBht0NTKiSi3jRf6RbqeILZ9Sp6Slhe0=
|
||||
github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU=
|
||||
github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y=
|
||||
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
|
||||
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
|
||||
github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
|
||||
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
|
||||
@ -41,6 +56,8 @@ github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
|
||||
github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
|
||||
github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk=
|
||||
github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
|
||||
github.com/google/flatbuffers v25.2.10+incompatible h1:F3vclr7C3HpB1k9mxCGRMXq6FdUalZ6H/pNX4FP1v0Q=
|
||||
github.com/google/flatbuffers v25.2.10+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
@ -48,6 +65,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
||||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
||||
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
|
||||
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
|
||||
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
|
||||
github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE=
|
||||
github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
|
||||
@ -75,8 +94,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
|
||||
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
|
||||
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
||||
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||
@ -93,6 +112,14 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
|
||||
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
|
||||
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
|
||||
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
|
||||
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
|
||||
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
|
||||
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
|
||||
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
|
||||
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
|
||||
golang.org/x/arch v0.14.0 h1:z9JUEZWr8x4rR0OU6c4/4t6E6jOZ8/QBS2bBYBm4tx4=
|
||||
golang.org/x/arch v0.14.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
|
||||
golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs=
|
||||
|
36
internal/datastore/badger_datastore/badger_datastore.go
Normal file
36
internal/datastore/badger_datastore/badger_datastore.go
Normal file
@ -0,0 +1,36 @@
|
||||
package badgerdatastore
|
||||
|
||||
import (
|
||||
"encoding/gob"
|
||||
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"github.com/pikami/cosmium/internal/logger"
|
||||
)
|
||||
|
||||
type BadgerDataStore struct {
|
||||
db *badger.DB
|
||||
}
|
||||
|
||||
func NewBadgerDataStore() *BadgerDataStore {
|
||||
gob.Register([]interface{}{})
|
||||
|
||||
badgerOpts := badger.DefaultOptions("").WithInMemory(true)
|
||||
|
||||
db, err := badger.Open(badgerOpts)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return &BadgerDataStore{
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) Close() {
|
||||
r.db.Close()
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) DumpToJson() (string, error) {
|
||||
logger.ErrorLn("Badger datastore does not support state export currently.")
|
||||
return "{}", nil
|
||||
}
|
103
internal/datastore/badger_datastore/collections.go
Normal file
103
internal/datastore/badger_datastore/collections.go
Normal file
@ -0,0 +1,103 @@
|
||||
package badgerdatastore
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/pikami/cosmium/internal/datastore"
|
||||
"github.com/pikami/cosmium/internal/logger"
|
||||
"github.com/pikami/cosmium/internal/resourceid"
|
||||
structhidrators "github.com/pikami/cosmium/internal/struct_hidrators"
|
||||
)
|
||||
|
||||
func (r *BadgerDataStore) GetAllCollections(databaseId string) ([]datastore.Collection, datastore.DataStoreStatus) {
|
||||
exists, err := keyExists(r.db.NewTransaction(false), generateDatabaseKey(databaseId))
|
||||
if err != nil {
|
||||
logger.ErrorLn("Error while checking if database exists:", err)
|
||||
return nil, datastore.Unknown
|
||||
}
|
||||
|
||||
if !exists {
|
||||
return nil, datastore.StatusNotFound
|
||||
}
|
||||
|
||||
colls, status := listByPrefix[datastore.Collection](r.db, generateKey(resourceid.ResourceTypeCollection, databaseId, "", ""))
|
||||
if status == datastore.StatusOk {
|
||||
return colls, datastore.StatusOk
|
||||
}
|
||||
|
||||
return nil, status
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) GetCollection(databaseId string, collectionId string) (datastore.Collection, datastore.DataStoreStatus) {
|
||||
collectionKey := generateCollectionKey(databaseId, collectionId)
|
||||
|
||||
txn := r.db.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
var collection datastore.Collection
|
||||
status := getKey(txn, collectionKey, &collection)
|
||||
|
||||
return collection, status
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) DeleteCollection(databaseId string, collectionId string) datastore.DataStoreStatus {
|
||||
collectionKey := generateCollectionKey(databaseId, collectionId)
|
||||
|
||||
txn := r.db.NewTransaction(true)
|
||||
defer txn.Discard()
|
||||
|
||||
prefixes := []string{
|
||||
generateKey(resourceid.ResourceTypeDocument, databaseId, collectionId, ""),
|
||||
generateKey(resourceid.ResourceTypeTrigger, databaseId, collectionId, ""),
|
||||
generateKey(resourceid.ResourceTypeStoredProcedure, databaseId, collectionId, ""),
|
||||
generateKey(resourceid.ResourceTypeUserDefinedFunction, databaseId, collectionId, ""),
|
||||
collectionKey,
|
||||
}
|
||||
for _, prefix := range prefixes {
|
||||
if err := deleteKeysByPrefix(txn, prefix); err != nil {
|
||||
return datastore.Unknown
|
||||
}
|
||||
}
|
||||
|
||||
err := txn.Commit()
|
||||
if err != nil {
|
||||
logger.ErrorLn("Error while committing transaction:", err)
|
||||
return datastore.Unknown
|
||||
}
|
||||
|
||||
return datastore.StatusOk
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) CreateCollection(databaseId string, newCollection datastore.Collection) (datastore.Collection, datastore.DataStoreStatus) {
|
||||
collectionKey := generateCollectionKey(databaseId, newCollection.ID)
|
||||
|
||||
txn := r.db.NewTransaction(true)
|
||||
defer txn.Discard()
|
||||
|
||||
collectionExists, err := keyExists(txn, collectionKey)
|
||||
if err != nil || collectionExists {
|
||||
return datastore.Collection{}, datastore.Conflict
|
||||
}
|
||||
|
||||
var database datastore.Database
|
||||
status := getKey(txn, generateDatabaseKey(databaseId), &database)
|
||||
if status != datastore.StatusOk {
|
||||
return datastore.Collection{}, status
|
||||
}
|
||||
|
||||
newCollection = structhidrators.Hidrate(newCollection).(datastore.Collection)
|
||||
|
||||
newCollection.TimeStamp = time.Now().Unix()
|
||||
newCollection.ResourceID = resourceid.NewCombined(database.ResourceID, resourceid.New(resourceid.ResourceTypeCollection))
|
||||
newCollection.ETag = fmt.Sprintf("\"%s\"", uuid.New())
|
||||
newCollection.Self = fmt.Sprintf("dbs/%s/colls/%s/", database.ResourceID, newCollection.ResourceID)
|
||||
|
||||
status = insertKey(txn, collectionKey, newCollection)
|
||||
if status != datastore.StatusOk {
|
||||
return datastore.Collection{}, status
|
||||
}
|
||||
|
||||
return newCollection, datastore.StatusOk
|
||||
}
|
80
internal/datastore/badger_datastore/databases.go
Normal file
80
internal/datastore/badger_datastore/databases.go
Normal file
@ -0,0 +1,80 @@
|
||||
package badgerdatastore
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/pikami/cosmium/internal/datastore"
|
||||
"github.com/pikami/cosmium/internal/logger"
|
||||
"github.com/pikami/cosmium/internal/resourceid"
|
||||
)
|
||||
|
||||
func (r *BadgerDataStore) GetAllDatabases() ([]datastore.Database, datastore.DataStoreStatus) {
|
||||
dbs, status := listByPrefix[datastore.Database](r.db, DatabaseKeyPrefix)
|
||||
if status == datastore.StatusOk {
|
||||
return dbs, datastore.StatusOk
|
||||
}
|
||||
|
||||
return nil, status
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) GetDatabase(id string) (datastore.Database, datastore.DataStoreStatus) {
|
||||
databaseKey := generateDatabaseKey(id)
|
||||
|
||||
txn := r.db.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
var database datastore.Database
|
||||
status := getKey(txn, databaseKey, &database)
|
||||
|
||||
return database, status
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) DeleteDatabase(id string) datastore.DataStoreStatus {
|
||||
databaseKey := generateDatabaseKey(id)
|
||||
|
||||
txn := r.db.NewTransaction(true)
|
||||
defer txn.Discard()
|
||||
|
||||
prefixes := []string{
|
||||
generateKey(resourceid.ResourceTypeCollection, id, "", ""),
|
||||
generateKey(resourceid.ResourceTypeDocument, id, "", ""),
|
||||
generateKey(resourceid.ResourceTypeTrigger, id, "", ""),
|
||||
generateKey(resourceid.ResourceTypeStoredProcedure, id, "", ""),
|
||||
generateKey(resourceid.ResourceTypeUserDefinedFunction, id, "", ""),
|
||||
databaseKey,
|
||||
}
|
||||
for _, prefix := range prefixes {
|
||||
if err := deleteKeysByPrefix(txn, prefix); err != nil {
|
||||
return datastore.Unknown
|
||||
}
|
||||
}
|
||||
|
||||
err := txn.Commit()
|
||||
if err != nil {
|
||||
logger.ErrorLn("Error while committing transaction:", err)
|
||||
return datastore.Unknown
|
||||
}
|
||||
|
||||
return datastore.StatusOk
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) CreateDatabase(newDatabase datastore.Database) (datastore.Database, datastore.DataStoreStatus) {
|
||||
databaseKey := generateDatabaseKey(newDatabase.ID)
|
||||
|
||||
txn := r.db.NewTransaction(true)
|
||||
defer txn.Discard()
|
||||
|
||||
newDatabase.TimeStamp = time.Now().Unix()
|
||||
newDatabase.ResourceID = resourceid.New(resourceid.ResourceTypeDatabase)
|
||||
newDatabase.ETag = fmt.Sprintf("\"%s\"", uuid.New())
|
||||
newDatabase.Self = fmt.Sprintf("dbs/%s/", newDatabase.ResourceID)
|
||||
|
||||
status := insertKey(txn, databaseKey, newDatabase)
|
||||
if status != datastore.StatusOk {
|
||||
return datastore.Database{}, status
|
||||
}
|
||||
|
||||
return newDatabase, datastore.StatusOk
|
||||
}
|
207
internal/datastore/badger_datastore/db_abstractions.go
Normal file
207
internal/datastore/badger_datastore/db_abstractions.go
Normal file
@ -0,0 +1,207 @@
|
||||
package badgerdatastore
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"github.com/pikami/cosmium/internal/datastore"
|
||||
"github.com/pikami/cosmium/internal/logger"
|
||||
"github.com/pikami/cosmium/internal/resourceid"
|
||||
)
|
||||
|
||||
const (
|
||||
DatabaseKeyPrefix = "DB:"
|
||||
CollectionKeyPrefix = "COL:"
|
||||
DocumentKeyPrefix = "DOC:"
|
||||
TriggerKeyPrefix = "TRG:"
|
||||
StoredProcedureKeyPrefix = "SP:"
|
||||
UserDefinedFunctionKeyPrefix = "UDF:"
|
||||
)
|
||||
|
||||
func generateKey(
|
||||
resourceType resourceid.ResourceType,
|
||||
databaseId string,
|
||||
collectionId string,
|
||||
resourceId string,
|
||||
) string {
|
||||
result := ""
|
||||
|
||||
switch resourceType {
|
||||
case resourceid.ResourceTypeDatabase:
|
||||
result += DatabaseKeyPrefix
|
||||
case resourceid.ResourceTypeCollection:
|
||||
result += CollectionKeyPrefix
|
||||
case resourceid.ResourceTypeDocument:
|
||||
result += DocumentKeyPrefix
|
||||
case resourceid.ResourceTypeTrigger:
|
||||
result += TriggerKeyPrefix
|
||||
case resourceid.ResourceTypeStoredProcedure:
|
||||
result += StoredProcedureKeyPrefix
|
||||
case resourceid.ResourceTypeUserDefinedFunction:
|
||||
result += UserDefinedFunctionKeyPrefix
|
||||
}
|
||||
|
||||
if databaseId != "" {
|
||||
result += databaseId
|
||||
}
|
||||
|
||||
if collectionId != "" {
|
||||
result += "/colls/" + collectionId
|
||||
}
|
||||
|
||||
if resourceId != "" {
|
||||
result += "/" + resourceId
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// generateDatabaseKey returns the storage key for a database record.
func generateDatabaseKey(databaseId string) string {
	return generateKey(resourceid.ResourceTypeDatabase, databaseId, "", "")
}

// generateCollectionKey returns the storage key for a collection record.
func generateCollectionKey(databaseId string, collectionId string) string {
	return generateKey(resourceid.ResourceTypeCollection, databaseId, collectionId, "")
}

// generateDocumentKey returns the storage key for a document record.
func generateDocumentKey(databaseId string, collectionId string, documentId string) string {
	return generateKey(resourceid.ResourceTypeDocument, databaseId, collectionId, documentId)
}

// generateTriggerKey returns the storage key for a trigger record.
func generateTriggerKey(databaseId string, collectionId string, triggerId string) string {
	return generateKey(resourceid.ResourceTypeTrigger, databaseId, collectionId, triggerId)
}

// generateStoredProcedureKey returns the storage key for a stored procedure record.
func generateStoredProcedureKey(databaseId string, collectionId string, storedProcedureId string) string {
	return generateKey(resourceid.ResourceTypeStoredProcedure, databaseId, collectionId, storedProcedureId)
}

// generateUserDefinedFunctionKey returns the storage key for a UDF record.
func generateUserDefinedFunctionKey(databaseId string, collectionId string, udfId string) string {
	return generateKey(resourceid.ResourceTypeUserDefinedFunction, databaseId, collectionId, udfId)
}
|
||||
|
||||
// insertKey gob-encodes value and writes it under key, but only when the key
// is not already present; an existing entry yields Conflict so duplicate
// creates surface to the caller.
//
// NOTE: on success this helper commits txn itself, so the transaction must
// not be reused by the caller afterwards (callers pair it with a deferred
// Discard, which becomes a no-op after the commit).
func insertKey(txn *badger.Txn, key string, value interface{}) datastore.DataStoreStatus {
	// A successful Get means the key already exists.
	_, err := txn.Get([]byte(key))
	if err == nil {
		return datastore.Conflict
	}

	// Anything other than "not found" is a genuine lookup failure.
	if err != badger.ErrKeyNotFound {
		logger.ErrorLn("Error while checking if key exists:", err)
		return datastore.Unknown
	}

	var buf bytes.Buffer
	err = gob.NewEncoder(&buf).Encode(value)
	if err != nil {
		logger.ErrorLn("Error while encoding value:", err)
		return datastore.Unknown
	}

	err = txn.Set([]byte(key), buf.Bytes())
	if err != nil {
		logger.ErrorLn("Error while setting key:", err)
		return datastore.Unknown
	}

	err = txn.Commit()
	if err != nil {
		logger.ErrorLn("Error while committing transaction:", err)
		return datastore.Unknown
	}

	return datastore.StatusOk
}
|
||||
|
||||
func getKey(txn *badger.Txn, key string, value interface{}) datastore.DataStoreStatus {
|
||||
item, err := txn.Get([]byte(key))
|
||||
if err != nil {
|
||||
if err == badger.ErrKeyNotFound {
|
||||
return datastore.StatusNotFound
|
||||
}
|
||||
logger.ErrorLn("Error while getting key:", err)
|
||||
return datastore.Unknown
|
||||
}
|
||||
|
||||
val, err := item.ValueCopy(nil)
|
||||
if err != nil {
|
||||
logger.ErrorLn("Error while copying value:", err)
|
||||
return datastore.Unknown
|
||||
}
|
||||
|
||||
if value == nil {
|
||||
logger.ErrorLn("getKey called with nil value")
|
||||
return datastore.Unknown
|
||||
}
|
||||
|
||||
err = gob.NewDecoder(bytes.NewReader(val)).Decode(value)
|
||||
if err != nil {
|
||||
logger.ErrorLn("Error while decoding value:", err)
|
||||
return datastore.Unknown
|
||||
}
|
||||
|
||||
return datastore.StatusOk
|
||||
}
|
||||
|
||||
func keyExists(txn *badger.Txn, key string) (bool, error) {
|
||||
_, err := txn.Get([]byte(key))
|
||||
if err == nil {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if err == badger.ErrKeyNotFound {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return false, err
|
||||
}
|
||||
|
||||
func listByPrefix[T any](db *badger.DB, prefix string) ([]T, datastore.DataStoreStatus) {
|
||||
var results []T
|
||||
|
||||
err := db.View(func(txn *badger.Txn) error {
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.Prefix = []byte(prefix)
|
||||
it := txn.NewIterator(opts)
|
||||
defer it.Close()
|
||||
|
||||
for it.Rewind(); it.Valid(); it.Next() {
|
||||
item := it.Item()
|
||||
var entry T
|
||||
|
||||
status := getKey(txn, string(item.Key()), &entry)
|
||||
if status != datastore.StatusOk {
|
||||
logger.ErrorLn("Failed to retrieve entry:", string(item.Key()))
|
||||
continue
|
||||
}
|
||||
|
||||
results = append(results, entry)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
logger.ErrorLn("Error while listing entries:", err)
|
||||
return nil, datastore.Unknown
|
||||
}
|
||||
|
||||
return results, datastore.StatusOk
|
||||
}
|
||||
|
||||
func deleteKeysByPrefix(txn *badger.Txn, prefix string) error {
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.Prefix = []byte(prefix)
|
||||
it := txn.NewIterator(opts)
|
||||
defer it.Close()
|
||||
|
||||
for it.Rewind(); it.Valid(); it.Next() {
|
||||
key := it.Item().KeyCopy(nil)
|
||||
if err := txn.Delete(key); err != nil {
|
||||
logger.ErrorLn("Failed to delete key:", string(key), "Error:", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
60
internal/datastore/badger_datastore/document_iterator.go
Normal file
60
internal/datastore/badger_datastore/document_iterator.go
Normal file
@ -0,0 +1,60 @@
|
||||
package badgerdatastore
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"github.com/pikami/cosmium/internal/datastore"
|
||||
"github.com/pikami/cosmium/internal/logger"
|
||||
)
|
||||
|
||||
type BadgerDocumentIterator struct {
|
||||
txn *badger.Txn
|
||||
it *badger.Iterator
|
||||
prefix string
|
||||
}
|
||||
|
||||
func NewBadgerDocumentIterator(txn *badger.Txn, prefix string) *BadgerDocumentIterator {
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.Prefix = []byte(prefix)
|
||||
|
||||
it := txn.NewIterator(opts)
|
||||
it.Rewind()
|
||||
|
||||
return &BadgerDocumentIterator{
|
||||
txn: txn,
|
||||
it: it,
|
||||
prefix: prefix,
|
||||
}
|
||||
}
|
||||
|
||||
func (i *BadgerDocumentIterator) Next() (datastore.Document, datastore.DataStoreStatus) {
|
||||
if !i.it.Valid() {
|
||||
i.it.Close()
|
||||
return datastore.Document{}, datastore.IterEOF
|
||||
}
|
||||
|
||||
item := i.it.Item()
|
||||
val, err := item.ValueCopy(nil)
|
||||
if err != nil {
|
||||
logger.ErrorLn("Error while copying value:", err)
|
||||
return datastore.Document{}, datastore.Unknown
|
||||
}
|
||||
|
||||
current := &datastore.Document{}
|
||||
err = gob.NewDecoder(bytes.NewReader(val)).Decode(current)
|
||||
if err != nil {
|
||||
logger.ErrorLn("Error while decoding value:", err)
|
||||
return datastore.Document{}, datastore.Unknown
|
||||
}
|
||||
|
||||
i.it.Next()
|
||||
|
||||
return *current, datastore.StatusOk
|
||||
}
|
||||
|
||||
func (i *BadgerDocumentIterator) Close() {
|
||||
i.it.Close()
|
||||
i.txn.Discard()
|
||||
}
|
127
internal/datastore/badger_datastore/documents.go
Normal file
127
internal/datastore/badger_datastore/documents.go
Normal file
@ -0,0 +1,127 @@
|
||||
package badgerdatastore
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/pikami/cosmium/internal/datastore"
|
||||
"github.com/pikami/cosmium/internal/logger"
|
||||
"github.com/pikami/cosmium/internal/resourceid"
|
||||
)
|
||||
|
||||
func (r *BadgerDataStore) GetAllDocuments(databaseId string, collectionId string) ([]datastore.Document, datastore.DataStoreStatus) {
|
||||
txn := r.db.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
dbExists, err := keyExists(txn, generateDatabaseKey(databaseId))
|
||||
if err != nil || !dbExists {
|
||||
return nil, datastore.StatusNotFound
|
||||
}
|
||||
|
||||
collExists, err := keyExists(txn, generateCollectionKey(databaseId, collectionId))
|
||||
if err != nil || !collExists {
|
||||
return nil, datastore.StatusNotFound
|
||||
}
|
||||
|
||||
docs, status := listByPrefix[datastore.Document](r.db, generateKey(resourceid.ResourceTypeDocument, databaseId, collectionId, ""))
|
||||
if status == datastore.StatusOk {
|
||||
return docs, datastore.StatusOk
|
||||
}
|
||||
|
||||
return nil, status
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) GetDocumentIterator(databaseId string, collectionId string) (datastore.DocumentIterator, datastore.DataStoreStatus) {
|
||||
txn := r.db.NewTransaction(false)
|
||||
|
||||
dbExists, err := keyExists(txn, generateDatabaseKey(databaseId))
|
||||
if err != nil || !dbExists {
|
||||
return nil, datastore.StatusNotFound
|
||||
}
|
||||
|
||||
collExists, err := keyExists(txn, generateCollectionKey(databaseId, collectionId))
|
||||
if err != nil || !collExists {
|
||||
return nil, datastore.StatusNotFound
|
||||
}
|
||||
|
||||
iter := NewBadgerDocumentIterator(txn, generateKey(resourceid.ResourceTypeDocument, databaseId, collectionId, ""))
|
||||
return iter, datastore.StatusOk
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) GetDocument(databaseId string, collectionId string, documentId string) (datastore.Document, datastore.DataStoreStatus) {
|
||||
documentKey := generateDocumentKey(databaseId, collectionId, documentId)
|
||||
|
||||
txn := r.db.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
var document datastore.Document
|
||||
status := getKey(txn, documentKey, &document)
|
||||
|
||||
return document, status
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) DeleteDocument(databaseId string, collectionId string, documentId string) datastore.DataStoreStatus {
|
||||
documentKey := generateDocumentKey(databaseId, collectionId, documentId)
|
||||
|
||||
txn := r.db.NewTransaction(true)
|
||||
defer txn.Discard()
|
||||
|
||||
exists, err := keyExists(txn, documentKey)
|
||||
if err != nil {
|
||||
return datastore.Unknown
|
||||
}
|
||||
if !exists {
|
||||
return datastore.StatusNotFound
|
||||
}
|
||||
|
||||
err = txn.Delete([]byte(documentKey))
|
||||
if err != nil {
|
||||
logger.ErrorLn("Error while deleting document:", err)
|
||||
return datastore.Unknown
|
||||
}
|
||||
|
||||
err = txn.Commit()
|
||||
if err != nil {
|
||||
logger.ErrorLn("Error while committing transaction:", err)
|
||||
return datastore.Unknown
|
||||
}
|
||||
|
||||
return datastore.StatusOk
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) CreateDocument(databaseId string, collectionId string, document map[string]interface{}) (datastore.Document, datastore.DataStoreStatus) {
|
||||
txn := r.db.NewTransaction(true)
|
||||
defer txn.Discard()
|
||||
|
||||
var database datastore.Database
|
||||
status := getKey(txn, generateDatabaseKey(databaseId), &database)
|
||||
if status != datastore.StatusOk {
|
||||
return datastore.Document{}, status
|
||||
}
|
||||
|
||||
var collection datastore.Collection
|
||||
status = getKey(txn, generateCollectionKey(databaseId, collectionId), &collection)
|
||||
if status != datastore.StatusOk {
|
||||
return datastore.Document{}, status
|
||||
}
|
||||
|
||||
var ok bool
|
||||
var documentId string
|
||||
if documentId, ok = document["id"].(string); !ok || documentId == "" {
|
||||
documentId = fmt.Sprint(uuid.New())
|
||||
document["id"] = documentId
|
||||
}
|
||||
|
||||
document["_ts"] = time.Now().Unix()
|
||||
document["_rid"] = resourceid.NewCombined(collection.ResourceID, resourceid.New(resourceid.ResourceTypeDocument))
|
||||
document["_etag"] = fmt.Sprintf("\"%s\"", uuid.New())
|
||||
document["_self"] = fmt.Sprintf("dbs/%s/colls/%s/docs/%s/", database.ResourceID, collection.ResourceID, document["_rid"])
|
||||
|
||||
status = insertKey(txn, generateDocumentKey(databaseId, collectionId, documentId), document)
|
||||
if status != datastore.StatusOk {
|
||||
return datastore.Document{}, status
|
||||
}
|
||||
|
||||
return document, datastore.StatusOk
|
||||
}
|
53
internal/datastore/badger_datastore/partition_key_ranges.go
Normal file
53
internal/datastore/badger_datastore/partition_key_ranges.go
Normal file
@ -0,0 +1,53 @@
|
||||
package badgerdatastore
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/pikami/cosmium/internal/datastore"
|
||||
"github.com/pikami/cosmium/internal/resourceid"
|
||||
)
|
||||
|
||||
// I have no idea what this is tbh
|
||||
func (r *BadgerDataStore) GetPartitionKeyRanges(databaseId string, collectionId string) ([]datastore.PartitionKeyRange, datastore.DataStoreStatus) {
|
||||
databaseRid := databaseId
|
||||
collectionRid := collectionId
|
||||
var timestamp int64 = 0
|
||||
|
||||
txn := r.db.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
var database datastore.Database
|
||||
status := getKey(txn, generateDatabaseKey(databaseId), &database)
|
||||
if status != datastore.StatusOk {
|
||||
databaseRid = database.ResourceID
|
||||
}
|
||||
|
||||
var collection datastore.Collection
|
||||
status = getKey(txn, generateCollectionKey(databaseId, collectionId), &collection)
|
||||
if status != datastore.StatusOk {
|
||||
collectionRid = collection.ResourceID
|
||||
timestamp = collection.TimeStamp
|
||||
}
|
||||
|
||||
pkrResourceId := resourceid.NewCombined(collectionRid, resourceid.New(resourceid.ResourceTypePartitionKeyRange))
|
||||
pkrSelf := fmt.Sprintf("dbs/%s/colls/%s/pkranges/%s/", databaseRid, collectionRid, pkrResourceId)
|
||||
etag := fmt.Sprintf("\"%s\"", uuid.New())
|
||||
|
||||
return []datastore.PartitionKeyRange{
|
||||
{
|
||||
ResourceID: pkrResourceId,
|
||||
ID: "0",
|
||||
Etag: etag,
|
||||
MinInclusive: "",
|
||||
MaxExclusive: "FF",
|
||||
RidPrefix: 0,
|
||||
Self: pkrSelf,
|
||||
ThroughputFraction: 1,
|
||||
Status: "online",
|
||||
Parents: []interface{}{},
|
||||
TimeStamp: timestamp,
|
||||
Lsn: 17,
|
||||
},
|
||||
}, datastore.StatusOk
|
||||
}
|
107
internal/datastore/badger_datastore/stored_procedures.go
Normal file
107
internal/datastore/badger_datastore/stored_procedures.go
Normal file
@ -0,0 +1,107 @@
|
||||
package badgerdatastore
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/pikami/cosmium/internal/datastore"
|
||||
"github.com/pikami/cosmium/internal/logger"
|
||||
"github.com/pikami/cosmium/internal/resourceid"
|
||||
)
|
||||
|
||||
func (r *BadgerDataStore) GetAllStoredProcedures(databaseId string, collectionId string) ([]datastore.StoredProcedure, datastore.DataStoreStatus) {
|
||||
txn := r.db.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
dbExists, err := keyExists(txn, generateDatabaseKey(databaseId))
|
||||
if err != nil || !dbExists {
|
||||
return nil, datastore.StatusNotFound
|
||||
}
|
||||
|
||||
collExists, err := keyExists(txn, generateCollectionKey(databaseId, collectionId))
|
||||
if err != nil || !collExists {
|
||||
return nil, datastore.StatusNotFound
|
||||
}
|
||||
|
||||
storedProcedures, status := listByPrefix[datastore.StoredProcedure](r.db, generateKey(resourceid.ResourceTypeStoredProcedure, databaseId, collectionId, ""))
|
||||
if status == datastore.StatusOk {
|
||||
return storedProcedures, datastore.StatusOk
|
||||
}
|
||||
|
||||
return nil, status
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) GetStoredProcedure(databaseId string, collectionId string, storedProcedureId string) (datastore.StoredProcedure, datastore.DataStoreStatus) {
|
||||
storedProcedureKey := generateStoredProcedureKey(databaseId, collectionId, storedProcedureId)
|
||||
|
||||
txn := r.db.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
var storedProcedure datastore.StoredProcedure
|
||||
status := getKey(txn, storedProcedureKey, &storedProcedure)
|
||||
|
||||
return storedProcedure, status
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) DeleteStoredProcedure(databaseId string, collectionId string, storedProcedureId string) datastore.DataStoreStatus {
|
||||
storedProcedureKey := generateStoredProcedureKey(databaseId, collectionId, storedProcedureId)
|
||||
|
||||
txn := r.db.NewTransaction(true)
|
||||
defer txn.Discard()
|
||||
|
||||
exists, err := keyExists(txn, storedProcedureKey)
|
||||
if err != nil {
|
||||
return datastore.Unknown
|
||||
}
|
||||
if !exists {
|
||||
return datastore.StatusNotFound
|
||||
}
|
||||
|
||||
err = txn.Delete([]byte(storedProcedureKey))
|
||||
if err != nil {
|
||||
logger.ErrorLn("Error while deleting stored procedure:", err)
|
||||
return datastore.Unknown
|
||||
}
|
||||
|
||||
err = txn.Commit()
|
||||
if err != nil {
|
||||
logger.ErrorLn("Error while committing transaction:", err)
|
||||
return datastore.Unknown
|
||||
}
|
||||
|
||||
return datastore.StatusOk
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) CreateStoredProcedure(databaseId string, collectionId string, storedProcedure datastore.StoredProcedure) (datastore.StoredProcedure, datastore.DataStoreStatus) {
|
||||
txn := r.db.NewTransaction(true)
|
||||
defer txn.Discard()
|
||||
|
||||
if storedProcedure.ID == "" {
|
||||
return datastore.StoredProcedure{}, datastore.BadRequest
|
||||
}
|
||||
|
||||
var database datastore.Database
|
||||
status := getKey(txn, generateDatabaseKey(databaseId), &database)
|
||||
if status != datastore.StatusOk {
|
||||
return datastore.StoredProcedure{}, status
|
||||
}
|
||||
|
||||
var collection datastore.Collection
|
||||
status = getKey(txn, generateCollectionKey(databaseId, collectionId), &collection)
|
||||
if status != datastore.StatusOk {
|
||||
return datastore.StoredProcedure{}, status
|
||||
}
|
||||
|
||||
storedProcedure.TimeStamp = time.Now().Unix()
|
||||
storedProcedure.ResourceID = resourceid.NewCombined(collection.ResourceID, resourceid.New(resourceid.ResourceTypeStoredProcedure))
|
||||
storedProcedure.ETag = fmt.Sprintf("\"%s\"", uuid.New())
|
||||
storedProcedure.Self = fmt.Sprintf("dbs/%s/colls/%s/sprocs/%s/", database.ResourceID, collection.ResourceID, storedProcedure.ResourceID)
|
||||
|
||||
status = insertKey(txn, generateStoredProcedureKey(databaseId, collectionId, storedProcedure.ID), storedProcedure)
|
||||
if status != datastore.StatusOk {
|
||||
return datastore.StoredProcedure{}, status
|
||||
}
|
||||
|
||||
return storedProcedure, datastore.StatusOk
|
||||
}
|
107
internal/datastore/badger_datastore/triggers.go
Normal file
107
internal/datastore/badger_datastore/triggers.go
Normal file
@ -0,0 +1,107 @@
|
||||
package badgerdatastore
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/pikami/cosmium/internal/datastore"
|
||||
"github.com/pikami/cosmium/internal/logger"
|
||||
"github.com/pikami/cosmium/internal/resourceid"
|
||||
)
|
||||
|
||||
func (r *BadgerDataStore) GetAllTriggers(databaseId string, collectionId string) ([]datastore.Trigger, datastore.DataStoreStatus) {
|
||||
txn := r.db.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
dbExists, err := keyExists(txn, generateDatabaseKey(databaseId))
|
||||
if err != nil || !dbExists {
|
||||
return nil, datastore.StatusNotFound
|
||||
}
|
||||
|
||||
collExists, err := keyExists(txn, generateCollectionKey(databaseId, collectionId))
|
||||
if err != nil || !collExists {
|
||||
return nil, datastore.StatusNotFound
|
||||
}
|
||||
|
||||
triggers, status := listByPrefix[datastore.Trigger](r.db, generateKey(resourceid.ResourceTypeTrigger, databaseId, collectionId, ""))
|
||||
if status == datastore.StatusOk {
|
||||
return triggers, datastore.StatusOk
|
||||
}
|
||||
|
||||
return nil, status
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) GetTrigger(databaseId string, collectionId string, triggerId string) (datastore.Trigger, datastore.DataStoreStatus) {
|
||||
triggerKey := generateTriggerKey(databaseId, collectionId, triggerId)
|
||||
|
||||
txn := r.db.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
var trigger datastore.Trigger
|
||||
status := getKey(txn, triggerKey, &trigger)
|
||||
|
||||
return trigger, status
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) DeleteTrigger(databaseId string, collectionId string, triggerId string) datastore.DataStoreStatus {
|
||||
triggerKey := generateTriggerKey(databaseId, collectionId, triggerId)
|
||||
|
||||
txn := r.db.NewTransaction(true)
|
||||
defer txn.Discard()
|
||||
|
||||
exists, err := keyExists(txn, triggerKey)
|
||||
if err != nil {
|
||||
return datastore.Unknown
|
||||
}
|
||||
if !exists {
|
||||
return datastore.StatusNotFound
|
||||
}
|
||||
|
||||
err = txn.Delete([]byte(triggerKey))
|
||||
if err != nil {
|
||||
logger.ErrorLn("Error while deleting trigger:", err)
|
||||
return datastore.Unknown
|
||||
}
|
||||
|
||||
err = txn.Commit()
|
||||
if err != nil {
|
||||
logger.ErrorLn("Error while committing transaction:", err)
|
||||
return datastore.Unknown
|
||||
}
|
||||
|
||||
return datastore.StatusOk
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) CreateTrigger(databaseId string, collectionId string, trigger datastore.Trigger) (datastore.Trigger, datastore.DataStoreStatus) {
|
||||
txn := r.db.NewTransaction(true)
|
||||
defer txn.Discard()
|
||||
|
||||
if trigger.ID == "" {
|
||||
return datastore.Trigger{}, datastore.BadRequest
|
||||
}
|
||||
|
||||
var database datastore.Database
|
||||
status := getKey(txn, generateDatabaseKey(databaseId), &database)
|
||||
if status != datastore.StatusOk {
|
||||
return datastore.Trigger{}, status
|
||||
}
|
||||
|
||||
var collection datastore.Collection
|
||||
status = getKey(txn, generateCollectionKey(databaseId, collectionId), &collection)
|
||||
if status != datastore.StatusOk {
|
||||
return datastore.Trigger{}, status
|
||||
}
|
||||
|
||||
trigger.TimeStamp = time.Now().Unix()
|
||||
trigger.ResourceID = resourceid.NewCombined(collection.ResourceID, resourceid.New(resourceid.ResourceTypeTrigger))
|
||||
trigger.ETag = fmt.Sprintf("\"%s\"", uuid.New())
|
||||
trigger.Self = fmt.Sprintf("dbs/%s/colls/%s/triggers/%s/", database.ResourceID, collection.ResourceID, trigger.ResourceID)
|
||||
|
||||
status = insertKey(txn, generateTriggerKey(databaseId, collectionId, trigger.ID), trigger)
|
||||
if status != datastore.StatusOk {
|
||||
return datastore.Trigger{}, status
|
||||
}
|
||||
|
||||
return trigger, datastore.StatusOk
|
||||
}
|
107
internal/datastore/badger_datastore/user_defined_functions.go
Normal file
107
internal/datastore/badger_datastore/user_defined_functions.go
Normal file
@ -0,0 +1,107 @@
|
||||
package badgerdatastore
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/pikami/cosmium/internal/datastore"
|
||||
"github.com/pikami/cosmium/internal/logger"
|
||||
"github.com/pikami/cosmium/internal/resourceid"
|
||||
)
|
||||
|
||||
func (r *BadgerDataStore) GetAllUserDefinedFunctions(databaseId string, collectionId string) ([]datastore.UserDefinedFunction, datastore.DataStoreStatus) {
|
||||
txn := r.db.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
dbExists, err := keyExists(txn, generateDatabaseKey(databaseId))
|
||||
if err != nil || !dbExists {
|
||||
return nil, datastore.StatusNotFound
|
||||
}
|
||||
|
||||
collExists, err := keyExists(txn, generateCollectionKey(databaseId, collectionId))
|
||||
if err != nil || !collExists {
|
||||
return nil, datastore.StatusNotFound
|
||||
}
|
||||
|
||||
udfs, status := listByPrefix[datastore.UserDefinedFunction](r.db, generateKey(resourceid.ResourceTypeUserDefinedFunction, databaseId, collectionId, ""))
|
||||
if status == datastore.StatusOk {
|
||||
return udfs, datastore.StatusOk
|
||||
}
|
||||
|
||||
return nil, status
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) GetUserDefinedFunction(databaseId string, collectionId string, udfId string) (datastore.UserDefinedFunction, datastore.DataStoreStatus) {
|
||||
udfKey := generateUserDefinedFunctionKey(databaseId, collectionId, udfId)
|
||||
|
||||
txn := r.db.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
var udf datastore.UserDefinedFunction
|
||||
status := getKey(txn, udfKey, &udf)
|
||||
|
||||
return udf, status
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) DeleteUserDefinedFunction(databaseId string, collectionId string, udfId string) datastore.DataStoreStatus {
|
||||
udfKey := generateUserDefinedFunctionKey(databaseId, collectionId, udfId)
|
||||
|
||||
txn := r.db.NewTransaction(true)
|
||||
defer txn.Discard()
|
||||
|
||||
exists, err := keyExists(txn, udfKey)
|
||||
if err != nil {
|
||||
return datastore.Unknown
|
||||
}
|
||||
if !exists {
|
||||
return datastore.StatusNotFound
|
||||
}
|
||||
|
||||
err = txn.Delete([]byte(udfKey))
|
||||
if err != nil {
|
||||
logger.ErrorLn("Error while deleting user defined function:", err)
|
||||
return datastore.Unknown
|
||||
}
|
||||
|
||||
err = txn.Commit()
|
||||
if err != nil {
|
||||
logger.ErrorLn("Error while committing transaction:", err)
|
||||
return datastore.Unknown
|
||||
}
|
||||
|
||||
return datastore.StatusOk
|
||||
}
|
||||
|
||||
func (r *BadgerDataStore) CreateUserDefinedFunction(databaseId string, collectionId string, udf datastore.UserDefinedFunction) (datastore.UserDefinedFunction, datastore.DataStoreStatus) {
|
||||
txn := r.db.NewTransaction(true)
|
||||
defer txn.Discard()
|
||||
|
||||
if udf.ID == "" {
|
||||
return datastore.UserDefinedFunction{}, datastore.BadRequest
|
||||
}
|
||||
|
||||
var database datastore.Database
|
||||
status := getKey(txn, generateDatabaseKey(databaseId), &database)
|
||||
if status != datastore.StatusOk {
|
||||
return datastore.UserDefinedFunction{}, status
|
||||
}
|
||||
|
||||
var collection datastore.Collection
|
||||
status = getKey(txn, generateCollectionKey(databaseId, collectionId), &collection)
|
||||
if status != datastore.StatusOk {
|
||||
return datastore.UserDefinedFunction{}, status
|
||||
}
|
||||
|
||||
udf.TimeStamp = time.Now().Unix()
|
||||
udf.ResourceID = resourceid.NewCombined(collection.ResourceID, resourceid.New(resourceid.ResourceTypeUserDefinedFunction))
|
||||
udf.ETag = fmt.Sprintf("\"%s\"", uuid.New())
|
||||
udf.Self = fmt.Sprintf("dbs/%s/colls/%s/udfs/%s/", database.ResourceID, collection.ResourceID, udf.ResourceID)
|
||||
|
||||
status = insertKey(txn, generateUserDefinedFunctionKey(databaseId, collectionId, udf.ID), udf)
|
||||
if status != datastore.StatusOk {
|
||||
return datastore.UserDefinedFunction{}, status
|
||||
}
|
||||
|
||||
return udf, datastore.StatusOk
|
||||
}
|
@ -40,4 +40,5 @@ type DataStore interface {
|
||||
|
||||
// DocumentIterator streams documents one at a time. Next yields IterEOF once
// the sequence is exhausted; Close should be called when the caller stops
// early so implementations can release any resources they hold.
type DocumentIterator interface {
	Next() (Document, DataStoreStatus)
	Close()
}
|
||||
|
@ -15,3 +15,7 @@ func (i *ArrayDocumentIterator) Next() (datastore.Document, datastore.DataStoreS
|
||||
|
||||
return i.documents[i.index], datastore.StatusOk
|
||||
}
|
||||
|
||||
// Close drops the iterator's backing slice so the buffered documents can be
// garbage-collected once the caller is done iterating.
func (i *ArrayDocumentIterator) Close() {
	i.documents = []datastore.Document{}
}
|
||||
|
@ -16,6 +16,7 @@ const (
|
||||
Conflict = 3
|
||||
BadRequest = 4
|
||||
IterEOF = 5
|
||||
Unknown = 6
|
||||
)
|
||||
|
||||
type TriggerOperation string
|
||||
|
@ -11,6 +11,8 @@ import (
|
||||
|
||||
"github.com/pikami/cosmium/api"
|
||||
"github.com/pikami/cosmium/api/config"
|
||||
"github.com/pikami/cosmium/internal/datastore"
|
||||
badgerdatastore "github.com/pikami/cosmium/internal/datastore/badger_datastore"
|
||||
mapdatastore "github.com/pikami/cosmium/internal/datastore/map_datastore"
|
||||
)
|
||||
|
||||
@ -32,10 +34,16 @@ func CreateServerInstance(serverName *C.char, configurationJSON *C.char) int {
|
||||
configuration.ApplyDefaultsToEmptyFields()
|
||||
configuration.PopulateCalculatedFields()
|
||||
|
||||
dataStore := mapdatastore.NewMapDataStore(mapdatastore.MapDataStoreOptions{
|
||||
InitialDataFilePath: configuration.InitialDataFilePath,
|
||||
PersistDataFilePath: configuration.PersistDataFilePath,
|
||||
})
|
||||
var dataStore datastore.DataStore
|
||||
switch configuration.DataStore {
|
||||
case config.DataStoreBadger:
|
||||
dataStore = badgerdatastore.NewBadgerDataStore()
|
||||
default:
|
||||
dataStore = mapdatastore.NewMapDataStore(mapdatastore.MapDataStoreOptions{
|
||||
InitialDataFilePath: configuration.InitialDataFilePath,
|
||||
PersistDataFilePath: configuration.PersistDataFilePath,
|
||||
})
|
||||
}
|
||||
|
||||
server := api.NewApiServer(dataStore, &configuration)
|
||||
err = server.Start()
|
||||
|
Loading…
x
Reference in New Issue
Block a user