Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 39 additions & 58 deletions codegenerator/cli/npm/envio/src/sources/HyperSync.res
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ module GetLogs = {
}

module BlockData = {
let makeRequestBody = (~fromBlock, ~toBlock): HyperSyncJsonApi.QueryTypes.postQueryBody => {
let makeRequestBody = (~fromBlock, ~toBlock): HyperSyncClient.QueryTypes.query => {
fromBlock,
toBlockExclusive: toBlock + 1,
fieldSelection: {
Expand All @@ -208,51 +208,44 @@ module BlockData = {
includeAllBlocks: true,
}

let convertResponse = (res: HyperSyncJsonApi.ResponseTypes.queryResponse): queryResponse<
let convertResponse = (res: HyperSyncClient.queryResponse): queryResponse<
array<ReorgDetection.blockDataWithTimestamp>,
> => {
res.data
->Array.flatMap(item => {
item.blocks->Option.mapWithDefault([], blocks => {
blocks->Array.map(
block => {
switch block {
| {number: blockNumber, timestamp, hash: blockHash} =>
let blockTimestamp = timestamp->BigInt.toInt->Option.getExn
Ok(
(
{
blockTimestamp,
blockNumber,
blockHash,
}: ReorgDetection.blockDataWithTimestamp
),
)
| _ =>
let missingParams =
[
block.number->Utils.Option.mapNone("block.number"),
block.timestamp->Utils.Option.mapNone("block.timestamp"),
block.hash->Utils.Option.mapNone("block.hash"),
]->Array.keepMap(p => p)

Error(
UnexpectedMissingParams({
queryName: "query block data HyperSync",
missingParams,
}),
)
}
},
res.data.blocks
->Option.getWithDefault([])
->Array.map(block => {
switch block {
| {number: blockNumber, timestamp: blockTimestamp, hash: blockHash} =>
Ok(
(
{
blockTimestamp,
blockNumber,
blockHash,
}: ReorgDetection.blockDataWithTimestamp
),
)
})
| _ =>
let missingParams =
[
block.number->Utils.Option.mapNone("block.number"),
block.timestamp->Utils.Option.mapNone("block.timestamp"),
block.hash->Utils.Option.mapNone("block.hash"),
]->Array.keepMap(p => p)

Error(
UnexpectedMissingParams({
queryName: "query block data HyperSync",
missingParams,
}),
)
}
})
->Utils.Array.transposeResults
}

let rec queryBlockData = async (
~serverUrl,
~apiToken,
~client: HyperSyncClient.t,
~fromBlock,
~toBlock,
~logger,
Expand All @@ -268,38 +261,28 @@ module BlockData = {
},
)

let maybeSuccessfulRes = switch await Time.retryAsyncWithExponentialBackOff(() =>
HyperSyncJsonApi.queryRoute->Rest.fetch(
{
"query": body,
"token": apiToken,
},
~client=Rest.client(serverUrl),
)
, ~logger) {
let maybeSuccessfulRes = switch await client.get(~query=body) {
| exception _ => None
| res if res.nextBlock <= fromBlock => None
| res => Some(res)
}

// If the block is not found, retry the query. This can occur since replicas of hypersync might not hack caught up yet
// If the block is not found, retry the query. This can occur since replicas of hypersync might not have caught up yet
switch maybeSuccessfulRes {
| None => {
let logger = Logging.createChild(~params={"url": serverUrl})
let delayMilliseconds = 100
logger->Logging.childInfo(
`Block #${fromBlock->Int.toString} not found in HyperSync. HyperSync has multiple instances and it's possible that they drift independently slightly from the head. Indexing should continue correctly after retrying the query in ${delayMilliseconds->Int.toString}ms.`,
)
await Time.resolvePromiseAfterDelay(~delayMilliseconds)
await queryBlockData(~serverUrl, ~apiToken, ~fromBlock, ~toBlock, ~logger)
await queryBlockData(~client, ~fromBlock, ~toBlock, ~logger)
}
| Some(res) =>
switch res->convertResponse {
| Error(_) as err => err
| Ok(datas) if res.nextBlock <= toBlock => {
let restRes = await queryBlockData(
~serverUrl,
~apiToken,
~client,
~fromBlock=res.nextBlock,
~toBlock,
~logger,
Expand All @@ -311,7 +294,7 @@ module BlockData = {
}
}

let queryBlockDataMulti = async (~serverUrl, ~apiToken, ~blockNumbers, ~logger) => {
let queryBlockDataMulti = async (~client, ~blockNumbers, ~logger) => {
switch blockNumbers->Array.get(0) {
| None => Ok([])
| Some(firstBlock) => {
Expand All @@ -336,8 +319,7 @@ module BlockData = {
let res = await queryBlockData(
~fromBlock=fromBlock.contents,
~toBlock=toBlock.contents,
~serverUrl,
~apiToken,
~client,
~logger,
)
let filtered = res->Result.map(datas => {
Expand All @@ -356,10 +338,9 @@ module BlockData = {
}
}

let queryBlockData = (~serverUrl, ~apiToken, ~blockNumber, ~logger) =>
let queryBlockData = (~client, ~blockNumber, ~logger) =>
BlockData.queryBlockData(
~serverUrl,
~apiToken,
~client,
~fromBlock=blockNumber,
~toBlock=blockNumber,
~logger,
Expand Down
6 changes: 2 additions & 4 deletions codegenerator/cli/npm/envio/src/sources/HyperSync.resi
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,13 @@ module GetLogs: {
}

let queryBlockData: (
~serverUrl: string,
~apiToken: string,
~client: HyperSyncClient.t,
~blockNumber: int,
~logger: Pino.t,
) => promise<queryResponse<option<ReorgDetection.blockDataWithTimestamp>>>

let queryBlockDataMulti: (
~serverUrl: string,
~apiToken: string,
~client: HyperSyncClient.t,
~blockNumbers: array<int>,
~logger: Pino.t,
) => promise<queryResponse<array<ReorgDetection.blockDataWithTimestamp>>>
Expand Down
21 changes: 20 additions & 1 deletion codegenerator/cli/npm/envio/src/sources/HyperSyncClient.res
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,12 @@ module QueryTypes = {
* JoinNothing: join nothing.
*/
joinMode?: joinMode,
/**
* Whether to include all blocks regardless of if they are related to a returned transaction or log. Normally
* the server will return only the blocks that are related to the transaction or logs in the response. But if this
* is set to true, the server will return data for all blocks in the requested range [from_block, to_block).
*/
includeAllBlocks?: bool,
}
}

Expand Down Expand Up @@ -456,10 +462,23 @@ type eventResponse = ResponseTypes.eventResponse

//Todo, add bindings for these types
type streamConfig
type queryResponse
type queryResponseStream
type eventStream

type queryResponseData = {
blocks: option<array<ResponseTypes.block>>,
transactions: option<array<ResponseTypes.transaction>>,
logs: option<array<ResponseTypes.log>>,
}

type queryResponse = {
archiveHeight: option<int>,
nextBlock: int,
totalExecutionTime: int,
data: queryResponseData,
rollbackGuard: option<ResponseTypes.rollbackGuard>,
}

@tag("type")
type heightStreamEvent =
| Height({height: int})
Expand Down
6 changes: 2 additions & 4 deletions codegenerator/cli/npm/envio/src/sources/HyperSyncSource.res
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,7 @@ let make = (
//If there were no logs at all in the current page query then fetch the
//timestamp of the heighest block accounted for
HyperSync.queryBlockData(
~serverUrl=endpointUrl,
~apiToken,
~client,
~blockNumber=heighestBlockQueried,
~logger,
)->Promise.thenResolve(res =>
Expand Down Expand Up @@ -548,8 +547,7 @@ let make = (

let getBlockHashes = (~blockNumbers, ~logger) =>
HyperSync.queryBlockDataMulti(
~serverUrl=endpointUrl,
~apiToken,
~client,
~blockNumbers,
~logger,
)->Promise.thenResolve(HyperSync.mapExn)
Expand Down
Loading