tools v1.2
__all__ = ['AudioTranscriptionTool', 'BaseImageGenerationTool', 'BaseTool', 'CalculatorTool', 'ComputerTool', 'DateTimeTool', 'EmailTool', 'ExtractionTool', 'FileManagerTool', 'GriptapeCloudToolTool', 'ImageQueryTool', 'InpaintingImageGenerationTool', 'OutpaintingImageGenerationTool', 'PromptImageGenerationTool', 'PromptSummaryTool', 'QueryTool', 'RagTool', 'RestApiTool', 'SqlTool', 'StructureRunTool', 'StructuredOutputTool', 'TextToSpeechTool', 'VariationImageGenerationTool', 'VectorStoreTool', 'WebScraperTool', 'WebSearchTool']
module-attribute
Bases:
BaseTool
Source Code in griptape/tools/audio_transcription/tool.py
@define
class AudioTranscriptionTool(BaseTool):
    """A tool that can be used to generate transcriptions from input audio.

    Attributes:
        audio_transcription_driver: Driver whose `run` method receives the audio artifact to transcribe.
        audio_loader: Loader used to read audio files from disk. Defaults to a new `AudioLoader`.
    """

    audio_transcription_driver: BaseAudioTranscriptionDriver = field(kw_only=True)
    audio_loader: AudioLoader = field(default=Factory(lambda: AudioLoader()), kw_only=True)

    @activity(
        config={
            "description": "This tool can be used to generate transcriptions of audio files on disk.",
            "schema": Schema({Literal("path", description="The paths to an audio file on disk."): str}),
        },
    )
    def transcribe_audio_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
        """Load the audio file at `params["values"]["path"]` and transcribe it.

        Args:
            params: Activity parameters; the file path is read from `params["values"]["path"]`.

        Returns:
            Whatever the transcription driver's `run` returns for the loaded artifact.
        """
        audio_path = params["values"]["path"]
        audio_artifact = self.audio_loader.load(audio_path)

        return self.audio_transcription_driver.run(audio_artifact)

    @activity(
        config={
            "description": "This tool can be used to generate the transcription of an audio artifact in memory.",
            # The fields are declared at the top level because the method body reads them
            # directly from params["values"]; an extra {"schema": ...} wrapper would make
            # validated input unusable by the code below.
            "schema": Schema({"memory_name": str, "artifact_namespace": str, "artifact_name": str}),
        },
    )
    def transcribe_audio_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact:
        """Transcribe an audio artifact stored in one of the tool's input memories.

        Args:
            params: Activity parameters; `memory_name`, `artifact_namespace` and `artifact_name`
                are read from `params["values"]`.

        Returns:
            The transcription driver's result, or an `ErrorArtifact` when no input memory
            with the requested name exists.
        """
        memory = self.find_input_memory(params["values"]["memory_name"])
        artifact_namespace = params["values"]["artifact_namespace"]
        artifact_name = params["values"]["artifact_name"]

        if memory is None:
            return ErrorArtifact("memory not found")

        audio_artifact = cast(
            "AudioArtifact",
            load_artifact_from_memory(memory, artifact_namespace, artifact_name, AudioArtifact),
        )

        return self.audio_transcription_driver.run(audio_artifact)
audio_loader = field(default=Factory(lambda: AudioLoader()), kw_only=True) — class-attribute, instance-attribute
audio_transcription_driver = field(kw_only=True) — class-attribute, instance-attribute
transcribe_audio_from_disk(params)
Source Code in griptape/tools/audio_transcription/tool.py
@activity(
    config={
        "description": "This tool can be used to generate transcriptions of audio files on disk.",
        "schema": Schema({Literal("path", description="The paths to an audio file on disk."): str}),
    },
)
def transcribe_audio_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
    """Transcribe the audio file whose path is given in `params["values"]["path"]`.

    The file is read with `self.audio_loader` and the resulting artifact is handed
    to `self.audio_transcription_driver.run`, whose result is returned unchanged.
    """
    loaded_audio = self.audio_loader.load(params["values"]["path"])
    return self.audio_transcription_driver.run(loaded_audio)
transcribe_audio_from_memory(params)
Source Code in griptape/tools/audio_transcription/tool.py
@activity(
    config={
        "description": "This tool can be used to generate the transcription of an audio artifact in memory.",
        # The fields are declared at the top level because the method body reads them
        # directly from params["values"]; an extra {"schema": ...} wrapper would make
        # validated input unusable by the code below.
        "schema": Schema({"memory_name": str, "artifact_namespace": str, "artifact_name": str}),
    },
)
def transcribe_audio_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact:
    """Transcribe an audio artifact stored in one of the tool's input memories.

    Args:
        params: Activity parameters; `memory_name`, `artifact_namespace` and `artifact_name`
            are read from `params["values"]`.

    Returns:
        The transcription driver's result, or an `ErrorArtifact` when no input memory
        with the requested name exists.
    """
    memory = self.find_input_memory(params["values"]["memory_name"])
    artifact_namespace = params["values"]["artifact_namespace"]
    artifact_name = params["values"]["artifact_name"]

    if memory is None:
        return ErrorArtifact("memory not found")

    audio_artifact = cast(
        "AudioArtifact",
        load_artifact_from_memory(memory, artifact_namespace, artifact_name, AudioArtifact),
    )

    return self.audio_transcription_driver.run(audio_artifact)
BaseImageGenerationTool
Bases:
ArtifactFileOutputMixin
,  BaseTool
Source Code in griptape/tools/base_image_generation_tool.py
@define
class BaseImageGenerationTool(ArtifactFileOutputMixin, BaseTool):
    """Common base for tools that turn text prompts into images.

    Only shared prompt-description constants are defined here.
    """

    # Text describing what a positive prompt should contain.
    PROMPT_DESCRIPTION = "Features and qualities to include in the generated image, descriptive and succinct."
    # Text describing how a negative prompt should be phrased.
    NEGATIVE_PROMPT_DESCRIPTION = (
        "Features and qualities to avoid in the generated image. "
        "Affirmatively describe what to avoid, for example: "
        "to avoid the color red, include 'red' rather than 'no red'."
    )
NEGATIVE_PROMPT_DESCRIPTION = "Features and qualities to avoid in the generated image. Affirmatively describe what to avoid, for example: to avoid the color red, include 'red' rather than 'no red'." — class-attribute, instance-attribute
PROMPT_DESCRIPTION = 'Features and qualities to include in the generated image, descriptive and succinct.' — class-attribute, instance-attribute
BaseTool
Bases:
ActivityMixin
,  SerializableMixin
,  RunnableMixin['BaseTool']
, ABC
Attributes
| Name | Type | Description | 
|---|---|---|
| name | str | Tool name. |
| input_memory | Optional[list[TaskMemory]] | TaskMemory available in tool activities. Gets automatically set if None. |
| output_memory | Optional[dict[str, list[TaskMemory]]] | TaskMemory that activities write to by default. Gets automatically set if None. |
| install_dependencies_on_init | bool | Determines whether dependencies from the tool requirements.txt file are installed in init. |
| dependencies_install_directory | Optional[str] | Custom dependency install directory. |
| verbose | bool | Determines whether tool operations (such as dependency installation) should be verbose. |
| off_prompt | bool | Determines whether tool activity output goes to the output memory. |
Source Code in griptape/tools/base_tool.py
@define
class BaseTool(ActivityMixin, SerializableMixin, RunnableMixin["BaseTool"], ABC):
    """Abstract class for all tools to inherit from.

    Attributes:
        name: Tool name.
        input_memory: TaskMemory available in tool activities. Gets automatically set if None.
        output_memory: TaskMemory that activities write to by default. Gets automatically set if None.
        install_dependencies_on_init: Determines whether dependencies from the tool requirements.txt file are installed in init.
        dependencies_install_directory: Custom dependency install directory.
        verbose: Determines whether tool operations (such as dependency installation) should be verbose.
        off_prompt: Determines whether tool activity output goes to the output memory.
    """

    REQUIREMENTS_FILE = "requirements.txt"

    name: str = field(
        default=Factory(lambda self: self.__class__.__name__, takes_self=True),
        kw_only=True,
        metadata={"serializable": True},
    )
    input_memory: Optional[list[TaskMemory]] = field(default=None, kw_only=True, metadata={"serializable": True})
    output_memory: Optional[dict[str, list[TaskMemory]]] = field(
        default=None, kw_only=True, metadata={"serializable": True}
    )
    install_dependencies_on_init: bool = field(default=True, kw_only=True, metadata={"serializable": True})
    dependencies_install_directory: Optional[str] = field(default=None, kw_only=True, metadata={"serializable": True})
    verbose: bool = field(default=False, kw_only=True, metadata={"serializable": True})
    off_prompt: bool = field(default=False, kw_only=True, metadata={"serializable": True})

    def __attrs_post_init__(self) -> None:
        """Install the tool's requirements when enabled, present and not already satisfied."""
        if (
            self.install_dependencies_on_init
            and self.has_requirements
            and not self.are_requirements_met(self.requirements_path)
        ):
            self.install_dependencies(os.environ.copy())

    @output_memory.validator  # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
    def validate_output_memory(self, _: Attribute, output_memory: dict[str, Optional[list[TaskMemory]]]) -> None:
        """Reject unknown activities, `None` memory lists and duplicate memory names.

        Raises:
            ValueError: If any of the checks above fails.
        """
        if output_memory:
            for activity_name, memory_list in output_memory.items():
                if not self.find_activity(activity_name):
                    raise ValueError(f"activity {activity_name} doesn't exist")
                if memory_list is None:
                    raise ValueError(f"memory list for activity '{activity_name}' can't be None")

                output_memory_names = [memory.name for memory in memory_list]

                if len(output_memory_names) > len(set(output_memory_names)):
                    raise ValueError(f"memory names have to be unique in activity '{activity_name}' output")

    @property
    def requirements_path(self) -> str:
        """Path of `REQUIREMENTS_FILE` inside the tool's directory."""
        return os.path.join(self.abs_dir_path, self.REQUIREMENTS_FILE)

    @property
    def abs_file_path(self) -> str:
        """Absolute path of the file that defines this tool's class."""
        return os.path.abspath(inspect.getfile(self.__class__))

    @property
    def abs_dir_path(self) -> str:
        """Directory containing `abs_file_path`."""
        return os.path.dirname(self.abs_file_path)

    @property
    def has_requirements(self) -> bool:
        """Whether a requirements file exists at `requirements_path`."""
        return os.path.exists(self.requirements_path)

    # This method has to remain a method and can't be decorated with @property because
    # of the max depth recursion issue in `self.activities`.
    def schema(self) -> dict:
        """Return the JSON schema covering every activity of this tool."""
        full_schema = Schema(Or(*self.activity_schemas()), description=f"{self.name} action schema.")

        return full_schema.json_schema(f"{self.name} ToolAction Schema")

    def activity_schemas(self) -> list[Schema]:
        """Build one `Schema` per activity with `name`, `path` and `input` entries."""
        schemas = []

        for activity in self.activities():
            schema_dict: dict[Literal | schema.Optional, Any] = {
                Literal("name"): self.name,
                Literal("path", description=self.activity_description(activity)): self.activity_name(activity),
            }

            activity_schema = self.activity_schema(activity)
            # If no schema is defined, we just make `input` optional instead of omitting it.
            # This works better with lower-end models that may accidentally pass in an empty dict.
            if activity_schema is None:
                schema_dict[schema.Optional("input")] = {}
            else:
                schema_dict[Literal("input")] = activity_schema.schema

            schemas.append(Schema(schema_dict))

        return schemas

    def run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> BaseArtifact:
        """Run `before_run`, `try_run` and `after_run`; any exception becomes an `ErrorArtifact`."""
        try:
            output = self.before_run(activity, subtask, action)

            output = self.try_run(activity, subtask, action, output)

            output = self.after_run(activity, subtask, action, output)
        except Exception as e:
            logging.debug(traceback.format_exc())
            output = ErrorArtifact(str(e), exception=e)

        return output

    def before_run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> Optional[dict]:
        """Call the parent `before_run` hook and return the action's input."""
        super().before_run()

        return action.input

    @observable(tags=["Tool.run()"])
    def try_run(
        self,
        activity: Callable,
        subtask: ActionsSubtask,
        action: ToolAction,
        value: Optional[dict],
    ) -> BaseArtifact:
        """Call the activity with a deep copy of `value` and wrap non-artifact results in `InfoArtifact`."""
        activity_result = activity(deepcopy(value))

        if isinstance(activity_result, BaseArtifact):
            result = activity_result
        else:
            logging.warning("Activity result is not an artifact; converting result to InfoArtifact")

            if activity_result is None:
                result = InfoArtifact("Tool returned an empty value")
            else:
                result = InfoArtifact(activity_result)

        return result

    def after_run(
        self,
        activity: Callable,
        subtask: ActionsSubtask,
        action: ToolAction,
        value: BaseArtifact,
    ) -> BaseArtifact:
        """Call the parent `after_run` hook, then pass `value` through the activity's output memories.

        Returns:
            The value returned by the last memory's `process_output`, or `value` unchanged
            when no output memory applies to the activity.
        """
        super().after_run()

        if self.output_memory:
            # `validate_output_memory` accepts dicts that list only some activities, so an
            # activity may be absent here; treat that as "no memories" rather than a KeyError.
            output_memories = self.output_memory.get(getattr(activity, "name")) or []
            for memory in output_memories:
                value = memory.process_output(activity, subtask, value)

        return value

    def validate(self) -> bool:
        """Return True when the requirements file exists.

        Raises:
            Exception: If the requirements file is missing.
        """
        if not os.path.exists(self.requirements_path):
            raise Exception(f"{self.REQUIREMENTS_FILE} not found")
        return True

    def tool_dir(self) -> str:
        """Directory of the file that defines this tool's class."""
        class_file = inspect.getfile(self.__class__)

        return os.path.dirname(os.path.abspath(class_file))

    def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
        """Run `pip install -r <REQUIREMENTS_FILE>` from the tool directory.

        Args:
            env: Environment passed to the pip subprocess. An empty environment is used when omitted.
        """
        env = env or {}

        # Use the class constant so the installed file is the same one `requirements_path` checks.
        command = [sys.executable, "-m", "pip", "install", "-r", self.REQUIREMENTS_FILE]

        if self.dependencies_install_directory is None:
            command.extend(["-U"])
        else:
            command.extend(["-t", self.dependencies_install_directory])

        subprocess.run(
            command,
            env=env,
            cwd=self.tool_dir(),
            stdout=None if self.verbose else subprocess.DEVNULL,
            stderr=None if self.verbose else subprocess.DEVNULL,
            check=False,
        )

    def find_input_memory(self, memory_name: str) -> Optional[TaskMemory]:
        """Return the first input memory named `memory_name`, or None."""
        if self.input_memory:
            return next((m for m in self.input_memory if m.name == memory_name), None)
        return None

    def to_native_tool_name(self, activity: Callable) -> str:
        """Converts a Tool's name and an Activity into a native tool name.

        The native tool name is a combination of the Tool's name and the Activity's name.
        The Tool's name may only contain letters and numbers, and the Activity's name may only
        contain letters, numbers, and underscores.

        Args:
            activity: Activity to convert

        Returns:
            str: Native tool name.

        Raises:
            ValueError: If the tool name or the activity name contains disallowed characters.
        """
        tool_name = self.name
        if re.match(r"^[a-zA-Z0-9]+$", tool_name) is None:
            raise ValueError("Tool name can only contain letters and numbers.")

        activity_name = self.activity_name(activity)
        if re.match(r"^[a-zA-Z0-9_]+$", activity_name) is None:
            raise ValueError("Activity name can only contain letters, numbers, and underscores.")

        return f"{tool_name}_{activity_name}"

    def are_requirements_met(self, requirements_path: str) -> bool:
        """Check whether every package listed in a requirements file is installed.

        Only presence is checked, not the installed version. Blank lines and comments are
        ignored, and version specifiers, extras and markers are stripped before the lookup.

        Args:
            requirements_path: Path to the requirements file to inspect.

        Returns:
            True when every listed distribution is installed; False when one is missing or a
            line cannot be reduced to a distribution name (e.g. a pip option or a local path).
        """
        for raw_line in Path(requirements_path).read_text().splitlines():
            requirement = raw_line.split("#", 1)[0].strip()
            if not requirement:
                continue

            name_match = re.match(r"[A-Za-z0-9][A-Za-z0-9._-]*", requirement)
            if name_match is None:
                # Unverifiable line: report "not met" so the installer still runs.
                return False

            try:
                importlib.metadata.version(name_match.group(0))
            except importlib.metadata.PackageNotFoundError:
                return False

        return True
REQUIREMENTS_FILE = 'requirements.txt' — class-attribute, instance-attribute
abs_dir_path — property
abs_file_path — property
dependencies_install_directory = field(default=None, kw_only=True, metadata={'serializable': True}) — class-attribute, instance-attribute
has_requirements — property
input_memory = field(default=None, kw_only=True, metadata={'serializable': True}) — class-attribute, instance-attribute
install_dependencies_on_init = field(default=True, kw_only=True, metadata={'serializable': True}) — class-attribute, instance-attribute
name = field(default=Factory(lambda self: self.__class__.__name__, takes_self=True), kw_only=True, metadata={'serializable': True}) — class-attribute, instance-attribute
off_prompt = field(default=False, kw_only=True, metadata={'serializable': True}) — class-attribute, instance-attribute
output_memory = field(default=None, kw_only=True, metadata={'serializable': True}) — class-attribute, instance-attribute
requirements_path — property
verbose = field(default=False, kw_only=True, metadata={'serializable': True}) — class-attribute, instance-attribute
__attrs_post_init__()
Source Code in griptape/tools/base_tool.py
def __attrs_post_init__(self) -> None:
    """Install the tool's dependencies after construction when that is needed.

    Installation happens only if it is enabled, a requirements file exists, and
    the requirements are not already met; the checks run in that order.
    """
    if not self.install_dependencies_on_init:
        return
    if not self.has_requirements:
        return
    if self.are_requirements_met(self.requirements_path):
        return

    self.install_dependencies(os.environ.copy())
activity_schemas()
Source Code in griptape/tools/base_tool.py
def activity_schemas(self) -> list[Schema]:
    """Return one `Schema` per activity, each with `name`, `path` and `input` entries."""
    built = []

    for activity in self.activities():
        schema_dict: dict[Literal | schema.Optional, Any] = {
            Literal("name"): self.name,
            Literal("path", description=self.activity_description(activity)): self.activity_name(activity),
        }

        activity_schema = self.activity_schema(activity)
        # If no schema is defined, we just make `input` optional instead of omitting it.
        # This works better with lower-end models that may accidentally pass in an empty dict.
        if activity_schema is not None:
            schema_dict[Literal("input")] = activity_schema.schema
        else:
            schema_dict[schema.Optional("input")] = {}

        built.append(Schema(schema_dict))

    return built
after_run(activity, subtask, action, value)
Source Code in griptape/tools/base_tool.py
def after_run(
    self,
    activity: Callable,
    subtask: ActionsSubtask,
    action: ToolAction,
    value: BaseArtifact,
) -> BaseArtifact:
    """Call the parent `after_run` hook, then pass `value` through the activity's output memories.

    Args:
        activity: The activity that produced `value`; its `name` attribute selects the memories.
        subtask: Forwarded to each memory's `process_output`.
        action: Unused here; part of the hook signature.
        value: The artifact produced by the activity.

    Returns:
        The value returned by the last memory's `process_output`, or `value` unchanged
        when no output memory applies to the activity.
    """
    super().after_run()

    if self.output_memory:
        # The output_memory dict may list only some activities, so this activity can be
        # absent; treat that as "no memories" instead of raising KeyError and losing the
        # activity's result.
        output_memories = self.output_memory.get(getattr(activity, "name")) or []
        for memory in output_memories:
            value = memory.process_output(activity, subtask, value)

    return value
are_requirements_met(requirements_path)
Source Code in griptape/tools/base_tool.py
def are_requirements_met(self, requirements_path: str) -> bool:
    """Check whether every package listed in a requirements file is installed.

    Only presence is checked, not the installed version. Blank lines and comments are
    ignored, and version specifiers, extras and markers are stripped before the lookup,
    so a line such as ``pkg>=1.0`` is looked up as ``pkg``.

    Args:
        requirements_path: Path to the requirements file to inspect.

    Returns:
        True when every listed distribution is installed; False when one is missing or a
        line cannot be reduced to a distribution name (e.g. a pip option or a local path).
    """
    for raw_line in Path(requirements_path).read_text().splitlines():
        requirement = raw_line.split("#", 1)[0].strip()
        if not requirement:
            continue

        name_match = re.match(r"[A-Za-z0-9][A-Za-z0-9._-]*", requirement)
        if name_match is None:
            # Unverifiable line: report "not met" so the installer still runs.
            return False

        try:
            importlib.metadata.version(name_match.group(0))
        except importlib.metadata.PackageNotFoundError:
            return False

    return True
before_run(activity, subtask, action)
Source Code in griptape/tools/base_tool.py
def before_run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> Optional[dict]:
    """Invoke the parent `before_run` hook, then hand back the input carried by `action`."""
    super().before_run()

    tool_input = action.input
    return tool_input
find_input_memory(memory_name)
Source Code in griptape/tools/base_tool.py
def find_input_memory(self, memory_name: str) -> Optional[TaskMemory]:
    """Look up an input memory by name.

    Returns the first entry of `self.input_memory` whose `name` equals `memory_name`,
    or None when there is no match or no input memory is configured.
    """
    for candidate in self.input_memory or []:
        if candidate.name == memory_name:
            return candidate
    return None
install_dependencies(env=None)
Source Code in griptape/tools/base_tool.py
def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
    """Run `pip install -r <REQUIREMENTS_FILE>` from the tool directory.

    `-U` is passed when no install directory is configured; otherwise packages are
    installed into `dependencies_install_directory` via `-t`. pip's output is shown
    only when `verbose` is set, and a failing pip run does not raise (`check=False`).

    Args:
        env: Environment passed to the pip subprocess. An empty environment is used when omitted.
    """
    env = env or {}

    # Use the class constant rather than a literal so the installed file is the same one
    # that `requirements_path` / `has_requirements` look at.
    command = [sys.executable, "-m", "pip", "install", "-r", self.REQUIREMENTS_FILE]

    if self.dependencies_install_directory is None:
        command.extend(["-U"])
    else:
        command.extend(["-t", self.dependencies_install_directory])

    subprocess.run(
        command,
        env=env,
        cwd=self.tool_dir(),
        stdout=None if self.verbose else subprocess.DEVNULL,
        stderr=None if self.verbose else subprocess.DEVNULL,
        check=False,
    )
run(activity, subtask, action)
Source Code in griptape/tools/base_tool.py
def run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> BaseArtifact:
    """Execute an activity through the before/try/after hooks.

    Any exception raised by one of the hooks is logged at debug level and
    returned as an `ErrorArtifact` instead of propagating.
    """
    try:
        tool_input = self.before_run(activity, subtask, action)
        raw_output = self.try_run(activity, subtask, action, tool_input)
        return self.after_run(activity, subtask, action, raw_output)
    except Exception as e:
        logging.debug(traceback.format_exc())
        return ErrorArtifact(str(e), exception=e)
schema()
Source Code in griptape/tools/base_tool.py
def schema(self) -> dict:
    """Return the JSON schema that accepts any one of this tool's activity schemas."""
    action_schema = Schema(Or(*self.activity_schemas()), description=f"{self.name} action schema.")
    schema_title = f"{self.name} ToolAction Schema"

    return action_schema.json_schema(schema_title)
to_native_tool_name(activity)
Converts a Tool's name and an Activity into a native tool name.
The native tool name is a combination of the Tool's name and the Activity's name. The Tool's name may only contain letters and numbers, and the Activity's name may only contain letters, numbers, and underscores.
Parameters
| Name | Type | Description | Default | 
|---|---|---|---|
| activity | Callable | Activity to convert | required |
Returns
| Name | Type | Description | 
|---|---|---|
| str | str | Native tool name. |
Source Code in griptape/tools/base_tool.py
def to_native_tool_name(self, activity: Callable) -> str:
    """Build the native tool name for an activity of this tool.

    The result is the tool's name and the activity's name joined by an underscore.
    The tool name is validated first, before the activity name is looked up.

    Args:
        activity: Activity to convert

    Returns:
        str: Native tool name.

    Raises:
        ValueError: If the tool name contains anything other than letters and numbers, or
            the activity name contains anything other than letters, numbers and underscores.
    """
    if not re.match(r"^[a-zA-Z0-9]+$", self.name):
        raise ValueError("Tool name can only contain letters and numbers.")

    activity_name = self.activity_name(activity)
    if not re.match(r"^[a-zA-Z0-9_]+$", activity_name):
        raise ValueError("Activity name can only contain letters, numbers, and underscores.")

    return "_".join((self.name, activity_name))
tool_dir()
Source Code in griptape/tools/base_tool.py
def tool_dir(self) -> str:
    """Return the absolute directory of the file in which this object's class is defined."""
    return os.path.dirname(os.path.abspath(inspect.getfile(self.__class__)))
try_run(activity, subtask, action, value)
Source Code in griptape/tools/base_tool.py
@observable(tags=["Tool.run()"])
def try_run(
    self,
    activity: Callable,
    subtask: ActionsSubtask,
    action: ToolAction,
    value: Optional[dict],
) -> BaseArtifact:
    """Invoke the activity and make sure the result is an artifact.

    The activity receives a deep copy of `value`. A `BaseArtifact` result is returned
    as is; anything else is logged as a warning and wrapped in an `InfoArtifact`
    (with a fixed message when the activity returned None).
    """
    activity_result = activity(deepcopy(value))

    if isinstance(activity_result, BaseArtifact):
        return activity_result

    logging.warning("Activity result is not an artifact; converting result to InfoArtifact")

    if activity_result is None:
        return InfoArtifact("Tool returned an empty value")
    return InfoArtifact(activity_result)
validate()
Source Code in griptape/tools/base_tool.py
def validate(self) -> bool:
    """Confirm that the tool's requirements file exists.

    Returns:
        True when a file exists at `self.requirements_path`.

    Raises:
        Exception: When the file is missing.
    """
    if os.path.exists(self.requirements_path):
        return True

    raise Exception(f"{self.REQUIREMENTS_FILE} not found")
validate_output_memory(_, output_memory)
Source Code in griptape/tools/base_tool.py
@output_memory.validator  # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
def validate_output_memory(self, _: Attribute, output_memory: dict[str, Optional[list[TaskMemory]]]) -> None:
    """Validate the `output_memory` mapping.

    An empty or None mapping is accepted. Otherwise, for each entry in order:
    the activity must exist, its memory list must not be None, and the memory
    names in that list must be unique.

    Raises:
        ValueError: On the first entry that violates one of these rules.
    """
    if not output_memory:
        return

    for activity_name, memory_list in output_memory.items():
        if not self.find_activity(activity_name):
            raise ValueError(f"activity {activity_name} doesn't exist")
        if memory_list is None:
            raise ValueError(f"memory list for activity '{activity_name}' can't be None")

        names = [memory.name for memory in memory_list]
        if len(set(names)) < len(names):
            raise ValueError(f"memory names have to be unique in activity '{activity_name}' output")
CalculatorTool
Bases:
BaseTool
Source Code in griptape/tools/calculator/tool.py
class CalculatorTool(BaseTool):
    """Tool with a single activity that evaluates an expression string with numexpr."""

    @activity(
        config={
            "description": "Can be used for computing simple numerical or algebraic calculations in Python",
            "schema": Schema(
                {
                    Literal(
                        "expression",
                        description="Arithmetic expression parsable in pure Python. Single line only. "
                        "Don't use variables. Don't use any imports or external libraries",
                    ): str,
                },
            ),
        },
    )
    def calculate(self, params: dict) -> BaseArtifact:
        """Evaluate `params["values"]["expression"]` and return the result as a `TextArtifact`.

        Any exception (missing key, evaluation failure, artifact construction) is
        reported as an `ErrorArtifact` rather than raised.
        """
        import numexpr  # pyright: ignore[reportMissingImports]

        # NOTE(review): the expression comes from tool input and reaches numexpr unmodified;
        # confirm the installed numexpr version sanitizes expressions.
        try:
            evaluated = numexpr.evaluate(params["values"]["expression"])
            return TextArtifact(evaluated)
        except Exception as e:
            return ErrorArtifact(f"error calculating: {e}")
calculate(params)
Source Code in griptape/tools/calculator/tool.py
@activity(
    config={
        "description": "Can be used for computing simple numerical or algebraic calculations in Python",
        "schema": Schema(
            {
                Literal(
                    "expression",
                    description="Arithmetic expression parsable in pure Python. Single line only. "
                    "Don't use variables. Don't use any imports or external libraries",
                ): str,
            },
        ),
    },
)
def calculate(self, params: dict) -> BaseArtifact:
    """Evaluate `params["values"]["expression"]` and return the result as a `TextArtifact`.

    Any exception (missing key, evaluation failure, artifact construction) is
    reported as an `ErrorArtifact` rather than raised.
    """
    import numexpr  # pyright: ignore[reportMissingImports]

    # NOTE(review): the expression comes from tool input and reaches numexpr unmodified;
    # confirm the installed numexpr version sanitizes expressions.
    try:
        evaluated = numexpr.evaluate(params["values"]["expression"])
        return TextArtifact(evaluated)
    except Exception as e:
        return ErrorArtifact(f"error calculating: {e}")
ComputerTool
Bases:
BaseTool
Source Code in griptape/tools/computer/tool.py
@define
class ComputerTool(BaseTool):
    """Tool that runs Python code and shell commands inside a Docker container.

    Attributes:
        local_workdir: Host directory mounted into the container. A temporary directory is
            created in `__attrs_post_init__` when none is given.
        container_workdir: Path inside the container where `local_workdir` is bound.
        env_vars: Environment passed to the container.
        dockerfile_path: Dockerfile copied into the image build context.
        requirements_txt_path: Requirements file copied into the image build context and
            listed by `dependencies()`.
        docker_client: Docker client; defaults to `default_docker_client()`.
    """

    local_workdir: Optional[str] = field(default=None, kw_only=True)
    container_workdir: str = field(default="/griptape", kw_only=True)
    env_vars: dict = field(factory=dict, kw_only=True)
    dockerfile_path: str = field(
        default=Factory(lambda self: f"{os.path.join(self.tool_dir(), 'resources/Dockerfile')}", takes_self=True),
        kw_only=True,
    )
    requirements_txt_path: str = field(
        default=Factory(lambda self: f"{os.path.join(self.tool_dir(), 'resources/requirements.txt')}", takes_self=True),
        kw_only=True,
    )
    docker_client: DockerClient = field(
        default=Factory(lambda self: self.default_docker_client(), takes_self=True),
        kw_only=True,
    )

    # Holds the temporary directory created when no local_workdir is supplied.
    _tempdir: Optional[tempfile.TemporaryDirectory] = field(default=None, kw_only=True)

    def __attrs_post_init__(self) -> None:
        """Run the base initialisation, then make sure a local working directory exists."""
        super().__attrs_post_init__()

        if self.local_workdir:
            Path(self.local_workdir).mkdir(parents=True, exist_ok=True)
        else:
            self._tempdir = tempfile.TemporaryDirectory()
            self.local_workdir = self._tempdir.name

    @docker_client.validator  # pyright: ignore[reportAttributeAccessIssue]
    def validate_docker_client(self, _: Attribute, docker_client: DockerClient) -> None:
        """Reject a falsy Docker client.

        Raises:
            ValueError: If `docker_client` is falsy.
        """
        if not docker_client:
            raise ValueError("Docker client can't be initialized: make sure the Docker daemon is running")

    def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
        """Install base dependencies, remove any existing container and rebuild the image."""
        super().install_dependencies(env)

        self.remove_existing_container(self.container_name(self))
        self.build_image(self)

    @activity(
        config={
            "description": "Can be used to execute Python code to solve any programmatic tasks and access and analyze"
            " files in the file system. If you need to use code output use `print` statements. "
            "You have access to the following external Python libraries: "
            "{{ _self.dependencies() }}",
            "schema": Schema(
                {
                    Literal("code", description="Python code to execute"): str,
                    Literal(
                        "filename",
                        description="name of the file to put the Python code in before executing it",
                    ): str,
                },
            ),
        },
    )
    def execute_code(self, params: dict) -> BaseArtifact:
        """Write `params["values"]["code"]` to `params["values"]["filename"]` and run it in the container."""
        code = params["values"]["code"]
        filename = params["values"]["filename"]

        return self.execute_code_in_container(filename, code)

    @activity(
        config={
            "description": "Can be used to execute shell commands in Linux",
            "schema": Schema({Literal("command", description="shell command to execute"): str}),
        },
    )
    def execute_command(self, params: dict) -> BaseArtifact:
        """Run `params["values"]["command"]` in the container."""
        command = params["values"]["command"]

        return self.execute_command_in_container(command)

    def execute_command_in_container(self, command: str) -> BaseArtifact:
        """Run `command` in a fresh container and return its output.

        Returns:
            `ErrorArtifact` with stderr when stderr is non-empty, `TextArtifact` with stdout
            otherwise, or an `ErrorArtifact` describing any exception that occurred.
        """
        from docker.models.containers import Container

        try:
            binds = {self.local_workdir: {"bind": self.container_workdir, "mode": "rw"}} if self.local_workdir else None

            container = self.docker_client.containers.run(  # pyright: ignore[reportCallIssue]
                self.image_name(self),
                environment=self.env_vars,
                command=command,
                name=self.container_name(self),
                volumes=binds,  # pyright: ignore[reportArgumentType] According to the [docs](https://docker-py.readthedocs.io/en/stable/containers.html), the type of `volumes` is dict[str, dict[str, str]].
                stdout=True,
                stderr=True,
                detach=True,
            )

            if isinstance(container, Container):
                try:
                    container.wait()

                    stderr = container.logs(stdout=False, stderr=True).decode().strip()
                    stdout = container.logs(stdout=True, stderr=False).decode().strip()

                    container.stop()
                finally:
                    # The container name is derived from the tool name, so a container left
                    # behind by a failure above would presumably clash with the next run.
                    container.remove(force=True)

                if stderr:
                    return ErrorArtifact(stderr)
                return TextArtifact(stdout)
            return ErrorArtifact("error running container")
        except Exception as e:
            return ErrorArtifact(f"error executing command: {e}")

    def execute_code_in_container(self, filename: str, code: str) -> BaseArtifact:
        """Write `code` to `filename` in the local working directory and run it with `python` in the container."""
        container_file_path = os.path.join(self.container_workdir, filename)

        if self.local_workdir:
            tempdir = None
            local_workdir = self.local_workdir
        else:
            tempdir = tempfile.TemporaryDirectory()
            local_workdir = tempdir.name

        local_file_path = os.path.join(local_workdir, filename)

        try:
            Path(local_file_path).write_text(code)

            return self.execute_command_in_container(f"python {container_file_path}")
        except Exception as e:
            return ErrorArtifact(f"error executing code: {e}")
        finally:
            if tempdir:
                tempdir.cleanup()

    def default_docker_client(self) -> Optional[DockerClient]:
        """Create a client from the environment; log and return None on failure."""
        import docker

        try:
            return docker.from_env()
        except Exception as e:
            logging.exception(e)

            return None

    def image_name(self, tool: BaseTool) -> str:
        """Snake-cased tool name followed by `_image`."""
        import stringcase  # pyright: ignore[reportMissingImports]

        return f"{stringcase.snakecase(tool.name)}_image"

    def container_name(self, tool: BaseTool) -> str:
        """Snake-cased tool name followed by `_container`."""
        import stringcase  # pyright: ignore[reportMissingImports]

        return f"{stringcase.snakecase(tool.name)}_container"

    def remove_existing_container(self, name: str) -> None:
        """Force-remove the container called `name`; a missing container is ignored."""
        from docker.errors import NotFound
        from docker.models.containers import Container

        try:
            existing_container = self.docker_client.containers.get(name)
            if isinstance(existing_container, Container):
                existing_container.remove(force=True)

                logging.info("Removed existing container %s", name)
        except NotFound:
            pass

    def build_image(self, tool: BaseTool) -> None:
        """Build the Docker image from a temporary context holding the Dockerfile and requirements file."""
        with tempfile.TemporaryDirectory() as temp_dir:
            shutil.copy(self.dockerfile_path, temp_dir)
            shutil.copy(self.requirements_txt_path, temp_dir)

            image = self.docker_client.images.build(path=temp_dir, tag=self.image_name(tool), rm=True, forcerm=True)

            if isinstance(image, tuple):
                logging.info("Built image: %s", image[0].short_id)

    def dependencies(self) -> list[str]:
        """Return the stripped lines of the requirements file."""
        with open(self.requirements_txt_path) as file:
            return [line.strip() for line in file]

    def __del__(self) -> None:
        """Clean up the temporary working directory, if one was created."""
        # `_tempdir` may never have been assigned if construction failed part-way,
        # so look it up defensively instead of raising inside the finalizer.
        tempdir = getattr(self, "_tempdir", None)
        if tempdir:
            tempdir.cleanup()
_tempdir = field(default=None, kw_only=True) — class-attribute, instance-attribute
container_workdir = field(default='/griptape', kw_only=True) — class-attribute, instance-attribute
docker_client = field(default=Factory(lambda self: self.default_docker_client(), takes_self=True), kw_only=True) — class-attribute, instance-attribute
dockerfile_path = field(default=Factory(lambda self: f'{os.path.join(self.tool_dir(), 'resources/Dockerfile')}', takes_self=True), kw_only=True) — class-attribute, instance-attribute
env_vars = field(factory=dict, kw_only=True) — class-attribute, instance-attribute
local_workdir = field(default=None, kw_only=True) — class-attribute, instance-attribute
requirements_txt_path = field(default=Factory(lambda self: f'{os.path.join(self.tool_dir(), 'resources/requirements.txt')}', takes_self=True), kw_only=True) — class-attribute, instance-attribute
__attrs_post_init__()
Source Code in griptape/tools/computer/tool.py
def __attrs_post_init__(self) -> None:
    """Finish initialisation by guaranteeing a local working directory.

    After the parent hook runs, a configured `local_workdir` is created on disk
    (including parents); when none is configured, a temporary directory is
    created, kept on `_tempdir`, and used as `local_workdir`.
    """
    super().__attrs_post_init__()

    if not self.local_workdir:
        self._tempdir = tempfile.TemporaryDirectory()
        self.local_workdir = self._tempdir.name
        return

    Path(self.local_workdir).mkdir(parents=True, exist_ok=True)
__del__()
Source Code in griptape/tools/computer/tool.py
def __del__(self) -> None:
    """Clean up the temporary working directory, if one was created."""
    # `_tempdir` may never have been assigned if construction failed part-way,
    # so look it up defensively instead of raising inside the finalizer.
    tempdir = getattr(self, "_tempdir", None)
    if tempdir:
        tempdir.cleanup()
build_image(tool)
Source Code in griptape/tools/computer/tool.py
def build_image(self, tool: BaseTool) -> None:
    """Build the Docker image for `tool`.

    The Dockerfile and the requirements file are copied into a temporary build
    context, which is passed to `docker_client.images.build` with the tag from
    `image_name(tool)`. When the build result is a tuple, the short id of its
    first element is logged.
    """
    with tempfile.TemporaryDirectory() as build_context:
        for source_path in (self.dockerfile_path, self.requirements_txt_path):
            shutil.copy(source_path, build_context)

        build_result = self.docker_client.images.build(
            path=build_context,
            tag=self.image_name(tool),
            rm=True,
            forcerm=True,
        )

        if isinstance(build_result, tuple):
            logging.info("Built image: %s", build_result[0].short_id)
container_name(tool)
Source Code in griptape/tools/computer/tool.py
def container_name(self, tool: BaseTool) -> str:
    """Return the container name for `tool`: its snake-cased name plus `_container`."""
    import stringcase  # pyright: ignore[reportMissingImports]

    snake_name = stringcase.snakecase(tool.name)
    return f"{snake_name}_container"
default_docker_client()
Source Code in griptape/tools/computer/tool.py
def default_docker_client(self) -> Optional[DockerClient]:
    """Create a Docker client from the environment.

    Returns:
        The client from `docker.from_env()`, or None when creating it raised;
        the exception is logged in that case.
    """
    import docker

    try:
        client = docker.from_env()
    except Exception as e:
        logging.exception(e)
        return None
    return client
dependencies()
Source Code in griptape/tools/computer/tool.py
def dependencies(self) -> list[str]:
    """Return every line of the requirements file with surrounding whitespace stripped.

    Blank lines are kept as empty strings.
    """
    with open(self.requirements_txt_path) as requirements_file:
        return list(map(str.strip, requirements_file))
execute_code(params)
Source Code in griptape/tools/computer/tool.py
@activity(
    config={
        "description": "Can be used to execute Python code to solve any programmatic tasks and access and analyze"
        " files in the file system. If you need to use code output use `print` statements. "
        "You have access to the following external Python libraries: "
        "{{ _self.dependencies() }}",
        "schema": Schema(
            {
                Literal("code", description="Python code to execute"): str,
                Literal(
                    "filename",
                    description="name of the file to put the Python code in before executing it",
                ): str,
            },
        ),
    },
)
def execute_code(self, params: dict) -> BaseArtifact:
    """Run the submitted Python code inside the container.

    Reads `code` and `filename` from `params["values"]` and delegates to
    `execute_code_in_container`.
    """
    values = params["values"]
    code, filename = values["code"], values["filename"]

    return self.execute_code_in_container(filename, code)
execute_code_in_container(filename, code)
Source Code in griptape/tools/computer/tool.py
def execute_code_in_container(self, filename: str, code: str) -> BaseArtifact:
    """Write `code` to a file and run it with `python` inside the container.

    The file is written under `self.local_workdir` (or under a throwaway temporary
    directory when that is unset) and executed via `execute_command_in_container`
    using its path below `self.container_workdir`. Any exception is returned as an
    `ErrorArtifact`; a throwaway directory is always cleaned up.
    """
    container_file_path = os.path.join(self.container_workdir, filename)

    tempdir = None if self.local_workdir else tempfile.TemporaryDirectory()
    local_workdir = self.local_workdir if tempdir is None else tempdir.name
    local_file_path = os.path.join(local_workdir, filename)

    try:
        Path(local_file_path).write_text(code)
        return self.execute_command_in_container(f"python {container_file_path}")
    except Exception as e:
        return ErrorArtifact(f"error executing code: {e}")
    finally:
        if tempdir is not None:
            tempdir.cleanup()
execute_command(params)
Source Code in griptape/tools/computer/tool.py
@activity(
    config={
        "description": "Can be used to execute shell commands in Linux",
        "schema": Schema({Literal("command", description="shell command to execute"): str}),
    },
)
def execute_command(self, params: dict) -> BaseArtifact:
    """Run the shell command found at `params["values"]["command"]` in the container."""
    return self.execute_command_in_container(params["values"]["command"])
execute_command_in_container(command)
Source Code in griptape/tools/computer/tool.py
def execute_command_in_container(self, command: str) -> BaseArtifact:
    """Run ``command`` in a fresh detached container and return its output.

    The container is started from ``self.image_name(self)`` with
    ``self.env_vars``. ``self.local_workdir`` is mounted read-write at
    ``self.container_workdir`` when it is set. Once the container has been
    started it is always removed, even if waiting for it or reading its logs
    fails, so a failed run does not leave a container behind under
    ``self.container_name(self)``.

    Args:
        command: Command to run inside the container.

    Returns:
        A ``TextArtifact`` with stdout when stderr is empty, otherwise an
        ``ErrorArtifact`` with stderr. Any exception raised along the way is
        reported as an ``ErrorArtifact``.
    """
    from docker.models.containers import Container

    try:
        binds = {self.local_workdir: {"bind": self.container_workdir, "mode": "rw"}} if self.local_workdir else None

        container = self.docker_client.containers.run(  # pyright: ignore[reportCallIssue]
            self.image_name(self),
            environment=self.env_vars,
            command=command,
            name=self.container_name(self),
            volumes=binds,  # pyright: ignore[reportArgumentType] According to the [docs](https://docker-py.readthedocs.io/en/stable/containers.html), the type of `volumes` is dict[str, dict[str, str]].
            stdout=True,
            stderr=True,
            detach=True,
        )

        if not isinstance(container, Container):
            return ErrorArtifact("error running container")

        try:
            container.wait()

            stderr = container.logs(stdout=False, stderr=True).decode().strip()
            stdout = container.logs(stdout=True, stderr=False).decode().strip()
        finally:
            # force=True also covers a container that is still running because
            # wait() failed; without this the named container would linger.
            container.remove(force=True)

        if stderr:
            return ErrorArtifact(stderr)
        return TextArtifact(stdout)
    except Exception as e:
        return ErrorArtifact(f"error executing command: {e}")
image_name(tool)
Source Code in griptape/tools/computer/tool.py
def image_name(self, tool: BaseTool) -> str:
    """Build the image name for ``tool``: its snake_cased name followed by ``_image``."""
    import stringcase  # pyright: ignore[reportMissingImports]

    snake_name = stringcase.snakecase(tool.name)
    return f"{snake_name}_image"
install_dependencies(env=None)
Source Code in griptape/tools/computer/tool.py
def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
    """Install dependencies via the parent class, then rebuild the tool's image.

    Args:
        env: Optional environment variables, forwarded unchanged to the parent implementation.
    """
    super().install_dependencies(env)

    # Drop any container left under this tool's name before building the image.
    stale_container_name = self.container_name(self)
    self.remove_existing_container(stale_container_name)

    self.build_image(self)
remove_existing_container(name)
Source Code in griptape/tools/computer/tool.py
def remove_existing_container(self, name: str) -> None:
    """Force-remove the container called ``name`` if the Docker client knows it.

    Args:
        name: Name of the container to look up and remove.
    """
    from docker.errors import NotFound
    from docker.models.containers import Container

    try:
        found = self.docker_client.containers.get(name)
        if not isinstance(found, Container):
            return

        found.remove(force=True)
        logging.info("Removed existing container %s", name)
    except NotFound:
        # No such container: nothing to clean up.
        return
validate_docker_client(_, docker_client)
Source Code in griptape/tools/computer/tool.py
@docker_client.validator  # pyright: ignore[reportAttributeAccessIssue]
def validate_docker_client(self, _: Attribute, docker_client: DockerClient) -> None:
    """Reject a missing (falsy) Docker client.

    Args:
        _: The attribute being validated (unused).
        docker_client: The value assigned to ``docker_client``.

    Raises:
        ValueError: If ``docker_client`` is falsy.
    """
    if docker_client:
        return
    raise ValueError("Docker client can't be initialized: make sure the Docker daemon is running")
DateTimeTool
Bases:
BaseTool
Source Code in griptape/tools/date_time/tool.py
@define
class DateTimeTool(BaseTool):
    """Tool exposing date and time activities.

    Attributes:
        denylist: Activity names denied by default; contains ``get_relative_datetime``.
    """

    denylist: list[str] = field(default=Factory(lambda: ["get_relative_datetime"]), kw_only=True)

    @activity(config={"description": "Can be used to return current date and time."})
    def get_current_datetime(self) -> BaseArtifact:
        """Return ``str(datetime.now())`` wrapped in a ``TextArtifact``."""
        return TextArtifact(str(datetime.now()))

    @activity(
        config={
            "description": "Can be used to return a relative date and time.",
            "schema": schema.Schema(
                {
                    schema.Literal(
                        "relative_date_string",
                        description='Relative date in English compatible with the dateparser library. For example, "now EST", "20 minutes ago", '
                        '"in 2 days", "3 months, 1 week and 1 day ago", or "yesterday at 2pm"',
                    ): str,
                },
            ),
        },
    )
    def get_relative_datetime(self, params: dict) -> BaseArtifact:
        """Parse a relative date expression with ``dateparser.parse``.

        Args:
            params: Activity payload; ``params["values"]["relative_date_string"]``
                is the expression to parse.

        Returns:
            A ``TextArtifact`` with the parsed datetime as a string, or an
            ``ErrorArtifact`` when the parser yields nothing or an exception is raised.
        """
        from dateparser import parse

        try:
            date_string = params["values"]["relative_date_string"]
            relative_datetime = parse(date_string)

            if relative_datetime:
                return TextArtifact(str(relative_datetime))
            return ErrorArtifact("invalid date string")
        except Exception as e:
            # The message previously said "current datetime", copied from the sibling activity.
            return ErrorArtifact(f"error getting relative datetime: {e}")

    @activity(
        config={
            "description": "Can be used to add a timedelta to a datetime.",
            "schema": schema.Schema(
                {
                    schema.Literal(
                        "timedelta_kwargs",
                        description='A dictionary of keyword arguments to pass to the timedelta function. For example, {"days": -1, "hours": 2}',
                    ): dict,
                    schema.Optional(
                        schema.Literal(
                            "iso_datetime",
                            description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-01T00:00:00". Defaults to the current datetime if not provided.',
                        )
                    ): str,
                },
            ),
        },
    )
    def add_timedelta(self, timedelta_kwargs: dict, iso_datetime: Optional[str] = None) -> BaseArtifact:
        """Add ``timedelta(**timedelta_kwargs)`` to a datetime.

        Args:
            timedelta_kwargs: Keyword arguments passed to ``timedelta``.
            iso_datetime: Base datetime in ISO 8601 form; the current datetime when omitted.

        Returns:
            A ``TextArtifact`` holding the resulting datetime in ISO format.
        """
        if iso_datetime is None:
            iso_datetime = datetime.now().isoformat()
        return TextArtifact((datetime.fromisoformat(iso_datetime) + timedelta(**timedelta_kwargs)).isoformat())

    @activity(
        config={
            "description": "Can be used to calculate the difference between two datetimes. The difference is calculated as end_datetime - start_datetime.",
            "schema": schema.Schema(
                {
                    schema.Literal(
                        "start_datetime",
                        description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-01T00:00:00"',
                    ): str,
                    schema.Literal(
                        "end_datetime",
                        description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-02T00:00:00"',
                    ): str,
                }
            ),
        }
    )
    def get_datetime_diff(self, start_datetime: str, end_datetime: str) -> BaseArtifact:
        """Return ``end_datetime - start_datetime`` as text.

        Args:
            start_datetime: Start of the interval, ISO 8601 string.
            end_datetime: End of the interval, ISO 8601 string.

        Returns:
            A ``TextArtifact`` holding the string form of the resulting ``timedelta``.
        """
        return TextArtifact(str(datetime.fromisoformat(end_datetime) - datetime.fromisoformat(start_datetime)))
denylist = field(default=Factory(lambda: ['get_relative_datetime']), kw_only=True) class-attribute instance-attribute
add_timedelta(timedelta_kwargs, iso_datetime=None)
Source Code in griptape/tools/date_time/tool.py
@activity(
    config={
        "description": "Can be used to add a timedelta to a datetime.",
        "schema": schema.Schema(
            {
                schema.Literal(
                    "timedelta_kwargs",
                    description='A dictionary of keyword arguments to pass to the timedelta function. For example, {"days": -1, "hours": 2}',
                ): dict,
                schema.Optional(
                    schema.Literal(
                        "iso_datetime",
                        description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-01T00:00:00". Defaults to the current datetime if not provided.',
                    )
                ): str,
            },
        ),
    },
)
def add_timedelta(self, timedelta_kwargs: dict, iso_datetime: Optional[str] = None) -> BaseArtifact:
    """Shift a datetime by ``timedelta(**timedelta_kwargs)``.

    Args:
        timedelta_kwargs: Keyword arguments passed to ``timedelta``.
        iso_datetime: Base datetime in ISO 8601 form; the current datetime when omitted.

    Returns:
        A ``TextArtifact`` holding the shifted datetime in ISO format.
    """
    base_iso = datetime.now().isoformat() if iso_datetime is None else iso_datetime
    base = datetime.fromisoformat(base_iso)
    shifted = base + timedelta(**timedelta_kwargs)
    return TextArtifact(shifted.isoformat())
get_current_datetime()
Source Code in griptape/tools/date_time/tool.py
@activity(config={"description": "Can be used to return current date and time."})
def get_current_datetime(self) -> BaseArtifact:
    """Return the string form of ``datetime.now()`` wrapped in a ``TextArtifact``."""
    now = datetime.now()
    return TextArtifact(str(now))
get_datetime_diff(start_datetime, end_datetime)
Source Code in griptape/tools/date_time/tool.py
@activity(
    config={
        "description": "Can be used to calculate the difference between two datetimes. The difference is calculated as end_datetime - start_datetime.",
        "schema": schema.Schema(
            {
                schema.Literal(
                    "start_datetime",
                    description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-01T00:00:00"',
                ): str,
                schema.Literal(
                    "end_datetime",
                    description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-02T00:00:00"',
                ): str,
            }
        ),
    }
)
def get_datetime_diff(self, start_datetime: str, end_datetime: str) -> BaseArtifact:
    """Compute ``end_datetime - start_datetime`` and return it as text.

    Args:
        start_datetime: Start of the interval, ISO 8601 string.
        end_datetime: End of the interval, ISO 8601 string.

    Returns:
        A ``TextArtifact`` holding the string form of the resulting ``timedelta``.
    """
    # The end value is parsed first, as before, so a malformed pair fails on the same argument.
    end = datetime.fromisoformat(end_datetime)
    start = datetime.fromisoformat(start_datetime)
    return TextArtifact(str(end - start))
get_relative_datetime(params)
Source Code in griptape/tools/date_time/tool.py
@activity(
    config={
        "description": "Can be used to return a relative date and time.",
        "schema": schema.Schema(
            {
                schema.Literal(
                    "relative_date_string",
                    description='Relative date in English compatible with the dateparser library. For example, "now EST", "20 minutes ago", '
                    '"in 2 days", "3 months, 1 week and 1 day ago", or "yesterday at 2pm"',
                ): str,
            },
        ),
    },
)
def get_relative_datetime(self, params: dict) -> BaseArtifact:
    """Parse a relative date expression with ``dateparser.parse``.

    Args:
        params: Activity payload; ``params["values"]["relative_date_string"]``
            is the expression to parse.

    Returns:
        A ``TextArtifact`` with the parsed datetime as a string, or an
        ``ErrorArtifact`` when the parser yields nothing or an exception is raised.
    """
    from dateparser import parse

    try:
        date_string = params["values"]["relative_date_string"]
        relative_datetime = parse(date_string)

        if relative_datetime:
            return TextArtifact(str(relative_datetime))
        return ErrorArtifact("invalid date string")
    except Exception as e:
        # The message previously said "current datetime", copied from the sibling activity.
        return ErrorArtifact(f"error getting relative datetime: {e}")
EmailTool
Bases:
BaseTool
Attributes
| Name | Type | Description | 
|---|---|---|
username | Optional[str] | Username/email address used to send email via the SMTP protocol and retrieve email via the IMAP protocol. Example: bender@futurama.com. | 
password | Optional[str] | Password used to send email via the SMTP protocol and retrieve email via the IMAP protocol. If using gmail, this would be an App Password. | 
email_max_retrieve_count | Optional[int] | Used to limit the number of messages retrieved during any given activities. | 
smtp_host | Optional[str] | Hostname or url of the SMTP server. Example: smtp.gmail.com. Required when using the send activity. | 
smtp_port | Optional[int] | Port of the SMTP server. Example: 465. Required when using the send activity. | 
smtp_use_ssl | bool | Whether to use SSL when sending email via the SMTP protocol. | 
smtp_user | Optional[str] | Username/email address used to send email via the SMTP protocol. Overrides username for SMTP only. Required when using the send activity. | 
smtp_password | Optional[str] | Password to send email via the SMTP protocol. Overrides password for SMTP only. Required when using the send activity. | 
imap_url | Optional[str] | Hostname or url of the IMAP server. Example: imap.gmail.com. Required when using the retrieve activity. | 
imap_user | Optional[str] | Username/email address used to retrieve email via the IMAP protocol. Overrides username for IMAP only. Required when using the retrieve activity. | 
imap_password | Optional[str] | Password to retrieve email via the IMAP protocol. Overrides password for IMAP only. Required when using the retrieve activity. | 
mailboxes | Optional[dict[str, Optional[str]]] | Descriptions of mailboxes available for retrieving email via the IMAP protocol. Required when using the retrieve activity. Example: {'INBOX': 'default mailbox for incoming email', 'SENT': 'default mailbox for sent email'} | 
email_loader | EmailLoader | Instance of EmailLoader. | 
Source Code in griptape/tools/email/tool.py
@define
class EmailTool(BaseTool):
    """Tool for retrieving email over IMAP and sending email over SMTP.

    Attributes:
        username: Username/email address used to send email via the SMTP protocol and retrieve email via the IMAP
            protocol. Example: bender@futurama.com.
        password: Password used to send email via the SMTP protocol and retrieve email via the IMAP protocol. If using
            gmail, this would be an App Password.
        email_max_retrieve_count: Used to limit the number of messages retrieved during any given activities.
        smtp_host: Hostname or url of the SMTP server. Example: smtp.gmail.com. Required when using the `send` activity.
        smtp_port: Port of the SMTP server. Example: 465. Required when using the `send` activity.
        smtp_use_ssl: Whether to use SSL when sending email via the SMTP protocol.
        smtp_user: Username/email address used to send email via the SMTP protocol. Overrides username for SMTP only.
            Required when using the `send` activity.
        smtp_password: Password to send email via the SMTP protocol. Overrides password for SMTP only. Required when
            using the `send` activity.
        imap_url: Hostname or url of the IMAP server. Example: imap.gmail.com. Required when using the `retrieve`
            activity.
        imap_user: Username/email address used to retrieve email via the IMAP protocol. Overrides username for IMAP
            only. Required when using the `retrieve` activity.
        imap_password: Password to retrieve email via the IMAP protocol. Overrides password for IMAP only. Required
            when using the `retrieve` activity.
        mailboxes: Descriptions of mailboxes available for retrieving email via the IMAP protocol. Required when using
            the `retrieve` activity. Example: {'INBOX': 'default mailbox for incoming email', 'SENT': 'default mailbox
            for sent email'}
        email_loader: Instance of `EmailLoader`.
    """

    username: Optional[str] = field(default=None, kw_only=True)
    password: Optional[str] = field(default=None, kw_only=True)
    email_max_retrieve_count: Optional[int] = field(default=None, kw_only=True)
    smtp_host: Optional[str] = field(default=None, kw_only=True)
    smtp_port: Optional[int] = field(default=None, kw_only=True)
    smtp_use_ssl: bool = field(default=True, kw_only=True)
    smtp_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True)
    smtp_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True)
    imap_url: Optional[str] = field(default=None, kw_only=True)
    imap_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True)
    imap_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True)
    mailboxes: Optional[dict[str, Optional[str]]] = field(default=None, kw_only=True)
    email_loader: EmailLoader = field(
        default=Factory(
            lambda self: EmailLoader(imap_url=self.imap_url, username=self.imap_user, password=self.imap_password),
            takes_self=True,
        ),
        kw_only=True,
    )

    @activity(
        config={
            "description": "Can be used to retrieve emails."
            "{% if _self.mailboxes %} Available mailboxes: {{ _self.mailboxes }}{% endif %}",
            "schema": Schema(
                {
                    Literal("label", description="Label to retrieve emails from such as 'INBOX' or 'SENT'"): str,
                    schema.Optional(
                        Literal("key", description="Optional key for filtering such as 'FROM' or 'SUBJECT'"),
                    ): str,
                    schema.Optional(
                        Literal("search_criteria", description="Optional search criteria to filter emails by key"),
                    ): str,
                    schema.Optional(Literal("max_count", description="Optional max email count")): int,
                },
            ),
        },
    )
    def retrieve(self, params: dict) -> ListArtifact | ErrorArtifact:
        """Load emails through ``email_loader`` using the query described by ``params``.

        Args:
            params: Activity payload; ``params["values"]`` provides ``label`` and
                optionally ``key``, ``search_criteria`` and ``max_count``.

        Returns:
            The loader's result, or an ``ErrorArtifact`` when ``mailboxes`` is not configured.
        """
        if self.mailboxes is None:
            return ErrorArtifact("mailboxes is required")

        values = params["values"]
        requested_count = values.get("max_count")
        # An explicit request wins; otherwise use the tool-wide limit (which may be None).
        limit = self.email_max_retrieve_count if requested_count is None else int(values["max_count"])

        query = EmailLoader.EmailQuery(
            label=values["label"],
            key=values.get("key"),
            search_criteria=values.get("search_criteria"),
            max_count=limit,
        )
        return self.email_loader.load(query)

    @activity(
        config={
            "description": "Can be used to send emails",
            "schema": Schema(
                {
                    Literal("to", description="Recipient's email address"): str,
                    Literal("subject", description="Email subject"): str,
                    Literal("body", description="Email body"): str,
                },
            ),
        },
    )
    def send(self, params: dict) -> InfoArtifact | ErrorArtifact:
        """Send a plain-text email via SMTP.

        Args:
            params: Activity payload; ``params["values"]`` provides ``to``, ``subject`` and ``body``.

        Returns:
            An ``InfoArtifact`` on success, or an ``ErrorArtifact`` when a required SMTP
            setting is missing or sending raises.
        """
        values = params["values"]

        # Required SMTP settings, checked in a fixed order.
        if self.smtp_user is None:
            return ErrorArtifact("smtp_user is required")
        if self.smtp_password is None:
            return ErrorArtifact("smtp_password is required")
        if self.smtp_host is None:
            return ErrorArtifact("smtp_host is required")
        if self.smtp_port is None:
            return ErrorArtifact("smtp_port is required")

        message = MIMEText(values["body"])
        message["Subject"] = values["subject"]
        message["From"] = self.smtp_user
        message["To"] = values["to"]

        try:
            with self._create_smtp_client(self.smtp_host, self.smtp_port) as smtp:
                smtp.login(self.smtp_user, self.smtp_password)
                smtp.sendmail(message["From"], [message["To"]], message.as_string())
            return InfoArtifact("email was successfully sent")
        except Exception as e:
            logging.exception(e)
            return ErrorArtifact(f"error sending email: {e}")

    def _create_smtp_client(self, smtp_host: str, smtp_port: int) -> smtplib.SMTP | smtplib.SMTP_SSL:
        """Create an ``SMTP_SSL`` client when ``smtp_use_ssl`` is set, a plain ``SMTP`` client otherwise."""
        client_cls = smtplib.SMTP_SSL if self.smtp_use_ssl else smtplib.SMTP
        return client_cls(smtp_host, smtp_port)
email_loader = field(default=Factory(lambda self: EmailLoader(imap_url=self.imap_url, username=self.imap_user, password=self.imap_password), takes_self=True), kw_only=True) class-attribute instance-attribute
email_max_retrieve_count = field(default=None, kw_only=True) class-attribute instance-attribute
imap_password = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True) class-attribute instance-attribute
imap_url = field(default=None, kw_only=True) class-attribute instance-attribute
imap_user = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True) class-attribute instance-attribute
mailboxes = field(default=None, kw_only=True) class-attribute instance-attribute
password = field(default=None, kw_only=True) class-attribute instance-attribute
smtp_host = field(default=None, kw_only=True) class-attribute instance-attribute
smtp_password = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True) class-attribute instance-attribute
smtp_port = field(default=None, kw_only=True) class-attribute instance-attribute
smtp_use_ssl = field(default=True, kw_only=True) class-attribute instance-attribute
smtp_user = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True) class-attribute instance-attribute
username = field(default=None, kw_only=True) class-attribute instance-attribute
_create_smtp_client(smtp_host, smtp_port)
Source Code in griptape/tools/email/tool.py
def _create_smtp_client(self, smtp_host: str, smtp_port: int) -> smtplib.SMTP | smtplib.SMTP_SSL:
    """Create the SMTP client used for sending.

    Args:
        smtp_host: Host passed to the client constructor.
        smtp_port: Port passed to the client constructor.

    Returns:
        An ``smtplib.SMTP_SSL`` instance when ``smtp_use_ssl`` is set, otherwise ``smtplib.SMTP``.
    """
    client_cls = smtplib.SMTP_SSL if self.smtp_use_ssl else smtplib.SMTP
    return client_cls(smtp_host, smtp_port)
retrieve(params)
Source Code in griptape/tools/email/tool.py
@activity(
    config={
        "description": "Can be used to retrieve emails."
        "{% if _self.mailboxes %} Available mailboxes: {{ _self.mailboxes }}{% endif %}",
        "schema": Schema(
            {
                Literal("label", description="Label to retrieve emails from such as 'INBOX' or 'SENT'"): str,
                schema.Optional(
                    Literal("key", description="Optional key for filtering such as 'FROM' or 'SUBJECT'"),
                ): str,
                schema.Optional(
                    Literal("search_criteria", description="Optional search criteria to filter emails by key"),
                ): str,
                schema.Optional(Literal("max_count", description="Optional max email count")): int,
            },
        ),
    },
)
def retrieve(self, params: dict) -> ListArtifact | ErrorArtifact:
    """Load emails through ``email_loader`` using the query described by ``params``.

    Args:
        params: Activity payload; ``params["values"]`` provides ``label`` and
            optionally ``key``, ``search_criteria`` and ``max_count``.

    Returns:
        The loader's result, or an ``ErrorArtifact`` when ``mailboxes`` is not configured.
    """
    if self.mailboxes is None:
        return ErrorArtifact("mailboxes is required")

    values = params["values"]
    requested_count = values.get("max_count")
    # An explicit request wins; otherwise use the tool-wide limit (which may be None).
    limit = self.email_max_retrieve_count if requested_count is None else int(values["max_count"])

    query = EmailLoader.EmailQuery(
        label=values["label"],
        key=values.get("key"),
        search_criteria=values.get("search_criteria"),
        max_count=limit,
    )
    return self.email_loader.load(query)
send(params)
Source Code in griptape/tools/email/tool.py
@activity(
    config={
        "description": "Can be used to send emails",
        "schema": Schema(
            {
                Literal("to", description="Recipient's email address"): str,
                Literal("subject", description="Email subject"): str,
                Literal("body", description="Email body"): str,
            },
        ),
    },
)
def send(self, params: dict) -> InfoArtifact | ErrorArtifact:
    """Send a plain-text email via SMTP.

    Args:
        params: Activity payload; ``params["values"]`` provides ``to``, ``subject`` and ``body``.

    Returns:
        An ``InfoArtifact`` on success, or an ``ErrorArtifact`` when a required SMTP
        setting is missing or sending raises.
    """
    values = params["values"]

    # Required SMTP settings, checked in a fixed order.
    if self.smtp_user is None:
        return ErrorArtifact("smtp_user is required")
    if self.smtp_password is None:
        return ErrorArtifact("smtp_password is required")
    if self.smtp_host is None:
        return ErrorArtifact("smtp_host is required")
    if self.smtp_port is None:
        return ErrorArtifact("smtp_port is required")

    message = MIMEText(values["body"])
    message["Subject"] = values["subject"]
    message["From"] = self.smtp_user
    message["To"] = values["to"]

    try:
        with self._create_smtp_client(self.smtp_host, self.smtp_port) as smtp:
            smtp.login(self.smtp_user, self.smtp_password)
            smtp.sendmail(message["From"], [message["To"]], message.as_string())
        return InfoArtifact("email was successfully sent")
    except Exception as e:
        logging.exception(e)
        return ErrorArtifact(f"error sending email: {e}")
ExtractionTool
Attributes
| Name | Type | Description | 
|---|---|---|
extraction_engine | BaseExtractionEngine | ExtractionEngine. | 
Source Code in griptape/tools/extraction/tool.py
@define(kw_only=True)
class ExtractionTool(BaseTool, RuleMixin):
    """Tool that runs an extraction engine over text or stored artifacts.

    Attributes:
        extraction_engine: `ExtractionEngine`.
    """

    extraction_engine: BaseExtractionEngine = field()

    @activity(
        config={
            "description": "Can be used extract structured text from data.",
            "schema": Schema(
                {
                    Literal("data"): Or(
                        str,
                        Schema(
                            {
                                "memory_name": str,
                                "artifact_namespace": str,
                            }
                        ),
                    ),
                }
            ),
        },
    )
    def extract(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact:
        """Run the extraction engine on inline text or on artifacts held in memory.

        Args:
            params: Activity payload; ``params["values"]["data"]`` is either a string
                or a mapping with ``memory_name`` and ``artifact_namespace``.

        Returns:
            The engine's ``extract_artifacts`` result, or an ``ErrorArtifact`` when the
            named memory cannot be found.
        """
        data = params["values"]["data"]

        # Inline text is wrapped in a single-item list artifact.
        if isinstance(data, str):
            return self.extraction_engine.extract_artifacts(ListArtifact([TextArtifact(data)]))

        source_memory = self.find_input_memory(data["memory_name"])
        namespace = data["artifact_namespace"]
        if source_memory is None:
            return ErrorArtifact("memory not found")

        return self.extraction_engine.extract_artifacts(source_memory.load_artifacts(namespace))
extraction_engine = field() class-attribute instance-attribute
extract(params)
Source Code in griptape/tools/extraction/tool.py
@activity(
    config={
        "description": "Can be used extract structured text from data.",
        "schema": Schema(
            {
                Literal("data"): Or(
                    str,
                    Schema(
                        {
                            "memory_name": str,
                            "artifact_namespace": str,
                        }
                    ),
                ),
            }
        ),
    },
)
def extract(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact:
    """Run the extraction engine on inline text or on artifacts held in memory.

    Args:
        params: Activity payload; ``params["values"]["data"]`` is either a string
            or a mapping with ``memory_name`` and ``artifact_namespace``.

    Returns:
        The engine's ``extract_artifacts`` result, or an ``ErrorArtifact`` when the
        named memory cannot be found.
    """
    data = params["values"]["data"]

    # Inline text is wrapped in a single-item list artifact.
    if isinstance(data, str):
        return self.extraction_engine.extract_artifacts(ListArtifact([TextArtifact(data)]))

    source_memory = self.find_input_memory(data["memory_name"])
    namespace = data["artifact_namespace"]
    if source_memory is None:
        return ErrorArtifact("memory not found")

    return self.extraction_engine.extract_artifacts(source_memory.load_artifacts(namespace))
FileManagerTool
Bases:
BaseTool
Attributes
| Name | Type | Description | 
|---|---|---|
file_manager_driver | BaseFileManagerDriver | File Manager Driver to use to list, load, and save files. | 
Source Code in griptape/tools/file_manager/tool.py
@define
class FileManagerTool(BaseTool):
    """FileManagerTool is a tool that can be used to list, load, and save files.

    Attributes:
        file_manager_driver: File Manager Driver to use to list, load, and save files.
        loaders: Loaders keyed by MIME type prefix; the first key that prefixes a
            file's MIME type selects the loader for that file.
    """

    file_manager_driver: BaseFileManagerDriver = field(default=Factory(lambda: LocalFileManagerDriver()), kw_only=True)

    loaders: dict[str, loaders.BaseLoader] = field(
        default=Factory(
            lambda self: {
                "application/pdf": loaders.PdfLoader(file_manager_driver=self.file_manager_driver),
                "text/csv": loaders.CsvLoader(file_manager_driver=self.file_manager_driver),
                "text": loaders.TextLoader(file_manager_driver=self.file_manager_driver),
                "image": loaders.ImageLoader(file_manager_driver=self.file_manager_driver),
                "application/octet-stream": BlobLoader(file_manager_driver=self.file_manager_driver),
            },
            takes_self=True,
        ),
        kw_only=True,
    )

    @activity(
        config={
            "description": "Can be used to list files on disk",
            "schema": Schema(
                {Literal("path", description="Relative path in the POSIX format. For example, 'foo/bar'"): str},
            ),
        },
    )
    def list_files_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
        """List the files under ``params["values"]["path"]`` via the file manager driver."""
        path = params["values"]["path"]
        return self.file_manager_driver.list_files(path)

    @activity(
        config={
            "description": "Can be used to load files from disk",
            "schema": Schema(
                {
                    Literal(
                        "paths",
                        description="Relative paths to files to be loaded in the POSIX format. For example, ['foo/bar/file.txt']",
                    ): Schema([str]),
                },
            ),
        },
    )
    def load_files_from_disk(self, params: dict) -> ListArtifact | ErrorArtifact:
        """Load each requested file with the loader matching its MIME type.

        Args:
            params: Activity payload; ``params["values"]["paths"]`` lists the paths to load.

        Returns:
            A ``ListArtifact`` of everything loaded (list results are flattened), or an
            ``ErrorArtifact`` when no configured loader matches a file's MIME type.
        """
        paths = params["values"]["paths"]
        artifacts = []

        for path in paths:
            # Fetch the file to try and determine the appropriate loader
            abs_path = os.path.join(self.file_manager_driver.workdir, path)
            mime_type = get_mime_type(abs_path)
            # A default is required: without it an unmatched MIME type raises StopIteration.
            loader = next((loader for key, loader in self.loaders.items() if mime_type.startswith(key)), None)
            if loader is None:
                return ErrorArtifact(f"No loader found for '{path}' (MIME type '{mime_type}')")

            artifact = loader.load(path)
            if isinstance(artifact, ListArtifact):
                artifacts.extend(artifact.value)
            else:
                artifacts.append(artifact)
        return ListArtifact(artifacts)

    @activity(
        config={
            "description": "Can be used to save memory artifacts to disk",
            "schema": Schema(
                {
                    Literal(
                        "dir_name",
                        description="Relative destination path name on disk in the POSIX format. For example, 'foo/bar'",
                    ): str,
                    Literal("file_name", description="Destination file name. For example, 'baz.txt'"): str,
                    "memory_name": str,
                    "artifact_namespace": str,
                },
            ),
        },
    )
    def save_memory_artifacts_to_disk(self, params: dict) -> ErrorArtifact | InfoArtifact:
        """Write every artifact in a memory namespace to disk.

        With more than one artifact, each file name is prefixed with the artifact's
        name; a single artifact is saved under ``file_name`` as given.

        Args:
            params: Activity payload; ``params["values"]`` provides ``dir_name``,
                ``file_name``, ``memory_name`` and ``artifact_namespace``.

        Returns:
            An ``InfoArtifact`` when all artifacts were saved, otherwise an
            ``ErrorArtifact`` describing the first failure.
        """
        dir_name = params["values"]["dir_name"]
        file_name = params["values"]["file_name"]
        memory_name = params["values"]["memory_name"]
        artifact_namespace = params["values"]["artifact_namespace"]

        memory = self.find_input_memory(memory_name)
        if not memory:
            return ErrorArtifact(f"Failed to save memory artifacts to disk - memory named '{memory_name}' not found")

        list_artifact = memory.load_artifacts(artifact_namespace)

        if len(list_artifact) == 0:
            return ErrorArtifact(
                f"Failed to save memory artifacts to disk - memory named '{memory_name}' does not contain any artifacts",
            )

        for artifact in list_artifact.value:
            formatted_file_name = f"{artifact.name}-{file_name}" if len(list_artifact) > 1 else file_name
            try:
                # Strings and bytes are written as-is; anything else is converted to text.
                value = artifact.value if isinstance(artifact.value, (str, bytes)) else artifact.to_text()
                result = self.file_manager_driver.save_file(os.path.join(dir_name, formatted_file_name), value)
            except FileNotFoundError:
                return ErrorArtifact("Path not found")
            except IsADirectoryError:
                return ErrorArtifact("Path is a directory")
            except NotADirectoryError:
                return ErrorArtifact("Not a directory")
            except Exception as e:
                return ErrorArtifact(f"Failed to save file: {e!s}")

            # save_content_to_file's annotation indicates save_file may return an
            # ErrorArtifact; do not report success if it did.
            if isinstance(result, ErrorArtifact):
                return result

        return InfoArtifact("Successfully saved memory artifacts to disk")

    @activity(
        config={
            "description": "Can be used to save content to a file",
            "schema": Schema(
                {
                    Literal(
                        "path",
                        description="Destination file path on disk in the POSIX format. For example, 'foo/bar/baz.txt'",
                    ): str,
                    "content": str,
                },
            ),
        },
    )
    def save_content_to_file(self, params: dict) -> ErrorArtifact | InfoArtifact:
        """Save ``params["values"]["content"]`` to ``params["values"]["path"]`` via the driver."""
        path = params["values"]["path"]
        content = params["values"]["content"]
        return self.file_manager_driver.save_file(path, content)
file_manager_driver = field(default=Factory(lambda: LocalFileManagerDriver()), kw_only=True) class-attribute instance-attribute
loaders = field(default=Factory(lambda self: {'application/pdf': loaders.PdfLoader(file_manager_driver=self.file_manager_driver), 'text/csv': loaders.CsvLoader(file_manager_driver=self.file_manager_driver), 'text': loaders.TextLoader(file_manager_driver=self.file_manager_driver), 'image': loaders.ImageLoader(file_manager_driver=self.file_manager_driver), 'application/octet-stream': BlobLoader(file_manager_driver=self.file_manager_driver)}, takes_self=True), kw_only=True) class-attribute instance-attribute
list_files_from_disk(params)
Source Code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to list files on disk",
        "schema": Schema(
            {Literal("path", description="Relative path in the POSIX format. For example, 'foo/bar'"): str},
        ),
    },
)
def list_files_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
    """List files through the file manager driver.

    Args:
        params: Activity payload; ``params["values"]["path"]`` is the path to list.

    Returns:
        Whatever the driver's ``list_files`` returns for that path.
    """
    target_dir = params["values"]["path"]
    listing = self.file_manager_driver.list_files(target_dir)
    return listing
load_files_from_disk(params)
Source Code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to load files from disk",
        "schema": Schema(
            {
                Literal(
                    "paths",
                    description="Relative paths to files to be loaded in the POSIX format. For example, ['foo/bar/file.txt']",
                ): Schema([str]),
            },
        ),
    },
)
def load_files_from_disk(self, params: dict) -> ListArtifact | ErrorArtifact:
    """Load each requested file with the loader matching its MIME type.

    The loader is the first entry of ``self.loaders`` whose key is a prefix of
    the file's MIME type.

    Args:
        params: Activity payload; ``params["values"]["paths"]`` lists the paths to load.

    Returns:
        A ``ListArtifact`` of everything loaded (list results are flattened), or an
        ``ErrorArtifact`` when no configured loader matches a file's MIME type.
    """
    paths = params["values"]["paths"]
    artifacts = []

    for path in paths:
        # Fetch the file to try and determine the appropriate loader
        abs_path = os.path.join(self.file_manager_driver.workdir, path)
        mime_type = get_mime_type(abs_path)
        # A default is required: without it an unmatched MIME type raises StopIteration.
        loader = next((loader for key, loader in self.loaders.items() if mime_type.startswith(key)), None)
        if loader is None:
            return ErrorArtifact(f"No loader found for '{path}' (MIME type '{mime_type}')")

        artifact = loader.load(path)
        if isinstance(artifact, ListArtifact):
            artifacts.extend(artifact.value)
        else:
            artifacts.append(artifact)
    return ListArtifact(artifacts)
save_content_to_file(params)
Source Code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to save content to a file",
        "schema": Schema(
            {
                Literal(
                    "path",
                    description="Destination file path on disk in the POSIX format. For example, 'foo/bar/baz.txt'",
                ): str,
                "content": str,
            },
        ),
    },
)
def save_content_to_file(self, params: dict) -> ErrorArtifact | InfoArtifact:
    """Save text content to a file through the file manager driver.

    Args:
        params: Activity payload; ``params["values"]`` provides ``path`` and ``content``.

    Returns:
        Whatever the driver's ``save_file`` returns.
    """
    values = params["values"]
    destination, text = values["path"], values["content"]
    return self.file_manager_driver.save_file(destination, text)
save_memory_artifacts_to_disk(params)
Source Code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to save memory artifacts to disk",
        "schema": Schema(
            {
                Literal(
                    "dir_name",
                    description="Relative destination path name on disk in the POSIX format. For example, 'foo/bar'",
                ): str,
                Literal("file_name", description="Destination file name. For example, 'baz.txt'"): str,
                "memory_name": str,
                "artifact_namespace": str,
            },
        ),
    },
)
def save_memory_artifacts_to_disk(self, params: dict) -> ErrorArtifact | InfoArtifact:
    """Write every artifact in a memory namespace to disk.

    With more than one artifact, each file name is prefixed with the artifact's
    name; a single artifact is saved under ``file_name`` as given.

    Args:
        params: Activity payload; ``params["values"]`` provides ``dir_name``,
            ``file_name``, ``memory_name`` and ``artifact_namespace``.

    Returns:
        An ``InfoArtifact`` when all artifacts were saved, otherwise an
        ``ErrorArtifact`` describing the first failure.
    """
    dir_name = params["values"]["dir_name"]
    file_name = params["values"]["file_name"]
    memory_name = params["values"]["memory_name"]
    artifact_namespace = params["values"]["artifact_namespace"]

    memory = self.find_input_memory(memory_name)
    if not memory:
        return ErrorArtifact(f"Failed to save memory artifacts to disk - memory named '{memory_name}' not found")

    list_artifact = memory.load_artifacts(artifact_namespace)

    if len(list_artifact) == 0:
        return ErrorArtifact(
            f"Failed to save memory artifacts to disk - memory named '{memory_name}' does not contain any artifacts",
        )

    for artifact in list_artifact.value:
        formatted_file_name = f"{artifact.name}-{file_name}" if len(list_artifact) > 1 else file_name
        try:
            # Strings and bytes are written as-is; anything else is converted to text.
            value = artifact.value if isinstance(artifact.value, (str, bytes)) else artifact.to_text()
            result = self.file_manager_driver.save_file(os.path.join(dir_name, formatted_file_name), value)
        except FileNotFoundError:
            return ErrorArtifact("Path not found")
        except IsADirectoryError:
            return ErrorArtifact("Path is a directory")
        except NotADirectoryError:
            return ErrorArtifact("Not a directory")
        except Exception as e:
            # This is the save path; the message previously said "load".
            return ErrorArtifact(f"Failed to save file: {e!s}")

        # NOTE(review): save_file appears able to return an ErrorArtifact (see the
        # return annotation of save_content_to_file); propagate it instead of
        # reporting success.
        if isinstance(result, ErrorArtifact):
            return result

    return InfoArtifact("Successfully saved memory artifacts to disk")
GriptapeCloudToolTool
Bases:
BaseGriptapeCloudTool
Attributes
| Name | Type | Description | 
|---|---|---|
tool_id | str | The ID of the tool to run. | 
Source Code in griptape/tools/griptape_cloud_tool/tool.py
@define()
class GriptapeCloudToolTool(BaseGriptapeCloudTool):
    """Runs a Gen AI Builder hosted Tool.

    After attrs initialisation the tool fetches its OpenAPI schema and binds one
    activity method for every POST operation declared under an ``/activities`` path.

    Attributes:
        tool_id: The ID of the tool to run.
    """

    tool_id: str = field(kw_only=True)

    def __attrs_post_init__(self) -> None:
        self._init_activities()

    def _init_activities(self) -> None:
        """Fetches the schema and attaches a bound handler for each activity it declares."""
        schema = self._get_schema()
        tool_name, activity_schemas = self._parse_schema(schema)

        # The schema title only replaces the name while it still equals the class name.
        if self.name == self.__class__.__name__:
            self.name = tool_name

        for activity_name, (description, activity_schema) in activity_schemas.items():
            activity_handler = self._create_activity_handler(activity_name, description, activity_schema)
            setattr(self, activity_name, MethodType(activity_handler, self))

    def _get_schema(self) -> dict:
        """Downloads the tool's OpenAPI document.

        Raises:
            RuntimeError: If the payload is not a dict, or carries both an
                ``error`` and a ``tool_run_id`` key.
        """
        response = requests.get(urljoin(self.base_url, f"/api/tools/{self.tool_id}/openapi"), headers=self.headers)
        response.raise_for_status()
        schema = response.json()

        if not isinstance(schema, dict):
            raise RuntimeError(f"Invalid schema for tool {self.tool_id}: {schema}")

        if "error" in schema and "tool_run_id" in schema:
            raise RuntimeError(f"Failed to retrieve schema for tool {self.tool_id}: {schema['error']}")

        return schema

    def _parse_schema(self, schema: dict) -> tuple[str, dict[str, tuple[str, Schema]]]:
        """Parses an openapi schema into a dictionary of activity names and their respective descriptions + schemas."""
        activities = {}

        name = schema.get("info", {}).get("title")

        for path, path_info in schema.get("paths", {}).items():
            if not path.startswith("/activities"):
                continue
            for method, method_info in path_info.items():
                if "post" in method.lower():
                    activity_name = method_info["operationId"]
                    description = method_info.get("description", "")

                    activity_schema = self.__extract_schema_from_ref(
                        schema,
                        method_info.get("requestBody", {})
                        .get("content", {})
                        .get("application/json", {})
                        .get("schema", {}),
                    )

                    activities[activity_name] = (description, activity_schema)

        return name, activities

    def __extract_schema_from_ref(self, schema: dict, schema_ref: dict) -> Schema:
        """Extracts a schema from a $ref if present, resolving it into native schema properties."""
        if "$ref" in schema_ref:
            # Resolve the reference and retrieve the schema data
            ref_path = schema_ref["$ref"].split("/")[-1]
            schema_data = schema["components"]["schemas"].get(ref_path, {})
        else:
            # Use the provided schema directly if no $ref is found
            schema_data = schema_ref

        # Convert the schema_data dictionary into a Schema with its properties
        properties = {}
        for prop, prop_info in schema_data.get("properties", {}).items():
            prop_type = prop_info.get("type", "string")
            prop_description = prop_info.get("description", "")
            schema_prop = Literal(prop, description=prop_description)
            is_optional = prop not in schema_data.get("required", [])

            if is_optional:
                schema_prop = SchemaOptional(schema_prop)

            properties[schema_prop] = self._map_openapi_type_to_python(prop_type, prop_info)

        return Schema(properties)

    def _map_openapi_type_to_python(
        self, openapi_type: str, schema_info: Optional[dict] = None
    ) -> type | list[type] | Or:
        """Maps OpenAPI types to native Python types.

        Args:
            openapi_type: The OpenAPI ``type`` string.
            schema_info: The property's schema dict, consulted for ``items`` and ``enum``.

        Returns:
            A list for arrays that describe their items, an ``Or`` of the allowed
            values for enums, otherwise the mapped Python type (``str`` for
            unknown types).
        """
        # "array" is mapped to ``list`` so that arrays without ``items`` (and nested
        # arrays reached through the recursive call) are not treated as strings.
        type_mapping = {
            "string": str,
            "integer": int,
            "boolean": bool,
            "number": float,
            "object": dict,
            "array": list,
        }

        if openapi_type == "array" and schema_info is not None and "items" in schema_info:
            enum = schema_info["items"].get("enum")
            if enum:
                return enum
            items_type = schema_info["items"].get("type", "string")
            return [self._map_openapi_type_to_python(items_type)]  # pyright: ignore[reportReturnType]
        if schema_info is not None and schema_info.get("enum"):
            return Or(*schema_info["enum"])
        return type_mapping.get(openapi_type, str)

    def _create_activity_handler(self, activity_name: str, description: str, activity_schema: Schema) -> Callable:
        """Creates an activity handler method for the tool."""

        @activity(config={"name": activity_name, "description": description, "schema": activity_schema})
        def activity_handler(self: GriptapeCloudToolTool, values: dict) -> Any:
            return self._run_activity(activity_name, values)

        return activity_handler

    def _run_activity(self, activity_name: str, params: dict) -> BaseArtifact:
        """Runs an activity on the tool with the provided parameters."""
        url = urljoin(self.base_url, f"/api/tools/{self.tool_id}/activities/{activity_name}")

        response = requests.post(url, json=params, headers=self.headers)
        response.raise_for_status()

        try:
            return BaseArtifact.from_dict(response.json())
        except ValueError:
            # Fall back to the raw body when it cannot be turned into an artifact.
            return TextArtifact(response.text)
tool_id = field(kw_only=True) class-attribute instance-attribute
__attrs_post_init__()
Source Code in griptape/tools/griptape_cloud_tool/tool.py
def __attrs_post_init__(self) -> None:
    """Post-init hook: registers the tool's activities via ``_init_activities``."""
    self._init_activities()
__extract_schema_from_ref(schema, schema_ref)
Source Code in griptape/tools/griptape_cloud_tool/tool.py
def __extract_schema_from_ref(self, schema: dict, schema_ref: dict) -> Schema:
    """Builds a ``Schema`` from an OpenAPI schema object, following a ``$ref`` when one is present.

    Args:
        schema: The full OpenAPI document, used to look up ``components.schemas``.
        schema_ref: Either an inline schema object or a dict holding a ``$ref``.

    Returns:
        A ``Schema`` keyed by ``Literal`` property names; properties not listed
        under ``required`` are wrapped in ``SchemaOptional``.
    """
    if "$ref" not in schema_ref:
        # Inline schema: use it as-is.
        schema_data = schema_ref
    else:
        # Only the last path segment of the reference is used as the component name.
        component_name = schema_ref["$ref"].split("/")[-1]
        schema_data = schema["components"]["schemas"].get(component_name, {})

    properties = {}
    for prop, prop_info in schema_data.get("properties", {}).items():
        key = Literal(prop, description=prop_info.get("description", ""))
        if prop not in schema_data.get("required", []):
            key = SchemaOptional(key)
        properties[key] = self._map_openapi_type_to_python(prop_info.get("type", "string"), prop_info)

    return Schema(properties)
_create_activity_handler(activity_name, description, activity_schema)
Source Code in griptape/tools/griptape_cloud_tool/tool.py
def _create_activity_handler(self, activity_name: str, description: str, activity_schema: Schema) -> Callable:
    """Builds the ``@activity``-decorated function that forwards calls to ``_run_activity``.

    The returned function is unbound; it takes the tool instance as its first argument.
    """
    handler_config = {"name": activity_name, "description": description, "schema": activity_schema}

    def activity_handler(self: GriptapeCloudToolTool, values: dict) -> Any:
        return self._run_activity(activity_name, values)

    # Applying the decorator by hand is equivalent to the ``@activity(...)`` syntax.
    return activity(config=handler_config)(activity_handler)
_get_schema()
Source Code in griptape/tools/griptape_cloud_tool/tool.py
def _get_schema(self) -> dict:
    """Fetches the tool's OpenAPI document and returns it as a dict.

    Raises:
        RuntimeError: If the decoded body is not a dict, or if it contains both
            an ``error`` and a ``tool_run_id`` key.
    """
    schema_url = urljoin(self.base_url, f"/api/tools/{self.tool_id}/openapi")
    response = requests.get(schema_url, headers=self.headers)
    response.raise_for_status()

    payload = response.json()
    if not isinstance(payload, dict):
        raise RuntimeError(f"Invalid schema for tool {self.tool_id}: {payload}")

    is_error_payload = "error" in payload and "tool_run_id" in payload
    if is_error_payload:
        raise RuntimeError(f"Failed to retrieve schema for tool {self.tool_id}: {payload['error']}")

    return payload
_init_activities()
Source Code in griptape/tools/griptape_cloud_tool/tool.py
def _init_activities(self) -> None:
    """Loads the remote schema and binds one handler method per declared activity."""
    tool_name, activity_schemas = self._parse_schema(self._get_schema())

    # The schema title only replaces the name while it still equals the class name.
    if self.name == self.__class__.__name__:
        self.name = tool_name

    for activity_name, details in activity_schemas.items():
        description, activity_schema = details
        handler = self._create_activity_handler(activity_name, description, activity_schema)
        # Bind the plain function to this instance before exposing it as an attribute.
        setattr(self, activity_name, MethodType(handler, self))
_map_openapi_type_to_python(openapi_type, schema_info=None)
Source Code in griptape/tools/griptape_cloud_tool/tool.py
def _map_openapi_type_to_python(
    self, openapi_type: str, schema_info: Optional[dict] = None
) -> type | list[type] | Or:
    """Maps OpenAPI types to native Python types.

    Args:
        openapi_type: The OpenAPI ``type`` string.
        schema_info: The property's schema dict, consulted for ``items`` and ``enum``.

    Returns:
        A list for arrays that describe their items, an ``Or`` of the allowed
        values for enums, otherwise the mapped Python type (``str`` for
        unknown types).
    """
    # "array" is mapped to ``list`` so that arrays without ``items`` (and nested
    # arrays reached through the recursive call) are not treated as strings.
    type_mapping = {
        "string": str,
        "integer": int,
        "boolean": bool,
        "number": float,
        "object": dict,
        "array": list,
    }

    if openapi_type == "array" and schema_info is not None and "items" in schema_info:
        enum = schema_info["items"].get("enum")
        if enum:
            # The enum values themselves are returned as the list.
            return enum
        items_type = schema_info["items"].get("type", "string")
        return [self._map_openapi_type_to_python(items_type)]  # pyright: ignore[reportReturnType]
    if schema_info is not None and schema_info.get("enum"):
        return Or(*schema_info["enum"])
    return type_mapping.get(openapi_type, str)
_parse_schema(schema)
Source Code in griptape/tools/griptape_cloud_tool/tool.py
def _parse_schema(self, schema: dict) -> tuple[str, dict[str, tuple[str, Schema]]]:
    """Collects the tool title and every POST activity declared in an OpenAPI document.

    Only paths starting with ``/activities`` are considered.

    Returns:
        ``(title, activities)`` where ``activities`` maps each ``operationId``
        to a ``(description, Schema)`` pair.
    """
    tool_title = schema.get("info", {}).get("title")
    parsed = {}

    activity_routes = (
        operations for route, operations in schema.get("paths", {}).items() if route.startswith("/activities")
    )
    for operations in activity_routes:
        for http_method, operation in operations.items():
            if "post" not in http_method.lower():
                continue

            operation_id = operation["operationId"]
            summary = operation.get("description", "")
            request_schema = (
                operation.get("requestBody", {}).get("content", {}).get("application/json", {}).get("schema", {})
            )
            parsed[operation_id] = (summary, self.__extract_schema_from_ref(schema, request_schema))

    return tool_title, parsed
_run_activity(activity_name, params)
Source Code in griptape/tools/griptape_cloud_tool/tool.py
def _run_activity(self, activity_name: str, params: dict) -> BaseArtifact:
    """POSTs ``params`` as JSON to the named activity and converts the reply to an artifact.

    Returns:
        The artifact built from the JSON reply, or a ``TextArtifact`` holding
        the raw response text when a ``ValueError`` is raised while decoding it.
    """
    endpoint = urljoin(self.base_url, f"/api/tools/{self.tool_id}/activities/{activity_name}")
    response = requests.post(endpoint, json=params, headers=self.headers)
    response.raise_for_status()

    try:
        payload = response.json()
        artifact = BaseArtifact.from_dict(payload)
    except ValueError:
        artifact = TextArtifact(response.text)
    return artifact
ImageQueryTool
Bases:
BaseTool
Source Code in griptape/tools/image_query/tool.py
@define
class ImageQueryTool(BaseTool):
    """A tool that answers a text query about images loaded from disk or from memory.

    Attributes:
        prompt_driver: The prompt driver that is run on the query plus the images.
        image_loader: The loader used to read images.
    """

    prompt_driver: BasePromptDriver = field(kw_only=True)
    image_loader: ImageLoader = field(default=Factory(lambda: ImageLoader()), kw_only=True)

    @activity(
        config={
            "description": "This tool can be used to query the contents of images on disk.",
            "schema": Schema(
                {
                    Literal(
                        "query",
                        description="A detailed question to be answered using the contents of the provided images.",
                    ): str,
                    Literal("image_paths", description="The paths to an image files on disk."): Schema([str]),
                },
            ),
        },
    )
    def query_image_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
        query = params["values"]["query"]
        image_paths = params["values"]["image_paths"]

        image_artifacts = [self.image_loader.load(image_path) for image_path in image_paths]

        return cast(
            "TextArtifact",
            self.prompt_driver.run(
                PromptStack.from_artifact(
                    ListArtifact([TextArtifact(query), *image_artifacts]),
                )
            ).to_artifact(),
        )

    @activity(
        config={
            "description": "This tool can be used to query the contents of images in memory.",
            "schema": Schema(
                {
                    Literal(
                        "query",
                        description="A detailed question to be answered using the contents of the provided images.",
                    ): str,
                    Literal("image_artifacts", description="Image artifact memory references."): [
                        {"image_artifact_namespace": str, "image_artifact_name": str},
                    ],
                    "memory_name": str,
                },
            ),
        },
    )
    def query_images_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact:
        query = params["values"]["query"]
        image_artifact_references = params["values"]["image_artifacts"]
        memory = self.find_input_memory(params["values"]["memory_name"])

        if memory is None:
            return ErrorArtifact("memory not found")

        image_artifacts = []
        for image_artifact_reference in image_artifact_references:
            # The outer try also covers the BlobArtifact fallback: an exception raised
            # inside an ``except`` clause is not caught by a sibling ``except``.
            try:
                try:
                    image_artifact = load_artifact_from_memory(
                        memory,
                        image_artifact_reference["image_artifact_namespace"],
                        image_artifact_reference["image_artifact_name"],
                        ImageArtifact,
                    )
                    image_artifacts.append(cast("ImageArtifact", image_artifact))
                except ValueError:
                    # If we're unable to parse the artifact as an ImageArtifact, attempt to
                    # parse a BlobArtifact and load it as an ImageArtifact.
                    blob_artifact = load_artifact_from_memory(
                        memory,
                        image_artifact_reference["image_artifact_namespace"],
                        image_artifact_reference["image_artifact_name"],
                        BlobArtifact,
                    )
                    image_artifacts.append(self.image_loader.load(blob_artifact.value))
            except Exception as e:
                return ErrorArtifact(str(e))

        return cast(
            "TextArtifact",
            self.prompt_driver.run(
                PromptStack.from_artifact(
                    ListArtifact([TextArtifact(query), *image_artifacts]),
                )
            ).to_artifact(),
        )
image_loader = field(default=Factory(lambda: ImageLoader()), kw_only=True) class-attribute instance-attribute
prompt_driver = field(kw_only=True) class-attribute instance-attribute
query_image_from_disk(params)
Source Code in griptape/tools/image_query/tool.py
@activity(
    config={
        "description": "This tool can be used to query the contents of images on disk.",
        "schema": Schema(
            {
                Literal(
                    "query",
                    description="A detailed question to be answered using the contents of the provided images.",
                ): str,
                Literal("image_paths", description="The paths to an image files on disk."): Schema([str]),
            },
        ),
    },
)
def query_image_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
    """Loads every image in ``image_paths`` and runs the prompt driver on the query plus those images."""
    values = params["values"]
    question = values["query"]
    paths = values["image_paths"]

    loaded_images = [self.image_loader.load(path) for path in paths]

    prompt_input = ListArtifact([TextArtifact(question), *loaded_images])
    message = self.prompt_driver.run(PromptStack.from_artifact(prompt_input))
    return cast("TextArtifact", message.to_artifact())
query_images_from_memory(params)
Source Code in griptape/tools/image_query/tool.py
@activity(
    config={
        "description": "This tool can be used to query the contents of images in memory.",
        "schema": Schema(
            {
                Literal(
                    "query",
                    description="A detailed question to be answered using the contents of the provided images.",
                ): str,
                Literal("image_artifacts", description="Image artifact memory references."): [
                    {"image_artifact_namespace": str, "image_artifact_name": str},
                ],
                "memory_name": str,
            },
        ),
    },
)
def query_images_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact:
    """Loads the referenced image artifacts from memory and runs the prompt driver on the query plus those images.

    Returns:
        The prompt driver's output, or an ``ErrorArtifact`` when the memory is
        not found or any referenced artifact cannot be loaded.
    """
    query = params["values"]["query"]
    image_artifact_references = params["values"]["image_artifacts"]
    memory = self.find_input_memory(params["values"]["memory_name"])

    if memory is None:
        return ErrorArtifact("memory not found")

    image_artifacts = []
    for image_artifact_reference in image_artifact_references:
        # The outer try also covers the BlobArtifact fallback: an exception raised
        # inside an ``except`` clause is not caught by a sibling ``except``.
        try:
            try:
                image_artifact = load_artifact_from_memory(
                    memory,
                    image_artifact_reference["image_artifact_namespace"],
                    image_artifact_reference["image_artifact_name"],
                    ImageArtifact,
                )
                image_artifacts.append(cast("ImageArtifact", image_artifact))
            except ValueError:
                # If we're unable to parse the artifact as an ImageArtifact, attempt to
                # parse a BlobArtifact and load it as an ImageArtifact.
                blob_artifact = load_artifact_from_memory(
                    memory,
                    image_artifact_reference["image_artifact_namespace"],
                    image_artifact_reference["image_artifact_name"],
                    BlobArtifact,
                )
                image_artifacts.append(self.image_loader.load(blob_artifact.value))
        except Exception as e:
            return ErrorArtifact(str(e))

    return cast(
        "TextArtifact",
        self.prompt_driver.run(
            PromptStack.from_artifact(
                ListArtifact([TextArtifact(query), *image_artifacts]),
            )
        ).to_artifact(),
    )
InpaintingImageGenerationTool
Bases:
BaseImageGenerationTool
Attributes
| Name | Type | Description | 
|---|---|---|
image_generation_driver | BaseImageGenerationDriver | The image generation driver used to generate the image. | 
output_dir | Optional[str] | If provided, the generated image will be written to disk in output_dir. | 
output_file | Optional[str] | If provided, the generated image will be written to disk as output_file. | 
Source Code in griptape/tools/inpainting_image_generation/tool.py
@define
class InpaintingImageGenerationTool(BaseImageGenerationTool):
    """A tool that can be used to generate prompted inpaintings of an image.

    Attributes:
        image_generation_driver: The image generation driver used to generate the image.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    image_generation_driver: BaseImageGenerationDriver = field(kw_only=True)
    image_loader: ImageLoader = field(default=ImageLoader(), kw_only=True)

    @activity(
        config={
            "description": "Modifies an image within a specified mask area using image and mask files.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    Literal(
                        "image_file",
                        description="The path to an image file to be used as a base to generate variations from.",
                    ): str,
                    Literal("mask_file", description="The path to mask image file."): str,
                },
            ),
        },
    )
    def image_inpainting_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Loads the image and mask from disk, then runs inpainting with the given prompts."""
        values = params["values"]
        prompt = values["prompt"]
        negative_prompt = values["negative_prompt"]
        image_file = values["image_file"]
        mask_file = values["mask_file"]

        base_image = cast("ImageArtifact", self.image_loader.load(image_file))
        mask_image = cast("ImageArtifact", self.image_loader.load(mask_file))

        return self._generate_inpainting(prompt, negative_prompt, base_image, mask_image)

    @activity(
        config={
            "description": "Modifies an image within a specified mask area using image and mask artifacts in memory.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    "memory_name": str,
                    "image_artifact_namespace": str,
                    "image_artifact_name": str,
                    "mask_artifact_namespace": str,
                    "mask_artifact_name": str,
                },
            ),
        },
    )
    def image_inpainting_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Loads the image and mask artifacts from memory, then runs inpainting with the given prompts."""
        values = params["values"]
        prompt = values["prompt"]
        negative_prompt = values["negative_prompt"]
        image_source = (values["image_artifact_namespace"], values["image_artifact_name"])
        mask_source = (values["mask_artifact_namespace"], values["mask_artifact_name"])

        memory = self.find_input_memory(values["memory_name"])
        if memory is None:
            return ErrorArtifact("memory not found")

        try:
            # Image first, then mask.
            image_artifact, mask_artifact = [
                load_artifact_from_memory(memory, namespace, name, ImageArtifact)
                for namespace, name in (image_source, mask_source)
            ]
        except ValueError as e:
            return ErrorArtifact(str(e))

        return self._generate_inpainting(
            prompt, negative_prompt, cast("ImageArtifact", image_artifact), cast("ImageArtifact", mask_artifact)
        )

    def _generate_inpainting(
        self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact, mask_artifact: ImageArtifact
    ) -> ImageArtifact:
        """Runs the driver's inpainting and writes the result to disk when an output location is set."""
        generated = self.image_generation_driver.run_image_inpainting(
            prompts=[prompt],
            negative_prompts=[negative_prompt],
            image=image_artifact,
            mask=mask_artifact,
        )

        if not (self.output_dir or self.output_file):
            return generated

        self._write_to_file(generated)
        return generated
image_generation_driver = field(kw_only=True) class-attribute instance-attribute
image_loader = field(default=ImageLoader(), kw_only=True) class-attribute instance-attribute
_generate_inpainting(prompt, negative_prompt, image_artifact, mask_artifact)
Source Code in griptape/tools/inpainting_image_generation/tool.py
def _generate_inpainting(
    self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact, mask_artifact: ImageArtifact
) -> ImageArtifact:
    """Runs the driver's inpainting and writes the result to disk when an output location is set.

    The prompt and negative prompt are each passed to the driver as a single-element list.
    """
    generated = self.image_generation_driver.run_image_inpainting(
        prompts=[prompt],
        negative_prompts=[negative_prompt],
        image=image_artifact,
        mask=mask_artifact,
    )

    if not (self.output_dir or self.output_file):
        return generated

    self._write_to_file(generated)
    return generated
image_inpainting_from_file(params)
Source Code in griptape/tools/inpainting_image_generation/tool.py
@activity(
    config={
        "description": "Modifies an image within a specified mask area using image and mask files.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                Literal(
                    "image_file",
                    description="The path to an image file to be used as a base to generate variations from.",
                ): str,
                Literal("mask_file", description="The path to mask image file."): str,
            },
        ),
    },
)
def image_inpainting_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Loads the image and mask from disk, then runs inpainting with the given prompts."""
    values = params["values"]
    prompt = values["prompt"]
    negative_prompt = values["negative_prompt"]
    image_file = values["image_file"]
    mask_file = values["mask_file"]

    base_image = cast("ImageArtifact", self.image_loader.load(image_file))
    mask_image = cast("ImageArtifact", self.image_loader.load(mask_file))

    return self._generate_inpainting(prompt, negative_prompt, base_image, mask_image)
image_inpainting_from_memory(params)
Source Code in griptape/tools/inpainting_image_generation/tool.py
@activity(
    config={
        "description": "Modifies an image within a specified mask area using image and mask artifacts in memory.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                "memory_name": str,
                "image_artifact_namespace": str,
                "image_artifact_name": str,
                "mask_artifact_namespace": str,
                "mask_artifact_name": str,
            },
        ),
    },
)
def image_inpainting_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Loads the image and mask artifacts from memory, then runs inpainting with the given prompts.

    Returns an ``ErrorArtifact`` when the memory is not found or loading raises ``ValueError``.
    """
    values = params["values"]
    prompt = values["prompt"]
    negative_prompt = values["negative_prompt"]
    image_source = (values["image_artifact_namespace"], values["image_artifact_name"])
    mask_source = (values["mask_artifact_namespace"], values["mask_artifact_name"])

    memory = self.find_input_memory(values["memory_name"])
    if memory is None:
        return ErrorArtifact("memory not found")

    try:
        # Image first, then mask.
        image_artifact, mask_artifact = [
            load_artifact_from_memory(memory, namespace, name, ImageArtifact)
            for namespace, name in (image_source, mask_source)
        ]
    except ValueError as e:
        return ErrorArtifact(str(e))

    return self._generate_inpainting(
        prompt, negative_prompt, cast("ImageArtifact", image_artifact), cast("ImageArtifact", mask_artifact)
    )
OutpaintingImageGenerationTool
Bases:
BaseImageGenerationTool
Attributes
| Name | Type | Description | 
|---|---|---|
image_generation_driver | BaseImageGenerationDriver | The image generation driver used to generate the image. | 
output_dir | Optional[str] | If provided, the generated image will be written to disk in output_dir. | 
output_file | Optional[str] | If provided, the generated image will be written to disk as output_file. | 
Source Code in griptape/tools/outpainting_image_generation/tool.py
@define
class OutpaintingImageGenerationTool(BaseImageGenerationTool):
    """A tool that can be used to generate prompted outpaintings of an image.

    Attributes:
        image_generation_driver: The image generation driver used to generate the image.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    image_generation_driver: BaseImageGenerationDriver = field(kw_only=True)
    image_loader: ImageLoader = field(default=ImageLoader(), kw_only=True)

    @activity(
        config={
            "description": "Modifies an image outside a specified mask area using image and mask files.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    Literal(
                        "image_file",
                        description="The path to an image file to be used as a base to generate variations from.",
                    ): str,
                    Literal("mask_file", description="The path to mask image file."): str,
                },
            ),
        },
    )
    def image_outpainting_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Loads the image and mask from disk, then runs outpainting with the given prompts."""
        prompt = params["values"]["prompt"]
        negative_prompt = params["values"]["negative_prompt"]
        image_file = params["values"]["image_file"]
        mask_file = params["values"]["mask_file"]

        input_artifact = self.image_loader.load(image_file)
        mask_artifact = self.image_loader.load(mask_file)

        return self._generate_outpainting(prompt, negative_prompt, input_artifact, mask_artifact)

    @activity(
        config={
            "description": "Modifies an image outside a specified mask area using image and mask artifacts in memory.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    "memory_name": str,
                    "image_artifact_namespace": str,
                    # The handler reads both *_artifact_name values, so the schema must declare them.
                    "image_artifact_name": str,
                    "mask_artifact_namespace": str,
                    "mask_artifact_name": str,
                },
            ),
        },
    )
    def image_outpainting_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Loads the image and mask artifacts from memory, then runs outpainting with the given prompts.

        Returns an ``ErrorArtifact`` when the memory is not found or loading raises ``ValueError``.
        """
        prompt = params["values"]["prompt"]
        negative_prompt = params["values"]["negative_prompt"]
        image_artifact_namespace = params["values"]["image_artifact_namespace"]
        image_artifact_name = params["values"]["image_artifact_name"]
        mask_artifact_namespace = params["values"]["mask_artifact_namespace"]
        mask_artifact_name = params["values"]["mask_artifact_name"]
        memory = self.find_input_memory(params["values"]["memory_name"])

        if memory is None:
            return ErrorArtifact("memory not found")

        try:
            image_artifact = load_artifact_from_memory(
                memory,
                image_artifact_namespace,
                image_artifact_name,
                ImageArtifact,
            )
            mask_artifact = load_artifact_from_memory(
                memory,
                mask_artifact_namespace,
                mask_artifact_name,
                ImageArtifact,
            )
        except ValueError as e:
            return ErrorArtifact(str(e))

        return self._generate_outpainting(
            prompt, negative_prompt, cast("ImageArtifact", image_artifact), cast("ImageArtifact", mask_artifact)
        )

    def _generate_outpainting(
        self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact, mask_artifact: ImageArtifact
    ) -> ImageArtifact | ErrorArtifact:
        """Runs the driver's outpainting and writes the result to disk when an output location is set."""
        output_artifact = self.image_generation_driver.run_image_outpainting(
            prompts=[prompt], negative_prompts=[negative_prompt], image=image_artifact, mask=mask_artifact
        )

        if self.output_dir or self.output_file:
            self._write_to_file(output_artifact)

        return output_artifact
image_generation_driver = field(kw_only=True) class-attribute instance-attribute
image_loader = field(default=ImageLoader(), kw_only=True) class-attribute instance-attribute
_generate_outpainting(prompt, negative_prompt, image_artifact, mask_artifact)
Source Code in griptape/tools/outpainting_image_generation/tool.py
def _generate_outpainting(
    self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact, mask_artifact: ImageArtifact
) -> ImageArtifact | ErrorArtifact:
    """Runs the driver's outpainting and writes the result to disk when an output location is set.

    The prompt and negative prompt are each passed to the driver as a single-element list.
    """
    generated = self.image_generation_driver.run_image_outpainting(
        prompts=[prompt],
        negative_prompts=[negative_prompt],
        image=image_artifact,
        mask=mask_artifact,
    )

    if not (self.output_dir or self.output_file):
        return generated

    self._write_to_file(generated)
    return generated
image_outpainting_from_file(params)
Source Code in griptape/tools/outpainting_image_generation/tool.py
@activity(
    config={
        "description": "Modifies an image outside a specified mask area using image and mask files.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                Literal(
                    "image_file",
                    description="The path to an image file to be used as a base to generate variations from.",
                ): str,
                Literal("mask_file", description="The path to mask image file."): str,
            },
        ),
    },
)
def image_outpainting_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Loads the image and mask from disk, then runs outpainting with the given prompts."""
    values = params["values"]
    prompt = values["prompt"]
    negative_prompt = values["negative_prompt"]
    image_file = values["image_file"]
    mask_file = values["mask_file"]

    base_image = self.image_loader.load(image_file)
    mask_image = self.image_loader.load(mask_file)

    return self._generate_outpainting(prompt, negative_prompt, base_image, mask_image)
image_outpainting_from_memory(params)
Source Code in griptape/tools/outpainting_image_generation/tool.py
@activity(
    config={
        "description": "Modifies an image outside a specified mask area using image and mask artifacts in memory.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                "memory_name": str,
                "image_artifact_namespace": str,
                # The handler reads both *_artifact_name values, so the schema must declare them.
                "image_artifact_name": str,
                "mask_artifact_namespace": str,
                "mask_artifact_name": str,
            },
        ),
    },
)
def image_outpainting_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Loads the image and mask artifacts from memory, then runs outpainting with the given prompts.

    Returns an ``ErrorArtifact`` when the memory is not found or loading raises ``ValueError``.
    """
    prompt = params["values"]["prompt"]
    negative_prompt = params["values"]["negative_prompt"]
    image_artifact_namespace = params["values"]["image_artifact_namespace"]
    image_artifact_name = params["values"]["image_artifact_name"]
    mask_artifact_namespace = params["values"]["mask_artifact_namespace"]
    mask_artifact_name = params["values"]["mask_artifact_name"]
    memory = self.find_input_memory(params["values"]["memory_name"])

    if memory is None:
        return ErrorArtifact("memory not found")

    try:
        image_artifact = load_artifact_from_memory(
            memory,
            image_artifact_namespace,
            image_artifact_name,
            ImageArtifact,
        )
        mask_artifact = load_artifact_from_memory(
            memory,
            mask_artifact_namespace,
            mask_artifact_name,
            ImageArtifact,
        )
    except ValueError as e:
        return ErrorArtifact(str(e))

    return self._generate_outpainting(
        prompt, negative_prompt, cast("ImageArtifact", image_artifact), cast("ImageArtifact", mask_artifact)
    )
PromptImageGenerationTool
Bases:
BaseImageGenerationTool
Attributes
| Name | Type | Description | 
|---|---|---|
image_generation_driver | BaseImageGenerationDriver | The image generation driver used to generate the image. | 
output_dir | Optional[str] | If provided, the generated image will be written to disk in output_dir. | 
output_file | Optional[str] | If provided, the generated image will be written to disk as output_file. | 
Source Code in griptape/tools/prompt_image_generation/tool.py
@define
class PromptImageGenerationTool(BaseImageGenerationTool):
    """A tool that can be used to generate an image from a text prompt.

    Attributes:
        image_generation_driver: The image generation driver used to generate the image.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    image_generation_driver: BaseImageGenerationDriver = field(kw_only=True)

    @activity(
        config={
            "description": "Generates an image from text prompts.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                }
            ),
        },
    )
    def generate_image(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Runs text-to-image on the prompts and writes the result to disk when an output location is set."""
        values = params["values"]
        prompt = values["prompt"]
        negative_prompt = values["negative_prompt"]

        generated = self.image_generation_driver.run_text_to_image(
            prompts=[prompt],
            negative_prompts=[negative_prompt],
        )

        if not (self.output_dir or self.output_file):
            return generated

        self._write_to_file(generated)
        return generated
image_generation_driver = field(kw_only=True) class-attribute instance-attribute
generate_image(params)
Source Code in griptape/tools/prompt_image_generation/tool.py
@activity(
    config={
        "description": "Generates an image from text prompts.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
            }
        ),
    },
)
def generate_image(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Runs text-to-image on the prompts and writes the result to disk when an output location is set."""
    values = params["values"]
    prompt = values["prompt"]
    negative_prompt = values["negative_prompt"]

    generated = self.image_generation_driver.run_text_to_image(
        prompts=[prompt],
        negative_prompts=[negative_prompt],
    )

    if not (self.output_dir or self.output_file):
        return generated

    self._write_to_file(generated)
    return generated
PromptSummaryTool
Attributes
| Name | Type | Description | 
|---|---|---|
prompt_summary_engine | PromptSummaryEngine | PromptSummaryEngine. | 
Source Code in griptape/tools/prompt_summary/tool.py
@define(kw_only=True)
class PromptSummaryTool(BaseTool, RuleMixin):
    """Tool for using a Prompt Summary Engine.

    Attributes:
        prompt_summary_engine: `PromptSummaryEngine`.
    """

    prompt_summary_engine: PromptSummaryEngine = field(kw_only=True, default=Factory(lambda: PromptSummaryEngine()))

    @activity(
        config={
            "description": "Can be used to summarize text content.",
            "schema": Schema(
                {
                    Literal("summary"): Or(
                        str,
                        Schema(
                            {
                                "memory_name": str,
                                "artifact_namespace": str,
                            }
                        ),
                    ),
                }
            ),
        },
    )
    def summarize(self, params: dict) -> BaseArtifact:
        """Summarizes either the given text or the artifacts stored under a memory namespace."""
        summary = params["values"]["summary"]

        if isinstance(summary, str):
            # Plain text is wrapped in a single-item list of artifacts.
            artifacts = ListArtifact([TextArtifact(summary)])
        else:
            memory = self.find_input_memory(summary["memory_name"])
            artifact_namespace = summary["artifact_namespace"]
            if memory is None:
                return ErrorArtifact("memory not found")
            artifacts = memory.load_artifacts(artifact_namespace)

        return self.prompt_summary_engine.summarize_artifacts(artifacts, rulesets=self.rulesets)
prompt_summary_engine = field(kw_only=True, default=Factory(lambda: PromptSummaryEngine())) class-attribute instance-attribute
summarize(params)
Source Code in griptape/tools/prompt_summary/tool.py
@activity(
    config={
        "description": "Can be used to summarize text content.",
        "schema": Schema(
            {
                Literal("summary"): Or(
                    str,
                    Schema(
                        {
                            "memory_name": str,
                            "artifact_namespace": str,
                        }
                    ),
                ),
            }
        ),
    },
)
def summarize(self, params: dict) -> BaseArtifact:
    """Summarize either a literal string or the artifacts stored in a memory namespace.

    Args:
        params: Activity parameters; ``params["values"]["summary"]`` is either
            the text itself or a dict with ``"memory_name"`` and
            ``"artifact_namespace"``.

    Returns:
        The engine's summary, or an ``ErrorArtifact`` when the named memory
        cannot be found.
    """
    target = params["values"]["summary"]

    if isinstance(target, str):
        to_summarize = ListArtifact([TextArtifact(target)])
    else:
        source_memory = self.find_input_memory(target["memory_name"])
        namespace = target["artifact_namespace"]
        if source_memory is None:
            return ErrorArtifact("memory not found")
        to_summarize = source_memory.load_artifacts(namespace)

    return self.prompt_summary_engine.summarize_artifacts(to_summarize, rulesets=self.rulesets)
QueryTool
Source Code in griptape/tools/query/tool.py
@define(kw_only=True)
class QueryTool(BaseTool, RuleMixin):
    """Tool that answers a natural language query against supplied text content."""

    prompt_driver: BasePromptDriver = field(default=Factory(lambda: Defaults.drivers_config.prompt_driver))

    # Built from the instance so that it picks up this tool's prompt driver and rulesets.
    _rag_engine: RagEngine = field(
        default=Factory(
            lambda self: RagEngine(
                response_stage=ResponseRagStage(
                    response_modules=[
                        PromptResponseRagModule(prompt_driver=self.prompt_driver, rulesets=self.rulesets)
                    ],
                ),
            ),
            takes_self=True,
        ),
        alias="_rag_engine",
    )

    @activity(
        config={
            "description": "Can be used to search through textual content.",
            "schema": Schema(
                {
                    Literal("query", description="A natural language search query"): str,
                    Literal("content"): Or(
                        str,
                        Schema(
                            {
                                "memory_name": str,
                                "artifact_namespace": str,
                            }
                        ),
                    ),
                }
            ),
        },
    )
    def query(self, params: dict) -> ListArtifact | ErrorArtifact:
        """Run a query over a literal string or over text artifacts held in memory.

        Args:
            params: Activity parameters; ``params["values"]`` holds ``"query"``
                and ``"content"``, where content is either text or a dict with
                ``"memory_name"`` and ``"artifact_namespace"``.

        Returns:
            A ``ListArtifact`` of the engine outputs, or an ``ErrorArtifact`` when
            the memory is missing or the engine produced no output.
        """
        values = params["values"]
        question = values["query"]
        content = values["content"]

        if isinstance(content, str):
            chunks = [TextArtifact(content)]
        else:
            source_memory = self.find_input_memory(content["memory_name"])
            namespace = content["artifact_namespace"]
            if source_memory is None:
                return ErrorArtifact("memory not found")
            loaded = source_memory.load_artifacts(namespace)
            # Non-text artifacts are dropped before querying.
            chunks = [item for item in loaded if isinstance(item, TextArtifact)]

        context = RagContext(query=question, text_chunks=chunks)
        results = self._rag_engine.process(context).outputs

        if len(results) == 0:
            return ErrorArtifact("query output is empty")
        return ListArtifact(results)
_rag_engine = field(default=Factory(lambda self: RagEngine(response_stage=ResponseRagStage(response_modules=[PromptResponseRagModule(prompt_driver=self.prompt_driver, rulesets=self.rulesets)])), takes_self=True), alias='_rag_engine') class-attribute instance-attribute
prompt_driver = field(default=Factory(lambda: Defaults.drivers_config.prompt_driver)) class-attribute instance-attribute
query(params)
Source Code in griptape/tools/query/tool.py
@activity(
    config={
        "description": "Can be used to search through textual content.",
        "schema": Schema(
            {
                Literal("query", description="A natural language search query"): str,
                Literal("content"): Or(
                    str,
                    Schema(
                        {
                            "memory_name": str,
                            "artifact_namespace": str,
                        }
                    ),
                ),
            }
        ),
    },
)
def query(self, params: dict) -> ListArtifact | ErrorArtifact:
    """Run a query over a literal string or over text artifacts held in memory.

    Args:
        params: Activity parameters; ``params["values"]`` holds ``"query"``
            and ``"content"``, where content is either text or a dict with
            ``"memory_name"`` and ``"artifact_namespace"``.

    Returns:
        A ``ListArtifact`` of the engine outputs, or an ``ErrorArtifact`` when
        the memory is missing or the engine produced no output.
    """
    values = params["values"]
    question = values["query"]
    content = values["content"]

    if isinstance(content, str):
        chunks = [TextArtifact(content)]
    else:
        source_memory = self.find_input_memory(content["memory_name"])
        namespace = content["artifact_namespace"]
        if source_memory is None:
            return ErrorArtifact("memory not found")
        loaded = source_memory.load_artifacts(namespace)
        # Non-text artifacts are dropped before querying.
        chunks = [item for item in loaded if isinstance(item, TextArtifact)]

    context = RagContext(query=question, text_chunks=chunks)
    results = self._rag_engine.process(context).outputs

    if len(results) == 0:
        return ErrorArtifact("query output is empty")
    return ListArtifact(results)
RagTool
Bases:
BaseTool
Attributes
| Name | Type | Description | 
|---|---|---|
description | str | LLM-friendly RAG engine description. | 
rag_engine | RagEngine | RagEngine. | 
Source Code in griptape/tools/rag/tool.py
@define(kw_only=True)
class RagTool(BaseTool):
    """Tool that forwards a query to a RAG engine.

    Attributes:
        description: LLM-friendly RAG engine description.
        rag_engine: `RagEngine` that processes the query.
    """

    description: str = field()
    rag_engine: RagEngine = field()

    @activity(
        config={
            "description": "{{ _self.description }}",
            "schema": Schema({Literal("query", description="A natural language search query"): str}),
        },
    )
    def search(self, params: dict) -> ListArtifact | ErrorArtifact:
        """Query the RAG engine and return its outputs as one flat list.

        Args:
            params: Activity parameters; ``params["values"]["query"]`` is the query text.

        Returns:
            A ``ListArtifact`` of the outputs (nested ``ListArtifact`` values are
            expanded one level), or an ``ErrorArtifact`` when there is no output
            or any exception is raised while querying.
        """
        query = params["values"]["query"]

        try:
            flattened = []
            for item in self.rag_engine.process_query(query).outputs:
                members = item.value if isinstance(item, ListArtifact) else [item]
                flattened.extend(members)

            if len(flattened) == 0:
                return ErrorArtifact("query output is empty")
            return ListArtifact(flattened)
        except Exception as e:
            return ErrorArtifact(f"error querying: {e}")
description = field() class-attribute instance-attribute
rag_engine = field() class-attribute instance-attribute
search(params)
Source Code in griptape/tools/rag/tool.py
@activity(
    config={
        "description": "{{ _self.description }}",
        "schema": Schema({Literal("query", description="A natural language search query"): str}),
    },
)
def search(self, params: dict) -> ListArtifact | ErrorArtifact:
    """Query the RAG engine and return its outputs as one flat list.

    Args:
        params: Activity parameters; ``params["values"]["query"]`` is the query text.

    Returns:
        A ``ListArtifact`` of the outputs (nested ``ListArtifact`` values are
        expanded one level), or an ``ErrorArtifact`` when there is no output
        or any exception is raised while querying.
    """
    query = params["values"]["query"]

    try:
        flattened = []
        for item in self.rag_engine.process_query(query).outputs:
            members = item.value if isinstance(item, ListArtifact) else [item]
            flattened.extend(members)

        if len(flattened) == 0:
            return ErrorArtifact("query output is empty")
        return ListArtifact(flattened)
    except Exception as e:
        return ErrorArtifact(f"error querying: {e}")
RestApiTool
Bases:
BaseTool
Attributes
| Name | Type | Description | 
|---|---|---|
base_url | str | The base url that will be used for the request. | 
path | Optional[str] | The resource path that will be appended to base_url. | 
description | str | A description of what the REST API does. | 
request_body_schema | Optional[str] | A JSON schema string describing the request body. Recommended for PUT, POST, and PATCH requests. | 
request_query_params_schema | Optional[str] | A JSON schema string describing the available query parameters. | 
request_path_params_schema | Optional[str] | A JSON schema string describing the available path parameters. The schema must describe an array of string values. | 
response_body_schema | Optional[str] | A JSON schema string describing the response body. | 
request_headers | Optional[dict[str, str]] | Headers to include in the requests. | 
Source Code in griptape/tools/rest_api/tool.py
@define
class RestApiTool(BaseTool):
    """A tool for making REST API requests.

    Attributes:
        base_url: The base url that will be used for the request.
        path: The resource path that will be appended to base_url.
        description: A description of what the REST API does.
        request_body_schema: A JSON schema string describing the request body. Recommended for PUT, POST, and PATCH requests.
        request_query_params_schema: A JSON schema string describing the available query parameters.
        request_path_params_schema: A JSON schema string describing the available path parameters. The schema must describe an array of string values.
        response_body_schema: A JSON schema string describing the response body.
        request_headers: Headers to include in the requests.
    """

    base_url: str = field(kw_only=True)
    path: Optional[str] = field(default=None, kw_only=True)
    description: str = field(kw_only=True)
    request_path_params_schema: Optional[str] = field(default=None, kw_only=True)
    request_query_params_schema: Optional[str] = field(default=None, kw_only=True)
    request_body_schema: Optional[str] = field(default=None, kw_only=True)
    response_body_schema: Optional[str] = field(default=None, kw_only=True)
    request_headers: Optional[dict[str, str]] = field(default=None, kw_only=True)

    @property
    def full_url(self) -> str:
        """The base url joined with the configured resource path (no path parameters)."""
        return self._build_url(self.base_url, path=self.path)

    @activity(
        config={
            "description": dedent(
                """
                This tool can be used to make a put request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
                {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
                """,
            ),
            "schema": Schema({Literal("body", description="The request body."): dict}),
        },
    )
    def put(self, params: dict) -> BaseArtifact:
        """Send ``params["values"]["body"]`` as JSON in a PUT request.

        Returns:
            A ``TextArtifact`` with the response text, or an ``ErrorArtifact``
            carrying the message of a ``requests`` ``RequestException``.
        """
        from requests import exceptions, put

        body = params["values"]["body"]
        url = self._build_url(self.base_url, path=self.path)

        try:
            response = put(url, json=body, timeout=30, headers=self.request_headers)
            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    @activity(
        config={
            # The conditions test the schema fields themselves so a hint is shown
            # exactly when the corresponding schema has been configured.
            "description": dedent(
                """
                This tool can be used to make a patch request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_path_params_schema %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
                {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
                {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
                """,
            ),
            "schema": Schema(
                {
                    Literal("path_params", description="The request path parameters."): Schema([str]),
                    Literal("body", description="The request body."): dict,
                },
            ),
        },
    )
    def patch(self, params: dict) -> BaseArtifact:
        """Send ``body`` as JSON in a PATCH request to the URL extended by ``path_params``.

        Returns:
            A ``TextArtifact`` with the response text, or an ``ErrorArtifact``
            carrying the message of a ``requests`` ``RequestException``.
        """
        from requests import exceptions, patch

        values = params["values"]
        body = values["body"]
        path_params = values["path_params"]
        url = self._build_url(self.base_url, path=self.path, path_params=path_params)

        try:
            response = patch(url, json=body, timeout=30, headers=self.request_headers)
            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    @activity(
        config={
            "description": dedent(
                """
                This tool can be used to make a post request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
                {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
                """,
            ),
            "schema": Schema({Literal("body", description="The request body."): dict}),
        },
    )
    def post(self, params: dict) -> BaseArtifact:
        """Send ``params["values"]["body"]`` as JSON in a POST request.

        Returns:
            A ``TextArtifact`` with the response text, or an ``ErrorArtifact``
            carrying the message of a ``requests`` ``RequestException``.
        """
        from requests import exceptions, post

        url = self._build_url(self.base_url, path=self.path)
        body = params["values"]["body"]

        try:
            response = post(url, json=body, timeout=30, headers=self.request_headers)
            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    @activity(
        config={
            # Each condition and each printed schema refer to the same field;
            # the query hint prints the query schema, not the path schema.
            "description": dedent(
                """
                This tool can be used to make a get request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_path_params_schema %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
                {% if _self.request_query_params_schema %}The request query parameters must follow this JSON schema: {{ _self.request_query_params_schema }}{% endif %}
                {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
                """,
            ),
            "schema": schema.Optional(
                Schema(
                    {
                        schema.Optional(Literal("query_params", description="The request query parameters.")): dict,
                        schema.Optional(Literal("path_params", description="The request path parameters.")): Schema(
                            [str]
                        ),
                    },
                ),
            ),
        },
    )
    def get(self, params: dict) -> BaseArtifact:
        """Issue a GET request, optionally with query and path parameters.

        ``params["values"]`` may be empty or ``None``; in that case no query or
        path parameters are sent.

        Returns:
            A ``TextArtifact`` with the response text, or an ``ErrorArtifact``
            carrying the message of a ``requests`` ``RequestException``.
        """
        from requests import exceptions, get

        values = params["values"]
        query_params = {}
        path_params = []
        if values:
            query_params = values.get("query_params", {})
            path_params = values.get("path_params", [])
        url = self._build_url(self.base_url, path=self.path, path_params=path_params)

        try:
            response = get(url, params=query_params, timeout=30, headers=self.request_headers)
            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    @activity(
        config={
            # Each condition and each printed schema refer to the same field;
            # the query hint prints the query schema, not the path schema.
            "description": dedent(
                """
                This tool can be used to make a delete request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_path_params_schema %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
                {% if _self.request_query_params_schema %}The request query parameters must follow this JSON schema: {{ _self.request_query_params_schema }}{% endif %}
                """,
            ),
            "schema": Schema(
                {
                    schema.Optional(Literal("query_params", description="The request query parameters.")): dict,
                    schema.Optional(Literal("path_params", description="The request path parameters.")): Schema([str]),
                },
            ),
        },
    )
    def delete(self, params: dict) -> BaseArtifact:
        """Issue a DELETE request, optionally with query and path parameters.

        Returns:
            A ``TextArtifact`` with the response text, or an ``ErrorArtifact``
            carrying the message of a ``requests`` ``RequestException``.
        """
        from requests import delete, exceptions

        values = params["values"]
        query_params = values.get("query_params", {})
        path_params = values.get("path_params", [])
        url = self._build_url(self.base_url, path=self.path, path_params=path_params)

        try:
            response = delete(url, params=query_params, timeout=30, headers=self.request_headers)
            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    def _build_url(self, base_url: str, path: Optional[str] = None, path_params: Optional[list] = None) -> str:
        """Append ``path`` and ``path_params`` to ``base_url``.

        Segments are joined with single slashes. Resolving them as a relative
        reference instead would discard the last path segment of a ``base_url``
        such as ``https://host/api``.

        Args:
            base_url: The url to extend; surrounding slashes are stripped.
            path: Optional resource path; surrounding slashes are stripped.
            path_params: Optional values appended as further segments, each
                converted with ``str``.

        Returns:
            The combined url without a trailing slash.
        """
        segments = [base_url.strip("/")]
        if path:
            resource = path.strip("/")
            if resource:
                segments.append(resource)
        if path_params:
            segments.extend(str(param) for param in path_params)
        return "/".join(segments)
base_url = field(kw_only=True) class-attribute instance-attribute
description = field(kw_only=True) class-attribute instance-attribute
full_url property
path = field(default=None, kw_only=True) class-attribute instance-attribute
request_body_schema = field(default=None, kw_only=True) class-attribute instance-attribute
request_headers = field(default=None, kw_only=True) class-attribute instance-attribute
request_path_params_schema = field(default=None, kw_only=True) class-attribute instance-attribute
request_query_params_schema = field(default=None, kw_only=True) class-attribute instance-attribute
response_body_schema = field(default=None, kw_only=True) class-attribute instance-attribute
_build_url(base_url, path=None, path_params=None)
Source Code in griptape/tools/rest_api/tool.py
def _build_url(self, base_url: str, path: Optional[str] = None, path_params: Optional[list] = None) -> str: url = "" if path: url += path.strip("/") if path_params: url += f"/{str.join('/', map(str, path_params))}" return urljoin(base_url.strip("/"), url)
delete(params)
Source Code in griptape/tools/rest_api/tool.py
@activity(
    config={
        # Each condition and each printed schema refer to the same field;
        # the query hint prints the query schema, not the path schema.
        "description": dedent(
            """
            This tool can be used to make a delete request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_path_params_schema %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
            {% if _self.request_query_params_schema %}The request query parameters must follow this JSON schema: {{ _self.request_query_params_schema }}{% endif %}
            """,
        ),
        "schema": Schema(
            {
                schema.Optional(Literal("query_params", description="The request query parameters.")): dict,
                schema.Optional(Literal("path_params", description="The request path parameters.")): Schema([str]),
            },
        ),
    },
)
def delete(self, params: dict) -> BaseArtifact:
    """Issue a DELETE request, optionally with query and path parameters.

    Args:
        params: Activity parameters; ``params["values"]`` may contain
            ``"query_params"`` (dict) and ``"path_params"`` (list).

    Returns:
        A ``TextArtifact`` with the response text, or an ``ErrorArtifact``
        carrying the message of a ``requests`` ``RequestException``.
    """
    from requests import delete, exceptions

    values = params["values"]
    query_params = values.get("query_params", {})
    path_params = values.get("path_params", [])
    url = self._build_url(self.base_url, path=self.path, path_params=path_params)

    try:
        response = delete(url, params=query_params, timeout=30, headers=self.request_headers)
        return TextArtifact(response.text)
    except exceptions.RequestException as err:
        return ErrorArtifact(str(err))
get(params)
Source Code in griptape/tools/rest_api/tool.py
@activity(
    config={
        # Each condition and each printed schema refer to the same field;
        # the query hint prints the query schema, not the path schema.
        "description": dedent(
            """
            This tool can be used to make a get request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_path_params_schema %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
            {% if _self.request_query_params_schema %}The request query parameters must follow this JSON schema: {{ _self.request_query_params_schema }}{% endif %}
            {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
            """,
        ),
        "schema": schema.Optional(
            Schema(
                {
                    schema.Optional(Literal("query_params", description="The request query parameters.")): dict,
                    schema.Optional(Literal("path_params", description="The request path parameters.")): Schema(
                        [str]
                    ),
                },
            ),
        ),
    },
)
def get(self, params: dict) -> BaseArtifact:
    """Issue a GET request, optionally with query and path parameters.

    Args:
        params: Activity parameters; ``params["values"]`` may be empty or
            ``None``, in which case no query or path parameters are sent.

    Returns:
        A ``TextArtifact`` with the response text, or an ``ErrorArtifact``
        carrying the message of a ``requests`` ``RequestException``.
    """
    from requests import exceptions, get

    values = params["values"]
    query_params = {}
    path_params = []
    if values:
        query_params = values.get("query_params", {})
        path_params = values.get("path_params", [])
    url = self._build_url(self.base_url, path=self.path, path_params=path_params)

    try:
        response = get(url, params=query_params, timeout=30, headers=self.request_headers)
        return TextArtifact(response.text)
    except exceptions.RequestException as err:
        return ErrorArtifact(str(err))
patch(params)
Source Code in griptape/tools/rest_api/tool.py
@activity(
    config={
        # The path-parameter condition tests the schema field itself, so the
        # hint is shown exactly when that schema has been configured.
        "description": dedent(
            """
            This tool can be used to make a patch request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_path_params_schema %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
            {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
            {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
            """,
        ),
        "schema": Schema(
            {
                Literal("path_params", description="The request path parameters."): Schema([str]),
                Literal("body", description="The request body."): dict,
            },
        ),
    },
)
def patch(self, params: dict) -> BaseArtifact:
    """Send ``body`` as JSON in a PATCH request to the URL extended by ``path_params``.

    Args:
        params: Activity parameters; ``params["values"]`` must contain
            ``"body"`` and ``"path_params"``.

    Returns:
        A ``TextArtifact`` with the response text, or an ``ErrorArtifact``
        carrying the message of a ``requests`` ``RequestException``.
    """
    from requests import exceptions, patch

    values = params["values"]
    body = values["body"]
    path_params = values["path_params"]
    url = self._build_url(self.base_url, path=self.path, path_params=path_params)

    try:
        response = patch(url, json=body, timeout=30, headers=self.request_headers)
        return TextArtifact(response.text)
    except exceptions.RequestException as err:
        return ErrorArtifact(str(err))
post(params)
Source Code in griptape/tools/rest_api/tool.py
@activity(
    config={
        "description": dedent(
            """
            This tool can be used to make a post request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
            {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
            """,
        ),
        "schema": Schema({Literal("body", description="The request body."): dict}),
    },
)
def post(self, params: dict) -> BaseArtifact:
    """Send ``params["values"]["body"]`` as JSON in a POST request.

    Returns:
        A ``TextArtifact`` with the response text, or an ``ErrorArtifact``
        carrying the message of a ``requests`` ``RequestException``.
    """
    from requests import exceptions, post

    target_url = self._build_url(self.base_url, path=self.path)
    payload = params["values"]["body"]

    try:
        reply = post(target_url, json=payload, timeout=30, headers=self.request_headers)
    except exceptions.RequestException as err:
        return ErrorArtifact(str(err))
    return TextArtifact(reply.text)
put(params)
Source Code in griptape/tools/rest_api/tool.py
@activity(
    config={
        "description": dedent(
            """
            This tool can be used to make a put request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
            {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
            """,
        ),
        "schema": Schema({Literal("body", description="The request body."): dict}),
    },
)
def put(self, params: dict) -> BaseArtifact:
    """Send ``params["values"]["body"]`` as JSON in a PUT request.

    Returns:
        A ``TextArtifact`` with the response text, or an ``ErrorArtifact``
        carrying the message of a ``requests`` ``RequestException``.
    """
    from requests import exceptions, put

    payload = params["values"]["body"]
    target_url = self._build_url(self.base_url, path=self.path)

    try:
        reply = put(target_url, json=payload, timeout=30, headers=self.request_headers)
    except exceptions.RequestException as err:
        return ErrorArtifact(str(err))
    return TextArtifact(reply.text)
SqlTool
Bases:
BaseTool
Source Code in griptape/tools/sql/tool.py
@define
class SqlTool(BaseTool):
    """Tool that runs SQL queries against a single table through a `SqlLoader`."""

    sql_loader: SqlLoader = field(kw_only=True)
    schema_name: Optional[str] = field(default=None, kw_only=True)
    table_name: str = field(kw_only=True)
    table_description: Optional[str] = field(default=None, kw_only=True)
    engine_name: Optional[str] = field(default=None, kw_only=True)

    @property
    def full_table_name(self) -> str:
        """The table name, prefixed with ``schema_name.`` when a schema name is set."""
        if self.schema_name:
            return f"{self.schema_name}.{self.table_name}"
        return self.table_name

    @property
    def table_schema(self) -> Optional[str]:
        """The table schema as reported by the loader's SQL driver."""
        driver = self.sql_loader.sql_driver
        return driver.get_table_schema(self.full_table_name, schema=self.schema_name)

    @activity(
        config={
            "description": "Can be used to execute{% if _self.engine_name %} {{ _self.engine_name }}{% endif %} SQL SELECT queries "
            "in table {{ _self.full_table_name }}. "
            "Make sure the `SELECT` statement contains enough columns to get an answer without knowing "
            "the original question. "
            "Be creative when you use `WHERE` statements: you can use wildcards, `LOWER()`, and other functions "
            "to get better results. "
            "You can use JOINs if more tables are available in other tools.\n"
            "{{ _self.table_name }} schema: {{ _self.table_schema }}\n"
            "{% if _self.table_description %}{{ _self.table_name }} description: {{ _self.table_description }}{% endif %}",
            "schema": Schema({"sql_query": str}),
        },
    )
    def execute_query(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact:
        """Load the rows produced by ``params["values"]["sql_query"]``.

        Returns:
            The loader's result when it is non-empty, an ``InfoArtifact`` when it
            is empty, or an ``ErrorArtifact`` when reading the query or loading
            raises any exception.
        """
        try:
            result = self.sql_loader.load(params["values"]["sql_query"])
        except Exception as e:
            return ErrorArtifact(f"error executing query: {e}")

        return result if len(result) > 0 else InfoArtifact("No results found")
engine_name = field(default=None, kw_only=True) class-attribute instance-attribute
full_table_name property
schema_name = field(default=None, kw_only=True) class-attribute instance-attribute
sql_loader = field(kw_only=True) class-attribute instance-attribute
table_description = field(default=None, kw_only=True) class-attribute instance-attribute
table_name = field(kw_only=True) class-attribute instance-attribute
table_schema property
execute_query(params)
Source Code in griptape/tools/sql/tool.py
@activity(
    config={
        "description": "Can be used to execute{% if _self.engine_name %} {{ _self.engine_name }}{% endif %} SQL SELECT queries "
        "in table {{ _self.full_table_name }}. "
        "Make sure the `SELECT` statement contains enough columns to get an answer without knowing "
        "the original question. "
        "Be creative when you use `WHERE` statements: you can use wildcards, `LOWER()`, and other functions "
        "to get better results. "
        "You can use JOINs if more tables are available in other tools.\n"
        "{{ _self.table_name }} schema: {{ _self.table_schema }}\n"
        "{% if _self.table_description %}{{ _self.table_name }} description: {{ _self.table_description }}{% endif %}",
        "schema": Schema({"sql_query": str}),
    },
)
def execute_query(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact:
    """Load the rows produced by ``params["values"]["sql_query"]``.

    Returns:
        The loader's result when it is non-empty, an ``InfoArtifact`` when it
        is empty, or an ``ErrorArtifact`` when reading the query or loading
        raises any exception.
    """
    try:
        result = self.sql_loader.load(params["values"]["sql_query"])
    except Exception as e:
        return ErrorArtifact(f"error executing query: {e}")

    return result if len(result) > 0 else InfoArtifact("No results found")
StructureRunTool
Bases:
BaseTool
Attributes
| Name | Type | Description | 
|---|---|---|
description | str | A description of what the Structure does. | 
structure_run_driver | BaseStructureRunDriver | Driver to run the Structure. | 
Source Code in griptape/tools/structure_run/tool.py
@define
class StructureRunTool(BaseTool):
    """Tool that runs a Structure through a structure run driver.

    Attributes:
        description: A description of what the Structure does.
        structure_run_driver: Driver to run the Structure.
    """

    description: str = field(kw_only=True, metadata={"serializable": True})
    structure_run_driver: BaseStructureRunDriver = field(kw_only=True, metadata={"serializable": True})

    @activity(
        config={
            "description": "Can be used to run a Gen AI Builder Structure with the following description: {{ _self.description }}",
            "schema": Schema(
                {
                    Literal("args", description="A list of string arguments to submit to the Structure Run"): Schema(
                        [str]
                    )
                },
            ),
        },
    )
    def run_structure(self, params: dict) -> BaseArtifact:
        """Run the Structure with each string argument wrapped in a ``TextArtifact``.

        Args:
            params: Activity parameters; ``params["values"]["args"]`` is the list
                of string arguments.

        Returns:
            The result of the driver's ``run`` call.
        """
        raw_args: list[str] = params["values"]["args"]
        wrapped = [TextArtifact(item) for item in raw_args]
        return self.structure_run_driver.run(*wrapped)
description = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute
structure_run_driver = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute
run_structure(params)
Source Code in griptape/tools/structure_run/tool.py
@activity(
    config={
        "description": "Can be used to run a Gen AI Builder Structure with the following description: {{ _self.description }}",
        "schema": Schema(
            {
                Literal("args", description="A list of string arguments to submit to the Structure Run"): Schema(
                    [str]
                )
            },
        ),
    },
)
def run_structure(self, params: dict) -> BaseArtifact:
    """Run the Structure with each string argument wrapped in a ``TextArtifact``.

    Args:
        params: Activity parameters; ``params["values"]["args"]`` is the list
            of string arguments.

    Returns:
        The result of the driver's ``run`` call.
    """
    raw_args: list[str] = params["values"]["args"]
    wrapped = [TextArtifact(item) for item in raw_args]
    return self.structure_run_driver.run(*wrapped)
StructuredOutputTool
Bases:
BaseTool
Source Code in griptape/tools/structured_output/tool.py
@define
class StructuredOutputTool(BaseTool):
    """Tool whose single activity returns its validated input as a ``JsonArtifact``."""

    output_schema: Union[Schema, type[BaseModel]] = field(kw_only=True)

    @activity(
        config={
            "description": "Used to provide the final response which ends this conversation.",
            # The schema is taken from the instance's output_schema.
            "schema": lambda _self: _self.output_schema,
        }
    )
    def provide_output(self, params: dict) -> BaseArtifact:
        """Wrap ``params["values"]`` in a ``JsonArtifact``."""
        final_values = params["values"]
        return JsonArtifact(final_values)
output_schema = field(kw_only=True) class-attribute instance-attribute
provide_output(params)
Source Code in griptape/tools/structured_output/tool.py
@activity(
    config={
        "description": "Used to provide the final response which ends this conversation.",
        # The schema is taken from the instance's output_schema.
        "schema": lambda _self: _self.output_schema,
    }
)
def provide_output(self, params: dict) -> BaseArtifact:
    """Wrap ``params["values"]`` in a ``JsonArtifact``."""
    final_values = params["values"]
    return JsonArtifact(final_values)
TextToSpeechTool
Bases:
ArtifactFileOutputMixin, BaseTool
Attributes
| Name | Type | Description | 
|---|---|---|
text_to_speech_driver | BaseTextToSpeechDriver | The text to audio generation driver used to generate the speech audio. | 
output_dir | Optional[str] | If provided, the generated audio will be written to disk in output_dir. | 
output_file | Optional[str] | If provided, the generated audio will be written to disk as output_file. | 
Source Code in griptape/tools/text_to_speech/tool.py
@define
class TextToSpeechTool(ArtifactFileOutputMixin, BaseTool):
    """A tool that turns input text into speech audio.

    Attributes:
        text_to_speech_driver: The text to audio generation driver used to generate the speech audio.
        output_dir: If provided, the generated audio will be written to disk in output_dir.
        output_file: If provided, the generated audio will be written to disk as output_file.
    """

    text_to_speech_driver: BaseTextToSpeechDriver = field(kw_only=True)

    @activity(
        config={
            "description": "Can be used to generate speech from the provided input text.",
            "schema": Schema({Literal("text", description="The literal text to be converted to speech."): str}),
        },
    )
    def text_to_speech(self, params: dict[str, Any]) -> AudioArtifact | ErrorArtifact:
        """Generate speech audio for ``params["values"]["text"]``.

        Returns:
            Whatever the driver's ``run_text_to_audio`` returns.
        """
        spoken_text = params["values"]["text"]
        audio = self.text_to_speech_driver.run_text_to_audio(prompts=[spoken_text])

        # Only write to disk when an output location has been configured.
        wants_file = self.output_dir or self.output_file
        if wants_file:
            self._write_to_file(audio)

        return audio
text_to_speech_driver = field(kw_only=True) class-attribute instance-attribute
text_to_speech(params)
Source Code in griptape/tools/text_to_speech/tool.py
@activity(
    config={
        "description": "Can be used to generate speech from the provided input text.",
        "schema": Schema({Literal("text", description="The literal text to be converted to speech."): str}),
    },
)
def text_to_speech(self, params: dict[str, Any]) -> AudioArtifact | ErrorArtifact:
    """Generate speech audio for ``params["values"]["text"]``.

    Returns:
        Whatever the driver's ``run_text_to_audio`` returns.
    """
    spoken_text = params["values"]["text"]
    audio = self.text_to_speech_driver.run_text_to_audio(prompts=[spoken_text])

    # Only write to disk when an output location has been configured.
    wants_file = self.output_dir or self.output_file
    if wants_file:
        self._write_to_file(audio)

    return audio
VariationImageGenerationTool
Bases:
BaseImageGenerationTool
Attributes
| Name | Type | Description | 
|---|---|---|
image_generation_driver | BaseImageGenerationDriver | The image generation driver used to generate the image. | 
output_dir | Optional[str] | If provided, the generated image will be written to disk in output_dir. | 
output_file | Optional[str] | If provided, the generated image will be written to disk as output_file. | 
Source Code in griptape/tools/variation_image_generation/tool.py
@define
class VariationImageGenerationTool(BaseImageGenerationTool):
    """A tool that can be used to generate prompted variations of an image.

    Attributes:
        image_generation_driver: The image generation driver used to generate the image.
        image_loader: Loader used to read the base image from a file path.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    image_generation_driver: BaseImageGenerationDriver = field(kw_only=True)
    # Use a factory so every tool instance gets its own loader. A plain
    # `default=ImageLoader()` would be evaluated once at class-definition time
    # and that single loader object would be shared by all instances.
    image_loader: ImageLoader = field(factory=ImageLoader, kw_only=True)

    @activity(
        config={
            "description": "Generates a variation of a given input image file.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    Literal(
                        "image_file",
                        description="The path to an image file to be used as a base to generate variations from.",
                    ): str,
                },
            ),
        },
    )
    def image_variation_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Generate a variation of an image loaded from a file path.

        Args:
            params: Activity parameters; ``params["values"]`` supplies ``prompt``,
                ``negative_prompt`` and ``image_file``.

        Returns:
            The result of `_generate_variation` for the loaded image.
        """
        prompt = params["values"]["prompt"]
        negative_prompt = params["values"]["negative_prompt"]
        image_file = params["values"]["image_file"]

        image_artifact = self.image_loader.load(image_file)

        return self._generate_variation(prompt, negative_prompt, image_artifact)

    @activity(
        config={
            "description": "Generates a variation of a given input image artifact in memory.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    "memory_name": str,
                    "artifact_namespace": str,
                    "artifact_name": str,
                },
            ),
        },
    )
    def image_variation_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Generate a variation of an image artifact held in an input memory.

        Args:
            params: Activity parameters; ``params["values"]`` supplies ``prompt``,
                ``negative_prompt``, ``memory_name``, ``artifact_namespace`` and
                ``artifact_name``.

        Returns:
            The result of `_generate_variation`, or an `ErrorArtifact` when the
            memory cannot be found or loading the artifact raises `ValueError`.
        """
        prompt = params["values"]["prompt"]
        negative_prompt = params["values"]["negative_prompt"]
        artifact_namespace = params["values"]["artifact_namespace"]
        artifact_name = params["values"]["artifact_name"]
        memory = self.find_input_memory(params["values"]["memory_name"])

        if memory is None:
            return ErrorArtifact("memory not found")

        try:
            image_artifact = load_artifact_from_memory(memory, artifact_namespace, artifact_name, ImageArtifact)
        except ValueError as e:
            return ErrorArtifact(str(e))

        return self._generate_variation(prompt, negative_prompt, cast("ImageArtifact", image_artifact))

    def _generate_variation(
        self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact
    ) -> ImageArtifact | ErrorArtifact:
        """Run the driver's image-variation call and optionally persist the result.

        Args:
            prompt: Prompt passed to the driver as a single-element list.
            negative_prompt: Negative prompt passed to the driver as a single-element list.
            image_artifact: Base image the variation is generated from.

        Returns:
            The artifact returned by the driver's `run_image_variation`.
        """
        output_artifact = self.image_generation_driver.run_image_variation(
            prompts=[prompt], negative_prompts=[negative_prompt], image=image_artifact
        )

        # Only write to disk when an output location has been configured.
        if self.output_dir or self.output_file:
            self._write_to_file(output_artifact)

        return output_artifact
image_generation_driver = field(kw_only=True) (class-attribute, instance-attribute)
image_loader = field(default=ImageLoader(), kw_only=True) (class-attribute, instance-attribute)
_generate_variation(prompt, negative_prompt, image_artifact)
Source Code in griptape/tools/variation_image_generation/tool.py
def _generate_variation(
    self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact
) -> ImageArtifact | ErrorArtifact:
    """Run the driver's image-variation call and optionally persist the result.

    Args:
        prompt: Prompt passed to the driver as a single-element list.
        negative_prompt: Negative prompt passed to the driver as a single-element list.
        image_artifact: Base image the variation is generated from.

    Returns:
        The artifact returned by the driver's `run_image_variation`.
    """
    variation = self.image_generation_driver.run_image_variation(
        prompts=[prompt],
        negative_prompts=[negative_prompt],
        image=image_artifact,
    )

    # Nothing to persist unless an output location was configured.
    if not (self.output_dir or self.output_file):
        return variation

    self._write_to_file(variation)
    return variation
image_variation_from_file(params)
Source Code in griptape/tools/variation_image_generation/tool.py
@activity(
    config={
        "description": "Generates a variation of a given input image file.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                Literal(
                    "image_file",
                    description="The path to an image file to be used as a base to generate variations from.",
                ): str,
            },
        ),
    },
)
def image_variation_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Generate a variation of an image loaded from a file path.

    Args:
        params: Activity parameters; ``params["values"]`` supplies ``prompt``,
            ``negative_prompt`` and ``image_file``.

    Returns:
        The result of `_generate_variation` for the loaded image.
    """
    values = params["values"]
    prompt = values["prompt"]
    negative_prompt = values["negative_prompt"]
    source_path = values["image_file"]

    base_image = self.image_loader.load(source_path)
    return self._generate_variation(prompt, negative_prompt, base_image)
image_variation_from_memory(params)
Source Code in griptape/tools/variation_image_generation/tool.py
@activity(
    config={
        "description": "Generates a variation of a given input image artifact in memory.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                "memory_name": str,
                "artifact_namespace": str,
                "artifact_name": str,
            },
        ),
    },
)
def image_variation_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Generate a variation of an image artifact held in an input memory.

    Args:
        params: Activity parameters; ``params["values"]`` supplies ``prompt``,
            ``negative_prompt``, ``memory_name``, ``artifact_namespace`` and
            ``artifact_name``.

    Returns:
        The result of `_generate_variation`, or an `ErrorArtifact` when the
        memory cannot be found or loading the artifact raises `ValueError`.
    """
    values = params["values"]
    prompt = values["prompt"]
    negative_prompt = values["negative_prompt"]
    namespace = values["artifact_namespace"]
    name = values["artifact_name"]

    memory = self.find_input_memory(values["memory_name"])
    if memory is None:
        return ErrorArtifact("memory not found")

    try:
        loaded = load_artifact_from_memory(memory, namespace, name, ImageArtifact)
    except ValueError as e:
        return ErrorArtifact(str(e))

    base_image = cast("ImageArtifact", loaded)
    return self._generate_variation(prompt, negative_prompt, base_image)
VectorStoreTool
Bases:
BaseTool
Attributes
| Name | Type | Description | 
|---|---|---|
| description | str | LLM-friendly vector DB description. |
| vector_store_driver | BaseVectorStoreDriver | BaseVectorStoreDriver. |
| query_params | dict[str, Any] | Optional dictionary of vector store driver query parameters. |
| process_query_output | Callable[[list[Entry]], BaseArtifact] | Optional lambda for processing the Entry objects returned by a vector store driver query. |
Source Code in griptape/tools/vector_store/tool.py
@define(kw_only=True)
class VectorStoreTool(BaseTool):
    """Tool that runs natural-language queries against a vector database.

    Attributes:
        description: LLM-friendly vector DB description.
        vector_store_driver: `BaseVectorStoreDriver`.
        query_params: Optional dictionary of vector store driver query parameters.
        process_query_output: Optional lambda for processing vector store driver query output `Entry`s.
    """

    # Not referenced anywhere else inside this class.
    DEFAULT_TOP_N = 5

    description: str = field()
    vector_store_driver: BaseVectorStoreDriver = field()
    query_params: dict[str, Any] = field(default=Factory(dict))
    process_query_output: Callable[[list[BaseVectorStoreDriver.Entry]], BaseArtifact] = field(
        default=Factory(lambda: lambda entries: ListArtifact([entry.to_artifact() for entry in entries])),
    )

    @activity(
        config={
            "description": "Can be used to search a database with the following description: {{ _self.description }}",
            "schema": Schema(
                {
                    Literal(
                        "query",
                        description="A natural language search query to run against the vector database",
                    ): str,
                },
            ),
        },
    )
    def search(self, params: dict) -> BaseArtifact:
        """Query the vector store and post-process the returned entries.

        Args:
            params: Activity parameters; the query is read from ``params["values"]["query"]``.

        Returns:
            The output of `process_query_output`, or an `ErrorArtifact` if the
            query or the post-processing raises.
        """
        query = params["values"]["query"]

        try:
            entries = self.vector_store_driver.query(query, **self.query_params)
            return self.process_query_output(entries)
        except Exception as e:
            return ErrorArtifact(f"error querying vector store: {e}")
DEFAULT_TOP_N = 5 (class-attribute, instance-attribute)
description = field() (class-attribute, instance-attribute)
process_query_output = field(default=Factory(lambda: lambda es: ListArtifact([e.to_artifact() for e in es]))) (class-attribute, instance-attribute)
query_params = field(factory=dict) (class-attribute, instance-attribute)
vector_store_driver = field() (class-attribute, instance-attribute)
search(params)
Source Code in griptape/tools/vector_store/tool.py
@activity(
    config={
        "description": "Can be used to search a database with the following description: {{ _self.description }}",
        "schema": Schema(
            {
                Literal(
                    "query",
                    description="A natural language search query to run against the vector database",
                ): str,
            },
        ),
    },
)
def search(self, params: dict) -> BaseArtifact:
    """Query the vector store and post-process the returned entries.

    Args:
        params: Activity parameters; the query is read from ``params["values"]["query"]``.

    Returns:
        The output of `process_query_output`, or an `ErrorArtifact` if the
        query or the post-processing raises.
    """
    query = params["values"]["query"]

    try:
        entries = self.vector_store_driver.query(query, **self.query_params)
        return self.process_query_output(entries)
    except Exception as e:
        return ErrorArtifact(f"error querying vector store: {e}")
WebScraperTool
Bases:
BaseTool
Source Code in griptape/tools/web_scraper/tool.py
@define
class WebScraperTool(BaseTool):
    """Tool that loads a web page and returns its content split into chunks.

    Attributes:
        web_loader: Loader used to fetch the page for a URL; defaults to a new `WebLoader`.
        text_chunker: Chunker applied to the loaded page; defaults to `TextChunker(max_tokens=400)`.
    """

    web_loader: WebLoader = field(default=Factory(lambda: WebLoader()), kw_only=True)
    text_chunker: TextChunker = field(default=Factory(lambda: TextChunker(max_tokens=400)), kw_only=True)

    @activity(
        config={
            "description": "Can be used to browse a web page and load its content",
            "schema": Schema({Literal("url", description="Valid HTTP URL"): str}),
        },
    )
    def get_content(self, params: dict) -> ListArtifact | ErrorArtifact:
        """Load the page at ``params["values"]["url"]`` and return it as chunks.

        Args:
            params: Activity parameters; the URL is read from ``params["values"]["url"]``.

        Returns:
            A `ListArtifact` of the chunks, or an `ErrorArtifact` if loading,
            chunking or building the list raises.
        """
        url = params["values"]["url"]

        try:
            page = self.web_loader.load(url)
            return ListArtifact(self.text_chunker.chunk(page))
        except Exception as e:
            return ErrorArtifact("Error getting page content: " + str(e))
text_chunker = field(default=Factory(lambda: TextChunker(max_tokens=400)), kw_only=True) (class-attribute, instance-attribute)
web_loader = field(default=Factory(lambda: WebLoader()), kw_only=True) (class-attribute, instance-attribute)
get_content(params)
Source Code in griptape/tools/web_scraper/tool.py
@activity(
    config={
        "description": "Can be used to browse a web page and load its content",
        "schema": Schema({Literal("url", description="Valid HTTP URL"): str}),
    },
)
def get_content(self, params: dict) -> ListArtifact | ErrorArtifact:
    """Load the page at ``params["values"]["url"]`` and return it as chunks.

    Args:
        params: Activity parameters; the URL is read from ``params["values"]["url"]``.

    Returns:
        A `ListArtifact` of the chunks, or an `ErrorArtifact` if loading,
        chunking or building the list raises.
    """
    url = params["values"]["url"]

    try:
        page = self.web_loader.load(url)
        return ListArtifact(self.text_chunker.chunk(page))
    except Exception as e:
        return ErrorArtifact("Error getting page content: " + str(e))
WebSearchTool
Bases:
BaseTool
Source Code in griptape/tools/web_search/tool.py
@define
class WebSearchTool(BaseTool):
    """Tool that searches the web through a web search driver.

    Attributes:
        web_search_driver: Driver whose `search` method performs the search.
    """

    web_search_driver: BaseWebSearchDriver = field(kw_only=True)

    @activity(
        config={
            "description": "Can be used for searching the web via the {{ _self.web_search_driver.__class__.__name__}}.",
            "schema": Schema(
                {
                    Literal(
                        "query",
                        description="Search engine request that returns a list of pages with titles, descriptions, and URLs",
                    ): str,
                },
            ),
        },
    )
    def search(self, values: dict) -> ListArtifact | ErrorArtifact:
        """Run a web search for ``values["query"]``.

        Args:
            values: Activity values. ``query`` is passed to the driver positionally;
                every other entry is forwarded as a keyword argument. The dict is
                not modified.

        Returns:
            The driver's search result, or an `ErrorArtifact` if the driver raises.

        Raises:
            KeyError: If ``values`` has no ``query`` entry.
        """
        # The query goes to the driver's `search` positionally, so it must not
        # also appear among the forwarded keyword arguments. A filtered copy is
        # built instead of popping so the caller's dict is left untouched.
        query = values["query"]
        extra_kwargs = {key: value for key, value in values.items() if key != "query"}

        try:
            return self.web_search_driver.search(query, **extra_kwargs)
        except Exception as e:
            return ErrorArtifact(f"Error searching '{query}' with {self.web_search_driver.__class__.__name__}: {e}")
web_search_driver = field(kw_only=True) (class-attribute, instance-attribute)
search(values)
Source Code in griptape/tools/web_search/tool.py
@activity(
    config={
        "description": "Can be used for searching the web via the {{ _self.web_search_driver.__class__.__name__}}.",
        "schema": Schema(
            {
                Literal(
                    "query",
                    description="Search engine request that returns a list of pages with titles, descriptions, and URLs",
                ): str,
            },
        ),
    },
)
def search(self, values: dict) -> ListArtifact | ErrorArtifact:
    """Run a web search for ``values["query"]``.

    Args:
        values: Activity values. ``query`` is passed to the driver positionally;
            every other entry is forwarded as a keyword argument. The dict is
            not modified.

    Returns:
        The driver's search result, or an `ErrorArtifact` if the driver raises.

    Raises:
        KeyError: If ``values`` has no ``query`` entry.
    """
    # The query goes to the driver's `search` positionally, so it must not
    # also appear among the forwarded keyword arguments. A filtered copy is
    # built instead of popping so the caller's dict is left untouched.
    query = values["query"]
    extra_kwargs = {key: value for key, value in values.items() if key != "query"}

    try:
        return self.web_search_driver.search(query, **extra_kwargs)
    except Exception as e:
        return ErrorArtifact(f"Error searching '{query}' with {self.web_search_driver.__class__.__name__}: {e}")
- On this page
 - BaseImageGenerationTool
 - BaseTool
 - CalculatorTool
 - ComputerTool
 - DateTimeTool
 - EmailTool
 - ExtractionTool
 - FileManagerTool
 - GriptapeCloudToolTool
 - ImageQueryTool
 - InpaintingImageGenerationTool
 - OutpaintingImageGenerationTool
 - PromptImageGenerationTool
 - PromptSummaryTool
 - QueryTool
 - RagTool
 - RestApiTool
 - SqlTool
 - StructureRunTool
 - StructuredOutputTool
 - TextToSpeechTool
 - VariationImageGenerationTool
 - VectorStoreTool
 - WebScraperTool
 - WebSearchTool