diff --git a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md index 83103c9218..7675805e30 100644 --- a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md @@ -332,6 +332,11 @@ upstart 流需要一个唯一的密钥,所以我们必须声明 `_id` 作为 TIMESTAMP_LTZ(3) NOT NULL 它指示在数据库中进行更改的时间。
如果记录是从表的快照而不是改变流中读取的,该值将始终为0。 + + op_type + STRING NOT NULL + 该行的操作类型。 + diff --git a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md index 9ffbf184d4..be7e3cace6 100644 --- a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md @@ -357,6 +357,11 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a TIMESTAMP_LTZ(3) NOT NULL It indicates the time that the change was made in the database.
If the record is read from snapshot of the table instead of the change stream, the value is always 0. + + op_type + STRING NOT NULL + Operation type of the row. + diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java index c2baf021c9..c4679da6b9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java @@ -81,6 +81,20 @@ public Object read(SourceRecord record) { return TimestampData.fromEpochMillis( (Long) source.get(AbstractSourceInfo.TIMESTAMP_KEY)); } + }), + + OP_TYPE( + "op_type", + DataTypes.STRING().notNull(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(SourceRecord record) { + Struct value = (Struct) record.value(); + String opType = value.getString(MongoDBEnvelope.OPERATION_TYPE_FIELD); + return StringData.fromString(opType); + } }); private final String key; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java index 8d8047fa72..796bbfbad5 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java @@ -648,6 +648,7 @@ private void testMongoDBParallelSource( + " name STRING," + " address STRING," + " phone_number STRING," + + " op_type STRING METADATA FROM 'op_type' VIRTUAL," + " primary key (_id) not enforced" + ") WITH (" + " 'connector' = 'mongodb-cdc'," @@ -674,31 +675,31 @@ private void testMongoDBParallelSource( // first step: check the snapshot data String[] snapshotForSingleTable = new String[] { - "+I[101, user_1, Shanghai, 123567891234]", - "+I[102, user_2, Shanghai, 123567891234]", - "+I[103, user_3, Shanghai, 123567891234]", - "+I[109, user_4, Shanghai, 123567891234]", - "+I[110, user_5, Shanghai, 123567891234]", - "+I[111, user_6, Shanghai, 123567891234]", - "+I[118, user_7, Shanghai, 123567891234]", - "+I[121, user_8, Shanghai, 123567891234]", - "+I[123, user_9, Shanghai, 123567891234]", - "+I[1009, user_10, Shanghai, 123567891234]", - "+I[1010, user_11, Shanghai, 123567891234]", - "+I[1011, user_12, Shanghai, 123567891234]", - "+I[1012, user_13, Shanghai, 123567891234]", - "+I[1013, user_14, Shanghai, 123567891234]", - "+I[1014, user_15, Shanghai, 123567891234]", - "+I[1015, user_16, Shanghai, 123567891234]", - "+I[1016, user_17, Shanghai, 123567891234]", - "+I[1017, user_18, Shanghai, 123567891234]", - "+I[1018, user_19, Shanghai, 123567891234]", - "+I[1019, user_20, Shanghai, 123567891234]", - "+I[2000, user_21, Shanghai, 123567891234]" + "+I[101, user_1, Shanghai, 123567891234, insert]", + "+I[102, user_2, Shanghai, 123567891234, insert]", + "+I[103, user_3, Shanghai, 123567891234, insert]", + "+I[109, user_4, Shanghai, 123567891234, insert]", + "+I[110, user_5, Shanghai, 123567891234, insert]", + "+I[111, user_6, Shanghai, 123567891234, insert]", + "+I[118, user_7, Shanghai, 123567891234, insert]", + "+I[121, user_8, Shanghai, 123567891234, insert]", + "+I[123, user_9, Shanghai, 123567891234, insert]", + "+I[1009, user_10, Shanghai, 123567891234, insert]", + "+I[1010, user_11, Shanghai, 123567891234, insert]", + "+I[1011, user_12, Shanghai, 123567891234, insert]", + "+I[1012, user_13, Shanghai, 123567891234, insert]", + "+I[1013, user_14, Shanghai, 123567891234, insert]", + "+I[1014, user_15, Shanghai, 123567891234, insert]", + "+I[1015, user_16, Shanghai, 123567891234, insert]", + "+I[1016, user_17, Shanghai, 123567891234, insert]", + "+I[1017, user_18, Shanghai, 123567891234, insert]", + "+I[1018, user_19, Shanghai, 123567891234, insert]", + "+I[1019, user_20, Shanghai, 123567891234, insert]", + "+I[2000, user_21, Shanghai, 123567891234, insert]" }; tEnv.executeSql(sourceDDL); TableResult tableResult = - tEnv.executeSql("select cid, name, address, phone_number from customers"); + tEnv.executeSql("select cid, name, address, phone_number, op_type from customers"); CloseableIterator iterator = tableResult.collect(); JobID jobId = tableResult.getJobClient().get().getJobID(); List expectedSnapshotData = new ArrayList<>(); @@ -731,17 +732,17 @@ private void testMongoDBParallelSource( String[] changeEventsForSingleTable = new String[] { - "-U[101, user_1, Shanghai, 123567891234]", - "+U[101, user_1, Hangzhou, 123567891234]", - "-D[102, user_2, Shanghai, 123567891234]", - "+I[102, user_2, Shanghai, 123567891234]", - "-U[103, user_3, Shanghai, 123567891234]", - "+U[103, user_3, Hangzhou, 123567891234]", - "-U[1010, user_11, Shanghai, 123567891234]", - "+U[1010, user_11, Hangzhou, 123567891234]", - "+I[2001, user_22, Shanghai, 123567891234]", - "+I[2002, user_23, Shanghai, 123567891234]", - "+I[2003, user_24, Shanghai, 123567891234]" + "-U[101, user_1, Shanghai, 123567891234, update]", + "+U[101, user_1, Hangzhou, 123567891234, update]", + "-D[102, user_2, Shanghai, 123567891234, delete]", + "+I[102, user_2, Shanghai, 123567891234, insert]", + "-U[103, user_3, Shanghai, 123567891234, update]", + "+U[103, user_3, Hangzhou, 123567891234, update]", + "-U[1010, user_11, Shanghai, 123567891234, update]", + "+U[1010, user_11, Hangzhou, 123567891234, update]", + "+I[2001, user_22, Shanghai, 123567891234, insert]", + "+I[2002, user_23, Shanghai, 123567891234, insert]", + "+I[2003, user_24, Shanghai, 123567891234, insert]" }; List expectedChangeStreamData = new ArrayList<>(); for (int i = 0; i < captureCustomerCollections.length; i++) {