mirror of
https://github.com/ION606/ML-pipeline.git
synced 2026-05-14 21:06:54 +00:00
184 lines
5.1 KiB
Python
184 lines
5.1 KiB
Python
import tempfile
|
|
from pathlib import Path
|
|
import re
|
|
from types import FunctionType
|
|
import docker
|
|
import json
|
|
|
|
import debug as debugMod
|
|
import conversation_store
|
|
from config import Config
|
|
from queries import show_thinking
|
|
|
|
|
|
class UserEnvironment:
    """Per-user sandboxed Python execution environment backed by Docker.

    Each instance owns a temporary directory used to stage the user's
    script and a Docker client that runs the script inside a locked-down
    container (no network, dropped capabilities, non-root user, CPU and
    memory limits).
    """

    def __init__(self, user_id: str):
        self.user_id = user_id
        self.client = docker.from_env()
        # Staging area for the user's script; released via cleanup().
        self.temp_dir = tempfile.TemporaryDirectory(prefix=f"{user_id}_code_")
        self._ensure_sandbox_image()

    @staticmethod
    def _error_result(message):
        """Build a failure dict carrying BOTH "err" and "error" keys.

        Historically the failure paths returned only "err" while the
        success path used "error"; callers index either spelling, so the
        message is mirrored under both keys to keep every caller working.
        """
        return {"err": message, "error": message}

    def _ensure_sandbox_image(self):
        """Build the "code-sandbox" image from Dockerfile.sandbox if absent.

        Raises:
            RuntimeError: if the image build or the Docker API call fails.
        """
        try:
            self.client.images.get("code-sandbox")
        except docker.errors.ImageNotFound:
            debugMod.log("building code-sandbox image from Dockerfile.sandbox...")

            try:
                self.client.images.build(
                    path=".",
                    dockerfile="Dockerfile.sandbox",
                    tag="code-sandbox",
                    rm=True,
                    forcerm=True
                )

                debugMod.log("successfully built code-sandbox image")

            except docker.errors.BuildError as e:
                raise RuntimeError(f"Failed to build Docker image: {str(e)}") from e

            except docker.errors.APIError as e:
                raise RuntimeError(f"Docker API error: {str(e)}") from e

    def execute_code(self, code: str, context=None, timeout=15, memory_limit=100):
        """Run ``code`` inside the sandbox container and return its result.

        Args:
            code: Python source to execute.
            context: optional object prepended to the script as a
                ``context`` variable (serialized with ``repr``).
            timeout: seconds to wait for the container (a 5s grace period
                is added on top).
            memory_limit: container memory cap in megabytes.

        Returns:
            dict: on success ``{"output", "error", "err", "status"}``;
            on failure a dict with the message under both "err" and
            "error" (see ``_error_result``) so callers may check either.
        """
        # Validate input size before touching the filesystem.
        if len(code) > Config.MAX_CODE_LENGTH:
            return self._error_result("Code exceeds length limit")

        # Stage the script in the per-user temp dir.
        tmp_path = Path(self.temp_dir.name) / "script.py"
        with open(tmp_path, "w") as f:
            if context:
                f.write(f"context = {repr(context)}\n")
            f.write(code)

        container = None

        try:
            # Execute in a hardened container.
            container = self.client.containers.run(
                image="code-sandbox",
                command=[],
                volumes={
                    str(tmp_path): {'bind': '/sandbox/script.py', 'mode': 'ro'}
                },
                mem_limit=f"{memory_limit}m",
                cpu_period=100000,
                cpu_quota=50000,  # Limit to 50% CPU
                network_mode='none',  # sandboxed code gets no network
                user='sandboxuser',   # never run as root
                read_only=True,
                security_opt=['no-new-privileges'],
                cap_drop=['ALL'],
                detach=True,
                stdout=True,
                stderr=True,
            )

            # Wait for completion, with a small grace period past `timeout`.
            result = container.wait(timeout=timeout + 5)
            logs = container.logs().decode()

            # Parse results; cap captured output length.
            output = logs[:Config.OUTPUT_CHAR_LIMIT]
            error = None

            if result['StatusCode'] != 0:
                error = f"Container exited with code {result['StatusCode']}"

            # Mirror the error under both keys for caller compatibility.
            return {
                "output": output,
                "error": error,
                "err": error,
                "status": result['StatusCode']
            }

        except docker.errors.ContainerError as e:
            return self._error_result(f"Container error: {str(e)}")
        except docker.errors.DockerException as e:
            return self._error_result(f"Docker error: {str(e)}")
        except Exception as e:
            # Catch-all boundary: wait() timeouts and transport errors
            # surface here; report them rather than crash the caller.
            return self._error_result(f"Execution failed: {str(e)}")
        finally:
            # Always remove the staged script and the container.
            tmp_path.unlink(missing_ok=True)
            if container:
                try:
                    container.remove(force=True)
                except docker.errors.NotFound:
                    pass

    def cleanup(self):
        """Delete this user's temporary staging directory."""
        self.temp_dir.cleanup()
|
|
|
|
|
|
def orchestrate_code(orchestrate: FunctionType, vector_store, chunks, user_env: UserEnvironment, code_blocks, query, response, links):
    """Execute each extracted code block, retrying failed runs with LLM fixes.

    For every block: run it in the user's sandbox; on failure, ask the
    orchestrator for a corrected version and retry up to
    ``Config.MAX_CODE_RETRIES`` times; on success, persist the
    conversation and the execution record.

    Args:
        orchestrate: callable ``(prompt, vector_store) -> (response, ...)``
            used to generate fixes and error explanations.
        vector_store: retrieval store forwarded to ``orchestrate``.
        chunks: optional context passed into the sandboxed script.
        user_env: sandbox used to execute the code.
        code_blocks: iterable of Python source strings to run.
        query, response, links: conversation data persisted on success.
    """
    debugMod.log("\nExecuting code...\n")

    for code in code_blocks:
        retry_count = 0
        current_code = code.strip()
        last_error = None

        while retry_count < Config.MAX_CODE_RETRIES:
            execution_result = user_env.execute_code(
                current_code, context=chunks if chunks else None)

            # BUGFIX: execute_code reports failures under "err" on its
            # exception paths but "error" on the nonzero-exit path.
            # Indexing one key raised KeyError on the other shape, so
            # probe both with .get().
            exec_error = None
            if isinstance(execution_result, dict):
                exec_error = (execution_result.get('error')
                              or execution_result.get('err'))

            if exec_error:
                # hard code to let user know the program didn't explode
                show_thinking(
                    "[hmmm...looks like this code didn't work properly, I'll try debugging it now!]")

                last_error = exec_error
                debugMod.log(f"\nExecution error: {last_error}\n")

                # Generate fix prompt using full orchestration
                fix_prompt = f"""Fix this Python code. Error: {last_error}
Code:
```python
{current_code}
```
Requirements:
1. Preserve original functionality
2. Explain fixes in comments
3. Return ONLY corrected code in a single Python block"""

                [fixed_response, _] = orchestrate(fix_prompt, vector_store)
                new_blocks = re.findall(
                    Config.code_block_regex(), fixed_response, re.DOTALL)

                if new_blocks:
                    current_code = new_blocks[0].strip()
                    retry_count += 1
                    debugMod.log(f"\nRetry #{retry_count} with modified code\n")
                else:
                    # Model returned no code block to retry with; give up.
                    break
            else:
                debugMod.log("\nCode Execution Result:\n", json.dumps(execution_result))
                print("\nCode Execution Result:\n", execution_result['output'].strip())

                if execution_result:
                    # Get current conversation ID after saving conversation
                    conv_id = conversation_store.save_conversation(query, response, links)

                    # Save code execution with context; exec_error is None
                    # on this branch but covers both legacy key spellings.
                    conversation_store.save_code_execution(
                        code=current_code,
                        result=execution_result,
                        error=exec_error,
                        retries=retry_count,
                        conversation_id=conv_id
                    )

                break

        if last_error and retry_count >= Config.MAX_CODE_RETRIES:
            debugMod.log(
                f"\nFailed to fix after {Config.MAX_CODE_RETRIES} attempts. Final error: {last_error}\n")
            # Request human intervention via orchestration
            help_response = orchestrate(
                f"Explain this code error to user: {last_error}",
                vector_store
            )[0]
            debugMod.log(help_response + "\n")