|
|
|
@@ -705,34 +705,7 @@ async def report_status(): |
|
|
|
finally: |
|
|
|
redis_lock.release() |
|
|
|
await trio.sleep(30) |
|
|
|
|
|
|
|
|
|
|
|
def recover_pending_tasks(): |
|
|
|
redis_lock = RedisDistributedLock("recover_pending_tasks", lock_value=CONSUMER_NAME, timeout=60) |
|
|
|
svr_queue_names = get_svr_queue_names() |
|
|
|
while not stop_event.is_set(): |
|
|
|
try: |
|
|
|
if redis_lock.acquire(): |
|
|
|
for queue_name in svr_queue_names: |
|
|
|
msgs = REDIS_CONN.get_pending_msg(queue=queue_name, group_name=SVR_CONSUMER_GROUP_NAME) |
|
|
|
msgs = [msg for msg in msgs if msg['consumer'] != CONSUMER_NAME] |
|
|
|
if len(msgs) == 0: |
|
|
|
continue |
|
|
|
|
|
|
|
task_executors = REDIS_CONN.smembers("TASKEXE") |
|
|
|
task_executor_set = {t for t in task_executors} |
|
|
|
msgs = [msg for msg in msgs if msg['consumer'] not in task_executor_set] |
|
|
|
for msg in msgs: |
|
|
|
logging.info( |
|
|
|
f"Recover pending task: {msg['message_id']}, consumer: {msg['consumer']}, " |
|
|
|
f"time since delivered: {msg['time_since_delivered'] / 1000} s" |
|
|
|
) |
|
|
|
REDIS_CONN.requeue_msg(queue_name, SVR_CONSUMER_GROUP_NAME, msg['message_id']) |
|
|
|
except Exception: |
|
|
|
logging.warning("recover_pending_tasks got exception") |
|
|
|
finally: |
|
|
|
redis_lock.release() |
|
|
|
stop_event.wait(60) |
|
|
|
|
|
|
|
|
|
|
|
async def task_manager(): |
|
|
|
try: |
|
|
|
@@ -762,8 +735,6 @@ async def main(): |
|
|
|
signal.signal(signal.SIGINT, signal_handler) |
|
|
|
signal.signal(signal.SIGTERM, signal_handler) |
|
|
|
|
|
|
|
threading.Thread(name="RecoverPendingTask", target=recover_pending_tasks).start() |
|
|
|
|
|
|
|
async with trio.open_nursery() as nursery: |
|
|
|
nursery.start_soon(report_status) |
|
|
|
while not stop_event.is_set(): |