summaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm/dlmmaster.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm/dlmmaster.c')
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c2664
1 files changed, 2664 insertions, 0 deletions
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
new file mode 100644
index 000000000000..27e984f7e4cd
--- /dev/null
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -0,0 +1,2664 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * dlmmod.c
+ *
+ * standalone DLM module
+ *
+ * Copyright (C) 2004 Oracle. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA.
+ *
+ */
+
+
+#include <linux/module.h>
+#include <linux/fs.h>
+#include <linux/types.h>
+#include <linux/slab.h>
+#include <linux/highmem.h>
+#include <linux/utsname.h>
+#include <linux/init.h>
+#include <linux/sysctl.h>
+#include <linux/random.h>
+#include <linux/blkdev.h>
+#include <linux/socket.h>
+#include <linux/inet.h>
+#include <linux/spinlock.h>
+#include <linux/delay.h>
+
+
+#include "cluster/heartbeat.h"
+#include "cluster/nodemanager.h"
+#include "cluster/tcp.h"
+
+#include "dlmapi.h"
+#include "dlmcommon.h"
+#include "dlmdebug.h"
+#include "dlmdomain.h"
+
+#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
+#include "cluster/masklog.h"
+
+enum dlm_mle_type {
+ DLM_MLE_BLOCK,
+ DLM_MLE_MASTER,
+ DLM_MLE_MIGRATION
+};
+
+struct dlm_lock_name
+{
+ u8 len;
+ u8 name[DLM_LOCKID_NAME_MAX];
+};
+
+struct dlm_master_list_entry
+{
+ struct list_head list;
+ struct list_head hb_events;
+ struct dlm_ctxt *dlm;
+ spinlock_t spinlock;
+ wait_queue_head_t wq;
+ atomic_t woken;
+ struct kref mle_refs;
+ unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
+ unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
+ unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
+ unsigned long node_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
+ u8 master;
+ u8 new_master;
+ enum dlm_mle_type type;
+ struct o2hb_callback_func mle_hb_up;
+ struct o2hb_callback_func mle_hb_down;
+ union {
+ struct dlm_lock_resource *res;
+ struct dlm_lock_name name;
+ } u;
+};
+
+static void dlm_mle_node_down(struct dlm_ctxt *dlm,
+ struct dlm_master_list_entry *mle,
+ struct o2nm_node *node,
+ int idx);
+static void dlm_mle_node_up(struct dlm_ctxt *dlm,
+ struct dlm_master_list_entry *mle,
+ struct o2nm_node *node,
+ int idx);
+
+static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
+static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
+ unsigned int namelen, void *nodemap,
+ u32 flags);
+
+static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
+ struct dlm_master_list_entry *mle,
+ const char *name,
+ unsigned int namelen)
+{
+ struct dlm_lock_resource *res;
+
+ if (dlm != mle->dlm)
+ return 0;
+
+ if (mle->type == DLM_MLE_BLOCK ||
+ mle->type == DLM_MLE_MIGRATION) {
+ if (namelen != mle->u.name.len ||
+ memcmp(name, mle->u.name.name, namelen)!=0)
+ return 0;
+ } else {
+ res = mle->u.res;
+ if (namelen != res->lockname.len ||
+ memcmp(res->lockname.name, name, namelen) != 0)
+ return 0;
+ }
+ return 1;
+}
+
+#if 0
+/* Code here is included but defined out as it aids debugging */
+
+void dlm_print_one_mle(struct dlm_master_list_entry *mle)
+{
+ int i = 0, refs;
+ char *type;
+ char attached;
+ u8 master;
+ unsigned int namelen;
+ const char *name;
+ struct kref *k;
+
+ k = &mle->mle_refs;
+ if (mle->type == DLM_MLE_BLOCK)
+ type = "BLK";
+ else if (mle->type == DLM_MLE_MASTER)
+ type = "MAS";
+ else
+ type = "MIG";
+ refs = atomic_read(&k->refcount);
+ master = mle->master;
+ attached = (list_empty(&mle->hb_events) ? 'N' : 'Y');
+
+ if (mle->type != DLM_MLE_MASTER) {
+ namelen = mle->u.name.len;
+ name = mle->u.name.name;
+ } else {
+ namelen = mle->u.res->lockname.len;
+ name = mle->u.res->lockname.name;
+ }
+
+ mlog(ML_NOTICE, " #%3d: %3s %3d %3u %3u %c (%d)%.*s\n",
+ i, type, refs, master, mle->new_master, attached,
+ namelen, namelen, name);
+}
+
+static void dlm_dump_mles(struct dlm_ctxt *dlm)
+{
+ struct dlm_master_list_entry *mle;
+ struct list_head *iter;
+
+ mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
+ mlog(ML_NOTICE, " ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n");
+ spin_lock(&dlm->master_lock);
+ list_for_each(iter, &dlm->master_list) {
+ mle = list_entry(iter, struct dlm_master_list_entry, list);
+ dlm_print_one_mle(mle);
+ }
+ spin_unlock(&dlm->master_lock);
+}
+
+int dlm_dump_all_mles(const char __user *data, unsigned int len)
+{
+ struct list_head *iter;
+ struct dlm_ctxt *dlm;
+
+ spin_lock(&dlm_domain_lock);
+ list_for_each(iter, &dlm_domains) {
+ dlm = list_entry (iter, struct dlm_ctxt, list);
+ mlog(ML_NOTICE, "found dlm: %p, name=%s\n", dlm, dlm->name);
+ dlm_dump_mles(dlm);
+ }
+ spin_unlock(&dlm_domain_lock);
+ return len;
+}
+EXPORT_SYMBOL_GPL(dlm_dump_all_mles);
+
+#endif /* 0 */
+
+
+static kmem_cache_t *dlm_mle_cache = NULL;
+
+
+static void dlm_mle_release(struct kref *kref);
+static void dlm_init_mle(struct dlm_master_list_entry *mle,
+ enum dlm_mle_type type,
+ struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ const char *name,
+ unsigned int namelen);
+static void dlm_put_mle(struct dlm_master_list_entry *mle);
+static void __dlm_put_mle(struct dlm_master_list_entry *mle);
+static int dlm_find_mle(struct dlm_ctxt *dlm,
+ struct dlm_master_list_entry **mle,
+ char *name, unsigned int namelen);
+
+static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to);
+
+
+static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ struct dlm_master_list_entry *mle,
+ int *blocked);
+static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ struct dlm_master_list_entry *mle,
+ int blocked);
+static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ struct dlm_master_list_entry *mle,
+ struct dlm_master_list_entry **oldmle,
+ const char *name, unsigned int namelen,
+ u8 new_master, u8 master);
+
+static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res);
+static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res);
+static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ u8 target);
+
+
+int dlm_is_host_down(int errno)
+{
+ switch (errno) {
+ case -EBADF:
+ case -ECONNREFUSED:
+ case -ENOTCONN:
+ case -ECONNRESET:
+ case -EPIPE:
+ case -EHOSTDOWN:
+ case -EHOSTUNREACH:
+ case -ETIMEDOUT:
+ case -ECONNABORTED:
+ case -ENETDOWN:
+ case -ENETUNREACH:
+ case -ENETRESET:
+ case -ESHUTDOWN:
+ case -ENOPROTOOPT:
+ case -EINVAL: /* if returned from our tcp code,
+ this means there is no socket */
+ return 1;
+ }
+ return 0;
+}
+
+
+/*
+ * MASTER LIST FUNCTIONS
+ */
+
+
+/*
+ * regarding master list entries and heartbeat callbacks:
+ *
+ * in order to avoid sleeping and allocation that occurs in
+ * heartbeat, master list entries are simply attached to the
+ * dlm's established heartbeat callbacks. the mle is attached
+ * when it is created, and since the dlm->spinlock is held at
+ * that time, any heartbeat event will be properly discovered
+ * by the mle. the mle needs to be detached from the
+ * dlm->mle_hb_events list as soon as heartbeat events are no
+ * longer useful to the mle, and before the mle is freed.
+ *
+ * as a general rule, heartbeat events are no longer needed by
+ * the mle once an "answer" regarding the lock master has been
+ * received.
+ */
+static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
+ struct dlm_master_list_entry *mle)
+{
+ assert_spin_locked(&dlm->spinlock);
+
+ list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
+}
+
+
+static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
+ struct dlm_master_list_entry *mle)
+{
+ if (!list_empty(&mle->hb_events))
+ list_del_init(&mle->hb_events);
+}
+
+
+static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
+ struct dlm_master_list_entry *mle)
+{
+ spin_lock(&dlm->spinlock);
+ __dlm_mle_detach_hb_events(dlm, mle);
+ spin_unlock(&dlm->spinlock);
+}
+
+/* remove from list and free */
+static void __dlm_put_mle(struct dlm_master_list_entry *mle)
+{
+ struct dlm_ctxt *dlm;
+ dlm = mle->dlm;
+
+ assert_spin_locked(&dlm->spinlock);
+ assert_spin_locked(&dlm->master_lock);
+ BUG_ON(!atomic_read(&mle->mle_refs.refcount));
+
+ kref_put(&mle->mle_refs, dlm_mle_release);
+}
+
+
+/* must not have any spinlocks coming in */
+static void dlm_put_mle(struct dlm_master_list_entry *mle)
+{
+ struct dlm_ctxt *dlm;
+ dlm = mle->dlm;
+
+ spin_lock(&dlm->spinlock);
+ spin_lock(&dlm->master_lock);
+ __dlm_put_mle(mle);
+ spin_unlock(&dlm->master_lock);
+ spin_unlock(&dlm->spinlock);
+}
+
+static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
+{
+ kref_get(&mle->mle_refs);
+}
+
+static void dlm_init_mle(struct dlm_master_list_entry *mle,
+ enum dlm_mle_type type,
+ struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ const char *name,
+ unsigned int namelen)
+{
+ assert_spin_locked(&dlm->spinlock);
+
+ mle->dlm = dlm;
+ mle->type = type;
+ INIT_LIST_HEAD(&mle->list);
+ INIT_LIST_HEAD(&mle->hb_events);
+ memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
+ spin_lock_init(&mle->spinlock);
+ init_waitqueue_head(&mle->wq);
+ atomic_set(&mle->woken, 0);
+ kref_init(&mle->mle_refs);
+ memset(mle->response_map, 0, sizeof(mle->response_map));
+ mle->master = O2NM_MAX_NODES;
+ mle->new_master = O2NM_MAX_NODES;
+
+ if (mle->type == DLM_MLE_MASTER) {
+ BUG_ON(!res);
+ mle->u.res = res;
+ } else if (mle->type == DLM_MLE_BLOCK) {
+ BUG_ON(!name);
+ memcpy(mle->u.name.name, name, namelen);
+ mle->u.name.len = namelen;
+ } else /* DLM_MLE_MIGRATION */ {
+ BUG_ON(!name);
+ memcpy(mle->u.name.name, name, namelen);
+ mle->u.name.len = namelen;
+ }
+
+ /* copy off the node_map and register hb callbacks on our copy */
+ memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
+ memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
+ clear_bit(dlm->node_num, mle->vote_map);
+ clear_bit(dlm->node_num, mle->node_map);
+
+ /* attach the mle to the domain node up/down events */
+ __dlm_mle_attach_hb_events(dlm, mle);
+}
+
+
+/* returns 1 if found, 0 if not */
+static int dlm_find_mle(struct dlm_ctxt *dlm,
+ struct dlm_master_list_entry **mle,
+ char *name, unsigned int namelen)
+{
+ struct dlm_master_list_entry *tmpmle;
+ struct list_head *iter;
+
+ assert_spin_locked(&dlm->master_lock);
+
+ list_for_each(iter, &dlm->master_list) {
+ tmpmle = list_entry(iter, struct dlm_master_list_entry, list);
+ if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
+ continue;
+ dlm_get_mle(tmpmle);
+ *mle = tmpmle;
+ return 1;
+ }
+ return 0;
+}
+
+void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
+{
+ struct dlm_master_list_entry *mle;
+ struct list_head *iter;
+
+ assert_spin_locked(&dlm->spinlock);
+
+ list_for_each(iter, &dlm->mle_hb_events) {
+ mle = list_entry(iter, struct dlm_master_list_entry,
+ hb_events);
+ if (node_up)
+ dlm_mle_node_up(dlm, mle, NULL, idx);
+ else
+ dlm_mle_node_down(dlm, mle, NULL, idx);
+ }
+}
+
+static void dlm_mle_node_down(struct dlm_ctxt *dlm,
+ struct dlm_master_list_entry *mle,
+ struct o2nm_node *node, int idx)
+{
+ spin_lock(&mle->spinlock);
+
+ if (!test_bit(idx, mle->node_map))
+ mlog(0, "node %u already removed from nodemap!\n", idx);
+ else
+ clear_bit(idx, mle->node_map);
+
+ spin_unlock(&mle->spinlock);
+}
+
+static void dlm_mle_node_up(struct dlm_ctxt *dlm,
+ struct dlm_master_list_entry *mle,
+ struct o2nm_node *node, int idx)
+{
+ spin_lock(&mle->spinlock);
+
+ if (test_bit(idx, mle->node_map))
+ mlog(0, "node %u already in node map!\n", idx);
+ else
+ set_bit(idx, mle->node_map);
+
+ spin_unlock(&mle->spinlock);
+}
+
+
+int dlm_init_mle_cache(void)
+{
+ dlm_mle_cache = kmem_cache_create("dlm_mle_cache",
+ sizeof(struct dlm_master_list_entry),
+ 0, SLAB_HWCACHE_ALIGN,
+ NULL, NULL);
+ if (dlm_mle_cache == NULL)
+ return -ENOMEM;
+ return 0;
+}
+
+void dlm_destroy_mle_cache(void)
+{
+ if (dlm_mle_cache)
+ kmem_cache_destroy(dlm_mle_cache);
+}
+
+static void dlm_mle_release(struct kref *kref)
+{
+ struct dlm_master_list_entry *mle;
+ struct dlm_ctxt *dlm;
+
+ mlog_entry_void();
+
+ mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
+ dlm = mle->dlm;
+
+ if (mle->type != DLM_MLE_MASTER) {
+ mlog(0, "calling mle_release for %.*s, type %d\n",
+ mle->u.name.len, mle->u.name.name, mle->type);
+ } else {
+ mlog(0, "calling mle_release for %.*s, type %d\n",
+ mle->u.res->lockname.len,
+ mle->u.res->lockname.name, mle->type);
+ }
+ assert_spin_locked(&dlm->spinlock);
+ assert_spin_locked(&dlm->master_lock);
+
+ /* remove from list if not already */
+ if (!list_empty(&mle->list))
+ list_del_init(&mle->list);
+
+ /* detach the mle from the domain node up/down events */
+ __dlm_mle_detach_hb_events(dlm, mle);
+
+ /* NOTE: kfree under spinlock here.
+ * if this is bad, we can move this to a freelist. */
+ kmem_cache_free(dlm_mle_cache, mle);
+}
+
+
+/*
+ * LOCK RESOURCE FUNCTIONS
+ */
+
+static void dlm_set_lockres_owner(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ u8 owner)
+{
+ assert_spin_locked(&res->spinlock);
+
+ mlog_entry("%.*s, %u\n", res->lockname.len, res->lockname.name, owner);
+
+ if (owner == dlm->node_num)
+ atomic_inc(&dlm->local_resources);
+ else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN)
+ atomic_inc(&dlm->unknown_resources);
+ else
+ atomic_inc(&dlm->remote_resources);
+
+ res->owner = owner;
+}
+
+void dlm_change_lockres_owner(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res, u8 owner)
+{
+ assert_spin_locked(&res->spinlock);
+
+ if (owner == res->owner)
+ return;
+
+ if (res->owner == dlm->node_num)
+ atomic_dec(&dlm->local_resources);
+ else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN)
+ atomic_dec(&dlm->unknown_resources);
+ else
+ atomic_dec(&dlm->remote_resources);
+
+ dlm_set_lockres_owner(dlm, res, owner);
+}
+
+
+static void dlm_lockres_release(struct kref *kref)
+{
+ struct dlm_lock_resource *res;
+
+ res = container_of(kref, struct dlm_lock_resource, refs);
+
+ /* This should not happen -- all lockres' have a name
+ * associated with them at init time. */
+ BUG_ON(!res->lockname.name);
+
+ mlog(0, "destroying lockres %.*s\n", res->lockname.len,
+ res->lockname.name);
+
+ /* By the time we're ready to blow this guy away, we shouldn't
+ * be on any lists. */
+ BUG_ON(!list_empty(&res->list));
+ BUG_ON(!list_empty(&res->granted));
+ BUG_ON(!list_empty(&res->converting));
+ BUG_ON(!list_empty(&res->blocked));
+ BUG_ON(!list_empty(&res->dirty));
+ BUG_ON(!list_empty(&res->recovering));
+ BUG_ON(!list_empty(&res->purge));
+
+ kfree(res->lockname.name);
+
+ kfree(res);
+}
+
+void dlm_lockres_get(struct dlm_lock_resource *res)
+{
+ kref_get(&res->refs);
+}
+
+void dlm_lockres_put(struct dlm_lock_resource *res)
+{
+ kref_put(&res->refs, dlm_lockres_release);
+}
+
+static void dlm_init_lockres(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ const char *name, unsigned int namelen)
+{
+ char *qname;
+
+ /* If we memset here, we lose our reference to the kmalloc'd
+ * res->lockname.name, so be sure to init every field
+ * correctly! */
+
+ qname = (char *) res->lockname.name;
+ memcpy(qname, name, namelen);
+
+ res->lockname.len = namelen;
+ res->lockname.hash = full_name_hash(name, namelen);
+
+ init_waitqueue_head(&res->wq);
+ spin_lock_init(&res->spinlock);
+ INIT_LIST_HEAD(&res->list);
+ INIT_LIST_HEAD(&res->granted);
+ INIT_LIST_HEAD(&res->converting);
+ INIT_LIST_HEAD(&res->blocked);
+ INIT_LIST_HEAD(&res->dirty);
+ INIT_LIST_HEAD(&res->recovering);
+ INIT_LIST_HEAD(&res->purge);
+ atomic_set(&res->asts_reserved, 0);
+ res->migration_pending = 0;
+
+ kref_init(&res->refs);
+
+ /* just for consistency */
+ spin_lock(&res->spinlock);
+ dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
+ spin_unlock(&res->spinlock);
+
+ res->state = DLM_LOCK_RES_IN_PROGRESS;
+
+ res->last_used = 0;
+
+ memset(res->lvb, 0, DLM_LVB_LEN);
+}
+
+struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
+ const char *name,
+ unsigned int namelen)
+{
+ struct dlm_lock_resource *res;
+
+ res = kmalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
+ if (!res)
+ return NULL;
+
+ res->lockname.name = kmalloc(namelen, GFP_KERNEL);
+ if (!res->lockname.name) {
+ kfree(res);
+ return NULL;
+ }
+
+ dlm_init_lockres(dlm, res, name, namelen);
+ return res;
+}
+
+/*
+ * lookup a lock resource by name.
+ * may already exist in the hashtable.
+ * lockid is null terminated
+ *
+ * if not, allocate enough for the lockres and for
+ * the temporary structure used in doing the mastering.
+ *
+ * also, do a lookup in the dlm->master_list to see
+ * if another node has begun mastering the same lock.
+ * if so, there should be a block entry in there
+ * for this name, and we should *not* attempt to master
+ * the lock here. need to wait around for that node
+ * to assert_master (or die).
+ *
+ */
+struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
+ const char *lockid,
+ int flags)
+{
+ struct dlm_lock_resource *tmpres=NULL, *res=NULL;
+ struct dlm_master_list_entry *mle = NULL;
+ struct dlm_master_list_entry *alloc_mle = NULL;
+ int blocked = 0;
+ int ret, nodenum;
+ struct dlm_node_iter iter;
+ unsigned int namelen;
+ int tries = 0;
+
+ BUG_ON(!lockid);
+
+ namelen = strlen(lockid);
+
+ mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
+
+lookup:
+ spin_lock(&dlm->spinlock);
+ tmpres = __dlm_lookup_lockres(dlm, lockid, namelen);
+ if (tmpres) {
+ spin_unlock(&dlm->spinlock);
+ mlog(0, "found in hash!\n");
+ if (res)
+ dlm_lockres_put(res);
+ res = tmpres;
+ goto leave;
+ }
+
+ if (!res) {
+ spin_unlock(&dlm->spinlock);
+ mlog(0, "allocating a new resource\n");
+ /* nothing found and we need to allocate one. */
+ alloc_mle = (struct dlm_master_list_entry *)
+ kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
+ if (!alloc_mle)
+ goto leave;
+ res = dlm_new_lockres(dlm, lockid, namelen);
+ if (!res)
+ goto leave;
+ goto lookup;
+ }
+
+ mlog(0, "no lockres found, allocated our own: %p\n", res);
+
+ if (flags & LKM_LOCAL) {
+ /* caller knows it's safe to assume it's not mastered elsewhere
+ * DONE! return right away */
+ spin_lock(&res->spinlock);
+ dlm_change_lockres_owner(dlm, res, dlm->node_num);
+ __dlm_insert_lockres(dlm, res);
+ spin_unlock(&res->spinlock);
+ spin_unlock(&dlm->spinlock);
+ /* lockres still marked IN_PROGRESS */
+ goto wake_waiters;
+ }
+
+ /* check master list to see if another node has started mastering it */
+ spin_lock(&dlm->master_lock);
+
+ /* if we found a block, wait for lock to be mastered by another node */
+ blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
+ if (blocked) {
+ if (mle->type == DLM_MLE_MASTER) {
+ mlog(ML_ERROR, "master entry for nonexistent lock!\n");
+ BUG();
+ } else if (mle->type == DLM_MLE_MIGRATION) {
+ /* migration is in progress! */
+ /* the good news is that we now know the
+ * "current" master (mle->master). */
+
+ spin_unlock(&dlm->master_lock);
+ assert_spin_locked(&dlm->spinlock);
+
+ /* set the lockres owner and hash it */
+ spin_lock(&res->spinlock);
+ dlm_set_lockres_owner(dlm, res, mle->master);
+ __dlm_insert_lockres(dlm, res);
+ spin_unlock(&res->spinlock);
+ spin_unlock(&dlm->spinlock);
+
+ /* master is known, detach */
+ dlm_mle_detach_hb_events(dlm, mle);
+ dlm_put_mle(mle);
+ mle = NULL;
+ goto wake_waiters;
+ }
+ } else {
+ /* go ahead and try to master lock on this node */
+ mle = alloc_mle;
+ /* make sure this does not get freed below */
+ alloc_mle = NULL;
+ dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
+ set_bit(dlm->node_num, mle->maybe_map);
+ list_add(&mle->list, &dlm->master_list);
+ }
+
+ /* at this point there is either a DLM_MLE_BLOCK or a
+ * DLM_MLE_MASTER on the master list, so it's safe to add the
+ * lockres to the hashtable. anyone who finds the lock will
+ * still have to wait on the IN_PROGRESS. */
+
+ /* finally add the lockres to its hash bucket */
+ __dlm_insert_lockres(dlm, res);
+ /* get an extra ref on the mle in case this is a BLOCK
+ * if so, the creator of the BLOCK may try to put the last
+ * ref at this time in the assert master handler, so we
+ * need an extra one to keep from a bad ptr deref. */
+ dlm_get_mle(mle);
+ spin_unlock(&dlm->master_lock);
+ spin_unlock(&dlm->spinlock);
+
+ /* must wait for lock to be mastered elsewhere */
+ if (blocked)
+ goto wait;
+
+redo_request:
+ ret = -EINVAL;
+ dlm_node_iter_init(mle->vote_map, &iter);
+ while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
+ ret = dlm_do_master_request(mle, nodenum);
+ if (ret < 0)
+ mlog_errno(ret);
+ if (mle->master != O2NM_MAX_NODES) {
+ /* found a master ! */
+ break;
+ }
+ }
+
+wait:
+ /* keep going until the response map includes all nodes */
+ ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
+ if (ret < 0) {
+ mlog(0, "%s:%.*s: node map changed, redo the "
+ "master request now, blocked=%d\n",
+ dlm->name, res->lockname.len,
+ res->lockname.name, blocked);
+ if (++tries > 20) {
+ mlog(ML_ERROR, "%s:%.*s: spinning on "
+ "dlm_wait_for_lock_mastery, blocked=%d\n",
+ dlm->name, res->lockname.len,
+ res->lockname.name, blocked);
+ dlm_print_one_lock_resource(res);
+ /* dlm_print_one_mle(mle); */
+ tries = 0;
+ }
+ goto redo_request;
+ }
+
+ mlog(0, "lockres mastered by %u\n", res->owner);
+ /* make sure we never continue without this */
+ BUG_ON(res->owner == O2NM_MAX_NODES);
+
+ /* master is known, detach if not already detached */
+ dlm_mle_detach_hb_events(dlm, mle);
+ dlm_put_mle(mle);
+ /* put the extra ref */
+ dlm_put_mle(mle);
+
+wake_waiters:
+ spin_lock(&res->spinlock);
+ res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
+ spin_unlock(&res->spinlock);
+ wake_up(&res->wq);
+
+leave:
+ /* need to free the unused mle */
+ if (alloc_mle)
+ kmem_cache_free(dlm_mle_cache, alloc_mle);
+
+ return res;
+}
+
+
+#define DLM_MASTERY_TIMEOUT_MS 5000
+
+static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ struct dlm_master_list_entry *mle,
+ int *blocked)
+{
+ u8 m;
+ int ret, bit;
+ int map_changed, voting_done;
+ int assert, sleep;
+
+recheck:
+ ret = 0;
+ assert = 0;
+
+ /* check if another node has already become the owner */
+ spin_lock(&res->spinlock);
+ if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
+ spin_unlock(&res->spinlock);
+ goto leave;
+ }
+ spin_unlock(&res->spinlock);
+
+ spin_lock(&mle->spinlock);
+ m = mle->master;
+ map_changed = (memcmp(mle->vote_map, mle->node_map,
+ sizeof(mle->vote_map)) != 0);
+ voting_done = (memcmp(mle->vote_map, mle->response_map,
+ sizeof(mle->vote_map)) == 0);
+
+ /* restart if we hit any errors */
+ if (map_changed) {
+ int b;
+ mlog(0, "%s: %.*s: node map changed, restarting\n",
+ dlm->name, res->lockname.len, res->lockname.name);
+ ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
+ b = (mle->type == DLM_MLE_BLOCK);
+ if ((*blocked && !b) || (!*blocked && b)) {
+ mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
+ dlm->name, res->lockname.len, res->lockname.name,
+ *blocked, b);
+ *blocked = b;
+ }
+ spin_unlock(&mle->spinlock);
+ if (ret < 0) {
+ mlog_errno(ret);
+ goto leave;
+ }
+ mlog(0, "%s:%.*s: restart lock mastery succeeded, "
+ "rechecking now\n", dlm->name, res->lockname.len,
+ res->lockname.name);
+ goto recheck;
+ }
+
+ if (m != O2NM_MAX_NODES) {
+ /* another node has done an assert!
+ * all done! */
+ sleep = 0;
+ } else {
+ sleep = 1;
+ /* have all nodes responded? */
+ if (voting_done && !*blocked) {
+ bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
+ if (dlm->node_num <= bit) {
+ /* my node number is lowest.
+ * now tell other nodes that I am
+ * mastering this. */
+ mle->master = dlm->node_num;
+ assert = 1;
+ sleep = 0;
+ }
+ /* if voting is done, but we have not received
+ * an assert master yet, we must sleep */
+ }
+ }
+
+ spin_unlock(&mle->spinlock);
+
+ /* sleep if we haven't finished voting yet */
+ if (sleep) {
+ unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
+
+ /*
+ if (atomic_read(&mle->mle_refs.refcount) < 2)
+ mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
+ atomic_read(&mle->mle_refs.refcount),
+ res->lockname.len, res->lockname.name);
+ */
+ atomic_set(&mle->woken, 0);
+ (void)wait_event_timeout(mle->wq,
+ (atomic_read(&mle->woken) == 1),
+ timeo);
+ if (res->owner == O2NM_MAX_NODES) {
+ mlog(0, "waiting again\n");
+ goto recheck;
+ }
+ mlog(0, "done waiting, master is %u\n", res->owner);
+ ret = 0;
+ goto leave;
+ }
+
+ ret = 0; /* done */
+ if (assert) {
+ m = dlm->node_num;
+ mlog(0, "about to master %.*s here, this=%u\n",
+ res->lockname.len, res->lockname.name, m);
+ ret = dlm_do_assert_master(dlm, res->lockname.name,
+ res->lockname.len, mle->vote_map, 0);
+ if (ret) {
+ /* This is a failure in the network path,
+ * not in the response to the assert_master
+ * (any nonzero response is a BUG on this node).
+ * Most likely a socket just got disconnected
+ * due to node death. */
+ mlog_errno(ret);
+ }
+ /* no longer need to restart lock mastery.
+ * all living nodes have been contacted. */
+ ret = 0;
+ }
+
+ /* set the lockres owner */
+ spin_lock(&res->spinlock);
+ dlm_change_lockres_owner(dlm, res, m);
+ spin_unlock(&res->spinlock);
+
+leave:
+ return ret;
+}
+
+struct dlm_bitmap_diff_iter
+{
+ int curnode;
+ unsigned long *orig_bm;
+ unsigned long *cur_bm;
+ unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
+};
+
+enum dlm_node_state_change
+{
+ NODE_DOWN = -1,
+ NODE_NO_CHANGE = 0,
+ NODE_UP
+};
+
+static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
+ unsigned long *orig_bm,
+ unsigned long *cur_bm)
+{
+ unsigned long p1, p2;
+ int i;
+
+ iter->curnode = -1;
+ iter->orig_bm = orig_bm;
+ iter->cur_bm = cur_bm;
+
+ for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
+ p1 = *(iter->orig_bm + i);
+ p2 = *(iter->cur_bm + i);
+ iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
+ }
+}
+
+static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
+ enum dlm_node_state_change *state)
+{
+ int bit;
+
+ if (iter->curnode >= O2NM_MAX_NODES)
+ return -ENOENT;
+
+ bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
+ iter->curnode+1);
+ if (bit >= O2NM_MAX_NODES) {
+ iter->curnode = O2NM_MAX_NODES;
+ return -ENOENT;
+ }
+
+ /* if it was there in the original then this node died */
+ if (test_bit(bit, iter->orig_bm))
+ *state = NODE_DOWN;
+ else
+ *state = NODE_UP;
+
+ iter->curnode = bit;
+ return bit;
+}
+
+
+static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ struct dlm_master_list_entry *mle,
+ int blocked)
+{
+ struct dlm_bitmap_diff_iter bdi;
+ enum dlm_node_state_change sc;
+ int node;
+ int ret = 0;
+
+ mlog(0, "something happened such that the "
+ "master process may need to be restarted!\n");
+
+ assert_spin_locked(&mle->spinlock);
+
+ dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
+ node = dlm_bitmap_diff_iter_next(&bdi, &sc);
+ while (node >= 0) {
+ if (sc == NODE_UP) {
+ /* a node came up. easy. might not even need
+ * to talk to it if its node number is higher
+ * or if we are already blocked. */
+ mlog(0, "node up! %d\n", node);
+ if (blocked)
+ goto next;
+
+ if (node > dlm->node_num) {
+ mlog(0, "node > this node. skipping.\n");
+ goto next;
+ }
+
+ /* redo the master request, but only for the new node */
+ mlog(0, "sending request to new node\n");
+ clear_bit(node, mle->response_map);
+ set_bit(node, mle->vote_map);
+ } else {
+ mlog(ML_ERROR, "node down! %d\n", node);
+
+ /* if the node wasn't involved in mastery skip it,
+ * but clear it out from the maps so that it will
+ * not affect mastery of this lockres */
+ clear_bit(node, mle->response_map);
+ clear_bit(node, mle->vote_map);
+ if (!test_bit(node, mle->maybe_map))
+ goto next;
+
+ /* if we're already blocked on lock mastery, and the
+ * dead node wasn't the expected master, or there is
+ * another node in the maybe_map, keep waiting */
+ if (blocked) {
+ int lowest = find_next_bit(mle->maybe_map,
+ O2NM_MAX_NODES, 0);
+
+ /* act like it was never there */
+ clear_bit(node, mle->maybe_map);
+
+ if (node != lowest)
+ goto next;
+
+ mlog(ML_ERROR, "expected master %u died while "
+ "this node was blocked waiting on it!\n",
+ node);
+ lowest = find_next_bit(mle->maybe_map,
+ O2NM_MAX_NODES,
+ lowest+1);
+ if (lowest < O2NM_MAX_NODES) {
+ mlog(0, "still blocked. waiting "
+ "on %u now\n", lowest);
+ goto next;
+ }
+
+ /* mle is an MLE_BLOCK, but there is now
+ * nothing left to block on. we need to return
+ * all the way back out and try again with
+ * an MLE_MASTER. dlm_do_local_recovery_cleanup
+ * has already run, so the mle refcount is ok */
+ mlog(0, "no longer blocking. we can "
+ "try to master this here\n");
+ mle->type = DLM_MLE_MASTER;
+ memset(mle->maybe_map, 0,
+ sizeof(mle->maybe_map));
+ memset(mle->response_map, 0,
+ sizeof(mle->maybe_map));
+ memcpy(mle->vote_map, mle->node_map,
+ sizeof(mle->node_map));
+ mle->u.res = res;
+ set_bit(dlm->node_num, mle->maybe_map);
+
+ ret = -EAGAIN;
+ goto next;
+ }
+
+ clear_bit(node, mle->maybe_map);
+ if (node > dlm->node_num)
+ goto next;
+
+ mlog(0, "dead node in map!\n");
+ /* yuck. go back and re-contact all nodes
+ * in the vote_map, removing this node. */
+ memset(mle->response_map, 0,
+ sizeof(mle->response_map));
+ }
+ ret = -EAGAIN;
+next:
+ node = dlm_bitmap_diff_iter_next(&bdi, &sc);
+ }
+ return ret;
+}
+
+
+/*
+ * DLM_MASTER_REQUEST_MSG
+ *
+ * returns: 0 on success,
+ * -errno on a network error
+ *
+ * on error, the caller should assume the target node is "dead"
+ *
+ */
+
+static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to)
+{
+ struct dlm_ctxt *dlm = mle->dlm;
+ struct dlm_master_request request;
+ int ret, response=0, resend;
+
+ memset(&request, 0, sizeof(request));
+ request.node_idx = dlm->node_num;
+
+ BUG_ON(mle->type == DLM_MLE_MIGRATION);
+
+ if (mle->type != DLM_MLE_MASTER) {
+ request.namelen = mle->u.name.len;
+ memcpy(request.name, mle->u.name.name, request.namelen);
+ } else {
+ request.namelen = mle->u.res->lockname.len;
+ memcpy(request.name, mle->u.res->lockname.name,
+ request.namelen);
+ }
+
+again:
+ ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
+ sizeof(request), to, &response);
+ if (ret < 0) {
+ if (ret == -ESRCH) {
+ /* should never happen */
+ mlog(ML_ERROR, "TCP stack not ready!\n");
+ BUG();
+ } else if (ret == -EINVAL) {
+ mlog(ML_ERROR, "bad args passed to o2net!\n");
+ BUG();
+ } else if (ret == -ENOMEM) {
+ mlog(ML_ERROR, "out of memory while trying to send "
+ "network message! retrying\n");
+ /* this is totally crude */
+ msleep(50);