0
This commit is contained in:
Nicolas 2024-04-20 16:38:05 -07:00
parent d201a4e58d
commit 23b2190e5d
9 changed files with 369 additions and 264 deletions

View File

@ -2,4 +2,7 @@ module.exports = {
preset: "ts-jest",
testEnvironment: "node",
setupFiles: ["./jest.setup.js"],
// ignore dist folder root dir
modulePathIgnorePatterns: ["<rootDir>/dist/"],
};

View File

@ -0,0 +1,67 @@
import { parseApi } from "../../src/lib/parseApi";
import { getRateLimiter } from "../../src/services/rate-limiter";
import { RateLimiterMode } from "../../src/types";
import { supabase_service } from "../../src/services/supabase";
export async function authenticateUser(
req,
res,
mode?: RateLimiterMode
): Promise<{
success: boolean;
team_id?: string;
error?: string;
status?: number;
}> {
const authHeader = req.headers.authorization;
if (!authHeader) {
return { success: false, error: "Unauthorized", status: 401 };
}
const token = authHeader.split(" ")[1]; // Extract the token from "Bearer <token>"
if (!token) {
return {
success: false,
error: "Unauthorized: Token missing",
status: 401,
};
}
try {
const incomingIP = (req.headers["x-forwarded-for"] ||
req.socket.remoteAddress) as string;
const iptoken = incomingIP + token;
await getRateLimiter(
token === "this_is_just_a_preview_token" ? RateLimiterMode.Preview : mode
).consume(iptoken);
} catch (rateLimiterRes) {
console.error(rateLimiterRes);
return {
success: false,
error: "Rate limit exceeded. Too many requests, try again in 1 minute.",
status: 429,
};
}
if (
token === "this_is_just_a_preview_token" &&
(mode === RateLimiterMode.Scrape || mode === RateLimiterMode.Preview)
) {
return { success: true, team_id: "preview" };
}
const normalizedApi = parseApi(token);
// make sure api key is valid, based on the api_keys table in supabase
const { data, error } = await supabase_service
.from("api_keys")
.select("*")
.eq("key", normalizedApi);
if (error || !data || data.length === 0) {
return {
success: false,
error: "Unauthorized: Invalid token",
status: 401,
};
}
return { success: true, team_id: data[0].team_id };
}

View File

@ -0,0 +1,36 @@
import { Request, Response } from "express";
import { authenticateUser } from "./auth";
import { RateLimiterMode } from "../../src/types";
import { addWebScraperJob } from "../../src/services/queue-jobs";
import { getWebScraperQueue } from "../../src/services/queue-service";
export async function crawlStatusController(req: Request, res: Response) {
try {
const { success, team_id, error, status } = await authenticateUser(
req,
res,
RateLimiterMode.CrawlStatus
);
if (!success) {
return res.status(status).json({ error });
}
const job = await getWebScraperQueue().getJob(req.params.jobId);
if (!job) {
return res.status(404).json({ error: "Job not found" });
}
const { current, current_url, total, current_step } = await job.progress();
res.json({
status: await job.getState(),
// progress: job.progress(),
current: current,
current_url: current_url,
current_step: current_step,
total: total,
data: job.returnvalue,
});
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
}

View File

@ -0,0 +1,77 @@
import { Request, Response } from "express";
import { WebScraperDataProvider } from "../../src/scraper/WebScraper";
import { billTeam } from "../../src/services/billing/credit_billing";
import { checkTeamCredits } from "../../src/services/billing/credit_billing";
import { authenticateUser } from "./auth";
import { RateLimiterMode } from "../../src/types";
import { addWebScraperJob } from "../../src/services/queue-jobs";
export async function crawlController(req: Request, res: Response) {
try {
const { success, team_id, error, status } = await authenticateUser(
req,
res,
RateLimiterMode.Crawl
);
if (!success) {
return res.status(status).json({ error });
}
const { success: creditsCheckSuccess, message: creditsCheckMessage } =
await checkTeamCredits(team_id, 1);
if (!creditsCheckSuccess) {
return res.status(402).json({ error: "Insufficient credits" });
}
// authenticate on supabase
const url = req.body.url;
if (!url) {
return res.status(400).json({ error: "Url is required" });
}
const mode = req.body.mode ?? "crawl";
const crawlerOptions = req.body.crawlerOptions ?? {};
const pageOptions = req.body.pageOptions ?? { onlyMainContent: false };
if (mode === "single_urls" && !url.includes(",")) {
try {
const a = new WebScraperDataProvider();
await a.setOptions({
mode: "single_urls",
urls: [url],
crawlerOptions: {
returnOnlyUrls: true,
},
pageOptions: pageOptions,
});
const docs = await a.getDocuments(false, (progress) => {
job.progress({
current: progress.current,
total: progress.total,
current_step: "SCRAPING",
current_url: progress.currentDocumentUrl,
});
});
return res.json({
success: true,
documents: docs,
});
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
}
const job = await addWebScraperJob({
url: url,
mode: mode ?? "crawl", // fix for single urls not working
crawlerOptions: { ...crawlerOptions },
team_id: team_id,
pageOptions: pageOptions,
});
res.json({ jobId: job.id });
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
}

View File

@ -0,0 +1,37 @@
import { Request, Response } from "express";
import { authenticateUser } from "./auth";
import { RateLimiterMode } from "../../src/types";
import { addWebScraperJob } from "../../src/services/queue-jobs";
export async function crawlPreviewController(req: Request, res: Response) {
try {
const { success, team_id, error, status } = await authenticateUser(
req,
res,
RateLimiterMode.Preview
);
if (!success) {
return res.status(status).json({ error });
}
// authenticate on supabase
const url = req.body.url;
if (!url) {
return res.status(400).json({ error: "Url is required" });
}
const mode = req.body.mode ?? "crawl";
const crawlerOptions = req.body.crawlerOptions ?? {};
const pageOptions = req.body.pageOptions ?? { onlyMainContent: false };
const job = await addWebScraperJob({
url: url,
mode: mode ?? "crawl", // fix for single urls not working
crawlerOptions: { ...crawlerOptions, limit: 5, maxCrawledLinks: 5 },
team_id: "preview",
pageOptions: pageOptions,
});
res.json({ jobId: job.id });
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
}

View File

@ -0,0 +1,104 @@
import { Request, Response } from "express";
import { WebScraperDataProvider } from "../../src/scraper/WebScraper";
import { billTeam } from "../../src/services/billing/credit_billing";
import { checkTeamCredits } from "../../src/services/billing/credit_billing";
import { authenticateUser } from "./auth";
import { RateLimiterMode } from "../../src/types";
import { logJob } from "../../src/services/logging/log_job";
import { Document } from "../../src/lib/entities";
export async function scrapeHelper(
req: Request,
team_id: string,
crawlerOptions: any,
pageOptions: any
) : Promise<{ success: boolean; error?: string; data?: Document }> {
const url = req.body.url;
if (!url) {
throw new Error("Url is required");
}
const a = new WebScraperDataProvider();
await a.setOptions({
mode: "single_urls",
urls: [url],
crawlerOptions: {
...crawlerOptions,
},
pageOptions: pageOptions,
});
const docs = await a.getDocuments(false);
// make sure doc.content is not empty
const filteredDocs = docs.filter(
(doc: { content?: string }) => doc.content && doc.content.trim().length > 0
);
if (filteredDocs.length === 0) {
return { success: true, error: "No pages found" };
}
const { success, credit_usage } = await billTeam(
team_id,
filteredDocs.length
);
if (!success) {
return {
success: false,
error: "Failed to bill team. Insufficient credits or subscription not found.",
};
}
return {
success: true,
data: filteredDocs[0],
};
}
export async function scrapeController(req: Request, res: Response) {
try {
// make sure to authenticate user first, Bearer <token>
const { success, team_id, error, status } = await authenticateUser(
req,
res,
RateLimiterMode.Scrape
);
if (!success) {
return res.status(status).json({ error });
}
const crawlerOptions = req.body.crawlerOptions ?? {};
const pageOptions = req.body.pageOptions ?? { onlyMainContent: false };
try {
const { success: creditsCheckSuccess, message: creditsCheckMessage } =
await checkTeamCredits(team_id, 1);
if (!creditsCheckSuccess) {
return res.status(402).json({ error: "Insufficient credits" });
}
} catch (error) {
console.error(error);
return res.status(500).json({ error: "Internal server error" });
}
const result = await scrapeHelper(
req,
team_id,
crawlerOptions,
pageOptions
);
logJob({
success: result.success,
message: result.error,
num_docs: result.data.length,
docs: result.data,
time_taken: 0,
team_id: team_id,
mode: "scrape",
url: req.body.url,
crawlerOptions: crawlerOptions,
pageOptions: pageOptions,
});
return res.json(result);
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
}

View File

@ -0,0 +1,25 @@
import { Request, Response } from "express";
import { getWebScraperQueue } from "../../src/services/queue-service";
export async function crawlJobStatusPreviewController(req: Request, res: Response) {
try {
const job = await getWebScraperQueue().getJob(req.params.jobId);
if (!job) {
return res.status(404).json({ error: "Job not found" });
}
const { current, current_url, total, current_step } = await job.progress();
res.json({
status: await job.getState(),
// progress: job.progress(),
current: current,
current_url: current_url,
current_step: current_step,
total: total,
data: job.returnvalue,
});
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
}

View File

@ -10,6 +10,7 @@ import { billTeam, checkTeamCredits } from "./services/billing/credit_billing";
import { getRateLimiter, redisClient } from "./services/rate-limiter";
import { parseApi } from "./lib/parseApi";
import { RateLimiterMode } from "./types";
import { v0Router } from "./routes/v0";
const { createBullBoard } = require("@bull-board/api");
const { BullAdapter } = require("@bull-board/api/bullAdapter");
@ -17,7 +18,6 @@ const { ExpressAdapter } = require("@bull-board/express");
export const app = express();
global.isProduction = process.env.IS_PRODUCTION === "true";
app.use(bodyParser.urlencoded({ extended: true }));
@ -47,267 +47,8 @@ app.get("/test", async (req, res) => {
res.send("Hello, world!");
});
async function authenticateUser(req, res, mode?: RateLimiterMode): Promise<{ success: boolean, team_id?: string, error?: string, status?: number }> {
const authHeader = req.headers.authorization;
if (!authHeader) {
return { success: false, error: "Unauthorized", status: 401 };
}
const token = authHeader.split(" ")[1]; // Extract the token from "Bearer <token>"
if (!token) {
return { success: false, error: "Unauthorized: Token missing", status: 401 };
}
try {
const incomingIP = (req.headers["x-forwarded-for"] ||
req.socket.remoteAddress) as string;
const iptoken = incomingIP + token;
await getRateLimiter((token === "this_is_just_a_preview_token") ? RateLimiterMode.Preview : mode
).consume(iptoken);
} catch (rateLimiterRes) {
console.error(rateLimiterRes);
return { success: false, error: "Rate limit exceeded. Too many requests, try again in 1 minute.", status: 429 };
}
if (token === "this_is_just_a_preview_token" && (mode === RateLimiterMode.Scrape || mode === RateLimiterMode.Preview)) {
return { success: true, team_id: "preview" };
}
const normalizedApi = parseApi(token);
// make sure api key is valid, based on the api_keys table in supabase
const { data, error } = await supabase_service
.from("api_keys")
.select("*")
.eq("key", normalizedApi);
if (error || !data || data.length === 0) {
return { success: false, error: "Unauthorized: Invalid token", status: 401 };
}
return { success: true, team_id: data[0].team_id };
}
app.post("/v0/scrape", async (req, res) => {
try {
// make sure to authenticate user first, Bearer <token>
const { success, team_id, error, status } = await authenticateUser(req, res, RateLimiterMode.Scrape);
if (!success) {
return res.status(status).json({ error });
}
const crawlerOptions = req.body.crawlerOptions ?? {};
try {
const { success: creditsCheckSuccess, message: creditsCheckMessage } =
await checkTeamCredits(team_id, 1);
if (!creditsCheckSuccess) {
return res.status(402).json({ error: "Insufficient credits" });
}
} catch (error) {
console.error(error);
return res.status(500).json({ error: "Internal server error" });
}
// authenticate on supabase
const url = req.body.url;
if (!url) {
return res.status(400).json({ error: "Url is required" });
}
const pageOptions = req.body.pageOptions ?? { onlyMainContent: false };
try {
const a = new WebScraperDataProvider();
await a.setOptions({
mode: "single_urls",
urls: [url],
crawlerOptions: {
...crawlerOptions,
},
pageOptions: pageOptions,
});
const docs = await a.getDocuments(false);
// make sure doc.content is not empty
const filteredDocs = docs.filter(
(doc: { content?: string }) =>
doc.content && doc.content.trim().length > 0
);
if (filteredDocs.length === 0) {
return res.status(200).json({ success: true, data: [] });
}
const { success, credit_usage } = await billTeam(
team_id,
filteredDocs.length
);
if (!success) {
// throw new Error("Failed to bill team, no subscription was found");
// return {
// success: false,
// message: "Failed to bill team, no subscription was found",
// docs: [],
// };
return res
.status(402)
.json({ error: "Failed to bill, no subscription was found" });
}
return res.json({
success: true,
data: filteredDocs[0],
});
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
});
app.post("/v0/crawl", async (req, res) => {
try {
const { success, team_id, error, status } = await authenticateUser(req, res, RateLimiterMode.Crawl);
if (!success) {
return res.status(status).json({ error });
}
const { success: creditsCheckSuccess, message: creditsCheckMessage } =
await checkTeamCredits(team_id, 1);
if (!creditsCheckSuccess) {
return res.status(402).json({ error: "Insufficient credits" });
}
// authenticate on supabase
const url = req.body.url;
if (!url) {
return res.status(400).json({ error: "Url is required" });
}
const mode = req.body.mode ?? "crawl";
const crawlerOptions = req.body.crawlerOptions ?? {};
const pageOptions = req.body.pageOptions ?? { onlyMainContent: false };
if (mode === "single_urls" && !url.includes(",")) {
try {
const a = new WebScraperDataProvider();
await a.setOptions({
mode: "single_urls",
urls: [url],
crawlerOptions: {
returnOnlyUrls: true,
},
pageOptions: pageOptions,
});
const docs = await a.getDocuments(false, (progress) => {
job.progress({
current: progress.current,
total: progress.total,
current_step: "SCRAPING",
current_url: progress.currentDocumentUrl,
});
});
return res.json({
success: true,
documents: docs,
});
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
}
const job = await addWebScraperJob({
url: url,
mode: mode ?? "crawl", // fix for single urls not working
crawlerOptions: { ...crawlerOptions },
team_id: team_id,
pageOptions: pageOptions,
});
res.json({ jobId: job.id });
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
});
app.post("/v0/crawlWebsitePreview", async (req, res) => {
try {
const { success, team_id, error, status } = await authenticateUser(req, res, RateLimiterMode.Preview);
if (!success) {
return res.status(status).json({ error });
}
// authenticate on supabase
const url = req.body.url;
if (!url) {
return res.status(400).json({ error: "Url is required" });
}
const mode = req.body.mode ?? "crawl";
const crawlerOptions = req.body.crawlerOptions ?? {};
const pageOptions = req.body.pageOptions ?? { onlyMainContent: false };
const job = await addWebScraperJob({
url: url,
mode: mode ?? "crawl", // fix for single urls not working
crawlerOptions: { ...crawlerOptions, limit: 5, maxCrawledLinks: 5 },
team_id: "preview",
pageOptions: pageOptions,
});
res.json({ jobId: job.id });
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
});
app.get("/v0/crawl/status/:jobId", async (req, res) => {
try {
const { success, team_id, error, status } = await authenticateUser(req, res, RateLimiterMode.CrawlStatus);
if (!success) {
return res.status(status).json({ error });
}
const job = await getWebScraperQueue().getJob(req.params.jobId);
if (!job) {
return res.status(404).json({ error: "Job not found" });
}
const { current, current_url, total, current_step } = await job.progress();
res.json({
status: await job.getState(),
// progress: job.progress(),
current: current,
current_url: current_url,
current_step: current_step,
total: total,
data: job.returnvalue,
});
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
});
app.get("/v0/checkJobStatus/:jobId", async (req, res) => {
try {
const job = await getWebScraperQueue().getJob(req.params.jobId);
if (!job) {
return res.status(404).json({ error: "Job not found" });
}
const { current, current_url, total, current_step } = await job.progress();
res.json({
status: await job.getState(),
// progress: job.progress(),
current: current,
current_url: current_url,
current_step: current_step,
total: total,
data: job.returnvalue,
});
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
});
// register router
app.use(v0Router);
const DEFAULT_PORT = process.env.PORT ?? 3002;
const HOST = process.env.HOST ?? "localhost";
@ -316,7 +57,9 @@ redisClient.connect();
export function startServer(port = DEFAULT_PORT) {
const server = app.listen(Number(port), HOST, () => {
console.log(`Server listening on port ${port}`);
console.log(`For the UI, open http://${HOST}:${port}/admin/${process.env.BULL_AUTH_KEY}/queues`);
console.log(
`For the UI, open http://${HOST}:${port}/admin/${process.env.BULL_AUTH_KEY}/queues`
);
console.log("");
console.log("1. Make sure Redis is running on port 6379 by default");
console.log(
@ -353,4 +96,3 @@ app.get(`/admin/${process.env.BULL_AUTH_KEY}/queues`, async (req, res) => {
app.get("/is-production", (req, res) => {
res.send({ isProduction: global.isProduction });
});

14
apps/api/src/routes/v0.ts Normal file
View File

@ -0,0 +1,14 @@
import express from "express";
import { crawlController } from "../../src/controllers/crawl";
import { crawlStatusController } from "../../src/controllers/crawl-status";
import { scrapeController } from "../../src/controllers/scrape";
import { crawlPreviewController } from "../../src/controllers/crawlPreview";
import { crawlJobStatusPreviewController } from "../../src/controllers/status";
export const v0Router = express.Router();
v0Router.post("/v0/scrape", scrapeController);
v0Router.post("/v0/crawl", crawlController);
v0Router.post("/v0/crawlWebsitePreview", crawlPreviewController);
v0Router.get("/v0/crawl/status/:jobId", crawlStatusController);
v0Router.get("/v0/checkJobStatus/:jobId", crawlJobStatusPreviewController);