Source code for parcalc.parcalc

# -*- coding: utf-8 -*-
# Copyright 2011 by Pawel T. Jochym <>
#    This file is part of Elastic.

#    Elastic is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.

#    Elastic is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    GNU General Public License for more details.

#    You should have received a copy of the GNU General Public License
#    along with Elastic.  If not, see <>.


.. _par-calc-mod:

Parallel Calculator Module

Parallel calculator module is an extension of the standard
`ASE <>`_ calculator working in the
parallel cluster environment. It is very useful in all situations where
you need to run several, independent calculations and you have a large
cluster of machines at your disposal (probably with some queuing system).

This implementation uses VASP but the code can be easily adapted for use
with other ASE calculators with minor changes.
The final goal is to provide a universal module for parallel
calculator execution in the cluster environment.

The SIESTA code by Georgios Tritsaris <>
Not fully tested after merge.


from __future__ import print_function, division
import logging
import ase
from ase.calculators.vasp import Vasp
from ase.calculators.siesta import Siesta
from ase.calculators.aims import Aims
from ase.calculators.calculator import Calculator, FileIOCalculator, all_changes

try :  # Python3
    from queue import Empty
except ImportError : # Python2
    from Queue import Empty

from multiprocessing import Process, Queue

import time
import os
import tempfile
import shutil
from copy import deepcopy
from subprocess import check_output
from contextlib import contextmanager

class _NonBlockingRunException(Exception):
    Internal exception. Should never be propagated outside.
    def __str__(self):
        return '''The __NonBlockingRunException should be caught inside
        the calculator class. If you got it outside it is a bug.
        Contact the author and/or submit a bug ticket at github.'''

from traceback import print_stack

[docs]@contextmanager def work_dir(path): ''' Context menager for executing commands in some working directory. Returns to the previous wd when finished. Usage: >>> with work_dir(path): ...'git status') ''' starting_directory = os.getcwd() try: os.chdir(path) yield finally: os.chdir(starting_directory)
[docs]class ClusterVasp(Vasp): ''' Adaptation of VASP calculator to the cluster environment where you often have to make some preparations before job submission. You can easily adapt this class to your particular environment. It is also easy to use this as a template for other type of calculator. ''' def __init__(self, nodes=1, ppn=8, block=True, ncl=False, **kwargs): Vasp.__init__(self, **kwargs) self.nodes=nodes self.ppn=ppn self.block=block self.ncl=ncl self.calc_running=False self.working_dir=os.getcwd()
[docs] def prepare_calc_dir(self): ''' Prepare the calculation directory for VASP execution. This needs to be re-implemented for each local setup. The following code reflects just my particular setup. ''' with open("vasprun.conf","w") as f: f.write('NODES="nodes=%s:ppn=%d"\n' % (self.nodes, self.ppn)) f.write('BLOCK=%d\n' % (self.block,)) if self.ncl : f.write('NCL=%d\n' % (1,))
#print(self.nodes, self.ppn)
[docs] def calc_finished(self): ''' Check if the lockfile is in the calculation directory. It is removed by the script at the end regardless of the success of the calculation. This is totally tied to implementation and you need to implement your own scheme! ''' #print_stack(limit=5) if not self.calc_running : #print('Calc running:',self.calc_running) return True else: # The calc is marked as running check if this is still true # We do it by external scripts. You need to write these # scripts for your own system. # See examples/scripts directory for examples. with work_dir(self.working_dir) : o=check_output(['check-job']) #print('Status',o) if o[0] in b'R' : # Still running - we do nothing to preserve the state return False else : # The job is not running maybe it finished maybe crashed # We hope for the best at this point ad pass to the # Standard update function return True
def set(self,**kwargs): if 'block' in kwargs : self.block=kwargs['block'] del kwargs['block'] else : self.block=True if 'ncl' in kwargs : self.ncl=kwargs['ncl'] del kwargs['ncl'] else : self.ncl=False Vasp.set(self, **kwargs)
[docs] def clean(self): with work_dir(self.working_dir) : Vasp.clean(self)
def update(self, atoms): if self.calc_running : # we have started the calculation and have # nothing to read really. But we need to check # first if this is still true. if self.calc_finished(): # We were running but recently finished => read the results # This is a piece of copy-and-paste programming # This is a copy of code from Vasp.calculate self.calc_running=False with work_dir(self.working_dir) : atoms_sorted ='CONTCAR', format='vasp') if self.int_params['ibrion'] > -1 and self.int_params['nsw'] > 0: # Update atomic positions and unit cell with the ones read # from CONTCAR. atoms.positions = atoms_sorted[self.resort].positions atoms.cell = atoms_sorted.cell self.converged = self.read_convergence() Vasp.set_results(self,atoms) return else : return # We are not in the middle of calculation. # Update as normal Vasp.update(self, atoms) def set_results(self, atoms): with work_dir(self.working_dir) : #print('set_results') Vasp.set_results(self, atoms)
[docs] def run(self): ''' Blocking/Non-blocing run method. In blocking mode it just runs parent run method. In non-blocking mode it raises the __NonBlockingRunException to bail out of the processing of standard calculate method (or any other method in fact) and signal that the data is not ready to be collected. ''' # This is only called from self.calculate - thus # we do not need to change to working_dir # since calculate already did if not self.block : #print('Interrupt processing of calculate', os.getcwd()) raise _NonBlockingRunException
[docs] def calculate(self, atoms): ''' Blocking/Non-blocking calculate method If we are in blocking mode we just run, wait for the job to end and read in the results. Easy ... The non-blocking mode is a little tricky. We need to start the job and guard against it reading back possible old data from the directory - the queuing system may not even started the job when we get control back from the starting script. Thus anything we read after invocation is potentially garbage - even if it is a converged calculation data. We handle it by custom run function above which raises an exception after submitting the job. This skips post-run processing in the calculator, preserves the state of the data and signals here that we need to wait for results. ''' with work_dir(self.working_dir) : self.prepare_calc_dir() self.calc_running=True #print('Run VASP.calculate') try : Vasp.calculate(self, atoms) self.calc_running=False #print('VASP.calculate returned') except _NonBlockingRunException as e: # We have nothing else to docs # until the job finishes #print('Interrupted ', self.working_dir, os.getcwd()) pass
[docs]class ClusterSiesta(Siesta): ''' Siesta calculator. Not fully tested by me - so this should be considered beta quality. Nevertheless it is based on working implementation ''' def __init__(self, nodes=1, ppn=8, **kwargs): Siesta.__init__(self, **kwargs) self.nodes=nodes self.ppn=ppn def prepare_calc_dir(self): with open("siestarun.conf","w") as f: f.write('NODES="nodes=%d:ppn=%d"' % (self.nodes, self.ppn)) #print(self.nodes, self.ppn) def get_potential_energy(self, atoms): self.prepare_calc_dir() Siesta.get_potential_energy(self, atoms) def clean(self): self.converged = False return
[docs]class ClusterAims(Aims): ''' Encapsulating Aims calculator for the cluster environment. ''' def __init__(self, nodes=1, ppn=8, **kwargs): Aims.__init__(self, **kwargs) self.nodes=nodes self.ppn=ppn def prepare_calc_dir(self): with open("siestarun.conf","w") as f: f.write('NODES="nodes=%d:ppn=%d"' % (self.nodes, self.ppn)) #print(self.nodes, self.ppn) def run(self): self.prepare_calc_dir()
[docs]class RemoteCalculator(Calculator): ''' Remote calculator based on ASE calculator class. This class is only involved with the machanics of remotly executing the software and transporting the data. The calculation is delegated to the actual calculator class. ''' # Queue system submit command qsub_tool='qsub' qstat_tool='qstat' qsub_cmd='cd %(rdir)s ; %(qsub_tool)s -N %(title)s -l procs=%(procs)d ./run-pw.pbs' # Remote execution command remote_exec_cmd='ssh %(user)s@%(host)s "%(command)s"' # If you cannot mount the data directory into your system it is best # to use the rsync command to transfer the results back into the system. # Command for copying the data out to the computing system copy_out_cmd='rsync -a "%(ldir)s" "%(user)s@%(host)s:%(rdir)s"' # Command for copying the data in after the calculation copy_in_cmd='rsync -a "%(user)s@%(host)s:%(rdir)s" "%(ldir)s"' # Template for the PBS batch job pbs_template='' # Command to check the state of the job pbs_check_cmd='''%(qstat_tool)s -f %(jobid)s |grep job_state |awk '{print $3}' ''' # Access data host='' user='' # Location: # local working directory wdir='.' # Remote working directory relative to the home directory or absolute rdir='.' # Repetition timer (seconds) for checkin the state of the job. job_check_time=15 def __init__(self, restart=None, ignore_bad_restart_file=False, label=None, atoms=None, calc=None, block=False, **kwargs): '''Basic calculator implementation. restart: str Prefix for restart file. May contain a directory. Default is None: don't restart. ignore_bad_restart_file: bool Ignore broken or missing restart file. By default, it is an error if the restart file is missing or broken. label: str Name used for all files. May contain a directory. atoms: Atoms object Optional Atoms object to which the calculator will be attached. When restarting, atoms will get its positions and unit-cell updated from file. Create a remote execution calculator based on actual ASE calculator calc. ''' logging.debug("Calc: %s Label: %s" % (calc, label)) Calculator.__init__(self, restart, ignore_bad_restart_file, label, atoms, **kwargs) logging.debug("Dir: %s Ext: %s" % (, self.ext)) self.calc=calc self.jobid=None self.block=block def write_pbs_in(self,properties): with work_dir( with open(os.path.join(,'run-ase-calc.pbs'),'w') as fh: fh.write(self.pbs_template % { 'command': self.build_command(self,prop=properties, params=self.parameters) }) def build_command(self,prop=['energy'],params={}): cmd=self.qsub_cmd % { 'qsub_tool': self.qsub_tool, 'qstat_tool': self.qstat_tool, 'title': self.label, 'procs': self.parameters['procs'], 'rdir': os.path.join(self.parameters['rdir'],os.path.split([-1]) } cmd=self.remote_exec_cmd % { 'command': cmd, 'user': self.parameters['user'], 'host': self.parameters['host'] } return cmd
[docs] def write_input(self, atoms=None, properties=['energy'], system_changes=all_changes): '''Write input file(s).''' with work_dir( self.calc.write_input(self, atoms, properties, system_changes) self.write_pbs_in(properties) % { 'ldir':, 'rdir': self.parameters['rdir'], 'user': self.parameters['user'], 'host': self.parameters['host'] }, shell=True)
def job_ready(self): try : cmd=self.remote_exec_cmd % { 'command': self.pbs_check_cmd % { 'qsub_tool': self.qsub_tool, 'qstat_tool': self.qstat_tool, 'jobid':self.jobid }, 'user': self.parameters['user'], 'host': self.parameters['host'] } state=subprocess.check_output(cmd, shell=True).split()[-1] except (subprocess.CalledProcessError, IndexError) : # Unknown state. We assume it has finished and continue state='N' return not (state in ['Q','R'])
[docs] def run_calculation(self, atoms=None, properties=['energy'], system_changes=all_changes): ''' Internal calculation executor. We cannot use FileIOCalculator directly since we need to support remote execution. This calculator is different from others. It prepares the directory, launches the remote process and raises the exception to signal that we need to come back for results when the job is finished. ''' self.calc.calculate(self, atoms, properties, system_changes) self.write_input(self.atoms, properties, system_changes) if self.command is None: raise RuntimeError('Please configure Remote calculator!') olddir = os.getcwd() errorcode=0 try: os.chdir( output = subprocess.check_output(self.command, shell=True) self.jobid=output.split()[0] self.submited=True #print "Job %s submitted. Waiting for it." % (self.jobid) # Waiting loop. To be removed. except subprocess.CalledProcessError as e: errorcode=e.returncode finally: os.chdir(olddir) if errorcode: raise RuntimeError('%s returned an error: %d' % (, errorcode)) self.read_results()
[docs] def read_results(self): """Read energy, forces, ... from output file(s).""" if self.submited: # The job has been submitted. Check the state. if not self.job_ready() : if self.block : while not self.job_ready() : time.sleep(self.job_check_time) else : raise CalcNotReadyError # Assume the calc finished. Copy the files back. % { 'ldir': self.wdir, 'rdir': os.path.join(self.parameters['rdir'],os.path.split([-1]), 'user': self.parameters['user'], 'host': self.parameters['host'] }, shell=True) fn=os.path.join(,'pw.out') # Read the pan-ultimate line of the output file try: ln=open(fn).readlines()[-2] if ln.find('JOB DONE.')>-1 : # Job is done we can read the output r=read_quantumespresso_textoutput(fn) self.submited=False self.jobid=None else : # Job not ready. raise CalcNotReadyError except (IOError, IndexError) : # Job not ready. raise CalcNotReadyError # All is fine - really read the results self.calc.read_results(self)
[docs] @classmethod def ParallelCalculate(cls,syslst,properties=['energy'],system_changes=all_changes): ''' Run a series of calculations in parallel using (implicitely) some remote machine/cluster. The function returns the list of systems ready for the extraction of calculated properties. ''' print('Launching:',end=' ') sys.stdout.flush() for n,s in enumerate(syslst): try : s.calc.block=False s.calc.calculate(atoms=s,properties=properties,system_changes=system_changes) except CalcNotReadyError: s.calc.block=True print(n+1, end=' ') sys.stdout.flush() print() print(' Done:', end=' ') sys.stdout.flush() for n,s in enumerate(syslst): s.calc.read_results() print( n+1, end=' ') sys.stdout.flush() print() return syslst
verbose=True class __PCalcProc(Process): ''' Internal helper class representing the calculation process isolated from the rest of the ASE script. The process (not thread) runs in the separate directory, created on-the-fly and removed at the end if the cleanup is true and we are in blocking (default) mode. In this mode it is vital for the calculator to read in all the results after the run since the files will be removed as soon as the "calculate" function terminates. You can pass False to the cleanup argument to prevent the clean-up. This is very usefull for debuging. The process waits without any time-out for the calculation to finish. This is great for short and simple calculations and quick testing. You run the job and get back your results. ''' def __init__(self, iq, oq, calc, prefix, cleanup=True): Process.__init__(self) self.calc=calc self.basedir=os.getcwd(), dir=self.basedir) self.oq=oq self.CleanUp=cleanup def run(self): with work_dir( : n, system.set_calculator(deepcopy(self.calc)) system.get_calculator().block=True system.get_calculator() #print("Start at :", if hasattr(self.calc, 'name') and'Siesta': system.get_potential_energy() else: system.get_calculator().calculate(system) #print("Finito: ", os.getcwd(), system.get_volume(), system.get_pressure()) self.oq.put([n,system]) if self.CleanUp : system.get_calculator().clean() os.chdir(self.basedir) shutil.rmtree(, ignore_errors=True)
[docs]def ParCalculate(systems,calc,cleanup=True,block=True,prefix="Calc_"): ''' Run calculators in parallel for all systems. Calculators are executed in isolated processes and directories. The resulting objects are returned in the list (one per input system). ''' if type(systems) != type([]) : sysl=[systems] else : sysl=systems if block : iq=Queue(len(sysl)+1) oq=Queue(len(sysl)+1) # Create workers for s in sysl: __PCalcProc(iq, oq, calc, prefix=prefix, cleanup=cleanup).start() # Put jobs into the queue for n,s in enumerate(sysl): iq.put([n,s]) # Protection against too quick insertion time.sleep(0.2) if verbose : print("Workers started:", len(sysl)) # Collect the results res=[] while len(res)<len(sysl) : n,s=oq.get() res.append([n,s]) #print("Got from oq:", n, s.get_volume(), s.get_pressure()) else : # We do not need the multiprocessing complications for non-blocking # workers. We just run all in sequence. basedir=os.getcwd() res=[] for n,s in enumerate(sysl): s.set_calculator(deepcopy(calc)) s.get_calculator().block=block place=tempfile.mkdtemp(prefix=prefix, dir=basedir) os.chdir(place) s.get_calculator().working_dir=place #print("Start at :", place) if hasattr(calc, 'name') and'Siesta': s.get_potential_energy() else: s.get_calculator().calculate(s) os.chdir(basedir) #print("Submited", s.get_calculator().calc_finished(), os.getcwd()) # Protection against too quick insertion time.sleep(0.2) res.append([n,s]) if verbose : print("Workers started:", len(sysl)) return [r for ns,s in enumerate(sysl) for nr,r in res if nr==ns]
# Testing routines using VASP as a calculator in the cluster environment. # TODO: Make it calculator/environment agnostic if __name__ == '__main__': from ase.lattice.spacegroup import crystal from ase.units import GPa import elastic import numpy from pylab import * a = 4.291 MgO = crystal(['Mg', 'O'], [(0, 0, 0), (0.5, 0.5, 0.5)], spacegroup=225, cellpar=[a, a, a, 90, 90, 90]) ################################## # Provide your own calculator here ################################## calc=ClusterVasp(nodes=1,ppn=8) # The calculator must be runnable in an isolated directory # Without disturbing other running instances of the same calculator # They are run in separate processes (not threads!) MgO.set_calculator(calc) calc.set(prec = 'Accurate', xc = 'PBE', lreal = False, isif=2, nsw=20, ibrion=2, kpts=[1,1,1]) print("Residual pressure: %.3f GPa" % (MgO.get_pressure()/GPa)) calc.clean() systems=[] for av in numpy.linspace(a*0.95,a*1.05,5): systems.append(crystal(['Mg', 'O'], [(0, 0, 0), (0.5, 0.5, 0.5)], spacegroup=225, cellpar=[av, av, av, 90, 90, 90])) pcalc=ClusterVasp(nodes=1,ppn=8) pcalc.set(prec = 'Accurate', xc = 'PBE', lreal = False, isif=2, nsw=20, ibrion=2, kpts=[1,1,1]) res=ParCalculate(systems,pcalc) v=[] p=[] for s in res : v.append(s.get_volume()) p.append(s.get_pressure()/GPa) plot(v,p,'o') show()