diff --git a/.changeset/true-sides-wait.md b/.changeset/true-sides-wait.md new file mode 100644 index 0000000000..c8b4faa0d8 --- /dev/null +++ b/.changeset/true-sides-wait.md @@ -0,0 +1,5 @@ +--- +'@chainlink/six-adapter': patch +--- + +Price endpoint diff --git a/.pnp.cjs b/.pnp.cjs index 84df3a5272..212892795e 100644 --- a/.pnp.cjs +++ b/.pnp.cjs @@ -7844,6 +7844,7 @@ const RAW_RUNTIME_STATE = "packageDependencies": [\ ["@chainlink/external-adapter-framework", "npm:2.14.2"],\ ["@chainlink/six-adapter", "workspace:packages/sources/six"],\ + ["@sinonjs/fake-timers", "npm:15.4.0"],\ ["@types/jest", "npm:29.5.14"],\ ["@types/node", "npm:22.14.1"],\ ["nock", "npm:13.5.6"],\ @@ -11925,6 +11926,14 @@ const RAW_RUNTIME_STATE = ],\ "linkType": "HARD"\ }],\ + ["npm:15.4.0", {\ + "packageLocation": "./.yarn/cache/@sinonjs-fake-timers-npm-15.4.0-94cf7a0827-3960a9fe06.zip/node_modules/@sinonjs/fake-timers/",\ + "packageDependencies": [\ + ["@sinonjs/commons", "npm:3.0.1"],\ + ["@sinonjs/fake-timers", "npm:15.4.0"]\ + ],\ + "linkType": "HARD"\ + }],\ ["npm:8.1.0", {\ "packageLocation": "./.yarn/cache/@sinonjs-fake-timers-npm-8.1.0-95c51c96db-da50ddd684.zip/node_modules/@sinonjs/fake-timers/",\ "packageDependencies": [\ diff --git a/.yarn/cache/@sinonjs-fake-timers-npm-15.4.0-94cf7a0827-3960a9fe06.zip b/.yarn/cache/@sinonjs-fake-timers-npm-15.4.0-94cf7a0827-3960a9fe06.zip new file mode 100644 index 0000000000..c9c1c9f095 Binary files /dev/null and b/.yarn/cache/@sinonjs-fake-timers-npm-15.4.0-94cf7a0827-3960a9fe06.zip differ diff --git a/packages/sources/six/package.json b/packages/sources/six/package.json index a7cfb3dd4b..a352d9910a 100644 --- a/packages/sources/six/package.json +++ b/packages/sources/six/package.json @@ -28,6 +28,7 @@ "start": "yarn server:dist" }, "devDependencies": { + "@sinonjs/fake-timers": "15.4.0", "@types/jest": "^29.5.14", "@types/node": "22.14.1", "nock": "13.5.6", diff --git a/packages/sources/six/src/config/index.ts b/packages/sources/six/src/config/index.ts index d998411f38..b71af3b64b 100644 --- a/packages/sources/six/src/config/index.ts +++ b/packages/sources/six/src/config/index.ts @@ -1,64 +1,73 @@ import { AdapterConfig } from '@chainlink/external-adapter-framework/config' -export const config = new AdapterConfig({ - WS_API_ENDPOINT: { - description: 'SIX WebSocket API endpoint', - type: 'string', - default: 'wss://api.six-group.com/web/v2/websocket', - sensitive: false, - }, - API_ENDPOINT: { - description: 'SIX REST API base URL', - type: 'string', - default: 'https://api.six-group.com', - sensitive: false, - }, - PRIVATE_KEY: { - description: - 'The private key that starts with "-----BEGIN PRIVATE KEY-----" and end with "-----END PRIVATE KEY-----"', - type: 'string', - required: true, - sensitive: true, - validate: { - meta: { - details: 'Value must be a valid private key', +export const config = new AdapterConfig( + { + WS_API_ENDPOINT: { + description: 'SIX WebSocket API endpoint', + type: 'string', + default: 'wss://api.six-group.com/web/v2/websocket', + sensitive: false, + }, + API_ENDPOINT: { + description: 'SIX REST API base URL', + type: 'string', + default: 'https://api.six-group.com', + sensitive: false, + }, + PRIVATE_KEY: { + description: + 'The private key that starts with "-----BEGIN PRIVATE KEY-----" and end with "-----END PRIVATE KEY-----"', + type: 'string', + required: true, + sensitive: true, + validate: { + meta: { + details: 'Value must be a valid private key', + }, + fn: (value) => { + if ( + !( + value && + value.startsWith('-----BEGIN PRIVATE KEY-----\n') && + value.endsWith('\n-----END PRIVATE KEY-----') + ) + ) { + return 'Value must be a valid private key that starts with "-----BEGIN PRIVATE KEY-----\\n" and end with "\\n-----END PRIVATE KEY-----"' + } + return + }, }, - fn: (value) => { - if ( - !( - value && - value.startsWith('-----BEGIN PRIVATE KEY-----\n') && - value.endsWith('\n-----END PRIVATE KEY-----') - ) - ) { - return 'Value must be a valid private key that starts with "-----BEGIN PRIVATE KEY-----\\n" and end with "\\n-----END PRIVATE KEY-----"' - } - return + }, + PUBLIC_CERT: { + description: + 'The public certificate that starts with "-----BEGIN CERTIFICATE-----" and end with "-----END CERTIFICATE-----"', + type: 'string', + required: true, + sensitive: false, + validate: { + meta: { + details: 'Value must be a valid public certificate', + }, + fn: (value) => { + if ( + !( + value && + value.startsWith('-----BEGIN CERTIFICATE-----\n') && + value.endsWith('\n-----END CERTIFICATE-----') + ) + ) { + return 'Value must be a valid public certificate that starts with "-----BEGIN CERTIFICATE-----\\n" and end with "\\n-----END CERTIFICATE-----"' + } + return + }, }, }, }, - PUBLIC_CERT: { - description: - 'The public certificate that starts with "-----BEGIN CERTIFICATE-----" and end with "-----END CERTIFICATE-----"', - type: 'string', - required: true, - sensitive: false, - validate: { - meta: { - details: 'Value must be a valid public certificate', - }, - fn: (value) => { - if ( - !( - value && - value.startsWith('-----BEGIN CERTIFICATE-----\n') && - value.endsWith('\n-----END CERTIFICATE-----') - ) - ) { - return 'Value must be a valid public certificate that starts with "-----BEGIN CERTIFICATE-----\\n" and end with "\\n-----END CERTIFICATE-----"' - } - return - }, + { + envDefaultOverrides: { + // To maintain a stable WebSocket connection, clients must send a ping every 10 seconds + // Doing so every 5 seconds here to account for delay etc... + WS_HEARTBEAT_INTERVAL_MS: 5_000, }, }, -}) +) diff --git a/packages/sources/six/src/endpoint/index.ts b/packages/sources/six/src/endpoint/index.ts index f824920fa4..ab45309ad9 100644 --- a/packages/sources/six/src/endpoint/index.ts +++ b/packages/sources/six/src/endpoint/index.ts @@ -1 +1,2 @@ export { endpoint as marketStatus } from './market-status' +export { endpoint as stock } from './stock' diff --git a/packages/sources/six/src/endpoint/stock.ts b/packages/sources/six/src/endpoint/stock.ts new file mode 100644 index 0000000000..13b52bdda5 --- /dev/null +++ b/packages/sources/six/src/endpoint/stock.ts @@ -0,0 +1,60 @@ +import { AdapterEndpoint } from '@chainlink/external-adapter-framework/adapter' +import { stockEndpointInputParametersDefinition } from '@chainlink/external-adapter-framework/adapter/stock' +import { SingleNumberResultResponse } from '@chainlink/external-adapter-framework/util' +import { InputParameters } from '@chainlink/external-adapter-framework/validation' +import { AdapterInputError } from '@chainlink/external-adapter-framework/validation/error' +import { config } from '../config' +import { wsTransport } from '../transport/stock' + +export const inputParameters = new InputParameters( + { + ...stockEndpointInputParametersDefinition, + rawEndpoint: { + type: 'string', + description: 'The value of endpoint input', + }, + }, + [ + { + base: 'ABBN_4', + rawEndpoint: '', + }, + ], +) + +export type BaseEndpointTypes = { + Parameters: typeof inputParameters.definition + Settings: typeof config.settings + Response: + | SingleNumberResultResponse + | { + Result: null + Data: { + mid_price: number + bid_price: number + bid_volume: number + ask_price: number + ask_volume: number + } + } +} + +export const endpoint = new AdapterEndpoint({ + name: 'stock', + aliases: ['stock_quotes'], + transport: wsTransport, + inputParameters: inputParameters, + requestTransforms: [ + (req) => { + const [ticker, market] = req.requestContext.data.base.split('_') + if (!ticker || !market) { + /// Not using customInputValidation because we want to validate after overrides are applied + throw new AdapterInputError({ + statusCode: 400, + message: 'base must be in the format of ${TICKER}_${MARKET}', + }) + } + req.requestContext.data.rawEndpoint = req.requestContext.requestEndpointName + }, + ], +}) diff --git a/packages/sources/six/src/index.ts b/packages/sources/six/src/index.ts index f4a98d9ad7..e2c64ff3c9 100644 --- a/packages/sources/six/src/index.ts +++ b/packages/sources/six/src/index.ts @@ -1,13 +1,13 @@ import { expose, ServerInstance } from '@chainlink/external-adapter-framework' import { Adapter } from '@chainlink/external-adapter-framework/adapter' import { config } from './config' -import { marketStatus } from './endpoint' +import { marketStatus, stock } from './endpoint' export const adapter = new Adapter({ - defaultEndpoint: marketStatus.name, + defaultEndpoint: stock.name, name: 'SIX', config, - endpoints: [marketStatus], + endpoints: [marketStatus, stock], rateLimiting: { tiers: { default: { diff --git a/packages/sources/six/src/transport/stock-cache.ts b/packages/sources/six/src/transport/stock-cache.ts new file mode 100644 index 0000000000..40b3ddc728 --- /dev/null +++ b/packages/sources/six/src/transport/stock-cache.ts @@ -0,0 +1,120 @@ +import { makeLogger } from '@chainlink/external-adapter-framework/util' +import { PriceMessage } from './stock' + +const MESSAGE_TTL_SECONDS = 300 +const convertTimeToMs = (time?: number) => (time ? Math.floor(time * 1000) : undefined) +const isMessageOld = (time?: number) => + time ? Date.now() - time > MESSAGE_TTL_SECONDS * 1000 : false + +export class StockCache { + bidCache: Map = new Map() + askCache: Map = new Map() + private logger = makeLogger('StockCache') + + processBidAsk(streamId: string, bid?: PriceMessage, ask?: PriceMessage) { + this.setBookSide(streamId, 'bid', bid) + this.setBookSide(streamId, 'ask', ask) + } + + private setBookSide(streamId: string, side: 'bid' | 'ask', msg?: PriceMessage) { + const cache = side === 'bid' ? this.bidCache : this.askCache + const price = msg?.value + const volume = msg?.size + const time = convertTimeToMs(msg?.unixTimestamp) + + if (isMessageOld(time)) { + this.logger.warn( + `${side} message ${JSON.stringify(msg)} is more than ${MESSAGE_TTL_SECONDS}s old`, + ) + return + } + if ( + price === undefined || + Number.isNaN(price) || + volume === undefined || + Number.isNaN(volume) + ) { + this.logger.warn(`Invalid or missing ${side} ${JSON.stringify(msg)}`) + return + } + cache.set(streamId, { + price, + volume, + time, + }) + } + + getPriceResponse(streamId: string, last?: PriceMessage) { + const result = last?.value + if (result === undefined || Number.isNaN(result)) { + this.logger.info(`Invalid or missing last: ${JSON.stringify(last)}`) + return [] + } + + const time = convertTimeToMs(last?.unixTimestamp) + if (isMessageOld(time)) { + this.logger.warn( + `Last message ${JSON.stringify(last)} is more than ${MESSAGE_TTL_SECONDS}s old`, + ) + return [] + } + + return [ + { + params: { base: streamId, rawEndpoint: 'stock' }, + response: { + result, + data: { + result, + }, + ...(time && { + timestamps: { + providerIndicatedTimeUnixMs: time, + }, + }), + }, + }, + ] + } + + getBidAskResponse(streamId: string) { + const bid = this.bidCache.get(streamId) + const ask = this.askCache.get(streamId) + + if (bid && ask) { + let midPrice: number + if (bid.price === 0) { + midPrice = ask.price + } else if (ask.price === 0) { + midPrice = bid.price + } else { + midPrice = (bid.price + ask.price) / 2 + } + + const time = Math.max(bid.time || 0, ask.time || 0) + return [ + { + params: { base: streamId, rawEndpoint: 'stock_quotes' }, + response: { + result: null, + data: { + mid_price: midPrice, + bid_price: bid.price, + bid_volume: bid.volume, + ask_price: ask.price, + ask_volume: ask.volume, + }, + ...(time && { + timestamps: { + providerIndicatedTimeUnixMs: time, + }, + }), + }, + }, + ] + } else { + this.logger.info(`Missing bid or ask for ${streamId}`) + return [] + } + } +} diff --git a/packages/sources/six/src/transport/stock.ts b/packages/sources/six/src/transport/stock.ts new file mode 100644 index 0000000000..bc596ff581 --- /dev/null +++ b/packages/sources/six/src/transport/stock.ts @@ -0,0 +1,166 @@ +import { WebSocketTransport } from '@chainlink/external-adapter-framework/transports/websocket' +import { makeLogger } from '@chainlink/external-adapter-framework/util' +import { BaseEndpointTypes } from '../endpoint/stock' +import { StockCache } from './stock-cache' + +export type Message = DataMessage | ErrorMessage + +export interface PriceMessage { + value?: number + size?: number + unixTimestamp?: number +} + +interface DataMessage { + data: { + startStream: { + type: string + streamId: string + last?: PriceMessage + bestBid?: PriceMessage + bestAsk?: PriceMessage + }[] + } +} + +interface ErrorMessage { + errors: { + message: string + messageCode: number + category: string + type: string + }[] +} + +type WsTransportTypes = BaseEndpointTypes & { + Provider: { + WsMessage: Message + } +} + +const logger = makeLogger('StockTransport') +const PONG_TIMEOUT = 2000 + +let pongWaitTimer: ReturnType | undefined +function clearPongWaitTimer() { + if (pongWaitTimer !== undefined) { + clearTimeout(pongWaitTimer) + pongWaitTimer = undefined + } +} + +// Prevent sending duplicate streamId to server +const activeStreamIds = new Set() +// We may get bid and ask updates in two seperate messages +const cache = new StockCache() + +export const wsTransport = new WebSocketTransport({ + url: (context) => context.adapterSettings.WS_API_ENDPOINT, + + options: (context) => ({ + cert: context.adapterSettings.PUBLIC_CERT, + key: context.adapterSettings.PRIVATE_KEY, + }), + + handlers: { + open() { + clearPongWaitTimer() + activeStreamIds.clear() + }, + close() { + clearPongWaitTimer() + }, + heartbeat(connection) { + if (pongWaitTimer) { + clearPongWaitTimer() + connection.close( + 1006, + `Heartbeat frequency exceeded ${PONG_TIMEOUT}ms, increase WS_HEARTBEAT_INTERVAL_MS in environment variable`, + ) + } else { + connection.ping() + pongWaitTimer = setTimeout(() => { + pongWaitTimer = undefined + logger.error(`Pong not received within ${PONG_TIMEOUT}ms after ping; closing connection`) + connection.close( + 1006, + 'The connection appears to be active but stopped receiving updates', + ) + }, PONG_TIMEOUT) + } + }, + pong() { + clearPongWaitTimer() + }, + message(message) { + if ('errors' in message) { + logger.error({ errors: message.errors }, 'SIX API returned errors') + return [] + } + + return message.data.startStream + .map((stream) => { + const [ticker, market] = stream.streamId.split('_') + if (!ticker || !market) { + logger.warn(`Unexpected streamId format ${stream.streamId}`) + return [] + } + if (stream.type === 'ERROR') { + return ['stock', 'stock_quotes'].map((requestEndpoint) => ({ + params: { base: stream.streamId, rawEndpoint: requestEndpoint }, + response: { + statusCode: 502, + errorMessage: `Data Provider returned error for this request`, + }, + })) + } + + if (stream.type === 'START' || stream.type === 'UPDATE') { + cache.processBidAsk(stream.streamId, stream.bestBid, stream.bestAsk) + return [ + ...cache.getPriceResponse(stream.streamId, stream.last), + ...cache.getBidAskResponse(stream.streamId), + ] + } + logger.debug(`Ignore message ${stream}`) + return [] + }) + .flat() + }, + }, + + builders: { + subscribeMessage: (params) => { + if (activeStreamIds.has(params.base)) { + return undefined + } + activeStreamIds.add(params.base) + + return { + query: `subscription { + startStream( + scheme: TICKER_BC, + ids: ["${params.base}"], + streamId: "${params.base}", + conflationType: INTERVAL, + conflationPeriod: "PT1S" + ) { + type + requestedId + streamId + last { value unixTimestamp } + bestBid { value size unixTimestamp } + bestAsk { value size unixTimestamp } + mid { value unixTimestamp } + } + }`, + } + }, + unsubscribeMessage: (params) => { + activeStreamIds.delete(params.base) + return { + query: `mutation { closeStream(streamId: "${params.base}") { type streamId } }`, + } + }, + }, +}) diff --git a/packages/sources/six/test/integration/__snapshots__/adapter-stock.test.ts.snap b/packages/sources/six/test/integration/__snapshots__/adapter-stock.test.ts.snap new file mode 100644 index 0000000000..cbf9f5fda3 --- /dev/null +++ b/packages/sources/six/test/integration/__snapshots__/adapter-stock.test.ts.snap @@ -0,0 +1,121 @@ +// Jest Snapshot v1, https://goo.gl/fbAQLP + +exports[`execute stock endpoint - bid ask only stock fail 1`] = ` +{ + "error": { + "message": "The EA has not received any values from the Data Provider for the requested data yet. Retry after a short delay, and if the problem persists raise this issue in the relevant channels.", + "name": "AdapterError", + }, + "status": "errored", + "statusCode": 504, +} +`; + +exports[`execute stock endpoint - bid ask only stock_quotes pass 1`] = ` +{ + "data": { + "ask_price": 4, + "ask_volume": 5, + "bid_price": 1, + "bid_volume": 2, + "mid_price": 2.5, + }, + "result": null, + "statusCode": 200, + "timestamps": { + "providerDataReceivedUnixMs": 6068, + "providerDataStreamEstablishedUnixMs": 1010, + "providerIndicatedTimeUnixMs": 6000, + }, +} +`; + +exports[`execute stock endpoint - bid update stock_quotes pass 1`] = ` +{ + "data": { + "ask_price": 13, + "ask_volume": 14, + "bid_price": 100, + "bid_volume": 120, + "mid_price": 56.5, + }, + "result": null, + "statusCode": 200, + "timestamps": { + "providerDataReceivedUnixMs": 6068, + "providerDataStreamEstablishedUnixMs": 1010, + "providerIndicatedTimeUnixMs": 120000, + }, +} +`; + +exports[`execute stock endpoint - happy path stock pass 1`] = ` +{ + "data": { + "result": 100.25, + }, + "result": 100.25, + "statusCode": 200, + "timestamps": { + "providerDataReceivedUnixMs": 6068, + "providerDataStreamEstablishedUnixMs": 1010, + "providerIndicatedTimeUnixMs": 1500, + }, +} +`; + +exports[`execute stock endpoint - happy path stock_quotes pass 1`] = ` +{ + "data": { + "ask_price": 100.5, + "ask_volume": 5, + "bid_price": 100, + "bid_volume": 10, + "mid_price": 100.25, + }, + "result": null, + "statusCode": 200, + "timestamps": { + "providerDataReceivedUnixMs": 6068, + "providerDataStreamEstablishedUnixMs": 1010, + "providerIndicatedTimeUnixMs": 2000, + }, +} +`; + +exports[`execute stock endpoint - invalid base stock fail 1`] = ` +{ + "error": { + "message": "base must be in the format of \${TICKER}_\${MARKET}", + "name": "AdapterError", + }, + "status": "errored", + "statusCode": 400, +} +`; + +exports[`execute stock endpoint - last only stock pass 1`] = ` +{ + "data": { + "result": 1, + }, + "result": 1, + "statusCode": 200, + "timestamps": { + "providerDataReceivedUnixMs": 6068, + "providerDataStreamEstablishedUnixMs": 1010, + "providerIndicatedTimeUnixMs": 2000, + }, +} +`; + +exports[`execute stock endpoint - last only stock_quotes fail 1`] = ` +{ + "error": { + "message": "The EA has not received any values from the Data Provider for the requested data yet. Retry after a short delay, and if the problem persists raise this issue in the relevant channels.", + "name": "AdapterError", + }, + "status": "errored", + "statusCode": 504, +} +`; diff --git a/packages/sources/six/test/integration/adapter-stock.test.ts b/packages/sources/six/test/integration/adapter-stock.test.ts new file mode 100644 index 0000000000..a79ccefa84 --- /dev/null +++ b/packages/sources/six/test/integration/adapter-stock.test.ts @@ -0,0 +1,113 @@ +import { WebSocketClassProvider } from '@chainlink/external-adapter-framework/transports' +import { + mockWebSocketProvider, + MockWebsocketServer, + setEnvVariables, + TestAdapter, +} from '@chainlink/external-adapter-framework/util/testing-utils' +import FakeTimers from '@sinonjs/fake-timers' +import { mockSixStockWebSocketServer } from './stock-fixtures' + +describe('execute', () => { + let mockWsServer: MockWebsocketServer | undefined + let testAdapter: TestAdapter + const wsEndpoint = 'ws://localhost:9090' + let oldEnv: NodeJS.ProcessEnv + + const normalRequest = { base: 'happy_market' } + const lastonlyRequest = { base: 'last_only' } + const bidaskonlyRequest = { base: 'bidask_only' } + const errorRequest = { base: 'error_market' } + const bidUpdateRequest = { base: 'bid_update' } + + beforeAll(async () => { + oldEnv = JSON.parse(JSON.stringify(process.env)) + process.env.PRIVATE_KEY = + '-----BEGIN PRIVATE KEY-----\nfake-private-key\n-----END PRIVATE KEY-----' + process.env.PUBLIC_CERT = + '-----BEGIN CERTIFICATE-----\nfake-public-cert\n-----END CERTIFICATE-----' + process.env.WS_API_ENDPOINT = wsEndpoint + + mockWebSocketProvider(WebSocketClassProvider) + mockWsServer = mockSixStockWebSocketServer(wsEndpoint) + + const adapter = (await import('../../src')).adapter + adapter.rateLimiting = undefined + testAdapter = await TestAdapter.startWithMockedCache(adapter, { + clock: FakeTimers.install({ + // To make TestAdapter.waitUntilResolved / waitForCache work with clock.nextAsync + toNotFake: ['nextTick', 'queueMicrotask'], + }), + testAdapter: {} as TestAdapter, + }) + + await testAdapter.request(normalRequest) // 2 cache + await testAdapter.request(lastonlyRequest) // 1 cache + await testAdapter.request(bidaskonlyRequest) // 1 cache + await testAdapter.request(errorRequest) // 0 cache + await testAdapter.request(bidUpdateRequest) //1 cache + await testAdapter.waitForCache(5) + }) + + afterAll(async () => { + setEnvVariables(oldEnv) + mockWsServer?.close() + testAdapter.clock?.uninstall() + await testAdapter.api.close() + }) + + describe('stock endpoint - invalid base', () => { + it('stock fail', async () => { + expect((await testAdapter.request({ base: 'lol' })).json()).toMatchSnapshot() + }) + }) + + describe('stock endpoint - happy path', () => { + it('stock pass', async () => { + expect((await testAdapter.request(normalRequest)).json()).toMatchSnapshot() + }) + it('stock_quotes pass', async () => { + const response = await testAdapter.request({ + ...normalRequest, + endpoint: 'stock_quotes', + }) + expect(response.json()).toMatchSnapshot() + }) + }) + + describe('stock endpoint - last only', () => { + it('stock pass', async () => { + expect((await testAdapter.request(lastonlyRequest)).json()).toMatchSnapshot() + }) + it('stock_quotes fail', async () => { + const response = await testAdapter.request({ + ...lastonlyRequest, + endpoint: 'stock_quotes', + }) + expect(response.json()).toMatchSnapshot() + }) + }) + + describe('stock endpoint - bid ask only', () => { + it('stock fail', async () => { + expect((await testAdapter.request(bidaskonlyRequest)).json()).toMatchSnapshot() + }) + it('stock_quotes pass', async () => { + const response = await testAdapter.request({ + ...bidaskonlyRequest, + endpoint: 'stock_quotes', + }) + expect(response.json()).toMatchSnapshot() + }) + }) + + describe('stock endpoint - bid update', () => { + it('stock_quotes pass', async () => { + const response = await testAdapter.request({ + ...bidUpdateRequest, + endpoint: 'stock_quotes', + }) + expect(response.json()).toMatchSnapshot() + }) + }) +}) diff --git a/packages/sources/six/test/integration/stock-fixtures.ts b/packages/sources/six/test/integration/stock-fixtures.ts new file mode 100644 index 0000000000..3b647e0cc3 --- /dev/null +++ b/packages/sources/six/test/integration/stock-fixtures.ts @@ -0,0 +1,86 @@ +import { MockWebsocketServer } from '@chainlink/external-adapter-framework/util/testing-utils' + +export const mockSixStockWebSocketServer = (URL: string): MockWebsocketServer => { + const mockWsServer = new MockWebsocketServer(URL, { mock: false }) + + const streams = JSON.stringify({ + data: { + startStream: [ + { + type: 'UPDATE', + streamId: `happy_market`, + last: { value: 100.25, unixTimestamp: 1.5 }, + bestBid: { value: 100, size: 10, unixTimestamp: 1 }, + bestAsk: { value: 100.5, size: 5, unixTimestamp: 2 }, + }, + // Ignored by type + { + type: 'CANCEL', + streamId: `happy_market`, + last: { value: 999, unixTimestamp: 999 }, + bestBid: { value: 999, size: 999, unixTimestamp: 999 }, + bestAsk: { value: 999, size: 999, unixTimestamp: 999 }, + }, + // Ignored by missing value + { + type: 'UPDATE', + streamId: `happy_market`, + last: { unixTimestamp: 999 }, + bestBid: { value: 999, unixTimestamp: 999 }, + bestAsk: { size: 999, unixTimestamp: 999 }, + }, + { + type: 'UPDATE', + streamId: `last_only`, + last: { value: 1, unixTimestamp: 2 }, + }, + { + type: 'UPDATE', + streamId: `bidask_only`, + bestBid: { value: 1, size: 2, unixTimestamp: 3 }, + bestAsk: { value: 4, size: 5, unixTimestamp: 6 }, + }, + // Ignored by invalid stream id + { + type: 'UPDATE', + streamId: `lol`, + last: { value: 100.25, unixTimestamp: 1.5 }, + }, + { + type: 'UPDATE', + streamId: `bid_update`, + bestBid: { value: 10, size: 12, unixTimestamp: 12 }, + bestAsk: { value: 13, size: 14, unixTimestamp: 15 }, + }, + { + type: 'UPDATE', + streamId: `bid_update`, + bestBid: { value: 100, size: 120, unixTimestamp: 120 }, + bestAsk: { value: 13, size: 14, unixTimestamp: 15 }, + }, + ], + }, + }) + + const error = JSON.stringify({ + errors: [ + { + message: 'Error', + messageCode: 100, + category: 'Erro category 1', + type: 'error', + }, + ], + }) + + mockWsServer.on('connection', (socket) => { + socket.on('message', (message) => { + if (message.toString().includes('error_market')) { + socket.send(error) + } else { + socket.send(streams) + } + }) + }) + return mockWsServer +} diff --git a/packages/sources/six/test/unit/stock-cache.test.ts b/packages/sources/six/test/unit/stock-cache.test.ts new file mode 100644 index 0000000000..d5e6d48e20 --- /dev/null +++ b/packages/sources/six/test/unit/stock-cache.test.ts @@ -0,0 +1,257 @@ +import { LoggerFactoryProvider } from '@chainlink/external-adapter-framework/util' +import { StockCache } from '../../src/transport/stock-cache' + +LoggerFactoryProvider.set() + +describe('StockCache', () => { + const streamId = 'stream-1' + + describe('processBidAsk', () => { + it('stores bid and ask with provider time in ms', () => { + const now = Date.now() + const cache = new StockCache() + cache.processBidAsk( + streamId, + { value: 100, size: 10, unixTimestamp: now / 1000 }, + { value: 102, size: 5, unixTimestamp: now / 1000 }, + ) + + expect(cache.bidCache.get(streamId)).toEqual({ price: 100, volume: 10, time: now }) + expect(cache.askCache.get(streamId)).toEqual({ price: 102, volume: 5, time: now }) + }) + + it('does not store when price or volume is missing', () => { + const cache = new StockCache() + cache.processBidAsk(streamId, { value: 1 }, { size: 2 }) + expect(cache.bidCache.has(streamId)).toBe(false) + expect(cache.askCache.has(streamId)).toBe(false) + }) + + it('does not store when message is too old', () => { + const cache = new StockCache() + + const old = Date.now() / 1000 - 305 + cache.processBidAsk( + streamId, + { value: 100, size: 10, unixTimestamp: old }, + { value: 102, size: 5, unixTimestamp: old }, + ) + expect(cache.bidCache.has(streamId)).toBe(false) + expect(cache.askCache.has(streamId)).toBe(false) + + const notOld = Date.now() / 1000 - 295 + cache.processBidAsk( + streamId, + { value: 100, size: 10, unixTimestamp: notOld }, + { value: 102, size: 5, unixTimestamp: notOld }, + ) + expect(cache.bidCache.has(streamId)).toBe(true) + expect(cache.askCache.has(streamId)).toBe(true) + }) + + it('does not store NaN price or volume', () => { + const cache = new StockCache() + cache.processBidAsk(streamId, { value: NaN, size: 1 }, { value: 1, size: NaN }) + expect(cache.bidCache.has(streamId)).toBe(false) + expect(cache.askCache.has(streamId)).toBe(false) + }) + + it('allows zero price and volume and no timestamp', () => { + const cache = new StockCache() + cache.processBidAsk(streamId, { value: 0, size: 0 }, { value: 0, size: 0 }) + expect(cache.bidCache.get(streamId)).toEqual({ price: 0, volume: 0 }) + expect(cache.askCache.get(streamId)).toEqual({ price: 0, volume: 0 }) + }) + }) + + describe('getPriceResponse', () => { + it('returns adapter response successfully', () => { + const now = Date.now() + const cache = new StockCache() + expect(cache.getPriceResponse(streamId, { value: 55.5, unixTimestamp: now / 1000 })).toEqual([ + { + params: { base: streamId, rawEndpoint: 'stock' }, + response: { + result: 55.5, + data: { result: 55.5 }, + timestamps: { providerIndicatedTimeUnixMs: now }, + }, + }, + ]) + }) + + it('no timestamps when missing unixTimestamp', () => { + const cache = new StockCache() + expect(cache.getPriceResponse(streamId, { value: 10 })).toEqual([ + { + params: { base: streamId, rawEndpoint: 'stock' }, + response: { result: 10, data: { result: 10 } }, + }, + ]) + }) + + it('returns empty array when last value is missing or NaN', () => { + const cache = new StockCache() + expect(cache.getPriceResponse(streamId, undefined)).toEqual([]) + expect(cache.getPriceResponse(streamId, {})).toEqual([]) + expect(cache.getPriceResponse(streamId, { value: NaN })).toEqual([]) + }) + + it('returns empty array when message is too old', () => { + const cache = new StockCache() + expect( + cache.getPriceResponse(streamId, { + value: 55.5, + unixTimestamp: Date.now() / 1000 - 295, + }).length, + ).toBeGreaterThan(0) + expect( + cache.getPriceResponse(streamId, { + value: 55.5, + unixTimestamp: Date.now() / 1000 - 305, + }).length, + ).toEqual(0) + }) + }) + + describe('getBidAskResponse', () => { + it('returns mid, sides, volumes, and max provider time', () => { + const now = Date.now() + const cache = new StockCache() + cache.processBidAsk( + streamId, + { value: 100, size: 10, unixTimestamp: now / 1000 }, + { value: 104, size: 4, unixTimestamp: now / 1000 }, + ) + + expect(cache.getBidAskResponse(streamId)).toEqual([ + { + params: { base: streamId, rawEndpoint: 'stock_quotes' }, + response: { + result: null, + data: { + mid_price: 102, + bid_price: 100, + bid_volume: 10, + ask_price: 104, + ask_volume: 4, + }, + timestamps: { providerIndicatedTimeUnixMs: now }, + }, + }, + ]) + }) + + it('returns mid, sides, volumes, and max provider time - update seperately', () => { + const now = Date.now() + const cache = new StockCache() + cache.processBidAsk(streamId, { value: 100, size: 10, unixTimestamp: now / 1000 }) + cache.processBidAsk(streamId, undefined, { + value: 104, + size: 4, + unixTimestamp: (now * 10) / 1000, + }) + + expect(cache.getBidAskResponse(streamId)).toEqual([ + { + params: { base: streamId, rawEndpoint: 'stock_quotes' }, + response: { + result: null, + data: { + mid_price: 102, + bid_price: 100, + bid_volume: 10, + ask_price: 104, + ask_volume: 4, + }, + timestamps: { providerIndicatedTimeUnixMs: now * 10 }, + }, + }, + ]) + }) + + it('refreshes bid only when there is a bid update', () => { + const now = Date.now() + const cache = new StockCache() + cache.processBidAsk( + streamId, + { value: 100, size: 10, unixTimestamp: now / 1000 }, + { value: 104, size: 4, unixTimestamp: now / 1000 }, + ) + + cache.processBidAsk(streamId, { value: 1000, size: 100, unixTimestamp: (now * 10) / 1000 }) + + expect(cache.getBidAskResponse(streamId)).toEqual([ + { + params: { base: streamId, rawEndpoint: 'stock_quotes' }, + response: { + result: null, + data: { + mid_price: 552, + bid_price: 1000, + bid_volume: 100, + ask_price: 104, + ask_volume: 4, + }, + timestamps: { providerIndicatedTimeUnixMs: now * 10 }, + }, + }, + ]) + }) + + it('uses ask price as mid when bid is zero', () => { + const now = Date.now() + const cache = new StockCache() + cache.processBidAsk( + streamId, + { value: 0, size: 1, unixTimestamp: now / 1000 }, + { value: 200, size: 1, unixTimestamp: now / 1000 }, + ) + const [row] = cache.getBidAskResponse(streamId) + expect(row.response.data.mid_price).toBe(200) + }) + + it('uses bid price as mid when ask is zero', () => { + const now = Date.now() + const cache = new StockCache() + cache.processBidAsk( + streamId, + { value: 100, size: 1, unixTimestamp: now / 1000 }, + { value: 0, size: 1, unixTimestamp: now / 1000 }, + ) + const [row] = cache.getBidAskResponse(streamId) + expect(row.response.data.mid_price).toBe(100) + }) + + it('returns empty array when bid is missing', () => { + const cache = new StockCache() + cache.processBidAsk(streamId, { value: 1, size: 1 }, undefined) + expect(cache.getBidAskResponse(streamId)).toEqual([]) + }) + + it('returns empty array when ask is missing', () => { + const cache = new StockCache() + cache.processBidAsk(streamId, undefined, { value: 2, size: 2 }) + expect(cache.getBidAskResponse(streamId)).toEqual([]) + }) + + it('omits timestamps when neither side has provider time', () => { + const cache = new StockCache() + cache.processBidAsk(streamId, { value: 10, size: 1 }, { value: 20, size: 1 }) + const [row] = cache.getBidAskResponse(streamId) + expect(row.response).not.toHaveProperty('timestamps') + }) + + it('use bid timestamps if ask timestamp is missing', () => { + const now = Date.now() + const cache = new StockCache() + cache.processBidAsk( + streamId, + { value: 10, size: 1, unixTimestamp: now / 1000 }, + { value: 20, size: 1 }, + ) + const [row] = cache.getBidAskResponse(streamId) + expect(row.response.timestamps).toEqual({ providerIndicatedTimeUnixMs: now }) + }) + }) +}) diff --git a/packages/sources/six/test/unit/stock-ws-ping-watchdog.test.ts b/packages/sources/six/test/unit/stock-ws-ping-watchdog.test.ts new file mode 100644 index 0000000000..969ffb871b --- /dev/null +++ b/packages/sources/six/test/unit/stock-ws-ping-watchdog.test.ts @@ -0,0 +1,99 @@ +import { LoggerFactoryProvider } from '@chainlink/external-adapter-framework/util' +import { wsTransport } from '../../src/transport/stock' + +LoggerFactoryProvider.set() + +type StubWs = { + readyState: number + ping: jest.Mock + close: jest.Mock +} + +/** Handlers are exercised by the framework at runtime; for unit checks we introspect here only. */ +function getWsHandlers(): { + open: () => void + close: () => void + heartbeat: (conn: StubWs) => void + pong: () => void +} { + // Structural cast only: intersecting with typeof wsTransport yields `never` + // because WebSocketTransport's `config` is private and conflicts with a public `config`. + type TransportWithHandlerConfig = { + config: { + handlers: { + open: () => void + close: () => void + heartbeat: (conn: StubWs) => void + pong: () => void + } + } + } + return (wsTransport as unknown as TransportWithHandlerConfig).config.handlers +} + +describe('six stock websocket ping / pong watchdog', () => { + const handlers = getWsHandlers() + + beforeEach(() => { + jest.useFakeTimers() + handlers.open() + }) + + afterEach(() => { + handlers.close() + jest.useRealTimers() + }) + + it('closes when no pong arrives before PONG_TIMEOUT', () => { + const conn: StubWs = { + readyState: 1, + ping: jest.fn(), + close: jest.fn(), + } + handlers.heartbeat(conn) + + expect(conn.ping).toHaveBeenCalledTimes(1) + + jest.advanceTimersByTime(1999) + expect(conn.close).not.toHaveBeenCalled() + + jest.advanceTimersByTime(1) + expect(conn.close).toHaveBeenCalledWith( + 1006, + 'The connection appears to be active but stopped receiving updates', + ) + }) + + it('does not close when pong clears the watchdog before timeout', () => { + const conn: StubWs = { + readyState: 1, + ping: jest.fn(), + close: jest.fn(), + } + + handlers.heartbeat(conn) + handlers.pong() + jest.advanceTimersByTime(60_000) + + expect(conn.close).not.toHaveBeenCalled() + }) + + it('closes when heartbeat runs again before pong (interval shorter than pong wait)', () => { + const conn: StubWs = { + readyState: 1, + ping: jest.fn(), + close: jest.fn(), + } + + handlers.heartbeat(conn) + handlers.heartbeat(conn) + + expect(conn.close).toHaveBeenCalledWith( + 1006, + 'Heartbeat frequency exceeded 2000ms, increase WS_HEARTBEAT_INTERVAL_MS in environment variable', + ) + + jest.advanceTimersByTime(60_000) + expect(conn.close).toHaveBeenCalledTimes(1) + }) +}) diff --git a/yarn.lock b/yarn.lock index ae9bec5589..c214ab2048 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5054,6 +5054,7 @@ __metadata: resolution: "@chainlink/six-adapter@workspace:packages/sources/six" dependencies: "@chainlink/external-adapter-framework": "npm:2.14.2" + "@sinonjs/fake-timers": "npm:15.4.0" "@types/jest": "npm:^29.5.14" "@types/node": "npm:22.14.1" nock: "npm:13.5.6" @@ -8509,6 +8510,15 @@ __metadata: languageName: node linkType: hard +"@sinonjs/fake-timers@npm:15.4.0": + version: 15.4.0 + resolution: "@sinonjs/fake-timers@npm:15.4.0" + dependencies: + "@sinonjs/commons": "npm:^3.0.1" + checksum: 10/3960a9fe065f38a4228c66d184eeb101e8a6af9cbfc8454dd5d45ac397201da72134048d4e808a25993494885b172dd6deecdad9949bbf4c1d3a220ef561f6cc + languageName: node + linkType: hard + "@sinonjs/fake-timers@npm:9.1.2, @sinonjs/fake-timers@npm:^9.1.2": version: 9.1.2 resolution: "@sinonjs/fake-timers@npm:9.1.2"