diff --git a/apps/api/src/__tests__/e2e_noAuth/index.test.ts b/apps/api/src/__tests__/e2e_noAuth/index.test.ts index c443e71..acb2278 100644 --- a/apps/api/src/__tests__/e2e_noAuth/index.test.ts +++ b/apps/api/src/__tests__/e2e_noAuth/index.test.ts @@ -1,5 +1,4 @@ import request from "supertest"; -import { app } from "../../index"; import dotenv from "dotenv"; const fs = require("fs"); const path = require("path"); diff --git a/apps/api/src/__tests__/e2e_withAuth/index.test.ts b/apps/api/src/__tests__/e2e_withAuth/index.test.ts index 02e4a47..431c7d1 100644 --- a/apps/api/src/__tests__/e2e_withAuth/index.test.ts +++ b/apps/api/src/__tests__/e2e_withAuth/index.test.ts @@ -1,5 +1,4 @@ import request from "supertest"; -import { app } from "../../index"; import dotenv from "dotenv"; import { v4 as uuidv4 } from "uuid"; @@ -35,7 +34,7 @@ describe("E2E Tests for API Routes", () => { describe("POST /v0/scrape", () => { it.concurrent("should require authorization", async () => { - const response = await request(app).post("/v0/scrape"); + const response = await request(TEST_URL).post("/v0/scrape"); expect(response.statusCode).toBe(401); }); diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index cc8376b..6b62f06 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -5,190 +5,215 @@ import "dotenv/config"; import { getWebScraperQueue } from "./services/queue-service"; import { redisClient } from "./services/rate-limiter"; import { v0Router } from "./routes/v0"; -import { initSDK } from '@hyperdx/node-opentelemetry'; +import { initSDK } from "@hyperdx/node-opentelemetry"; +import cluster from "cluster"; +import os from "os"; const { createBullBoard } = require("@bull-board/api"); const { BullAdapter } = require("@bull-board/api/bullAdapter"); const { ExpressAdapter } = require("@bull-board/express"); -export const app = express(); +const numCPUs = os.cpus().length; +console.log(`Number of CPUs: ${numCPUs} available`); -global.isProduction = 
process.env.IS_PRODUCTION === "true"; +if (cluster.isMaster) { + console.log(`Master ${process.pid} is running`); -app.use(bodyParser.urlencoded({ extended: true })); -app.use(bodyParser.json({ limit: "10mb" })); + // Fork workers. + for (let i = 0; i < numCPUs; i++) { + cluster.fork(); + } -app.use(cors()); // Add this line to enable CORS - -const serverAdapter = new ExpressAdapter(); -serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`); - -const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({ - queues: [new BullAdapter(getWebScraperQueue())], - serverAdapter: serverAdapter, -}); - -app.use( - `/admin/${process.env.BULL_AUTH_KEY}/queues`, - serverAdapter.getRouter() -); - -app.get("/", (req, res) => { - res.send("SCRAPERS-JS: Hello, world! Fly.io"); -}); - -//write a simple test function -app.get("/test", async (req, res) => { - res.send("Hello, world!"); -}); - -// register router -app.use(v0Router); - -const DEFAULT_PORT = process.env.PORT ?? 3002; -const HOST = process.env.HOST ?? "localhost"; -redisClient.connect(); - -// HyperDX OpenTelemetry -if(process.env.ENV === 'production') { - initSDK({ consoleCapture: true, additionalInstrumentations: []}); -} - - -export function startServer(port = DEFAULT_PORT) { - const server = app.listen(Number(port), HOST, () => { - console.log(`Server listening on port ${port}`); - console.log( - `For the UI, open http://${HOST}:${port}/admin/${process.env.BULL_AUTH_KEY}/queues` - ); - console.log(""); - console.log("1. Make sure Redis is running on port 6379 by default"); - console.log( - "2. 
If you want to run nango, make sure you do port forwarding in 3002 using ngrok http 3002 " - ); + cluster.on("exit", (worker, code, signal) => { + console.log(`Worker ${worker.process.pid} exited`); + console.log("Starting a new worker"); + cluster.fork(); }); - return server; -} +} else { + const app = express(); -if (require.main === module) { - startServer(); -} + global.isProduction = process.env.IS_PRODUCTION === "true"; -// Use this as a "health check" that way we dont destroy the server -app.get(`/admin/${process.env.BULL_AUTH_KEY}/queues`, async (req, res) => { - try { - const webScraperQueue = getWebScraperQueue(); - const [webScraperActive] = await Promise.all([ - webScraperQueue.getActiveCount(), - ]); + app.use(bodyParser.urlencoded({ extended: true })); + app.use(bodyParser.json({ limit: "10mb" })); - const noActiveJobs = webScraperActive === 0; - // 200 if no active jobs, 503 if there are active jobs - return res.status(noActiveJobs ? 200 : 500).json({ - webScraperActive, - noActiveJobs, - }); - } catch (error) { - console.error(error); - return res.status(500).json({ error: error.message }); + app.use(cors()); // Add this line to enable CORS + + const serverAdapter = new ExpressAdapter(); + serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`); + + const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({ + queues: [new BullAdapter(getWebScraperQueue())], + serverAdapter: serverAdapter, + }); + + app.use( + `/admin/${process.env.BULL_AUTH_KEY}/queues`, + serverAdapter.getRouter() + ); + + app.get("/", (req, res) => { + res.send("SCRAPERS-JS: Hello, world! Fly.io"); + }); + + //write a simple test function + app.get("/test", async (req, res) => { + res.send("Hello, world!"); + }); + + // register router + app.use(v0Router); + + const DEFAULT_PORT = process.env.PORT ?? 3002; + const HOST = process.env.HOST ?? 
"localhost"; + redisClient.connect(); + + // HyperDX OpenTelemetry + if (process.env.ENV === "production") { + initSDK({ consoleCapture: true, additionalInstrumentations: [] }); } -}); -app.get(`/serverHealthCheck`, async (req, res) => { - try { - const webScraperQueue = getWebScraperQueue(); - const [waitingJobs] = await Promise.all([ - webScraperQueue.getWaitingCount(), - ]); - - const noWaitingJobs = waitingJobs === 0; - // 200 if no active jobs, 503 if there are active jobs - return res.status(noWaitingJobs ? 200 : 500).json({ - waitingJobs, + function startServer(port = DEFAULT_PORT) { + const server = app.listen(Number(port), HOST, () => { + console.log(`Worker ${process.pid} listening on port ${port}`); + console.log( + `For the UI, open http://${HOST}:${port}/admin/${process.env.BULL_AUTH_KEY}/queues` + ); + console.log(""); + console.log("1. Make sure Redis is running on port 6379 by default"); + console.log( + "2. If you want to run nango, make sure you do port forwarding in 3002 using ngrok http 3002 " + ); }); - } catch (error) { - console.error(error); - return res.status(500).json({ error: error.message }); + return server; } -}); -app.get('/serverHealthCheck/notify', async (req, res) => { - if (process.env.SLACK_WEBHOOK_URL) { - const treshold = 1; // The treshold value for the active jobs - const timeout = 60000; // 1 minute // The timeout value for the check in milliseconds + if (require.main === module) { + startServer(); + } - const getWaitingJobsCount = async () => { + // Use this as a "health check" that way we dont destroy the server + app.get(`/admin/${process.env.BULL_AUTH_KEY}/queues`, async (req, res) => { + try { const webScraperQueue = getWebScraperQueue(); - const [waitingJobsCount] = await Promise.all([ + const [webScraperActive] = await Promise.all([ + webScraperQueue.getActiveCount(), + ]); + + const noActiveJobs = webScraperActive === 0; + // 200 if no active jobs, 503 if there are active jobs + return res.status(noActiveJobs ? 
200 : 500).json({ + webScraperActive, + noActiveJobs, + }); + } catch (error) { + console.error(error); + return res.status(500).json({ error: error.message }); + } + }); + + app.get(`/serverHealthCheck`, async (req, res) => { + try { + const webScraperQueue = getWebScraperQueue(); + const [waitingJobs] = await Promise.all([ webScraperQueue.getWaitingCount(), ]); - return waitingJobsCount; - }; + const noWaitingJobs = waitingJobs === 0; + // 200 if no waiting jobs, 500 if there are waiting jobs + return res.status(noWaitingJobs ? 200 : 500).json({ + waitingJobs, + }); + } catch (error) { + console.error(error); + return res.status(500).json({ error: error.message }); + } + }); - res.status(200).json({ message: "Check initiated" }); + app.get("/serverHealthCheck/notify", async (req, res) => { + if (process.env.SLACK_WEBHOOK_URL) { + const treshold = 1; // The threshold value for the waiting jobs + const timeout = 60000; // 1 minute: the timeout value for the check, in milliseconds - const checkWaitingJobs = async () => { - try { - let waitingJobsCount = await getWaitingJobsCount(); - if (waitingJobsCount >= treshold) { - setTimeout(async () => { - // Re-check the waiting jobs count after the timeout - waitingJobsCount = await getWaitingJobsCount(); - if (waitingJobsCount >= treshold) { - const slackWebhookUrl = process.env.SLACK_WEBHOOK_URL; - const message = { - text: `⚠️ Warning: The number of active jobs (${waitingJobsCount}) has exceeded the threshold (${treshold}) for more than ${timeout/60000} minute(s).`, - }; + const getWaitingJobsCount = async () => { + const webScraperQueue = getWebScraperQueue(); + const [waitingJobsCount] = await Promise.all([ + webScraperQueue.getWaitingCount(), + ]); - const response = await fetch(slackWebhookUrl, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify(message), - }) - - if (!response.ok) { - console.error('Failed to send Slack notification') + return waitingJobsCount; + }; + 
res.status(200).json({ message: "Check initiated" }); + + const checkWaitingJobs = async () => { + try { + let waitingJobsCount = await getWaitingJobsCount(); + if (waitingJobsCount >= treshold) { + setTimeout(async () => { + // Re-check the waiting jobs count after the timeout + waitingJobsCount = await getWaitingJobsCount(); + if (waitingJobsCount >= treshold) { + const slackWebhookUrl = process.env.SLACK_WEBHOOK_URL; + const message = { + text: `⚠️ Warning: The number of active jobs (${waitingJobsCount}) has exceeded the threshold (${treshold}) for more than ${ + timeout / 60000 + } minute(s).`, + }; + + const response = await fetch(slackWebhookUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(message), + }); + + if (!response.ok) { + console.error("Failed to send Slack notification"); + } } - } - }, timeout); + }, timeout); + } + } catch (error) { + console.error(error); } - } catch (error) { - console.error(error); - } - }; + }; - checkWaitingJobs(); - } -}); + checkWaitingJobs(); + } + }); -app.get(`/admin/${process.env.BULL_AUTH_KEY}/clean-before-24h-complete-jobs`, async (req, res) => { - try { - const webScraperQueue = getWebScraperQueue(); - const completedJobs = await webScraperQueue.getJobs(['completed']); - const before24hJobs = completedJobs.filter(job => job.finishedOn < Date.now() - 24 * 60 * 60 * 1000); - const jobIds = before24hJobs.map(job => job.id) as string[]; - let count = 0; - for (const jobId of jobIds) { + app.get( + `/admin/${process.env.BULL_AUTH_KEY}/clean-before-24h-complete-jobs`, + async (req, res) => { try { - await webScraperQueue.removeJobs(jobId); - count++; - } catch (jobError) { - console.error(`Failed to remove job with ID ${jobId}:`, jobError); + const webScraperQueue = getWebScraperQueue(); + const completedJobs = await webScraperQueue.getJobs(["completed"]); + const before24hJobs = completedJobs.filter( + (job) => job.finishedOn < Date.now() - 24 * 60 * 60 * 1000 + ); + 
const jobIds = before24hJobs.map((job) => job.id) as string[]; + let count = 0; + for (const jobId of jobIds) { + try { + await webScraperQueue.removeJobs(jobId); + count++; + } catch (jobError) { + console.error(`Failed to remove job with ID ${jobId}:`, jobError); + } + } + res.status(200).send(`Removed ${count} completed jobs.`); + } catch (error) { + console.error("Failed to clean last 24h complete jobs:", error); + res.status(500).send("Failed to clean jobs"); } } - res.status(200).send(`Removed ${count} completed jobs.`); - } catch (error) { - console.error('Failed to clean last 24h complete jobs:', error); - res.status(500).send('Failed to clean jobs'); - } -}); + ); -app.get("/is-production", (req, res) => { - res.send({ isProduction: global.isProduction }); -}); + app.get("/is-production", (req, res) => { + res.send({ isProduction: global.isProduction }); + }); - -// /workers health check, cant act as load balancer, just has to be a pre deploy thing \ No newline at end of file + console.log(`Worker ${process.pid} started`); +} diff --git a/apps/api/src/services/redis.ts b/apps/api/src/services/redis.ts index f2cedd1..491eeb1 100644 --- a/apps/api/src/services/redis.ts +++ b/apps/api/src/services/redis.ts @@ -1,8 +1,35 @@ -import Redis from 'ioredis'; +import Redis from "ioredis"; // Initialize Redis client const redis = new Redis(process.env.REDIS_URL); +// Listen to 'error' events to the Redis connection +redis.on("error", (error) => { + try { + if (error.message === "ECONNRESET") { + console.log("Connection to Redis Session Store timed out."); + } else if (error.message === "ECONNREFUSED") { + console.log("Connection to Redis Session Store refused!"); + } else console.log(error); + } catch (error) {} +}); + +// Listen to 'reconnecting' event to Redis +redis.on("reconnecting", (err) => { + try { + if (redis.status === "reconnecting") + console.log("Reconnecting to Redis Session Store..."); + else console.log("Error reconnecting to Redis Session Store."); + 
} catch (error) {} +}); + +// Listen to the 'connect' event to Redis +redis.on("connect", (err) => { + try { + if (!err) console.log("Connected to Redis Session Store!"); + } catch (error) {} +}); + /** * Set a value in Redis with an optional expiration time. * @param {string} key The key under which to store the value. @@ -11,7 +38,7 @@ const redis = new Redis(process.env.REDIS_URL); */ const setValue = async (key: string, value: string, expire?: number) => { if (expire) { - await redis.set(key, value, 'EX', expire); + await redis.set(key, value, "EX", expire); } else { await redis.set(key, value); }