summaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlmglue.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlmglue.c')
-rw-r--r--fs/ocfs2/dlmglue.c2904
1 files changed, 2904 insertions, 0 deletions
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
new file mode 100644
index 000000000000..e971ec2f8407
--- /dev/null
+++ b/fs/ocfs2/dlmglue.c
@@ -0,0 +1,2904 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * dlmglue.c
+ *
+ * Code which implements an OCFS2 specific interface to our DLM.
+ *
+ * Copyright (C) 2003, 2004 Oracle. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA.
+ */
+
+#include <linux/types.h>
+#include <linux/slab.h>
+#include <linux/highmem.h>
+#include <linux/mm.h>
+#include <linux/smp_lock.h>
+#include <linux/crc32.h>
+#include <linux/kthread.h>
+#include <linux/pagemap.h>
+#include <linux/debugfs.h>
+#include <linux/seq_file.h>
+
+#include <cluster/heartbeat.h>
+#include <cluster/nodemanager.h>
+#include <cluster/tcp.h>
+
+#include <dlm/dlmapi.h>
+
+#define MLOG_MASK_PREFIX ML_DLM_GLUE
+#include <cluster/masklog.h>
+
+#include "ocfs2.h"
+
+#include "alloc.h"
+#include "dlmglue.h"
+#include "extent_map.h"
+#include "heartbeat.h"
+#include "inode.h"
+#include "journal.h"
+#include "slot_map.h"
+#include "super.h"
+#include "uptodate.h"
+#include "vote.h"
+
+#include "buffer_head_io.h"
+
+struct ocfs2_mask_waiter {
+ struct list_head mw_item;
+ int mw_status;
+ struct completion mw_complete;
+ unsigned long mw_mask;
+ unsigned long mw_goal;
+};
+
+static void ocfs2_inode_ast_func(void *opaque);
+static void ocfs2_inode_bast_func(void *opaque,
+ int level);
+static void ocfs2_super_ast_func(void *opaque);
+static void ocfs2_super_bast_func(void *opaque,
+ int level);
+static void ocfs2_rename_ast_func(void *opaque);
+static void ocfs2_rename_bast_func(void *opaque,
+ int level);
+
+/* so far, all locks have gotten along with the same unlock ast */
+static void ocfs2_unlock_ast_func(void *opaque,
+ enum dlm_status status);
+static int ocfs2_do_unblock_meta(struct inode *inode,
+ int *requeue);
+static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
+ int *requeue);
+static int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
+ int *requeue);
+static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
+ int *requeue);
+static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
+ int *requeue);
+typedef void (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int);
+static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres,
+ int *requeue,
+ ocfs2_convert_worker_t *worker);
+
+struct ocfs2_lock_res_ops {
+ void (*ast)(void *);
+ void (*bast)(void *, int);
+ void (*unlock_ast)(void *, enum dlm_status);
+ int (*unblock)(struct ocfs2_lock_res *, int *);
+};
+
+static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
+ .ast = ocfs2_inode_ast_func,
+ .bast = ocfs2_inode_bast_func,
+ .unlock_ast = ocfs2_unlock_ast_func,
+ .unblock = ocfs2_unblock_inode_lock,
+};
+
+static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
+ .ast = ocfs2_inode_ast_func,
+ .bast = ocfs2_inode_bast_func,
+ .unlock_ast = ocfs2_unlock_ast_func,
+ .unblock = ocfs2_unblock_meta,
+};
+
+static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
+ int blocking);
+
+static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
+ .ast = ocfs2_inode_ast_func,
+ .bast = ocfs2_inode_bast_func,
+ .unlock_ast = ocfs2_unlock_ast_func,
+ .unblock = ocfs2_unblock_data,
+};
+
+static struct ocfs2_lock_res_ops ocfs2_super_lops = {
+ .ast = ocfs2_super_ast_func,
+ .bast = ocfs2_super_bast_func,
+ .unlock_ast = ocfs2_unlock_ast_func,
+ .unblock = ocfs2_unblock_osb_lock,
+};
+
+static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
+ .ast = ocfs2_rename_ast_func,
+ .bast = ocfs2_rename_bast_func,
+ .unlock_ast = ocfs2_unlock_ast_func,
+ .unblock = ocfs2_unblock_osb_lock,
+};
+
+static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
+{
+ return lockres->l_type == OCFS2_LOCK_TYPE_META ||
+ lockres->l_type == OCFS2_LOCK_TYPE_DATA ||
+ lockres->l_type == OCFS2_LOCK_TYPE_RW;
+}
+
+static inline int ocfs2_is_super_lock(struct ocfs2_lock_res *lockres)
+{
+ return lockres->l_type == OCFS2_LOCK_TYPE_SUPER;
+}
+
+static inline int ocfs2_is_rename_lock(struct ocfs2_lock_res *lockres)
+{
+ return lockres->l_type == OCFS2_LOCK_TYPE_RENAME;
+}
+
+static inline struct ocfs2_super *ocfs2_lock_res_super(struct ocfs2_lock_res *lockres)
+{
+ BUG_ON(!ocfs2_is_super_lock(lockres)
+ && !ocfs2_is_rename_lock(lockres));
+
+ return (struct ocfs2_super *) lockres->l_priv;
+}
+
+static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
+{
+ BUG_ON(!ocfs2_is_inode_lock(lockres));
+
+ return (struct inode *) lockres->l_priv;
+}
+
+static int ocfs2_lock_create(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres,
+ int level,
+ int dlm_flags);
+static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
+ int wanted);
+static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres,
+ int level);
+static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
+static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
+static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
+static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
+static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres);
+static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
+ int convert);
+#define ocfs2_log_dlm_error(_func, _stat, _lockres) do { \
+ mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on " \
+ "resource %s: %s\n", dlm_errname(_stat), _func, \
+ _lockres->l_name, dlm_errmsg(_stat)); \
+} while (0)
+static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres);
+static int ocfs2_meta_lock_update(struct inode *inode,
+ struct buffer_head **bh);
+static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
+static inline int ocfs2_highest_compat_lock_level(int level);
+static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
+ struct ocfs2_lock_res *lockres,
+ int new_level);
+
+static char *ocfs2_lock_type_strings[] = {
+ [OCFS2_LOCK_TYPE_META] = "Meta",
+ [OCFS2_LOCK_TYPE_DATA] = "Data",
+ [OCFS2_LOCK_TYPE_SUPER] = "Super",
+ [OCFS2_LOCK_TYPE_RENAME] = "Rename",
+ /* Need to differntiate from [R]ename.. serializing writes is the
+ * important job it does, anyway. */
+ [OCFS2_LOCK_TYPE_RW] = "Write/Read",
+};
+
+static char *ocfs2_lock_type_string(enum ocfs2_lock_type type)
+{
+ mlog_bug_on_msg(type >= OCFS2_NUM_LOCK_TYPES, "%d\n", type);
+ return ocfs2_lock_type_strings[type];
+}
+
+static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
+ u64 blkno,
+ u32 generation,
+ char *name)
+{
+ int len;
+
+ mlog_entry_void();
+
+ BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
+
+ len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016"MLFx64"%08x",
+ ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD, blkno,
+ generation);
+
+ BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
+
+ mlog(0, "built lock resource with name: %s\n", name);
+
+ mlog_exit_void();
+}
+
+static spinlock_t ocfs2_dlm_tracking_lock = SPIN_LOCK_UNLOCKED;
+
+static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
+ struct ocfs2_dlm_debug *dlm_debug)
+{
+ mlog(0, "Add tracking for lockres %s\n", res->l_name);
+
+ spin_lock(&ocfs2_dlm_tracking_lock);
+ list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
+ spin_unlock(&ocfs2_dlm_tracking_lock);
+}
+
+static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
+{
+ spin_lock(&ocfs2_dlm_tracking_lock);
+ if (!list_empty(&res->l_debug_list))
+ list_del_init(&res->l_debug_list);
+ spin_unlock(&ocfs2_dlm_tracking_lock);
+}
+
+static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *res,
+ enum ocfs2_lock_type type,
+ u64 blkno,
+ u32 generation,
+ struct ocfs2_lock_res_ops *ops,
+ void *priv)
+{
+ ocfs2_build_lock_name(type, blkno, generation, res->l_name);
+
+ res->l_type = type;
+ res->l_ops = ops;
+ res->l_priv = priv;
+
+ res->l_level = LKM_IVMODE;
+ res->l_requested = LKM_IVMODE;
+ res->l_blocking = LKM_IVMODE;
+ res->l_action = OCFS2_AST_INVALID;
+ res->l_unlock_action = OCFS2_UNLOCK_INVALID;
+
+ res->l_flags = OCFS2_LOCK_INITIALIZED;
+
+ ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
+}
+
+void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
+{
+ /* This also clears out the lock status block */
+ memset(res, 0, sizeof(struct ocfs2_lock_res));
+ spin_lock_init(&res->l_lock);
+ init_waitqueue_head(&res->l_event);
+ INIT_LIST_HEAD(&res->l_blocked_list);
+ INIT_LIST_HEAD(&res->l_mask_waiters);
+}
+
+void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
+ enum ocfs2_lock_type type,
+ struct inode *inode)
+{
+ struct ocfs2_lock_res_ops *ops;
+
+ switch(type) {
+ case OCFS2_LOCK_TYPE_RW:
+ ops = &ocfs2_inode_rw_lops;
+ break;
+ case OCFS2_LOCK_TYPE_META:
+ ops = &ocfs2_inode_meta_lops;
+ break;
+ case OCFS2_LOCK_TYPE_DATA:
+ ops = &ocfs2_inode_data_lops;
+ break;
+ default:
+ mlog_bug_on_msg(1, "type: %d\n", type);
+ ops = NULL; /* thanks, gcc */
+ break;
+ };
+
+ ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type,
+ OCFS2_I(inode)->ip_blkno,
+ inode->i_generation, ops, inode);
+}
+
+static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
+ struct ocfs2_super *osb)
+{
+ /* Superblock lockres doesn't come from a slab so we call init
+ * once on it manually. */
+ ocfs2_lock_res_init_once(res);
+ ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
+ OCFS2_SUPER_BLOCK_BLKNO, 0,
+ &ocfs2_super_lops, osb);
+}
+
+static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
+ struct ocfs2_super *osb)
+{
+ /* Rename lockres doesn't come from a slab so we call init
+ * once on it manually. */
+ ocfs2_lock_res_init_once(res);
+ ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 0, 0,
+ &ocfs2_rename_lops, osb);
+}
+
+void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
+{
+ mlog_entry_void();
+
+ if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
+ return;
+
+ ocfs2_remove_lockres_tracking(res);
+
+ mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
+ "Lockres %s is on the blocked list\n",
+ res->l_name);
+ mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
+ "Lockres %s has mask waiters pending\n",
+ res->l_name);
+ mlog_bug_on_msg(spin_is_locked(&res->l_lock),
+ "Lockres %s is locked\n",
+ res->l_name);
+ mlog_bug_on_msg(res->l_ro_holders,
+ "Lockres %s has %u ro holders\n",
+ res->l_name, res->l_ro_holders);
+ mlog_bug_on_msg(res->l_ex_holders,
+ "Lockres %s has %u ex holders\n",
+ res->l_name, res->l_ex_holders);
+
+ /* Need to clear out the lock status block for the dlm */
+ memset(&res->l_lksb, 0, sizeof(res->l_lksb));
+
+ res->l_flags = 0UL;
+ mlog_exit_void();
+}
+
+static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
+ int level)
+{
+ mlog_entry_void();
+
+ BUG_ON(!lockres);
+
+ switch(level) {
+ case LKM_EXMODE:
+ lockres->l_ex_holders++;
+ break;
+ case LKM_PRMODE:
+ lockres->l_ro_holders++;
+ break;
+ default:
+ BUG();
+ }
+
+ mlog_exit_void();
+}
+
+static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
+ int level)
+{
+ mlog_entry_void();
+
+ BUG_ON(!lockres);
+
+ switch(level) {
+ case LKM_EXMODE:
+ BUG_ON(!lockres->l_ex_holders);
+ lockres->l_ex_holders--;
+ break;
+ case LKM_PRMODE:
+ BUG_ON(!lockres->l_ro_holders);
+ lockres->l_ro_holders--;
+ break;
+ default:
+ BUG();
+ }
+ mlog_exit_void();
+}
+
+/* WARNING: This function lives in a world where the only three lock
+ * levels are EX, PR, and NL. It *will* have to be adjusted when more
+ * lock types are added. */
+static inline int ocfs2_highest_compat_lock_level(int level)
+{
+ int new_level = LKM_EXMODE;
+
+ if (level == LKM_EXMODE)
+ new_level = LKM_NLMODE;
+ else if (level == LKM_PRMODE)
+ new_level = LKM_PRMODE;
+ return new_level;
+}
+
+static void lockres_set_flags(struct ocfs2_lock_res *lockres,
+ unsigned long newflags)
+{
+ struct list_head *pos, *tmp;
+ struct ocfs2_mask_waiter *mw;
+
+ assert_spin_locked(&lockres->l_lock);
+
+ lockres->l_flags = newflags;
+
+ list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) {
+ mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item);
+ if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
+ continue;
+
+ list_del_init(&mw->mw_item);
+ mw->mw_status = 0;
+ complete(&mw->mw_complete);
+ }
+}
+static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
+{
+ lockres_set_flags(lockres, lockres->l_flags | or);
+}
+static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
+ unsigned long clear)
+{
+ lockres_set_flags(lockres, lockres->l_flags & ~clear);
+}
+
+static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
+{
+ mlog_entry_void();
+
+ BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
+ BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
+ BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
+ BUG_ON(lockres->l_blocking <= LKM_NLMODE);
+
+ lockres->l_level = lockres->l_requested;
+ if (lockres->l_level <=
+ ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
+ lockres->l_blocking = LKM_NLMODE;
+ lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
+ }
+ lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
+
+ mlog_exit_void();
+}
+
+static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
+{
+ mlog_entry_void();
+
+ BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
+ BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
+
+ /* Convert from RO to EX doesn't really need anything as our
+ * information is already up to data. Convert from NL to
+ * *anything* however should mark ourselves as needing an
+ * update */
+ if (lockres->l_level == LKM_NLMODE)
+ lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
+
+ lockres->l_level = lockres->l_requested;
+ lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
+
+ mlog_exit_void();
+}
+
+static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
+{
+ mlog_entry_void();
+
+ BUG_ON((!lockres->l_flags & OCFS2_LOCK_BUSY));
+ BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
+
+ if (lockres->l_requested > LKM_NLMODE &&
+ !(lockres->l_flags & OCFS2_LOCK_LOCAL))
+ lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
+
+ lockres->l_level = lockres->l_requested;
+ lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
+ lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
+
+ mlog_exit_void();
+}
+
+static void ocfs2_inode_ast_func(void *opaque)
+{
+ struct ocfs2_lock_res *lockres = opaque;
+ struct inode *inode;
+ struct dlm_lockstatus *lksb;
+ unsigned long flags;
+
+ mlog_entry_void();
+
+ inode = ocfs2_lock_res_inode(lockres);
+
+ mlog(0, "AST fired for inode %"MLFu64", l_action = %u, type = %s\n",
+ OCFS2_I(inode)->ip_blkno, lockres->l_action,
+ ocfs2_lock_type_string(lockres->l_type));
+
+ BUG_ON(!ocfs2_is_inode_lock(lockres));
+
+ spin_lock_irqsave(&lockres->l_lock, flags);
+
+ lksb = &(lockres->l_lksb);
+ if (lksb->status != DLM_NORMAL) {
+ mlog(ML_ERROR, "ocfs2_inode_ast_func: lksb status value of %u "
+ "on inode %"MLFu64"\n", lksb->status,
+ OCFS2_I(inode)->ip_blkno);
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+ mlog_exit_void();
+ return;
+ }
+
+ switch(lockres->l_action) {
+ case OCFS2_AST_ATTACH:
+ ocfs2_generic_handle_attach_action(lockres);
+ lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
+ break;
+ case OCFS2_AST_CONVERT:
+ ocfs2_generic_handle_convert_action(lockres);
+ break;
+ case OCFS2_AST_DOWNCONVERT:
+ ocfs2_generic_handle_downconvert_action(lockres);
+ break;
+ default:
+ mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
+ "lockres flags = 0x%lx, unlock action: %u\n",
+ lockres->l_name, lockres->l_action, lockres->l_flags,
+ lockres->l_unlock_action);
+
+ BUG();
+ }
+
+ /* data and rw locking ignores refresh flag for now. */
+ if (lockres->l_type != OCFS2_LOCK_TYPE_META)
+ lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
+
+ /* set it to something invalid so if we get called again we
+ * can catch it. */
+ lockres->l_action = OCFS2_AST_INVALID;
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+ wake_up(&lockres->l_event);
+
+ mlog_exit_void();
+}
+
+static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
+ int level)
+{
+ int needs_downconvert = 0;
+ mlog_entry_void();
+
+ assert_spin_locked(&lockres->l_lock);
+
+ lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
+
+ if (level > lockres->l_blocking) {
+ /* only schedule a downconvert if we haven't already scheduled
+ * one that goes low enough to satisfy the level we're
+ * blocking. this also catches the case where we get
+ * duplicate BASTs */
+ if (ocfs2_highest_compat_lock_level(level) <
+ ocfs2_highest_compat_lock_level(lockres->l_blocking))
+ needs_downconvert = 1;
+
+ lockres->l_blocking = level;
+ }
+
+ mlog_exit(needs_downconvert);
+ return needs_downconvert;
+}
+
+static void ocfs2_generic_bast_func(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres,
+ int level)
+{
+ int needs_downconvert;
+ unsigned long flags;
+
+ mlog_entry_void();
+
+ BUG_ON(level <= LKM_NLMODE);
+
+ spin_lock_irqsave(&lockres->l_lock, flags);
+ needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
+ if (needs_downconvert)
+ ocfs2_schedule_blocked_lock(osb, lockres);
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+ ocfs2_kick_vote_thread(osb);
+
+ wake_up(&lockres->l_event);
+ mlog_exit_void();
+}
+
+static void ocfs2_inode_bast_func(void *opaque, int level)
+{
+ struct ocfs2_lock_res *lockres = opaque;
+ struct inode *inode;
+ struct ocfs2_super *osb;
+
+ mlog_entry_void();
+
+ BUG_ON(!ocfs2_is_inode_lock(lockres));
+
+ inode = ocfs2_lock_res_inode(lockres);
+ osb = OCFS2_SB(inode->i_sb);
+
+ mlog(0, "BAST fired for inode %"MLFu64", blocking = %d, level = %d "
+ "type = %s\n", OCFS2_I(inode)->ip_blkno, level,
+ lockres->l_level,
+ ocfs2_lock_type_string(lockres->l_type));
+
+ ocfs2_generic_bast_func(osb, lockres, level);
+
+ mlog_exit_void();
+}
+
+static void ocfs2_generic_ast_func(struct ocfs2_lock_res *lockres,
+ int ignore_refresh)
+{
+ struct dlm_lockstatus *lksb = &lockres->l_lksb;
+ unsigned long flags;
+
+ spin_lock_irqsave(&lockres->l_lock, flags);
+
+ if (lksb->status != DLM_NORMAL) {
+ mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
+ lockres->l_name, lksb->status);
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+ return;
+ }
+
+ switch(lockres->l_action) {
+ case OCFS2_AST_ATTACH:
+ ocfs2_generic_handle_attach_action(lockres);
+ break;
+ case OCFS2_AST_CONVERT:
+ ocfs2_generic_handle_convert_action(lockres);
+ break;
+ case OCFS2_AST_DOWNCONVERT:
+ ocfs2_generic_handle_downconvert_action(lockres);
+ break;
+ default:
+ BUG();
+ }
+
+ if (ignore_refresh)
+ lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
+
+ /* set it to something invalid so if we get called again we
+ * can catch it. */
+ lockres->l_action = OCFS2_AST_INVALID;
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+ wake_up(&lockres->l_event);
+}
+
+static void ocfs2_super_ast_func(void *opaque)
+{
+ struct ocfs2_lock_res *lockres = opaque;
+
+ mlog_entry_void();
+ mlog(0, "Superblock AST fired\n");
+
+ BUG_ON(!ocfs2_is_super_lock(lockres));
+ ocfs2_generic_ast_func(lockres, 0);
+
+ mlog_exit_void();
+}
+
+static void ocfs2_super_bast_func(void *opaque,
+ int level)
+{
+ struct ocfs2_lock_res *lockres = opaque;
+ struct ocfs2_super *osb;
+
+ mlog_entry_void();
+ mlog(0, "Superblock BAST fired\n");
+
+ BUG_ON(!ocfs2_is_super_lock(lockres));
+ osb = ocfs2_lock_res_super(lockres);
+ ocfs2_generic_bast_func(osb, lockres, level);
+
+ mlog_exit_void();
+}
+
+static void ocfs2_rename_ast_func(void *opaque)
+{
+ struct ocfs2_lock_res *lockres = opaque;
+
+ mlog_entry_void();
+
+ mlog(0, "Rename AST fired\n");
+
+ BUG_ON(!ocfs2_is_rename_lock(lockres));
+
+ ocfs2_generic_ast_func(lockres, 1);
+
+ mlog_exit_void();
+}
+
+static void ocfs2_rename_bast_func(void *opaque,
+ int level)
+{
+ struct ocfs2_lock_res *lockres = opaque;
+ struct ocfs2_super *osb;
+
+ mlog_entry_void();
+
+ mlog(0, "Rename BAST fired\n");
+
+ BUG_ON(!ocfs2_is_rename_lock(lockres));
+
+ osb = ocfs2_lock_res_super(lockres);
+ ocfs2_generic_bast_func(osb, lockres, level);
+
+ mlog_exit_void();
+}
+
+static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
+ int convert)
+{
+ unsigned long flags;
+
+ mlog_entry_void();
+ spin_lock_irqsave(&lockres->l_lock, flags);
+ lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
+ if (convert)
+ lockres->l_action = OCFS2_AST_INVALID;
+ else
+ lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+ wake_up(&lockres->l_event);
+ mlog_exit_void();
+}
+
+/* Note: If we detect another process working on the lock (i.e.,
+ * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
+ * to do the right thing in that case.
+ */
+static int ocfs2_lock_create(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres,
+ int level,
+ int dlm_flags)
+{
+ int ret = 0;
+ enum dlm_status status;
+ unsigned long flags;
+
+ mlog_entry_void();
+
+ mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
+ dlm_flags);
+
+ spin_lock_irqsave(&lockres->l_lock, flags);
+ if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
+ (lockres->l_flags & OCFS2_LOCK_BUSY)) {
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+ goto bail;
+ }
+
+ lockres->l_action = OCFS2_AST_ATTACH;
+ lockres->l_requested = level;
+ lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+ status = dlmlock(osb->dlm,
+ level,
+ &lockres->l_lksb,
+ dlm_flags,
+ lockres->l_name,
+ lockres->l_ops->ast,
+ lockres,
+ lockres->l_ops->bast);
+ if (status != DLM_NORMAL) {
+ ocfs2_log_dlm_error("dlmlock", status, lockres);
+ ret = -EINVAL;
+ ocfs2_recover_from_dlm_error(lockres, 1);
+ }
+
+ mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name);
+
+bail:
+ mlog_exit(ret);
+ return ret;
+}
+
+static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
+ int flag)
+{
+ unsigned long flags;
+ int ret;
+
+ spin_lock_irqsave(&lockres->l_lock, flags);
+ ret = lockres->l_flags & flag;
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+ return ret;
+}
+
+static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
+
+{
+ wait_event(lockres->l_event,
+ !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
+}
+
+static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
+
+{
+ wait_event(lockres->l_event,
+ !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
+}
+
+/* predict what lock level we'll be dropping down to on behalf
+ * of another node, and return true if the currently wanted
+ * level will be compatible with it. */
+static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
+ int wanted)
+{
+ BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
+
+ return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
+}
+
+static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
+{
+ INIT_LIST_HEAD(&mw->mw_item);
+ init_completion(&mw->mw_complete);
+}
+
+static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
+{
+ wait_for_completion(&mw->mw_complete);
+ /* Re-arm the completion in case we want to wait on it again */
+ INIT_COMPLETION(mw->mw_complete);
+ return mw->mw_status;
+}
+
+static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
+ struct ocfs2_mask_waiter *mw,
+ unsigned long mask,
+ unsigned long goal)
+{
+ BUG_ON(!list_empty(&mw->mw_item));
+
+ assert_spin_locked(&lockres->l_lock);
+
+ list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
+ mw->mw_mask = mask;
+ mw->mw_goal = goal;
+}
+
+/* returns 0 if the mw that was removed was already satisfied, -EBUSY
+ * if the mask still hadn't reached its goal */
+static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
+ struct ocfs2_mask_waiter *mw)
+{
+ unsigned long flags;
+ int ret = 0;
+
+ spin_lock_irqsave(&lockres->l_lock, flags);
+ if (!list_empty(&mw->mw_item)) {
+ if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
+ ret = -EBUSY;
+
+ list_del_init(&mw->mw_item);
+ init_completion(&mw->mw_complete);
+ }
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+ return ret;
+
+}
+
+static int ocfs2_cluster_lock(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres,
+ int level,
+ int lkm_flags,
+ int arg_flags)
+{
+ struct ocfs2_mask_waiter mw;
+ enum dlm_status status;
+ int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
+ int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
+ unsigned long flags;
+
+ mlog_entry_void();
+
+ ocfs2_init_mask_waiter(&mw);
+
+again:
+ wait = 0;
+
+ if (catch_signals && signal_pending(current)) {
+ ret = -ERESTARTSYS;
+ goto out;
+ }
+
+ spin_lock_irqsave(&lockres->l_lock, flags);
+
+ mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
+ "Cluster lock called on freeing lockres %s! flags "
+ "0x%lx\n", lockres->l_name, lockres->l_flags);
+
+ /* We only compare against the currently granted level
+ * here. If the lock is blocked waiting on a downconvert,
+ * we'll get caught below. */
+ if (lockres->l_flags & OCFS2_LOCK_BUSY &&
+ level > lockres->l_level) {
+ /* is someone sitting in dlm_lock? If so, wait on
+ * them. */
+ lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
+ wait = 1;
+ goto unlock;
+ }
+
+ if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
+ /* lock has not been created yet. */
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+ ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
+ if (ret < 0) {
+ mlog_errno(ret);
+ goto out;
+ }
+ goto again;
+ }
+
+ if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
+ !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
+ /* is the lock is currently blocked on behalf of
+ * another node */
+ lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
+ wait = 1;
+ goto unlock;
+ }
+
+ if (level > lockres->l_level) {
+ if (lockres->l_action != OCFS2_AST_INVALID)
+ mlog(ML_ERROR, "lockres %s has action %u pending\n",
+ lockres->l_name, lockres->l_action);
+
+ lockres->l_action = OCFS2_AST_CONVERT;
+ lockres->l_requested = level;
+ lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+ BUG_ON(level == LKM_IVMODE);
+ BUG_ON(level == LKM_NLMODE);
+
+ mlog(0, "lock %s, convert from %d to level = %d\n",
+ lockres->l_name, lockres->l_level, level);
+
+ /* call dlm_lock to upgrade lock now */
+ status = dlmlock(osb->dlm,
+ level,
+ &lockres->l_lksb,
+ lkm_flags|LKM_CONVERT|LKM_VALBLK,
+ lockres->l_name,
+ lockres->l_ops->ast,
+ lockres,
+ lockres->l_ops->bast);
+ if (status != DLM_NORMAL) {
+ if ((lkm_flags & LKM_NOQUEUE) &&
+ (status == DLM_NOTQUEUED))
+ ret = -EAGAIN;
+ else {
+ ocfs2_log_dlm_error("dlmlock", status,
+ lockres);
+ ret = -EINVAL;
+ }
+ ocfs2_recover_from_dlm_error(lockres, 1);
+ goto out;
+ }
+
+ mlog(0, "lock %s, successfull return from dlmlock\n",
+ lockres->l_name);
+
+ /* At this point we've gone inside the dlm and need to
+ * complete our work regardless. */
+ catch_signals = 0;
+
+ /* wait for busy to clear and carry on */
+ goto again;
+ }
+
+ /* Ok, if we get here then we're good to go. */
+ ocfs2_inc_holders(lockres, level);
+
+ ret = 0;
+unlock:
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+out:
+ /*
+ * This is helping work around a lock inversion between the page lock
+ * and dlm locks. One path holds the page lock while calling aops
+ * which block acquiring dlm locks. The voting thread holds dlm
+ * locks while acquiring page locks while down converting data locks.
+ * This block is helping an aop path notice the inversion and back
+ * off to unlock its page lock before trying the dlm lock again.
+ */
+ if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
+ mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
+ wait = 0;
+ if (lockres_remove_mask_waiter(lockres, &mw))
+ ret = -EAGAIN;
+ else
+ goto again;
+ }
+ if (wait) {
+ ret = ocfs2_wait_for_mask(&mw);
+ if (ret == 0)
+ goto again;
+ mlog_errno(ret);
+ }
+
+ mlog_exit(ret);
+ return ret;
+}
+
+static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres,
+ int level)
+{
+ unsigned long flags;
+
+ mlog_entry_void();
+ spin_lock_irqsave(&lockres->l_lock, flags);
+ ocfs2_dec_holders(lockres, level);
+ ocfs2_vote_on_unlock(osb, lockres);
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+ mlog_exit_void();
+}
+
+static int ocfs2_create_new_inode_lock(struct inode *inode,
+ struct ocfs2_lock_res *lockres)
+{
+ struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+ unsigned long flags;
+
+ spin_lock_irqsave(&lockres->l_lock, flags);
+ BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
+ lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+ return ocfs2_lock_create(osb, lockres, LKM_EXMODE, LKM_LOCAL);
+}
+
+/* Grants us an EX lock on the data and metadata resources, skipping
+ * the normal cluster directory lookup. Use this ONLY on newly created
+ * inodes which other nodes can't possibly see, and which haven't been
+ * hashed in the inode hash yet. This can give us a good performance
+ * increase as it'll skip the network broadcast normally associated
+ * with creating a new lock resource. */
+int ocfs2_create_new_inode_locks(struct inode *inode)
+{
+ int ret;
+
+ BUG_ON(!inode);
+ BUG_ON(!ocfs2_inode_is_new(inode));
+
+ mlog_entry_void();
+
+ mlog(0, "Inode %"MLFu64"\n", OCFS2_I(inode)->ip_blkno);
+
+ /* NOTE: That we don't increment any of the holder counts, nor
+ * do we add anything to a journal handle. Since this is
+ * supposed to be a new inode which the cluster doesn't know
+ * about yet, there is no need to. As far as the LVB handling
+ * is concerned, this is basically like acquiring an EX lock
+ * on a resource which has an invalid one -- we'll set it
+ * valid when we release the EX. */
+
+ ret = ocfs2_create_new_inode_lock(inode,
+ &OCFS2_I(inode)->ip_rw_lockres);
+ if (ret) {
+ mlog_errno(ret);
+ goto bail;
+ }
+
+ ret = ocfs2_create_new_inode_lock(inode,
+ &OCFS2_I(inode)->ip_meta_lockres);
+ if (ret) {
+ mlog_errno(ret);
+ goto bail;
+ }
+
+ ret = ocfs2_create_new_inode_lock(inode,
+ &OCFS2_I(inode)->ip_data_lockres);
+ if (ret) {
+ mlog_errno(ret);
+ goto bail;
+ }
+
+bail:
+ mlog_exit(ret);
+ return ret;
+}
+
+int ocfs2_rw_lock(struct inode *inode, int write)
+{
+ int status, level;
+ struct ocfs2_lock_res *lockres;
+
+ BUG_ON(!inode);
+
+ mlog_entry_void();
+
+ mlog(0, "inode %"MLFu64" take %s RW lock\n",
+ OCFS2_I(inode)->ip_blkno,
+ write ? "EXMODE" : "PRMODE");
+
+ lockres = &OCFS2_I(inode)->ip_rw_lockres;
+
+ level = write ? LKM_EXMODE : LKM_PRMODE;
+
+ status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
+ 0);
+ if (status < 0)
+ mlog_errno(status);
+
+ mlog_exit(status);
+ return status;
+}
+
+void ocfs2_rw_unlock(struct inode *inode, int write)
+{
+ int level = write ? LKM_EXMODE : LKM_PRMODE;
+ struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
+
+ mlog_entry_void();
+
+ mlog(0, "inode %"MLFu64" drop %s RW lock\n",
+ OCFS2_I(inode)->ip_blkno,
+ write ? "EXMODE" : "PRMODE");
+
+ ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, leve