mirror of
https://github.com/pikami/cosmium.git
synced 2026-01-12 14:05:15 +00:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1cf5ae92f4 | ||
|
|
5d99b653cc | ||
|
|
787cdb33cf | ||
|
|
5caa829ac1 |
24
api/api_models/models.go
Normal file
24
api/api_models/models.go
Normal file
@@ -0,0 +1,24 @@
|
||||
package apimodels
|
||||
|
||||
const (
|
||||
BatchOperationTypeCreate = "Create"
|
||||
BatchOperationTypeDelete = "Delete"
|
||||
BatchOperationTypeReplace = "Replace"
|
||||
BatchOperationTypeUpsert = "Upsert"
|
||||
BatchOperationTypeRead = "Read"
|
||||
BatchOperationTypePatch = "Patch"
|
||||
)
|
||||
|
||||
type BatchOperation struct {
|
||||
OperationType string `json:"operationType"`
|
||||
Id string `json:"id"`
|
||||
ResourceBody map[string]interface{} `json:"resourceBody"`
|
||||
}
|
||||
|
||||
type BatchOperationResult struct {
|
||||
StatusCode int `json:"statusCode"`
|
||||
RequestCharge float64 `json:"requestCharge"`
|
||||
ResourceBody map[string]interface{} `json:"resourceBody"`
|
||||
Etag string `json:"etag"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
jsonpatch "github.com/cosmiumdev/json-patch/v5"
|
||||
"github.com/gin-gonic/gin"
|
||||
apimodels "github.com/pikami/cosmium/api/api_models"
|
||||
"github.com/pikami/cosmium/internal/constants"
|
||||
"github.com/pikami/cosmium/internal/logger"
|
||||
repositorymodels "github.com/pikami/cosmium/internal/repository_models"
|
||||
@@ -183,6 +184,13 @@ func (h *Handlers) DocumentsPost(c *gin.Context) {
|
||||
databaseId := c.Param("databaseId")
|
||||
collectionId := c.Param("collId")
|
||||
|
||||
// Handle batch requests
|
||||
isBatchRequest, _ := strconv.ParseBool(c.GetHeader("x-ms-cosmos-is-batch-request"))
|
||||
if isBatchRequest {
|
||||
h.handleBatchRequest(c)
|
||||
return
|
||||
}
|
||||
|
||||
var requestBody map[string]interface{}
|
||||
if err := c.BindJSON(&requestBody); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"message": err.Error()})
|
||||
@@ -191,30 +199,7 @@ func (h *Handlers) DocumentsPost(c *gin.Context) {
|
||||
|
||||
query := requestBody["query"]
|
||||
if query != nil {
|
||||
if c.GetHeader("x-ms-cosmos-is-query-plan-request") != "" {
|
||||
c.IndentedJSON(http.StatusOK, constants.QueryPlanResponse)
|
||||
return
|
||||
}
|
||||
|
||||
var queryParameters map[string]interface{}
|
||||
if paramsArray, ok := requestBody["parameters"].([]interface{}); ok {
|
||||
queryParameters = parametersToMap(paramsArray)
|
||||
}
|
||||
|
||||
docs, status := h.repository.ExecuteQueryDocuments(databaseId, collectionId, query.(string), queryParameters)
|
||||
if status != repositorymodels.StatusOk {
|
||||
// TODO: Currently we return everything if the query fails
|
||||
h.GetAllDocuments(c)
|
||||
return
|
||||
}
|
||||
|
||||
collection, _ := h.repository.GetCollection(databaseId, collectionId)
|
||||
c.Header("x-ms-item-count", fmt.Sprintf("%d", len(docs)))
|
||||
c.IndentedJSON(http.StatusOK, gin.H{
|
||||
"_rid": collection.ResourceID,
|
||||
"Documents": docs,
|
||||
"_count": len(docs),
|
||||
})
|
||||
h.handleDocumentQuery(c, requestBody)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -253,3 +238,131 @@ func parametersToMap(pairs []interface{}) map[string]interface{} {
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (h *Handlers) handleDocumentQuery(c *gin.Context, requestBody map[string]interface{}) {
|
||||
databaseId := c.Param("databaseId")
|
||||
collectionId := c.Param("collId")
|
||||
|
||||
if c.GetHeader("x-ms-cosmos-is-query-plan-request") != "" {
|
||||
c.IndentedJSON(http.StatusOK, constants.QueryPlanResponse)
|
||||
return
|
||||
}
|
||||
|
||||
var queryParameters map[string]interface{}
|
||||
if paramsArray, ok := requestBody["parameters"].([]interface{}); ok {
|
||||
queryParameters = parametersToMap(paramsArray)
|
||||
}
|
||||
|
||||
docs, status := h.repository.ExecuteQueryDocuments(databaseId, collectionId, requestBody["query"].(string), queryParameters)
|
||||
if status != repositorymodels.StatusOk {
|
||||
// TODO: Currently we return everything if the query fails
|
||||
h.GetAllDocuments(c)
|
||||
return
|
||||
}
|
||||
|
||||
collection, _ := h.repository.GetCollection(databaseId, collectionId)
|
||||
c.Header("x-ms-item-count", fmt.Sprintf("%d", len(docs)))
|
||||
c.IndentedJSON(http.StatusOK, gin.H{
|
||||
"_rid": collection.ResourceID,
|
||||
"Documents": docs,
|
||||
"_count": len(docs),
|
||||
})
|
||||
}
|
||||
|
||||
func (h *Handlers) handleBatchRequest(c *gin.Context) {
|
||||
databaseId := c.Param("databaseId")
|
||||
collectionId := c.Param("collId")
|
||||
|
||||
batchOperations := make([]apimodels.BatchOperation, 0)
|
||||
if err := c.BindJSON(&batchOperations); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"message": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
batchOperationResults := make([]apimodels.BatchOperationResult, len(batchOperations))
|
||||
for idx, operation := range batchOperations {
|
||||
switch operation.OperationType {
|
||||
case apimodels.BatchOperationTypeCreate:
|
||||
createdDocument, status := h.repository.CreateDocument(databaseId, collectionId, operation.ResourceBody)
|
||||
responseCode := repositoryStatusToResponseCode(status)
|
||||
if status == repositorymodels.StatusOk {
|
||||
responseCode = http.StatusCreated
|
||||
}
|
||||
batchOperationResults[idx] = apimodels.BatchOperationResult{
|
||||
StatusCode: responseCode,
|
||||
ResourceBody: createdDocument,
|
||||
}
|
||||
case apimodels.BatchOperationTypeDelete:
|
||||
status := h.repository.DeleteDocument(databaseId, collectionId, operation.Id)
|
||||
responseCode := repositoryStatusToResponseCode(status)
|
||||
if status == repositorymodels.StatusOk {
|
||||
responseCode = http.StatusNoContent
|
||||
}
|
||||
batchOperationResults[idx] = apimodels.BatchOperationResult{
|
||||
StatusCode: responseCode,
|
||||
}
|
||||
case apimodels.BatchOperationTypeReplace:
|
||||
deleteStatus := h.repository.DeleteDocument(databaseId, collectionId, operation.Id)
|
||||
if deleteStatus == repositorymodels.StatusNotFound {
|
||||
batchOperationResults[idx] = apimodels.BatchOperationResult{
|
||||
StatusCode: http.StatusNotFound,
|
||||
}
|
||||
continue
|
||||
}
|
||||
createdDocument, createStatus := h.repository.CreateDocument(databaseId, collectionId, operation.ResourceBody)
|
||||
responseCode := repositoryStatusToResponseCode(createStatus)
|
||||
if createStatus == repositorymodels.StatusOk {
|
||||
responseCode = http.StatusCreated
|
||||
}
|
||||
batchOperationResults[idx] = apimodels.BatchOperationResult{
|
||||
StatusCode: responseCode,
|
||||
ResourceBody: createdDocument,
|
||||
}
|
||||
case apimodels.BatchOperationTypeUpsert:
|
||||
documentId := operation.ResourceBody["id"].(string)
|
||||
h.repository.DeleteDocument(databaseId, collectionId, documentId)
|
||||
createdDocument, createStatus := h.repository.CreateDocument(databaseId, collectionId, operation.ResourceBody)
|
||||
responseCode := repositoryStatusToResponseCode(createStatus)
|
||||
if createStatus == repositorymodels.StatusOk {
|
||||
responseCode = http.StatusCreated
|
||||
}
|
||||
batchOperationResults[idx] = apimodels.BatchOperationResult{
|
||||
StatusCode: responseCode,
|
||||
ResourceBody: createdDocument,
|
||||
}
|
||||
case apimodels.BatchOperationTypeRead:
|
||||
document, status := h.repository.GetDocument(databaseId, collectionId, operation.Id)
|
||||
batchOperationResults[idx] = apimodels.BatchOperationResult{
|
||||
StatusCode: repositoryStatusToResponseCode(status),
|
||||
ResourceBody: document,
|
||||
}
|
||||
case apimodels.BatchOperationTypePatch:
|
||||
batchOperationResults[idx] = apimodels.BatchOperationResult{
|
||||
StatusCode: http.StatusNotImplemented,
|
||||
Message: "Patch operation is not implemented",
|
||||
}
|
||||
default:
|
||||
batchOperationResults[idx] = apimodels.BatchOperationResult{
|
||||
StatusCode: http.StatusBadRequest,
|
||||
Message: "Unknown operation type",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, batchOperationResults)
|
||||
}
|
||||
|
||||
func repositoryStatusToResponseCode(status repositorymodels.RepositoryStatus) int {
|
||||
switch status {
|
||||
case repositorymodels.StatusOk:
|
||||
return http.StatusOK
|
||||
case repositorymodels.StatusNotFound:
|
||||
return http.StatusNotFound
|
||||
case repositorymodels.Conflict:
|
||||
return http.StatusConflict
|
||||
case repositorymodels.BadRequest:
|
||||
return http.StatusBadRequest
|
||||
default:
|
||||
return http.StatusInternalServerError
|
||||
}
|
||||
}
|
||||
|
||||
@@ -75,8 +75,7 @@ func requestToResourceId(c *gin.Context) string {
|
||||
|
||||
isFeed := c.Request.Header.Get("A-Im") == "Incremental Feed"
|
||||
if resourceType == "pkranges" && isFeed {
|
||||
// CosmosSDK replaces '/' with '-' in resource id requests
|
||||
resourceId = strings.Replace(collId, "-", "/", -1)
|
||||
resourceId = collId
|
||||
}
|
||||
|
||||
return resourceId
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
repositorymodels "github.com/pikami/cosmium/internal/repository_models"
|
||||
"github.com/pikami/cosmium/internal/resourceid"
|
||||
)
|
||||
|
||||
func (h *Handlers) GetPartitionKeyRanges(c *gin.Context) {
|
||||
@@ -31,8 +32,9 @@ func (h *Handlers) GetPartitionKeyRanges(c *gin.Context) {
|
||||
collectionRid = collection.ResourceID
|
||||
}
|
||||
|
||||
rid := resourceid.NewCombined(collectionRid, resourceid.New(resourceid.ResourceTypePartitionKeyRange))
|
||||
c.IndentedJSON(http.StatusOK, gin.H{
|
||||
"_rid": collectionRid,
|
||||
"_rid": rid,
|
||||
"_count": len(partitionKeyRanges),
|
||||
"PartitionKeyRanges": partitionKeyRanges,
|
||||
})
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
|
||||
"github.com/pikami/cosmium/api"
|
||||
"github.com/pikami/cosmium/api/config"
|
||||
"github.com/pikami/cosmium/internal/logger"
|
||||
"github.com/pikami/cosmium/internal/repositories"
|
||||
)
|
||||
|
||||
@@ -35,6 +36,9 @@ func runTestServer() *TestServer {
|
||||
ExplorerBaseUrlLocation: config.ExplorerBaseUrlLocation,
|
||||
}
|
||||
|
||||
config.LogLevel = "debug"
|
||||
logger.SetLogLevel(logger.LogLevelDebug)
|
||||
|
||||
return runTestServerCustomConfig(config)
|
||||
}
|
||||
|
||||
|
||||
@@ -377,5 +377,140 @@ func Test_Documents_Patch(t *testing.T) {
|
||||
assert.NotNil(t, r)
|
||||
assert.Nil(t, err2)
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func Test_Documents_TransactionalBatch(t *testing.T) {
|
||||
ts, collectionClient := documents_InitializeDb(t)
|
||||
defer ts.Server.Close()
|
||||
|
||||
t.Run("Should execute CREATE transactional batch", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||
|
||||
newItem := map[string]interface{}{
|
||||
"id": "678901",
|
||||
}
|
||||
bytes, err := json.Marshal(newItem)
|
||||
assert.Nil(t, err)
|
||||
|
||||
batch.CreateItem(bytes, nil)
|
||||
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, response.Success)
|
||||
assert.Equal(t, 1, len(response.OperationResults))
|
||||
|
||||
operationResponse := response.OperationResults[0]
|
||||
assert.NotNil(t, operationResponse)
|
||||
assert.NotNil(t, operationResponse.ResourceBody)
|
||||
assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode)
|
||||
|
||||
var itemResponseBody map[string]interface{}
|
||||
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
|
||||
assert.Equal(t, newItem["id"], itemResponseBody["id"])
|
||||
|
||||
createdDoc, _ := ts.Repository.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string))
|
||||
assert.Equal(t, newItem["id"], createdDoc["id"])
|
||||
})
|
||||
|
||||
t.Run("Should execute DELETE transactional batch", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||
|
||||
batch.DeleteItem("12345", nil)
|
||||
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, response.Success)
|
||||
assert.Equal(t, 1, len(response.OperationResults))
|
||||
|
||||
operationResponse := response.OperationResults[0]
|
||||
assert.NotNil(t, operationResponse)
|
||||
assert.Equal(t, int32(http.StatusNoContent), operationResponse.StatusCode)
|
||||
|
||||
_, status := ts.Repository.GetDocument(testDatabaseName, testCollectionName, "12345")
|
||||
assert.Equal(t, repositorymodels.StatusNotFound, int(status))
|
||||
})
|
||||
|
||||
t.Run("Should execute REPLACE transactional batch", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||
|
||||
newItem := map[string]interface{}{
|
||||
"id": "67890",
|
||||
"pk": "666",
|
||||
}
|
||||
bytes, err := json.Marshal(newItem)
|
||||
assert.Nil(t, err)
|
||||
|
||||
batch.ReplaceItem("67890", bytes, nil)
|
||||
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, response.Success)
|
||||
assert.Equal(t, 1, len(response.OperationResults))
|
||||
|
||||
operationResponse := response.OperationResults[0]
|
||||
assert.NotNil(t, operationResponse)
|
||||
assert.NotNil(t, operationResponse.ResourceBody)
|
||||
assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode)
|
||||
|
||||
var itemResponseBody map[string]interface{}
|
||||
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
|
||||
assert.Equal(t, newItem["id"], itemResponseBody["id"])
|
||||
assert.Equal(t, newItem["pk"], itemResponseBody["pk"])
|
||||
|
||||
updatedDoc, _ := ts.Repository.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string))
|
||||
assert.Equal(t, newItem["id"], updatedDoc["id"])
|
||||
assert.Equal(t, newItem["pk"], updatedDoc["pk"])
|
||||
})
|
||||
|
||||
t.Run("Should execute UPSERT transactional batch", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||
|
||||
newItem := map[string]interface{}{
|
||||
"id": "678901",
|
||||
"pk": "666",
|
||||
}
|
||||
bytes, err := json.Marshal(newItem)
|
||||
assert.Nil(t, err)
|
||||
|
||||
batch.UpsertItem(bytes, nil)
|
||||
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, response.Success)
|
||||
assert.Equal(t, 1, len(response.OperationResults))
|
||||
|
||||
operationResponse := response.OperationResults[0]
|
||||
assert.NotNil(t, operationResponse)
|
||||
assert.NotNil(t, operationResponse.ResourceBody)
|
||||
assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode)
|
||||
|
||||
var itemResponseBody map[string]interface{}
|
||||
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
|
||||
assert.Equal(t, newItem["id"], itemResponseBody["id"])
|
||||
assert.Equal(t, newItem["pk"], itemResponseBody["pk"])
|
||||
|
||||
updatedDoc, _ := ts.Repository.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string))
|
||||
assert.Equal(t, newItem["id"], updatedDoc["id"])
|
||||
assert.Equal(t, newItem["pk"], updatedDoc["pk"])
|
||||
})
|
||||
|
||||
t.Run("Should execute READ transactional batch", func(t *testing.T) {
|
||||
context := context.TODO()
|
||||
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||
|
||||
batch.ReadItem("67890", nil)
|
||||
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, response.Success)
|
||||
assert.Equal(t, 1, len(response.OperationResults))
|
||||
|
||||
operationResponse := response.OperationResults[0]
|
||||
assert.NotNil(t, operationResponse)
|
||||
assert.NotNil(t, operationResponse.ResourceBody)
|
||||
assert.Equal(t, int32(http.StatusOK), operationResponse.StatusCode)
|
||||
|
||||
var itemResponseBody map[string]interface{}
|
||||
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
|
||||
assert.Equal(t, "67890", itemResponseBody["id"])
|
||||
})
|
||||
}
|
||||
|
||||
@@ -204,14 +204,18 @@ Cosmium strives to support the core features of Cosmos DB, including:
|
||||
| IS_PRIMITIVE | Yes |
|
||||
| IS_STRING | Yes |
|
||||
|
||||
### Document Batch Requests
|
||||
### Transactional batch operations
|
||||
|
||||
Note: There's actually no transaction here. Think of this as a 'bulk operation' that can partially succeed.
|
||||
|
||||
| Operation | Implemented |
|
||||
| --------- | ----------- |
|
||||
| Create | No |
|
||||
| Update | No |
|
||||
| Delete | No |
|
||||
| Read | No |
|
||||
| Create | Yes |
|
||||
| Delete | Yes |
|
||||
| Replace | Yes |
|
||||
| Upsert | Yes |
|
||||
| Read | Yes |
|
||||
| Patch | No |
|
||||
|
||||
## Known Differences
|
||||
|
||||
|
||||
@@ -50,6 +50,10 @@ func (r *DataRepository) DeleteCollection(databaseId string, collectionId string
|
||||
}
|
||||
|
||||
delete(r.storeState.Collections[databaseId], collectionId)
|
||||
delete(r.storeState.Documents[databaseId], collectionId)
|
||||
delete(r.storeState.Triggers[databaseId], collectionId)
|
||||
delete(r.storeState.StoredProcedures[databaseId], collectionId)
|
||||
delete(r.storeState.UserDefinedFunctions[databaseId], collectionId)
|
||||
|
||||
return repositorymodels.StatusOk
|
||||
}
|
||||
@@ -71,7 +75,7 @@ func (r *DataRepository) CreateCollection(databaseId string, newCollection repos
|
||||
newCollection = structhidrators.Hidrate(newCollection).(repositorymodels.Collection)
|
||||
|
||||
newCollection.TimeStamp = time.Now().Unix()
|
||||
newCollection.ResourceID = resourceid.NewCombined(database.ResourceID, resourceid.New())
|
||||
newCollection.ResourceID = resourceid.NewCombined(database.ResourceID, resourceid.New(resourceid.ResourceTypeCollection))
|
||||
newCollection.ETag = fmt.Sprintf("\"%s\"", uuid.New())
|
||||
newCollection.Self = fmt.Sprintf("dbs/%s/colls/%s/", database.ResourceID, newCollection.ResourceID)
|
||||
|
||||
|
||||
@@ -37,6 +37,11 @@ func (r *DataRepository) DeleteDatabase(id string) repositorymodels.RepositorySt
|
||||
}
|
||||
|
||||
delete(r.storeState.Databases, id)
|
||||
delete(r.storeState.Collections, id)
|
||||
delete(r.storeState.Documents, id)
|
||||
delete(r.storeState.Triggers, id)
|
||||
delete(r.storeState.StoredProcedures, id)
|
||||
delete(r.storeState.UserDefinedFunctions, id)
|
||||
|
||||
return repositorymodels.StatusOk
|
||||
}
|
||||
@@ -50,7 +55,7 @@ func (r *DataRepository) CreateDatabase(newDatabase repositorymodels.Database) (
|
||||
}
|
||||
|
||||
newDatabase.TimeStamp = time.Now().Unix()
|
||||
newDatabase.ResourceID = resourceid.New()
|
||||
newDatabase.ResourceID = resourceid.New(resourceid.ResourceTypeDatabase)
|
||||
newDatabase.ETag = fmt.Sprintf("\"%s\"", uuid.New())
|
||||
newDatabase.Self = fmt.Sprintf("dbs/%s/", newDatabase.ResourceID)
|
||||
|
||||
|
||||
@@ -95,7 +95,7 @@ func (r *DataRepository) CreateDocument(databaseId string, collectionId string,
|
||||
}
|
||||
|
||||
document["_ts"] = time.Now().Unix()
|
||||
document["_rid"] = resourceid.NewCombined(database.ResourceID, collection.ResourceID, resourceid.New())
|
||||
document["_rid"] = resourceid.NewCombined(collection.ResourceID, resourceid.New(resourceid.ResourceTypeDocument))
|
||||
document["_etag"] = fmt.Sprintf("\"%s\"", uuid.New())
|
||||
document["_self"] = fmt.Sprintf("dbs/%s/colls/%s/docs/%s/", database.ResourceID, collection.ResourceID, document["_rid"])
|
||||
|
||||
|
||||
@@ -26,7 +26,7 @@ func (r *DataRepository) GetPartitionKeyRanges(databaseId string, collectionId s
|
||||
timestamp = collection.TimeStamp
|
||||
}
|
||||
|
||||
pkrResourceId := resourceid.NewCombined(databaseRid, collectionRid, resourceid.New())
|
||||
pkrResourceId := resourceid.NewCombined(collectionRid, resourceid.New(resourceid.ResourceTypePartitionKeyRange))
|
||||
pkrSelf := fmt.Sprintf("dbs/%s/colls/%s/pkranges/%s/", databaseRid, collectionRid, pkrResourceId)
|
||||
etag := fmt.Sprintf("\"%s\"", uuid.New())
|
||||
|
||||
|
||||
@@ -81,7 +81,7 @@ func (r *DataRepository) CreateStoredProcedure(databaseId string, collectionId s
|
||||
}
|
||||
|
||||
sp.TimeStamp = time.Now().Unix()
|
||||
sp.ResourceID = resourceid.NewCombined(database.ResourceID, collection.ResourceID, resourceid.New())
|
||||
sp.ResourceID = resourceid.NewCombined(collection.ResourceID, resourceid.New(resourceid.ResourceTypeStoredProcedure))
|
||||
sp.ETag = fmt.Sprintf("\"%s\"", uuid.New())
|
||||
sp.Self = fmt.Sprintf("dbs/%s/colls/%s/sprocs/%s/", database.ResourceID, collection.ResourceID, sp.ResourceID)
|
||||
|
||||
|
||||
@@ -81,7 +81,7 @@ func (r *DataRepository) CreateTrigger(databaseId string, collectionId string, t
|
||||
}
|
||||
|
||||
trigger.TimeStamp = time.Now().Unix()
|
||||
trigger.ResourceID = resourceid.NewCombined(database.ResourceID, collection.ResourceID, resourceid.New())
|
||||
trigger.ResourceID = resourceid.NewCombined(collection.ResourceID, resourceid.New(resourceid.ResourceTypeTrigger))
|
||||
trigger.ETag = fmt.Sprintf("\"%s\"", uuid.New())
|
||||
trigger.Self = fmt.Sprintf("dbs/%s/colls/%s/triggers/%s/", database.ResourceID, collection.ResourceID, trigger.ResourceID)
|
||||
|
||||
|
||||
@@ -81,7 +81,7 @@ func (r *DataRepository) CreateUserDefinedFunction(databaseId string, collection
|
||||
}
|
||||
|
||||
udf.TimeStamp = time.Now().Unix()
|
||||
udf.ResourceID = resourceid.NewCombined(database.ResourceID, collection.ResourceID, resourceid.New())
|
||||
udf.ResourceID = resourceid.NewCombined(collection.ResourceID, resourceid.New(resourceid.ResourceTypeUserDefinedFunction))
|
||||
udf.ETag = fmt.Sprintf("\"%s\"", uuid.New())
|
||||
udf.Self = fmt.Sprintf("dbs/%s/colls/%s/udfs/%s/", database.ResourceID, collection.ResourceID, udf.ResourceID)
|
||||
|
||||
|
||||
@@ -3,32 +3,76 @@ package resourceid
|
||||
import (
|
||||
"encoding/base64"
|
||||
"math/rand"
|
||||
"strings"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
func New() string {
|
||||
id := uuid.New().ID()
|
||||
idBytes := uintToBytes(id)
|
||||
type ResourceType int
|
||||
|
||||
// first byte should be bigger than 0x80 for collection ids
|
||||
// clients classify this id as "user" otherwise
|
||||
if (idBytes[0] & 0x80) <= 0 {
|
||||
idBytes[0] = byte(rand.Intn(0x80) + 0x80)
|
||||
const (
|
||||
ResourceTypeDatabase ResourceType = iota
|
||||
ResourceTypeCollection
|
||||
ResourceTypeDocument
|
||||
ResourceTypeStoredProcedure
|
||||
ResourceTypeTrigger
|
||||
ResourceTypeUserDefinedFunction
|
||||
ResourceTypeConflict
|
||||
ResourceTypePartitionKeyRange
|
||||
ResourceTypeSchema
|
||||
)
|
||||
|
||||
func New(resourceType ResourceType) string {
|
||||
var idBytes []byte
|
||||
switch resourceType {
|
||||
case ResourceTypeDatabase:
|
||||
idBytes = randomBytes(4)
|
||||
case ResourceTypeCollection:
|
||||
idBytes = randomBytes(4)
|
||||
// first byte should be bigger than 0x80 for collection ids
|
||||
// clients classify this id as "user" otherwise
|
||||
if (idBytes[0] & 0x80) <= 0 {
|
||||
idBytes[0] = byte(rand.Intn(0x80) + 0x80)
|
||||
}
|
||||
case ResourceTypeDocument:
|
||||
idBytes = randomBytes(8)
|
||||
idBytes[7] = byte(rand.Intn(0x10)) // Upper 4 bits = 0
|
||||
case ResourceTypeStoredProcedure:
|
||||
idBytes = randomBytes(8)
|
||||
idBytes[7] = byte(rand.Intn(0x10)) | 0x08 // Upper 4 bits = 0x08
|
||||
case ResourceTypeTrigger:
|
||||
idBytes = randomBytes(8)
|
||||
idBytes[7] = byte(rand.Intn(0x10)) | 0x07 // Upper 4 bits = 0x07
|
||||
case ResourceTypeUserDefinedFunction:
|
||||
idBytes = randomBytes(8)
|
||||
idBytes[7] = byte(rand.Intn(0x10)) | 0x06 // Upper 4 bits = 0x06
|
||||
case ResourceTypeConflict:
|
||||
idBytes = randomBytes(8)
|
||||
idBytes[7] = byte(rand.Intn(0x10)) | 0x04 // Upper 4 bits = 0x04
|
||||
case ResourceTypePartitionKeyRange:
|
||||
// we don't do partitions yet, so just use a fixed id
|
||||
idBytes = []byte{0x69, 0x69, 0x69, 0x69, 0x69, 0x69, 0x69, 0x50}
|
||||
case ResourceTypeSchema:
|
||||
idBytes = randomBytes(8)
|
||||
idBytes[7] = byte(rand.Intn(0x10)) | 0x09 // Upper 4 bits = 0x09
|
||||
default:
|
||||
idBytes = randomBytes(4)
|
||||
}
|
||||
|
||||
return base64.StdEncoding.EncodeToString(idBytes)
|
||||
encoded := base64.StdEncoding.EncodeToString(idBytes)
|
||||
return strings.ReplaceAll(encoded, "/", "-")
|
||||
}
|
||||
|
||||
func NewCombined(ids ...string) string {
|
||||
combinedIdBytes := make([]byte, 0)
|
||||
|
||||
for _, id := range ids {
|
||||
idBytes, _ := base64.StdEncoding.DecodeString(id)
|
||||
idBytes, _ := base64.StdEncoding.DecodeString(strings.ReplaceAll(id, "-", "/"))
|
||||
combinedIdBytes = append(combinedIdBytes, idBytes...)
|
||||
}
|
||||
|
||||
return base64.StdEncoding.EncodeToString(combinedIdBytes)
|
||||
encoded := base64.StdEncoding.EncodeToString(combinedIdBytes)
|
||||
return strings.ReplaceAll(encoded, "/", "-")
|
||||
}
|
||||
|
||||
func uintToBytes(id uint32) []byte {
|
||||
@@ -39,3 +83,13 @@ func uintToBytes(id uint32) []byte {
|
||||
|
||||
return buf
|
||||
}
|
||||
|
||||
func randomBytes(count int) []byte {
|
||||
buf := make([]byte, count)
|
||||
for i := 0; i < count; i += 4 {
|
||||
id := uuid.New().ID()
|
||||
idBytes := uintToBytes(id)
|
||||
copy(buf[i:], idBytes)
|
||||
}
|
||||
return buf
|
||||
}
|
||||
|
||||
@@ -60,6 +60,15 @@ func ExecuteQuery(query parsers.SelectStmt, documents []RowType) []RowType {
|
||||
projectedDocuments = deduplicate(projectedDocuments)
|
||||
}
|
||||
|
||||
// Apply offset
|
||||
if query.Offset > 0 {
|
||||
if query.Offset < len(projectedDocuments) {
|
||||
projectedDocuments = projectedDocuments[query.Offset:]
|
||||
} else {
|
||||
projectedDocuments = []RowType{}
|
||||
}
|
||||
}
|
||||
|
||||
// Apply result limit
|
||||
if query.Count > 0 && len(projectedDocuments) > query.Count {
|
||||
projectedDocuments = projectedDocuments[:query.Count]
|
||||
|
||||
@@ -10,10 +10,10 @@ import (
|
||||
|
||||
func Test_Execute_Select(t *testing.T) {
|
||||
mockData := []memoryexecutor.RowType{
|
||||
map[string]interface{}{"id": "12345", "pk": 123, "_self": "self1", "_rid": "rid1", "_ts": 123456, "isCool": false},
|
||||
map[string]interface{}{"id": "67890", "pk": 456, "_self": "self2", "_rid": "rid2", "_ts": 789012, "isCool": true},
|
||||
map[string]interface{}{"id": "456", "pk": 456, "_self": "self2", "_rid": "rid2", "_ts": 789012, "isCool": true},
|
||||
map[string]interface{}{"id": "123", "pk": 456, "_self": "self2", "_rid": "rid2", "_ts": 789012, "isCool": true},
|
||||
map[string]interface{}{"id": "12345", "pk": 123, "_self": "self1", "_rid": "rid1", "_ts": 123456, "isCool": false, "order": 1},
|
||||
map[string]interface{}{"id": "67890", "pk": 456, "_self": "self2", "_rid": "rid2", "_ts": 789012, "isCool": true, "order": 2},
|
||||
map[string]interface{}{"id": "456", "pk": 456, "_self": "self2", "_rid": "rid2", "_ts": 789012, "isCool": true, "order": 3},
|
||||
map[string]interface{}{"id": "123", "pk": 456, "_self": "self2", "_rid": "rid2", "_ts": 789012, "isCool": true, "order": 4},
|
||||
}
|
||||
|
||||
t.Run("Should execute simple SELECT", func(t *testing.T) {
|
||||
@@ -108,15 +108,15 @@ func Test_Execute_Select(t *testing.T) {
|
||||
Offset: 1,
|
||||
OrderExpressions: []parsers.OrderExpression{
|
||||
{
|
||||
SelectItem: parsers.SelectItem{Path: []string{"c", "id"}},
|
||||
SelectItem: parsers.SelectItem{Path: []string{"c", "order"}},
|
||||
Direction: parsers.OrderDirectionDesc,
|
||||
},
|
||||
},
|
||||
},
|
||||
mockData,
|
||||
[]memoryexecutor.RowType{
|
||||
map[string]interface{}{"id": "67890", "pk": 456},
|
||||
map[string]interface{}{"id": "456", "pk": 456},
|
||||
map[string]interface{}{"id": "67890", "pk": 456},
|
||||
},
|
||||
)
|
||||
})
|
||||
|
||||
@@ -3,6 +3,7 @@ package main
|
||||
import "C"
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
|
||||
repositorymodels "github.com/pikami/cosmium/internal/repository_models"
|
||||
)
|
||||
@@ -20,7 +21,7 @@ func CreateCollection(serverName *C.char, databaseId *C.char, collectionJson *C.
|
||||
}
|
||||
|
||||
var collection repositorymodels.Collection
|
||||
err := json.Unmarshal([]byte(collectionStr), &collection)
|
||||
err := json.NewDecoder(strings.NewReader(collectionStr)).Decode(&collection)
|
||||
if err != nil {
|
||||
return ResponseFailedToParseRequest
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package main
|
||||
import "C"
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
|
||||
repositorymodels "github.com/pikami/cosmium/internal/repository_models"
|
||||
)
|
||||
@@ -19,7 +20,7 @@ func CreateDatabase(serverName *C.char, databaseJson *C.char) int {
|
||||
}
|
||||
|
||||
var database repositorymodels.Database
|
||||
err := json.Unmarshal([]byte(databaseStr), &database)
|
||||
err := json.NewDecoder(strings.NewReader(databaseStr)).Decode(&database)
|
||||
if err != nil {
|
||||
return ResponseFailedToParseRequest
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package main
|
||||
import "C"
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
|
||||
repositorymodels "github.com/pikami/cosmium/internal/repository_models"
|
||||
)
|
||||
@@ -21,7 +22,7 @@ func CreateDocument(serverName *C.char, databaseId *C.char, collectionId *C.char
|
||||
}
|
||||
|
||||
var document repositorymodels.Document
|
||||
err := json.Unmarshal([]byte(documentStr), &document)
|
||||
err := json.NewDecoder(strings.NewReader(documentStr)).Decode(&document)
|
||||
if err != nil {
|
||||
return ResponseFailedToParseRequest
|
||||
}
|
||||
|
||||
@@ -13,8 +13,10 @@ type ServerInstance struct {
|
||||
repository *repositories.DataRepository
|
||||
}
|
||||
|
||||
var serverInstances map[string]*ServerInstance
|
||||
var mutex sync.Mutex
|
||||
var (
|
||||
serverInstances = make(map[string]*ServerInstance)
|
||||
mutex = sync.Mutex{}
|
||||
)
|
||||
|
||||
const (
|
||||
ResponseSuccess = 0
|
||||
@@ -36,10 +38,6 @@ func getInstance(serverName string) (*ServerInstance, bool) {
|
||||
mutex.Lock()
|
||||
defer mutex.Unlock()
|
||||
|
||||
if serverInstances == nil {
|
||||
serverInstances = make(map[string]*ServerInstance)
|
||||
}
|
||||
|
||||
var ok bool
|
||||
var serverInstance *ServerInstance
|
||||
if serverInstance, ok = serverInstances[serverName]; !ok {
|
||||
@@ -53,10 +51,6 @@ func addInstance(serverName string, serverInstance *ServerInstance) {
|
||||
mutex.Lock()
|
||||
defer mutex.Unlock()
|
||||
|
||||
if serverInstances == nil {
|
||||
serverInstances = make(map[string]*ServerInstance)
|
||||
}
|
||||
|
||||
serverInstances[serverName] = serverInstance
|
||||
}
|
||||
|
||||
@@ -64,10 +58,6 @@ func removeInstance(serverName string) {
|
||||
mutex.Lock()
|
||||
defer mutex.Unlock()
|
||||
|
||||
if serverInstances == nil {
|
||||
return
|
||||
}
|
||||
|
||||
delete(serverInstances, serverName)
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ package main
|
||||
import "C"
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"unsafe"
|
||||
|
||||
"github.com/pikami/cosmium/api"
|
||||
@@ -15,21 +16,21 @@ import (
|
||||
|
||||
//export CreateServerInstance
|
||||
func CreateServerInstance(serverName *C.char, configurationJSON *C.char) int {
|
||||
configStr := C.GoString(configurationJSON)
|
||||
serverNameStr := C.GoString(serverName)
|
||||
configStr := C.GoString(configurationJSON)
|
||||
|
||||
if _, ok := getInstance(serverNameStr); ok {
|
||||
return ResponseServerInstanceAlreadyExists
|
||||
}
|
||||
|
||||
var configuration config.ServerConfig
|
||||
err := json.Unmarshal([]byte(configStr), &configuration)
|
||||
err := json.NewDecoder(strings.NewReader(configStr)).Decode(&configuration)
|
||||
if err != nil {
|
||||
return ResponseFailedToParseConfiguration
|
||||
}
|
||||
|
||||
configuration.PopulateCalculatedFields()
|
||||
configuration.ApplyDefaultsToEmptyFields()
|
||||
configuration.PopulateCalculatedFields()
|
||||
|
||||
repository := repositories.NewDataRepository(repositories.RepositoryOptions{
|
||||
InitialDataFilePath: configuration.InitialDataFilePath,
|
||||
|
||||
Reference in New Issue
Block a user