4 Commits

Author SHA1 Message Date
Pijus Kamandulis
1cf5ae92f4 Shared library stability improvements 2025-02-09 11:45:10 +02:00
Pijus Kamandulis
5d99b653cc Generate more realistic resource ids 2025-02-09 00:36:35 +02:00
Pijus Kamandulis
787cdb33cf Fix OFFSET clause 2025-02-08 15:28:06 +02:00
Pijus Kamandulis
5caa829ac1 Implement 'Transactional batch operations' 2025-02-04 20:35:15 +02:00
22 changed files with 423 additions and 76 deletions

24
api/api_models/models.go Normal file
View File

@@ -0,0 +1,24 @@
package apimodels
const (
BatchOperationTypeCreate = "Create"
BatchOperationTypeDelete = "Delete"
BatchOperationTypeReplace = "Replace"
BatchOperationTypeUpsert = "Upsert"
BatchOperationTypeRead = "Read"
BatchOperationTypePatch = "Patch"
)
type BatchOperation struct {
OperationType string `json:"operationType"`
Id string `json:"id"`
ResourceBody map[string]interface{} `json:"resourceBody"`
}
type BatchOperationResult struct {
StatusCode int `json:"statusCode"`
RequestCharge float64 `json:"requestCharge"`
ResourceBody map[string]interface{} `json:"resourceBody"`
Etag string `json:"etag"`
Message string `json:"message"`
}

View File

@@ -8,6 +8,7 @@ import (
jsonpatch "github.com/cosmiumdev/json-patch/v5"
"github.com/gin-gonic/gin"
apimodels "github.com/pikami/cosmium/api/api_models"
"github.com/pikami/cosmium/internal/constants"
"github.com/pikami/cosmium/internal/logger"
repositorymodels "github.com/pikami/cosmium/internal/repository_models"
@@ -183,6 +184,13 @@ func (h *Handlers) DocumentsPost(c *gin.Context) {
databaseId := c.Param("databaseId")
collectionId := c.Param("collId")
// Handle batch requests
isBatchRequest, _ := strconv.ParseBool(c.GetHeader("x-ms-cosmos-is-batch-request"))
if isBatchRequest {
h.handleBatchRequest(c)
return
}
var requestBody map[string]interface{}
if err := c.BindJSON(&requestBody); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"message": err.Error()})
@@ -191,30 +199,7 @@ func (h *Handlers) DocumentsPost(c *gin.Context) {
query := requestBody["query"]
if query != nil {
if c.GetHeader("x-ms-cosmos-is-query-plan-request") != "" {
c.IndentedJSON(http.StatusOK, constants.QueryPlanResponse)
return
}
var queryParameters map[string]interface{}
if paramsArray, ok := requestBody["parameters"].([]interface{}); ok {
queryParameters = parametersToMap(paramsArray)
}
docs, status := h.repository.ExecuteQueryDocuments(databaseId, collectionId, query.(string), queryParameters)
if status != repositorymodels.StatusOk {
// TODO: Currently we return everything if the query fails
h.GetAllDocuments(c)
return
}
collection, _ := h.repository.GetCollection(databaseId, collectionId)
c.Header("x-ms-item-count", fmt.Sprintf("%d", len(docs)))
c.IndentedJSON(http.StatusOK, gin.H{
"_rid": collection.ResourceID,
"Documents": docs,
"_count": len(docs),
})
h.handleDocumentQuery(c, requestBody)
return
}
@@ -253,3 +238,131 @@ func parametersToMap(pairs []interface{}) map[string]interface{} {
return result
}
func (h *Handlers) handleDocumentQuery(c *gin.Context, requestBody map[string]interface{}) {
databaseId := c.Param("databaseId")
collectionId := c.Param("collId")
if c.GetHeader("x-ms-cosmos-is-query-plan-request") != "" {
c.IndentedJSON(http.StatusOK, constants.QueryPlanResponse)
return
}
var queryParameters map[string]interface{}
if paramsArray, ok := requestBody["parameters"].([]interface{}); ok {
queryParameters = parametersToMap(paramsArray)
}
docs, status := h.repository.ExecuteQueryDocuments(databaseId, collectionId, requestBody["query"].(string), queryParameters)
if status != repositorymodels.StatusOk {
// TODO: Currently we return everything if the query fails
h.GetAllDocuments(c)
return
}
collection, _ := h.repository.GetCollection(databaseId, collectionId)
c.Header("x-ms-item-count", fmt.Sprintf("%d", len(docs)))
c.IndentedJSON(http.StatusOK, gin.H{
"_rid": collection.ResourceID,
"Documents": docs,
"_count": len(docs),
})
}
func (h *Handlers) handleBatchRequest(c *gin.Context) {
databaseId := c.Param("databaseId")
collectionId := c.Param("collId")
batchOperations := make([]apimodels.BatchOperation, 0)
if err := c.BindJSON(&batchOperations); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"message": err.Error()})
return
}
batchOperationResults := make([]apimodels.BatchOperationResult, len(batchOperations))
for idx, operation := range batchOperations {
switch operation.OperationType {
case apimodels.BatchOperationTypeCreate:
createdDocument, status := h.repository.CreateDocument(databaseId, collectionId, operation.ResourceBody)
responseCode := repositoryStatusToResponseCode(status)
if status == repositorymodels.StatusOk {
responseCode = http.StatusCreated
}
batchOperationResults[idx] = apimodels.BatchOperationResult{
StatusCode: responseCode,
ResourceBody: createdDocument,
}
case apimodels.BatchOperationTypeDelete:
status := h.repository.DeleteDocument(databaseId, collectionId, operation.Id)
responseCode := repositoryStatusToResponseCode(status)
if status == repositorymodels.StatusOk {
responseCode = http.StatusNoContent
}
batchOperationResults[idx] = apimodels.BatchOperationResult{
StatusCode: responseCode,
}
case apimodels.BatchOperationTypeReplace:
deleteStatus := h.repository.DeleteDocument(databaseId, collectionId, operation.Id)
if deleteStatus == repositorymodels.StatusNotFound {
batchOperationResults[idx] = apimodels.BatchOperationResult{
StatusCode: http.StatusNotFound,
}
continue
}
createdDocument, createStatus := h.repository.CreateDocument(databaseId, collectionId, operation.ResourceBody)
responseCode := repositoryStatusToResponseCode(createStatus)
if createStatus == repositorymodels.StatusOk {
responseCode = http.StatusCreated
}
batchOperationResults[idx] = apimodels.BatchOperationResult{
StatusCode: responseCode,
ResourceBody: createdDocument,
}
case apimodels.BatchOperationTypeUpsert:
documentId := operation.ResourceBody["id"].(string)
h.repository.DeleteDocument(databaseId, collectionId, documentId)
createdDocument, createStatus := h.repository.CreateDocument(databaseId, collectionId, operation.ResourceBody)
responseCode := repositoryStatusToResponseCode(createStatus)
if createStatus == repositorymodels.StatusOk {
responseCode = http.StatusCreated
}
batchOperationResults[idx] = apimodels.BatchOperationResult{
StatusCode: responseCode,
ResourceBody: createdDocument,
}
case apimodels.BatchOperationTypeRead:
document, status := h.repository.GetDocument(databaseId, collectionId, operation.Id)
batchOperationResults[idx] = apimodels.BatchOperationResult{
StatusCode: repositoryStatusToResponseCode(status),
ResourceBody: document,
}
case apimodels.BatchOperationTypePatch:
batchOperationResults[idx] = apimodels.BatchOperationResult{
StatusCode: http.StatusNotImplemented,
Message: "Patch operation is not implemented",
}
default:
batchOperationResults[idx] = apimodels.BatchOperationResult{
StatusCode: http.StatusBadRequest,
Message: "Unknown operation type",
}
}
}
c.JSON(http.StatusOK, batchOperationResults)
}
func repositoryStatusToResponseCode(status repositorymodels.RepositoryStatus) int {
switch status {
case repositorymodels.StatusOk:
return http.StatusOK
case repositorymodels.StatusNotFound:
return http.StatusNotFound
case repositorymodels.Conflict:
return http.StatusConflict
case repositorymodels.BadRequest:
return http.StatusBadRequest
default:
return http.StatusInternalServerError
}
}

View File

@@ -75,8 +75,7 @@ func requestToResourceId(c *gin.Context) string {
isFeed := c.Request.Header.Get("A-Im") == "Incremental Feed"
if resourceType == "pkranges" && isFeed {
// CosmosSDK replaces '/' with '-' in resource id requests
resourceId = strings.Replace(collId, "-", "/", -1)
resourceId = collId
}
return resourceId

View File

@@ -6,6 +6,7 @@ import (
"github.com/gin-gonic/gin"
repositorymodels "github.com/pikami/cosmium/internal/repository_models"
"github.com/pikami/cosmium/internal/resourceid"
)
func (h *Handlers) GetPartitionKeyRanges(c *gin.Context) {
@@ -31,8 +32,9 @@ func (h *Handlers) GetPartitionKeyRanges(c *gin.Context) {
collectionRid = collection.ResourceID
}
rid := resourceid.NewCombined(collectionRid, resourceid.New(resourceid.ResourceTypePartitionKeyRange))
c.IndentedJSON(http.StatusOK, gin.H{
"_rid": collectionRid,
"_rid": rid,
"_count": len(partitionKeyRanges),
"PartitionKeyRanges": partitionKeyRanges,
})

View File

@@ -5,6 +5,7 @@ import (
"github.com/pikami/cosmium/api"
"github.com/pikami/cosmium/api/config"
"github.com/pikami/cosmium/internal/logger"
"github.com/pikami/cosmium/internal/repositories"
)
@@ -35,6 +36,9 @@ func runTestServer() *TestServer {
ExplorerBaseUrlLocation: config.ExplorerBaseUrlLocation,
}
config.LogLevel = "debug"
logger.SetLogLevel(logger.LogLevelDebug)
return runTestServerCustomConfig(config)
}

View File

@@ -377,5 +377,140 @@ func Test_Documents_Patch(t *testing.T) {
assert.NotNil(t, r)
assert.Nil(t, err2)
})
}
func Test_Documents_TransactionalBatch(t *testing.T) {
ts, collectionClient := documents_InitializeDb(t)
defer ts.Server.Close()
t.Run("Should execute CREATE transactional batch", func(t *testing.T) {
context := context.TODO()
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
newItem := map[string]interface{}{
"id": "678901",
}
bytes, err := json.Marshal(newItem)
assert.Nil(t, err)
batch.CreateItem(bytes, nil)
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
assert.Nil(t, err)
assert.True(t, response.Success)
assert.Equal(t, 1, len(response.OperationResults))
operationResponse := response.OperationResults[0]
assert.NotNil(t, operationResponse)
assert.NotNil(t, operationResponse.ResourceBody)
assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode)
var itemResponseBody map[string]interface{}
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
assert.Equal(t, newItem["id"], itemResponseBody["id"])
createdDoc, _ := ts.Repository.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string))
assert.Equal(t, newItem["id"], createdDoc["id"])
})
t.Run("Should execute DELETE transactional batch", func(t *testing.T) {
context := context.TODO()
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
batch.DeleteItem("12345", nil)
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
assert.Nil(t, err)
assert.True(t, response.Success)
assert.Equal(t, 1, len(response.OperationResults))
operationResponse := response.OperationResults[0]
assert.NotNil(t, operationResponse)
assert.Equal(t, int32(http.StatusNoContent), operationResponse.StatusCode)
_, status := ts.Repository.GetDocument(testDatabaseName, testCollectionName, "12345")
assert.Equal(t, repositorymodels.StatusNotFound, int(status))
})
t.Run("Should execute REPLACE transactional batch", func(t *testing.T) {
context := context.TODO()
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
newItem := map[string]interface{}{
"id": "67890",
"pk": "666",
}
bytes, err := json.Marshal(newItem)
assert.Nil(t, err)
batch.ReplaceItem("67890", bytes, nil)
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
assert.Nil(t, err)
assert.True(t, response.Success)
assert.Equal(t, 1, len(response.OperationResults))
operationResponse := response.OperationResults[0]
assert.NotNil(t, operationResponse)
assert.NotNil(t, operationResponse.ResourceBody)
assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode)
var itemResponseBody map[string]interface{}
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
assert.Equal(t, newItem["id"], itemResponseBody["id"])
assert.Equal(t, newItem["pk"], itemResponseBody["pk"])
updatedDoc, _ := ts.Repository.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string))
assert.Equal(t, newItem["id"], updatedDoc["id"])
assert.Equal(t, newItem["pk"], updatedDoc["pk"])
})
t.Run("Should execute UPSERT transactional batch", func(t *testing.T) {
context := context.TODO()
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
newItem := map[string]interface{}{
"id": "678901",
"pk": "666",
}
bytes, err := json.Marshal(newItem)
assert.Nil(t, err)
batch.UpsertItem(bytes, nil)
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
assert.Nil(t, err)
assert.True(t, response.Success)
assert.Equal(t, 1, len(response.OperationResults))
operationResponse := response.OperationResults[0]
assert.NotNil(t, operationResponse)
assert.NotNil(t, operationResponse.ResourceBody)
assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode)
var itemResponseBody map[string]interface{}
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
assert.Equal(t, newItem["id"], itemResponseBody["id"])
assert.Equal(t, newItem["pk"], itemResponseBody["pk"])
updatedDoc, _ := ts.Repository.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string))
assert.Equal(t, newItem["id"], updatedDoc["id"])
assert.Equal(t, newItem["pk"], updatedDoc["pk"])
})
t.Run("Should execute READ transactional batch", func(t *testing.T) {
context := context.TODO()
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
batch.ReadItem("67890", nil)
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
assert.Nil(t, err)
assert.True(t, response.Success)
assert.Equal(t, 1, len(response.OperationResults))
operationResponse := response.OperationResults[0]
assert.NotNil(t, operationResponse)
assert.NotNil(t, operationResponse.ResourceBody)
assert.Equal(t, int32(http.StatusOK), operationResponse.StatusCode)
var itemResponseBody map[string]interface{}
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
assert.Equal(t, "67890", itemResponseBody["id"])
})
}

View File

@@ -204,14 +204,18 @@ Cosmium strives to support the core features of Cosmos DB, including:
| IS_PRIMITIVE | Yes |
| IS_STRING | Yes |
### Document Batch Requests
### Transactional batch operations
Note: There's actually no transaction here. Think of this as a 'bulk operation' that can partially succeed.
| Operation | Implemented |
| --------- | ----------- |
| Create | No |
| Update | No |
| Delete | No |
| Read | No |
| Create | Yes |
| Delete | Yes |
| Replace | Yes |
| Upsert | Yes |
| Read | Yes |
| Patch | No |
## Known Differences

View File

@@ -50,6 +50,10 @@ func (r *DataRepository) DeleteCollection(databaseId string, collectionId string
}
delete(r.storeState.Collections[databaseId], collectionId)
delete(r.storeState.Documents[databaseId], collectionId)
delete(r.storeState.Triggers[databaseId], collectionId)
delete(r.storeState.StoredProcedures[databaseId], collectionId)
delete(r.storeState.UserDefinedFunctions[databaseId], collectionId)
return repositorymodels.StatusOk
}
@@ -71,7 +75,7 @@ func (r *DataRepository) CreateCollection(databaseId string, newCollection repos
newCollection = structhidrators.Hidrate(newCollection).(repositorymodels.Collection)
newCollection.TimeStamp = time.Now().Unix()
newCollection.ResourceID = resourceid.NewCombined(database.ResourceID, resourceid.New())
newCollection.ResourceID = resourceid.NewCombined(database.ResourceID, resourceid.New(resourceid.ResourceTypeCollection))
newCollection.ETag = fmt.Sprintf("\"%s\"", uuid.New())
newCollection.Self = fmt.Sprintf("dbs/%s/colls/%s/", database.ResourceID, newCollection.ResourceID)

View File

@@ -37,6 +37,11 @@ func (r *DataRepository) DeleteDatabase(id string) repositorymodels.RepositorySt
}
delete(r.storeState.Databases, id)
delete(r.storeState.Collections, id)
delete(r.storeState.Documents, id)
delete(r.storeState.Triggers, id)
delete(r.storeState.StoredProcedures, id)
delete(r.storeState.UserDefinedFunctions, id)
return repositorymodels.StatusOk
}
@@ -50,7 +55,7 @@ func (r *DataRepository) CreateDatabase(newDatabase repositorymodels.Database) (
}
newDatabase.TimeStamp = time.Now().Unix()
newDatabase.ResourceID = resourceid.New()
newDatabase.ResourceID = resourceid.New(resourceid.ResourceTypeDatabase)
newDatabase.ETag = fmt.Sprintf("\"%s\"", uuid.New())
newDatabase.Self = fmt.Sprintf("dbs/%s/", newDatabase.ResourceID)

View File

@@ -95,7 +95,7 @@ func (r *DataRepository) CreateDocument(databaseId string, collectionId string,
}
document["_ts"] = time.Now().Unix()
document["_rid"] = resourceid.NewCombined(database.ResourceID, collection.ResourceID, resourceid.New())
document["_rid"] = resourceid.NewCombined(collection.ResourceID, resourceid.New(resourceid.ResourceTypeDocument))
document["_etag"] = fmt.Sprintf("\"%s\"", uuid.New())
document["_self"] = fmt.Sprintf("dbs/%s/colls/%s/docs/%s/", database.ResourceID, collection.ResourceID, document["_rid"])

View File

@@ -26,7 +26,7 @@ func (r *DataRepository) GetPartitionKeyRanges(databaseId string, collectionId s
timestamp = collection.TimeStamp
}
pkrResourceId := resourceid.NewCombined(databaseRid, collectionRid, resourceid.New())
pkrResourceId := resourceid.NewCombined(collectionRid, resourceid.New(resourceid.ResourceTypePartitionKeyRange))
pkrSelf := fmt.Sprintf("dbs/%s/colls/%s/pkranges/%s/", databaseRid, collectionRid, pkrResourceId)
etag := fmt.Sprintf("\"%s\"", uuid.New())

View File

@@ -81,7 +81,7 @@ func (r *DataRepository) CreateStoredProcedure(databaseId string, collectionId s
}
sp.TimeStamp = time.Now().Unix()
sp.ResourceID = resourceid.NewCombined(database.ResourceID, collection.ResourceID, resourceid.New())
sp.ResourceID = resourceid.NewCombined(collection.ResourceID, resourceid.New(resourceid.ResourceTypeStoredProcedure))
sp.ETag = fmt.Sprintf("\"%s\"", uuid.New())
sp.Self = fmt.Sprintf("dbs/%s/colls/%s/sprocs/%s/", database.ResourceID, collection.ResourceID, sp.ResourceID)

View File

@@ -81,7 +81,7 @@ func (r *DataRepository) CreateTrigger(databaseId string, collectionId string, t
}
trigger.TimeStamp = time.Now().Unix()
trigger.ResourceID = resourceid.NewCombined(database.ResourceID, collection.ResourceID, resourceid.New())
trigger.ResourceID = resourceid.NewCombined(collection.ResourceID, resourceid.New(resourceid.ResourceTypeTrigger))
trigger.ETag = fmt.Sprintf("\"%s\"", uuid.New())
trigger.Self = fmt.Sprintf("dbs/%s/colls/%s/triggers/%s/", database.ResourceID, collection.ResourceID, trigger.ResourceID)

View File

@@ -81,7 +81,7 @@ func (r *DataRepository) CreateUserDefinedFunction(databaseId string, collection
}
udf.TimeStamp = time.Now().Unix()
udf.ResourceID = resourceid.NewCombined(database.ResourceID, collection.ResourceID, resourceid.New())
udf.ResourceID = resourceid.NewCombined(collection.ResourceID, resourceid.New(resourceid.ResourceTypeUserDefinedFunction))
udf.ETag = fmt.Sprintf("\"%s\"", uuid.New())
udf.Self = fmt.Sprintf("dbs/%s/colls/%s/udfs/%s/", database.ResourceID, collection.ResourceID, udf.ResourceID)

View File

@@ -3,32 +3,76 @@ package resourceid
import (
"encoding/base64"
"math/rand"
"strings"
"github.com/google/uuid"
)
func New() string {
id := uuid.New().ID()
idBytes := uintToBytes(id)
type ResourceType int
// first byte should be bigger than 0x80 for collection ids
// clients classify this id as "user" otherwise
if (idBytes[0] & 0x80) <= 0 {
idBytes[0] = byte(rand.Intn(0x80) + 0x80)
const (
ResourceTypeDatabase ResourceType = iota
ResourceTypeCollection
ResourceTypeDocument
ResourceTypeStoredProcedure
ResourceTypeTrigger
ResourceTypeUserDefinedFunction
ResourceTypeConflict
ResourceTypePartitionKeyRange
ResourceTypeSchema
)
func New(resourceType ResourceType) string {
var idBytes []byte
switch resourceType {
case ResourceTypeDatabase:
idBytes = randomBytes(4)
case ResourceTypeCollection:
idBytes = randomBytes(4)
// first byte should be bigger than 0x80 for collection ids
// clients classify this id as "user" otherwise
if (idBytes[0] & 0x80) <= 0 {
idBytes[0] = byte(rand.Intn(0x80) + 0x80)
}
case ResourceTypeDocument:
idBytes = randomBytes(8)
idBytes[7] = byte(rand.Intn(0x10)) // Upper 4 bits = 0
case ResourceTypeStoredProcedure:
idBytes = randomBytes(8)
idBytes[7] = byte(rand.Intn(0x10)) | 0x08 // Upper 4 bits = 0x08
case ResourceTypeTrigger:
idBytes = randomBytes(8)
idBytes[7] = byte(rand.Intn(0x10)) | 0x07 // Upper 4 bits = 0x07
case ResourceTypeUserDefinedFunction:
idBytes = randomBytes(8)
idBytes[7] = byte(rand.Intn(0x10)) | 0x06 // Upper 4 bits = 0x06
case ResourceTypeConflict:
idBytes = randomBytes(8)
idBytes[7] = byte(rand.Intn(0x10)) | 0x04 // Upper 4 bits = 0x04
case ResourceTypePartitionKeyRange:
// we don't do partitions yet, so just use a fixed id
idBytes = []byte{0x69, 0x69, 0x69, 0x69, 0x69, 0x69, 0x69, 0x50}
case ResourceTypeSchema:
idBytes = randomBytes(8)
idBytes[7] = byte(rand.Intn(0x10)) | 0x09 // Upper 4 bits = 0x09
default:
idBytes = randomBytes(4)
}
return base64.StdEncoding.EncodeToString(idBytes)
encoded := base64.StdEncoding.EncodeToString(idBytes)
return strings.ReplaceAll(encoded, "/", "-")
}
func NewCombined(ids ...string) string {
combinedIdBytes := make([]byte, 0)
for _, id := range ids {
idBytes, _ := base64.StdEncoding.DecodeString(id)
idBytes, _ := base64.StdEncoding.DecodeString(strings.ReplaceAll(id, "-", "/"))
combinedIdBytes = append(combinedIdBytes, idBytes...)
}
return base64.StdEncoding.EncodeToString(combinedIdBytes)
encoded := base64.StdEncoding.EncodeToString(combinedIdBytes)
return strings.ReplaceAll(encoded, "/", "-")
}
func uintToBytes(id uint32) []byte {
@@ -39,3 +83,13 @@ func uintToBytes(id uint32) []byte {
return buf
}
func randomBytes(count int) []byte {
buf := make([]byte, count)
for i := 0; i < count; i += 4 {
id := uuid.New().ID()
idBytes := uintToBytes(id)
copy(buf[i:], idBytes)
}
return buf
}

View File

@@ -60,6 +60,15 @@ func ExecuteQuery(query parsers.SelectStmt, documents []RowType) []RowType {
projectedDocuments = deduplicate(projectedDocuments)
}
// Apply offset
if query.Offset > 0 {
if query.Offset < len(projectedDocuments) {
projectedDocuments = projectedDocuments[query.Offset:]
} else {
projectedDocuments = []RowType{}
}
}
// Apply result limit
if query.Count > 0 && len(projectedDocuments) > query.Count {
projectedDocuments = projectedDocuments[:query.Count]

View File

@@ -10,10 +10,10 @@ import (
func Test_Execute_Select(t *testing.T) {
mockData := []memoryexecutor.RowType{
map[string]interface{}{"id": "12345", "pk": 123, "_self": "self1", "_rid": "rid1", "_ts": 123456, "isCool": false},
map[string]interface{}{"id": "67890", "pk": 456, "_self": "self2", "_rid": "rid2", "_ts": 789012, "isCool": true},
map[string]interface{}{"id": "456", "pk": 456, "_self": "self2", "_rid": "rid2", "_ts": 789012, "isCool": true},
map[string]interface{}{"id": "123", "pk": 456, "_self": "self2", "_rid": "rid2", "_ts": 789012, "isCool": true},
map[string]interface{}{"id": "12345", "pk": 123, "_self": "self1", "_rid": "rid1", "_ts": 123456, "isCool": false, "order": 1},
map[string]interface{}{"id": "67890", "pk": 456, "_self": "self2", "_rid": "rid2", "_ts": 789012, "isCool": true, "order": 2},
map[string]interface{}{"id": "456", "pk": 456, "_self": "self2", "_rid": "rid2", "_ts": 789012, "isCool": true, "order": 3},
map[string]interface{}{"id": "123", "pk": 456, "_self": "self2", "_rid": "rid2", "_ts": 789012, "isCool": true, "order": 4},
}
t.Run("Should execute simple SELECT", func(t *testing.T) {
@@ -108,15 +108,15 @@ func Test_Execute_Select(t *testing.T) {
Offset: 1,
OrderExpressions: []parsers.OrderExpression{
{
SelectItem: parsers.SelectItem{Path: []string{"c", "id"}},
SelectItem: parsers.SelectItem{Path: []string{"c", "order"}},
Direction: parsers.OrderDirectionDesc,
},
},
},
mockData,
[]memoryexecutor.RowType{
map[string]interface{}{"id": "67890", "pk": 456},
map[string]interface{}{"id": "456", "pk": 456},
map[string]interface{}{"id": "67890", "pk": 456},
},
)
})

View File

@@ -3,6 +3,7 @@ package main
import "C"
import (
"encoding/json"
"strings"
repositorymodels "github.com/pikami/cosmium/internal/repository_models"
)
@@ -20,7 +21,7 @@ func CreateCollection(serverName *C.char, databaseId *C.char, collectionJson *C.
}
var collection repositorymodels.Collection
err := json.Unmarshal([]byte(collectionStr), &collection)
err := json.NewDecoder(strings.NewReader(collectionStr)).Decode(&collection)
if err != nil {
return ResponseFailedToParseRequest
}

View File

@@ -3,6 +3,7 @@ package main
import "C"
import (
"encoding/json"
"strings"
repositorymodels "github.com/pikami/cosmium/internal/repository_models"
)
@@ -19,7 +20,7 @@ func CreateDatabase(serverName *C.char, databaseJson *C.char) int {
}
var database repositorymodels.Database
err := json.Unmarshal([]byte(databaseStr), &database)
err := json.NewDecoder(strings.NewReader(databaseStr)).Decode(&database)
if err != nil {
return ResponseFailedToParseRequest
}

View File

@@ -3,6 +3,7 @@ package main
import "C"
import (
"encoding/json"
"strings"
repositorymodels "github.com/pikami/cosmium/internal/repository_models"
)
@@ -21,7 +22,7 @@ func CreateDocument(serverName *C.char, databaseId *C.char, collectionId *C.char
}
var document repositorymodels.Document
err := json.Unmarshal([]byte(documentStr), &document)
err := json.NewDecoder(strings.NewReader(documentStr)).Decode(&document)
if err != nil {
return ResponseFailedToParseRequest
}

View File

@@ -13,8 +13,10 @@ type ServerInstance struct {
repository *repositories.DataRepository
}
var serverInstances map[string]*ServerInstance
var mutex sync.Mutex
var (
serverInstances = make(map[string]*ServerInstance)
mutex = sync.Mutex{}
)
const (
ResponseSuccess = 0
@@ -36,10 +38,6 @@ func getInstance(serverName string) (*ServerInstance, bool) {
mutex.Lock()
defer mutex.Unlock()
if serverInstances == nil {
serverInstances = make(map[string]*ServerInstance)
}
var ok bool
var serverInstance *ServerInstance
if serverInstance, ok = serverInstances[serverName]; !ok {
@@ -53,10 +51,6 @@ func addInstance(serverName string, serverInstance *ServerInstance) {
mutex.Lock()
defer mutex.Unlock()
if serverInstances == nil {
serverInstances = make(map[string]*ServerInstance)
}
serverInstances[serverName] = serverInstance
}
@@ -64,10 +58,6 @@ func removeInstance(serverName string) {
mutex.Lock()
defer mutex.Unlock()
if serverInstances == nil {
return
}
delete(serverInstances, serverName)
}

View File

@@ -6,6 +6,7 @@ package main
import "C"
import (
"encoding/json"
"strings"
"unsafe"
"github.com/pikami/cosmium/api"
@@ -15,21 +16,21 @@ import (
//export CreateServerInstance
func CreateServerInstance(serverName *C.char, configurationJSON *C.char) int {
configStr := C.GoString(configurationJSON)
serverNameStr := C.GoString(serverName)
configStr := C.GoString(configurationJSON)
if _, ok := getInstance(serverNameStr); ok {
return ResponseServerInstanceAlreadyExists
}
var configuration config.ServerConfig
err := json.Unmarshal([]byte(configStr), &configuration)
err := json.NewDecoder(strings.NewReader(configStr)).Decode(&configuration)
if err != nil {
return ResponseFailedToParseConfiguration
}
configuration.PopulateCalculatedFields()
configuration.ApplyDefaultsToEmptyFields()
configuration.PopulateCalculatedFields()
repository := repositories.NewDataRepository(repositories.RepositoryOptions{
InitialDataFilePath: configuration.InitialDataFilePath,