File size: 2,499 Bytes
8e976dc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
04e5f87
8e976dc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
04e5f87
 
 
8e976dc
 
 
 
 
 
04e5f87
8e976dc
 
 
 
 
 
 
 
04e5f87
8e976dc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
04e5f87
8e976dc
 
 
 
 
 
 
 
 
04e5f87
8e976dc
 
04e5f87
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import gradio as gr
import time

SNOWFLAKE_EPOCH = 1288834974657


def sequence_id(id):
    return id & 0b111111111111


def machine_id(id):
    return (id >> 12) & 0b1111111111


def creation_time(id):
    return id >> 22


def snow_to_components(snowflake_id: int):
    timestamp_ms = creation_time(snowflake_id) + SNOWFLAKE_EPOCH
    # datacenter_id_val = datacenter_id(snowflake_id)
    machine_id_val = machine_id(snowflake_id)
    sequence_id_val = sequence_id(snowflake_id)
    readable_time = time.strftime(
        "%Y-%m-%d %H:%M:%S", time.localtime(timestamp_ms / 1000)
    )
    return (
        readable_time,
        timestamp_ms,
        machine_id_val,
        sequence_id_val,
    )


def components_to_snow(
    timestamp_ms: int,
    machine_id_val: int,
    sequence_id_val: int,
):
    # timestamp_bits = bin()[2:].zfill(41)
    time_component = bin(int(timestamp_ms) - SNOWFLAKE_EPOCH)
    return int(time_component[2:].zfill(42) + bin(machine_id_val)[2:].zfill(10) + bin(sequence_id_val)[2:].zfill(12),2)


def update_components(snowflake_input_str):
    if snowflake_input_str:
        snowflake_input = int(snowflake_input_str)
        return snow_to_components(snowflake_input)
    return None, None, None, None, None


def update_snowflake(
    _,
    timestamp_input,
    machine_input,
    sequence_input,
):
    if all([timestamp_input, machine_input, sequence_input]):
        snowflake_id = components_to_snow(
            int(timestamp_input),
            int(machine_input),
            int(sequence_input),
        )
        return str(snowflake_id)
    return None


with gr.Blocks() as demo:
    gr.Markdown("## Snowflake ID Converter")
    with gr.Row():
        snowflake = gr.Textbox(label="Snowflake ID", interactive=True)
    with gr.Accordion("Snowflake ID Components", open=False):
        readable_time = gr.Textbox(label="Timestamp (Readable)", interactive=False)
        timestamp = gr.Number(0,label="Timestamp (ms)", interactive=True)
        machine = gr.Number(0, label="Machine ID", step=1, interactive=True)
        sequence = gr.Number(0, label="Sequence ID", step=1, interactive=True)
        id_components = [
            readable_time,
            timestamp,
            machine,
            sequence,
        ]
    snowflake.input(update_components, inputs=snowflake, outputs=id_components)
    for component in [timestamp, machine, sequence]:
        component.input(update_snowflake, inputs=id_components, outputs=snowflake)

demo.launch()