commit
acec76680a
@ -3,7 +3,7 @@ import { CrawlResult, WebScraperOptions } from "../types";
|
||||
import { WebScraperDataProvider } from "../scraper/WebScraper";
|
||||
import { Progress } from "../lib/entities";
|
||||
import { billTeam } from "../services/billing/credit_billing";
|
||||
|
||||
import { Document } from "../lib/entities";
|
||||
export async function startWebScraperPipeline({
|
||||
job,
|
||||
}: {
|
||||
@ -24,7 +24,7 @@ export async function startWebScraperPipeline({
|
||||
job.moveToFailed(error);
|
||||
},
|
||||
team_id: job.data.team_id,
|
||||
})) as { success: boolean; message: string; docs: CrawlResult[] };
|
||||
})) as { success: boolean; message: string; docs: Document[] };
|
||||
}
|
||||
export async function runWebScraper({
|
||||
url,
|
||||
@ -76,12 +76,12 @@ export async function runWebScraper({
|
||||
|
||||
// remove docs with empty content
|
||||
const filteredDocs = docs.filter((doc) => doc.content.trim().length > 0);
|
||||
onSuccess(filteredDocs);
|
||||
|
||||
const { success, credit_usage } = await billTeam(
|
||||
team_id,
|
||||
filteredDocs.length
|
||||
);
|
||||
|
||||
if (!success) {
|
||||
// throw new Error("Failed to bill team, no subscription was found");
|
||||
return {
|
||||
@ -91,7 +91,11 @@ export async function runWebScraper({
|
||||
};
|
||||
}
|
||||
|
||||
return { success: true, message: "", docs: filteredDocs as CrawlResult[] };
|
||||
// This is where the returnvalue from the job is set
|
||||
onSuccess(filteredDocs);
|
||||
|
||||
// this return doesn't matter too much for the job completion result
|
||||
return { success: true, message: "", docs: filteredDocs };
|
||||
} catch (error) {
|
||||
console.error("Error running web scraper", error);
|
||||
onError(error);
|
||||
|
33
apps/api/src/services/logging/log_job.ts
Normal file
33
apps/api/src/services/logging/log_job.ts
Normal file
@ -0,0 +1,33 @@
|
||||
import { supabase_service } from "../supabase";
|
||||
import { FirecrawlJob } from "../../types";
|
||||
import "dotenv/config";
|
||||
|
||||
export async function logJob(job: FirecrawlJob) {
|
||||
try {
|
||||
// Only log jobs in production
|
||||
if (process.env.ENV !== "production") {
|
||||
return;
|
||||
}
|
||||
const { data, error } = await supabase_service
|
||||
.from("firecrawl_jobs")
|
||||
.insert([
|
||||
{
|
||||
success: job.success,
|
||||
message: job.message,
|
||||
num_docs: job.num_docs,
|
||||
docs: job.docs,
|
||||
time_taken: job.time_taken,
|
||||
team_id: job.team_id,
|
||||
mode: job.mode,
|
||||
url: job.url,
|
||||
crawler_options: job.crawlerOptions,
|
||||
page_options: job.pageOptions,
|
||||
},
|
||||
]);
|
||||
if (error) {
|
||||
console.error("Error logging job:\n", error);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error("Error logging job:\n", error);
|
||||
}
|
||||
}
|
@ -4,6 +4,7 @@ import "dotenv/config";
|
||||
import { logtail } from "./logtail";
|
||||
import { startWebScraperPipeline } from "../main/runWebScraper";
|
||||
import { callWebhook } from "./webhook";
|
||||
import { logJob } from "./logging/log_job";
|
||||
|
||||
getWebScraperQueue().process(
|
||||
Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
|
||||
@ -15,8 +16,11 @@ getWebScraperQueue().process(
|
||||
current_step: "SCRAPING",
|
||||
current_url: "",
|
||||
});
|
||||
const start = Date.now();
|
||||
const { success, message, docs } = await startWebScraperPipeline({ job });
|
||||
|
||||
const end = Date.now();
|
||||
const timeTakenInSeconds = (end - start) / 1000;
|
||||
|
||||
const data = {
|
||||
success: success,
|
||||
result: {
|
||||
@ -29,6 +33,19 @@ getWebScraperQueue().process(
|
||||
};
|
||||
|
||||
await callWebhook(job.data.team_id, data);
|
||||
|
||||
await logJob({
|
||||
success: success,
|
||||
message: message,
|
||||
num_docs: docs.length,
|
||||
docs: docs,
|
||||
time_taken: timeTakenInSeconds,
|
||||
team_id: job.data.team_id,
|
||||
mode: "crawl",
|
||||
url: job.data.url,
|
||||
crawlerOptions: job.data.crawlerOptions,
|
||||
pageOptions: job.data.pageOptions,
|
||||
});
|
||||
done(null, data);
|
||||
} catch (error) {
|
||||
if (error instanceof CustomError) {
|
||||
|
@ -25,6 +25,20 @@ export interface WebScraperOptions {
|
||||
}
|
||||
|
||||
|
||||
export interface FirecrawlJob {
|
||||
success: boolean;
|
||||
message: string;
|
||||
num_docs: number;
|
||||
docs: any[];
|
||||
time_taken: number;
|
||||
team_id: string;
|
||||
mode: string;
|
||||
url: string;
|
||||
crawlerOptions?: any;
|
||||
pageOptions?: any;
|
||||
}
|
||||
|
||||
|
||||
|
||||
export enum RateLimiterMode {
|
||||
Crawl = "crawl",
|
||||
|
Loading…
x
Reference in New Issue
Block a user