add callback manager , model app
Browse files- callbackmanager.py +5 -0
- meldrx.py +3 -92
callbackmanager.py
CHANGED
|
@@ -4,6 +4,11 @@ from meldrx import MeldRxAPI
|
|
| 4 |
import json
|
| 5 |
import os
|
| 6 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 7 |
class CallbackManager:
|
| 8 |
def __init__(self, redirect_uri: str, client_secret: str = None):
|
| 9 |
client_id = os.getenv("APPID")
|
|
|
|
| 4 |
import json
|
| 5 |
import os
|
| 6 |
|
| 7 |
+
import gradio as gr
|
| 8 |
+
from meldrx import MeldRxAPI
|
| 9 |
+
import json
|
| 10 |
+
import os
|
| 11 |
+
|
| 12 |
class CallbackManager:
|
| 13 |
def __init__(self, redirect_uri: str, client_secret: str = None):
|
| 14 |
client_id = os.getenv("APPID")
|
meldrx.py
CHANGED
|
@@ -22,7 +22,7 @@ class MeldRxAPI:
|
|
| 22 |
self.session = requests.Session()
|
| 23 |
|
| 24 |
def _generate_code_verifier(self) -> str:
|
| 25 |
-
self.code_verifier = secrets.token_urlsafe(32) # 43 characters
|
| 26 |
return self.code_verifier
|
| 27 |
|
| 28 |
def _generate_code_challenge(self, code_verifier: str) -> str:
|
|
@@ -43,7 +43,7 @@ class MeldRxAPI:
|
|
| 43 |
token_data = response.json()
|
| 44 |
self.access_token = token_data.get("access_token")
|
| 45 |
if not self.access_token:
|
| 46 |
-
raise ValueError("No access token received
|
| 47 |
return True
|
| 48 |
except requests.RequestException as e:
|
| 49 |
print(f"Authentication failed: {e}")
|
|
@@ -107,7 +107,7 @@ class MeldRxAPI:
|
|
| 107 |
token_data = response.json()
|
| 108 |
self.access_token = token_data.get("access_token")
|
| 109 |
if not self.access_token:
|
| 110 |
-
raise ValueError("No access token received
|
| 111 |
return True
|
| 112 |
except requests.RequestException as e:
|
| 113 |
print(f"Authentication failed: {e}")
|
|
@@ -115,96 +115,7 @@ class MeldRxAPI:
|
|
| 115 |
except ValueError as e:
|
| 116 |
print(f"Authentication error: {e}")
|
| 117 |
return False
|
| 118 |
-
|
| 119 |
-
def _get_headers(self) -> Dict[str, str]:
|
| 120 |
-
"""
|
| 121 |
-
Generate headers with the Bearer token for authenticated requests.
|
| 122 |
-
|
| 123 |
-
Returns:
|
| 124 |
-
Dict[str, str]: Headers dictionary with Authorization token if available.
|
| 125 |
-
"""
|
| 126 |
-
headers = {"Content-Type": "application/json"}
|
| 127 |
-
if self.access_token:
|
| 128 |
-
headers["Authorization"] = f"Bearer {self.access_token}"
|
| 129 |
-
return headers
|
| 130 |
-
|
| 131 |
-
def get_patients(self) -> Optional[Dict[str, Any]]:
|
| 132 |
-
"""
|
| 133 |
-
Retrieve a list of patients from the specified workspace (FHIR API).
|
| 134 |
-
|
| 135 |
-
Returns:
|
| 136 |
-
Optional[Dict[str, Any]]: Patient data as a dictionary if successful, None otherwise.
|
| 137 |
-
"""
|
| 138 |
-
url = f"{self.fhir_base_url}/Patient"
|
| 139 |
-
|
| 140 |
-
if not self.access_token and not self.authenticate():
|
| 141 |
-
print("Cannot proceed without authentication.")
|
| 142 |
-
return None
|
| 143 |
-
|
| 144 |
-
try:
|
| 145 |
-
response = self.session.get(url, headers=self._get_headers())
|
| 146 |
-
response.raise_for_status()
|
| 147 |
-
return response.json() if response.text else {}
|
| 148 |
-
except requests.RequestException as e:
|
| 149 |
-
print(f"Failed to retrieve patients: {e}")
|
| 150 |
-
return None
|
| 151 |
|
| 152 |
-
def get_authorization_url(self, scope: str = "patient/*.read openid profile", state: str = "random_state") -> str:
|
| 153 |
-
"""
|
| 154 |
-
Generate the SMART on FHIR authorization URL for patient login.
|
| 155 |
-
|
| 156 |
-
Args:
|
| 157 |
-
scope (str): The requested scopes (default: patient/*.read openid profile).
|
| 158 |
-
state (str): A random state string for security (default: random_state).
|
| 159 |
-
|
| 160 |
-
Returns:
|
| 161 |
-
str: The authorization URL to open in a browser.
|
| 162 |
-
"""
|
| 163 |
-
params = {
|
| 164 |
-
"response_type": "code",
|
| 165 |
-
"client_id": self.client_id,
|
| 166 |
-
"redirect_uri": self.redirect_uri,
|
| 167 |
-
"scope": scope,
|
| 168 |
-
"state": state,
|
| 169 |
-
"aud": self.fhir_base_url
|
| 170 |
-
}
|
| 171 |
-
query_string = "&".join(f"{k}={v}" for k, v in params.items())
|
| 172 |
-
return f"{self.authorize_url}?{query_string}"
|
| 173 |
-
|
| 174 |
-
def authenticate_with_code(self, auth_code: str) -> bool:
|
| 175 |
-
"""
|
| 176 |
-
Exchange an authorization code for an access token.
|
| 177 |
-
|
| 178 |
-
Args:
|
| 179 |
-
auth_code (str): The authorization code from the redirect URI.
|
| 180 |
-
|
| 181 |
-
Returns:
|
| 182 |
-
bool: True if authentication is successful, False otherwise.
|
| 183 |
-
"""
|
| 184 |
-
payload = {
|
| 185 |
-
"grant_type": "authorization_code",
|
| 186 |
-
"code": auth_code,
|
| 187 |
-
"redirect_uri": self.redirect_uri,
|
| 188 |
-
"client_id": self.client_id
|
| 189 |
-
}
|
| 190 |
-
if self.client_secret:
|
| 191 |
-
payload["client_secret"] = self.client_secret
|
| 192 |
-
headers = {"Content-Type": "application/x-www-form-urlencoded"}
|
| 193 |
-
|
| 194 |
-
try:
|
| 195 |
-
response = self.session.post(self.token_url, data=payload, headers=headers)
|
| 196 |
-
response.raise_for_status()
|
| 197 |
-
token_data = response.json()
|
| 198 |
-
self.access_token = token_data.get("access_token")
|
| 199 |
-
if not self.access_token:
|
| 200 |
-
raise ValueError("No access token received from the server.")
|
| 201 |
-
return True
|
| 202 |
-
except requests.RequestException as e:
|
| 203 |
-
print(f"Authentication failed: {e}")
|
| 204 |
-
return False
|
| 205 |
-
except ValueError as e:
|
| 206 |
-
print(f"Authentication error: {e}")
|
| 207 |
-
return False
|
| 208 |
def create_virtual_workspace(self, snapshot: str = "patient-prefetch",
|
| 209 |
patient_id: str = "AutoPopulatedIfNotManuallySet",
|
| 210 |
hook: str = "patient-view") -> bool:
|
|
|
|
| 22 |
self.session = requests.Session()
|
| 23 |
|
| 24 |
def _generate_code_verifier(self) -> str:
|
| 25 |
+
self.code_verifier = secrets.token_urlsafe(32) # 43 characters
|
| 26 |
return self.code_verifier
|
| 27 |
|
| 28 |
def _generate_code_challenge(self, code_verifier: str) -> str:
|
|
|
|
| 43 |
token_data = response.json()
|
| 44 |
self.access_token = token_data.get("access_token")
|
| 45 |
if not self.access_token:
|
| 46 |
+
raise ValueError("No access token received.")
|
| 47 |
return True
|
| 48 |
except requests.RequestException as e:
|
| 49 |
print(f"Authentication failed: {e}")
|
|
|
|
| 107 |
token_data = response.json()
|
| 108 |
self.access_token = token_data.get("access_token")
|
| 109 |
if not self.access_token:
|
| 110 |
+
raise ValueError("No access token received.")
|
| 111 |
return True
|
| 112 |
except requests.RequestException as e:
|
| 113 |
print(f"Authentication failed: {e}")
|
|
|
|
| 115 |
except ValueError as e:
|
| 116 |
print(f"Authentication error: {e}")
|
| 117 |
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 118 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 119 |
def create_virtual_workspace(self, snapshot: str = "patient-prefetch",
|
| 120 |
patient_id: str = "AutoPopulatedIfNotManuallySet",
|
| 121 |
hook: str = "patient-view") -> bool:
|