123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- import json
- import requests
- import profile_pb2
def post_credentials(session, username, password):
    """Exchange a username/password pair for Zwift auth tokens.

    Sends a form-encoded POST with ``grant_type=password`` to the
    ``secure.zwift.com`` token endpoint through ``session`` (redirects are
    not followed) and decodes the response body as JSON.

    Args:
        session: Object exposing a requests-style ``post`` method.
        username: Account name placed in the form body.
        password: Account password placed in the form body.

    Returns:
        A ``(access_token, refresh_token, expires_in)`` tuple read from the
        JSON response. ``None`` when the request raises a
        ``requests`` ``RequestException`` or when the JSON object lacks one
        of those keys; a message is printed in both cases.
    """
    token_url = "https://secure.zwift.com/auth/realms/zwift/tokens/access/codes"
    request_headers = {
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate",
        "Connection": "keep-alive",
        "Content-Type": "application/x-www-form-urlencoded",
        "Host": "secure.zwift.com",
        "User-Agent": "Zwift/1.5 (iPhone; iOS 9.0.2; Scale/2.00)",
        "Accept-Language": "en-US;q=1",
    }
    form_fields = {
        "client_id": "Zwift_Mobile_Link",
        "username": username,
        "password": password,
        "grant_type": "password",
    }

    try:
        reply = session.post(
            url=token_url,
            headers=request_headers,
            data=form_fields,
            allow_redirects=False,
        )
        tokens = json.loads(reply.content)
        access = tokens["access_token"]
        refresh = tokens["refresh_token"]
        lifetime = tokens["expires_in"]
        return (access, refresh, lifetime)
    except requests.exceptions.RequestException as err:
        print('HTTP Request failed: %s' % err)
    except KeyError:
        # The token keys are absent from the decoded response.
        print('Invalid uname and/or password')
    return None
def query(session, access_token, route):
    """GET ``route`` from the ``us-or-rly101.zwift.com`` host.

    The request asks for ``application/x-protobuf-lite`` and carries
    ``access_token`` as a bearer token.

    Args:
        session: Object exposing a requests-style ``get`` method.
        access_token: Token placed in the ``Authorization`` header.
        route: Path appended after the host's trailing slash.

    Returns:
        The raw response body (``response.content``), or ``None`` after
        printing a message when a ``requests`` ``RequestException`` is
        raised.
    """
    target = "https://us-or-rly101.zwift.com/%s" % route
    request_headers = {
        "Accept-Encoding": "gzip, deflate",
        "Accept": "application/x-protobuf-lite",
        "Connection": "keep-alive",
        "Host": "us-or-rly101.zwift.com",
        "User-Agent": "Zwift/115 CFNetwork/758.0.2 Darwin/15.0.0",
        "Authorization": "Bearer %s" % access_token,
        "Accept-Language": "en-us",
    }

    try:
        reply = session.get(url=target, headers=request_headers)
        return reply.content
    except requests.exceptions.RequestException as err:
        print('HTTP Request failed: %s' % err)
    return None
def api_login(session, access_token, login_request):
    """POST a serialized login request to ``/api/users/login``.

    Args:
        session: Object exposing a requests-style ``post`` method.
        access_token: Token placed in the ``Authorization`` header.
        login_request: Object whose ``SerializeToString()`` result is sent
            as the request body (content type
            ``application/x-protobuf-lite``).

    Returns:
        The raw response body (``response.content``), or ``None`` after
        printing a message when a ``requests`` ``RequestException`` is
        raised.
    """
    request_headers = {
        "Content-Type": "application/x-protobuf-lite",
        "Accept": "application/x-protobuf-lite",
        "Connection": "keep-alive",
        "Host": "us-or-rly101.zwift.com",
        "User-Agent": "Zwift/115 CFNetwork/758.0.2 Darwin/15.0.0",
        "Authorization": "Bearer %s" % access_token,
        "Accept-Language": "en-us",
    }
    body = login_request.SerializeToString()

    try:
        reply = session.post(
            url="https://us-or-rly101.zwift.com/api/users/login",
            headers=request_headers,
            data=body,
        )
        return reply.content
    except requests.exceptions.RequestException as err:
        print('HTTP Request failed: %s' % err)
    return None
def logout(session, refresh_token):
    """POST ``refresh_token`` to the ``secure.zwift.com`` logout endpoint.

    The response is not inspected.

    Args:
        session: Object exposing a requests-style ``post`` method.
        refresh_token: Token sent in the form body alongside the client id.

    Returns:
        Always ``None``. A ``requests`` ``RequestException`` is caught and
        reported with a printed message.
    """
    request_headers = {
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate",
        "Connection": "keep-alive",
        "Content-Type": "application/x-www-form-urlencoded",
        "Host": "secure.zwift.com",
        "User-Agent": "Zwift/1.5 (iPhone; iOS 9.0.2; Scale/2.00)",
        "Accept-Language": "en-US;q=1",
    }
    form_fields = {
        "client_id": "Zwift_Mobile_Link",
        "refresh_token": refresh_token,
    }

    try:
        session.post(
            url="https://secure.zwift.com/auth/realms/zwift/tokens/logout",
            headers=request_headers,
            data=form_fields,
        )
    except requests.exceptions.RequestException as err:
        print('HTTP Request failed: %s' % err)
def login(session, user, password):
    """Authenticate via ``post_credentials`` and return the two tokens.

    Args:
        session: Session object forwarded to ``post_credentials``.
        user: Account name forwarded to ``post_credentials``.
        password: Account password forwarded to ``post_credentials``.

    Returns:
        ``(access_token, refresh_token)``; the third value produced by
        ``post_credentials`` is discarded.

    Raises:
        TypeError: If ``post_credentials`` returns ``None`` (it cannot be
            unpacked).
    """
    # Unpack all three values so a result of the wrong shape still fails here.
    access_token, refresh_token, _ = post_credentials(session, user, password)
    return access_token, refresh_token
def create_activity(session, access_token, activity):
    """POST a serialized activity under its player's profile.

    The target is ``/api/profiles/<activity.player_id>/activities`` on the
    ``us-or-rly101.zwift.com`` host; the response body is decoded as JSON.

    Args:
        session: Object exposing a requests-style ``post`` method.
        access_token: Token placed in the ``Authorization`` header.
        activity: Object providing ``player_id`` and ``SerializeToString()``.

    Returns:
        The ``"id"`` entry of the JSON response, or ``None`` after printing
        a message when a ``requests`` ``RequestException`` is raised.
    """
    target = "https://us-or-rly101.zwift.com/api/profiles/%s/activities" % activity.player_id
    request_headers = {
        "Content-Type": "application/x-protobuf-lite",
        "Accept": "application/json",
        "Connection": "keep-alive",
        "Host": "us-or-rly101.zwift.com",
        "User-Agent": "Zwift/115 CFNetwork/758.0.2 Darwin/15.0.0",
        "Authorization": "Bearer %s" % access_token,
        "Accept-Language": "en-us",
    }
    body = activity.SerializeToString()

    try:
        reply = session.post(url=target, headers=request_headers, data=body)
        created = json.loads(reply.content)
        return created["id"]
    except requests.exceptions.RequestException as err:
        print('HTTP Request failed: %s' % err)
    return None
def upload_activity(session, access_token, activity):
    """PUT a serialized activity to its existing activity resource.

    The target is
    ``/api/profiles/<activity.player_id>/activities/<activity.id>`` on the
    ``us-or-rly101.zwift.com`` host, with the query parameter
    ``upload-to-strava=true``.

    Args:
        session: Object exposing a requests-style ``put`` method.
        access_token: Token placed in the ``Authorization`` header.
        activity: Object providing ``player_id``, ``id`` and
            ``SerializeToString()``.

    Returns:
        The response's ``status_code``, or ``None`` after printing a
        message when a ``requests`` ``RequestException`` is raised.
    """
    target = "https://us-or-rly101.zwift.com/api/profiles/%s/activities/%s" % (
        activity.player_id,
        activity.id,
    )
    request_headers = {
        "Content-Type": "application/x-protobuf-lite",
        "Accept": "application/json",
        "Connection": "keep-alive",
        "Host": "us-or-rly101.zwift.com",
        "User-Agent": "Zwift/115 CFNetwork/758.0.2 Darwin/15.0.0",
        "Authorization": "Bearer %s" % access_token,
        "Accept-Language": "en-us",
    }
    body = activity.SerializeToString()
    query_args = {"upload-to-strava": "true"}

    try:
        reply = session.put(
            url=target,
            headers=request_headers,
            data=body,
            params=query_args,
        )
        return reply.status_code
    except requests.exceptions.RequestException as err:
        print('HTTP Request failed: %s' % err)
    return None
def get_player_id(session, access_token):
    """Fetch ``/api/profiles/me`` and return the profile's ``id`` field.

    The response body is parsed into a ``profile_pb2.PlayerProfile``
    message.

    Args:
        session: Object exposing a requests-style ``get`` method.
        access_token: Token placed in the ``Authorization`` header.

    Returns:
        ``id`` of the parsed ``PlayerProfile``, or ``None`` after printing a
        message when a ``requests`` ``RequestException`` is raised.
    """
    request_headers = {
        "Accept-Encoding": "gzip, deflate",
        "Accept": "application/x-protobuf-lite",
        "Connection": "keep-alive",
        "Host": "us-or-rly101.zwift.com",
        "User-Agent": "Zwift/115 CFNetwork/758.0.2 Darwin/15.0.0",
        "Authorization": "Bearer %s" % access_token,
        "Accept-Language": "en-us",
    }

    try:
        reply = session.get(
            url="https://us-or-rly101.zwift.com/api/profiles/me",
            headers=request_headers,
        )
        me = profile_pb2.PlayerProfile()
        me.ParseFromString(reply.content)
        return me.id
    except requests.exceptions.RequestException as err:
        print('HTTP Request failed: %s' % err)
    return None
|