Files
ML-pipeline/codeExecution.py

184 lines
5.1 KiB
Python

import tempfile
from pathlib import Path
import re
from types import FunctionType
import docker
import json
import debug as debugMod
import conversation_store
from config import Config
from queries import show_thinking
class UserEnvironment:
    """Per-user sandbox that executes untrusted Python code inside a
    locked-down Docker container (no network, read-only filesystem,
    dropped capabilities, CPU/memory limits).

    Lifecycle: construct once per user, call `execute_code` any number of
    times, then `cleanup()` to remove the scratch directory.
    """

    def __init__(self, user_id: str):
        self.user_id = user_id
        self.client = docker.from_env()
        # Scratch directory for generated scripts; removed in cleanup().
        self.temp_dir = tempfile.TemporaryDirectory(prefix=f"{user_id}_code_")
        self._ensure_sandbox_image()

    def _ensure_sandbox_image(self):
        """Build the 'code-sandbox' image from Dockerfile.sandbox if it is
        not already present.

        Raises:
            RuntimeError: if the image build fails or the Docker daemon
                reports an API error.
        """
        try:
            self.client.images.get("code-sandbox")
        except docker.errors.ImageNotFound:
            debugMod.log("building code-sandbox image from Dockerfile.sandbox...")
            try:
                self.client.images.build(
                    path=".",
                    dockerfile="Dockerfile.sandbox",
                    tag="code-sandbox",
                    rm=True,
                    forcerm=True
                )
                debugMod.log("successfully built code-sandbox image")
            except docker.errors.BuildError as e:
                raise RuntimeError(f"Failed to build Docker image: {str(e)}") from e
            except docker.errors.APIError as e:
                raise RuntimeError(f"Docker API error: {str(e)}") from e

    @staticmethod
    def _result(output, error, status):
        # Uniform result shape. "err" is kept as a legacy alias of "error"
        # because existing callers read both keys; previously the failure
        # paths returned only {"err": ...} while success returned
        # {"output", "error", "status"}, so callers hit KeyError depending
        # on the outcome.
        return {"output": output, "error": error, "err": error, "status": status}

    def execute_code(self, code: str, context=None, timeout=15, memory_limit=100):
        """Run `code` inside the sandbox container and collect its output.

        Args:
            code: Python source to execute.
            context: optional value injected into the script as a module-level
                `context` variable (serialized via repr()).
            timeout: seconds to wait for the container (plus a 5s grace).
            memory_limit: container memory cap in megabytes.

        Returns:
            dict with keys "output" (truncated logs), "error" (str or None),
            "err" (alias of "error") and "status" (container exit code,
            -1 when execution never produced one).
        """
        # Validate input size before touching Docker at all.
        if len(code) > Config.MAX_CODE_LENGTH:
            return self._result("", "Code exceeds length limit", -1)
        # Write the script into the per-user scratch dir; it is bind-mounted
        # read-only into the container below.
        tmp_path = Path(self.temp_dir.name) / "script.py"
        with open(tmp_path, "w", encoding="utf-8") as f:
            if context:
                # repr() embeds the context as a literal so the sandboxed
                # script can read it without any IPC into the container.
                f.write(f"context = {repr(context)}\n")
            f.write(code)
        container = None
        try:
            # Empty command -> the image's default CMD runs /sandbox/script.py.
            container = self.client.containers.run(
                image="code-sandbox",
                command=[],
                volumes={
                    str(tmp_path): {'bind': '/sandbox/script.py', 'mode': 'ro'}
                },
                mem_limit=f"{memory_limit}m",
                cpu_period=100000,
                cpu_quota=50000,  # Limit to 50% CPU
                network_mode='none',
                user='sandboxuser',
                read_only=True,
                security_opt=['no-new-privileges'],
                cap_drop=['ALL'],
                detach=True,
                stdout=True,
                stderr=True,
            )
            # Wait for completion (small grace on top of the requested timeout).
            result = container.wait(timeout=timeout + 5)
            logs = container.logs().decode()
            # Truncate output so a chatty script cannot blow up downstream.
            output = logs[:Config.OUTPUT_CHAR_LIMIT]
            error = None
            if result['StatusCode'] != 0:
                error = f"Container exited with code {result['StatusCode']}"
            return self._result(output, error, result['StatusCode'])
        except docker.errors.ContainerError as e:
            return self._result("", f"Container error: {str(e)}", -1)
        except docker.errors.DockerException as e:
            return self._result("", f"Docker error: {str(e)}", -1)
        except Exception as e:
            return self._result("", f"Execution failed: {str(e)}", -1)
        finally:
            # Best-effort cleanup of both the script and the container.
            tmp_path.unlink(missing_ok=True)
            if container:
                try:
                    container.remove(force=True)
                except docker.errors.NotFound:
                    pass

    def cleanup(self):
        """Remove the per-user scratch directory and everything in it."""
        self.temp_dir.cleanup()
def orchestrate_code(orchestrate: FunctionType, vector_store, chunks, user_env: UserEnvironment, code_blocks, query, response, links):
    """Execute each extracted code block in the user sandbox, retrying
    failed blocks by asking the LLM (via `orchestrate`) to repair them,
    and persist the outcome of each block.

    Args:
        orchestrate: callable(prompt, vector_store) -> [response_text, ...].
        vector_store: retrieval store passed through to `orchestrate`.
        chunks: optional context injected into the executed script.
        user_env: sandbox used to run the code.
        code_blocks: list of Python source snippets to execute.
        query, response, links: conversation data saved alongside results.
    """
    debugMod.log("\nExecuting code...\n")
    for code in code_blocks:
        retry_count = 0
        current_code = code.strip()
        last_error = None
        while retry_count < Config.MAX_CODE_RETRIES:
            execution_result = user_env.execute_code(
                current_code, context=chunks if chunks else None)
            # execute_code has historically used both "error" and "err"
            # keys depending on the failure path, so accept either instead
            # of indexing one of them (which raised KeyError before).
            exec_error = None
            if isinstance(execution_result, dict):
                exec_error = execution_result.get('error') or execution_result.get('err')
            if exec_error:
                # hard code to let user know the program didn't explode
                show_thinking(
                    "[hmmm...looks like this code didn't work properly, I'll try debugging it now!]")
                last_error = exec_error
                debugMod.log(f"\nExecution error: {last_error}\n")
                # Generate fix prompt using full orchestration
                fix_prompt = f"""Fix this Python code. Error: {last_error}
Code:
```python
{current_code}
```
Requirements:
1. Preserve original functionality
2. Explain fixes in comments
3. Return ONLY corrected code in a single Python block"""
                [fixed_response, _] = orchestrate(fix_prompt, vector_store)
                new_blocks = re.findall(
                    Config.code_block_regex(), fixed_response, re.DOTALL)
                if new_blocks:
                    current_code = new_blocks[0].strip()
                    retry_count += 1
                    debugMod.log(f"\nRetry #{retry_count} with modified code\n")
                else:
                    # The LLM returned no code block — give up on this snippet.
                    break
            else:
                debugMod.log("\nCode Execution Result:\n", json.dumps(execution_result))
                print("\nCode Execution Result:\n", execution_result.get('output', '').strip())
                if execution_result:
                    # Get current conversation ID after saving conversation
                    conv_id = conversation_store.save_conversation(query, response, links)
                    # Save code execution with context; exec_error is None here.
                    conversation_store.save_code_execution(
                        code=current_code,
                        result=execution_result,
                        error=exec_error,
                        retries=retry_count,
                        conversation_id=conv_id
                    )
                break
        if last_error and retry_count >= Config.MAX_CODE_RETRIES:
            debugMod.log(
                f"\nFailed to fix after {Config.MAX_CODE_RETRIES} attempts. Final error: {last_error}\n")
            # Request human intervention via orchestration
            help_response = orchestrate(
                f"Explain this code error to user: {last_error}",
                vector_store
            )[0]
            debugMod.log(help_response + "\n")