0

Nick: cancel job

This commit is contained in:
Nicolas 2024-05-06 17:16:43 -07:00
parent 6913fda710
commit 6d5da358cc
9 changed files with 236 additions and 49 deletions

View File

@ -22,6 +22,11 @@ kill_timeout = '5s'
min_machines_running = 2 min_machines_running = 2
processes = ['app'] processes = ['app']
[http_service.concurrency]
type = "requests"
hard_limit = 200
soft_limit = 100
[[services]] [[services]]
protocol = 'tcp' protocol = 'tcp'
internal_port = 8080 internal_port = 8080
@ -38,10 +43,14 @@ kill_timeout = '5s'
[services.concurrency] [services.concurrency]
type = 'connections' type = 'connections'
hard_limit = 45 hard_limit = 75
soft_limit = 20 soft_limit = 30
[[vm]] [[vm]]
size = 'performance-1x' size = 'performance-4x'
processes = ['app']

View File

@ -252,6 +252,41 @@ describe("E2E Tests for API Routes", () => {
}, 60000); // 60 seconds }, 60000); // 60 seconds
}); });
it("If someone cancels a crawl job, it should turn into failed status", async () => {
const crawlResponse = await request(TEST_URL)
.post("/v0/crawl")
.set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
.set("Content-Type", "application/json")
.send({ url: "https://jestjs.io" });
expect(crawlResponse.statusCode).toBe(200);
// wait for 30 seconds
await new Promise((r) => setTimeout(r, 10000));
const response = await request(TEST_URL)
.delete(`/v0/crawl/cancel/${crawlResponse.body.jobId}`)
.set("Authorization", `Bearer ${process.env.TEST_API_KEY}`);
expect(response.statusCode).toBe(200);
expect(response.body).toHaveProperty("status");
expect(response.body.status).toBe("cancelled");
await new Promise((r) => setTimeout(r, 20000));
const completedResponse = await request(TEST_URL)
.get(`/v0/crawl/status/${crawlResponse.body.jobId}`)
.set("Authorization", `Bearer ${process.env.TEST_API_KEY}`);
expect(completedResponse.statusCode).toBe(200);
expect(completedResponse.body).toHaveProperty("status");
expect(completedResponse.body.status).toBe("failed");
expect(completedResponse.body.partial_data?.length ?? 0).toBeLessThanOrEqual(completedResponse.body.data?.length ?? 0);
}, 60000); // 60 seconds
describe("POST /v0/scrape with LLM Extraction", () => { describe("POST /v0/scrape with LLM Extraction", () => {
it("should extract data using LLM extraction mode", async () => { it("should extract data using LLM extraction mode", async () => {
const response = await request(TEST_URL) const response = await request(TEST_URL)

View File

@ -0,0 +1,50 @@
import { Request, Response } from "express";
import { authenticateUser } from "./auth";
import { RateLimiterMode } from "../../src/types";
import { addWebScraperJob } from "../../src/services/queue-jobs";
import { getWebScraperQueue } from "../../src/services/queue-service";
import { supabase_service } from "../../src/services/supabase";
export async function crawlCancelController(req: Request, res: Response) {
try {
const { success, team_id, error, status } = await authenticateUser(
req,
res,
RateLimiterMode.CrawlStatus
);
if (!success) {
return res.status(status).json({ error });
}
const job = await getWebScraperQueue().getJob(req.params.jobId);
if (!job) {
return res.status(404).json({ error: "Job not found" });
}
// check if the job belongs to the team
const {data, error: supaError}= await supabase_service.from("bulljobs_teams").select("*").eq("job_id", req.params.jobId).eq("team_id", team_id);
if (supaError) {
return res.status(500).json({ error: supaError.message });
}
if (data.length === 0) {
return res.status(403).json({ error: "Unauthorized" });
}
try {
await job.moveToFailed(Error("Job cancelled by user"), true);
} catch (error) {
console.error(error);
}
const jobState = await job.getState();
res.json({
status: jobState === "failed" ? "cancelled" : "Cancelling...",
});
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
}

View File

@ -6,6 +6,7 @@ import { authenticateUser } from "./auth";
import { RateLimiterMode } from "../../src/types"; import { RateLimiterMode } from "../../src/types";
import { addWebScraperJob } from "../../src/services/queue-jobs"; import { addWebScraperJob } from "../../src/services/queue-jobs";
import { isUrlBlocked } from "../../src/scraper/WebScraper/utils/blocklist"; import { isUrlBlocked } from "../../src/scraper/WebScraper/utils/blocklist";
import { logCrawl } from "../../src/services/logging/crawl_log";
export async function crawlController(req: Request, res: Response) { export async function crawlController(req: Request, res: Response) {
try { try {
@ -30,7 +31,12 @@ export async function crawlController(req: Request, res: Response) {
} }
if (isUrlBlocked(url)) { if (isUrlBlocked(url)) {
return res.status(403).json({ error: "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it." }); return res
.status(403)
.json({
error:
"Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it.",
});
} }
const mode = req.body.mode ?? "crawl"; const mode = req.body.mode ?? "crawl";
@ -66,6 +72,7 @@ export async function crawlController(req: Request, res: Response) {
return res.status(500).json({ error: error.message }); return res.status(500).json({ error: error.message });
} }
} }
const job = await addWebScraperJob({ const job = await addWebScraperJob({
url: url, url: url,
mode: mode ?? "crawl", // fix for single urls not working mode: mode ?? "crawl", // fix for single urls not working
@ -75,6 +82,8 @@ export async function crawlController(req: Request, res: Response) {
origin: req.body.origin ?? "api", origin: req.body.origin ?? "api",
}); });
await logCrawl(job.id.toString(), team_id);
res.json({ jobId: job.id }); res.json({ jobId: job.id });
} catch (error) { } catch (error) {
console.error(error); console.error(error);

View File

@ -47,6 +47,7 @@ export type WebScraperOptions = {
pageOptions?: PageOptions; pageOptions?: PageOptions;
extractorOptions?: ExtractorOptions; extractorOptions?: ExtractorOptions;
concurrentRequests?: number; concurrentRequests?: number;
bullJobId?: string;
}; };
export interface DocumentUrl { export interface DocumentUrl {

View File

@ -27,6 +27,7 @@ export async function startWebScraperPipeline({
job.moveToFailed(error); job.moveToFailed(error);
}, },
team_id: job.data.team_id, team_id: job.data.team_id,
bull_job_id: job.id.toString(),
})) as { success: boolean; message: string; docs: Document[] }; })) as { success: boolean; message: string; docs: Document[] };
} }
export async function runWebScraper({ export async function runWebScraper({
@ -38,6 +39,7 @@ export async function runWebScraper({
onSuccess, onSuccess,
onError, onError,
team_id, team_id,
bull_job_id,
}: { }: {
url: string; url: string;
mode: "crawl" | "single_urls" | "sitemap"; mode: "crawl" | "single_urls" | "sitemap";
@ -47,6 +49,7 @@ export async function runWebScraper({
onSuccess: (result: any) => void; onSuccess: (result: any) => void;
onError: (error: any) => void; onError: (error: any) => void;
team_id: string; team_id: string;
bull_job_id: string;
}): Promise<{ }): Promise<{
success: boolean; success: boolean;
message: string; message: string;
@ -60,6 +63,7 @@ export async function runWebScraper({
urls: [url], urls: [url],
crawlerOptions: crawlerOptions, crawlerOptions: crawlerOptions,
pageOptions: pageOptions, pageOptions: pageOptions,
bullJobId: bull_job_id,
}); });
} else { } else {
await provider.setOptions({ await provider.setOptions({

View File

@ -5,6 +5,7 @@ import { scrapeController } from "../../src/controllers/scrape";
import { crawlPreviewController } from "../../src/controllers/crawlPreview"; import { crawlPreviewController } from "../../src/controllers/crawlPreview";
import { crawlJobStatusPreviewController } from "../../src/controllers/status"; import { crawlJobStatusPreviewController } from "../../src/controllers/status";
import { searchController } from "../../src/controllers/search"; import { searchController } from "../../src/controllers/search";
import { crawlCancelController } from "../../src/controllers/crawl-cancel";
export const v0Router = express.Router(); export const v0Router = express.Router();
@ -12,6 +13,7 @@ v0Router.post("/v0/scrape", scrapeController);
v0Router.post("/v0/crawl", crawlController); v0Router.post("/v0/crawl", crawlController);
v0Router.post("/v0/crawlWebsitePreview", crawlPreviewController); v0Router.post("/v0/crawlWebsitePreview", crawlPreviewController);
v0Router.get("/v0/crawl/status/:jobId", crawlStatusController); v0Router.get("/v0/crawl/status/:jobId", crawlStatusController);
v0Router.delete("/v0/crawl/cancel/:jobId", crawlCancelController);
v0Router.get("/v0/checkJobStatus/:jobId", crawlJobStatusPreviewController); v0Router.get("/v0/checkJobStatus/:jobId", crawlJobStatusPreviewController);
// Search routes // Search routes

View File

@ -1,4 +1,9 @@
import { Document, ExtractorOptions, PageOptions, WebScraperOptions } from "../../lib/entities"; import {
Document,
ExtractorOptions,
PageOptions,
WebScraperOptions,
} from "../../lib/entities";
import { Progress } from "../../lib/entities"; import { Progress } from "../../lib/entities";
import { scrapSingleUrl } from "./single_url"; import { scrapSingleUrl } from "./single_url";
import { SitemapEntry, fetchSitemapData, getLinksFromSitemap } from "./sitemap"; import { SitemapEntry, fetchSitemapData, getLinksFromSitemap } from "./sitemap";
@ -6,11 +11,15 @@ import { WebCrawler } from "./crawler";
import { getValue, setValue } from "../../services/redis"; import { getValue, setValue } from "../../services/redis";
import { getImageDescription } from "./utils/imageDescription"; import { getImageDescription } from "./utils/imageDescription";
import { fetchAndProcessPdf } from "./utils/pdfProcessor"; import { fetchAndProcessPdf } from "./utils/pdfProcessor";
import { replaceImgPathsWithAbsolutePaths, replacePathsWithAbsolutePaths } from "./utils/replacePaths"; import {
replaceImgPathsWithAbsolutePaths,
replacePathsWithAbsolutePaths,
} from "./utils/replacePaths";
import { generateCompletions } from "../../lib/LLM-extraction"; import { generateCompletions } from "../../lib/LLM-extraction";
import { getWebScraperQueue } from "../../../src/services/queue-service";
export class WebScraperDataProvider { export class WebScraperDataProvider {
private bullJobId: string;
private urls: string[] = [""]; private urls: string[] = [""];
private mode: "single_urls" | "sitemap" | "crawl" = "single_urls"; private mode: "single_urls" | "sitemap" | "crawl" = "single_urls";
private includes: string[]; private includes: string[];
@ -23,7 +32,8 @@ export class WebScraperDataProvider {
private pageOptions?: PageOptions; private pageOptions?: PageOptions;
private extractorOptions?: ExtractorOptions; private extractorOptions?: ExtractorOptions;
private replaceAllPathsWithAbsolutePaths?: boolean = false; private replaceAllPathsWithAbsolutePaths?: boolean = false;
private generateImgAltTextModel: "gpt-4-turbo" | "claude-3-opus" = "gpt-4-turbo"; private generateImgAltTextModel: "gpt-4-turbo" | "claude-3-opus" =
"gpt-4-turbo";
authorize(): void { authorize(): void {
throw new Error("Method not implemented."); throw new Error("Method not implemented.");
@ -53,12 +63,20 @@ export class WebScraperDataProvider {
total: totalUrls, total: totalUrls,
status: "SCRAPING", status: "SCRAPING",
currentDocumentUrl: url, currentDocumentUrl: url,
currentDocument: result currentDocument: result,
}); });
} }
results[i + index] = result; results[i + index] = result;
}) })
); );
const job = await getWebScraperQueue().getJob(this.bullJobId);
const jobStatus = await job.getState();
if (jobStatus === "failed") {
throw new Error(
"Job has failed or has been cancelled by the user. Stopping the job..."
);
}
} }
return results.filter((result) => result !== null) as Document[]; return results.filter((result) => result !== null) as Document[];
} }
@ -87,7 +105,9 @@ export class WebScraperDataProvider {
* @param inProgress inProgress * @param inProgress inProgress
* @returns documents * @returns documents
*/ */
private async processDocumentsWithoutCache(inProgress?: (progress: Progress) => void): Promise<Document[]> { private async processDocumentsWithoutCache(
inProgress?: (progress: Progress) => void
): Promise<Document[]> {
switch (this.mode) { switch (this.mode) {
case "crawl": case "crawl":
return this.handleCrawlMode(inProgress); return this.handleCrawlMode(inProgress);
@ -100,7 +120,9 @@ export class WebScraperDataProvider {
} }
} }
private async handleCrawlMode(inProgress?: (progress: Progress) => void): Promise<Document[]> { private async handleCrawlMode(
inProgress?: (progress: Progress) => void
): Promise<Document[]> {
const crawler = new WebCrawler({ const crawler = new WebCrawler({
initialUrl: this.urls[0], initialUrl: this.urls[0],
includes: this.includes, includes: this.includes,
@ -118,12 +140,16 @@ export class WebScraperDataProvider {
return this.cacheAndFinalizeDocuments(documents, links); return this.cacheAndFinalizeDocuments(documents, links);
} }
private async handleSingleUrlsMode(inProgress?: (progress: Progress) => void): Promise<Document[]> { private async handleSingleUrlsMode(
inProgress?: (progress: Progress) => void
): Promise<Document[]> {
let documents = await this.processLinks(this.urls, inProgress); let documents = await this.processLinks(this.urls, inProgress);
return documents; return documents;
} }
private async handleSitemapMode(inProgress?: (progress: Progress) => void): Promise<Document[]> { private async handleSitemapMode(
inProgress?: (progress: Progress) => void
): Promise<Document[]> {
let links = await getLinksFromSitemap(this.urls[0]); let links = await getLinksFromSitemap(this.urls[0]);
if (this.returnOnlyUrls) { if (this.returnOnlyUrls) {
return this.returnOnlyUrlsResponse(links, inProgress); return this.returnOnlyUrlsResponse(links, inProgress);
@ -133,68 +159,90 @@ export class WebScraperDataProvider {
return this.cacheAndFinalizeDocuments(documents, links); return this.cacheAndFinalizeDocuments(documents, links);
} }
private async returnOnlyUrlsResponse(links: string[], inProgress?: (progress: Progress) => void): Promise<Document[]> { private async returnOnlyUrlsResponse(
links: string[],
inProgress?: (progress: Progress) => void
): Promise<Document[]> {
inProgress?.({ inProgress?.({
current: links.length, current: links.length,
total: links.length, total: links.length,
status: "COMPLETED", status: "COMPLETED",
currentDocumentUrl: this.urls[0], currentDocumentUrl: this.urls[0],
}); });
return links.map(url => ({ return links.map((url) => ({
content: "", content: "",
markdown: "", markdown: "",
metadata: { sourceURL: url }, metadata: { sourceURL: url },
})); }));
} }
private async processLinks(links: string[], inProgress?: (progress: Progress) => void): Promise<Document[]> { private async processLinks(
let pdfLinks = links.filter(link => link.endsWith(".pdf")); links: string[],
inProgress?: (progress: Progress) => void
): Promise<Document[]> {
let pdfLinks = links.filter((link) => link.endsWith(".pdf"));
let pdfDocuments = await this.fetchPdfDocuments(pdfLinks); let pdfDocuments = await this.fetchPdfDocuments(pdfLinks);
links = links.filter(link => !link.endsWith(".pdf")); links = links.filter((link) => !link.endsWith(".pdf"));
let documents = await this.convertUrlsToDocuments(links, inProgress); let documents = await this.convertUrlsToDocuments(links, inProgress);
documents = await this.getSitemapData(this.urls[0], documents); documents = await this.getSitemapData(this.urls[0], documents);
documents = this.applyPathReplacements(documents); documents = this.applyPathReplacements(documents);
documents = await this.applyImgAltText(documents); documents = await this.applyImgAltText(documents);
if(this.extractorOptions.mode === "llm-extraction" && this.mode === "single_urls") { if (
documents = await generateCompletions( this.extractorOptions.mode === "llm-extraction" &&
documents, this.mode === "single_urls"
this.extractorOptions ) {
) documents = await generateCompletions(documents, this.extractorOptions);
} }
return documents.concat(pdfDocuments); return documents.concat(pdfDocuments);
} }
private async fetchPdfDocuments(pdfLinks: string[]): Promise<Document[]> { private async fetchPdfDocuments(pdfLinks: string[]): Promise<Document[]> {
return Promise.all(pdfLinks.map(async pdfLink => { return Promise.all(
pdfLinks.map(async (pdfLink) => {
const pdfContent = await fetchAndProcessPdf(pdfLink); const pdfContent = await fetchAndProcessPdf(pdfLink);
return { return {
content: pdfContent, content: pdfContent,
metadata: { sourceURL: pdfLink }, metadata: { sourceURL: pdfLink },
provider: "web-scraper" provider: "web-scraper",
}; };
})); })
);
} }
private applyPathReplacements(documents: Document[]): Document[] { private applyPathReplacements(documents: Document[]): Document[] {
return this.replaceAllPathsWithAbsolutePaths ? replacePathsWithAbsolutePaths(documents) : replaceImgPathsWithAbsolutePaths(documents); return this.replaceAllPathsWithAbsolutePaths
? replacePathsWithAbsolutePaths(documents)
: replaceImgPathsWithAbsolutePaths(documents);
} }
private async applyImgAltText(documents: Document[]): Promise<Document[]> { private async applyImgAltText(documents: Document[]): Promise<Document[]> {
return this.generateImgAltText ? this.generatesImgAltText(documents) : documents; return this.generateImgAltText
? this.generatesImgAltText(documents)
: documents;
} }
private async cacheAndFinalizeDocuments(documents: Document[], links: string[]): Promise<Document[]> { private async cacheAndFinalizeDocuments(
documents: Document[],
links: string[]
): Promise<Document[]> {
await this.setCachedDocuments(documents, links); await this.setCachedDocuments(documents, links);
documents = this.removeChildLinks(documents); documents = this.removeChildLinks(documents);
return documents.splice(0, this.limit); return documents.splice(0, this.limit);
} }
private async processDocumentsWithCache(inProgress?: (progress: Progress) => void): Promise<Document[]> { private async processDocumentsWithCache(
let documents = await this.getCachedDocuments(this.urls.slice(0, this.limit)); inProgress?: (progress: Progress) => void
): Promise<Document[]> {
let documents = await this.getCachedDocuments(
this.urls.slice(0, this.limit)
);
if (documents.length < this.limit) { if (documents.length < this.limit) {
const newDocuments: Document[] = await this.getDocuments(false, inProgress); const newDocuments: Document[] = await this.getDocuments(
false,
inProgress
);
documents = this.mergeNewDocuments(documents, newDocuments); documents = this.mergeNewDocuments(documents, newDocuments);
} }
documents = this.filterDocsExcludeInclude(documents); documents = this.filterDocsExcludeInclude(documents);
@ -202,9 +250,18 @@ export class WebScraperDataProvider {
return documents.splice(0, this.limit); return documents.splice(0, this.limit);
} }
private mergeNewDocuments(existingDocuments: Document[], newDocuments: Document[]): Document[] { private mergeNewDocuments(
newDocuments.forEach(doc => { existingDocuments: Document[],
if (!existingDocuments.some(d => this.normalizeUrl(d.metadata.sourceURL) === this.normalizeUrl(doc.metadata?.sourceURL))) { newDocuments: Document[]
): Document[] {
newDocuments.forEach((doc) => {
if (
!existingDocuments.some(
(d) =>
this.normalizeUrl(d.metadata.sourceURL) ===
this.normalizeUrl(doc.metadata?.sourceURL)
)
) {
existingDocuments.push(doc); existingDocuments.push(doc);
} }
}); });
@ -285,7 +342,7 @@ export class WebScraperDataProvider {
documents.push(cachedDocument); documents.push(cachedDocument);
// get children documents // get children documents
for (const childUrl of (cachedDocument.childrenLinks || [])) { for (const childUrl of cachedDocument.childrenLinks || []) {
const normalizedChildUrl = this.normalizeUrl(childUrl); const normalizedChildUrl = this.normalizeUrl(childUrl);
const childCachedDocumentString = await getValue( const childCachedDocumentString = await getValue(
"web-scraper-cache:" + normalizedChildUrl "web-scraper-cache:" + normalizedChildUrl
@ -313,6 +370,7 @@ export class WebScraperDataProvider {
throw new Error("Urls are required"); throw new Error("Urls are required");
} }
this.bullJobId = options.bullJobId;
this.urls = options.urls; this.urls = options.urls;
this.mode = options.mode; this.mode = options.mode;
this.concurrentRequests = options.concurrentRequests ?? 20; this.concurrentRequests = options.concurrentRequests ?? 20;
@ -324,8 +382,9 @@ export class WebScraperDataProvider {
this.generateImgAltText = this.generateImgAltText =
options.crawlerOptions?.generateImgAltText ?? false; options.crawlerOptions?.generateImgAltText ?? false;
this.pageOptions = options.pageOptions ?? { onlyMainContent: false }; this.pageOptions = options.pageOptions ?? { onlyMainContent: false };
this.extractorOptions = options.extractorOptions ?? {mode: "markdown"} this.extractorOptions = options.extractorOptions ?? { mode: "markdown" };
this.replaceAllPathsWithAbsolutePaths = options.crawlerOptions?.replaceAllPathsWithAbsolutePaths ?? false; this.replaceAllPathsWithAbsolutePaths =
options.crawlerOptions?.replaceAllPathsWithAbsolutePaths ?? false;
//! @nicolas, for some reason this was being injected and breakign everything. Don't have time to find source of the issue so adding this check //! @nicolas, for some reason this was being injected and breakign everything. Don't have time to find source of the issue so adding this check
this.excludes = this.excludes.filter((item) => item !== ""); this.excludes = this.excludes.filter((item) => item !== "");
@ -396,8 +455,9 @@ export class WebScraperDataProvider {
altText = await getImageDescription( altText = await getImageDescription(
imageUrl, imageUrl,
backText, backText,
frontText frontText,
, this.generateImgAltTextModel); this.generateImgAltTextModel
);
} }
document.content = document.content.replace( document.content = document.content.replace(

View File

@ -0,0 +1,17 @@
import { supabase_service } from "../supabase";
import "dotenv/config";
export async function logCrawl(job_id: string, team_id: string) {
try {
const { data, error } = await supabase_service
.from("bulljobs_teams")
.insert([
{
job_id: job_id,
team_id: team_id,
},
]);
} catch (error) {
console.error("Error logging crawl job:\n", error);
}
}