Skip to content

Commit

Permalink
Merge pull request #34 from openaq/upgrade-packages
Browse files Browse the repository at this point in the history
Upgrade packages, airgradient changes and publish method
  • Loading branch information
caparker committed Mar 27, 2024
2 parents fff5f84 + 687b93d commit 6566904
Show file tree
Hide file tree
Showing 27 changed files with 5,072 additions and 5,291 deletions.
3 changes: 1 addition & 2 deletions .env
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
LCS_API=https://api.openaq.org
STACK=lcs-etl-pipeline
SECRET_STACK=lcs-etl-pipeline
BUCKET=openaq-fetches
VERBOSE=1
TOPIC_ARN=arn:aws:sns:us-east-1:470049585876:NewFetchResults
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ jobs:
steps:
- uses: actions/checkout@v2

- name: Use Node.js 12.x
- name: Use Node.js 20.x
uses: actions/setup-node@v1
with:
node-version: 12.x
node-version: 20.x

- run: npm install
- run: npm run lint
Expand Down
3 changes: 2 additions & 1 deletion cdk/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ const stack = new EtlPipeline(app, "lcs-etl-pipeline", {
schedulerModuleDir: "scheduler",
sources: require('../fetcher/sources'),
bucketName: process.env.BUCKET || 'openaq-fetches',
lcsApi: process.env.LCS_API || 'https://api.openaq.org'
lcsApi: process.env.LCS_API || 'https://api.openaq.org',
topicArn: process.env.TOPIC_ARN
});


Expand Down
19 changes: 16 additions & 3 deletions cdk/stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export class EtlPipeline extends cdk.Stack {
sources,
lcsApi,
bucketName,
topicArn,
...props
}: StackProps
) {
Expand All @@ -36,6 +37,7 @@ export class EtlPipeline extends cdk.Stack {
queue,
bucket,
lcsApi,
topicArn,
});
this.buildSchedulerLambdas({
moduleDir: schedulerModuleDir,
Expand All @@ -49,20 +51,21 @@ export class EtlPipeline extends cdk.Stack {
queue: sqs.Queue;
bucket: s3.IBucket;
lcsApi: string;
topicArn: string;
}): lambda.Function {
this.prepareNodeModules(props.moduleDir);
const handler = new lambda.Function(this, 'Fetcher', {
description: 'Fetch a single source for a given time period',
runtime: lambda.Runtime.NODEJS_16_X,
runtime: lambda.Runtime.NODEJS_20_X,
handler: 'index.handler',
code: lambda.Code.fromAsset(props.moduleDir),
timeout: cdk.Duration.seconds(900),
memorySize: 512,
environment: {
BUCKET: props.bucket.bucketName,
STACK: cdk.Stack.of(this).stackName,
VERBOSE: '1',
LCS_API: props.lcsApi,
TOPIC_ARN: props.topicArn,
},
});
handler.addEventSource(
Expand All @@ -72,6 +75,14 @@ export class EtlPipeline extends cdk.Stack {
);
props.queue.grantConsumeMessages(handler);
props.bucket.grantReadWrite(handler);
handler.addToRolePolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['sns:Publish'],
resources: [props.topicArn],
})
);

handler.addToRolePolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
Expand All @@ -86,6 +97,7 @@ export class EtlPipeline extends cdk.Stack {
],
})
);

return handler;
}

Expand All @@ -105,7 +117,7 @@ export class EtlPipeline extends cdk.Stack {
`${interval}Scheduler`,
{
description: `${interval}Scheduler`,
runtime: lambda.Runtime.NODEJS_16_X,
runtime: lambda.Runtime.NODEJS_20_X,
handler: 'index.handler',
code: lambda.Code.fromAsset(props.moduleDir),
timeout: cdk.Duration.seconds(25),
Expand Down Expand Up @@ -147,6 +159,7 @@ interface StackProps extends cdk.StackProps {
fetcherModuleDir: string;
schedulerModuleDir: string;
lcsApi: string;
topicArn: string;
bucketName: string;
sources: Source[];
}
11 changes: 6 additions & 5 deletions fetcher/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ require('dotenv').config({ path });
const providers = new (require('./lib/providers'))();
const sources = require('./sources');


if (require.main === module) {
handler();
}

async function handler(event) {
try {

if (!process.env.SOURCE && !event)
throw new Error('SOURCE env var or event required');

Expand All @@ -23,12 +25,11 @@ async function handler(event) {
const source = sources.find((source) => source.provider === source_name);
if (!source) throw new Error(`Unable to find ${source_name} in sources.`);

console.log(`Processing '${source_name}'`);
await providers.processor(source_name, source);

return {};
const log = await providers.processor(source);
await providers.publish(log, 'fetcher/success');
return log;
} catch (err) {
console.error(err);
providers.publish(err, 'fetcher/error');
process.exit(1);
}
}
Expand Down
8 changes: 8 additions & 0 deletions fetcher/lib/measure.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ class Measures {
constructor(type) {
this.headers = [];
this.measures = [];
this.from = null;
this.to = null;

if (type === FixedMeasure) {
this.headers = ['sensor_id', 'measure', 'timestamp'];
Expand All @@ -16,6 +18,12 @@ class Measures {
}

push(measure) {
if (!this.to || measure.timestamp > this.to) {
this.to = measure.timestamp;
}
if (!this.from || measure.timestamp < this.from) {
this.from = measure.timestamp;
}
this.measures.push(measure);
}

Expand Down
23 changes: 12 additions & 11 deletions fetcher/lib/meta.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
const { VERBOSE } = require('./utils');
const AWS = require('aws-sdk');
const s3 = new AWS.S3({
maxRetries: 10
});

const {
getObject,
putObject
} = require('./utils');

/**
* Helper to store metadata about a source in S3.
Expand All @@ -20,8 +21,8 @@ class MetaDetails {

async load() {
try {
const resp = await s3.getObject(this.props).promise();
return JSON.parse(resp.Body.toString('utf-8'));
const body = await getObject(this.props.Bucket, this.props.Key);
return JSON.parse(body);
} catch (err) {
if (err.statusCode !== 404)
throw err;
Expand All @@ -32,11 +33,11 @@ class MetaDetails {
}

save(body) {
return s3.putObject({
...this.props,
Body: JSON.stringify(body),
ContentType: 'application/json'
}).promise();
return putObject(
JSON.stringify(body),
this.props.Bucket,
this.props.Key,
);
}
}
exports.MetaDetails = MetaDetails;
75 changes: 49 additions & 26 deletions fetcher/lib/providers.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
const fs = require('fs');
const path = require('path');
const AWS = require('aws-sdk');
const { SNSClient, PublishCommand } = require('@aws-sdk/client-sns');

const {
VERBOSE,
DRYRUN,
gzip,
unzip,
fetchSecret,
getObject,
putObject,
prettyPrintStation
} = require('./utils');

const s3 = new AWS.S3({
maxRetries: 10
});

const sns = new SNSClient();

/**
* Runtime handler for each of the custom provider scripts, as well
Expand All @@ -28,13 +30,42 @@ class Providers {
/**
* Given a source config file, choose the corresponding provider script to run
*
* @param {String} source_name
* @param {Object} source
*/
async processor(source_name, source) {
async processor(source) {
if (VERBOSE) console.debug('Processing', source.provider);
if (!this[source.provider]) throw new Error(`${source.provider} is not a supported provider`);
// fetch any secrets we may be storing for the provider
if (VERBOSE) console.debug('Fetching secret: ', source.secretKey);
const config = await fetchSecret(source);
// and combine them with the source config for more generic access
if (VERBOSE) console.log('Starting processor', { ...source, ...config });
const log = await this[source.provider].processor({ ...source, ...config });
// source_name is more consistent with our db schema
if (typeof(log) == 'object' && !Array.isArray(log) && !log.source_name) {
log.source_name = source.provider;
}
return (log);
}

await this[source.provider].processor(source_name, source);
/**
* Publish the results of the fetch to our SNS topic
*
* @param {Object} message
* @param {String} subject
*/
async publish(message, subject) {
console.log('Publishing:', subject, message);
if (process.env.TOPIC_ARN && message) {
const cmd = new PublishCommand({
TopicArn: process.env.TOPIC_ARN,
Subject: subject,
Message: JSON.stringify(message)
});
return await sns.send(cmd);
} else {
return {};
}
}

/**
Expand Down Expand Up @@ -67,8 +98,7 @@ class Providers {

// Diff data to minimize costly S3 PUT operations
try {
const resp = await s3.getObject({ Bucket, Key }).promise();
const currentData = (await unzip(resp.Body)).toString('utf-8');
const currentData = await getObject(Bucket, Key);
if (currentData === newData && !process.env.FORCE) {
if (VERBOSE) console.log(`station has not changed - station: ${providerStation}`);
return;
Expand All @@ -78,25 +108,23 @@ class Providers {
prettyPrintStation(newData);
console.log('-----------------> from');
prettyPrintStation(currentData);
} else {
console.log(`Updating the station file: ${providerStation}`);
}
} catch (err) {
if (err.statusCode !== 404) throw err;
if (err.Code !== 'NoSuchKey') throw err;
}

const compressedString = await gzip(newData);

if (!DRYRUN) {
if (VERBOSE) console.debug(`Saving station to ${Bucket}/${Key}`);

await s3.putObject({
await putObject(
compressedString,
Bucket,
Key,
Body: compressedString,
ContentType: 'application/json',
ContentEncoding: 'gzip'
}).promise();
false,
'application/json',
'gzip'
);
}
if (VERBOSE) console.log(`finished station: ${providerStation}\n------------------------`);
}
Expand All @@ -117,19 +145,14 @@ class Providers {
const Key = `${process.env.STACK}/measures/${provider}/${filename}.csv.gz`;
const compressedString = await gzip(measures.csv());


if (DRYRUN) {
console.log(`Would have saved ${measures.length} measurements to '${Bucket}/${Key}'`);
return new Promise((y) => y(true));
}
if (VERBOSE) console.debug(`Saving measurements to ${Bucket}/${Key}`);

return s3.putObject({
Bucket,
Key,
Body: compressedString,
ContentType: 'text/csv',
ContentEncoding: 'gzip'
}).promise();
return await putObject(compressedString, Bucket, Key, false, 'text/csv', 'gzip');
}
}

Expand Down
Loading

0 comments on commit 6566904

Please sign in to comment.