Shared filesystem management project for OpenStack.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

driver.py 65KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554
  1. # Copyright 2016 Mirantis Inc.
  2. # All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. """
  16. Module with ZFSonLinux share driver that utilizes ZFS filesystem resources
  17. and exports them as shares.
  18. """
  19. import math
  20. import os
  21. import time
  22. from oslo_config import cfg
  23. from oslo_log import log
  24. from oslo_utils import importutils
  25. from oslo_utils import strutils
  26. from oslo_utils import timeutils
  27. from manila.common import constants
  28. from manila import exception
  29. from manila.i18n import _
  30. from manila.share import configuration
  31. from manila.share import driver
  32. from manila.share.drivers.zfsonlinux import utils as zfs_utils
  33. from manila.share.manager import share_manager_opts # noqa
  34. from manila.share import share_types
  35. from manila.share import utils as share_utils
  36. from manila import utils
  37. zfsonlinux_opts = [
  38. cfg.HostAddressOpt(
  39. "zfs_share_export_ip",
  40. required=True,
  41. help="IP to be added to user-facing export location. Required."),
  42. cfg.HostAddressOpt(
  43. "zfs_service_ip",
  44. required=True,
  45. help="IP to be added to admin-facing export location. Required."),
  46. cfg.ListOpt(
  47. "zfs_zpool_list",
  48. required=True,
  49. help="Specify list of zpools that are allowed to be used by backend. "
  50. "Can contain nested datasets. Examples: "
  51. "Without nested dataset: 'zpool_name'. "
  52. "With nested dataset: 'zpool_name/nested_dataset_name'. "
  53. "Required."),
  54. cfg.ListOpt(
  55. "zfs_dataset_creation_options",
  56. help="Define here list of options that should be applied "
  57. "for each dataset creation if needed. Example: "
  58. "compression=gzip,dedup=off. "
  59. "Note that, for secondary replicas option 'readonly' will be set "
  60. "to 'on' and for active replicas to 'off' in any way. "
  61. "Also, 'quota' will be equal to share size. Optional."),
  62. cfg.StrOpt(
  63. "zfs_dataset_name_prefix",
  64. default='manila_share_',
  65. help="Prefix to be used in each dataset name. Optional."),
  66. cfg.StrOpt(
  67. "zfs_dataset_snapshot_name_prefix",
  68. default='manila_share_snapshot_',
  69. help="Prefix to be used in each dataset snapshot name. Optional."),
  70. cfg.BoolOpt(
  71. "zfs_use_ssh",
  72. default=False,
  73. help="Remote ZFS storage hostname that should be used for SSH'ing. "
  74. "Optional."),
  75. cfg.StrOpt(
  76. "zfs_ssh_username",
  77. help="SSH user that will be used in 2 cases: "
  78. "1) By manila-share service in case it is located on different "
  79. "host than its ZFS storage. "
  80. "2) By manila-share services with other ZFS backends that "
  81. "perform replication. "
  82. "It is expected that SSH'ing will be key-based, passwordless. "
  83. "This user should be passwordless sudoer. Optional."),
  84. cfg.StrOpt(
  85. "zfs_ssh_user_password",
  86. secret=True,
  87. help="Password for user that is used for SSH'ing ZFS storage host. "
  88. "Not used for replication operations. They require "
  89. "passwordless SSH access. Optional."),
  90. cfg.StrOpt(
  91. "zfs_ssh_private_key_path",
  92. help="Path to SSH private key that should be used for SSH'ing ZFS "
  93. "storage host. Not used for replication operations. Optional."),
  94. cfg.ListOpt(
  95. "zfs_share_helpers",
  96. required=True,
  97. default=[
  98. "NFS=manila.share.drivers.zfsonlinux.utils.NFSviaZFSHelper",
  99. ],
  100. help="Specify list of share export helpers for ZFS storage. "
  101. "It should look like following: "
  102. "'FOO_protocol=foo.FooClass,BAR_protocol=bar.BarClass'. "
  103. "Required."),
  104. cfg.StrOpt(
  105. "zfs_replica_snapshot_prefix",
  106. required=True,
  107. default="tmp_snapshot_for_replication_",
  108. help="Set snapshot prefix for usage in ZFS replication. Required."),
  109. cfg.StrOpt(
  110. "zfs_migration_snapshot_prefix",
  111. required=True,
  112. default="tmp_snapshot_for_share_migration_",
  113. help="Set snapshot prefix for usage in ZFS migration. Required."),
  114. ]
  115. CONF = cfg.CONF
  116. CONF.register_opts(zfsonlinux_opts)
  117. LOG = log.getLogger(__name__)
  118. def ensure_share_server_not_provided(f):
  119. def wrap(self, context, *args, **kwargs):
  120. server = kwargs.get(
  121. "share_server", kwargs.get("destination_share_server"))
  122. if server:
  123. raise exception.InvalidInput(
  124. reason=_("Share server handling is not available. "
  125. "But 'share_server' was provided. '%s'. "
  126. "Share network should not be used.") % server.get(
  127. "id", server))
  128. return f(self, context, *args, **kwargs)
  129. return wrap
  130. def get_backend_configuration(backend_name):
  131. config_stanzas = CONF.list_all_sections()
  132. if backend_name not in config_stanzas:
  133. msg = _("Could not find backend stanza %(backend_name)s in "
  134. "configuration which is required for share replication and "
  135. "migration. Available stanzas are %(stanzas)s")
  136. params = {
  137. "stanzas": config_stanzas,
  138. "backend_name": backend_name,
  139. }
  140. raise exception.BadConfigurationException(reason=msg % params)
  141. config = configuration.Configuration(
  142. driver.share_opts, config_group=backend_name)
  143. config.append_config_values(zfsonlinux_opts)
  144. config.append_config_values(share_manager_opts)
  145. config.append_config_values(driver.ssh_opts)
  146. return config
  147. class ZFSonLinuxShareDriver(zfs_utils.ExecuteMixin, driver.ShareDriver):
  148. def __init__(self, *args, **kwargs):
  149. super(self.__class__, self).__init__(
  150. [False], *args, config_opts=[zfsonlinux_opts], **kwargs)
  151. self.replica_snapshot_prefix = (
  152. self.configuration.zfs_replica_snapshot_prefix)
  153. self.migration_snapshot_prefix = (
  154. self.configuration.zfs_migration_snapshot_prefix)
  155. self.backend_name = self.configuration.safe_get(
  156. 'share_backend_name') or 'ZFSonLinux'
  157. self.zpool_list = self._get_zpool_list()
  158. self.dataset_creation_options = (
  159. self.configuration.zfs_dataset_creation_options)
  160. self.share_export_ip = self.configuration.zfs_share_export_ip
  161. self.service_ip = self.configuration.zfs_service_ip
  162. self.private_storage = kwargs.get('private_storage')
  163. self._helpers = {}
  164. # Set config based capabilities
  165. self._init_common_capabilities()
  166. self._shell_executors = {}
  167. def _get_shell_executor_by_host(self, host):
  168. backend_name = share_utils.extract_host(host, level='backend_name')
  169. if backend_name in CONF.enabled_share_backends:
  170. # Return executor of this host
  171. return self.execute
  172. elif backend_name not in self._shell_executors:
  173. config = get_backend_configuration(backend_name)
  174. self._shell_executors[backend_name] = (
  175. zfs_utils.get_remote_shell_executor(
  176. ip=config.zfs_service_ip,
  177. port=22,
  178. conn_timeout=config.ssh_conn_timeout,
  179. login=config.zfs_ssh_username,
  180. password=config.zfs_ssh_user_password,
  181. privatekey=config.zfs_ssh_private_key_path,
  182. max_size=10,
  183. )
  184. )
  185. # Return executor of remote host
  186. return self._shell_executors[backend_name]
  187. def _init_common_capabilities(self):
  188. self.common_capabilities = {}
  189. if 'dedup=on' in self.dataset_creation_options:
  190. self.common_capabilities['dedupe'] = [True]
  191. elif 'dedup=off' in self.dataset_creation_options:
  192. self.common_capabilities['dedupe'] = [False]
  193. else:
  194. self.common_capabilities['dedupe'] = [True, False]
  195. if 'compression=off' in self.dataset_creation_options:
  196. self.common_capabilities['compression'] = [False]
  197. elif any('compression=' in option
  198. for option in self.dataset_creation_options):
  199. self.common_capabilities['compression'] = [True]
  200. else:
  201. self.common_capabilities['compression'] = [True, False]
  202. # NOTE(vponomaryov): Driver uses 'quota' approach for
  203. # ZFS dataset. So, we can consider it as
  204. # 'always thin provisioned' because this driver never reserves
  205. # space for dataset.
  206. self.common_capabilities['thin_provisioning'] = [True]
  207. self.common_capabilities['max_over_subscription_ratio'] = (
  208. self.configuration.max_over_subscription_ratio)
  209. self.common_capabilities['qos'] = [False]
  210. def _get_zpool_list(self):
  211. zpools = []
  212. for zpool in self.configuration.zfs_zpool_list:
  213. zpool_name = zpool.split('/')[0]
  214. if zpool_name in zpools:
  215. raise exception.BadConfigurationException(
  216. reason=_("Using the same zpool twice is prohibited. "
  217. "Duplicate is '%(zpool)s'. List of zpools: "
  218. "%(zpool_list)s.") % {
  219. 'zpool': zpool,
  220. 'zpool_list': ', '.join(
  221. self.configuration.zfs_zpool_list)})
  222. zpools.append(zpool_name)
  223. return zpools
  224. @zfs_utils.zfs_dataset_synchronized
  225. def _delete_dataset_or_snapshot_with_retry(self, name):
  226. """Attempts to destroy some dataset or snapshot with retries."""
  227. # NOTE(vponomaryov): it is possible to see 'dataset is busy' error
  228. # under the load. So, we are ok to perform retry in this case.
  229. mountpoint = self.get_zfs_option(name, 'mountpoint')
  230. if '@' not in name:
  231. # NOTE(vponomaryov): check that dataset has no open files.
  232. start_point = time.time()
  233. while time.time() - start_point < 60:
  234. try:
  235. out, err = self.execute('lsof', '-w', mountpoint)
  236. except exception.ProcessExecutionError:
  237. # NOTE(vponomaryov): lsof returns code 1 if search
  238. # didn't give results.
  239. break
  240. LOG.debug("Cannot destroy dataset '%(name)s', it has "
  241. "opened files. Will wait 2 more seconds. "
  242. "Out: \n%(out)s", {
  243. 'name': name, 'out': out})
  244. time.sleep(2)
  245. else:
  246. raise exception.ZFSonLinuxException(
  247. msg=_("Could not destroy '%s' dataset, "
  248. "because it had opened files.") % name)
  249. # NOTE(vponomaryov): Now, when no file usages and mounts of dataset
  250. # exist, destroy dataset.
  251. try:
  252. self.zfs('destroy', '-f', name)
  253. return
  254. except exception.ProcessExecutionError:
  255. LOG.info("Failed to destroy ZFS dataset, retrying one time")
  256. # NOTE(bswartz): There appears to be a bug in ZFS when creating and
  257. # destroying datasets concurrently where the filesystem remains mounted
  258. # even though ZFS thinks it's unmounted. The most reliable workaround
  259. # I've found is to force the unmount, then retry the destroy, with
  260. # short pauses around the unmount.
  261. time.sleep(1)
  262. try:
  263. self.execute('sudo', 'umount', mountpoint)
  264. except exception.ProcessExecutionError:
  265. # Ignore failed umount, it's normal
  266. pass
  267. time.sleep(1)
  268. # This time the destroy is expected to succeed.
  269. self.zfs('destroy', '-f', name)
  270. def _setup_helpers(self):
  271. """Setups share helper for ZFS backend."""
  272. self._helpers = {}
  273. helpers = self.configuration.zfs_share_helpers
  274. if helpers:
  275. for helper_str in helpers:
  276. share_proto, __, import_str = helper_str.partition('=')
  277. helper = importutils.import_class(import_str)
  278. self._helpers[share_proto.upper()] = helper(
  279. self.configuration)
  280. else:
  281. raise exception.BadConfigurationException(
  282. reason=_(
  283. "No share helpers selected for ZFSonLinux Driver. "
  284. "Please specify using config option 'zfs_share_helpers'."))
  285. def _get_share_helper(self, share_proto):
  286. """Returns share helper specific for used share protocol."""
  287. helper = self._helpers.get(share_proto)
  288. if helper:
  289. return helper
  290. else:
  291. raise exception.InvalidShare(
  292. reason=_("Wrong, unsupported or disabled protocol - "
  293. "'%s'.") % share_proto)
  294. def do_setup(self, context):
  295. """Perform basic setup and checks."""
  296. super(self.__class__, self).do_setup(context)
  297. self._setup_helpers()
  298. for ip in (self.share_export_ip, self.service_ip):
  299. if not utils.is_valid_ip_address(ip, 4):
  300. raise exception.BadConfigurationException(
  301. reason=_("Wrong IP address provided: "
  302. "%s") % self.share_export_ip)
  303. if not self.zpool_list:
  304. raise exception.BadConfigurationException(
  305. reason=_("No zpools specified for usage: "
  306. "%s") % self.zpool_list)
  307. # Make pool mounts shared so that cloned namespaces receive unmounts
  308. # and don't prevent us from unmounting datasets
  309. for zpool in self.configuration.zfs_zpool_list:
  310. self.execute('sudo', 'mount', '--make-rshared', ('/%s' % zpool))
  311. if self.configuration.zfs_use_ssh:
  312. # Check workability of SSH executor
  313. self.ssh_executor('whoami')
  314. def _get_pools_info(self):
  315. """Returns info about all pools used by backend."""
  316. pools = []
  317. for zpool in self.zpool_list:
  318. free_size = self.get_zpool_option(zpool, 'free')
  319. free_size = utils.translate_string_size_to_float(free_size)
  320. total_size = self.get_zpool_option(zpool, 'size')
  321. total_size = utils.translate_string_size_to_float(total_size)
  322. pool = {
  323. 'pool_name': zpool,
  324. 'total_capacity_gb': float(total_size),
  325. 'free_capacity_gb': float(free_size),
  326. 'reserved_percentage':
  327. self.configuration.reserved_share_percentage,
  328. }
  329. pool.update(self.common_capabilities)
  330. if self.configuration.replication_domain:
  331. pool['replication_type'] = 'readable'
  332. pools.append(pool)
  333. return pools
  334. def _update_share_stats(self):
  335. """Retrieves share stats info."""
  336. data = {
  337. 'share_backend_name': self.backend_name,
  338. 'storage_protocol': 'NFS',
  339. 'reserved_percentage':
  340. self.configuration.reserved_share_percentage,
  341. 'snapshot_support': True,
  342. 'create_share_from_snapshot_support': True,
  343. 'driver_name': 'ZFS',
  344. 'pools': self._get_pools_info(),
  345. }
  346. if self.configuration.replication_domain:
  347. data['replication_type'] = 'readable'
  348. super(self.__class__, self)._update_share_stats(data)
  349. def _get_share_name(self, share_id):
  350. """Returns name of dataset used for given share."""
  351. prefix = self.configuration.zfs_dataset_name_prefix or ''
  352. return prefix + share_id.replace('-', '_')
  353. def _get_snapshot_name(self, snapshot_id):
  354. """Returns name of dataset snapshot used for given share snapshot."""
  355. prefix = self.configuration.zfs_dataset_snapshot_name_prefix or ''
  356. return prefix + snapshot_id.replace('-', '_')
  357. def _get_dataset_creation_options(self, share, is_readonly=False):
  358. """Returns list of options to be used for dataset creation."""
  359. options = ['quota=%sG' % share['size']]
  360. extra_specs = share_types.get_extra_specs_from_share(share)
  361. dedupe_set = False
  362. dedupe = extra_specs.get('dedupe')
  363. if dedupe:
  364. dedupe = strutils.bool_from_string(
  365. dedupe.lower().split(' ')[-1], default=dedupe)
  366. if (dedupe in self.common_capabilities['dedupe']):
  367. options.append('dedup=%s' % ('on' if dedupe else 'off'))
  368. dedupe_set = True
  369. else:
  370. raise exception.ZFSonLinuxException(msg=_(
  371. "Cannot use requested '%(requested)s' value of 'dedupe' "
  372. "extra spec. It does not fit allowed value '%(allowed)s' "
  373. "that is configured for backend.") % {
  374. 'requested': dedupe,
  375. 'allowed': self.common_capabilities['dedupe']})
  376. compression_set = False
  377. compression_type = extra_specs.get('zfsonlinux:compression')
  378. if compression_type:
  379. if (compression_type == 'off' and
  380. False in self.common_capabilities['compression']):
  381. options.append('compression=off')
  382. compression_set = True
  383. elif (compression_type != 'off' and
  384. True in self.common_capabilities['compression']):
  385. options.append('compression=%s' % compression_type)
  386. compression_set = True
  387. else:
  388. raise exception.ZFSonLinuxException(msg=_(
  389. "Cannot use value '%s' of extra spec "
  390. "'zfsonlinux:compression' because compression is disabled "
  391. "for this backend. Set extra spec 'compression=True' to "
  392. "make scheduler pick up appropriate backend."
  393. ) % compression_type)
  394. for option in self.dataset_creation_options or []:
  395. if any(v in option for v in (
  396. 'readonly', 'sharenfs', 'sharesmb', 'quota')):
  397. continue
  398. if 'dedup' in option and dedupe_set is True:
  399. continue
  400. if 'compression' in option and compression_set is True:
  401. continue
  402. options.append(option)
  403. if is_readonly:
  404. options.append('readonly=on')
  405. else:
  406. options.append('readonly=off')
  407. return options
  408. def _get_dataset_name(self, share):
  409. """Returns name of dataset used for given share."""
  410. pool_name = share_utils.extract_host(share['host'], level='pool')
  411. # Pick pool with nested dataset name if set up
  412. for pool in self.configuration.zfs_zpool_list:
  413. pool_data = pool.split('/')
  414. if (pool_name == pool_data[0] and len(pool_data) > 1):
  415. pool_name = pool
  416. if pool_name[-1] == '/':
  417. pool_name = pool_name[0:-1]
  418. break
  419. dataset_name = self._get_share_name(share['id'])
  420. full_dataset_name = '%(pool)s/%(dataset)s' % {
  421. 'pool': pool_name, 'dataset': dataset_name}
  422. return full_dataset_name
  423. @ensure_share_server_not_provided
  424. def create_share(self, context, share, share_server=None):
  425. """Is called to create a share."""
  426. options = self._get_dataset_creation_options(share, is_readonly=False)
  427. cmd = ['create']
  428. for option in options:
  429. cmd.extend(['-o', option])
  430. dataset_name = self._get_dataset_name(share)
  431. cmd.append(dataset_name)
  432. ssh_cmd = '%(username)s@%(host)s' % {
  433. 'username': self.configuration.zfs_ssh_username,
  434. 'host': self.service_ip,
  435. }
  436. pool_name = share_utils.extract_host(share['host'], level='pool')
  437. self.private_storage.update(
  438. share['id'], {
  439. 'entity_type': 'share',
  440. 'dataset_name': dataset_name,
  441. 'ssh_cmd': ssh_cmd, # used with replication and migration
  442. 'pool_name': pool_name, # used in replication
  443. 'used_options': ' '.join(options),
  444. }
  445. )
  446. self.zfs(*cmd)
  447. return self._get_share_helper(
  448. share['share_proto']).create_exports(dataset_name)
  449. @ensure_share_server_not_provided
  450. def delete_share(self, context, share, share_server=None):
  451. """Is called to remove a share."""
  452. pool_name = self.private_storage.get(share['id'], 'pool_name')
  453. pool_name = pool_name or share_utils.extract_host(
  454. share["host"], level="pool")
  455. dataset_name = self.private_storage.get(share['id'], 'dataset_name')
  456. if not dataset_name:
  457. dataset_name = self._get_dataset_name(share)
  458. out, err = self.zfs('list', '-r', pool_name)
  459. data = self.parse_zfs_answer(out)
  460. for datum in data:
  461. if datum['NAME'] != dataset_name:
  462. continue
  463. # Delete dataset's snapshots first
  464. out, err = self.zfs('list', '-r', '-t', 'snapshot', pool_name)
  465. snapshots = self.parse_zfs_answer(out)
  466. full_snapshot_prefix = (
  467. dataset_name + '@')
  468. for snap in snapshots:
  469. if full_snapshot_prefix in snap['NAME']:
  470. self._delete_dataset_or_snapshot_with_retry(snap['NAME'])
  471. self._get_share_helper(
  472. share['share_proto']).remove_exports(dataset_name)
  473. self._delete_dataset_or_snapshot_with_retry(dataset_name)
  474. break
  475. else:
  476. LOG.warning(
  477. "Share with '%(id)s' ID and '%(name)s' NAME is "
  478. "absent on backend. Nothing has been deleted.",
  479. {'id': share['id'], 'name': dataset_name})
  480. self.private_storage.delete(share['id'])
  481. @ensure_share_server_not_provided
  482. def create_snapshot(self, context, snapshot, share_server=None):
  483. """Is called to create a snapshot."""
  484. dataset_name = self.private_storage.get(
  485. snapshot['share_instance_id'], 'dataset_name')
  486. snapshot_tag = self._get_snapshot_name(snapshot['id'])
  487. snapshot_name = dataset_name + '@' + snapshot_tag
  488. self.private_storage.update(
  489. snapshot['snapshot_id'], {
  490. 'entity_type': 'snapshot',
  491. 'snapshot_tag': snapshot_tag,
  492. }
  493. )
  494. self.zfs('snapshot', snapshot_name)
  495. return {"provider_location": snapshot_name}
  496. @ensure_share_server_not_provided
  497. def delete_snapshot(self, context, snapshot, share_server=None):
  498. """Is called to remove a snapshot."""
  499. self._delete_snapshot(context, snapshot)
  500. self.private_storage.delete(snapshot['snapshot_id'])
  501. def _get_saved_snapshot_name(self, snapshot_instance):
  502. snapshot_tag = self.private_storage.get(
  503. snapshot_instance['snapshot_id'], 'snapshot_tag')
  504. dataset_name = self.private_storage.get(
  505. snapshot_instance['share_instance_id'], 'dataset_name')
  506. snapshot_name = dataset_name + '@' + snapshot_tag
  507. return snapshot_name
  508. def _delete_snapshot(self, context, snapshot):
  509. snapshot_name = self._get_saved_snapshot_name(snapshot)
  510. out, err = self.zfs('list', '-r', '-t', 'snapshot', snapshot_name)
  511. data = self.parse_zfs_answer(out)
  512. for datum in data:
  513. if datum['NAME'] == snapshot_name:
  514. self._delete_dataset_or_snapshot_with_retry(snapshot_name)
  515. break
  516. else:
  517. LOG.warning(
  518. "Snapshot with '%(id)s' ID and '%(name)s' NAME is "
  519. "absent on backend. Nothing has been deleted.",
  520. {'id': snapshot['id'], 'name': snapshot_name})
  521. @ensure_share_server_not_provided
  522. def create_share_from_snapshot(self, context, share, snapshot,
  523. share_server=None):
  524. """Is called to create a share from snapshot."""
  525. dataset_name = self._get_dataset_name(share)
  526. ssh_cmd = '%(username)s@%(host)s' % {
  527. 'username': self.configuration.zfs_ssh_username,
  528. 'host': self.service_ip,
  529. }
  530. pool_name = share_utils.extract_host(share['host'], level='pool')
  531. options = self._get_dataset_creation_options(share, is_readonly=False)
  532. self.private_storage.update(
  533. share['id'], {
  534. 'entity_type': 'share',
  535. 'dataset_name': dataset_name,
  536. 'ssh_cmd': ssh_cmd, # used in replication
  537. 'pool_name': pool_name, # used in replication
  538. 'used_options': options,
  539. }
  540. )
  541. snapshot_name = self._get_saved_snapshot_name(snapshot)
  542. self.execute(
  543. # NOTE(vponomaryov): SSH is used as workaround for 'execute'
  544. # implementation restriction that does not support usage of '|'.
  545. 'ssh', ssh_cmd,
  546. 'sudo', 'zfs', 'send', '-vD', snapshot_name, '|',
  547. 'sudo', 'zfs', 'receive', '-v', dataset_name,
  548. )
  549. # Apply options based on used share type that may differ from
  550. # one used for original share.
  551. for option in options:
  552. self.zfs('set', option, dataset_name)
  553. # Delete with retry as right after creation it may be temporary busy.
  554. self.execute_with_retry(
  555. 'sudo', 'zfs', 'destroy',
  556. dataset_name + '@' + snapshot_name.split('@')[-1])
  557. return self._get_share_helper(
  558. share['share_proto']).create_exports(dataset_name)
  559. def get_pool(self, share):
  560. """Return pool name where the share resides on.
  561. :param share: The share hosted by the driver.
  562. """
  563. pool_name = share_utils.extract_host(share['host'], level='pool')
  564. return pool_name
  565. @ensure_share_server_not_provided
  566. def ensure_share(self, context, share, share_server=None):
  567. """Invoked to ensure that given share is exported."""
  568. dataset_name = self.private_storage.get(share['id'], 'dataset_name')
  569. if not dataset_name:
  570. dataset_name = self._get_dataset_name(share)
  571. pool_name = share_utils.extract_host(share['host'], level='pool')
  572. out, err = self.zfs('list', '-r', pool_name)
  573. data = self.parse_zfs_answer(out)
  574. for datum in data:
  575. if datum['NAME'] == dataset_name:
  576. ssh_cmd = '%(username)s@%(host)s' % {
  577. 'username': self.configuration.zfs_ssh_username,
  578. 'host': self.service_ip,
  579. }
  580. self.private_storage.update(
  581. share['id'], {'ssh_cmd': ssh_cmd})
  582. sharenfs = self.get_zfs_option(dataset_name, 'sharenfs')
  583. if sharenfs != 'off':
  584. self.zfs('share', dataset_name)
  585. export_locations = self._get_share_helper(
  586. share['share_proto']).get_exports(dataset_name)
  587. return export_locations
  588. else:
  589. raise exception.ShareResourceNotFound(share_id=share['id'])
  590. def get_network_allocations_number(self):
  591. """ZFS does not handle networking. Return 0."""
  592. return 0
  593. @ensure_share_server_not_provided
  594. def extend_share(self, share, new_size, share_server=None):
  595. """Extends size of existing share."""
  596. dataset_name = self._get_dataset_name(share)
  597. self.zfs('set', 'quota=%sG' % new_size, dataset_name)
  598. @ensure_share_server_not_provided
  599. def shrink_share(self, share, new_size, share_server=None):
  600. """Shrinks size of existing share."""
  601. dataset_name = self._get_dataset_name(share)
  602. consumed_space = self.get_zfs_option(dataset_name, 'used')
  603. consumed_space = utils.translate_string_size_to_float(consumed_space)
  604. if consumed_space >= new_size:
  605. raise exception.ShareShrinkingPossibleDataLoss(
  606. share_id=share['id'])
  607. self.zfs('set', 'quota=%sG' % new_size, dataset_name)
  608. @ensure_share_server_not_provided
  609. def update_access(self, context, share, access_rules, add_rules,
  610. delete_rules, share_server=None):
  611. """Updates access rules for given share."""
  612. dataset_name = self._get_dataset_name(share)
  613. executor = self._get_shell_executor_by_host(share['host'])
  614. return self._get_share_helper(share['share_proto']).update_access(
  615. dataset_name, access_rules, add_rules, delete_rules,
  616. executor=executor)
  617. def manage_existing(self, share, driver_options):
  618. """Manage existing ZFS dataset as manila share.
  619. ZFSonLinux driver accepts only one driver_option 'size'.
  620. If an administrator provides this option, then such quota will be set
  621. to dataset and used as share size. Otherwise, driver will set quota
  622. equal to nearest bigger rounded integer of usage size.
  623. Driver does not expect mountpoint to be changed (should be equal
  624. to default that is "/%(dataset_name)s").
  625. :param share: share data
  626. :param driver_options: Empty dict or dict with 'size' option.
  627. :return: dict with share size and its export locations.
  628. """
  629. old_export_location = share["export_locations"][0]["path"]
  630. old_dataset_name = old_export_location.split(":/")[-1]
  631. scheduled_pool_name = share_utils.extract_host(
  632. share["host"], level="pool")
  633. actual_pool_name = old_dataset_name.split("/")[0]
  634. new_dataset_name = self._get_dataset_name(share)
  635. # Calculate quota for managed dataset
  636. quota = driver_options.get("size")
  637. if not quota:
  638. consumed_space = self.get_zfs_option(old_dataset_name, "used")
  639. consumed_space = utils.translate_string_size_to_float(
  640. consumed_space)
  641. quota = int(consumed_space) + 1
  642. share["size"] = int(quota)
  643. # Save dataset-specific data in private storage
  644. options = self._get_dataset_creation_options(share, is_readonly=False)
  645. ssh_cmd = "%(username)s@%(host)s" % {
  646. "username": self.configuration.zfs_ssh_username,
  647. "host": self.service_ip,
  648. }
  649. self.private_storage.update(
  650. share["id"], {
  651. "entity_type": "share",
  652. "dataset_name": new_dataset_name,
  653. "ssh_cmd": ssh_cmd, # used in replication
  654. "pool_name": actual_pool_name, # used in replication
  655. "used_options": " ".join(options),
  656. }
  657. )
  658. # Perform checks on requested dataset
  659. if actual_pool_name != scheduled_pool_name:
  660. raise exception.ZFSonLinuxException(
  661. _("Cannot manage share '%(share_id)s' "
  662. "(share_instance '%(si_id)s'), because scheduled "
  663. "pool '%(sch)s' and actual '%(actual)s' differ.") % {
  664. "share_id": share["share_id"],
  665. "si_id": share["id"],
  666. "sch": scheduled_pool_name,
  667. "actual": actual_pool_name})
  668. out, err = self.zfs("list", "-r", actual_pool_name)
  669. data = self.parse_zfs_answer(out)
  670. for datum in data:
  671. if datum["NAME"] == old_dataset_name:
  672. break
  673. else:
  674. raise exception.ZFSonLinuxException(
  675. _("Cannot manage share '%(share_id)s' "
  676. "(share_instance '%(si_id)s'), because dataset "
  677. "'%(dataset)s' not found in zpool '%(zpool)s'.") % {
  678. "share_id": share["share_id"],
  679. "si_id": share["id"],
  680. "dataset": old_dataset_name,
  681. "zpool": actual_pool_name})
  682. # Rename dataset
  683. out, err = self.execute("sudo", "mount")
  684. if "%s " % old_dataset_name in out:
  685. self.zfs_with_retry("umount", "-f", old_dataset_name)
  686. time.sleep(1)
  687. self.zfs_with_retry("rename", old_dataset_name, new_dataset_name)
  688. self.zfs("mount", new_dataset_name)
  689. # Apply options to dataset
  690. for option in options:
  691. self.zfs("set", option, new_dataset_name)
  692. # Get new export locations of renamed dataset
  693. export_locations = self._get_share_helper(
  694. share["share_proto"]).get_exports(new_dataset_name)
  695. return {"size": share["size"], "export_locations": export_locations}
  696. def unmanage(self, share):
  697. """Removes the specified share from Manila management."""
  698. self.private_storage.delete(share['id'])
  699. def manage_existing_snapshot(self, snapshot_instance, driver_options):
  700. """Manage existing share snapshot with manila.
  701. :param snapshot_instance: SnapshotInstance data
  702. :param driver_options: expects only one optional key 'size'.
  703. :return: dict with share snapshot instance fields for update, example::
  704. {
  705. 'size': 1,
  706. 'provider_location': 'path/to/some/dataset@some_snapshot_tag',
  707. }
  708. """
  709. snapshot_size = int(driver_options.get("size", 0))
  710. old_provider_location = snapshot_instance.get("provider_location")
  711. old_snapshot_tag = old_provider_location.split("@")[-1]
  712. new_snapshot_tag = self._get_snapshot_name(snapshot_instance["id"])
  713. self.private_storage.update(
  714. snapshot_instance["snapshot_id"], {
  715. "entity_type": "snapshot",
  716. "old_snapshot_tag": old_snapshot_tag,
  717. "snapshot_tag": new_snapshot_tag,
  718. }
  719. )
  720. try:
  721. self.zfs("list", "-r", "-t", "snapshot", old_provider_location)
  722. except exception.ProcessExecutionError as e:
  723. raise exception.ManageInvalidShareSnapshot(reason=e.stderr)
  724. if not snapshot_size:
  725. consumed_space = self.get_zfs_option(old_provider_location, "used")
  726. consumed_space = utils.translate_string_size_to_float(
  727. consumed_space)
  728. snapshot_size = int(math.ceil(consumed_space))
  729. dataset_name = self.private_storage.get(
  730. snapshot_instance["share_instance_id"], "dataset_name")
  731. new_provider_location = dataset_name + "@" + new_snapshot_tag
  732. self.zfs("rename", old_provider_location, new_provider_location)
  733. return {
  734. "size": snapshot_size,
  735. "provider_location": new_provider_location,
  736. }
  737. def unmanage_snapshot(self, snapshot_instance):
  738. """Unmanage dataset snapshot."""
  739. self.private_storage.delete(snapshot_instance["snapshot_id"])
  740. def _get_replication_snapshot_prefix(self, replica):
  741. """Returns replica-based snapshot prefix."""
  742. replication_snapshot_prefix = "%s_%s" % (
  743. self.replica_snapshot_prefix, replica['id'].replace('-', '_'))
  744. return replication_snapshot_prefix
  745. def _get_replication_snapshot_tag(self, replica):
  746. """Returns replica- and time-based snapshot tag."""
  747. current_time = timeutils.utcnow().isoformat()
  748. snapshot_tag = "%s_time_%s" % (
  749. self._get_replication_snapshot_prefix(replica), current_time)
  750. return snapshot_tag
  751. def _get_active_replica(self, replica_list):
  752. for replica in replica_list:
  753. if replica['replica_state'] == constants.REPLICA_STATE_ACTIVE:
  754. return replica
  755. msg = _("Active replica not found.")
  756. raise exception.ReplicationException(reason=msg)
  757. def _get_migration_snapshot_prefix(self, share_instance):
  758. """Returns migration-based snapshot prefix."""
  759. migration_snapshot_prefix = "%s_%s" % (
  760. self.migration_snapshot_prefix,
  761. share_instance['id'].replace('-', '_'))
  762. return migration_snapshot_prefix
  763. def _get_migration_snapshot_tag(self, share_instance):
  764. """Returns migration- and time-based snapshot tag."""
  765. current_time = timeutils.utcnow().isoformat()
  766. snapshot_tag = "%s_time_%s" % (
  767. self._get_migration_snapshot_prefix(share_instance), current_time)
  768. snapshot_tag = (
  769. snapshot_tag.replace('-', '_').replace('.', '_').replace(':', '_'))
  770. return snapshot_tag
  771. @ensure_share_server_not_provided
  772. def create_replica(self, context, replica_list, new_replica,
  773. access_rules, replica_snapshots, share_server=None):
  774. """Replicates the active replica to a new replica on this backend."""
  775. active_replica = self._get_active_replica(replica_list)
  776. src_dataset_name = self.private_storage.get(
  777. active_replica['id'], 'dataset_name')
  778. ssh_to_src_cmd = self.private_storage.get(
  779. active_replica['id'], 'ssh_cmd')
  780. dst_dataset_name = self._get_dataset_name(new_replica)
  781. ssh_cmd = '%(username)s@%(host)s' % {
  782. 'username': self.configuration.zfs_ssh_username,
  783. 'host': self.service_ip,
  784. }
  785. snapshot_tag = self._get_replication_snapshot_tag(new_replica)
  786. src_snapshot_name = (
  787. '%(dataset_name)s@%(snapshot_tag)s' % {
  788. 'snapshot_tag': snapshot_tag,
  789. 'dataset_name': src_dataset_name,
  790. }
  791. )
  792. # Save valuable data to DB
  793. self.private_storage.update(active_replica['id'], {
  794. 'repl_snapshot_tag': snapshot_tag,
  795. })
  796. self.private_storage.update(new_replica['id'], {
  797. 'entity_type': 'replica',
  798. 'replica_type': 'readable',
  799. 'dataset_name': dst_dataset_name,
  800. 'ssh_cmd': ssh_cmd,
  801. 'pool_name': share_utils.extract_host(
  802. new_replica['host'], level='pool'),
  803. 'repl_snapshot_tag': snapshot_tag,
  804. })
  805. # Create temporary snapshot. It will exist until following replica sync
  806. # After it - new one will appear and so in loop.
  807. self.execute(
  808. 'ssh', ssh_to_src_cmd,
  809. 'sudo', 'zfs', 'snapshot', src_snapshot_name,
  810. )
  811. # Send/receive temporary snapshot
  812. out, err = self.execute(
  813. 'ssh', ssh_to_src_cmd,
  814. 'sudo', 'zfs', 'send', '-vDR', src_snapshot_name, '|',
  815. 'ssh', ssh_cmd,
  816. 'sudo', 'zfs', 'receive', '-v', dst_dataset_name,
  817. )
  818. msg = ("Info about replica '%(replica_id)s' creation is following: "
  819. "\n%(out)s")
  820. LOG.debug(msg, {'replica_id': new_replica['id'], 'out': out})
  821. # Make replica readonly
  822. self.zfs('set', 'readonly=on', dst_dataset_name)
  823. # Set original share size as quota to new replica
  824. self.zfs('set', 'quota=%sG' % active_replica['size'], dst_dataset_name)
  825. # Apply access rules from original share
  826. self._get_share_helper(new_replica['share_proto']).update_access(
  827. dst_dataset_name, access_rules, add_rules=[], delete_rules=[],
  828. make_all_ro=True)
  829. return {
  830. 'export_locations': self._get_share_helper(
  831. new_replica['share_proto']).create_exports(dst_dataset_name),
  832. 'replica_state': constants.REPLICA_STATE_IN_SYNC,
  833. 'access_rules_status': constants.STATUS_ACTIVE,
  834. }
  835. @ensure_share_server_not_provided
  836. def delete_replica(self, context, replica_list, replica_snapshots, replica,
  837. share_server=None):
  838. """Deletes a replica. This is called on the destination backend."""
  839. pool_name = self.private_storage.get(replica['id'], 'pool_name')
  840. dataset_name = self.private_storage.get(replica['id'], 'dataset_name')
  841. if not dataset_name:
  842. dataset_name = self._get_dataset_name(replica)
  843. # Delete dataset's snapshots first
  844. out, err = self.zfs('list', '-r', '-t', 'snapshot', pool_name)
  845. data = self.parse_zfs_answer(out)
  846. for datum in data:
  847. if dataset_name in datum['NAME']:
  848. self._delete_dataset_or_snapshot_with_retry(datum['NAME'])
  849. # Now we delete dataset itself
  850. out, err = self.zfs('list', '-r', pool_name)
  851. data = self.parse_zfs_answer(out)
  852. for datum in data:
  853. if datum['NAME'] == dataset_name:
  854. self._get_share_helper(
  855. replica['share_proto']).remove_exports(dataset_name)
  856. self._delete_dataset_or_snapshot_with_retry(dataset_name)
  857. break
  858. else:
  859. LOG.warning(
  860. "Share replica with '%(id)s' ID and '%(name)s' NAME is "
  861. "absent on backend. Nothing has been deleted.",
  862. {'id': replica['id'], 'name': dataset_name})
  863. self.private_storage.delete(replica['id'])
  864. @ensure_share_server_not_provided
  865. def update_replica_state(self, context, replica_list, replica,
  866. access_rules, replica_snapshots,
  867. share_server=None):
  868. """Syncs replica and updates its 'replica_state'."""
  869. return self._update_replica_state(
  870. context, replica_list, replica, replica_snapshots, access_rules)
  871. def _update_replica_state(self, context, replica_list, replica,
  872. replica_snapshots=None, access_rules=None):
  873. active_replica = self._get_active_replica(replica_list)
  874. src_dataset_name = self.private_storage.get(
  875. active_replica['id'], 'dataset_name')
  876. ssh_to_src_cmd = self.private_storage.get(
  877. active_replica['id'], 'ssh_cmd')
  878. ssh_to_dst_cmd = self.private_storage.get(
  879. replica['id'], 'ssh_cmd')
  880. dst_dataset_name = self.private_storage.get(
  881. replica['id'], 'dataset_name')
  882. # Create temporary snapshot
  883. previous_snapshot_tag = self.private_storage.get(
  884. replica['id'], 'repl_snapshot_tag')
  885. snapshot_tag = self._get_replication_snapshot_tag(replica)
  886. src_snapshot_name = src_dataset_name + '@' + snapshot_tag
  887. self.execute(
  888. 'ssh', ssh_to_src_cmd,
  889. 'sudo', 'zfs', 'snapshot', src_snapshot_name,
  890. )
  891. # Make sure it is readonly
  892. self.zfs('set', 'readonly=on', dst_dataset_name)
  893. # Send/receive diff between previous snapshot and last one
  894. out, err = self.execute(
  895. 'ssh', ssh_to_src_cmd,
  896. 'sudo', 'zfs', 'send', '-vDRI',
  897. previous_snapshot_tag, src_snapshot_name, '|',
  898. 'ssh', ssh_to_dst_cmd,
  899. 'sudo', 'zfs', 'receive', '-vF', dst_dataset_name,
  900. )
  901. msg = ("Info about last replica '%(replica_id)s' sync is following: "
  902. "\n%(out)s")
  903. LOG.debug(msg, {'replica_id': replica['id'], 'out': out})
  904. # Update DB data that will be used on following replica sync
  905. self.private_storage.update(active_replica['id'], {
  906. 'repl_snapshot_tag': snapshot_tag,
  907. })
  908. self.private_storage.update(
  909. replica['id'], {'repl_snapshot_tag': snapshot_tag})
  910. # Destroy all snapshots on dst filesystem except referenced ones.
  911. snap_references = set()
  912. for repl in replica_list:
  913. snap_references.add(
  914. self.private_storage.get(repl['id'], 'repl_snapshot_tag'))
  915. dst_pool_name = dst_dataset_name.split('/')[0]
  916. out, err = self.zfs('list', '-r', '-t', 'snapshot', dst_pool_name)
  917. data = self.parse_zfs_answer(out)
  918. for datum in data:
  919. if (dst_dataset_name in datum['NAME'] and
  920. '@' + self.replica_snapshot_prefix in datum['NAME'] and
  921. datum['NAME'].split('@')[-1] not in snap_references):
  922. self._delete_dataset_or_snapshot_with_retry(datum['NAME'])
  923. # Destroy all snapshots on src filesystem except referenced ones.
  924. src_pool_name = src_snapshot_name.split('/')[0]
  925. out, err = self.execute(
  926. 'ssh', ssh_to_src_cmd,
  927. 'sudo', 'zfs', 'list', '-r', '-t', 'snapshot', src_pool_name,
  928. )
  929. data = self.parse_zfs_answer(out)
  930. full_src_snapshot_prefix = (
  931. src_dataset_name + '@' +
  932. self._get_replication_snapshot_prefix(replica))
  933. for datum in data:
  934. if (full_src_snapshot_prefix in datum['NAME'] and
  935. datum['NAME'].split('@')[-1] not in snap_references):
  936. self.execute_with_retry(
  937. 'ssh', ssh_to_src_cmd,
  938. 'sudo', 'zfs', 'destroy', '-f', datum['NAME'],
  939. )
  940. if access_rules:
  941. # Apply access rules from original share
  942. # TODO(vponomaryov): we should remove somehow rules that were
  943. # deleted on active replica after creation of secondary replica.
  944. # For the moment there will be difference and it can be considered
  945. # as a bug.
  946. self._get_share_helper(replica['share_proto']).update_access(
  947. dst_dataset_name, access_rules, add_rules=[], delete_rules=[],
  948. make_all_ro=True)
  949. # Return results
  950. return constants.REPLICA_STATE_IN_SYNC
  951. @ensure_share_server_not_provided
  952. def promote_replica(self, context, replica_list, replica, access_rules,
  953. share_server=None):
  954. """Promotes secondary replica to active and active to secondary."""
  955. active_replica = self._get_active_replica(replica_list)
  956. src_dataset_name = self.private_storage.get(
  957. active_replica['id'], 'dataset_name')
  958. ssh_to_src_cmd = self.private_storage.get(
  959. active_replica['id'], 'ssh_cmd')
  960. dst_dataset_name = self.private_storage.get(
  961. replica['id'], 'dataset_name')
  962. replica_dict = {
  963. r['id']: {
  964. 'id': r['id'],
  965. # NOTE(vponomaryov): access rules will be updated in next
  966. # 'sync' operation.
  967. 'access_rules_status': constants.SHARE_INSTANCE_RULES_SYNCING,
  968. }
  969. for r in replica_list
  970. }
  971. try:
  972. # Mark currently active replica as readonly
  973. self.execute(
  974. 'ssh', ssh_to_src_cmd,
  975. 'set', 'readonly=on', src_dataset_name,
  976. )
  977. # Create temporary snapshot of currently active replica
  978. snapshot_tag = self._get_replication_snapshot_tag(active_replica)
  979. src_snapshot_name = src_dataset_name + '@' + snapshot_tag
  980. self.execute(
  981. 'ssh', ssh_to_src_cmd,
  982. 'sudo', 'zfs', 'snapshot', src_snapshot_name,
  983. )
  984. # Apply temporary snapshot to all replicas
  985. for repl in replica_list:
  986. if repl['replica_state'] == constants.REPLICA_STATE_ACTIVE:
  987. continue
  988. previous_snapshot_tag = self.private_storage.get(
  989. repl['id'], 'repl_snapshot_tag')
  990. dataset_name = self.private_storage.get(
  991. repl['id'], 'dataset_name')
  992. ssh_to_dst_cmd = self.private_storage.get(
  993. repl['id'], 'ssh_cmd')
  994. try:
  995. # Send/receive diff between previous snapshot and last one
  996. out, err = self.execute(
  997. 'ssh', ssh_to_src_cmd,
  998. 'sudo', 'zfs', 'send', '-vDRI',
  999. previous_snapshot_tag, src_snapshot_name, '|',
  1000. 'ssh', ssh_to_dst_cmd,
  1001. 'sudo', 'zfs', 'receive', '-vF', dataset_name,
  1002. )
  1003. except exception.ProcessExecutionError as e:
  1004. LOG.warning("Failed to sync replica %(id)s. %(e)s",
  1005. {'id': repl['id'], 'e': e})
  1006. replica_dict[repl['id']]['replica_state'] = (
  1007. constants.REPLICA_STATE_OUT_OF_SYNC)
  1008. continue
  1009. msg = ("Info about last replica '%(replica_id)s' "
  1010. "sync is following: \n%(out)s")
  1011. LOG.debug(msg, {'replica_id': repl['id'], 'out': out})
  1012. # Update latest replication snapshot for replica
  1013. self.private_storage.update(
  1014. repl['id'], {'repl_snapshot_tag': snapshot_tag})
  1015. # Update latest replication snapshot for currently active replica
  1016. self.private_storage.update(
  1017. active_replica['id'], {'repl_snapshot_tag': snapshot_tag})
  1018. replica_dict[active_replica['id']]['replica_state'] = (
  1019. constants.REPLICA_STATE_IN_SYNC)
  1020. except Exception as e:
  1021. LOG.warning(
  1022. "Failed to update currently active replica. \n%s", e)
  1023. replica_dict[active_replica['id']]['replica_state'] = (
  1024. constants.REPLICA_STATE_OUT_OF_SYNC)
  1025. # Create temporary snapshot of new replica and sync it with other
  1026. # secondary replicas.
  1027. snapshot_tag = self._get_replication_snapshot_tag(replica)
  1028. src_snapshot_name = dst_dataset_name + '@' + snapshot_tag
  1029. ssh_to_src_cmd = self.private_storage.get(replica['id'], 'ssh_cmd')
  1030. self.zfs('snapshot', src_snapshot_name)
  1031. for repl in replica_list:
  1032. if (repl['replica_state'] == constants.REPLICA_STATE_ACTIVE or
  1033. repl['id'] == replica['id']):
  1034. continue
  1035. previous_snapshot_tag = self.private_storage.get(
  1036. repl['id'], 'repl_snapshot_tag')
  1037. dataset_name = self.private_storage.get(
  1038. repl['id'], 'dataset_name')
  1039. ssh_to_dst_cmd = self.private_storage.get(
  1040. repl['id'], 'ssh_cmd')
  1041. try:
  1042. # Send/receive diff between previous snapshot and last one
  1043. out, err = self.execute(
  1044. 'ssh', ssh_to_src_cmd,
  1045. 'sudo', 'zfs', 'send', '-vDRI',
  1046. previous_snapshot_tag, src_snapshot_name, '|',
  1047. 'ssh', ssh_to_dst_cmd,
  1048. 'sudo', 'zfs', 'receive', '-vF', dataset_name,
  1049. )
  1050. except exception.ProcessExecutionError as e:
  1051. LOG.warning("Failed to sync replica %(id)s. %(e)s",
  1052. {'id': repl['id'], 'e': e})
  1053. replica_dict[repl['id']]['replica_state'] = (
  1054. constants.REPLICA_STATE_OUT_OF_SYNC)
  1055. continue
  1056. msg = ("Info about last replica '%(replica_id)s' "
  1057. "sync is following: \n%(out)s")
  1058. LOG.debug(msg, {'replica_id': repl['id'], 'out': out})
  1059. # Update latest replication snapshot for replica
  1060. self.private_storage.update(
  1061. repl['id'], {'repl_snapshot_tag': snapshot_tag})
  1062. # Update latest replication snapshot for new active replica
  1063. self.private_storage.update(
  1064. replica['id'], {'repl_snapshot_tag': snapshot_tag})
  1065. replica_dict[replica['id']]['replica_state'] = (
  1066. constants.REPLICA_STATE_ACTIVE)
  1067. self._get_share_helper(replica['share_proto']).update_access(
  1068. dst_dataset_name, access_rules, add_rules=[], delete_rules=[])
  1069. replica_dict[replica['id']]['access_rules_status'] = (
  1070. constants.STATUS_ACTIVE)
  1071. self.zfs('set', 'readonly=off', dst_dataset_name)
  1072. return list(replica_dict.values())
  1073. @ensure_share_server_not_provided
  1074. def create_replicated_snapshot(self, context, replica_list,
  1075. replica_snapshots, share_server=None):
  1076. """Create a snapshot and update across the replicas."""
  1077. active_replica = self._get_active_replica(replica_list)
  1078. src_dataset_name = self.private_storage.get(
  1079. active_replica['id'], 'dataset_name')
  1080. ssh_to_src_cmd = self.private_storage.get(
  1081. active_replica['id'], 'ssh_cmd')
  1082. replica_snapshots_dict = {
  1083. si['id']: {'id': si['id']} for si in replica_snapshots}
  1084. active_snapshot_instance_id = [
  1085. si['id'] for si in replica_snapshots
  1086. if si['share_instance_id'] == active_replica['id']][0]
  1087. snapshot_tag = self._get_snapshot_name(active_snapshot_instance_id)
  1088. # Replication should not be dependent on manually created snapshots
  1089. # so, create additional one, newer, that will be used for replication
  1090. # synchronizations.
  1091. repl_snapshot_tag = self._get_replication_snapshot_tag(active_replica)
  1092. src_snapshot_name = src_dataset_name + '@' + repl_snapshot_tag
  1093. self.private_storage.update(
  1094. replica_snapshots[0]['snapshot_id'], {
  1095. 'entity_type': 'snapshot',
  1096. 'snapshot_tag': snapshot_tag,
  1097. }
  1098. )
  1099. for tag in (snapshot_tag, repl_snapshot_tag):
  1100. self.execute(
  1101. 'ssh', ssh_to_src_cmd,
  1102. 'sudo', 'zfs', 'snapshot', src_dataset_name + '@' + tag,
  1103. )
  1104. # Populate snapshot to all replicas
  1105. for replica_snapshot in replica_snapshots:
  1106. replica_id = replica_snapshot['share_instance_id']
  1107. if replica_id == active_replica['id']:
  1108. replica_snapshots_dict[replica_snapshot['id']]['status'] = (
  1109. constants.STATUS_AVAILABLE)
  1110. continue
  1111. previous_snapshot_tag = self.private_storage.get(
  1112. replica_id, 'repl_snapshot_tag')
  1113. dst_dataset_name = self.private_storage.get(
  1114. replica_id, 'dataset_name')
  1115. ssh_to_dst_cmd = self.private_storage.get(replica_id, 'ssh_cmd')
  1116. try:
  1117. # Send/receive diff between previous snapshot and last one
  1118. out, err = self.execute(
  1119. 'ssh', ssh_to_src_cmd,
  1120. 'sudo', 'zfs', 'send', '-vDRI',
  1121. previous_snapshot_tag, src_snapshot_name, '|',
  1122. 'ssh', ssh_to_dst_cmd,
  1123. 'sudo', 'zfs', 'receive', '-vF', dst_dataset_name,
  1124. )
  1125. except exception.ProcessExecutionError as e:
  1126. LOG.warning(
  1127. "Failed to sync snapshot instance %(id)s. %(e)s",
  1128. {'id': replica_snapshot['id'], 'e': e})
  1129. replica_snapshots_dict[replica_snapshot['id']]['status'] = (
  1130. constants.STATUS_ERROR)
  1131. continue
  1132. replica_snapshots_dict[replica_snapshot['id']]['status'] = (
  1133. constants.STATUS_AVAILABLE)
  1134. msg = ("Info about last replica '%(replica_id)s' "
  1135. "sync is following: \n%(out)s")
  1136. LOG.debug(msg, {'replica_id': replica_id, 'out': out})
  1137. # Update latest replication snapshot for replica
  1138. self.private_storage.update(
  1139. replica_id, {'repl_snapshot_tag': repl_snapshot_tag})
  1140. # Update latest replication snapshot for currently active replica
  1141. self.private_storage.update(
  1142. active_replica['id'], {'repl_snapshot_tag': repl_snapshot_tag})
  1143. return list(replica_snapshots_dict.values())
  1144. @ensure_share_server_not_provided
  1145. def delete_replicated_snapshot(self, context, replica_list,
  1146. replica_snapshots, share_server=None):
  1147. """Delete a snapshot by deleting its instances across the replicas."""
  1148. active_replica = self._get_active_replica(replica_list)
  1149. replica_snapshots_dict = {
  1150. si['id']: {'id': si['id']} for si in replica_snapshots}
  1151. for replica_snapshot in replica_snapshots:
  1152. replica_id = replica_snapshot['share_instance_id']
  1153. snapshot_name = self._get_saved_snapshot_name(replica_snapshot)
  1154. if active_replica['id'] == replica_id:
  1155. self._delete_snapshot(context, replica_snapshot)
  1156. replica_snapshots_dict[replica_snapshot['id']]['status'] = (
  1157. constants.STATUS_DELETED)
  1158. continue
  1159. ssh_cmd = self.private_storage.get(replica_id, 'ssh_cmd')
  1160. out, err = self.execute(
  1161. 'ssh', ssh_cmd,
  1162. 'sudo', 'zfs', 'list', '-r', '-t', 'snapshot', snapshot_name,
  1163. )
  1164. data = self.parse_zfs_answer(out)
  1165. for datum in data:
  1166. if datum['NAME'] != snapshot_name:
  1167. continue
  1168. self.execute_with_retry(
  1169. 'ssh', ssh_cmd,
  1170. 'sudo', 'zfs', 'destroy', '-f', datum['NAME'],
  1171. )
  1172. self.private_storage.delete(replica_snapshot['id'])
  1173. replica_snapshots_dict[replica_snapshot['id']]['status'] = (
  1174. constants.STATUS_DELETED)
  1175. self.private_storage.delete(replica_snapshot['snapshot_id'])
  1176. return list(replica_snapshots_dict.values())
  1177. @ensure_share_server_not_provided
  1178. def update_replicated_snapshot(self, context, replica_list,
  1179. share_replica, replica_snapshots,
  1180. replica_snapshot, share_server=None):
  1181. """Update the status of a snapshot instance that lives on a replica."""
  1182. self._update_replica_state(context, replica_list, share_replica)
  1183. snapshot_name = self._get_saved_snapshot_name(replica_snapshot)
  1184. out, err = self.zfs('list', '-r', '-t', 'snapshot', snapshot_name)
  1185. data = self.parse_zfs_answer(out)
  1186. snapshot_found = False
  1187. for datum in data:
  1188. if datum['NAME'] == snapshot_name:
  1189. snapshot_found = True
  1190. break
  1191. return_dict = {'id': replica_snapshot['id']}
  1192. if snapshot_found:
  1193. return_dict.update({'status': constants.STATUS_AVAILABLE})
  1194. else:
  1195. return_dict.update({'status': constants.STATUS_ERROR})
  1196. return return_dict
  1197. @ensure_share_server_not_provided
  1198. def migration_check_compatibility(
  1199. self, context, source_share, destination_share,
  1200. share_server=None, destination_share_server=None):
  1201. """Is called to test compatibility with destination backend."""
  1202. backend_name = share_utils.extract_host(
  1203. destination_share['host'], level='backend_name')
  1204. config = get_backend_configuration(backend_name)
  1205. compatible = self.configuration.share_driver == config.share_driver
  1206. return {
  1207. 'compatible': compatible,
  1208. 'writable': False,
  1209. 'preserve_metadata': True,
  1210. 'nondisruptive': True,
  1211. }
  1212. @ensure_share_server_not_provided
  1213. def migration_start(
  1214. self, context, source_share, destination_share, source_snapshots,
  1215. snapshot_mappings, share_server=None,
  1216. destination_share_server=None):
  1217. """Is called to start share migration."""
  1218. src_dataset_name = self.private_storage.get(
  1219. source_share['id'], 'dataset_name')
  1220. dst_dataset_name = self._get_dataset_name(destination_share)
  1221. backend_name = share_utils.extract_host(
  1222. destination_share['host'], level='backend_name')
  1223. ssh_cmd = '%(username)s@%(host)s' % {
  1224. 'username': self.configuration.zfs_ssh_username,
  1225. 'host': self.configuration.zfs_service_ip,
  1226. }
  1227. config = get_backend_configuration(backend_name)
  1228. remote_ssh_cmd = '%(username)s@%(host)s' % {
  1229. 'username': config.zfs_ssh_username,
  1230. 'host': config.zfs_service_ip,
  1231. }
  1232. snapshot_tag = self._get_migration_snapshot_tag(destination_share)
  1233. src_snapshot_name = (
  1234. '%(dataset_name)s@%(snapshot_tag)s' % {
  1235. 'snapshot_tag': snapshot_tag,
  1236. 'dataset_name': src_dataset_name,
  1237. }
  1238. )
  1239. # Save valuable data to DB
  1240. self.private_storage.update(source_share['id'], {
  1241. 'migr_snapshot_tag': snapshot_tag,
  1242. })
  1243. self.private_storage.update(destination_share['id'], {
  1244. 'entity_type': 'share',
  1245. 'dataset_name': dst_dataset_name,
  1246. 'ssh_cmd': remote_ssh_cmd,
  1247. 'pool_name': share_utils.extract_host(
  1248. destination_share['host'], level='pool'),
  1249. 'migr_snapshot_tag': snapshot_tag,
  1250. })
  1251. # Create temporary snapshot on src host.
  1252. self.execute('sudo', 'zfs', 'snapshot', src_snapshot_name)
  1253. # Send/receive temporary snapshot
  1254. cmd = (
  1255. 'ssh ' + ssh_cmd + ' '
  1256. 'sudo zfs send -vDR ' + src_snapshot_name + ' '
  1257. '| ssh ' + remote_ssh_cmd + ' '
  1258. 'sudo zfs receive -v ' + dst_dataset_name
  1259. )
  1260. filename = dst_dataset_name.replace('/', '_')
  1261. with utils.tempdir() as tmpdir:
  1262. tmpfilename = os.path.join(tmpdir, '%s.sh' % filename)
  1263. with open(tmpfilename, "w") as migr_script:
  1264. migr_script.write(cmd)
  1265. self.execute('sudo', 'chmod', '755', tmpfilename)
  1266. self.execute('nohup', tmpfilename, '&')
  1267. @ensure_share_server_not_provided
  1268. def migration_continue(
  1269. self, context, source_share, destination_share, source_snapshots,
  1270. snapshot_mappings, share_server=None,
  1271. destination_share_server=None):
  1272. """Is called in source share's backend to continue migration."""
  1273. snapshot_tag = self.private_storage.get(
  1274. destination_share['id'], 'migr_snapshot_tag')
  1275. out, err = self.execute('ps', 'aux')
  1276. if not '@%s' % snapshot_tag in out:
  1277. dst_dataset_name = self.private_storage.get(
  1278. destination_share['id'], 'dataset_name')
  1279. try:
  1280. self.execute(
  1281. 'sudo', 'zfs', 'get', 'quota', dst_dataset_name,
  1282. executor=self._get_shell_executor_by_host(
  1283. destination_share['host']),
  1284. )
  1285. return True
  1286. except exception.ProcessExecutionError as e:
  1287. raise exception.ZFSonLinuxException(msg=_(
  1288. 'Migration process is absent and dst dataset '
  1289. 'returned following error: %s') % e)
  1290. @ensure_share_server_not_provided
  1291. def migration_complete(
  1292. self, context, source_share, destination_share, source_snapshots,
  1293. snapshot_mappings, share_server=None,
  1294. destination_share_server=None):
  1295. """Is called to perform 2nd phase of driver migration of a given share.
  1296. """
  1297. dst_dataset_name = self.private_storage.get(
  1298. destination_share['id'], 'dataset_name')
  1299. snapshot_tag = self.private_storage.get(
  1300. destination_share['id'], 'migr_snapshot_tag')
  1301. dst_snapshot_name = (
  1302. '%(dataset_name)s@%(snapshot_tag)s' % {
  1303. 'snapshot_tag': snapshot_tag,
  1304. 'dataset_name': dst_dataset_name,
  1305. }
  1306. )
  1307. dst_executor = self._get_shell_executor_by_host(
  1308. destination_share['host'])
  1309. # Destroy temporary migration snapshot on dst host
  1310. self.execute(
  1311. 'sudo', 'zfs', 'destroy', dst_snapshot_name,
  1312. executor=dst_executor,
  1313. )
  1314. # Get export locations of new share instance
  1315. export_locations = self._get_share_helper(
  1316. destination_share['share_proto']).create_exports(
  1317. dst_dataset_name,
  1318. executor=dst_executor)
  1319. # Destroy src share and temporary migration snapshot on src (this) host
  1320. self.delete_share(context, source_share)
  1321. return {'export_locations': export_locations}
  1322. @ensure_share_server_not_provided
  1323. def migration_cancel(
  1324. self, context, source_share, destination_share, source_snapshots,
  1325. snapshot_mappings, share_server=None,
  1326. destination_share_server=None):
  1327. """Is called to cancel driver migration."""
  1328. src_dataset_name = self.private_storage.get(
  1329. source_share['id'], 'dataset_name')
  1330. dst_dataset_name = self.private_storage.get(
  1331. destination_share['id'], 'dataset_name')
  1332. ssh_cmd = self.private_storage.get(
  1333. destination_share['id'], 'ssh_cmd')
  1334. snapshot_tag = self.private_storage.get(
  1335. destination_share['id'], 'migr_snapshot_tag')
  1336. # Kill migration process if exists
  1337. try:
  1338. out, err = self.execute('ps', 'aux')
  1339. lines = out.split('\n')
  1340. for line in lines:
  1341. if '@%s' % snapshot_tag in line:
  1342. migr_pid = [
  1343. x for x in line.strip().split(' ') if x != ''][1]
  1344. self.execute('sudo', 'kill', '-9', migr_pid)
  1345. except exception.ProcessExecutionError as e:
  1346. LOG.warning(
  1347. "Caught following error trying to kill migration process: %s",
  1348. e)
  1349. # Sleep couple of seconds before destroying updated objects
  1350. time.sleep(2)
  1351. # Destroy snapshot on source host
  1352. self._delete_dataset_or_snapshot_with_retry(
  1353. src_dataset_name + '@' + snapshot_tag)
  1354. # Destroy dataset and its migration snapshot on destination host
  1355. try:
  1356. self.execute(
  1357. 'ssh', ssh_cmd,
  1358. 'sudo', 'zfs', 'destroy', '-r', dst_dataset_name,
  1359. )
  1360. except exception.ProcessExecutionError as e:
  1361. LOG.warning(
  1362. "Failed to destroy destination dataset with following error: "
  1363. "%s",
  1364. e)
  1365. LOG.debug(
  1366. "Migration of share with ID '%s' has been canceled.",
  1367. source_share["id"])