diff --git a/apps/api/src/controllers/crawl-status.ts b/apps/api/src/controllers/crawl-status.ts
index 3534cd1..feda86c 100644
--- a/apps/api/src/controllers/crawl-status.ts
+++ b/apps/api/src/controllers/crawl-status.ts
@@ -19,7 +19,7 @@ export async function crawlStatusController(req: Request, res: Response) {
       return res.status(404).json({ error: "Job not found" });
     }
 
-    const { current, current_url, total, current_step } = await job.progress();
+    const { current, current_url, total, current_step, partialDocs } = await job.progress();
     res.json({
       status: await job.getState(),
       // progress: job.progress(),
@@ -28,6 +28,7 @@ export async function crawlStatusController(req: Request, res: Response) {
       current_step: current_step,
       total: total,
       data: job.returnvalue,
+      partial_data: partialDocs ?? [],
     });
   } catch (error) {
     console.error(error);
diff --git a/apps/api/src/controllers/search.ts b/apps/api/src/controllers/search.ts
index 5c2cf80..1393922 100644
--- a/apps/api/src/controllers/search.ts
+++ b/apps/api/src/controllers/search.ts
@@ -54,10 +54,11 @@ export async function searchHelper(
 
   // filter out social media links
 
+
   const a = new WebScraperDataProvider();
   await a.setOptions({
     mode: "single_urls",
-    urls: res.map((r) => r.url),
+    urls: res.map((r) => r.url).slice(0, searchOptions.limit ?? 7),
     crawlerOptions: {
       ...crawlerOptions,
     },
@@ -69,7 +70,7 @@ export async function searchHelper(
     },
   });
 
-  const docs = await a.getDocuments(true);
+  const docs = await a.getDocuments(false);
   if (docs.length === 0) {
     return { success: true, error: "No search results found", returnCode: 200 };
   }
@@ -147,7 +148,7 @@ export async function searchController(req: Request, res: Response) {
     logJob({
       success: result.success,
       message: result.error,
-      num_docs: result.data.length,
+      num_docs: result.data ? result.data.length : 0,
       docs: result.data,
       time_taken: timeTakenInSeconds,
       team_id: team_id,
diff --git a/apps/api/src/controllers/status.ts b/apps/api/src/controllers/status.ts
index bd1d2ea..9079787 100644
--- a/apps/api/src/controllers/status.ts
+++ b/apps/api/src/controllers/status.ts
@@ -8,7 +8,7 @@ export async function crawlJobStatusPreviewController(req: Request, res: Respons
       return res.status(404).json({ error: "Job not found" });
     }
 
-    const { current, current_url, total, current_step } = await job.progress();
+    const { current, current_url, total, current_step, partialDocs } = await job.progress();
     res.json({
       status: await job.getState(),
       // progress: job.progress(),
@@ -17,6 +17,7 @@ export async function crawlJobStatusPreviewController(req: Request, res: Respons
       current_step: current_step,
       total: total,
       data: job.returnvalue,
+      partial_data: partialDocs ?? [],
     });
   } catch (error) {
     console.error(error);
diff --git a/apps/api/src/lib/entities.ts b/apps/api/src/lib/entities.ts
index 4008785..5b663f2 100644
--- a/apps/api/src/lib/entities.ts
+++ b/apps/api/src/lib/entities.ts
@@ -7,6 +7,7 @@ export interface Progress {
     [key: string]: any;
   };
   currentDocumentUrl?: string;
+  currentDocument?: Document;
 }
 
 export type PageOptions = {
diff --git a/apps/api/src/main/runWebScraper.ts b/apps/api/src/main/runWebScraper.ts
index 892a2a3..827eec5 100644
--- a/apps/api/src/main/runWebScraper.ts
+++ b/apps/api/src/main/runWebScraper.ts
@@ -10,13 +10,15 @@ export async function startWebScraperPipeline({
 }: {
   job: Job;
 }) {
+  let partialDocs: Document[] = [];
   return (await runWebScraper({
     url: job.data.url,
     mode: job.data.mode,
     crawlerOptions: job.data.crawlerOptions,
     pageOptions: job.data.pageOptions,
    inProgress: (progress) => {
-      job.progress(progress);
+      partialDocs.push(progress.currentDocument);
+      job.progress({...progress, partialDocs: partialDocs});
    },
    onSuccess: (result) => {
      job.moveToCompleted(result);
@@ -69,6 +71,7 @@ export async function runWebScraper({
   }
   const docs = (await provider.getDocuments(false, (progress: Progress) => {
     inProgress(progress);
+
   })) as Document[];
 
   if (docs.length === 0) {
diff --git a/apps/api/src/scraper/WebScraper/index.ts b/apps/api/src/scraper/WebScraper/index.ts
index fef5f69..1e28552 100644
--- a/apps/api/src/scraper/WebScraper/index.ts
+++ b/apps/api/src/scraper/WebScraper/index.ts
@@ -7,7 +7,6 @@ import { getValue, setValue } from "../../services/redis";
 import { getImageDescription } from "./utils/imageDescription";
 import { fetchAndProcessPdf } from "./utils/pdfProcessor";
 import { replaceImgPathsWithAbsolutePaths, replacePathsWithAbsolutePaths } from "./utils/replacePaths";
-import OpenAI from 'openai'
 import { generateCompletions } from "../../lib/LLM-extraction";
 
 
@@ -54,6 +53,7 @@ export class WebScraperDataProvider {
             total: totalUrls,
             status: "SCRAPING",
             currentDocumentUrl: url,
+            currentDocument: result
           });
         }
         results[i + index] = result;
@@ -67,211 +67,148 @@ export class WebScraperDataProvider {
     useCaching: boolean = false,
     inProgress?: (progress: Progress) => void
   ): Promise<Document[]> {
-
+    this.validateInitialUrl();
+
+    if (!useCaching) {
+      return this.processDocumentsWithoutCache(inProgress);
+    }
+
+    return this.processDocumentsWithCache(inProgress);
+  }
+
+  private validateInitialUrl(): void {
     if (this.urls[0].trim() === "") {
       throw new Error("Url is required");
     }
+  }
 
-    if (!useCaching) {
-      if (this.mode === "crawl") {
-        const crawler = new WebCrawler({
-          initialUrl: this.urls[0],
-          includes: this.includes,
-          excludes: this.excludes,
-          maxCrawledLinks: this.maxCrawledLinks,
-          limit: this.limit,
-          generateImgAltText: this.generateImgAltText,
-        });
-        let links = await crawler.start(inProgress, 5, this.limit);
-        if (this.returnOnlyUrls) {
-          inProgress({
-            current: links.length,
-            total: links.length,
-            status: "COMPLETED",
-            currentDocumentUrl: this.urls[0],
-          });
-          return links.map((url) => ({
-            content: "",
-            markdown: "",
-            metadata: { sourceURL: url },
-          }));
-        }
+  /**
+   * Process documents without cache handling each mode
+   * @param inProgress inProgress
+   * @returns documents
+   */
+  private async processDocumentsWithoutCache(inProgress?: (progress: Progress) => void): Promise<Document[]> {
+    switch (this.mode) {
+      case "crawl":
+        return this.handleCrawlMode(inProgress);
+      case "single_urls":
+        return this.handleSingleUrlsMode(inProgress);
+      case "sitemap":
+        return this.handleSitemapMode(inProgress);
+      default:
+        return [];
+    }
+  }
 
-        let pdfLinks = links.filter((link) => link.endsWith(".pdf"));
-        let pdfDocuments: Document[] = [];
-        for (let pdfLink of pdfLinks) {
-          const pdfContent = await fetchAndProcessPdf(pdfLink);
-          pdfDocuments.push({
-            content: pdfContent,
-            metadata: { sourceURL: pdfLink },
-            provider: "web-scraper"
-          });
-        }
-        links = links.filter((link) => !link.endsWith(".pdf"));
-
-        let documents = await this.convertUrlsToDocuments(links, inProgress);
-        documents = await this.getSitemapData(this.urls[0], documents);
-
-        if (this.replaceAllPathsWithAbsolutePaths) {
-          documents = replacePathsWithAbsolutePaths(documents);
-        } else {
-          documents = replaceImgPathsWithAbsolutePaths(documents);
-        }
-
-        if (this.generateImgAltText) {
-          documents = await this.generatesImgAltText(documents);
-        }
-        documents = documents.concat(pdfDocuments);
-
-        // CACHING DOCUMENTS
-        // - parent document
-        const cachedParentDocumentString = await getValue(
-          "web-scraper-cache:" + this.normalizeUrl(this.urls[0])
-        );
-        if (cachedParentDocumentString != null) {
-          let cachedParentDocument = JSON.parse(cachedParentDocumentString);
-          if (
-            !cachedParentDocument.childrenLinks ||
-            cachedParentDocument.childrenLinks.length < links.length - 1
-          ) {
-            cachedParentDocument.childrenLinks = links.filter(
-              (link) => link !== this.urls[0]
-            );
-            await setValue(
-              "web-scraper-cache:" + this.normalizeUrl(this.urls[0]),
-              JSON.stringify(cachedParentDocument),
-              60 * 60 * 24 * 10
-            ); // 10 days
-          }
-        } else {
-          let parentDocument = documents.filter(
-            (document) =>
-              this.normalizeUrl(document.metadata.sourceURL) ===
-              this.normalizeUrl(this.urls[0])
-          );
-          await this.setCachedDocuments(parentDocument, links);
-        }
-
-        await this.setCachedDocuments(
-          documents.filter(
-            (document) =>
-              this.normalizeUrl(document.metadata.sourceURL) !==
-              this.normalizeUrl(this.urls[0])
-          ),
-          []
-        );
-        documents = this.removeChildLinks(documents);
-        documents = documents.splice(0, this.limit);
-        return documents;
-      }
-
-      if (this.mode === "single_urls") {
-        let pdfLinks = this.urls.filter((link) => link.endsWith(".pdf"));
-        let pdfDocuments: Document[] = [];
-        for (let pdfLink of pdfLinks) {
-          const pdfContent = await fetchAndProcessPdf(pdfLink);
-          pdfDocuments.push({
-            content: pdfContent,
-            metadata: { sourceURL: pdfLink },
-            provider: "web-scraper"
-          });
-        }
-
-        let documents = await this.convertUrlsToDocuments(
-          this.urls.filter((link) => !link.endsWith(".pdf")),
-          inProgress
-        );
-
-        if (this.replaceAllPathsWithAbsolutePaths) {
-          documents = replacePathsWithAbsolutePaths(documents);
-        } else {
-          documents = replaceImgPathsWithAbsolutePaths(documents);
-        }
-
-        if (this.generateImgAltText) {
-          documents = await this.generatesImgAltText(documents);
-        }
-        const baseUrl = new URL(this.urls[0]).origin;
-        documents = await this.getSitemapData(baseUrl, documents);
-        documents = documents.concat(pdfDocuments);
-
-        if(this.extractorOptions.mode === "llm-extraction") {
-          documents = await generateCompletions(
-            documents,
-            this.extractorOptions
-          )
-        }
-
-        await this.setCachedDocuments(documents);
-        documents = this.removeChildLinks(documents);
-        documents = documents.splice(0, this.limit);
-        return documents;
-      }
-      if (this.mode === "sitemap") {
-        let links = await getLinksFromSitemap(this.urls[0]);
-        let pdfLinks = links.filter((link) => link.endsWith(".pdf"));
-        let pdfDocuments: Document[] = [];
-        for (let pdfLink of pdfLinks) {
-          const pdfContent = await fetchAndProcessPdf(pdfLink);
-          pdfDocuments.push({
-            content: pdfContent,
-            metadata: { sourceURL: pdfLink },
-            provider: "web-scraper"
-          });
-        }
-        links = links.filter((link) => !link.endsWith(".pdf"));
-
-        let documents = await this.convertUrlsToDocuments(
-          links.slice(0, this.limit),
-          inProgress
-        );
-
-        documents = await this.getSitemapData(this.urls[0], documents);
-
-        if (this.replaceAllPathsWithAbsolutePaths) {
-          documents = replacePathsWithAbsolutePaths(documents);
-        } else {
-          documents = replaceImgPathsWithAbsolutePaths(documents);
-        }
-
-        if (this.generateImgAltText) {
-          documents = await this.generatesImgAltText(documents);
-        }
-        documents = documents.concat(pdfDocuments);
-
-        await this.setCachedDocuments(documents);
-        documents = this.removeChildLinks(documents);
-        documents = documents.splice(0, this.limit);
-        return documents;
-      }
-
-      return [];
+  private async handleCrawlMode(inProgress?: (progress: Progress) => void): Promise<Document[]> {
+    const crawler = new WebCrawler({
+      initialUrl: this.urls[0],
+      includes: this.includes,
+      excludes: this.excludes,
+      maxCrawledLinks: this.maxCrawledLinks,
+      limit: this.limit,
+      generateImgAltText: this.generateImgAltText,
+    });
+    let links = await crawler.start(inProgress, 5, this.limit);
+    if (this.returnOnlyUrls) {
+      return this.returnOnlyUrlsResponse(links, inProgress);
     }
 
-    let documents = await this.getCachedDocuments(
-      this.urls.slice(0, this.limit)
-    );
+    let documents = await this.processLinks(links, inProgress);
+    return this.cacheAndFinalizeDocuments(documents, links);
+  }
+
+  private async handleSingleUrlsMode(inProgress?: (progress: Progress) => void): Promise<Document[]> {
+    let documents = await this.processLinks(this.urls, inProgress);
+    return documents;
+  }
+
+  private async handleSitemapMode(inProgress?: (progress: Progress) => void): Promise<Document[]> {
+    let links = await getLinksFromSitemap(this.urls[0]);
+    if (this.returnOnlyUrls) {
+      return this.returnOnlyUrlsResponse(links, inProgress);
+    }
+
+    let documents = await this.processLinks(links, inProgress);
+    return this.cacheAndFinalizeDocuments(documents, links);
+  }
+
+  private async returnOnlyUrlsResponse(links: string[], inProgress?: (progress: Progress) => void): Promise<Document[]> {
+    inProgress?.({
+      current: links.length,
+      total: links.length,
+      status: "COMPLETED",
+      currentDocumentUrl: this.urls[0],
+    });
+    return links.map(url => ({
+      content: "",
+      markdown: "",
+      metadata: { sourceURL: url },
+    }));
+  }
+
+  private async processLinks(links: string[], inProgress?: (progress: Progress) => void): Promise<Document[]> {
+    let pdfLinks = links.filter(link => link.endsWith(".pdf"));
+    let pdfDocuments = await this.fetchPdfDocuments(pdfLinks);
+    links = links.filter(link => !link.endsWith(".pdf"));
+
+    let documents = await this.convertUrlsToDocuments(links, inProgress);
+    documents = await this.getSitemapData(this.urls[0], documents);
+    documents = this.applyPathReplacements(documents);
+    documents = await this.applyImgAltText(documents);
+
+    if(this.extractorOptions.mode === "llm-extraction" && this.mode === "single_urls") {
+      documents = await generateCompletions(
+        documents,
+        this.extractorOptions
+      )
+    }
+    return documents.concat(pdfDocuments);
+  }
+
+  private async fetchPdfDocuments(pdfLinks: string[]): Promise<Document[]> {
+    return Promise.all(pdfLinks.map(async pdfLink => {
+      const pdfContent = await fetchAndProcessPdf(pdfLink);
+      return {
+        content: pdfContent,
+        metadata: { sourceURL: pdfLink },
+        provider: "web-scraper"
+      };
+    }));
+  }
+
+  private applyPathReplacements(documents: Document[]): Document[] {
+    return this.replaceAllPathsWithAbsolutePaths ? replacePathsWithAbsolutePaths(documents) : replaceImgPathsWithAbsolutePaths(documents);
+  }
+
+  private async applyImgAltText(documents: Document[]): Promise<Document[]> {
+    return this.generateImgAltText ? this.generatesImgAltText(documents) : documents;
+  }
+
+  private async cacheAndFinalizeDocuments(documents: Document[], links: string[]): Promise<Document[]> {
+    await this.setCachedDocuments(documents, links);
+    documents = this.removeChildLinks(documents);
+    return documents.splice(0, this.limit);
+  }
+
+  private async processDocumentsWithCache(inProgress?: (progress: Progress) => void): Promise<Document[]> {
+    let documents = await this.getCachedDocuments(this.urls.slice(0, this.limit));
     if (documents.length < this.limit) {
-      const newDocuments: Document[] = await this.getDocuments(
-        false,
-        inProgress
-      );
-      newDocuments.forEach((doc) => {
-        if (
-          !documents.some(
-            (d) =>
-              this.normalizeUrl(d.metadata.sourceURL) ===
-              this.normalizeUrl(doc.metadata?.sourceURL)
-          )
-        ) {
-          documents.push(doc);
-        }
-      });
+      const newDocuments: Document[] = await this.getDocuments(false, inProgress);
+      documents = this.mergeNewDocuments(documents, newDocuments);
     }
     documents = this.filterDocsExcludeInclude(documents);
     documents = this.removeChildLinks(documents);
-    documents = documents.splice(0, this.limit);
-    return documents;
+    return documents.splice(0, this.limit);
+  }
+
+  private mergeNewDocuments(existingDocuments: Document[], newDocuments: Document[]): Document[] {
+    newDocuments.forEach(doc => {
+      if (!existingDocuments.some(d => this.normalizeUrl(d.metadata.sourceURL) === this.normalizeUrl(doc.metadata?.sourceURL))) {
+        existingDocuments.push(doc);
+      }
+    });
+    return existingDocuments;
   }
 
   private filterDocsExcludeInclude(documents: Document[]): Document[] {
@@ -348,7 +285,7 @@ export class WebScraperDataProvider {
       documents.push(cachedDocument);
 
       // get children documents
-      for (const childUrl of cachedDocument.childrenLinks) {
+      for (const childUrl of (cachedDocument.childrenLinks || [])) {
        const normalizedChildUrl = this.normalizeUrl(childUrl);
        const childCachedDocumentString = await getValue(
          "web-scraper-cache:" + normalizedChildUrl