Add initial skills: caldav, netbird, ourgroceries, planka
This commit is contained in:
@@ -0,0 +1,391 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
NetBird CLI - Simple command-line interface for NetBird API
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import argparse
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
from typing import Optional, Dict, Any, List
|
||||
|
||||
BASE_URL = "https://net.dcglab.co.uk/api"
|
||||
|
||||
|
||||
def get_token() -> str:
|
||||
"""Get API token from environment."""
|
||||
token = os.environ.get("NETBIRD_API_TOKEN")
|
||||
if not token:
|
||||
print("Error: NETBIRD_API_TOKEN environment variable not set", file=sys.stderr)
|
||||
print("Create a Personal Access Token in your NetBird dashboard:", file=sys.stderr)
|
||||
print(" User settings → Personal Access Tokens", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
return token
|
||||
|
||||
|
||||
def api_request(
|
||||
method: str,
|
||||
endpoint: str,
|
||||
data: Optional[Dict] = None,
|
||||
params: Optional[Dict] = None,
|
||||
) -> Any:
|
||||
"""Make an API request to NetBird."""
|
||||
token = get_token()
|
||||
|
||||
url = f"{BASE_URL}{endpoint}"
|
||||
if params:
|
||||
query = "&".join(f"{k}={urllib.parse.quote(str(v))}" for k, v in params.items())
|
||||
url = f"{url}?{query}"
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {token}",
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
}
|
||||
|
||||
req = urllib.request.Request(
|
||||
url,
|
||||
method=method,
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
if data:
|
||||
req.data = json.dumps(data).encode("utf-8")
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req) as response:
|
||||
if response.status == 204:
|
||||
return None
|
||||
return json.loads(response.read().decode("utf-8"))
|
||||
except urllib.error.HTTPError as e:
|
||||
error_body = e.read().decode("utf-8")
|
||||
print(f"HTTP Error {e.code}: {e.reason}", file=sys.stderr)
|
||||
try:
|
||||
error_json = json.loads(error_body)
|
||||
print(json.dumps(error_json, indent=2), file=sys.stderr)
|
||||
except:
|
||||
print(error_body, file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except urllib.error.URLError as e:
|
||||
print(f"Connection error: {e.reason}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def format_peer(peer: Dict) -> str:
|
||||
"""Format peer for display."""
|
||||
status = "🟢" if peer.get("connected") else "🔴"
|
||||
name = peer.get("name", "unknown")
|
||||
ip = peer.get("ip", "N/A")
|
||||
hostname = peer.get("hostname", "N/A")
|
||||
os_info = peer.get("os", "N/A")
|
||||
return f"{status} {name} ({hostname}) - IP: {ip} - OS: {os_info}"
|
||||
|
||||
|
||||
def format_group(group: Dict) -> str:
|
||||
"""Format group for display."""
|
||||
name = group.get("name", "unknown")
|
||||
gid = group.get("id", "N/A")[:8]
|
||||
peers = group.get("peers_count", 0)
|
||||
return f"{name} ({gid}...) - {peers} peers"
|
||||
|
||||
|
||||
def format_policy(policy: Dict) -> str:
|
||||
"""Format policy for display."""
|
||||
name = policy.get("name", "unknown")
|
||||
enabled = "✅" if policy.get("enabled") else "❌"
|
||||
rules_count = len(policy.get("rules", []))
|
||||
return f"{enabled} {name} - {rules_count} rules"
|
||||
|
||||
|
||||
def format_setup_key(key: Dict) -> str:
|
||||
"""Format setup key for display."""
|
||||
name = key.get("name", "unknown")
|
||||
key_type = key.get("type", "unknown")
|
||||
revoked = "🚫 REVOKED" if key.get("revoked") else ""
|
||||
auto_groups = ", ".join(key.get("auto_groups", [])) or "none"
|
||||
return f"{name} ({key_type}) - Auto-groups: {auto_groups} {revoked}"
|
||||
|
||||
|
||||
def format_route(route: Dict) -> str:
|
||||
"""Format route for display."""
|
||||
desc = route.get("description", "unknown")
|
||||
network = route.get("network", "N/A")
|
||||
enabled = "✅" if route.get("enabled") else "❌"
|
||||
return f"{enabled} {desc} - {network}"
|
||||
|
||||
|
||||
# ===== PEERS =====
|
||||
|
||||
def list_peers():
|
||||
"""List all peers."""
|
||||
peers = api_request("GET", "/peers")
|
||||
if not peers:
|
||||
print("No peers found.")
|
||||
return
|
||||
|
||||
print(f"Found {len(peers)} peer(s):\n")
|
||||
for peer in peers:
|
||||
print(format_peer(peer))
|
||||
|
||||
|
||||
def get_peer(peer_id: str):
|
||||
"""Get peer details."""
|
||||
peer = api_request("GET", f"/peers/{peer_id}")
|
||||
print(json.dumps(peer, indent=2))
|
||||
|
||||
|
||||
def delete_peer(peer_id: str):
|
||||
"""Delete a peer."""
|
||||
api_request("DELETE", f"/peers/{peer_id}")
|
||||
print(f"Peer {peer_id} deleted.")
|
||||
|
||||
|
||||
# ===== GROUPS =====
|
||||
|
||||
def list_groups():
|
||||
"""List all groups."""
|
||||
groups = api_request("GET", "/groups")
|
||||
if not groups:
|
||||
print("No groups found.")
|
||||
return
|
||||
|
||||
print(f"Found {len(groups)} group(s):\n")
|
||||
for group in groups:
|
||||
print(format_group(group))
|
||||
|
||||
|
||||
def get_group(group_id: str):
|
||||
"""Get group details."""
|
||||
group = api_request("GET", f"/groups/{group_id}")
|
||||
print(json.dumps(group, indent=2))
|
||||
|
||||
|
||||
def create_group(name: str, peers: Optional[List[str]] = None):
|
||||
"""Create a new group."""
|
||||
data = {"name": name}
|
||||
if peers:
|
||||
data["peers"] = peers
|
||||
|
||||
group = api_request("POST", "/groups", data=data)
|
||||
print(f"Group created: {group.get('name')} (ID: {group.get('id')})")
|
||||
|
||||
|
||||
def delete_group(group_id: str):
|
||||
"""Delete a group."""
|
||||
api_request("DELETE", f"/groups/{group_id}")
|
||||
print(f"Group {group_id} deleted.")
|
||||
|
||||
|
||||
# ===== SETUP KEYS =====
|
||||
|
||||
def list_setup_keys():
|
||||
"""List all setup keys."""
|
||||
keys = api_request("GET", "/setup-keys")
|
||||
if not keys:
|
||||
print("No setup keys found.")
|
||||
return
|
||||
|
||||
print(f"Found {len(keys)} setup key(s):\n")
|
||||
for key in keys:
|
||||
print(format_setup_key(key))
|
||||
|
||||
|
||||
def create_setup_key(name: str, key_type: str, expires: int, auto_groups: Optional[List[str]] = None):
|
||||
"""Create a new setup key."""
|
||||
data = {
|
||||
"name": name,
|
||||
"type": key_type,
|
||||
"expires_in": expires,
|
||||
"auto_groups": auto_groups or []
|
||||
}
|
||||
|
||||
key = api_request("POST", "/setup-keys", data=data)
|
||||
print(f"Setup key created: {key.get('name')}")
|
||||
print(f"Key: {key.get('key')}")
|
||||
print(f"ID: {key.get('id')}")
|
||||
|
||||
|
||||
def revoke_setup_key(key_id: str):
|
||||
"""Revoke a setup key."""
|
||||
api_request("PUT", f"/setup-keys/{key_id}", data={"revoked": True, "auto_groups": []})
|
||||
print(f"Setup key {key_id} revoked.")
|
||||
|
||||
|
||||
# ===== POLICIES =====
|
||||
|
||||
def list_policies():
|
||||
"""List all policies."""
|
||||
policies = api_request("GET", "/policies")
|
||||
if not policies:
|
||||
print("No policies found.")
|
||||
return
|
||||
|
||||
print(f"Found {len(policies)} policy(ies):\n")
|
||||
for policy in policies:
|
||||
print(format_policy(policy))
|
||||
|
||||
|
||||
def get_policy(policy_id: str):
|
||||
"""Get policy details."""
|
||||
policy = api_request("GET", f"/policies/{policy_id}")
|
||||
print(json.dumps(policy, indent=2))
|
||||
|
||||
|
||||
def delete_policy(policy_id: str):
|
||||
"""Delete a policy."""
|
||||
api_request("DELETE", f"/policies/{policy_id}")
|
||||
print(f"Policy {policy_id} deleted.")
|
||||
|
||||
|
||||
# ===== ROUTES =====
|
||||
|
||||
def list_routes():
|
||||
"""List all routes."""
|
||||
routes = api_request("GET", "/routes")
|
||||
if not routes:
|
||||
print("No routes found.")
|
||||
return
|
||||
|
||||
print(f"Found {len(routes)} route(s):\n")
|
||||
for route in routes:
|
||||
print(format_route(route))
|
||||
|
||||
|
||||
def get_route(route_id: str):
|
||||
"""Get route details."""
|
||||
route = api_request("GET", f"/routes/{route_id}")
|
||||
print(json.dumps(route, indent=2))
|
||||
|
||||
|
||||
# ===== USERS =====
|
||||
|
||||
def list_users():
|
||||
"""List all users."""
|
||||
users = api_request("GET", "/users")
|
||||
if not users:
|
||||
print("No users found.")
|
||||
return
|
||||
|
||||
print(f"Found {len(users)} user(s):\n")
|
||||
for user in users:
|
||||
name = user.get("name", user.get("email", "unknown"))
|
||||
role = user.get("role", "unknown")
|
||||
service = "[service]" if user.get("is_service_user") else ""
|
||||
print(f"{name} ({role}) {service}")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="NetBird CLI")
|
||||
subparsers = parser.add_subparsers(dest="resource", help="Resource type")
|
||||
|
||||
# Peers
|
||||
peers_parser = subparsers.add_parser("peers", help="Manage peers")
|
||||
peers_sub = peers_parser.add_subparsers(dest="action")
|
||||
peers_sub.add_parser("list", help="List all peers")
|
||||
peers_get = peers_sub.add_parser("get", help="Get peer details")
|
||||
peers_get.add_argument("peer_id", help="Peer ID")
|
||||
peers_del = peers_sub.add_parser("delete", help="Delete a peer")
|
||||
peers_del.add_argument("peer_id", help="Peer ID")
|
||||
|
||||
# Groups
|
||||
groups_parser = subparsers.add_parser("groups", help="Manage groups")
|
||||
groups_sub = groups_parser.add_subparsers(dest="action")
|
||||
groups_sub.add_parser("list", help="List all groups")
|
||||
groups_get = groups_sub.add_parser("get", help="Get group details")
|
||||
groups_get.add_argument("group_id", help="Group ID")
|
||||
groups_create = groups_sub.add_parser("create", help="Create a group")
|
||||
groups_create.add_argument("name", help="Group name")
|
||||
groups_create.add_argument("--peers", nargs="+", help="Peer IDs to add")
|
||||
groups_del = groups_sub.add_parser("delete", help="Delete a group")
|
||||
groups_del.add_argument("group_id", help="Group ID")
|
||||
|
||||
# Setup Keys
|
||||
keys_parser = subparsers.add_parser("setup-keys", help="Manage setup keys")
|
||||
keys_sub = keys_parser.add_subparsers(dest="action")
|
||||
keys_sub.add_parser("list", help="List all setup keys")
|
||||
keys_create = keys_sub.add_parser("create", help="Create a setup key")
|
||||
keys_create.add_argument("--name", required=True, help="Key name")
|
||||
keys_create.add_argument("--type", choices=["one-off", "reusable"], default="reusable", help="Key type")
|
||||
keys_create.add_argument("--expires", type=int, default=86400, help="Expiration in seconds (default: 86400 = 24h)")
|
||||
keys_create.add_argument("--auto-groups", nargs="+", help="Group IDs to auto-assign")
|
||||
keys_revoke = keys_sub.add_parser("revoke", help="Revoke a setup key")
|
||||
keys_revoke.add_argument("key_id", help="Key ID")
|
||||
|
||||
# Policies
|
||||
policies_parser = subparsers.add_parser("policies", help="Manage policies")
|
||||
policies_sub = policies_parser.add_subparsers(dest="action")
|
||||
policies_sub.add_parser("list", help="List all policies")
|
||||
policies_get = policies_sub.add_parser("get", help="Get policy details")
|
||||
policies_get.add_argument("policy_id", help="Policy ID")
|
||||
policies_del = policies_sub.add_parser("delete", help="Delete a policy")
|
||||
policies_del.add_argument("policy_id", help="Policy ID")
|
||||
|
||||
# Routes
|
||||
routes_parser = subparsers.add_parser("routes", help="Manage routes")
|
||||
routes_sub = routes_parser.add_subparsers(dest="action")
|
||||
routes_sub.add_parser("list", help="List all routes")
|
||||
routes_get = routes_sub.add_parser("get", help="Get route details")
|
||||
routes_get.add_argument("route_id", help="Route ID")
|
||||
|
||||
# Users
|
||||
users_parser = subparsers.add_parser("users", help="Manage users")
|
||||
users_sub = users_parser.add_subparsers(dest="action")
|
||||
users_sub.add_parser("list", help="List all users")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.resource:
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
# Handle commands
|
||||
if args.resource == "peers":
|
||||
if args.action == "list":
|
||||
list_peers()
|
||||
elif args.action == "get":
|
||||
get_peer(args.peer_id)
|
||||
elif args.action == "delete":
|
||||
delete_peer(args.peer_id)
|
||||
|
||||
elif args.resource == "groups":
|
||||
if args.action == "list":
|
||||
list_groups()
|
||||
elif args.action == "get":
|
||||
get_group(args.group_id)
|
||||
elif args.action == "create":
|
||||
create_group(args.name, args.peers)
|
||||
elif args.action == "delete":
|
||||
delete_group(args.group_id)
|
||||
|
||||
elif args.resource == "setup-keys":
|
||||
if args.action == "list":
|
||||
list_setup_keys()
|
||||
elif args.action == "create":
|
||||
create_setup_key(args.name, args.type, args.expires, args.auto_groups)
|
||||
elif args.action == "revoke":
|
||||
revoke_setup_key(args.key_id)
|
||||
|
||||
elif args.resource == "policies":
|
||||
if args.action == "list":
|
||||
list_policies()
|
||||
elif args.action == "get":
|
||||
get_policy(args.policy_id)
|
||||
elif args.action == "delete":
|
||||
delete_policy(args.policy_id)
|
||||
|
||||
elif args.resource == "routes":
|
||||
if args.action == "list":
|
||||
list_routes()
|
||||
elif args.action == "get":
|
||||
get_route(args.route_id)
|
||||
|
||||
elif args.resource == "users":
|
||||
if args.action == "list":
|
||||
list_users()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user