
Merge pull request #120 from mendableai/nsc/initial-web-refac

Refactor of main web scraper + Partial data streaming
Nicolas 2024-05-04 13:15:25 -07:00 committed by GitHub
commit 3156e0ca15
6 changed files with 146 additions and 202 deletions

View File

@@ -19,7 +19,7 @@ export async function crawlStatusController(req: Request, res: Response) {
       return res.status(404).json({ error: "Job not found" });
     }
-    const { current, current_url, total, current_step } = await job.progress();
+    const { current, current_url, total, current_step, partialDocs } = await job.progress();
     res.json({
       status: await job.getState(),
       // progress: job.progress(),
@@ -28,6 +28,7 @@ export async function crawlStatusController(req: Request, res: Response) {
       current_step: current_step,
       total: total,
       data: job.returnvalue,
+      partial_data: partialDocs ?? [],
     });
   } catch (error) {
     console.error(error);
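
With this change, polling the crawl status endpoint returns whatever has been scraped so far alongside the usual progress fields. A rough sketch of the response shape implied by the controller above (field names come from the handler; the exact document shape is an assumption):

```typescript
// Sketch of the crawl status payload implied by crawlStatusController.
// PartialDocument is a stand-in for the scraper's Document entity.
type PartialDocument = {
  content: string;
  markdown?: string;
  metadata: { sourceURL: string };
};

type CrawlStatusResponse = {
  status: string;                   // await job.getState(), e.g. "active" or "completed"
  current?: number;                 // progress counters from job.progress()
  current_url?: string;
  current_step?: string;
  total?: number;
  data: PartialDocument[] | null;   // job.returnvalue, populated once the job finishes
  partial_data: PartialDocument[];  // partialDocs ?? [], streamed while the job runs
};
```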

View File

@@ -54,10 +54,11 @@ export async function searchHelper(
   // filter out social media links
   const a = new WebScraperDataProvider();
   await a.setOptions({
     mode: "single_urls",
-    urls: res.map((r) => r.url),
+    urls: res.map((r) => r.url).slice(0, searchOptions.limit ?? 7),
     crawlerOptions: {
       ...crawlerOptions,
     },
@@ -69,7 +70,7 @@ export async function searchHelper(
     },
   });
-  const docs = await a.getDocuments(true);
+  const docs = await a.getDocuments(false);
   if (docs.length === 0) {
     return { success: true, error: "No search results found", returnCode: 200 };
   }
@@ -147,7 +148,7 @@ export async function searchController(req: Request, res: Response) {
   logJob({
     success: result.success,
     message: result.error,
-    num_docs: result.data.length,
+    num_docs: result.data ? result.data.length : 0,
     docs: result.data,
     time_taken: timeTakenInSeconds,
     team_id: team_id,
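
The `?? 7` default means search scrapes at most seven results when no explicit limit is given. A quick illustration of the slicing with made-up values:

```typescript
// Hypothetical search results; only the first `limit ?? 7` URLs get scraped.
const res = Array.from({ length: 20 }, (_, i) => ({ url: `https://example.com/${i}` }));
const searchOptions: { limit?: number } = {};

const urls = res.map((r) => r.url).slice(0, searchOptions.limit ?? 7);
console.log(urls.length); // 7 -- nullish coalescing falls back to the default cap
```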

View File

@@ -8,7 +8,7 @@ export async function crawlJobStatusPreviewController(req: Request, res: Response) {
       return res.status(404).json({ error: "Job not found" });
     }
-    const { current, current_url, total, current_step } = await job.progress();
+    const { current, current_url, total, current_step, partialDocs } = await job.progress();
     res.json({
       status: await job.getState(),
       // progress: job.progress(),
@@ -17,6 +17,7 @@ export async function crawlJobStatusPreviewController(req: Request, res: Response) {
       current_step: current_step,
       total: total,
       data: job.returnvalue,
+      partial_data: partialDocs ?? [],
     });
   } catch (error) {
     console.error(error);

View File

@@ -7,6 +7,7 @@ export interface Progress {
     [key: string]: any;
   };
   currentDocumentUrl?: string;
+  currentDocument?: Document;
 }
 
 export type PageOptions = {
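
The diff only shows the tail of the interface. Pieced together from the inProgress({...}) calls elsewhere in this PR, the shape is roughly the sketch below; fields above currentDocumentUrl are not all visible in the diff, and Document is a stand-in type:

```typescript
// Partial sketch of Progress, inferred from the inProgress({...}) calls in this PR.
type Document = {
  content: string;
  markdown?: string;
  metadata: { sourceURL: string };
};

interface Progress {
  current: number;                            // items processed so far (e.g. links.length)
  total: number;                              // total items to process (e.g. totalUrls)
  status: "SCRAPING" | "COMPLETED" | string;  // values seen in this PR
  currentDocumentUrl?: string;                // URL currently being scraped
  currentDocument?: Document;                 // added here so progress can carry the scraped doc
}
```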

View File

@@ -10,13 +10,15 @@ export async function startWebScraperPipeline({
 }: {
   job: Job<WebScraperOptions>;
 }) {
+  let partialDocs: Document[] = [];
   return (await runWebScraper({
     url: job.data.url,
     mode: job.data.mode,
     crawlerOptions: job.data.crawlerOptions,
     pageOptions: job.data.pageOptions,
     inProgress: (progress) => {
-      job.progress(progress);
+      partialDocs.push(progress.currentDocument);
+      job.progress({ ...progress, partialDocs: partialDocs });
     },
     onSuccess: (result) => {
       job.moveToCompleted(result);
@@ -69,6 +71,7 @@ export async function runWebScraper({
   }
   const docs = (await provider.getDocuments(false, (progress: Progress) => {
+    inProgress(progress);
   })) as Document[];
 
   if (docs.length === 0) {
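
Taken together, these changes push each finished document into the Bull job's progress payload as it is scraped, so a client can poll the status endpoint and render results before the crawl completes. A rough polling sketch, assuming the status endpoint path and the response fields shown in the controllers above:

```typescript
// Hypothetical client-side polling loop for partial results.
// The /v0/crawl/status/:jobId path and field names are assumed from the controllers above.
async function pollCrawl(jobId: string, baseUrl: string): Promise<void> {
  while (true) {
    const resp = await fetch(`${baseUrl}/v0/crawl/status/${jobId}`);
    const body = await resp.json();

    // partial_data grows as the scraper reports each finished document.
    console.log(`status=${body.status} scraped=${body.partial_data?.length ?? 0}/${body.total ?? "?"}`);

    if (body.status === "completed" || body.status === "failed") break;
    await new Promise((r) => setTimeout(r, 2000)); // wait 2s between polls
  }
}
```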

View File

@@ -7,7 +7,6 @@ import { getValue, setValue } from "../../services/redis";
 import { getImageDescription } from "./utils/imageDescription";
 import { fetchAndProcessPdf } from "./utils/pdfProcessor";
 import { replaceImgPathsWithAbsolutePaths, replacePathsWithAbsolutePaths } from "./utils/replacePaths";
-import OpenAI from 'openai'
 import { generateCompletions } from "../../lib/LLM-extraction";
@@ -54,6 +53,7 @@ export class WebScraperDataProvider {
         total: totalUrls,
         status: "SCRAPING",
         currentDocumentUrl: url,
+        currentDocument: result
       });
     }
     results[i + index] = result;
@@ -67,211 +67,148 @@ export class WebScraperDataProvider {
     useCaching: boolean = false,
     inProgress?: (progress: Progress) => void
   ): Promise<Document[]> {
-    if (this.urls[0].trim() === "") {
-      throw new Error("Url is required");
-    }
-
-    if (!useCaching) {
-      if (this.mode === "crawl") {
-        const crawler = new WebCrawler({
-          initialUrl: this.urls[0],
-          includes: this.includes,
-          excludes: this.excludes,
-          maxCrawledLinks: this.maxCrawledLinks,
-          limit: this.limit,
-          generateImgAltText: this.generateImgAltText,
-        });
-        let links = await crawler.start(inProgress, 5, this.limit);
-        if (this.returnOnlyUrls) {
-          inProgress({
-            current: links.length,
-            total: links.length,
-            status: "COMPLETED",
-            currentDocumentUrl: this.urls[0],
-          });
-          return links.map((url) => ({
-            content: "",
-            markdown: "",
-            metadata: { sourceURL: url },
-          }));
-        }
-
-        let pdfLinks = links.filter((link) => link.endsWith(".pdf"));
-        let pdfDocuments: Document[] = [];
-        for (let pdfLink of pdfLinks) {
-          const pdfContent = await fetchAndProcessPdf(pdfLink);
-          pdfDocuments.push({
-            content: pdfContent,
-            metadata: { sourceURL: pdfLink },
-            provider: "web-scraper"
-          });
-        }
-        links = links.filter((link) => !link.endsWith(".pdf"));
-
-        let documents = await this.convertUrlsToDocuments(links, inProgress);
-        documents = await this.getSitemapData(this.urls[0], documents);
-
-        if (this.replaceAllPathsWithAbsolutePaths) {
-          documents = replacePathsWithAbsolutePaths(documents);
-        } else {
-          documents = replaceImgPathsWithAbsolutePaths(documents);
-        }
-
-        if (this.generateImgAltText) {
-          documents = await this.generatesImgAltText(documents);
-        }
-        documents = documents.concat(pdfDocuments);
-
-        // CACHING DOCUMENTS
-        // - parent document
-        const cachedParentDocumentString = await getValue(
-          "web-scraper-cache:" + this.normalizeUrl(this.urls[0])
-        );
-        if (cachedParentDocumentString != null) {
-          let cachedParentDocument = JSON.parse(cachedParentDocumentString);
-          if (
-            !cachedParentDocument.childrenLinks ||
-            cachedParentDocument.childrenLinks.length < links.length - 1
-          ) {
-            cachedParentDocument.childrenLinks = links.filter(
-              (link) => link !== this.urls[0]
-            );
-            await setValue(
-              "web-scraper-cache:" + this.normalizeUrl(this.urls[0]),
-              JSON.stringify(cachedParentDocument),
-              60 * 60 * 24 * 10
-            ); // 10 days
-          }
-        } else {
-          let parentDocument = documents.filter(
-            (document) =>
-              this.normalizeUrl(document.metadata.sourceURL) ===
-              this.normalizeUrl(this.urls[0])
-          );
-          await this.setCachedDocuments(parentDocument, links);
-        }
-
-        await this.setCachedDocuments(
-          documents.filter(
-            (document) =>
-              this.normalizeUrl(document.metadata.sourceURL) !==
-              this.normalizeUrl(this.urls[0])
-          ),
-          []
-        );
-        documents = this.removeChildLinks(documents);
-        documents = documents.splice(0, this.limit);
-        return documents;
-      }
-
-      if (this.mode === "single_urls") {
-        let pdfLinks = this.urls.filter((link) => link.endsWith(".pdf"));
-        let pdfDocuments: Document[] = [];
-        for (let pdfLink of pdfLinks) {
-          const pdfContent = await fetchAndProcessPdf(pdfLink);
-          pdfDocuments.push({
-            content: pdfContent,
-            metadata: { sourceURL: pdfLink },
-            provider: "web-scraper"
-          });
-        }
-
-        let documents = await this.convertUrlsToDocuments(
-          this.urls.filter((link) => !link.endsWith(".pdf")),
-          inProgress
-        );
-
-        if (this.replaceAllPathsWithAbsolutePaths) {
-          documents = replacePathsWithAbsolutePaths(documents);
-        } else {
-          documents = replaceImgPathsWithAbsolutePaths(documents);
-        }
-
-        if (this.generateImgAltText) {
-          documents = await this.generatesImgAltText(documents);
-        }
-
-        const baseUrl = new URL(this.urls[0]).origin;
-        documents = await this.getSitemapData(baseUrl, documents);
-        documents = documents.concat(pdfDocuments);
-
-        if(this.extractorOptions.mode === "llm-extraction") {
-          documents = await generateCompletions(
-            documents,
-            this.extractorOptions
-          )
-        }
-
-        await this.setCachedDocuments(documents);
-        documents = this.removeChildLinks(documents);
-        documents = documents.splice(0, this.limit);
-        return documents;
-      }
-
-      if (this.mode === "sitemap") {
-        let links = await getLinksFromSitemap(this.urls[0]);
-
-        let pdfLinks = links.filter((link) => link.endsWith(".pdf"));
-        let pdfDocuments: Document[] = [];
-        for (let pdfLink of pdfLinks) {
-          const pdfContent = await fetchAndProcessPdf(pdfLink);
-          pdfDocuments.push({
-            content: pdfContent,
-            metadata: { sourceURL: pdfLink },
-            provider: "web-scraper"
-          });
-        }
-        links = links.filter((link) => !link.endsWith(".pdf"));
-
-        let documents = await this.convertUrlsToDocuments(
-          links.slice(0, this.limit),
-          inProgress
-        );
-
-        documents = await this.getSitemapData(this.urls[0], documents);
-
-        if (this.replaceAllPathsWithAbsolutePaths) {
-          documents = replacePathsWithAbsolutePaths(documents);
-        } else {
-          documents = replaceImgPathsWithAbsolutePaths(documents);
-        }
-
-        if (this.generateImgAltText) {
-          documents = await this.generatesImgAltText(documents);
-        }
-
-        documents = documents.concat(pdfDocuments);
-
-        await this.setCachedDocuments(documents);
-        documents = this.removeChildLinks(documents);
-        documents = documents.splice(0, this.limit);
-        return documents;
-      }
-
-      return [];
-    }
-
-    let documents = await this.getCachedDocuments(
-      this.urls.slice(0, this.limit)
-    );
-    if (documents.length < this.limit) {
-      const newDocuments: Document[] = await this.getDocuments(
-        false,
-        inProgress
-      );
-      newDocuments.forEach((doc) => {
-        if (
-          !documents.some(
-            (d) =>
-              this.normalizeUrl(d.metadata.sourceURL) ===
-              this.normalizeUrl(doc.metadata?.sourceURL)
-          )
-        ) {
-          documents.push(doc);
-        }
-      });
-    }
-    documents = this.filterDocsExcludeInclude(documents);
-    documents = this.removeChildLinks(documents);
-    documents = documents.splice(0, this.limit);
-    return documents;
-  }
+    this.validateInitialUrl();
+    if (!useCaching) {
+      return this.processDocumentsWithoutCache(inProgress);
+    }
+    return this.processDocumentsWithCache(inProgress);
+  }
+
+  private validateInitialUrl(): void {
+    if (this.urls[0].trim() === "") {
+      throw new Error("Url is required");
+    }
+  }
+
+  /**
+   * Process documents without cache handling each mode
+   * @param inProgress inProgress
+   * @returns documents
+   */
+  private async processDocumentsWithoutCache(inProgress?: (progress: Progress) => void): Promise<Document[]> {
+    switch (this.mode) {
+      case "crawl":
+        return this.handleCrawlMode(inProgress);
+      case "single_urls":
+        return this.handleSingleUrlsMode(inProgress);
+      case "sitemap":
+        return this.handleSitemapMode(inProgress);
+      default:
+        return [];
+    }
+  }
+
+  private async handleCrawlMode(inProgress?: (progress: Progress) => void): Promise<Document[]> {
+    const crawler = new WebCrawler({
+      initialUrl: this.urls[0],
+      includes: this.includes,
+      excludes: this.excludes,
+      maxCrawledLinks: this.maxCrawledLinks,
+      limit: this.limit,
+      generateImgAltText: this.generateImgAltText,
+    });
+    let links = await crawler.start(inProgress, 5, this.limit);
+    if (this.returnOnlyUrls) {
+      return this.returnOnlyUrlsResponse(links, inProgress);
+    }
+
+    let documents = await this.processLinks(links, inProgress);
+    return this.cacheAndFinalizeDocuments(documents, links);
+  }
+
+  private async handleSingleUrlsMode(inProgress?: (progress: Progress) => void): Promise<Document[]> {
+    let documents = await this.processLinks(this.urls, inProgress);
+    return documents;
+  }
+
+  private async handleSitemapMode(inProgress?: (progress: Progress) => void): Promise<Document[]> {
+    let links = await getLinksFromSitemap(this.urls[0]);
+    if (this.returnOnlyUrls) {
+      return this.returnOnlyUrlsResponse(links, inProgress);
+    }
+
+    let documents = await this.processLinks(links, inProgress);
+    return this.cacheAndFinalizeDocuments(documents, links);
+  }
+
+  private async returnOnlyUrlsResponse(links: string[], inProgress?: (progress: Progress) => void): Promise<Document[]> {
+    inProgress?.({
+      current: links.length,
+      total: links.length,
+      status: "COMPLETED",
+      currentDocumentUrl: this.urls[0],
+    });
+    return links.map(url => ({
+      content: "",
+      markdown: "",
+      metadata: { sourceURL: url },
+    }));
+  }
+
+  private async processLinks(links: string[], inProgress?: (progress: Progress) => void): Promise<Document[]> {
+    let pdfLinks = links.filter(link => link.endsWith(".pdf"));
+    let pdfDocuments = await this.fetchPdfDocuments(pdfLinks);
+    links = links.filter(link => !link.endsWith(".pdf"));
+
+    let documents = await this.convertUrlsToDocuments(links, inProgress);
+    documents = await this.getSitemapData(this.urls[0], documents);
+    documents = this.applyPathReplacements(documents);
+    documents = await this.applyImgAltText(documents);
+
+    if(this.extractorOptions.mode === "llm-extraction" && this.mode === "single_urls") {
+      documents = await generateCompletions(
+        documents,
+        this.extractorOptions
+      )
+    }
+    return documents.concat(pdfDocuments);
+  }
+
+  private async fetchPdfDocuments(pdfLinks: string[]): Promise<Document[]> {
+    return Promise.all(pdfLinks.map(async pdfLink => {
+      const pdfContent = await fetchAndProcessPdf(pdfLink);
+      return {
+        content: pdfContent,
+        metadata: { sourceURL: pdfLink },
+        provider: "web-scraper"
+      };
+    }));
+  }
+
+  private applyPathReplacements(documents: Document[]): Document[] {
+    return this.replaceAllPathsWithAbsolutePaths ? replacePathsWithAbsolutePaths(documents) : replaceImgPathsWithAbsolutePaths(documents);
+  }
+
+  private async applyImgAltText(documents: Document[]): Promise<Document[]> {
+    return this.generateImgAltText ? this.generatesImgAltText(documents) : documents;
+  }
+
+  private async cacheAndFinalizeDocuments(documents: Document[], links: string[]): Promise<Document[]> {
+    await this.setCachedDocuments(documents, links);
+    documents = this.removeChildLinks(documents);
+    return documents.splice(0, this.limit);
+  }
+
+  private async processDocumentsWithCache(inProgress?: (progress: Progress) => void): Promise<Document[]> {
+    let documents = await this.getCachedDocuments(this.urls.slice(0, this.limit));
+    if (documents.length < this.limit) {
+      const newDocuments: Document[] = await this.getDocuments(false, inProgress);
+      documents = this.mergeNewDocuments(documents, newDocuments);
+    }
+    documents = this.filterDocsExcludeInclude(documents);
+    documents = this.removeChildLinks(documents);
+    return documents.splice(0, this.limit);
+  }
+
+  private mergeNewDocuments(existingDocuments: Document[], newDocuments: Document[]): Document[] {
+    newDocuments.forEach(doc => {
+      if (!existingDocuments.some(d => this.normalizeUrl(d.metadata.sourceURL) === this.normalizeUrl(doc.metadata?.sourceURL))) {
+        existingDocuments.push(doc);
+      }
+    });
+    return existingDocuments;
+  }
 
   private filterDocsExcludeInclude(documents: Document[]): Document[] {
@@ -348,7 +285,7 @@ export class WebScraperDataProvider {
       documents.push(cachedDocument);
 
       // get children documents
-      for (const childUrl of cachedDocument.childrenLinks) {
+      for (const childUrl of (cachedDocument.childrenLinks || [])) {
         const normalizedChildUrl = this.normalizeUrl(childUrl);
         const childCachedDocumentString = await getValue(
           "web-scraper-cache:" + normalizedChildUrl