Skip to content

Commit

Permalink
[minor][cdc-connector][mongodb] Add op_type metadata column
Browse files Browse the repository at this point in the history
  • Loading branch information
qg-lin committed Sep 20, 2024
1 parent a5b666a commit 32d3f1a
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 33 deletions.
5 changes: 5 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,11 @@ upstart 流需要一个唯一的密钥,所以我们必须声明 `_id` 作为
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
<td>它指示在数据库中进行更改的时间。 <br>如果记录是从表的快照而不是改变流中读取的,该值将始终为0</td>
</tr>
<tr>
<td>op_type</td>
<td>STRING NOT NULL</td>
<td>该行的操作类型。</td>
</tr>
</tbody>
</table>

Expand Down
5 changes: 5 additions & 0 deletions docs/content/docs/connectors/flink-sources/mongodb-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,11 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
<td>It indicates the time that the change was made in the database. <br>If the record is read from snapshot of the table instead of the change stream, the value is always 0.</td>
</tr>
<tr>
<td>op_type</td>
<td>STRING NOT NULL</td>
<td>Operation type of the row.</td>
</tr>
</tbody>
</table>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,20 @@ public Object read(SourceRecord record) {
return TimestampData.fromEpochMillis(
(Long) source.get(AbstractSourceInfo.TIMESTAMP_KEY));
}
}),

OP_TYPE(
"op_type",
DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;

@Override
public Object read(SourceRecord record) {
Struct value = (Struct) record.value();
String opType = value.getString(MongoDBEnvelope.OPERATION_TYPE_FIELD);
return StringData.fromString(opType);
}
});

private final String key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,7 @@ private void testMongoDBParallelSource(
+ " name STRING,"
+ " address STRING,"
+ " phone_number STRING,"
+ " op_type STRING METADATA FROM 'op_type' VIRTUAL,"
+ " primary key (_id) not enforced"
+ ") WITH ("
+ " 'connector' = 'mongodb-cdc',"
Expand All @@ -674,31 +675,31 @@ private void testMongoDBParallelSource(
// first step: check the snapshot data
String[] snapshotForSingleTable =
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]"
"+I[101, user_1, Shanghai, 123567891234, insert]",
"+I[102, user_2, Shanghai, 123567891234, insert]",
"+I[103, user_3, Shanghai, 123567891234, insert]",
"+I[109, user_4, Shanghai, 123567891234, insert]",
"+I[110, user_5, Shanghai, 123567891234, insert]",
"+I[111, user_6, Shanghai, 123567891234, insert]",
"+I[118, user_7, Shanghai, 123567891234, insert]",
"+I[121, user_8, Shanghai, 123567891234, insert]",
"+I[123, user_9, Shanghai, 123567891234, insert]",
"+I[1009, user_10, Shanghai, 123567891234, insert]",
"+I[1010, user_11, Shanghai, 123567891234, insert]",
"+I[1011, user_12, Shanghai, 123567891234, insert]",
"+I[1012, user_13, Shanghai, 123567891234, insert]",
"+I[1013, user_14, Shanghai, 123567891234, insert]",
"+I[1014, user_15, Shanghai, 123567891234, insert]",
"+I[1015, user_16, Shanghai, 123567891234, insert]",
"+I[1016, user_17, Shanghai, 123567891234, insert]",
"+I[1017, user_18, Shanghai, 123567891234, insert]",
"+I[1018, user_19, Shanghai, 123567891234, insert]",
"+I[1019, user_20, Shanghai, 123567891234, insert]",
"+I[2000, user_21, Shanghai, 123567891234, insert]"
};
tEnv.executeSql(sourceDDL);
TableResult tableResult =
tEnv.executeSql("select cid, name, address, phone_number from customers");
tEnv.executeSql("select cid, name, address, phone_number, op_type from customers");
CloseableIterator<Row> iterator = tableResult.collect();
JobID jobId = tableResult.getJobClient().get().getJobID();
List<String> expectedSnapshotData = new ArrayList<>();
Expand Down Expand Up @@ -731,17 +732,17 @@ private void testMongoDBParallelSource(

String[] changeEventsForSingleTable =
new String[] {
"-U[101, user_1, Shanghai, 123567891234]",
"+U[101, user_1, Hangzhou, 123567891234]",
"-D[102, user_2, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"-U[103, user_3, Shanghai, 123567891234]",
"+U[103, user_3, Hangzhou, 123567891234]",
"-U[1010, user_11, Shanghai, 123567891234]",
"+U[1010, user_11, Hangzhou, 123567891234]",
"+I[2001, user_22, Shanghai, 123567891234]",
"+I[2002, user_23, Shanghai, 123567891234]",
"+I[2003, user_24, Shanghai, 123567891234]"
"-U[101, user_1, Shanghai, 123567891234, update]",
"+U[101, user_1, Hangzhou, 123567891234, update]",
"-D[102, user_2, Shanghai, 123567891234, delete]",
"+I[102, user_2, Shanghai, 123567891234, insert]",
"-U[103, user_3, Shanghai, 123567891234, update]",
"+U[103, user_3, Hangzhou, 123567891234, update]",
"-U[1010, user_11, Shanghai, 123567891234, update]",
"+U[1010, user_11, Hangzhou, 123567891234, update]",
"+I[2001, user_22, Shanghai, 123567891234, insert]",
"+I[2002, user_23, Shanghai, 123567891234, insert]",
"+I[2003, user_24, Shanghai, 123567891234, insert]"
};
List<String> expectedChangeStreamData = new ArrayList<>();
for (int i = 0; i < captureCustomerCollections.length; i++) {
Expand Down

0 comments on commit 32d3f1a

Please sign in to comment.