import io
import sqlite3

import pandas as pd
import re

from settings import *
# from utils_fcts import print_df_shape  # keep commented: this module is imported by utils_fcts (circular import error)

def clean_cols(myl):
    # myl = [re.sub(r'\[.*\]', '', x) for x in myl]
    # replace separators, units and other special characters with '_',
    # then collapse repeated '_' and strip leading/trailing '_'
    myl = [re.sub(r'[-\[\]\+ \(\)°%]', '_', x.strip()) for x in myl]
    myl = [re.sub(r'_+', '_', x) for x in myl]
    myl = [x.strip("_") for x in myl]
    return myl
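
# A minimal sketch of what clean_cols produces (hypothetical column labels):
#   clean_cols(["Power [kW] (L1)", "Temp °C"])
#   -> ["Power_kW_L1", "Temp_C"]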

def getheadercols(r1, r2, r3):
    # the header rows have one field fewer than the value rows
    l1 = r1.split(';')  # + ["missing"]
    l2 = r2.split(';')  # + [""]
    l3 = r3.split(';')  # + [""]
    assert len(l1) == len(l2)
    assert len(l1) == len(l3)
    # forward-fill empty cells of the first header row
    for i in range(1, len(l1)):
        if l1[i] == '':
            l1[i] = l1[i - 1]
    # remove special characters and units
    l1 = clean_cols(l1)
    # concatenate the first three rows to form the new column names
    new_columns = [f'{l1[i]}_{l2[i]}_{l3[i]}' for i in range(len(l1))]
    new_columns = [re.sub(r'_+', '_', x) for x in new_columns]
    new_columns[0] = db_timecol
    return clean_cols(new_columns)
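
# A minimal sketch with hypothetical header rows (db_timecol comes from settings
# and replaces the first name):
#   getheadercols("Time;Power;;Temp", ";L1;L2;", ";kW;kW;°C")
#   -> [db_timecol, "Power_L1_kW", "Power_L2_kW", "Temp_C"]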

def file2tables(file_path):
    error_msg = ""
    ok_msg = ""
    # detect the input type: file path or file-like object
    if isinstance(file_path, str):  # a file path
        is_file_obj = False
        f = open(file_path, encoding='latin1')
        filepathlab = file_path
    elif isinstance(file_path, io.StringIO):  # a file object
        is_file_obj = True
        f = file_path  # use the file object directly
        filepathlab = "file content"
    else:
        raise ValueError("file_path must be either a file path or an io.StringIO object.")
    print("> START processing " + filepathlab)
    time_pattern = re.compile(r'^\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}$')
    dayP_pattern = re.compile(r'^P\d.+')
    dayI_pattern = re.compile(r'^I\d.+')
    # with open(file_path, encoding='latin1') as f:
    try:
        print(">>> start reading content")
        # read the 3 header lines
        header_lines = [f.readline().strip().rstrip(";") for _ in range(3)]
        new_columns = getheadercols(header_lines[0], header_lines[1], header_lines[2])
        time_lines_init = [f.readline().strip().rstrip(";").split(";")
                           for _ in range(60 * 24)]
        curr_day = time_lines_init[0][0].split(' ')[0]
        # no need for range(3, 3 + 60 * 24): _ is only a counter, the file pointer advances by itself
        time_lines = [x for x in time_lines_init if len(x) == len(new_columns) and
                      time_pattern.match(x[0]) and
                      re.compile(curr_day).match(x[0])]
        maxTime_idx = max(i for i, x in enumerate(time_lines_init)
                          if len(x) == len(new_columns) and time_pattern.match(x[0]))
        dayPstart_idx = maxTime_idx + nHeaderSkip + 1
        # reposition the file pointer
        f.seek(0)
        for _ in range(dayPstart_idx):
            f.readline()
        dayP_lines_init = [f.readline().strip().rstrip(";").split(";")
                           for _ in range(nRowsDayP)]
        # no need for range(dayPstart_idx, dayPstart_idx + nRowsDayP)
        dayP_lines = [x for x in dayP_lines_init if dayP_pattern.match(x[0])]
        maxDayP_idx = max(i for i, x in enumerate(dayP_lines_init)
                          if dayP_pattern.match(x[0]))
        dayIstart_idx = maxDayP_idx + dayPstart_idx + 1
        # reposition the file pointer
        f.seek(0)
        for _ in range(dayIstart_idx):
            f.readline()
        dayI_lines_init = [f.readline().strip().rstrip(";").split(";")
                           for _ in range(nRowsDayI)]
        # no need for range(dayIstart_idx, dayIstart_idx + nRowsDayI)
        dayI_lines = [x for x in dayI_lines_init if dayI_pattern.match(x[0])]
    finally:
        if not is_file_obj:
            f.close()  # only close if we opened the file ourselves
    ###############################################################
    ##################### EXTRACT THE 'TIME' DATA
    ###############################################################
    time_data = pd.DataFrame(time_lines)  # , columns=new_columns)
    # convert the first column to datetime
    time_data.loc[:, 0] = pd.to_datetime(time_data.iloc[:, 0],
                                         format='%d.%m.%Y %H:%M').dt.strftime('%Y-%m-%d %H:%M:%S')
    # convert the other columns to numeric (float)
    for col in range(1, time_data.shape[1]):
        time_data.iloc[:, col] = pd.to_numeric(time_data.iloc[:, col], errors='coerce')
    # time_data.to_pickle('time_data_v3.pkl')
    ntime, ncols = time_data.shape
    time_data.columns = new_columns
    if ntime < 60 * 24:
        error_msg += filepathlab + " - WARNING : missing 'time' data (available : " + str(ntime) + "/" + str(60 * 24) + ")"
    elif ntime == 60 * 24:
        ok_msg += filepathlab + " - SUCCESS reading 'time' data "
    print(','.join(list(set(time_real_cols + time_txt_cols) - set(time_data.columns))))
    assert time_data.columns.isin(time_real_cols + time_txt_cols).all()
    time_missingcols = list(set(time_real_cols + time_txt_cols) -
                            set(time_data.columns))
    if len(time_missingcols) > 0:
        error_msg += "\n" + filepathlab + " - WARNING : missing 'time' data columns (" + ','.join(time_missingcols) + ")\n"
    else:
        ok_msg += "\n" + filepathlab + " - SUCCESS found all 'time' data columns\n"
    date_day = pd.to_datetime(curr_day, format='%d.%m.%Y').strftime('%Y-%m-%d')
    ###############################################################
    ##################### EXTRACT THE 'DAY P' DATA
    ###############################################################
    # melt then transpose: the labelled dayP rows become a single wide row
    dayP_dataL = pd.DataFrame(dayP_lines)
    dayP_datam = dayP_dataL.melt(id_vars=[0],
                                 value_vars=list(dayP_dataL.columns),
                                 var_name='variable',
                                 value_name='value')
    dayP_datam.iloc[:, 0] = dayP_datam.iloc[:, 0] + "_" + dayP_datam.iloc[:, 1].astype(str)
    dayP_datam.drop(columns=['variable'], inplace=True)
    dayP_data = dayP_datam.T
    dayP_data.columns = dayP_data.iloc[0]
    dayP_data = dayP_data[1:]
    dayP_data.insert(0, day_txt_cols[0], date_day)
    ndayP, ndayPcols = dayP_data.shape
    assert ndayP <= 1
    if ndayP == 0:
        error_msg += "\n" + filepathlab + " - WARNING : no 'dayP' data found"
    else:
        ok_msg += filepathlab + " - SUCCESS reading 'dayP' data "
    assert ndayPcols <= len(dayP_real_cols) + len(day_txt_cols)
    dayP_missingcols = list(set(dayP_real_cols + day_txt_cols) - set(dayP_data.columns))
    if len(dayP_missingcols) > 0:
        error_msg += "\n" + filepathlab + " - WARNING : missing 'dayP' data columns (" + \
                     ','.join(dayP_missingcols) + ")\n"
    else:
        ok_msg += "\n" + filepathlab + " - SUCCESS found all 'dayP' data columns\n"
    ###############################################################
    ##################### EXTRACT THE 'DAY I' DATA
    ###############################################################
    # same melt/transpose reshaping as for dayP
    dayI_dataL = pd.DataFrame(dayI_lines)
    dayI_datam = dayI_dataL.melt(id_vars=[0],
                                 value_vars=list(dayI_dataL.columns),
                                 var_name='variable',
                                 value_name='value')
    dayI_datam.iloc[:, 0] = dayI_datam.iloc[:, 0] + "_" + dayI_datam.iloc[:, 1].astype(str)
    dayI_datam.drop(columns=['variable'], inplace=True)
    dayI_data = dayI_datam.T
    dayI_data.columns = dayI_data.iloc[0]
    dayI_data = dayI_data[1:]
    dayI_data.insert(0, day_txt_cols[0], date_day)
    # convert the other columns to numeric (float)
    for col in range(1, dayI_data.shape[1]):
        dayI_data.iloc[:, col] = pd.to_numeric(dayI_data.iloc[:, col], errors='coerce')
    ndayI, ndayIcols = dayI_data.shape
    assert ndayI <= 1
    if ndayI == 0:
        error_msg += "\n" + filepathlab + " - WARNING : no 'dayI' data found\n"
    else:
        ok_msg += filepathlab + " - SUCCESS reading 'dayI' data\n"
    assert ndayIcols <= len(dayI_real_cols) + len(day_txt_cols)
    dayI_missingcols = list(set(dayI_real_cols + day_txt_cols) - set(dayI_data.columns))
    if len(dayI_missingcols) > 0:
        error_msg += "\n" + filepathlab + " - WARNING : missing 'dayI' data columns (" + \
                     ','.join(dayI_missingcols) + ")\n"
    else:
        ok_msg += "\n" + filepathlab + " - SUCCESS found all 'dayI' data columns\n"
    # derive summed L1+L2 columns when both phases are present
    for col in time_data.columns:
        if col.endswith('_L1'):
            col_l2 = col.replace('_L1', '_L2')
            if col_l2 in time_data.columns:
                new_col = col.replace('_L1', '_L1_L2')
                if new_col not in time_data.columns:
                    time_data[new_col] = time_data[col] + time_data[col_l2]
    return {"time_data": time_data,
            "dayP_data": dayP_data,
            "dayI_data": dayI_data,
            "error": error_msg,
            "success": ok_msg}

def create_and_insert(timeData, dayiData, daypData=None):
    # connect to the SQLite database
    conn = sqlite3.connect(db_file)
    c = conn.cursor()
    # create the tables if they do not exist yet
    c.execute('''CREATE TABLE IF NOT EXISTS ''' + dbTime_name + "(" +
              ','.join([x + " TEXT" for x in time_txt_cols]) + "," +
              ','.join([x + " REAL" for x in time_real_cols + time_added_cols]) + '''
              )''')
    if daypData is not None:  # "if daypData:" would raise: the truth value of a DataFrame is ambiguous
        c.execute('''CREATE TABLE IF NOT EXISTS ''' + dbDayP_name + "(" +
                  ','.join([x + " TEXT" for x in day_txt_cols]) + "," +
                  ','.join([x + " REAL" for x in dayP_real_cols]) + '''
                  )''')
    c.execute('''CREATE TABLE IF NOT EXISTS ''' + dbDayI_name + "(" +
              ','.join([x + " TEXT" for x in day_txt_cols]) + "," +
              ','.join([x + " REAL" for x in dayI_real_cols]) + '''
              )''')
    conn.commit()
    # insert the data into the database
    timeData.to_sql(dbTime_name, conn,
                    if_exists='append', index=False)
    if daypData is not None:
        daypData.to_sql(dbDayP_name, conn,
                        if_exists='append', index=False)
    dayiData.to_sql(dbDayI_name, conn,
                    if_exists='append', index=False)
    # important: close the connection once all operations are complete
    conn.close()
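
# A minimal usage sketch (db_file and the db*_name table names come from
# settings; the file name is hypothetical):
#   res = file2tables("export_2024-01-01.csv")
#   create_and_insert(res["time_data"], res["dayI_data"], res["dayP_data"])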

def create_and_concat(timeData, dayiData, currentTimeData,
                      currentDayiData, currentDaypData=None, daypData=None):
    out_txt = ""
    out_txt += "receive time data to add : " + print_df_shape(timeData)
    out_txt += "receive dayI data to add : " + print_df_shape(dayiData)
    ### not implemented for dayP
    if currentTimeData is not None:  # a plain "if df:" would raise: DataFrame truth value is ambiguous
        newTimeData = pd.concat([timeData, currentTimeData], axis=0)
    else:
        newTimeData = timeData
    if currentDayiData is not None:
        newDayiData = pd.concat([dayiData, currentDayiData], axis=0)
    else:
        newDayiData = dayiData
    out_txt += "return updated time data : " + print_df_shape(newTimeData)
    out_txt += "return updated dayI data : " + print_df_shape(newDayiData)
    return {"new_dayI_data": newDayiData,
            "new_time_data": newTimeData,
            "msg": out_txt}
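
# A minimal usage sketch for the in-memory path (no database); the variables
# uploaded_text and next_text are hypothetical strings holding two uploaded
# file contents, and None is passed on the first call since there is nothing
# to concatenate yet:
#   res = file2tables(io.StringIO(uploaded_text))
#   acc = create_and_concat(res["time_data"], res["dayI_data"], None, None)
#   res2 = file2tables(io.StringIO(next_text))
#   acc = create_and_concat(res2["time_data"], res2["dayI_data"],
#                           acc["new_time_data"], acc["new_dayI_data"])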