Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Updating OnDemandFeatureView to add Entities and batch_source #4530

Draft
wants to merge 13 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions protos/feast/core/FeatureViewProjection.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ option java_outer_classname = "FeatureReferenceProto";
option java_package = "feast.proto.core";

import "feast/core/Feature.proto";
import "feast/core/DataSource.proto";


// A projection to be applied on top of a FeatureView.
Expand All @@ -22,4 +23,13 @@ message FeatureViewProjection {

// Map for entity join_key overrides of feature data entity join_key to entity data join_key
map<string,string> join_key_map = 4;

string timestamp_field = 5;
string date_partition_column = 6;
string created_timestamp_column = 7;
// Batch/Offline DataSource where this view can retrieve offline feature data.
DataSource batch_source = 8;
// Streaming DataSource from where this view can consume "online" feature data.
DataSource stream_source = 9;

}
6 changes: 6 additions & 0 deletions protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ message OnDemandFeatureViewSpec {
// Owner of the on demand feature view.
string owner = 8;
string mode = 11;
bool write_to_online_store = 12;

// List of names of entities associated with this feature view.
repeated string entities = 13;
// List of specifications for each entity defined as part of this feature view.
repeated FeatureSpecV2 entity_columns = 14;
}

message OnDemandFeatureViewMeta {
Expand Down
12 changes: 11 additions & 1 deletion sdk/python/feast/base_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from google.protobuf.json_format import MessageToJson
from google.protobuf.message import Message

from feast.data_source import DataSource
from feast.feature_view_projection import FeatureViewProjection
from feast.field import Field
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
Expand Down Expand Up @@ -65,6 +66,7 @@ def __init__(
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
source: Optional[DataSource] = None,
):
"""
Creates a BaseFeatureView object.
Expand All @@ -76,7 +78,8 @@ def __init__(
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): The owner of the base feature view, typically the email of the
primary maintainer.

source (optional): The source of data for this group of features. May be a stream source, or a batch source.
If a stream source, the source should contain a batch_source for backfills & batch materialization.
Raises:
ValueError: A field mapping conflicts with an Entity or a Feature.
"""
Expand All @@ -90,6 +93,9 @@ def __init__(
self.created_timestamp = None
self.last_updated_timestamp = None

if source:
self.source = source

@property
@abstractmethod
def proto_class(self) -> Type[Message]:
Expand Down Expand Up @@ -156,6 +162,10 @@ def __eq__(self, other):
or self.tags != other.tags
or self.owner != other.owner
):
# This is meant to ignore the File Source change to Push Source
if isinstance(type(self.source), type(other.source)):
if self.source != other.source:
return False
return False

return True
Expand Down
5 changes: 4 additions & 1 deletion sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ def __init__(
description=description,
tags=tags,
owner=owner,
source=source,
)
self.online = online
self.materialization_intervals = []
Expand Down Expand Up @@ -429,7 +430,9 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):

# FeatureViewProjections are not saved in the FeatureView proto.
# Create the default projection.
feature_view.projection = FeatureViewProjection.from_definition(feature_view)
feature_view.projection = FeatureViewProjection.from_feature_view_definition(
feature_view
)

if feature_view_proto.meta.HasField("created_timestamp"):
feature_view.created_timestamp = (
Expand Down
86 changes: 79 additions & 7 deletions sdk/python/feast/feature_view_projection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

from attr import dataclass

from feast.data_source import DataSource
from feast.field import Field
from feast.protos.feast.core.FeatureViewProjection_pb2 import (
FeatureViewProjection as FeatureViewProjectionProto,
)

if TYPE_CHECKING:
from feast.base_feature_view import BaseFeatureView
from feast.feature_view import FeatureView


@dataclass
Expand All @@ -27,50 +29,120 @@ class FeatureViewProjection:
is not ready to be projected, i.e. still needs to go through feature inference.
join_key_map: A map to modify join key columns during retrieval of this feature
view projection.
timestamp_field: The timestamp field of the feature view projection.
date_partition_column: The date partition column of the feature view projection.
created_timestamp_column: The created timestamp column of the feature view projection.
batch_source: The batch source of data where this group of features
is stored. This is optional ONLY if a push source is specified as the
stream_source, since push sources contain their own batch sources.

"""

name: str
name_alias: Optional[str]
desired_features: List[str]
features: List[Field]
join_key_map: Dict[str, str] = {}
timestamp_field: Optional[str] = None
date_partition_column: Optional[str] = None
created_timestamp_column: Optional[str] = None
batch_source: Optional[DataSource] = None

def name_to_use(self):
return self.name_alias or self.name

def to_proto(self) -> FeatureViewProjectionProto:
batch_source = None
if getattr(self, "batch_source", None):
if isinstance(self.batch_source, DataSource):
batch_source = self.batch_source.to_proto()
else:
batch_source = self.batch_source
feature_reference_proto = FeatureViewProjectionProto(
feature_view_name=self.name,
feature_view_name_alias=self.name_alias or "",
join_key_map=self.join_key_map,
timestamp_field=self.timestamp_field or "",
date_partition_column=self.date_partition_column or "",
created_timestamp_column=self.created_timestamp_column or "",
batch_source=batch_source,
)
for feature in self.features:
feature_reference_proto.feature_columns.append(feature.to_proto())

return feature_reference_proto

@staticmethod
def from_proto(proto: FeatureViewProjectionProto):
def from_proto(proto: FeatureViewProjectionProto) -> "FeatureViewProjection":
batch_source = (
DataSource.from_proto(proto.batch_source)
if str(getattr(proto, "batch_source"))
else None
)
feature_view_projection = FeatureViewProjection(
name=proto.feature_view_name,
name_alias=proto.feature_view_name_alias or None,
features=[],
join_key_map=dict(proto.join_key_map),
desired_features=[],
timestamp_field=proto.timestamp_field or None,
date_partition_column=proto.date_partition_column or None,
created_timestamp_column=proto.created_timestamp_column or None,
batch_source=batch_source,
)
for feature_column in proto.feature_columns:
feature_view_projection.features.append(Field.from_proto(feature_column))

return feature_view_projection

@staticmethod
def from_feature_view_definition(feature_view: "FeatureView"):
# TODO need to implement this for StreamFeatureViews
if getattr(feature_view, "batch_source", None):
return FeatureViewProjection(
name=feature_view.name,
name_alias=None,
features=feature_view.features,
desired_features=[],
timestamp_field=feature_view.batch_source.created_timestamp_column
or None,
created_timestamp_column=feature_view.batch_source.created_timestamp_column
or None,
date_partition_column=feature_view.batch_source.date_partition_column
or None,
batch_source=feature_view.batch_source or None,
)
else:
return FeatureViewProjection(
name=feature_view.name,
name_alias=None,
features=feature_view.features,
desired_features=[],
)

@staticmethod
def from_definition(base_feature_view: "BaseFeatureView"):
return FeatureViewProjection(
name=base_feature_view.name,
name_alias=None,
features=base_feature_view.features,
desired_features=[],
)
if getattr(base_feature_view, "batch_source", None):
return FeatureViewProjection(
name=base_feature_view.name,
name_alias=None,
features=base_feature_view.features,
desired_features=[],
timestamp_field=base_feature_view.batch_source.created_timestamp_column # type:ignore[attr-defined]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is admittedly a hack and I don't like it but this should be refactored in 1.0.0

My overall learning from this is that FeatureViews should be really derived from the same object with the same class parameters and make them optionally instantiated.

This should be described more thoroughly for 1.0.0

or None,
created_timestamp_column=base_feature_view.batch_source.created_timestamp_column # type:ignore[attr-defined]
or None,
date_partition_column=base_feature_view.batch_source.date_partition_column # type:ignore[attr-defined]
or None,
batch_source=base_feature_view.batch_source or None, # type:ignore[attr-defined]
)
else:
return FeatureViewProjection(
name=base_feature_view.name,
name_alias=None,
features=base_feature_view.features,
desired_features=[],
)

def get_feature(self, feature_name: str) -> Field:
try:
Expand Down
Loading
Loading