A Fast Shortest Distance Algorithm for Any Graphs



To find shortest paths in a graph, we have the Dijkstra's algorithm and the Bellman–Ford algorithm. The former algorithm works for graphs having non negative edge weights, and the later works for all graphs but is slower.

In this article, we describe a new (maybe not) algorithm, which is a merge of the two algorithms. It is identical to the Dijkstra's algorithm when there is no negative edge and it works for all graphs with better practical complexity than the Bellman–Ford algorithm. Same as the Bellman-Ford algorithm, this algorithm can also find negative cycles.


Algorithm Description

routine graph_min_distances:

Input:
  neighbors: A func to get a list of neighbor vertices for a start vertex.
  W: A func to get the edge weight for two vertices, start and end.
  s: The source vertex.
  V: A upper bound of the number of vertices.

For each (visited or being visited) vertex, we have data:
    dist: The tentative distance, initially inf.
    prev: The previous vertex of the tentative distance path, initially null.
    len: The number of vertices in the tentative distance path, initial zero.
    threshold: A threshold for dist, initially inf.
    tick: Used by find_cycle.

Output:
  cycle: A vertex in a found negative cycle, initially null.
  dists: The map v => v.dist for all visited vertices.
  prevs: The map v => v.prev for all visited vertices.

Init:
  Q: vertices priority queue, intially containing s only.
  Set dist = 0, len = 1 and threshold = 0 for s.

while Q is not empty:
  pop the vertex v having the min dist from Q
  threshold = -inf
  for n in neighbors(v)
    if n.dist < v.dist + W(v, n)
      u = n.prev
      n.dist = v.dist + W(v, n)
      n.prev = v
      n.len = v.len + 1
      cycle := detect_cycle(n, u)
      return if cycle is not null
      if n.dist < n.threshold or n is in Q
        add n into Q or update Q
      end
    end
    threshold = max(threshold, n.dist - W(v, n))
  end
  v.threshold = threshold
end

end routine

cycle_tick := an integer, initally 0.

subroutine find_cycle(v, min_len):
Input:
  v: The start vertex to walk back.
  min_len: Stop once the len field is not bigger than this.

Output:
  null or a vertex in a cycle

cycle_tick += 1
for x in [v, v.prev, v.prev.prev, ...] in order
  return null if x is null
  if x.tick == cycle_tick
    return x
  end
  return null if x.len <= min_len
  x.tick = cycle_tick
end
return null

end subroutine

subroutine detect_cycle:
Input:
  n: the just updated vertex
  u: the prev before the update
  v = n.prev: the new updated n.prev

Output:
  null or a vertex in a cycle.

if all weights seen are non negative
  return null
end
if n.len > V or n == source
  return find_cycle(n)
end
if u is None or u.len >= v.len or u == source
  return null
end
return find_cycle(n, min_len=u.len)

end subroutine

The Q could be implemented with a heap with index operations. Unlike the Dijkstra's algorithm, any container works correctly here, but priority queue reduces the duplications of vertex visiting significantly.


Proof

To proof the algorithm, we need some notations.

For a path 𝑃=[𝑣1,...,𝑣𝑛], let 𝑊(𝑃)𝑛1𝑖=1𝑊(𝑣𝑖,𝑣𝑖+1) be the sum of edges' weights and 𝐿(𝑃)=𝑛 be the number of vertices in the path. Compatible with the definitions, if 𝐿(𝑃)=1, then 𝑊(𝑃)=0.

Given a graph 𝐺 with vertices 𝑉 and edges 𝐸, and the source vertex 𝑠, the distance of a any vertex 𝑣 is defined as 𝐷(𝑣)min𝑃=[𝑠,...,𝑣]𝑊(𝑃).

A path 𝑃=[𝑠,...,𝑣] having 𝑊(𝑃)=𝐷(𝑣) is called a distance path. We define 𝐷(𝑣)= if there is no path from 𝑠 to 𝑣. Distance paths exist if and only if |𝐷(𝑣)|<.

Claim: If 𝑃=[𝑠,...,𝑢,𝑣] is a distance path for 𝑣, then 𝑃𝑢=[𝑠,...,𝑢] is a distance path for 𝑢.

Proof: If not, there exists a path 𝑄=[𝑠,...,𝑢] having 𝑊(𝑄)<𝑊(𝑃𝑢). Then for the path 𝑄𝑣=[𝑠,...,𝑢,𝑣] (𝑄 extended to 𝑣), 𝑊(𝑄𝑢)=𝑊(𝑄)+𝑊(𝑢,𝑣)<𝑊(𝑃𝑢)+𝑊(𝑢,𝑣)=𝑊(𝑃). So 𝑃 can not be a distance path. This conflicts and proofs the claim.

Let 𝑇(𝑣) be the (tentative) distance found by the algorithm for the vertex 𝑣, then 𝑇(𝑣)>=𝐷(𝑣). We need to proof that 𝑇(𝑣)=𝐷(𝑣) if 𝐷(𝑣)>.

If 𝐷(𝑣)=, the vertex is not connected with 𝑠. The algorithm can not find 𝑣 in the search process, and keeps 𝑇(𝑣)=. It is easy to see that 𝐷(𝑣)= if and only if 𝑇(𝑣)=.

If the violating set {𝑣𝑇(𝑣)>𝐷(𝑣),|𝐷(𝑣)|<} is not empty, we pick a 𝑣 having a distance path 𝑃=[𝑠,...,𝑢,𝑣] and 𝐿(𝑃) is the smallest among the violating set. Needless to say that 𝐿(𝑃)>1. The truncated path 𝑃𝑢=[𝑠,...,𝑢] has one fewer vertex and is also a distance path, so 𝑇(𝑢)=𝐷(𝑢). The algorithm adds 𝑣 to the queue after finding a distance path for 𝑢. The later iteration when popping 𝑣 from the queue is able to find a distance path for 𝑣. This conflicts and proofs 𝑇(𝑣)=𝐷(𝑣) for each 𝑣 with |𝐷(𝑣)|<.

Now assume 𝐷(𝑣)=. This indicates there are negative cycles (the sum of the cycle edges' weights is negative). The algorithm keeps running for all vertices accessible from negative cycles and we can follow the path 𝑣𝑣.prev, ... to find negative cycles. But the full check is slow. The algorithm uses some heuristic conditions. These conditions only report negative cycle existence when it really exists. This is good enough if we can bound the loop. In the algorithm, we note that 𝑣.len keep increasing for a cycle loop, this is finite when working the check 𝐿(𝑣)<=|𝑉|.

In this proof, we do not depend on the visiting order of the vertex queue 𝑄. So the algorithm is correct for any order, but the order by dist is better for complexity, and makes it fall back to Dijkstra's algorithm naturally. If the algorithm visit a vertex with a larger dist, the algorithm has a large probability of not updating any neighbors, and the later iteration (when visiting another vertex) adds this vertex again.


Complexity

It is easy to see the complexity is the same as the Dijkstra's algorithm when there is no negative edge. The worse complexity is still unknown, but in practise, we find the visiting times for each vertex is quite small.

We generated two random grid (100x100) graphs with negative weights only on the outer rectangle edges in one direction (the other direction weights are all large to avoid small and trivial cycles) and a third grid graph similar but with no negative weights, the example code visits 1-2 times.


Code


from typing import Any
import dataclasses
import math

class Result:
def __init__(self, **kwds):
for k, v in kwds.items():
setattr(self, k, v)

def graph_min_distances(graph, source, V=None, random_queue=False):
if V is None: V = len(graph)
VertexData = dataclasses.make_dataclass(
"VertexData",
[
("current", Any, dataclasses.field(default=None)),
("dist", float, dataclasses.field(default=math.inf)),
("prev", Any, dataclasses.field(default=None)),
("threshold", float, dataclasses.field(default=math.inf)),
("len", int, dataclasses.field(default=0)),
("tick", int, dataclasses.field(default=0)),
("heap_index", int, dataclasses.field(default=-1)),
]
)
vertex_to_data = {source: VertexData(current=source, dist=0, threshold=0, len=1, heap_index=0)}

# queue is a heap with min dist vertex at the top (index 0).
queue = [vertex_to_data[source]]
def queue_cmp(x, y):
a, b = queue[x], queue[y]
if a.dist != b.dist: return a.dist > b.dist
return a.len > b.len
def queue_swap(x, y):
queue[x], queue[y] = queue[y], queue[x]
queue[x].heap_index = x
queue[y].heap_index = y
def queue_shift_up(x):
while x > 0:
p = (x - 1) // 2
if not queue_cmp(p, x): break
queue_swap(p, x)
x = p
return x
def queue_shift_down(x):
while x * 2 + 1 < len(queue):
l = x * 2 + 1
r = l + 1
c = r if r < len(queue) and queue_cmp(l, r) else l
if not queue_cmp(x, c): break
queue_swap(x, c)
x = c
return x
def queue_pop():
if random_queue:
v_d = queue.pop()
v_d.heap_index = -1
return v_d
v_d = queue[0]
v_d.heap_index = -1
if len(queue) == 1:
queue.pop()
return v_d
queue[0] = queue.pop()
queue[0].heap_index = 0
queue_shift_down(0)
return v_d
def queue_update(v_d):
if v_d.heap_index == -1:
v_d.heap_index = len(queue)
queue.append(v_d)
if random_queue:
i = random.randint(0, len(queue) - 1)
if i != v_d.heap_index:
queue_swap(i, v_d.heap_index)
else:
queue_shift_up(v_d.heap_index)

get_neighbors = graph
if isinstance(get_neighbors, dict):
get_neighbors = lambda v: graph.get(v, {})

result = Result(source=source, find_cycle_ops=0, edge_visits=0,
vertex_times={}, queue_pops=0, queue_updates=0)

cycle_tick = 0
def find_cycle(v_d, min_len=0):
nonlocal cycle_tick
cycle_tick += 1
while v_d is not None:
result.find_cycle_ops += 1
if v_d.tick == cycle_tick:
return v_d.current
if v_d.len <= min_len: return None
v_d.tick = cycle_tick
v_d = v_d.prev
return None

seen_negative = False
def detect_cycle(n_d, u_d):
if not seen_negative: return None
if n_d.len > V or n_d.current == source:
n = find_cycle(n_d)
assert n is not None
return n
v_d = n_d.prev
if u_d is None or v_d.len <= u_d.len or u_d.current == source: return None
return find_cycle(v_d, min_len=u_d.len)

def make_result(c):
result.cycle = c
result.dists = {d.current: d.dist for d in vertex_to_data.values()}
result.prevs = {d.current: d.prev.current if d.prev is not None else None
for d in vertex_to_data.values() }
result.lens = {d.current: d.len for d in vertex_to_data.values()}
return result

while queue:
v_d = queue_pop()
#print("visiting", v_d.current, "dist", v_d.dist)
v = v_d.current
th = -math.inf
result.queue_pops += 1
result.vertex_times[v] = result.vertex_times.get(v, 0) + 1
for n, w in get_neighbors(v).items():
if w < 0: seen_negative = True
result.edge_visits += 1
n_d = vertex_to_data.get(n, None)
if n_d is None:
n_d = VertexData(current=n)
vertex_to_data[n] = n_d
if v_d.dist + w < n_d.dist:
u_d = n_d.prev
n_d.dist = v_d.dist + w
n_d.prev = v_d
n_d.len = v_d.len + 1
c = detect_cycle(n_d, u_d)
if c is not None:
return make_result(c)
if n_d.dist < n_d.threshold or n_d.heap_index != -1:
queue_update(n_d)
result.queue_updates += 1
#print(f"update {n} dist {n_d.dist}")
th = max(th, n_d.dist - w)
v_d.threshold = th
return make_result(None)

def graph_min_distances_result_check(g, result):
source = result.source
if result.cycle is not None:
s = 0
v = result.cycle
while True:
u = result.prevs[v]
s += g[u][v]
v = u
if v == result.cycle: break
assert s < 0, f"cycle {result.cycle} not negative"
return
assert result.dists[source] == 0, f"dists[source] ({result.dists[source]}) == 0"
eps = 1.0e-10
for n, w in g.get(source, {}).items():
assert result.dists[n] <= w
for a, edges in g.items():
a_d = result.dists.get(a, math.inf)
p = result.prevs.get(a, None)
if a != source and p is not None:
w = g[p][a]
p_d = result.dists.get(p, math.inf)
assert a_d >= p_d + w - eps
for b, w in edges.items():
b_d = result.dists.get(b, math.inf)
assert b_d <= a_d + w + eps

No comments: