Package pyraf :: Module irafexecute
[hide private]
[frames] | no frames]

Source Code for Module pyraf.irafexecute

  1  """irafexecute.py: Functions to execute IRAF connected subprocesses 
  2   
  3  $Id: irafexecute.py 1463 2011-06-24 22:58:30Z stsci_embray $ 
  4  """ 
  5  from __future__ import division # confidence high 
  6   
  7  import os, re, signal, string, struct, sys, time, types, numpy, cStringIO 
  8  from stsci.tools import irafutils 
  9  from stsci.tools.irafglobals import IrafTask 
 10  import subproc, filecache, wutil 
 11  import iraf, gki, irafukey 
 12  import irafgwcs 
 13   
 14  #stdgraph = None 
 15   
 16  IPC_PREFIX = numpy.array([01120],numpy.int16).tostring() 
 17   
 18  # weirdo protocol to get output from task back to subprocess 
 19  # definitions from cl/task.h and lib/clio.h 
 20  IPCOUT = "IPC$IPCIO-OUT" 
 21  IPCDONEMSG = "# IPC$IPCIO-FINISHED\n" 
 22   
 23  # set flag indicating big endian or little endian byte order 
 24  try: 
 25      # sys.byteorder was added in Python 2.0 
 26      isBigEndian = (sys.byteorder == "big") 
 27  except AttributeError: 
 28      i = 1 
 29      tup = struct.unpack('hh',struct.pack('=i',i)) 
 30      if tup[1] == 1: 
 31          isBigEndian = 1 
 32      else: 
 33          isBigEndian = 0 
 34      del i, tup 
 35   
# Create an instance of the stdimage kernel
# (module-level object; IrafProcess.xmit sends it channel 7 data and the
# channel 7 control messages)
stdimagekernel = gki.GkiController()
 38   
class IrafProcessError(Exception):

    """Exception for failures in the conversation with an IRAF process

    In addition to the message it stores the error number, error text
    and task name handed to the constructor as the attributes errno,
    errmsg and errtask.
    """

    def __init__(self, msg, errno=-1, errmsg="", errtask=""):
        super(IrafProcessError, self).__init__(msg)
        self.errtask = errtask
        self.errmsg = errmsg
        self.errno = errno
45
def _getExecutable(arg):
    """Get executable pathname.

    Arg may be a string with the path, an IrafProcess, an IrafTask,
    or a string with the name of an IrafTask.

    Raises IrafProcessError if arg has another type, or is a string
    that is neither an existing path nor a task returned by
    iraf.getTask.
    """
    if isinstance(arg, IrafProcess):
        return arg.executable
    if isinstance(arg, IrafTask):
        return arg.getFullpath()
    if isinstance(arg, types.StringType):
        # a string naming an existing file is taken as the executable
        if os.path.exists(arg):
            return arg
        # otherwise try it as a task name
        task = iraf.getTask(arg, found=1)
        if task is not None:
            return task.getFullpath()
    # format through a 1-tuple so that a tuple argument is reported
    # instead of breaking the % operator
    raise IrafProcessError("Cannot find task or executable %s" % (arg,))
63
class _ProcessProxy(filecache.FileCache):

    """FileCache entry wrapping one IrafProcess

    When the cache reports that the executable file has changed, the
    wrapped process is terminated and replaced by a new one started
    from the same file.
    """

    def __init__(self, process):
        # environment used if the process ever has to be restarted
        self.envdict = {}
        self.process = process
        # the executable filename is the file watched by FileCache
        filecache.FileCache.__init__(self, process.executable)

    def newValue(self):
        """Nothing to do when the proxy is created"""
        pass

    def updateValue(self):
        """Start a new version of the process after the executable changed"""
        self.process.terminate()
        # the old process is deleted before the next one is started;
        # doing it the other way around caused some weird problems
        del self.process
        self.process = IrafProcess(self.filename)
        self.process.initialize(self.envdict)

    def getProcess(self, envdict):
        """Return the process, remembering envdict for a possible restart"""
        self.envdict = envdict
        return self.get()

    def getValue(self):
        """Return the wrapped process"""
        return self.process
97 98
class _ProcessCache:

    """Cache of active processes indexed by executable path

    Each entry of the cache is a (rank, proxy) tuple, where rank is the
    value of the process counter when the entry was stored (larger
    means newer) and proxy is a _ProcessProxy.
    """

    DFT_LIMIT = 8

    def __init__(self, limit=DFT_LIMIT):
        self._data = {}         # dictionary with active process proxies
        self._pcount = 0        # total number of processes started
        self._plimit = limit    # number of active processes allowed
        self._locked = {}       # processes locked into cache

    def error(self, msg, level=0):
        """Write an error message if Verbose is set"""
        if iraf.Verbose > level:
            sys.stderr.write(msg)
            sys.stderr.flush()

    def get(self, task, envdict):
        """Get process for given task.  Create a new one if needed."""
        executable = _getExecutable(task)
        if executable in self._data:
            # use existing process
            rank, proxy = self._data[executable]
            process = proxy.getProcess(envdict)
            if not process.running:
                if process.isAlive():
                    return process
                # Hmm, looks like there is something wrong with this process
                # Kill it and start a new one
                #XXX Eventually can make this a level 0 message
                #XXX Leave as level -1 for now so we see if bug is gone
                self.error("Warning: process %s is bad, restarting it\n" %
                           (executable,), level=-1)
                self.kill(executable, verbose=0)
            # Whoops, process is already active...
            # This could happen if one task in an executable tries to
            # execute another task in the same executable.  Don't know
            # if IRAF allows this, but we can handle it by just creating
            # a new process running the same executable.
        # create and initialize a new process
        # this will be added to cache after successful task completion
        process = IrafProcess(executable)
        process.initialize(envdict)
        return process

    def add(self, process):
        """Add process to cache or update its rank if already there"""
        self._pcount = self._pcount+1
        executable = process.executable
        if executable in self._data:
            # don't replace current cached process
            rank, proxy = self._data[executable]
            oldprocess = proxy.process
            if oldprocess != process:
                # argument is a duplicate process, terminate this copy
                process.terminate()
        elif self._plimit <= len(self._locked):
            # cache is null or all processes are locked
            process.terminate()
            return
        else:
            # new process -- make a proxy
            proxy = _ProcessProxy(process)
            if len(self._data) >= self._plimit:
                # delete the oldest entry to make room
                self._deleteOldest()
        self._data[executable] = (self._pcount, proxy)

    def _deleteOldest(self):
        """Delete oldest unlocked process from the cache

        If all processes are locked, delete oldest locked process.
        """
        # each entry contains rank (to sort and find oldest) and process;
        # sort on the rank alone so proxies are never compared
        values = sorted(self._data.values(), key=lambda entry: entry[0])
        if len(self._locked) < len(self._data):
            # find and delete oldest unlocked process
            for rank, proxy in values:
                process = proxy.process
                executable = process.executable
                if not (executable in self._locked or process.running):
                    # terminate it
                    self.terminate(executable)
                    return
        # no unlocked processes or all unlocked are running
        # delete oldest locked process
        rank, proxy = values[0]
        executable = proxy.process.executable
        self.terminate(executable)

    def setenv(self, msg):
        """Update process value of environment variable by sending msg"""
        for rank, proxy in self._data.values():
            # just save messages in a list, they all get sent at
            # once when a task is run
            proxy.process.appendEnv(msg)

    def setSize(self, limit):
        """Set number of processes allowed in cache

        A limit of zero or less unlocks everything and empties the cache.
        """
        self._plimit = limit
        if self._plimit <= 0:
            self._locked = {}
            self.flush()
        else:
            while len(self._data) > self._plimit:
                self._deleteOldest()

    def resetSize(self):
        """Set the number of processes allowed in cache back to the default"""
        self.setSize(_ProcessCache.DFT_LIMIT)

    def lock(self, *args):
        """Lock the specified tasks into the cache

        Takes task names (strings) as arguments.
        """
        # don't bother if cache is disabled or full
        if self._plimit <= len(self._locked):
            return
        for taskname in args:
            task = iraf.getTask(taskname, found=1)
            if task is None:
                print("No such task `%s'" % taskname)
            elif task.__class__.__name__ == "IrafTask":
                # cache only executable tasks (not CL tasks, etc.)
                executable = task.getFullpath()
                process = self.get(task, iraf.getVarDict())
                self.add(process)
                # add() may have declined to cache the process
                if executable in self._data:
                    self._locked[executable] = 1
                else:
                    self.error("Cannot cache %s\n" % taskname)

    def delget(self, process):
        """Get process object and delete it from cache

        process can be an IrafProcess, task name, IrafTask, or
        executable filename.  If it is not an IrafProcess and nothing
        is cached for its executable, the argument is returned as is.
        """
        executable = _getExecutable(process)
        if executable in self._data:
            rank, proxy = self._data[executable]
            if not isinstance(process, IrafProcess):
                process = proxy.process
            # don't delete from cache if this is a duplicate process
            if process == proxy.process:
                del self._data[executable]
                if executable in self._locked:
                    del self._locked[executable]
                    # could restart the process if locked?
        return process

    def kill(self, process, verbose=1):
        """Kill process and delete it from cache

        process can be an IrafProcess, task name, IrafTask, or
        executable filename.
        """
        process = self.delget(process)
        if isinstance(process, IrafProcess):
            process.kill(verbose)

    def terminate(self, process):
        """Terminate process and delete it from cache"""
        # This is gentler than kill(), which should be used only
        # when there are process errors.
        process = self.delget(process)
        if isinstance(process, IrafProcess):
            process.terminate()

    def flush(self, *args):
        """Flush given processes (all non-locked if no args given)

        Takes task names (strings) as arguments.
        """
        if args:
            for taskname in args:
                task = iraf.getTask(taskname, found=1)
                if task is not None:
                    self.terminate(task)
        else:
            # terminate() removes entries from the cache, so walk a
            # snapshot of the values rather than the live dictionary
            for rank, proxy in list(self._data.values()):
                executable = proxy.process.executable
                if executable not in self._locked:
                    self.terminate(executable)

    def list(self):
        """List processes sorted from newest to oldest with locked flag"""
        values = sorted(self._data.values(), key=lambda entry: entry[0],
                        reverse=True)
        n = 0
        for rank, proxy in values:
            n = n+1
            executable = proxy.process.executable
            if executable in self._locked:
                print("%2d: L %s" % (n, executable))
            else:
                print("%2d:   %s" % (n, executable))

    def __del__(self):
        # unlock everything so that flush() terminates all processes
        self._locked = {}
        self.flush()


# the process cache shared by the whole module
processCache = _ProcessCache()
def IrafExecute(task, envdict, stdin=None, stdout=None, stderr=None,
                stdgraph=None):

    """Execute IRAF task (defined by the IrafTask object task)
    using the provided environment variables.

    stdin, stdout and stderr are passed to the process as its I/O
    streams.  If stdgraph is given, graphics are redirected to it for
    the duration of the task.

    On KeyboardInterrupt, iraf.IrafError or IrafProcessError the
    process is killed and the exception is raised again; on success
    the process is added to the process cache.
    """

    global processCache

    # Start 'er up.  Errors while starting (iraf.IrafError,
    # subproc.SubprocessError, IrafProcessError) simply propagate to the
    # caller; the old handler only re-raised them, and the statement
    # after its bare raise could never run.
    irafprocess = processCache.get(task, envdict)

    # Run it
    try:
        taskname = task.getName()
        if stdgraph:
            # Redirect graphics
            prevkernel = gki.kernel
            gki.kernel = gki.GkiRedirection(stdgraph)
            gki.kernel.wcs = prevkernel.wcs
        else:
            # do graphics task initialization
            gki.kernel.taskStart(taskname)
            focusMark = wutil.focusController.getCurrentMark()
            gki.kernel.pushStdio(None, None, None)
        try:
            irafprocess.run(task, pstdin=stdin, pstdout=stdout,
                            pstderr=stderr)
        finally:
            if stdgraph:
                # undo graphics redirection
                gki.kernel = prevkernel
            else:
                # for interactive graphics restore previous stdio
                wutil.focusController.restoreToMark(focusMark)
                gki.kernel.popStdio()
        # do any cleanup needed on task completion
        if not stdgraph:
            gki.kernel.taskDone(taskname)
    except KeyboardInterrupt:
        # On keyboard interrupt (^C), kill the subprocess
        processCache.kill(irafprocess)
        raise
    except (iraf.IrafError, IrafProcessError) as exc:
        # on error, kill the subprocess, then re-raise the original exception
        try:
            processCache.kill(irafprocess)
        except Exception as exc2:
            # append new exception text to previous one (right thing to do?)
            exc.args = exc.args + exc2.args
        # name the exception explicitly: a bare raise here could pick up
        # the exception from the failed kill instead
        raise exc
    else:
        # add to the process cache on successful exit
        processCache.add(irafprocess)
    return
# Regular expressions for the messages a task sends to its parent.
#
# A parameter request ('=param') and '_curpack' need special handling
# because the answer is written back to the task rather than to stdout;
# a parameter assignment ('param=value') is special because it allows
# errors.

_p_par_get = r'\s*=\s*(?P<gname>[a-zA-Z_$][\w.]*(?:\[\d+\])?)\s*\n'
_p_par_set = r'(?P<sname>[a-zA-Z_][\w.]*(?:\[\d+\])?)\s*=\s*(?P<svalue>.*)\n'

# parameter get or parameter set
_re_msg = re.compile('|'.join([
    r'(?P<par_get>' + _p_par_get + ')',
    r'(?P<par_set>' + _p_par_set + ')',
]))

_p_curpack = r'_curpack(?:\s.*|)\n'
_p_stty = r'stty.*\n'
_p_sysescape = r'!(?P<sys_cmd>.*)\n'

# CL commands whose result is sent back to the task
_re_clcmd = re.compile('|'.join([
    r'(?P<curpack>' + _p_curpack + ')',
    r'(?P<stty>' + _p_stty + ')',
    r'(?P<sysescape>' + _p_sysescape + ')',
]))
class IrafProcess:

    """IRAF process class

    Owns one connected IRAF subprocess.  write()/read() exchange framed
    binary records with it, and slave() dispatches the requests the
    task sends while it is running.
    """

    # error messages of the form: ERROR (number, "text")
    _re_errno = re.compile(
        r'(ERROR|error)\s*\(\s*(\d+)\s*,\s*"([^"]*)"\s*\)\s*')

    def __init__(self, executable):

        """Start IRAF task executable."""

        self.executable = executable
        self.process = subproc.Subprocess(executable+' -c')
        self.running = 0   # flag indicating whether process is active
        self.task = None
        self.stdin = None
        self.stdout = None
        self.stderr = None
        self.default_stdin = None
        self.default_stdout = None
        self.default_stderr = None
        self.stdinIsatty = 0
        self.stdoutIsatty = 0
        # run() resets this too; defined here so the attribute always exists
        self.stdinIsraw = False
        self.envVarList = []

    def initialize(self, envdict):

        """Initialization: Copy environment variables to process"""

        outenvstr = []
        for key, value in envdict.items():
            outenvstr.append("set %s=%s\n" % (key, str(value)))
        # the chdir command is always sent, so there is always output
        outenvstr.append("chdir %s\n" % os.getcwd())
        self.writeString("".join(outenvstr))
        self.envVarList = []

        # end set up mode
        self.writeString('_go_\n')

    def appendEnv(self, msg):

        """Append environment variable set command to list"""

        # Changes are saved and sent to task before starting
        # it next time.  Note that environment variable changes
        # are not immediately sent to a running task (because it is
        # not expecting them.)

        self.envVarList.append(msg)

    def run(self, task, pstdin=None, pstdout=None, pstderr=None):

        """Run the IRAF logical task (which must be in this executable)

        The IrafTask object must have these methods:

        getName(): return the name of the task
        getParam(param): get parameter value
        setParam(param,value): set parameter value
        getParObject(param): get parameter object

        pstdin, pstdout and pstderr replace sys.stdin, sys.stdout and
        sys.stderr as the streams of the task when given.
        """

        self.task = task
        # set IO streams
        stdin = pstdin or sys.stdin
        stdout = pstdout or sys.stdout
        stderr = pstderr or sys.stderr
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.default_stdin = stdin
        self.default_stdout = stdout
        self.default_stderr = stderr

        # stdinIsatty flag is used in xfer to decide whether to
        # read inputs in blocks or not.  As long as input comes
        # from __stdin__, consider it equivalent to a tty.
        self.stdinIsatty = (hasattr(stdin, 'isatty') and stdin.isatty()) or \
            self.stdin == sys.__stdin__
        self.stdoutIsatty = hasattr(stdout, 'isatty') and stdout.isatty()

        # stdinIsraw flag is used in xfer to decide whether to
        # read inputs as RAW input or not.
        self.stdinIsraw = False

        # redir_info tells task that IO has been redirected

        redir_info = ''
        if pstdin and pstdin != sys.__stdin__:
            redir_info = '<'
        if (pstdout and pstdout != sys.__stdout__) or \
           (pstderr and pstderr != sys.__stderr__):
            redir_info = redir_info+'>'

        # update IRAF environment variables if necessary
        if self.envVarList:
            self.writeString(''.join(self.envVarList))
            self.envVarList = []

        # if stdout is a terminal, set the lines & columns sizes
        # this ensures that they are up-to-date at the start of the task
        # (which is better than the CL does)
        if self.stdoutIsatty:
            nlines, ncols = wutil.getTermWindowSize()
            self.writeString('set ttynlines=%d\nset ttyncols=%d\n' %
                             (nlines, ncols))

        taskname = self.task.getName()
        # remove leading underscore, which is just a convention for CL
        if taskname[:1] == '_':
            taskname = taskname[1:]
        self.writeString(taskname+redir_info+'\n')
        self.running = 1
        try:
            # begin slave mode
            self.slave()
        finally:
            self.running = 0

    def isAlive(self):

        """Returns true if process appears to be OK"""

        return self.process.active()

    def terminate(self):

        """Terminate the IRAF process (when process in normal end state)"""

        # Standard IRAF task termination (assuming we already have the
        # task's attention for input):
        #   Send bye message to task
        #   Wait briefly for EOF, which signals task is done
        #   Kill it anyway if it is still hanging around

        if not self.process.pid:
            return     # no need, process gone
        try:
            self.writeString("bye\n")
            if self.process.wait(0.5):
                return
        except (IrafProcessError, subproc.SubprocessError):
            pass
        # No more Mr. Nice Guy
        try:
            self.process.die()
        except subproc.SubprocessError as e:
            if iraf.Verbose > 0:
                # too bad, if we can't kill it assume it is already dead
                # self.stderr is still None if the process was never run
                errstream = self.stderr
                if errstream is None:
                    errstream = sys.stderr
                errstream.write("Warning: cannot terminate process %s\n" %
                                (e,))
                errstream.flush()

    def kill(self, verbose=1):

        """Kill the IRAF process (more drastic than terminate)"""

        # Try stopping process in IRAF-approved way first; if that fails
        # blow it away.  Copied with minor mods from subproc.py.

        if not self.process.pid:
            return     # no need, process gone

        # the streams and the task are None until run() has been called,
        # e.g. for a process that was cached but never executed
        for stream in (self.stdout, self.stderr):
            if stream is not None:
                stream.flush()
        import pyrafglobals
        if verbose and not pyrafglobals._use_ecl:
            if self.task is not None:
                name = self.task.getName()
            else:
                name = self.executable
            sys.stderr.write("Killing IRAF task `%s'\n" % name)
            sys.stderr.flush()
        if self.process.cont():
            # get the task's attention for input
            try:
                os.kill(self.process.pid, signal.SIGTERM)
            except os.error:
                pass
        self.terminate()

    def writeString(self, s):
        """Convert ascii string to IRAF form and write to IRAF process"""

        self.write(Asc2IrafString(s))

    def readString(self):

        """Read IRAF string from process and convert to ascii string"""

        return Iraf2AscString(self.read())

    def write(self, data):

        """write binary data to IRAF process in blocks of <= 4096 bytes

        Raises IrafProcessError if the subprocess reports an error.
        """

        i = 0
        block = 4096
        try:
            while i < len(data):
                # Write:
                #  IRAF magic number
                #  number of following bytes
                #  data
                dsection = data[i:i+block]
                self.process.write(IPC_PREFIX +
                                   struct.pack('=h', len(dsection)) +
                                   dsection)
                i = i + block
        except subproc.SubprocessError as e:
            raise IrafProcessError("Error in write: %s" % str(e))

    def read(self):

        """Read binary data from IRAF pipe

        Raises IrafProcessError if the record does not start with the
        IRAF magic number or if the subprocess reports an error.
        """

        try:
            # read pipe header first
            header = self.process.read(4)
            if (header[0:2] != IPC_PREFIX):
                raise IrafProcessError("Not a legal IRAF pipe record")
            ntemp = struct.unpack('=h', header[2:])
            nbytes = ntemp[0]
            # read the rest
            data = self.process.read(nbytes)
            return data
        except subproc.SubprocessError as e:
            raise IrafProcessError("Error in read: %s" % str(e))

    def slave(self):

        """Talk to the IRAF process in slave mode.
        Raises an IrafProcessError if an error occurs."""

        self.msg = ''
        self.xferline = ''
        # try to speed up loop a bit
        re_match = _re_msg.match
        xfer = self.xfer
        xmit = self.xmit
        par_get = self.par_get
        par_set = self.par_set
        executeClCommand = self.executeClCommand
        while 1:

            # each read may return multiple lines; only
            # read new data when old has been used up

            if not self.msg:
                self.msg = self.readString()

            msg = self.msg
            msg5 = msg[:5]

            if msg5 == 'xfer(':
                xfer()
            elif msg5 == 'xmit(':
                xmit()
            elif msg[:4] == 'bye\n':
                return
            elif msg5 in ['error', 'ERROR']:
                errno, text = self._scanErrno(msg)
                raise IrafProcessError(
                    "IRAF task terminated abnormally\n"+msg,
                    errno=errno, errmsg=text, errtask=self.task.getName())
            else:
                # pattern match to see what this message is
                mcmd = re_match(msg)
                if mcmd is None:
                    # Could be any legal CL command.
                    executeClCommand()
                elif mcmd.group('par_get'):
                    par_get(mcmd)
                elif mcmd.group('par_set'):
                    par_set(mcmd)
                else:
                    # should never get here
                    raise RuntimeError(
                        "Program bug: uninterpreted message `%s'" % (msg,))

    def _scanErrno(self, msg):
        """Extract (errno, text) from an error message sent by the task

        If msg does not have the form ERROR (number, "text"), errno is
        -9999998 and the whole message is returned as the text.
        """
        m = self._re_errno.search(msg)
        if m:
            try:
                errno = int(m.group(2))
            except ValueError:
                errno = -9999999
            text = m.group(3)
        else:
            errno, text = -9999998, msg
        return errno, text

    def setStdio(self):
        """Set self.stdin/stdout based on current state

        If in graphics mode, I/O is done through status line.
        Else I/O is done through normal streams.
        """

        self.stdout = gki.kernel.getStdout(default=self.default_stdout)
        self.stderr = gki.kernel.getStderr(default=self.default_stderr)
        self.stdin = gki.kernel.getStdin(default=self.default_stdin)

    def par_get(self, mcmd):
        """Answer a parameter get request by writing the value to the task"""
        # list parameters can generate EOF exception
        paramname = mcmd.group('gname')
        # interactive parameter prompts may be redirected to the graphics
        # status line, but do not get redirected to a file
        c_stdin = sys.stdin
        c_stdout = sys.stdout
        c_stderr = sys.stderr
        #
        # These lines reset stdin/stdout/stderr to the graphics
        # window.
        sys.stdin = gki.kernel.getStdin(default=sys.__stdin__)
        sys.stdout = gki.kernel.getStdout(default=sys.__stdout__)
        sys.stderr = gki.kernel.getStderr(default=sys.__stderr__)
        try:
            try:
                pmsg = self.task.getParam(paramname, native=0)
                if not isinstance(pmsg, str):
                    # Only psets should return a non-string type (they
                    # return the task object).
                    # Work a little to get the underlying string value.
                    # (Yes, this is klugy, but there are so many places
                    # where it is necessary to return the task object
                    # for a pset that this seems like a small price to
                    # pay.)
                    pobj = self.task.getParObject(paramname)
                    pmsg = pobj.get(lpar=1)
                else:
                    # replace all newlines in strings with "\n"
                    pmsg = pmsg.replace('\n', '\\n')
                pmsg = pmsg + '\n'
            except EOFError:
                pmsg = 'EOF\n'
        finally:
            # Make sure that STDIN/STDOUT/STDERR are reset to
            # tty mode instead of being stuck in graphics window.
            sys.stdin = c_stdin
            sys.stdout = c_stdout
            sys.stderr = c_stderr
        self.writeString(pmsg)
        self.msg = self.msg[mcmd.end():]

    def par_set(self, mcmd):
        """Set the value of a task parameter as requested by the task"""
        group = mcmd.group
        paramname = group('sname')
        newvalue = group('svalue')
        self.msg = self.msg[mcmd.end():]
        try:
            self.task.setParam(paramname, newvalue)
        except ValueError as e:
            # on ValueError, just print warning and then force set
            if iraf.Verbose > 0:
                self.stderr.write('Warning: %s\n' % (e,))
                self.stderr.flush()
            self.task.setParam(paramname, newvalue, check=0)

    def xmit(self):

        """Handle xmit data transmissions"""

        chan, nbytes = self.chanbytes()

        checkForEscapeSeq = (chan == 4 and (nbytes == 6 or nbytes == 5))
        xdata = self.read()

        # the record is compared against twice the announced count
        if len(xdata) != 2*nbytes:
            raise IrafProcessError(
                "Error, wrong number of bytes read\n" +
                ("(got %d, expected %d, chan %d)" %
                 (len(xdata), 2*nbytes, chan)))
        if chan == 4:
            if self.task.getTbflag():
                # for tasks with .tb flag, stdout is binary data
                txdata = xdata
            else:
                # normally stdout is translated text data
                txdata = Iraf2AscString(xdata)

            if checkForEscapeSeq:
                if (txdata[0:5] == "\033+rAw"):
                    # Turn on RAW mode for STDIN
                    self.stdinIsraw = True
                    return

                if (txdata[0:5] == "\033-rAw"):
                    # Turn off RAW mode for STDIN
                    self.stdinIsraw = False
                    return

                if (txdata[0:5] == "\033=rDw"):
                    # ignore IRAF io escape sequences for now
                    # This mode enables screen redraw code
                    return

            self.stdout.write(txdata)
            self.stdout.flush()
        elif chan == 5:
            sys.stdout.flush()
            self.stderr.write(Iraf2AscString(xdata))
            self.stderr.flush()
        elif chan == 6:
            gki.kernel.append(numpy.fromstring(xdata, dtype=numpy.int16))
        elif chan == 7:
            stdimagekernel.append(numpy.fromstring(xdata, dtype=numpy.int16))
        elif chan == 8:
            self.stdout.write("data for STDPLOT\n")
            self.stdout.flush()
        elif chan == 9:
            sdata = numpy.fromstring(xdata, dtype=numpy.int16)
            if isBigEndian:
                # Actually, the channel destination is sent
                # by the iraf process as a 4 byte int, the following
                # code basically chooses the right two bytes to
                # find it in.
                forChan = sdata[1]
            else:
                forChan = sdata[0]
            if forChan == 6:
                # STDPLOT control
                # Pass it to the kernel to deal with
                # Only returns a value for getwcs
                wcs = gki.kernel.control(sdata[2:])
                if wcs:
                    # Write directly to stdin of subprocess;
                    # strangely enough, it doesn't use the
                    # STDGRAPH I/O channel.
                    self.write(wcs)
                    gki.kernel.clearReturnData()
                self.setStdio()
            elif forChan == 7:
                # STDIMAGE control, see previous block for comments on details
                wcs = stdimagekernel.control(sdata[2:])
                if wcs:
                    self.write(wcs)
                    stdimagekernel.clearReturnData()
            else:
                self.stdout.write("GRAPHICS control data for channel %d\n" %
                                  (forChan,))
                self.stdout.flush()
        else:
            self.stdout.write("data for channel %d\n" % (chan,))
            self.stdout.flush()

    def xfer(self):

        """Handle xfer data requests"""

        chan, nbytes = self.chanbytes()
        nchars = nbytes//2
        if chan == 3:

            # Read data from stdin unless xferline already has
            # some untransmitted data from a previous read

            line = self.xferline
            if not line:
                if self.stdinIsatty:
                    if not self.stdinIsraw:
                        self.setStdio()
                        # tty input, read a single line
                        line = irafutils.tkreadline(self.stdin)
                    else:
                        # Raw input requested
                        # Input character needs to be converted
                        # to its ASCII integer code.
                        line = irafukey.getSingleTTYChar()
                else:
                    # file input, read a big chunk of data

                    # NOTE: Here we are reading ahead in the stdin stream,
                    # which works fine with a single IRAF task.  This approach
                    # could conceivably cause problems if some program expects
                    # to continue reading from this stream starting at the
                    # first line not read by the IRAF task.  That sounds
                    # very unlikely to be a good design and will not work
                    # as a result of this approach.  Sending the data in
                    # large chunks is *much* faster than sending many
                    # small messages (due to the overhead of handshaking
                    # between the CL task and this main process.)  That's
                    # why it is done this way.

                    line = self.stdin.read(nchars)
                self.xferline = line
            # Send two messages, the first with the number of characters
            # in the line and the second with the line itself.
            # For very long lines, may need multiple messages.  Task
            # will keep sending xfer requests until it gets the
            # newline.

            if not self.stdinIsraw:
                if len(line) <= nchars:
                    # short line
                    self.writeString(str(len(line)))
                    self.writeString(line)
                    self.xferline = ''
                else:
                    # long line
                    self.writeString(str(nchars))
                    self.writeString(line[:nchars])
                    self.xferline = line[nchars:]
            else:
                self.writeString(str(len(line)))
                self.writeString(line)
                self.xferline = ''
        else:
            raise IrafProcessError(
                "xfer request for unknown channel %d" % chan)

    def chanbytes(self):
        """Parse xmit(chan,nbytes) and return integer tuple

        Assumes first 5 characters have already been checked.
        The message is consumed (self.msg is set to '') on success;
        IrafProcessError is raised if it cannot be parsed.
        """
        msg = self.msg
        try:
            i = msg.find(",", 5)
            if i < 0 or msg[-2:] != ")\n":
                raise ValueError
            chan = int(msg[5:i])
            nbytes = int(msg[i+1:-2])
            self.msg = ''
        except ValueError:
            raise IrafProcessError("Illegal message format `%s'" % self.msg)
        return chan, nbytes

    def executeClCommand(self):

        """Execute an arbitrary CL command"""

        # pattern match to handle special commands that write to task
        mcmd = _re_clcmd.match(self.msg)
        if mcmd is None:
            # general command: take one line off the message
            i = self.msg.find("\n")
            if i >= 0:
                cmd = self.msg[:i+1]
                self.msg = self.msg[i+1:]
            else:
                cmd = self.msg
                self.msg = ""
            if not (cmd.find(IPCOUT) >= 0):
                # normal case -- execute the CL script code
                # redirect I/O (but don't use graphics status line)
                iraf.clExecute(cmd, Stdout=self.default_stdout,
                               Stdin=self.default_stdin,
                               Stderr=self.default_stderr)
            else:
                #
                # Bizzaro protocol -- redirection to file with special
                # name given by IPCOUT causes output to be written back
                # to subprocess instead of to stdout.
                #
                # I think this only occurs one place in the entire system
                # (in clio/clepset.x) so I'm not trying to handle it robustly.
                # Just raise an exception if it does not fit my preconceptions.
                #
                ll = -(len(IPCOUT)+3)
                if cmd[ll:] != "> %s\n" % IPCOUT:
                    raise IrafProcessError(
                        "Error: cannot understand IPCOUT syntax in `%s'"
                        % (cmd,))
                sys.stdout.flush()
                # strip the redirection off and capture output of command
                outbuf = cStringIO.StringIO()
                # redirect other I/O (but don't use graphics status line)
                iraf.clExecute(cmd[:ll]+"\n", Stdout=outbuf,
                               Stdin=self.default_stdin,
                               Stderr=self.default_stderr)
                # send it off to the task with special flag line at end
                outbuf.write(IPCDONEMSG)
                self.writeString(outbuf.getvalue())
                outbuf.close()
        elif mcmd.group('stty'):
            # terminal window size
            if self.stdoutIsatty:
                nlines, ncols = wutil.getTermWindowSize()
            else:
                # a kluge -- if self.stdout is not a tty, assume it is a
                # file and give a large number for the number of lines
                nlines, ncols = 100000, 80
            self.writeString('set ttynlines=%d\nset ttyncols=%d\n' %
                             (nlines, ncols))
            self.msg = self.msg[mcmd.end():]
        elif mcmd.group('curpack'):
            # current package request
            self.writeString(iraf.curpack() + '\n')
            self.msg = self.msg[mcmd.end():]
        elif mcmd.group('sysescape'):
            # OS escape
            tmsg = mcmd.group('sys_cmd')
            # use my version of system command so redirection works
            sysstatus = iraf.clOscmd(tmsg, Stdin=self.stdin,
                                     Stdout=self.stdout, Stderr=self.stderr)
            self.writeString(str(sysstatus)+"\n")
            self.msg = self.msg[mcmd.end():]
        else:
            # should never get here
            raise RuntimeError(
                "Program bug: uninterpreted message `%s'"
                % (self.msg,))
981 982 983 # IRAF string conversions using numpy module 984
def Asc2IrafString(ascii_string):
    """translate ascii to IRAF 16-bit string format

    Each byte of ascii_string becomes one 16-bit integer in native
    byte order, so the result is twice as long as the input.
    """
    if not ascii_string:
        # nothing to convert
        return b''
    # frombuffer replaces the binary mode of fromstring, which NumPy
    # has deprecated; astype() makes the writable copy
    outarr = numpy.frombuffer(ascii_string, dtype=numpy.int8).astype(numpy.int16)
    to_bytes = getattr(outarr, 'tobytes', None)
    if to_bytes is None:
        # NumPy releases without tobytes() only have the old name
        to_bytes = outarr.tostring
    return to_bytes()
989
def Iraf2AscString(iraf_string):
    """translate 16-bit IRAF characters to ascii

    iraf_string is read as 16-bit integers in native byte order and
    each of them is narrowed to a single byte, so the result is half
    as long as the input.
    """
    if not iraf_string:
        # nothing to convert
        return b''
    # frombuffer replaces the binary mode of fromstring, which NumPy
    # has deprecated; astype() makes the writable copy
    outarr = numpy.frombuffer(iraf_string, dtype=numpy.int16).astype(numpy.int8)
    to_bytes = getattr(outarr, 'tobytes', None)
    if to_bytes is None:
        # NumPy releases without tobytes() only have the old name
        to_bytes = outarr.tostring
    return to_bytes()
994