Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

web_utils.py 3.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. #
  2. # Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import re
  17. import socket
  18. from urllib.parse import urlparse
  19. import ipaddress
  20. import json
  21. import base64
  22. from selenium import webdriver
  23. from selenium.webdriver.chrome.options import Options
  24. from selenium.webdriver.chrome.service import Service
  25. from selenium.common.exceptions import TimeoutException
  26. from selenium.webdriver.support.ui import WebDriverWait
  27. from selenium.webdriver.support.expected_conditions import staleness_of
  28. from webdriver_manager.chrome import ChromeDriverManager
  29. from selenium.webdriver.common.by import By
  30. def html2pdf(
  31. source: str,
  32. timeout: int = 2,
  33. install_driver: bool = True,
  34. print_options: dict = {},
  35. ):
  36. result = __get_pdf_from_html(source, timeout, install_driver, print_options)
  37. return result
  38. def __send_devtools(driver, cmd, params={}):
  39. resource = "/session/%s/chromium/send_command_and_get_result" % driver.session_id
  40. url = driver.command_executor._url + resource
  41. body = json.dumps({"cmd": cmd, "params": params})
  42. response = driver.command_executor._request("POST", url, body)
  43. if not response:
  44. raise Exception(response.get("value"))
  45. return response.get("value")
  46. def __get_pdf_from_html(
  47. path: str,
  48. timeout: int,
  49. install_driver: bool,
  50. print_options: dict
  51. ):
  52. webdriver_options = Options()
  53. webdriver_prefs = {}
  54. webdriver_options.add_argument("--headless")
  55. webdriver_options.add_argument("--disable-gpu")
  56. webdriver_options.add_argument("--no-sandbox")
  57. webdriver_options.add_argument("--disable-dev-shm-usage")
  58. webdriver_options.experimental_options["prefs"] = webdriver_prefs
  59. webdriver_prefs["profile.default_content_settings"] = {"images": 2}
  60. if install_driver:
  61. service = Service(ChromeDriverManager().install())
  62. driver = webdriver.Chrome(service=service, options=webdriver_options)
  63. else:
  64. driver = webdriver.Chrome(options=webdriver_options)
  65. driver.get(path)
  66. try:
  67. WebDriverWait(driver, timeout).until(
  68. staleness_of(driver.find_element(by=By.TAG_NAME, value="html"))
  69. )
  70. except TimeoutException:
  71. calculated_print_options = {
  72. "landscape": False,
  73. "displayHeaderFooter": False,
  74. "printBackground": True,
  75. "preferCSSPageSize": True,
  76. }
  77. calculated_print_options.update(print_options)
  78. result = __send_devtools(
  79. driver, "Page.printToPDF", calculated_print_options)
  80. driver.quit()
  81. return base64.b64decode(result["data"])
  82. def is_private_ip(ip: str) -> bool:
  83. try:
  84. ip_obj = ipaddress.ip_address(ip)
  85. return ip_obj.is_private
  86. except ValueError:
  87. return False
  88. def is_valid_url(url: str) -> bool:
  89. if not re.match(r"(https?)://[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]", url):
  90. return False
  91. parsed_url = urlparse(url)
  92. hostname = parsed_url.hostname
  93. if not hostname:
  94. return False
  95. try:
  96. ip = socket.gethostbyname(hostname)
  97. if is_private_ip(ip):
  98. return False
  99. except socket.gaierror:
  100. return False
  101. return True
  102. def safe_json_parse(data: str | dict) -> dict:
  103. if isinstance(data, dict):
  104. return data
  105. try:
  106. return json.loads(data) if data else {}
  107. except (json.JSONDecodeError, TypeError):
  108. return {}