Skip to content

Commit

Permalink
Merge pull request #2618 from cloudflare/ggu/packages-in-artifact-bundler
Browse files Browse the repository at this point in the history

Add packages to ArtifactBundler
  • Loading branch information
garrettgu10 committed Sep 13, 2024
2 parents 72335f0 + 5bf0f99 commit 157fe35
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 44 deletions.
69 changes: 55 additions & 14 deletions src/pyodide/internal/loadPackage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,21 @@
* that contains all the packages ready to go.
*/

import { default as LOCKFILE } from 'pyodide-internal:generated/pyodide-lock.json';
import { WORKERD_INDEX_URL } from 'pyodide-internal:metadata';
import {
SITE_PACKAGES,
WORKERD_INDEX_URL,
LOCKFILE,
LOAD_WHEELS_FROM_R2,
LOAD_WHEELS_FROM_ARTIFACT_BUNDLER,
PACKAGES_VERSION,
} from 'pyodide-internal:metadata';
import {
SITE_PACKAGES,
getSitePackagesPath,
} from 'pyodide-internal:setupPackages';
import { parseTarInfo } from 'pyodide-internal:tar';
import { default as DiskCache } from 'pyodide-internal:disk_cache';
import { createTarFS } from 'pyodide-internal:tarfs';
import { default as ArtifactBundler } from 'pyodide-internal:artifacts';

async function decompressArrayBuffer(
arrBuf: ArrayBuffer
Expand All @@ -33,13 +38,25 @@ async function decompressArrayBuffer(
}
}

async function loadBundle(requirement: string): Promise<[string, ArrayBuffer]> {
function getFilenameOfPackage(requirement: string): string {
const obj = LOCKFILE['packages'][requirement];
if (!obj) {
throw new Error('Requirement ' + requirement + ' not found in lockfile');
}

return obj.file_name;
}

// loadBundleFromR2 loads the package from the internet (through fetch) and uses the DiskCache as
// a backing store. This is only used in local dev.
async function loadBundleFromR2(requirement: string): Promise<Reader> {
// first check if the disk cache has what we want
const filename = LOCKFILE['packages'][requirement]['file_name'];
const filename = getFilenameOfPackage(requirement);
const cached = DiskCache.get(filename);
if (cached) {
const decompressed = await decompressArrayBuffer(cached);
return [requirement, decompressed];
const reader = new ArrayBufferReader(decompressed);
return reader;
}

// we didn't find it in the disk cache, continue with original fetch
Expand All @@ -50,7 +67,22 @@ async function loadBundle(requirement: string): Promise<[string, ArrayBuffer]> {
const decompressed = await decompressArrayBuffer(compressed);

DiskCache.put(filename, compressed);
return [requirement, decompressed];
const reader = new ArrayBufferReader(decompressed);
return reader;
}

async function loadBundleFromArtifactBundler(
requirement: string
): Promise<Reader> {
const filename = getFilenameOfPackage(requirement);
const fullPath = 'python-package-bucket/' + PACKAGES_VERSION + '/' + filename;
const reader = ArtifactBundler.getPackage(fullPath);
if (!reader) {
throw new Error(
'Failed to get package ' + fullPath + ' from ArtifactBundler'
);
}
return reader;
}

/**
Expand All @@ -73,22 +105,23 @@ class ArrayBufferReader {
}
}

export async function loadPackages(Module: Module, requirements: Set<string>) {
if (!LOAD_WHEELS_FROM_R2) return;

let loadPromises = [];
async function loadPackagesImpl(
Module: Module,
requirements: Set<string>,
loadBundle: (req: string) => Promise<Reader>
) {
let loadPromises: Promise<[string, Reader]>[] = [];
let loading = [];
for (const req of requirements) {
if (SITE_PACKAGES.loadedRequirements.has(req)) continue;
loadPromises.push(loadBundle(req));
loadPromises.push(loadBundle(req).then((r) => [req, r]));
loading.push(req);
}

console.log('Loading ' + loading.join(', '));

const buffers = await Promise.all(loadPromises);
for (const [requirement, buffer] of buffers) {
const reader = new ArrayBufferReader(buffer);
for (const [requirement, reader] of buffers) {
const [tarInfo, soFiles] = parseTarInfo(reader);
SITE_PACKAGES.addSmallBundle(tarInfo, soFiles, requirement);
}
Expand All @@ -100,3 +133,11 @@ export async function loadPackages(Module: Module, requirements: Set<string>) {
const info = SITE_PACKAGES.rootInfo;
Module.FS.mount(tarFS, { info }, path);
}

export async function loadPackages(Module: Module, requirements: Set<string>) {
if (LOAD_WHEELS_FROM_R2) {
await loadPackagesImpl(Module, requirements, loadBundleFromR2);
} else if (LOAD_WHEELS_FROM_ARTIFACT_BUNDLER) {
await loadPackagesImpl(Module, requirements, loadBundleFromArtifactBundler);
}
}
8 changes: 7 additions & 1 deletion src/pyodide/internal/metadata.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { default as MetadataReader } from 'pyodide-internal:runtime-generated/metadata';
export { default as LOCKFILE } from 'pyodide-internal:generated/pyodide-lock.json';
import { default as PYODIDE_BUCKET } from 'pyodide-internal:generated/pyodide-bucket.json';
import { default as ArtifactBundler } from 'pyodide-internal:artifacts';

Expand All @@ -9,6 +8,13 @@ export const SHOULD_SNAPSHOT_TO_DISK = MetadataReader.shouldSnapshotToDisk();
export const IS_CREATING_BASELINE_SNAPSHOT =
MetadataReader.isCreatingBaselineSnapshot();
export const WORKERD_INDEX_URL = PYODIDE_BUCKET.PYODIDE_PACKAGE_BUCKET_URL;
export const LOAD_WHEELS_FROM_R2: boolean = IS_WORKERD;
export const LOAD_WHEELS_FROM_ARTIFACT_BUNDLER =
MetadataReader.shouldUsePackagesInArtifactBundler();
export const PACKAGES_VERSION = MetadataReader.getPackagesVersion();
export const LOCKFILE: PackageLock = JSON.parse(
MetadataReader.getPackagesLock()
);
export const REQUIREMENTS = MetadataReader.getRequirements();
export const MAIN_MODULE_NAME = MetadataReader.getMainModule();
export const MEMORY_SNAPSHOT_READER = MetadataReader.hasMemorySnapshot()
Expand Down
33 changes: 19 additions & 14 deletions src/pyodide/internal/setupPackages.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import { parseTarInfo } from 'pyodide-internal:tar';
import { createTarFS } from 'pyodide-internal:tarfs';
import { createMetadataFS } from 'pyodide-internal:metadatafs';
import { default as LOCKFILE } from 'pyodide-internal:generated/pyodide-lock.json';
import { REQUIREMENTS, WORKERD_INDEX_URL } from 'pyodide-internal:metadata';
import {
REQUIREMENTS,
LOAD_WHEELS_FROM_R2,
LOCKFILE,
LOAD_WHEELS_FROM_ARTIFACT_BUNDLER,
} from 'pyodide-internal:metadata';
import { simpleRunPython } from 'pyodide-internal:util';

const canonicalizeNameRegex = /[-_.]+/g;
Expand Down Expand Up @@ -122,22 +126,25 @@ class SitePackagesDir {
* This also returns the list of soFiles in the resulting site-packages
* directory so we can preload them.
*/
export function buildSitePackages(
requirements: Set<string>
): [SitePackagesDir, boolean] {
export function buildSitePackages(requirements: Set<string>): SitePackagesDir {
const [bigTarInfo, bigTarSoFiles] = parseTarInfo();

let LOAD_WHEELS_FROM_R2 = true;
let requirementsInBigBundle = new Set([...STDLIB_PACKAGES]);
if (bigTarInfo.children!.size > 10) {
LOAD_WHEELS_FROM_R2 = false;

// Currently, we include all packages within the big bundle in Edgeworker.
// During this transitional period, we add the option (via autogate)
// to load packages from GCS (in which case they are accessible through the ArtifactBundler)
// or to simply use the packages within the big bundle. The latter is not ideal
// since we're locked to a specific packages version, so we will want to move away
// from it eventually.
if (!LOAD_WHEELS_FROM_R2 && !LOAD_WHEELS_FROM_ARTIFACT_BUNDLER) {
requirements.forEach((r) => requirementsInBigBundle.add(r));
}

const res = new SitePackagesDir();
res.addBigBundle(bigTarInfo, bigTarSoFiles, requirementsInBigBundle);

return [res, LOAD_WHEELS_FROM_R2];
return res;
}

/**
Expand Down Expand Up @@ -188,8 +195,8 @@ export function mountLib(Module: Module, info: TarFSInfo): void {
const site_packages = getSitePackagesPath(Module);
Module.FS.mkdirTree(site_packages);
Module.FS.mkdirTree('/session/metadata');
if (!LOAD_WHEELS_FROM_R2) {
// if we are not loading additional wheels from R2, then we're done
if (!LOAD_WHEELS_FROM_R2 && !LOAD_WHEELS_FROM_ARTIFACT_BUNDLER) {
// if we are not loading additional wheels, then we're done
// with site-packages and we can mount it here. Otherwise, we must mount it in
// loadPackages().
Module.FS.mount(tarFS, { info }, site_packages);
Expand Down Expand Up @@ -253,6 +260,4 @@ function addPackageToLoad(

export { REQUIREMENTS };
export const TRANSITIVE_REQUIREMENTS = getTransitiveRequirements();
export const [SITE_PACKAGES, LOAD_WHEELS_FROM_R2] = buildSitePackages(
TRANSITIVE_REQUIREMENTS
);
export const SITE_PACKAGES = buildSitePackages(TRANSITIVE_REQUIREMENTS);
1 change: 1 addition & 0 deletions src/pyodide/types/artifacts.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ declare namespace ArtifactBundler {
const getMemorySnapshotSize: () => number;
const disposeMemorySnapshot: () => void;
const storeMemorySnapshot: (snap: MemorySnapshotResult) => void;
const getPackage: (path: string) => Reader | null;
}

export default ArtifactBundler;
5 changes: 0 additions & 5 deletions src/pyodide/types/pyodide-lock.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,3 @@ interface PackageLock {
[id: string]: PackageDeclaration;
};
}

declare module 'pyodide-internal:generated/pyodide-lock.json' {
const lock: PackageLock;
export default lock;
}
3 changes: 3 additions & 0 deletions src/pyodide/types/runtime-generated/metadata.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ declare namespace MetadataReader {
) => void;
const getMemorySnapshotSize: () => number;
const disposeMemorySnapshot: () => void;
const shouldUsePackagesInArtifactBundler: () => boolean;
const getPackagesVersion: () => string;
const getPackagesLock: () => string;
const read: (index: number, position: number, buffer: Uint8Array) => number;
}

Expand Down
11 changes: 6 additions & 5 deletions src/workerd/api/pyodide/pyodide.c++
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ namespace workerd::api::pyodide {

const kj::Maybe<jsg::Bundle::Reader> PyodideBundleManager::getPyodideBundle(
kj::StringPtr version) const {
KJ_IF_SOME(t, bundles.lockShared()->find(version)) {
return t.bundle;
}
return kj::none;
return bundles.lockShared()->find(version).map(
[](const MessageBundlePair& t) { return t.bundle; });
}

void PyodideBundleManager::setPyodideBundleData(
Expand Down Expand Up @@ -53,7 +51,7 @@ static int readToTarget(
}

int PackagesTarReader::read(jsg::Lock& js, int offset, kj::Array<kj::byte> buf) {
return readToTarget(PYODIDE_PACKAGES_TAR.get(), offset, buf);
return readToTarget(source, offset, buf);
}

kj::Array<jsg::JsRef<jsg::JsString>> PyodideMetadataReader::getNames(jsg::Lock& js) {
Expand Down Expand Up @@ -159,10 +157,13 @@ jsg::Ref<PyodideMetadataReader> makePyodideMetadataReader(
names.finish(),
contents.finish(),
requirements.finish(),
kj::str("20240829.4"), // TODO: hardcoded version & lock
kj::str(PYODIDE_LOCK.toString()),
true /* isWorkerd */,
false /* isTracing */,
snapshotToDisk,
createBaselineSnapshot,
false, /* usePackagesInArtifactBundler */
kj::none /* memorySnapshot */
);
// clang-format on
Expand Down
Loading

0 comments on commit 157fe35

Please sign in to comment.