diff --git a/package.json b/package.json index 8aa4dc1..1761b20 100644 --- a/package.json +++ b/package.json @@ -47,7 +47,8 @@ "request-ip": "^3.3.0", "resend": "^4.2.0", "response-time": "^2.3.3", - "ts-node": "^10.9.2" + "ts-node": "^10.9.2", + "ws": "^8.18.0" }, "devDependencies": { "@types/react": "^19.0.12", @@ -60,6 +61,7 @@ "@types/node-statsd": "^0.1.6", "@types/request-ip": "^0.0.41", "@types/response-time": "^2.3.8", + "@types/ws": "^8.5.13", "nodemon": "^3.1.9", "typescript": "^5.7.2" }, diff --git a/src/admin-routes/router.ts b/src/admin-routes/router.ts index 858f683..27d3744 100644 --- a/src/admin-routes/router.ts +++ b/src/admin-routes/router.ts @@ -14,6 +14,7 @@ import { logRequest } from "src/middleware/logRequest"; import { db } from "src/utils/db"; import { authenticateToken, getUserInfo } from "src/utils/jwt"; import domainRouter from "./routes/domain"; +import sinkingYachtsRouter from "./routes/sinking-yachts"; import userRouter from "./routes/user"; const router = express.Router(); router.use(express.json()); @@ -297,6 +298,7 @@ router.get("/metrics", logRequest, async (req, res) => { }); router.use("/domain", logRequest, domainRouter); +router.use("/sinking-yachts", logRequest, sinkingYachtsRouter); router.use("/user", userRouter); export default router; diff --git a/src/admin-routes/routes/sinking-yachts.ts b/src/admin-routes/routes/sinking-yachts.ts new file mode 100644 index 0000000..a3527b2 --- /dev/null +++ b/src/admin-routes/routes/sinking-yachts.ts @@ -0,0 +1,123 @@ +import express, { Request, Response } from "express"; +import { sinkingYahtsService } from "src/services/_index"; + +const router = express.Router(); + +/** + * POST /admin/sinking-yachts/start-feed + * @summary Starts the SinkingYachts realtime feed monitoring + * @tags SinkingYachts - Feed Management + * @security BearerAuth + * @param {object} request.body + * @param {boolean} request.body.skipBulkImport - Skip the initial bulk import (optional) + * @return {object} 200 - Success response + * @return {object} 500 - Error response + */ +router.post("/start-feed", async (req: Request, res: Response) => { + try { + const { skipBulkImport = false } = req.body; + + await sinkingYahtsService.startFeedMonitoring(skipBulkImport); + + return res.status(200).json({ + success: true, + message: "SinkingYachts feed monitoring started successfully", + skipBulkImport + }); + } catch (error: any) { + return res.status(500).json({ + success: false, + message: "Failed to start SinkingYachts feed monitoring", + error: error.message + }); + } +}); + +/** + * POST /admin/sinking-yachts/stop-feed + * @summary Stops the SinkingYachts realtime feed monitoring + * @tags SinkingYachts - Feed Management + * @security BearerAuth + * @return {object} 200 - Success response + */ +router.post("/stop-feed", async (req: Request, res: Response) => { + try { + sinkingYahtsService.stopRealtimeFeed(); + + return res.status(200).json({ + success: true, + message: "SinkingYachts feed monitoring stopped successfully" + }); + } catch (error: any) { + return res.status(500).json({ + success: false, + message: "Failed to stop SinkingYachts feed monitoring", + error: error.message + }); + } +}); + +/** + * POST /admin/sinking-yachts/bulk-import + * @summary Manually triggers a bulk import of all domains from SinkingYachts + * @tags SinkingYachts - Feed Management + * @security BearerAuth + * @return {object} 200 - Success response + * @return {object} 500 - Error response + */ +router.post("/bulk-import", async (req: Request, res: Response) => { + try { + await sinkingYahtsService.initializeBulkImport(); + + return res.status(200).json({ + success: true, + message: "SinkingYachts bulk import completed successfully" + }); + } catch (error: any) { + return res.status(500).json({ + success: false, + message: "Failed to complete SinkingYachts bulk import", + error: error.message + }); + } +}); + +/** + * GET /admin/sinking-yachts/recent + * @summary Fetches recent domains from SinkingYachts API + * @tags SinkingYachts - Feed Management + * @security BearerAuth + * @param {string} since.query.required - ISO date string for filtering recent domains + * @return {object} 200 - Array of recent domains + * @return {object} 400 - Bad request (missing since parameter) + * @return {object} 500 - Error response + */ +router.get("/recent", async (req: Request, res: Response) => { + try { + const { since } = req.query; + + if (!since || typeof since !== "string") { + return res.status(400).json({ + success: false, + message: "Missing or invalid 'since' query parameter (ISO date string required)" + }); + } + + const recentDomains = await sinkingYahtsService.getRecentDomains(since); + + return res.status(200).json({ + success: true, + data: recentDomains, + count: recentDomains.length, + since + }); + } catch (error: any) { + return res.status(500).json({ + success: false, + message: "Failed to fetch recent domains", + error: error.message + }); + } +}); + +export default router; \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index ae10e28..906f650 100644 --- a/src/index.ts +++ b/src/index.ts @@ -11,6 +11,7 @@ import { server } from "./server"; // import metrics from "./metrics"; import { swaggerOptions as adminSwagOptions } from "./admin-routes/swaggerOptions"; import { swaggerOptions as mainSwagOptions } from "./swaggerOptions"; +import { sinkingYahtsService } from "./services/_index"; import * as logger from "./utils/logger"; dotenv.config(); @@ -26,6 +27,21 @@ try { } const db = drizzle(process.env.DATABASE_URL); logger.database("Database connection initialized successfully"); + + // Initialize SinkingYachts realtime feed monitoring + if (process.env.ENABLE_SINKING_YACHTS_FEED !== "false") { + logger.info("Starting SinkingYachts realtime feed monitoring..."); + sinkingYahtsService.startFeedMonitoring(process.env.SKIP_BULK_IMPORT === "true") + .then(() => { + logger.info("SinkingYachts feed monitoring started successfully"); + }) + .catch((error) => { + logger.error(`Failed to start SinkingYachts feed monitoring: ${error.message}`); + // Don't exit the process, let the server continue without feed monitoring + }); + } else { + logger.info("SinkingYachts feed monitoring disabled by ENABLE_SINKING_YACHTS_FEED=false"); + } } catch (error) { logger.error( `Failed to initialize database connection: ${error instanceof Error ? error.message : String(error)}` diff --git a/src/services/SinkingYahts.ts b/src/services/SinkingYahts.ts index 02ccba5..109df59 100644 --- a/src/services/SinkingYahts.ts +++ b/src/services/SinkingYahts.ts @@ -1,14 +1,34 @@ -import { rawAPIData } from "src/db/schema"; +import { rawAPIData, domains } from "src/db/schema"; import { headersWithSinkingYahts } from "src/defs/headers"; import { getDbDomain } from "src/func/db/domain"; import { axios } from "src/utils/axios"; import { db } from "src/utils/db"; import { sanitizeDomain } from "src/utils/sanitizeDomain"; +import { eq } from "drizzle-orm"; +import WebSocket from "ws"; +import { info, warn, error } from "src/utils/logger"; + +interface SinkingYachtsDomain { + domain: string; + date_added: string; + date_updated?: string; +} + +interface WebSocketMessage { + action: "add" | "remove" | "update"; + domain: string; + date: string; +} /** * A service that provides access to the SinkingYahts service for checking and reporting domains. */ export class SinkingYahtsService { + private wsConnection: WebSocket | null = null; + private reconnectAttempts = 0; + private maxReconnectAttempts = 5; + private reconnectDelay = 5000; // 5 seconds + domain = { /** * Asynchronously checks a given domain against the SinkingYahts service for any known bad domains. @@ -39,4 +59,295 @@ export class SinkingYahtsService { return data; }, }; + + /** + * Fetches all domains from the SinkingYachts API + */ + async getAllDomains(): Promise { + try { + const response = await axios.get( + "https://phish.sinking.yachts/v2/all", + { + headers: headersWithSinkingYahts, + } + ); + + info(`SinkingYachts: Fetched ${response.data.length} domains from /v2/all`); + return response.data; + } catch (err: any) { + error(`SinkingYachts: Failed to fetch all domains: ${err.message}`); + throw err; + } + } + + /** + * Fetches recent domains from the SinkingYachts API since a given date + * @param since - ISO date string for filtering recent domains + */ + async getRecentDomains(since: string): Promise { + try { + const response = await axios.get( + `https://phish.sinking.yachts/v2/recent/${since}`, + { + headers: headersWithSinkingYahts, + } + ); + + info(`SinkingYachts: Fetched ${response.data.length} recent domains since ${since}`); + return response.data; + } catch (err: any) { + error(`SinkingYachts: Failed to fetch recent domains: ${err.message}`); + throw err; + } + } + + /** + * Processes domain data and stores it in the database + */ + private async processDomainData(domainData: SinkingYachtsDomain[], isUpdate = false): Promise { + for (const item of domainData) { + try { + const sanitizedDomain = sanitizeDomain(item.domain); + let dbDomain = await getDbDomain(sanitizedDomain); + + if (!dbDomain) { + const [insertedDomain] = await db.insert(domains).values({ + domain: sanitizedDomain, + malicious: true, + }).returning(); + dbDomain = insertedDomain; + info(`SinkingYachts: Added new malicious domain: ${sanitizedDomain}`); + } else if (isUpdate) { + await db.update(domains) + .set({ + malicious: true, + last_checked: new Date(), + updated_at: new Date() + }) + .where(eq(domains.id, dbDomain.id)); + info(`SinkingYachts: Updated domain: ${sanitizedDomain}`); + } + + await db.insert(rawAPIData).values({ + sourceAPI: "SinkingYachts", + domain: dbDomain.id, + data: item, + }); + + } catch (err: any) { + warn(`SinkingYachts: Failed to process domain ${item.domain}: ${err.message}`); + } + } + } + + /** + * Initializes the bulk import of all domains from SinkingYachts + */ + async initializeBulkImport(): Promise { + try { + info("SinkingYachts: Starting bulk import of all domains..."); + const allDomains = await this.getAllDomains(); + await this.processDomainData(allDomains, false); + info(`SinkingYachts: Bulk import completed. Processed ${allDomains.length} domains.`); + } catch (err: any) { + error(`SinkingYachts: Bulk import failed: ${err.message}`); + throw err; + } + } + + /** + * Starts the WebSocket connection for real-time updates + */ + startRealtimeFeed(): void { + if (this.wsConnection && this.wsConnection.readyState === WebSocket.OPEN) { + warn("SinkingYachts: WebSocket connection already active"); + return; + } + + try { + this.wsConnection = new WebSocket("wss://phish.sinking.yachts/feed", { + headers: { + "X-Identity": headersWithSinkingYahts["X-Identity"], + }, + }); + + this.wsConnection.on("open", () => { + info("SinkingYachts: WebSocket connection established"); + this.reconnectAttempts = 0; + }); + + this.wsConnection.on("message", async (data: Buffer) => { + try { + const message: WebSocketMessage = JSON.parse(data.toString()); + await this.handleWebSocketMessage(message); + } catch (err: any) { + warn(`SinkingYachts: Failed to parse WebSocket message: ${err.message}`); + } + }); + + this.wsConnection.on("close", (code: number, reason: Buffer) => { + warn(`SinkingYachts: WebSocket connection closed (${code}: ${reason.toString()})`); + this.attemptReconnect(); + }); + + this.wsConnection.on("error", (err: Error) => { + error(`SinkingYachts: WebSocket error: ${err.message}`); + }); + + } catch (err: any) { + error(`SinkingYachts: Failed to start WebSocket connection: ${err.message}`); + this.attemptReconnect(); + } + } + + /** + * Handles incoming WebSocket messages + */ + private async handleWebSocketMessage(message: WebSocketMessage): Promise { + try { + const sanitizedDomain = sanitizeDomain(message.domain); + + switch (message.action) { + case "add": + await this.handleDomainAdd(sanitizedDomain, message.date); + break; + case "remove": + await this.handleDomainRemove(sanitizedDomain, message.date); + break; + case "update": + await this.handleDomainUpdate(sanitizedDomain, message.date); + break; + default: + warn(`SinkingYachts: Unknown WebSocket action: ${message.action}`); + } + } catch (err: any) { + error(`SinkingYachts: Failed to handle WebSocket message: ${err.message}`); + } + } + + /** + * Handles domain addition from WebSocket feed + */ + private async handleDomainAdd(domain: string, date: string): Promise { + let dbDomain = await getDbDomain(domain); + + if (!dbDomain) { + const [insertedDomain] = await db.insert(domains).values({ + domain: domain, + malicious: true, + }).returning(); + dbDomain = insertedDomain; + } else { + await db.update(domains) + .set({ + malicious: true, + last_checked: new Date(), + updated_at: new Date() + }) + .where(eq(domains.id, dbDomain.id)); + } + + await db.insert(rawAPIData).values({ + sourceAPI: "SinkingYachts", + domain: dbDomain.id, + data: { domain, date_added: date, action: "add" }, + }); + + info(`SinkingYachts: Real-time add - ${domain}`); + } + + /** + * Handles domain removal from WebSocket feed + */ + private async handleDomainRemove(domain: string, date: string): Promise { + const dbDomain = await getDbDomain(domain); + + if (dbDomain) { + await db.update(domains) + .set({ + malicious: false, + last_checked: new Date(), + updated_at: new Date() + }) + .where(eq(domains.id, dbDomain.id)); + + await db.insert(rawAPIData).values({ + sourceAPI: "SinkingYachts", + domain: dbDomain.id, + data: { domain, date_removed: date, action: "remove" }, + }); + + info(`SinkingYachts: Real-time remove - ${domain}`); + } + } + + /** + * Handles domain update from WebSocket feed + */ + private async handleDomainUpdate(domain: string, date: string): Promise { + const dbDomain = await getDbDomain(domain); + + if (dbDomain) { + await db.update(domains) + .set({ + last_checked: new Date(), + updated_at: new Date() + }) + .where(eq(domains.id, dbDomain.id)); + + await db.insert(rawAPIData).values({ + sourceAPI: "SinkingYachts", + domain: dbDomain.id, + data: { domain, date_updated: date, action: "update" }, + }); + + info(`SinkingYachts: Real-time update - ${domain}`); + } + } + + /** + * Attempts to reconnect the WebSocket connection + */ + private attemptReconnect(): void { + if (this.reconnectAttempts >= this.maxReconnectAttempts) { + error("SinkingYachts: Max reconnection attempts reached. Giving up."); + return; + } + + this.reconnectAttempts++; + warn(`SinkingYachts: Attempting to reconnect WebSocket (${this.reconnectAttempts}/${this.maxReconnectAttempts}) in ${this.reconnectDelay}ms...`); + + setTimeout(() => { + this.startRealtimeFeed(); + }, this.reconnectDelay); + + this.reconnectDelay *= 2; // Exponential backoff + } + + /** + * Stops the WebSocket connection + */ + stopRealtimeFeed(): void { + if (this.wsConnection) { + this.wsConnection.close(); + this.wsConnection = null; + info("SinkingYachts: WebSocket connection stopped"); + } + } + + /** + * Starts the complete feed monitoring service (bulk import + realtime) + */ + async startFeedMonitoring(skipBulkImport = false): Promise { + try { + if (!skipBulkImport) { + await this.initializeBulkImport(); + } + this.startRealtimeFeed(); + info("SinkingYachts: Feed monitoring service started"); + } catch (err: any) { + error(`SinkingYachts: Failed to start feed monitoring: ${err.message}`); + throw err; + } + } }