#!/usr/bin/env python3 """ NetBird CLI - Simple command-line interface for NetBird API """ import os import sys import json import argparse import urllib.request import urllib.error import urllib.parse from typing import Optional, Dict, Any, List BASE_URL = "https://net.dcglab.co.uk/api" def get_token() -> str: """Get API token from environment.""" token = os.environ.get("NETBIRD_API_TOKEN") if not token: print("Error: NETBIRD_API_TOKEN environment variable not set", file=sys.stderr) print("Create a Personal Access Token in your NetBird dashboard:", file=sys.stderr) print(" User settings → Personal Access Tokens", file=sys.stderr) sys.exit(1) return token def api_request( method: str, endpoint: str, data: Optional[Dict] = None, params: Optional[Dict] = None, ) -> Any: """Make an API request to NetBird.""" token = get_token() url = f"{BASE_URL}{endpoint}" if params: query = "&".join(f"{k}={urllib.parse.quote(str(v))}" for k, v in params.items()) url = f"{url}?{query}" headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", "Accept": "application/json", } req = urllib.request.Request( url, method=method, headers=headers, ) if data: req.data = json.dumps(data).encode("utf-8") try: with urllib.request.urlopen(req) as response: if response.status == 204: return None return json.loads(response.read().decode("utf-8")) except urllib.error.HTTPError as e: error_body = e.read().decode("utf-8") print(f"HTTP Error {e.code}: {e.reason}", file=sys.stderr) try: error_json = json.loads(error_body) print(json.dumps(error_json, indent=2), file=sys.stderr) except: print(error_body, file=sys.stderr) sys.exit(1) except urllib.error.URLError as e: print(f"Connection error: {e.reason}", file=sys.stderr) sys.exit(1) def format_peer(peer: Dict) -> str: """Format peer for display.""" status = "🟢" if peer.get("connected") else "🔴" name = peer.get("name", "unknown") ip = peer.get("ip", "N/A") hostname = peer.get("hostname", "N/A") os_info = peer.get("os", "N/A") return f"{status} {name} ({hostname}) - IP: {ip} - OS: {os_info}" def format_group(group: Dict) -> str: """Format group for display.""" name = group.get("name", "unknown") gid = group.get("id", "N/A")[:8] peers = group.get("peers_count", 0) return f"{name} ({gid}...) - {peers} peers" def format_policy(policy: Dict) -> str: """Format policy for display.""" name = policy.get("name", "unknown") enabled = "✅" if policy.get("enabled") else "❌" rules_count = len(policy.get("rules", [])) return f"{enabled} {name} - {rules_count} rules" def format_setup_key(key: Dict) -> str: """Format setup key for display.""" name = key.get("name", "unknown") key_type = key.get("type", "unknown") revoked = "🚫 REVOKED" if key.get("revoked") else "" auto_groups = ", ".join(key.get("auto_groups", [])) or "none" return f"{name} ({key_type}) - Auto-groups: {auto_groups} {revoked}" def format_route(route: Dict) -> str: """Format route for display.""" desc = route.get("description", "unknown") network = route.get("network", "N/A") enabled = "✅" if route.get("enabled") else "❌" return f"{enabled} {desc} - {network}" # ===== PEERS ===== def list_peers(): """List all peers.""" peers = api_request("GET", "/peers") if not peers: print("No peers found.") return print(f"Found {len(peers)} peer(s):\n") for peer in peers: print(format_peer(peer)) def get_peer(peer_id: str): """Get peer details.""" peer = api_request("GET", f"/peers/{peer_id}") print(json.dumps(peer, indent=2)) def delete_peer(peer_id: str): """Delete a peer.""" api_request("DELETE", f"/peers/{peer_id}") print(f"Peer {peer_id} deleted.") # ===== GROUPS ===== def list_groups(): """List all groups.""" groups = api_request("GET", "/groups") if not groups: print("No groups found.") return print(f"Found {len(groups)} group(s):\n") for group in groups: print(format_group(group)) def get_group(group_id: str): """Get group details.""" group = api_request("GET", f"/groups/{group_id}") print(json.dumps(group, indent=2)) def create_group(name: str, peers: Optional[List[str]] = None): """Create a new group.""" data = {"name": name} if peers: data["peers"] = peers group = api_request("POST", "/groups", data=data) print(f"Group created: {group.get('name')} (ID: {group.get('id')})") def delete_group(group_id: str): """Delete a group.""" api_request("DELETE", f"/groups/{group_id}") print(f"Group {group_id} deleted.") # ===== SETUP KEYS ===== def list_setup_keys(): """List all setup keys.""" keys = api_request("GET", "/setup-keys") if not keys: print("No setup keys found.") return print(f"Found {len(keys)} setup key(s):\n") for key in keys: print(format_setup_key(key)) def create_setup_key(name: str, key_type: str, expires: int, auto_groups: Optional[List[str]] = None): """Create a new setup key.""" data = { "name": name, "type": key_type, "expires_in": expires, "auto_groups": auto_groups or [] } key = api_request("POST", "/setup-keys", data=data) print(f"Setup key created: {key.get('name')}") print(f"Key: {key.get('key')}") print(f"ID: {key.get('id')}") def revoke_setup_key(key_id: str): """Revoke a setup key.""" api_request("PUT", f"/setup-keys/{key_id}", data={"revoked": True, "auto_groups": []}) print(f"Setup key {key_id} revoked.") # ===== POLICIES ===== def list_policies(): """List all policies.""" policies = api_request("GET", "/policies") if not policies: print("No policies found.") return print(f"Found {len(policies)} policy(ies):\n") for policy in policies: print(format_policy(policy)) def get_policy(policy_id: str): """Get policy details.""" policy = api_request("GET", f"/policies/{policy_id}") print(json.dumps(policy, indent=2)) def delete_policy(policy_id: str): """Delete a policy.""" api_request("DELETE", f"/policies/{policy_id}") print(f"Policy {policy_id} deleted.") # ===== ROUTES ===== def list_routes(): """List all routes.""" routes = api_request("GET", "/routes") if not routes: print("No routes found.") return print(f"Found {len(routes)} route(s):\n") for route in routes: print(format_route(route)) def get_route(route_id: str): """Get route details.""" route = api_request("GET", f"/routes/{route_id}") print(json.dumps(route, indent=2)) # ===== USERS ===== def list_users(): """List all users.""" users = api_request("GET", "/users") if not users: print("No users found.") return print(f"Found {len(users)} user(s):\n") for user in users: name = user.get("name", user.get("email", "unknown")) role = user.get("role", "unknown") service = "[service]" if user.get("is_service_user") else "" print(f"{name} ({role}) {service}") def main(): parser = argparse.ArgumentParser(description="NetBird CLI") subparsers = parser.add_subparsers(dest="resource", help="Resource type") # Peers peers_parser = subparsers.add_parser("peers", help="Manage peers") peers_sub = peers_parser.add_subparsers(dest="action") peers_sub.add_parser("list", help="List all peers") peers_get = peers_sub.add_parser("get", help="Get peer details") peers_get.add_argument("peer_id", help="Peer ID") peers_del = peers_sub.add_parser("delete", help="Delete a peer") peers_del.add_argument("peer_id", help="Peer ID") # Groups groups_parser = subparsers.add_parser("groups", help="Manage groups") groups_sub = groups_parser.add_subparsers(dest="action") groups_sub.add_parser("list", help="List all groups") groups_get = groups_sub.add_parser("get", help="Get group details") groups_get.add_argument("group_id", help="Group ID") groups_create = groups_sub.add_parser("create", help="Create a group") groups_create.add_argument("name", help="Group name") groups_create.add_argument("--peers", nargs="+", help="Peer IDs to add") groups_del = groups_sub.add_parser("delete", help="Delete a group") groups_del.add_argument("group_id", help="Group ID") # Setup Keys keys_parser = subparsers.add_parser("setup-keys", help="Manage setup keys") keys_sub = keys_parser.add_subparsers(dest="action") keys_sub.add_parser("list", help="List all setup keys") keys_create = keys_sub.add_parser("create", help="Create a setup key") keys_create.add_argument("--name", required=True, help="Key name") keys_create.add_argument("--type", choices=["one-off", "reusable"], default="reusable", help="Key type") keys_create.add_argument("--expires", type=int, default=86400, help="Expiration in seconds (default: 86400 = 24h)") keys_create.add_argument("--auto-groups", nargs="+", help="Group IDs to auto-assign") keys_revoke = keys_sub.add_parser("revoke", help="Revoke a setup key") keys_revoke.add_argument("key_id", help="Key ID") # Policies policies_parser = subparsers.add_parser("policies", help="Manage policies") policies_sub = policies_parser.add_subparsers(dest="action") policies_sub.add_parser("list", help="List all policies") policies_get = policies_sub.add_parser("get", help="Get policy details") policies_get.add_argument("policy_id", help="Policy ID") policies_del = policies_sub.add_parser("delete", help="Delete a policy") policies_del.add_argument("policy_id", help="Policy ID") # Routes routes_parser = subparsers.add_parser("routes", help="Manage routes") routes_sub = routes_parser.add_subparsers(dest="action") routes_sub.add_parser("list", help="List all routes") routes_get = routes_sub.add_parser("get", help="Get route details") routes_get.add_argument("route_id", help="Route ID") # Users users_parser = subparsers.add_parser("users", help="Manage users") users_sub = users_parser.add_subparsers(dest="action") users_sub.add_parser("list", help="List all users") args = parser.parse_args() if not args.resource: parser.print_help() sys.exit(1) # Handle commands if args.resource == "peers": if args.action == "list": list_peers() elif args.action == "get": get_peer(args.peer_id) elif args.action == "delete": delete_peer(args.peer_id) elif args.resource == "groups": if args.action == "list": list_groups() elif args.action == "get": get_group(args.group_id) elif args.action == "create": create_group(args.name, args.peers) elif args.action == "delete": delete_group(args.group_id) elif args.resource == "setup-keys": if args.action == "list": list_setup_keys() elif args.action == "create": create_setup_key(args.name, args.type, args.expires, args.auto_groups) elif args.action == "revoke": revoke_setup_key(args.key_id) elif args.resource == "policies": if args.action == "list": list_policies() elif args.action == "get": get_policy(args.policy_id) elif args.action == "delete": delete_policy(args.policy_id) elif args.resource == "routes": if args.action == "list": list_routes() elif args.action == "get": get_route(args.route_id) elif args.resource == "users": if args.action == "list": list_users() if __name__ == "__main__": main()