Compare commits

..

1 Commits

Author SHA1 Message Date
Pijus Kamandulis 36fd7f48cc Add document ETag optimistic concurrency (#16)
* Add ETag optimistic concurrency for document replace

Co-authored-by: Pijus Kamandulis <pikami@users.noreply.github.com>

* Expose precondition error code header

Co-authored-by: Pijus Kamandulis <pikami@users.noreply.github.com>

* Stop Badger GC before closing datastore

Co-authored-by: Pijus Kamandulis <pikami@users.noreply.github.com>

---------

Co-authored-by: Cursor Agent <cursoragent@cursor.com>
2026-06-05 23:26:25 +03:00
5 changed files with 126 additions and 11 deletions
+22 -1
View File
@@ -47,6 +47,9 @@ func (h *Handlers) GetDocument(c *gin.Context) {
document, status := h.dataStore.GetDocument(databaseId, collectionId, documentId) document, status := h.dataStore.GetDocument(databaseId, collectionId, documentId)
if status == datastore.StatusOk { if status == datastore.StatusOk {
if etag, ok := document["_etag"].(string); ok {
c.Header(headers.ETag, etag)
}
c.IndentedJSON(http.StatusOK, document) c.IndentedJSON(http.StatusOK, document)
return return
} }
@@ -90,7 +93,25 @@ func (h *Handlers) ReplaceDocument(c *gin.Context) {
return return
} }
status := h.dataStore.DeleteDocument(databaseId, collectionId, documentId) existingDocument, status := h.dataStore.GetDocument(databaseId, collectionId, documentId)
if status == datastore.StatusNotFound {
c.IndentedJSON(http.StatusNotFound, constants.NotFoundResponse)
return
}
if status != datastore.StatusOk {
c.IndentedJSON(http.StatusInternalServerError, constants.UnknownErrorResponse)
return
}
if ifMatch := c.GetHeader(headers.IfMatch); ifMatch != "" {
if existingDocument["_etag"] != ifMatch {
c.Header(headers.ErrorCode, "PreconditionFailed")
c.JSON(http.StatusPreconditionFailed, constants.PreconditionFailedResponse)
return
}
}
status = h.dataStore.DeleteDocument(databaseId, collectionId, documentId)
if status == datastore.StatusNotFound { if status == datastore.StatusNotFound {
c.IndentedJSON(http.StatusNotFound, constants.NotFoundResponse) c.IndentedJSON(http.StatusNotFound, constants.NotFoundResponse)
return return
+2
View File
@@ -4,8 +4,10 @@ const (
AIM = "A-Im" AIM = "A-Im"
Authorization = "authorization" Authorization = "authorization"
CosmosLsn = "x-ms-cosmos-llsn" CosmosLsn = "x-ms-cosmos-llsn"
ErrorCode = "x-ms-error-code"
ETag = "etag" ETag = "etag"
GlobalCommittedLsn = "x-ms-global-committed-lsn" GlobalCommittedLsn = "x-ms-global-committed-lsn"
IfMatch = "if-match"
IfNoneMatch = "if-none-match" IfNoneMatch = "if-none-match"
IsBatchRequest = "x-ms-cosmos-is-batch-request" IsBatchRequest = "x-ms-cosmos-is-batch-request"
IsQueryPlanRequest = "x-ms-cosmos-is-query-plan-request" IsQueryPlanRequest = "x-ms-cosmos-is-query-plan-request"
+75
View File
@@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io"
"net/http" "net/http"
"reflect" "reflect"
"sync" "sync"
@@ -379,6 +380,80 @@ func Test_Documents(t *testing.T) {
}) })
}) })
runTestsWithPresets(t, "Test_Documents_ETag_OptimisticConcurrency", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) {
collectionClient := documents_InitializeDb(t, ts)
t.Run("Should fail replace with incorrect etag", func(t *testing.T) {
context := context.TODO()
item := map[string]interface{}{"id": "12345", "pk": "123", "isCool": true}
bytes, err := json.Marshal(item)
assert.Nil(t, err)
wrongETag := azcore.ETag("\"incorrect-etag\"")
_, err = collectionClient.ReplaceItem(
context,
azcosmos.PartitionKey{},
"12345",
bytes,
&azcosmos.ItemOptions{IfMatchEtag: &wrongETag},
)
assert.NotNil(t, err)
var respErr *azcore.ResponseError
if errors.As(err, &respErr) {
assert.Equal(t, http.StatusPreconditionFailed, respErr.StatusCode)
assert.Equal(t, "PreconditionFailed", respErr.RawResponse.Header.Get("x-ms-error-code"))
responseBody, readErr := io.ReadAll(respErr.RawResponse.Body)
assert.Nil(t, readErr)
assert.JSONEq(t,
`{"code":"PreconditionFailed","message":"Operation cannot be performed because one of the specified precondition is not met."}`,
string(responseBody),
)
} else {
panic(err)
}
document, status := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, "12345")
assert.Equal(t, datastore.StatusOk, status)
assert.Equal(t, false, document["isCool"])
})
t.Run("Should replace with correct etag", func(t *testing.T) {
context := context.TODO()
readResponse, err := collectionClient.ReadItem(context, azcosmos.PartitionKey{}, "12345", nil)
assert.Nil(t, err)
assert.NotEmpty(t, readResponse.ETag)
var item map[string]interface{}
err = json.Unmarshal(readResponse.Value, &item)
assert.Nil(t, err)
assert.Equal(t, string(readResponse.ETag), item["_etag"])
item["pk"] = "999"
item["isCool"] = true
bytes, err := json.Marshal(item)
assert.Nil(t, err)
etag := readResponse.ETag
_, err = collectionClient.ReplaceItem(
context,
azcosmos.PartitionKey{},
"12345",
bytes,
&azcosmos.ItemOptions{IfMatchEtag: &etag},
)
assert.Nil(t, err)
document, status := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, "12345")
assert.Equal(t, datastore.StatusOk, status)
assert.Equal(t, "999", document["pk"])
assert.Equal(t, true, document["isCool"])
})
})
runTestsWithPresets(t, "Test_Documents_TransactionalBatch", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) { runTestsWithPresets(t, "Test_Documents_TransactionalBatch", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) {
collectionClient := documents_InitializeDb(t, ts) collectionClient := documents_InitializeDb(t, ts)
+4
View File
@@ -35,3 +35,7 @@ var UnknownErrorResponse = gin.H{"message": "Unknown error"}
var NotFoundResponse = gin.H{"message": "NotFound"} var NotFoundResponse = gin.H{"message": "NotFound"}
var ConflictResponse = gin.H{"message": "Conflict"} var ConflictResponse = gin.H{"message": "Conflict"}
var BadRequestResponse = gin.H{"message": "BadRequest"} var BadRequestResponse = gin.H{"message": "BadRequest"}
var PreconditionFailedResponse = gin.H{
"code": "PreconditionFailed",
"message": "Operation cannot be performed because one of the specified precondition is not met.",
}
@@ -12,8 +12,10 @@ import (
) )
type BadgerDataStore struct { type BadgerDataStore struct {
db *badger.DB db *badger.DB
gcTicker *time.Ticker gcTicker *time.Ticker
gcDone chan struct{}
gcStopped chan struct{}
} }
type BadgerDataStoreOptions struct { type BadgerDataStoreOptions struct {
@@ -36,8 +38,10 @@ func NewBadgerDataStore(options BadgerDataStoreOptions) *BadgerDataStore {
gcTicker := time.NewTicker(5 * time.Minute) gcTicker := time.NewTicker(5 * time.Minute)
ds := &BadgerDataStore{ ds := &BadgerDataStore{
db: db, db: db,
gcTicker: gcTicker, gcTicker: gcTicker,
gcDone: make(chan struct{}),
gcStopped: make(chan struct{}),
} }
ds.initializeDataStore(options.InitialDataFilePath) ds.initializeDataStore(options.InitialDataFilePath)
@@ -50,7 +54,8 @@ func NewBadgerDataStore(options BadgerDataStoreOptions) *BadgerDataStore {
func (r *BadgerDataStore) Close() { func (r *BadgerDataStore) Close() {
if r.gcTicker != nil { if r.gcTicker != nil {
r.gcTicker.Stop() r.gcTicker.Stop()
r.gcTicker = nil close(r.gcDone)
<-r.gcStopped
} }
r.db.Close() r.db.Close()
@@ -63,11 +68,19 @@ func (r *BadgerDataStore) DumpToJson() (string, error) {
} }
func (r *BadgerDataStore) runGarbageCollector() { func (r *BadgerDataStore) runGarbageCollector() {
for range r.gcTicker.C { defer close(r.gcStopped)
again:
err := r.db.RunValueLogGC(0.7) for {
if err == nil { select {
goto again case <-r.gcTicker.C:
for {
err := r.db.RunValueLogGC(0.7)
if err != nil {
break
}
}
case <-r.gcDone:
return
} }
} }
} }