Add SQSServiceSensor for non-polling SQS sensor #91

Open: wants to merge 1 commit into base: master

grepory commented Dec 18, 2019

This adds an SQS sensor with its own polling loop so that we can
consume messages from one or more SQS queues as quickly as possible
without relying on StackStorm to trigger each poll.

Closes #90 cc @Kami

The SQSServiceSensor class name is meh, but I'm short on creativity. This is what we're using right now to process anywhere between 30 and 100 messages per second from a single queue.

grepory force-pushed the grepory/non-polling-sqs-sensor branch from 0651675 to 50d0d31 on December 18, 2019 23:51
# setting SQS ServiceResource object from the parameter of datastore or configuration file
self._may_setup_sqs()

while True:
Contributor

I assume no explicit yield is needed in this function (e.g. eventlet.sleep(0.01) at the end of the loop) because _receive_messages performs a network operation, which already has to yield at some point even if there are no messages to retrieve.

If that's not the case and _receive_messages can return immediately, the loop never yields, which could cause CPU spikes and 100% CPU utilization by the sensor process.

Author

boto3 receive_messages accepts a WaitTimeSeconds argument, which _receive_messages defaults to 2 seconds. That's what's keeping the loop from spinning too fast.
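
A minimal sketch of the long-polling behaviour described in this reply, not the PR's actual code. The only boto3 call it relies on is Queue.receive_messages with its MaxNumberOfMessages and WaitTimeSeconds parameters. FakeQueue, drain and max_empty_polls are hypothetical names added so the sketch runs offline and terminates; the real sensor loops forever.

```python
import time


class FakeQueue:
    """Hypothetical offline stand-in for a boto3 SQS Queue resource."""

    def __init__(self, messages, simulated_wait=0.05):
        self._messages = list(messages)
        self._simulated_wait = simulated_wait
        self.calls = []  # WaitTimeSeconds value seen on each call

    def receive_messages(self, MaxNumberOfMessages=1, WaitTimeSeconds=0):
        self.calls.append(WaitTimeSeconds)
        if not self._messages:
            # Real SQS holds the request open for up to WaitTimeSeconds when
            # the queue is empty; the fake blocks briefly to imitate that.
            time.sleep(min(WaitTimeSeconds, self._simulated_wait))
            return []
        batch = self._messages[:MaxNumberOfMessages]
        del self._messages[:MaxNumberOfMessages]
        return batch


def drain(queue, handle, wait_time_seconds=2, max_empty_polls=1):
    """Long-poll `queue` until `max_empty_polls` consecutive empty responses."""
    empty = 0
    handled = 0
    while empty < max_empty_polls:
        batch = queue.receive_messages(
            MaxNumberOfMessages=10, WaitTimeSeconds=wait_time_seconds
        )
        if not batch:
            # The blocking receive call is what paces an idle loop.
            empty += 1
            continue
        empty = 0
        for msg in batch:
            handle(msg)
            handled += 1
    return handled
```

With a non-zero WaitTimeSeconds, an idle queue makes each receive call block on the network, so the loop is paced by the request itself rather than by an explicit sleep.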

---
class_name: "AWSSQSServiceSensor"
entry_point: "sqs_service_sensor.py"
description: "Service Sensor which monitors a SQS queue for new messages"
Contributor

It would also be good to document, both here in the description and in the README, how this sensor differs from the existing one :)

payload = {"queue": queue, "body": json.loads(msg.body)}
self._sensor_service.dispatch(trigger="aws.sqs_new_message",
payload=payload)
msg.delete()
Contributor

Can this method throw?

If so, it would probably be a good idea to wrap it in try / except. Otherwise a message that always throws for some reason would crash the sensor on every attempt and prevent it from processing anything else.

Author

If this throws an exception, it's not because SQS couldn't delete the message due to some unrecoverable error. It will be because of a misconfiguration on the client side or something like that. I don't think it is necessary to try/catch this.
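
For reference, the guard the reviewer proposes could look roughly like the sketch below. It is an illustration under assumptions, not code from the PR: process_message, its dispatch parameter and the logger name are hypothetical, and the sketch catches every exception, including a json.loads failure on a non-JSON body, which is broader than the msg.delete() call the question was about.

```python
import json
import logging

LOG = logging.getLogger("sqs_sketch")  # hypothetical logger name


def process_message(msg, queue, dispatch):
    """Dispatch one message and delete it; return True on success."""
    try:
        payload = {"queue": queue, "body": json.loads(msg.body)}
        dispatch(trigger="aws.sqs_new_message", payload=payload)
        msg.delete()
        return True
    except Exception:
        # Log and move on so one bad message cannot stop the polling loop.
        LOG.exception("Failed to process a message from queue %s", queue)
        return False
```

A message that fails here is not deleted, so SQS will deliver it again once its visibility timeout expires.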

self._logger.warning("SQS Queue: %s doesn't exist, creating it.", queueName)
return self.sqs_res.create_queue(QueueName=queueName)
elif e.response['Error']['Code'] == 'InvalidClientTokenId':
self._logger.warning("Couldn't operate SQS because of invalid credential config")
Contributor

Should we also throw (and abort sensor processing) on this error?

Author

I don't know why this was logged and not raised. It shouldn't be possible to get here if you have invalid credentials. We tested that scenario, and it raises an exception. I can't recall if it did so when getting the resource or creating the session.
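
One way to get the "raise instead of log" behaviour asked about above is sketched here, under stated assumptions. FakeClientError is a hypothetical stand-in that only mimics the response dict carried by botocore.exceptions.ClientError, so the example runs without AWS access. get_or_create_queue is a hypothetical helper name, and the NonExistentQueue code string is an assumption about the branch that is not visible in the quoted snippet.

```python
class FakeClientError(Exception):
    """Hypothetical stand-in for botocore.exceptions.ClientError."""

    def __init__(self, code):
        super().__init__(code)
        self.response = {"Error": {"Code": code}}


# Assumed error code for a missing queue; not shown in the quoted diff.
NON_EXISTENT_QUEUE = "AWS.SimpleQueueService.NonExistentQueue"


def get_or_create_queue(sqs_res, queue_name, error_cls=FakeClientError):
    """Return the named queue, creating it if missing; re-raise other errors."""
    try:
        return sqs_res.get_queue_by_name(QueueName=queue_name)
    except error_cls as e:
        if e.response["Error"]["Code"] == NON_EXISTENT_QUEUE:
            return sqs_res.create_queue(QueueName=queue_name)
        # Anything else, such as InvalidClientTokenId, propagates to the
        # caller instead of being logged and swallowed.
        raise
```

With real boto3, error_cls would be botocore.exceptions.ClientError and sqs_res the SQS service resource.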

Contributor

punkrokk commented Feb 8, 2020

@grepory Hey - I want to get this merged, so I am reviewing this PR and I have a few questions.

  1. What happens if someone adds a large number of queues? Say 30? 100? 500?
  2. Any recommended system requirements? I can see this sensor getting people into trouble if they turn it on without careful consideration.
  3. What if you named the sensor SQSContinuousSensor or something like that to better differentiate?
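
A rough way to reason about the first question, assuming the queues are polled one after another in a single loop (the thread does not confirm this) and each idle queue blocks for the full 2-second WaitTimeSeconds default mentioned earlier. worst_case_cycle_seconds is a hypothetical helper; it ignores network latency and message-handling time.

```python
def worst_case_cycle_seconds(num_queues, wait_time_seconds=2):
    """Upper bound on one pass over all queues when every queue is idle."""
    return num_queues * wait_time_seconds
```

Under those assumptions, a full pass over idle queues takes up to 60 seconds for 30 queues, 200 seconds for 100, and 1000 seconds for 500, so a message arriving on a queue that was just polled could wait about that long before being picked up.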

Author

grepory commented Feb 8, 2020 via email


CLAassistant commented May 11, 2022

CLA assistant check
All committers have signed the CLA.

@satellite-no
Contributor

Based on @grepory's last comment, shouldn't this be closed?

Can we set up Stale-bot to clean up old PRs and issues?


alaypatel07 commented Feb 14, 2024

I am working on a use case that needs the SQS sensor to process messages faster than 120 messages/minute, which is the limit of the current in-tree sensor.

Given that, I would be interested in moving this PR forward. The non-polling sensor could also potentially benefit from an async mechanism for fetching SQS messages. Would exploring https://aiobotocore.readthedocs.io/en/latest/examples/sqs/producer_consumer.html# for the non-polling SQS sensor help address the open review items, especially the one about the timeout interval?

cc @Kami ^ since you were helping with the review
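
To make the async idea concrete, here is a minimal asyncio sketch, not a proposal for the actual implementation. It assumes a client object whose receive_message and delete_message coroutines take the same keyword arguments and return the same response shape as the boto3 SQS client. FakeAsyncSQS, consume and the queue URL are hypothetical, and the loop stops at the first empty response only so that the example terminates.

```python
import asyncio


class FakeAsyncSQS:
    """Hypothetical offline stand-in for an async SQS client."""

    def __init__(self, bodies):
        self._pending = [
            {"Body": body, "ReceiptHandle": "rh-%d" % i}
            for i, body in enumerate(bodies)
        ]
        self.deleted = []

    async def receive_message(self, QueueUrl, MaxNumberOfMessages=1, WaitTimeSeconds=0):
        await asyncio.sleep(0)  # yield to the event loop like a network call
        batch = self._pending[:MaxNumberOfMessages]
        del self._pending[:MaxNumberOfMessages]
        # SQS omits the "Messages" key when nothing was received.
        return {"Messages": batch} if batch else {}

    async def delete_message(self, QueueUrl, ReceiptHandle):
        self.deleted.append(ReceiptHandle)


async def consume(client, queue_url, handle, wait_time_seconds=2):
    """Receive, handle and delete messages until one empty response arrives."""
    count = 0
    while True:
        resp = await client.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=wait_time_seconds,
        )
        messages = resp.get("Messages", [])
        if not messages:
            return count
        for m in messages:
            handle(m["Body"])
            await client.delete_message(
                QueueUrl=queue_url, ReceiptHandle=m["ReceiptHandle"]
            )
            count += 1
```

Whether an asyncio loop fits inside a StackStorm sensor, which the review above discusses in terms of eventlet, is not something this thread settles.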

Development

Successfully merging this pull request may close these issues.

Change to Sensor from PollingSensor for SQS Sensor