Skip to content

Commit

Permalink
Merge pull request #31 from tkiapril/feat/optimize-apicalls
Browse files Browse the repository at this point in the history
Remove double API call on blockTag
  • Loading branch information
tkiapril committed Jul 31, 2023
2 parents 6c87c6b + e7c6296 commit c518537
Showing 1 changed file with 117 additions and 114 deletions.
231 changes: 117 additions & 114 deletions emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,132 +144,135 @@ export async function emitter(
}

// TODO: customizable poll interval and transport
const unwatch = client.watchBlockNumber(
{
onBlockNumber: async (blockNumber) => {
const finalizedBlockNumber = typeof (blockFinality) === "bigint"
? blockNumber - blockFinality
: (await client.getBlock({ blockTag: blockFinality })).number!;
const observed = finalizationQueue.filter((x) =>
x.blockNumber <= finalizedBlockNumber
);
const finalizedBlocks: Record<string, bigint> = {};
const finalized = await observed.reduce(async (acc, x) => {
const hash = (await client.getBlock({ blockNumber: x.blockNumber }))
.hash;
const unwatch = typeof (blockFinality) === "bigint"
? client.watchBlockNumber({
onBlockNumber: (blockNumber) =>
blockFinalized(blockNumber - blockFinality),
})
: client.watchBlocks({
blockTag: blockFinality,
onBlock: (block) => blockFinalized(block.number!),
});

const isFinal = toHex(x.blockHash) === hash;
if (isFinal) finalizedBlocks[hash] = x.blockNumber;
return isFinal ? [...(await acc), x] : acc;
}, Promise.resolve([] as typeof observed));
async function blockFinalized(blockNumber: bigint) {
const observed = finalizationQueue.filter((x) =>
x.blockNumber <= blockNumber
);
const finalizedBlocks: Record<string, bigint> = {};
const finalized = await observed.reduce(async (acc, x) => {
const hash = (await client.getBlock({ blockNumber: x.blockNumber }))
.hash;

await Promise.all(
finalized.map(async (x) => {
const event = await prisma.event.findUnique({
where: {
blockTimestamp_txIndex_logIndex: {
blockTimestamp: new Date(
Number(x.blockTimestamp) * 1000,
),
txIndex: Number(x.txIndex),
logIndex: Number(x.logIndex),
},
},
const isFinal = toHex(x.blockHash) === hash;
if (isFinal) finalizedBlocks[hash] = x.blockNumber;
return isFinal ? [...(await acc), x] : acc;
}, Promise.resolve([] as typeof observed));

await Promise.all(
finalized.map(async (x) => {
const event = await prisma.event.findUnique({
where: {
blockTimestamp_txIndex_logIndex: {
blockTimestamp: new Date(
Number(x.blockTimestamp) * 1000,
),
txIndex: Number(x.txIndex),
logIndex: Number(x.logIndex),
},
},
select: {
txHash: true,
sourceAddress: true,
topic1: true,
topic2: true,
topic3: true,
data: true,
Abi: {
select: {
txHash: true,
sourceAddress: true,
topic1: true,
topic2: true,
topic3: true,
data: true,
Abi: {
select: {
json: true,
},
},
json: true,
},
});
},
},
});

if (event == null) {
console.error(
`ERROR: event ${x.blockTimestamp}_${x.txIndex}_${x.logIndex} not found`,
);
return;
}
if (event == null) {
console.error(
`ERROR: event ${x.blockTimestamp}_${x.txIndex}_${x.logIndex} not found`,
);
return;
}

const { args } = decodeEventLog({
abi: [JSON.parse(event.Abi.json)],
data: toHex(event.data as unknown as Uint8Array),
topics: [toHex(x.sigHash)].concat(
event.topic1 !== null
? [toHex(event.topic1 as unknown as Uint8Array)].concat(
event.topic2 !== null
? [toHex(event.topic2 as unknown as Uint8Array)].concat(
event.topic3 !== null
? [toHex(event.topic3 as unknown as Uint8Array)]
: [],
)
const { args } = decodeEventLog({
abi: [JSON.parse(event.Abi.json)],
data: toHex(event.data as unknown as Uint8Array),
topics: [toHex(x.sigHash)].concat(
event.topic1 !== null
? [toHex(event.topic1 as unknown as Uint8Array)].concat(
event.topic2 !== null
? [toHex(event.topic2 as unknown as Uint8Array)].concat(
event.topic3 !== null
? [toHex(event.topic3 as unknown as Uint8Array)]
: [],
)
: [],
) as [signature: `0x${string}`, ...args: `0x${string}`[]],
});
return fetch(x.url, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: losslessJsonStringify({
timestamp: x.blockTimestamp,
blockIndex: x.blockNumber,
transactionIndex: x.txIndex,
logIndex: x.logIndex,
blockHash: toHex(x.blockHash),
transactionHash: toHex(event.txHash as unknown as Uint8Array),
sourceAddress: getAddress(
toHex(event.sourceAddress as unknown as Uint8Array),
),
abiHash: toHex(x.sigHash),
abiSignature: formatAbiItemPrototype(
JSON.parse(event.Abi.json),
),
args: {
named: Object.keys(args).filter((x) =>
!Object.keys([...(args as unknown[])]).includes(x)
).reduce(
(acc, x) => ({
...acc,
[x]: (args as Record<string, unknown>)[x],
}),
{},
),
ordered: [...(args as unknown[])],
},
}),
});
)
: [],
) as [signature: `0x${string}`, ...args: `0x${string}`[]],
});
return fetch(x.url, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: losslessJsonStringify({
timestamp: x.blockTimestamp,
blockIndex: x.blockNumber,
transactionIndex: x.txIndex,
logIndex: x.logIndex,
blockHash: toHex(x.blockHash),
transactionHash: toHex(event.txHash as unknown as Uint8Array),
sourceAddress: getAddress(
toHex(event.sourceAddress as unknown as Uint8Array),
),
abiHash: toHex(x.sigHash),
abiSignature: formatAbiItemPrototype(
JSON.parse(event.Abi.json),
),
args: {
named: Object.keys(args).filter((x) =>
!Object.keys([...(args as unknown[])]).includes(x)
).reduce(
(acc, x) => ({
...acc,
[x]: (args as Record<string, unknown>)[x],
}),
{},
),
ordered: [...(args as unknown[])],
},
}),
);
});
}),
);

observed.filter((x) =>
finalizedBlocks[toHex(x.blockHash)] !== x.blockNumber
).forEach(async (x) =>
await prisma.event.delete({
where: {
blockTimestamp_txIndex_logIndex: {
blockTimestamp: new Date(
Number(x.blockTimestamp) * 1000,
),
txIndex: Number(x.txIndex),
logIndex: Number(x.logIndex),
},
},
})
);
observed.filter((x) =>
finalizedBlocks[toHex(x.blockHash)] !== x.blockNumber
).forEach(async (x) =>
await prisma.event.delete({
where: {
blockTimestamp_txIndex_logIndex: {
blockTimestamp: new Date(
Number(x.blockTimestamp) * 1000,
),
txIndex: Number(x.txIndex),
logIndex: Number(x.logIndex),
},
},
})
);

finalizationQueue = finalizationQueue.filter(
(x) => x.blockNumber > finalizedBlockNumber,
);
},
},
);
finalizationQueue = finalizationQueue.filter(
(x) => x.blockNumber > blockNumber,
);
}

const abortController = new AbortController();
const runningPromise = block(abortController.signal);
Expand Down

0 comments on commit c518537

Please sign in to comment.