From 6d5da358cca6f6ef3b9d047fec7c1eea63997664 Mon Sep 17 00:00:00 2001
From: Nicolas
Date: Mon, 6 May 2024 17:16:43 -0700
Subject: [PATCH] Nick: cancel job

---
 apps/api/fly.toml                             |  15 +-
 .../src/__tests__/e2e_withAuth/index.test.ts  |  35 +++++
 apps/api/src/controllers/crawl-cancel.ts      |  50 ++++++
 apps/api/src/controllers/crawl.ts             |  13 +-
 apps/api/src/lib/entities.ts                  |   1 +
 apps/api/src/main/runWebScraper.ts            |   4 +
 apps/api/src/routes/v0.ts                     |   2 +
 apps/api/src/scraper/WebScraper/index.ts      | 148 ++++++++++++------
 apps/api/src/services/logging/crawl_log.ts    |  17 ++
 9 files changed, 236 insertions(+), 49 deletions(-)
 create mode 100644 apps/api/src/controllers/crawl-cancel.ts
 create mode 100644 apps/api/src/services/logging/crawl_log.ts

diff --git a/apps/api/fly.toml b/apps/api/fly.toml
index 1272f4b..ca619d1 100644
--- a/apps/api/fly.toml
+++ b/apps/api/fly.toml
@@ -22,6 +22,11 @@ kill_timeout = '5s'
   min_machines_running = 2
   processes = ['app']
 
+[http_service.concurrency]
+  type = "requests"
+  hard_limit = 200
+  soft_limit = 100
+
 [[services]]
   protocol = 'tcp'
   internal_port = 8080
@@ -38,10 +43,14 @@ kill_timeout = '5s'
 
   [services.concurrency]
     type = 'connections'
-    hard_limit = 45
-    soft_limit = 20
+    hard_limit = 75
+    soft_limit = 30
 
 [[vm]]
-  size = 'performance-1x'
+  size = 'performance-4x'
+  processes = ['app']
+
+
+
diff --git a/apps/api/src/__tests__/e2e_withAuth/index.test.ts b/apps/api/src/__tests__/e2e_withAuth/index.test.ts
index c6c59bc..78d20e4 100644
--- a/apps/api/src/__tests__/e2e_withAuth/index.test.ts
+++ b/apps/api/src/__tests__/e2e_withAuth/index.test.ts
@@ -252,6 +252,41 @@ describe("E2E Tests for API Routes", () => {
     }, 60000); // 60 seconds
   });
 
+  it("If someone cancels a crawl job, it should turn into failed status", async () => {
+    const crawlResponse = await request(TEST_URL)
+      .post("/v0/crawl")
+      .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
+      .set("Content-Type", "application/json")
+      .send({ url: "https://jestjs.io" });
+    expect(crawlResponse.statusCode).toBe(200);
+
+
+
+    // wait for 10 seconds
+    await new Promise((r) => setTimeout(r, 10000));
+
+    const response = await request(TEST_URL)
+      .delete(`/v0/crawl/cancel/${crawlResponse.body.jobId}`)
+      .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`);
+    expect(response.statusCode).toBe(200);
+    expect(response.body).toHaveProperty("status");
+    expect(response.body.status).toBe("cancelled");
+
+    await new Promise((r) => setTimeout(r, 20000));
+
+    const completedResponse = await request(TEST_URL)
+      .get(`/v0/crawl/status/${crawlResponse.body.jobId}`)
+      .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`);
+    expect(completedResponse.statusCode).toBe(200);
+    expect(completedResponse.body).toHaveProperty("status");
+    expect(completedResponse.body.status).toBe("failed");
+    expect(completedResponse.body.partial_data?.length ?? 0).toBeLessThanOrEqual(completedResponse.body.data?.length ?? 0);
+
+
+  }, 60000); // 60 seconds
+
+
+
   describe("POST /v0/scrape with LLM Extraction", () => {
     it("should extract data using LLM extraction mode", async () => {
       const response = await request(TEST_URL)
diff --git a/apps/api/src/controllers/crawl-cancel.ts b/apps/api/src/controllers/crawl-cancel.ts
new file mode 100644
index 0000000..7523b78
--- /dev/null
+++ b/apps/api/src/controllers/crawl-cancel.ts
@@ -0,0 +1,50 @@
+import { Request, Response } from "express";
+import { authenticateUser } from "./auth";
+import { RateLimiterMode } from "../../src/types";
+import { addWebScraperJob } from "../../src/services/queue-jobs";
+import { getWebScraperQueue } from "../../src/services/queue-service";
+import { supabase_service } from "../../src/services/supabase";
+
+export async function crawlCancelController(req: Request, res: Response) {
+  try {
+    const { success, team_id, error, status } = await authenticateUser(
+      req,
+      res,
+      RateLimiterMode.CrawlStatus
+    );
+    if (!success) {
+      return res.status(status).json({ error });
+    }
+    const job = await getWebScraperQueue().getJob(req.params.jobId);
+    if (!job) {
+      return res.status(404).json({ error: "Job not found" });
+    }
+
+    // check if the job belongs to the team
+    const { data, error: supaError } = await supabase_service.from("bulljobs_teams").select("*").eq("job_id", req.params.jobId).eq("team_id", team_id);
+    if (supaError) {
+      return res.status(500).json({ error: supaError.message });
+    }
+
+    if (data.length === 0) {
+      return res.status(403).json({ error: "Unauthorized" });
+    }
+
+    try {
+      await job.moveToFailed(Error("Job cancelled by user"), true);
+
+    } catch (error) {
+      console.error(error);
+
+    }
+
+    const jobState = await job.getState();
+
+    res.json({
+      status: jobState === "failed" ? "cancelled" : "Cancelling...",
+    });
+  } catch (error) {
+    console.error(error);
+    return res.status(500).json({ error: error.message });
+  }
+}
diff --git a/apps/api/src/controllers/crawl.ts b/apps/api/src/controllers/crawl.ts
index 3d64f7f..8b5249b 100644
--- a/apps/api/src/controllers/crawl.ts
+++ b/apps/api/src/controllers/crawl.ts
@@ -6,6 +6,7 @@ import { authenticateUser } from "./auth";
 import { RateLimiterMode } from "../../src/types";
 import { addWebScraperJob } from "../../src/services/queue-jobs";
 import { isUrlBlocked } from "../../src/scraper/WebScraper/utils/blocklist";
+import { logCrawl } from "../../src/services/logging/crawl_log";
 
 export async function crawlController(req: Request, res: Response) {
   try {
@@ -30,9 +31,14 @@ export async function crawlController(req: Request, res: Response) {
     }
 
     if (isUrlBlocked(url)) {
-      return res.status(403).json({ error: "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it." });
+      return res
+        .status(403)
+        .json({
+          error:
+            "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it.",
+        });
     }
-
+
     const mode = req.body.mode ?? "crawl";
     const crawlerOptions = req.body.crawlerOptions ?? {};
     const pageOptions = req.body.pageOptions ?? { onlyMainContent: false };
@@ -66,6 +72,7 @@ export async function crawlController(req: Request, res: Response) {
         return res.status(500).json({ error: error.message });
       }
     }
+
     const job = await addWebScraperJob({
       url: url,
       mode: mode ?? "crawl", // fix for single urls not working
@@ -75,6 +82,8 @@ export async function crawlController(req: Request, res: Response) {
       origin: req.body.origin ?? "api",
     });
 
+    await logCrawl(job.id.toString(), team_id);
+
     res.json({ jobId: job.id });
   } catch (error) {
     console.error(error);
diff --git a/apps/api/src/lib/entities.ts b/apps/api/src/lib/entities.ts
index 5b663f2..1bb9429 100644
--- a/apps/api/src/lib/entities.ts
+++ b/apps/api/src/lib/entities.ts
@@ -47,6 +47,7 @@ export type WebScraperOptions = {
   pageOptions?: PageOptions;
   extractorOptions?: ExtractorOptions;
   concurrentRequests?: number;
+  bullJobId?: string;
 };
 
 export interface DocumentUrl {
diff --git a/apps/api/src/main/runWebScraper.ts b/apps/api/src/main/runWebScraper.ts
index 827eec5..252f2e4 100644
--- a/apps/api/src/main/runWebScraper.ts
+++ b/apps/api/src/main/runWebScraper.ts
@@ -27,6 +27,7 @@ export async function startWebScraperPipeline({
       job.moveToFailed(error);
     },
     team_id: job.data.team_id,
+    bull_job_id: job.id.toString(),
   })) as { success: boolean; message: string; docs: Document[] };
 }
 export async function runWebScraper({
@@ -38,6 +39,7 @@ export async function runWebScraper({
   onSuccess,
   onError,
   team_id,
+  bull_job_id,
 }: {
   url: string;
   mode: "crawl" | "single_urls" | "sitemap";
@@ -47,6 +49,7 @@ export async function runWebScraper({
   onSuccess: (result: any) => void;
   onError: (error: any) => void;
   team_id: string;
+  bull_job_id: string;
 }): Promise<{
   success: boolean;
   message: string;
@@ -60,6 +63,7 @@ export async function runWebScraper({
       urls: [url],
       crawlerOptions: crawlerOptions,
       pageOptions: pageOptions,
+      bullJobId: bull_job_id,
     });
   } else {
     await provider.setOptions({
diff --git a/apps/api/src/routes/v0.ts b/apps/api/src/routes/v0.ts
index f84b974..42b8814 100644
--- a/apps/api/src/routes/v0.ts
+++ b/apps/api/src/routes/v0.ts
@@ -5,6 +5,7 @@ import { scrapeController } from "../../src/controllers/scrape";
 import { crawlPreviewController } from "../../src/controllers/crawlPreview";
 import { crawlJobStatusPreviewController } from "../../src/controllers/status";
 import { searchController } from "../../src/controllers/search";
+import { crawlCancelController } from "../../src/controllers/crawl-cancel";
 
 export const v0Router = express.Router();
 
@@ -12,6 +13,7 @@ v0Router.post("/v0/scrape", scrapeController);
 v0Router.post("/v0/crawl", crawlController);
 v0Router.post("/v0/crawlWebsitePreview", crawlPreviewController);
 v0Router.get("/v0/crawl/status/:jobId", crawlStatusController);
+v0Router.delete("/v0/crawl/cancel/:jobId", crawlCancelController);
 v0Router.get("/v0/checkJobStatus/:jobId", crawlJobStatusPreviewController);
 
 // Search routes
diff --git a/apps/api/src/scraper/WebScraper/index.ts b/apps/api/src/scraper/WebScraper/index.ts
index 1e28552..18624e1 100644
--- a/apps/api/src/scraper/WebScraper/index.ts
+++ b/apps/api/src/scraper/WebScraper/index.ts
@@ -1,4 +1,9 @@
-import { Document, ExtractorOptions, PageOptions, WebScraperOptions } from "../../lib/entities";
+import {
+  Document,
+  ExtractorOptions,
+  PageOptions,
+  WebScraperOptions,
+} from "../../lib/entities";
 import { Progress } from "../../lib/entities";
 import { scrapSingleUrl } from "./single_url";
 import { SitemapEntry, fetchSitemapData, getLinksFromSitemap } from "./sitemap";
@@ -6,11 +11,15 @@ import { WebCrawler } from "./crawler";
 import { getValue, setValue } from "../../services/redis";
 import { getImageDescription } from "./utils/imageDescription";
 import { fetchAndProcessPdf } from "./utils/pdfProcessor";
-import { replaceImgPathsWithAbsolutePaths, replacePathsWithAbsolutePaths } from "./utils/replacePaths";
+import {
+  replaceImgPathsWithAbsolutePaths,
+  replacePathsWithAbsolutePaths,
+} from "./utils/replacePaths";
 import { generateCompletions } from "../../lib/LLM-extraction";
-
+import { getWebScraperQueue } from "../../../src/services/queue-service";
 
 export class WebScraperDataProvider {
+  private bullJobId: string;
   private urls: string[] = [""];
   private mode: "single_urls" | "sitemap" | "crawl" = "single_urls";
   private includes: string[];
@@ -23,7 +32,8 @@ export class WebScraperDataProvider {
   private pageOptions?: PageOptions;
   private extractorOptions?: ExtractorOptions;
   private replaceAllPathsWithAbsolutePaths?: boolean = false;
-  private generateImgAltTextModel: "gpt-4-turbo" | "claude-3-opus" = "gpt-4-turbo";
+  private generateImgAltTextModel: "gpt-4-turbo" | "claude-3-opus" =
+    "gpt-4-turbo";
 
   authorize(): void {
     throw new Error("Method not implemented.");
@@ -39,7 +49,7 @@ export class WebScraperDataProvider {
   ): Promise<Document[]> {
     const totalUrls = urls.length;
     let processedUrls = 0;
-
+
     const results: (Document | null)[] = new Array(urls.length).fill(null);
     for (let i = 0; i < urls.length; i += this.concurrentRequests) {
       const batchUrls = urls.slice(i, i + this.concurrentRequests);
@@ -53,12 +63,20 @@ export class WebScraperDataProvider {
               total: totalUrls,
               status: "SCRAPING",
               currentDocumentUrl: url,
-              currentDocument: result
+              currentDocument: result,
             });
           }
+
          results[i + index] = result;
         })
       );
+      const job = await getWebScraperQueue().getJob(this.bullJobId);
+      const jobStatus = await job.getState();
+      if (jobStatus === "failed") {
+        throw new Error(
+          "Job has failed or has been cancelled by the user. Stopping the job..."
+        );
+      }
     }
     return results.filter((result) => result !== null) as Document[];
   }
@@ -87,7 +105,9 @@ export class WebScraperDataProvider {
    * @param inProgress inProgress
    * @returns documents
    */
-  private async processDocumentsWithoutCache(inProgress?: (progress: Progress) => void): Promise<Document[]> {
+  private async processDocumentsWithoutCache(
+    inProgress?: (progress: Progress) => void
+  ): Promise<Document[]> {
     switch (this.mode) {
       case "crawl":
         return this.handleCrawlMode(inProgress);
@@ -100,7 +120,9 @@ export class WebScraperDataProvider {
     }
   }
 
-  private async handleCrawlMode(inProgress?: (progress: Progress) => void): Promise<Document[]> {
+  private async handleCrawlMode(
+    inProgress?: (progress: Progress) => void
+  ): Promise<Document[]> {
     const crawler = new WebCrawler({
       initialUrl: this.urls[0],
       includes: this.includes,
@@ -118,12 +140,16 @@ export class WebScraperDataProvider {
     return this.cacheAndFinalizeDocuments(documents, links);
   }
 
-  private async handleSingleUrlsMode(inProgress?: (progress: Progress) => void): Promise<Document[]> {
+  private async handleSingleUrlsMode(
+    inProgress?: (progress: Progress) => void
+  ): Promise<Document[]> {
     let documents = await this.processLinks(this.urls, inProgress);
     return documents;
   }
 
-  private async handleSitemapMode(inProgress?: (progress: Progress) => void): Promise<Document[]> {
+  private async handleSitemapMode(
+    inProgress?: (progress: Progress) => void
+  ): Promise<Document[]> {
     let links = await getLinksFromSitemap(this.urls[0]);
     if (this.returnOnlyUrls) {
       return this.returnOnlyUrlsResponse(links, inProgress);
@@ -133,68 +159,90 @@ export class WebScraperDataProvider {
     return this.cacheAndFinalizeDocuments(documents, links);
   }
 
-  private async returnOnlyUrlsResponse(links: string[], inProgress?: (progress: Progress) => void): Promise<Document[]> {
+  private async returnOnlyUrlsResponse(
+    links: string[],
+    inProgress?: (progress: Progress) => void
+  ): Promise<Document[]> {
     inProgress?.({
       current: links.length,
       total: links.length,
       status: "COMPLETED",
       currentDocumentUrl: this.urls[0],
     });
-    return links.map(url => ({
+    return links.map((url) => ({
       content: "",
       markdown: "",
       metadata: { sourceURL: url },
     }));
   }
 
-  private async processLinks(links: string[], inProgress?: (progress: Progress) => void): Promise<Document[]> {
-    let pdfLinks = links.filter(link => link.endsWith(".pdf"));
+  private async processLinks(
+    links: string[],
+    inProgress?: (progress: Progress) => void
+  ): Promise<Document[]> {
+    let pdfLinks = links.filter((link) => link.endsWith(".pdf"));
     let pdfDocuments = await this.fetchPdfDocuments(pdfLinks);
-    links = links.filter(link => !link.endsWith(".pdf"));
+    links = links.filter((link) => !link.endsWith(".pdf"));
 
     let documents = await this.convertUrlsToDocuments(links, inProgress);
     documents = await this.getSitemapData(this.urls[0], documents);
     documents = this.applyPathReplacements(documents);
     documents = await this.applyImgAltText(documents);
-
-    if(this.extractorOptions.mode === "llm-extraction" && this.mode === "single_urls") {
-      documents = await generateCompletions(
-        documents,
-        this.extractorOptions
-      )
+
+    if (
+      this.extractorOptions.mode === "llm-extraction" &&
+      this.mode === "single_urls"
+    ) {
+      documents = await generateCompletions(documents, this.extractorOptions);
     }
     return documents.concat(pdfDocuments);
   }
 
   private async fetchPdfDocuments(pdfLinks: string[]): Promise<Document[]> {
-    return Promise.all(pdfLinks.map(async pdfLink => {
-      const pdfContent = await fetchAndProcessPdf(pdfLink);
-      return {
-        content: pdfContent,
-        metadata: { sourceURL: pdfLink },
-        provider: "web-scraper"
-      };
-    }));
+    return Promise.all(
+      pdfLinks.map(async (pdfLink) => {
+        const pdfContent = await fetchAndProcessPdf(pdfLink);
+        return {
+          content: pdfContent,
+          metadata: { sourceURL: pdfLink },
+          provider: "web-scraper",
+        };
+      })
+    );
   }
 
   private applyPathReplacements(documents: Document[]): Document[] {
-    return this.replaceAllPathsWithAbsolutePaths ? replacePathsWithAbsolutePaths(documents) : replaceImgPathsWithAbsolutePaths(documents);
+    return this.replaceAllPathsWithAbsolutePaths
+      ? replacePathsWithAbsolutePaths(documents)
+      : replaceImgPathsWithAbsolutePaths(documents);
   }
 
   private async applyImgAltText(documents: Document[]): Promise<Document[]> {
-    return this.generateImgAltText ? this.generatesImgAltText(documents) : documents;
+    return this.generateImgAltText
+      ? this.generatesImgAltText(documents)
+      : documents;
   }
 
-  private async cacheAndFinalizeDocuments(documents: Document[], links: string[]): Promise<Document[]> {
+  private async cacheAndFinalizeDocuments(
+    documents: Document[],
+    links: string[]
+  ): Promise<Document[]> {
     await this.setCachedDocuments(documents, links);
     documents = this.removeChildLinks(documents);
     return documents.splice(0, this.limit);
   }
 
-  private async processDocumentsWithCache(inProgress?: (progress: Progress) => void): Promise<Document[]> {
-    let documents = await this.getCachedDocuments(this.urls.slice(0, this.limit));
+  private async processDocumentsWithCache(
+    inProgress?: (progress: Progress) => void
+  ): Promise<Document[]> {
+    let documents = await this.getCachedDocuments(
+      this.urls.slice(0, this.limit)
+    );
     if (documents.length < this.limit) {
-      const newDocuments: Document[] = await this.getDocuments(false, inProgress);
+      const newDocuments: Document[] = await this.getDocuments(
+        false,
+        inProgress
+      );
       documents = this.mergeNewDocuments(documents, newDocuments);
     }
     documents = this.filterDocsExcludeInclude(documents);
@@ -202,9 +250,18 @@ export class WebScraperDataProvider {
     return documents.splice(0, this.limit);
   }
 
-  private mergeNewDocuments(existingDocuments: Document[], newDocuments: Document[]): Document[] {
-    newDocuments.forEach(doc => {
-      if (!existingDocuments.some(d => this.normalizeUrl(d.metadata.sourceURL) === this.normalizeUrl(doc.metadata?.sourceURL))) {
+  private mergeNewDocuments(
+    existingDocuments: Document[],
+    newDocuments: Document[]
+  ): Document[] {
+    newDocuments.forEach((doc) => {
+      if (
+        !existingDocuments.some(
+          (d) =>
+            this.normalizeUrl(d.metadata.sourceURL) ===
+            this.normalizeUrl(doc.metadata?.sourceURL)
+        )
+      ) {
         existingDocuments.push(doc);
       }
     });
@@ -285,7 +342,7 @@ export class WebScraperDataProvider {
         documents.push(cachedDocument);
 
         // get children documents
-        for (const childUrl of (cachedDocument.childrenLinks || [])) {
+        for (const childUrl of cachedDocument.childrenLinks || []) {
           const normalizedChildUrl = this.normalizeUrl(childUrl);
           const childCachedDocumentString = await getValue(
             "web-scraper-cache:" + normalizedChildUrl
@@ -313,6 +370,7 @@ export class WebScraperDataProvider {
       throw new Error("Urls are required");
     }
 
+    this.bullJobId = options.bullJobId;
     this.urls = options.urls;
     this.mode = options.mode;
     this.concurrentRequests = options.concurrentRequests ?? 20;
@@ -323,9 +381,10 @@ export class WebScraperDataProvider {
     this.limit = options.crawlerOptions?.limit ?? 10000;
     this.generateImgAltText =
       options.crawlerOptions?.generateImgAltText ?? false;
-    this.pageOptions = options.pageOptions ?? {onlyMainContent: false};
-    this.extractorOptions = options.extractorOptions ?? {mode: "markdown"}
-    this.replaceAllPathsWithAbsolutePaths = options.crawlerOptions?.replaceAllPathsWithAbsolutePaths ?? false;
+    this.pageOptions = options.pageOptions ?? { onlyMainContent: false };
+    this.extractorOptions = options.extractorOptions ?? { mode: "markdown" };
+    this.replaceAllPathsWithAbsolutePaths =
+      options.crawlerOptions?.replaceAllPathsWithAbsolutePaths ?? false;
 
     //! @nicolas, for some reason this was being injected and breakign everything. Don't have time to find source of the issue so adding this check
     this.excludes = this.excludes.filter((item) => item !== "");
@@ -396,8 +455,9 @@ export class WebScraperDataProvider {
           altText = await getImageDescription(
             imageUrl,
             backText,
-            frontText
-            , this.generateImgAltTextModel);
+            frontText,
+            this.generateImgAltTextModel
+          );
         }
 
         document.content = document.content.replace(
diff --git a/apps/api/src/services/logging/crawl_log.ts b/apps/api/src/services/logging/crawl_log.ts
new file mode 100644
index 0000000..76a0607
--- /dev/null
+++ b/apps/api/src/services/logging/crawl_log.ts
@@ -0,0 +1,17 @@
+import { supabase_service } from "../supabase";
+import "dotenv/config";
+
+export async function logCrawl(job_id: string, team_id: string) {
+  try {
+    const { data, error } = await supabase_service
+      .from("bulljobs_teams")
+      .insert([
+        {
+          job_id: job_id,
+          team_id: team_id,
+        },
+      ]);
+  } catch (error) {
+    console.error("Error logging crawl job:\n", error);
+  }
+}
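A minimal client-side sketch of the flow the new route enables, for illustration only. It assumes a deployed API reachable through the same TEST_URL and TEST_API_KEY environment variables the e2e test uses, and Node 18+ for the global fetch (TypeScript):

// Start a crawl, cancel it, then confirm the job surfaces as "failed".
const baseUrl = process.env.TEST_URL;
const apiKey = process.env.TEST_API_KEY;
const headers = {
  Authorization: `Bearer ${apiKey}`,
  "Content-Type": "application/json",
};

async function cancelCrawlExample() {
  // POST /v0/crawl queues a Bull job and returns its id as { jobId }.
  const crawlRes = await fetch(`${baseUrl}/v0/crawl`, {
    method: "POST",
    headers,
    body: JSON.stringify({ url: "https://jestjs.io" }),
  });
  const { jobId } = await crawlRes.json();

  // DELETE /v0/crawl/cancel/:jobId moves the job to the failed state;
  // the controller answers { status: "cancelled" } once that has happened,
  // or "Cancelling..." if the state change is still in flight.
  const cancelRes = await fetch(`${baseUrl}/v0/crawl/cancel/${jobId}`, {
    method: "DELETE",
    headers,
  });
  console.log(await cancelRes.json()); // e.g. { status: "cancelled" }

  // GET /v0/crawl/status/:jobId then reports the job as "failed"; pages
  // scraped before cancellation show up under partial_data, as checked
  // by the e2e test above.
  const statusRes = await fetch(`${baseUrl}/v0/crawl/status/${jobId}`, {
    headers,
  });
  const status = await statusRes.json();
  console.log(status.status, status.partial_data?.length ?? 0);
}

cancelCrawlExample().catch(console.error);

As in the e2e test, a cancelled crawl is surfaced through the existing status endpoint as a failed job, so clients can treat cancellation and failure the same way when collecting whatever was scraped before the job stopped.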