Quellcode durchsuchen

r2

tags/2.0.0-beta.1
jyong vor 5 Monaten
Ursprung
Commit
b8f3b23b1a
2 geänderte Dateien mit 33 neuen und 25 gelöschten Zeilen
  1. 0
    1
      api/configs/feature/__init__.py
  2. 33
    24
      api/core/app/apps/pipeline/pipeline_generator.py

+ 0
- 1
api/configs/feature/__init__.py Datei anzeigen

@@ -667,7 +667,6 @@ class MailConfig(BaseSettings):
class RagEtlConfig(BaseSettings):
"""
Configuration for RAG ETL processes
"""

# TODO: This config is not only for rag etl, it is also for file upload, we should move it to file upload config
ETL_TYPE: str = Field(

+ 33
- 24
api/core/app/apps/pipeline/pipeline_generator.py Datei anzeigen

@@ -55,7 +55,8 @@ class PipelineGenerator(BaseAppGenerator):
streaming: Literal[True],
call_depth: int,
workflow_thread_pool_id: Optional[str],
) -> Mapping[str, Any] | Generator[Mapping | str, None, None] | None: ...
) -> Mapping[str, Any] | Generator[Mapping | str, None, None] | None:
...

@overload
def generate(
@@ -69,7 +70,8 @@ class PipelineGenerator(BaseAppGenerator):
streaming: Literal[False],
call_depth: int,
workflow_thread_pool_id: Optional[str],
) -> Mapping[str, Any]: ...
) -> Mapping[str, Any]:
...

@overload
def generate(
@@ -83,7 +85,8 @@ class PipelineGenerator(BaseAppGenerator):
streaming: bool,
call_depth: int,
workflow_thread_pool_id: Optional[str],
) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None]]: ...
) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None]]:
...

def generate(
self,
@@ -184,7 +187,7 @@ class PipelineGenerator(BaseAppGenerator):
)
if invoke_from == InvokeFrom.DEBUGGER:
return self._generate(
flask_app=current_app._get_current_object(),# type: ignore
flask_app=current_app._get_current_object(), # type: ignore
context=contextvars.copy_context(),
pipeline=pipeline,
workflow_id=workflow.id,
@@ -199,6 +202,7 @@ class PipelineGenerator(BaseAppGenerator):
else:
# run in child thread
context = contextvars.copy_context()

@copy_current_request_context
def worker_with_context():
# Run the worker within the copied context
@@ -222,24 +226,25 @@ class PipelineGenerator(BaseAppGenerator):
worker_thread.start()
# return batch, dataset, documents
return {
"batch": batch,
"dataset": PipelineDataset(
id=dataset.id,
name=dataset.name,
description=dataset.description,
chunk_structure=dataset.chunk_structure,
).model_dump(),
"documents": [PipelineDocument(
id=document.id,
position=document.position,
data_source_info=json.loads(document.data_source_info) if document.data_source_info else None,
name=document.name,
indexing_status=document.indexing_status,
error=document.error,
enabled=document.enabled,
).model_dump() for document in documents
]
}
"batch": batch,
"dataset": PipelineDataset(
id=dataset.id,
name=dataset.name,
description=dataset.description,
chunk_structure=dataset.chunk_structure,
).model_dump(),
"documents": [PipelineDocument(
id=document.id,
position=document.position,
data_source_info=json.loads(document.data_source_info) if document.data_source_info else None,
name=document.name,
indexing_status=document.indexing_status,
error=document.error,
enabled=document.enabled,
).model_dump() for document in documents
]
}

def _generate(
self,
*,
@@ -268,6 +273,7 @@ class PipelineGenerator(BaseAppGenerator):
:param streaming: is stream
:param workflow_thread_pool_id: workflow thread pool id
"""
print("jin ru la 1")
for var, val in context.items():
var.set(val)

@@ -279,6 +285,7 @@ class PipelineGenerator(BaseAppGenerator):
saved_user = g._login_user
with flask_app.app_context():
# Restore user in new app context
print("jin ru la 2")
if saved_user is not None:
from flask import g

@@ -306,6 +313,7 @@ class PipelineGenerator(BaseAppGenerator):
application_generate_entity=application_generate_entity,
workflow_thread_pool_id=workflow_thread_pool_id,
)

# new thread
worker_thread = threading.Thread(
target=worker_with_context
@@ -396,7 +404,7 @@ class PipelineGenerator(BaseAppGenerator):
)

return self._generate(
flask_app=current_app._get_current_object(),# type: ignore
flask_app=current_app._get_current_object(), # type: ignore
pipeline=pipeline,
workflow=workflow,
user=user,
@@ -479,7 +487,7 @@ class PipelineGenerator(BaseAppGenerator):
)

return self._generate(
flask_app=current_app._get_current_object(),# type: ignore
flask_app=current_app._get_current_object(), # type: ignore
pipeline=pipeline,
workflow=workflow,
user=user,
@@ -506,6 +514,7 @@ class PipelineGenerator(BaseAppGenerator):
:param workflow_thread_pool_id: workflow thread pool id
:return:
"""
print("jin ru la 3")
for var, val in context.items():
var.set(val)
from flask import g

Laden…
Abbrechen
Speichern