qgallouedec HF staff committed on
Commit
7bdd956
·
verified ·
1 Parent(s): b797456

Update run_job.py

Browse files
Files changed (1) hide show
  1. run_job.py +1 -69
run_job.py CHANGED
@@ -1,69 +1 @@
1
- import fire
2
-
3
- CONFIG = {
4
- "preserve_insertion_order": False
5
- }
6
-
7
- CMD_SRC_KWARGS = """
8
- SELECT ('hf://datasets/{src}/' || lo.arguments['splits']['{split}']) AS path, function
9
- FROM (
10
- SELECT unnest(li.loading_codes) AS lo, li.function[4:] as function
11
- FROM (
12
- SELECT unnest(libraries) as li
13
- FROM read_json('https://datasets-server.huggingface.co/compatible-libraries?dataset={src}')
14
- ) WHERE li.function[:3] = 'pl.'
15
- ) WHERE lo.config_name='{config}';
16
- """.strip()
17
-
18
- CMD_SRC = """
19
- CREATE VIEW src AS SELECT * FROM {function}('{path}');
20
- """.strip()
21
-
22
-
23
- CMD_DST = """
24
- COPY ({query}) to 'tmp' (FORMAT PARQUET, ROW_GROUP_SIZE_BYTES '100MB', ROW_GROUPS_PER_FILE 5, PER_THREAD_OUTPUT true);
25
- """.strip()
26
-
27
- CMD_SRC_DRY_RUN = CMD_SRC[:-1] + " LIMIT 5;"
28
- CMD_DST_DRY_RUN = "{query};"
29
-
30
- DATA_CARD = "# Dataset Card for {dst}\n\nDataset prepared from [{src}](https://huggingface.co/datasets/{src}) using\n\n```\n{query}\n```\n"
31
-
32
- def sql(src: str, dst: str, query: str, config: str = "default", split: str = "train", private: bool = False, dry_run: bool = False):
33
- import os
34
- import duckdb
35
- from huggingface_hub import CommitScheduler, DatasetCard
36
-
37
- class CommitAndCleanScheduler(CommitScheduler):
38
-
39
- def push_to_hub(self):
40
- for path in self.folder_path.with_name("tmp").glob("*.parquet"):
41
- with path.open("rb") as f:
42
- footer = f.read(4) and f.seek(-4, os.SEEK_END) and f.read(4)
43
- if footer == b"PAR1":
44
- path.rename(self.folder_path / path.name)
45
- super().push_to_hub()
46
- for path in self.last_uploaded:
47
- path.unlink(missing_ok=True)
48
-
49
- con = duckdb.connect(":memory:", config=CONFIG)
50
- src_kwargs = con.sql(CMD_SRC_KWARGS.format(src=src, config=config, split=split)).df().to_dict(orient="records")
51
- if not src_kwargs:
52
- raise ValueError(f'Invalid --config "{config}" for dataset "{src}", please select a valid dataset config/subset.')
53
-
54
- con.sql((CMD_SRC_DRY_RUN if dry_run else CMD_SRC).format(**src_kwargs[0]))
55
-
56
- if dry_run:
57
- print(f"Sample data from '{src}' that would be written to dataset '{dst}':\n")
58
- result = con.sql(CMD_DST_DRY_RUN.format(query=query.rstrip("\n ;")))
59
- print(result.df().to_markdown())
60
- return
61
-
62
- with CommitAndCleanScheduler(repo_id=dst, repo_type="dataset", folder_path="dst", path_in_repo="data", every=0.1, private=private):
63
- con.sql("PRAGMA enable_progress_bar;")
64
- result = con.sql(CMD_DST.format(query=query.rstrip("\n ;")))
65
- DatasetCard(DATA_CARD.format(src=src, dst=dst, query=query)).push_to_hub(repo_id=dst, repo_type="dataset")
66
- print("done")
67
-
68
- if __name__ == '__main__':
69
- fire.Fire(sql)
 
1
+ print("hi")