|
|
|
@@ -122,6 +122,79 @@ def github_callback(): |
|
|
|
return redirect("/?auth=%s" % user.get_id())
|
|
|
|
|
|
|
|
|
|
|
|
@manager.route('/feishu_callback', methods=['GET'])
|
|
|
|
def feishu_callback():
|
|
|
|
import requests
|
|
|
|
app_access_token_res = requests.post(FEISHU_OAUTH.get("app_access_token_url"), data=json.dumps({
|
|
|
|
"app_id": FEISHU_OAUTH.get("app_id"),
|
|
|
|
"app_secret": FEISHU_OAUTH.get("app_secret")
|
|
|
|
}), headers={"Content-Type": "application/json; charset=utf-8"})
|
|
|
|
app_access_token_res = app_access_token_res.json()
|
|
|
|
if app_access_token_res['code'] != 0:
|
|
|
|
return redirect("/?error=%s" % app_access_token_res)
|
|
|
|
|
|
|
|
res = requests.post(FEISHU_OAUTH.get("user_access_token_url"), data=json.dumps({
|
|
|
|
"grant_type": FEISHU_OAUTH.get("grant_type"),
|
|
|
|
"code": request.args.get('code')
|
|
|
|
}), headers={"Content-Type": "application/json; charset=utf-8",
|
|
|
|
'Authorization': f"Bearer {app_access_token_res['app_access_token']}"})
|
|
|
|
res = res.json()
|
|
|
|
if res['code'] != 0:
|
|
|
|
return redirect("/?error=%s" % res["message"])
|
|
|
|
|
|
|
|
if "contact:user.email:readonly" not in res["data"]["scope"].split(" "):
|
|
|
|
return redirect("/?error=contact:user.email:readonly not in scope")
|
|
|
|
session["access_token"] = res["data"]["access_token"]
|
|
|
|
session["access_token_from"] = "feishu"
|
|
|
|
userinfo = user_info_from_feishu(session["access_token"])
|
|
|
|
users = UserService.query(email=userinfo["email"])
|
|
|
|
user_id = get_uuid()
|
|
|
|
if not users:
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
avatar = download_img(userinfo["avatar_url"])
|
|
|
|
except Exception as e:
|
|
|
|
stat_logger.exception(e)
|
|
|
|
avatar = ""
|
|
|
|
users = user_register(user_id, {
|
|
|
|
"access_token": session["access_token"],
|
|
|
|
"email": userinfo["email"],
|
|
|
|
"avatar": avatar,
|
|
|
|
"nickname": userinfo["en_name"],
|
|
|
|
"login_channel": "feishu",
|
|
|
|
"last_login_time": get_format_time(),
|
|
|
|
"is_superuser": False,
|
|
|
|
})
|
|
|
|
if not users:
|
|
|
|
raise Exception('Register user failure.')
|
|
|
|
if len(users) > 1:
|
|
|
|
raise Exception('Same E-mail exist!')
|
|
|
|
user = users[0]
|
|
|
|
login_user(user)
|
|
|
|
return redirect("/?auth=%s" % user.get_id())
|
|
|
|
except Exception as e:
|
|
|
|
rollback_user_registration(user_id)
|
|
|
|
stat_logger.exception(e)
|
|
|
|
return redirect("/?error=%s" % str(e))
|
|
|
|
user = users[0]
|
|
|
|
user.access_token = get_uuid()
|
|
|
|
login_user(user)
|
|
|
|
user.save()
|
|
|
|
return redirect("/?auth=%s" % user.get_id())
|
|
|
|
|
|
|
|
|
|
|
|
def user_info_from_feishu(access_token):
|
|
|
|
import requests
|
|
|
|
headers = {"Content-Type": "application/json; charset=utf-8",
|
|
|
|
'Authorization': f"Bearer {access_token}"}
|
|
|
|
res = requests.get(
|
|
|
|
f"https://open.feishu.cn/open-apis/authen/v1/user_info",
|
|
|
|
headers=headers)
|
|
|
|
user_info = res.json()["data"]
|
|
|
|
user_info["email"] = None if user_info.get("email") == "" else user_info["email"]
|
|
|
|
return user_info
|
|
|
|
|
|
|
|
|
|
|
|
def user_info_from_github(access_token):
|
|
|
|
import requests
|
|
|
|
headers = {"Accept": "application/json",
|