Spaces:
Runtime error
Runtime error
| import sqlite3 | |
| import time | |
| import atexit | |
class FileDict:
    """Dictionary-like key/value store persisted in a SQLite table.

    Writes and deletes are queued in an in-memory buffer and applied to the
    database in batches: when the buffer reaches ``buffer_size`` entries,
    when ``buffer_idle_time`` seconds have passed since the last flush, or
    whenever the store is read. Flushing before every read guarantees that a
    lookup always observes the writes that preceded it.

    Keys must be non-empty strings. Values are stored in a ``TEXT`` column.
    """

    def __init__(self, file_path, buffer_size: int = 100000,
                 buffer_idle_time: float = 5, table: str = 'filedict'):
        """Open (or create) the database and make sure the table exists.

        Args:
            file_path: Path handed to ``sqlite3.connect``.
            buffer_size: Number of queued operations that forces a flush.
            buffer_idle_time: Seconds since the last flush after which the
                next buffered operation forces a flush.
            table: Name of the table holding the key/value pairs.
        """
        self.file_path = file_path
        # NOTE(review): the table name is interpolated into SQL text because
        # identifiers cannot be bound as parameters; it must come from
        # trusted code, never from user input.
        self.table = table
        self.conn = sqlite3.connect(file_path, check_same_thread=False)
        self.conn.execute(
            'CREATE TABLE IF NOT EXISTS {} (key TEXT PRIMARY KEY, value TEXT)'.format(self.table)
        )
        self.buffer = []
        self.buffer_size = buffer_size
        self.last_commit_time = time.time()
        self.buffer_idle_time = buffer_idle_time
        self._closed = False
        # Make sure buffered operations reach the database at interpreter exit.
        atexit.register(self.close)

    def __enter__(self):
        """Return the store itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Flush and close the store when leaving a ``with`` block."""
        self.close()

    def get(self, key, default=None):
        """Return the value stored under ``key``, or ``default`` if absent."""
        try:
            return self[key]
        except KeyError:
            return default

    def __getitem__(self, key):
        """Return the value stored under ``key``.

        Raises:
            KeyError: If ``key`` is not present.
        """
        # Apply queued writes first so a read observes the preceding writes.
        self._commit()
        cursor = self.conn.execute(
            'SELECT value FROM {} WHERE key = ?'.format(self.table), (key,)
        )
        try:
            row = cursor.fetchone()
        finally:
            cursor.close()
        if row is None:
            raise KeyError(key)
        return row[0]

    def Tables(self):
        """Return the names of all tables in the connected database."""
        cursor = self.conn.execute("SELECT name FROM sqlite_master WHERE type='table';")
        return [row[0] for row in cursor.fetchall()]

    def __setitem__(self, key, value):
        """Queue ``key -> value`` for insertion, replacing any existing value.

        Raises:
            TypeError: If ``key`` is not a string.
            ValueError: If ``key`` is the empty string.
        """
        self.check_key(key)
        self.buffer.append(('set', key, value))
        self._check_buffer()

    def __delitem__(self, key):
        """Queue deletion of ``key``.

        Deleting a key that does not exist is a silent no-op; no lookup is
        performed at call time.
        """
        self.buffer.append(('del', key))
        self._check_buffer()

    def __iter__(self):
        """Yield every key in the table."""
        self._commit()
        cursor = self.conn.execute('SELECT key FROM {}'.format(self.table))
        try:
            for (key,) in cursor:
                yield key
        finally:
            # Runs even if the consumer abandons the generator early.
            cursor.close()

    def items(self):
        """Yield every ``(key, value)`` pair in the table."""
        self._commit()
        cursor = self.conn.execute('SELECT key, value FROM {}'.format(self.table))
        try:
            yield from cursor
        finally:
            cursor.close()

    def from_dict(self, dict):
        """Replace the entire table contents with the given mapping.

        Operations still waiting in the buffer are discarded, because the
        new contents supersede them.

        Raises:
            TypeError: If any key is not a string.
            ValueError: If any key is the empty string.
        """
        # Validate before touching the database so bad input changes nothing.
        self.check_dict(dict)
        # Without this, stale queued operations would later be replayed on
        # top of the freshly loaded data.
        self.buffer = []
        with self.conn:
            self.conn.execute('DROP TABLE IF EXISTS {}'.format(self.table))
            self.conn.execute(
                'CREATE TABLE {} (key TEXT PRIMARY KEY, value TEXT)'.format(self.table)
            )
            self.conn.executemany(
                'INSERT INTO {} (key, value) VALUES (?, ?)'.format(self.table),
                dict.items(),
            )
        self.last_commit_time = time.time()

    def add_items(self, items):
        """Queue every ``key -> value`` pair of the mapping ``items``.

        All keys are validated before anything is queued, so invalid input
        leaves the buffer untouched.

        Raises:
            TypeError: If any key is not a string.
            ValueError: If any key is the empty string.
        """
        self.check_dict(items)
        self.buffer.extend(('set', key, value) for key, value in items.items())
        self._check_buffer()

    def _check_buffer(self):
        """Flush the buffer if it is full or has been idle for too long."""
        if not self.buffer:
            return
        idle_time = time.time() - self.last_commit_time
        if len(self.buffer) >= self.buffer_size or idle_time >= self.buffer_idle_time:
            self._commit()

    def _commit(self):
        """Apply all buffered operations to the database in one transaction.

        If any statement fails, the transaction is rolled back, the buffer is
        left intact and the exception propagates.
        """
        if not self.buffer:
            return
        # The connection context manager commits on success and rolls back
        # on error, so a failed batch is never left half-applied.
        with self.conn:
            cursor = self.conn.cursor()
            for op in self.buffer:
                kind = op[0]
                if kind == 'set':
                    cursor.execute(
                        'INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)'.format(self.table),
                        (op[1], op[2]),
                    )
                elif kind == 'update':
                    cursor.execute(
                        'UPDATE {} SET value = ? WHERE key = ?'.format(self.table),
                        (op[2], op[1]),
                    )
                elif kind == 'del':
                    cursor.execute(
                        'DELETE FROM {} WHERE key = ?'.format(self.table),
                        (op[1],),
                    )
        self.buffer = []
        self.last_commit_time = time.time()

    def check_dict(self, dictionary):
        """Validate every key of ``dictionary`` with :meth:`check_key`."""
        for key in dictionary:
            self.check_key(key)

    def check_key(self, key):
        """Validate a single key.

        Raises:
            TypeError: If ``key`` is not a string.
            ValueError: If ``key`` is the empty string.
        """
        if not isinstance(key, str):
            raise TypeError('Keys must be strings.')
        if not key:
            raise ValueError('Keys cannot be empty strings.')

    def search_keys(self, pattern, like=True, values=False):
        """Yield the keys that match ``pattern``.

        Args:
            pattern: Value compared against each key.
            like: If true, compare with SQL ``LIKE`` (``%`` and ``_`` act as
                wildcards); otherwise require exact equality.
            values: Accepted for backward compatibility; currently ignored.
        """
        self._commit()
        operator = 'LIKE' if like else '='
        cursor = self.conn.cursor()
        try:
            cursor.execute(
                f"SELECT key FROM {self.table} WHERE key {operator} ?", (pattern,)
            )
            for (key,) in cursor:
                yield key
        finally:
            cursor.close()

    def close(self):
        """Flush buffered operations and close the connection.

        Safe to call more than once; calls after the first do nothing.
        """
        if getattr(self, '_closed', False):
            return
        try:
            self._commit()
        finally:
            self.conn.close()
            self._closed = True
            # Drop the exit hook so it no longer keeps this instance alive.
            atexit.unregister(self.close)