#
# partIntfHelpers.py: partitioning interface helper functions
#
# Copyright (C) 2002 Red Hat, Inc. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
# Author(s): Matt Wilson
# Jeremy Katz
# Mike Fulbright
# Harald Hoyer
#
"""Helper functions shared between partitioning interfaces."""
from abc import ABCMeta, abstractmethod
import string
from constants import *
import parted
import iutil
import network
from storage.formats import getFormat
from storage.devicelibs.lvm import LVM_MAX_NAME_LEN
import gettext
_ = lambda x: gettext.ldgettext("anaconda", x)
import logging
log = logging.getLogger("anaconda")
def sanityCheckVolumeGroupName(volname):
"""Make sure that the volume group name doesn't contain invalid chars."""
badNames = ['lvm', 'root', '.', '..' ]
if not volname:
return _("Please enter a volume group name.")
# ripped the value for this out of linux/include/lvm.h
if len(volname) > LVM_MAX_NAME_LEN:
return _("Volume Group Names must be less than %d characters") % LVM_MAX_NAME_LEN
if volname in badNames:
return _("Error - the volume group name %s is not valid." % (volname,))
for i in range(0, len(volname)):
rc = string.find(string.letters + string.digits + '.' + '_' + '-', volname[i])
if rc == -1:
return _("Error - the volume group name contains illegal "
"characters or spaces. Acceptable characters "
"are letters, digits, '.' or '_'.")
return None
def sanityCheckLogicalVolumeName(logvolname):
"""Make sure that the logical volume name doesn't contain invalid chars."""
badNames = ['group', '.', '..' ]
if not logvolname:
return _("Please enter a logical volume name.")
# ripped the value for this out of linux/include/lvm.h
if len(logvolname) > LVM_MAX_NAME_LEN:
return _("Logical Volume Names must be less than %d characters") % LVM_MAX_NAME_LEN
if logvolname in badNames:
return _("Error - the logical volume name %s is not "
"valid." % (logvolname,))
for i in range(0, len(logvolname)):
rc = string.find(string.letters + string.digits + '.' + '_', logvolname[i])
if rc == -1:
return _("Error - the logical volume name contains illegal "
"characters or spaces. Acceptable characters "
"are letters, digits, '.' or '_'.")
return None
def sanityCheckMountPoint(mntpt):
"""Sanity check that the mountpoint is valid.
mntpt is the mountpoint being used.
The Rules
Start with one /
Don't end with /
No spaces
No /../
No /./
No //
Don't end with /..
Don't end with /.
"""
if not mntpt.startswith("/") or \
(len(mntpt) > 1 and mntpt.endswith("/")) or \
" " in mntpt or \
"/../" in mntpt or \
"/./" in mntpt or \
"//" in mntpt or \
mntpt.endswith("/..") or \
mntpt.endswith("/.") :
return _("The mount point %s is invalid. Mount points must start "
"with '/' and cannot end with '/', and must contain "
"printable characters and no spaces.") % mntpt
def doDeleteDevice(intf, storage, device, confirm=1, quiet=0):
"""Delete a partition from the request list.
intf is the interface
storage is the storage instance
device is the device to delete
"""
if not device:
intf.messageWindow(_("Unable To Delete"),
_("You must first select a partition to delete."),
custom_icon="error")
return False
reason = storage.deviceImmutable(device)
if reason:
intf.messageWindow(_("Unable To Delete"),
reason,
custom_icon="error")
return False
if confirm and not confirmDelete(intf, device):
return False
deps = storage.deviceDeps(device)
while deps:
leaves = [d for d in deps if d.isleaf]
for leaf in leaves:
storage.destroyDevice(leaf)
deps.remove(leaf)
storage.destroyDevice(device)
return True
def doClearPartitionedDevice(intf, storage, device, confirm=1, quiet=0):
""" Remove all devices/partitions currently on device.
device -- a partitioned device such as a disk
"""
if confirm:
rc = intf.messageWindow(_("Confirm Delete"),
_("You are about to delete all partitions on "
"the device '%s'.") % (device.path,),
type="custom", custom_icon="warning",
custom_buttons=[_("Cancel"), _("_Delete")])
if not rc:
return False
immutable = []
partitions = [p for p in storage.partitions if p.disk == device]
if not partitions:
return False
partitions.sort(key=lambda p: p.partedPartition.number, reverse=True)
for p in partitions:
deps = storage.deviceDeps(p)
clean = True # true if part and its deps were removed
while deps:
leaves = [d for d in deps if d.isleaf]
for leaf in leaves:
if leaf in immutable:
# this device was removed from deps at the same time it
# was added to immutable, so it won't appear in leaves
# in the next iteration
continue
if storage.deviceImmutable(leaf):
immutable.append(leaf)
for dep in [d for d in deps if d != leaf]:
# mark devices this device depends on as immutable
# to prevent getting stuck with non-leaf deps
# protected by immutable leaf devices
if leaf.dependsOn(dep):
deps.remove(dep)
if dep not in immutable:
immutable.append(dep)
clean = False
else:
storage.destroyDevice(leaf)
deps.remove(leaf)
if storage.deviceImmutable(p):
immutable.append(p)
clean = False
if clean:
storage.destroyDevice(p)
if immutable and not quiet:
remaining = "\t" + "\n\t".join(p.path for p in immutable) + "\n"
intf.messageWindow(_("Notice"),
_("The following partitions were not deleted "
"because they are in use:\n\n%s") % remaining,
custom_icon="warning")
return True
def checkForSwapNoMatch(anaconda):
"""Check for any partitions of type 0x82 which don't have a swap fs."""
for device in anaconda.id.storage.partitions:
if not device.exists:
# this is only for existing partitions
continue
if device.getFlag(parted.PARTITION_SWAP) and \
not device.format.type == "swap":
rc = anaconda.intf.messageWindow(_("Format as Swap?"),
_("%s has a partition type of 0x82 "
"(Linux swap) but does not appear to "
"be formatted as a Linux swap "
"partition.\n\n"
"Would you like to format this "
"partition as a swap partition?")
% device.path, type = "yesno",
custom_icon="question")
if rc == 1:
format = getFormat("swap", device=device.path)
anaconda.id.storage.formatDevice(device, format)
return
def mustHaveSelectedDrive(intf):
txt =_("You need to select at least one hard drive to install %s.") % (productName,)
intf.messageWindow(_("Error"), txt, custom_icon="error")
def queryNoFormatPreExisting(intf):
"""Ensure the user wants to use a partition without formatting."""
txt = _("You have chosen to use a pre-existing "
"partition for this installation without formatting it. "
"We recommend that you format this partition "
"to make sure files from a previous operating system installation "
"do not cause problems with this installation of Linux. "
"However, if this partition contains files that you need "
"to keep, such as home directories, then "
"continue without formatting this partition.")
rc = intf.messageWindow(_("Format?"), txt, type = "custom", custom_buttons=[_("_Modify Partition"), _("Do _Not Format")], custom_icon="warning")
return rc
def partitionSanityErrors(intf, errors):
"""Errors were found sanity checking. Tell the user they must fix."""
rc = 1
if errors:
errorstr = string.join(errors, "\n\n")
rc = intf.messageWindow(_("Error with Partitioning"),
_("The following critical errors exist "
"with your requested partitioning "
"scheme. "
"These errors must be corrected prior "
"to continuing with your install of "
"%(productName)s.\n\n%(errorstr)s") \
% {'productName': productName,
'errorstr': errorstr},
custom_icon="error")
return rc
def partitionSanityWarnings(intf, warnings):
"""Sanity check found warnings. Make sure the user wants to continue."""
rc = 1
if warnings:
warningstr = string.join(warnings, "\n\n")
rc = intf.messageWindow(_("Partitioning Warning"),
_("The following warnings exist with "
"your requested partition scheme.\n\n%s"
"\n\nWould you like to continue with "
"your requested partitioning "
"scheme?") % (warningstr),
type="yesno", custom_icon="warning")
return rc
def partitionPreExistFormatWarnings(intf, warnings):
"""Double check that preexistings being formatted are fine."""
rc = 1
if warnings:
labelstr1 = _("The following pre-existing partitions have been "
"selected to be formatted, destroying all data.")
labelstr2 = _("Select 'Yes' to continue and format these "
"partitions, or 'No' to go back and change these "
"settings.")
commentstr = ""
for (dev, type, mntpt) in warnings:
commentstr = commentstr + "/dev/%s %s %s\n" % (dev,type,mntpt)
rc = intf.messageWindow(_("Format Warning"), "%s\n\n%s\n\n%s" %
(labelstr1, labelstr2, commentstr),
type="yesno", custom_icon="warning")
return rc
def getPreExistFormatWarnings(storage):
"""Return a list of preexisting devices being formatted."""
devices = []
for device in storage.devicetree.devices:
if device.exists and not device.format.exists and \
not device.format.hidden:
devices.append(device)
devices.sort(key=lambda d: d.name)
rc = []
for device in devices:
rc.append((device.path,
device.format.name,
getattr(device.format, "mountpoint", "")))
return rc
def confirmDelete(intf, device):
"""Confirm the deletion of a device."""
if not device:
return
if device.type == "lvmvg":
errmsg = (_("You are about to delete the volume group \"%s\"."
"\n\nALL logical volumes in this volume group "
"will be lost!") % device.name)
elif device.type == "lvmlv":
errmsg = (_("You are about to delete the logical volume \"%s\".")
% device.name)
elif device.type == "mdarray":
errmsg = _("You are about to delete a RAID device.")
elif device.type == "partition":
errmsg = (_("You are about to delete the %s partition.")
% device.path)
else:
# we may want something a little bit prettier than device.type
errmsg = (_("You are about to delete the %(type)s %(name)s") \
% {'type': device.type, 'name': device.name})
rc = intf.messageWindow(_("Confirm Delete"), errmsg, type="custom",
custom_buttons=[_("Cancel"), _("_Delete")],
custom_icon="question")
return rc
def confirmResetPartitionState(intf):
"""Confirm reset of partitioning to that present on the system."""
rc = intf.messageWindow(_("Confirm Reset"),
_("Are you sure you want to reset the "
"partition table to its original state?"),
type="yesno", custom_icon="question")
return rc
""" iSCSI GUI helper objects """
# the credentials constants: are necessary to implement a concrete iSCSIWizard
CRED_NONE = (0, _("No authentication"))
CRED_ONE = (1, _("CHAP pair"))
CRED_BOTH = (2, _("CHAP pair and a reverse pair"))
CRED_REUSE = (3, _("Use the credentials from the discovery step"))
def parse_ip(string_ip):
"""
May rise network.IPMissing or network.IPError
Returns (ip, port) tuple.
"""
count = len(string_ip.split(":"))
idx = string_ip.rfind("]:")
# Check for IPV6 [IPV6-ip]:port
if idx != -1:
ip = string_ip[1:idx]
port = string_ip[idx+2:]
# Check for IPV4 aaa.bbb.ccc.ddd:port
elif count == 2:
idx = string_ip.rfind(":")
ip = string_ip[:idx]
port = string_ip[idx+1:]
else:
ip = string_ip
port = "3260"
network.sanityCheckIPString(ip)
return (ip, port)
class iSCSIWizard():
"""
A base class for both the GUI and TUI iSCSI wizards.
To get an instantiable class, all its methods have to be overriden.
"""
__metaclass__ = ABCMeta
@abstractmethod
def destroy_dialogs(self):
pass
@abstractmethod
def display_discovery_dialog(self, initiator, initiator_set):
pass
@abstractmethod
def display_login_dialog(self):
pass
@abstractmethod
def display_nodes_dialog(self, found_nodes, ifaces):
pass
@abstractmethod
def display_success_dialog(self, success_nodes, fail_nodes, fail_reason,
ifaces):
pass
@abstractmethod
def get_discovery_dict(self):
pass
@abstractmethod
def get_initiator(self):
pass
@abstractmethod
def get_login_dict(self):
pass
@abstractmethod
def set_initiator(self, initiator, initiator_set):
pass
def drive_iscsi_addition(anaconda, wizard):
"""
This method is the UI controller that drives adding of iSCSI drives
wizard is the UI wizard object of class derived from iSCSIWizard.
Returns a list of all newly added iSCSI nodes (or empty list on error etc.)
"""
STEP_DISCOVERY = 0
STEP_NODES = 1
STEP_LOGIN = 2
STEP_SUMMARY = 3
STEP_STABILIZE = 4
STEP_DONE = 10
login_ok_nodes = []
step = STEP_DISCOVERY
while step != STEP_DONE:
# go through the wizard's dialogs, read the user input (selected nodes,
# login credentials) and provide it to the iscsi subsystem
try:
if step == STEP_DISCOVERY:
rc = wizard.display_discovery_dialog(
anaconda.id.storage.iscsi.initiator,
anaconda.id.storage.iscsi.initiatorSet)
if not rc:
break
anaconda.id.storage.iscsi.initiator = wizard.get_initiator()
discovery_dict = wizard.get_discovery_dict()
discovery_dict["intf"] = anaconda.intf
found_nodes = anaconda.id.storage.iscsi.discover(**discovery_dict)
step = STEP_NODES
elif step == STEP_NODES:
if not found_nodes:
log.debug("iscsi: no iSCSI nodes to log in")
anaconda.intf.messageWindow(_("iSCSI Nodes"),
_("No iSCSI nodes to log in"))
break
(rc, selected_nodes) = wizard.display_nodes_dialog(found_nodes,
anaconda.id.storage.iscsi.ifaces)
if not rc or len(selected_nodes) == 0:
break
step = STEP_LOGIN
elif step == STEP_LOGIN:
rc = wizard.display_login_dialog()
if not rc:
break
login_dict = wizard.get_login_dict()
login_dict["intf"] = anaconda.intf
login_fail_nodes = []
login_fail_msg = ""
for node in selected_nodes:
(rc, msg) = anaconda.id.storage.iscsi.log_into_node(node,
**login_dict)
if rc:
login_ok_nodes.append(node)
else:
login_fail_nodes.append(node)
if msg:
# only remember the last message:
login_fail_msg = msg
step = STEP_SUMMARY
elif step == STEP_SUMMARY:
rc = wizard.display_success_dialog(login_ok_nodes,
login_fail_nodes,
login_fail_msg,
anaconda.id.storage.iscsi.ifaces)
if rc:
step = STEP_STABILIZE
else:
# user wants to try logging into the failed nodes again
found_nodes = login_fail_nodes
step = STEP_NODES
elif step == STEP_STABILIZE:
anaconda.id.storage.iscsi.stabilize(anaconda.intf)
step = STEP_DONE
except (network.IPMissing, network.IPError) as msg:
log.info("addIscsiDrive() cancelled due to an invalid IP address.")
anaconda.intf.messageWindow(_("iSCSI Error"), str(msg))
if step != STEP_DISCOVERY:
break
except (ValueError, IOError) as e:
log.info("addIscsiDrive() IOError exception: %s" % e)
step_str = _("Discovery") if step == STEP_DISCOVERY else _("Login")
anaconda.intf.messageWindow(_("iSCSI %s Error") % step_str, str(e))
break
wizard.destroy_dialogs()
return login_ok_nodes