sghorbal commited on
Commit
e6ef7ae
·
1 Parent(s): 705f8f0

Initial commit

Browse files
Files changed (6) hide show
  1. .env.example +4 -0
  2. Dockerfile +20 -0
  3. README.md +13 -0
  4. dags/new_transaction.py +102 -0
  5. entrypoint.sh +24 -0
  6. requirements.txt +4 -0
.env.example ADDED
@@ -0,0 +1,4 @@
 
 
 
 
 
1
+ AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://user:password@host:5432/airflow
2
+ AIRFLOW__CORE__EXECUTOR=LocalExecutor
3
+ AIRFLOW__WEBSERVER__SECRET_KEY=your_secret_key_here
4
+ AIRFLOW__CORE__LOAD_EXAMPLES=False
Dockerfile ADDED
@@ -0,0 +1,20 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ FROM apache/airflow:slim-2.10.5-python3.9
2
+
3
+ COPY requirements.txt /
4
+ COPY dags /opt/airflow/dags
5
+ COPY entrypoint.sh /entrypoint.sh
6
+
7
+ USER root
8
+ RUN chmod +x /entrypoint.sh
9
+
10
+ USER airflow
11
+ # Install system dependencies
12
+ RUN pip install --no-cache-dir -r /requirements.txt
13
+
14
+ # Expose the web server port
15
+ EXPOSE 8088
16
+
17
+ # Expose the scheduler port
18
+ EXPOSE 8793
19
+
20
+ ENTRYPOINT ["/entrypoint.sh"]
README.md CHANGED
@@ -1,2 +1,15 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  # fraud-detection-orchestrator
2
  Airflow orchestrator for automation of the fraud detection pipeline
 
1
+ ---
2
+ title: Fraud Detection Pipeline Orchestrator
3
+ emoji: ⚡
4
+ colorFrom: purple
5
+ colorTo: yellow
6
+ sdk: docker
7
+ app_port: 8088
8
+ pinned: false
9
+ short_description: Orchestrator of the fraud detection pipeline
10
+ ---
11
+
12
+ Check out the configuration reference at https://huggingface.co/docs/hub/spaces-config-reference
13
+
14
  # fraud-detection-orchestrator
15
  Airflow orchestrator for automation of the fraud detection pipeline
dags/new_transaction.py ADDED
@@ -0,0 +1,102 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ import requests
3
+ from datetime import datetime, timedelta
4
+ from airflow import DAG
5
+ from airflow.decorators import task
6
+ import json
7
+
8
+ default_args = {
9
+ "owner": "airflow",
10
+ "start_date": datetime.now() - timedelta(minutes=5),
11
+ "catchup": False
12
+ }
13
+
14
+ @task(task_id="pull_transaction")
15
+ def _pull_transaction(ti):
16
+ """
17
+ Pulls a new transaction from the fraud detection service and pushes it to XCom.
18
+ """
19
+ response = requests.get("https://charlestng-real-time-fraud-detection.hf.space/current-transactions")
20
+
21
+ # Check response status code
22
+ response.raise_for_status()
23
+
24
+ # Load the JSON data
25
+ str_data = response.json()
26
+ data = json.loads(str_data)
27
+
28
+ transaction_dict = {key: value for key, value in zip(data['columns'], data['data'][0])}
29
+
30
+ # Push the transaction dictionary to XCom
31
+ ti.xcom_push(key="transaction_dict", value=transaction_dict)
32
+
33
+ logging.info(f"Fetched data: {transaction_dict}")
34
+
35
+ @task(task_id="push_transaction")
36
+ def _push_transaction(ti):
37
+ """
38
+ Pushes the transaction data to the fraud detection pipeline.
39
+ This function is called after pulling the transaction data.
40
+ It maps the transaction data to the required parameters for the API call.
41
+ """
42
+ params_mapping = {
43
+ 'transaction_number': 'trans_num',
44
+ 'transaction_amount': 'amt',
45
+ 'transaction_timestamp': 'current_time',
46
+ 'transaction_category': 'category',
47
+ 'customer_credit_card_number': 'cc_num',
48
+ 'customer_first_name': 'first',
49
+ 'customer_last_name': 'last',
50
+ 'customer_gender': 'gender',
51
+ 'merchant_name': 'merchant',
52
+ 'merchant_latitude': 'merch_lat',
53
+ 'merchant_longitude': 'merch_long',
54
+ 'customer_latitude': 'lat',
55
+ 'customer_longitude': 'long',
56
+ 'customer_city': 'city',
57
+ 'customer_state': 'state',
58
+ 'customer_zip': 'zip',
59
+ 'customer_city_population': 'city_pop',
60
+ 'customer_job': 'job',
61
+ 'customer_dob': 'dob',
62
+ 'is_fraud': 'is_fraud',
63
+ }
64
+
65
+ # Pull the transaction dictionary from XCom
66
+ transaction_dict = ti.xcom_pull(task_ids="pull_transaction", key="transaction_dict")
67
+
68
+ # Check if the transaction dictionary is empty
69
+ if not transaction_dict:
70
+ logging.error("No transaction data found.")
71
+ return
72
+
73
+ # Call the fraud detection pipeline with the transaction dictionary
74
+ data = {key: transaction_dict[value] for key, value in params_mapping.items()}
75
+
76
+ api_response = requests.post(
77
+ url='https://slimg-fraud-detection-service-api.hf.space/transaction/process',
78
+ params=data,
79
+ headers={
80
+ 'Content-Type': 'application/json',
81
+ 'Authorization': 'je_suis_une_clef_d_api',
82
+ },
83
+ )
84
+
85
+ # Check response status code
86
+ api_response.raise_for_status()
87
+
88
+ # Load the JSON data
89
+ data = api_response.json()
90
+
91
+ logging.info(f"Fraud detection response: {data}")
92
+
93
+ with DAG(dag_id="process_new_transaction",
94
+ default_args=default_args,
95
+ schedule_interval="*/5 * * * *") as dag:
96
+ """
97
+ DAG to fetch a new transaction and call the fraud detection pipeline
98
+ """
99
+ pull = _pull_transaction()
100
+ push = _push_transaction()
101
+
102
+ pull >> push
entrypoint.sh ADDED
@@ -0,0 +1,24 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/bin/bash
2
+
3
+ # Charger les variables d'environnement
4
+ if [ -f .env ]; then
5
+ export $(grep -v '^#' .env | xargs)
6
+ fi
7
+
8
+ # Init DB si besoin
9
+ airflow db upgrade
10
+
11
+ # Créer un user admin si aucun n'existe
12
+ airflow users create \
13
+ --username admin \
14
+ --firstname Admin \
15
+ --lastname User \
16
+ --role Admin \
17
+ --email [email protected] \
18
+ --password admin || true
19
+
20
+ # Lancer le scheduler en background
21
+ airflow scheduler &
22
+
23
+ # Lancer le webserver (en foreground, pour que le conteneur reste "vivant")
24
+ exec airflow webserver --port 8088
requirements.txt ADDED
@@ -0,0 +1,4 @@
 
 
 
 
 
1
+ requests
2
+ sqlalchemy
3
+ psycopg2-binary
4
+ apache-airflow==2.10.5