""" **abstract_simulation.py** provides an abstract class for unsteady adaptive FEM simulations.
We define a simulation as a sequence of time-dependent initial boundary value problems.
This module only provides an abstract base class for simulations.
For an example of implementing a specific set of governing equations,
see the `abstract_phasechange_simulation` module.
For examples of implementing instantiable simulation classes,
see e.g. `cavity_melting_simulation` and `cavity_freezing_simulation`.
This module essentially contains many lessons learned by a PhD student*
applying FEniCS to time-dependent mixed FE problems with goal-oriented AMR.
If you are not solving time-dependent problems,
not using mixed finite elements,
or not using goal-oriented AMR,
then perhaps you do not need to consider much of this module.
The bulk of the difficult work is done by the `fenics` module;
but `fenics` is mostly only a library for finite element spatial discretization of PDE's.
In that context,
little attention is paid to the time discretization of initial value problem.
Furthermore, we use some advanced features of `fenics`,
namely mixed finite elements and goal-oriented adaptive mesh refinement (AMR),
the latter of which is not among the better supported "core features" of `fenics`.
* Alexander G. Zimmerman, AICES Graduate School, RWTH Aachen University
"""
import phaseflow
import fenics
import abc
import numpy
import matplotlib
import os
[docs]class AbstractSimulation(metaclass = abc.ABCMeta):
""" A class for time-dependent simulations with goal-oriented AMR using FeniCS """
def __init__(self, time_order = 1, integration_measure = fenics.dx, setup_solver = True):
self.integration_measure = integration_measure
self.time_order = time_order
self._times = [0.,]*(time_order + 1)
self._timestep_sizes = []
for i in range(time_order):
self._timestep_sizes.append(fenics.Constant(1.))
self._mesh = self.initial_mesh()
self._element = self.element()
self._function_space = fenics.FunctionSpace(self._mesh, self._element)
self._solutions = []
for i in range(time_order + 1):
self._solutions.append(fenics.Function(self.function_space))
self.newton_solution = fenics.Function(self.function_space)
self.adaptive_solver = None
self.solver_status = {"iterations": 0, "solved": False}
self.solver_needs_setup = True
if setup_solver:
self.setup_solver()
self.output_dir = ""
@property
def timestep_size(self):
return self._timestep_sizes[0]
@property
def mesh(self):
return self._mesh.leaf_node()
@mesh.setter
def mesh(self, value):
""" Automatically redefine the function space and solutions when the mesh is redefined. """
self._mesh = value
self.solver_needs_setup = True
self.reinit_solutions()
@property
def function_space(self):
return self._function_space.leaf_node()
@property
def solution(self):
return self._solutions[0].leaf_node()
@property
def time(self):
return self._times[0]
[docs] @abc.abstractmethod
def coarse_mesh(self):
""" Redefine this to return a `fenics.Mesh`. """
[docs] @abc.abstractmethod
def element(self):
""" Redefine this to return a `fenics.MixedElement`. """
[docs] @abc.abstractmethod
def initial_values(self):
""" Redefine this to return a `fenics.Function` containing the initial values. """
[docs] @abc.abstractmethod
def boundary_conditions(self):
""" Redefine this to return a list of `fenics.DirichletBC`. """
[docs] def adaptive_goal(self):
""" Redefine this to return an adaptive goal. """
[docs] def initial_mesh(self):
""" Redefine this to refine the mesh before adaptive mesh refinement. """
return self.coarse_mesh()
[docs] def reinit_solutions(self):
""" Create the function space and solution functions for the current mesh and element. """
self._function_space = fenics.FunctionSpace(self._mesh, self._element)
for i in range(len(self._solutions)):
self._solutions[i] = fenics.Function(self._function_space)
[docs] def setup_solver(self):
""" Sometimes it is necessary to set up the solver again after breaking
important references, e.g. after re-meshing.
"""
self._governing_form = self.governing_form()
self._boundary_conditions = self.boundary_conditions()
self._problem = fenics.NonlinearVariationalProblem(
F = self._governing_form,
u = self.solution,
bcs = self._boundary_conditions,
J = fenics.derivative(
form = self._governing_form,
u = self.solution))
save_parameters = False
if hasattr(self, "solver"):
save_parameters = True
if save_parameters:
solver_parameters = self.solver.parameters.copy()
self.solver = fenics.NonlinearVariationalSolver(problem = self._problem)
if save_parameters:
self.solver.parameters = solver_parameters.copy()
self._adaptive_goal = self.adaptive_goal()
if self._adaptive_goal is not None:
save_parameters = False
if self.adaptive_solver is not None:
save_parameters = True
if save_parameters:
adaptive_solver_parameters = self.adaptive_solver.parameters.copy()
self.adaptive_solver = fenics.AdaptiveNonlinearVariationalSolver(
problem = self._problem,
goal = self._adaptive_goal)
if save_parameters:
self.adaptive_solver.parameters = adaptive_solver_parameters.copy()
self.solver_needs_setup = False
""" The following methods are used to solve time steps and advance the unsteady simulation. """
[docs] def solve(self, goal_tolerance = None):
""" Solve the nonlinear variational problem.
Optionally provide `goal_tolerance` to use the adaptive solver.
"""
if self.solver_needs_setup:
self.setup_solver()
self._times[0] = self._times[1] + self.timestep_size.__float__()
if goal_tolerance is None:
solver_status = self.solver.solve()
self.solver_status["iterations"] = solver_status[0]
else:
share_solver_parameters(
self.adaptive_solver.parameters["nonlinear_variational_solver"],
self.solver.parameters)
self.adaptive_solver.solve(goal_tolerance)
""" `fenics.AdaptiveNonlinearVariationalSolver` does not return status."""
self.solver_status["iterations"] = "NA"
self.solver_status["solved"] = True
return self.solver_status
[docs] def advance(self):
""" Move solutions backward in the queue to prepare for a new time step.
This is a separate method, since one may want to call `solve` multiple times
before being satisfied with the solution state.
"""
if self.time_order > 1:
self._solutions[2].leaf_node().assign(self._solutions[1].leaf_node())
self._times[2] = 0. + self._times[1]
self._timestep_sizes[1].assign(self._timestep_sizes[0])
self._solutions[1].leaf_node().assign(self._solutions[0].leaf_node())
self._times[1] = 0. + self._times[0]
""" The following are some utility methods. """
[docs] def time_discrete_terms(self):
""" Apply first-order implicit Euler finite difference method. """
wnp1 = fenics.split(self._solutions[0].leaf_node())
wn = fenics.split(self._solutions[1].leaf_node())
if self.time_order == 1:
return tuple([
phaseflow.backward_difference_formulas.apply_backward_euler(
self._timestep_sizes[0],
(wnp1[i], wn[i]))
for i in range(len(wn))])
if self.time_order > 1:
wnm1 = fenics.split(self._solutions[2].leaf_node())
if self.time_order == 2:
return tuple([
phaseflow.backward_difference_formulas.apply_bdf2(
(self._timestep_sizes[0], self._timestep_sizes[1]),
(wnp1[i], wn[i], wnm1[i]))
for i in range(len(wn))])
if self.time_order > 2:
raise NotImplementedError()
[docs] def assign_initial_values(self):
""" Set values of all solutions from `self.initial_values()`. """
initial_values = self.initial_values()
for i in range(len(self._solutions)):
self._solutions[i].assign(initial_values)
[docs] def reset_initial_guess(self):
""" Set the values of the latest solution from the next latest solution. """
self._solutions[0].leaf_node().vector()[:] = self._solutions[1].leaf_node().vector()
[docs] def save_newton_solution(self):
""" When not using AMR, we can save a copy of the solution from the latest successful Newton iteration.
This can be useful, since a failed Newton iteration will blow up the solution, replacing it with garbage.
This will fail if the mesh has been changed by the adaptive solver
and `self.newton_solution` has not been reinitialized with
`self.newton_solution = fenics.Function(self.function_space)`.
"""
self.newton_solution.vector()[:] = self._solutions[0].vector()
[docs] def load_newton_solution(self):
""" When not using AMR, we can load a copy of the solution from the latest successful Newton iteration.
This can be useful, since a failed Newton iteration will blow up the solution, replacing it with garbage.
This will fail if the mesh has been changed by the adaptive solver
and `self.newton_solution` has not been reinitialized with
`self.newton_solution = fenics.Function(self.function_space)`.
"""
self._solutions[0].vector()[:] = self.newton_solution.vector()
[docs] def set_solution_on_subdomain(self, subdomain, values):
""" Abuse `fenics.DirichletBC` to set values of a function on a subdomain.
Parameters
----------
subdomain
`fenics.SubDomain`
values
container of objects that would typically be passed to
`fenics.DirichletBC` as the values of the boundary condition,
one for each subspace of the mixed finite element solution space
"""
function_space = fenics.FunctionSpace(self.mesh.leaf_node(), self.element())
new_solution = fenics.Function(function_space)
new_solution.vector()[:] = self.solution.vector()
for function_subspace_index in range(len(fenics.split(self.solution))):
hack = fenics.DirichletBC(
function_space.sub(function_subspace_index),
values[function_subspace_index],
subdomain)
hack.apply(new_solution.vector())
self.solution.vector()[:] = new_solution.vector()
[docs] def deepcopy(self):
""" Return an entire deep copy of `self`.
For example, this is useful for checkpointing small problems in memory,
or for running a batch of simulations with parameter changes.
"""
sim = type(self)(
time_order = self.time_order,
integration_measure = self.integration_measure(),
setup_solver = False)
sim._mesh = fenics.Mesh(self.mesh)
sim._function_space = fenics.FunctionSpace(sim.mesh, sim._element)
for i in range(len(self._solutions)):
sim._solutions[i] = fenics.Function(sim.function_space)
sim._solutions[i].leaf_node().vector()[:] = self._solutions[i].leaf_node().vector()
sim._times[i] = 0. + self._times[i]
for i in range(len(self._timestep_sizes)):
sim._timestep_sizes[i] = self._timestep_sizes[i]
sim.setup_solver()
sim.solver.parameters = self.solver.parameters.copy()
return sim
[docs] def print_constants(self):
""" Print the names and values of all `fenics.Constant` attributes.
For example, this is useful for verifying that the correct parameters
have been set.
"""
for key in self.__dict__.keys():
attribute = self.__dict__[key]
if type(attribute) is type(fenics.Constant(0.)):
print(attribute.name() + " = " + str(attribute.values()))
[docs] def write_checkpoint(self, filepath):
"""Write solutions, times, and timestep sizes to a checkpoint file."""
print("Writing checkpoint to " + filepath)
with fenics.HDF5File(self.mesh.mpi_comm(), filepath, "w") as h5:
h5.write(self._solutions[0].function_space().mesh().leaf_node(), "mesh")
for i in range(len(self._solutions)):
h5.write(self._solutions[i].leaf_node(), "solution" + str(i))
""" The fenics.HDF5File interface does not allow us to write floats,
but rather only a numpy array. """
h5.write(numpy.array((self._times[i],)), "time" + str(i))
[docs] def read_checkpoint(self, filepath):
"""Read solutions and times from a checkpoint file."""
self._mesh = fenics.Mesh()
print("Reading checkpoint from " + filepath)
with fenics.HDF5File(self.mesh.mpi_comm(), filepath, "r") as h5:
h5.read(self._mesh, "mesh", True)
self._function_space = fenics.FunctionSpace(self.mesh, self._element)
for i in range(self.time_order + 1):
self._solutions[i] = fenics.Function(self.function_space)
h5.read(self._solutions[i], "solution" + str(i))
""" fenics.HDF5File doesn't implement read methods for every write method.
Our only option here seems to be to use a fenics.Vector to store values,
because a reader is implemented for GenericVector, which Vector inherits from.
Furthermore, for the correct read method to be called, we must pass a boolean
as a third argument related to distributed memory.
"""
time = fenics.Vector(fenics.mpi_comm_world(), 1)
h5.read(time, "time" + str(i), False)
self._times[i] = time.get_local()[0]
self.newton_solution = fenics.Function(self.function_space)
self.setup_solver()
[docs] def write_solution(self, file, solution_index = 0):
""" Write the solution to a file.
Parameters
----------
file : fenics.XDMFFile
This method should have been called from within the context of the open `file`.
"""
print("Writing solution to " + file.path)
for var in self._solutions[solution_index].leaf_node().split():
file.write(var, self._times[solution_index])
def convert_checkpoints_to_xdmf_solution(self, checkpoint_dir, xdmf_solution_filepath):
with phaseflow.helpers.SolutionFile(xdmf_solution_filepath) as xdmf_solution_file:
for filename in os.listdir(checkpoint_dir):
if ("checkpoint" in filename) and filename.endswith(".h5"):
self.read_checkpoint(checkpoint_dir + "/" + filename)
self.write_solution(xdmf_solution_file)
[docs] def plot(self, solution_index = 0, savefigs = False):
""" Plot the adaptive mesh and all parts of the mixed finite element solution. """
if not (self.output_dir == ""):
phaseflow.helpers.mkdir_p(self.output_dir)
self._plot(
solution = self._solutions[solution_index],
time = self._times[solution_index],
savefigs = savefigs)
def _plot(self, solution, time, savefigs = False):
phaseflow.plotting.plot(solution.function_space().mesh().leaf_node())
matplotlib.pyplot.title("$\Omega_h, t = " + str(time) + "$")
matplotlib.pyplot.xlabel("$x$")
matplotlib.pyplot.ylabel("$y$")
if savefigs:
matplotlib.pyplot.savefig(fname = self.output_dir + "mesh_t" + str(time) + ".png")
matplotlib.pyplot.show()
w = solution.leaf_node().split()
for i in range(len(w)):
some_mappable_thing = phaseflow.plotting.plot(w[i])
matplotlib.pyplot.colorbar(some_mappable_thing)
matplotlib.pyplot.title("$w_" + str(i) + ", t = " + str(time) + "$")
matplotlib.pyplot.xlabel("$x$")
matplotlib.pyplot.ylabel("$y$")
if savefigs:
matplotlib.pyplot.savefig(fname = self.output_dir + "w" + str(i) + "_t" + str(time) + ".png")
matplotlib.pyplot.show()
[docs]def share_solver_parameters(share_to_parameters, share_from_parameters):
""" FEniCS implements a setter for the solver parameters which does not allow us to
`adaptive_solver.parameters["nonlinear_variational_solver"] = solver.parameters`
so we recursively catch the resulting KeyError exception and set all parameters.
"""
for key in share_from_parameters:
try:
share_to_parameters[key] = share_from_parameters[key]
except KeyError:
share_solver_parameters(share_to_parameters[key], share_from_parameters[key])