Skip to content

Commit

Permalink
chore: define main in package.json
Browse files Browse the repository at this point in the history
  • Loading branch information
117 committed Jun 19, 2024
1 parent 2280a71 commit bbfb546
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 133 deletions.
1 change: 1 addition & 0 deletions build.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ await build({
url: "https://github.com/@alpacahq/typescript-sdk/issues",
},
homepage: "https://github.com/@alpacahq/typescript-sdk#readme",
main: "mod.js",
},
postBuild() {
// Copy the README to the npm directory (for npmjs.com)
Expand Down
18 changes: 4 additions & 14 deletions factory/createClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,8 @@ export type RequestOptions<T> = {
};

export type Client =
& {
[K in keyof typeof trade]: ReturnType<(typeof trade)[K]>;
}
& {
[K in keyof typeof marketData]: ReturnType<(typeof marketData)[K]>;
};
& { [K in keyof typeof trade]: ReturnType<(typeof trade)[K]> }
& { [K in keyof typeof marketData]: ReturnType<(typeof marketData)[K]> };

export type ClientContext = {
options: CreateClientOptions;
Expand Down Expand Up @@ -122,17 +118,11 @@ export const createClient = (options: CreateClientOptions) => {
};

// Create a context object to pass to the client factory
const context: ClientContext = {
options,
request,
};
const context: ClientContext = { options, request };

// Return an object with all methods
return [...Object.values(trade), ...Object.values(marketData)].reduce(
(prev, fn) => ({
...prev,
[fn.name]: fn(context),
}),
(prev, fn) => ({ ...prev, [fn.name]: fn(context) }),
{} as Client,
);
};
154 changes: 114 additions & 40 deletions factory/createStream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// NOT CLEANED UP; TYPES MISSING FOR CALLBACKS
// NOT CLEANED UP; TYPES MISSING FOR CALLBACKS; BARELY FUNCTIONAL

import { Nullable } from "../api/trade.ts";

Expand All @@ -23,7 +23,16 @@ type CreateStreamOptions = {
retryDelay?: number;
};

export const createStream = (options: CreateStreamOptions): void => {
type EventCallback = (data: any) => void;

type TradeUpdate = {
event: string;
price: string;
qty: string;
timestamp: string;
};

export const createStream = (options: CreateStreamOptions) => {
const {
type,
version = "v2",
Expand All @@ -33,7 +42,6 @@ export const createStream = (options: CreateStreamOptions): void => {
retryDelay = 3000,
} = options;

// Default to environment variables if key or secret are not provided
const key = options.key || Deno.env.get("APCA_KEY_ID");
const secret = options.secret || Deno.env.get("APCA_KEY_SECRET");

Expand All @@ -45,28 +53,30 @@ export const createStream = (options: CreateStreamOptions): void => {
let url: string;

if (type === "data" || type === "data_sandbox") {
// Modify the URL to include version and feed
url = `${baseURLs[type]}/${version}/${feed}`;
} else if (type === "data_test") {
// Test data URL is already fine
url = baseURLs[type];
} else {
// Otherwise, we're dealing with an account stream
url = `${baseURLs[type]}/stream`;
}

console.log(url);
let socket: Nullable<WebSocket> = null;
let retries = 0;

// Handle incoming messages
const handle = (message: object) => {
// @todo: will be passed to a callback with proper typing
console.debug(message);
const eventCallbacks: { [event: string]: EventCallback[] } = {};
const activeStreams: Set<string> = new Set();
let isAuthenticated = false;

const handle = (message: any) => {
const event = message.stream;
if (event && eventCallbacks[event]) {
eventCallbacks[event].forEach((callback) => callback(message));
} else {
console.debug("Unhandled message:", message);
}
};

const connect = () => {
// If auto-reconnect is disabled or max retries reached, stop trying to reconnect
// and close the WebSocket connection.
if (!autoReconnect || (maxRetries !== undefined && retries >= maxRetries)) {
console.debug("Auto-reconnect is disabled or max retries reached.");
socket?.close();
Expand Down Expand Up @@ -101,39 +111,103 @@ export const createStream = (options: CreateStreamOptions): void => {
};

socket.onmessage = ({ data }) => {
try {
const messages = JSON.parse(data);

// Only the data stream sends an array of messages
if (Array.isArray(messages)) {
messages.forEach(handle);
return;
if (typeof data === "string") {
console.log("Received text message:", data);
try {
const result = JSON.parse(data);
if (
result.stream === "authorization" &&
result.data.status === "authorized"
) {
isAuthenticated = true;
sendListenMessage();
}
handle(result);
} catch (error) {
console.debug("Error parsing text message:", error);
}

// deno-lint-ignore no-empty
} catch (_) {}

const blob = new Blob([data]);
const reader = new FileReader();

reader.onload = function () {
if (typeof reader.result === "string") {
try {
const result = JSON.parse(reader.result);
if (Array.isArray(result)) {
result.forEach(handle);
} else {
} else if (data instanceof Blob) {
console.log("Received binary message:", data);
const reader = new FileReader();
reader.onload = function () {
if (typeof reader.result === "string") {
try {
const result = JSON.parse(reader.result);
if (
result.stream === "authorization" &&
result.data.status === "authorized"
) {
isAuthenticated = true;
sendListenMessage();
}
handle(result);
} catch (error) {
console.debug("Error parsing binary message:", error);
}
} catch (error) {
console.debug("Error parsing message:", error);
}
}
};

reader.readAsText(blob);
};
reader.readAsText(data);
} else {
console.debug("Unknown message type:", data);
}
};
};

connect();

const sendListenMessage = () => {
if (socket && socket.readyState === WebSocket.OPEN && isAuthenticated) {
console.log("Sending listen message:", Array.from(activeStreams));
socket.send(
JSON.stringify({
action: "listen",
data: {
streams: Array.from(activeStreams),
},
}),
);
} else {
console.debug(
"Socket is not open or not authenticated. Cannot send listen message.",
);
}
};

const subscribe = (event: string, callback: EventCallback) => {
console.log("Subscribing to event:", event);
if (!eventCallbacks[event]) {
eventCallbacks[event] = [];
}
eventCallbacks[event].push(callback);
activeStreams.add(event);
sendListenMessage();
};

const unsubscribe = (event: string) => {
if (eventCallbacks[event]) {
delete eventCallbacks[event];
activeStreams.delete(event);
sendListenMessage();
}
};

return {
socket,
close: () => socket?.close(),
subscribe,
unsubscribe,
};
};

// const stream = createStream({
// type: "account_paper",
// key: "key",
// secret: "secret",
// autoReconnect: true,
// maxRetries: 5,
// retryDelay: 3000,
// });

// stream.subscribe("trade_updates", (data) => {
// // trade update received
// });
8 changes: 4 additions & 4 deletions factory/createTokenBucket.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Deno.test(
});

assert(tokenBucket.take(50) === true);
}
},
);

Deno.test(
Expand All @@ -22,7 +22,7 @@ Deno.test(
});

assert(tokenBucket.take(300) === false);
}
},
);

Deno.test(
Expand All @@ -38,7 +38,7 @@ Deno.test(
await new Promise((resolve) => setTimeout(resolve, 3000));

assert(tokenBucket.take(50) === true);
}
},
);

Deno.test(
Expand All @@ -55,5 +55,5 @@ Deno.test(
}

assert(successfulRequests === 200);
}
},
);
Loading

0 comments on commit bbfb546

Please sign in to comment.