Merge pull request #129 from mendableai/nsc/cancel-job
Cancel Job Route
commit 3459b77f70
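This PR adds a DELETE /v0/crawl/cancel/:jobId route for cancelling an in-progress crawl job. A minimal sketch of calling it from a client is shown below; FIRECRAWL_URL and API_KEY are placeholder names for illustration only (the route path, Bearer auth, and response shape are taken from the diff that follows):

// Minimal client-side sketch; FIRECRAWL_URL and API_KEY are assumed
// placeholders, not values defined anywhere in this PR.
async function cancelCrawl(jobId: string): Promise<string> {
  const res = await fetch(
    `${process.env.FIRECRAWL_URL}/v0/crawl/cancel/${jobId}`,
    {
      method: "DELETE",
      headers: { Authorization: `Bearer ${process.env.API_KEY}` },
    }
  );
  if (!res.ok) throw new Error(`Cancel failed: HTTP ${res.status}`);
  const body = await res.json();
  // The controller answers { status: "cancelled" } once the Bull job has been
  // moved to "failed", or { status: "Cancelling..." } if that has not landed yet.
  return body.status;
}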
@@ -22,6 +22,11 @@ kill_timeout = '5s'
   min_machines_running = 2
   processes = ['app']
 
+  [http_service.concurrency]
+    type = "requests"
+    hard_limit = 200
+    soft_limit = 100
+
 [[services]]
   protocol = 'tcp'
   internal_port = 8080
@@ -38,10 +43,14 @@ kill_timeout = '5s'
 
   [services.concurrency]
     type = 'connections'
-    hard_limit = 45
-    soft_limit = 20
+    hard_limit = 75
+    soft_limit = 30
 
 [[vm]]
-  size = 'performance-1x'
+  size = 'performance-4x'
   processes = ['app']
@@ -311,6 +311,45 @@ describe("E2E Tests for API Routes", () => {
     }, 60000); // 60 seconds
   });
 
+  it("If someone cancels a crawl job, it should turn into failed status", async () => {
+    const crawlResponse = await request(TEST_URL)
+      .post("/v0/crawl")
+      .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
+      .set("Content-Type", "application/json")
+      .send({ url: "https://jestjs.io" });
+    expect(crawlResponse.statusCode).toBe(200);
+
+    // wait 10 seconds so the crawl can make some progress before cancelling
+    await new Promise((r) => setTimeout(r, 10000));
+
+    const response = await request(TEST_URL)
+      .delete(`/v0/crawl/cancel/${crawlResponse.body.jobId}`)
+      .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`);
+    expect(response.statusCode).toBe(200);
+    expect(response.body).toHaveProperty("status");
+    expect(response.body.status).toBe("cancelled");
+
+    await new Promise((r) => setTimeout(r, 20000));
+
+    const completedResponse = await request(TEST_URL)
+      .get(`/v0/crawl/status/${crawlResponse.body.jobId}`)
+      .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`);
+    expect(completedResponse.statusCode).toBe(200);
+    expect(completedResponse.body).toHaveProperty("status");
+    expect(completedResponse.body.status).toBe("failed");
+    expect(completedResponse.body).toHaveProperty("data");
+    expect(completedResponse.body.data).toEqual(null);
+    expect(completedResponse.body).toHaveProperty("partial_data");
+    expect(completedResponse.body.partial_data[0]).toHaveProperty("content");
+    expect(completedResponse.body.partial_data[0]).toHaveProperty("markdown");
+    expect(completedResponse.body.partial_data[0]).toHaveProperty("metadata");
+  }, 60000); // 60 seconds
 
   describe("POST /v0/scrape with LLM Extraction", () => {
     it("should extract data using LLM extraction mode", async () => {
       const response = await request(TEST_URL)
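The new e2e test above drives the full flow with fixed sleeps (10 s before cancelling, 20 s before re-checking status), which can be flaky on slow runners. A less timing-sensitive variant would poll the status endpoint instead. The sketch below assumes the same request/TEST_URL setup as the test file; pollUntilTerminal is a hypothetical helper, not part of this PR:

// Hypothetical helper: poll GET /v0/crawl/status/:jobId until the job reaches
// a terminal status or the timeout elapses.
async function pollUntilTerminal(jobId: string, timeoutMs = 60000): Promise<string> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    const res = await request(TEST_URL)
      .get(`/v0/crawl/status/${jobId}`)
      .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`);
    if (res.body.status === "failed" || res.body.status === "completed") {
      return res.body.status; // cancelled crawls surface as "failed"
    }
    await new Promise((r) => setTimeout(r, 1000)); // wait a second and retry
  }
  throw new Error("Timed out waiting for a terminal crawl status");
}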
apps/api/src/controllers/crawl-cancel.ts (new file, 62 lines)
@@ -0,0 +1,62 @@
import { Request, Response } from "express";
import { authenticateUser } from "./auth";
import { RateLimiterMode } from "../../src/types";
import { addWebScraperJob } from "../../src/services/queue-jobs";
import { getWebScraperQueue } from "../../src/services/queue-service";
import { supabase_service } from "../../src/services/supabase";
import { billTeam } from "../../src/services/billing/credit_billing";

export async function crawlCancelController(req: Request, res: Response) {
  try {
    const { success, team_id, error, status } = await authenticateUser(
      req,
      res,
      RateLimiterMode.CrawlStatus
    );
    if (!success) {
      return res.status(status).json({ error });
    }
    const job = await getWebScraperQueue().getJob(req.params.jobId);
    if (!job) {
      return res.status(404).json({ error: "Job not found" });
    }

    // check if the job belongs to the team
    const { data, error: supaError } = await supabase_service
      .from("bulljobs_teams")
      .select("*")
      .eq("job_id", req.params.jobId)
      .eq("team_id", team_id);
    if (supaError) {
      return res.status(500).json({ error: supaError.message });
    }

    if (data.length === 0) {
      return res.status(403).json({ error: "Unauthorized" });
    }
    const jobState = await job.getState();
    const { partialDocs } = await job.progress();

    if (partialDocs && partialDocs.length > 0 && jobState === "active") {
      console.log("Billing team for partial docs...");
      // Note: the credits billed here may be lower than the actual count,
      // because some scrape promises have not resolved yet
      await billTeam(team_id, partialDocs.length);
    }

    try {
      await job.moveToFailed(Error("Job cancelled by user"), true);
    } catch (error) {
      console.error(error);
    }

    const newJobState = await job.getState();

    res.json({
      status: newJobState === "failed" ? "cancelled" : "Cancelling...",
    });
  } catch (error) {
    console.error(error);
    return res.status(500).json({ error: error.message });
  }
}
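For reference, the JSON bodies that crawlCancelController can return boil down to the following union. This is a sketch derived from the controller above, not a type that exists in the codebase:

// Sketch only: possible responses of DELETE /v0/crawl/cancel/:jobId.
type CancelCrawlResponse =
  | { status: "cancelled" }      // 200: job successfully moved to "failed"
  | { status: "Cancelling..." }  // 200: moveToFailed has not taken effect yet
  | { error: "Job not found" }   // 404: no Bull job with that id
  | { error: "Unauthorized" }    // 403: job does not belong to the caller's team
  | { error: string };           // auth/rate-limit failure or unexpected 500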
@@ -6,6 +6,7 @@ import { authenticateUser } from "./auth";
 import { RateLimiterMode } from "../../src/types";
 import { addWebScraperJob } from "../../src/services/queue-jobs";
 import { isUrlBlocked } from "../../src/scraper/WebScraper/utils/blocklist";
+import { logCrawl } from "../../src/services/logging/crawl_log";
 
 export async function crawlController(req: Request, res: Response) {
   try {
@@ -30,7 +31,12 @@ export async function crawlController(req: Request, res: Response) {
     }
 
     if (isUrlBlocked(url)) {
-      return res.status(403).json({ error: "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it." });
+      return res
+        .status(403)
+        .json({
+          error:
+            "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it.",
+        });
     }
 
     const mode = req.body.mode ?? "crawl";
@@ -66,6 +72,7 @@ export async function crawlController(req: Request, res: Response) {
         return res.status(500).json({ error: error.message });
       }
     }
+
    const job = await addWebScraperJob({
      url: url,
      mode: mode ?? "crawl", // fix for single urls not working
@@ -75,6 +82,8 @@ export async function crawlController(req: Request, res: Response) {
      origin: req.body.origin ?? "api",
    });
 
+    await logCrawl(job.id.toString(), team_id);
+
    res.json({ jobId: job.id });
  } catch (error) {
    console.error(error);
@@ -47,6 +47,7 @@ export type WebScraperOptions = {
   pageOptions?: PageOptions;
   extractorOptions?: ExtractorOptions;
   concurrentRequests?: number;
+  bullJobId?: string;
 };
 
 export interface DocumentUrl {
@@ -26,7 +26,8 @@ export async function startWebScraperPipeline({
     onError: (error) => {
       job.moveToFailed(error);
     },
-    team_id: job.data.team_id
+    team_id: job.data.team_id,
+    bull_job_id: job.id.toString()
   })) as { success: boolean; message: string; docs: Document[] };
 }
 export async function runWebScraper({
@@ -38,6 +39,7 @@ export async function runWebScraper({
   onSuccess,
   onError,
   team_id,
+  bull_job_id,
 }: {
   url: string;
   mode: "crawl" | "single_urls" | "sitemap";
@@ -47,6 +49,7 @@ export async function runWebScraper({
   onSuccess: (result: any) => void;
   onError: (error: any) => void;
   team_id: string;
+  bull_job_id: string;
 }): Promise<{
   success: boolean;
   message: string;
@@ -59,7 +62,8 @@ export async function runWebScraper({
       mode: mode,
       urls: [url],
       crawlerOptions: crawlerOptions,
-      pageOptions: pageOptions
+      pageOptions: pageOptions,
+      bullJobId: bull_job_id
     });
   } else {
     await provider.setOptions({
@@ -5,6 +5,7 @@ import { scrapeController } from "../../src/controllers/scrape";
 import { crawlPreviewController } from "../../src/controllers/crawlPreview";
 import { crawlJobStatusPreviewController } from "../../src/controllers/status";
 import { searchController } from "../../src/controllers/search";
+import { crawlCancelController } from "../../src/controllers/crawl-cancel";
 
 export const v0Router = express.Router();
 
@@ -12,6 +13,7 @@ v0Router.post("/v0/scrape", scrapeController);
 v0Router.post("/v0/crawl", crawlController);
 v0Router.post("/v0/crawlWebsitePreview", crawlPreviewController);
 v0Router.get("/v0/crawl/status/:jobId", crawlStatusController);
+v0Router.delete("/v0/crawl/cancel/:jobId", crawlCancelController);
 v0Router.get("/v0/checkJobStatus/:jobId", crawlJobStatusPreviewController);
 
 // Search routes
@@ -1,4 +1,9 @@
-import { Document, ExtractorOptions, PageOptions, WebScraperOptions } from "../../lib/entities";
+import {
+  Document,
+  ExtractorOptions,
+  PageOptions,
+  WebScraperOptions,
+} from "../../lib/entities";
 import { Progress } from "../../lib/entities";
 import { scrapSingleUrl } from "./single_url";
 import { SitemapEntry, fetchSitemapData, getLinksFromSitemap } from "./sitemap";
@@ -6,11 +11,15 @@ import { WebCrawler } from "./crawler";
 import { getValue, setValue } from "../../services/redis";
 import { getImageDescription } from "./utils/imageDescription";
 import { fetchAndProcessPdf } from "./utils/pdfProcessor";
-import { replaceImgPathsWithAbsolutePaths, replacePathsWithAbsolutePaths } from "./utils/replacePaths";
+import {
+  replaceImgPathsWithAbsolutePaths,
+  replacePathsWithAbsolutePaths,
+} from "./utils/replacePaths";
 import { generateCompletions } from "../../lib/LLM-extraction";
 
+import { getWebScraperQueue } from "../../../src/services/queue-service";
+
 export class WebScraperDataProvider {
+  private bullJobId: string;
   private urls: string[] = [""];
   private mode: "single_urls" | "sitemap" | "crawl" = "single_urls";
   private includes: string[];
@@ -23,7 +32,8 @@ export class WebScraperDataProvider {
   private pageOptions?: PageOptions;
   private extractorOptions?: ExtractorOptions;
   private replaceAllPathsWithAbsolutePaths?: boolean = false;
-  private generateImgAltTextModel: "gpt-4-turbo" | "claude-3-opus" = "gpt-4-turbo";
+  private generateImgAltTextModel: "gpt-4-turbo" | "claude-3-opus" =
+    "gpt-4-turbo";
 
   authorize(): void {
     throw new Error("Method not implemented.");
@@ -53,12 +63,26 @@ export class WebScraperDataProvider {
             total: totalUrls,
             status: "SCRAPING",
             currentDocumentUrl: url,
-            currentDocument: result
+            currentDocument: result,
           });
         }
 
         results[i + index] = result;
       })
     );
+    try {
+      if (this.mode === "crawl" && this.bullJobId) {
+        const job = await getWebScraperQueue().getJob(this.bullJobId);
+        const jobStatus = await job.getState();
+        if (jobStatus === "failed") {
+          throw new Error(
+            "Job has failed or has been cancelled by the user. Stopping the job..."
+          );
+        }
+      }
+    } catch (error) {
+      console.error(error);
+    }
   }
   return results.filter((result) => result !== null) as Document[];
 }
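The hunk above is where cancellation takes effect on the worker side: between batches of URLs, the provider re-reads its own Bull job and aborts once the cancel route has flipped the state to "failed". A minimal, self-contained sketch of that pattern follows; the queue name, Redis URL, and helper names are placeholders for illustration, not the actual wiring in this repo:

import Queue from "bull";

// Illustrative only: a generic batch loop that checks for cancellation the
// same way the diff above does (job state re-read between batches).
const queue = new Queue(
  "web-scraper",
  process.env.REDIS_URL ?? "redis://localhost:6379"
);

async function scrapeInBatches(jobId: string, batches: string[][]) {
  for (const batch of batches) {
    await Promise.all(batch.map((url) => scrapeOne(url))); // one batch of work
    const job = await queue.getJob(jobId);
    if (job && (await job.getState()) === "failed") {
      // The cancel endpoint moved the job to "failed"; stop scraping.
      throw new Error("Job cancelled by user. Stopping.");
    }
  }
}

async function scrapeOne(url: string): Promise<void> {
  // placeholder for the real per-URL scraping work
}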
@@ -87,7 +111,9 @@ export class WebScraperDataProvider {
    * @param inProgress inProgress
    * @returns documents
    */
-  private async processDocumentsWithoutCache(inProgress?: (progress: Progress) => void): Promise<Document[]> {
+  private async processDocumentsWithoutCache(
+    inProgress?: (progress: Progress) => void
+  ): Promise<Document[]> {
     switch (this.mode) {
       case "crawl":
         return this.handleCrawlMode(inProgress);
@@ -100,7 +126,9 @@ export class WebScraperDataProvider {
     }
   }
 
-  private async handleCrawlMode(inProgress?: (progress: Progress) => void): Promise<Document[]> {
+  private async handleCrawlMode(
+    inProgress?: (progress: Progress) => void
+  ): Promise<Document[]> {
     const crawler = new WebCrawler({
       initialUrl: this.urls[0],
       includes: this.includes,
@@ -118,12 +146,16 @@ export class WebScraperDataProvider {
     return this.cacheAndFinalizeDocuments(documents, links);
   }
 
-  private async handleSingleUrlsMode(inProgress?: (progress: Progress) => void): Promise<Document[]> {
+  private async handleSingleUrlsMode(
+    inProgress?: (progress: Progress) => void
+  ): Promise<Document[]> {
     let documents = await this.processLinks(this.urls, inProgress);
     return documents;
   }
 
-  private async handleSitemapMode(inProgress?: (progress: Progress) => void): Promise<Document[]> {
+  private async handleSitemapMode(
+    inProgress?: (progress: Progress) => void
+  ): Promise<Document[]> {
     let links = await getLinksFromSitemap(this.urls[0]);
     if (this.returnOnlyUrls) {
       return this.returnOnlyUrlsResponse(links, inProgress);
@@ -133,14 +165,17 @@ export class WebScraperDataProvider {
     return this.cacheAndFinalizeDocuments(documents, links);
   }
 
-  private async returnOnlyUrlsResponse(links: string[], inProgress?: (progress: Progress) => void): Promise<Document[]> {
+  private async returnOnlyUrlsResponse(
+    links: string[],
+    inProgress?: (progress: Progress) => void
+  ): Promise<Document[]> {
     inProgress?.({
       current: links.length,
       total: links.length,
       status: "COMPLETED",
       currentDocumentUrl: this.urls[0],
     });
-    return links.map(url => ({
+    return links.map((url) => ({
       content: "",
       html: this.pageOptions?.includeHtml ? "" : undefined,
       markdown: "",
@@ -148,54 +183,73 @@ export class WebScraperDataProvider {
     }));
   }
 
-  private async processLinks(links: string[], inProgress?: (progress: Progress) => void): Promise<Document[]> {
-    let pdfLinks = links.filter(link => link.endsWith(".pdf"));
+  private async processLinks(
+    links: string[],
+    inProgress?: (progress: Progress) => void
+  ): Promise<Document[]> {
+    let pdfLinks = links.filter((link) => link.endsWith(".pdf"));
     let pdfDocuments = await this.fetchPdfDocuments(pdfLinks);
-    links = links.filter(link => !link.endsWith(".pdf"));
+    links = links.filter((link) => !link.endsWith(".pdf"));
 
     let documents = await this.convertUrlsToDocuments(links, inProgress);
     documents = await this.getSitemapData(this.urls[0], documents);
     documents = this.applyPathReplacements(documents);
     documents = await this.applyImgAltText(documents);
 
-    if(this.extractorOptions.mode === "llm-extraction" && this.mode === "single_urls") {
-      documents = await generateCompletions(
-        documents,
-        this.extractorOptions
-      )
+    if (
+      this.extractorOptions.mode === "llm-extraction" &&
+      this.mode === "single_urls"
+    ) {
+      documents = await generateCompletions(documents, this.extractorOptions);
     }
     return documents.concat(pdfDocuments);
   }
 
   private async fetchPdfDocuments(pdfLinks: string[]): Promise<Document[]> {
-    return Promise.all(pdfLinks.map(async pdfLink => {
-      const pdfContent = await fetchAndProcessPdf(pdfLink);
-      return {
-        content: pdfContent,
-        metadata: { sourceURL: pdfLink },
-        provider: "web-scraper"
-      };
-    }));
+    return Promise.all(
+      pdfLinks.map(async (pdfLink) => {
+        const pdfContent = await fetchAndProcessPdf(pdfLink);
+        return {
+          content: pdfContent,
+          metadata: { sourceURL: pdfLink },
+          provider: "web-scraper",
+        };
+      })
+    );
   }
 
   private applyPathReplacements(documents: Document[]): Document[] {
-    return this.replaceAllPathsWithAbsolutePaths ? replacePathsWithAbsolutePaths(documents) : replaceImgPathsWithAbsolutePaths(documents);
+    return this.replaceAllPathsWithAbsolutePaths
+      ? replacePathsWithAbsolutePaths(documents)
+      : replaceImgPathsWithAbsolutePaths(documents);
   }
 
   private async applyImgAltText(documents: Document[]): Promise<Document[]> {
-    return this.generateImgAltText ? this.generatesImgAltText(documents) : documents;
+    return this.generateImgAltText
+      ? this.generatesImgAltText(documents)
+      : documents;
  }
 
-  private async cacheAndFinalizeDocuments(documents: Document[], links: string[]): Promise<Document[]> {
+  private async cacheAndFinalizeDocuments(
+    documents: Document[],
+    links: string[]
+  ): Promise<Document[]> {
     await this.setCachedDocuments(documents, links);
     documents = this.removeChildLinks(documents);
     return documents.splice(0, this.limit);
   }
 
-  private async processDocumentsWithCache(inProgress?: (progress: Progress) => void): Promise<Document[]> {
-    let documents = await this.getCachedDocuments(this.urls.slice(0, this.limit));
+  private async processDocumentsWithCache(
+    inProgress?: (progress: Progress) => void
+  ): Promise<Document[]> {
+    let documents = await this.getCachedDocuments(
+      this.urls.slice(0, this.limit)
+    );
     if (documents.length < this.limit) {
-      const newDocuments: Document[] = await this.getDocuments(false, inProgress);
+      const newDocuments: Document[] = await this.getDocuments(
+        false,
+        inProgress
+      );
       documents = this.mergeNewDocuments(documents, newDocuments);
     }
     documents = this.filterDocsExcludeInclude(documents);
@@ -203,9 +257,18 @@ export class WebScraperDataProvider {
     return documents.splice(0, this.limit);
   }
 
-  private mergeNewDocuments(existingDocuments: Document[], newDocuments: Document[]): Document[] {
-    newDocuments.forEach(doc => {
-      if (!existingDocuments.some(d => this.normalizeUrl(d.metadata.sourceURL) === this.normalizeUrl(doc.metadata?.sourceURL))) {
+  private mergeNewDocuments(
+    existingDocuments: Document[],
+    newDocuments: Document[]
+  ): Document[] {
+    newDocuments.forEach((doc) => {
+      if (
+        !existingDocuments.some(
+          (d) =>
+            this.normalizeUrl(d.metadata.sourceURL) ===
+            this.normalizeUrl(doc.metadata?.sourceURL)
+        )
+      ) {
         existingDocuments.push(doc);
       }
     });
@@ -286,7 +349,7 @@ export class WebScraperDataProvider {
         documents.push(cachedDocument);
 
         // get children documents
-        for (const childUrl of (cachedDocument.childrenLinks || [])) {
+        for (const childUrl of cachedDocument.childrenLinks || []) {
           const normalizedChildUrl = this.normalizeUrl(childUrl);
           const childCachedDocumentString = await getValue(
             "web-scraper-cache:" + normalizedChildUrl
@@ -314,6 +377,7 @@ export class WebScraperDataProvider {
       throw new Error("Urls are required");
     }
 
+    this.bullJobId = options.bullJobId;
     this.urls = options.urls;
     this.mode = options.mode;
     this.concurrentRequests = options.concurrentRequests ?? 20;
@@ -396,8 +460,9 @@ export class WebScraperDataProvider {
         altText = await getImageDescription(
           imageUrl,
           backText,
-          frontText
-          , this.generateImgAltTextModel);
+          frontText,
+          this.generateImgAltTextModel
+        );
       }
 
       document.content = document.content.replace(
apps/api/src/services/logging/crawl_log.ts (new file, 17 lines)
@@ -0,0 +1,17 @@
import { supabase_service } from "../supabase";
import "dotenv/config";

export async function logCrawl(job_id: string, team_id: string) {
  try {
    const { data, error } = await supabase_service
      .from("bulljobs_teams")
      .insert([
        {
          job_id: job_id,
          team_id: team_id,
        },
      ]);
  } catch (error) {
    console.error("Error logging crawl job:\n", error);
  }
}
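logCrawl writes the job-to-team mapping that crawlCancelController later reads to authorize a cancellation. The row shape implied by those two call sites is roughly the following; this is an assumption about the minimal columns of bulljobs_teams, not the actual Supabase schema (which is not part of this PR):

// Assumed minimal row shape of the "bulljobs_teams" table, inferred from the
// insert in logCrawl and the job_id/team_id filters in crawlCancelController.
type BullJobsTeamsRow = {
  job_id: string;  // Bull job id, stored as job.id.toString()
  team_id: string; // team that started the crawl (from authenticateUser)
};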