import os
import re
import pytz
import asyncio
import json
import csv
import base64
import datetime as dt
from pytz import UTC
from datetime import datetime, timedelta
from collections import defaultdict
from bson.objectid import ObjectId
from django.shortcuts import render
from django.views import generic
from django.http import HttpResponse
from django.conf import settings
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from therapist_mgr_app.utils import *
from clinic_mgr_app.utils.notification_city_list import notification_city_list_reversed
from therapist_mgr_app.views.view_commons import get_eligible_records, generate_therapist_auth_token
from therapist_mgr_app.forms import TherapyBookingForm
from therapist_mgr_app.views.therapist.insert_therapy_booking import THERAPY_BOOKING_DEFAULT_DICT
from geopy.geocoders import Nominatim
# Timezone that timestamps in this module are converted to before display.
IST = pytz.timezone('Asia/Kolkata')
# Index name passed as `hint=` to user_activity_collection queries in this module.
USER_ID_DATE_IDX = 'user_id_date_idx'
# Mailbox the Gmail credentials below are delegated to via with_subject().
# NOTE(review): hard-coded address; consider sourcing it from settings.
email = 'raghavan@ayursh.com'
# Narrow the configured Google credentials to read-only Gmail access, acting as `email`.
CREDS = settings.GS_CREDENTIALS
CREDS = CREDS.with_scopes(['https://www.googleapis.com/auth/gmail.readonly'])
CREDS = CREDS.with_subject(email)
# Gmail API client; note that it is constructed when this module is imported.
service = build("gmail", "v1", credentials=CREDS)
# Phone numbers that get_knowlarity_users and get_users_by_date leave out of their results.
SKIP_NUMBERS = ['+919916788332', '+919632608009']
def convert_sec_to_time(seconds):
    """Format a duration given in seconds as a clock-style string.

    Durations shorter than one hour are rendered as ``MM:SS`` (the format this
    function has always produced). From one hour upwards the hour component is
    included as ``HH:MM:SS`` instead of being silently dropped.

    Args:
        seconds: Non-negative duration in seconds (int or float).

    Returns:
        The formatted duration string.
    """
    minutes, secs = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    if hours:
        # Previously the hour part was computed and then discarded, so a
        # 3725 second call was shown as "02:05".
        return '%02d:%02d:%02d' % (hours, minutes, secs)
    return '%02d:%02d' % (minutes, secs)
# Merge Knowlarity profiles and call logs into `user_profiles_data` (mutated in place)
# and return user counts.
#
# Args:
#   calendar_date: date string sent to the Knowlarity endpoint as `calendar_date`; it is
#       also compared against call dates formatted with '%Y-%m-%d'.
#   user_profiles_data: either a list of profile dicts or a dict of profile dicts; the
#       two shapes are handled by the two near-identical branches below.
#   request: not referenced anywhere in this function.
#   calendar_end_date: optional; when truthy it is added to the query string.
# Returns:
#   (app_users, knowlarity_users, total_users)
#
# NOTE(review): this copy of the file has lost its indentation, so every statement about
# nesting in the comments below is an assumption to confirm against the original source.
async def get_knowlarity_users(calendar_date, user_profiles_data, request, calendar_end_date=None):
all_call_logs = {}
merged_profiles = {}
# Build the endpoint URL; the end date is only sent when one was supplied.
if calendar_end_date:
url = (
f"{knowlarity_user_login_host}"
f"?calendar_date={calendar_date}"
f"&calendar_end_date={calendar_end_date}"
)
else:
url = f"{knowlarity_user_login_host}?calendar_date={calendar_date}"
knowlarity_response = await create_async_tasks(None, url, [''])
# The first task result is unpacked as a 3-tuple; only its last element is used.
_, _, response = knowlarity_response[0]
data = json.loads(response)['data']
profiles = data['knowlarity_user_profiles']
call_logs = data['call_logs']
# Re-key call logs by phone number, prefixing '+91' when the number has no leading '+'.
for phone, logs in call_logs.items():
normalized = phone if phone.startswith('+') else '+91' + phone
if normalized not in all_call_logs:
all_call_logs[normalized] = []
all_call_logs[normalized].extend(logs)
# NOTE(review): profile keys are taken as-is here, while call-log keys were normalized above.
merged_profiles.update(profiles)
knowlarity_user_profiles_dict = {}
for phone_number, profile in merged_profiles.items():
# NOTE(review): fromtimestamp() without tz returns a naive datetime in the process's
# local timezone, which is then labelled as UTC. The call-log timestamps further down
# use fromtimestamp(..., tz=IST) instead. Confirm which interpretation is intended.
profile['last_login_on'] = UTC.localize(
dt.datetime.fromtimestamp(profile['last_login_on'])
).astimezone(IST)
if phone_number not in SKIP_NUMBERS:
knowlarity_user_profiles_dict[phone_number] = profile
call_logs_dict = all_call_logs
is_list_input = isinstance(user_profiles_data, list)
# ---- List input: each element is a profile dict that carries its own 'date'. ----
if is_list_input:
for user_profile in user_profiles_data:
phone_number = user_profile.get('phone_number')
if phone_number:
formatted_phone = phone_number if phone_number.startswith('+') else '+91' + phone_number
date_str = user_profile.get('date')
if formatted_phone in call_logs_dict:
# Copy Knowlarity fields onto the profile, except the login/registration fields
# and (when the profile already has one) the name.
profile_update = knowlarity_user_profiles_dict.get(formatted_phone, {}).copy()
profile_update.pop('is_web_registration', None)
profile_update.pop('last_login_on', None)
profile_update.pop('last_login_on_time', None)
if user_profile.get('name'):
profile_update.pop('name', None)
user_profile.update(profile_update)
all_logs = call_logs_dict.get(formatted_phone, [])
todays_call_logs = []
day_call_times = []
# Keep only the calls whose IST calendar date equals date_str.
for call in all_logs:
call_ts = call.get('call_start_time')
if call_ts:
call_dt = dt.datetime.fromtimestamp(call_ts, tz=IST)
if call_dt.strftime('%Y-%m-%d') == date_str:
todays_call_logs.append(call)
day_call_times.append(call_dt)
user_profile['call_logs'] = todays_call_logs
if day_call_times:
# The latest call of the day replaces last_login_on unless a later activity exists.
latest_call = max(day_call_times)
last_activity_time = user_profile.get('last_activity_time')
if not last_activity_time or latest_call > last_activity_time:
user_profile['last_login_on'] = latest_call
user_profile['last_login_on_time'] = latest_call.strftime('%H:%M')
# ✅ fallback — only sets if not already set above
# NOTE(review): whether this fallback and the 'web' default below run for every profile
# or only for profiles with a phone number depends on the lost indentation.
if not user_profile.get('last_login_on_time') and user_profile.get('last_login_on'):
user_profile['last_login_on_time'] = user_profile['last_login_on'].strftime('%H:%M')
if user_profile.get('is_web_registration') and not user_profile.get('login_method'):
user_profile['login_method'] = 'web'
# ---- Dict input: same steps as above, except date_str falls back to calendar_date. ----
else:
for user_id, user_profile in user_profiles_data.items():
phone_number = user_profile.get('phone_number')
if phone_number:
formatted_phone = phone_number if phone_number.startswith('+') else '+91' + phone_number
date_str = user_profile.get('date') or calendar_date
if formatted_phone in call_logs_dict:
profile_update = knowlarity_user_profiles_dict.get(formatted_phone, {}).copy()
profile_update.pop('is_web_registration', None)
profile_update.pop('last_login_on', None)
profile_update.pop('last_login_on_time', None)
if user_profile.get('name'):
profile_update.pop('name', None)
user_profile.update(profile_update)
all_logs = call_logs_dict.get(formatted_phone, [])
todays_call_logs = []
day_call_times = []
for call in all_logs:
call_ts = call.get('call_start_time')
if call_ts:
call_dt = dt.datetime.fromtimestamp(call_ts, tz=IST)
if call_dt.strftime('%Y-%m-%d') == date_str:
todays_call_logs.append(call)
day_call_times.append(call_dt)
user_profile['call_logs'] = todays_call_logs
if day_call_times:
latest_call = max(day_call_times)
last_activity_time = user_profile.get('last_activity_time')
if not last_activity_time or latest_call > last_activity_time:
user_profile['last_login_on'] = latest_call
user_profile['last_login_on_time'] = latest_call.strftime('%H:%M')
# ✅ fallback — only sets if not already set above
if not user_profile.get('last_login_on_time') and user_profile.get('last_login_on'):
user_profile['last_login_on_time'] = user_profile['last_login_on'].strftime('%H:%M')
if user_profile.get('is_web_registration') and not user_profile.get('login_method'):
user_profile['login_method'] = 'web'
# Phone numbers already present in the input, used to spot Knowlarity-only users.
# NOTE(review): these are the raw stored numbers (no '+91' normalization), compared
# against Knowlarity profile keys as received.
if is_list_input:
existing_phones = {p.get('phone_number') for p in user_profiles_data}
else:
existing_phones = {p.get('phone_number') for p in user_profiles_data.values()}
# Add Knowlarity profiles that have no matching entry in the input.
for phone, profile in knowlarity_user_profiles_dict.items():
if phone not in existing_phones:
date_str = calendar_date
all_logs = call_logs_dict.get(phone, [])
todays_call_logs = []
day_call_times = []
for call in all_logs:
call_ts = call.get('call_start_time')
if call_ts:
call_dt = dt.datetime.fromtimestamp(call_ts, tz=IST)
if call_dt.strftime('%Y-%m-%d') == date_str:
todays_call_logs.append(call)
day_call_times.append(call_dt)
if todays_call_logs:
profile['call_logs'] = todays_call_logs
if day_call_times:
latest_call = max(day_call_times)
profile['last_login_on'] = latest_call
profile['last_login_on_time'] = latest_call.strftime('%H:%M')
# NOTE(review): it cannot be told from this copy whether the profile is added only when
# it has calls on date_str or for every unmatched Knowlarity profile.
if is_list_input:
user_profiles_data.append(profile)
else:
user_profiles_data[phone] = profile
# NOTE(review): app_users and total_users are both len(user_profiles_data) measured after
# the merge above, so they are always equal.
app_users = len(user_profiles_data)
knowlarity_users = len(knowlarity_user_profiles_dict)
total_users = len(user_profiles_data)
return app_users, knowlarity_users, total_users
async def get_message_status_update(request_id):
    """Look up the current Freshchat status of an outbound message.

    Calls the Freshchat outbound-messages endpoint for ``request_id`` and reads
    the first entry of the ``outbound_messages`` array in the JSON reply.

    Args:
        request_id: Request id string appended to the endpoint's query string.

    Returns:
        A ``(status, created_on)`` tuple taken from that first entry.
    """
    auth_header = {'Authorization': 'Bearer ' + settings.FRESHWORKS_AUTH_TOKEN}
    status_url = 'https://api.in.freshchat.com/v2/outbound-messages?request_id=' + request_id
    task_results = await create_async_tasks(None, status_url, [''], headers=auth_header)
    # Each task result is unpacked as a 3-tuple; only the last element is parsed.
    _first, _second, raw_body = task_results[0]
    first_message = json.loads(raw_body)['outbound_messages'][0]
    created_on = first_message['created_on']
    status = first_message['status']
    return status, created_on
def fetch_clinic_data(clinics_list):
    """Resolve clinic ids to display names.

    Both ``clinic_profile_collection`` (field ``clinic_name``) and
    ``google_clinics_collection`` (field ``title``) are queried. The second
    query runs last, so its name wins if an id appears in both.

    Args:
        clinics_list: Iterable of clinic ids accepted by ``ObjectId``.

    Returns:
        Dict mapping ``str(_id)`` to the clinic's name.
    """
    object_ids = [ObjectId(clinic_id) for clinic_id in clinics_list]
    id_filter = {'_id': {"$in": object_ids}}

    names_by_id = {}
    registered = clinic_profile_collection.find(id_filter, {'clinic_name': 1})
    names_by_id.update({str(doc['_id']): doc['clinic_name'] for doc in registered})
    google_listed = google_clinics_collection.find(id_filter, {'title': 1})
    names_by_id.update({str(doc['_id']): doc['title'] for doc in google_listed})
    return names_by_id
async def get_clinic_notifications(calendar_date, user_profiles_dict, app_user=None):
    """Attach the day's clinic WhatsApp notifications to each user profile.

    For every notification stored for ``calendar_date`` whose ``user_id`` is a
    key of ``user_profiles_dict``, the WhatsApp status is refreshed from
    Freshchat while it is still in a non-final state, written back to the
    notification document, and the notification is recorded on the user's
    profile under ``'clinic_notifications'``.

    Args:
        calendar_date: Value matched against the notification ``date`` field.
        user_profiles_dict: Dict keyed by user id; updated in place.
        app_user: Unused; kept for interface compatibility.

    Returns:
        None. ``user_profiles_dict`` is mutated.
    """
    clinic_ids = set()
    notifications_by_user = defaultdict(list)

    cursor = clinic_notification_collection.find(
        {
            'date': calendar_date,
            'user_id': {"$in": list(user_profiles_dict.keys())}
        },
        {
            'clinic_id': 1, 'whatsapp_request_id': 1, 'whatsapp_request_status': 1, 'user_id': 1
        }
    )
    for notification in cursor:
        clinic_ids.add(notification['clinic_id'])
        message_status = notification['whatsapp_request_status']
        # Only statuses that can still change are re-checked with Freshchat.
        if message_status in ('IN_PROGRESS', 'ACCEPTED', 'DELIVERED'):
            message_status, _created_on = await get_message_status_update(
                notification['whatsapp_request_id']
            )
            clinic_notification_collection.find_one_and_update(
                {'_id': ObjectId(notification['_id'])},
                {'$set': {'whatsapp_request_status': message_status}},
            )
        notifications_by_user[notification['user_id']].append(
            {
                'clinic_id': notification['clinic_id'],
                'message_status': message_status
            }
        )

    # Each clinic is looked up once, however many notifications mention it.
    clinic_names = fetch_clinic_data(list(clinic_ids))
    for user_id, clinic_entries in notifications_by_user.items():
        for entry in clinic_entries:
            # fetch_clinic_data keys its result by str(_id). A clinic missing
            # from both collections yields None instead of raising KeyError
            # and aborting the enrichment of every remaining user.
            entry['clinic_name'] = clinic_names.get(str(entry['clinic_id']))
        user_profiles_dict[user_id].update({'clinic_notifications': clinic_entries})
def read_cities_list():
    """Geocode every row of the cities CSV and print one tuple literal per hit.

    The tab-separated file is read relative to the current working directory
    (which is printed first). Columns 1 and 4 are joined as the place name
    sent to Nominatim, and column 0 minus one is printed as the index. Rows
    that cannot be geocoded are skipped.
    """
    print(os.getcwd())
    geocoder = Nominatim(user_agent="MyApp")
    with open('therapist_mgr_app/utils_dir/cities_list.csv', 'r') as cities_file:
        for row in csv.reader(cities_file, delimiter='\t'):
            place = row[1] + ", " + row[4]
            hit = geocoder.geocode(place)
            if not hit:
                continue
            index_part = f"('{int(row[0]) - 1}',"
            detail_part = f"('{place}', [{hit.latitude}, {hit.longitude}])),"
            print(index_part, detail_part)
# Build a dict of user profiles (keyed by str(_id)) for one calendar date, enriched with
# that day's activity record (activities, coordinates, notes, closed status).
#
# Args:
#   calendar_date: 'YYYY-MM-DD' string (parsed with strptime below and matched against the
#       `date` field of activity records).
#   eligible_records: not referenced anywhere in this function.
#   user: object read for `.clinic_id` and `.is_staff`.
#   user_phone_numbers: when truthy, profiles are selected by these phone numbers instead
#       of by registration/login date.
#   user_profiles_dict: optional dict to fill; a new dict is created when it is falsy.
# Returns:
#   The profiles dict, or the string 'Invalid User' for a user with no clinic_id who is
#   not staff.
#
# NOTE(review): this copy of the file has lost its indentation, so every statement about
# nesting in the comments below is an assumption to confirm against the original source.
def get_users_by_date(calendar_date, eligible_records=None, user=None,
user_phone_numbers=None, user_profiles_dict=None):
date = datetime.strptime(calendar_date, '%Y-%m-%d')
# NOTE(review): strptime either returns a datetime or raises, so this fallback looks unreachable.
if not date:
date = datetime.today()
if not user_phone_numbers:
# NOTE(review): astimezone() on a naive datetime interprets it in the process's local
# timezone, so the day window depends on the server's timezone setting.
start_date = dt.datetime.combine(date, dt.time(0, 0, 0)).astimezone(pytz.utc)
end_date = dt.datetime.combine(date, dt.time(23, 59, 59)).astimezone(pytz.utc)
# Users who registered or logged in within the day window.
find_query = {
"$or": [
{
'registered_on': {"$gte": start_date,"$lte": end_date}
},
{
'last_login_on': {"$gte": start_date,"$lte": end_date}
}
]
}
if user and user.clinic_id:
# clinic_id is assigned but not used further; the clinic filter below is commented out.
clinic_id = user.clinic_id
# find_query.update({
# 'therapies.clinic_id': clinic_id
# })
else:
clinic_id = None
# NOTE(review): with the default user=None this attribute access raises AttributeError.
if not user.is_staff:
return 'Invalid User'
else:
find_query = {
'phone_number': {"$in": user_phone_numbers}
}
if not user_profiles_dict:
user_profiles_dict = dict()
# Load matching profiles, excluding the pin, referral_code and fcm_token fields.
for user_profile in user_profile_collection.find(
find_query,
{
'pin': 0, 'referral_code': 0, 'fcm_token': 0
}
):
if user_profile['phone_number'] not in SKIP_NUMBERS:
# Stored datetimes are labelled as UTC and converted to IST.
if user_profile.get('registered_on'):
x = UTC.localize(user_profile['registered_on']).astimezone(IST)
user_profile['registered_on'] = x
if user_profile.get('last_login_on'):
x = UTC.localize(user_profile['last_login_on']).astimezone(IST)
user_profile['last_login_on'] = x
else:
# NOTE(review): raises KeyError when the profile has neither field.
user_profile['last_login_on'] = user_profile['registered_on']
user_profiles_dict[str(user_profile['_id'])] = user_profile
# Set when an activity record refers to a user that is not in user_profiles_dict yet.
refetch = False
for user_activity in user_activity_collection.find(
{
"date": calendar_date
},
hint=USER_ID_DATE_IDX
):
is_closed = user_activity.get('is_closed')
closed_at = user_activity.get('closed_at')
if closed_at:
x = UTC.localize(closed_at).astimezone(IST)
closed_at = x
notes = user_activity.get('notes')
try:
coordinates = user_activity.get('coordinates')
# A dict-shaped value is unwrapped to its inner 'coordinates' entry.
if coordinates and isinstance(coordinates, dict):
coordinates = coordinates['coordinates']
user_profiles_dict[user_activity['user_id']].update({
'activities': user_activity.get('activities'),
'is_closed': is_closed,
'closed_at': closed_at
})
# NOTE(review): the truthiness test also rejects a coordinate component equal to 0.
if coordinates and coordinates[0] and coordinates[1]:
user_profiles_dict[user_activity['user_id']].update({
'coordinates': coordinates,
})
if notes and notes != '':
user_profiles_dict[user_activity['user_id']].update({
'notes': notes
})
except KeyError:
# Unknown user id: create a stub entry and fill in the profile in the refetch pass below.
# NOTE(review): a KeyError from coordinates['coordinates'] above would land here too.
refetch = True
user_profiles_dict[user_activity['user_id']] = {
'coordinates': coordinates,
'activities': user_activity.get('activities'),
'is_closed': is_closed,
'closed_at': closed_at
}
if notes and notes != '':
user_profiles_dict[user_activity['user_id']].update({
'notes': notes
})
if is_closed and is_closed != '' and is_closed == 'yes':
# The value written here is a string of CSS class names.
user_profiles_dict[user_activity['user_id']]['log_status'] = \
'bg-success h-5 border border-success'
if refetch:
# Second pass: load profile documents for every key currently in the dict.
for user_profile in user_profile_collection.find(
{
"_id": {
"$in": list(
map(lambda y: ObjectId(y), user_profiles_dict.keys())
)
}
},
):
if user_profile['phone_number'] not in SKIP_NUMBERS:
if user_profile.get('registered_on'):
x = UTC.localize(user_profile['registered_on']).astimezone(IST)
user_profile['registered_on'] = x
if user_profile.get('last_login_on'):
x = UTC.localize(user_profile['last_login_on']).astimezone(IST)
user_profile['last_login_on'] = x
else:
user_profile['last_login_on'] = user_profile['registered_on']
user_profiles_dict[str(user_profile['_id'])].update(user_profile)
else:
# Users in SKIP_NUMBERS are removed again.
user_profiles_dict.pop(str(user_profile['_id']))
for user_id, user_profile in user_profiles_dict.items():
if user_profile.get('_id'):
user_profile['id'] = str(user_profile['_id'])
'''This is to make last login on as the last activity user has done in our systems.'''
activities = user_profile.get('activities', [])
closed_at = user_profile.get('closed_at')
user_profile['last_activity_time'] = None
if activities:
for activity in user_profile.get('activities', []):
activity['visit_time'] = UTC.localize(activity['visit_time']).astimezone(IST)
# NOTE(review): raises KeyError for a stub entry that never received 'last_login_on'.
if activity['visit_time'] > user_profile['last_login_on']:
user_profile['last_login_on'] = activity['visit_time']
# NOTE(review): whether this runs only when an activity is newer than the last login, or
# once per activity / once after the loop, cannot be determined from this copy.
user_profile['last_activity_time'] = user_profile['last_login_on']
if closed_at:
# Activity after the lead was closed clears its closed state.
if user_profile['last_login_on'] > closed_at:
user_profiles_dict[user_id]['log_status'] = ""
user_profiles_dict[user_id]['is_closed'] = "no"
return user_profiles_dict
def _as_ist(value):
    """Return ``value`` converted to IST, treating a naive datetime as UTC."""
    if value.tzinfo is None:
        value = UTC.localize(value)
    return value.astimezone(IST)


def get_all_entries(phone_number=None, name=None):
    """Return one entry per (user, daily activity record) for a search.

    The search is by phone number when ``phone_number`` is given, otherwise by
    a case-insensitive name pattern against ``name`` and ``offline_name``
    (at most 50 profiles, most recent login first).

    Args:
        phone_number: Phone number; ``'+91'`` is prefixed when it has no
            leading ``'+'``.
        name: Name fragment used when no phone number is given.

    Returns:
        A list of profile dicts, each merged with one daily activity record
        and sorted by record date, newest first. The list is empty when
        neither argument is given or nothing matches.
    """
    if not phone_number and not name:
        return []

    hidden_fields = {"pin": 0, "referral_code": 0, "fcm_token": 0}
    if phone_number:
        formatted_phone = phone_number if phone_number.startswith('+') else '+91' + phone_number
        match = user_profile_collection.find_one(
            {
                "$or": [
                    {"phone_number": formatted_phone},
                    {"patient_phone_number": formatted_phone}
                ]
            },
            hidden_fields
        )
        base_user_profiles = [match] if match else []
    else:
        if len(name) >= 4:
            # Longer names: everything but the last character, any text, then
            # the last character.
            pattern = re.escape(name[:-1]) + ".*" + re.escape(name[-1])
        else:
            pattern = "^" + re.escape(name)
        base_user_profiles = list(
            user_profile_collection.find(
                {
                    "$or": [
                        {"offline_name": {"$regex": pattern, "$options": "i"}},
                        {"name": {"$regex": pattern, "$options": "i"}}
                    ]
                },
                hidden_fields
            ).sort("last_login_on", -1).limit(50)
        )
    if not base_user_profiles:
        return []

    profiles_by_user_id = {str(p['_id']): p for p in base_user_profiles}

    # One query for every matched user's daily records, grouped by user id.
    records_by_user_id = defaultdict(list)
    daily_cursor = user_activity_collection.find(
        {"user_id": {"$in": list(profiles_by_user_id)}}
    ).sort("date", -1)
    for daily_record in daily_cursor:
        records_by_user_id[daily_record['user_id']].append(daily_record)

    entries = []
    for user_id, base_user_profile in profiles_by_user_id.items():
        daily_records = records_by_user_id.get(user_id, [])
        if not daily_records:
            continue

        # Profile-level timestamps are the same for every record of this
        # user, so they are converted once.
        localized_profile = base_user_profile.copy()
        if localized_profile.get('registered_on'):
            localized_profile['registered_on'] = _as_ist(localized_profile['registered_on'])
        if localized_profile.get('last_login_on'):
            localized_profile['last_login_on'] = _as_ist(localized_profile['last_login_on'])
        else:
            localized_profile['last_login_on'] = localized_profile.get('registered_on')
        if localized_profile['last_login_on']:
            localized_profile['last_login_on_time'] = \
                localized_profile['last_login_on'].strftime('%H:%M')

        for daily_record in daily_records:
            user_profile = localized_profile.copy()
            user_profile['id'] = str(daily_record['_id'])
            user_profile['user_id'] = user_id

            activities = daily_record.get('activities', [])
            last_activity_time = None
            for activity in activities:
                visit_time = activity.get('visit_time')
                if not visit_time:
                    continue
                # A naive visit_time is read as UTC, like every other stored
                # timestamp in this module. A bare astimezone() would read it
                # in the server's local timezone instead.
                visit_time_ist = _as_ist(visit_time)
                activity['visit_time'] = visit_time_ist
                if not last_activity_time or visit_time_ist > last_activity_time:
                    last_activity_time = visit_time_ist
            if last_activity_time:
                # The day's latest activity is shown as the login time.
                user_profile['last_login_on'] = last_activity_time
                user_profile['last_login_on_time'] = last_activity_time.strftime('%H:%M')

            user_profile['activities'] = activities
            user_profile['last_activity_time'] = last_activity_time
            user_profile['is_closed'] = daily_record.get('is_closed', 'no')
            user_profile['coordinates'] = daily_record.get('coordinates')
            user_profile['geo_coordinates'] = daily_record.get('geo_coordinates')
            user_profile['date'] = daily_record.get('date')
            entries.append(user_profile)

    def sort_key(entry):
        date_dt = datetime.strptime(entry['date'], '%Y-%m-%d') if entry.get('date') else datetime.min
        # NOTE(review): 'closed_at' is not copied from the daily record above,
        # so unless the profile document carries one this tie-breaker is
        # always datetime.min.
        time_dt = entry.get('closed_at') if entry.get('closed_at') else datetime.min
        return (date_dt, time_dt)

    entries.sort(key=sort_key, reverse=True)
    return entries
class ClinicUser(AyurshUserMixin, generic.View):
    """List app and Knowlarity users for a day or a search, and save lead edits.

    ``get`` renders ``clinic_users2.html``. ``post`` saves a customer name, a
    note or a lead-closed status and answers with JSON.
    """
    login_url = "accounts:signin"

    def filter_clinic_users(self, user_profiles_dict):
        """Keep only the users that were notified to the requesting user's clinic.

        Returns:
            A dict with the profiles that have a ``clinic_notifications`` entry
            for the requesting user's ``clinic_id``, or the string
            ``'Invalid User'`` when the user has no clinic and is not staff.
        """
        request_user = self.request.user
        filtered = dict()
        if request_user and request_user.clinic_id:
            clinic_id = request_user.clinic_id
            for user_id, user_profile in user_profiles_dict.items():
                notifications = user_profile.get('clinic_notifications', [])
                if any(n['clinic_id'] == clinic_id for n in notifications):
                    filtered[user_id] = user_profile
        elif not request_user.is_staff:
            return 'Invalid User'
        # NOTE(review): a staff user without a clinic_id gets an empty result
        # here, exactly as before; confirm that is intended.
        return filtered

    async def check_clinic_doctor_therapist(self, user_profiles_dict):
        """Mark profiles whose phone number belongs to a therapist or a clinic.

        Matching profiles get ``non_notification_reason`` set to
        ``'THERAPIST'`` or ``'CLINIC'``. Clinics are processed last, so
        ``'CLINIC'`` wins when a number matches both.
        """
        user_id_by_phone = {
            profile['phone_number']: user_id
            for user_id, profile in user_profiles_dict.items()
            if profile.get('phone_number')
        }
        numbers = list(user_id_by_phone)
        lookups = (
            (therapist_profile_collection, 'THERAPIST'),
            (google_clinics_collection, 'CLINIC'),
        )
        for collection, reason in lookups:
            for match in collection.find(
                    {'phone_number': {"$in": numbers}},
                    {'phone_number': 1}
            ):
                user_id = user_id_by_phone[match['phone_number']]
                user_profiles_dict[user_id].update({'non_notification_reason': reason})
        await asyncio.sleep(0.001)

    def get(self, request, *args, **kwargs):
        """Render the user list for a phone/name search or for one date.

        Query parameters read: ``dateFilter``, ``phoneNumberFilter`` and
        ``nameFilter``. When a search yields entries they are shown (enriched
        with Knowlarity call logs for the covered date range); otherwise the
        users of ``dateFilter`` (today by default) are listed.
        """
        query_date = request.GET.get('dateFilter')
        query_filter = request.GET.get('phoneNumberFilter')
        name_filter = request.GET.get('nameFilter')
        if query_filter and not query_filter.startswith('+91'):
            query_filter = '+91' + query_filter

        form = TherapyBookingForm(initial=THERAPY_BOOKING_DEFAULT_DICT)
        eligible_records, dept_record = get_eligible_records(request.user.get_username())
        date_filter = query_date or datetime.today().strftime('%Y-%m-%d')

        errors = None
        user_profiles_list = []
        show_date = False
        app_users = knowlarity_users = total_users = 0

        if query_filter:
            user_profiles_list = get_all_entries(phone_number=query_filter)
            show_date = True
        elif name_filter:
            show_date = True
            # Names shorter than three characters are not searched.
            if len(name_filter.strip()) >= 3:
                user_profiles_list = get_all_entries(name=name_filter)

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            if user_profiles_list:
                # Ask Knowlarity for the whole date range covered by the results.
                dates = set()
                for entry in user_profiles_list:
                    entry_date = entry.get('date') or entry.get('created_at')
                    if entry_date:
                        if isinstance(entry_date, dt.datetime):
                            entry_date = entry_date.strftime('%Y-%m-%d')
                        dates.add(entry_date)
                start_date = min(dates) if dates else None
                end_date = max(dates) if dates else None
                app_users, knowlarity_users, total_users = loop.run_until_complete(
                    get_knowlarity_users(start_date, user_profiles_list, request, end_date)
                )
            else:
                profiles = get_users_by_date(date_filter, eligible_records, user=request.user)
                # get_users_by_date and filter_clinic_users both signal a
                # rejected user by returning a string instead of a dict.
                if isinstance(profiles, dict):
                    # NOTE(review): get_clinic_notifications is not called
                    # here, so 'clinic_notifications' (which
                    # filter_clinic_users relies on) is not populated by this
                    # view.
                    gathered = loop.run_until_complete(
                        asyncio.gather(
                            get_knowlarity_users(date_filter, profiles, request),
                            self.check_clinic_doctor_therapist(profiles),
                        )
                    )
                    app_users, knowlarity_users, total_users = gathered[0]
                    if not self.request.user.is_superuser and not dept_record['OWNERS']:
                        # Filtering only for clinics and not for owners.
                        profiles = self.filter_clinic_users(profiles)
                if isinstance(profiles, dict):
                    now_ist = get_ist_time()
                    user_profiles_list = sorted(
                        profiles.values(),
                        key=lambda profile: profile.get('last_login_on') or now_ist,
                        reverse=True,
                    )
                else:
                    errors = profiles
                    user_profiles_list = []
        finally:
            # The loop is closed even when a helper raises.
            loop.close()

        result_dict = {
            'date': date_filter,
            'dept_record': dept_record,
            'form': form,
            'show_date': show_date,
            'user_profiles': user_profiles_list,
            'superuser': self.request.user.is_superuser,
            'app_users': app_users,
            'knowlarity_users': knowlarity_users,
            'total_users': total_users,
            'clinic_phone_number': self.request.user.phone_number[3:],
            'countries': notification_city_list_reversed,
        }
        if errors:
            result_dict['errors'] = errors
        return render(request, 'clinic_users2.html', result_dict)

    def post(self, request, *args, **kwargs):
        """Save a customer name, a note or a lead-closed status.

        URL kwargs: ``user_id``, ``update_type`` (``'name'``, ``'notes'`` or
        ``'close-lead'``) and ``cal_date``.

        Returns:
            A JSON ``HttpResponse``: 201 when something was saved, 403 for an
            empty value, 400 for an unknown ``update_type``, 200 otherwise.
        """
        user_id = kwargs['user_id']
        update_type = kwargs['update_type']
        cal_date = kwargs['cal_date']
        message = 'Customer ' + update_type + ' Not Saved'
        status_code = 200
        extra = {}
        activity_filter = {'user_id': user_id, 'date': cal_date}

        if update_type == 'name':
            cust_name = request.POST.get('cust_name')
            extra['cust_name'] = cust_name
            if cust_name:
                # Store as offline_name while the user has no name of their
                # own; otherwise overwrite the existing name.
                result = user_profile_collection.update_one(
                    {'_id': ObjectId(user_id), 'name': {"$exists": False}},
                    {"$set": {'offline_name': cust_name}}
                )
                if result.modified_count != 1:
                    result = user_profile_collection.update_one(
                        {'_id': ObjectId(user_id)},
                        {"$set": {'name': cust_name}}
                    )
                if result.modified_count == 1:
                    message = 'Customer Name Saved'
                    status_code = 201
            else:
                message = 'Invalid Name'
                status_code = 403
        elif update_type == 'notes':
            cust_notes = request.POST.get('cust_notes')
            extra['cust_notes'] = cust_notes
            if cust_notes:
                user_activity_collection.find_one_and_update(
                    activity_filter,
                    {"$set": {'notes': cust_notes}},
                    upsert=True,
                    hint=USER_ID_DATE_IDX
                )
                message = 'Customer Notes Saved'
                status_code = 201
            else:
                message = 'Invalid Notes'
                status_code = 403
        elif update_type == 'close-lead':
            cust_closed_status = request.POST.get('cust_closed_status')
            # The response used to echo cust_notes under this key.
            extra['cust_closed_status'] = cust_closed_status
            if cust_closed_status:
                user_activity_collection.find_one_and_update(
                    activity_filter,
                    {
                        "$set": {
                            'is_closed': cust_closed_status,
                            'closed_at': get_ist_time(),
                            'closed_by': self.request.user.email
                        }
                    },
                    upsert=True,
                    hint=USER_ID_DATE_IDX
                )
                message = 'Customer Lead Status Saved'
                status_code = 201
            else:
                message = 'Invalid Type'
                status_code = 403
        else:
            # An unknown update type used to fail with UnboundLocalError.
            status_code = 400

        response = HttpResponse(json.dumps({'message': message, **extra}),
                                content_type="application/json")
        response.status_code = status_code
        return response
class ClinicUserChat(AyurshUserMixin, generic.View):
    """Render the Freshchat widget page for one user and save an offline name."""
    login_url = "accounts:signin"

    def get(self, request, *args, **kwargs):
        """Render ``clinic_user_chat.html`` for the user in ``kwargs['user_id']``.

        Returns:
            The rendered page, or an empty 404 response when no profile with
            that id exists.
        """
        user_id = kwargs['user_id']
        user_profile = user_profile_collection.find_one({
            "_id": ObjectId(user_id)
        })
        if user_profile is None:
            # A missing profile used to fail with TypeError on the lookup below.
            return HttpResponse(status=404)
        result_dict = {
            "phone_number": user_profile['phone_number'],
            "user_id": user_id,
            "restore_id": user_profile.get('restore_id'),
            # NOTE(review): the widget token and host are hard-coded in
            # source; consider moving them to settings.
            "token": "15c3f8e3-346f-4477-8fc1-e0a27f61f4af",
            "host": "https://ayursh-org-72e1fa982b95fb216697299.freshchat.com"
        }
        return render(request, 'clinic_user_chat.html', {
            'result': result_dict
        })

    def post(self, request, *args, **kwargs):
        """Save ``cust_name`` as the user's ``offline_name``.

        Only ``update_type == 'name'`` is supported, and the name is written
        only while the profile has no ``name`` field.

        Returns:
            A JSON ``HttpResponse``: 201 when saved, 403 for an empty name,
            400 for any other ``update_type``, 200 when nothing was modified.
        """
        user_id = kwargs['user_id']
        update_type = kwargs['update_type']
        message = 'Customer ' + update_type + ' Not Saved'
        cust_name = request.POST.get('cust_name')
        status_code = 200

        if update_type == 'name':
            if cust_name:
                result = user_profile_collection.update_one(
                    {
                        '_id': ObjectId(user_id),
                        'name': {"$exists": False}
                    },
                    {
                        "$set": {
                            'offline_name': cust_name
                        }
                    }
                )
                if result.modified_count == 1:
                    message = 'Customer Name Saved'
                    status_code = 201
            else:
                message = 'Invalid Name'
                status_code = 403
        else:
            # Other update types used to fail with UnboundLocalError because
            # no response was ever built for them.
            status_code = 400

        response = HttpResponse(json.dumps({'message': message,
                                            'cust_name': cust_name}),
                                content_type="application/json")
        response.status_code = status_code
        return response
import asyncio
import copy
import datetime
import json
import jwt
import re
import base64
import aiosmtplib
import datetime as dt
from datetime import timedelta
from collections import defaultdict
from random import randint
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from aiohttp import web, log
from twilio.base.exceptions import TwilioRestException
from googleapiclient.errors import HttpError
from googleapiclient.discovery import build
from utils.request import create_async_tasks, convert_sec_to_time, IST
from user.db.registration import *
from user.user_exceptions import *
from user.whatsapp_templates import *
from user.user_profile import UserProfile
from user.api.auth import GenerateUserProfileAuthToken
from utils.request import notify_slack, user_funnel_slack_notification, post_async_tasks
# Module logger: aiohttp's access logger.
logger = log.access_logger
# Freshworks actor ids, listed by name as spam senders.
# NOTE(review): where this list is consulted is not visible in this part of the file.
FRESHWORKS_SPAM_ACTOR_ID_LIST = [
'f1a3797a-31c2-4e77-8288-f548d591caa1', # Br.S.k.chawhan
'53888cc5-3069-410c-b6e2-b76f76edac6b' # s k chawhan72
]
# Freshworks agent id.
# NOTE(review): usage is not visible in this part of the file.
FRESHWORKS_AGENT_ID = 'be8f79d6-2a66-4b54-883b-de02816e5258'
# Phone number -> agent name; two of the numbers map to the same agent.
CALL_AGENTS = {
'+919916788332': 'Raghavan',
'+919738600900': 'Ranjani',
'+916363112992': 'Vinod',
'+919632608009': 'Raghavan'
}
class User(web.View):
    """Base aiohttp view with per-request user state and OTP/login helpers."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        app = self.request.app
        self.db = app['db']
        self.logger = logger
        self.twilio_client = app['twilio_client']
        self.verify_services = app['twilio_verify_service']
        self.user_id = None
        self.user_name = None
        self.user_email = None
        self.user_name_handle = None
        self.user_profile = None
        self.sms_token = None
        self.sms_token_session_id = None
        self.sms_token_session_period = None
        self.session_token = None
        self.token_session_id = None
        self.token_session_period = None
        self.token_type = None
        self.communication_vendor = '2FA'  # or TWILIO
        self.auth_token = None
        self.support_message = None

    async def send_sms_token(self):
        """Send an SMS OTP to ``self.user_profile.phone_number``.

        Returns:
            The 2Factor session id (``Details`` of the reply) when the vendor
            is ``'2FA'``, otherwise ``None``.

        Raises:
            UnknownSMSTokenError: When 2Factor does not report ``Success``.
        """
        phone_number = self.user_profile.phone_number
        if self.communication_vendor == 'TWILIO':
            self.verify_services.verifications.create(to=phone_number, channel='sms')
            return None
        if self.communication_vendor != '2FA':
            return None

        app = self.request.app
        response = await create_async_tasks(
            app,
            app['2FACTOR_ENDPOINT'].format(app['TWO_FACTOR_API_KEY'], phone_number),
            ['']
        )
        _, _, twofa_response = response[0]
        self.logger.info(twofa_response)
        twofa_response_dict = json.loads(twofa_response)
        if twofa_response_dict['Status'] != 'Success':
            logger.error('Failed to send SMS Token: %s: %s', phone_number, twofa_response_dict)
            raise UnknownSMSTokenError('AYOU0129')
        return twofa_response_dict['Details']

    async def send_whatsapp_token(self):
        """Send a 6-digit OTP over WhatsApp through Freshworks.

        Returns:
            The Freshworks ``request_id`` of the message, or ``None`` when the
            message could not be sent (failures are logged, not raised).
        """
        # Function-scope import: only this method needs the CSPRNG.
        import secrets

        # OTPs are security tokens, so they come from the OS CSPRNG rather
        # than the `random` module. The range is 100000..999999.
        whatsapp_otp = str(100000 + secrets.randbelow(900000))
        whatsapp_token_session_id = None

        # Work on a private copy: the template is a shared module-level dict,
        # and mutating it in place lets concurrent requests overwrite each
        # other's phone number and OTP.
        template = copy.deepcopy(freshworks_whatsapp_otp_authentication_template)
        template['to'][0]['phone_number'] = self.user_profile.phone_number
        rich_template = template['data']['message_template']['rich_template_data']
        rich_template['body']['params'][0]['data'] = whatsapp_otp
        rich_template['button']['params'][0]['data'] = whatsapp_otp

        try:
            response = await post_async_tasks(
                self.request.app,
                self.request.app['FRESHWORKS_WHATSAPP_ENDPOINT'],
                task_list=[template],
                headers={'Authorization': 'Bearer ' + self.request.app['FRESHWORKS_AUTH_TOKEN']}
            )
            response_json = response[0]
            try:
                whatsapp_token_session_id = response_json['request_id']
            except KeyError:
                if not response_json['success']:
                    logger.error(
                        'Failed to send Whatsapp Token: %s: %s',
                        self.user_profile.phone_number, response_json['error_message']
                    )
                    raise UnknownSMSTokenError('AYOU0127')
                logger.error(
                    'Unknown Error: %s: %s',
                    self.user_profile.phone_number, response_json
                )
        except Exception:
            # Best effort, as before: every failure (including the
            # UnknownSMSTokenError raised above) ends in a None return.
            # NOTE(review): confirm that swallowing AYOU0127 is intended.
            logger.exception('Error sending whatsapp token')
        return whatsapp_token_session_id

    async def login_user(self, user_dict):
        """Ensure a user exists for ``user_dict`` and record its activity.

        Without a phone number the record is stored as agent activity (no
        user id). Otherwise the user is looked up, created through the login
        API when missing, and the activity is inserted unless one with the
        same ``message_id`` is already stored.
        """
        ayursh_login_url = 'https://prod.ayursh.com/onboarding/api/v1/login'
        if not user_dict['phone_number']:
            # Agent activity: no user to attach it to.
            await insert_user_activity_(self, user_dict, None)
            return

        is_existing_user, is_existing_user_activity = \
            await is_existing_user_gmail_login_(self, user_dict['phone_number'])
        user_id = None
        login_payload = copy.deepcopy(user_dict)
        # A missing 'last_login_on' no longer raises KeyError.
        login_payload.pop('last_login_on', None)

        if is_existing_user:
            user_id = str(is_existing_user['_id'])
        else:
            self.logger.info('Creating new User: %s' % user_dict)
            login_response = await post_async_tasks(
                self.request.app,
                ayursh_login_url,
                [login_payload]
            )
            first_response = login_response[0] if login_response else None
            if first_response and first_response['success']:
                user_id = first_response['data']['user_id']
            else:
                self.logger.error('Error Creating Login for Email Users')

        if not (user_id and user_dict.get('activity')):
            return

        stored_activities = []
        if is_existing_user_activity and is_existing_user_activity.get('activities'):
            stored_activities = is_existing_user_activity['activities']
        incoming_message_id = user_dict.get('message_id')
        record_already_exists = any(
            activity.get('message_id') and activity.get('message_id') == incoming_message_id
            for activity in stored_activities
        )
        if not record_already_exists:
            await insert_user_activity_(self, user_dict, user_id)
class RegisterUser(User):
    """Register a new app user and send the first SMS OTP."""

    async def post(self):
        """Create the profile from the JSON body and send an SMS token.

        Returns:
            ``({"user_id": ..., "sms_token_session_id": ...}, 200)``.

        Raises:
            AlreadyRegisteredUser: The user already exists.
            UnknownUserException: The profile insert returned nothing.
        """
        payload = await self.request.json()
        self.logger.info(f'RegisterUser: {payload}')
        password_hasher = self.request.app['password_hasher']
        self.user_profile = UserProfile(
            payload['phone_number'],
            pin=payload['pin'],
            user_name=payload.get('user_name'),
            user_email=payload.get('user_email'),
            is_web_registration=payload['is_web_registration']
        )

        if await is_existing_user_(self):
            self.logger.error(f'User is already registered: {self.user_profile.phone_number}')
            raise AlreadyRegisteredUser('AYOU0004')

        # Hashing is switched off because it takes 6+ seconds:
        # user_pin_hash = password_hasher.hash(self.user_profile.pin)
        # NOTE(review): a constant is stored in place of a hash of the pin.
        user_pin_hash = 1985
        insert_id = await register_user_profile_(self, user_pin_hash)
        if not insert_id:
            self.logger.error(
                f'Unknown Register User Exception: {self.user_profile.phone_number}'
            )
            raise UnknownUserException('AYOU0032')

        session_id = await self.send_sms_token()
        body = {
            "user_id": str(insert_id),
            "sms_token_session_id": session_id
        }
        return body, 200
class RegisterWebUser(User):
    """Handler for call-back requests submitted through the web form."""

    # Both templates receive the requester's phone number via ``str.format``.
    registration_message = (
        'Thanks for your interest in Ayursh. '
        'Our team will call you back shortly on {}.'
    )
    duplicate_registration_message = (
        'Your request is already registered. '
        'Our team will get back to you shortly on {}'
    )

    async def post(self):
        """Store a web enquiry unless one already exists for this requester.

        Returns:
            ``({"registration_response": <message>}, 200)`` where the message
            is the duplicate notice for an existing web user, otherwise the
            confirmation text.

        Raises:
            UnknownUserException: the insert returned a falsy id.
        """
        payload = await self.request.json()
        self.user_profile = UserProfile(
            payload['phone_number'],
            user_name=payload['user_name'],
            user_message=payload['user_message']
        )

        if await is_existing_web_user_(self):
            template = self.duplicate_registration_message
        elif await register_web_user_profile_(self):
            template = self.registration_message
        else:
            raise UnknownUserException('AYOU0032')

        reply = template.format(self.user_profile.phone_number)
        return {"registration_response": reply}, 200
class LoginUser(User):
    """Handler that logs a user in, registering the profile on first contact."""

    async def _dispatch_login_token(self):
        """Send the OTP by SMS for ``+91`` numbers, by WhatsApp otherwise."""
        if self.user_profile.phone_number.startswith('+91'):
            return await self.send_sms_token()
        return await self.send_whatsapp_token()

    async def post(self):
        """Log in an existing user or register a new one, then send an OTP.

        No OTP is sent for offline registrations; for newly created profiles
        the number ``+919916788332`` is skipped as well.

        Returns:
            ``({"user_id": ..., "sms_token_session_id": ...}, 200)``; the
            session id is ``None`` when no OTP was dispatched.

        Raises:
            UnknownUserException: a new profile could not be inserted.
        """
        payload = await self.request.json()
        self.logger.info('LoginUser: %s' % payload)

        # Some clients send the country code twice ("+91+91" + 10 digits).
        raw_phone = payload['phone_number']
        if raw_phone.startswith('+91+91') and len(raw_phone) == 16:
            payload['phone_number'] = raw_phone[3:]
            self.logger.info('LoginUser Updated Phone number: %s' % payload)

        self.user_profile = UserProfile(
            payload['phone_number'],
            is_web_login=payload.get('is_web_login', False),
            is_offline_registration=payload.get('is_offline_registration'),
            login_method=payload.get('login_method'),
            user_name_handle=payload.get('user_name_handle')
        )
        # Optional fields are copied onto the profile only when non-empty.
        for field in ('user_name', 'user_email'):
            supplied = payload.get(field)
            if supplied:
                setattr(self.user_profile, field, supplied)

        user_funnel_slack_notification(
            self.request,
            f'User {payload["phone_number"]} is trying to login'
        )

        existing_profile = await get_user_profile(self)
        sms_token_session_id = None

        if existing_profile:
            # Don't want to generate OTP if offline registration.
            if not self.user_profile.is_offline_registration:
                sms_token_session_id = await self._dispatch_login_token()
            return {
                "user_id": str(existing_profile['_id']),
                "sms_token_session_id": sms_token_session_id
            }, 200

        # No stored profile: a web login counts as a web registration.
        self.user_profile.is_web_registration = bool(self.user_profile.is_web_login)
        self.user_profile.is_offline_registration = bool(
            self.user_profile.is_offline_registration
        )

        # Pin is not used. Hardcoding for now.
        user_pin_hash = 1985
        insert_id = await register_user_profile_(self, user_pin_hash)
        if not insert_id:
            self.logger.error('Unknown Register User Exception: %s' % self.user_profile.phone_number)
            raise UnknownUserException('AYOU0032')

        if (not self.user_profile.is_offline_registration
                and self.user_profile.phone_number != '+919916788332'):
            sms_token_session_id = await self._dispatch_login_token()
        return {
            "user_id": str(insert_id),
            "sms_token_session_id": sms_token_session_id
        }, 200

    async def get(self):
        """Return the stored profile looked up with a placeholder number."""
        self.user_profile = UserProfile('+91xxxxxxxxxx')
        return await get_user_profile(self), 200
class ValidateToken(User):
    """Handler that validates a one-time token and issues JWT credentials.

    The check is routed by ``self.communication_vendor``: ``'TWILIO'`` uses
    the Twilio verify service; ``'2FA'`` uses the 2Factor endpoint for
    ``+91`` numbers and the Freshworks WhatsApp endpoint for all others.
    """

    def _new_auth_token(self):
        """Build the token generator for the current user profile."""
        return GenerateUserProfileAuthToken(
            self.user_profile,
            self.request.app['JWT_SECRET_KEY'],
            self.request.app['JWT_EXP_DELTA_SECONDS'],
            self.request.app['JWT_REFRESH_EXP_DELTA_SECONDS']
        )

    def _approved_response(self):
        """Return the success payload shared by the 2FA and WhatsApp paths.

        Also posts a login notification to Slack unless the request was made
        by the operation team.
        """
        auth_token = self._new_auth_token()
        if not self.user_profile.is_requested_by_operation_team:
            message = f'User {self.user_profile.phone_number} has logged in.'
            user_funnel_slack_notification(self.request, message)
        return {
            "token_approved": True,
            "auth_token": auth_token.get_auth_token(),
            "auth_token_expiry": auth_token.get_auth_token_expiry(),
            "refresh_token": auth_token.get_refresh_token(),
            "refresh_token_expiry": auth_token.get_refresh_token_expiry()
        }, 200

    @staticmethod
    def _whatsapp_sent_token(response_status):
        """Extract the OTP that was sent from a WhatsApp status payload.

        Returns ``None`` when the payload lacks the expected structure, so a
        malformed provider response fails verification instead of crashing.
        """
        try:
            template = response_status['outbound_messages'][0]['data']['message_template']
            return template['rich_template_data']['body']['params'][0]['data']
        except (KeyError, IndexError, TypeError):
            return None

    def verify_twilio_token(self):
        """Check the token with the Twilio verify service.

        Returns:
            ``({"token_approved": True, "auth_token": ...}, 200)`` when the
            verification status is ``approved``.

        Raises:
            InvalidSMSToken: the verification is still ``pending``.
            UnknownSMSTokenError: any other status, or a Twilio 404/429.
            TwilioRestException: any other Twilio error (re-raised rather
                than swallowed, which used to yield a ``None`` response).
        """
        try:
            verification_result = self.verify_services.verification_checks.create(
                to=self.user_profile.phone_number,
                code=self.user_profile.sms_token
            )
            if verification_result.status == 'pending':
                raise InvalidSMSToken('AYOU0128')
            if verification_result.status == 'approved':
                return {
                    "token_approved": True,
                    "auth_token": self._new_auth_token().get_auth_token()
                }, 200
            raise UnknownSMSTokenError('AYOU0064')
        except TwilioRestException as e:
            error_code = {404: 'AYOU0065', 429: 'AYOU0066'}.get(e.status)
            if error_code is None:
                self.logger.error(
                    'User Twilio SMS Token Verification failure: %s: %s' %
                    (self.user_profile.phone_number, str(e))
                )
                raise
            self.logger.info(
                'User Twilio SMS Token Verification failure: %s: %s' %
                (self.user_profile.phone_number, str(e))
            )
            raise UnknownSMSTokenError(error_code)

    async def verify_2fa_token(self):
        """Check the token against the 2Factor verification endpoint.

        The remote call is skipped for operation-team requests, which are
        always approved.

        Raises:
            UnknownSMSTokenError: the token was not accepted.
        """
        profile = self.user_profile
        twofa_response_status = dict()
        if not profile.is_requested_by_operation_team:
            response = await create_async_tasks(
                self.request.app,
                self.request.app['2FACTOR_VERIFY_ENDPOINT'].format(
                    self.request.app['TWO_FACTOR_API_KEY'],
                    profile.sms_token_session_id,
                    profile.sms_token
                ),
                ['']
            )
            _, _, twofa_response = response[0]
            twofa_response_status = json.loads(twofa_response)

        # ``.get`` so that a provider body without "Details" is treated as a
        # failed verification instead of raising KeyError.
        otp_matched = (
            isinstance(twofa_response_status, dict)
            and twofa_response_status.get('Details') == 'OTP Matched'
        )
        # SECURITY(review): hard-coded bypass OTP for one specific number and
        # for offline registrations. Kept for compatibility; this should be
        # moved to configuration or removed.
        bypass_matched = (
            (profile.phone_number == '+919916788332' or profile.is_offline_registration)
            and profile.sms_token == '654140'
        )
        if otp_matched or bypass_matched or profile.is_requested_by_operation_team:
            return self._approved_response()

        self.logger.info(
            'User SMS Token Verification failure: %s: %s' %
            (profile.phone_number, twofa_response_status)
        )
        raise UnknownSMSTokenError('AYOU0128')

    async def verify_whatsapp_token(self):
        """Check the token against the Freshworks WhatsApp message status.

        The remote call is skipped for operation-team requests, which are
        always approved.

        Raises:
            UnknownSMSTokenError: ``AYOU0126`` when the provider answers with
                code 404, ``AYOU0125`` when the token does not match.
        """
        profile = self.user_profile
        whatsapp_response_status = dict()
        if not profile.is_requested_by_operation_team:
            response = await create_async_tasks(
                self.request.app,
                self.request.app['FRESHWORKS_WHATSAPP_VERIFY_ENDPOINT'].format(
                    profile.sms_token_session_id
                ),
                [''],
                headers={'Authorization': 'Bearer ' + self.request.app['FRESHWORKS_AUTH_TOKEN']}
            )
            _, _, whatsapp_response = response[0]
            whatsapp_response_status = json.loads(whatsapp_response)

        if whatsapp_response_status.get('code') == 404:
            self.logger.info(
                'Invalid Whatsapp Token Session Id: %s: %s' %
                (profile.phone_number, whatsapp_response_status)
            )
            raise UnknownSMSTokenError('AYOU0126')

        token_matched = False
        if whatsapp_response_status:
            sent_token = self._whatsapp_sent_token(whatsapp_response_status)
            token_matched = sent_token is not None and sent_token == profile.sms_token

        if token_matched or profile.is_requested_by_operation_team:
            return self._approved_response()

        self.logger.info(
            'User Whatsapp Token Verification failure: %s: %s' %
            (profile.phone_number, whatsapp_response_status)
        )
        raise UnknownSMSTokenError('AYOU0125')

    async def verify_token(self):
        """Dispatch to the verifier for the configured communication vendor.

        Returns the verifier's ``(payload, status)`` tuple, or ``None`` for
        an unrecognised vendor.
        """
        if self.communication_vendor == 'TWILIO':
            # The result used to be discarded here, so Twilio logins always
            # produced a ``None`` response.
            return self.verify_twilio_token()
        if self.communication_vendor == '2FA':
            if self.user_profile.phone_number.startswith('+91'):
                return await self.verify_2fa_token()
            return await self.verify_whatsapp_token()
        self.logger.error(
            'Unsupported communication vendor: %s' % self.communication_vendor
        )
        return None

    async def post(self):
        """Build the profile from the request body and verify its token.

        The body must contain ``phone_number``, ``sms_token``,
        ``sms_token_session_id`` and ``user_id``.
        """
        post_data = await self.request.json()
        self.user_profile = UserProfile(
            post_data['phone_number'],
            sms_token=post_data['sms_token'],
            sms_token_session_id=post_data['sms_token_session_id'],
            user_id=post_data['user_id'],
            is_offline_registration=post_data.get('is_offline_registration'),
            is_requested_by_operation_team=post_data.get(
                'is_requested_by_operation_team',
                False
            )
        )
        return await self.verify_token()
class RefreshToken(User):
    """Handler that exchanges a refresh token for a new auth token."""

    async def get(self):
        """Issue a new auth token for the bearer refresh token.

        The token is read from the ``Authorization: Bearer <token>`` header
        and decoded with the app's ``JWT_SECRET_KEY`` using HS256.

        Returns:
            ``({'auth_token': ...}, 200)``.

        Raises:
            InvalidRefreshToken: the header is missing or malformed, or the
                decoded payload is not a ``grant_type == 'refresh'`` token.
            Errors raised by ``jwt.decode`` itself are not intercepted here.
        """
        # A missing header or one without the "Bearer " prefix used to raise
        # KeyError/IndexError; report it as an invalid refresh token instead.
        header_parts = self.request.headers.get('Authorization', '').split('Bearer ')
        if len(header_parts) < 2:
            self.logger.error('Invalid Refresh Token Error')
            raise InvalidRefreshToken('AYOU0063')
        token = header_parts[1]

        payload = jwt.decode(
            token,
            self.request.app['JWT_SECRET_KEY'],
            algorithms=['HS256']
        )
        if payload and payload.get('grant_type') == 'refresh':
            user_profile = UserProfile()
            user_profile.user_id = payload['user_id']
            auth_token = GenerateUserProfileAuthToken(
                user_profile,
                self.request.app['JWT_SECRET_KEY'],
                self.request.app['JWT_EXP_DELTA_SECONDS'],
                self.request.app['JWT_REFRESH_EXP_DELTA_SECONDS']
            )
            return {
                'auth_token': auth_token.get_auth_token()
            }, 200

        self.logger.error('Invalid Refresh Token Error')
        raise InvalidRefreshToken('AYOU0063')
class CancelToken(User):
    """Handler that cancels a pending verification."""

    async def post(self):
        """Mark the verification named by ``verify_sid`` as canceled.

        Returns:
            ``(<status reported by the verify service>, 200)``.
        """
        body = await self.request.json()
        pending_verification = self.verify_services.verifications(body['verify_sid'])
        updated = pending_verification.update(status='canceled')
        return updated.status, 200
class UserDetail(User):
    """Handler for reading and updating the authenticated user's details."""

    async def get(self):
        """Return the profile of the user attached to the request."""
        self.user_id = self.request.user
        return await get_user_profile_by_user_id(self), 200

    async def patch(self):
        """Update the user's e-mail and/or name from the JSON body.

        Only non-empty ``email`` / ``name`` values are applied.

        Raises:
            InvalidUser: neither a name nor an e-mail is set on the handler.
        """
        changes = await self.request.json()
        # (request key, handler attribute) pairs, applied in this order.
        for key, attribute in (('email', 'user_email'), ('name', 'user_name')):
            value = changes.get(key)
            if value:
                setattr(self, attribute, value)

        if not (self.user_name or self.user_email):
            raise InvalidUser('AYOU0017')

        await update_user_details_(self)
        return {'Success': True}, 200
class EmailRequest(User):
    """Base handler providing asynchronous e-mail delivery over SMTP."""

    async def connect_smtp(self):
        """Open a new SMTP connection described by ``app['MAIL_PARAMS']``.

        Reads ``host``, ``SSL`` (default ``False``) and ``port`` (default 465
        with SSL, 25 without) and logs in when a ``user`` entry is present.
        The client is stored on ``self.smtp`` only.
        """
        mail_params = self.request.app['MAIL_PARAMS']
        host = mail_params.get('host')
        use_ssl = mail_params.get('SSL', False)
        port = mail_params.get('port', 465 if use_ssl else 25)
        # STARTTLS (the 'TLS' setting) is currently not used by this handler.
        self.smtp = aiosmtplib.SMTP(hostname=host, port=port, use_tls=use_ssl)
        await self.smtp.connect()
        if 'user' in mail_params:
            await self.smtp.login(mail_params['user'], mail_params['password'])

    async def send_mail_async(self, sender, to, subject, text, textType='plain', **params):
        """Send an outgoing email with the given parameters.

        :param sender: Address placed in the ``From`` header.
        :type sender: str
        :param to: Recipient addresses; a single address string is accepted.
        :type to: list
        :param subject: Subject line (also used as the MIME preamble).
        :type subject: str
        :param text: Message body.
        :type text: str
        :param textType: Mime subtype of text, defaults to 'plain' (can be 'html').
        :type textType: str
        :param params: An optional set of parameters. (See below)
        :type params: dict

        Optional Parameters:
            :cc: A list of Cc email addresses.
            :bcc: A list of Bcc email addresses.
        """
        def as_list(addresses):
            # ', '.join on a bare string would join its characters.
            return [addresses] if isinstance(addresses, str) else addresses

        to = as_list(to)
        cc = as_list(params.get("cc", []))
        bcc = as_list(params.get("bcc", []))

        # Prepare Message
        msg = MIMEMultipart()
        msg.preamble = subject
        msg['Subject'] = subject
        msg['From'] = sender
        msg['To'] = ', '.join(to)
        if cc:
            msg['Cc'] = ', '.join(cc)
        if bcc:
            msg['Bcc'] = ', '.join(bcc)
        msg.attach(MIMEText(text, textType, 'utf-8'))

        try:
            self.smtp = self.request.app['SMTP']
            await self.smtp.send_message(msg)
        except aiosmtplib.errors.SMTPServerDisconnected:
            # NOTE(review): the reconnected client is kept on this handler
            # only; app['SMTP'] still holds the disconnected one, so later
            # requests will presumably reconnect again - confirm intent.
            await self.connect_smtp()
            await self.smtp.send_message(msg)

    async def post(self):
        """No-op; subclasses provide the actual request handling."""
        pass
class SupportTicket(EmailRequest):
    """Handler that mails a user's support request and stores it."""

    async def post(self):
        """Forward the support request to the support mailbox.

        The JSON body must contain ``email``, ``phone_number``, ``message``
        and ``name``.

        Returns:
            ``(True, 200)`` once the mail is sent and the query is stored.
        """
        self.user_id = self.request.user
        ticket = await self.request.json()
        self.logger.info('Support Data: %s' % ticket)

        # Contact details first, then the user's own text, blank-line separated.
        self.support_message = '\n\n'.join((
            'User Email: ' + ticket['email'],
            'User Ph No: ' + ticket['phone_number'],
            ticket['message'],
        ))

        sender = self.request.app['MAIL_PARAMS']['user']
        recipients = [self.request.app['SUPPORT_EMAIL']]
        subject = "Support request from User: %s" % ticket['name']
        await self.send_mail_async(
            sender,
            recipients,
            subject,
            self.support_message,
            textType="plain"
        )
        await store_support_queries_(self)
        return True, 200
class ClinicSignupEmail(EmailRequest):
    """Handler that sends an HTML e-mail on behalf of a clinic user."""

    async def post(self):
        """Send the HTML ``message`` from the body to ``to_email``.

        The JSON body must contain ``message``, ``to_email`` and ``subject``.

        Returns:
            ``(True, 200)`` once the mail has been sent.
        """
        self.clinic_user_id = self.request.clinic
        signup = await self.request.json()
        self.logger.info('Clinic Signup Data: %s' % signup)
        self.support_message = signup['message']

        sender = self.request.app['MAIL_PARAMS']['user']
        recipients = [signup['to_email']]
        subject = signup['subject']
        await self.send_mail_async(
            sender,
            recipients,
            subject,
            self.support_message,
            textType="html"
        )
        return True, 200
class KnowlarityLoginUser(User):
    """Handler that turns Knowlarity and Exotel call logs into user profiles."""

    def handle_errored_response(self, response):
        """Salvage call objects from a response that is not valid JSON.

        The text is split on the ``"cu`` marker; every fragment is re-wrapped
        as ``{"cu...`` (minus its last two characters) and parsed on its own.
        Fragments that still fail to parse are skipped.

        Returns:
            list: the successfully parsed objects, in order.
        """
        updated_response = list()
        for fragment in response.split('"cu')[1:]:
            candidate = '{"cu' + fragment[:-2]
            try:
                updated_response.append(json.loads(candidate))
            except ValueError:
                # json.JSONDecodeError: fragment is beyond repair, skip it.
                continue
        return updated_response

    def _log_first_profile(self, knowlarity_response):
        """Log the first profile key, if the response holds any profile."""
        first_phone = next(iter(knowlarity_response['knowlarity_user_profiles']), None)
        if first_phone is not None:
            self.logger.info(first_phone)

    async def get(self):
        """Fetch call logs and group them per customer phone number.

        Query parameters ``calendar_date`` and ``calendar_end_date``
        (``YYYY-MM-DD``) select the window: with an end date the window is the
        28 days up to it; otherwise it is the given day (or today), capped at
        the current IST time when that day is today. Exotel is queried only
        when at least one of the parameters is supplied.

        Returns:
            ``(payload, 200)`` where payload has ``knowlarity_user_profiles``
            (profiles keyed by normalized phone number), ``call_logs`` (all
            call entries per number) and ``recent_phone_number`` (normalized
            number of the first Knowlarity object, or ``None``).
        """
        query = self.request.rel_url.query
        calendar_date = query.get('calendar_date')
        calendar_end_date = query.get('calendar_end_date')
        day_format = '%Y-%m-%d'

        if calendar_date:
            cal_date = dt.datetime.strptime(calendar_date, day_format)
        else:
            cal_date = dt.datetime.today()
        now = get_ist_time()
        if calendar_end_date:
            cal_end_date = dt.datetime.strptime(calendar_end_date, day_format)
            cal_date = cal_end_date + dt.timedelta(days=-28)
            end_date_time = dt.datetime.combine(cal_end_date, dt.time(23, 59, 59))
        else:
            cal_end_date = cal_date
            if cal_date.date() == now.date():
                window_end = dt.time(now.hour, now.minute, now.second)
            else:
                window_end = dt.time(23, 59, 59)
            end_date_time = dt.datetime.combine(cal_date, window_end)
        start_date_time = dt.datetime.combine(cal_date, dt.time(0, 0, 0))

        headers = {
            'Accept': 'application/json',
            'x-api-key': self.request.app['KNOWLARITY_X_API_KEY'],
            'Authorization': self.request.app['KNOWLARITY_AUTHORIZATION'],
            'content-type': 'application/json'
        }
        params = {
            'start_time': start_date_time.strftime('%Y-%m-%d %H:%M:%S+05:30'),
            'end_time': end_date_time.strftime('%Y-%m-%d %H:%M:%S+05:30'),
            'limit': 200
        }
        tasks = [
            create_async_tasks(
                self.request.app,
                self.request.app['KNOWLARITY_CALL_LOG_ENDPOINT'],
                [''],
                params=params,
                headers=headers
            )
        ]
        if calendar_date or calendar_end_date:
            start_day = cal_date.strftime(day_format)
            end_day = cal_end_date.strftime(day_format)
            exotel_url = self.request.app['EXOTEL_HOST'] + \
                f"DateCreated=gte%3A{start_day}%2000%3A00%3A00%3Blte%3A{end_day}%2023%3A59%3A59"
            tasks.append(
                create_async_tasks(
                    self.request.app,
                    exotel_url,
                    ['']
                )
            )
            knowlarity_response, exotel_response = await asyncio.gather(*tasks)
            _, _, exotel_body = exotel_response[0]
            exotel_response_obj = json.loads(exotel_body)['Calls'] or list()
        else:
            knowlarity_response = await tasks[0]
            exotel_response_obj = list()

        _, _, response = knowlarity_response[0]
        if isinstance(response, dict):
            response_str = json.dumps(response)
        else:
            response_str = response
        # Strip whitespace and trailing commas that make the payload invalid JSON.
        response_str = response_str.replace("\t", "").replace("\n", "").replace(",}", "}").replace(",]", "]")
        try:
            response_json = json.loads(response_str)
            iter_objs = response_json.get('objects', [])
        except Exception:
            # Deliberate catch-all: anything unusable goes to the salvage parser.
            iter_objs = self.handle_errored_response(response_str)

        knowlarity_user_profiles_dict = dict()
        call_logs_dict = defaultdict(list)
        recent_phone_number = None

        for idx, knowlarity_object in enumerate(iter_objs):
            raw_phone = knowlarity_object['customer_number']
            normalized_phone = raw_phone if raw_phone.startswith('+') else '+91' + raw_phone.lstrip('0')
            if idx == 0:
                # Report the normalized number: it is the key used in the
                # profile dict, which callers index with this value.
                recent_phone_number = normalized_phone

            agent_number = knowlarity_object['agent_number']
            is_outgoing = knowlarity_object['Call_Type'] == 1
            call_start_time = dt.datetime.strptime(
                knowlarity_object['start_time'], '%Y-%m-%d %H:%M:%S%z'
            ).astimezone(IST)
            call_logs_dict[normalized_phone].append({
                'call_type': 'Outgoing' if is_outgoing else 'Incoming',
                'call_duration': convert_sec_to_time(knowlarity_object['call_duration']),
                'call_time': call_start_time.strftime('%H:%M'),
                'call_start_time': call_start_time.timestamp(),
                'call_status': agent_number.split('#')[0],
                'agent': knowlarity_object['destination'],
                'call_recording': knowlarity_object['call_recording'],
                'call_fa_icon': 'fas fa-arrow-up' if is_outgoing else 'fas fa-arrow-down',
                'call_fa_color': 'color:green;' if 'Missed' not in agent_number
                and 'Disconnected' not in agent_number else 'color:red;',
            })
            if normalized_phone not in knowlarity_user_profiles_dict:
                phone_call_logs = call_logs_dict[normalized_phone]
                knowlarity_user_profiles_dict[normalized_phone] = {
                    'knowlarity_uuid': knowlarity_object['uuid'],
                    'phone_number': normalized_phone,
                    'ph_no': normalized_phone[3:],
                    'login_method': 'knowlarity',
                    'is_web_registration': False,
                    'name': knowlarity_object['caller_name'],
                    'last_login_on': phone_call_logs[0]['call_start_time'],
                    'call_logs': phone_call_logs,
                }

        for exotel_object in exotel_response_obj:
            is_outgoing = exotel_object['Direction'] == 'outbound-dial'
            # ``to_phone_number`` is always the customer, ``from_phone_number``
            # the agent; which raw field holds which depends on the direction.
            if is_outgoing:
                to_phone_number = '+91' + exotel_object['To'][1:]
                from_phone_number = '+91' + exotel_object['From'][1:]
            else:
                to_phone_number = '+91' + exotel_object['From'][1:]
                from_phone_number = '+91' + exotel_object['To'][1:]
            self.logger.debug(
                'Exotel To: %s From: %s Direction: %s to_phone_number: %s' %
                (exotel_object['To'], exotel_object['From'],
                 exotel_object['Direction'], to_phone_number)
            )

            call_status = exotel_object['Status']
            call_start_time = dt.datetime.strptime(
                exotel_object['StartTime'] + '+05:30', '%Y-%m-%d %H:%M:%S%z'
            ).astimezone(IST)
            call_logs_dict[to_phone_number].append({
                'call_type': 'Outgoing' if is_outgoing else 'Incoming',
                'call_duration': convert_sec_to_time(exotel_object['Duration']),
                'call_time': call_start_time.strftime('%H:%M'),
                'call_start_time': call_start_time.timestamp(),
                'call_status': call_status,
                'agent': from_phone_number,
                'call_recording': exotel_object['RecordingUrl'],
                'call_fa_icon': 'fas fa-arrow-up' if is_outgoing else 'fas fa-arrow-down',
                'call_fa_color': 'color:green;' if 'Missed' not in call_status
                and 'Disconnected' not in call_status else 'color:red;',
            })
            if to_phone_number not in knowlarity_user_profiles_dict:
                phone_call_logs = call_logs_dict[to_phone_number]
                knowlarity_user_profiles_dict[to_phone_number] = {
                    'knowlarity_uuid': exotel_object['Sid'],
                    'phone_number': to_phone_number,
                    # Derived from the customer number; the raw 'To' field
                    # holds the agent's number for inbound calls.
                    'ph_no': to_phone_number[3:],
                    'login_method': 'exotel',
                    'is_web_registration': False,
                    'name': exotel_object['CallerName'],
                    'last_login_on': phone_call_logs[0]['call_start_time'],
                    'call_logs': phone_call_logs,
                }

        # Sorting call logs of knowlarity and exotel based on call start time.
        for phone_number, profile in knowlarity_user_profiles_dict.items():
            profile['call_logs'] = sorted(
                call_logs_dict.get(phone_number, []),
                key=lambda entry: int(entry['call_start_time']), reverse=True
            )
        return {
            'knowlarity_user_profiles': knowlarity_user_profiles_dict,
            'call_logs': dict(call_logs_dict),
            'recent_phone_number': recent_phone_number
        }, 200

    async def post(self):
        """This is a knowlarity Hook API. It is set up inside Knowlarity Call Flow API Hooks.
        This API will be called when Call Ends from User.

        Waits for the call to appear in the call logs, registers unknown
        callers, refreshes known ones and notifies Slack about the most
        recent call.

        Returns:
            ``({'users_registered': n, 'users_updated': m}, 200)``.
        """
        self.logger.info('Knowlarity LoginUser Triggered')
        self.logger.info('Sleeping for 20 Seconds for Knowlarity to give us this new User details')
        await asyncio.sleep(20)
        knowlarity_response, _ = await self.get()
        self._log_first_profile(knowlarity_response)
        await asyncio.sleep(10)
        knowlarity_response, _ = await self.get()
        # An empty result must not raise here: StopIteration escaping a
        # coroutine is turned into RuntimeError.
        self._log_first_profile(knowlarity_response)
        self.knowlarity_user_profiles = knowlarity_response['knowlarity_user_profiles']
        # Profiles whose name contains 'ignore', 'crap', 'wrong' or 'cross'
        # are intentionally NOT filtered out: when such an old customer calls
        # again, last_login_on must still be updated, otherwise the
        # therapist-mgr service has trouble marking the record as Closed.
        phone_numbers_of_day = set(self.knowlarity_user_profiles.keys())
        unregistered_phone_numbers, registered_phone_numbers = \
            await filter_unregistered_phone_numbers_(self, phone_numbers_of_day)

        tasks = list()
        if registered_phone_numbers:
            tasks.append(update_existing_users_bulk_(self, registered_phone_numbers, 'knowlarity'))
        if unregistered_phone_numbers:
            tasks.append(register_new_users_bulk_(self, unregistered_phone_numbers))

        # Only notify when the most recent call can actually be looked up; a
        # KeyError here would leave the coroutines above un-awaited.
        recent_phone_number = knowlarity_response['recent_phone_number']
        recent_profile = self.knowlarity_user_profiles.get(recent_phone_number)
        if recent_profile and recent_profile['call_logs']:
            recent_call_details = recent_profile['call_logs'][0]
            message = (f'{recent_call_details["call_type"]} call from {recent_phone_number} '
                       f'for duration {recent_call_details["call_duration"]} ')
            message += f'attended by {CALL_AGENTS.get(recent_call_details["agent"], "Unknown")}' \
                if recent_call_details["agent"] else f"{recent_call_details['call_status']}"
            tasks.append(notify_slack(self.request.app, message))

        if tasks:
            await asyncio.gather(*tasks)
        return {
            'users_registered': len(unregistered_phone_numbers),
            'users_updated': len(registered_phone_numbers)
        }, 200
class EmailLoginUser(User):
    """Handler that turns enquiry e-mails (website form / Justdial) into logins."""

    def filter_justdial_message(self, justdial_message, nq_type):
        """Extract the lead fields from a single-line Justdial mail body.

        :param justdial_message: The mail text.
        :param nq_type: ``'inq'`` for "inquired for your business category"
            mails; any other value selects the "enquired for" layout.
        :returns: ``(activity, area, requirement, date_time, phone_number,
            email)``; text fields are ``''`` when their markers are absent.
        :raises ValueError: no search date in ``%Y-%m-%d %H:%M:%S`` format
            could be extracted.
        """
        # Normalize the "Label :" variants to "Label:" so one pattern fits.
        for variant, canonical in (
                ('User Area :', 'User Area:'),
                ('User City :', 'User City:'),
                ('Search Date & Time :', 'Search Date & Time:'),
                ('User Phone : ', 'User Phone: '),
                ('User Email :', 'User Email:')):
            justdial_message = justdial_message.replace(variant, canonical)

        def between(pre_pattern, post_pattern):
            # The trailing '|$' alternative guarantees a match, so index 0
            # always exists; it is '' when the markers are not found.
            return re.findall(
                pre_pattern + "(.*)" + post_pattern + '|$', justdial_message
            )[0]

        if nq_type == 'inq':
            activity = between(' inquired for your business category ',
                               re.escape(' Feedback Details User Area'))
        else:
            activity = between(' enquired for ', re.escape(' User Area: '))

        search_date_marker = re.escape(' Search Date & Time: ')
        phone_marker = re.escape(' User Phone: ')
        email_marker = re.escape(' User Email: ')

        area = between(re.escape(' User Area: '), re.escape(' User City: '))
        requirement = between(re.escape(' User Requirement: '), search_date_marker)

        # The date may be followed by any of these markers, tried in order.
        date_time = ''
        for end_marker in (phone_marker,
                           re.escape(' View Contact Details '),
                           re.escape(' View More Leads ')):
            date_time = between(search_date_marker, end_marker)
            if date_time:
                break
        # NOTE(review): astimezone() on a naive datetime assumes the server's
        # local timezone - confirm the mail timestamps match it.
        date_time = datetime.strptime(date_time, '%Y-%m-%d %H:%M:%S').astimezone(IST)

        phone_number = between(phone_marker, email_marker)
        email = between(email_marker, re.escape(' Send Email '))
        return activity, area, requirement, date_time, phone_number, email

    async def parse_email_message(self, message, message_id, mail_date):
        """Parse one enquiry mail and log the sender in when a phone is found.

        Mails with more than two lines are treated as website enquiries
        (``From:`` on the first line, phone number on the third, requirement
        on the fifth); shorter mails are treated as Justdial leads when the
        first line mentions ``Justdial`` or ``JD APP``. Parsing errors are
        logged and never raised.

        :param message: Decoded mail body.
        :param message_id: Mail id, stored as ``message_id`` on the login.
        :param mail_date: Mail date, used as ``last_login_on`` for website
            enquiries.
        """
        user_dict = dict()
        lines = message.split('\n')
        phone_number = None
        name = None
        email = None
        # Defaults are set up front so that a parsing error after the phone
        # number was found cannot leave these names unbound below.
        activity = ''
        area = ''
        requirement = ''
        login_method = 'email'
        date_time = mail_date
        try:
            if len(lines) > 2:
                from_split = lines[0].split('From: ')
                if len(from_split) > 1:
                    name_email_split = from_split[1].split(' ')
                    if len(name_email_split) > 1:
                        name = name_email_split[0]
                        # Drop the surrounding angle brackets.
                        email = name_email_split[1].rstrip()[1:-1]
                phone_number_split = lines[2].split('Phone No : ')
                if phone_number_split and 'Ph No' in phone_number_split[0]:
                    phone_number_split = lines[2].split('User Ph No: ')
                if len(phone_number_split) > 1:
                    if '+91' not in phone_number_split[1]:
                        phone_number = '+91' + phone_number_split[1].rstrip()
                    else:
                        phone_number = phone_number_split[1].rstrip()
                if len(lines) > 4:
                    requirement = lines[4]
                else:
                    self.logger.error('Mail has no requirement line: %s' % lines)
            elif 'Justdial' in lines[0] or 'JD APP' in lines[0]:
                login_method = 'justdial'
                justdial_message = lines[0]
                enquired_for = re.escape(' enquired for ')
                inquired_for = re.escape(' inquired for your business category ')
                # (greeting, verb, layout) variants, tried in this order.
                greetings = (
                    (re.escape('Dear Mr Raghavan(Proprietor) '), enquired_for, 'enq'),
                    (re.escape('Dear Mr Raghavan(Proprietor), '), inquired_for, 'inq'),
                    (re.escape('Hi Mr Raghavan(Proprietor) '), enquired_for, 'enq'),
                )
                for greeting, verb, nq_type in greetings:
                    lead_name = re.findall(
                        greeting + "(.*)" + verb + '|$', justdial_message
                    )[0]
                    if lead_name:
                        name = lead_name
                        activity, area, requirement, date_time, phone_number, email = (
                            self.filter_justdial_message(justdial_message, nq_type))
                        break
                else:
                    self.logger.error('Unknown Mail Parsing Error')
            else:
                self.logger.error('Unknown Mail Type: %s' % message)
        except Exception:
            # Deliberate catch-all: one bad mail must not abort the batch.
            self.logger.error('Unknown Error: parse_email_message: %s' % lines)

        if phone_number:
            if name:
                user_dict['user_name'] = name
            if email:
                user_dict['user_email'] = email
            user_dict['activity'] = area + ': ' + requirement if requirement else activity
            user_dict['phone_number'] = phone_number
            user_dict['is_offline_registration'] = True
            user_dict['login_method'] = login_method
            user_dict['last_login_on'] = date_time
            user_dict['message_id'] = message_id
            await self.login_user(user_dict)

    def convert_sec_to_time(self, seconds):
        """Format a duration in seconds as ``MM:SS``.

        Minutes are not wrapped at 60, so durations of an hour or more keep
        their full length (e.g. 3700 seconds gives ``61:40``).
        """
        minutes, secs = divmod(seconds, 60)
        return '%02d:%02d' % (minutes, secs)

    def data_encoder(self, text):
        """Decode a URL-safe base64 payload into a UTF-8 string.

        :param text: Base64url data (``str`` or ``bytes``); missing ``=``
            padding is tolerated.
        :returns: The decoded text, or ``None`` for empty input.
        """
        if not text:
            return None
        missing_padding = -len(text) % 4
        if missing_padding:
            text += ('=' if isinstance(text, str) else b'=') * missing_padding
        return str(base64.urlsafe_b64decode(text), 'utf-8')

    def read_message(self, content) -> tuple:
        """Pull subject, body, sender and date out of a Gmail message resource.

        :param content: Message dict with a ``payload`` holding ``headers``
            and a ``body`` and/or ``parts``.
        :returns: ``(subject, message, from_mail, mail_date)``. ``subject``
            and ``from_mail`` are lists of header values; ``message`` is the
            decoded body or ``None``; ``mail_date`` is an IST datetime, or
            the raw list of ``Date`` header values when none can be parsed.
        """
        message = None
        payload = content['payload']
        headers = payload['headers']
        subject = [h['value'] for h in headers if h["name"] == "Subject"]
        from_mail = [h['value'] for h in headers if h["name"] == "From"]
        mail_date = [h['value'] for h in headers if h["name"] == "Date"]

        if mail_date:
            raw_date = mail_date[0]
            date_format = '%a, %d %b %Y %H:%M:%S %z'
            # First try without the last six characters, then as given.
            for candidate in (raw_date[:-6], raw_date):
                try:
                    mail_date = datetime.strptime(candidate, date_format).astimezone(IST)
                    break
                except ValueError:
                    continue
            else:
                self.logger.error('Mail date: %s' % raw_date)
        else:
            self.logger.error('Mail has no Date header')

        body = payload.get('body') or {}
        parts = payload.get('parts') or []
        if "data" in body:
            message = self.data_encoder(body['data'])
        elif parts and "data" in (parts[0].get('body') or {}):
            message = self.data_encoder(parts[0]['body']['data'])
        return subject, message, from_mail, mail_date

    async def post(self):
        """Acknowledge the request; Gmail polling is currently disabled.

        The disabled implementation (see version control history) built a
        read-only Gmail client from ``request.app['gmail_creds']``, listed
        today's mails sent to support@ayursh.com, fetched them in one batch
        and passed every "Enquiry for Ayursh Ayurveda" mail, or mail from
        tech@ayursh.com, to ``parse_email_message``.

        Returns:
            ``(True, 200)``.
        """
        return True, 200
class FreshchatHook(User):
    """Webhook receiver for Freshchat message events.

    For a text message sent by a user or agent, looks up the actor through
    the Freshchat users endpoint and hands a login record to
    ``self.login_user``.

    The RSA public key below was kept from the original docstring. No code
    in this class references it (presumably the Freshchat webhook signature
    key -- TODO confirm).

    -----BEGIN RSA PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAi3uqd8UMlfZ7kUB/A3uXxeSG6scJDWsKiE7qBVrx86Gzdk9VGpbcoOXJkq0EV7A0YybVnLTqhwg1y/CyorQCB8u6Vn54p0m006RZ63loACo0Rp8geFy1xnOfvpUhMKolJfxHecp7tzcOo8V9YkU/7E0D2B7uByBxhtMMtt9Q/AocX3/s7W3DfYnfteQJvuHpH5OuwTePflHbPh8g+mGJujG7frMy2MHN7ggp5pfzqtD3x7s2szflD7WFbV11MO9at/sNpGnH2+8JUy9lwxjwqhtQk3tZy62eGM05y3UCMzwCwij+RxSL940FJHQJP2rXmick4UmxcqdJubMUOrEbrwIDAQAB
    -----END RSA PUBLIC KEY-----
    """

    freshchat_user_endpoint = 'https://api.in.freshchat.com/v2/users/'

    async def get_user_data(self, user_id):
        """Fetch a Freshchat user's first name and phone number.

        Args:
            user_id: Freshchat actor id; passed to ``create_async_tasks``
                together with ``freshchat_user_endpoint``.

        Returns:
            ``(first_name, phone_number)``. Both are ``None`` when the decoded
            response is empty. Otherwise ``first_name`` falls back to
            ``'Unknown'`` and ``phone_number`` is ``None`` if absent.
        """
        response = await create_async_tasks(
            self.request.app,
            self.freshchat_user_endpoint,
            [user_id],
            headers={'Authorization': 'Bearer ' + self.request.app['FRESHWORKS_AUTH_TOKEN']},
        )
        # Each result is a 3-tuple; only the last element (the raw body) is used.
        _, _, raw_body = response[0]
        profile = json.loads(raw_body)
        if not profile:
            return None, None
        return profile.get('first_name', 'Unknown'), profile.get('phone')

    async def post(self):
        """Handle one Freshchat webhook event.

        Fields read from the JSON payload::

            {'actor': {'actor_type': 'user' | 'agent', 'actor_id': '...'},
             'action_time': '2025-09-05T10:38:01.082Z',
             'data': {'message': {'message_parts': [{'text': {'content': '...'}}],
                                  'conversation_id': '...'}}}

        Events from other actor types, from actors listed in
        ``FRESHWORKS_SPAM_ACTOR_ID_LIST``, or with no message / an empty
        ``message_parts`` list are ignored. Sets ``self.calendar_date`` when a
        text message is processed.

        Returns:
            Always ``(True, 200)``.
        """
        post_data = await self.request.json()
        self.logger.info(post_data)
        if not post_data:
            self.logger.error('Freshworks Hook Data is empty')
            return True, 200

        actor = post_data['actor']
        if actor['actor_type'] not in ('user', 'agent'):
            return True, 200
        user_id = actor['actor_id']
        if user_id in FRESHWORKS_SPAM_ACTOR_ID_LIST:
            return True, 200

        first_name, phone_number = await self.get_user_data(user_id)
        if not (phone_number or user_id == FRESHWORKS_AGENT_ID):
            self.logger.error('Freshworks Hook Data: Phone number is empty')
            return True, 200

        message = post_data['data'].get('message')
        if not message or not message.get('message_parts'):
            return True, 200
        message_parts = message['message_parts']

        # NOTE(review): the source had lost its indentation; this logging
        # branch is attached to the "first part has text" check -- confirm
        # against the deployed file.
        if not message_parts[0].get('text'):
            self.logger.info(message_parts[0])
            return True, 200

        # Only the first part is known to be text; skip any later non-text
        # parts (e.g. attachments) instead of raising KeyError on them.
        activity = '\n'.join(
            part['text']['content'] for part in message_parts if part.get('text')
        ).strip('\n')

        # The trailing 'Z' marks UTC, but strptime() yields a naive datetime and
        # naive.astimezone() assumes the server's local zone. Pin UTC first so
        # the IST conversion does not depend on the host's timezone.
        message_date = (
            datetime.strptime(post_data['action_time'], '%Y-%m-%dT%H:%M:%S.%fZ')
            .replace(tzinfo=UTC)
            .astimezone(IST)
        )
        self.calendar_date = message_date.date().strftime('%Y-%m-%d')

        user_dict = {}
        if first_name:
            user_dict['user_name'] = first_name
        user_dict['activity'] = activity
        user_dict['phone_number'] = phone_number
        user_dict['is_offline_registration'] = True
        user_dict['login_method'] = 'freshworks'
        user_dict['last_login_on'] = message_date
        user_dict['conversation_id'] = message['conversation_id']

        # Assumption: a message containing "Thank you" (or "Welcome") is very
        # likely a clinic auto-response. We don't want to create a login for
        # clinics and waste an OTP SMS in 2factor.
        if activity and 'Thank you' not in activity and 'Welcome' not in activity:
            await self.login_user(user_dict)
        return True, 200
class ExotelLoginUser(User):
    """Handler for the Exotel call status callback."""

    async def post(self):
        """Record a login for the non-agent numbers of a finished Exotel call.

        Exotel invokes this endpoint through the ``status_callback`` /
        callback URL supplied when a call is placed from therapy services;
        it fires when the call ends.

        The call legs are fetched from ``EXOTEL_NEW_HOST`` + the ``legs`` path
        in the payload. If that lookup does not report ``http_code`` 200, the
        numbers are taken from the comma-separated ``custom_field`` instead.
        Numbers that are keys of ``CALL_AGENTS`` are dropped, and the rest are
        stored in ``self.knowlarity_user_profiles`` with the call start time
        before ``update_existing_users_bulk_`` is awaited.

        Returns:
            Always ``(True, 200)``.
        """
        self.logger.info('Exotel LoginUser Triggered')
        post_data = await self.request.json()

        legs_url = self.request.app['EXOTEL_NEW_HOST'] + post_data['call_details']['legs']
        task_results = await create_async_tasks(self.request.app, legs_url, [''])
        # Each result is a 3-tuple; only the raw body (last element) is needed.
        _, _, raw_legs = task_results[0]
        legs_payload = json.loads(raw_legs)

        if legs_payload['http_code'] == 200:
            leg_details = legs_payload['response']['leg_details']
            # Drop the first character of contact_uri and prefix the +91 code.
            from_number = '+91' + leg_details['from'][0]['contact_uri'][1:]
            to_legs = leg_details['to']
            # No "to" leg recorded: fall back to the caller's own number.
            to_number = '+91' + to_legs[0]['contact_uri'][1:] if to_legs else from_number
        else:
            custom_fields = post_data['call_details']['custom_field'].split(',')
            from_number = custom_fields[0]
            to_number = custom_fields[1]

        phone_numbers_set = {from_number, to_number} - set(CALL_AGENTS.keys())
        self.logger.info('Exotel Post data: %s' % post_data)

        start_time = datetime.strptime(post_data['call_details']['start_time'], '%Y-%m-%dT%H:%M:%S%z')
        login_timestamp = start_time.timestamp()
        self.knowlarity_user_profiles = {
            phone_number: {'last_login_on': login_timestamp}
            for phone_number in phone_numbers_set
        }
        await update_existing_users_bulk_(self, phone_numbers_set, 'exotel')
        return True, 200
class ClinosLogin(User):
    """Handler that reports Clinos login attempts to Slack."""

    async def post(self):
        """Send a Slack notification describing a Clinos login event.

        Reads ``message``, ``email``, ``latitude`` and ``longitude`` from the
        JSON payload. ``LOGIN_SUCCESSFUL`` produces a success notice; any
        other status produces a "trying to login" notice, prefixed with an
        alert banner when the status is ``OUT_RANGE``. The notification helper
        is called without ``await``.

        Returns:
            ``(False, 200)`` when the status is ``OUT_RANGE``, otherwise
            ``(True, 200)``.
        """
        self.logger.info('Clinos LoginUser Triggered')
        post_data = await self.request.json()
        status = post_data['message']

        if status == 'LOGIN_SUCCESSFUL':
            location_in_range = True
            message = (f'User {post_data["email"]} has logged in successfully from '
                       f'https://maps.google.com/?q={post_data["latitude"]},{post_data["longitude"]}')
        else:
            location_in_range = status != 'OUT_RANGE'
            banner = '' if location_in_range else '****** ALERT ************** ALERT ******'
            message = banner + (f'User {post_data["email"]} is trying to login from '
                                f'https://maps.google.com/?q={post_data["latitude"]},{post_data["longitude"]}')

        user_funnel_slack_notification(self.request, message)
        return location_in_range, 200
Comments
Post a Comment