0

Nick: clusters

This commit is contained in:
Nicolas 2024-06-12 17:53:04 -07:00
parent bad0f57134
commit 67dc46b454
4 changed files with 208 additions and 158 deletions

View File

@ -1,5 +1,4 @@
import request from "supertest"; import request from "supertest";
import { app } from "../../index";
import dotenv from "dotenv"; import dotenv from "dotenv";
const fs = require("fs"); const fs = require("fs");
const path = require("path"); const path = require("path");

View File

@ -1,5 +1,4 @@
import request from "supertest"; import request from "supertest";
import { app } from "../../index";
import dotenv from "dotenv"; import dotenv from "dotenv";
import { v4 as uuidv4 } from "uuid"; import { v4 as uuidv4 } from "uuid";
@ -35,7 +34,7 @@ describe("E2E Tests for API Routes", () => {
describe("POST /v0/scrape", () => { describe("POST /v0/scrape", () => {
it.concurrent("should require authorization", async () => { it.concurrent("should require authorization", async () => {
const response = await request(app).post("/v0/scrape"); const response = await request(TEST_URL).post("/v0/scrape");
expect(response.statusCode).toBe(401); expect(response.statusCode).toBe(401);
}); });

View File

@ -5,190 +5,215 @@ import "dotenv/config";
import { getWebScraperQueue } from "./services/queue-service"; import { getWebScraperQueue } from "./services/queue-service";
import { redisClient } from "./services/rate-limiter"; import { redisClient } from "./services/rate-limiter";
import { v0Router } from "./routes/v0"; import { v0Router } from "./routes/v0";
import { initSDK } from '@hyperdx/node-opentelemetry'; import { initSDK } from "@hyperdx/node-opentelemetry";
import cluster from "cluster";
import os from "os";
const { createBullBoard } = require("@bull-board/api"); const { createBullBoard } = require("@bull-board/api");
const { BullAdapter } = require("@bull-board/api/bullAdapter"); const { BullAdapter } = require("@bull-board/api/bullAdapter");
const { ExpressAdapter } = require("@bull-board/express"); const { ExpressAdapter } = require("@bull-board/express");
export const app = express(); const numCPUs = os.cpus().length;
console.log(`Number of CPUs: ${numCPUs} available`);
global.isProduction = process.env.IS_PRODUCTION === "true"; if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);
app.use(bodyParser.urlencoded({ extended: true })); // Fork workers.
app.use(bodyParser.json({ limit: "10mb" })); for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
app.use(cors()); // Add this line to enable CORS cluster.on("exit", (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} exited`);
const serverAdapter = new ExpressAdapter(); console.log("Starting a new worker");
serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`); cluster.fork();
const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
queues: [new BullAdapter(getWebScraperQueue())],
serverAdapter: serverAdapter,
});
app.use(
`/admin/${process.env.BULL_AUTH_KEY}/queues`,
serverAdapter.getRouter()
);
app.get("/", (req, res) => {
res.send("SCRAPERS-JS: Hello, world! Fly.io");
});
//write a simple test function
app.get("/test", async (req, res) => {
res.send("Hello, world!");
});
// register router
app.use(v0Router);
const DEFAULT_PORT = process.env.PORT ?? 3002;
const HOST = process.env.HOST ?? "localhost";
redisClient.connect();
// HyperDX OpenTelemetry
if(process.env.ENV === 'production') {
initSDK({ consoleCapture: true, additionalInstrumentations: []});
}
export function startServer(port = DEFAULT_PORT) {
const server = app.listen(Number(port), HOST, () => {
console.log(`Server listening on port ${port}`);
console.log(
`For the UI, open http://${HOST}:${port}/admin/${process.env.BULL_AUTH_KEY}/queues`
);
console.log("");
console.log("1. Make sure Redis is running on port 6379 by default");
console.log(
"2. If you want to run nango, make sure you do port forwarding in 3002 using ngrok http 3002 "
);
}); });
return server; } else {
} const app = express();
if (require.main === module) { global.isProduction = process.env.IS_PRODUCTION === "true";
startServer();
}
// Use this as a "health check" that way we dont destroy the server app.use(bodyParser.urlencoded({ extended: true }));
app.get(`/admin/${process.env.BULL_AUTH_KEY}/queues`, async (req, res) => { app.use(bodyParser.json({ limit: "10mb" }));
try {
const webScraperQueue = getWebScraperQueue();
const [webScraperActive] = await Promise.all([
webScraperQueue.getActiveCount(),
]);
const noActiveJobs = webScraperActive === 0; app.use(cors()); // Add this line to enable CORS
// 200 if no active jobs, 503 if there are active jobs
return res.status(noActiveJobs ? 200 : 500).json({ const serverAdapter = new ExpressAdapter();
webScraperActive, serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`);
noActiveJobs,
}); const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
} catch (error) { queues: [new BullAdapter(getWebScraperQueue())],
console.error(error); serverAdapter: serverAdapter,
return res.status(500).json({ error: error.message }); });
app.use(
`/admin/${process.env.BULL_AUTH_KEY}/queues`,
serverAdapter.getRouter()
);
app.get("/", (req, res) => {
res.send("SCRAPERS-JS: Hello, world! Fly.io");
});
//write a simple test function
app.get("/test", async (req, res) => {
res.send("Hello, world!");
});
// register router
app.use(v0Router);
const DEFAULT_PORT = process.env.PORT ?? 3002;
const HOST = process.env.HOST ?? "localhost";
redisClient.connect();
// HyperDX OpenTelemetry
if (process.env.ENV === "production") {
initSDK({ consoleCapture: true, additionalInstrumentations: [] });
} }
});
app.get(`/serverHealthCheck`, async (req, res) => { function startServer(port = DEFAULT_PORT) {
try { const server = app.listen(Number(port), HOST, () => {
const webScraperQueue = getWebScraperQueue(); console.log(`Worker ${process.pid} listening on port ${port}`);
const [waitingJobs] = await Promise.all([ console.log(
webScraperQueue.getWaitingCount(), `For the UI, open http://${HOST}:${port}/admin/${process.env.BULL_AUTH_KEY}/queues`
]); );
console.log("");
const noWaitingJobs = waitingJobs === 0; console.log("1. Make sure Redis is running on port 6379 by default");
// 200 if no active jobs, 503 if there are active jobs console.log(
return res.status(noWaitingJobs ? 200 : 500).json({ "2. If you want to run nango, make sure you do port forwarding in 3002 using ngrok http 3002 "
waitingJobs, );
}); });
} catch (error) { return server;
console.error(error);
return res.status(500).json({ error: error.message });
} }
});
app.get('/serverHealthCheck/notify', async (req, res) => { if (require.main === module) {
if (process.env.SLACK_WEBHOOK_URL) { startServer();
const treshold = 1; // The treshold value for the active jobs }
const timeout = 60000; // 1 minute // The timeout value for the check in milliseconds
const getWaitingJobsCount = async () => { // Use this as a "health check" that way we dont destroy the server
app.get(`/admin/${process.env.BULL_AUTH_KEY}/queues`, async (req, res) => {
try {
const webScraperQueue = getWebScraperQueue(); const webScraperQueue = getWebScraperQueue();
const [waitingJobsCount] = await Promise.all([ const [webScraperActive] = await Promise.all([
webScraperQueue.getActiveCount(),
]);
const noActiveJobs = webScraperActive === 0;
// 200 if no active jobs, 503 if there are active jobs
return res.status(noActiveJobs ? 200 : 500).json({
webScraperActive,
noActiveJobs,
});
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
});
app.get(`/serverHealthCheck`, async (req, res) => {
try {
const webScraperQueue = getWebScraperQueue();
const [waitingJobs] = await Promise.all([
webScraperQueue.getWaitingCount(), webScraperQueue.getWaitingCount(),
]); ]);
return waitingJobsCount; const noWaitingJobs = waitingJobs === 0;
}; // 200 if no active jobs, 503 if there are active jobs
return res.status(noWaitingJobs ? 200 : 500).json({
waitingJobs,
});
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
});
res.status(200).json({ message: "Check initiated" }); app.get("/serverHealthCheck/notify", async (req, res) => {
if (process.env.SLACK_WEBHOOK_URL) {
const treshold = 1; // The treshold value for the active jobs
const timeout = 60000; // 1 minute // The timeout value for the check in milliseconds
const checkWaitingJobs = async () => { const getWaitingJobsCount = async () => {
try { const webScraperQueue = getWebScraperQueue();
let waitingJobsCount = await getWaitingJobsCount(); const [waitingJobsCount] = await Promise.all([
if (waitingJobsCount >= treshold) { webScraperQueue.getWaitingCount(),
setTimeout(async () => { ]);
// Re-check the waiting jobs count after the timeout
waitingJobsCount = await getWaitingJobsCount();
if (waitingJobsCount >= treshold) {
const slackWebhookUrl = process.env.SLACK_WEBHOOK_URL;
const message = {
text: `⚠️ Warning: The number of active jobs (${waitingJobsCount}) has exceeded the threshold (${treshold}) for more than ${timeout/60000} minute(s).`,
};
const response = await fetch(slackWebhookUrl, { return waitingJobsCount;
method: 'POST', };
headers: {
'Content-Type': 'application/json', res.status(200).json({ message: "Check initiated" });
},
body: JSON.stringify(message), const checkWaitingJobs = async () => {
}) try {
let waitingJobsCount = await getWaitingJobsCount();
if (!response.ok) { if (waitingJobsCount >= treshold) {
console.error('Failed to send Slack notification') setTimeout(async () => {
// Re-check the waiting jobs count after the timeout
waitingJobsCount = await getWaitingJobsCount();
if (waitingJobsCount >= treshold) {
const slackWebhookUrl = process.env.SLACK_WEBHOOK_URL;
const message = {
text: `⚠️ Warning: The number of active jobs (${waitingJobsCount}) has exceeded the threshold (${treshold}) for more than ${
timeout / 60000
} minute(s).`,
};
const response = await fetch(slackWebhookUrl, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(message),
});
if (!response.ok) {
console.error("Failed to send Slack notification");
}
} }
} }, timeout);
}, timeout); }
} catch (error) {
console.error(error);
} }
} catch (error) { };
console.error(error);
}
};
checkWaitingJobs(); checkWaitingJobs();
} }
}); });
app.get(`/admin/${process.env.BULL_AUTH_KEY}/clean-before-24h-complete-jobs`, async (req, res) => { app.get(
try { `/admin/${process.env.BULL_AUTH_KEY}/clean-before-24h-complete-jobs`,
const webScraperQueue = getWebScraperQueue(); async (req, res) => {
const completedJobs = await webScraperQueue.getJobs(['completed']);
const before24hJobs = completedJobs.filter(job => job.finishedOn < Date.now() - 24 * 60 * 60 * 1000);
const jobIds = before24hJobs.map(job => job.id) as string[];
let count = 0;
for (const jobId of jobIds) {
try { try {
await webScraperQueue.removeJobs(jobId); const webScraperQueue = getWebScraperQueue();
count++; const completedJobs = await webScraperQueue.getJobs(["completed"]);
} catch (jobError) { const before24hJobs = completedJobs.filter(
console.error(`Failed to remove job with ID ${jobId}:`, jobError); (job) => job.finishedOn < Date.now() - 24 * 60 * 60 * 1000
);
const jobIds = before24hJobs.map((job) => job.id) as string[];
let count = 0;
for (const jobId of jobIds) {
try {
await webScraperQueue.removeJobs(jobId);
count++;
} catch (jobError) {
console.error(`Failed to remove job with ID ${jobId}:`, jobError);
}
}
res.status(200).send(`Removed ${count} completed jobs.`);
} catch (error) {
console.error("Failed to clean last 24h complete jobs:", error);
res.status(500).send("Failed to clean jobs");
} }
} }
res.status(200).send(`Removed ${count} completed jobs.`); );
} catch (error) {
console.error('Failed to clean last 24h complete jobs:', error);
res.status(500).send('Failed to clean jobs');
}
});
app.get("/is-production", (req, res) => { app.get("/is-production", (req, res) => {
res.send({ isProduction: global.isProduction }); res.send({ isProduction: global.isProduction });
}); });
console.log(`Worker ${process.pid} started`);
// /workers health check, cant act as load balancer, just has to be a pre deploy thing }

View File

@ -1,8 +1,35 @@
import Redis from 'ioredis'; import Redis from "ioredis";
// Initialize Redis client // Initialize Redis client
const redis = new Redis(process.env.REDIS_URL); const redis = new Redis(process.env.REDIS_URL);
// Listen to 'error' events to the Redis connection
redis.on("error", (error) => {
try {
if (error.message === "ECONNRESET") {
console.log("Connection to Redis Session Store timed out.");
} else if (error.message === "ECONNREFUSED") {
console.log("Connection to Redis Session Store refused!");
} else console.log(error);
} catch (error) {}
});
// Listen to 'reconnecting' event to Redis
redis.on("reconnecting", (err) => {
try {
if (redis.status === "reconnecting")
console.log("Reconnecting to Redis Session Store...");
else console.log("Error reconnecting to Redis Session Store.");
} catch (error) {}
});
// Listen to the 'connect' event to Redis
redis.on("connect", (err) => {
try {
if (!err) console.log("Connected to Redis Session Store!");
} catch (error) {}
});
/** /**
* Set a value in Redis with an optional expiration time. * Set a value in Redis with an optional expiration time.
* @param {string} key The key under which to store the value. * @param {string} key The key under which to store the value.
@ -11,7 +38,7 @@ const redis = new Redis(process.env.REDIS_URL);
*/ */
const setValue = async (key: string, value: string, expire?: number) => { const setValue = async (key: string, value: string, expire?: number) => {
if (expire) { if (expire) {
await redis.set(key, value, 'EX', expire); await redis.set(key, value, "EX", expire);
} else { } else {
await redis.set(key, value); await redis.set(key, value);
} }