Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix-3799] Fix the issue of array out of bounds when fetching lineage information #3816

Merged
merged 20 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
d3ed04d
[Fix] Fix the issue where the maintainer is left blank during the upd…
May 10, 2024
e26b51a
Merge remote-tracking branch 'upstream/dev' into dev
May 11, 2024
355ee64
[Fix] Resolving issues where custom classes in user Jars fail to comp…
May 11, 2024
0d8d464
Merge branch 'DataLinkDC:dev' into dev
18216499322 Sep 10, 2024
694acd1
Merge branch 'dev' of https://github.com/DataLinkDC/dinky into dev
actions-user Sep 11, 2024
e27ad53
Merge branch 'dev' of https://github.com/DataLinkDC/dinky into dev
actions-user Sep 13, 2024
5efda5d
Merge branch 'dev' of https://github.com/DataLinkDC/dinky into dev
actions-user Sep 14, 2024
a63905f
Merge branch 'dev' of https://github.com/DataLinkDC/dinky into dev
actions-user Sep 16, 2024
d1810e1
Merge branch 'dev' of https://github.com/DataLinkDC/dinky into dev
actions-user Sep 17, 2024
270f71f
Merge branch 'dev' of https://github.com/DataLinkDC/dinky into dev
actions-user Sep 18, 2024
e7a6e0b
feat:获取血缘异常时,抛出异常信息到前端
Sep 11, 2024
2d67318
fix:修复获取血缘数组越界异常
Sep 18, 2024
af82feb
fix:修复获取血缘数组越界异常
Sep 18, 2024
3a3d3c3
fix:修复获取血缘数组越界异常
Sep 18, 2024
80fe22b
Spotless Apply
18216499322 Sep 18, 2024
ea757e9
fix:导入依赖
Sep 18, 2024
620cc31
Merge remote-tracking branch 'lsj-origin/fix-lineage-bug' into fix-li…
Sep 18, 2024
54e4d77
Spotless Apply
18216499322 Sep 18, 2024
8c1536e
使用英文
Sep 18, 2024
1cd5e7f
Merge remote-tracking branch 'lsj-origin/fix-lineage-bug' into fix-li…
Sep 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class RelColumnOrigin {

private final boolean isDerived;

private boolean isComputedColumn;

/**
* Stores the expression for data conversion,
* which source table fields are transformed by which expression the target field
Expand All @@ -63,6 +65,13 @@ public RelColumnOrigin(RelOptTable originTable, int iOriginColumn, boolean isDer
this.transform = transform;
}

public RelColumnOrigin(RelOptTable originTable, int iOriginColumn, boolean isDerived, boolean isComputedColumn) {
this.originTable = originTable;
this.iOriginColumn = iOriginColumn;
this.isDerived = isDerived;
this.isComputedColumn = isComputedColumn;
}

// ~ Methods ----------------------------------------------------------------

/**
Expand Down Expand Up @@ -94,6 +103,10 @@ public boolean isDerived() {
return isDerived;
}

public boolean isComputedColumn() {
return isComputedColumn;
}

public String getTransform() {
return transform;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.apache.calcite.rex.RexVisitor;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -230,14 +232,28 @@ public Set<RelColumnOrigin> getColumnOrigins(Project rel, final RelMetadataQuery
// Direct reference: no derivation added.
RexInputRef inputRef = (RexInputRef) rexNode;
int index = inputRef.getIndex();
if (input instanceof TableScan) {
index = computeIndexWithOffset(rel.getProjects(), inputRef.getIndex(), iOutputColumn);
}
return mq.getColumnOrigins(input, index);
} else if (input instanceof TableScan
&& rexNode.getClass().equals(RexCall.class)
&& ((RexCall) rexNode).getOperands().isEmpty()) {
return mq.getColumnOrigins(input, iOutputColumn);
List<Column> columns = ((TableSourceTable) (input).getTable())
.catalogTable()
.getResolvedSchema()
.getColumns();
Set<RelColumnOrigin> set = new LinkedHashSet<>();
for (int index = 0; index < columns.size(); index++) {
Column column = columns.get(index);
if (column instanceof Column.ComputedColumn
&& rexNode.toString()
.equals(((Column.ComputedColumn) column)
.getExpression()
.toString())) {
set.add(new RelColumnOrigin(input.getTable(), index, false, true));
return set;
}
}
set.add(new RelColumnOrigin(input.getTable(), -1, false, false));
return set;
}
// Anything else is a derivation, possibly from multiple columns.
final Set<RelColumnOrigin> set = getMultipleColumns(rexNode, input, mq);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,22 @@ private List<LineageRel> buildFiledLineageResult(String sinkTable, RelNode optRe

// filed
int ordinal = relColumnOrigin.getOriginColumnOrdinal();
List<String> fieldNames = ((TableSourceTable) table)
.catalogTable()
.getResolvedSchema()
.getColumnNames();
String sourceColumn = fieldNames.get(ordinal);

if (ordinal == -1) {
continue;
}

String sourceColumn;
if (relColumnOrigin.isComputedColumn()) {
List<String> fieldNames = ((TableSourceTable) table)
.catalogTable()
.getResolvedSchema()
.getColumnNames();
sourceColumn = fieldNames.get(ordinal);
} else {
List<String> fieldNames = table.getRowType().getFieldNames();
sourceColumn = fieldNames.get(ordinal);
}

// add record
resultList.add(LineageRel.build(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class RelColumnOrigin {

private final boolean isDerived;

private boolean isComputedColumn;

/**
* Stores the expression for data conversion,
* which source table fields are transformed by which expression the target field
Expand All @@ -63,6 +65,13 @@ public RelColumnOrigin(RelOptTable originTable, int iOriginColumn, boolean isDer
this.transform = transform;
}

public RelColumnOrigin(RelOptTable originTable, int iOriginColumn, boolean isDerived, boolean isComputedColumn) {
this.originTable = originTable;
this.iOriginColumn = iOriginColumn;
this.isDerived = isDerived;
this.isComputedColumn = isComputedColumn;
}

// ~ Methods ----------------------------------------------------------------

/**
Expand Down Expand Up @@ -98,6 +107,10 @@ public String getTransform() {
return transform;
}

public boolean isComputedColumn() {
return isComputedColumn;
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof RelColumnOrigin)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.apache.calcite.rex.RexVisitor;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -230,14 +232,28 @@ public Set<RelColumnOrigin> getColumnOrigins(Project rel, final RelMetadataQuery
// Direct reference: no derivation added.
RexInputRef inputRef = (RexInputRef) rexNode;
int index = inputRef.getIndex();
if (input instanceof TableScan) {
index = computeIndexWithOffset(rel.getProjects(), inputRef.getIndex(), iOutputColumn);
}
return mq.getColumnOrigins(input, index);
} else if (input instanceof TableScan
&& rexNode.getClass().equals(RexCall.class)
&& ((RexCall) rexNode).getOperands().isEmpty()) {
return mq.getColumnOrigins(input, iOutputColumn);
List<Column> columns = ((TableSourceTable) (input).getTable())
.contextResolvedTable()
.getResolvedSchema()
.getColumns();
Set<RelColumnOrigin> set = new LinkedHashSet<>();
for (int index = 0; index < columns.size(); index++) {
Column column = columns.get(index);
if (column instanceof Column.ComputedColumn
&& rexNode.toString()
.equals(((Column.ComputedColumn) column)
.getExpression()
.toString())) {
set.add(new RelColumnOrigin(input.getTable(), index, false, true));
return set;
}
}
set.add(new RelColumnOrigin(input.getTable(), -1, false, false));
return set;
}
// Anything else is a derivation, possibly from multiple columns.
final Set<RelColumnOrigin> set = getMultipleColumns(rexNode, input, mq);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,22 @@ private List<LineageRel> buildFiledLineageResult(String sinkTable, RelNode optRe

// filed
int ordinal = relColumnOrigin.getOriginColumnOrdinal();
List<String> fieldNames = ((TableSourceTable) table)
.contextResolvedTable()
.getResolvedSchema()
.getColumnNames();
String sourceColumn = fieldNames.get(ordinal);

if (ordinal == -1) {
continue;
}

String sourceColumn;
if (relColumnOrigin.isComputedColumn()) {
List<String> fieldNames = ((TableSourceTable) table)
.contextResolvedTable()
.getResolvedSchema()
.getColumnNames();
sourceColumn = fieldNames.get(ordinal);
} else {
List<String> fieldNames = table.getRowType().getFieldNames();
sourceColumn = fieldNames.get(ordinal);
}

// add record
resultList.add(LineageRel.build(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class RelColumnOrigin {

private final boolean isDerived;

private boolean isComputedColumn;

/**
* Stores the expression for data conversion,
* which source table fields are transformed by which expression the target field
Expand All @@ -63,6 +65,13 @@ public RelColumnOrigin(RelOptTable originTable, int iOriginColumn, boolean isDer
this.transform = transform;
}

public RelColumnOrigin(RelOptTable originTable, int iOriginColumn, boolean isDerived, boolean isComputedColumn) {
this.originTable = originTable;
this.iOriginColumn = iOriginColumn;
this.isDerived = isDerived;
this.isComputedColumn = isComputedColumn;
}

// ~ Methods ----------------------------------------------------------------

/**
Expand Down Expand Up @@ -98,6 +107,10 @@ public String getTransform() {
return transform;
}

public boolean isComputedColumn() {
return isComputedColumn;
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof RelColumnOrigin)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.apache.calcite.rex.RexVisitor;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -230,14 +232,31 @@ public Set<RelColumnOrigin> getColumnOrigins(Project rel, final RelMetadataQuery
// Direct reference: no derivation added.
RexInputRef inputRef = (RexInputRef) rexNode;
int index = inputRef.getIndex();
if (input instanceof TableScan) {
index = computeIndexWithOffset(rel.getProjects(), inputRef.getIndex(), iOutputColumn);
}
// if (input instanceof TableScan) {
// index = computeIndexWithOffset(rel.getProjects(), inputRef.getIndex(), iOutputColumn);
// }
return mq.getColumnOrigins(input, index);
} else if (input instanceof TableScan
&& rexNode.getClass().equals(RexCall.class)
&& ((RexCall) rexNode).getOperands().isEmpty()) {
return mq.getColumnOrigins(input, iOutputColumn);
List<Column> columns = ((TableSourceTable) (input).getTable())
.contextResolvedTable()
.getResolvedSchema()
.getColumns();
Set<RelColumnOrigin> set = new LinkedHashSet<>();
for (int index = 0; index < columns.size(); index++) {
Column column = columns.get(index);
if (column instanceof Column.ComputedColumn
&& rexNode.toString()
.equals(((Column.ComputedColumn) column)
.getExpression()
.toString())) {
set.add(new RelColumnOrigin(input.getTable(), index, false, true));
return set;
}
}
set.add(new RelColumnOrigin(input.getTable(), -1, false, false));
return set;
}
// Anything else is a derivation, possibly from multiple columns.
final Set<RelColumnOrigin> set = getMultipleColumns(rexNode, input, mq);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,22 @@ private List<LineageRel> buildFiledLineageResult(String sinkTable, RelNode optRe

// filed
int ordinal = relColumnOrigin.getOriginColumnOrdinal();
List<String> fieldNames = ((TableSourceTable) table)
.contextResolvedTable()
.getResolvedSchema()
.getColumnNames();
String sourceColumn = fieldNames.get(ordinal);

if (ordinal == -1) {
continue;
}

String sourceColumn;
if (relColumnOrigin.isComputedColumn()) {
List<String> fieldNames = ((TableSourceTable) table)
.contextResolvedTable()
.getResolvedSchema()
.getColumnNames();
sourceColumn = fieldNames.get(ordinal);
} else {
List<String> fieldNames = table.getRowType().getFieldNames();
sourceColumn = fieldNames.get(ordinal);
}

// add record
resultList.add(LineageRel.build(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class RelColumnOrigin {

private final boolean isDerived;

private boolean isComputedColumn;

/**
* Stores the expression for data conversion,
* which source table fields are transformed by which expression the target field
Expand All @@ -63,6 +65,13 @@ public RelColumnOrigin(RelOptTable originTable, int iOriginColumn, boolean isDer
this.transform = transform;
}

public RelColumnOrigin(RelOptTable originTable, int iOriginColumn, boolean isDerived, boolean isComputedColumn) {
this.originTable = originTable;
this.iOriginColumn = iOriginColumn;
this.isDerived = isDerived;
this.isComputedColumn = isComputedColumn;
}

// ~ Methods ----------------------------------------------------------------

/**
Expand Down Expand Up @@ -98,6 +107,10 @@ public String getTransform() {
return transform;
}

public boolean isComputedColumn() {
return isComputedColumn;
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof RelColumnOrigin)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.apache.calcite.rex.RexVisitor;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -230,14 +232,28 @@ public Set<RelColumnOrigin> getColumnOrigins(Project rel, final RelMetadataQuery
// Direct reference: no derivation added.
RexInputRef inputRef = (RexInputRef) rexNode;
int index = inputRef.getIndex();
if (input instanceof TableScan) {
index = computeIndexWithOffset(rel.getProjects(), inputRef.getIndex(), iOutputColumn);
}
return mq.getColumnOrigins(input, index);
} else if (input instanceof TableScan
&& rexNode.getClass().equals(RexCall.class)
&& ((RexCall) rexNode).getOperands().isEmpty()) {
return mq.getColumnOrigins(input, iOutputColumn);
List<Column> columns = ((TableSourceTable) (input).getTable())
.contextResolvedTable()
.getResolvedSchema()
.getColumns();
Set<RelColumnOrigin> set = new LinkedHashSet<>();
for (int index = 0; index < columns.size(); index++) {
Column column = columns.get(index);
if (column instanceof Column.ComputedColumn
&& rexNode.toString()
.equals(((Column.ComputedColumn) column)
.getExpression()
.toString())) {
set.add(new RelColumnOrigin(input.getTable(), index, false, true));
return set;
}
}
set.add(new RelColumnOrigin(input.getTable(), -1, false, false));
return set;
}
// Anything else is a derivation, possibly from multiple columns.
final Set<RelColumnOrigin> set = getMultipleColumns(rexNode, input, mq);
Expand Down
Loading
Loading