mirror of
https://github.com/pikami/cosmium.git
synced 2025-03-13 05:15:52 +00:00
Implement 'Transactional batch operations'
This commit is contained in:
parent
887d456ad4
commit
5caa829ac1
24
api/api_models/models.go
Normal file
24
api/api_models/models.go
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
package apimodels
|
||||||
|
|
||||||
|
const (
|
||||||
|
BatchOperationTypeCreate = "Create"
|
||||||
|
BatchOperationTypeDelete = "Delete"
|
||||||
|
BatchOperationTypeReplace = "Replace"
|
||||||
|
BatchOperationTypeUpsert = "Upsert"
|
||||||
|
BatchOperationTypeRead = "Read"
|
||||||
|
BatchOperationTypePatch = "Patch"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BatchOperation struct {
|
||||||
|
OperationType string `json:"operationType"`
|
||||||
|
Id string `json:"id"`
|
||||||
|
ResourceBody map[string]interface{} `json:"resourceBody"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type BatchOperationResult struct {
|
||||||
|
StatusCode int `json:"statusCode"`
|
||||||
|
RequestCharge float64 `json:"requestCharge"`
|
||||||
|
ResourceBody map[string]interface{} `json:"resourceBody"`
|
||||||
|
Etag string `json:"etag"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
}
|
@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
jsonpatch "github.com/cosmiumdev/json-patch/v5"
|
jsonpatch "github.com/cosmiumdev/json-patch/v5"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
apimodels "github.com/pikami/cosmium/api/api_models"
|
||||||
"github.com/pikami/cosmium/internal/constants"
|
"github.com/pikami/cosmium/internal/constants"
|
||||||
"github.com/pikami/cosmium/internal/logger"
|
"github.com/pikami/cosmium/internal/logger"
|
||||||
repositorymodels "github.com/pikami/cosmium/internal/repository_models"
|
repositorymodels "github.com/pikami/cosmium/internal/repository_models"
|
||||||
@ -183,6 +184,13 @@ func (h *Handlers) DocumentsPost(c *gin.Context) {
|
|||||||
databaseId := c.Param("databaseId")
|
databaseId := c.Param("databaseId")
|
||||||
collectionId := c.Param("collId")
|
collectionId := c.Param("collId")
|
||||||
|
|
||||||
|
// Handle batch requests
|
||||||
|
isBatchRequest, _ := strconv.ParseBool(c.GetHeader("x-ms-cosmos-is-batch-request"))
|
||||||
|
if isBatchRequest {
|
||||||
|
h.handleBatchRequest(c)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
var requestBody map[string]interface{}
|
var requestBody map[string]interface{}
|
||||||
if err := c.BindJSON(&requestBody); err != nil {
|
if err := c.BindJSON(&requestBody); err != nil {
|
||||||
c.JSON(http.StatusBadRequest, gin.H{"message": err.Error()})
|
c.JSON(http.StatusBadRequest, gin.H{"message": err.Error()})
|
||||||
@ -191,30 +199,7 @@ func (h *Handlers) DocumentsPost(c *gin.Context) {
|
|||||||
|
|
||||||
query := requestBody["query"]
|
query := requestBody["query"]
|
||||||
if query != nil {
|
if query != nil {
|
||||||
if c.GetHeader("x-ms-cosmos-is-query-plan-request") != "" {
|
h.handleDocumentQuery(c, requestBody)
|
||||||
c.IndentedJSON(http.StatusOK, constants.QueryPlanResponse)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var queryParameters map[string]interface{}
|
|
||||||
if paramsArray, ok := requestBody["parameters"].([]interface{}); ok {
|
|
||||||
queryParameters = parametersToMap(paramsArray)
|
|
||||||
}
|
|
||||||
|
|
||||||
docs, status := h.repository.ExecuteQueryDocuments(databaseId, collectionId, query.(string), queryParameters)
|
|
||||||
if status != repositorymodels.StatusOk {
|
|
||||||
// TODO: Currently we return everything if the query fails
|
|
||||||
h.GetAllDocuments(c)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
collection, _ := h.repository.GetCollection(databaseId, collectionId)
|
|
||||||
c.Header("x-ms-item-count", fmt.Sprintf("%d", len(docs)))
|
|
||||||
c.IndentedJSON(http.StatusOK, gin.H{
|
|
||||||
"_rid": collection.ResourceID,
|
|
||||||
"Documents": docs,
|
|
||||||
"_count": len(docs),
|
|
||||||
})
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -253,3 +238,131 @@ func parametersToMap(pairs []interface{}) map[string]interface{} {
|
|||||||
|
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *Handlers) handleDocumentQuery(c *gin.Context, requestBody map[string]interface{}) {
|
||||||
|
databaseId := c.Param("databaseId")
|
||||||
|
collectionId := c.Param("collId")
|
||||||
|
|
||||||
|
if c.GetHeader("x-ms-cosmos-is-query-plan-request") != "" {
|
||||||
|
c.IndentedJSON(http.StatusOK, constants.QueryPlanResponse)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var queryParameters map[string]interface{}
|
||||||
|
if paramsArray, ok := requestBody["parameters"].([]interface{}); ok {
|
||||||
|
queryParameters = parametersToMap(paramsArray)
|
||||||
|
}
|
||||||
|
|
||||||
|
docs, status := h.repository.ExecuteQueryDocuments(databaseId, collectionId, requestBody["query"].(string), queryParameters)
|
||||||
|
if status != repositorymodels.StatusOk {
|
||||||
|
// TODO: Currently we return everything if the query fails
|
||||||
|
h.GetAllDocuments(c)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
collection, _ := h.repository.GetCollection(databaseId, collectionId)
|
||||||
|
c.Header("x-ms-item-count", fmt.Sprintf("%d", len(docs)))
|
||||||
|
c.IndentedJSON(http.StatusOK, gin.H{
|
||||||
|
"_rid": collection.ResourceID,
|
||||||
|
"Documents": docs,
|
||||||
|
"_count": len(docs),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handlers) handleBatchRequest(c *gin.Context) {
|
||||||
|
databaseId := c.Param("databaseId")
|
||||||
|
collectionId := c.Param("collId")
|
||||||
|
|
||||||
|
batchOperations := make([]apimodels.BatchOperation, 0)
|
||||||
|
if err := c.BindJSON(&batchOperations); err != nil {
|
||||||
|
c.JSON(http.StatusBadRequest, gin.H{"message": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
batchOperationResults := make([]apimodels.BatchOperationResult, len(batchOperations))
|
||||||
|
for idx, operation := range batchOperations {
|
||||||
|
switch operation.OperationType {
|
||||||
|
case apimodels.BatchOperationTypeCreate:
|
||||||
|
createdDocument, status := h.repository.CreateDocument(databaseId, collectionId, operation.ResourceBody)
|
||||||
|
responseCode := repositoryStatusToResponseCode(status)
|
||||||
|
if status == repositorymodels.StatusOk {
|
||||||
|
responseCode = http.StatusCreated
|
||||||
|
}
|
||||||
|
batchOperationResults[idx] = apimodels.BatchOperationResult{
|
||||||
|
StatusCode: responseCode,
|
||||||
|
ResourceBody: createdDocument,
|
||||||
|
}
|
||||||
|
case apimodels.BatchOperationTypeDelete:
|
||||||
|
status := h.repository.DeleteDocument(databaseId, collectionId, operation.Id)
|
||||||
|
responseCode := repositoryStatusToResponseCode(status)
|
||||||
|
if status == repositorymodels.StatusOk {
|
||||||
|
responseCode = http.StatusNoContent
|
||||||
|
}
|
||||||
|
batchOperationResults[idx] = apimodels.BatchOperationResult{
|
||||||
|
StatusCode: responseCode,
|
||||||
|
}
|
||||||
|
case apimodels.BatchOperationTypeReplace:
|
||||||
|
deleteStatus := h.repository.DeleteDocument(databaseId, collectionId, operation.Id)
|
||||||
|
if deleteStatus == repositorymodels.StatusNotFound {
|
||||||
|
batchOperationResults[idx] = apimodels.BatchOperationResult{
|
||||||
|
StatusCode: http.StatusNotFound,
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
createdDocument, createStatus := h.repository.CreateDocument(databaseId, collectionId, operation.ResourceBody)
|
||||||
|
responseCode := repositoryStatusToResponseCode(createStatus)
|
||||||
|
if createStatus == repositorymodels.StatusOk {
|
||||||
|
responseCode = http.StatusCreated
|
||||||
|
}
|
||||||
|
batchOperationResults[idx] = apimodels.BatchOperationResult{
|
||||||
|
StatusCode: responseCode,
|
||||||
|
ResourceBody: createdDocument,
|
||||||
|
}
|
||||||
|
case apimodels.BatchOperationTypeUpsert:
|
||||||
|
documentId := operation.ResourceBody["id"].(string)
|
||||||
|
h.repository.DeleteDocument(databaseId, collectionId, documentId)
|
||||||
|
createdDocument, createStatus := h.repository.CreateDocument(databaseId, collectionId, operation.ResourceBody)
|
||||||
|
responseCode := repositoryStatusToResponseCode(createStatus)
|
||||||
|
if createStatus == repositorymodels.StatusOk {
|
||||||
|
responseCode = http.StatusCreated
|
||||||
|
}
|
||||||
|
batchOperationResults[idx] = apimodels.BatchOperationResult{
|
||||||
|
StatusCode: responseCode,
|
||||||
|
ResourceBody: createdDocument,
|
||||||
|
}
|
||||||
|
case apimodels.BatchOperationTypeRead:
|
||||||
|
document, status := h.repository.GetDocument(databaseId, collectionId, operation.Id)
|
||||||
|
batchOperationResults[idx] = apimodels.BatchOperationResult{
|
||||||
|
StatusCode: repositoryStatusToResponseCode(status),
|
||||||
|
ResourceBody: document,
|
||||||
|
}
|
||||||
|
case apimodels.BatchOperationTypePatch:
|
||||||
|
batchOperationResults[idx] = apimodels.BatchOperationResult{
|
||||||
|
StatusCode: http.StatusNotImplemented,
|
||||||
|
Message: "Patch operation is not implemented",
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
batchOperationResults[idx] = apimodels.BatchOperationResult{
|
||||||
|
StatusCode: http.StatusBadRequest,
|
||||||
|
Message: "Unknown operation type",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
c.JSON(http.StatusOK, batchOperationResults)
|
||||||
|
}
|
||||||
|
|
||||||
|
func repositoryStatusToResponseCode(status repositorymodels.RepositoryStatus) int {
|
||||||
|
switch status {
|
||||||
|
case repositorymodels.StatusOk:
|
||||||
|
return http.StatusOK
|
||||||
|
case repositorymodels.StatusNotFound:
|
||||||
|
return http.StatusNotFound
|
||||||
|
case repositorymodels.Conflict:
|
||||||
|
return http.StatusConflict
|
||||||
|
case repositorymodels.BadRequest:
|
||||||
|
return http.StatusBadRequest
|
||||||
|
default:
|
||||||
|
return http.StatusInternalServerError
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
|
|
||||||
"github.com/pikami/cosmium/api"
|
"github.com/pikami/cosmium/api"
|
||||||
"github.com/pikami/cosmium/api/config"
|
"github.com/pikami/cosmium/api/config"
|
||||||
|
"github.com/pikami/cosmium/internal/logger"
|
||||||
"github.com/pikami/cosmium/internal/repositories"
|
"github.com/pikami/cosmium/internal/repositories"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -35,6 +36,9 @@ func runTestServer() *TestServer {
|
|||||||
ExplorerBaseUrlLocation: config.ExplorerBaseUrlLocation,
|
ExplorerBaseUrlLocation: config.ExplorerBaseUrlLocation,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
config.LogLevel = "debug"
|
||||||
|
logger.SetLogLevel(logger.LogLevelDebug)
|
||||||
|
|
||||||
return runTestServerCustomConfig(config)
|
return runTestServerCustomConfig(config)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -377,5 +377,140 @@ func Test_Documents_Patch(t *testing.T) {
|
|||||||
assert.NotNil(t, r)
|
assert.NotNil(t, r)
|
||||||
assert.Nil(t, err2)
|
assert.Nil(t, err2)
|
||||||
})
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_Documents_TransactionalBatch(t *testing.T) {
|
||||||
|
ts, collectionClient := documents_InitializeDb(t)
|
||||||
|
defer ts.Server.Close()
|
||||||
|
|
||||||
|
t.Run("Should execute CREATE transactional batch", func(t *testing.T) {
|
||||||
|
context := context.TODO()
|
||||||
|
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||||
|
|
||||||
|
newItem := map[string]interface{}{
|
||||||
|
"id": "678901",
|
||||||
|
}
|
||||||
|
bytes, err := json.Marshal(newItem)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
batch.CreateItem(bytes, nil)
|
||||||
|
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.True(t, response.Success)
|
||||||
|
assert.Equal(t, 1, len(response.OperationResults))
|
||||||
|
|
||||||
|
operationResponse := response.OperationResults[0]
|
||||||
|
assert.NotNil(t, operationResponse)
|
||||||
|
assert.NotNil(t, operationResponse.ResourceBody)
|
||||||
|
assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode)
|
||||||
|
|
||||||
|
var itemResponseBody map[string]interface{}
|
||||||
|
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
|
||||||
|
assert.Equal(t, newItem["id"], itemResponseBody["id"])
|
||||||
|
|
||||||
|
createdDoc, _ := ts.Repository.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string))
|
||||||
|
assert.Equal(t, newItem["id"], createdDoc["id"])
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Should execute DELETE transactional batch", func(t *testing.T) {
|
||||||
|
context := context.TODO()
|
||||||
|
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||||
|
|
||||||
|
batch.DeleteItem("12345", nil)
|
||||||
|
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.True(t, response.Success)
|
||||||
|
assert.Equal(t, 1, len(response.OperationResults))
|
||||||
|
|
||||||
|
operationResponse := response.OperationResults[0]
|
||||||
|
assert.NotNil(t, operationResponse)
|
||||||
|
assert.Equal(t, int32(http.StatusNoContent), operationResponse.StatusCode)
|
||||||
|
|
||||||
|
_, status := ts.Repository.GetDocument(testDatabaseName, testCollectionName, "12345")
|
||||||
|
assert.Equal(t, repositorymodels.StatusNotFound, int(status))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Should execute REPLACE transactional batch", func(t *testing.T) {
|
||||||
|
context := context.TODO()
|
||||||
|
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||||
|
|
||||||
|
newItem := map[string]interface{}{
|
||||||
|
"id": "67890",
|
||||||
|
"pk": "666",
|
||||||
|
}
|
||||||
|
bytes, err := json.Marshal(newItem)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
batch.ReplaceItem("67890", bytes, nil)
|
||||||
|
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.True(t, response.Success)
|
||||||
|
assert.Equal(t, 1, len(response.OperationResults))
|
||||||
|
|
||||||
|
operationResponse := response.OperationResults[0]
|
||||||
|
assert.NotNil(t, operationResponse)
|
||||||
|
assert.NotNil(t, operationResponse.ResourceBody)
|
||||||
|
assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode)
|
||||||
|
|
||||||
|
var itemResponseBody map[string]interface{}
|
||||||
|
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
|
||||||
|
assert.Equal(t, newItem["id"], itemResponseBody["id"])
|
||||||
|
assert.Equal(t, newItem["pk"], itemResponseBody["pk"])
|
||||||
|
|
||||||
|
updatedDoc, _ := ts.Repository.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string))
|
||||||
|
assert.Equal(t, newItem["id"], updatedDoc["id"])
|
||||||
|
assert.Equal(t, newItem["pk"], updatedDoc["pk"])
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Should execute UPSERT transactional batch", func(t *testing.T) {
|
||||||
|
context := context.TODO()
|
||||||
|
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||||
|
|
||||||
|
newItem := map[string]interface{}{
|
||||||
|
"id": "678901",
|
||||||
|
"pk": "666",
|
||||||
|
}
|
||||||
|
bytes, err := json.Marshal(newItem)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
batch.UpsertItem(bytes, nil)
|
||||||
|
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.True(t, response.Success)
|
||||||
|
assert.Equal(t, 1, len(response.OperationResults))
|
||||||
|
|
||||||
|
operationResponse := response.OperationResults[0]
|
||||||
|
assert.NotNil(t, operationResponse)
|
||||||
|
assert.NotNil(t, operationResponse.ResourceBody)
|
||||||
|
assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode)
|
||||||
|
|
||||||
|
var itemResponseBody map[string]interface{}
|
||||||
|
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
|
||||||
|
assert.Equal(t, newItem["id"], itemResponseBody["id"])
|
||||||
|
assert.Equal(t, newItem["pk"], itemResponseBody["pk"])
|
||||||
|
|
||||||
|
updatedDoc, _ := ts.Repository.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string))
|
||||||
|
assert.Equal(t, newItem["id"], updatedDoc["id"])
|
||||||
|
assert.Equal(t, newItem["pk"], updatedDoc["pk"])
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Should execute READ transactional batch", func(t *testing.T) {
|
||||||
|
context := context.TODO()
|
||||||
|
batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk"))
|
||||||
|
|
||||||
|
batch.ReadItem("67890", nil)
|
||||||
|
response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{})
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.True(t, response.Success)
|
||||||
|
assert.Equal(t, 1, len(response.OperationResults))
|
||||||
|
|
||||||
|
operationResponse := response.OperationResults[0]
|
||||||
|
assert.NotNil(t, operationResponse)
|
||||||
|
assert.NotNil(t, operationResponse.ResourceBody)
|
||||||
|
assert.Equal(t, int32(http.StatusOK), operationResponse.StatusCode)
|
||||||
|
|
||||||
|
var itemResponseBody map[string]interface{}
|
||||||
|
json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody)
|
||||||
|
assert.Equal(t, "67890", itemResponseBody["id"])
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
@ -204,14 +204,18 @@ Cosmium strives to support the core features of Cosmos DB, including:
|
|||||||
| IS_PRIMITIVE | Yes |
|
| IS_PRIMITIVE | Yes |
|
||||||
| IS_STRING | Yes |
|
| IS_STRING | Yes |
|
||||||
|
|
||||||
### Document Batch Requests
|
### Transactional batch operations
|
||||||
|
|
||||||
|
Note: There's actually no transaction here. Think of this as a 'bulk operation' that can partially succeed.
|
||||||
|
|
||||||
| Operation | Implemented |
|
| Operation | Implemented |
|
||||||
| --------- | ----------- |
|
| --------- | ----------- |
|
||||||
| Create | No |
|
| Create | Yes |
|
||||||
| Update | No |
|
| Delete | Yes |
|
||||||
| Delete | No |
|
| Replace | Yes |
|
||||||
| Read | No |
|
| Upsert | Yes |
|
||||||
|
| Read | Yes |
|
||||||
|
| Patch | No |
|
||||||
|
|
||||||
## Known Differences
|
## Known Differences
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user