2011-04-26 5 views
3

ssh 명령 줄 인터페이스를 구현하는 서브 프로세스를 실행하여 원격 제어 기능을 제공하는 Python 모듈이 있습니까?openssh 클라이언트를 감싸는 엔트리 포인트를 제공하는 파이썬 모듈이 있습니까?

  • 실행하는 컴퓨터에서이 명령 및 반환 종료 코드, 표준 출력 및 표준 오류 또는 디렉토리가 해당 시스템
  • 에 존재하는지 여부를 타임 아웃 후
  • 테스트를 예외를 발생 : 유용 할 기능의 종류는 다음과

내가 paramikoConch의 알고 해당 시스템에서 그 파일 이름이 바이트 물품. 그러나,이 모듈을 사용하는 프로그램은 장시간 실행중인 기존 ssh 연결을 대상 시스템에 재사용하는 openssh ControlMaster 기능을 사용할 수 있기를 바랍니다. 그렇지 않으면 SSH 연결 설정 시간이 실행 ​​시간을 압도 할 수 있습니다. 또한이 방법을 사용하면 모듈을 사용하는 프로그램을 다시 작성하지 않고도 openssh 명령 줄 인터페이스를 구현하는 다른 프로그램을 사용할 수 있습니다.

답변

1

rsync를 사용하고자하는 것과 같은 소리, 특히 "해당 시스템에 디렉토리가 존재하는지 테스트", "이 바이트를 해당 컴퓨터의 해당 파일 이름에 기록", " 해당 라인의 콜백은 해당 머신의 해당 파일에 기록됩니다. " 사람들이 제공 한 기존의 Python-rsync 브리지 중 일부를 사용하거나 자신의 브릿지 중 하나를 만들 수 있습니다.

데몬 프로세스를 유지 관리하지 않고 장기 실행 SSH 터널을 열어 둘 수있는 방법은 없습니다. paramiko와 Conch는 모두 SSH 채널을 지원합니다. SSH 채널은 고전적인 SSH 포트 리디렉션과 마찬가지로 데몬 프로세스에서 쉽게 실행될 수 있습니다.

+0

ControlMaster 모드의 openssh는 사실상 데몬입니다. –

+0

나는 그것을 이해한다. 초기 연결을 수동으로 해제하고 실행을 기대하거나 Python이이 프로세스를 제어하도록 하시겠습니까? – jathanism

+0

흥미로운 질문입니다. openssh에 이미 마스터 코드를 실행하고 필요에 따라 잠시 동안 연결을 유지하는 코드가 있지만이 방법은 응답하지 않는 ssh 서버와 완료 될 때까지 걸리는 명령의 차이를 명확하게 나타내지는 않습니다 어떤 경우에는 저에게 중요한 일이 될 수 있습니다. –

3

귀하의 필요에 맞게 파이썬 Fabric을 찾을 수 있습니다. paramiko를 사용하는 것 같지만 execution model에 설명 된 것처럼 연결 캐싱에 대해서는 현명합니다.

기존의 장기 실행 연결은 사용하지 않습니다.

0

아직 사용할 수있는 코드를 찾을 수 없으므로 아래 버전을 작성했습니다. 나는 아직도 이런 종류의 일을하는 오픈 소스 모듈을 찾고 싶어한다.

"""Remote control a machine given an ssh client on Linux""" 

from pwd import getpwuid 
from socket import socket, AF_UNIX, SOCK_STREAM, error 
from os import getuid, write, close, unlink, read 
import select, errno 
from subprocess import PIPE, Popen, check_call 
from time import time, sleep 
from tempfile import mkstemp 

def whoami(): 
    """figure out user ID""" 
    return getpwuid(getuid()).pw_name 

def arg_escape(text): 
    """Escape things that confuse shells""" 
    return text.replace('(', '\\(').replace(')', '\\)') 

def try_open_socket(socket_name): 
    """can we talk to socket_name?""" 
    sock = socket(AF_UNIX, SOCK_STREAM) 
    try: 
     sock.connect(socket_name) 
    except error: 
     return False 
    else: 
     return True 

class ProcessTimeoutError(Exception): 
    """Indicates that a process failed to finish in alotted time""" 
    pass 

class ConnectionTimeoutError(Exception): 
    """Indicates that it was not possible to connect in alotted time""" 

class CalledProcessError(Exception): 
    """Indicates non-zero exit of a process we expected to exit cleanly""" 

def local_run_with_timeout(command, timeout=60, check=True): 
    """Run a command with a timeout after which it will be SIGKILLed. 

    If check is set raise CalledProcessError if the command fails. 

    Based on the standard library subprocess.Popen._communicate_with_poll. 
    """ 
    process = Popen(command, shell=False, stdout=PIPE, stderr=PIPE) 
    poller = select.poll() 
    start = time() 
    fd2file = {} 
    content = {} 
    for stream in [process.stdout, process.stderr]: 
     poller.register(stream.fileno(), select.POLLIN | select.POLLPRI) 
     fd2file[stream.fileno()] = stream 
     content[stream] = '' 
    vout = lambda: content[process.stdout] 
    verr = lambda: content[process.stderr] 
    while fd2file: 
     delta = time() - start 
     if delta > timeout: 
      process.kill() 
      raise ProcessTimeoutError(command, timeout, vout(), verr()) 
     try: 
      ready = poller.poll(timeout-delta) 
     except select.error, exc: 
      if exc.args[0] == errno.EINTR: 
       continue 
      raise 
     for fileno, mode in ready: 
      if mode & (select.POLLIN | select.POLLPRI): 
       data = read(fileno, 4096) 
       content[fd2file[fileno]] += data 
       if data: 
        continue 
      fd2file[fileno].close() 
      fd2file.pop(fileno) 
      poller.unregister(fileno) 
    process.wait() 
    if check and process.returncode != 0: 
     raise CalledProcessError(process.returncode, delta, command, 
           vout(), verr()) 
    return (process.returncode, vout(), verr()) 



class Endpoint: 
    """Perform operations on a remote machine""" 
    def __init__(self, host, user=None, process_timeout=10, 
       connection_timeout=20): 
     self.host = host 
     self.user = user 
     self.target = ((self.user+'@') if self.user else '') + self.host 
     self.process_timeout = process_timeout 
     self.connection_timeout = connection_timeout 

    def start(self): 
     """Start the SSH connection and return the unix socket name. 

     Requires http://sourceforge.net/projects/sshpass/ and 
     http://software.clapper.org/daemonize/ to be installed 
     """ 
     socket_name = '/tmp/' + whoami() + '-ssh-' + self.target 
     if not try_open_socket(socket_name): 
      check_call(['daemonize', '/usr/bin/ssh', 
         '-N', '-M', '-S', socket_name, self.target]) 
      start = time() 
      while not try_open_socket(socket_name): 
       delta = time() - start 
       if delta > self.connection_timeout: 
        raise ConnectionTimeoutError(delta, self.target) 
       sleep(1) 
     return socket_name 

    def call(self, command, timeout=None, check=False): 
     """Run command with timeout""" 
     if not timeout: 
      timeout = self.process_timeout 
     socket_name = self.start() 
     if type(command) == type(''): 
      command = command.split() 
     command_escape = [arg_escape(x) for x in command] 
     command_string = ' '.join(command_escape) 

     return local_run_with_timeout(
      ['/usr/bin/ssh', '-S', socket_name, 
      self.target, command_string], timeout=timeout, check=check) 
    def check_call(self, command, timeout=None): 
     """Run command with timeout""" 
     exitcode, stdout, stderr = self.call(command, timeout=timeout, 
              check=True) 
     return stdout, stderr 

    def isdir(self, directory): 
     """Return true if a directory exists""" 
     return 'directory\n' in self.call(['stat', directory])[1] 

    def write_file(self, content, filename): 
     """Store content on filename""" 
     handle, name = mkstemp() 
     try: 
      write(handle, content) 
      close(handle) 
      socket_name = self.start() 
      exitcode, stdout, stderr = local_run_with_timeout(
       ['/usr/bin/scp', '-o', 'ControlPath='+socket_name, 
       '-o', 'ControlMaster=auto', name, self.target+':'+filename], 
       timeout=self.process_timeout, check=True) 
     finally: 
      unlink(name) 

def test_check_call(): 
    """Run through some test cases. """ 
    tep = Endpoint('localhost') 
    assert 'dev' in tep.check_call('ls /')[0] 
    assert tep.call('false')[0] != 0 

def test_basic_timeout(): 
    """Ensure timeouts trigger""" 
    import pytest # "easy_install pytest" FTW 
    start = time() 
    with pytest.raises(ProcessTimeoutError): 
     Endpoint('localhost').call('sleep 5', timeout=0.2) 
    assert not (time()-start > 3) 

def test_timeout_output(): 
    """check timeouts embed stdout""" 
    import pytest # "easy_install pytest" FTW 
    with pytest.raises(ProcessTimeoutError): 
     Endpoint('localhost').call('find /', timeout=0.2) 

def test_non_zero_exit(): 
    """chek check_call raises an CalledProcessError on timeout""" 
    import pytest # "easy_install pytest" FTW 
    with pytest.raises(CalledProcessError): 
     Endpoint('localhost').check_call('false') 

def test_fs(): 
    """check filesystem operations""" 
    tep = Endpoint('localhost') 
    assert tep.isdir('/usr') 
    assert not tep.isdir(str(time())) 
    tep.write_file('hello world', '/tmp/test') 
    tep.check_call(['grep','hello','/tmp/test']) 
관련 문제