GitHub Action commited on
Commit
402975a
·
1 Parent(s): 86183a9

Sync from GitHub with Git LFS

Browse files
Files changed (1) hide show
  1. agents/peer_sync.py +132 -168
agents/peer_sync.py CHANGED
@@ -8,9 +8,10 @@ import uuid
8
  import ipaddress
9
  import re
10
  import netifaces
 
 
11
  from tools.storage import Storage
12
  from datetime import datetime
13
- import select
14
 
15
  storage = Storage()
16
  my_id = storage.get_config_value("agent_id", str(uuid.uuid4()))
@@ -126,147 +127,107 @@ def expand_and_filter(addresses):
126
  return out
127
 
128
  # ======================
129
- # IPv4 UDP LAN Discovery (приём + рассылка по /24)
130
  # ======================
131
- def udp_lan_discovery_v4():
132
  DISCOVERY_INTERVAL = 30
 
133
  local_addresses = storage.get_config_value("local_addresses", [])
134
- # порты, на которые слать/listen
 
135
  udp_port_set = set()
136
  for a in local_addresses:
137
- try:
138
- _proto, hostport = a.split("://", 1)
139
- _h, p = parse_hostport(hostport)
140
- if p:
141
- udp_port_set.add(p)
142
- except:
143
- continue
144
 
145
- # слушатели IPv4
146
  listen_sockets = []
147
  for port in udp_port_set:
 
148
  try:
149
  sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
150
  sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
151
  sock.bind(("", port))
152
  listen_sockets.append(sock)
153
- print(f"[UDPv4] Слушаем UDP на 0.0.0.0:{port}")
154
  except Exception as e:
155
- print(f"[UDPv4] Не удалось слушать порт {port}: {e}")
156
-
157
- send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
158
- send_sock.settimeout(0.5)
159
 
160
- while True:
161
- # --- приём ---
162
- for sock in listen_sockets:
163
- try:
164
- sock.settimeout(0.05)
165
- data, addr = sock.recvfrom(4096)
166
- msg = json.loads(data.decode("utf-8"))
167
- peer_id = msg.get("id")
168
- if not peer_id or peer_id == my_id:
169
- continue
170
- name = msg.get("name", "unknown")
171
- addresses = msg.get("addresses", [])
172
- expanded = expand_and_filter(addresses)
173
- if expanded:
174
- print(f"[UDPv4] RX от {addr} -> {expanded}")
175
- storage.add_or_update_peer(peer_id, name, expanded, "discovery", "online")
176
- except socket.timeout:
177
- pass
178
- except Exception as e:
179
- print(f"[UDPv4] Ошибка при обработке пакета: {e}")
180
-
181
- # --- рассылка ---
182
- v4s = get_all_local_ips_v4()
183
- msg_data = json.dumps({"id": my_id, "name": agent_name, "addresses": local_addresses}).encode("utf-8")
184
- for local_ip in v4s:
185
- try:
186
- net = ipaddress.ip_network(local_ip + "/24", strict=False)
187
- except Exception:
188
- continue
189
- for ip in net.hosts():
190
- ip_str = str(ip)
191
- if ip_str == local_ip or ip_str.startswith("127.") or ip_str.startswith("0."):
192
- continue
193
- for port in udp_port_set:
194
- try:
195
- send_sock.sendto(msg_data, (ip_str, port))
196
- except:
197
- pass
198
-
199
- time.sleep(DISCOVERY_INTERVAL)
200
-
201
- # ======================
202
- # IPv6 UDP Discovery (приём + мультикаст ff02::1)
203
- # ======================
204
- def udp_lan_discovery_v6():
205
- DISCOVERY_INTERVAL = 30
206
- local_addresses = storage.get_config_value("local_addresses", [])
207
- udp_port_set = set()
208
- for a in local_addresses:
209
- try:
210
- _proto, hostport = a.split("://", 1)
211
- h, p = parse_hostport(hostport)
212
- if p:
213
- udp_port_set.add(p)
214
- except:
215
- continue
216
-
217
- # слушатели IPv6
218
- listen_sockets = []
219
- for port in udp_port_set:
220
  try:
221
  sock6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
222
  sock6.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
223
  sock6.bind(("::", port))
224
  listen_sockets.append(sock6)
225
- print(f"[UDPv6] Слушаем UDP на [::]:{port}")
226
  except Exception as e:
227
- print(f"[UDPv6] Не удалось слушать порт {port}: {e}")
228
-
229
- # отправитель IPv6 (мультикаст link-local all-nodes)
230
- try:
231
- send_sock6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
232
- # hop limit 1
233
- send_sock6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 1)
234
- except Exception as e:
235
- print(f"[UDPv6] Не удалось создать отправитель: {e}")
236
- send_sock6 = None
237
 
238
- maddr = "ff02::1" # all-nodes on-link
239
- msg_data = json.dumps({"id": my_id, "name": agent_name, "addresses": local_addresses}).encode("utf-8")
 
 
 
240
 
241
  while True:
242
- # --- приём ---
243
- for sock in listen_sockets:
 
244
  try:
245
- sock.settimeout(0.05)
246
- data, addr = sock.recvfrom(8192)
247
  msg = json.loads(data.decode("utf-8"))
248
  peer_id = msg.get("id")
249
- if not peer_id or peer_id == my_id:
250
  continue
 
251
  name = msg.get("name", "unknown")
252
  addresses = msg.get("addresses", [])
253
- expanded = expand_and_filter(addresses)
254
- if expanded:
255
- print(f"[UDPv6] RX от {addr} -> {expanded}")
256
- storage.add_or_update_peer(peer_id, name, expanded, "discovery", "online")
257
- except socket.timeout:
258
- pass
259
  except Exception as e:
260
- print(f"[UDPv6] Ошибка при обработке пакета: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
261
 
262
- # --- мультикаст рассылка ---
263
- if send_sock6:
264
- for port in udp_port_set:
265
- try:
266
- # iface scope index не задаём — ОС выберет подходящий on-link интерфейс
267
- send_sock6.sendto(msg_data, (maddr, port))
268
- except:
269
- pass
 
 
 
 
 
 
 
270
 
271
  time.sleep(DISCOVERY_INTERVAL)
272
 
@@ -322,9 +283,9 @@ def udp_discovery_sender():
322
 
323
  time.sleep(1)
324
 
325
- # ======================
326
- # TCP Listener (v4+v6) для Peer Exchange
327
- # ======================
328
  def tcp_listener():
329
  sockets = []
330
  for host, port in tcp_ports:
@@ -352,106 +313,110 @@ def tcp_listener():
352
  if not sockets:
353
  time.sleep(1)
354
  continue
 
355
  readable, _, _ = select.select(sockets, [], [], 1)
356
  for s in readable:
357
  try:
358
  conn, addr = s.accept()
359
  data = b""
360
  try:
361
- chunk = conn.recv(1024)
362
- data = chunk or b""
363
  except:
364
- data = b""
365
 
366
  if data == b"PEER_EXCHANGE_REQUEST":
367
  print(f"[TCP] PEER_EXCHANGE_REQUEST от {addr}")
368
  try:
369
- peers = []
370
  for pid, addresses_json in storage.get_online_peers(limit=50):
371
  try:
372
  addresses = json.loads(addresses_json)
373
- except Exception:
374
  addresses = []
375
- # фильтруем/расширяем перед отдачей
376
  addresses = expand_and_filter(addresses)
377
- peers.append({"id": pid, "addresses": addresses})
378
- payload = json.dumps(peers).encode("utf-8")
379
  conn.sendall(payload)
380
- print(f"[TCP] Отправлен список пиров ({len(peers)}) в {addr}")
381
  except Exception as e:
382
  print(f"[TCP] Ошибка при отправке списка пиров: {e}")
383
  conn.close()
384
  except Exception as e:
385
  print(f"[TCP] Ошибка при обработке соединения: {e}")
386
 
387
- # ======================
388
- # Peer Exchange (TCP инициатор, читает ответ и пишет в БД)
389
- # ======================
390
  def peer_exchange():
391
- PEER_EXCHANGE_INTERVAL = 30 # почаще, чтобы быстрее раскидать пиров
392
  while True:
393
- rows = storage.get_online_peers(limit=50)
394
- for pid, addresses_json in rows:
395
- if pid == my_id:
 
396
  continue
 
397
  try:
398
  addr_list = json.loads(addresses_json)
399
  except:
400
  addr_list = []
 
401
  for addr in addr_list:
402
  norm = storage.normalize_address(addr)
403
  if not norm:
404
  continue
405
- proto, hostport = norm.split("://", 1)
 
406
  if proto not in ["tcp", "any"]:
407
  continue
 
408
  host, port = parse_hostport(hostport)
409
- if not host or not port or is_loopback(host):
410
  continue
411
 
412
  try:
413
- family = socket.AF_INET6 if is_ipv6(host) else socket.AF_INET
414
- s = socket.socket(family, socket.SOCK_STREAM)
415
- s.settimeout(3)
416
- # IPv6 connect tuple принимает (host, port, flowinfo, scope_id), но flow/scope 0 по умолчанию ок
417
- s.connect((host, port))
418
- s.sendall(b"PEER_EXCHANGE_REQUEST")
419
-
420
- # читаем весь ответ (до закрытия сокета/таймаута)
421
- chunks = []
422
- try:
423
- while True:
424
- b = s.recv(8192)
425
- if not b:
426
  break
427
- chunks.append(b)
428
- except socket.timeout:
429
- pass
430
- s.close()
 
 
431
 
432
- if not chunks:
 
 
 
 
433
  continue
434
 
435
  try:
436
- peers_list = json.loads(b"".join(chunks).decode("utf-8"))
 
 
 
 
 
 
 
 
 
 
437
  except Exception as e:
438
- print(f"[PeerExchange] ошибка парсинга ответа: {e}")
439
- continue
440
 
441
- for p in peers_list:
442
- p_id = p.get("id")
443
- if not p_id or p_id == my_id:
444
- continue
445
- addrs = expand_and_filter(p.get("addresses", []))
446
- if addrs:
447
- storage.add_or_update_peer(p_id, "unknown", addrs, "exchange", "online")
448
- print(f"[PeerExchange] получили {p_id} -> {addrs}")
449
-
450
- # если успешно отработали по одному TCP-адресу — дальше по этому пиру можно не перебирать
451
- break
452
- except Exception as e:
453
- # пробуем следующий адрес пира
454
  continue
 
455
  time.sleep(PEER_EXCHANGE_INTERVAL)
456
 
457
  # ======================
@@ -461,11 +426,10 @@ def start_sync():
461
  print("[PeerSync] Запуск фоновой синхронизации")
462
  storage.load_bootstrap()
463
 
464
- threading.Thread(target=udp_lan_discovery_v4, daemon=True).start()
465
- threading.Thread(target=udp_lan_discovery_v6, daemon=True).start()
466
  threading.Thread(target=udp_discovery_sender, daemon=True).start()
467
  threading.Thread(target=peer_exchange, daemon=True).start()
468
  threading.Thread(target=tcp_listener, daemon=True).start()
469
 
470
  while True:
471
- time.sleep(60)
 
8
  import ipaddress
9
  import re
10
  import netifaces
11
+ import select
12
+
13
  from tools.storage import Storage
14
  from datetime import datetime
 
15
 
16
  storage = Storage()
17
  my_id = storage.get_config_value("agent_id", str(uuid.uuid4()))
 
127
  return out
128
 
129
  # ======================
130
+ # UDP LAN Discovery (IPv4 broadcast + IPv6 multicast)
131
  # ======================
132
+ def udp_lan_discovery():
133
  DISCOVERY_INTERVAL = 30
134
+
135
  local_addresses = storage.get_config_value("local_addresses", [])
136
+
137
+ # Получаем уникальные порты из local_addresses
138
  udp_port_set = set()
139
  for a in local_addresses:
140
+ _, port = parse_hostport(a.split("://", 1)[1])
141
+ if port:
142
+ udp_port_set.add(port)
 
 
 
 
143
 
144
+ # ------------------ Слушатели ------------------
145
  listen_sockets = []
146
  for port in udp_port_set:
147
+ # IPv4
148
  try:
149
  sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
150
  sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
151
  sock.bind(("", port))
152
  listen_sockets.append(sock)
153
+ print(f"[UDP/LAN Discovery] слушаем IPv4 на *:{port}")
154
  except Exception as e:
155
+ print(f"[UDP/LAN Discovery] Не удалось создать IPv4 сокет {port}: {e}")
 
 
 
156
 
157
+ # IPv6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
158
  try:
159
  sock6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
160
  sock6.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
161
  sock6.bind(("::", port))
162
  listen_sockets.append(sock6)
163
+ print(f"[UDP/LAN Discovery] слушаем IPv6 на [::]:{port}")
164
  except Exception as e:
165
+ print(f"[UDP/LAN Discovery] Не удалось создать IPv6 сокет {port}: {e}")
 
 
 
 
 
 
 
 
 
166
 
167
+ msg_data = json.dumps({
168
+ "id": my_id,
169
+ "name": agent_name,
170
+ "addresses": local_addresses
171
+ }).encode("utf-8")
172
 
173
  while True:
174
+ # ------------------ Приём ------------------
175
+ rlist, _, _ = select.select(listen_sockets, [], [], 0.5)
176
+ for sock in rlist:
177
  try:
178
+ data, addr = sock.recvfrom(2048)
 
179
  msg = json.loads(data.decode("utf-8"))
180
  peer_id = msg.get("id")
181
+ if peer_id == my_id:
182
  continue
183
+
184
  name = msg.get("name", "unknown")
185
  addresses = msg.get("addresses", [])
186
+ storage.add_or_update_peer(peer_id, name, addresses, "discovery", "online")
187
+ print(f"[UDP/LAN Discovery] peer={peer_id} name={name} addresses={addresses} from {addr}")
 
 
 
 
188
  except Exception as e:
189
+ print(f"[UDP/LAN Discovery] ошибка при приёме: {e}")
190
+
191
+ # ------------------ Отправка ------------------
192
+ for port in udp_port_set:
193
+ # ---------------- IPv4 Broadcast ----------------
194
+ for iface in netifaces.interfaces():
195
+ addrs = netifaces.ifaddresses(iface).get(netifaces.AF_INET, [])
196
+ for a in addrs:
197
+ if "broadcast" in a:
198
+ bcast = a["broadcast"]
199
+ try:
200
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
201
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
202
+ sock.sendto(msg_data, (bcast, port))
203
+ sock.close()
204
+ print(f"[UDP/LAN Discovery] -> IPv4 broadcast {bcast}:{port}")
205
+ except Exception as e:
206
+ print(f"[UDP/LAN Discovery] ошибка IPv4 broadcast {bcast}:{port}: {e}")
207
+
208
+ # ---------------- IPv6 Multicast ----------------
209
+ for iface in netifaces.interfaces():
210
+ ifaddrs = netifaces.ifaddresses(iface).get(netifaces.AF_INET6, [])
211
+ for a in ifaddrs:
212
+ addr = a.get("addr")
213
+ if not addr:
214
+ continue
215
 
216
+ # Если link-local, добавляем scope_id
217
+ if addr.startswith("fe80:"):
218
+ multicast_addr = f"ff02::1%{iface}"
219
+ else:
220
+ multicast_addr = "ff02::1"
221
+
222
+ try:
223
+ sock6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
224
+ sock6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, socket.if_nametoindex(iface))
225
+ sock6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 1)
226
+ sock6.sendto(msg_data, (multicast_addr, port))
227
+ sock6.close()
228
+ print(f"[UDP/LAN Discovery] -> IPv6 multicast {multicast_addr}:{port}")
229
+ except Exception:
230
+ continue
231
 
232
  time.sleep(DISCOVERY_INTERVAL)
233
 
 
283
 
284
  time.sleep(1)
285
 
286
+ # ==========================
287
+ # TCP Listener (IPv4 + IPv6, link-local + global)
288
+ # ==========================
289
  def tcp_listener():
290
  sockets = []
291
  for host, port in tcp_ports:
 
313
  if not sockets:
314
  time.sleep(1)
315
  continue
316
+
317
  readable, _, _ = select.select(sockets, [], [], 1)
318
  for s in readable:
319
  try:
320
  conn, addr = s.accept()
321
  data = b""
322
  try:
323
+ data = conn.recv(1024) or b""
 
324
  except:
325
+ pass
326
 
327
  if data == b"PEER_EXCHANGE_REQUEST":
328
  print(f"[TCP] PEER_EXCHANGE_REQUEST от {addr}")
329
  try:
330
+ peers_list = []
331
  for pid, addresses_json in storage.get_online_peers(limit=50):
332
  try:
333
  addresses = json.loads(addresses_json)
334
+ except:
335
  addresses = []
 
336
  addresses = expand_and_filter(addresses)
337
+ peers_list.append({"id": pid, "addresses": addresses})
338
+ payload = json.dumps(peers_list).encode("utf-8")
339
  conn.sendall(payload)
340
+ print(f"[TCP] Отправлен список пиров ({len(peers_list)}) в {addr}")
341
  except Exception as e:
342
  print(f"[TCP] Ошибка при отправке списка пиров: {e}")
343
  conn.close()
344
  except Exception as e:
345
  print(f"[TCP] Ошибка при обработке соединения: {e}")
346
 
347
+ # ==========================
348
+ # Peer Exchange (инициатор TCP)
349
+ # ==========================
350
  def peer_exchange():
351
+ PEER_EXCHANGE_INTERVAL = 120
352
  while True:
353
+ peers = storage.get_online_peers(limit=50)
354
+ for peer in peers:
355
+ peer_id, addresses_json = peer["id"], peer["addresses"]
356
+ if peer_id == my_id:
357
  continue
358
+
359
  try:
360
  addr_list = json.loads(addresses_json)
361
  except:
362
  addr_list = []
363
+
364
  for addr in addr_list:
365
  norm = storage.normalize_address(addr)
366
  if not norm:
367
  continue
368
+
369
+ proto, hostport = norm.split("://")
370
  if proto not in ["tcp", "any"]:
371
  continue
372
+
373
  host, port = parse_hostport(hostport)
374
+ if not host or not port:
375
  continue
376
 
377
  try:
378
+ # Для link-local IPv6 автоматически добавляем scope_id
379
+ if is_ipv6(host) and host.startswith("fe80:"):
380
+ for iface in get_all_local_ips_v6(): # перебор интерфейсов
381
+ if iface.endswith(host): # простой вариант сопоставления
382
+ scope_id = socket.if_nametoindex(iface)
383
+ sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
384
+ sock.settimeout(3)
385
+ sock.connect((host, port, 0, scope_id))
 
 
 
 
 
386
  break
387
+ else:
388
+ continue
389
+ else:
390
+ sock = socket.socket(socket.AF_INET6 if is_ipv6(host) else socket.AF_INET, socket.SOCK_STREAM)
391
+ sock.settimeout(3)
392
+ sock.connect((host, port))
393
 
394
+ sock.sendall(b"PEER_EXCHANGE_REQUEST")
395
+ data = sock.recv(64 * 1024)
396
+ sock.close()
397
+
398
+ if not data:
399
  continue
400
 
401
  try:
402
+ peers_recv = json.loads(data.decode("utf-8"))
403
+ for p in peers_recv:
404
+ if p.get("id") and p["id"] != my_id:
405
+ storage.add_or_update_peer(
406
+ p["id"],
407
+ p.get("name", "unknown"),
408
+ p.get("addresses", []),
409
+ "peer_exchange",
410
+ "online"
411
+ )
412
+ print(f"[PeerExchange] Получено {len(peers_recv)} пиров от {host}:{port}")
413
  except Exception as e:
414
+ print(f"[PeerExchange] Ошибка разбора ответа от {host}:{port} -> {e}")
415
+ break # успешное соединение, не пробуем остальные адреса этого пира
416
 
417
+ except Exception:
 
 
 
 
 
 
 
 
 
 
 
 
418
  continue
419
+
420
  time.sleep(PEER_EXCHANGE_INTERVAL)
421
 
422
  # ======================
 
426
  print("[PeerSync] Запуск фоновой синхронизации")
427
  storage.load_bootstrap()
428
 
429
+ threading.Thread(target=udp_lan_discovery, daemon=True).start()
 
430
  threading.Thread(target=udp_discovery_sender, daemon=True).start()
431
  threading.Thread(target=peer_exchange, daemon=True).start()
432
  threading.Thread(target=tcp_listener, daemon=True).start()
433
 
434
  while True:
435
+ time.sleep(60)