Skip to content

Commit

Permalink
airgradient secret fix and node 16 upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
russbiggs committed Jul 15, 2023
1 parent df86de5 commit 9c350b0
Show file tree
Hide file tree
Showing 7 changed files with 429 additions and 1,582 deletions.
4 changes: 2 additions & 2 deletions .env
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
LCS_API=https://api.openaq.org
STACK=my-dev-stack
SECRET_STACK=my-prod-stack
STACK=lcs-etl-pipeline
SECRET_STACK=lcs-etl-pipeline
BUCKET=openaq-fetches
VERBOSE=1
44 changes: 38 additions & 6 deletions cdk.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,42 @@
{
"app": "npx ts-node --prefer-ts-exts cdk/app.ts",
"watch": {
"include": [
"**"
],
"exclude": [
"README.md",
"cdk*.json",
"**/*.d.ts",
"**/*.js",
"tsconfig.json",
"package*.json",
"yarn.lock",
"node_modules",
"test"
]
},
"context": {
"@aws-cdk/core:enableStackNameDuplicates": "true",
"aws-cdk:enableDiffNoFail": "true",
"@aws-cdk/core:stackRelativeExports": "true",
"@aws-cdk/aws-ecr-assets:dockerIgnoreSupport": true,
"@aws-cdk/aws-secretsmanager:parseOwnedSecretName": true
"@aws-cdk/aws-apigateway:usagePlanKeyOrderInsensitiveId": true,
"@aws-cdk/core:stackRelativeExports": true,
"@aws-cdk/aws-rds:lowercaseDbIdentifier": true,
"@aws-cdk/aws-lambda:recognizeVersionProps": true,
"@aws-cdk/aws-lambda:recognizeLayerVersion": true,
"@aws-cdk/aws-cloudfront:defaultSecurityPolicyTLSv1.2_2021": true,
"@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true,
"@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true,
"@aws-cdk/core:checkSecretUsage": true,
"@aws-cdk/aws-iam:minimizePolicies": true,
"@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true,
"@aws-cdk/core:validateSnapshotRemovalPolicy": true,
"@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true,
"@aws-cdk/aws-s3:createDefaultLoggingPolicy": true,
"@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true,
"@aws-cdk/aws-apigateway:disableCloudWatchRole": true,
"@aws-cdk/core:enablePartitionLiterals": true,
"@aws-cdk/core:target-partitions": [
"aws",
"aws-cn"
]
}
}
}
20 changes: 10 additions & 10 deletions cdk/app.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env node
import * as cdk from "@aws-cdk/core";
import * as cdk from 'aws-cdk-lib/core';
import "source-map-support/register";
import { EtlPipeline } from "./stack";

Expand All @@ -15,15 +15,15 @@ const stack = new EtlPipeline(app, "lcs-etl-pipeline", {
});


const testingStack = new EtlPipeline(app, "lcs-etl-testing-pipeline", {
description: "Low Cost Sensors: ETL Pipeline Testing",
fetcherModuleDir: "fetcher",
schedulerModuleDir: "scheduler",
sources: require('../fetcher/sources'),
bucketName: process.env.BUCKET || 'openaq-fetches-testing',
lcsApi: process.env.LCS_API || 'https://api.openaq.org'
});
// const testingStack = new EtlPipeline(app, "lcs-etl-testing-pipeline", {
// description: "Low Cost Sensors: ETL Pipeline Testing",
// fetcherModuleDir: "fetcher",
// schedulerModuleDir: "scheduler",
// sources: require('../fetcher/sources'),
// bucketName: process.env.BUCKET || 'openaq-fetches-testing',
// lcsApi: process.env.LCS_API || 'https://api.openaq.org'
// });


cdk.Tags.of(stack).add('Project', 'lcs')
cdk.Tags.of(testingStack).add('Project', 'lcs-testing')
// cdk.Tags.of(testingStack).add('Project', 'lcs-testing')
263 changes: 141 additions & 122 deletions cdk/stack.ts
Original file line number Diff line number Diff line change
@@ -1,133 +1,152 @@
import * as events from "@aws-cdk/aws-events";
import * as eventTargets from "@aws-cdk/aws-events-targets";
import * as iam from "@aws-cdk/aws-iam";
import * as lambda from "@aws-cdk/aws-lambda";
import { SqsEventSource } from "@aws-cdk/aws-lambda-event-sources";
import * as s3 from "@aws-cdk/aws-s3";
import * as sqs from "@aws-cdk/aws-sqs";
import * as cdk from "@aws-cdk/core";
import { execSync } from "child_process";
import { Interval, Source } from "./types";
import * as events from 'aws-cdk-lib/aws-events';
import * as eventTargets from 'aws-cdk-lib/aws-events-targets';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as cdk from 'aws-cdk-lib/core';
import { execSync } from 'child_process';
import { Interval, Source } from './types';
import { Construct } from 'constructs';

export class EtlPipeline extends cdk.Stack {
constructor(
scope: cdk.Construct,
id: string,
{ fetcherModuleDir, schedulerModuleDir, sources, lcsApi, bucketName, ...props }: StackProps
) {
super(scope, id, props);
constructor(
scope: Construct,
id: string,
{
fetcherModuleDir,
schedulerModuleDir,
sources,
lcsApi,
bucketName,
...props
}: StackProps
) {
super(scope, id, props);

const queue = new sqs.Queue(this, "FetcherQueue", {
queueName: `${cdk.Stack.of(this).stackName}-fetch-queue`,
visibilityTimeout: cdk.Duration.seconds(2880),
});
const bucket = s3.Bucket.fromBucketName(this, "Data", bucketName);
const queue = new sqs.Queue(this, 'FetcherQueue', {
queueName: `${cdk.Stack.of(this).stackName}-fetch-queue`,
visibilityTimeout: cdk.Duration.seconds(2880),
});
const bucket = s3.Bucket.fromBucketName(this, 'Data', bucketName);

this.buildFetcherLambda({ moduleDir: fetcherModuleDir, queue, bucket, lcsApi });
this.buildSchedulerLambdas({ moduleDir: schedulerModuleDir, queue, sources });
}
this.buildFetcherLambda({
moduleDir: fetcherModuleDir,
queue,
bucket,
lcsApi,
});
this.buildSchedulerLambdas({
moduleDir: schedulerModuleDir,
queue,
sources,
});
}

private buildFetcherLambda(props: {
moduleDir: string;
queue: sqs.Queue;
bucket: s3.IBucket;
lcsApi: string;
}): lambda.Function {
this.prepareNodeModules(props.moduleDir);
const handler = new lambda.Function(this, "Fetcher", {
description: "Fetch a single source for a given time period",
runtime: lambda.Runtime.NODEJS_12_X,
handler: "index.handler",
code: lambda.Code.fromAsset(props.moduleDir),
timeout: cdk.Duration.seconds(900),
memorySize: 512,
environment: {
BUCKET: props.bucket.bucketName,
STACK: cdk.Stack.of(this).stackName,
VERBOSE: '1',
LCS_API: props.lcsApi
},
});
handler.addEventSource(
new SqsEventSource(props.queue, {
batchSize: 1,
})
);
props.queue.grantConsumeMessages(handler);
props.bucket.grantReadWrite(handler);
handler.addToRolePolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
"secretsmanager:DescribeSecret",
"secretsmanager:GetSecretValue",
],
resources: [
`arn:aws:secretsmanager:*:*:secret:${cdk.Stack.of(this).stackName}/*`,
],
})
);
return handler;
}
private buildFetcherLambda(props: {
moduleDir: string;
queue: sqs.Queue;
bucket: s3.IBucket;
lcsApi: string;
}): lambda.Function {
this.prepareNodeModules(props.moduleDir);
const handler = new lambda.Function(this, 'Fetcher', {
description: 'Fetch a single source for a given time period',
runtime: lambda.Runtime.NODEJS_16_X,
handler: 'index.handler',
code: lambda.Code.fromAsset(props.moduleDir),
timeout: cdk.Duration.seconds(900),
memorySize: 512,
environment: {
BUCKET: props.bucket.bucketName,
STACK: cdk.Stack.of(this).stackName,
VERBOSE: '1',
LCS_API: props.lcsApi,
},
});
handler.addEventSource(
new SqsEventSource(props.queue, {
batchSize: 1,
})
);
props.queue.grantConsumeMessages(handler);
props.bucket.grantReadWrite(handler);
handler.addToRolePolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'secretsmanager:DescribeSecret',
'secretsmanager:GetSecretValue',
],
resources: [
`arn:aws:secretsmanager:*:*:secret:${
cdk.Stack.of(this).stackName
}/*`,
],
})
);
return handler;
}

private buildSchedulerLambdas(props: {
moduleDir: string;
queue: sqs.Queue;
sources: Source[];
}): lambda.Function[] {
const durations: Record<Interval, cdk.Duration> = {
minute: cdk.Duration.minutes(1),
hour: cdk.Duration.hours(1),
day: cdk.Duration.days(1),
};
return Object.entries(durations).map(([interval, duration]) => {
const scheduler = new lambda.Function(
this,
`${interval}Scheduler`,
{
description: `${interval}Scheduler`,
runtime: lambda.Runtime.NODEJS_12_X,
handler: "index.handler",
code: lambda.Code.fromAsset(props.moduleDir),
timeout: cdk.Duration.seconds(25),
memorySize: 128,
environment: {
QUEUE_URL: props.queue.queueUrl,
SOURCES: props.sources
.filter((source) => source.frequency === interval)
.map((source) => source.provider)
.join(","),
},
}
);
props.queue.grantSendMessages(scheduler);
new events.Rule(this, `${interval}Rule`, {
schedule: events.Schedule.rate(duration),
targets: [new eventTargets.LambdaFunction(scheduler)],
});
return scheduler;
});
}
private buildSchedulerLambdas(props: {
moduleDir: string;
queue: sqs.Queue;
sources: Source[];
}): lambda.Function[] {
const durations: Record<Interval, cdk.Duration> = {
minute: cdk.Duration.minutes(1),
hour: cdk.Duration.hours(1),
day: cdk.Duration.days(1),
};
return Object.entries(durations).map(([interval, duration]) => {
const scheduler = new lambda.Function(
this,
`${interval}Scheduler`,
{
description: `${interval}Scheduler`,
runtime: lambda.Runtime.NODEJS_16_X,
handler: 'index.handler',
code: lambda.Code.fromAsset(props.moduleDir),
timeout: cdk.Duration.seconds(25),
memorySize: 128,
environment: {
QUEUE_URL: props.queue.queueUrl,
SOURCES: props.sources
.filter((source) => source.frequency === interval)
.map((source) => source.provider)
.join(','),
},
}
);
props.queue.grantSendMessages(scheduler);
new events.Rule(this, `${interval}Rule`, {
schedule: events.Schedule.rate(duration),
targets: [new eventTargets.LambdaFunction(scheduler)],
});
return scheduler;
});
}

/**
* Install node_modules in module directory for the sake of easy packaging.
* @param moduleDir string
*/
private prepareNodeModules(moduleDir: string): void {
const cmd = [
"yarn",
"--prod",
"--frozen-lockfile",
`--modules-folder ${moduleDir}/node_modules`,
].join(" ");
execSync(cmd);
}
/**
* Install node_modules in module directory for the sake of easy packaging.
* @param moduleDir string
*/
private prepareNodeModules(moduleDir: string): void {
const cmd = [
'yarn',
'--prod',
'--frozen-lockfile',
`--modules-folder ${moduleDir}/node_modules`,
].join(' ');
execSync(cmd);
}
}

interface StackProps extends cdk.StackProps {
fetcherModuleDir: string;
schedulerModuleDir: string;
lcsApi: string;
bucketName: string;
sources: Source[];
fetcherModuleDir: string;
schedulerModuleDir: string;
lcsApi: string;
bucketName: string;
sources: Source[];
}
2 changes: 1 addition & 1 deletion fetcher/providers/airgradient.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ function getLatestReading(sensorData) {
}

async function processor(source_name, source) {
const token = await fetchSecret('airgradient');
const { token } = await fetchSecret('airgradient');
let devices = await getDevices(source, token);
devices = devices.filter((o) => !o.offline);
devices = devices.filter((o) => o.latitude != null && o.longitude != null );
Expand Down
Loading

0 comments on commit 9c350b0

Please sign in to comment.