392 lines
12 KiB
Python
392 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
NetBird CLI - Simple command-line interface for NetBird API
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import argparse
|
|
import urllib.request
|
|
import urllib.error
|
|
import urllib.parse
|
|
from typing import Optional, Dict, Any, List
|
|
|
|
BASE_URL = "https://net.dcglab.co.uk/api"
|
|
|
|
|
|
def get_token() -> str:
|
|
"""Get API token from environment."""
|
|
token = os.environ.get("NETBIRD_API_TOKEN")
|
|
if not token:
|
|
print("Error: NETBIRD_API_TOKEN environment variable not set", file=sys.stderr)
|
|
print("Create a Personal Access Token in your NetBird dashboard:", file=sys.stderr)
|
|
print(" User settings → Personal Access Tokens", file=sys.stderr)
|
|
sys.exit(1)
|
|
return token
|
|
|
|
|
|
def api_request(
|
|
method: str,
|
|
endpoint: str,
|
|
data: Optional[Dict] = None,
|
|
params: Optional[Dict] = None,
|
|
) -> Any:
|
|
"""Make an API request to NetBird."""
|
|
token = get_token()
|
|
|
|
url = f"{BASE_URL}{endpoint}"
|
|
if params:
|
|
query = "&".join(f"{k}={urllib.parse.quote(str(v))}" for k, v in params.items())
|
|
url = f"{url}?{query}"
|
|
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Content-Type": "application/json",
|
|
"Accept": "application/json",
|
|
}
|
|
|
|
req = urllib.request.Request(
|
|
url,
|
|
method=method,
|
|
headers=headers,
|
|
)
|
|
|
|
if data:
|
|
req.data = json.dumps(data).encode("utf-8")
|
|
|
|
try:
|
|
with urllib.request.urlopen(req) as response:
|
|
if response.status == 204:
|
|
return None
|
|
return json.loads(response.read().decode("utf-8"))
|
|
except urllib.error.HTTPError as e:
|
|
error_body = e.read().decode("utf-8")
|
|
print(f"HTTP Error {e.code}: {e.reason}", file=sys.stderr)
|
|
try:
|
|
error_json = json.loads(error_body)
|
|
print(json.dumps(error_json, indent=2), file=sys.stderr)
|
|
except:
|
|
print(error_body, file=sys.stderr)
|
|
sys.exit(1)
|
|
except urllib.error.URLError as e:
|
|
print(f"Connection error: {e.reason}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
|
|
def format_peer(peer: Dict) -> str:
|
|
"""Format peer for display."""
|
|
status = "🟢" if peer.get("connected") else "🔴"
|
|
name = peer.get("name", "unknown")
|
|
ip = peer.get("ip", "N/A")
|
|
hostname = peer.get("hostname", "N/A")
|
|
os_info = peer.get("os", "N/A")
|
|
return f"{status} {name} ({hostname}) - IP: {ip} - OS: {os_info}"
|
|
|
|
|
|
def format_group(group: Dict) -> str:
|
|
"""Format group for display."""
|
|
name = group.get("name", "unknown")
|
|
gid = group.get("id", "N/A")[:8]
|
|
peers = group.get("peers_count", 0)
|
|
return f"{name} ({gid}...) - {peers} peers"
|
|
|
|
|
|
def format_policy(policy: Dict) -> str:
|
|
"""Format policy for display."""
|
|
name = policy.get("name", "unknown")
|
|
enabled = "✅" if policy.get("enabled") else "❌"
|
|
rules_count = len(policy.get("rules", []))
|
|
return f"{enabled} {name} - {rules_count} rules"
|
|
|
|
|
|
def format_setup_key(key: Dict) -> str:
|
|
"""Format setup key for display."""
|
|
name = key.get("name", "unknown")
|
|
key_type = key.get("type", "unknown")
|
|
revoked = "🚫 REVOKED" if key.get("revoked") else ""
|
|
auto_groups = ", ".join(key.get("auto_groups", [])) or "none"
|
|
return f"{name} ({key_type}) - Auto-groups: {auto_groups} {revoked}"
|
|
|
|
|
|
def format_route(route: Dict) -> str:
|
|
"""Format route for display."""
|
|
desc = route.get("description", "unknown")
|
|
network = route.get("network", "N/A")
|
|
enabled = "✅" if route.get("enabled") else "❌"
|
|
return f"{enabled} {desc} - {network}"
|
|
|
|
|
|
# ===== PEERS =====
|
|
|
|
def list_peers():
|
|
"""List all peers."""
|
|
peers = api_request("GET", "/peers")
|
|
if not peers:
|
|
print("No peers found.")
|
|
return
|
|
|
|
print(f"Found {len(peers)} peer(s):\n")
|
|
for peer in peers:
|
|
print(format_peer(peer))
|
|
|
|
|
|
def get_peer(peer_id: str):
|
|
"""Get peer details."""
|
|
peer = api_request("GET", f"/peers/{peer_id}")
|
|
print(json.dumps(peer, indent=2))
|
|
|
|
|
|
def delete_peer(peer_id: str):
|
|
"""Delete a peer."""
|
|
api_request("DELETE", f"/peers/{peer_id}")
|
|
print(f"Peer {peer_id} deleted.")
|
|
|
|
|
|
# ===== GROUPS =====
|
|
|
|
def list_groups():
|
|
"""List all groups."""
|
|
groups = api_request("GET", "/groups")
|
|
if not groups:
|
|
print("No groups found.")
|
|
return
|
|
|
|
print(f"Found {len(groups)} group(s):\n")
|
|
for group in groups:
|
|
print(format_group(group))
|
|
|
|
|
|
def get_group(group_id: str):
|
|
"""Get group details."""
|
|
group = api_request("GET", f"/groups/{group_id}")
|
|
print(json.dumps(group, indent=2))
|
|
|
|
|
|
def create_group(name: str, peers: Optional[List[str]] = None):
|
|
"""Create a new group."""
|
|
data = {"name": name}
|
|
if peers:
|
|
data["peers"] = peers
|
|
|
|
group = api_request("POST", "/groups", data=data)
|
|
print(f"Group created: {group.get('name')} (ID: {group.get('id')})")
|
|
|
|
|
|
def delete_group(group_id: str):
|
|
"""Delete a group."""
|
|
api_request("DELETE", f"/groups/{group_id}")
|
|
print(f"Group {group_id} deleted.")
|
|
|
|
|
|
# ===== SETUP KEYS =====
|
|
|
|
def list_setup_keys():
|
|
"""List all setup keys."""
|
|
keys = api_request("GET", "/setup-keys")
|
|
if not keys:
|
|
print("No setup keys found.")
|
|
return
|
|
|
|
print(f"Found {len(keys)} setup key(s):\n")
|
|
for key in keys:
|
|
print(format_setup_key(key))
|
|
|
|
|
|
def create_setup_key(name: str, key_type: str, expires: int, auto_groups: Optional[List[str]] = None):
|
|
"""Create a new setup key."""
|
|
data = {
|
|
"name": name,
|
|
"type": key_type,
|
|
"expires_in": expires,
|
|
"auto_groups": auto_groups or []
|
|
}
|
|
|
|
key = api_request("POST", "/setup-keys", data=data)
|
|
print(f"Setup key created: {key.get('name')}")
|
|
print(f"Key: {key.get('key')}")
|
|
print(f"ID: {key.get('id')}")
|
|
|
|
|
|
def revoke_setup_key(key_id: str):
|
|
"""Revoke a setup key."""
|
|
api_request("PUT", f"/setup-keys/{key_id}", data={"revoked": True, "auto_groups": []})
|
|
print(f"Setup key {key_id} revoked.")
|
|
|
|
|
|
# ===== POLICIES =====
|
|
|
|
def list_policies():
|
|
"""List all policies."""
|
|
policies = api_request("GET", "/policies")
|
|
if not policies:
|
|
print("No policies found.")
|
|
return
|
|
|
|
print(f"Found {len(policies)} policy(ies):\n")
|
|
for policy in policies:
|
|
print(format_policy(policy))
|
|
|
|
|
|
def get_policy(policy_id: str):
|
|
"""Get policy details."""
|
|
policy = api_request("GET", f"/policies/{policy_id}")
|
|
print(json.dumps(policy, indent=2))
|
|
|
|
|
|
def delete_policy(policy_id: str):
|
|
"""Delete a policy."""
|
|
api_request("DELETE", f"/policies/{policy_id}")
|
|
print(f"Policy {policy_id} deleted.")
|
|
|
|
|
|
# ===== ROUTES =====
|
|
|
|
def list_routes():
|
|
"""List all routes."""
|
|
routes = api_request("GET", "/routes")
|
|
if not routes:
|
|
print("No routes found.")
|
|
return
|
|
|
|
print(f"Found {len(routes)} route(s):\n")
|
|
for route in routes:
|
|
print(format_route(route))
|
|
|
|
|
|
def get_route(route_id: str):
|
|
"""Get route details."""
|
|
route = api_request("GET", f"/routes/{route_id}")
|
|
print(json.dumps(route, indent=2))
|
|
|
|
|
|
# ===== USERS =====
|
|
|
|
def list_users():
|
|
"""List all users."""
|
|
users = api_request("GET", "/users")
|
|
if not users:
|
|
print("No users found.")
|
|
return
|
|
|
|
print(f"Found {len(users)} user(s):\n")
|
|
for user in users:
|
|
name = user.get("name", user.get("email", "unknown"))
|
|
role = user.get("role", "unknown")
|
|
service = "[service]" if user.get("is_service_user") else ""
|
|
print(f"{name} ({role}) {service}")
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="NetBird CLI")
|
|
subparsers = parser.add_subparsers(dest="resource", help="Resource type")
|
|
|
|
# Peers
|
|
peers_parser = subparsers.add_parser("peers", help="Manage peers")
|
|
peers_sub = peers_parser.add_subparsers(dest="action")
|
|
peers_sub.add_parser("list", help="List all peers")
|
|
peers_get = peers_sub.add_parser("get", help="Get peer details")
|
|
peers_get.add_argument("peer_id", help="Peer ID")
|
|
peers_del = peers_sub.add_parser("delete", help="Delete a peer")
|
|
peers_del.add_argument("peer_id", help="Peer ID")
|
|
|
|
# Groups
|
|
groups_parser = subparsers.add_parser("groups", help="Manage groups")
|
|
groups_sub = groups_parser.add_subparsers(dest="action")
|
|
groups_sub.add_parser("list", help="List all groups")
|
|
groups_get = groups_sub.add_parser("get", help="Get group details")
|
|
groups_get.add_argument("group_id", help="Group ID")
|
|
groups_create = groups_sub.add_parser("create", help="Create a group")
|
|
groups_create.add_argument("name", help="Group name")
|
|
groups_create.add_argument("--peers", nargs="+", help="Peer IDs to add")
|
|
groups_del = groups_sub.add_parser("delete", help="Delete a group")
|
|
groups_del.add_argument("group_id", help="Group ID")
|
|
|
|
# Setup Keys
|
|
keys_parser = subparsers.add_parser("setup-keys", help="Manage setup keys")
|
|
keys_sub = keys_parser.add_subparsers(dest="action")
|
|
keys_sub.add_parser("list", help="List all setup keys")
|
|
keys_create = keys_sub.add_parser("create", help="Create a setup key")
|
|
keys_create.add_argument("--name", required=True, help="Key name")
|
|
keys_create.add_argument("--type", choices=["one-off", "reusable"], default="reusable", help="Key type")
|
|
keys_create.add_argument("--expires", type=int, default=86400, help="Expiration in seconds (default: 86400 = 24h)")
|
|
keys_create.add_argument("--auto-groups", nargs="+", help="Group IDs to auto-assign")
|
|
keys_revoke = keys_sub.add_parser("revoke", help="Revoke a setup key")
|
|
keys_revoke.add_argument("key_id", help="Key ID")
|
|
|
|
# Policies
|
|
policies_parser = subparsers.add_parser("policies", help="Manage policies")
|
|
policies_sub = policies_parser.add_subparsers(dest="action")
|
|
policies_sub.add_parser("list", help="List all policies")
|
|
policies_get = policies_sub.add_parser("get", help="Get policy details")
|
|
policies_get.add_argument("policy_id", help="Policy ID")
|
|
policies_del = policies_sub.add_parser("delete", help="Delete a policy")
|
|
policies_del.add_argument("policy_id", help="Policy ID")
|
|
|
|
# Routes
|
|
routes_parser = subparsers.add_parser("routes", help="Manage routes")
|
|
routes_sub = routes_parser.add_subparsers(dest="action")
|
|
routes_sub.add_parser("list", help="List all routes")
|
|
routes_get = routes_sub.add_parser("get", help="Get route details")
|
|
routes_get.add_argument("route_id", help="Route ID")
|
|
|
|
# Users
|
|
users_parser = subparsers.add_parser("users", help="Manage users")
|
|
users_sub = users_parser.add_subparsers(dest="action")
|
|
users_sub.add_parser("list", help="List all users")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if not args.resource:
|
|
parser.print_help()
|
|
sys.exit(1)
|
|
|
|
# Handle commands
|
|
if args.resource == "peers":
|
|
if args.action == "list":
|
|
list_peers()
|
|
elif args.action == "get":
|
|
get_peer(args.peer_id)
|
|
elif args.action == "delete":
|
|
delete_peer(args.peer_id)
|
|
|
|
elif args.resource == "groups":
|
|
if args.action == "list":
|
|
list_groups()
|
|
elif args.action == "get":
|
|
get_group(args.group_id)
|
|
elif args.action == "create":
|
|
create_group(args.name, args.peers)
|
|
elif args.action == "delete":
|
|
delete_group(args.group_id)
|
|
|
|
elif args.resource == "setup-keys":
|
|
if args.action == "list":
|
|
list_setup_keys()
|
|
elif args.action == "create":
|
|
create_setup_key(args.name, args.type, args.expires, args.auto_groups)
|
|
elif args.action == "revoke":
|
|
revoke_setup_key(args.key_id)
|
|
|
|
elif args.resource == "policies":
|
|
if args.action == "list":
|
|
list_policies()
|
|
elif args.action == "get":
|
|
get_policy(args.policy_id)
|
|
elif args.action == "delete":
|
|
delete_policy(args.policy_id)
|
|
|
|
elif args.resource == "routes":
|
|
if args.action == "list":
|
|
list_routes()
|
|
elif args.action == "get":
|
|
get_route(args.route_id)
|
|
|
|
elif args.resource == "users":
|
|
if args.action == "list":
|
|
list_users()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|