Add voxtelesys_integration/api/voxtelesys.py

This commit is contained in:
Zac Gaetano 2026-05-11 08:09:44 -04:00
parent bad09d7981
commit f85af56750

View file

@ -0,0 +1,203 @@
"""
voxtelesys_integration.api.voxtelesys
All public-facing API methods for the Voxtelesys integration.
"""
from __future__ import annotations
import hashlib, hmac, json
from typing import Any
import frappe, requests
from frappe import _
from frappe.utils import now_datetime
VOICE_API_BASE = "https://voiceapi.voxtelesys.com/v1"
def _headers():
from voxtelesys_integration.doctype.voxtelesys_settings.voxtelesys_settings import VoxtelesysSettings
return {"Authorization": f"Bearer {VoxtelesysSettings.get_api_token()}", "Content-Type": "application/json", "Accept": "application/json"}
def _post(path, payload):
resp = requests.post(f"{VOICE_API_BASE}{path}", headers=_headers(), json=payload, timeout=15)
resp.raise_for_status(); return resp.json()
def _get(path, params=None):
resp = requests.get(f"{VOICE_API_BASE}{path}", headers=_headers(), params=params or {}, timeout=15)
resp.raise_for_status(); return resp.json()
def _verify_signature(body, signature):
secret = frappe.db.get_single_value("Voxtelesys Settings", "webhook_secret")
if not secret: return True
expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, signature or "")
def _voxml_response(*elements):
body = "\n ".join(elements)
return f'<?xml version="1.0" encoding="UTF-8"?>\n<Response>\n {body}\n</Response>'
def _voxml_dial(agent_number, record, status_callback_url):
record_attr = 'record="true"' if record else ""
return f'<Dial action="{status_callback_url}" {record_attr}><Number>{agent_number}</Number></Dial>'
def _voxml_say(text): return f"<Say>{text}</Say>"
def _voxml_hangup(): return "<Hangup/>"
def _pick_agent_number(mode):
if mode == "Direct (no routing)": return None
users = frappe.get_all("User",
filters=[["custom_voxtelesys_number","!=",""],["enabled","=",1]],
fields=["name","custom_voxtelesys_number","last_active"],
order_by="last_active desc" if mode == "Availability-Based" else "name asc")
if not users: return None
if mode == "Round Robin":
idx_key = "voxtelesys_rr_idx"
idx = int(frappe.cache().get_value(idx_key) or 0)
user = users[idx % len(users)]
frappe.cache().set_value(idx_key, (idx + 1) % len(users))
return user.custom_voxtelesys_number
return users[0].custom_voxtelesys_number
@frappe.whitelist(allow_guest=True)
def handle_inbound_call():
if frappe.request.method != "POST": frappe.throw("Method not allowed", frappe.PermissionError)
raw_body = frappe.request.data
sig = frappe.request.headers.get("X-Voxtelesys-Signature", "")
if not _verify_signature(raw_body, sig): frappe.throw("Invalid webhook signature", frappe.PermissionError)
data = _parse_webhook_body(raw_body)
call_sid = data.get("CallSid") or data.get("call_sid") or ""
from_number = data.get("From") or data.get("from") or ""
to_number = data.get("To") or data.get("to") or ""
log = _upsert_call_log(call_sid=call_sid, direction="Inbound", status="Ringing",
from_number=from_number, to_number=to_number, start_time=now_datetime())
_link_contact(log, from_number)
settings = frappe.get_single("Voxtelesys Settings")
base_url = settings.voxml_base_url or frappe.utils.get_url()
status_cb = f"{base_url.rstrip('/')}/api/method/voxtelesys_integration.api.voxtelesys.handle_call_status"
agent_num = _pick_agent_number(settings.agent_routing_mode or "Round Robin")
if agent_num:
xml = _voxml_response(_voxml_dial(agent_num, settings.recording_enabled, status_cb))
else:
xml = _voxml_response(_voxml_say("Thank you for calling. All agents are currently unavailable. Please try again later."), _voxml_hangup())
frappe.local.response["content_type"] = "application/xml"
frappe.local.response["http_status_code"] = 200
return xml
@frappe.whitelist(allow_guest=True)
def handle_call_status():
if frappe.request.method != "POST": frappe.throw("Method not allowed", frappe.PermissionError)
data = _parse_webhook_body(frappe.request.data)
call_sid = data.get("CallSid") or data.get("call_sid") or ""
raw_status = (data.get("CallStatus") or data.get("DialCallStatus") or data.get("call_status") or "").lower()
status_map = {"ringing":"Ringing","in-progress":"In Progress","completed":"Completed",
"no-answer":"No Answer","busy":"Busy","failed":"Failed","canceled":"Cancelled","cancelled":"Cancelled"}
status = status_map.get(raw_status, "Completed")
recording_url = data.get("RecordingUrl") or data.get("recording_url") or ""
updates: dict[str, Any] = {"status": status}
if status in ("Completed","No Answer","Busy","Failed","Cancelled"):
updates["end_time"] = now_datetime()
try: updates["duration"] = int(data.get("CallDuration") or data.get("duration") or 0)
except (TypeError, ValueError): pass
if recording_url: updates["recording_url"] = recording_url
_upsert_call_log(call_sid=call_sid, **updates)
frappe.local.response["content_type"] = "application/xml"
frappe.local.response["http_status_code"] = 200
return _voxml_response()
@frappe.whitelist()
def make_outbound_call(to, from_="", reference_doctype="", reference_name=""):
settings = frappe.get_single("Voxtelesys Settings")
if not settings.enabled: frappe.throw(_("Voxtelesys integration is not enabled."))
caller_id = from_ or settings.caller_id or ""
if not caller_id: frappe.throw(_("No Caller ID configured. Set it in Voxtelesys Settings."))
base_url = settings.voxml_base_url or frappe.utils.get_url()
status_cb = f"{base_url.rstrip('/')}/api/method/voxtelesys_integration.api.voxtelesys.handle_call_status"
payload: dict[str, Any] = {"to": to, "from": caller_id, "status_callback": status_cb, "status_callback_method": "POST"}
if settings.outbound_trunk_group: payload["trunk_group"] = settings.outbound_trunk_group
if settings.recording_enabled: payload["record"] = True
try:
result = _post("/calls", payload)
except requests.HTTPError as exc:
frappe.log_error(str(exc), "Voxtelesys: Outbound call failed")
frappe.throw(_("Voxtelesys API error: {0}").format(str(exc)), title=_("Call Failed"))
call_sid = result.get("sid") or result.get("call_sid") or result.get("CallSid") or ""
log = _upsert_call_log(call_sid=call_sid, direction="Outbound", status="Ringing",
from_number=caller_id, to_number=to, start_time=now_datetime())
if reference_doctype and reference_name: _add_link(log, reference_doctype, reference_name)
return {"ok": True, "call_log": log.name, "call_sid": call_sid}
@frappe.whitelist()
def test_connection():
frappe.only_for("System Manager")
try: return {"ok": True, "data": _get("/account")}
except Exception as exc: return {"ok": False, "error": str(exc)}
def boot_session(bootinfo):
try:
settings = frappe.get_single("Voxtelesys Settings")
bootinfo.voxtelesys = {"enabled": bool(settings.enabled), "caller_id": settings.caller_id or ""}
except Exception:
bootinfo.voxtelesys = {"enabled": False}
@frappe.whitelist()
def get_linked_call_logs(doctype, name, **kwargs):
logs = frappe.db.sql("""
SELECT vcl.name, vcl.from_number, vcl.to_number, vcl.direction,
vcl.status, vcl.start_time, vcl.duration, vcl.recording_url, vcl.summary
FROM `tabVoxtelesys Call Log` vcl
INNER JOIN `tabDynamic Link` dl
ON dl.parent = vcl.name AND dl.parenttype = 'Voxtelesys Call Log'
AND dl.link_doctype = %(doctype)s AND dl.link_name = %(name)s
ORDER BY vcl.start_time DESC LIMIT 50""",
{"doctype": doctype, "name": name}, as_dict=True)
for log in logs: log["doctype"] = "Voxtelesys Call Log"
return logs
def sync_pending_call_logs():
settings = frappe.get_single("Voxtelesys Settings")
if not settings.enabled: return
stale = frappe.get_all("Voxtelesys Call Log", filters={"status": ["in",["Ringing","In Progress"]]}, fields=["name","call_sid"], limit=50)
for row in stale:
if not row.call_sid: continue
try:
data = _get(f"/calls/{row.call_sid}")
raw_status = (data.get("status") or "").lower()
status_map = {"completed":"Completed","no-answer":"No Answer","busy":"Busy","failed":"Failed","canceled":"Cancelled","cancelled":"Cancelled"}
if raw_status in status_map:
doc = frappe.get_doc("Voxtelesys Call Log", row.name)
doc.status = status_map[raw_status]; doc.end_time = now_datetime()
doc.duration = data.get("duration") or 0
doc.recording_url = data.get("recording_url") or doc.recording_url
doc.save(ignore_permissions=True)
except Exception:
frappe.log_error(frappe.get_traceback(), f"Voxtelesys: Failed to sync call log {row.name}")
def _parse_webhook_body(raw):
try: return json.loads(raw)
except (json.JSONDecodeError, ValueError): pass
return dict(frappe.form_dict)
def _upsert_call_log(call_sid, **fields):
if call_sid and frappe.db.exists("Voxtelesys Call Log", {"call_sid": call_sid}):
doc_name = frappe.db.get_value("Voxtelesys Call Log", {"call_sid": call_sid})
doc = frappe.get_doc("Voxtelesys Call Log", doc_name)
for k, v in fields.items():
if v is not None: doc.set(k, v)
doc.save(ignore_permissions=True)
else:
doc = frappe.new_doc("Voxtelesys Call Log"); doc.call_sid = call_sid
for k, v in fields.items(): doc.set(k, v)
doc.insert(ignore_permissions=True)
frappe.db.commit()
return doc
def _link_contact(doc, phone_number):
if not phone_number: return
normalised = phone_number.replace("+","").replace(" ","").replace("-","")
contact = frappe.db.sql("SELECT parent FROM `tabContact Phone` WHERE REPLACE(REPLACE(REPLACE(phone,'+',''),' ',''),'-','') LIKE %(num)s LIMIT 1", {"num": f"%{normalised}"}, as_dict=True)
if contact: _add_link(doc, "Contact", contact[0].parent); return
customer = frappe.db.get_value("Customer", {"mobile_no": ["like", f"%{normalised}"]}, "name")
if customer: _add_link(doc, "Customer", customer)
def _add_link(doc, link_doctype, link_name):
for row in doc.get("links") or []:
if row.link_doctype == link_doctype and row.link_name == link_name: return
doc.append("links", {"link_doctype": link_doctype, "link_name": link_name})
doc.save(ignore_permissions=True)