|
|
|
@@ -1,5 +1,7 @@ |
|
|
|
import re |
|
|
|
from collections.abc import Mapping, Sequence |
|
|
|
from typing import Any, Union |
|
|
|
from urllib.parse import parse_qs, urlparse |
|
|
|
|
|
|
|
import requests |
|
|
|
|
|
|
|
@@ -186,6 +188,30 @@ class MessageFileParser: |
|
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" |
|
|
|
} |
|
|
|
|
|
|
|
def is_s3_presigned_url(url): |
|
|
|
try: |
|
|
|
parsed_url = urlparse(url) |
|
|
|
if 'amazonaws.com' not in parsed_url.netloc: |
|
|
|
return False |
|
|
|
query_params = parse_qs(parsed_url.query) |
|
|
|
required_params = ['Signature', 'Expires'] |
|
|
|
for param in required_params: |
|
|
|
if param not in query_params: |
|
|
|
return False |
|
|
|
if not query_params['Expires'][0].isdigit(): |
|
|
|
return False |
|
|
|
signature = query_params['Signature'][0] |
|
|
|
if not re.match(r'^[A-Za-z0-9+/]+={0,2}$', signature): |
|
|
|
return False |
|
|
|
return True |
|
|
|
except Exception: |
|
|
|
return False |
|
|
|
|
|
|
|
if is_s3_presigned_url(url): |
|
|
|
response = requests.get(url, headers=headers, allow_redirects=True) |
|
|
|
if response.status_code in {200, 304}: |
|
|
|
return True, "" |
|
|
|
|
|
|
|
response = requests.head(url, headers=headers, allow_redirects=True) |
|
|
|
if response.status_code in {200, 304}: |
|
|
|
return True, "" |