From 36fd7f48ccefb89b5c36c57c4cb138e3b4f730a2 Mon Sep 17 00:00:00 2001 From: Pijus Kamandulis Date: Fri, 5 Jun 2026 23:26:25 +0300 Subject: [PATCH] Add document ETag optimistic concurrency (#16) * Add ETag optimistic concurrency for document replace Co-authored-by: Pijus Kamandulis * Expose precondition error code header Co-authored-by: Pijus Kamandulis * Stop Badger GC before closing datastore Co-authored-by: Pijus Kamandulis --------- Co-authored-by: Cursor Agent --- api/handlers/documents.go | 23 +++++- api/headers/headers.go | 2 + api/tests/documents_test.go | 75 +++++++++++++++++++ internal/constants/responses.go | 4 + .../badger_datastore/badger_datastore.go | 33 +++++--- 5 files changed, 126 insertions(+), 11 deletions(-) diff --git a/api/handlers/documents.go b/api/handlers/documents.go index 5f3fa68..5a56e89 100644 --- a/api/handlers/documents.go +++ b/api/handlers/documents.go @@ -47,6 +47,9 @@ func (h *Handlers) GetDocument(c *gin.Context) { document, status := h.dataStore.GetDocument(databaseId, collectionId, documentId) if status == datastore.StatusOk { + if etag, ok := document["_etag"].(string); ok { + c.Header(headers.ETag, etag) + } c.IndentedJSON(http.StatusOK, document) return } @@ -90,7 +93,25 @@ func (h *Handlers) ReplaceDocument(c *gin.Context) { return } - status := h.dataStore.DeleteDocument(databaseId, collectionId, documentId) + existingDocument, status := h.dataStore.GetDocument(databaseId, collectionId, documentId) + if status == datastore.StatusNotFound { + c.IndentedJSON(http.StatusNotFound, constants.NotFoundResponse) + return + } + if status != datastore.StatusOk { + c.IndentedJSON(http.StatusInternalServerError, constants.UnknownErrorResponse) + return + } + + if ifMatch := c.GetHeader(headers.IfMatch); ifMatch != "" { + if existingDocument["_etag"] != ifMatch { + c.Header(headers.ErrorCode, "PreconditionFailed") + c.JSON(http.StatusPreconditionFailed, constants.PreconditionFailedResponse) + return + } + } + + status = h.dataStore.DeleteDocument(databaseId, collectionId, documentId) if status == datastore.StatusNotFound { c.IndentedJSON(http.StatusNotFound, constants.NotFoundResponse) return diff --git a/api/headers/headers.go b/api/headers/headers.go index 76a5999..92c9e26 100644 --- a/api/headers/headers.go +++ b/api/headers/headers.go @@ -4,8 +4,10 @@ const ( AIM = "A-Im" Authorization = "authorization" CosmosLsn = "x-ms-cosmos-llsn" + ErrorCode = "x-ms-error-code" ETag = "etag" GlobalCommittedLsn = "x-ms-global-committed-lsn" + IfMatch = "if-match" IfNoneMatch = "if-none-match" IsBatchRequest = "x-ms-cosmos-is-batch-request" IsQueryPlanRequest = "x-ms-cosmos-is-query-plan-request" diff --git a/api/tests/documents_test.go b/api/tests/documents_test.go index 8c8cc58..a1a8bfa 100644 --- a/api/tests/documents_test.go +++ b/api/tests/documents_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "net/http" "reflect" "sync" @@ -379,6 +380,80 @@ func Test_Documents(t *testing.T) { }) }) + runTestsWithPresets(t, "Test_Documents_ETag_OptimisticConcurrency", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) { + collectionClient := documents_InitializeDb(t, ts) + + t.Run("Should fail replace with incorrect etag", func(t *testing.T) { + context := context.TODO() + + item := map[string]interface{}{"id": "12345", "pk": "123", "isCool": true} + bytes, err := json.Marshal(item) + assert.Nil(t, err) + + wrongETag := azcore.ETag("\"incorrect-etag\"") + _, err = collectionClient.ReplaceItem( + context, + azcosmos.PartitionKey{}, + "12345", + bytes, + &azcosmos.ItemOptions{IfMatchEtag: &wrongETag}, + ) + assert.NotNil(t, err) + + var respErr *azcore.ResponseError + if errors.As(err, &respErr) { + assert.Equal(t, http.StatusPreconditionFailed, respErr.StatusCode) + assert.Equal(t, "PreconditionFailed", respErr.RawResponse.Header.Get("x-ms-error-code")) + + responseBody, readErr := io.ReadAll(respErr.RawResponse.Body) + assert.Nil(t, readErr) + assert.JSONEq(t, + `{"code":"PreconditionFailed","message":"Operation cannot be performed because one of the specified precondition is not met."}`, + string(responseBody), + ) + } else { + panic(err) + } + + document, status := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, "12345") + assert.Equal(t, datastore.StatusOk, status) + assert.Equal(t, false, document["isCool"]) + }) + + t.Run("Should replace with correct etag", func(t *testing.T) { + context := context.TODO() + + readResponse, err := collectionClient.ReadItem(context, azcosmos.PartitionKey{}, "12345", nil) + assert.Nil(t, err) + assert.NotEmpty(t, readResponse.ETag) + + var item map[string]interface{} + err = json.Unmarshal(readResponse.Value, &item) + assert.Nil(t, err) + assert.Equal(t, string(readResponse.ETag), item["_etag"]) + + item["pk"] = "999" + item["isCool"] = true + bytes, err := json.Marshal(item) + assert.Nil(t, err) + + etag := readResponse.ETag + _, err = collectionClient.ReplaceItem( + context, + azcosmos.PartitionKey{}, + "12345", + bytes, + &azcosmos.ItemOptions{IfMatchEtag: &etag}, + ) + assert.Nil(t, err) + + document, status := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, "12345") + assert.Equal(t, datastore.StatusOk, status) + assert.Equal(t, "999", document["pk"]) + assert.Equal(t, true, document["isCool"]) + }) + }) + runTestsWithPresets(t, "Test_Documents_TransactionalBatch", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) { collectionClient := documents_InitializeDb(t, ts) diff --git a/internal/constants/responses.go b/internal/constants/responses.go index ddf1e38..b8b55eb 100644 --- a/internal/constants/responses.go +++ b/internal/constants/responses.go @@ -35,3 +35,7 @@ var UnknownErrorResponse = gin.H{"message": "Unknown error"} var NotFoundResponse = gin.H{"message": "NotFound"} var ConflictResponse = gin.H{"message": "Conflict"} var BadRequestResponse = gin.H{"message": "BadRequest"} +var PreconditionFailedResponse = gin.H{ + "code": "PreconditionFailed", + "message": "Operation cannot be performed because one of the specified precondition is not met.", +} diff --git a/internal/datastore/badger_datastore/badger_datastore.go b/internal/datastore/badger_datastore/badger_datastore.go index d5401fd..7bdef2b 100644 --- a/internal/datastore/badger_datastore/badger_datastore.go +++ b/internal/datastore/badger_datastore/badger_datastore.go @@ -12,8 +12,10 @@ import ( ) type BadgerDataStore struct { - db *badger.DB - gcTicker *time.Ticker + db *badger.DB + gcTicker *time.Ticker + gcDone chan struct{} + gcStopped chan struct{} } type BadgerDataStoreOptions struct { @@ -36,8 +38,10 @@ func NewBadgerDataStore(options BadgerDataStoreOptions) *BadgerDataStore { gcTicker := time.NewTicker(5 * time.Minute) ds := &BadgerDataStore{ - db: db, - gcTicker: gcTicker, + db: db, + gcTicker: gcTicker, + gcDone: make(chan struct{}), + gcStopped: make(chan struct{}), } ds.initializeDataStore(options.InitialDataFilePath) @@ -50,7 +54,8 @@ func NewBadgerDataStore(options BadgerDataStoreOptions) *BadgerDataStore { func (r *BadgerDataStore) Close() { if r.gcTicker != nil { r.gcTicker.Stop() - r.gcTicker = nil + close(r.gcDone) + <-r.gcStopped } r.db.Close() @@ -63,11 +68,19 @@ func (r *BadgerDataStore) DumpToJson() (string, error) { } func (r *BadgerDataStore) runGarbageCollector() { - for range r.gcTicker.C { - again: - err := r.db.RunValueLogGC(0.7) - if err == nil { - goto again + defer close(r.gcStopped) + + for { + select { + case <-r.gcTicker.C: + for { + err := r.db.RunValueLogGC(0.7) + if err != nil { + break + } + } + case <-r.gcDone: + return } } }