Du kannst nicht mehr als 25 Themen auswählen. Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

opendal_conn.py 4.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. import opendal
  2. import logging
  3. import pymysql
  4. from urllib.parse import quote_plus
  5. from api.utils import get_base_config
  6. from rag.utils import singleton
# DDL template for the key/value table used when the OpenDAL scheme is MySQL.
# The `{}` placeholder is filled with the table name via str.format(); the
# statement is idempotent thanks to IF NOT EXISTS.
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS `{}` (
`key` VARCHAR(255) PRIMARY KEY,
`value` LONGBLOB,
`created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
`updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
"""
# Template that changes the server-wide `max_allowed_packet` setting. The `{}`
# placeholder is filled with the configured limit via str.format().
# NOTE(review): SET GLOBAL presumably needs elevated MySQL privileges - confirm
# that the configured account has them.
SET_MAX_ALLOWED_PACKET_SQL = """
SET GLOBAL max_allowed_packet={}
"""
  18. def get_opendal_config():
  19. try:
  20. opendal_config = get_base_config('opendal', {})
  21. if opendal_config.get("scheme") == 'mysql':
  22. mysql_config = get_base_config('mysql', {})
  23. max_packet = mysql_config.get("max_allowed_packet", 134217728)
  24. kwargs = {
  25. "scheme": "mysql",
  26. "host": mysql_config.get("host", "127.0.0.1"),
  27. "port": str(mysql_config.get("port", 3306)),
  28. "user": mysql_config.get("user", "root"),
  29. "password": mysql_config.get("password", ""),
  30. "database": mysql_config.get("name", "test_open_dal"),
  31. "table": opendal_config.get("config").get("oss_table", "opendal_storage"),
  32. "max_allowed_packet": str(max_packet)
  33. }
  34. kwargs["connection_string"] = f"mysql://{kwargs['user']}:{quote_plus(kwargs['password'])}@{kwargs['host']}:{kwargs['port']}/{kwargs['database']}?max_allowed_packet={max_packet}"
  35. else:
  36. scheme = opendal_config.get("scheme")
  37. config_data = opendal_config.get("config", {})
  38. kwargs = {"scheme": scheme, **config_data}
  39. logging.info("Loaded OpenDAL configuration from yaml: %s", kwargs)
  40. return kwargs
  41. except Exception as e:
  42. logging.error("Failed to load OpenDAL configuration from yaml: %s", str(e))
  43. raise
  44. @singleton
  45. class OpenDALStorage:
  46. def __init__(self):
  47. self._kwargs = get_opendal_config()
  48. self._scheme = self._kwargs.get('scheme', 'mysql')
  49. if self._scheme == 'mysql':
  50. self.init_db_config()
  51. self.init_opendal_mysql_table()
  52. self._operator = opendal.Operator(**self._kwargs)
  53. logging.info("OpenDALStorage initialized successfully")
  54. def health(self):
  55. bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
  56. r = self._operator.write(f"{bucket}/{fnm}", binary)
  57. return r
  58. def put(self, bucket, fnm, binary):
  59. self._operator.write(f"{bucket}/{fnm}", binary)
  60. def get(self, bucket, fnm):
  61. return self._operator.read(f"{bucket}/{fnm}")
  62. def rm(self, bucket, fnm):
  63. self._operator.delete(f"{bucket}/{fnm}")
  64. self._operator.__init__()
  65. def scan(self, bucket, fnm):
  66. return self._operator.scan(f"{bucket}/{fnm}")
  67. def obj_exist(self, bucket, fnm):
  68. return self._operator.exists(f"{bucket}/{fnm}")
  69. def init_db_config(self):
  70. try:
  71. conn = pymysql.connect(
  72. host=self._kwargs['host'],
  73. port=int(self._kwargs['port']),
  74. user=self._kwargs['user'],
  75. password=self._kwargs['password'],
  76. database=self._kwargs['database']
  77. )
  78. cursor = conn.cursor()
  79. max_packet = self._kwargs.get('max_allowed_packet', 4194304) # Default to 4MB if not specified
  80. cursor.execute(SET_MAX_ALLOWED_PACKET_SQL.format(max_packet))
  81. conn.commit()
  82. cursor.close()
  83. conn.close()
  84. logging.info(f"Database configuration initialized with max_allowed_packet={max_packet}")
  85. except Exception as e:
  86. logging.error(f"Failed to initialize database configuration: {str(e)}")
  87. raise
  88. def init_opendal_mysql_table(self):
  89. conn = pymysql.connect(
  90. host=self._kwargs['host'],
  91. port=int(self._kwargs['port']),
  92. user=self._kwargs['user'],
  93. password=self._kwargs['password'],
  94. database=self._kwargs['database']
  95. )
  96. cursor = conn.cursor()
  97. cursor.execute(CREATE_TABLE_SQL.format(self._kwargs['table']))
  98. conn.commit()
  99. cursor.close()
  100. conn.close()
  101. logging.info(f"Table `{self._kwargs['table']}` initialized.")