' + escape(kwargs.get("label", get_as_name(_as))).replace("\r"," ") + " |
def build_as_tree_from_raw_bird_ouput(host, proto, text):
    """Extract the AS paths from raw bird "show route all" output.

    Both the bird1 and the bird2 output layouts are recognised.

    Args:
        host: Router the output came from; not used by the parser.
        proto: Protocol family of the query; not used by the parser.
        text: Iterable of output lines; each line is stripped before matching.

    Returns:
        A list of paths, each of which is a list:
        ``[protocol, AS, ..., destination]`` for a route learned through a
        next hop, ``[protocol, router]`` when the next hop is one of the
        addresses listed in ``app.config["ROUTER_IP"]``, and
        ``[protocol, destination]`` for an on-link route. ``destination`` is
        ``None`` when no destination prefix has been seen yet.
    """
    # bird2: "<net> unicast [<protocol> ..."
    unicast_re = re.compile(r'(.*)unicast\s+\[(\w+)\s+')
    # bird1: "<net> via <next hop> on <iface> [<protocol> ..."
    bird1_peer_re = re.compile(r'(.*)via\s+([0-9a-fA-F:\.]+)\s+on.*\[(\w+)\s+')
    # bird2: "via <next hop>"
    bird2_peer_re = re.compile(r'via\s+([0-9a-fA-F:\.]+)')
    # bird1 & bird2: "<net> unreachable [<protocol> ..."
    unreachable_re = re.compile(r'(.*)unreachable\s+\[(\w+)\s+')

    path = None
    paths = []
    net_dest = None
    # Bound up front so that a "via"/"dev" line seen before any protocol
    # name cannot raise UnboundLocalError.
    peer_protocol_name = None
    peer_ip = None

    for line in text:
        line = line.strip()

        b2_unicast_line = unicast_re.search(line)
        if b2_unicast_line:
            # save the net_dest and protocol name for later
            if b2_unicast_line.group(1).strip():
                net_dest = b2_unicast_line.group(1).strip()
            peer_protocol_name = b2_unicast_line.group(2).strip()

        peer_match = False
        b1_peer_line = bird1_peer_re.search(line)
        if b1_peer_line:
            # save the net_dest, peer and protocol name for later
            if b1_peer_line.group(1).strip():
                net_dest = b1_peer_line.group(1).strip()
            peer_ip = b1_peer_line.group(2).strip()
            peer_protocol_name = b1_peer_line.group(3).strip()
            peer_match = True
        else:
            # Only try the looser bird2 pattern when the bird1 pattern did
            # not match, so bird1 lines are never matched twice.
            b2_peer_line = bird2_peer_re.search(line)
            if b2_peer_line:
                peer_ip = b2_peer_line.group(1).strip()
                peer_match = True

        # A peer line without a known protocol cannot be attributed to a
        # route, so it is ignored.
        if peer_match and peer_protocol_name is not None:
            # common code for when either a bird1 or bird2 peer line was found
            if path:
                # NOTE(review): net_dest may already have been advanced to the
                # next prefix by the lines above - confirm this is intended
                # for multi-prefix output.
                path.append(net_dest)
                paths.append(path)
                path = None

            # Check if the via line is an internal route
            for rt_host, rt_ips in app.config["ROUTER_IP"].items():
                # Special case for internal routing
                if peer_ip in rt_ips:
                    paths.append([peer_protocol_name, rt_host])
                    break
            else:
                path = [peer_protocol_name]

        # check for unreachable routes (common for bird1 & 2)
        unreachable_line = unreachable_re.search(line)
        if unreachable_line:
            # Close the open path before the destination changes.
            if path:
                path.append(net_dest)
                paths.append(path)
                path = None

            if unreachable_line.group(1).strip():
                net_dest = unreachable_line.group(1).strip()

        # check for on-link routes
        if line.startswith("dev"):
            if peer_protocol_name is not None:
                paths.append([peer_protocol_name, net_dest])
            # NOTE(review): any path still open at this point is discarded
            # rather than flushed - confirm this is intended.
            path = None

        if line.startswith("BGP.as_path:") and path:
            path.extend(line.replace("BGP.as_path:", "").strip().split(" "))

    # Flush the path that was still open when the output ended.
    if path:
        path.append(net_dest)
        paths.append(path)

    return paths


def show_route(request_type, hosts, proto):
    """Run a bird "show route" query on the selected hosts and render it.

    The query expression is read with ``get_query()``. The bird command is
    chosen from the prefix of ``request_type``:

    * ``adv...``   -> ``show route <expression>``
    * ``where...`` -> ``show route where net ~ [ <expression> ]``
    * otherwise    -> ``show route for <address>[/<mask>]``; names that are
      not valid addresses are resolved first and a host mask is filled in
      when none was given.

    `` all`` is appended when ``request_type`` ends with ``detail`` or
    ``bgpmap``.

    Args:
        request_type: Kind of request; its prefix and suffix select the
            command and the output template.
        hosts: Host names joined with ``+``; ``all`` selects every key of
            ``app.config["PROXY"]``.
        proto: Protocol family (``ipv4`` / ``ipv6``) handed to
            ``bird_command``.

    Returns:
        The rendered ``bgpmap.html`` or ``route.html`` template, or the
        result of ``error_page`` when the expression cannot be resolved or
        the mask is invalid. Aborts with HTTP 400 when the query is empty.
    """
    expression = get_query()
    if not expression:
        abort(400)

    set_session(request_type, hosts, proto, expression)

    bgpmap = request_type.endswith("bgpmap")
    all_suffix = " all" if bgpmap or request_type.endswith("detail") else ""

    if request_type.startswith("adv"):
        command = "show route " + expression.strip()
        if bgpmap and not command.endswith("all"):
            command = command + " all"
    elif request_type.startswith("where"):
        command = "show route where net ~ [ " + expression + " ]" + all_suffix
    else:
        mask = ""
        parts = expression.split("/")
        if len(parts) == 2:
            expression, mask = parts

        if app.config.get("UNIFIED_DAEMON", False):
            # One daemon serves both families: infer the default mask from
            # the (possibly resolved) address itself.
            if not ip_is_valid(expression):
                try:
                    expression = resolve_any(expression)
                except Exception:
                    return error_page("%s is unresolvable" % expression)

            if not mask and ipv4_is_valid(expression):
                mask = "32"
            if not mask and ipv6_is_valid(expression):
                mask = "128"
            if not mask_is_valid(mask):
                return error_page("mask %s is invalid" % mask)
        else:
            if not mask and proto == "ipv4":
                mask = "32"
            if not mask and proto == "ipv6":
                mask = "128"
            if not mask_is_valid(mask):
                return error_page("mask %s is invalid" % mask)

            if proto == "ipv6" and not ipv6_is_valid(expression):
                try:
                    expression = resolve(expression, "AAAA")
                except Exception:
                    return error_page("%s is unresolvable or invalid for %s" % (expression, proto))
            if proto == "ipv4" and not ipv4_is_valid(expression):
                try:
                    expression = resolve(expression, "A")
                except Exception:
                    return error_page("%s is unresolvable or invalid for %s" % (expression, proto))

        if mask:
            expression += "/" + mask

        command = "show route for " + expression + all_suffix

    detail = {}
    errors = []

    selected_hosts = hosts.split("+")
    if selected_hosts == ["all"]:
        selected_hosts = list(app.config["PROXY"].keys())

    # allhosts deliberately grows while it is being iterated: routers that
    # are discovered through internal routes are queried as well.
    allhosts = selected_hosts[:]
    for host in allhosts:
        ret, res = bird_command(host, proto, command)
        res = res.split("\n")

        if ret is False:
            # Report the message itself rather than the repr of the list.
            errors.append("\n".join(res))
            continue

        if len(res) <= 1:
            errors.append("%s: bird command failed with error, %s" % (host, "\n".join(res)))
            continue

        if bgpmap:
            detail[host] = build_as_tree_from_raw_bird_ouput(host, proto, res)
            # for internal routes via hosts not selected
            # add them to the list, but only show preferred route
            if host not in selected_hosts:
                detail[host] = detail[host][:1]
            for path in detail[host]:
                # Two-element paths may name another router as second item.
                if len(path) == 2:
                    if (path[1] not in allhosts) and (path[1] in app.config["PROXY"]):
                        allhosts.append(path[1])
        else:
            detail[host] = add_links(res)

    if bgpmap:
        # in python3 b64encode() encodes a bytes-like object into bytes,
        # so the json string needs to be encoded to bytes and then the
        # base64 result decoded back to a string again
        detail = json.dumps(detail)
        detail = base64.b64encode(detail.encode('utf-8'))
        detail = detail.decode('utf-8')

    template = 'bgpmap.html' if bgpmap else 'route.html'
    return render_template(template, detail=detail, command=command, expression=expression, errors=errors)


if __name__ == "__main__":
    app.run(app.config.get("BIND_IP", "0.0.0.0"), app.config.get("BIND_PORT", 5000))