mirror of
https://github.com/pikami/cosmium.git
synced 2026-06-11 14:57:01 +01:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 36fd7f48cc |
@@ -47,6 +47,9 @@ func (h *Handlers) GetDocument(c *gin.Context) {
|
|||||||
|
|
||||||
document, status := h.dataStore.GetDocument(databaseId, collectionId, documentId)
|
document, status := h.dataStore.GetDocument(databaseId, collectionId, documentId)
|
||||||
if status == datastore.StatusOk {
|
if status == datastore.StatusOk {
|
||||||
|
if etag, ok := document["_etag"].(string); ok {
|
||||||
|
c.Header(headers.ETag, etag)
|
||||||
|
}
|
||||||
c.IndentedJSON(http.StatusOK, document)
|
c.IndentedJSON(http.StatusOK, document)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -90,7 +93,25 @@ func (h *Handlers) ReplaceDocument(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
status := h.dataStore.DeleteDocument(databaseId, collectionId, documentId)
|
existingDocument, status := h.dataStore.GetDocument(databaseId, collectionId, documentId)
|
||||||
|
if status == datastore.StatusNotFound {
|
||||||
|
c.IndentedJSON(http.StatusNotFound, constants.NotFoundResponse)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if status != datastore.StatusOk {
|
||||||
|
c.IndentedJSON(http.StatusInternalServerError, constants.UnknownErrorResponse)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if ifMatch := c.GetHeader(headers.IfMatch); ifMatch != "" {
|
||||||
|
if existingDocument["_etag"] != ifMatch {
|
||||||
|
c.Header(headers.ErrorCode, "PreconditionFailed")
|
||||||
|
c.JSON(http.StatusPreconditionFailed, constants.PreconditionFailedResponse)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
status = h.dataStore.DeleteDocument(databaseId, collectionId, documentId)
|
||||||
if status == datastore.StatusNotFound {
|
if status == datastore.StatusNotFound {
|
||||||
c.IndentedJSON(http.StatusNotFound, constants.NotFoundResponse)
|
c.IndentedJSON(http.StatusNotFound, constants.NotFoundResponse)
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -4,8 +4,10 @@ const (
|
|||||||
AIM = "A-Im"
|
AIM = "A-Im"
|
||||||
Authorization = "authorization"
|
Authorization = "authorization"
|
||||||
CosmosLsn = "x-ms-cosmos-llsn"
|
CosmosLsn = "x-ms-cosmos-llsn"
|
||||||
|
ErrorCode = "x-ms-error-code"
|
||||||
ETag = "etag"
|
ETag = "etag"
|
||||||
GlobalCommittedLsn = "x-ms-global-committed-lsn"
|
GlobalCommittedLsn = "x-ms-global-committed-lsn"
|
||||||
|
IfMatch = "if-match"
|
||||||
IfNoneMatch = "if-none-match"
|
IfNoneMatch = "if-none-match"
|
||||||
IsBatchRequest = "x-ms-cosmos-is-batch-request"
|
IsBatchRequest = "x-ms-cosmos-is-batch-request"
|
||||||
IsQueryPlanRequest = "x-ms-cosmos-is-query-plan-request"
|
IsQueryPlanRequest = "x-ms-cosmos-is-query-plan-request"
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -379,6 +380,80 @@ func Test_Documents(t *testing.T) {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
runTestsWithPresets(t, "Test_Documents_ETag_OptimisticConcurrency", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) {
|
||||||
|
collectionClient := documents_InitializeDb(t, ts)
|
||||||
|
|
||||||
|
t.Run("Should fail replace with incorrect etag", func(t *testing.T) {
|
||||||
|
context := context.TODO()
|
||||||
|
|
||||||
|
item := map[string]interface{}{"id": "12345", "pk": "123", "isCool": true}
|
||||||
|
bytes, err := json.Marshal(item)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
wrongETag := azcore.ETag("\"incorrect-etag\"")
|
||||||
|
_, err = collectionClient.ReplaceItem(
|
||||||
|
context,
|
||||||
|
azcosmos.PartitionKey{},
|
||||||
|
"12345",
|
||||||
|
bytes,
|
||||||
|
&azcosmos.ItemOptions{IfMatchEtag: &wrongETag},
|
||||||
|
)
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
|
||||||
|
var respErr *azcore.ResponseError
|
||||||
|
if errors.As(err, &respErr) {
|
||||||
|
assert.Equal(t, http.StatusPreconditionFailed, respErr.StatusCode)
|
||||||
|
assert.Equal(t, "PreconditionFailed", respErr.RawResponse.Header.Get("x-ms-error-code"))
|
||||||
|
|
||||||
|
responseBody, readErr := io.ReadAll(respErr.RawResponse.Body)
|
||||||
|
assert.Nil(t, readErr)
|
||||||
|
assert.JSONEq(t,
|
||||||
|
`{"code":"PreconditionFailed","message":"Operation cannot be performed because one of the specified precondition is not met."}`,
|
||||||
|
string(responseBody),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
document, status := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, "12345")
|
||||||
|
assert.Equal(t, datastore.StatusOk, status)
|
||||||
|
assert.Equal(t, false, document["isCool"])
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Should replace with correct etag", func(t *testing.T) {
|
||||||
|
context := context.TODO()
|
||||||
|
|
||||||
|
readResponse, err := collectionClient.ReadItem(context, azcosmos.PartitionKey{}, "12345", nil)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.NotEmpty(t, readResponse.ETag)
|
||||||
|
|
||||||
|
var item map[string]interface{}
|
||||||
|
err = json.Unmarshal(readResponse.Value, &item)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, string(readResponse.ETag), item["_etag"])
|
||||||
|
|
||||||
|
item["pk"] = "999"
|
||||||
|
item["isCool"] = true
|
||||||
|
bytes, err := json.Marshal(item)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
etag := readResponse.ETag
|
||||||
|
_, err = collectionClient.ReplaceItem(
|
||||||
|
context,
|
||||||
|
azcosmos.PartitionKey{},
|
||||||
|
"12345",
|
||||||
|
bytes,
|
||||||
|
&azcosmos.ItemOptions{IfMatchEtag: &etag},
|
||||||
|
)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
document, status := ts.DataStore.GetDocument(testDatabaseName, testCollectionName, "12345")
|
||||||
|
assert.Equal(t, datastore.StatusOk, status)
|
||||||
|
assert.Equal(t, "999", document["pk"])
|
||||||
|
assert.Equal(t, true, document["isCool"])
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
runTestsWithPresets(t, "Test_Documents_TransactionalBatch", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) {
|
runTestsWithPresets(t, "Test_Documents_TransactionalBatch", presets, func(t *testing.T, ts *TestServer, client *azcosmos.Client) {
|
||||||
collectionClient := documents_InitializeDb(t, ts)
|
collectionClient := documents_InitializeDb(t, ts)
|
||||||
|
|
||||||
|
|||||||
@@ -35,3 +35,7 @@ var UnknownErrorResponse = gin.H{"message": "Unknown error"}
|
|||||||
var NotFoundResponse = gin.H{"message": "NotFound"}
|
var NotFoundResponse = gin.H{"message": "NotFound"}
|
||||||
var ConflictResponse = gin.H{"message": "Conflict"}
|
var ConflictResponse = gin.H{"message": "Conflict"}
|
||||||
var BadRequestResponse = gin.H{"message": "BadRequest"}
|
var BadRequestResponse = gin.H{"message": "BadRequest"}
|
||||||
|
var PreconditionFailedResponse = gin.H{
|
||||||
|
"code": "PreconditionFailed",
|
||||||
|
"message": "Operation cannot be performed because one of the specified precondition is not met.",
|
||||||
|
}
|
||||||
|
|||||||
@@ -12,8 +12,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type BadgerDataStore struct {
|
type BadgerDataStore struct {
|
||||||
db *badger.DB
|
db *badger.DB
|
||||||
gcTicker *time.Ticker
|
gcTicker *time.Ticker
|
||||||
|
gcDone chan struct{}
|
||||||
|
gcStopped chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type BadgerDataStoreOptions struct {
|
type BadgerDataStoreOptions struct {
|
||||||
@@ -36,8 +38,10 @@ func NewBadgerDataStore(options BadgerDataStoreOptions) *BadgerDataStore {
|
|||||||
gcTicker := time.NewTicker(5 * time.Minute)
|
gcTicker := time.NewTicker(5 * time.Minute)
|
||||||
|
|
||||||
ds := &BadgerDataStore{
|
ds := &BadgerDataStore{
|
||||||
db: db,
|
db: db,
|
||||||
gcTicker: gcTicker,
|
gcTicker: gcTicker,
|
||||||
|
gcDone: make(chan struct{}),
|
||||||
|
gcStopped: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
ds.initializeDataStore(options.InitialDataFilePath)
|
ds.initializeDataStore(options.InitialDataFilePath)
|
||||||
@@ -50,7 +54,8 @@ func NewBadgerDataStore(options BadgerDataStoreOptions) *BadgerDataStore {
|
|||||||
func (r *BadgerDataStore) Close() {
|
func (r *BadgerDataStore) Close() {
|
||||||
if r.gcTicker != nil {
|
if r.gcTicker != nil {
|
||||||
r.gcTicker.Stop()
|
r.gcTicker.Stop()
|
||||||
r.gcTicker = nil
|
close(r.gcDone)
|
||||||
|
<-r.gcStopped
|
||||||
}
|
}
|
||||||
|
|
||||||
r.db.Close()
|
r.db.Close()
|
||||||
@@ -63,11 +68,19 @@ func (r *BadgerDataStore) DumpToJson() (string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *BadgerDataStore) runGarbageCollector() {
|
func (r *BadgerDataStore) runGarbageCollector() {
|
||||||
for range r.gcTicker.C {
|
defer close(r.gcStopped)
|
||||||
again:
|
|
||||||
err := r.db.RunValueLogGC(0.7)
|
for {
|
||||||
if err == nil {
|
select {
|
||||||
goto again
|
case <-r.gcTicker.C:
|
||||||
|
for {
|
||||||
|
err := r.db.RunValueLogGC(0.7)
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case <-r.gcDone:
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user