added privatebin tool and tools export
This commit is contained in:
@@ -0,0 +1,101 @@
|
||||
from __future__ import annotations
|
||||
from typing import Dict, Any
|
||||
import json
|
||||
import urllib.parse as urlparse
|
||||
import privatebinapi
|
||||
|
||||
class PrivateBinWrapper():
|
||||
def __init__(self, parent):
|
||||
self.parent = parent
|
||||
|
||||
# read query params from the current request
|
||||
def _qs(self) -> Dict[str, Any]:
|
||||
parsed = urlparse.urlparse(self.parent.path)
|
||||
|
||||
# parse_qs returns lists and flatten singletons
|
||||
raw = urlparse.parse_qs(parsed.query, keep_blank_values=True)
|
||||
flat: Dict[str, Any] = {}
|
||||
|
||||
for k, v in raw.items():
|
||||
flat[k] = v[0] if len(v) == 1 else v
|
||||
return flat
|
||||
|
||||
def _ok(self, payload: Dict[str, Any], code: int = 200):
|
||||
body = json.dumps(payload).encode()
|
||||
self.parent._send(code, body)
|
||||
|
||||
def _err(self, msg: str, code: int = 400):
|
||||
self._ok({"error": msg}, code)
|
||||
|
||||
def _create(self, qs: Dict[str, Any]):
|
||||
base_url = qs.get("base_url")
|
||||
text = qs.get("text")
|
||||
password = qs.get("password")
|
||||
expiration = qs.get("expiration", "1day")
|
||||
formatting = qs.get("formatting", "plaintext")
|
||||
burn_after_reading = qs.get("burn_after_reading", "false").lower() == "true"
|
||||
discussion = qs.get("discussion", "false").lower() == "true"
|
||||
|
||||
if not base_url:
|
||||
return self._err("missing required 'base_url'")
|
||||
if not text:
|
||||
return self._err("missing required 'text'")
|
||||
|
||||
try:
|
||||
resp = privatebinapi.send(
|
||||
base_url,
|
||||
text=text,
|
||||
password=password,
|
||||
expiration=expiration,
|
||||
formatting=formatting,
|
||||
burn_after_reading=burn_after_reading,
|
||||
discussion=discussion,
|
||||
)
|
||||
# resp already contains: status, id, url, full_url, deletetoken, passcode?
|
||||
return self._ok(resp, 200)
|
||||
except Exception as e:
|
||||
return self._err(f"privatebin send failed: {e}", 502)
|
||||
|
||||
def _read(self, qs: Dict[str, Any]):
|
||||
full_url = qs.get("full_url")
|
||||
password = qs.get("password")
|
||||
if not full_url:
|
||||
return self._err("missing required 'full_url'")
|
||||
try:
|
||||
resp = privatebinapi.get(full_url, password=password)
|
||||
# has status, id, url, v, text, meta, attachment
|
||||
return self._ok(resp, 200)
|
||||
except Exception as e:
|
||||
return self._err(f"privatebin get failed: {e}", 502)
|
||||
|
||||
def _delete(self, qs: Dict[str, Any]):
|
||||
full_url = qs.get("full_url")
|
||||
deletetoken = qs.get("deletetoken")
|
||||
if not full_url or not deletetoken:
|
||||
return self._err("missing required 'full_url' or 'deletetoken'")
|
||||
try:
|
||||
resp = privatebinapi.delete(full_url, deletetoken)
|
||||
if resp == None:
|
||||
return self._err("response is null!")
|
||||
else:
|
||||
return self._ok(resp, 200)
|
||||
except Exception as e:
|
||||
return self._err(f"privatebin delete failed: {e}", 502)
|
||||
|
||||
def run_command(self, cmd_path: str):
|
||||
# expected forms:
|
||||
# /privatebin/create?base_url=...&text=...&password=...&expiration=...
|
||||
# /privatebin/read?full_url=...&password=...
|
||||
# /privatebin/delete?full_url=...&deletetoken=...
|
||||
cmd = (cmd_path or "").strip("/").lower()
|
||||
qs = self._qs()
|
||||
|
||||
if cmd == "create":
|
||||
return self._create(qs)
|
||||
if cmd == "read":
|
||||
return self._read(qs)
|
||||
if cmd == "delete":
|
||||
return self._delete(qs)
|
||||
|
||||
# unknown subpath
|
||||
return self._err("unknown privatebin command", 404)
|
||||
Reference in New Issue
Block a user