Skip to content

Connections

Connection plugins create and manage per-host sessions for tasks that declare a connection_plugin_name. They are selected by task metadata, and each host may provide plugin-specific connection overrides through connection_options.

Select A Connection Plugin

Tasks choose a connection plugin by name.

use genja::genja_task;

// Unit struct that carries the `backup_config` task.
struct BackupConfig;

// The attribute names the task and requests the "ssh" connection plugin.
#[genja_task(name = "backup_config", connection_plugin_name = "ssh")]
impl BackupConfig {
    // Placeholder body: ignores both arguments and returns a passed result
    // wrapping a freshly constructed `TaskSuccess`.
    async fn start_async(
        &self,
        _host: &genja::genja_core::inventory::Host,
        _context: &genja::genja_core::task::TaskRuntimeContext,
    ) -> Result<genja::genja_core::task::HostTaskResult, genja::genja_core::task::TaskError> {
        use genja::genja_core::task::{HostTaskResult, TaskSuccess};

        let success = TaskSuccess::new();
        Ok(HostTaskResult::passed(success))
    }
}
from genja.task import task


# The decorator names the task and requests the "ssh" connection plugin.
@task(name="backup_config", connection_plugin_name="ssh")
class BackupConfig:
    # Task body omitted in this example.
    ...

The named plugin must already be registered, and the host inventory must resolve the connection parameters that plugin needs.

Inventory Connection Options

Hosts, groups, and defaults may define connection_options. Genja merges these values while resolving the connection requested by the task.

# Host entry: connection_options holds per-plugin overrides, keyed here by
# "ssh" and "netconf".
router1:
  hostname: 10.0.0.1
  platform: ios
  username: admin
  connection_options:
    ssh:
      port: 22
      extras:
        device_type: cisco_ios
    netconf:
      port: 830

# Presumably a group entry; how router1 is made a member of `core` is not
# shown in this example.
core:
  connection_options:
    ssh:
      username: automation

# Inventory-wide defaults.
defaults:
  connection_options:
    ssh:
      extras:
        timeout: 30

The resolved connection payload contains:

  • hostname
  • port
  • username
  • password
  • platform
  • extras

Rust Connection Plugins

Rust connection plugins implement PluginConnection. The trait is async because open(...) and execute_command(...) may perform network I/O.

Required methods:

  • create(...)
  • open(...)
  • close(...)
  • is_alive()

Optional defaulted methods:

  • execute_command(...)
  • group()
use genja::async_trait;
use genja::genja_core::inventory::{ConnectionKey, ResolvedConnectionParams};
use genja_plugin_manager::plugin_types::{Plugin, PluginConnection, Plugins};
use genja_plugin_manager::PluginManager;

// Factory half of the plugin: this is the value registered with the plugin
// manager, and it creates the per-host connection instances.
#[derive(Debug)]
struct SshPlugin;

impl Plugin for SshPlugin {
    // Plugin name; tasks select it with `connection_plugin_name = "ssh"`.
    fn name(&self) -> String {
        String::from("ssh")
    }
}

#[async_trait]
impl PluginConnection for SshPlugin {
    // Factory only needs create() - other methods can panic or return defaults

    // Builds a new SshConnection that owns a clone of `key`. The connection
    // starts with `alive == false`.
    fn create(&self, key: &ConnectionKey) -> Box<dyn PluginConnection> {
        Box::new(SshConnection::new(key.clone()))
    }

    // Always panics: the factory is never opened as a connection itself.
    async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
        panic!("open() should not be called on factory")
    }

    // Always panics: the factory has no connection key of its own to return.
    fn close(&mut self) -> ConnectionKey {
        panic!("close() should not be called on factory")
    }

    // Always reports `false`.
    fn is_alive(&self) -> bool {
        false // Factory itself is not a connection
    }
}

// Instance half of the plugin: one value per host connection.
#[derive(Debug)]
struct SshConnection {
    // Key this connection was created for; close() hands a clone back.
    key: ConnectionKey,
    // Set by open(), cleared by close().
    alive: bool,
}

impl SshConnection {
    // A new connection starts closed.
    fn new(key: ConnectionKey) -> Self {
        let alive = false;
        Self { key, alive }
    }
}

impl Plugin for SshConnection {
    // Same plugin name as the factory that creates this connection.
    fn name(&self) -> String {
        "ssh".to_owned()
    }
}

#[async_trait]
impl PluginConnection for SshConnection {
    // A connection instance is not a factory; calling this is a programming
    // error, so it panics. The key is unused, hence the underscore prefix.
    fn create(&self, _key: &ConnectionKey) -> Box<dyn PluginConnection> {
        panic!("create() should not be called on connection instance")
    }

    // Marks the connection as open. The resolved parameters are unused in this
    // skeleton (underscore prefix avoids an unused-variable warning).
    async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
        // Actual SSH connection logic would go here
        self.alive = true;
        Ok(())
    }

    // Runs `command` on the open connection.
    //
    // Returns `Err("Connection not open")` when called before open() or after
    // close(); otherwise returns the placeholder output `ran <command>`.
    async fn execute_command(&mut self, command: &str) -> Result<String, String> {
        if !self.alive {
            return Err("Connection not open".to_string());
        }
        Ok(format!("ran {command}"))
    }

    // Marks the connection as closed and returns a clone of its key.
    fn close(&mut self) -> ConnectionKey {
        self.alive = false;
        self.key.clone()
    }

    // True between a successful open() and the next close().
    fn is_alive(&self) -> bool {
        self.alive
    }
}

// Only the factory is registered; connection instances come from create().
let mut plugins = PluginManager::new();
let ssh_factory = Plugins::Connection(Box::new(SshPlugin));
plugins.register_plugin(ssh_factory);

create(...) returns a per-host connection instance. The registered plugin acts as the factory; the returned SshConnection instance handles open, command execution, liveness checks, and close.

Python Connection Plugins

Python connection plugins extend ConnectionPluginBase and return a ConnectionBase instance from create(...).

import genja as genja_lib
from genja.connection import (
    ConnectionBase,
    ConnectionKey,
    ConnectionPluginBase,
    ResolvedConnectionParams,
)


class SshConnection(ConnectionBase):
    """Per-host connection instance returned by the plugin's ``create``."""

    def __init__(self, key: ConnectionKey) -> None:
        self.key = key
        # A new connection starts closed; open() flips this to True.
        self.alive = False

    def open(self, params: ResolvedConnectionParams) -> None:
        """Mark the connection as open; real SSH setup would go here."""
        self.alive = True

    def execute_command(self, command: str) -> str:
        """Run ``command`` and return its (placeholder) output.

        Raises:
            RuntimeError: if the connection has not been opened or was closed.
        """
        # Refuse to report success on a connection that is not open.
        if not self.alive:
            raise RuntimeError("Connection not open")
        return f"ran {command}"

    def close(self) -> ConnectionKey:
        """Mark the connection as closed and return its key."""
        self.alive = False
        return self.key

    def is_alive(self) -> bool:
        """Return True between open() and close()."""
        return self.alive


class SshPlugin(ConnectionPluginBase):
    """Connection plugin factory named "ssh"."""

    name = "ssh"

    def create(self, key: ConnectionKey) -> SshConnection:
        """Return a new, not-yet-opened SshConnection for ``key``."""
        return SshConnection(key)


# Register the factory; connection instances are obtained from its create().
plugins = genja_lib.PluginManager()
plugins.register_plugin(SshPlugin())

Async Variant

Python connection factories and instance methods may be synchronous or asynchronous. Async implementations use the same base classes:

from genja.connection import (
    ConnectionBase,
    ConnectionKey,
    ConnectionPluginBase,
    ResolvedConnectionParams,
)


class AsyncSshConnection(ConnectionBase):
    """Per-host connection instance whose methods are coroutines."""

    def __init__(self, key: ConnectionKey) -> None:
        self.key = key
        # A new connection starts closed; open() flips this to True.
        self.alive = False

    async def open(self, params: ResolvedConnectionParams) -> None:
        """Mark the connection as open; real SSH setup would go here."""
        self.alive = True

    async def execute_command(self, command: str) -> str:
        """Run ``command`` and return its (placeholder) output.

        Raises:
            RuntimeError: if the connection has not been opened or was closed.
        """
        # Refuse to report success on a connection that is not open.
        if not self.alive:
            raise RuntimeError("Connection not open")
        return f"ran {command}"

    async def close(self) -> ConnectionKey:
        """Mark the connection as closed and return its key."""
        self.alive = False
        return self.key

    async def is_alive(self) -> bool:
        """Return True between open() and close()."""
        return self.alive


class AsyncSshPlugin(ConnectionPluginBase):
    """Connection plugin factory named "ssh" with a coroutine ``create``."""

    name = "ssh"

    async def create(self, key: ConnectionKey) -> AsyncSshConnection:
        """Return a new, not-yet-opened AsyncSshConnection for ``key``."""
        return AsyncSshConnection(key)

Runtime Context

When a task declares a connection plugin, the runtime passes a TaskRuntimeContext value into start(...). That context is created for the current task execution on the current host.

The connection opened by the plugin is accessed through the runtime context:

  • Rust: TaskRuntimeContext::connection()
  • Python: TaskRuntimeContext.connection()

The connection accessors are the relevant public task-facing API for connection-aware Python tasks.

use genja::genja_core::inventory::Host;
use genja::genja_core::task::{
    HostTaskResult, TaskError, TaskRuntimeContext, TaskSuccess,
};

// Runs `show version` through the runtime context and returns the output as
// the result of a passed task.
async fn start(
    &self,
    _host: &Host,
    context: &TaskRuntimeContext,
) -> Result<HostTaskResult, TaskError> {
    // `?` hands a failed command back to the caller as the task error.
    let show_version = context.execute_command("show version").await?;
    let success = TaskSuccess::new().with_result(show_version);
    Ok(HostTaskResult::passed(success))
}
from genja.task import Host, TaskInfo, TaskRuntimeContext, TaskSuccessResult


def start(
    self,
    task: TaskInfo,
    host: Host,
    context: TaskRuntimeContext,
) -> TaskSuccessResult:
    """Run ``show version`` on the task's connection, if one is available.

    The command output (or ``None`` when the context has no connection) is
    stored under the ``show_version`` metadata key of the returned result.
    """
    connection = context.connection()
    # Without a connection the output stays None instead of raising.
    show_version = (
        connection.execute_command("show version")
        if connection is not None
        else None
    )
    return TaskSuccessResult(
        summary=f"connected to {host.hostname}",
        metadata={"show_version": show_version},
    )