使用 Java 在亚马逊文档数据库中执行 CRUD 操作 - Amazon DocumentDB

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Java 在亚马逊文档数据库中执行 CRUD 操作

本节讨论如何使用 MongoDB Java 驱动程序在亚马逊文档数据库中执行 CRUD(创建、读取、更新、删除)操作。

在 DocumentDB 集合中创建和插入文档

将文档插入到 Amazon DocumentDB 可以让您在馆藏中添加新数据。根据您的需求和正在处理的数据量,有几种方法可以执行插入。将单个文档插入集合的最基本方法是insertOne()。要一次插入多个文档,可以使用该insertMany()方法,该方法允许您在单个操作中添加文档数组。在 DocumentDB 集合中插入许多文档的另一种方法是。bulkWrite()在本指南中,我们将讨论所有这些在 DocumentDB 集合中创建文档的方法。

insertOne()

让我们首先研究一下如何将单个文档插入到亚马逊 DocumentDBB 集合中。插入单个文档是通过使用insertOne()方法完成的。此方法需要插入并返回一个InsertOneResult对象,该对象可用于获取新插入文档的对象 ID。BsonDocument下面的示例代码显示了在集合中插入一个餐厅文档的情况:

Document article = new Document() .append("restaurantId", "REST-21G145") .append("name", "Future-proofed Intelligent Bronze Hat") .append("cuisine", "International") .append("rating", new Document() .append("average", 1.8) .append("totalReviews", 267)) .append("features", Arrays.asList("Outdoor Seating", "Live Music")); try { InsertOneResult result = collection.insertOne(article); System.out.println("Inserted document with the following id: " + result.getInsertedId()); } catch (MongoWriteException e) { // Handle duplicate key or other write errors System.err.println("Failed to insert document: " + e.getMessage()); throw e; } catch (MongoException e) { // Handle other MongoDB errors System.err.println("MongoDB error: " + e.getMessage()); throw e; }

使用时insertOne(),请确保包括适当的错误处理。例如,在上面的代码中,“restaurantId” 具有唯一的索引,因此再次运行此代码将引发以下结果MongoWriteException

Failed to insert document: Write operation error on server docdbCluster.docdb.amazonaws.com:27017. Write error: WriteError{code=11000, message='E11000 duplicate key error collection: Restaurants index: restaurantId_1', details={}}.

InsertMany ()

用于将许多文档插入集合的主要方法是 insertMany () 和。bulkWrite()

insertMany()方法是在单个操作中插入多个文档的最简单方法。它接受文档列表并将其插入到集合中。当您插入一批相互独立且不需要任何特殊处理或混合操作的新文档时,此方法非常理想。以下代码显示了从文件中读取 JSON 文档并将其插入到集合中的过程。该insertMany()函数返回一个可用于获取所有插入文档 IDs 的InsertManyResultInsertManyResult对象。

// Read JSON file content String content = new String(Files.readAllBytes(Paths.get(jsonFileName))); JSONArray jsonArray = new JSONArray(content); // Convert JSON articles to Documents List < Document > restaurants = new ArrayList < > (); for (int i = 0; i < jsonArray.length(); i++) { JSONObject jsonObject = jsonArray.getJSONObject(i); Document doc = Document.parse(jsonObject.toString()); restaurants.add(doc); } //insert documents in collection InsertManyResult result = collection.insertMany(restaurants); System.out.println("Count of inserted documents: " + result.getInsertedIds().size());

bulkWrite()

bulkWrite()方法允许在单个批次中执行多个写入操作(插入、更新、删除)。bulkWrite()当您需要在单个批次中执行不同类型的操作时,可以使用,例如在更新其他文档的同时插入一些文档。 bulkWrite()支持两种类型的批量写入,有序写入和无序写入:

  • 有@@ 序操作 —(默认)Amazon DocumentDB 按顺序处理写入操作,并在遇到的第一个错误时停止。当操作顺序很重要时,例如当以后的操作依赖于较早的操作时,这很有用。但是,有序操作通常比无序操作慢。对于有序操作,您必须解决批处理在第一个错误时停止的情况,这可能会使某些操作无法处理。

  • 无序操作 — 允许 Amazon DocumentDB 将插入作为数据库中的单一执行来处理。如果一个文档出现错误,则继续对其余文档进行操作。这在插入大量数据时特别有用,并且可以容忍某些故障,例如在数据迁移或批量导入期间,某些文档可能会因为密钥重复而失败。对于无序操作,您必须解决部分成功的情况,即某些操作成功而另一些操作失败。

使用该bulkWrite()方法时,需要一些基本的类。首先,该WriteModel类充当所有写入操作的基类,并具有特定的实现 InsertOneModel,例如UpdateOneModelUpdateManyModelDeleteOneModel、、和DeleteManyModel处理不同类型的操作。

BulkWriteOptions类是配置批量操作行为所必需的,例如设置 ordered/unordered 执行或绕过文档验证。该BulkWriteResult类提供有关执行结果的详细信息,包括插入、更新和删除的文档的数量。

对于错误处理,该MongoBulkWriteException类至关重要,因为它包含有关批量操作期间任何失败的信息,而该BulkWriteError类则提供有关单个操作失败的具体详细信息。以下代码显示了一个在执行单个bulkWrite()方法调用的过程中插入文档列表以及更新和删除单个文档的示例。该代码还显示了如何使用BulkWriteOptionsBulkWriteResult,以及如何正确处理bulkWrite()操作。

List < WriteModel < Document >> bulkOperations = new ArrayList < > (); // get list of 10 documents representing 10 restaurants List < Document > restaurantsToInsert = getSampleData(); for (Document doc: restaurantsToInsert) { bulkOperations.add(new InsertOneModel < > (doc)); } // Update operation bulkOperations.add(new UpdateOneModel < > ( new Document("restaurantId", "REST-Y2E9H5"), new Document("", new Document("stats.likes", 20)) .append("", new Document("rating.average", 4.5)))); // Delete operation bulkOperations.add(new DeleteOneModel < > (new Document("restaurantId", "REST-D2L431"))); // Perform bulkWrite operation try { BulkWriteOptions options = new BulkWriteOptions() .ordered(false); // Allow unordered inserts BulkWriteResult result = collection.bulkWrite(bulkOperations, options); System.out.println("Inserted: " + result.getInsertedCount()); System.out.println("Updated: " + result.getModifiedCount()); System.out.println("Deleted: " + result.getDeletedCount()); } catch (MongoBulkWriteException e) { System.err.println("Bulk write error occurred: " + e.getMessage()); // Log individual write errors for (BulkWriteError error: e.getWriteErrors()) { System.err.printf("Error at index %d: %s (Code: %d)%n", error.getIndex(), error.getMessage(), error.getCode()); // Log the problematic document Document errorDoc = new Document(error.getDetails()); if (errorDoc != null) { System.err.println("Problematic document: " + errorDoc); } } } catch (Exception e) { System.err.println("Error during bulkWrite: " + e.getMessage()); }

可重试写入

与 MongoDB 不同,亚马逊 DocumentDB 不支持可重试写入。因此,您必须在他们的应用程序中实现自定义的重试逻辑,特别是为了处理网络问题或临时服务不可用。通常,实施良好的重试策略包括增加重试尝试之间的延迟和限制重试总数。有关使用错误处理构建重试逻辑的代码示例,请参阅使用重试逻辑处理错误下文。

从 DocumentDB 集合中读取和检索数据

在 Amazon DocumentDB 中查询文档围绕几个关键组件展开,这些组件允许您精确检索和操作数据。该find()方法是 MongoDB Jav APIs a 驱动程序中的基本查询方法。它允许通过多种筛选、排序和投影结果选项进行复杂的数据检索。除了find()方法之外,FiltersFindIterable还有另外两个基本组件,它们为 MongoDB Java 驱动程序中的查询操作提供了构建块。

Filters类是 MongoDB Java 驱动程序中的一个实用类,它为构造查询过滤器提供了流畅的 API。该类提供了静态工厂方法,用于创建表示各种查询条件的Bson对象的实例。最常用的方法包括eq()相等比较、gt()lt()gte()、和lte()用于数值比较、and()组合多个条件、in()nin()用于数组成员资格测试和regex()模式匹配。or()该类被设计为类型安全,与基于原始文档的查询相比,它提供了更好的编译时检查,使其成为在 Java 应用程序中构建 DocumentDB 查询的首选方法。错误处理非常强大,对于无效的过滤器构造,会抛出明显的异常。

FindIterable是专为处理该find()方法的结果而设计的专用接口。它提供了一组丰富的方法来完善和控制查询的执行,为方法链接提供了流畅的 API。该接口包括基本的查询修改方法,例如limit()用于限制返回的文档数量、skip()分页、sort()排序结果、projection()选择特定字段和hint()索引选择。中的批处理、跳过和限制操作FindIterable是必不可少的分页和数据管理工具,可帮助控制如何从数据库中检索和处理文档。

batching (batchSize) 控制单次网络往返中向客户端返回文档数据库的文档数量。设置批量大小时,DocumentDB 不会一次返回所有匹配的文档,而是按指定批量大小成组返回它们。

Skip 允许你偏移结果的起点,本质上是让 DocumentDB 在开始返回匹配项之前跳过指定数量的文档。例如,skip(20)将绕过前 20 个匹配的文档。这通常用于要检索后续页面结果的分页场景。

Limit 限制了查询中可以返回的文档总数。当您指定时limit(n),即使数据库中有更多匹配项,DocumentDB 也将在返回 “n” 个文档后停止返回文档。

FindIterable从 Amazon DocumentDB 检索文档时支持迭代器和光标模式。使用FindIterable作为迭代器的好处是,它允许延迟加载文档,并且仅在应用程序请求时才获取文档。使用 iterator 的另一个好处是,您无需负责维护与集群的连接,因此无需明确关闭连接。

FindIterable还提供支持 MongoCursor,允许在处理 Amazon DocumentDB 查询时使用光标模式。 MongoCursor是特定于 MongoDB Java 驱动程序的实现,用于控制数据库操作和资源管理。它实现了AutoCloseable接口,允许通过 try-with-resources块进行明确的资源管理,这对于正确关闭数据库连接和释放服务器资源至关重要。默认情况下,光标会在 10 分钟后超时,DocumentDB 不允许您选择更改此超时行为。处理批处理数据时,请确保在光标超时之前检索下一批数据。使用时的一个关键考虑因素MongoCursor是,它需要明确关闭以防止资源泄漏。

本节介绍了find()Filters和的几个示例FindIterable

以下代码示例显示了find()如何使用其 “RestauranTid” 字段检索单个文档:

Document filter = new Document("restaurantId", "REST-21G145"); Document result = collection.find(filter).first();

尽管使用Filters可以更好地检查编译时的错误,但 java 驱动程序也允许您直接在find()方法中指定Bson过滤器。以下示例代码将Bson文档传递给find()

result = collection.find(new Document("$and", Arrays.asList( new Document("rating.totalReviews", new Document("$gt", 1000)), new Document("priceRange", "$$"))))

下一个示例代码显示了使用该Filters类的几个示例find()

FindIterable < Document > results; // Exact match results = collection.find(Filters.eq("name", "Thai Curry Palace")); // Not equal results = collection.find(Filters.ne("cuisine", "Thai")); // find an element in an array results = collection.find(Filters.in("features", Arrays.asList("Private Dining"))); // Greater than results = collection.find(Filters.gt("rating.average", 3.5)); // Between (inclusive) results = collection.find(Filters.and( Filters.gte("rating.totalReviews", 100), Filters.lte("rating.totalReviews", 200))); // AND results = collection.find(Filters.and( Filters.eq("cuisine", "Thai"), Filters.gt("rating.average", 4.5))); // OR results = collection.find(Filters.or( Filters.eq("cuisine", "Thai"), Filters.eq("cuisine", "American"))); // All document where the Field exists results = collection.find(Filters.exists("michelin")); // Regex results = collection.find(Filters.regex("name", Pattern.compile("Curry", Pattern.CASE_INSENSITIVE))); // Find all document where the array contain the list of value regardless of its order results = collection.find(Filters.all("features", Arrays.asList("Private Dining", "Parking"))); // Array size results = collection.find(Filters.size("features", 4));

以下示例说明如何对FindIterable对象进行链式操作sort()skip()limit()、、和batchSize()操作。这些操作的提供顺序将影响查询的性能。作为最佳实践,这些操作的顺序应为sort()projection()skip()limit()batchSize()

FindIterable < Document > results = collection.find(Filters.gt("rating.totalReviews", 1000)) // Sorting .sort(Sorts.orderBy( Sorts.descending("address.city"), Sorts.ascending("cuisine"))) // Field selection .projection(Projections.fields( Projections.include("name", "cuisine", "priceRange"), Projections.excludeId())) // Pagination .skip(20) .limit(10) .batchSize(2);

以下示例代码显示了在上创建迭代器。FindIterable它使用 Java 的forEach构造遍历结果集。

collection.find(Filters.eq("cuisine", "American")).forEach(doc -> System.out.println(doc.toJson()));

在最后一个find()代码示例中,它显示了如何cursor()用于文档检索。它在 try 块中创建游标,这样可以确保在代码退出 try 块时光标会关闭。

try (MongoCursor < Document > cursor = collection.find(Filters.eq("cuisine", "American")) .batchSize(25) .cursor()) { while (cursor.hasNext()) { Document doc = cursor.next(); System.out.println(doc.toJson()); } } // Cursor automatically closed

更新 DocumentDB 集合中的现有文档

Amazon DocumentDB 提供了灵活而强大的机制,用于修改现有文档并在不存在时插入新文档。MongoDB Java 驱动程序提供了多种更新方法updateOne():用于单个文档更新updateMany()、用于多个文档更新replaceOne()以及用于完整文档替换。除了这三种方法之外,UpdatesUpdateOptions、和UpdateResult还有其他基本组件,它们为 MongoDB Java 驱动程序中的更新操作提供了构建块。

MongoDB Java 驱动程序中的Updates类是一个实用类,它提供了用于创建更新运算符的静态工厂方法。它充当以类型安全和可读的方式构造更新操作的主要构建器。诸如set()unset()、和之类的基本方法inc()允许直接修改文档。当使用允许以原子方式执行多个更新操作的Updates.combine()方法组合多个操作时,该类的力量变得显而易见,从而确保数据一致性。

UpdateOptions是 MongoDB 的 Java 驱动程序中的一个功能强大的配置类,它为文档更新操作提供了基本的自定义功能。该类的两个重要方面是为更新操作提供 upsert 和数组过滤器支持。通过upsert(true)启用的 upsert 功能允许在更新操作期间未找到匹配文档时创建新文档。通过arrayFilters(),更新操作可以精确更新满足特定条件的数组元素。

UpdateResult在 MongoDB 的 Java 驱动程序中,提供了详细说明更新操作结果的反馈机制。该类封装了三个关键指标:与更新条件匹配的文档数量 (matchedCount)、实际修改的文档数量 (modifiedCount) 以及有关任何已更新文档的信息 ()。upsertedId了解这些指标对于正确处理错误、验证更新操作和维护应用程序中的数据一致性至关重要。

更新和替换单个文档

在 DocumentDB 中,可以使用 updateOne () 方法来更新单个文档。此方法采用通常由Filters类提供的筛选参数来标识要更新的文档,使用确定要更新的字段的 Updat e UpdateOptions 参数以及用于为更新设置不同选项的可选参数。使用该updateOne()方法只会更新第一个符合选择标准的文档。以下示例代码更新一个文档的单个字段:

collection.updateOne(Filters.eq("restaurantId", "REST-Y2E9H5"), Updates.set("name", "Amazing Japanese sushi"));

要更新一个文档中的多个字段,请updateOne()使用Update.combine()和,如下例所示。此示例还说明如何向文档中的数组中添加项目。

List<Bson> updates = new ArrayList<>(); // Basic field updates updates.add(Updates.set("name", "Shanghai Best")); // Array operations updates.add(Updates.addEachToSet("features", Arrays.asList("Live Music"))); // Counter updates updates.add(Updates.inc("rating.totalReviews", 10)); // Combine all updates Bson combinedUpdates = Updates.combine(updates); // Execute automic update with one call collection.updateOne(Filters.eq("restaurantId","REST-1J83NH"), combinedUpdates);

以下代码示例演示了如何更新数据库中的文档。如果指定的文档不存在,则操作会自动将其作为新文档插入。此代码还演示了如何使用通过UpdateResult对象提供的指标。

Bson filter = Filters.eq("restaurantId", "REST-0Y9GL0"); Bson update = Updates.set("cuisine", "Indian"); // Upsert operation UpdateOptions options = new UpdateOptions().upsert(true); UpdateResult result = collection.updateOne(filter, update, options); if (result.getUpsertedId() != null) { System.out.println("Inserted document with _id: " + result.getUpsertedId()); } else { System.out.println("Updated " + result.getModifiedCount() + " document(s)"); }

以下代码示例演示了如何使用该replaceOne()方法将现有文档完全替换为新文档,而不是更新单个字段。该replaceOne()方法覆盖整个文档,仅保留原始文档的_id字段。如果多个文档符合筛选条件,则仅替换第一个遇到的文档。

Document newDocument = new Document() .append("restaurantId", "REST-0Y9GL0") .append("name", "Bhiryani Adda") .append("cuisine", "Indian") .append("rating", new Document() .append("average", 4.8) .append("totalReviews", 267)) .append("features", Arrays.asList("Outdoor Seating", "Live Music")); UpdateResult result = collection.replaceOne( Filters.eq("restaurantId", "REST-0Y9GL0"), newDocument); System.out.printf("Modified %d document%n", result.getModifiedCount());

更新多个文档

有两种方法可以同时更新集合中的多个文档。您可以使用该updateMany()方法,也可以将与bulkWrite()方法UpdateManyModel一起使用。该updateMany()方法使用筛选参数来选择要更新的文档,使用Update参数来标识要更新的字段,使用可选UpdateOptions参数来指定更新选项。

以下示例代码演示了该updateMany()方法的用法:

Bson filter = Filters.and( Filters.in("features", Arrays.asList("Private Dining")), Filters.eq("cuisine", "Thai")); UpdateResult result1 = collection.updateMany(filter, Updates.set("priceRange", "$$$"));

以下示例代码演示了使用相同更新的bulkWrite()方法:

BulkWriteOptions options = new BulkWriteOptions().ordered(false); List < WriteModel < Document >> updates = new ArrayList < > (); Bson filter = Filters.and( Filters.in("features", Arrays.asList("Private Dining")), Filters.eq("cuisine", "Indian")); Bson updateField = Updates.set("priceRange", "$$$"); updates.add(new UpdateManyModel < > (filter, updateField)); BulkWriteResult result = collection.bulkWrite(updates, options); System.out.printf("Modified %d document%n", result.getModifiedCount());

从 DocumentDB 集合中移除文档

MongoDB Java 驱动程序deleteOne()提供了删除单个文档deleteMany()和删除符合特定条件的多个文档的功能。就像更新一样,删除操作也可以与该bulkWrite()方法一起使用。两deleteOne()者都deleteMany()返回一个DeleteResult对象,该对象提供有关操作结果的信息,包括删除的文档数。以下是使用deleteMany()移除多个文档的示例:

Bson filter = Filters.and( Filters.eq("cuisine", "Thai"), Filters.lt("rating.totalReviews", 50)); DeleteResult result = collection.deleteMany(filter); System.out.printf("Deleted %d document%n", result.getDeletedCount());

使用重试逻辑处理错误

Amazon DocumentDB 的强大错误处理策略应将错误分为可重试(例如网络超时、连接问题)和不可重试(例如身份验证失败、查询无效)。对于因应重试的错误而导致的操作失败,它应在每次重试和最大重试尝试次数之间实现延迟。CRUD 操作应该位于捕获及其子类的 try-cat MongoExceptionch 块中。此外,它还应包括监控和记录错误,以实现操作可见性。以下是演示如何实现重试错误处理的示例代码:

int MAX_RETRIES = 3; int INITIAL_DELAY_MS = 1000; int retryCount = 0; while (true) { try { crud_operation(); //perform crud that will throw MongoException or one of its subclass break; } catch (MongoException e) { if (retryCount < MAX_RETRIES) { retryCount++; long delayMs = INITIAL_DELAY_MS * (long) Math.pow(2, retryCount - 1); try { TimeUnit.MILLISECONDS.sleep(delayMs); } catch (InterruptedException t) { Thread.currentThread().interrupt(); throw new RuntimeException("Retry interrupted", t); } continue; } else throw new RuntimeException("Crud operation failed", e); } }