// Source path: clique/src/clique2_ablations_parallel2.java
// Hugging Face Hub file-viewer residue (kept for provenance): uploaded by qingy2024
// via huggingface_hub ("Upload folder using huggingface_hub"), commit bf620c6 (verified), 17.1 kB.
import java.io.FileReader;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.stream.IntStream;
/**
 * Incremental "Laplacian RMC" reconstruction over an undirected graph.
 *
 * <p>Vertices are peeled in min-degree order, then re-inserted in reverse peeling order.
 * After every insertion a {@link SnapshotDTO} is emitted for the component that now contains
 * the inserted vertex: its members, the sum of member degrees, and a quantity {@code Q}
 * maintained incrementally from the degree updates.
 *
 * <p>Adjacency lists are 1-based ({@code adj[0]} is unused); node ids inside snapshots are
 * 0-based (input id minus one) and sorted ascending.
 */
public class clique2_ablations_parallel2 {
    /** Vertex and edge counts of the most recently read input; written only by {@link #main}. */
    static int n, m;

    /**
     * Reads a graph ({@code n m} followed by {@code m} edge pairs) and runs the reconstruction.
     *
     * <p>NOTE(review): this method returns a value, so the {@code java} launcher will not accept
     * it as a program entry point; the signature is kept as-is for callers that invoke it
     * directly.
     *
     * @param args {@code args[0]} epsilon (parsed for validation only, otherwise unused here),
     *             {@code args[1]} input file; if the file cannot be opened, stdin is read instead
     * @return all snapshots, or an empty list when fewer than two arguments are given
     * @throws Exception on malformed input
     */
    public static List<SnapshotDTO> main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: java clique2_ablations <epsilon> <inputfile>");
            // Previously execution fell through and crashed on args[0].
            return new ArrayList<>();
        }
        // Epsilon is not consumed by this variant; parsing still rejects a malformed value.
        Double.parseDouble(args[0]);

        Scanner opened;
        try {
            opened = new Scanner(new FileReader(args[1]));
        } catch (IOException e) {
            System.err.println("Could not open " + args[1] + ". Falling back to stdin.");
            opened = new Scanner(System.in);
        }

        List<Integer>[] adj;
        try (Scanner r = opened) {
            adj = readGraph(r);
        }

        long t0 = System.nanoTime();
        List<SnapshotDTO> res = runLaplacianRMC(adj);
        long t1 = System.nanoTime();
        System.out.printf(Locale.US, "Runtime: %.3f ms%n", (t1 - t0) / 1_000_000.0);
        return res;
    }

    /** Parses {@code n m} and {@code m} undirected edges into 1-based adjacency lists. */
    private static List<Integer>[] readGraph(Scanner in) {
        n = in.nextInt();
        m = in.nextInt();
        @SuppressWarnings("unchecked")
        List<Integer>[] adj = new ArrayList[n + 1];
        for (int i = 1; i <= n; i++) {
            adj[i] = new ArrayList<>();
        }
        for (int i = 0; i < m; i++) {
            int a = in.nextInt();
            int b = in.nextInt();
            adj[a].add(b);
            adj[b].add(a);
        }
        return adj;
    }

    /**
     * Runs the reconstruction (component-parallel when possible) and collects every snapshot.
     *
     * @param adj1Based 1-based adjacency lists
     * @return every emitted snapshot, in emission order
     */
    public static List<SnapshotDTO> runLaplacianRMC(List<Integer>[] adj1Based) {
        ArrayList<SnapshotDTO> out = new ArrayList<>();
        runByComponents(adj1Based, out::add);
        return out;
    }

    /**
     * Core algorithm: min-degree peeling followed by reverse re-insertion with incremental
     * maintenance of per-component {@code Q} and degree sums.
     *
     * @param adj  1-based adjacency lists ({@code adj.length - 1} vertices)
     * @param sink receives each snapshot as soon as it is produced; may be {@code null}
     * @return all snapshots in emission order (one per inserted vertex)
     */
    public static List<SnapshotDTO> runLaplacianRMCStreaming(List<Integer>[] adj,
                                                             Consumer<SnapshotDTO> sink) {
        final int n = adj.length - 1;

        // Phase 1: peeling; addOrder is the reverse of the peeling order.
        final int[] addOrder = computeAddOrder(adj, n);
        final int[] idx = new int[n + 1];
        for (int t = 0; t < n; t++) {
            idx[addOrder[t]] = t;
        }

        // Phase 1.5: orient every edge from the earlier-inserted endpoint to the later one.
        @SuppressWarnings("unchecked")
        final ArrayList<Integer>[] succ = new ArrayList[n + 1];
        @SuppressWarnings("unchecked")
        final ArrayList<Integer>[] pred = new ArrayList[n + 1];
        for (int i = 1; i <= n; i++) {
            succ[i] = new ArrayList<>();
            pred[i] = new ArrayList<>();
        }
        IntStream.rangeClosed(1, n).parallel().forEach(u -> {
            for (int v : adj[u]) {
                if (u < v) { // handle each undirected edge once; also skips self-loops
                    if (idx[u] < idx[v]) {
                        synchronized (succ[u]) { succ[u].add(v); }
                        synchronized (pred[v]) { pred[v].add(u); }
                    } else {
                        synchronized (succ[v]) { succ[v].add(u); }
                        synchronized (pred[u]) { pred[u].add(v); }
                    }
                }
            }
        });
        // Successor lists must be sorted by insertion index: SumSucc binary-searches them.
        IntStream.rangeClosed(1, n).parallel().forEach(v -> {
            if (succ[v].size() > 1) {
                succ[v].sort(Comparator.comparingInt(w -> idx[w]));
            }
        });

        // Phase 2: reverse reconstruction.
        final DSU dsu = new DSU(n);
        final int[] deg = new int[n + 1];
        final long[] predSum = new long[n + 1]; // predSum[y] = sum of deg[p] over predecessors p of y
        @SuppressWarnings("unchecked")
        final ArrayList<Integer>[] compNodes = new ArrayList[n + 1];
        for (int i = 1; i <= n; i++) {
            compNodes[i] = new ArrayList<>();
        }
        final SumSucc sumSucc = new SumSucc(succ, idx, deg);
        final List<SnapshotDTO> recon = new ArrayList<>(n);

        for (int t = 0; t < addOrder.length; t++) {
            final int u = addOrder[t];
            final int Tu = idx[u];
            dsu.makeIfNeeded(u);
            compNodes[u].add(u);
            long Su = 0L; // sum of current degrees of the neighbours already attached to u

            // Connect u to all of its predecessors (earlier-inserted neighbours).
            for (int v : pred[u]) {
                final long a = deg[u];
                final long b = deg[v];
                // S_v = predSum[v] + sum of deg[w] over successors w of v with idx[w] < idx[u]
                final long Sv = predSum[v] + sumSucc.until(v, Tu);
                final long dQu = 2L * a * a - 2L * Su + a;
                final long dQv = 2L * b * b - 2L * Sv + b;
                final long edgeTerm = (a - b) * (a - b);

                final int ru = dsu.find(u);
                final int rv = dsu.find(v);
                dsu.Q[ru] += (double) dQu;
                dsu.Q[rv] += (double) dQv;

                final int root;
                if (ru != rv) {
                    root = dsu.union(ru, rv);
                    final int absorbed = (root == ru) ? rv : ru;
                    compNodes[root].addAll(compNodes[absorbed]);
                    compNodes[absorbed].clear();
                } else {
                    root = ru;
                }
                dsu.Q[root] += (double) edgeTerm;

                deg[u] += 1;
                deg[v] += 1;
                dsu.sumDeg[root] += 2;

                // Propagate both degree increments to the successors' predecessor sums.
                // Done sequentially: a parallel forEach here is an unsynchronised
                // read-modify-write on a shared array and can lose updates when a
                // successor list contains a repeated vertex (duplicate input edges).
                for (int y : succ[u]) {
                    predSum[y] += 1;
                }
                for (int y : succ[v]) {
                    predSum[y] += 1;
                }

                // Maintain Su with deg[v] AFTER its increment.
                Su += deg[v];
            }

            // Snapshot of the component that now contains u (0-based ids, sorted).
            final int root = dsu.find(u);
            final List<Integer> members = compNodes[root];
            final int[] nodes = new int[members.size()];
            for (int i = 0; i < nodes.length; i++) {
                nodes[i] = members.get(i) - 1;
            }
            Arrays.sort(nodes);
            final SnapshotDTO snap = new SnapshotDTO(
                    dsu.componentId(root), nodes, nodes.length, dsu.sumDeg[root], dsu.Q[root]);
            recon.add(snap);
            if (sink != null) {
                sink.accept(snap);
            }
        }
        return recon;
    }

    /**
     * Min-degree peeling. Returns the insertion order, i.e. the peeling order reversed
     * (the vertex peeled last is inserted first). Ties are broken by smaller vertex id.
     */
    private static int[] computeAddOrder(List<Integer>[] adj, int n) {
        final int[] remaining = new int[n + 1];
        // Explicit flag so a stale queue entry can never re-peel a vertex (the degree-only
        // check could match a zeroed degree when the input contains a self-loop).
        final boolean[] removed = new boolean[n + 1];
        final PriorityQueue<Pair> pq = new PriorityQueue<>();
        for (int i = 1; i <= n; i++) {
            remaining[i] = adj[i].size();
            pq.add(new Pair(i, remaining[i]));
        }
        final int[] order = new int[n];
        int slot = n - 1; // first peeled goes last
        while (!pq.isEmpty()) {
            final Pair p = pq.poll();
            if (removed[p.node] || p.degree != remaining[p.node]) {
                continue; // stale entry
            }
            removed[p.node] = true;
            order[slot--] = p.node;
            for (int v : adj[p.node]) {
                if (!removed[v] && remaining[v] > 0) {
                    remaining[v]--;
                    pq.add(new Pair(v, remaining[v]));
                }
            }
            remaining[p.node] = 0;
        }
        return order;
    }

    // === Component-parallel wrapper ===

    /** Return #threads from -Dthreads or THREADS env; default: max(2, availableProcessors()). */
    static int threadCount() {
        Integer fromProp = parseThreads(System.getProperty("threads"));
        if (fromProp != null) {
            return fromProp;
        }
        Integer fromEnv = parseThreads(System.getenv("THREADS"));
        if (fromEnv != null) {
            return fromEnv;
        }
        return Math.max(2, Runtime.getRuntime().availableProcessors());
    }

    /** Parses a thread-count setting, clamped to at least 1; {@code null} if absent or malformed. */
    private static Integer parseThreads(String raw) {
        if (raw == null) {
            return null;
        }
        try {
            return Math.max(1, Integer.parseInt(raw.trim()));
        } catch (NumberFormatException ignored) {
            // A malformed setting is deliberately ignored so the next source is consulted.
            return null;
        }
    }

    /** Connected components on 1-based adjacency. Returns list of int[] of global node ids (BFS order). */
    static List<int[]> connectedComponents1Based(List<Integer>[] adj) {
        final int n = adj.length - 1;
        final boolean[] vis = new boolean[n + 1];
        final int[] queue = new int[n];
        final ArrayList<int[]> out = new ArrayList<>();
        for (int s = 1; s <= n; s++) {
            if (vis[s]) {
                continue;
            }
            int head = 0;
            int tail = 0;
            queue[tail++] = s;
            vis[s] = true;
            while (head < tail) {
                final int u = queue[head++];
                for (int v : adj[u]) {
                    if (!vis[v]) {
                        vis[v] = true;
                        queue[tail++] = v;
                    }
                }
            }
            // The queue prefix already is the component in BFS order; no per-component
            // scratch array of size n is needed.
            out.add(Arrays.copyOf(queue, tail));
        }
        return out;
    }

    /**
     * Build 1-based induced subgraph for given global node ids.
     * Local id {@code i + 1} corresponds to {@code nodes[i]}; edges to vertices outside
     * {@code nodes} are dropped.
     */
    static List<Integer>[] induceSubgraph1Based(List<Integer>[] adj, int[] nodes) {
        final int k = nodes.length;
        @SuppressWarnings("unchecked")
        List<Integer>[] sub = new List[k + 1];
        int maxId = 0;
        for (int i = 0; i < k; i++) {
            sub[i + 1] = new ArrayList<>(Math.max(2, adj[nodes[i]].size()));
            if (nodes[i] > maxId) {
                maxId = nodes[i];
            }
        }
        // global -> local (1..k); 0 means "not in this subgraph"
        final int[] toLocal = new int[maxId + 1];
        for (int i = 0; i < k; i++) {
            toLocal[nodes[i]] = i + 1;
        }
        for (int i = 0; i < k; i++) {
            final List<Integer> row = sub[i + 1];
            for (int gv : adj[nodes[i]]) {
                if (gv <= maxId && toLocal[gv] != 0) {
                    row.add(toLocal[gv]);
                }
            }
        }
        return sub;
    }

    /**
     * Runs the streaming algorithm once per connected component, in parallel, and merges the
     * results (component by component, in the order returned by
     * {@link #connectedComponents1Based}).
     *
     * <p>Snapshot node ids are mapped back to 0-based global ids and re-sorted, matching the
     * single-core path. Component ids are shifted per graph component so that they stay unique
     * in the merged output. With at most one component or one thread, the single-core path is
     * used directly.
     *
     * @param adj  1-based adjacency lists
     * @param sink receives every snapshot; may be {@code null}
     * @return all snapshots
     */
    public static List<SnapshotDTO> runByComponents(List<Integer>[] adj, Consumer<SnapshotDTO> sink) {
        final List<int[]> comps = connectedComponents1Based(adj);
        final int T = threadCount();
        System.err.println("[clique2] CPUs=" + Runtime.getRuntime().availableProcessors() +
                " threads=" + T + " components=" + comps.size());
        if (comps.size() <= 1 || T <= 1) {
            // fall back to original single-core path
            return runLaplacianRMCStreaming(adj, sink);
        }

        final ArrayList<Callable<List<SnapshotDTO>>> tasks = new ArrayList<>(comps.size());
        int idBase = 0;
        for (int[] comp : comps) {
            // Each per-component run hands out ids in 1..comp.length, so shifting by the
            // number of vertices in earlier components keeps ids globally unique.
            final int offset = idBase;
            idBase += comp.length;
            tasks.add(() -> {
                List<Integer>[] sub = induceSubgraph1Based(adj, comp);
                List<SnapshotDTO> local = runLaplacianRMCStreaming(sub, null);
                List<SnapshotDTO> mapped = new ArrayList<>(local.size());
                for (SnapshotDTO s : local) {
                    // s.nodes holds 0-based LOCAL ids, i.e. direct indices into comp.
                    final int[] global = new int[s.nodes.length];
                    for (int i = 0; i < global.length; i++) {
                        global[i] = comp[s.nodes[i]] - 1;
                    }
                    Arrays.sort(global);
                    mapped.add(new SnapshotDTO(s.componentId + offset, global, s.nC, s.sumDegIn, s.Q));
                }
                return mapped;
            });
        }

        final ArrayList<SnapshotDTO> merged = new ArrayList<>();
        final ExecutorService pool = (T >= 64) ? Executors.newWorkStealingPool(T)
                : Executors.newFixedThreadPool(T);
        try {
            for (Future<List<SnapshotDTO>> f : pool.invokeAll(tasks)) {
                merged.addAll(f.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        if (sink != null) {
            for (SnapshotDTO s : merged) {
                sink.accept(s);
            }
        }
        return merged;
    }

    /**
     * Prefix sums of current degrees over a vertex's successor list, which must be sorted by
     * insertion index. The cut-off position is found by binary search.
     */
    static final class SumSucc {
        final ArrayList<Integer>[] succ;
        final int[] idx;
        final int[] deg;

        SumSucc(ArrayList<Integer>[] succ, int[] idx, int[] deg) {
            this.succ = succ;
            this.idx = idx;
            this.deg = deg;
        }

        /** Sum of {@code deg[w]} over successors {@code w} of {@code v} with {@code idx[w] < T}. */
        long until(int v, int T) {
            final ArrayList<Integer> sv = succ[v];
            final int sz = sv.size();
            if (sz == 0) {
                return 0L;
            }
            // First position whose insertion index is >= T.
            int low = 0;
            int high = sz;
            while (low < high) {
                final int mid = (low + high) >>> 1;
                if (idx[sv.get(mid)] < T) {
                    low = mid + 1;
                } else {
                    high = mid;
                }
            }
            final int pos = low;
            if (pos == 0) {
                return 0L;
            }
            // Sum prefix [0, pos); long prefixes use a read-only parallel reduction.
            if (pos > 500) {
                return sv.subList(0, pos).parallelStream().mapToLong(w -> deg[w]).sum();
            }
            long s = 0L;
            for (int i = 0; i < pos; i++) {
                s += deg[sv.get(i)];
            }
            return s;
        }
    }

    /** Plain holder; not referenced anywhere in this class. */
    static class Result {
        double bestSL;
        int bestRoot;
    }

    /** Priority-queue entry ordered by degree, then by node id. */
    static class Pair implements Comparable<Pair> {
        final int node, degree;

        Pair(int node, int degree) {
            this.node = node;
            this.degree = degree;
        }

        @Override
        public int compareTo(Pair o) {
            if (degree != o.degree) {
                return Integer.compare(degree, o.degree);
            }
            return Integer.compare(node, o.node);
        }
    }

    /**
     * Union-find (union by size, path compression) that also carries per-root aggregates:
     * {@code Q}, {@code sumDeg} and a stable component id (the smaller id survives a merge).
     */
    static class DSU {
        final int[] parent;
        final int[] size;
        final boolean[] made;
        final double[] Q;
        final int[] sumDeg;
        final int[] compId; // compId[root] > 0 iff the root represents a live component
        int nextCompId = 1; // 1-based; 0 means "unassigned"

        DSU(int n) {
            parent = new int[n + 1];
            size = new int[n + 1];
            made = new boolean[n + 1];
            Q = new double[n + 1];
            sumDeg = new int[n + 1];
            compId = new int[n + 1];
        }

        /** Creates the singleton set for {@code v} on first use and assigns it a fresh id. */
        void makeIfNeeded(int v) {
            if (!made[v]) {
                made[v] = true;
                parent[v] = v;
                size[v] = 1;
                Q[v] = 0.0;
                sumDeg[v] = 0;
                if (compId[v] == 0) {
                    compId[v] = nextCompId++;
                }
            }
        }

        /** Root of {@code v}'s set; an element that was never made is returned unchanged. */
        int find(int v) {
            if (!made[v]) {
                return v;
            }
            if (parent[v] != v) {
                parent[v] = find(parent[v]);
            }
            return parent[v];
        }

        /** Merges the sets of {@code a} and {@code b}; returns the surviving (larger) root. */
        int union(int a, int b) {
            makeIfNeeded(a);
            makeIfNeeded(b);
            int ra = find(a), rb = find(b);
            if (ra == rb) {
                return ra;
            }
            if (size[ra] < size[rb]) {
                int t = ra;
                ra = rb;
                rb = t;
            }
            parent[rb] = ra;
            size[ra] += size[rb];
            Q[ra] += Q[rb];
            sumDeg[ra] += sumDeg[rb];
            int aId = compId[ra], bId = compId[rb];
            int keep = (aId == 0) ? bId : (bId == 0 ? aId : Math.min(aId, bId));
            compId[ra] = keep;
            compId[rb] = 0; // retire loser id
            return ra;
        }

        int componentId(int v) {
            return compId[find(v)];
        }
    }

    /** Immutable-by-convention snapshot of one component right after a vertex insertion. */
    public static final class SnapshotDTO {
        public final int componentId;
        /** Sorted 0-based node ids of the component. */
        public final int[] nodes;
        /** Number of nodes in the component. */
        public final int nC;
        /** Sum of the members' current degrees. */
        public final long sumDegIn;
        public final double Q;

        public SnapshotDTO(int componentId, int[] nodes, int nC, long sumDegIn, double Q) {
            this.componentId = componentId;
            this.nodes = nodes;
            this.nC = nC;
            this.sumDegIn = sumDegIn;
            this.Q = Q;
        }
    }
}