From 5caa829ac17a84d356932c5dcc354fdb05ed0b96 Mon Sep 17 00:00:00 2001 From: Pijus Kamandulis Date: Tue, 4 Feb 2025 20:35:15 +0200 Subject: [PATCH] Implement 'Transactional batch operations' --- api/api_models/models.go | 24 ++++++ api/handlers/documents.go | 161 ++++++++++++++++++++++++++++++------ api/tests/config_test.go | 4 + api/tests/documents_test.go | 137 +++++++++++++++++++++++++++++- docs/COMPATIBILITY.md | 14 ++-- 5 files changed, 310 insertions(+), 30 deletions(-) create mode 100644 api/api_models/models.go diff --git a/api/api_models/models.go b/api/api_models/models.go new file mode 100644 index 0000000..10a6c40 --- /dev/null +++ b/api/api_models/models.go @@ -0,0 +1,24 @@ +package apimodels + +const ( + BatchOperationTypeCreate = "Create" + BatchOperationTypeDelete = "Delete" + BatchOperationTypeReplace = "Replace" + BatchOperationTypeUpsert = "Upsert" + BatchOperationTypeRead = "Read" + BatchOperationTypePatch = "Patch" +) + +type BatchOperation struct { + OperationType string `json:"operationType"` + Id string `json:"id"` + ResourceBody map[string]interface{} `json:"resourceBody"` +} + +type BatchOperationResult struct { + StatusCode int `json:"statusCode"` + RequestCharge float64 `json:"requestCharge"` + ResourceBody map[string]interface{} `json:"resourceBody"` + Etag string `json:"etag"` + Message string `json:"message"` +} diff --git a/api/handlers/documents.go b/api/handlers/documents.go index 2cf2b85..ba0a958 100644 --- a/api/handlers/documents.go +++ b/api/handlers/documents.go @@ -8,6 +8,7 @@ import ( jsonpatch "github.com/cosmiumdev/json-patch/v5" "github.com/gin-gonic/gin" + apimodels "github.com/pikami/cosmium/api/api_models" "github.com/pikami/cosmium/internal/constants" "github.com/pikami/cosmium/internal/logger" repositorymodels "github.com/pikami/cosmium/internal/repository_models" @@ -183,6 +184,13 @@ func (h *Handlers) DocumentsPost(c *gin.Context) { databaseId := c.Param("databaseId") collectionId := c.Param("collId") + // Handle batch requests + isBatchRequest, _ := strconv.ParseBool(c.GetHeader("x-ms-cosmos-is-batch-request")) + if isBatchRequest { + h.handleBatchRequest(c) + return + } + var requestBody map[string]interface{} if err := c.BindJSON(&requestBody); err != nil { c.JSON(http.StatusBadRequest, gin.H{"message": err.Error()}) @@ -191,30 +199,7 @@ func (h *Handlers) DocumentsPost(c *gin.Context) { query := requestBody["query"] if query != nil { - if c.GetHeader("x-ms-cosmos-is-query-plan-request") != "" { - c.IndentedJSON(http.StatusOK, constants.QueryPlanResponse) - return - } - - var queryParameters map[string]interface{} - if paramsArray, ok := requestBody["parameters"].([]interface{}); ok { - queryParameters = parametersToMap(paramsArray) - } - - docs, status := h.repository.ExecuteQueryDocuments(databaseId, collectionId, query.(string), queryParameters) - if status != repositorymodels.StatusOk { - // TODO: Currently we return everything if the query fails - h.GetAllDocuments(c) - return - } - - collection, _ := h.repository.GetCollection(databaseId, collectionId) - c.Header("x-ms-item-count", fmt.Sprintf("%d", len(docs))) - c.IndentedJSON(http.StatusOK, gin.H{ - "_rid": collection.ResourceID, - "Documents": docs, - "_count": len(docs), - }) + h.handleDocumentQuery(c, requestBody) return } @@ -253,3 +238,131 @@ func parametersToMap(pairs []interface{}) map[string]interface{} { return result } + +func (h *Handlers) handleDocumentQuery(c *gin.Context, requestBody map[string]interface{}) { + databaseId := c.Param("databaseId") + collectionId := c.Param("collId") + + if c.GetHeader("x-ms-cosmos-is-query-plan-request") != "" { + c.IndentedJSON(http.StatusOK, constants.QueryPlanResponse) + return + } + + var queryParameters map[string]interface{} + if paramsArray, ok := requestBody["parameters"].([]interface{}); ok { + queryParameters = parametersToMap(paramsArray) + } + + docs, status := h.repository.ExecuteQueryDocuments(databaseId, collectionId, requestBody["query"].(string), queryParameters) + if status != repositorymodels.StatusOk { + // TODO: Currently we return everything if the query fails + h.GetAllDocuments(c) + return + } + + collection, _ := h.repository.GetCollection(databaseId, collectionId) + c.Header("x-ms-item-count", fmt.Sprintf("%d", len(docs))) + c.IndentedJSON(http.StatusOK, gin.H{ + "_rid": collection.ResourceID, + "Documents": docs, + "_count": len(docs), + }) +} + +func (h *Handlers) handleBatchRequest(c *gin.Context) { + databaseId := c.Param("databaseId") + collectionId := c.Param("collId") + + batchOperations := make([]apimodels.BatchOperation, 0) + if err := c.BindJSON(&batchOperations); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"message": err.Error()}) + return + } + + batchOperationResults := make([]apimodels.BatchOperationResult, len(batchOperations)) + for idx, operation := range batchOperations { + switch operation.OperationType { + case apimodels.BatchOperationTypeCreate: + createdDocument, status := h.repository.CreateDocument(databaseId, collectionId, operation.ResourceBody) + responseCode := repositoryStatusToResponseCode(status) + if status == repositorymodels.StatusOk { + responseCode = http.StatusCreated + } + batchOperationResults[idx] = apimodels.BatchOperationResult{ + StatusCode: responseCode, + ResourceBody: createdDocument, + } + case apimodels.BatchOperationTypeDelete: + status := h.repository.DeleteDocument(databaseId, collectionId, operation.Id) + responseCode := repositoryStatusToResponseCode(status) + if status == repositorymodels.StatusOk { + responseCode = http.StatusNoContent + } + batchOperationResults[idx] = apimodels.BatchOperationResult{ + StatusCode: responseCode, + } + case apimodels.BatchOperationTypeReplace: + deleteStatus := h.repository.DeleteDocument(databaseId, collectionId, operation.Id) + if deleteStatus == repositorymodels.StatusNotFound { + batchOperationResults[idx] = apimodels.BatchOperationResult{ + StatusCode: http.StatusNotFound, + } + continue + } + createdDocument, createStatus := h.repository.CreateDocument(databaseId, collectionId, operation.ResourceBody) + responseCode := repositoryStatusToResponseCode(createStatus) + if createStatus == repositorymodels.StatusOk { + responseCode = http.StatusCreated + } + batchOperationResults[idx] = apimodels.BatchOperationResult{ + StatusCode: responseCode, + ResourceBody: createdDocument, + } + case apimodels.BatchOperationTypeUpsert: + documentId := operation.ResourceBody["id"].(string) + h.repository.DeleteDocument(databaseId, collectionId, documentId) + createdDocument, createStatus := h.repository.CreateDocument(databaseId, collectionId, operation.ResourceBody) + responseCode := repositoryStatusToResponseCode(createStatus) + if createStatus == repositorymodels.StatusOk { + responseCode = http.StatusCreated + } + batchOperationResults[idx] = apimodels.BatchOperationResult{ + StatusCode: responseCode, + ResourceBody: createdDocument, + } + case apimodels.BatchOperationTypeRead: + document, status := h.repository.GetDocument(databaseId, collectionId, operation.Id) + batchOperationResults[idx] = apimodels.BatchOperationResult{ + StatusCode: repositoryStatusToResponseCode(status), + ResourceBody: document, + } + case apimodels.BatchOperationTypePatch: + batchOperationResults[idx] = apimodels.BatchOperationResult{ + StatusCode: http.StatusNotImplemented, + Message: "Patch operation is not implemented", + } + default: + batchOperationResults[idx] = apimodels.BatchOperationResult{ + StatusCode: http.StatusBadRequest, + Message: "Unknown operation type", + } + } + } + + c.JSON(http.StatusOK, batchOperationResults) +} + +func repositoryStatusToResponseCode(status repositorymodels.RepositoryStatus) int { + switch status { + case repositorymodels.StatusOk: + return http.StatusOK + case repositorymodels.StatusNotFound: + return http.StatusNotFound + case repositorymodels.Conflict: + return http.StatusConflict + case repositorymodels.BadRequest: + return http.StatusBadRequest + default: + return http.StatusInternalServerError + } +} diff --git a/api/tests/config_test.go b/api/tests/config_test.go index 2fe2e72..c92cf84 100644 --- a/api/tests/config_test.go +++ b/api/tests/config_test.go @@ -5,6 +5,7 @@ import ( "github.com/pikami/cosmium/api" "github.com/pikami/cosmium/api/config" + "github.com/pikami/cosmium/internal/logger" "github.com/pikami/cosmium/internal/repositories" ) @@ -35,6 +36,9 @@ func runTestServer() *TestServer { ExplorerBaseUrlLocation: config.ExplorerBaseUrlLocation, } + config.LogLevel = "debug" + logger.SetLogLevel(logger.LogLevelDebug) + return runTestServerCustomConfig(config) } diff --git a/api/tests/documents_test.go b/api/tests/documents_test.go index 0f71d90..54609d3 100644 --- a/api/tests/documents_test.go +++ b/api/tests/documents_test.go @@ -377,5 +377,140 @@ func Test_Documents_Patch(t *testing.T) { assert.NotNil(t, r) assert.Nil(t, err2) }) - +} + +func Test_Documents_TransactionalBatch(t *testing.T) { + ts, collectionClient := documents_InitializeDb(t) + defer ts.Server.Close() + + t.Run("Should execute CREATE transactional batch", func(t *testing.T) { + context := context.TODO() + batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk")) + + newItem := map[string]interface{}{ + "id": "678901", + } + bytes, err := json.Marshal(newItem) + assert.Nil(t, err) + + batch.CreateItem(bytes, nil) + response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{}) + assert.Nil(t, err) + assert.True(t, response.Success) + assert.Equal(t, 1, len(response.OperationResults)) + + operationResponse := response.OperationResults[0] + assert.NotNil(t, operationResponse) + assert.NotNil(t, operationResponse.ResourceBody) + assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode) + + var itemResponseBody map[string]interface{} + json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody) + assert.Equal(t, newItem["id"], itemResponseBody["id"]) + + createdDoc, _ := ts.Repository.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string)) + assert.Equal(t, newItem["id"], createdDoc["id"]) + }) + + t.Run("Should execute DELETE transactional batch", func(t *testing.T) { + context := context.TODO() + batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk")) + + batch.DeleteItem("12345", nil) + response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{}) + assert.Nil(t, err) + assert.True(t, response.Success) + assert.Equal(t, 1, len(response.OperationResults)) + + operationResponse := response.OperationResults[0] + assert.NotNil(t, operationResponse) + assert.Equal(t, int32(http.StatusNoContent), operationResponse.StatusCode) + + _, status := ts.Repository.GetDocument(testDatabaseName, testCollectionName, "12345") + assert.Equal(t, repositorymodels.StatusNotFound, int(status)) + }) + + t.Run("Should execute REPLACE transactional batch", func(t *testing.T) { + context := context.TODO() + batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk")) + + newItem := map[string]interface{}{ + "id": "67890", + "pk": "666", + } + bytes, err := json.Marshal(newItem) + assert.Nil(t, err) + + batch.ReplaceItem("67890", bytes, nil) + response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{}) + assert.Nil(t, err) + assert.True(t, response.Success) + assert.Equal(t, 1, len(response.OperationResults)) + + operationResponse := response.OperationResults[0] + assert.NotNil(t, operationResponse) + assert.NotNil(t, operationResponse.ResourceBody) + assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode) + + var itemResponseBody map[string]interface{} + json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody) + assert.Equal(t, newItem["id"], itemResponseBody["id"]) + assert.Equal(t, newItem["pk"], itemResponseBody["pk"]) + + updatedDoc, _ := ts.Repository.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string)) + assert.Equal(t, newItem["id"], updatedDoc["id"]) + assert.Equal(t, newItem["pk"], updatedDoc["pk"]) + }) + + t.Run("Should execute UPSERT transactional batch", func(t *testing.T) { + context := context.TODO() + batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk")) + + newItem := map[string]interface{}{ + "id": "678901", + "pk": "666", + } + bytes, err := json.Marshal(newItem) + assert.Nil(t, err) + + batch.UpsertItem(bytes, nil) + response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{}) + assert.Nil(t, err) + assert.True(t, response.Success) + assert.Equal(t, 1, len(response.OperationResults)) + + operationResponse := response.OperationResults[0] + assert.NotNil(t, operationResponse) + assert.NotNil(t, operationResponse.ResourceBody) + assert.Equal(t, int32(http.StatusCreated), operationResponse.StatusCode) + + var itemResponseBody map[string]interface{} + json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody) + assert.Equal(t, newItem["id"], itemResponseBody["id"]) + assert.Equal(t, newItem["pk"], itemResponseBody["pk"]) + + updatedDoc, _ := ts.Repository.GetDocument(testDatabaseName, testCollectionName, newItem["id"].(string)) + assert.Equal(t, newItem["id"], updatedDoc["id"]) + assert.Equal(t, newItem["pk"], updatedDoc["pk"]) + }) + + t.Run("Should execute READ transactional batch", func(t *testing.T) { + context := context.TODO() + batch := collectionClient.NewTransactionalBatch(azcosmos.NewPartitionKeyString("pk")) + + batch.ReadItem("67890", nil) + response, err := collectionClient.ExecuteTransactionalBatch(context, batch, &azcosmos.TransactionalBatchOptions{}) + assert.Nil(t, err) + assert.True(t, response.Success) + assert.Equal(t, 1, len(response.OperationResults)) + + operationResponse := response.OperationResults[0] + assert.NotNil(t, operationResponse) + assert.NotNil(t, operationResponse.ResourceBody) + assert.Equal(t, int32(http.StatusOK), operationResponse.StatusCode) + + var itemResponseBody map[string]interface{} + json.Unmarshal(operationResponse.ResourceBody, &itemResponseBody) + assert.Equal(t, "67890", itemResponseBody["id"]) + }) } diff --git a/docs/COMPATIBILITY.md b/docs/COMPATIBILITY.md index 4721da6..d3f9c0e 100644 --- a/docs/COMPATIBILITY.md +++ b/docs/COMPATIBILITY.md @@ -204,14 +204,18 @@ Cosmium strives to support the core features of Cosmos DB, including: | IS_PRIMITIVE | Yes | | IS_STRING | Yes | -### Document Batch Requests +### Transactional batch operations + +Note: There's actually no transaction here. Think of this as a 'bulk operation' that can partially succeed. | Operation | Implemented | | --------- | ----------- | -| Create | No | -| Update | No | -| Delete | No | -| Read | No | +| Create | Yes | +| Delete | Yes | +| Replace | Yes | +| Upsert | Yes | +| Read | Yes | +| Patch | No | ## Known Differences