From c961c8a33d883047c432df15d8ff47ae1be7e49b Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 17 Sep 2024 15:17:29 -0400 Subject: [PATCH 01/13] feat: Updating protos for Projections to include more info Signed-off-by: Francisco Javier Arceo --- protos/feast/core/FeatureViewProjection.proto | 10 ++ protos/feast/core/OnDemandFeatureView.proto | 6 + sdk/python/feast/feature_view_projection.py | 35 ++++ sdk/python/feast/inference.py | 158 +++++++++++++----- 4 files changed, 165 insertions(+), 44 deletions(-) diff --git a/protos/feast/core/FeatureViewProjection.proto b/protos/feast/core/FeatureViewProjection.proto index 36d17632e7..b0e697b656 100644 --- a/protos/feast/core/FeatureViewProjection.proto +++ b/protos/feast/core/FeatureViewProjection.proto @@ -6,6 +6,7 @@ option java_outer_classname = "FeatureReferenceProto"; option java_package = "feast.proto.core"; import "feast/core/Feature.proto"; +import "feast/core/DataSource.proto"; // A projection to be applied on top of a FeatureView. @@ -22,4 +23,13 @@ message FeatureViewProjection { // Map for entity join_key overrides of feature data entity join_key to entity data join_key map join_key_map = 4; + + string timestamp_field = 5; + string date_partition_column = 6; + string created_timestamp_column = 7; + // Batch/Offline DataSource where this view can retrieve offline feature data. + DataSource batch_source = 8; + // Streaming DataSource from where this view can consume "online" feature data. + DataSource stream_source = 9; + } diff --git a/protos/feast/core/OnDemandFeatureView.proto b/protos/feast/core/OnDemandFeatureView.proto index 7a5fec1650..c915e32e16 100644 --- a/protos/feast/core/OnDemandFeatureView.proto +++ b/protos/feast/core/OnDemandFeatureView.proto @@ -63,6 +63,12 @@ message OnDemandFeatureViewSpec { // Owner of the on demand feature view. string owner = 8; string mode = 11; + bool write_to_online_store = 12; + + // List of names of entities associated with this feature view. + repeated string entities = 13; + // List of specifications for each entity defined as part of this feature view. + repeated FeatureSpecV2 entity_columns = 14; } message OnDemandFeatureViewMeta { diff --git a/sdk/python/feast/feature_view_projection.py b/sdk/python/feast/feature_view_projection.py index ff5b1b6e06..75e49d0b16 100644 --- a/sdk/python/feast/feature_view_projection.py +++ b/sdk/python/feast/feature_view_projection.py @@ -2,6 +2,7 @@ from attr import dataclass +from feast.data_source import DataSource from feast.field import Field from feast.protos.feast.core.FeatureViewProjection_pb2 import ( FeatureViewProjection as FeatureViewProjectionProto, @@ -27,6 +28,13 @@ class FeatureViewProjection: is not ready to be projected, i.e. still needs to go through feature inference. join_key_map: A map to modify join key columns during retrieval of this feature view projection. + timestamp_field: The timestamp field of the feature view projection. + date_partition_column: The date partition column of the feature view projection. + created_timestamp_column: The created timestamp column of the feature view projection. + batch_source: The batch source of data where this group of features + is stored. This is optional ONLY if a push source is specified as the + stream_source, since push sources contain their own batch sources. + """ name: str @@ -34,6 +42,10 @@ class FeatureViewProjection: desired_features: List[str] features: List[Field] join_key_map: Dict[str, str] = {} + timestamp_field: Optional[str] = None + date_partition_column: Optional[str] = None + created_timestamp_column: Optional[str] = None + batch_source: Optional[DataSource] = None def name_to_use(self): return self.name_alias or self.name @@ -43,6 +55,10 @@ def to_proto(self) -> FeatureViewProjectionProto: feature_view_name=self.name, feature_view_name_alias=self.name_alias or "", join_key_map=self.join_key_map, + timestamp_field=self.timestamp_field, + date_partition_column=self.date_partition_column, + created_timestamp_column=self.created_timestamp_column, + batch_source=self.batch_source.to_proto() or None, ) for feature in self.features: feature_reference_proto.feature_columns.append(feature.to_proto()) @@ -57,6 +73,10 @@ def from_proto(proto: FeatureViewProjectionProto): features=[], join_key_map=dict(proto.join_key_map), desired_features=[], + timestamp_field=proto.timestamp_field or None, + date_partition_column=proto.date_partition_column or None, + created_timestamp_column=proto.created_timestamp_column or None, + batch_source=proto.batch_source or None, ) for feature_column in proto.feature_columns: feature_view_projection.features.append(Field.from_proto(feature_column)) @@ -65,6 +85,21 @@ def from_proto(proto: FeatureViewProjectionProto): @staticmethod def from_definition(base_feature_view: "BaseFeatureView"): + # TODO need to implement this for StreamFeatureViews + if getattr(base_feature_view, "batch_source", None): + return FeatureViewProjection( + name=base_feature_view.name, + name_alias=None, + features=base_feature_view.features, + desired_features=[], + timestamp_field=base_feature_view.batch_source.created_timestamp_column + or None, + created_timestamp_column=base_feature_view.batch_source.created_timestamp_column + or None, + date_partition_column=base_feature_view.batch_source.date_partition_column + or None, + batch_source=base_feature_view.batch_source or None, + ) return FeatureViewProjection( name=base_feature_view.name, name_alias=None, diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 28a170172c..7466edffe8 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -13,6 +13,7 @@ from feast.infra.offline_stores.file_source import FileSource from feast.infra.offline_stores.redshift_source import RedshiftSource from feast.infra.offline_stores.snowflake_source import SnowflakeSource +from feast.on_demand_feature_view import OnDemandFeatureView from feast.repo_config import RepoConfig from feast.stream_feature_view import StreamFeatureView from feast.types import String @@ -94,7 +95,7 @@ def update_data_sources_with_inferred_event_timestamp_col( def update_feature_views_with_inferred_features_and_entities( - fvs: Union[List[FeatureView], List[StreamFeatureView]], + fvs: Union[List[FeatureView], List[StreamFeatureView], List[OnDemandFeatureView]], entities: List[Entity], config: RepoConfig, ) -> None: @@ -127,13 +128,14 @@ def update_feature_views_with_inferred_features_and_entities( # Fields whose names match a join key are considered to be entity columns; all # other fields are considered to be feature columns. + entity_columns = fv.entity_columns if fv.entity_columns else [] for field in fv.schema: if field.name in join_keys: # Do not override a preexisting field with the same name. if field.name not in [ - entity_column.name for entity_column in fv.entity_columns + entity_column.name for entity_column in entity_columns ]: - fv.entity_columns.append(field) + entity_columns.append(field) else: if field.name not in [feature.name for feature in fv.features]: fv.features.append(field) @@ -146,10 +148,10 @@ def update_feature_views_with_inferred_features_and_entities( continue if ( entity.join_key - not in [entity_column.name for entity_column in fv.entity_columns] + not in [entity_column.name for entity_column in entity_columns] and entity.value_type != ValueType.UNKNOWN ): - fv.entity_columns.append( + entity_columns.append( Field( name=entity.join_key, dtype=from_value_type(entity.value_type), @@ -160,10 +162,11 @@ def update_feature_views_with_inferred_features_and_entities( if ( len(fv.entities) == 1 and fv.entities[0] == DUMMY_ENTITY_NAME - and not fv.entity_columns + and not entity_columns ): - fv.entity_columns.append(Field(name=DUMMY_ENTITY_ID, dtype=String)) + entity_columns.append(Field(name=DUMMY_ENTITY_ID, dtype=String)) + fv.entity_columns = entity_columns # Run inference for entity columns if there are fewer entity fields than expected. run_inference_for_entities = len(fv.entity_columns) < len(join_keys) @@ -200,49 +203,116 @@ def _infer_features_and_entities( run_inference_for_features: Whether to run inference for features. config: The config for the current feature store. """ - columns_to_exclude = { - fv.batch_source.timestamp_field, - fv.batch_source.created_timestamp_column, - } - for original_col, mapped_col in fv.batch_source.field_mapping.items(): - if mapped_col in columns_to_exclude: - columns_to_exclude.remove(mapped_col) - columns_to_exclude.add(original_col) - - table_column_names_and_types = fv.batch_source.get_table_column_names_and_types( - config - ) - - for col_name, col_datatype in table_column_names_and_types: - if col_name in columns_to_exclude: - continue - elif col_name in join_keys: - field = Field( - name=col_name, - dtype=from_value_type( - fv.batch_source.source_datatype_to_feast_value_type()(col_datatype) - ), + entity_columns = [] + if isinstance(fv, OnDemandFeatureView): + columns_to_exclude = set() + for ( + source_feature_view_name, + source_feature_view, + ) in fv.source_feature_view_projections.items(): + columns_to_exclude.add(source_feature_view.timestamp_field) + columns_to_exclude.add(source_feature_view.created_timestamp_column) + + for ( + original_col, + mapped_col, + ) in source_feature_view.batch_source.field_mapping.items(): + if mapped_col in columns_to_exclude: + columns_to_exclude.remove(mapped_col) + columns_to_exclude.add(original_col) + + table_column_names_and_types = ( + source_feature_view.batch_source.get_table_column_names_and_types( + config + ) ) - if field.name not in [ - entity_column.name for entity_column in fv.entity_columns - ]: - fv.entity_columns.append(field) - elif not re.match( - "^__|__$", col_name - ): # double underscores often signal an internal-use column - if run_inference_for_features: - feature_name = ( - fv.batch_source.field_mapping[col_name] - if col_name in fv.batch_source.field_mapping - else col_name + + for col_name, col_datatype in table_column_names_and_types: + if col_name in columns_to_exclude: + continue + elif col_name in join_keys: + field = Field( + name=col_name, + dtype=from_value_type( + source_feature_view.batch_source.source_datatype_to_feast_value_type()( + col_datatype + ) + ), ) + if field.name not in [ + entity_column.name for entity_column in entity_columns + ]: + entity_columns.append(field) + elif not re.match( + "^__|__$", col_name + ): # double underscores often signal an internal-use column + if run_inference_for_features: + feature_name = ( + source_feature_view.batch_source.field_mapping[col_name] + if col_name in source_feature_view.batch_source.field_mapping + else col_name + ) + field = Field( + name=feature_name, + dtype=from_value_type( + source_feature_view.batch_source.source_datatype_to_feast_value_type()( + col_datatype + ) + ), + ) + if field.name not in [ + feature.name for feature in source_feature_view.features + ]: + source_feature_view.features.append(field) + + else: + columns_to_exclude = { + fv.batch_source.timestamp_field, + fv.batch_source.created_timestamp_column, + } + for original_col, mapped_col in fv.batch_source.field_mapping.items(): + if mapped_col in columns_to_exclude: + columns_to_exclude.remove(mapped_col) + columns_to_exclude.add(original_col) + + table_column_names_and_types = fv.batch_source.get_table_column_names_and_types( + config + ) + + for col_name, col_datatype in table_column_names_and_types: + if col_name in columns_to_exclude: + continue + elif col_name in join_keys: field = Field( - name=feature_name, + name=col_name, dtype=from_value_type( fv.batch_source.source_datatype_to_feast_value_type()( col_datatype ) ), ) - if field.name not in [feature.name for feature in fv.features]: - fv.features.append(field) + if field.name not in [ + entity_column.name for entity_column in entity_columns + ]: + entity_columns.append(field) + elif not re.match( + "^__|__$", col_name + ): # double underscores often signal an internal-use column + if run_inference_for_features: + feature_name = ( + fv.batch_source.field_mapping[col_name] + if col_name in fv.batch_source.field_mapping + else col_name + ) + field = Field( + name=feature_name, + dtype=from_value_type( + fv.batch_source.source_datatype_to_feast_value_type()( + col_datatype + ) + ), + ) + if field.name not in [feature.name for feature in fv.features]: + fv.features.append(field) + + fv.entity_columns = entity_columns From f8fdaa278841dcdc6565d527eb3f860bf1dd60c3 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 18 Sep 2024 08:34:44 -0400 Subject: [PATCH 02/13] adding unit test Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/on_demand_feature_view.py | 122 +++++++++++++++++++- sdk/python/tests/unit/test_feature_views.py | 19 +++ 2 files changed, 135 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 47fcf29926..fb9018dbbf 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -3,7 +3,7 @@ import inspect import warnings from types import FunctionType -from typing import Any, Optional, Union, get_type_hints +from typing import Any, List, Optional, Union, get_type_hints import dill import pandas as pd @@ -12,8 +12,9 @@ from feast.base_feature_view import BaseFeatureView from feast.data_source import RequestSource +from feast.entity import Entity from feast.errors import RegistryInferenceFailure, SpecifiedFeaturesNotPresentError -from feast.feature_view import FeatureView +from feast.feature_view import DUMMY_ENTITY_NAME, FeatureView from feast.feature_view_projection import FeatureViewProjection from feast.field import Field, from_value_type from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( @@ -61,7 +62,8 @@ class OnDemandFeatureView(BaseFeatureView): """ name: str - features: list[Field] + entities: Optional[List[str]] + features: List[Field] source_feature_view_projections: dict[str, FeatureViewProjection] source_request_sources: dict[str, RequestSource] feature_transformation: Union[ @@ -71,13 +73,15 @@ class OnDemandFeatureView(BaseFeatureView): description: str tags: dict[str, str] owner: str + write_to_online_store: bool def __init__( # noqa: C901 self, *, name: str, - schema: list[Field], - sources: list[ + entities: Optional[List[Entity]] = None, + schema: Optional[List[Field]] = None, + sources: List[ Union[ FeatureView, RequestSource, @@ -93,12 +97,14 @@ def __init__( # noqa: C901 description: str = "", tags: Optional[dict[str, str]] = None, owner: str = "", + write_to_online_store: bool = False, ): """ Creates an OnDemandFeatureView object. Args: name: The unique name of the on demand feature view. + entities (optional): The list of names of entities that this feature view is associated with. schema: The list of features in the output of the on demand feature view, after the transformation has been applied. sources: A map from input source names to the actual input sources, which may be @@ -113,6 +119,8 @@ def __init__( # noqa: C901 tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the on demand feature view, typically the email of the primary maintainer. + write_to_online_store (optional): A boolean that indicates whether to write the on demand feature view to + the online store for faster retrieval. """ super().__init__( name=name, @@ -122,6 +130,8 @@ def __init__( # noqa: C901 owner=owner, ) + schema = schema or [] + self.entities = [e.name for e in entities] if entities else [DUMMY_ENTITY_NAME] self.mode = mode.lower() if self.mode not in {"python", "pandas", "substrait"}: @@ -152,12 +162,48 @@ def __init__( # noqa: C901 self.source_request_sources[odfv_source.name] = odfv_source elif isinstance(odfv_source, FeatureViewProjection): self.source_feature_view_projections[odfv_source.name] = odfv_source + else: self.source_feature_view_projections[odfv_source.name] = ( odfv_source.projection ) + features: List[Field] = [] + self.entity_columns = [] + + join_keys: List[str] = [] + if entities: + for entity in entities: + join_keys.append(entity.join_key) + # Ensure that entities have unique join keys. + if len(set(join_keys)) < len(join_keys): + raise ValueError( + "A feature view should not have entities that share a join key." + ) + + for field in schema: + if field.name in join_keys: + self.entity_columns.append(field) + + # Confirm that the inferred type matches the specified entity type, if it exists. + matching_entities = ( + [e for e in entities if e.join_key == field.name] + if entities + else [] + ) + assert len(matching_entities) == 1 + entity = matching_entities[0] + if entity.value_type != ValueType.UNKNOWN: + if from_value_type(entity.value_type) != field.dtype: + raise ValueError( + f"Entity {entity.name} has type {entity.value_type}, which does not match the inferred type {field.dtype}." + ) + else: + features.append(field) + + self.features = features self.feature_transformation = feature_transformation + self.write_to_online_store = write_to_online_store @property def proto_class(self) -> type[OnDemandFeatureViewProto]: @@ -174,8 +220,13 @@ def __copy__(self): description=self.description, tags=self.tags, owner=self.owner, + write_to_online_store=self.write_to_online_store, ) + fv.entities = self.entities + fv.features = self.features fv.projection = copy.copy(self.projection) + fv.entity_columns = copy.copy(self.entity_columns) + return fv def __eq__(self, other): @@ -193,11 +244,36 @@ def __eq__(self, other): or self.source_request_sources != other.source_request_sources or self.mode != other.mode or self.feature_transformation != other.feature_transformation + or self.write_to_online_store != other.write_to_online_store + or sorted(self.entity_columns) != sorted(other.entity_columns) ): return False return True + @property + def join_keys(self) -> List[str]: + """Returns a list of all the join keys.""" + return [entity.name for entity in self.entity_columns] + + @property + def schema(self) -> List[Field]: + return list(set(self.entity_columns + self.features)) + + def ensure_valid(self): + """ + Validates the state of this feature view locally. + + Raises: + ValueError: The On Demand feature view does not have an entity when trying to use write_to_online_store. + """ + super().ensure_valid() + + if self.write_to_online_store and not self.entities: + raise ValueError( + "On Demand Feature views require an entity if write_to_online_store=True" + ) + def __hash__(self): return super().__hash__() @@ -216,7 +292,7 @@ def to_proto(self) -> OnDemandFeatureViewProto: sources = {} for source_name, fv_projection in self.source_feature_view_projections.items(): sources[source_name] = OnDemandSource( - feature_view_projection=fv_projection.to_proto() + feature_view_projection=fv_projection.to_proto(), ) for ( source_name, @@ -239,6 +315,10 @@ def to_proto(self) -> OnDemandFeatureViewProto: ) spec = OnDemandFeatureViewSpec( name=self.name, + entities=self.entities if self.entities else None, + entity_columns=[ + field.to_proto() for field in self.entity_columns if self.entity_columns + ], features=[feature.to_proto() for feature in self.features], sources=sources, feature_transformation=feature_transformation, @@ -246,6 +326,7 @@ def to_proto(self) -> OnDemandFeatureViewProto: description=self.description, tags=self.tags, owner=self.owner, + write_to_online_store=self.write_to_online_store, ) return OnDemandFeatureViewProto(spec=spec, meta=meta) @@ -335,6 +416,24 @@ def from_proto( else: raise ValueError("At least one transformation type needs to be provided") + if hasattr(on_demand_feature_view_proto.spec, "write_to_online_store"): + write_to_online_store = ( + on_demand_feature_view_proto.spec.write_to_online_store + ) + else: + write_to_online_store = False + if hasattr(on_demand_feature_view_proto.spec, "entities"): + entities = on_demand_feature_view_proto.spec.entities + else: + entities = None + if hasattr(on_demand_feature_view_proto.spec, "entity_columns"): + entity_columns = [ + Field.from_proto(field_proto) + for field_proto in on_demand_feature_view_proto.spec.entity_columns + ] + else: + entity_columns = [] + on_demand_feature_view_obj = cls( name=on_demand_feature_view_proto.spec.name, schema=[ @@ -350,8 +449,12 @@ def from_proto( description=on_demand_feature_view_proto.spec.description, tags=dict(on_demand_feature_view_proto.spec.tags), owner=on_demand_feature_view_proto.spec.owner, + write_to_online_store=write_to_online_store, ) + on_demand_feature_view_obj.entities = list(entities) + on_demand_feature_view_obj.entity_columns = entity_columns + # FeatureViewProjections are not saved in the OnDemandFeatureView proto. # Create the default projection. on_demand_feature_view_obj.projection = FeatureViewProjection.from_definition( @@ -595,6 +698,7 @@ def get_requested_odfvs( def on_demand_feature_view( *, + entities: Optional[List[Entity]] = None, schema: list[Field], sources: list[ Union[ @@ -607,11 +711,13 @@ def on_demand_feature_view( description: str = "", tags: Optional[dict[str, str]] = None, owner: str = "", + write_to_online_store: bool = False, ): """ Creates an OnDemandFeatureView object with the given user function as udf. Args: + entities (Optional): The list of names of entities that this feature view is associated with. schema: The list of features in the output of the on demand feature view, after the transformation has been applied. sources: A map from input source names to the actual input sources, which may be @@ -622,6 +728,8 @@ def on_demand_feature_view( tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the on demand feature view, typically the email of the primary maintainer. + write_to_online_store (optional): A boolean that indicates whether to write the on demand feature view to + the online store for faster retrieval. """ def mainify(obj) -> None: @@ -664,6 +772,8 @@ def decorator(user_function): description=description, tags=tags, owner=owner, + write_to_online_store=write_to_online_store, + entities=entities, ) functools.update_wrapper( wrapper=on_demand_feature_view_obj, wrapped=user_function diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index 981968df0d..652819c0f3 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -112,6 +112,25 @@ def test_hash(): # TODO(felixwang9817): Add tests for proto conversion. +def test_proto_conversion(): + file_source = FileSource(name="my-file-source", path="test.parquet") + feature_view_1 = FeatureView( + name="my-feature-view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + + feature_view_proto = feature_view_1.to_proto() + assert ( + feature_view_proto.spec.name == "my-feature-view" and + feature_view_proto.spec.batch_source.file_options.uri == "test.parquet" and + feature_view_proto.spec.batch_source.name == "my-file-source" and + feature_view_proto.spec.batch_source.type == 1 + ) # TODO(felixwang9817): Add tests for field mapping logic. From 2d037cbce239b23c4515481f48586bbe05ee1cb3 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 18 Sep 2024 09:52:47 -0400 Subject: [PATCH 03/13] adding type checking where batch source is already serialized into protobuf Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_view_projection.py | 8 +++++++- sdk/python/tests/unit/test_feature_views.py | 10 ++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/feature_view_projection.py b/sdk/python/feast/feature_view_projection.py index 75e49d0b16..291097fd4b 100644 --- a/sdk/python/feast/feature_view_projection.py +++ b/sdk/python/feast/feature_view_projection.py @@ -51,6 +51,12 @@ def name_to_use(self): return self.name_alias or self.name def to_proto(self) -> FeatureViewProjectionProto: + batch_source = None + if getattr(self, "batch_source", None): + if type(self.batch_source).__name__ == "DataSource": + batch_source = self.batch_source + else: + batch_source = self.batch_source.to_proto() feature_reference_proto = FeatureViewProjectionProto( feature_view_name=self.name, feature_view_name_alias=self.name_alias or "", @@ -58,7 +64,7 @@ def to_proto(self) -> FeatureViewProjectionProto: timestamp_field=self.timestamp_field, date_partition_column=self.date_partition_column, created_timestamp_column=self.created_timestamp_column, - batch_source=self.batch_source.to_proto() or None, + batch_source=batch_source, ) for feature in self.features: feature_reference_proto.feature_columns.append(feature.to_proto()) diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index 652819c0f3..9ad6d0fb0c 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -126,11 +126,13 @@ def test_proto_conversion(): feature_view_proto = feature_view_1.to_proto() assert ( - feature_view_proto.spec.name == "my-feature-view" and - feature_view_proto.spec.batch_source.file_options.uri == "test.parquet" and - feature_view_proto.spec.batch_source.name == "my-file-source" and - feature_view_proto.spec.batch_source.type == 1 + feature_view_proto.spec.name == "my-feature-view" + and feature_view_proto.spec.batch_source.file_options.uri == "test.parquet" + and feature_view_proto.spec.batch_source.name == "my-file-source" + and feature_view_proto.spec.batch_source.type == 1 ) + + # TODO(felixwang9817): Add tests for field mapping logic. From 3d9da066231d6037adae42789a224de4984cd637 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 18 Sep 2024 14:41:08 -0400 Subject: [PATCH 04/13] almost got everything working and type validation behaving Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/base_feature_view.py | 9 +++- sdk/python/feast/feature_view.py | 5 ++- sdk/python/feast/feature_view_projection.py | 45 +++++++++++++------ sdk/python/feast/inference.py | 2 +- sdk/python/feast/on_demand_feature_view.py | 6 +-- sdk/python/feast/utils.py | 4 +- .../test_local_feature_store.py | 3 +- 7 files changed, 53 insertions(+), 21 deletions(-) diff --git a/sdk/python/feast/base_feature_view.py b/sdk/python/feast/base_feature_view.py index 31140e2899..4164f5df4f 100644 --- a/sdk/python/feast/base_feature_view.py +++ b/sdk/python/feast/base_feature_view.py @@ -18,6 +18,7 @@ from google.protobuf.json_format import MessageToJson from google.protobuf.message import Message +from feast.data_source import DataSource from feast.feature_view_projection import FeatureViewProjection from feast.field import Field from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto @@ -65,6 +66,7 @@ def __init__( description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", + source: Optional[DataSource] = None, ): """ Creates a BaseFeatureView object. @@ -76,7 +78,8 @@ def __init__( tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the base feature view, typically the email of the primary maintainer. - + source (optional): The source of data for this group of features. May be a stream source, or a batch source. + If a stream source, the source should contain a batch_source for backfills & batch materialization. Raises: ValueError: A field mapping conflicts with an Entity or a Feature. """ @@ -90,6 +93,9 @@ def __init__( self.created_timestamp = None self.last_updated_timestamp = None + if source: + self.source = source + @property @abstractmethod def proto_class(self) -> Type[Message]: @@ -155,6 +161,7 @@ def __eq__(self, other): or self.description != other.description or self.tags != other.tags or self.owner != other.owner + or self.source != other.source ): return False diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index dd01078e20..33ea761158 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -206,6 +206,7 @@ def __init__( description=description, tags=tags, owner=owner, + source=source, ) self.online = online self.materialization_intervals = [] @@ -429,7 +430,9 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): # FeatureViewProjections are not saved in the FeatureView proto. # Create the default projection. - feature_view.projection = FeatureViewProjection.from_definition(feature_view) + feature_view.projection = FeatureViewProjection.from_feature_view_definition( + feature_view + ) if feature_view_proto.meta.HasField("created_timestamp"): feature_view.created_timestamp = ( diff --git a/sdk/python/feast/feature_view_projection.py b/sdk/python/feast/feature_view_projection.py index 291097fd4b..b477f3bc9e 100644 --- a/sdk/python/feast/feature_view_projection.py +++ b/sdk/python/feast/feature_view_projection.py @@ -10,6 +10,7 @@ if TYPE_CHECKING: from feast.base_feature_view import BaseFeatureView + from feast.feature_view import FeatureView @dataclass @@ -53,17 +54,17 @@ def name_to_use(self): def to_proto(self) -> FeatureViewProjectionProto: batch_source = None if getattr(self, "batch_source", None): - if type(self.batch_source).__name__ == "DataSource": - batch_source = self.batch_source - else: + if isinstance(self.batch_source, DataSource): batch_source = self.batch_source.to_proto() + else: + batch_source = self.batch_source feature_reference_proto = FeatureViewProjectionProto( feature_view_name=self.name, feature_view_name_alias=self.name_alias or "", join_key_map=self.join_key_map, - timestamp_field=self.timestamp_field, - date_partition_column=self.date_partition_column, - created_timestamp_column=self.created_timestamp_column, + timestamp_field=self.timestamp_field or "", + date_partition_column=self.date_partition_column or "", + created_timestamp_column=self.created_timestamp_column or "", batch_source=batch_source, ) for feature in self.features: @@ -90,8 +91,25 @@ def from_proto(proto: FeatureViewProjectionProto): return feature_view_projection @staticmethod - def from_definition(base_feature_view: "BaseFeatureView"): + def from_feature_view_definition(feature_view: "FeatureView"): # TODO need to implement this for StreamFeatureViews + if getattr(feature_view, "batch_source", None): + return FeatureViewProjection( + name=feature_view.name, + name_alias=None, + features=feature_view.features, + desired_features=[], + timestamp_field=feature_view.batch_source.created_timestamp_column + or None, + created_timestamp_column=feature_view.batch_source.created_timestamp_column + or None, + date_partition_column=feature_view.batch_source.date_partition_column + or None, + batch_source=feature_view.batch_source or None, + ) + + @staticmethod + def from_definition(base_feature_view: "BaseFeatureView"): if getattr(base_feature_view, "batch_source", None): return FeatureViewProjection( name=base_feature_view.name, @@ -106,12 +124,13 @@ def from_definition(base_feature_view: "BaseFeatureView"): or None, batch_source=base_feature_view.batch_source or None, ) - return FeatureViewProjection( - name=base_feature_view.name, - name_alias=None, - features=base_feature_view.features, - desired_features=[], - ) + else: + return FeatureViewProjection( + name=base_feature_view.name, + name_alias=None, + features=base_feature_view.features, + desired_features=[], + ) def get_feature(self, feature_name: str) -> Field: try: diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 7466edffe8..976d73fe14 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -203,7 +203,7 @@ def _infer_features_and_entities( run_inference_for_features: Whether to run inference for features. config: The config for the current feature store. """ - entity_columns = [] + entity_columns: list[str] = [] if isinstance(fv, OnDemandFeatureView): columns_to_exclude = set() for ( diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index fb9018dbbf..e95e20c926 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -235,9 +235,9 @@ def __eq__(self, other): "Comparisons should only involve OnDemandFeatureView class objects." ) - if not super().__eq__(other): - return False - + # Note, no longer evaluating the base feature view layer as ODFVs can have + # multiple datasources and a base_feature_view only has one source + # though maybe that shouldn't be true if ( self.source_feature_view_projections != other.source_feature_view_projections diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 992869557a..7c59547c1f 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -342,7 +342,9 @@ def _group_feature_refs( # on demand view to on demand view proto on_demand_view_index = { - view.projection.name_to_use(): view for view in all_on_demand_feature_views + view.projection.name_to_use(): view + for view in all_on_demand_feature_views + if view.projection } # view name to feature names diff --git a/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py b/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py index c86441d56c..c6f3c2fd31 100644 --- a/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py +++ b/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py @@ -209,8 +209,9 @@ def test_apply_feature_view_with_inline_batch_source( test_feature_store.apply([entity, driver_fv]) fvs = test_feature_store.list_batch_feature_views() + dfv = fvs[0] assert len(fvs) == 1 - assert fvs[0] == driver_fv + assert dfv == driver_fv ds = test_feature_store.list_data_sources() assert len(ds) == 1 From 3fe06ddef0fa0b8e8190051b178e20ac35399af5 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 18 Sep 2024 16:16:18 -0400 Subject: [PATCH 05/13] cleaned up and have tests behaving Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/base_feature_view.py | 5 +- sdk/python/feast/feature_view_projection.py | 8 +- sdk/python/feast/inference.py | 101 +++++++++++--------- sdk/python/feast/on_demand_feature_view.py | 7 +- 4 files changed, 66 insertions(+), 55 deletions(-) diff --git a/sdk/python/feast/base_feature_view.py b/sdk/python/feast/base_feature_view.py index 4164f5df4f..d7dc2237bd 100644 --- a/sdk/python/feast/base_feature_view.py +++ b/sdk/python/feast/base_feature_view.py @@ -161,8 +161,11 @@ def __eq__(self, other): or self.description != other.description or self.tags != other.tags or self.owner != other.owner - or self.source != other.source ): + # This is meant to ignore the File Source change to Push Source + if isinstance(type(self.source), type(other.source)): + if self.source != other.source: + return False return False return True diff --git a/sdk/python/feast/feature_view_projection.py b/sdk/python/feast/feature_view_projection.py index b477f3bc9e..c9cf9625c8 100644 --- a/sdk/python/feast/feature_view_projection.py +++ b/sdk/python/feast/feature_view_projection.py @@ -116,13 +116,13 @@ def from_definition(base_feature_view: "BaseFeatureView"): name_alias=None, features=base_feature_view.features, desired_features=[], - timestamp_field=base_feature_view.batch_source.created_timestamp_column + timestamp_field=base_feature_view.batch_source.created_timestamp_column # type:ignore[attr-defined] or None, - created_timestamp_column=base_feature_view.batch_source.created_timestamp_column + created_timestamp_column=base_feature_view.batch_source.created_timestamp_column # type:ignore[attr-defined] or None, - date_partition_column=base_feature_view.batch_source.date_partition_column + date_partition_column=base_feature_view.batch_source.date_partition_column # type:ignore[attr-defined] or None, - batch_source=base_feature_view.batch_source or None, + batch_source=base_feature_view.batch_source or None, # type:ignore[attr-defined] ) else: return FeatureViewProjection( diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 976d73fe14..b2d61fc54b 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -122,7 +122,7 @@ def update_feature_views_with_inferred_features_and_entities( join_keys = set( [ entity_name_to_join_key_map.get(entity_name) - for entity_name in fv.entities + for entity_name in getattr(fv, "entities", []) ] ) @@ -141,7 +141,8 @@ def update_feature_views_with_inferred_features_and_entities( fv.features.append(field) # Respect the `value_type` attribute of the entity, if it is specified. - for entity_name in fv.entities: + fv_entities = getattr(fv, "entities", []) + for entity_name in fv_entities: entity = entity_name_to_entity_map.get(entity_name) # pass when entity does not exist. Entityless feature view case if entity is None: @@ -160,8 +161,8 @@ def update_feature_views_with_inferred_features_and_entities( # Infer a dummy entity column for entityless feature views. if ( - len(fv.entities) == 1 - and fv.entities[0] == DUMMY_ENTITY_NAME + len(fv_entities) == 1 + and fv_entities[0] == DUMMY_ENTITY_NAME and not entity_columns ): entity_columns.append(Field(name=DUMMY_ENTITY_ID, dtype=String)) @@ -189,7 +190,7 @@ def update_feature_views_with_inferred_features_and_entities( def _infer_features_and_entities( - fv: FeatureView, + fv: Union[FeatureView, OnDemandFeatureView], join_keys: Set[Optional[str]], run_inference_for_features, config, @@ -203,7 +204,7 @@ def _infer_features_and_entities( run_inference_for_features: Whether to run inference for features. config: The config for the current feature store. """ - entity_columns: list[str] = [] + entity_columns: list[Field] = [] if isinstance(fv, OnDemandFeatureView): columns_to_exclude = set() for ( @@ -213,57 +214,60 @@ def _infer_features_and_entities( columns_to_exclude.add(source_feature_view.timestamp_field) columns_to_exclude.add(source_feature_view.created_timestamp_column) - for ( - original_col, - mapped_col, - ) in source_feature_view.batch_source.field_mapping.items(): - if mapped_col in columns_to_exclude: - columns_to_exclude.remove(mapped_col) - columns_to_exclude.add(original_col) + batch_source = getattr(source_feature_view, "batch_source") + batch_field_mapping = getattr(batch_source or None, "field_mapping") + if batch_field_mapping: + for ( + original_col, + mapped_col, + ) in batch_field_mapping.items(): + if mapped_col in columns_to_exclude: + columns_to_exclude.remove(mapped_col) + columns_to_exclude.add(original_col) - table_column_names_and_types = ( - source_feature_view.batch_source.get_table_column_names_and_types( - config + table_column_names_and_types = ( + batch_source.get_table_column_names_and_types(config) ) - ) - for col_name, col_datatype in table_column_names_and_types: - if col_name in columns_to_exclude: - continue - elif col_name in join_keys: - field = Field( - name=col_name, - dtype=from_value_type( - source_feature_view.batch_source.source_datatype_to_feast_value_type()( - col_datatype - ) - ), - ) - if field.name not in [ - entity_column.name for entity_column in entity_columns - ]: - entity_columns.append(field) - elif not re.match( - "^__|__$", col_name - ): # double underscores often signal an internal-use column - if run_inference_for_features: - feature_name = ( - source_feature_view.batch_source.field_mapping[col_name] - if col_name in source_feature_view.batch_source.field_mapping - else col_name - ) + for col_name, col_datatype in table_column_names_and_types: + if col_name in columns_to_exclude: + continue + elif col_name in join_keys: field = Field( - name=feature_name, + name=col_name, dtype=from_value_type( - source_feature_view.batch_source.source_datatype_to_feast_value_type()( + batch_source.source_datatype_to_feast_value_type()( col_datatype ) ), ) if field.name not in [ - feature.name for feature in source_feature_view.features + entity_column.name + for entity_column in entity_columns + if hasattr(entity_column, "name") ]: - source_feature_view.features.append(field) + entity_columns.append(field) + elif not re.match( + "^__|__$", col_name + ): # double underscores often signal an internal-use column + if run_inference_for_features: + feature_name = ( + batch_field_mapping[col_name] + if col_name in batch_field_mapping + else col_name + ) + field = Field( + name=feature_name, + dtype=from_value_type( + batch_source.source_datatype_to_feast_value_type()( + col_datatype + ) + ), + ) + if field.name not in [ + feature.name for feature in source_feature_view.features + ]: + source_feature_view.features.append(field) else: columns_to_exclude = { @@ -292,7 +296,10 @@ def _infer_features_and_entities( ), ) if field.name not in [ - entity_column.name for entity_column in entity_columns + entity_column.name + if not isinstance(entity_column, str) + else entity_column + for entity_column in entity_columns ]: entity_columns.append(field) elif not re.match( diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index e95e20c926..1b75d23ed4 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -241,6 +241,7 @@ def __eq__(self, other): if ( self.source_feature_view_projections != other.source_feature_view_projections + or self.description != other.description or self.source_request_sources != other.source_request_sources or self.mode != other.mode or self.feature_transformation != other.feature_transformation @@ -423,9 +424,9 @@ def from_proto( else: write_to_online_store = False if hasattr(on_demand_feature_view_proto.spec, "entities"): - entities = on_demand_feature_view_proto.spec.entities + entities = list(on_demand_feature_view_proto.spec.entities) else: - entities = None + entities = [] if hasattr(on_demand_feature_view_proto.spec, "entity_columns"): entity_columns = [ Field.from_proto(field_proto) @@ -452,7 +453,7 @@ def from_proto( write_to_online_store=write_to_online_store, ) - on_demand_feature_view_obj.entities = list(entities) + on_demand_feature_view_obj.entities = entities on_demand_feature_view_obj.entity_columns = entity_columns # FeatureViewProjections are not saved in the OnDemandFeatureView proto. From ed97620170f94f60c8ddf94bdbc484f74e537f1e Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 18 Sep 2024 16:26:44 -0400 Subject: [PATCH 06/13] removed comment Signed-off-by: Francisco Javier Arceo --- sdk/python/tests/unit/test_feature_views.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index 9ad6d0fb0c..ce789c706c 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -111,7 +111,6 @@ def test_hash(): assert len(s4) == 3 -# TODO(felixwang9817): Add tests for proto conversion. def test_proto_conversion(): file_source = FileSource(name="my-file-source", path="test.parquet") feature_view_1 = FeatureView( From 8cb2471638f37fcb87e013707af5d67dd5214121 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Thu, 19 Sep 2024 10:30:58 -0400 Subject: [PATCH 07/13] updated FeatureViewProjection batch_source serialization Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_view_projection.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/feature_view_projection.py b/sdk/python/feast/feature_view_projection.py index c9cf9625c8..e7ed9e3aa6 100644 --- a/sdk/python/feast/feature_view_projection.py +++ b/sdk/python/feast/feature_view_projection.py @@ -73,7 +73,12 @@ def to_proto(self) -> FeatureViewProjectionProto: return feature_reference_proto @staticmethod - def from_proto(proto: FeatureViewProjectionProto): + def from_proto(proto: FeatureViewProjectionProto) -> "FeatureViewProjection": + batch_source = ( + DataSource.from_proto(proto.batch_source) + if str(getattr(proto, "batch_source")) + else None + ) feature_view_projection = FeatureViewProjection( name=proto.feature_view_name, name_alias=proto.feature_view_name_alias or None, @@ -83,7 +88,7 @@ def from_proto(proto: FeatureViewProjectionProto): timestamp_field=proto.timestamp_field or None, date_partition_column=proto.date_partition_column or None, created_timestamp_column=proto.created_timestamp_column or None, - batch_source=proto.batch_source or None, + batch_source=batch_source, ) for feature_column in proto.feature_columns: feature_view_projection.features.append(Field.from_proto(feature_column)) From 6a78b15fad32f45cbc01c87a875b501e8bd535a4 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Thu, 19 Sep 2024 12:51:54 -0400 Subject: [PATCH 08/13] trying to debug a test Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/infra/materialization/snowflake_engine.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/python/feast/infra/materialization/snowflake_engine.py b/sdk/python/feast/infra/materialization/snowflake_engine.py index e8b0857e5d..3e5f0db692 100644 --- a/sdk/python/feast/infra/materialization/snowflake_engine.py +++ b/sdk/python/feast/infra/materialization/snowflake_engine.py @@ -285,6 +285,8 @@ def _materialize_one( fv_latest_values_sql = offline_job.to_sql() + print(f'this is for fja {feature_view}') + print(f'\nthis is for fja 2 {feature_view.entity_columns}') if ( feature_view.entity_columns[0].name == DUMMY_ENTITY_ID ): # entityless Feature View's placeholder entity From f28a7135dbadbc6c461076abc24b677e2dfaa6af Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Thu, 19 Sep 2024 13:16:44 -0400 Subject: [PATCH 09/13] handling snowflake issue, cant confirm why it is happening so just going to put a workaround Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/infra/materialization/snowflake_engine.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/materialization/snowflake_engine.py b/sdk/python/feast/infra/materialization/snowflake_engine.py index 3e5f0db692..91c76d09cb 100644 --- a/sdk/python/feast/infra/materialization/snowflake_engine.py +++ b/sdk/python/feast/infra/materialization/snowflake_engine.py @@ -285,10 +285,9 @@ def _materialize_one( fv_latest_values_sql = offline_job.to_sql() - print(f'this is for fja {feature_view}') - print(f'\nthis is for fja 2 {feature_view.entity_columns}') + first_feature_view_entity_name = getattr(feature_view.entity_columns[0], "name", None) if ( - feature_view.entity_columns[0].name == DUMMY_ENTITY_ID + first_feature_view_entity_name == DUMMY_ENTITY_ID ): # entityless Feature View's placeholder entity entities_to_write = 1 else: From 6f801ea8df34cd2a3c25770db04e5dd1ee95224b Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Thu, 19 Sep 2024 13:17:58 -0400 Subject: [PATCH 10/13] linter Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/infra/materialization/snowflake_engine.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/materialization/snowflake_engine.py b/sdk/python/feast/infra/materialization/snowflake_engine.py index 91c76d09cb..1acf0096c1 100644 --- a/sdk/python/feast/infra/materialization/snowflake_engine.py +++ b/sdk/python/feast/infra/materialization/snowflake_engine.py @@ -285,7 +285,9 @@ def _materialize_one( fv_latest_values_sql = offline_job.to_sql() - first_feature_view_entity_name = getattr(feature_view.entity_columns[0], "name", None) + first_feature_view_entity_name = getattr( + feature_view.entity_columns[0], "name", None + ) if ( first_feature_view_entity_name == DUMMY_ENTITY_ID ): # entityless Feature View's placeholder entity From de4d6ceab386aadcc53af141a0265b5865876a52 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Thu, 19 Sep 2024 13:40:41 -0400 Subject: [PATCH 11/13] trying to handle it correctly Signed-off-by: Francisco Javier Arceo --- .../feast/infra/materialization/snowflake_engine.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/materialization/snowflake_engine.py b/sdk/python/feast/infra/materialization/snowflake_engine.py index 1acf0096c1..600e1b20d8 100644 --- a/sdk/python/feast/infra/materialization/snowflake_engine.py +++ b/sdk/python/feast/infra/materialization/snowflake_engine.py @@ -285,9 +285,12 @@ def _materialize_one( fv_latest_values_sql = offline_job.to_sql() - first_feature_view_entity_name = getattr( - feature_view.entity_columns[0], "name", None - ) + if feature_view.entity_columns: + first_feature_view_entity_name = getattr( + feature_view.entity_columns[0], "name", None + ) + else: + first_feature_view_entity_name = None if ( first_feature_view_entity_name == DUMMY_ENTITY_ID ): # entityless Feature View's placeholder entity From 082415fa38740ce378cb84e66e2354c72894e188 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Thu, 19 Sep 2024 14:39:43 -0400 Subject: [PATCH 12/13] handling the else case for from_feature_view_definition Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_view_projection.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sdk/python/feast/feature_view_projection.py b/sdk/python/feast/feature_view_projection.py index e7ed9e3aa6..70415e9ed3 100644 --- a/sdk/python/feast/feature_view_projection.py +++ b/sdk/python/feast/feature_view_projection.py @@ -112,6 +112,13 @@ def from_feature_view_definition(feature_view: "FeatureView"): or None, batch_source=feature_view.batch_source or None, ) + else: + return FeatureViewProjection( + name=feature_view.name, + name_alias=None, + features=feature_view.features, + desired_features=[], + ) @staticmethod def from_definition(base_feature_view: "BaseFeatureView"): From f50923feb4ec8c806116947b0c7dd650446f6a6d Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Fri, 20 Sep 2024 20:35:15 -0400 Subject: [PATCH 13/13] adding print Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/infra/materialization/snowflake_engine.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/infra/materialization/snowflake_engine.py b/sdk/python/feast/infra/materialization/snowflake_engine.py index 600e1b20d8..d8662f179e 100644 --- a/sdk/python/feast/infra/materialization/snowflake_engine.py +++ b/sdk/python/feast/infra/materialization/snowflake_engine.py @@ -307,6 +307,7 @@ def _materialize_one( """ with GetSnowflakeConnection(self.repo_config.offline_store) as conn: + print(query) entities_to_write = conn.cursor().execute(query).fetchall()[0][0] if feature_view.batch_source.field_mapping is not None: