Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sdk: Remove Logger #311

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

92 changes: 34 additions & 58 deletions blueprint-manager/src/executor/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use gadget_io::GadgetConfig;
use gadget_sdk::clients::tangle::runtime::{TangleConfig, TangleEvent};
use gadget_sdk::clients::tangle::services::{RpcServicesWithBlueprint, ServicesClient};
use gadget_sdk::config::Protocol;
use gadget_sdk::logger::Logger;
use gadget_sdk::{error, info, trace, warn};
use std::fmt::Debug;
use std::sync::atomic::Ordering;
use tangle_subxt::subxt::utils::AccountId32;
Expand Down Expand Up @@ -40,19 +40,17 @@ pub async fn handle_services<'a>(
gadget_config: &GadgetConfig,
blueprint_manager_opts: &BlueprintManagerConfig,
active_gadgets: &mut ActiveGadgets,
logger: &Logger,
) -> color_eyre::Result<()> {
for blueprint in blueprints {
if let Err(err) = crate::sources::handle(
blueprint,
gadget_config,
blueprint_manager_opts,
active_gadgets,
logger,
)
.await
{
logger.error(err)
error!("{err}");
}
}

Expand All @@ -68,7 +66,6 @@ pub struct EventPollResult {

pub(crate) async fn check_blueprint_events(
event: &TangleEvent,
logger: &Logger,
active_gadgets: &mut ActiveGadgets,
account_id: &AccountId32,
) -> EventPollResult {
Expand All @@ -86,11 +83,11 @@ pub(crate) async fn check_blueprint_events(
Ok(evt) => {
if &evt.operator == account_id {
result.blueprint_registrations.push(evt.blueprint_id);
logger.info(format!("Pre-registered event: {evt:?}"));
info!("Pre-registered event: {evt:?}");
}
}
Err(err) => {
logger.warn(format!("Error handling pre-registered event: {err:?}"));
warn!("Error handling pre-registered event: {err:?}");
}
}
}
Expand All @@ -99,11 +96,11 @@ pub(crate) async fn check_blueprint_events(
for evt in registered_events {
match evt {
Ok(evt) => {
logger.info(format!("Registered event: {evt:?}"));
info!("Registered event: {evt:?}");
result.needs_update = true;
}
Err(err) => {
logger.warn(format!("Error handling registered event: {err:?}"));
warn!("Error handling registered event: {err:?}");
}
}
}
Expand All @@ -112,19 +109,16 @@ pub(crate) async fn check_blueprint_events(
for evt in unregistered_events {
match evt {
Ok(evt) => {
logger.info(format!("Unregistered event: {evt:?}"));
info!("Unregistered event: {evt:?}");
if &evt.operator == account_id && active_gadgets.remove(&evt.blueprint_id).is_some()
{
logger.info(format!(
"Removed services for blueprint_id: {}",
evt.blueprint_id,
));
info!("Removed services for blueprint_id: {}", evt.blueprint_id,);

result.needs_update = true;
}
}
Err(err) => {
logger.warn(format!("Error handling unregistered event: {err:?}"));
warn!("Error handling unregistered event: {err:?}");
}
}
}
Expand All @@ -133,10 +127,10 @@ pub(crate) async fn check_blueprint_events(
for evt in service_initiated_events {
match evt {
Ok(evt) => {
logger.info(format!("Service initiated event: {evt:?}"));
info!("Service initiated event: {evt:?}");
}
Err(err) => {
logger.warn(format!("Error handling service initiated event: {err:?}"));
warn!("Error handling service initiated event: {err:?}");
}
}
}
Expand All @@ -145,10 +139,10 @@ pub(crate) async fn check_blueprint_events(
for evt in job_called_events {
match evt {
Ok(evt) => {
logger.info(format!("Job called event: {evt:?}"));
info!("Job called event: {evt:?}");
}
Err(err) => {
logger.warn(format!("Error handling job called event: {err:?}"));
warn!("Error handling job called event: {err:?}");
}
}
}
Expand All @@ -157,12 +151,10 @@ pub(crate) async fn check_blueprint_events(
for evt in job_result_submitted_events {
match evt {
Ok(evt) => {
logger.info(format!("Job result submitted event: {evt:?}"));
info!("Job result submitted event: {evt:?}");
}
Err(err) => {
logger.warn(format!(
"Error handling job result submitted event: {err:?}"
));
warn!("Error handling job result submitted event: {err:?}");
}
}
}
Expand All @@ -174,16 +166,15 @@ pub(crate) async fn check_blueprint_events(
pub(crate) async fn handle_tangle_event(
event: &TangleEvent,
blueprints: &[RpcServicesWithBlueprint],
logger: &Logger,
gadget_config: &GadgetConfig,
gadget_manager_opts: &BlueprintManagerConfig,
active_gadgets: &mut ActiveGadgets,
poll_result: EventPollResult,
client: &ServicesClient<TangleConfig>,
) -> color_eyre::Result<()> {
logger.trace(format!("Received notification {}", event.number));
info!("Received notification {}", event.number);
const DEFAULT_PROTOCOL: Protocol = Protocol::Tangle;
logger.warn("Using Tangle protocol as default over Eigen. This is a temporary development workaround. You can alter this behavior here");
warn!("Using Tangle protocol as default over Eigen. This is a temporary development workaround. You can alter this behavior here");

let mut registration_blueprints = vec![];
// First, check to see if we need to register any new services invoked by the PreRegistration event
Expand Down Expand Up @@ -232,7 +223,6 @@ pub(crate) async fn handle_tangle_event(
let fetcher = GithubBinaryFetcher {
fetcher: gh.clone(),
blueprint_id: blueprint.blueprint_id,
logger,
gadget_name: blueprint.name.clone(),
};

Expand All @@ -242,14 +232,13 @@ pub(crate) async fn handle_tangle_event(
GadgetSourceFetcher::Testing(test) => {
// TODO: demote to TRACE once proven to work
if !gadget_manager_opts.test_mode {
logger.warn("Ignoring testing fetcher as we are not in test mode");
warn!("Ignoring testing fetcher as we are not in test mode");
continue;
}

let fetcher = crate::sources::testing::TestSourceFetcher {
fetcher: test.clone(),
blueprint_id: blueprint.blueprint_id,
logger,
gadget_name: blueprint.name.clone(),
};

Expand All @@ -258,7 +247,7 @@ pub(crate) async fn handle_tangle_event(
}

_ => {
logger.warn("Blueprint does not contain a supported fetcher");
warn!("Blueprint does not contain a supported fetcher");
continue;
}
}
Expand All @@ -268,19 +257,16 @@ pub(crate) async fn handle_tangle_event(

// Ensure that we have at least one fetcher
if fetcher_candidates.is_empty() {
logger.warn(format!(
"No fetchers found for blueprint: {}",
blueprint.name,
));
warn!("No fetchers found for blueprint: {}", blueprint.name,);
continue;
}

// Ensure that we have a test fetcher if we are in test mode
if gadget_manager_opts.test_mode && test_fetcher_idx.is_none() {
logger.warn(format!(
warn!(
"No testing fetcher found for blueprint `{}` despite operating in TEST MODE",
blueprint.name,
));
);
continue;
}

Expand All @@ -292,10 +278,10 @@ pub(crate) async fn handle_tangle_event(

// Ensure there is only a single candidate fetcher
if fetcher_candidates.len() != 1 {
logger.warn(format!(
warn!(
"Multiple fetchers found for blueprint: {}. Invalidating blueprint",
blueprint.name,
));
);
continue;
}

Expand All @@ -306,26 +292,24 @@ pub(crate) async fn handle_tangle_event(

verified_blueprints.push(verified_blueprint);
} else {
logger
.warn("Blueprint does not contain a native gadget and thus currently unsupported");
warn!("Blueprint does not contain a native gadget and thus currently unsupported");
}
}

logger.trace(format!(
trace!(
"OnChain Verified Blueprints: {:?}",
verified_blueprints
.iter()
.map(|r| format!("{r:?}"))
.collect::<Vec<_>>()
));
);

// Step 3: Check to see if we need to start any new services
handle_services(
&verified_blueprints,
gadget_config,
gadget_manager_opts,
active_gadgets,
logger,
)
.await?;

Expand All @@ -335,9 +319,9 @@ pub(crate) async fn handle_tangle_event(
// Loop through every (blueprint_id, service_id) running. See if the service is still on-chain. If not, kill it and add it to to_remove
for (blueprint_id, process_handles) in &mut *active_gadgets {
for service_id in process_handles.keys() {
logger.info(format!(
info!(
"Checking service for on-chain termination: bid={blueprint_id}//sid={service_id}"
));
);

// Since the below "verified blueprints" were freshly obtained from an on-chain source,
// we compare all these fresh values to see if we're running a service locally that is no longer on-chain
Expand All @@ -346,9 +330,7 @@ pub(crate) async fn handle_tangle_event(
// Safe assertion since we know there is at least one fetcher. All fetchers should have the same blueprint id
let fetcher = &verified_blueprints.fetcher;
if fetcher.blueprint_id() == *blueprint_id && !services.contains(service_id) {
logger.warn(format!(
"Killing service that is no longer on-chain: bid={blueprint_id}//sid={service_id}",
));
warn!("Killing service that is no longer on-chain: bid={blueprint_id}//sid={service_id}");
to_remove.push((*blueprint_id, *service_id));
}
}
Expand All @@ -362,28 +344,22 @@ pub(crate) async fn handle_tangle_event(
&& !process_handle.0.load(Ordering::Relaxed)
{
// By removing any killed processes, we will auto-restart them on the next finality notification if required
logger.warn("Killing service that has died to allow for auto-restart");
warn!("Killing service that has died to allow for auto-restart");
to_remove.push((*blueprint_id, *service_id));
}
}
}

for (blueprint_id, service_id) in to_remove {
logger.warn(format!(
"Removing service that is no longer active on-chain or killed: bid={blueprint_id}//sid={service_id}",
));
warn!("Removing service that is no longer active on-chain or killed: bid={blueprint_id}//sid={service_id}");
let mut should_delete_blueprint = false;
if let Some(gadgets) = active_gadgets.get_mut(&blueprint_id) {
if let Some((_, mut process_handle)) = gadgets.remove(&service_id) {
if let Some(abort_handle) = process_handle.take() {
if abort_handle.send(()).is_err() {
logger.error(format!(
"Failed to send abort signal to service: bid={blueprint_id}//sid={service_id}",
));
error!("Failed to send abort signal to service: bid={blueprint_id}//sid={service_id}");
} else {
logger.warn(format!(
"Sent abort signal to service: bid={blueprint_id}//sid={service_id}",
));
warn!("Sent abort signal to service: bid={blueprint_id}//sid={service_id}");
}
}
}
Expand Down
Loading
Loading