Skip to content

Commit

Permalink
[FLINK-36282][pipeline-connector][cdc-connector][mysql]fix incorrect …
Browse files Browse the repository at this point in the history
…data type of TINYINT(1) in mysql pipeline connector
  • Loading branch information
qg-lin committed Sep 14, 2024
1 parent 4b4b8ea commit 1dd101d
Show file tree
Hide file tree
Showing 15 changed files with 202 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ public MySqlDataSource(MySqlSourceConfigFactory configFactory) {
public EventSourceProvider getEventSourceProvider() {
MySqlEventDeserializer deserializer =
new MySqlEventDeserializer(
DebeziumChangelogMode.ALL, sourceConfig.isIncludeSchemaChanges());
DebeziumChangelogMode.ALL,
sourceConfig.isIncludeSchemaChanges(),
sourceConfig.getJdbcProperties());

MySqlSource<Event> source =
new MySqlSource<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getHistoryRecord;

Expand All @@ -59,21 +60,25 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private final boolean includeSchemaChanges;
private final Properties jdbcProperties;

private transient Tables tables;
private transient CustomMySqlAntlrDdlParser customParser;

public MySqlEventDeserializer(
DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) {
DebeziumChangelogMode changelogMode,
boolean includeSchemaChanges,
Properties jdbcProperties) {
super(new MySqlSchemaDataTypeInference(), changelogMode);
this.includeSchemaChanges = includeSchemaChanges;
this.jdbcProperties = jdbcProperties;
}

@Override
protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record) {
if (includeSchemaChanges) {
if (customParser == null) {
customParser = new CustomMySqlAntlrDdlParser();
customParser = new CustomMySqlAntlrDdlParser(jdbcProperties);
tables = new Tables();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils.fromDbzColumn;
Expand All @@ -60,6 +61,7 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
private final MySqlAntlrDdlParser parser;
private final List<ParseTreeListener> listeners;
private final LinkedList<SchemaChangeEvent> changes;
private final Properties jdbcProperties;
private org.apache.flink.cdc.common.event.TableId currentTable;
private List<ColumnEditor> columnEditors;
private CustomColumnDefinitionParserListener columnDefinitionListener;
Expand All @@ -70,10 +72,12 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
public CustomAlterTableParserListener(
MySqlAntlrDdlParser parser,
List<ParseTreeListener> listeners,
LinkedList<SchemaChangeEvent> changes) {
LinkedList<SchemaChangeEvent> changes,
Properties jdbcProperties) {
this.parser = parser;
this.listeners = listeners;
this.changes = changes;
this.jdbcProperties = jdbcProperties;
}

@Override
Expand Down Expand Up @@ -315,7 +319,7 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
String newColumnName = parser.parseName(ctx.newColumn);

Map<String, DataType> typeMapping = new HashMap<>();
typeMapping.put(column.name(), fromDbzColumn(column));
typeMapping.put(column.name(), fromDbzColumn(column, jdbcProperties));
changes.add(new AlterColumnTypeEvent(currentTable, typeMapping));

if (newColumnName != null && !column.name().equalsIgnoreCase(newColumnName)) {
Expand Down Expand Up @@ -366,7 +370,7 @@ public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx)
() -> {
Column column = columnDefinitionListener.getColumn();
Map<String, DataType> typeMapping = new HashMap<>();
typeMapping.put(column.name(), fromDbzColumn(column));
typeMapping.put(column.name(), fromDbzColumn(column, jdbcProperties));
changes.add(new AlterColumnTypeEvent(currentTable, typeMapping));
listeners.remove(columnDefinitionListener);
},
Expand Down Expand Up @@ -413,7 +417,7 @@ public void exitDropTable(MySqlParser.DropTableContext ctx) {
private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) {
return org.apache.flink.cdc.common.schema.Column.physicalColumn(
dbzColumn.name(),
fromDbzColumn(dbzColumn),
fromDbzColumn(dbzColumn, jdbcProperties),
dbzColumn.comment(),
dbzColumn.defaultValueExpression().orElse(null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;

/** A ddl parser that will use custom listener. */
public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser {

private final LinkedList<SchemaChangeEvent> parsedEvents;
private final Properties jdbcProperties;

public CustomMySqlAntlrDdlParser() {
public CustomMySqlAntlrDdlParser(Properties jdbcProperties) {
super();
this.parsedEvents = new LinkedList<>();
this.jdbcProperties = jdbcProperties;
}

// Overriding this method because the BIT type requires default length dimension of 1.
Expand Down Expand Up @@ -277,7 +280,7 @@ protected DataTypeResolver initializeDataTypeResolver() {

@Override
protected AntlrDdlParserListener createParseTreeWalkerListener() {
return new CustomMySqlAntlrDdlParserListener(this, parsedEvents);
return new CustomMySqlAntlrDdlParserListener(this, parsedEvents, jdbcProperties);
}

public List<SchemaChangeEvent> getAndClearParsedEvents() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;

/**
Expand Down Expand Up @@ -74,12 +75,16 @@ public class CustomMySqlAntlrDdlParserListener extends MySqlParserBaseListener
private final Collection<ParsingException> errors = new ArrayList<>();

public CustomMySqlAntlrDdlParserListener(
MySqlAntlrDdlParser parser, LinkedList<SchemaChangeEvent> parsedEvents) {
MySqlAntlrDdlParser parser,
LinkedList<SchemaChangeEvent> parsedEvents,
Properties jdbcProperties) {
// initialize listeners
listeners.add(new CreateAndAlterDatabaseParserListener(parser));
listeners.add(new DropDatabaseParserListener(parser));
listeners.add(new CreateTableParserListener(parser, listeners));
listeners.add(new CustomAlterTableParserListener(parser, listeners, parsedEvents));
listeners.add(
new CustomAlterTableParserListener(
parser, listeners, parsedEvents, jdbcProperties));
listeners.add(new DropTableParserListener(parser));
listeners.add(new RenameTableParserListener(parser));
listeners.add(new TruncateTableParserListener(parser));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ private Schema parseDDL(String ddlStatement, TableId tableId) {
Column column = columns.get(i);

String colName = column.name();
DataType dataType = MySqlTypeUtils.fromDbzColumn(column);
DataType dataType =
MySqlTypeUtils.fromDbzColumn(column, sourceConfig.getJdbcProperties());
if (!column.isOptional()) {
dataType = dataType.notNull();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection;
Expand Down Expand Up @@ -129,14 +130,14 @@ public static Schema getTableSchema(
new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive())) {
TableChanges.TableChange tableSchema =
mySqlSchema.getTableSchema(partition, jdbc, toDbzTableId(tableId));
return toSchema(tableSchema.getTable());
return toSchema(tableSchema.getTable(), sourceConfig.getJdbcProperties());
}
}

public static Schema toSchema(Table table) {
public static Schema toSchema(Table table, Properties jdbcProperties) {
List<Column> columns =
table.columns().stream()
.map(MySqlSchemaUtils::toColumn)
.map(column -> toColumn(column, jdbcProperties))
.collect(Collectors.toList());

return Schema.newBuilder()
Expand All @@ -146,9 +147,11 @@ public static Schema toSchema(Table table) {
.build();
}

public static Column toColumn(io.debezium.relational.Column column) {
public static Column toColumn(io.debezium.relational.Column column, Properties jdbcProperties) {
return Column.physicalColumn(
column.name(), MySqlTypeUtils.fromDbzColumn(column), column.comment());
column.name(),
MySqlTypeUtils.fromDbzColumn(column, jdbcProperties),
column.comment());
}

public static io.debezium.relational.TableId toDbzTableId(TableId tableId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;

import com.mysql.cj.conf.PropertyKey;
import io.debezium.relational.Column;

import java.util.Properties;

/** Utilities for converting from MySQL types to {@link DataType}s. */
public class MySqlTypeUtils {

Expand Down Expand Up @@ -109,8 +112,8 @@ public class MySqlTypeUtils {
private static final String UNKNOWN = "UNKNOWN";

/** Returns a corresponding Flink data type from a debezium {@link Column}. */
public static DataType fromDbzColumn(Column column) {
DataType dataType = convertFromColumn(column);
public static DataType fromDbzColumn(Column column, Properties jdbcProperties) {
DataType dataType = convertFromColumn(column, jdbcProperties);
if (column.isOptional()) {
return dataType;
} else {
Expand All @@ -122,7 +125,7 @@ public static DataType fromDbzColumn(Column column) {
* Returns a corresponding Flink data type from a debezium {@link Column} with nullable always
* be true.
*/
private static DataType convertFromColumn(Column column) {
private static DataType convertFromColumn(Column column, Properties jdbcProperties) {
String typeName = column.typeName();
switch (typeName) {
case BIT:
Expand All @@ -137,7 +140,13 @@ private static DataType convertFromColumn(Column column) {
// user should not use tinyint(1) to store number although jdbc url parameter
// tinyInt1isBit=false can help change the return value, it's not a general way
// btw: mybatis and mysql-connector-java map tinyint(1) to boolean by default
return column.length() == 1 ? DataTypes.BOOLEAN() : DataTypes.TINYINT();
boolean tinyInt1isBit =
Boolean.parseBoolean(
jdbcProperties.getProperty(
PropertyKey.tinyInt1isBit.getKeyName(), "true"));
return (column.length() == 1 && tinyInt1isBit)
? DataTypes.BOOLEAN()
: DataTypes.TINYINT();
case TINYINT_UNSIGNED:
case TINYINT_UNSIGNED_ZEROFILL:
case SMALLINT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.mysql.cj.conf.PropertyKey;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
Expand All @@ -41,6 +42,7 @@
import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -102,13 +104,23 @@ public void testMysql8AccessDatabaseAndTable() {
}

@Test
public void testMysql57AccessCommonTypesSchema() {
testAccessCommonTypesSchema(fullTypesMySql57Database);
public void testMysql57AccessCommonTypesSchemaTinyintIsBit() {
testAccessCommonTypesSchema(fullTypesMySql57Database, true);
}

@Test
public void testMysql8AccessCommonTypesSchema() {
testAccessCommonTypesSchema(fullTypesMySql8Database);
public void testMysql57AccessCommonTypesSchemaTinyintIsNotBit() {
testAccessCommonTypesSchema(fullTypesMySql57Database, false);
}

@Test
public void testMysql8AccessCommonTypesSchemaTinyintIsBit() {
testAccessCommonTypesSchema(fullTypesMySql8Database, true);
}

@Test
public void testMysql8AccessCommonTypesSchemaTinyintIsNotBit() {
testAccessCommonTypesSchema(fullTypesMySql8Database, false);
}

@Test
Expand All @@ -117,7 +129,7 @@ public void testMysql57AccessTimeTypesSchema() {

String[] tables = new String[] {"time_types"};
MySqlMetadataAccessor metadataAccessor =
getMetadataAccessor(tables, fullTypesMySql57Database);
getMetadataAccessor(tables, fullTypesMySql57Database, true);

Schema actualSchema =
metadataAccessor.getTableSchema(
Expand Down Expand Up @@ -163,7 +175,7 @@ public void testMysql8AccessTimeTypesSchema() {

String[] tables = new String[] {"time_types"};
MySqlMetadataAccessor metadataAccessor =
getMetadataAccessor(tables, fullTypesMySql8Database);
getMetadataAccessor(tables, fullTypesMySql8Database, true);

Schema actualSchema =
metadataAccessor.getTableSchema(
Expand Down Expand Up @@ -211,7 +223,7 @@ private void testAccessDatabaseAndTable(UniqueDatabase database) {
database.createAndInitialize();

String[] tables = new String[] {"common_types", "time_types", "precision_types"};
MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database);
MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database, true);

assertThatThrownBy(metadataAccessor::listNamespaces)
.isInstanceOf(UnsupportedOperationException.class);
Expand All @@ -227,11 +239,12 @@ private void testAccessDatabaseAndTable(UniqueDatabase database) {
assertThat(actualTables).containsExactlyInAnyOrderElementsOf(expectedTables);
}

private void testAccessCommonTypesSchema(UniqueDatabase database) {
private void testAccessCommonTypesSchema(UniqueDatabase database, boolean tinyint1IsBit) {
database.createAndInitialize();

String[] tables = new String[] {"common_types"};
MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database);
MySqlMetadataAccessor metadataAccessor =
getMetadataAccessor(tables, database, tinyint1IsBit);

Schema actualSchema =
metadataAccessor.getTableSchema(
Expand Down Expand Up @@ -277,8 +290,12 @@ private void testAccessCommonTypesSchema(UniqueDatabase database) {
DataTypes.STRING(),
DataTypes.BOOLEAN(),
DataTypes.BINARY(1),
DataTypes.BOOLEAN(),
DataTypes.BOOLEAN(),
tinyint1IsBit
? DataTypes.BOOLEAN()
: DataTypes.TINYINT(),
tinyint1IsBit
? DataTypes.BOOLEAN()
: DataTypes.TINYINT(),
DataTypes.BINARY(16),
DataTypes.BINARY(8),
DataTypes.STRING(),
Expand Down Expand Up @@ -357,17 +374,22 @@ private void testAccessCommonTypesSchema(UniqueDatabase database) {
assertThat(actualSchema).isEqualTo(expectedSchema);
}

private MySqlMetadataAccessor getMetadataAccessor(String[] tables, UniqueDatabase database) {
MySqlSourceConfig sourceConfig = getConfig(tables, database);
private MySqlMetadataAccessor getMetadataAccessor(
String[] tables, UniqueDatabase database, boolean tinyint1IsBit) {
MySqlSourceConfig sourceConfig = getConfig(tables, database, tinyint1IsBit);
return new MySqlMetadataAccessor(sourceConfig);
}

private MySqlSourceConfig getConfig(String[] captureTables, UniqueDatabase database) {
private MySqlSourceConfig getConfig(
String[] captureTables, UniqueDatabase database, boolean tinyint1IsBit) {
String[] captureTableIds =
Arrays.stream(captureTables)
.map(tableName -> database.getDatabaseName() + "." + tableName)
.toArray(String[]::new);

Properties jdbcProperties = new Properties();
jdbcProperties.put(PropertyKey.tinyInt1isBit.getKeyName(), String.valueOf(tinyint1IsBit));

return new MySqlSourceConfigFactory()
.startupOptions(StartupOptions.latest())
.databaseList(database.getDatabaseName())
Expand All @@ -380,6 +402,7 @@ private MySqlSourceConfig getConfig(String[] captureTables, UniqueDatabase datab
.username(database.getUsername())
.password(database.getPassword())
.serverTimeZone(ZoneId.of("UTC").toString())
.jdbcProperties(jdbcProperties)
.createConfig(0);
}
}
Loading

0 comments on commit 1dd101d

Please sign in to comment.