Files

392 lines
12 KiB
Python

#!/usr/bin/env python3
"""
NetBird CLI - Simple command-line interface for NetBird API
"""
import os
import sys
import json
import argparse
import urllib.request
import urllib.error
import urllib.parse
from typing import Optional, Dict, Any, List
BASE_URL = "https://net.dcglab.co.uk/api"
def get_token() -> str:
"""Get API token from environment."""
token = os.environ.get("NETBIRD_API_TOKEN")
if not token:
print("Error: NETBIRD_API_TOKEN environment variable not set", file=sys.stderr)
print("Create a Personal Access Token in your NetBird dashboard:", file=sys.stderr)
print(" User settings → Personal Access Tokens", file=sys.stderr)
sys.exit(1)
return token
def api_request(
method: str,
endpoint: str,
data: Optional[Dict] = None,
params: Optional[Dict] = None,
) -> Any:
"""Make an API request to NetBird."""
token = get_token()
url = f"{BASE_URL}{endpoint}"
if params:
query = "&".join(f"{k}={urllib.parse.quote(str(v))}" for k, v in params.items())
url = f"{url}?{query}"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
req = urllib.request.Request(
url,
method=method,
headers=headers,
)
if data:
req.data = json.dumps(data).encode("utf-8")
try:
with urllib.request.urlopen(req) as response:
if response.status == 204:
return None
return json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as e:
error_body = e.read().decode("utf-8")
print(f"HTTP Error {e.code}: {e.reason}", file=sys.stderr)
try:
error_json = json.loads(error_body)
print(json.dumps(error_json, indent=2), file=sys.stderr)
except:
print(error_body, file=sys.stderr)
sys.exit(1)
except urllib.error.URLError as e:
print(f"Connection error: {e.reason}", file=sys.stderr)
sys.exit(1)
def format_peer(peer: Dict) -> str:
"""Format peer for display."""
status = "🟢" if peer.get("connected") else "🔴"
name = peer.get("name", "unknown")
ip = peer.get("ip", "N/A")
hostname = peer.get("hostname", "N/A")
os_info = peer.get("os", "N/A")
return f"{status} {name} ({hostname}) - IP: {ip} - OS: {os_info}"
def format_group(group: Dict) -> str:
"""Format group for display."""
name = group.get("name", "unknown")
gid = group.get("id", "N/A")[:8]
peers = group.get("peers_count", 0)
return f"{name} ({gid}...) - {peers} peers"
def format_policy(policy: Dict) -> str:
"""Format policy for display."""
name = policy.get("name", "unknown")
enabled = "" if policy.get("enabled") else ""
rules_count = len(policy.get("rules", []))
return f"{enabled} {name} - {rules_count} rules"
def format_setup_key(key: Dict) -> str:
"""Format setup key for display."""
name = key.get("name", "unknown")
key_type = key.get("type", "unknown")
revoked = "🚫 REVOKED" if key.get("revoked") else ""
auto_groups = ", ".join(key.get("auto_groups", [])) or "none"
return f"{name} ({key_type}) - Auto-groups: {auto_groups} {revoked}"
def format_route(route: Dict) -> str:
"""Format route for display."""
desc = route.get("description", "unknown")
network = route.get("network", "N/A")
enabled = "" if route.get("enabled") else ""
return f"{enabled} {desc} - {network}"
# ===== PEERS =====
def list_peers():
"""List all peers."""
peers = api_request("GET", "/peers")
if not peers:
print("No peers found.")
return
print(f"Found {len(peers)} peer(s):\n")
for peer in peers:
print(format_peer(peer))
def get_peer(peer_id: str):
"""Get peer details."""
peer = api_request("GET", f"/peers/{peer_id}")
print(json.dumps(peer, indent=2))
def delete_peer(peer_id: str):
"""Delete a peer."""
api_request("DELETE", f"/peers/{peer_id}")
print(f"Peer {peer_id} deleted.")
# ===== GROUPS =====
def list_groups():
"""List all groups."""
groups = api_request("GET", "/groups")
if not groups:
print("No groups found.")
return
print(f"Found {len(groups)} group(s):\n")
for group in groups:
print(format_group(group))
def get_group(group_id: str):
"""Get group details."""
group = api_request("GET", f"/groups/{group_id}")
print(json.dumps(group, indent=2))
def create_group(name: str, peers: Optional[List[str]] = None):
"""Create a new group."""
data = {"name": name}
if peers:
data["peers"] = peers
group = api_request("POST", "/groups", data=data)
print(f"Group created: {group.get('name')} (ID: {group.get('id')})")
def delete_group(group_id: str):
"""Delete a group."""
api_request("DELETE", f"/groups/{group_id}")
print(f"Group {group_id} deleted.")
# ===== SETUP KEYS =====
def list_setup_keys():
"""List all setup keys."""
keys = api_request("GET", "/setup-keys")
if not keys:
print("No setup keys found.")
return
print(f"Found {len(keys)} setup key(s):\n")
for key in keys:
print(format_setup_key(key))
def create_setup_key(name: str, key_type: str, expires: int, auto_groups: Optional[List[str]] = None):
"""Create a new setup key."""
data = {
"name": name,
"type": key_type,
"expires_in": expires,
"auto_groups": auto_groups or []
}
key = api_request("POST", "/setup-keys", data=data)
print(f"Setup key created: {key.get('name')}")
print(f"Key: {key.get('key')}")
print(f"ID: {key.get('id')}")
def revoke_setup_key(key_id: str):
"""Revoke a setup key."""
api_request("PUT", f"/setup-keys/{key_id}", data={"revoked": True, "auto_groups": []})
print(f"Setup key {key_id} revoked.")
# ===== POLICIES =====
def list_policies():
"""List all policies."""
policies = api_request("GET", "/policies")
if not policies:
print("No policies found.")
return
print(f"Found {len(policies)} policy(ies):\n")
for policy in policies:
print(format_policy(policy))
def get_policy(policy_id: str):
"""Get policy details."""
policy = api_request("GET", f"/policies/{policy_id}")
print(json.dumps(policy, indent=2))
def delete_policy(policy_id: str):
"""Delete a policy."""
api_request("DELETE", f"/policies/{policy_id}")
print(f"Policy {policy_id} deleted.")
# ===== ROUTES =====
def list_routes():
"""List all routes."""
routes = api_request("GET", "/routes")
if not routes:
print("No routes found.")
return
print(f"Found {len(routes)} route(s):\n")
for route in routes:
print(format_route(route))
def get_route(route_id: str):
"""Get route details."""
route = api_request("GET", f"/routes/{route_id}")
print(json.dumps(route, indent=2))
# ===== USERS =====
def list_users():
"""List all users."""
users = api_request("GET", "/users")
if not users:
print("No users found.")
return
print(f"Found {len(users)} user(s):\n")
for user in users:
name = user.get("name", user.get("email", "unknown"))
role = user.get("role", "unknown")
service = "[service]" if user.get("is_service_user") else ""
print(f"{name} ({role}) {service}")
def main():
parser = argparse.ArgumentParser(description="NetBird CLI")
subparsers = parser.add_subparsers(dest="resource", help="Resource type")
# Peers
peers_parser = subparsers.add_parser("peers", help="Manage peers")
peers_sub = peers_parser.add_subparsers(dest="action")
peers_sub.add_parser("list", help="List all peers")
peers_get = peers_sub.add_parser("get", help="Get peer details")
peers_get.add_argument("peer_id", help="Peer ID")
peers_del = peers_sub.add_parser("delete", help="Delete a peer")
peers_del.add_argument("peer_id", help="Peer ID")
# Groups
groups_parser = subparsers.add_parser("groups", help="Manage groups")
groups_sub = groups_parser.add_subparsers(dest="action")
groups_sub.add_parser("list", help="List all groups")
groups_get = groups_sub.add_parser("get", help="Get group details")
groups_get.add_argument("group_id", help="Group ID")
groups_create = groups_sub.add_parser("create", help="Create a group")
groups_create.add_argument("name", help="Group name")
groups_create.add_argument("--peers", nargs="+", help="Peer IDs to add")
groups_del = groups_sub.add_parser("delete", help="Delete a group")
groups_del.add_argument("group_id", help="Group ID")
# Setup Keys
keys_parser = subparsers.add_parser("setup-keys", help="Manage setup keys")
keys_sub = keys_parser.add_subparsers(dest="action")
keys_sub.add_parser("list", help="List all setup keys")
keys_create = keys_sub.add_parser("create", help="Create a setup key")
keys_create.add_argument("--name", required=True, help="Key name")
keys_create.add_argument("--type", choices=["one-off", "reusable"], default="reusable", help="Key type")
keys_create.add_argument("--expires", type=int, default=86400, help="Expiration in seconds (default: 86400 = 24h)")
keys_create.add_argument("--auto-groups", nargs="+", help="Group IDs to auto-assign")
keys_revoke = keys_sub.add_parser("revoke", help="Revoke a setup key")
keys_revoke.add_argument("key_id", help="Key ID")
# Policies
policies_parser = subparsers.add_parser("policies", help="Manage policies")
policies_sub = policies_parser.add_subparsers(dest="action")
policies_sub.add_parser("list", help="List all policies")
policies_get = policies_sub.add_parser("get", help="Get policy details")
policies_get.add_argument("policy_id", help="Policy ID")
policies_del = policies_sub.add_parser("delete", help="Delete a policy")
policies_del.add_argument("policy_id", help="Policy ID")
# Routes
routes_parser = subparsers.add_parser("routes", help="Manage routes")
routes_sub = routes_parser.add_subparsers(dest="action")
routes_sub.add_parser("list", help="List all routes")
routes_get = routes_sub.add_parser("get", help="Get route details")
routes_get.add_argument("route_id", help="Route ID")
# Users
users_parser = subparsers.add_parser("users", help="Manage users")
users_sub = users_parser.add_subparsers(dest="action")
users_sub.add_parser("list", help="List all users")
args = parser.parse_args()
if not args.resource:
parser.print_help()
sys.exit(1)
# Handle commands
if args.resource == "peers":
if args.action == "list":
list_peers()
elif args.action == "get":
get_peer(args.peer_id)
elif args.action == "delete":
delete_peer(args.peer_id)
elif args.resource == "groups":
if args.action == "list":
list_groups()
elif args.action == "get":
get_group(args.group_id)
elif args.action == "create":
create_group(args.name, args.peers)
elif args.action == "delete":
delete_group(args.group_id)
elif args.resource == "setup-keys":
if args.action == "list":
list_setup_keys()
elif args.action == "create":
create_setup_key(args.name, args.type, args.expires, args.auto_groups)
elif args.action == "revoke":
revoke_setup_key(args.key_id)
elif args.resource == "policies":
if args.action == "list":
list_policies()
elif args.action == "get":
get_policy(args.policy_id)
elif args.action == "delete":
delete_policy(args.policy_id)
elif args.resource == "routes":
if args.action == "list":
list_routes()
elif args.action == "get":
get_route(args.route_id)
elif args.resource == "users":
if args.action == "list":
list_users()
if __name__ == "__main__":
main()