Package pyraf :: Module subproc
[hide private]
[frames] | no frames]

Source Code for Module pyraf.subproc

   1  """Run a subprocess and communicate with it via stdin, stdout, and stderr. 
   2   
   3  Requires that platform supports, eg, posix-style os.pipe and os.fork. 
   4   
   5  Subprocess class features: 
   6   
   7   - provides non-blocking stdin and stderr reads 
   8   
   9   - provides subprocess stop and continue, kill-on-deletion 
  10   
  11   - provides detection of subprocess startup failure 
  12   
  13   - Subprocess objects have nice, informative string rep (as every good object 
  14     ought). 
  15   
  16   - RecordFile class provides record-oriented IO for file-like stream objects. 
  17   
  18  $Id: subproc.py 1463 2011-06-24 22:58:30Z stsci_embray $ 
  19  """ 
  20  from __future__ import division # confidence high 
  21   
  22  __version__ = "Revision: 1.7r " 
  23   
  24  # Id: subproc.py,v 1.7r 1998 
  25  # Originally by ken manheimer, ken.manheimer@nist.gov, jan 1995. 
  26  # Major revisions by R. White, rlw@stsci.edu, 1999 Jan 23 
  27   
  28  # Prior art: Initially based python code examples demonstrating usage of pipes 
  29  #                        and subprocesses, primarily one by jose pereira. 
  30   
  31  # Implementation notes: 
  32  # - I'm not using the fcntl module to implement non-blocking file descriptors, 
  33  #       because i don't know what all in it is portable and what is not.  I'm not 
  34  #       about to provide for different platform contingencies - at that extent, the 
  35  #       effort would be better spent hacking 'expect' into python. 
  36  # - Todo? - Incorporate an error-output handler approach, where error output is 
  37  #       checked on regular IO, when a handler is defined, and passed to the 
  38  #       handler (eg for printing) immediately as it shows... 
  39  # - Detection of failed subprocess startup is a gross kludge, at present. 
  40   
  41  # - new additions (1.3, 1.4): 
  42  #  - Readbuf, taken from donn cave's iobuf addition, implements non-blocking 
  43  #       reads based solely on os.read with select, while capitalizing big-time on 
  44  #       multi-char read chunking. 
  45  #  - Subproc deletion frees up pipe file descriptors, so they're not exhausted. 
  46  # 
  47  # ken.manheimer@nist.gov 
  48   
  49   
  50  import errno, os, select, signal, string, subprocess, sys, time, types 
  51   
  52  OS_HAS_FORK = hasattr(os, 'fork') 
  53   
54 -def ascii_read(fd, sz):
55 """ Perform an os.read in a way that can handle both Python2 and Python3 56 IO. Assume we are always piping only ASCII characters (since that is all 57 we have ever done with IRAF). Either way, return the read data as a str. 58 """ 59 if sys.version_info[0] > 2: 60 return os.read(fd, sz).decode('ascii') 61 else: 62 return os.read(fd, sz)
63 64
65 -def ascii_write(fd, bufstr):
66 """ Perform an os.write in a way that can handle both Python2 and Python3 67 IO. Assume we are always piping only ASCII characters (since that is all 68 we have ever done with IRAF). Either way, write the byte data to fd. 69 """ 70 if sys.version_info[0] > 2: 71 return os.write(fd, bufstr.encode('ascii')) 72 else: 73 return os.write(fd, bufstr)
74 75 76 try:
77 - class SubprocessError(Exception):
78 pass
79 except TypeError: 80 # string based exceptions 81 SubprocessError = 'SubprocessError' 82
83 -class Subprocess:
84 """Run and communicate asynchronously with a subprocess. 85 86 control_(stderr,stdout,stdin) determines whether the corresponding 87 I/O streams of the subprocess are captured or not. The default is 88 the capture stdout/stdin but not stderr. 89 90 Provides non-blocking reads in the form of .readPendingChars and 91 .readPendingLine. 92 93 .readline will block until it gets a complete line. 94 95 .peekPendingChar does a non-blocking, non-consuming read for pending 96 output, and can be used before .readline to check non-destructively for 97 pending output. .waitForPendingChar(timeout) blocks until 98 a new character is pending, or timeout secs pass, with granularity of 99 pollPause seconds. 100 101 There are corresponding read and peekPendingErrXXX routines, to read from 102 the subprocess stderr stream.""" 103
104 - def __init__(self, cmd, 105 control_stderr=0, control_stdout=1, control_stdin=1, 106 expire_noisily=0, in_fd=0, out_fd=1, err_fd=2, 107 maxChunkSize=1024):
108 """Launch a subprocess, given command string COMMAND.""" 109 self.cmd = cmd 110 self.pid = None 111 self.expire_noisily = expire_noisily # Announce subproc destruction? 112 self.control_stderr = control_stderr 113 self.control_stdout = control_stdout 114 self.control_stdin = control_stdin 115 self.maxChunkSize = maxChunkSize 116 self.in_fd, self.out_fd, self.err_fd = in_fd, out_fd, err_fd 117 self.fork()
118
119 - def fork(self, cmd=None):
120 """Fork a subprocess with designated COMMAND (default, self.cmd).""" 121 if cmd: self.cmd = cmd 122 if type(self.cmd) == types.StringType: 123 cmd = self.cmd.split() 124 else: 125 cmd = self.cmd 126 self.cmd = " ".join(cmd) 127 # Create pipes 128 self.parentPipes = [] 129 childPipes = [] 130 if self.control_stdout: 131 pRc, cWp = os.pipe() # parent-read-child, child-write-parent 132 self.parentPipes.append(pRc) 133 childPipes.append(cWp) 134 if self.control_stdin: 135 cRp, pWc = os.pipe() # child-read-parent, parent-write-child 136 self.parentPipes.append(pWc) 137 childPipes.append(cRp) 138 if self.control_stderr: 139 pRe, cWe = os.pipe() # parent-read-error, child-write-error 140 self.parentPipes.append(pRe) 141 childPipes.append(cWe) 142 143 self.pid = os.fork() 144 145 if self.pid == 0: #### CHILD #### 146 parentErr = os.dup(self.err_fd) # Preserve handle on *parent* stderr 147 # Reopen stdin, out, err, on pipe ends: 148 if self.control_stdin: 149 os.dup2(cRp, self.in_fd) # cRp = sys.stdin 150 if self.control_stdout: 151 os.dup2(cWp, self.out_fd) # cWp = sys.stdout 152 if self.control_stderr: 153 os.dup2(cWe, self.err_fd) # cWe = sys.stderr 154 # close parent ends of pipes 155 for i in self.parentPipes: os.close(i) 156 # Ensure (within reason) stray file descriptors are closed 157 fdmax = max(256, parentErr) 158 if self.parentPipes: 159 fdmax = max(fdmax, max(self.parentPipes)) 160 rclose = range(fdmax+1) 161 excludes = [self.in_fd, self.out_fd, self.err_fd, parentErr] 162 for i in excludes+self.parentPipes: 163 rclose.remove(i) 164 os_close = os.close 165 os_error = os.error 166 for i in rclose: 167 try: os_close(i) 168 except os_error: pass 169 try: 170 os.execvp(cmd[0], cmd) 171 os._exit(1) # Shouldn't get here 172 173 except os.error, e: 174 if self.control_stderr: 175 os.dup2(parentErr, 2) # Reconnect to parent's stderr 176 sys.stderr.write("**execvp failed, '%s'**\n" % str(e)) 177 os._exit(1) 178 179 else: ### PARENT ### 180 # Connect to the child's file descriptors and close child ends of pipes 181 self.toChild = self.readbuf = self.errbuf = None 182 self.toChild_fdlist = [] 183 self.fromChild_fdlist = [] 184 if self.control_stdin: 185 self.toChild_fdlist.append(pWc) 186 self.toChild = pWc 187 if self.control_stderr: 188 self.errbuf = ReadBuf(pRe,self.maxChunkSize) 189 self.fromChild_fdlist.append(pRe) 190 if self.control_stdout: 191 self.readbuf = ReadBuf(pRc,self.maxChunkSize) 192 self.fromChild_fdlist.append(pRc) 193 # close child ends of pipes 194 for i in childPipes: os.close(i) 195 try: 196 pid, err = os.waitpid(self.pid, os.WNOHANG) 197 except os.error, (errnum, msg): 198 if errnum == 10: 199 raise SubprocessError("Subprocess '%s' failed." % self.cmd) 200 else: 201 raise SubprocessError("Subprocess '%s' failed [%d]: %s" % 202 (self.cmd, errnum, msg)) 203 if pid != self.pid: 204 # flag indicating process is still running 205 self.return_code = None 206 elif err == 0: 207 # Child has exited already but not in error, so we won't 208 # shut down the pipes or say anything more at this point. 209 # Set return_code so we don't call waitpid again. 210 self.return_code = 0 211 else: 212 # Process exited with an error. Clean up immediately and 213 # raise an exception. 214 self._cleanUp(err) 215 sig = err & 0xff 216 rc = (err & 0xff00) >> 8 217 self.return_code = rc 218 if sig: 219 raise SubprocessError( 220 "Child process '%s' killed by signal %d with return code %d" 221 % (self.cmd, sig, rc)) 222 else: 223 raise SubprocessError( 224 "Child process '%s' exited with return code %d" % 225 (self.cmd, rc))
226 227 ### Write input to subprocess ### 228
229 - def write(self, strval, timeout=10, printtime=2):
230 """Write a STRING to the subprocess. Times out (and raises an 231 exception) if the process is not ready in timeout seconds. 232 Prints a message indicating that it is waiting every 233 printtime seconds.""" 234 235 if not self.pid: 236 raise SubprocessError("No child process for '%s'" % self.cmd) 237 if not self.control_stdin: 238 raise SubprocessError( 239 "Haven't grabbed subprocess input stream for %s." % self) 240 241 # See if subprocess is ready for write. 242 # Add a wait in case subprocess is still starting up or is 243 # otherwise temporarily unable to respond. 244 # Loop with message if wait takes longer than that, until wait 245 # exceeds the total timeout. 246 247 if timeout < 0: timeout = 0 248 if printtime>timeout: printtime = timeout 249 totalwait = 0 250 try: 251 while totalwait <= timeout: 252 ## if totalwait: print "waiting for subprocess..." 253 totalwait = totalwait + printtime 254 if select.select([],self.toChild_fdlist,[],printtime)[1]: 255 if ascii_write(self.toChild, strval) != len(strval): 256 raise SubprocessError("Write error to %s" % self) 257 return # ===> 258 raise SubprocessError("Write to %s blocked" % self) 259 except select.error, e: 260 raise SubprocessError( 261 "Select error for %s: file descriptors %s\n%s" % 262 (self,self.toChild_fdlist,str(e)))
263
264 - def writeline(self, line=''):
265 """Write STRING, with added newline termination, to subprocess.""" 266 self.write(line + '\n')
267
268 - def closeOutput(self):
269 """Close write pipe to subprocess (signals EOF to subprocess)""" 270 if not self.control_stdin: 271 raise SubprocessError( 272 "Haven't grabbed subprocess input stream for %s." % self) 273 os.close(self.toChild) 274 self.parentPipes.remove(self.toChild) 275 self.toChild = None 276 self.toChild_fdlist = [] 277 self.control_stdin = 0
278 279 ### Get output from subprocess ### 280
281 - def peekPendingChar(self):
282 """Return, but (effectively) do not consume a single pending output 283 char, or return null string if none pending.""" 284 285 if not self.control_stdout: 286 raise SubprocessError( 287 "Haven't grabbed subprocess output stream for %s." % self) 288 return self.readbuf.peekPendingChar()
289
290 - def peekPendingErrChar(self):
291 """Return, but (effectively) do not consume a single pending output 292 error char, or return null string if none pending.""" 293 294 if not self.control_stderr: 295 raise SubprocessError( 296 "Haven't grabbed subprocess error stream for %s." % self) 297 return self.errbuf.peekPendingChar()
298
299 - def waitForPendingChar(self, timeout, pollPause=.1):
300 """Block max TIMEOUT secs until we peek a pending char, returning the 301 char, or '' if none encountered. 302 pollPause is included for backward compatibility, but does nothing.""" 303 304 if not self.control_stdout: 305 raise SubprocessError( 306 "Haven't grabbed subprocess output stream for %s." % self) 307 return self.readbuf.peekPendingChar(timeout)
308
309 - def waitForPendingErrChar(self, timeout, pollPause=.1):
310 """Block max TIMEOUT secs until we peek a pending error char, returning the 311 char, or '' if none encountered. 312 pollPause is included for backward compatibility, but does nothing.""" 313 314 if not self.control_stderr: 315 raise SubprocessError( 316 "Haven't grabbed subprocess error stream for %s." % self) 317 return self.errbuf.peekPendingChar(timeout)
318
319 - def read(self, n=None):
320 """Read N chars (blocking), or all pending if no N specified.""" 321 if not self.control_stdout: 322 raise SubprocessError( 323 "Haven't grabbed subprocess output stream for %s." % self) 324 if n is None: 325 return self.readPendingChars() 326 else: 327 return self.readbuf.read(n)
328
329 - def readErr(self, n=None):
330 """Read N chars from stderr (blocking), or all pending if no N specified.""" 331 if not self.control_stderr: 332 raise SubprocessError( 333 "Haven't grabbed subprocess error stream for %s." % self) 334 if n is None: 335 return self.readPendingErrChars() 336 else: 337 return self.errbuf.read(n)
338
339 - def readPendingChars(self, max=None):
340 """Read all currently pending subprocess output as a single string.""" 341 if not self.control_stdout: 342 raise SubprocessError( 343 "Haven't grabbed subprocess output stream for %s." % self) 344 return self.readbuf.readPendingChars(max)
345
346 - def readPendingErrChars(self, max=None):
347 """Read all currently pending subprocess error output as a single 348 string.""" 349 if not self.control_stderr: 350 raise SubprocessError( 351 "Haven't grabbed subprocess error stream for %s." % self) 352 return self.errbuf.readPendingChars(max)
353
354 - def readPendingLine(self):
355 """Read currently pending subprocess output, up to a complete line 356 (newline inclusive).""" 357 if not self.control_stdout: 358 raise SubprocessError( 359 "Haven't grabbed subprocess output stream for %s." % self) 360 return self.readbuf.readPendingLine()
361
362 - def readPendingErrLine(self):
363 """Read currently pending subprocess error output, up to a complete 364 line (newline inclusive).""" 365 if not self.control_stderr: 366 raise SubprocessError( 367 "Haven't grabbed subprocess error stream for %s." % self) 368 return self.errbuf.readPendingLine()
369
370 - def readline(self):
371 """Return next complete line of subprocess output, blocking until 372 then.""" 373 if not self.control_stdout: 374 raise SubprocessError( 375 "Haven't grabbed subprocess output stream for %s." % self) 376 return self.readbuf.readline()
377
378 - def readlineErr(self):
379 """Return next complete line of subprocess error output, blocking until 380 then.""" 381 if not self.control_stderr: 382 raise SubprocessError( 383 "Haven't grabbed subprocess error stream for %s." % self) 384 return self.errbuf.readline()
385 386 ### Subprocess Control ### 387
388 - def active(self, checkpipes=1):
389 """True if subprocess is alive and kicking. 390 391 If checkpipes is true, also checks the pipes to the process to make 392 sure they are still OK. 393 """ 394 status = self.status(boolean=1) 395 if status and checkpipes: 396 try: 397 readable, writable, errors = select.select( 398 self.fromChild_fdlist, self.toChild_fdlist, [], 0) 399 except select.error: 400 status = 0 401 return status
402
403 - def status(self, boolean=0):
404 """Return string indicating whether process is alive or dead.""" 405 active = 0 406 if not self.cmd: 407 status = 'sans command' 408 elif not self.pid: 409 status = 'sans process' 410 elif not self.cont(): 411 status = "(unresponding) '%s'" % self.cmd 412 else: 413 status = "'%s'" % self.cmd 414 active = 1 415 if boolean: 416 return active 417 else: 418 return status
419
420 - def wait(self,timeout=0):
421 """Wait timeout seconds for process to die. Returns true if process 422 is dead (and was reaped), false if alive.""" 423 424 if self.return_code is not None: 425 # process completed during startup, so just clean up now 426 self._cleanUp(self.return_code << 8) 427 return 1 428 429 # Try a few times to reap the process with waitpid: 430 totalwait = timeout 431 deltawait = timeout/1000.0 432 if deltawait < 0.01: deltawait = 0.01 433 while totalwait >= 0: 434 pid, err = os.waitpid(self.pid, os.WNOHANG) 435 if pid: 436 self._cleanUp(err) 437 return 1 438 time.sleep(deltawait) 439 totalwait = totalwait - deltawait 440 return 0
441
442 - def _cleanUp(self, err):
443 """Cleanup after process is done""" 444 if not self.pid: return 445 if self.expire_noisily: 446 self._noisy_print(err) 447 self._closePipes() 448 self.pid = None 449 self.return_code = (err & 0xff00) >> 8
450
451 - def _closePipes(self):
452 """Close all pipes from parent to child""" 453 for p in self.parentPipes: 454 try: 455 os.close(p) 456 except os.error: 457 pass 458 self.parentPipes = [] 459 self.toChild = None 460 self.toChild_fdlist = [] 461 self.fromChild_fdlist = [] 462 self.control_stdin = 0 463 self.control_stdout = 0 464 self.control_stderr = 0
465
466 - def _noisy_print(self,err):
467 sig = err & 0xff 468 rc = (err & 0xff00) >> 8 469 if sig == 0: 470 sigval = '' 471 elif sig == signal.SIGTERM: 472 sigval = 'TERMinated ' 473 elif sig == signal.SIGKILL: 474 sigval = 'KILLed ' 475 else: 476 sigval = 'Signal %d ' % sig 477 if rc: 478 retval = 'Status %d ' % rc 479 else: 480 retval = '' 481 sys.stderr.write("\n(%ssubproc %d '%s' %s/ %s)\n" % 482 (sigval, self.pid, self.cmd, retval, 483 hex(id(self))[2:])) 484 sys.stderr.flush()
485
486 - def stop(self, verbose=1):
487 """Signal subprocess with STOP (17), returning 'stopped' if ok, or 0 488 otherwise.""" 489 try: 490 os.kill(self.pid, signal.SIGSTOP) 491 except os.error: 492 if verbose: 493 print "Stop failed for '%s' - '%s'" % (self.cmd, sys.exc_value) 494 return 0 495 if verbose: print "Stopped '%s'" % self.cmd 496 return 'stopped'
497
498 - def cont(self, verbose=0):
499 """Signal subprocess with CONT (19), returning 'continued' if ok, or 0 500 otherwise.""" 501 try: 502 os.kill(self.pid, signal.SIGCONT) 503 except os.error: 504 if verbose: 505 print ("Continue failed for '%s' - '%s'" % 506 (self.cmd, sys.exc_value)) 507 return 0 508 if verbose: print "Continued '%s'" % self.cmd 509 return 'continued'
510
511 - def die(self):
512 """Send process PID signal SIG (default 9, 'kill'), returning once 513 it is successfully reaped. 514 515 SubprocessError is raised if process is not successfully killed.""" 516 517 # close pipes to child 518 self._closePipes() 519 if not self.pid: 520 raise SubprocessError("No process") 521 elif not self.cont(): 522 raise SubprocessError("Can't signal subproc %s" % self) 523 524 # Try sending first a TERM and then a KILL signal. 525 sigs = [('TERM', signal.SIGTERM), ('KILL', signal.SIGKILL)] 526 for sig in sigs: 527 try: 528 os.kill(self.pid, sig[1]) 529 except os.error: 530 # keep trying 531 pass 532 # done if we can reap the process; else try next signal 533 if self.wait(0.5): return # ===> 534 # Only got here if subprocess is not gone: 535 raise SubprocessError( 536 "Failed kill of subproc %d, '%s', with signals %s" % 537 (self.pid, self.cmd, map(lambda(x): x[0], sigs)))
538
539 - def __del__(self):
540 """Terminate the subprocess""" 541 if self.pid and not self.wait(0): self.die()
542
543 - def __repr__(self):
544 status = self.status() 545 return '<Subprocess ' + status + ', at ' + hex(id(self))[2:] + '>'
546 547 ############################################################################# 548 ##### Non-blocking read operations ##### 549 ############################################################################# 550
551 -class ReadBuf:
552 """Output buffer for non-blocking reads on selectable files like pipes and 553 sockets. Init with a file descriptor for the file.""" 554
555 - def __init__(self, fd, maxChunkSize=1024):
556 """Encapsulate file descriptor FD, with optional MAX_READ_CHUNK_SIZE 557 (default 1024).""" 558 559 if fd < 0: 560 raise ValueError("File descriptor fd is negative") 561 self.fd = fd 562 self.eof = 0 # May be set with stuff still in .buf 563 self.buf = '' 564 self.chunkSize = maxChunkSize # Biggest read chunk, default 1024.
565
566 - def fileno(self):
567 return self.fd
568
569 - def peekPendingChar(self,timeout=0):
570 """Return, but don't consume, first character of unconsumed output from 571 file, or empty string if none. If timeout is set, waits maximum of 572 timeout seconds before returning. Default is timeout=0 (do not wait 573 at all.)""" 574 575 if self.buf: return self.buf[0] # ===> 576 577 if self.eof: return '' # ===> 578 579 try: 580 sel = select.select([self.fd], [], [self.fd], timeout) 581 except select.error: 582 # select error occurs if self.fd been closed 583 # treat like EOF 584 self.eof = 1 585 return '' 586 if sel[0]: 587 self.buf = ascii_read(self.fd, self.chunkSize) 588 if self.buf: 589 return self.buf[0] # ===> 590 else: 591 self.eof = 1 592 return '' # ===> 593 else: return '' # ===>
594
595 - def readPendingChar(self):
596 """Consume first character of unconsumed output from file, or empty 597 string if none.""" 598 599 if self.buf: 600 got, self.buf = self.buf[0], self.buf[1:] 601 return got # ===> 602 603 if self.eof: return '' # ===> 604 605 try: 606 sel = select.select([self.fd], [], [self.fd], 0) 607 except select.error: 608 # select error occurs if self.fd been closed 609 # treat like EOF 610 self.eof = 1 611 return '' 612 if sel[0]: 613 self.buf = ascii_read(self.fd, self.chunkSize) 614 if self.buf: 615 got, self.buf = self.buf[0], self.buf[1:] 616 return got 617 else: 618 self.eof = 1 619 return '' # ===> 620 else: return '' # ===>
621
622 - def readPendingChars(self, max=None):
623 """Consume uncomsumed output from FILE, or empty string if nothing 624 pending.""" 625 626 if (max is not None) and (max <= 0): return '' # ===> 627 628 if self.buf: 629 if max and (len(self.buf) > max): 630 got, self.buf = self.buf[0:max], self.buf[max:] 631 else: 632 got, self.buf = self.buf, '' 633 return got # ===> 634 635 if self.eof: return '' # ===> 636 637 try: 638 sel = select.select([self.fd], [], [self.fd], 0) 639 except select.error: 640 # select error occurs if self.fd been closed 641 # treat like EOF 642 self.eof = 1 643 return '' 644 if sel[0]: 645 got = ascii_read(self.fd, self.chunkSize) 646 if got: 647 if max and (len(got) > max): 648 self.buf = got[max:] 649 return got[:max] # ===> 650 else: 651 return got # ===> 652 else: 653 self.eof = 1 654 return '' # ===> 655 else: return '' # ===>
656
657 - def readPendingLine(self, block=0):
658 """Return pending output from FILE, up to first newline (inclusive). 659 660 Does not block unless optional arg BLOCK is true. This may return 661 a partial line if the input line is longer than chunkSize (default 662 1024) characters.""" 663 664 if self.buf: 665 to = self.buf.find('\n') 666 if to != -1: 667 got, self.buf = self.buf[:to+1], self.buf[to+1:] 668 return got # ===> 669 got, self.buf = self.buf, '' 670 elif self.eof: 671 return '' # ===> 672 else: 673 got = '' 674 675 # 'got' contains the (former) contents of the buffer, but it 676 # doesn't include a newline. 677 fdlist = [self.fd] 678 if block: 679 # wait indefinitely for input 680 waittime = None 681 else: 682 # don't wait at all 683 waittime = 0 684 while 1: # (we'll only loop if block set) 685 try: 686 sel = select.select(fdlist, [], fdlist, waittime) 687 except select.error: 688 # select error occurs if self.fd has been closed 689 # treat like EOF 690 self.eof = 1 691 return got 692 if sel[0]: 693 newgot = ascii_read(self.fd, self.chunkSize) 694 if newgot: 695 got = got + newgot 696 to = got.find('\n') 697 if to != -1: 698 got, self.buf = got[:to+1], got[to+1:] 699 return got # ===> 700 else: 701 # return partial line on EOF 702 self.eof = 1 703 return got # ===> 704 if not block: 705 return got # ===>
706 # otherwise - no newline, blocking requested, no eof - loop. # ==^ 707
708 - def readline(self):
709 """Return next output line from file, blocking until it is received.""" 710 711 return self.readPendingLine(1) # ===>
712
713 - def read(self, nchars):
714 """Read nchars from input, blocking until they are available. 715 Returns a shorter string on EOF.""" 716 717 if nchars <= 0: return '' 718 if self.buf: 719 if len(self.buf) >= nchars: 720 got, self.buf = self.buf[:nchars], self.buf[nchars:] 721 return got # ===> 722 got, self.buf = self.buf, '' 723 elif self.eof: 724 return '' # ===> 725 else: 726 got = '' 727 728 fdlist = [self.fd] 729 while 1: 730 try: 731 sel = select.select(fdlist, [], fdlist) 732 except select.error: 733 # select error occurs if self.fd has been closed 734 # treat like EOF 735 self.eof = 1 736 return got 737 if sel[0]: 738 newgot = ascii_read(self.fd, self.chunkSize) 739 if newgot: 740 got = got + newgot 741 if len(got) >= nchars: 742 got, self.buf = got[:nchars], got[nchars:] 743 return got # ===> 744 else: 745 self.eof = 1 746 return got 747 else: 748 print 'Select returned without input?'
749 750 751 ############################################################################# 752 ##### Encapsulated reading and writing ##### 753 ############################################################################# 754 # Encapsulate messages so the end can be unambiguously identified, even 755 # when they contain multiple, possibly empty lines. 756
757 -class RecordFile:
758 """Encapsulate stream object for record-oriented IO. 759 760 Particularly useful when dealing with non-line oriented communications 761 over pipes, eg with subprocesses.""" 762 763 # Message is written preceded by a line containing the message length. 764
765 - def __init__(self, f):
766 self.file = f
767
768 - def write_record(self, s):
769 "Write so self.read knows exactly how much to read." 770 f = self.__dict__['file'] 771 f.write("%s\n%s" % (len(s), s)) 772 if hasattr(f, 'flush'): 773 f.flush()
774
775 - def read_record(self):
776 "Read and reconstruct message as prepared by self.write." 777 f = self.__dict__['file'] 778 line = f.readline()[:-1] 779 if line: 780 try: 781 l = int(line) 782 except ValueError: 783 raise IOError("corrupt %s file structure" 784 % self.__class__.__name__) 785 return f.read(l) 786 else: 787 # EOF. 788 return ''
789
790 - def __getattr__(self, attr):
791 """Implement characteristic file object attributes.""" 792 f = self.__dict__['file'] 793 if hasattr(f, attr): 794 return getattr(f, attr) 795 else: 796 raise AttributeError(attr)
797
798 - def __repr__(self):
799 return "<%s of %s at %s>" % (self.__class__.__name__, 800 self.__dict__['file'], 801 hex(id(self))[2:])
802
803 -def record_trial(s):
804 """Exercise encapsulated write/read with an arbitrary string. 805 806 Raise IOError if the string gets distorted through transmission!""" 807 from StringIO import StringIO 808 sf = StringIO() 809 c = RecordFile(sf) 810 c.write(s) 811 c.seek(0) 812 r = c.read() 813 show = " start:\t %s\n end:\t %s\n" % (`s`, `r`) 814 if r != s: 815 raise IOError("String distorted:\n%s" % show)
816 817 ############################################################################# 818 ##### An example subprocess interfaces ##### 819 ############################################################################# 820
821 -class Ph:
822 """Convenient interface to CCSO 'ph' nameserver subprocess. 823 824 .query('string...') method takes a query and returns a list of dicts, each 825 of which represents one entry.""" 826 827 # Note that i made this a class that handles a subprocess object, rather 828 # than one that inherits from it. I didn't see any functional 829 # disadvantages, and didn't think that full support of the entire 830 # Subprocess functionality was in any way suitable for interaction with 831 # this specialized interface. ? klm 13-Jan-1995 832
833 - def __init__(self):
834 try: 835 self.proc = Subprocess("ph", expire_noisily=1) 836 except: 837 raise SubprocessError("failure starting ph: %s" % 838 str(sys.exc_value))
839
840 - def query(self, q):
841 """Send a query and return a list of dicts for responses. 842 843 Raise a ValueError if ph responds with an error.""" 844 845 self.clear() 846 847 self.proc.writeline('query ' + q) 848 got = []; it = {} 849 while 1: 850 response = self.getreply() # Should get null on new prompt. 851 errs = self.proc.readPendingErrChars() 852 if errs: 853 sys.stderr.write(errs) 854 if it: 855 got.append(it) 856 it = {} 857 if not response: 858 return got # ===> 859 elif type(response) == types.StringType: 860 raise ValueError("ph failed match: '%s'" % response) 861 for line in response: 862 # convert to a dict: 863 line = line.split(':') 864 it[line[0].strip()] = ' '.join(line[1:]).strip()
865
866 - def getreply(self):
867 """Consume next response from ph, returning list of lines or string 868 err.""" 869 # Key on first char: (First line may lack newline.) 870 # - dash discard line 871 # - 'ph> ' conclusion of response 872 # - number error message 873 # - whitespace beginning of next response 874 875 nextChar = self.proc.waitForPendingChar(60) 876 if not nextChar: 877 raise SubprocessError('ph subprocess not responding') 878 elif nextChar == '-': 879 # dashed line - discard it, and continue reading: 880 self.proc.readline() 881 return self.getreply() # ===> 882 elif nextChar == 'p': 883 # 'ph> ' prompt - don't think we should hit this, but what the hay: 884 return '' # ===> 885 elif nextChar in '0123456789': 886 # Error notice - we're currently assuming single line errors: 887 return self.proc.readline()[:-1] # ===> 888 elif nextChar in ' \t': 889 # Get content, up to next dashed line: 890 got = [] 891 while nextChar != '-' and nextChar != '': 892 got.append(self.proc.readline()[:-1]) 893 nextChar = self.proc.peekPendingChar() 894 return got
895 - def __repr__(self):
896 return "<Ph instance, %s at %s>\n" % (self.proc.status(), 897 hex(id(self))[2:])
898 - def clear(self):
899 """Clear-out initial preface or residual subproc input and output.""" 900 pause = .5; maxIter = 10 # 5 seconds to clear 901 iterations = 0 902 got = '' 903 self.proc.write('') 904 while iterations < maxIter: 905 got = got + self.proc.readPendingChars() 906 # Strip out all but the last incomplete line: 907 got = got.split('\n')[-1] 908 if got == 'ph> ': return # Ok. ===> 909 time.sleep(pause) 910 raise SubprocessError('ph not responding within %s secs' % 911 pause * maxIter)
912 913 ############################################################################# 914 ##### Run a subprocess with Python I/O redirection ##### 915 ############################################################################# 916 # This runs a command rather like os.system does, but it redirects 917 # the I/O using the current Python sys.stdin, sys.stdout, sys.stderr 918 # filehandles. 919
920 -def systemRedir(cmd):
921 """Run the command as a subprocess with Python I/O redirection in effect 922 923 cmd can be a string or a list of strings. 924 """ 925 #XXX should trap errors and return status? 926 process = RedirProcess(cmd) 927 try: 928 process.run() 929 except KeyboardInterrupt: 930 process.die() 931 sys.stderr.write("\nKilled process `%s'\n" % process.cmd) 932 sys.stderr.flush() 933 raise 934 return process.return_code
935 936 # run subprocess with Python I/O redirection in a subshell 937
938 -def subshellRedir(cmd, shell=None):
939 """Run the command in a subshell with Python I/O redirection in effect 940 cmd should be a simple string with the command and its arguments. 941 shell is the shell to use -- default is value of SHELL environment 942 variable or /bin/sh if SHELL is not defined. 943 """ 944 if OS_HAS_FORK: 945 shell = shell or os.environ.get('SHELL') or '/bin/sh' 946 return systemRedir((shell, "-c", cmd)) 947 else: 948 return _wrapSubprocess(cmd)
949
950 -def _wrapSubprocess(cmdline):
951 """ This function is set up mostly for use on Windows (w/out Cygwin) 952 since that is the only mode it is currently expected to be used in. """ 953 # subprocess.call should work for most commands 954 return subprocess.call(cmdline, shell=True) # this waits
955 956
957 -class RedirProcess(Subprocess):
958 959 """Run a system command with I/O redirected using sys.stdin/out/err""" 960
961 - def __init__(self, cmd, expire_noisily=0):
962 # grab only streams for currently redirected IO 963 doIn = doOut = doErr = 1 964 if sys.stdin == sys.__stdin__: doIn = 0 965 if sys.stdout == sys.__stdout__: doOut = 0 966 if sys.stderr == sys.__stderr__: doErr = 0 967 968 # even if none are redirected, run it as subprocess 969 # so it can be interrupted with ^C 970 971 # initialize the process 972 973 Subprocess.__init__(self, cmd, expire_noisily=expire_noisily, 974 control_stderr=doErr, control_stdout=doOut, control_stdin=doIn)
975
976 - def run(self, timeout=5):
977 """Copy the subprocess I/O to the Python stdin/out/err filehandles""" 978 979 if not self.pid: return 980 981 doIn = self.control_stdin 982 doOut = self.control_stdout 983 doErr = self.control_stderr 984 while (doIn or doOut or doErr): 985 try: 986 readable, writable, errors = select.select(self.fromChild_fdlist, 987 self.toChild_fdlist, [], timeout) 988 except select.error, e: 989 # select error occurs if a file descriptor has been closed 990 # this should not happen -- raise an exception 991 raise SubprocessError( 992 "Select error for %s: file descriptors %s\n%s" % 993 (self,self.toChild_fdlist+self.fromChild_fdlist,str(e))) 994 if readable: 995 # stderr is first in fromChild_fdlist (if present) 996 if doErr and (self.fromChild_fdlist[0] in readable): 997 # stderr 998 s = self.readPendingErrChars() 999 if s: 1000 sys.stderr.write(s) 1001 sys.stderr.flush() 1002 else: 1003 # EOF 1004 os.close(self.fromChild_fdlist[0]) 1005 self.parentPipes.remove(self.fromChild_fdlist[0]) 1006 del self.fromChild_fdlist[0] 1007 doErr = 0 1008 self.control_stderr = 0 1009 else: 1010 # stdout 1011 s = self.readPendingChars() 1012 if s: 1013 sys.stdout.write(s) 1014 sys.stdout.flush() 1015 else: 1016 # EOF 1017 if doErr: 1018 os.close(self.fromChild_fdlist[1]) 1019 self.parentPipes.remove(self.fromChild_fdlist[1]) 1020 del self.fromChild_fdlist[1] 1021 else: 1022 os.close(self.fromChild_fdlist[0]) 1023 self.parentPipes.remove(self.fromChild_fdlist[0]) 1024 del self.fromChild_fdlist[0] 1025 doOut = 0 1026 self.control_stdout = 0 1027 elif writable: 1028 # stdin 1029 try: 1030 s = sys.stdin.read(self.maxChunkSize) 1031 if s: 1032 try: 1033 self.write(s) 1034 except IOError, (errnum, msg): 1035 # broken pipe may be OK 1036 # just call it an EOF and see what happens 1037 if errnum == errno.EPIPE: 1038 raise EOFError 1039 else: 1040 raise (errnum, msg) 1041 else: 1042 # EOF if readline returns null 1043 os.close(self.toChild) 1044 self.parentPipes.remove(self.toChild) 1045 del self.toChild_fdlist[0] 1046 self.toChild = None 1047 doIn = 0 1048 self.control_stdin = 0 1049 except EOFError: 1050 os.close(self.toChild) 1051 self.parentPipes.remove(self.toChild) 1052 del self.toChild_fdlist[0] 1053 self.toChild = None 1054 doIn = 0 1055 self.control_stdin = 0 1056 else: 1057 # timeout, just continue 1058 pass 1059 # Finished with the IO under our control, but task could 1060 # still be running. Wait for it to finish. 1061 while not self.wait(5): 1062 # see whether something bad happened 1063 if not self.active(): 1064 sys.stderr.write(self.status()+'\n') 1065 sys.stderr.flush() 1066 self.die() 1067 break
1068 1069 1070 ############################################################################# 1071 ##### Test ##### 1072 ############################################################################# 1073
1074 -def test():
1075 print "\tOpening subprocess:" 1076 p = Subprocess('cat', expire_noisily=1) # set to expire noisily... 1077 print p 1078 print "\tOpening bogus subprocess, should fail:" 1079 try: 1080 # grab stderr just to make sure the error message still appears 1081 b = Subprocess('/', 1, expire_noisily=1) 1082 print "\tOops! Null-named subprocess startup *succeeded*?!?" 1083 except SubprocessError: 1084 print "\t...yep, it failed." 1085 print '\tWrite, then read, two newline-terminated lines, using readline:' 1086 p.write('first full line written\n'); p.write('second.\n') 1087 print `p.readline()` 1088 print `p.readline()` 1089 print '\tThree lines, last sans newline, read using combination:' 1090 p.write('first\n'); p.write('second\n'); p.write('third, (no cr)') 1091 print '\tFirst line via readline:' 1092 print `p.readline()` 1093 print '\tRest via readPendingChars:' 1094 print p.readPendingChars() 1095 print "\tStopping then continuing subprocess (verbose):" 1096 if not p.stop(1): # verbose stop 1097 print '\t** Stop seems to have failed!' 1098 else: 1099 print '\tWriting line while subprocess is paused...' 1100 p.write('written while subprocess paused\n') 1101 print '\tNonblocking read of paused subprocess (should be empty):' 1102 print p.readPendingChars() 1103 print '\tContinuing subprocess (verbose):' 1104 if not p.cont(1): # verbose continue 1105 print '\t** Continue seems to have failed! Probably lost subproc...' 1106 return p 1107 else: 1108 print '\tReading accumulated line, blocking read:' 1109 print p.readline() 1110 print "\tDeleting subproc, which was set to die noisily:" 1111 del p 1112 print "\tDone." 1113 return None
1114 1115 if __name__ == "__main__": 1116 test() 1117