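Transactions in Amazon DocumentDB
Amazon DocumentDB (with MongoDB compatibility) now supports MongoDB 4.0 compatibility, including transactions. You can perform transactions across multiple documents, statements, collections, and databases. Transactions simplify application development by enabling you to perform atomic, consistent, isolated, and durable (ACID) operations across one or more documents within an Amazon DocumentDB cluster. Common use cases for transactions include financial processing, fulfilling and managing orders, and building multi-player games.
There is no additional cost for enabling transactions. You pay only for the read and write IOs consumed as part of the transactions.
Requirements
To use the transactions feature, you need to meet the following requirements:
Best practices
Here are some best practices to help you get the most out of transactions with Amazon DocumentDB.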
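Limitations
- Amazon DocumentDB does not support cursors within a transaction.
- Amazon DocumentDB cannot create new collections in a transaction and cannot query or update against non-existing collections.
- Document-level write locks are subject to a 1-minute timeout, which is not configurable by the user.
- Amazon DocumentDB does not support retryable writes, retryable commit, or retryable abort commands. If you use the legacy mongo shell (not mongosh), do not include the retryWrites=false command in any code string. Retryable writes are disabled by default. Including retryWrites=false might cause normal read commands to fail.
- Each Amazon DocumentDB instance has an upper limit on the number of concurrent transactions open on the instance at the same time. For the limits, see Instance limits.
- For a given transaction, the transaction log size must be less than 32MB.
- Amazon DocumentDB does support count() within a transaction, but not all drivers support this capability. An alternative is to use the countDocuments() API, which translates the count query into an aggregation query on the client side.
- Transactions have a one-minute execution limit and sessions have a 30-minute timeout. If a transaction times out, it is aborted, and any subsequent commands issued within the session for the existing transaction yield the following error: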
WriteCommandError({
"ok" : 0,
"operationTime" : Timestamp(1603491424, 627726),
"code" : 251,
"errmsg" : "Given transaction number 0 does not match any in-progress transactions."
})
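A client can recognize this condition by inspecting the `code` field of the reply. The following is a minimal sketch in Python, assuming the reply is available as a plain dictionary shaped like the error above. The helper name `is_expired_transaction_error` is hypothetical and is not part of any driver.

```python
# Error code reported in the example reply above when a command targets
# a transaction that is no longer in progress.
NO_SUCH_TRANSACTION_CODE = 251


def is_expired_transaction_error(reply):
    """Return True if a command reply matches the error shown above.

    `reply` is assumed to be a dict with the same keys as the example
    (`ok`, `code`, `errmsg`); this helper is illustrative only.
    """
    return reply.get("ok") == 0 and reply.get("code") == NO_SUCH_TRANSACTION_CODE


example_reply = {
    "ok": 0,
    "code": 251,
    "errmsg": "Given transaction number 0 does not match any in-progress transactions.",
}

if is_expired_transaction_error(example_reply):
    # The timed-out transaction was aborted, so the work has to be redone
    # in a new transaction.
    print("transaction expired; start a new transaction")
```

Because the timed-out transaction has already been aborted, the reasonable reaction is to start a new transaction and repeat the work rather than reissue the single failed command.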
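Monitoring and diagnostics
With the support for transactions in Amazon DocumentDB 4.0, additional CloudWatch metrics were added to help you monitor your transactions.
New CloudWatch metrics
- DatabaseTransactions: The number of open transactions, sampled over a one-minute period.
- DatabaseTransactionsAborted: The number of aborted transactions, sampled over a one-minute period.
- DatabaseTransactionsMax: The maximum number of open transactions in a one-minute period.
- TransactionsAborted: The number of transactions aborted on an instance in a one-minute period.
- TransactionsCommitted: The number of transactions committed on an instance in a one-minute period.
- TransactionsOpen: The number of transactions open on an instance, sampled over a one-minute period.
- TransactionsOpenMax: The maximum number of transactions open on an instance in a one-minute period.
- TransactionsStarted: The number of transactions started on an instance in a one-minute period.
Additionally, new fields were added to both currentOp (lsid, transactionThreadId, and a new "idle transaction" state) and serverStatus (transactions: currentActive, currentInactive, currentOpen, totalAborted, totalCommitted, and totalStarted).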
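One possible use of these metrics is to track what share of started transactions end up aborted. The sketch below is an assumption about how you might combine `TransactionsStarted` and `TransactionsAborted` for one instance. The function name and the sample values are made up for illustration and are not real measurements.

```python
def abort_ratio(started, aborted):
    """Fraction of transactions aborted, given two per-minute metric values.

    Returns 0.0 when no transactions were started, to avoid dividing by zero.
    """
    if started < 0 or aborted < 0:
        raise ValueError("metric values must be non-negative")
    if started == 0:
        return 0.0
    return aborted / started


# Hypothetical one-minute samples of (TransactionsStarted, TransactionsAborted).
samples = [(120, 3), (0, 0), (80, 20)]
for started, aborted in samples:
    print(f"started={started} aborted={aborted} ratio={abort_ratio(started, aborted):.2f}")
```

A transaction can start in one minute and abort in the next, so a ratio computed from a single minute is only approximate; summing both metrics over a longer window gives a steadier figure.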
Transaction isolation level
When starting a transaction, you can specify both the readConcern and the writeConcern, as shown in the following example:
mySession.startTransaction({readConcern: {level: 'snapshot'}, writeConcern: {w: 'majority'}});
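For readConcern, Amazon DocumentDB supports snapshot isolation by default. If a readConcern of local, available, or majority is specified, Amazon DocumentDB upgrades the readConcern level to snapshot. Amazon DocumentDB does not support the linearizable readConcern, and specifying such a read concern results in an error.
For writeConcern, Amazon DocumentDB supports majority by default, and a write quorum is achieved when four copies of the data are persisted across three AZs. If a lower writeConcern is specified, Amazon DocumentDB upgrades the writeConcern to majority. In addition, all Amazon DocumentDB writes are journaled, and journaling cannot be disabled.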
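The upgrade rules described above can be summarized as a small lookup. This Python sketch only models the behavior stated in this section. It does not call Amazon DocumentDB, and the function names are hypothetical.

```python
def effective_read_concern(level="snapshot"):
    """Model which readConcern level a transaction runs with, per the text above."""
    if level == "linearizable":
        # The text states that linearizable is unsupported and results in an error.
        raise ValueError("linearizable readConcern is not supported")
    if level in ("local", "available", "majority", "snapshot"):
        # local, available, and majority are upgraded to snapshot.
        return "snapshot"
    raise ValueError(f"unknown readConcern level: {level!r}")


def effective_write_concern(w="majority"):
    """Model the writeConcern in effect: a lower requested value is upgraded to majority."""
    return "majority"


for requested in ("local", "available", "majority", "snapshot"):
    print(requested, "->", effective_read_concern(requested))
print("w=1 ->", effective_write_concern(1))
```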
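Use cases
This section walks through two use cases for transactions: multi-statement and multi-collection.
Multi-statement transactions
Amazon DocumentDB transactions are multi-statement, which means you can write a transaction that spans multiple statements with an explicit commit or rollback. You can group insert, update, delete, and findAndModify actions as a single atomic operation.
A common use case for multi-statement transactions is a debit-credit transaction. For example, you owe a friend money for clothes. You therefore need to debit (withdraw) $500 from your account and credit (deposit) $500 to your friend's account. To do so, you perform both the debit and the credit within a single transaction to ensure atomicity. This prevents a situation where $500 is debited from your account but never credited to your friend's account. Here is what this use case looks like: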
// *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario***
// Set up bank accounts for Alice and Bob. Each has $1000 in their account
var databaseName = "bank";
var collectionName = "account";
var amountToTransfer = 500;
var session = db.getMongo().startSession({causalConsistency: false});
var bankDB = session.getDatabase(databaseName);
var accountColl = bankDB[collectionName];
accountColl.drop();
accountColl.insert({name: "Alice", balance: 1000});
accountColl.insert({name: "Bob", balance: 1000});
session.startTransaction();
// deduct $500 from Alice's account
var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
var newAliceBalance = aliceBalance - amountToTransfer;
accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
// add $500 to Bob's account
var bobBalance = accountColl.find({"name": "Bob"}).next().balance;
var newBobBalance = bobBalance + amountToTransfer;
accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;
session.commitTransaction();
accountColl.find();
// *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario***
// Set up bank accounts for Alice and Bob. Each has $1000 in their account
var databaseName = "bank";
var collectionName = "account";
var amountToTransfer = 500;
var session = db.getMongo().startSession({causalConsistency: false});
var bankDB = session.getDatabase(databaseName);
var accountColl = bankDB[collectionName];
accountColl.drop();
accountColl.insert({name: "Alice", balance: 1000});
accountColl.insert({name: "Bob", balance: 1000});
session.startTransaction();
// deduct $500 from Alice's account
var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
var newAliceBalance = aliceBalance - amountToTransfer;
accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
session.abortTransaction();
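The commit and abort paths above can be folded into one helper that aborts whenever any step fails. The following Python sketch assumes `session` and `accounts` expose the PyMongo-style methods it calls (`start_transaction`, `commit_transaction`, `abort_transaction`, `find_one`, `update_one`). The `transfer` function and the two `Fake*` classes are hypothetical; the fakes are in-memory stand-ins so the sketch runs without a cluster, and they do not reproduce real isolation or locking.

```python
import copy


def transfer(session, accounts, sender, receiver, amount):
    """Move `amount` between two accounts inside a single transaction."""
    session.start_transaction()
    try:
        source = accounts.find_one({"name": sender}, session=session)
        if source["balance"] < amount:
            raise ValueError("insufficient funds")
        accounts.update_one({"name": sender},
                            {"$set": {"balance": source["balance"] - amount}},
                            session=session)
        target = accounts.find_one({"name": receiver}, session=session)
        accounts.update_one({"name": receiver},
                            {"$set": {"balance": target["balance"] + amount}},
                            session=session)
    except Exception:
        # Discard every write made since start_transaction().
        session.abort_transaction()
        raise
    session.commit_transaction()


class FakeAccounts:
    """In-memory stand-in for a collection; supports only what transfer() uses."""

    def __init__(self, docs):
        self.docs = docs

    def find_one(self, query, session=None):
        return next((d for d in self.docs if d["name"] == query["name"]), None)

    def update_one(self, query, update, session=None):
        self.find_one(query).update(update["$set"])


class FakeSession:
    """In-memory stand-in for a session: snapshot on start, restore on abort."""

    def __init__(self, accounts):
        self.accounts = accounts
        self.saved = None

    def start_transaction(self):
        self.saved = copy.deepcopy(self.accounts.docs)

    def commit_transaction(self):
        self.saved = None

    def abort_transaction(self):
        self.accounts.docs = self.saved
        self.saved = None


accounts = FakeAccounts([{"name": "Alice", "balance": 1000},
                         {"name": "Bob", "balance": 1000}])
session = FakeSession(accounts)
transfer(session, accounts, "Alice", "Bob", 500)
print(accounts.docs)
```

If the receiver does not exist, the second lookup fails after Alice's balance has already been changed; the `except` branch then aborts the transaction, so the debit does not persist.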
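Multi-collection transactions
Amazon DocumentDB transactions are also multi-collection, which means they can be used to perform multiple operations within a single transaction and across multiple collections. This provides a consistent view of the data and maintains data integrity. The commands in a transaction are committed together, so execution is all-or-nothing: they either all succeed or all fail.
Here is an example of a multi-collection transaction, using the same scenario and data as the multi-statement transaction example.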
// *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario***
// Set up bank accounts for Alice and Bob. Each has $1000 in their account
var amountToTransfer = 500;
var collectionName = "account";
var session = db.getMongo().startSession({causalConsistency: false});
var accountCollInBankA = session.getDatabase("bankA")[collectionName];
var accountCollInBankB = session.getDatabase("bankB")[collectionName];
accountCollInBankA.drop();
accountCollInBankB.drop();
accountCollInBankA.insert({name: "Alice", balance: 1000});
accountCollInBankB.insert({name: "Bob", balance: 1000});
session.startTransaction();
// deduct $500 from Alice's account
var aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
var newAliceBalance = aliceBalance - amountToTransfer;
accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
// add $500 to Bob's account
var bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
var newBobBalance = bobBalance + amountToTransfer;
accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
session.commitTransaction();
accountCollInBankA.find(); // Alice holds $500 in bankA
accountCollInBankB.find(); // Bob holds $1500 in bankB
// *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario***
// Set up bank accounts for Alice and Bob. Each has $1000 in their account
var collectionName = "account";
var amountToTransfer = 500;
var session = db.getMongo().startSession({causalConsistency: false});
var accountCollInBankA = session.getDatabase("bankA")[collectionName];
var accountCollInBankB = session.getDatabase("bankB")[collectionName];
accountCollInBankA.drop();
accountCollInBankB.drop();
accountCollInBankA.insert({name: "Alice", balance: 1000});
accountCollInBankB.insert({name: "Bob", balance: 1000});
session.startTransaction();
// deduct $500 from Alice's account
var aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
var newAliceBalance = aliceBalance - amountToTransfer;
accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
// add $500 to Bob's account
var bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
var newBobBalance = bobBalance + amountToTransfer;
accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
session.abortTransaction();
accountCollInBankA.find(); // Alice holds $1000 in bankA
accountCollInBankB.find(); // Bob holds $1000 in bankB
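Transaction API examples for the callback API
The callback API is only available for 4.2+ drivers.
- Javascript
The following code demonstrates how to use the Amazon DocumentDB transaction API with Javascript.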
// *** Transfer $500 from Alice to Bob inside a transaction: Success ***
// Set up bank accounts for Alice and Bob. Each has $1000 in their account
var databaseName = "bank";
var collectionName = "account";
var amountToTransfer = 500;
var session = db.getMongo().startSession({causalConsistency: false});
var bankDB = session.getDatabase(databaseName);
var accountColl = bankDB[collectionName];
accountColl.drop();
accountColl.insert({name: "Alice", balance: 1000});
accountColl.insert({name: "Bob", balance: 1000});
session.startTransaction();
// deduct $500 from Alice's account
var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
assert(aliceBalance >= amountToTransfer);
var newAliceBalance = aliceBalance - amountToTransfer;
accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
assert.eq(newAliceBalance, findAliceBalance);
// add $500 to Bob's account
var bobBalance = accountColl.find({"name": "Bob"}).next().balance;
var newBobBalance = bobBalance + amountToTransfer;
accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;
assert.eq(newBobBalance, findBobBalance);
session.commitTransaction();
accountColl.find();
- Node.js
The following code demonstrates how to use the Amazon DocumentDB transaction API with Node.js.
// Node.js callback API:
const bankDB = await mongoclient.db("bank");
var accountColl = await bankDB.createCollection("account");
var amountToTransfer = 500;
const session = mongoclient.startSession({causalConsistency: false});
await accountColl.drop();
await accountColl.insertOne({name: "Alice", balance: 1000}, { session });
await accountColl.insertOne({name: "Bob", balance: 1000}, { session });
const transactionOptions = {
readConcern: { level: 'snapshot' },
writeConcern: { w: 'majority' }
};
// deduct $500 from Alice's account
var aliceBalance = await accountColl.findOne({name: "Alice"}, {session});
assert(aliceBalance.balance >= amountToTransfer);
// aliceBalance is the account document, so read its balance field
var newAliceBalance = aliceBalance.balance - amountToTransfer;
session.startTransaction(transactionOptions);
await accountColl.updateOne({name: "Alice"}, {$set: {balance: newAliceBalance}}, {session });
await session.commitTransaction();
aliceBalance = await accountColl.findOne({name: "Alice"}, {session});
assert(newAliceBalance == aliceBalance.balance);
// add $500 to Bob's account
var bobBalance = await accountColl.findOne({name: "Bob"}, {session});
var newBobBalance = bobBalance.balance + amountToTransfer;
session.startTransaction(transactionOptions);
await accountColl.updateOne({name: "Bob"}, {$set: {balance: newBobBalance}}, {session });
await session.commitTransaction();
bobBalance = await accountColl.findOne({name: "Bob"}, {session});
assert(newBobBalance == bobBalance.balance);
- C#
The following code demonstrates how to use the Amazon DocumentDB transaction API with C#.
// C# Callback API
var dbName = "bank";
var collName = "account";
var amountToTransfer = 500;
using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false}))
{
var bankDB = client.GetDatabase(dbName);
var accountColl = bankDB.GetCollection<BsonDocument>(collName);
bankDB.DropCollection(collName);
accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } });
accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } });
// start transaction
var transactionOptions = new TransactionOptions(
readConcern: ReadConcern.Snapshot,
writeConcern: WriteConcern.WMajority);
var result = session.WithTransaction(
(sess, cancellationtoken) =>
{
// deduct $500 from Alice's account
var aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
Debug.Assert(aliceBalance >= amountToTransfer);
var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer;
accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice"),
Builders<BsonDocument>.Update.Set("balance", newAliceBalance));
aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
Debug.Assert(aliceBalance == newAliceBalance);
// add $500 to Bob's account
var bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
var newBobBalance = bobBalance.AsInt32 + amountToTransfer;
accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob"),
Builders<BsonDocument>.Update.Set("balance", newBobBalance));
bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
Debug.Assert(bobBalance == newBobBalance);
return "Transaction committed";
}, transactionOptions);
// check values outside of transaction
var aliceNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
var bobNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
Debug.Assert(aliceNewBalance == 500);
Debug.Assert(bobNewBalance == 1500);
}
- Ruby
The following code demonstrates how to use the Amazon DocumentDB transaction API with Ruby.
# Ruby Callback API
dbName = "bank"
collName = "account"
amountToTransfer = 500
session = client.start_session(:causal_consistency=> false)
bankDB = Mongo::Database.new(client, dbName)
accountColl = bankDB[collName]
accountColl.drop()
accountColl.insert_one({"name"=>"Alice", "balance"=>1000})
accountColl.insert_one({"name"=>"Bob", "balance"=>1000})
# start transaction
session.with_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority}) do
# deduct $500 from Alice's account
aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
assert aliceBalance >= amountToTransfer
newAliceBalance = aliceBalance - amountToTransfer
accountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session)
aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
assert_equal(newAliceBalance, aliceBalance)
# add $500 to Bob's account
bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
newBobBalance = bobBalance + amountToTransfer
accountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session)
bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
assert_equal(newBobBalance, bobBalance)
end
# check results outside of transaction
aliceBalance = accountColl.find({"name"=>"Alice"}).first['balance']
bobBalance = accountColl.find({"name"=>"Bob"}).first['balance']
assert_equal(aliceBalance, 500)
assert_equal(bobBalance, 1500)
session.end_session
- Go
The following code demonstrates how to use the Amazon DocumentDB transaction API with Go.
// Go - Callback API
type Account struct {
Name string
Balance int
}
ctx := context.TODO()
dbName := "bank"
collName := "account"
amountToTransfer := 500
session, err := client.StartSession(options.Session().SetCausalConsistency(false))
assert.NilError(t, err)
defer session.EndSession(ctx)
bankDB := client.Database(dbName)
accountColl := bankDB.Collection(collName)
accountColl.Drop(ctx)
_, err = accountColl.InsertOne(ctx, bson.M{"name" : "Alice", "balance":1000})
_, err = accountColl.InsertOne(ctx, bson.M{"name" : "Bob", "balance":1000})
transactionOptions := options.Transaction().SetReadConcern(readconcern.Snapshot()).
SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
_, err = session.WithTransaction(ctx, func(sessionCtx mongo.SessionContext) (interface{}, error) {
var result Account
// deduct $500 from Alice's account
err = accountColl.FindOne(sessionCtx, bson.M{"name": "Alice"}).Decode(&result)
aliceBalance := result.Balance
newAliceBalance := aliceBalance - amountToTransfer
_, err = accountColl.UpdateOne(sessionCtx, bson.M{"name": "Alice"}, bson.M{"$set": bson.M{"balance": newAliceBalance}})
err = accountColl.FindOne(sessionCtx, bson.M{"name": "Alice"}).Decode(&result)
aliceBalance = result.Balance
assert.Equal(t, aliceBalance, newAliceBalance)
// add $500 to Bob's account
err = accountColl.FindOne(sessionCtx, bson.M{"name": "Bob"}).Decode(&result)
bobBalance := result.Balance
newBobBalance := bobBalance + amountToTransfer
_, err = accountColl.UpdateOne(sessionCtx, bson.M{"name": "Bob"}, bson.M{"$set": bson.M{"balance": newBobBalance}})
err = accountColl.FindOne(sessionCtx, bson.M{"name": "Bob"}).Decode(&result)
bobBalance = result.Balance
assert.Equal(t, bobBalance, newBobBalance)
if err != nil {
return nil, err
}
return "transaction committed", err
}, transactionOptions)
// check results outside of transaction
var result Account
err = accountColl.FindOne(ctx, bson.M{"name": "Alice"}).Decode(&result)
aliceNewBalance := result.Balance
err = accountColl.FindOne(ctx, bson.M{"name": "Bob"}).Decode(&result)
bobNewBalance := result.Balance
assert.Equal(t, aliceNewBalance, 500)
assert.Equal(t, bobNewBalance, 1500)
- Java
The following code demonstrates how to use the Amazon DocumentDB transaction API with Java.
// Java (sync) - Callback API
MongoDatabase bankDB = mongoClient.getDatabase("bank");
MongoCollection<Document> accountColl = bankDB.getCollection("account");
accountColl.drop();
int amountToTransfer = 500;
// add sample data
accountColl.insertOne(new Document("name", "Alice").append("balance", 1000));
accountColl.insertOne(new Document("name", "Bob").append("balance", 1000));
TransactionOptions txnOptions = TransactionOptions.builder()
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build();
ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build();
try ( ClientSession clientSession = mongoClient.startSession(sessionOptions) ) {
clientSession.withTransaction(new TransactionBody<Void>() {
@Override
public Void execute() {
// deduct $500 from Alice's account
List<Document> documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
int aliceBalance = (int) documentList.get(0).get("balance");
int newAliceBalance = aliceBalance - amountToTransfer;
accountColl.updateOne(clientSession, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance)));
// check Alice's new balance
documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
int updatedBalance = (int) documentList.get(0).get("balance");
Assert.assertEquals(updatedBalance, newAliceBalance);
// add $500 to Bob's account
documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
int bobBalance = (int) documentList.get(0).get("balance");
int newBobBalance = bobBalance + amountToTransfer;
accountColl.updateOne(clientSession, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance)));
// check Bob's new balance
documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
updatedBalance = (int) documentList.get(0).get("balance");
Assert.assertEquals(updatedBalance, newBobBalance);
return null;
}
}, txnOptions);
}
- C
The following code demonstrates how to use the Amazon DocumentDB transaction API with C.
// Sample Code for C with Callback
#include <bson.h>
#include <mongoc.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
typedef struct {
int64_t balance;
bson_t *account;
bson_t *opts;
mongoc_collection_t *collection;
} ctx_t;
bool callback_session (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error)
{
bool r = true;
ctx_t *data = (ctx_t *) ctx;
bson_t local_reply;
bson_t *selector = data->account;
bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (data->balance), "}");
// keep the result so that a failed update makes the callback report failure
r = mongoc_collection_update_one (data->collection, selector, update, data->opts, &local_reply, error);
*reply = bson_copy (&local_reply);
bson_destroy (&local_reply);
bson_destroy (update);
return r;
}
void test_callback_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int amount_to_transfer){
bson_t reply;
bool r = true;
const bson_t *doc;
bson_iter_t iter;
ctx_t alice_ctx;
ctx_t bob_ctx;
bson_error_t error;
// find query
bson_t *alice_query = bson_new ();
BSON_APPEND_UTF8(alice_query, "name", "Alice");
bson_t *bob_query = bson_new ();
BSON_APPEND_UTF8(bob_query, "name", "Bob");
// create session
// set causal consistency to false
mongoc_session_opt_t *session_opts = mongoc_session_opts_new ();
mongoc_session_opts_set_causal_consistency (session_opts, false);
// start the session
mongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error);
// add session to options
bson_t *opts = bson_new();
mongoc_client_session_append (client_session, opts, &error);
// deduct 500 from Alice
// find account balance of Alice
mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64;
assert(alice_balance >= amount_to_transfer);
int64_t new_alice_balance = alice_balance - amount_to_transfer;
// set variables which will be used by callback function
alice_ctx.collection = collection;
alice_ctx.opts = opts;
alice_ctx.balance = new_alice_balance;
alice_ctx.account = alice_query;
// callback
r = mongoc_client_session_with_transaction (client_session, &callback_session, NULL, &alice_ctx, &reply, &error);
assert(r);
// find account balance of Alice after transaction
cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
alice_balance = (bson_iter_value (&iter))->value.v_int64;
assert(alice_balance == new_alice_balance);
assert(alice_balance == 500);
// add 500 to bob's balance
// find account balance of Bob
cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64;
int64_t new_bob_balance = bob_balance + amount_to_transfer;
bob_ctx.collection = collection;
bob_ctx.opts = opts;
bob_ctx.balance = new_bob_balance;
bob_ctx.account = bob_query;
// set read & write concern
mongoc_read_concern_t *read_concern = mongoc_read_concern_new ();
mongoc_write_concern_t *write_concern = mongoc_write_concern_new ();
mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new ();
mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY);
mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
mongoc_transaction_opts_set_write_concern (txn_opts, write_concern);
mongoc_transaction_opts_set_read_concern (txn_opts, read_concern);
// callback
r = mongoc_client_session_with_transaction (client_session, &callback_session, txn_opts, &bob_ctx, &reply, &error);
assert(r);
// find account balance of Bob after transaction
cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
bob_balance = (bson_iter_value (&iter))->value.v_int64;
assert(bob_balance == new_bob_balance);
assert(bob_balance == 1500);
// cleanup
bson_destroy(alice_query);
bson_destroy(bob_query);
mongoc_client_session_destroy(client_session);
bson_destroy(opts);
mongoc_transaction_opts_destroy(txn_opts);
mongoc_read_concern_destroy(read_concern);
mongoc_write_concern_destroy(write_concern);
// doc is owned by the cursor and must not be destroyed separately
mongoc_cursor_destroy(cursor);
mongoc_session_opts_destroy(session_opts);
}
int main(int argc, char* argv[]) {
mongoc_init ();
mongoc_client_t* client = mongoc_client_new (<connection uri>);
bson_error_t error;
// connect to bank db
mongoc_database_t *database = mongoc_client_get_database (client, "bank");
// access account collection
mongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account");
// set amount to transfer
int64_t amount_to_transfer = 500;
// delete the collection if already existing
mongoc_collection_drop(collection, &error);
// open Alice account
bson_t *alice_account = bson_new ();
BSON_APPEND_UTF8(alice_account, "name", "Alice");
BSON_APPEND_INT64(alice_account, "balance", 1000);
// open Bob account
bson_t *bob_account = bson_new ();
BSON_APPEND_UTF8(bob_account, "name", "Bob");
BSON_APPEND_INT64(bob_account, "balance", 1000);
bool r = true;
r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error);
if (!r) {printf("Error encountered:%s", error.message);}
r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error);
if (!r) {printf("Error encountered:%s", error.message);}
test_callback_money_transfer(client, collection, amount_to_transfer);
}
- Python
The following code demonstrates how to use the Amazon DocumentDB transaction API with Python.
# Sample Python code with callback api
import pymongo

def callback(session, balance, query):
    collection.update_one(query, {'$set': {"balance": balance}}, session=session)

client = pymongo.MongoClient(<connection uri>)
rc_snapshot = pymongo.read_concern.ReadConcern('snapshot')
wc_majority = pymongo.write_concern.WriteConcern('majority')
# To start, drop and create an account collection and insert balances for both Alice and Bob
collection = client.get_database("bank").get_collection("account")
collection.drop()
collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000})
collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000})
amount_to_transfer = 500
# deduct 500 from Alice's account
alice_balance = collection.find_one({"name": "Alice"}).get("balance")
assert alice_balance >= amount_to_transfer
new_alice_balance = alice_balance - amount_to_transfer
# start_session takes causal_consistency as a keyword argument, not an options dict
with client.start_session(causal_consistency=False) as session:
    session.with_transaction(lambda s: callback(s, new_alice_balance, {"name": "Alice"}), read_concern=rc_snapshot, write_concern=wc_majority)
updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance")
assert updated_alice_balance == new_alice_balance
# add 500 to Bob's account
bob_balance = collection.find_one({"name": "Bob"}).get("balance")
assert bob_balance >= amount_to_transfer
new_bob_balance = bob_balance + amount_to_transfer
with client.start_session(causal_consistency=False) as session:
    session.with_transaction(lambda s: callback(s, new_bob_balance, {"name": "Bob"}), read_concern=rc_snapshot, write_concern=wc_majority)
updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance")
assert updated_bob_balance == new_bob_balance
Transaction API examples for the core API
- Javascript
The following code demonstrates how to use the Amazon DocumentDB transaction API with Javascript.
// *** Transfer $500 from Alice to Bob inside a transaction: Success ***
// Set up bank accounts for Alice and Bob. Each has $1000 in their account
var databaseName = "bank";
var collectionName = "account";
var amountToTransfer = 500;
var session = db.getMongo().startSession({causalConsistency: false});
var bankDB = session.getDatabase(databaseName);
var accountColl = bankDB[collectionName];
accountColl.drop();
accountColl.insert({name: "Alice", balance: 1000});
accountColl.insert({name: "Bob", balance: 1000});
session.startTransaction();
// deduct $500 from Alice's account
var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
assert(aliceBalance >= amountToTransfer);
var newAliceBalance = aliceBalance - amountToTransfer;
accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
assert.eq(newAliceBalance, findAliceBalance);
// add $500 to Bob's account
var bobBalance = accountColl.find({"name": "Bob"}).next().balance;
var newBobBalance = bobBalance + amountToTransfer;
accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;
assert.eq(newBobBalance, findBobBalance);
session.commitTransaction();
accountColl.find();
- C#
The following code demonstrates how to use the Amazon DocumentDB transaction API with C#.
// C# Core API
public void TransferMoneyWithRetry(IMongoCollection<BsonDocument> accountColl, IClientSessionHandle session)
{
var amountToTransfer = 500;
// start transaction
var transactionOptions = new TransactionOptions(
readConcern: ReadConcern.Snapshot,
writeConcern: WriteConcern.WMajority);
session.StartTransaction(transactionOptions);
try
{
// deduct $500 from Alice's account
var aliceBalance = accountColl.Find(session, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
Debug.Assert(aliceBalance >= amountToTransfer);
var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer;
accountColl.UpdateOne(session, Builders<BsonDocument>.Filter.Eq("name", "Alice"),
Builders<BsonDocument>.Update.Set("balance", newAliceBalance));
aliceBalance = accountColl.Find(session, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
Debug.Assert(aliceBalance == newAliceBalance);
// add $500 to Bob's account
var bobBalance = accountColl.Find(session, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
var newBobBalance = bobBalance.AsInt32 + amountToTransfer;
accountColl.UpdateOne(session, Builders<BsonDocument>.Filter.Eq("name", "Bob"),
Builders<BsonDocument>.Update.Set("balance", newBobBalance));
bobBalance = accountColl.Find(session, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
Debug.Assert(bobBalance == newBobBalance);
}
catch (Exception e)
{
session.AbortTransaction();
throw;
}
session.CommitTransaction();
}
public void DoTransactionWithRetry(MongoClient client)
{
var dbName = "bank";
var collName = "account";
using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false}))
{
try
{
var bankDB = client.GetDatabase(dbName);
var accountColl = bankDB.GetCollection<BsonDocument>(collName);
bankDB.DropCollection(collName);
accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } });
accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } });
while(true) {
try
{
TransferMoneyWithRetry(accountColl, session);
break;
}
catch (MongoException e)
{
if(e.HasErrorLabel("TransientTransactionError"))
{
continue;
}
else
{
throw;
}
}
}
// check values outside of transaction
var aliceNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
var bobNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
Debug.Assert(aliceNewBalance == 500);
Debug.Assert(bobNewBalance == 1500);
}
catch (Exception e)
{
Console.WriteLine("Error running transaction: " + e.Message);
}
}
}
- Ruby
The following code demonstrates how to use the Amazon DocumentDB transaction API with Ruby.
# Ruby Core API
def transfer_money_w_retry(session, accountColl)
amountToTransfer = 500
session.start_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority})
# deduct $500 from Alice's account
aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
assert aliceBalance >= amountToTransfer
newAliceBalance = aliceBalance - amountToTransfer
accountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session)
aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
assert_equal(newAliceBalance, aliceBalance)
# add $500 to Bob's account
bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
newBobBalance = bobBalance + amountToTransfer
accountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session)
bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
assert_equal(newBobBalance, bobBalance)
session.commit_transaction
end
def do_txn_w_retry(client)
dbName = "bank"
collName = "account"
session = client.start_session(:causal_consistency=> false)
bankDB = Mongo::Database.new(client, dbName)
accountColl = bankDB[collName]
accountColl.drop()
accountColl.insert_one({"name"=>"Alice", "balance"=>1000})
accountColl.insert_one({"name"=>"Bob", "balance"=>1000})
begin
transfer_money_w_retry(session, accountColl)
puts "transaction committed"
rescue Mongo::Error => e
if e.label?('TransientTransactionError')
retry
else
puts "transaction failed"
raise
end
end
# check results outside of transaction
aliceBalance = accountColl.find({"name"=>"Alice"}).first['balance']
bobBalance = accountColl.find({"name"=>"Bob"}).first['balance']
assert_equal(aliceBalance, 500)
assert_equal(bobBalance, 1500)
end
- Go
-
The following code demonstrates how to use the Amazon DocumentDB transaction API with Go.
// Go - Core API
type Account struct {
Name string
Balance int
}
func transferMoneyWithRetry(sessionContext mongo.SessionContext, accountColl *mongo.Collection, t *testing.T) error {
amountToTransfer := 500
transactionOptions := options.Transaction().SetReadConcern(readconcern.Snapshot()).
SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
if err := sessionContext.StartTransaction(transactionOptions); err != nil {
panic(err)
}
var result Account
// deduct $500 from Alice's account
err := accountColl.FindOne(sessionContext, bson.M{"name": "Alice"}).Decode(&result)
aliceBalance := result.Balance
newAliceBalance := aliceBalance - amountToTransfer
_, err = accountColl.UpdateOne(sessionContext, bson.M{"name": "Alice"}, bson.M{"$set": bson.M{"balance": newAliceBalance}})
if err != nil {
// abort and surface the error so the caller can decide whether to retry
sessionContext.AbortTransaction(sessionContext)
return err
}
err = accountColl.FindOne(sessionContext, bson.M{"name": "Alice"}).Decode(&result)
aliceBalance = result.Balance
assert.Equal(t, aliceBalance, newAliceBalance)
// add $500 to Bob's account
err = accountColl.FindOne(sessionContext, bson.M{"name": "Bob"}).Decode(&result)
bobBalance := result.Balance
newBobBalance := bobBalance + amountToTransfer
_, err = accountColl.UpdateOne(sessionContext, bson.M{"name": "Bob"}, bson.M{"$set": bson.M{"balance": newBobBalance}})
if err != nil {
// abort and surface the error so the caller can decide whether to retry
sessionContext.AbortTransaction(sessionContext)
return err
}
err = accountColl.FindOne(sessionContext, bson.M{"name": "Bob"}).Decode(&result)
bobBalance = result.Balance
assert.Equal(t, bobBalance, newBobBalance)
err = sessionContext.CommitTransaction(sessionContext)
return err
}
func doTransactionWithRetry(t *testing.T) {
ctx := context.TODO()
dbName := "bank"
collName := "account"
bankDB := client.Database(dbName)
accountColl := bankDB.Collection(collName)
client.UseSessionWithOptions(ctx, options.Session().SetCausalConsistency(false), func(sessionContext mongo.SessionContext) error {
accountColl.Drop(ctx)
accountColl.InsertOne(sessionContext, bson.M{"name" : "Alice", "balance":1000})
accountColl.InsertOne(sessionContext, bson.M{"name" : "Bob", "balance":1000})
for {
err := transferMoneyWithRetry(sessionContext, accountColl, t)
if err == nil {
println("transaction committed")
return nil
}
// use the two-value type assertion so a non-command error does not panic
if mongoErr, ok := err.(mongo.CommandError); ok && mongoErr.HasErrorLabel("TransientTransactionError") {
continue
}
println("transaction failed")
return err
}
})
// check results outside of transaction
var result Account
accountColl.FindOne(ctx, bson.M{"name": "Alice"}).Decode(&result)
aliceBalance := result.Balance
assert.Equal(t, aliceBalance, 500)
accountColl.FindOne(ctx, bson.M{"name": "Bob"}).Decode(&result)
bobBalance := result.Balance
assert.Equal(t, bobBalance, 1500)
}
- Java
-
The following code demonstrates how to use the Amazon DocumentDB transaction API with Java.
// Java (sync) - Core API
public void transferMoneyWithRetry() {
// connect to server
MongoClientURI mongoURI = new MongoClientURI(uri);
MongoClient mongoClient = new MongoClient(mongoURI);
MongoDatabase bankDB = mongoClient.getDatabase("bank");
MongoCollection accountColl = bankDB.getCollection("account");
accountColl.drop();
// insert some sample data
accountColl.insertOne(new Document("name", "Alice").append("balance", 1000));
accountColl.insertOne(new Document("name", "Bob").append("balance", 1000));
while (true) {
try {
doTransferMoneyWithRetry(accountColl, mongoClient);
break;
} catch (MongoException e) {
if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
continue;
} else {
throw e;
}
}
}
}
public void doTransferMoneyWithRetry(MongoCollection accountColl, MongoClient mongoClient) {
int amountToTransfer = 500;
TransactionOptions txnOptions = TransactionOptions.builder()
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build();
ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build();
try ( ClientSession clientSession = mongoClient.startSession(sessionOptions) ) {
clientSession.startTransaction(txnOptions);
// deduct $500 from Alice's account
List<Document> documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
int aliceBalance = (int) documentList.get(0).get("balance");
Assert.assertTrue(aliceBalance >= amountToTransfer);
int newAliceBalance = aliceBalance - amountToTransfer;
accountColl.updateOne(clientSession, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance)));
// check Alice's new balance
documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
int updatedBalance = (int) documentList.get(0).get("balance");
Assert.assertEquals(updatedBalance, newAliceBalance);
// add $500 to Bob's account
documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
int bobBalance = (int) documentList.get(0).get("balance");
int newBobBalance = bobBalance + amountToTransfer;
accountColl.updateOne(clientSession, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance)));
// check Bob's new balance
documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
updatedBalance = (int) documentList.get(0).get("balance");
Assert.assertEquals(updatedBalance, newBobBalance);
// commit transaction
clientSession.commitTransaction();
}
}
// Java (async) -- Core API
public void transferMoneyWithRetry() {
// connect to the server
MongoClient mongoClient = MongoClients.create(uri);
MongoDatabase bankDB = mongoClient.getDatabase("bank");
MongoCollection accountColl = bankDB.getCollection("account");
SubscriberLatchWrapper<Void> dropCallback = new SubscriberLatchWrapper<>();
mongoClient.getDatabase("bank").drop().subscribe(dropCallback);
dropCallback.await();
// insert some sample data
SubscriberLatchWrapper<InsertOneResult> insertionCallback = new SubscriberLatchWrapper<>();
accountColl.insertOne(new Document("name", "Alice").append("balance", 1000)).subscribe(insertionCallback);
insertionCallback.await();
insertionCallback = new SubscriberLatchWrapper<>();
accountColl.insertOne(new Document("name", "Bob").append("balance", 1000)).subscribe(insertionCallback);
insertionCallback.await();
while (true) {
try {
doTransferMoneyWithRetry(accountColl, mongoClient);
break;
} catch (MongoException e) {
if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
continue;
} else {
throw e;
}
}
}
}
public void doTransferMoneyWithRetry(MongoCollection accountColl, MongoClient mongoClient) {
int amountToTransfer = 500;
// start the transaction
TransactionOptions txnOptions = TransactionOptions.builder()
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build();
ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build();
SubscriberLatchWrapper<ClientSession> sessionCallback = new SubscriberLatchWrapper<>();
mongoClient.startSession(sessionOptions).subscribe(sessionCallback);
ClientSession session = sessionCallback.get().get(0);
session.startTransaction(txnOptions);
// deduct $500 from Alice's account
SubscriberLatchWrapper<Document> findCallback = new SubscriberLatchWrapper<>();
accountColl.find(session, new Document("name", "Alice")).first().subscribe(findCallback);
Document documentFound = findCallback.get().get(0);
int aliceBalance = (int) documentFound.get("balance");
int newAliceBalance = aliceBalance - amountToTransfer;
SubscriberLatchWrapper<UpdateResult> updateCallback = new SubscriberLatchWrapper<>();
accountColl.updateOne(session, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance))).subscribe(updateCallback);
updateCallback.await();
// check Alice's new balance
findCallback = new SubscriberLatchWrapper<>();
accountColl.find(session, new Document("name", "Alice")).first().subscribe(findCallback);
documentFound = findCallback.get().get(0);
int updatedBalance = (int) documentFound.get("balance");
Assert.assertEquals(updatedBalance, newAliceBalance);
// add $500 to Bob's account
findCallback = new SubscriberLatchWrapper<>();
accountColl.find(session, new Document("name", "Bob")).first().subscribe(findCallback);
documentFound = findCallback.get().get(0);
int bobBalance = (int) documentFound.get("balance");
int newBobBalance = bobBalance + amountToTransfer;
updateCallback = new SubscriberLatchWrapper<>();
accountColl.updateOne(session, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance))).subscribe(updateCallback);
updateCallback.await();
// check Bob's new balance
findCallback = new SubscriberLatchWrapper<>();
accountColl.find(session, new Document("name", "Bob")).first().subscribe(findCallback);
documentFound = findCallback.get().get(0);
updatedBalance = (int) documentFound.get("balance");
Assert.assertEquals(updatedBalance, newBobBalance);
// commit the transaction
SubscriberLatchWrapper<Void> transactionCallback = new SubscriberLatchWrapper<>();
session.commitTransaction().subscribe(transactionCallback);
transactionCallback.await();
}
public class SubscriberLatchWrapper<T> implements Subscriber<T> {
/**
* A Subscriber that stores the publisher's results and provides a latch so callers can block on completion.
*
* @param <T> The publishers result type
*/
private final List<T> received;
private final List<RuntimeException> errors;
private final CountDownLatch latch;
private volatile Subscription subscription;
private volatile boolean completed;
/**
* Construct an instance
*/
public SubscriberLatchWrapper() {
this.received = new ArrayList<>();
this.errors = new ArrayList<>();
this.latch = new CountDownLatch(1);
}
@Override
public void onSubscribe(final Subscription s) {
subscription = s;
subscription.request(Integer.MAX_VALUE);
}
@Override
public void onNext(final T t) {
received.add(t);
}
@Override
public void onError(final Throwable t) {
if (t instanceof RuntimeException) {
errors.add((RuntimeException) t);
} else {
errors.add(new RuntimeException("Unexpected exception", t));
}
onComplete();
}
@Override
public void onComplete() {
completed = true;
subscription.cancel();
latch.countDown();
}
/**
* Get received elements
*
* @return the list of received elements
*/
public List<T> getReceived() {
return received;
}
/**
* Get received elements.
*
* @return the list of receive elements
*/
public List<T> get() {
return await().getReceived();
}
/**
* Await completion or error
*
* @return this
*/
public SubscriberLatchWrapper<T> await() {
subscription.request(Integer.MAX_VALUE);
try {
if (!latch.await(300, TimeUnit.SECONDS)) {
throw new MongoTimeoutException("Publisher onComplete timed out for 300 seconds");
}
} catch (InterruptedException e) {
throw new MongoInterruptedException("Interrupted waiting for observation", e);
}
if (!errors.isEmpty()) {
throw errors.get(0);
}
return this;
}
public boolean getCompleted() {
return this.completed;
}
public void close() {
subscription.cancel();
received.clear();
}
}
- C
-
The following code demonstrates how to use the Amazon DocumentDB transaction API with C.
// Sample C code with core session
bool core_session(mongoc_client_session_t *client_session, mongoc_collection_t* collection, bson_t *selector, int64_t balance){
bool r = true;
bson_error_t error;
bson_t *opts = bson_new();
bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (balance), "}");
// set read & write concern
mongoc_read_concern_t *read_concern = mongoc_read_concern_new ();
mongoc_write_concern_t *write_concern = mongoc_write_concern_new ();
mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new ();
mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY);
mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
mongoc_transaction_opts_set_write_concern (txn_opts, write_concern);
mongoc_transaction_opts_set_read_concern (txn_opts, read_concern);
mongoc_client_session_start_transaction (client_session, txn_opts, &error);
mongoc_client_session_append (client_session, opts, &error);
r = mongoc_collection_update_one (collection, selector, update, opts, NULL, &error);
mongoc_client_session_commit_transaction (client_session, NULL, &error);
bson_destroy (opts);
mongoc_transaction_opts_destroy(txn_opts);
mongoc_read_concern_destroy(read_concern);
mongoc_write_concern_destroy(write_concern);
bson_destroy (update);
return r;
}
void test_core_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int amount_to_transfer){
bson_t reply;
bool r = true;
const bson_t *doc;
bson_iter_t iter;
bson_error_t error;
// find query
bson_t *alice_query = bson_new ();
BSON_APPEND_UTF8(alice_query, "name", "Alice");
bson_t *bob_query = bson_new ();
BSON_APPEND_UTF8(bob_query, "name", "Bob");
// create session
// set causal consistency to false
mongoc_session_opt_t *session_opts = mongoc_session_opts_new ();
mongoc_session_opts_set_causal_consistency (session_opts, false);
// start the session
mongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error);
// add session to options
bson_t *opts = bson_new();
mongoc_client_session_append (client_session, opts, &error);
// deduct 500 from Alice
// find account balance of Alice
mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64;
assert(alice_balance >= amount_to_transfer);
int64_t new_alice_balance = alice_balance - amount_to_transfer;
// core
r = core_session (client_session, collection, alice_query, new_alice_balance);
assert(r);
// find account balance of Alice after transaction
cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
alice_balance = (bson_iter_value (&iter))->value.v_int64;
assert(alice_balance == new_alice_balance);
assert(alice_balance == 500);
// add 500 to Bob's balance
// find account balance of Bob
cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64;
int64_t new_bob_balance = bob_balance + amount_to_transfer;
//core
r = core_session (client_session, collection, bob_query, new_bob_balance);
assert(r);
// find account balance of Bob after transaction
cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
bob_balance = (bson_iter_value (&iter))->value.v_int64;
assert(bob_balance == new_bob_balance);
assert(bob_balance == 1500);
// cleanup
bson_destroy(alice_query);
bson_destroy(bob_query);
mongoc_client_session_destroy(client_session);
bson_destroy(opts);
mongoc_cursor_destroy(cursor);
// doc is owned by the cursor, so it must not be destroyed separately
}
int main(int argc, char* argv[]) {
mongoc_init ();
mongoc_client_t* client = mongoc_client_new (<connection uri>);
bson_error_t error;
// connect to bank db
mongoc_database_t *database = mongoc_client_get_database (client, "bank");
// access account collection
mongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account");
// set amount to transfer
int64_t amount_to_transfer = 500;
// delete the collection if already existing
mongoc_collection_drop(collection, &error);
// open Alice account
bson_t *alice_account = bson_new ();
BSON_APPEND_UTF8(alice_account, "name", "Alice");
BSON_APPEND_INT64(alice_account, "balance", 1000);
// open Bob account
bson_t *bob_account = bson_new ();
BSON_APPEND_UTF8(bob_account, "name", "Bob");
BSON_APPEND_INT64(bob_account, "balance", 1000);
bool r = true;
r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error);
if (!r) {printf("Error encountered:%s", error.message);}
r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error);
if (!r) {printf("Error encountered:%s", error.message);}
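test_core_money_transfer(client, collection, amount_to_transfer);
// release resources before exiting
bson_destroy(alice_account);
bson_destroy(bob_account);
mongoc_collection_destroy(collection);
mongoc_database_destroy(database);
mongoc_client_destroy(client);
mongoc_cleanup();
return 0;
}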
- Scala
-
The following code demonstrates how to use the Amazon DocumentDB transaction API with Scala.
// Scala Core API
def transferMoneyWithRetry(sessionObservable: SingleObservable[ClientSession] , database: MongoDatabase ): Unit = {
val accountColl = database.getCollection("account")
var amountToTransfer = 500
var transactionObservable: Observable[ClientSession] = sessionObservable.map(clientSession => {
clientSession.startTransaction()
// deduct $500 from Alice's account
var aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance")
assert(aliceBalance >= amountToTransfer)
var newAliceBalance = aliceBalance - amountToTransfer
accountColl.updateOne(clientSession, Document("name" -> "Alice"), Document("$set" -> Document("balance" -> newAliceBalance))).await()
aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance")
assert(aliceBalance == newAliceBalance)
// add $500 to Bob's account
var bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance")
var newBobBalance = bobBalance + amountToTransfer
accountColl.updateOne(clientSession, Document("name" -> "Bob"), Document("$set" -> Document("balance" -> newBobBalance))).await()
bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance")
assert(bobBalance == newBobBalance)
clientSession
})
transactionObservable.flatMap(clientSession => clientSession.commitTransaction()).await()
}
def doTransactionWithRetry(): Unit = {
val client: MongoClient = MongoClientWrapper.getMongoClient()
val database: MongoDatabase = client.getDatabase("bank")
val accountColl = database.getCollection("account")
accountColl.drop().await()
val sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build()
var sessionObservable: SingleObservable[ClientSession] = client.startSession(sessionOptions)
accountColl.insertOne(Document("name" -> "Alice", "balance" -> 1000)).await()
accountColl.insertOne(Document("name" -> "Bob", "balance" -> 1000)).await()
var retry = true
while (retry) {
try {
transferMoneyWithRetry(sessionObservable, database)
println("transaction committed")
retry = false
}
catch {
case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
println("retrying transaction")
}
case other: Throwable => {
println("transaction failed")
retry = false
throw other
}
}
}
// check results outside of transaction
assert(accountColl.find(Document("name" -> "Alice")).results().head.getInteger("balance") == 500)
assert(accountColl.find(Document("name" -> "Bob")).results().head.getInteger("balance") == 1500)
accountColl.drop().await()
}
- Python
-
The following code demonstrates how to use the Amazon DocumentDB transaction API with Python.
# Sample Python code with Core API
import pymongo
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern

client = pymongo.MongoClient(<connection_string>)
rc_snapshot = ReadConcern('snapshot')
wc_majority = WriteConcern('majority')

# To start, drop and create an account collection and insert balances for both Alice and Bob
collection = client.get_database("bank").get_collection("account")
collection.drop()
collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000})
collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000})
amount_to_transfer = 500

# deduct 500 from Alice's account
alice_balance = collection.find_one({"name": "Alice"}).get("balance")
assert alice_balance >= amount_to_transfer
new_alice_balance = alice_balance - amount_to_transfer
# causal_consistency must be passed as a keyword argument to disable it
with client.start_session(causal_consistency=False) as session:
    session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority)
    collection.update_one({"name": "Alice"}, {'$set': {"balance": new_alice_balance}}, session=session)
    session.commit_transaction()
updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance")
assert updated_alice_balance == new_alice_balance

# add 500 to Bob's account
bob_balance = collection.find_one({"name": "Bob"}).get("balance")
assert bob_balance >= amount_to_transfer
new_bob_balance = bob_balance + amount_to_transfer
with client.start_session(causal_consistency=False) as session:
    session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority)
    collection.update_one({"name": "Bob"}, {'$set': {"balance": new_bob_balance}}, session=session)
    session.commit_transaction()
updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance")
assert updated_bob_balance == new_bob_balance
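The Python sample above commits each update once and does not retry. The samples for the other languages retry when an error carries the TransientTransactionError label, and the same pattern can be sketched in Python. This is a minimal sketch, not a definitive implementation: the function name `run_transaction_with_retry`, the `txn_body` callable, and the `max_attempts` cap are hypothetical, and `client` is assumed to behave like a `pymongo.MongoClient` whose errors expose `has_error_label`.

```python
TRANSIENT_LABEL = "TransientTransactionError"


def run_transaction_with_retry(client, txn_body, max_attempts=3):
    """Run txn_body(session) inside a transaction, retrying transient errors.

    `client` is assumed to behave like pymongo.MongoClient; `txn_body` is a
    hypothetical callable that does all of its reads and writes with the
    session it receives.
    """
    # Causal consistency is disabled explicitly for the session.
    with client.start_session(causal_consistency=False) as session:
        for attempt in range(1, max_attempts + 1):
            session.start_transaction()
            try:
                result = txn_body(session)
                session.commit_transaction()
                return result
            except Exception as exc:
                try:
                    # Roll back whatever the failed attempt did.
                    session.abort_transaction()
                except Exception:
                    # The transaction may already be closed (for example
                    # after a failed commit), so there is nothing to undo.
                    pass
                has_label = getattr(exc, "has_error_label", None)
                transient = callable(has_label) and has_label(TRANSIENT_LABEL)
                # Give up on non-transient errors or when attempts run out.
                if not transient or attempt == max_attempts:
                    raise
```

A caller would pass a function such as `lambda s: collection.update_one(filter, update, session=s)`. Capping the number of attempts is a design choice of this sketch; the samples in this section retry without a limit.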
Supported commands
| Command | Supported |
|---|---|
| abortTransaction | Yes |
| commitTransaction | Yes |
| endSessions | Yes |
| killSession | Yes |
| killAllSession | Yes |
| killAllSessionsByPattern | No |
| refreshSessions | No |
| startSession | Yes |
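Unsupported capabilities
| Methods | Stages or commands |
|---|---|
| db.collection.aggregate() | $collStats, $currentOp, $indexStats, $listSessions, $out |
| db.collection.count(), db.collection.countDocuments() | $where, $near, $nearSphere |
| db.collection.insert() | insert is not supported if it is not run against an existing collection. The method is supported if it targets a pre-existing collection. |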
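Because the aggregation stages in the table above are rejected, it can help to check a pipeline on the client before sending it. The following is a minimal sketch under stated assumptions: the helper name `unsupported_stages` and the constant `UNSUPPORTED_STAGES` are hypothetical, and the stage list is copied from the table rather than queried from the service.

```python
# Stage names copied from the "Unsupported capabilities" table above.
UNSUPPORTED_STAGES = frozenset(
    {"$collStats", "$currentOp", "$indexStats", "$listSessions", "$out"}
)


def unsupported_stages(pipeline):
    """Return the unsupported stage names found in a pipeline, in order."""
    found = []
    for stage in pipeline:
        # Each pipeline stage is a dict keyed by its stage name.
        for name in stage:
            if name in UNSUPPORTED_STAGES:
                found.append(name)
    return found


pipeline = [
    {"$match": {"name": "Alice"}},
    {"$out": "account_copy"},
]
print(unsupported_stages(pipeline))  # ['$out']
```

An empty result means the pipeline uses none of the listed stages. It does not guarantee that the pipeline is otherwise accepted.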
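Sessions
MongoDB sessions are a framework used to support retryable writes, causal consistency, transactions, and the management of operations across databases. When a session is created, the client generates a logical session identifier (lsid), which is used to tag all operations within that session when commands are sent to the server.
Amazon DocumentDB supports the use of sessions to enable transactions, but it does not support causal consistency or retryable writes.
When you use transactions in Amazon DocumentDB, a transaction is started from within a session using the session.startTransaction() API, and a session supports a single transaction at a time. Similarly, transactions are completed using either the commit (session.commitTransaction()) or abort (session.abortTransaction()) API.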
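Causal consistency
Causal consistency guarantees that, within a single client session, the client observes read-after-write consistency, monotonic reads and writes, and writes that follow reads. These guarantees apply across all instances in a cluster, not just the primary. Amazon DocumentDB does not support causal consistency, and the following statements result in an error.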
var mySession = db.getMongo().startSession();
var mySessionObject = mySession.getDatabase('test').getCollection('account');
mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}});
//Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 }
mySessionObject.find()
//Error: error: {
// "ok" : 0,
// "code" : 303,
// "errmsg" : "Feature not supported: 'causal consistency'",
// "operationTime" : Timestamp(1603461817, 493214)
//}
mySession.endSession()
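You can disable causal consistency within a session. Doing so lets you use the session framework, but it does not provide causal consistency guarantees for reads. When using Amazon DocumentDB, reads from the primary are read-after-write consistent, and reads from replica instances are eventually consistent. Transactions are the primary use case for sessions.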
var mySession = db.getMongo().startSession({causalConsistency: false});
var mySessionObject = mySession.getDatabase('test').getCollection('account');
mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}});
//Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 }
mySessionObject.find()
//{ "_id" : 1, "name" : "Bob", "balance" : 100 }
//{ "_id" : 2, "name" : "Alice", "balance" : 1700 }
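Retryable writes
Retryable writes is a capability in which the client attempts to retry a write operation one time when a network error occurs or when the client cannot find the primary. Amazon DocumentDB does not support retryable writes, and they must be disabled. You can disable them with the retryWrites=false option in the connection string.
If you are using the legacy mongo shell (not mongosh), do not include the retryWrites=false option in any code string. Retryable writes are disabled by default, and including retryWrites=false might cause normal read commands to fail.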
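For driver connections, one way to make sure retryWrites=false is always present is to normalize the connection string before handing it to the driver. The following is a minimal sketch using only the Python standard library. The helper name `disable_retry_writes` and the example host are hypothetical, and it should not be applied to strings used with the legacy mongo shell, per the note above.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def disable_retry_writes(uri):
    """Return the connection string with retryWrites forced to false."""
    parts = urlsplit(uri)
    # Drop any existing retryWrites option, whatever its letter case.
    params = [
        (key, value)
        for key, value in parse_qsl(parts.query, keep_blank_values=True)
        if key.lower() != "retrywrites"
    ]
    params.append(("retryWrites", "false"))
    # Connection-string options follow a "/" after the host list.
    path = parts.path or "/"
    return urlunsplit(parts._replace(path=path, query=urlencode(params)))


uri = "mongodb://user:pass@example-cluster.example.com:27017/?tls=true&retryWrites=true"
print(disable_retry_writes(uri))
# mongodb://user:pass@example-cluster.example.com:27017/?tls=true&retryWrites=false
```

Other options keep their order, and retryWrites=false is appended at the end.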
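Transaction errors
When using transactions, some scenarios can produce an error stating that a transaction number does not match any in-progress transaction.
The error can be generated in at least two different scenarios:
The best way to handle this error is to make transactional updates idempotent, for example by using the $set mutator instead of an increment or decrement operation. See the following: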
{ "ok" : 0,
"operationTime" : Timestamp(1603938167, 1),
"code" : 251,
"errmsg" : "Given transaction number 1 does not match any in-progress transactions."
}
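The difference between an idempotent and a non-idempotent update shows up when an update is replayed after this error. The following is a minimal in-memory sketch, not the server's update engine: the helper `apply_update` is hypothetical and handles only `$set` and `$inc` on plain dictionaries.

```python
def apply_update(doc, update):
    """Apply a tiny subset of update operators ($set, $inc) to a copy of doc."""
    result = dict(doc)
    for field, value in update.get("$set", {}).items():
        result[field] = value
    for field, delta in update.get("$inc", {}).items():
        result[field] = result.get(field, 0) + delta
    return result


account = {"name": "Alice", "balance": 1000}
set_update = {"$set": {"balance": 500}}   # absolute target value
inc_update = {"$inc": {"balance": -500}}  # relative change

# Replaying $set leaves the balance where the first attempt put it.
once = apply_update(account, set_update)
twice = apply_update(once, set_update)
print(once["balance"], twice["balance"])  # 500 500

# Replaying $inc deducts the amount a second time.
once = apply_update(account, inc_update)
twice = apply_update(once, inc_update)
print(once["balance"], twice["balance"])  # 500 0
```

With `$set`, a retry after an ambiguous failure converges on the same balance. With `$inc`, the retry would double-apply the transfer if the first attempt had in fact committed.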