
website_service.py

import datetime
import json
from typing import Any

import requests
from flask_login import current_user  # type: ignore

from core.helper import encrypter
from core.rag.extractor.firecrawl.firecrawl_app import FirecrawlApp
from core.rag.extractor.watercrawl.provider import WaterCrawlProvider
from extensions.ext_redis import redis_client
from extensions.ext_storage import storage
from services.auth.api_key_auth_service import ApiKeyAuthService


class WebsiteService:
    """Crawl and scrape websites through a configured provider (Firecrawl, WaterCrawl, or Jina Reader)."""

    @classmethod
    def document_create_args_validate(cls, args: dict):
        """Validate that the crawl arguments contain a url, options, and a page limit."""
        if "url" not in args or not args["url"]:
            raise ValueError("url is required")
        if "options" not in args or not args["options"]:
            raise ValueError("options is required")
        if "limit" not in args["options"] or not args["options"]["limit"]:
            raise ValueError("limit is required")

    @classmethod
    def crawl_url(cls, args: dict) -> dict:
        """Start a crawl job for the given URL and return its initial status and job id."""
        provider = args.get("provider", "")
        url = args.get("url")
        options = args.get("options", {})
        credentials = ApiKeyAuthService.get_auth_credentials(current_user.current_tenant_id, "website", provider)
        if provider == "firecrawl":
            # decrypt api_key
            api_key = encrypter.decrypt_token(
                tenant_id=current_user.current_tenant_id, token=credentials.get("config").get("api_key")
            )
            firecrawl_app = FirecrawlApp(api_key=api_key, base_url=credentials.get("config").get("base_url", None))
            crawl_sub_pages = options.get("crawl_sub_pages", False)
            only_main_content = options.get("only_main_content", False)
            if not crawl_sub_pages:
                params = {
                    "includePaths": [],
                    "excludePaths": [],
                    "limit": 1,
                    "scrapeOptions": {"onlyMainContent": only_main_content},
                }
            else:
                includes = options.get("includes").split(",") if options.get("includes") else []
                excludes = options.get("excludes").split(",") if options.get("excludes") else []
                params = {
                    "includePaths": includes,
                    "excludePaths": excludes,
                    "limit": options.get("limit", 1),
                    "scrapeOptions": {"onlyMainContent": only_main_content},
                }
                if options.get("max_depth"):
                    params["maxDepth"] = options.get("max_depth")
            job_id = firecrawl_app.crawl_url(url, params)
            website_crawl_time_cache_key = f"website_crawl_{job_id}"
            time = str(datetime.datetime.now().timestamp())
            redis_client.setex(website_crawl_time_cache_key, 3600, time)
            return {"status": "active", "job_id": job_id}
        elif provider == "watercrawl":
            # decrypt api_key
            api_key = encrypter.decrypt_token(
                tenant_id=current_user.current_tenant_id, token=credentials.get("config").get("api_key")
            )
            return WaterCrawlProvider(api_key, credentials.get("config").get("base_url", None)).crawl_url(url, options)
        elif provider == "jinareader":
            api_key = encrypter.decrypt_token(
                tenant_id=current_user.current_tenant_id, token=credentials.get("config").get("api_key")
            )
            crawl_sub_pages = options.get("crawl_sub_pages", False)
            if not crawl_sub_pages:
                response = requests.get(
                    f"https://r.jina.ai/{url}",
                    headers={"Accept": "application/json", "Authorization": f"Bearer {api_key}"},
                )
                if response.json().get("code") != 200:
                    raise ValueError("Failed to crawl")
                return {"status": "active", "data": response.json().get("data")}
            else:
                response = requests.post(
                    "https://adaptivecrawl-kir3wx7b3a-uc.a.run.app",
                    json={
                        "url": url,
                        "maxPages": options.get("limit", 1),
                        "useSitemap": options.get("use_sitemap", True),
                    },
                    headers={
                        "Content-Type": "application/json",
                        "Authorization": f"Bearer {api_key}",
                    },
                )
                if response.json().get("code") != 200:
                    raise ValueError("Failed to crawl")
                return {"status": "active", "job_id": response.json().get("data", {}).get("taskId")}
        else:
            raise ValueError("Invalid provider")

    @classmethod
    def get_crawl_status(cls, job_id: str, provider: str) -> dict:
        """Return the current status of a crawl job and, when completed, its results."""
        credentials = ApiKeyAuthService.get_auth_credentials(current_user.current_tenant_id, "website", provider)
        if provider == "firecrawl":
            # decrypt api_key
            api_key = encrypter.decrypt_token(
                tenant_id=current_user.current_tenant_id, token=credentials.get("config").get("api_key")
            )
            firecrawl_app = FirecrawlApp(api_key=api_key, base_url=credentials.get("config").get("base_url", None))
            result = firecrawl_app.check_crawl_status(job_id)
            crawl_status_data = {
                "status": result.get("status", "active"),
                "job_id": job_id,
                "total": result.get("total", 0),
                "current": result.get("current", 0),
                "data": result.get("data", []),
            }
            if crawl_status_data["status"] == "completed":
                website_crawl_time_cache_key = f"website_crawl_{job_id}"
                start_time = redis_client.get(website_crawl_time_cache_key)
                if start_time:
                    end_time = datetime.datetime.now().timestamp()
                    time_consuming = abs(end_time - float(start_time))
                    crawl_status_data["time_consuming"] = f"{time_consuming:.2f}"
                    redis_client.delete(website_crawl_time_cache_key)
        elif provider == "watercrawl":
            # decrypt api_key
            api_key = encrypter.decrypt_token(
                tenant_id=current_user.current_tenant_id, token=credentials.get("config").get("api_key")
            )
            crawl_status_data = WaterCrawlProvider(
                api_key, credentials.get("config").get("base_url", None)
            ).get_crawl_status(job_id)
        elif provider == "jinareader":
            api_key = encrypter.decrypt_token(
                tenant_id=current_user.current_tenant_id, token=credentials.get("config").get("api_key")
            )
            response = requests.post(
                "https://adaptivecrawlstatus-kir3wx7b3a-uc.a.run.app",
                headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"},
                json={"taskId": job_id},
            )
            data = response.json().get("data", {})
            crawl_status_data = {
                "status": data.get("status", "active"),
                "job_id": job_id,
                "total": len(data.get("urls", [])),
                "current": len(data.get("processed", [])) + len(data.get("failed", [])),
                "data": [],
                "time_consuming": data.get("duration", 0) / 1000,
            }
            if crawl_status_data["status"] == "completed":
                response = requests.post(
                    "https://adaptivecrawlstatus-kir3wx7b3a-uc.a.run.app",
                    headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"},
                    json={"taskId": job_id, "urls": list(data.get("processed", {}).keys())},
                )
                data = response.json().get("data", {})
                formatted_data = [
                    {
                        "title": item.get("data", {}).get("title"),
                        "source_url": item.get("data", {}).get("url"),
                        "description": item.get("data", {}).get("description"),
                        "markdown": item.get("data", {}).get("content"),
                    }
                    for item in data.get("processed", {}).values()
                ]
                crawl_status_data["data"] = formatted_data
        else:
            raise ValueError("Invalid provider")
        return crawl_status_data

    @classmethod
    def get_crawl_url_data(cls, job_id: str, provider: str, url: str, tenant_id: str) -> dict[Any, Any] | None:
        """Return the extracted content for a single crawled URL, or None if it is not part of the job."""
        credentials = ApiKeyAuthService.get_auth_credentials(tenant_id, "website", provider)
        # decrypt api_key
        api_key = encrypter.decrypt_token(tenant_id=tenant_id, token=credentials.get("config").get("api_key"))
        # FIXME data is redefined too many times here, use Any to ease the type checking, fix it later
        data: Any
        if provider == "firecrawl":
            file_key = "website_files/" + job_id + ".txt"
            if storage.exists(file_key):
                d = storage.load_once(file_key)
                if d:
                    data = json.loads(d.decode("utf-8"))
            else:
                firecrawl_app = FirecrawlApp(api_key=api_key, base_url=credentials.get("config").get("base_url", None))
                result = firecrawl_app.check_crawl_status(job_id)
                if result.get("status") != "completed":
                    raise ValueError("Crawl job is not completed")
                data = result.get("data")
            if data:
                for item in data:
                    if item.get("source_url") == url:
                        return dict(item)
            return None
        elif provider == "watercrawl":
            api_key = encrypter.decrypt_token(tenant_id=tenant_id, token=credentials.get("config").get("api_key"))
            return WaterCrawlProvider(api_key, credentials.get("config").get("base_url", None)).get_crawl_url_data(
                job_id, url
            )
        elif provider == "jinareader":
            if not job_id:
                response = requests.get(
                    f"https://r.jina.ai/{url}",
                    headers={"Accept": "application/json", "Authorization": f"Bearer {api_key}"},
                )
                if response.json().get("code") != 200:
                    raise ValueError("Failed to crawl")
                return dict(response.json().get("data", {}))
            else:
                api_key = encrypter.decrypt_token(tenant_id=tenant_id, token=credentials.get("config").get("api_key"))
                response = requests.post(
                    "https://adaptivecrawlstatus-kir3wx7b3a-uc.a.run.app",
                    headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"},
                    json={"taskId": job_id},
                )
                data = response.json().get("data", {})
                if data.get("status") != "completed":
                    raise ValueError("Crawl job is not completed")
                response = requests.post(
                    "https://adaptivecrawlstatus-kir3wx7b3a-uc.a.run.app",
                    headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"},
                    json={"taskId": job_id, "urls": list(data.get("processed", {}).keys())},
                )
                data = response.json().get("data", {})
                for item in data.get("processed", {}).values():
                    if item.get("data", {}).get("url") == url:
                        return dict(item.get("data", {}))
                return None
        else:
            raise ValueError("Invalid provider")

    @classmethod
    def get_scrape_url_data(cls, provider: str, url: str, tenant_id: str, only_main_content: bool) -> dict:
        """Scrape a single URL and return its extracted content."""
        credentials = ApiKeyAuthService.get_auth_credentials(tenant_id, "website", provider)
        if provider == "firecrawl":
            # decrypt api_key
            api_key = encrypter.decrypt_token(tenant_id=tenant_id, token=credentials.get("config").get("api_key"))
            firecrawl_app = FirecrawlApp(api_key=api_key, base_url=credentials.get("config").get("base_url", None))
            params = {"onlyMainContent": only_main_content}
            result = firecrawl_app.scrape_url(url, params)
            return result
        elif provider == "watercrawl":
            api_key = encrypter.decrypt_token(tenant_id=tenant_id, token=credentials.get("config").get("api_key"))
            return WaterCrawlProvider(api_key, credentials.get("config").get("base_url", None)).scrape_url(url)
        else:
            raise ValueError("Invalid provider")
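
A minimal usage sketch of the service, for orientation only. It assumes the code runs inside a Flask request context with flask_login's current_user bound to a tenant whose Firecrawl credentials have already been stored through ApiKeyAuthService; the URL, option values, and polling interval below are illustrative assumptions, not values taken from the file.

import time

from flask_login import current_user  # type: ignore

from services.website_service import WebsiteService

# Validate and start a Firecrawl crawl job (the argument shape follows
# document_create_args_validate and the firecrawl branch of crawl_url).
args = {
    "provider": "firecrawl",
    "url": "https://example.com",  # illustrative URL
    "options": {"crawl_sub_pages": True, "limit": 5, "only_main_content": True},
}
WebsiteService.document_create_args_validate(args)
job = WebsiteService.crawl_url(args)  # {"status": "active", "job_id": "..."}

# Poll until the provider reports the job as completed.
status = WebsiteService.get_crawl_status(job["job_id"], "firecrawl")
while status["status"] != "completed":
    time.sleep(5)
    status = WebsiteService.get_crawl_status(job["job_id"], "firecrawl")

# Fetch the extracted content for one specific page from the finished job.
page = WebsiteService.get_crawl_url_data(
    job["job_id"], "firecrawl", "https://example.com", current_user.current_tenant_id
)

Note that crawl_url and get_crawl_status resolve the tenant from current_user, while get_crawl_url_data and get_scrape_url_data take tenant_id explicitly, presumably so the latter two can also be called outside a request context.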