summary refs log tree commit diff
diff options
context:
space:
mode:
authorAlex Elder <aelder@sgi.com>2010-05-24 11:57:36 -0500
committerAlex Elder <aelder@sgi.com>2010-05-24 11:57:36 -0500
commit88e88374ee4958786397a57f684de6f1fc5e0242 (patch)
tree750fe86ece5d65e597223eb07c5ce7cf5b3749a0
parent7e125f7b9cbfce4101191b8076d606c517a73066 (diff)
parentccf7c23fc129e75ef60e6f59f60a485b7a056598 (diff)
downloadlinux-88e88374ee4958786397a57f684de6f1fc5e0242.tar.gz
Merge branch 'delayed-logging-for-2.6.35' into for-linus
-rw-r--r--Documentation/filesystems/xfs-delayed-logging-design.txt816
-rw-r--r--fs/xfs/Makefile1
-rw-r--r--fs/xfs/linux-2.6/xfs_buf.c9
-rw-r--r--fs/xfs/linux-2.6/xfs_quotaops.c1
-rw-r--r--fs/xfs/linux-2.6/xfs_super.c12
-rw-r--r--fs/xfs/linux-2.6/xfs_trace.h83
-rw-r--r--fs/xfs/quota/xfs_dquot.c6
-rw-r--r--fs/xfs/xfs_ag.h24
-rw-r--r--fs/xfs/xfs_alloc.c357
-rw-r--r--fs/xfs/xfs_alloc.h7
-rw-r--r--fs/xfs/xfs_alloc_btree.c2
-rw-r--r--fs/xfs/xfs_buf_item.c166
-rw-r--r--fs/xfs/xfs_buf_item.h18
-rw-r--r--fs/xfs/xfs_error.c2
-rw-r--r--fs/xfs/xfs_log.c120
-rw-r--r--fs/xfs/xfs_log.h14
-rw-r--r--fs/xfs/xfs_log_cil.c725
-rw-r--r--fs/xfs/xfs_log_priv.h118
-rw-r--r--fs/xfs/xfs_log_recover.c46
-rw-r--r--fs/xfs/xfs_log_recover.h2
-rw-r--r--fs/xfs/xfs_mount.h1
-rw-r--r--fs/xfs/xfs_trans.c144
-rw-r--r--fs/xfs/xfs_trans.h44
-rw-r--r--fs/xfs/xfs_trans_buf.c46
-rw-r--r--fs/xfs/xfs_trans_item.c114
-rw-r--r--fs/xfs/xfs_trans_priv.h15
-rw-r--r--fs/xfs/xfs_types.h2
27 files changed, 2382 insertions, 513 deletions
diff --git a/Documentation/filesystems/xfs-delayed-logging-design.txt b/Documentation/filesystems/xfs-delayed-logging-design.txt
new file mode 100644
index 000000000000..d8119e9d2d60
--- /dev/null
+++ b/Documentation/filesystems/xfs-delayed-logging-design.txt
@@ -0,0 +1,816 @@
+XFS Delayed Logging Design
+--------------------------
+
+Introduction to Re-logging in XFS
+---------------------------------
+
+XFS logging is a combination of logical and physical logging. Some objects,
+such as inodes and dquots, are logged in logical format where the details
+logged are made up of the changes to in-core structures rather than on-disk
+structures. Other objects - typically buffers - have their physical changes
+logged. The reason for these differences is to reduce the amount of log space
+required for objects that are frequently logged. Some parts of inodes are more
+frequently logged than others, and inodes are typically more frequently logged
+than any other object (except maybe the superblock buffer) so keeping the
+amount of metadata logged low is of prime importance.
+
+The reason that this is such a concern is that XFS allows multiple separate
+modifications to a single object to be carried in the log at any given time.
+This allows the log to avoid needing to flush each change to disk before
+recording a new change to the object. XFS does this via a method called
+"re-logging". Conceptually, this is quite simple - all it requires is that any
+new change to the object is recorded with a *new copy* of all the existing
+changes in the new transaction that is written to the log.
+
+That is, if we have a sequence of changes A through to F, and the object was
+written to disk after change D, we would see in the log the following series
+of transactions, their contents and the log sequence number (LSN) of the
+transaction:
+
+	Transaction		Contents	LSN
+	   A			   A		   X
+	   B			  A+B		  X+n
+	   C			 A+B+C		 X+n+m
+	   D			A+B+C+D		X+n+m+o
+	    <object written to disk>
+	   E			   E		   Y (> X+n+m+o)
+	   F			  E+F		  Yٍ+p
+
+In other words, each time an object is relogged, the new transaction contains
+the aggregation of all the previous changes currently held only in the log.
+
+This relogging technique also allows objects to be moved forward in the log so
+that an object being relogged does not prevent the tail of the log from ever
+moving forward.  This can be seen in the table above by the changing
+(increasing) LSN of each subsquent transaction - the LSN is effectively a
+direct encoding of the location in the log of the transaction.
+
+This relogging is also used to implement long-running, multiple-commit
+transactions.  These transaction are known as rolling transactions, and require
+a special log reservation known as a permanent transaction reservation. A
+typical example of a rolling transaction is the removal of extents from an
+inode which can only be done at a rate of two extents per transaction because
+of reservation size limitations. Hence a rolling extent removal transaction
+keeps relogging the inode and btree buffers as they get modified in each
+removal operation. This keeps them moving forward in the log as the operation
+progresses, ensuring that current operation never gets blocked by itself if the
+log wraps around.
+
+Hence it can be seen that the relogging operation is fundamental to the correct
+working of the XFS journalling subsystem. From the above description, most
+people should be able to see why the XFS metadata operations writes so much to
+the log - repeated operations to the same objects write the same changes to
+the log over and over again. Worse is the fact that objects tend to get
+dirtier as they get relogged, so each subsequent transaction is writing more
+metadata into the log.
+
+Another feature of the XFS transaction subsystem is that most transactions are
+asynchronous. That is, they don't commit to disk until either a log buffer is
+filled (a log buffer can hold multiple transactions) or a synchronous operation
+forces the log buffers holding the transactions to disk. This means that XFS is
+doing aggregation of transactions in memory - batching them, if you like - to
+minimise the impact of the log IO on transaction throughput.
+
+The limitation on asynchronous transaction throughput is the number and size of
+log buffers made available by the log manager. By default there are 8 log
+buffers available and the size of each is 32kB - the size can be increased up
+to 256kB by use of a mount option.
+
+Effectively, this gives us the maximum bound of outstanding metadata changes
+that can be made to the filesystem at any point in time - if all the log
+buffers are full and under IO, then no more transactions can be committed until
+the current batch completes. It is now common for a single current CPU core to
+be to able to issue enough transactions to keep the log buffers full and under
+IO permanently. Hence the XFS journalling subsystem can be considered to be IO
+bound.
+
+Delayed Logging: Concepts
+-------------------------
+
+The key thing to note about the asynchronous logging combined with the
+relogging technique XFS uses is that we can be relogging changed objects
+multiple times before they are committed to disk in the log buffers. If we
+return to the previous relogging example, it is entirely possible that
+transactions A through D are committed to disk in the same log buffer.
+
+That is, a single log buffer may contain multiple copies of the same object,
+but only one of those copies needs to be there - the last one "D", as it
+contains all the changes from the previous changes. In other words, we have one
+necessary copy in the log buffer, and three stale copies that are simply
+wasting space. When we are doing repeated operations on the same set of
+objects, these "stale objects" can be over 90% of the space used in the log
+buffers. It is clear that reducing the number of stale objects written to the
+log would greatly reduce the amount of metadata we write to the log, and this
+is the fundamental goal of delayed logging.
+
+From a conceptual point of view, XFS is already doing relogging in memory (where
+memory == log buffer), only it is doing it extremely inefficiently. It is using
+logical to physical formatting to do the relogging because there is no
+infrastructure to keep track of logical changes in memory prior to physically
+formatting the changes in a transaction to the log buffer. Hence we cannot avoid
+accumulating stale objects in the log buffers.
+
+Delayed logging is the name we've given to keeping and tracking transactional
+changes to objects in memory outside the log buffer infrastructure. Because of
+the relogging concept fundamental to the XFS journalling subsystem, this is
+actually relatively easy to do - all the changes to logged items are already
+tracked in the current infrastructure. The big problem is how to accumulate
+them and get them to the log in a consistent, recoverable manner.
+Describing the problems and how they have been solved is the focus of this
+document.
+
+One of the key changes that delayed logging makes to the operation of the
+journalling subsystem is that it disassociates the amount of outstanding
+metadata changes from the size and number of log buffers available. In other
+words, instead of there only being a maximum of 2MB of transaction changes not
+written to the log at any point in time, there may be a much greater amount
+being accumulated in memory. Hence the potential for loss of metadata on a
+crash is much greater than for the existing logging mechanism.
+
+It should be noted that this does not change the guarantee that log recovery
+will result in a consistent filesystem. What it does mean is that as far as the
+recovered filesystem is concerned, there may be many thousands of transactions
+that simply did not occur as a result of the crash. This makes it even more
+important that applications that care about their data use fsync() where they
+need to ensure application level data integrity is maintained.
+
+It should be noted that delayed logging is not an innovative new concept that
+warrants rigorous proofs to determine whether it is correct or not. The method
+of accumulating changes in memory for some period before writing them to the
+log is used effectively in many filesystems including ext3 and ext4. Hence
+no time is spent in this document trying to convince the reader that the
+concept is sound. Instead it is simply considered a "solved problem" and as
+such implementing it in XFS is purely an exercise in software engineering.
+
+The fundamental requirements for delayed logging in XFS are simple:
+
+	1. Reduce the amount of metadata written to the log by at least
+	   an order of magnitude.
+	2. Supply sufficient statistics to validate Requirement #1.
+	3. Supply sufficient new tracing infrastructure to be able to debug
+	   problems with the new code.
+	4. No on-disk format change (metadata or log format).
+	5. Enable and disable with a mount option.
+	6. No performance regressions for synchronous transaction workloads.
+
+Delayed Logging: Design
+-----------------------
+
+Storing Changes
+
+The problem with accumulating changes at a logical level (i.e. just using the
+existing log item dirty region tracking) is that when it comes to writing the
+changes to the log buffers, we need to ensure that the object we are formatting
+is not changing while we do this. This requires locking the object to prevent
+concurrent modification. Hence flushing the logical changes to the log would
+require us to lock every object, format them, and then unlock them again.
+
+This introduces lots of scope for deadlocks with transactions that are already
+running. For example, a transaction has object A locked and modified, but needs
+the delayed logging tracking lock to commit the transaction. However, the
+flushing thread has the delayed logging tracking lock already held, and is
+trying to get the lock on object A to flush it to the log buffer. This appears
+to be an unsolvable deadlock condition, and it was solving this problem that
+was the barrier to implementing delayed logging for so long.
+
+The solution is relatively simple - it just took a long time to recognise it.
+Put simply, the current logging code formats the changes to each item into an
+vector array that points to the changed regions in the item. The log write code
+simply copies the memory these vectors point to into the log buffer during
+transaction commit while the item is locked in the transaction. Instead of
+using the log buffer as the destination of the formatting code, we can use an
+allocated memory buffer big enough to fit the formatted vector.
+
+If we then copy the vector into the memory buffer and rewrite the vector to
+point to the memory buffer rather than the object itself, we now have a copy of
+the changes in a format that is compatible with the log buffer writing code.
+that does not require us to lock the item to access. This formatting and
+rewriting can all be done while the object is locked during transaction commit,
+resulting in a vector that is transactionally consistent and can be accessed
+without needing to lock the owning item.
+
+Hence we avoid the need to lock items when we need to flush outstanding
+asynchronous transactions to the log. The differences between the existing
+formatting method and the delayed logging formatting can be seen in the
+diagram below.
+
+Current format log vector:
+
+Object    +---------------------------------------------+
+Vector 1      +----+
+Vector 2                    +----+
+Vector 3                                   +----------+
+
+After formatting:
+
+Log Buffer    +-V1-+-V2-+----V3----+
+
+Delayed logging vector:
+
+Object    +---------------------------------------------+
+Vector 1      +----+
+Vector 2                    +----+
+Vector 3                                   +----------+
+
+After formatting:
+
+Memory Buffer +-V1-+-V2-+----V3----+
+Vector 1      +----+
+Vector 2           +----+
+Vector 3                +----------+
+
+The memory buffer and associated vector need to be passed as a single object,
+but still need to be associated with the parent object so if the object is
+relogged we can replace the current memory buffer with a new memory buffer that
+contains the latest changes.
+
+The reason for keeping the vector around after we've formatted the memory
+buffer is to support splitting vectors across log buffer boundaries correctly.
+If we don't keep the vector around, we do not know where the region boundaries
+are in the item, so we'd need a new encapsulation method for regions in the log
+buffer writing (i.e. double encapsulation). This would be an on-disk format
+change and as such is not desirable.  It also means we'd have to write the log
+region headers in the formatting stage, which is problematic as there is per
+region state that needs to be placed into the headers during the log write.
+
+Hence we need to keep the vector, but by attaching the memory buffer to it and
+rewriting the vector addresses to point at the memory buffer we end up with a
+self-describing object that can be passed to the log buffer write code to be
+handled in exactly the same manner as the existing log vectors are handled.
+Hence we avoid needing a new on-disk format to handle items that have been
+relogged in memory.
+
+
+Tracking Changes
+
+Now that we can record transactional changes in memory in a form that allows
+them to be used without limitations, we need to be able to track and accumulate
+them so that they can be written to the log at some later point in time.  The
+log item is the natural place to store this vector and buffer, and also makes sense
+to be the object that is used to track committed objects as it will always
+exist once the object has been included in a transaction.
+
+The log item is already used to track the log items that have been written to
+the log but not yet written to disk. Such log items are considered "active"
+and as such are stored in the Active Item List (AIL) which is a LSN-ordered
+double linked list. Items are inserted into this list during log buffer IO
+completion, after which they are unpinned and can be written to disk. An object
+that is in the AIL can be relogged, which causes the object to be pinned again
+and then moved forward in the AIL when the log buffer IO completes for that
+transaction.
+
+Essentially, this shows that an item that is in the AIL can still be modified
+and relogged, so any tracking must be separate to the AIL infrastructure. As
+such, we cannot reuse the AIL list pointers for tracking committed items, nor
+can we store state in any field that is protected by the AIL lock. Hence the
+committed item tracking needs it's own locks, lists and state fields in the log
+item.
+
+Similar to the AIL, tracking of committed items is done through a new list
+called the Committed Item List (CIL).  The list tracks log items that have been
+committed and have formatted memory buffers attached to them. It tracks objects
+in transaction commit order, so when an object is relogged it is removed from
+it's place in the list and re-inserted at the tail. This is entirely arbitrary
+and done to make it easy for debugging - the last items in the list are the
+ones that are most recently modified. Ordering of the CIL is not necessary for
+transactional integrity (as discussed in the next section) so the ordering is
+done for convenience/sanity of the developers.
+
+
+Delayed Logging: Checkpoints
+
+When we have a log synchronisation event, commonly known as a "log force",
+all the items in the CIL must be written into the log via the log buffers.
+We need to write these items in the order that they exist in the CIL, and they
+need to be written as an atomic transaction. The need for all the objects to be
+written as an atomic transaction comes from the requirements of relogging and
+log replay - all the changes in all the objects in a given transaction must
+either be completely replayed during log recovery, or not replayed at all. If
+a transaction is not replayed because it is not complete in the log, then
+no later transactions should be replayed, either.
+
+To fulfill this requirement, we need to write the entire CIL in a single log
+transaction. Fortunately, the XFS log code has no fixed limit on the size of a
+transaction, nor does the log replay code. The only fundamental limit is that
+the transaction cannot be larger than just under half the size of the log.  The
+reason for this limit is that to find the head and tail of the log, there must
+be at least one complete transaction in the log at any given time. If a
+transaction is larger than half the log, then there is the possibility that a
+crash during the write of a such a transaction could partially overwrite the
+only complete previous transaction in the log. This will result in a recovery
+failure and an inconsistent filesystem and hence we must enforce the maximum
+size of a checkpoint to be slightly less than a half the log.
+
+Apart from this size requirement, a checkpoint transaction looks no different
+to any other transaction - it contains a transaction header, a series of
+formatted log items and a commit record at the tail. From a recovery
+perspective, the checkpoint transaction is also no different - just a lot
+bigger with a lot more items in it. The worst case effect of this is that we
+might need to tune the recovery transaction object hash size.
+
+Because the checkpoint is just another transaction and all the changes to log
+items are stored as log vectors, we can use the existing log buffer writing
+code to write the changes into the log. To do this efficiently, we need to
+minimise the time we hold the CIL locked while writing the checkpoint
+transaction. The current log write code enables us to do this easily with the
+way it separates the writing of the transaction contents (the log vectors) from
+the transaction commit record, but tracking this requires us to have a
+per-checkpoint context that travels through the log write process through to
+checkpoint completion.
+
+Hence a checkpoint has a context that tracks the state of the current
+checkpoint from initiation to checkpoint completion. A new context is initiated
+at the same time a checkpoint transaction is started. That is, when we remove
+all the current items from the CIL during a checkpoint operation, we move all
+those changes into the current checkpoint context. We then initialise a new
+context and attach that to the CIL for aggregation of new transactions.
+
+This allows us to unlock the CIL immediately after transfer of all the
+committed items and effectively allow new transactions to be issued while we
+are formatting the checkpoint into the log. It also allows concurrent
+checkpoints to be written into the log buffers in the case of log force heavy
+workloads, just like the existing transaction commit code does. This, however,
+requires that we strictly order the commit records in the log so that
+checkpoint sequence order is maintained during log replay.
+
+To ensure that we can be writing an item into a checkpoint transaction at
+the same time another transaction modifies the item and inserts the log item
+into the new CIL, then checkpoint transaction commit code cannot use log items
+to store the list of log vectors that need to be written into the transaction.
+Hence log vectors need to be able to be chained together to allow them to be
+detatched from the log items. That is, when the CIL is flushed the memory
+buffer and log vector attached to each log item needs to be attached to the
+checkpoint context so that the log item can be released. In diagrammatic form,
+the CIL would look like this before the flush:
+
+	CIL Head
+	   |
+	   V
+	Log Item <-> log vector 1	-> memory buffer
+	   |				-> vector array
+	   V
+	Log Item <-> log vector 2	-> memory buffer
+	   |				-> vector array
+	   V
+	......
+	   |
+	   V
+	Log Item <-> log vector N-1	-> memory buffer
+	   |				-> vector array
+	   V
+	Log Item <-> log vector N	-> memory buffer
+					-> vector array
+
+And after the flush the CIL head is empty, and the checkpoint context log
+vector list would look like:
+
+	Checkpoint Context
+	   |
+	   V
+	log vector 1	-> memory buffer
+	   |		-> vector array
+	   |		-> Log Item
+	   V
+	log vector 2	-> memory buffer
+	   |		-> vector array
+	   |		-> Log Item
+	   V
+	......
+	   |
+	   V
+	log vector N-1	-> memory buffer
+	   |		-> vector array
+	   |		-> Log Item
+	   V
+	log vector N	-> memory buffer
+			-> vector array
+			-> Log Item
+
+Once this transfer is done, the CIL can be unlocked and new transactions can
+start, while the checkpoint flush code works over the log vector chain to
+commit the checkpoint.
+
+Once the checkpoint is written into the log buffers, the checkpoint context is
+attached to the log buffer that the commit record was written to along with a
+completion callback. Log IO completion will call that callback, which can then
+run transaction committed processing for the log items (i.e. insert into AIL
+and unpin) in the log vector chain and then free the log vector chain and
+checkpoint context.
+
+Discussion Point: I am uncertain as to whether the log item is the most
+efficient way to track vectors, even though it seems like the natural way to do
+it. The fact that we walk the log items (in the CIL) just to chain the log
+vectors and break the link between the log item and the log vector means that
+we take a cache line hit for the log item list modification, then another for
+the log vector chaining. If we track by the log vectors, then we only need to
+break the link between the log item and the log vector, which means we should
+dirty only the log item cachelines. Normally I wouldn't be concerned about one
+vs two dirty cachelines except for the fact I've seen upwards of 80,000 log
+vectors in one checkpoint transaction. I'd guess this is a "measure and
+compare" situation that can be done after a working and reviewed implementation
+is in the dev tree....
+
+Delayed Logging: Checkpoint Sequencing
+
+One of the key aspects of the XFS transaction subsystem is that it tags
+committed transactions with the log sequence number of the transaction commit.
+This allows transactions to be issued asynchronously even though there may be
+future operations that cannot be completed until that transaction is fully
+committed to the log. In the rare case that a dependent operation occurs (e.g.
+re-using a freed metadata extent for a data extent), a special, optimised log
+force can be issued to force the dependent transaction to disk immediately.
+
+To do this, transactions need to record the LSN of the commit record of the
+transaction. This LSN comes directly from the log buffer the transaction is
+written into. While this works just fine for the existing transaction
+mechanism, it does not work for delayed logging because transactions are not
+written directly into the log buffers. Hence some other method of sequencing
+transactions is required.
+
+As discussed in the checkpoint section, delayed logging uses per-checkpoint
+contexts, and as such it is simple to assign a sequence number to each
+checkpoint. Because the switching of checkpoint contexts must be done
+atomically, it is simple to ensure that each new context has a monotonically
+increasing sequence number assigned to it without the need for an external
+atomic counter - we can just take the current context sequence number and add
+one to it for the new context.
+
+Then, instead of assigning a log buffer LSN to the transaction commit LSN
+during the commit, we can assign the current checkpoint sequence. This allows
+operations that track transactions that have not yet completed know what
+checkpoint sequence needs to be committed before they can continue. As a
+result, the code that forces the log to a specific LSN now needs to ensure that
+the log forces to a specific checkpoint.
+
+To ensure that we can do this, we need to track all the checkpoint contexts
+that are currently committing to the log. When we flush a checkpoint, the
+context gets added to a "committing" list which can be searched. When a
+checkpoint commit completes, it is removed from the committing list. Because
+the checkpoint context records the LSN of the commit record for the checkpoint,
+we can also wait on the log buffer that contains the commit record, thereby
+using the existing log force mechanisms to execute synchronous forces.
+
+It should be noted that the synchronous forces may need to be extended with
+mitigation algorithms similar to the current log buffer code to allow
+aggregation of multiple synchronous transactions if there are already
+synchronous transactions being flushed. Investigation of the performance of the
+current design is needed before making any decisions here.
+
+The main concern with log forces is to ensure that all the previous checkpoints
+are also committed to disk before the one we need to wait for. Therefore we
+need to check that all the prior contexts in the committing list are also
+complete before waiting on the one we need to complete. We do this
+synchronisation in the log force code so that we don't need to wait anywhere
+else for such serialisation - it only matters when we do a log force.
+
+The only remaining complexity is that a log force now also has to handle the
+case where the forcing sequence number is the same as the current context. That
+is, we need to flush the CIL and potentially wait for it to complete. This is a
+simple addition to the existing log forcing code to check the sequence numbers
+and push if required. Indeed, placing the current sequence checkpoint flush in
+the log force code enables the current mechanism for issuing synchronous
+transactions to remain untouched (i.e. commit an asynchronous transaction, then
+force the log at the LSN of that transaction) and so the higher level code
+behaves the same regardless of whether delayed logging is being used or not.
+
+Delayed Logging: Checkpoint Log Space Accounting
+
+The big issue for a checkpoint transaction is the log space reservation for the
+transaction. We don't know how big a checkpoint transaction is going to be
+ahead of time, nor how many log buffers it will take to write out, nor the
+number of split log vector regions are going to be used. We can track the
+amount of log space required as we add items to the commit item list, but we
+still need to reserve the space in the log for the checkpoint.
+
+A typical transaction reserves enough space in the log for the worst case space
+usage of the transaction. The reservation accounts for log record headers,
+transaction and region headers, headers for split regions, buffer tail padding,
+etc. as well as the actual space for all the changed metadata in the
+transaction. While some of this is fixed overhead, much of it is dependent on
+the size of the transaction and the number of regions being logged (the number
+of log vectors in the transaction).
+
+An example of the differences would be logging directory changes versus logging
+inode changes. If you modify lots of inode cores (e.g. chmod -R g+w *), then
+there are lots of transactions that only contain an inode core and an inode log
+format structure. That is, two vectors totaling roughly 150 bytes. If we modify
+10,000 inodes, we have about 1.5MB of metadata to write in 20,000 vectors. Each
+vector is 12 bytes, so the total to be logged is approximately 1.75MB. In
+comparison, if we are logging full directory buffers, they are typically 4KB
+each, so we in 1.5MB of directory buffers we'd have roughly 400 buffers and a
+buffer format structure for each buffer - roughly 800 vectors or 1.51MB total
+space.  From this, it should be obvious that a static log space reservation is
+not particularly flexible and is difficult to select the "optimal value" for
+all workloads.
+
+Further, if we are going to use a static reservation, which bit of the entire
+reservation does it cover? We account for space used by the transaction
+reservation by tracking the space currently used by the object in the CIL and
+then calculating the increase or decrease in space used as the object is
+relogged. This allows for a checkpoint reservation to only have to account for
+log buffer metadata used such as log header records.
+
+However, even using a static reservation for just the log metadata is
+problematic. Typically log record headers use at least 16KB of log space per
+1MB of log space consumed (512 bytes per 32k) and the reservation needs to be
+large enough to handle arbitrary sized checkpoint transactions. This
+reservation needs to be made before the checkpoint is started, and we need to
+be able to reserve the space without sleeping.  For a 8MB checkpoint, we need a
+reservation of around 150KB, which is a non-trivial amount of space.
+
+A static reservation needs to manipulate the log grant counters - we can take a
+permanent reservation on the space, but we still need to make sure we refresh
+the write reservation (the actual space available to the transaction) after
+every checkpoint transaction completion. Unfortunately, if this space is not
+available when required, then the regrant code will sleep waiting for it.
+
+The problem with this is that it can lead to deadlocks as we may need to commit
+checkpoints to be able to free up log space (refer back to the description of
+rolling transactions for an example of this).  Hence we *must* always have
+space available in the log if we are to use static reservations, and that is
+very difficult and complex to arrange. It is possible to do, but there is a
+simpler way.
+
+The simpler way of doing this is tracking the entire log space used by the
+items in the CIL and using this to dynamically calculate the amount of log
+space required by the log metadata. If this log metadata space changes as a
+result of a transaction commit inserting a new memory buffer into the CIL, then
+the difference in space required is removed from the transaction that causes
+the change. Transactions at this level will *always* have enough space
+available in their reservation for this as they have already reserved the
+maximal amount of log metadata space they require, and such a delta reservation
+will always be less than or equal to the maximal amount in the reservation.
+
+Hence we can grow the checkpoint transaction reservation dynamically as items
+are added to the CIL and avoid the need for reserving and regranting log space
+up front. This avoids deadlocks and removes a blocking point from the
+checkpoint flush code.
+
+As mentioned early, transactions can't grow to more than half the size of the
+log. Hence as part of the reservation growing, we need to also check the size
+of the reservation against the maximum allowed transaction size. If we reach
+the maximum threshold, we need to push the CIL to the log. This is effectively
+a "background flush" and is done on demand. This is identical to
+a CIL push triggered by a log force, only that there is no waiting for the
+checkpoint commit to complete. This background push is checked and executed by
+transaction commit code.
+
+If the transaction subsystem goes idle while we still have items in the CIL,
+they will be flushed by the periodic log force issued by the xfssyncd. This log
+force will push the CIL to disk, and if the transaction subsystem stays idle,
+allow the idle log to be covered (effectively marked clean) in exactly the same
+manner that is done for the existing logging method. A discussion point is
+whether this log force needs to be done more frequently than the current rate
+which is once every 30s.
+
+
+Delayed Logging: Log Item Pinning
+
+Currently log items are pinned during transaction commit while the items are
+still locked. This happens just after the items are formatted, though it could
+be done any time before the items are unlocked. The result of this mechanism is
+that items get pinned once for every transaction that is committed to the log
+buffers. Hence items that are relogged in the log buffers will have a pin count
+for every outstanding transaction they were dirtied in. When each of these
+transactions is completed, they will unpin the item once. As a result, the item
+only becomes unpinned when all the transactions complete and there are no
+pending transactions. Thus the pinning and unpinning of a log item is symmetric
+as there is a 1:1 relationship with transaction commit and log item completion.
+
+For delayed logging, however, we have an assymetric transaction commit to
+completion relationship. Every time an object is relogged in the CIL it goes
+through the commit process without a corresponding completion being registered.
+That is, we now have a many-to-one relationship between transaction commit and
+log item completion. The result of this is that pinning and unpinning of the
+log items becomes unbalanced if we retain the "pin on transaction commit, unpin
+on transaction completion" model.
+
+To keep pin/unpin symmetry, the algorithm needs to change to a "pin on
+insertion into the CIL, unpin on checkpoint completion". In other words, the
+pinning and unpinning becomes symmetric around a checkpoint context. We have to
+pin the object the first time it is inserted into the CIL - if it is already in
+the CIL during a transaction commit, then we do not pin it again. Because there
+can be multiple outstanding checkpoint contexts, we can still see elevated pin
+counts, but as each checkpoint completes the pin count will retain the correct
+value according to it's context.
+
+Just to make matters more slightly more complex, this checkpoint level context
+for the pin count means that the pinning of an item must take place under the
+CIL commit/flush lock. If we pin the object outside this lock, we cannot
+guarantee which context the pin count is associated with. This is because of
+the fact pinning the item is dependent on whether the item is present in the
+current CIL or not. If we don't pin the CIL first before we check and pin the
+object, we have a race with CIL being flushed between the check and the pin
+(or not pinning, as the case may be). Hence we must hold the CIL flush/commit
+lock to guarantee that we pin the items correctly.
+
+Delayed Logging: Concurrent Scalability
+
+A fundamental requirement for the CIL is that accesses through transaction
+commits must scale to many concurrent commits. The current transaction commit
+code does not break down even when there are transactions coming from 2048
+processors at once. The current transaction code does not go any faster than if
+there was only one CPU using it, but it does not slow down either.
+
+As a result, the delayed logging transaction commit code needs to be designed
+for concurrency from the ground up. It is obvious that there are serialisation
+points in the design - the three important ones are:
+
+	1. Locking out new transaction commits while flushing the CIL
+	2. Adding items to the CIL and updating item space accounting
+	3. Checkpoint commit ordering
+
+Looking at the transaction commit and CIL flushing interactions, it is clear
+that we have a many-to-one interaction here. That is, the only restriction on
+the number of concurrent transactions that can be trying to commit at once is
+the amount of space available in the log for their reservations. The practical
+limit here is in the order of several hundred concurrent transactions for a
+128MB log, which means that it is generally one per CPU in a machine.
+
+The amount of time a transaction commit needs to hold out a flush is a
+relatively long period of time - the pinning of log items needs to be done
+while we are holding out a CIL flush, so at the moment that means it is held
+across the formatting of the objects into memory buffers (i.e. while memcpy()s
+are in progress). Ultimately a two pass algorithm where the formatting is done
+separately to the pinning of objects could be used to reduce the hold time of
+the transaction commit side.
+
+Because of the number of potential transaction commit side holders, the lock
+really needs to be a sleeping lock - if the CIL flush takes the lock, we do not
+want every other CPU in the machine spinning on the CIL lock. Given that
+flushing the CIL could involve walking a list of tens of thousands of log
+items, it will get held for a significant time and so spin contention is a
+significant concern. Preventing lots of CPUs spinning doing nothing is the
+main reason for choosing a sleeping lock even though nothing in either the
+transaction commit or CIL flush side sleeps with the lock held.
+
+It should also be noted that CIL flushing is also a relatively rare operation
+compared to transaction commit for asynchronous transaction workloads - only
+time will tell if using a read-write semaphore for exclusion will limit
+transaction commit concurrency due to cache line bouncing of the lock on the
+read side.
+
+The second serialisation point is on the transaction commit side where items
+are inserted into the CIL. Because transactions can enter this code
+concurrently, the CIL needs to be protected separately from the above
+commit/flush exclusion. It also needs to be an exclusive lock but it is only
+held for a very short time and so a spin lock is appropriate here. It is
+possible that this lock will become a contention point, but given the short
+hold time once per transaction I think that contention is unlikely.
+
+The final serialisation point is the checkpoint commit record ordering code
+that is run as part of the checkpoint commit and log force sequencing. The code
+path that triggers a CIL flush (i.e. whatever triggers the log force) will enter
+an ordering loop after writing all the log vectors into the log buffers but
+before writing the commit record. This loop walks the list of committing
+checkpoints and needs to block waiting for checkpoints to complete their commit
+record write. As a result it needs a lock and a wait variable. Log force
+sequencing also requires the same lock, list walk, and blocking mechanism to
+ensure completion of checkpoints.
+
+These two sequencing operations can use the mechanism even though the
+events they are waiting for are different. The checkpoint commit record
+sequencing needs to wait until checkpoint contexts contain a commit LSN
+(obtained through completion of a commit record write) while log force
+sequencing needs to wait until previous checkpoint contexts are removed from
+the committing list (i.e. they've completed). A simple wait variable and
+broadcast wakeups (thundering herds) has been used to implement these two
+serialisation queues. They use the same lock as the CIL, too. If we see too
+much contention on the CIL lock, or too many context switches as a result of
+the broadcast wakeups these operations can be put under a new spinlock and
+given separate wait lists to reduce lock contention and the number of processes
+woken by the wrong event.
+
+
+Lifecycle Changes
+
+The existing log item life cycle is as follows:
+
+	1. Transaction allocate
+	2. Transaction reserve
+	3. Lock item
+	4. Join item to transaction
+		If not already attached,
+			Allocate log item
+			Attach log item to owner item
+		Attach log item to transaction
+	5. Modify item
+		Record modifications in log item
+	6. Transaction commit
+		Pin item in memory
+		Format item into log buffer
+		Write commit LSN into transaction
+		Unlock item
+		Attach transaction to log buffer
+
+	<log buffer IO dispatched>
+	<log buffer IO completes>
+
+	7. Transaction completion
+		Mark log item committed
+		Insert log item into AIL
+			Write commit LSN into log item
+		Unpin log item
+	8. AIL traversal
+		Lock item
+		Mark log item clean
+		Flush item to disk
+
+	<item IO completion>
+
+	9. Log item removed from AIL
+		Moves log tail
+		Item unlocked
+
+Essentially, steps 1-6 operate independently from step 7, which is also
+independent of steps 8-9. An item can be locked in steps 1-6 or steps 8-9
+at the same time step 7 is occurring, but only steps 1-6 or 8-9 can occur
+at the same time. If the log item is in the AIL or between steps 6 and 7
+and steps 1-6 are re-entered, then the item is relogged. Only when steps 8-9
+are entered and completed is the object considered clean.
+
+With delayed logging, there are new steps inserted into the life cycle:
+
+	1. Transaction allocate
+	2. Transaction reserve
+	3. Lock item
+	4. Join item to transaction
+		If not already attached,
+			Allocate log item
+			Attach log item to owner item
+		Attach log item to transaction
+	5. Modify item
+		Record modifications in log item
+	6. Transaction commit
+		Pin item in memory if not pinned in CIL
+		Format item into log vector + buffer
+		Attach log vector and buffer to log item
+		Insert log item into CIL
+		Write CIL context sequence into transaction
+		Unlock item
+
+	<next log force>
+
+	7. CIL push
+		lock CIL flush
+		Chain log vectors and buffers together
+		Remove items from CIL
+		unlock CIL flush
+		write log vectors into log
+		sequence commit records
+		attach checkpoint context to log buffer
+
+	<log buffer IO dispatched>
+	<log buffer IO completes>
+
+	8. Checkpoint completion
+		Mark log item committed
+		Insert item into AIL
+			Write commit LSN into log item
+		Unpin log item
+	9. AIL traversal
+		Lock item
+		Mark log item clean
+		Flush item to disk
+	<item IO completion>
+	10. Log item removed from AIL
+		Moves log tail
+		Item unlocked
+
+From this, it can be seen that the only life cycle differences between the two
+logging methods are in the middle of the life cycle - they still have the same
+beginning and end and execution constraints. The only differences are in the
+commiting of the log items to the log itself and the completion processing.
+Hence delayed logging should not introduce any constraints on log item
+behaviour, allocation or freeing that don't already exist.
+
+As a result of this zero-impact "insertion" of delayed logging infrastructure
+and the design of the internal structures to avoid on disk format changes, we
+can basically switch between delayed logging and the existing mechanism with a
+mount option. Fundamentally, there is no reason why the log manager would not
+be able to swap methods automatically and transparently depending on load
+characteristics, but this should not be necessary if delayed logging works as
+designed.
+
+Roadmap:
+
+2.6.35 Inclusion in mainline as an experimental mount option
+	=> approximately 2-3 months to merge window
+	=> needs to be in xfs-dev tree in 4-6 weeks
+	=> code is nearing readiness for review
+
+2.6.37 Remove experimental tag from mount option
+	=> should be roughly 6 months after initial merge
+	=> enough time to:
+		=> gain confidence and fix problems reported by early
+		   adopters (a.k.a. guinea pigs)
+		=> address worst performance regressions and undesired
+		   behaviours
+		=> start tuning/optimising code for parallelism
+		=> start tuning/optimising algorithms consuming
+		   excessive CPU time
+
+2.6.39 Switch default mount option to use delayed logging
+	=> should be roughly 12 months after initial merge
+	=> enough time to shake out remaining problems before next round of
+	   enterprise distro kernel rebases
diff --git a/fs/xfs/Makefile b/fs/xfs/Makefile
index b4769e40e8bc..c8fb13f83b3f 100644
--- a/fs/xfs/Makefile
+++ b/fs/xfs/Makefile
@@ -77,6 +77,7 @@ xfs-y				+= xfs_alloc.o \
 				   xfs_itable.o \
 				   xfs_dfrag.o \
 				   xfs_log.o \
+				   xfs_log_cil.o \
 				   xfs_log_recover.o \
 				   xfs_mount.o \
 				   xfs_mru_cache.o \
diff --git a/fs/xfs/linux-2.6/xfs_buf.c b/fs/xfs/linux-2.6/xfs_buf.c
index f01de3c55c43..649ade8ef598 100644
--- a/fs/xfs/linux-2.6/xfs_buf.c
+++ b/fs/xfs/linux-2.6/xfs_buf.c
@@ -37,6 +37,7 @@
 
 #include "xfs_sb.h"
 #include "xfs_inum.h"
+#include "xfs_log.h"
 #include "xfs_ag.h"
 #include "xfs_dmapi.h"
 #include "xfs_mount.h"
@@ -850,6 +851,12 @@ xfs_buf_lock_value(
  *	Note that this in no way locks the underlying pages, so it is only
  *	useful for synchronizing concurrent use of buffer objects, not for
  *	synchronizing independent access to the underlying pages.
+ *
+ *	If we come across a stale, pinned, locked buffer, we know that we
+ *	are being asked to lock a buffer that has been reallocated. Because
+ *	it is pinned, we know that the log has not been pushed to disk and
+ *	hence it will still be locked. Rather than sleeping until someone
+ *	else pushes the log, push it ourselves before trying to get the lock.
  */
 void
 xfs_buf_lock(
@@ -857,6 +864,8 @@ xfs_buf_lock(
 {
 	trace_xfs_buf_lock(bp, _RET_IP_);
 
+	if (atomic_read(&bp->b_pin_count) && (bp->b_flags & XBF_STALE))
+		xfs_log_force(bp->b_mount, 0);
 	if (atomic_read(&bp->b_io_remaining))
 		blk_run_address_space(bp->b_target->bt_mapping);
 	down(&bp->b_sema);
diff --git a/fs/xfs/linux-2.6/xfs_quotaops.c b/fs/xfs/linux-2.6/xfs_quotaops.c
index e31bf21fe5d3..9ac8aea91529 100644
--- a/fs/xfs/linux-2.6/xfs_quotaops.c
+++ b/fs/xfs/linux-2.6/xfs_quotaops.c
@@ -19,6 +19,7 @@
 #include "xfs_dmapi.h"
 #include "xfs_sb.h"
 #include "xfs_inum.h"
+#include "xfs_log.h"
 #include "xfs_ag.h"
 #include "xfs_mount.h"
 #include "xfs_quota.h"
diff --git a/fs/xfs/linux-2.6/xfs_super.c b/fs/xfs/linux-2.6/xfs_super.c
index f24dbe5efde3..f2d1718c9165 100644
--- a/fs/xfs/linux-2.6/xfs_super.c
+++ b/fs/xfs/linux-2.6/xfs_super.c
@@ -119,6 +119,8 @@ mempool_t *xfs_ioend_pool;
 #define MNTOPT_DMAPI	"dmapi"		/* DMI enabled (DMAPI / XDSM) */
 #define MNTOPT_XDSM	"xdsm"		/* DMI enabled (DMAPI / XDSM) */
 #define MNTOPT_DMI	"dmi"		/* DMI enabled (DMAPI / XDSM) */
+#define MNTOPT_DELAYLOG   "delaylog"	/* Delayed loging enabled */
+#define MNTOPT_NODELAYLOG "nodelaylog"	/* Delayed loging disabled */
 
 /*
  * Table driven mount option parser.
@@ -374,6 +376,13 @@ xfs_parseargs(
 			mp->m_flags |= XFS_MOUNT_DMAPI;
 		} else if (!strcmp(this_char, MNTOPT_DMI)) {
 			mp->m_flags |= XFS_MOUNT_DMAPI;
+		} else if (!strcmp(this_char, MNTOPT_DELAYLOG)) {
+			mp->m_flags |= XFS_MOUNT_DELAYLOG;
+			cmn_err(CE_WARN,
+				"Enabling EXPERIMENTAL delayed logging feature "
+				"- use at your own risk.\n");
+		} else if (!strcmp(this_char, MNTOPT_NODELAYLOG)) {
+			mp->m_flags &= ~XFS_MOUNT_DELAYLOG;
 		} else if (!strcmp(this_char, "ihashsize")) {
 			cmn_err(CE_WARN,
 	"XFS: ihashsize no longer used, option is deprecated.");
@@ -535,6 +544,7 @@ xfs_showargs(
 		{ XFS_MOUNT_FILESTREAMS,	"," MNTOPT_FILESTREAM },
 		{ XFS_MOUNT_DMAPI,		"," MNTOPT_DMAPI },
 		{ XFS_MOUNT_GRPID,		"," MNTOPT_GRPID },
+		{ XFS_MOUNT_DELAYLOG,		"," MNTOPT_DELAYLOG },
 		{ 0, NULL }
 	};
 	static struct proc_xfs_info xfs_info_unset[] = {
@@ -1755,7 +1765,7 @@ xfs_init_zones(void)
 	 * but it is much faster.
 	 */
 	xfs_buf_item_zone = kmem_zone_init((sizeof(xfs_buf_log_item_t) +
-				(((XFS_MAX_BLOCKSIZE / XFS_BLI_CHUNK) /
+				(((XFS_MAX_BLOCKSIZE / XFS_BLF_CHUNK) /
 				  NBWORD) * sizeof(int))), "xfs_buf_item");
 	if (!xfs_buf_item_zone)
 		goto out_destroy_trans_zone;
diff --git a/fs/xfs/linux-2.6/xfs_trace.h b/fs/xfs/linux-2.6/xfs_trace.h
index 8a319cfd2901..ff6bc797baf2 100644
--- a/fs/xfs/linux-2.6/xfs_trace.h
+++ b/fs/xfs/linux-2.6/xfs_trace.h
@@ -1059,83 +1059,112 @@ TRACE_EVENT(xfs_bunmap,
 
 );
 
+#define XFS_BUSY_SYNC \
+	{ 0,	"async" }, \
+	{ 1,	"sync" }
+
 TRACE_EVENT(xfs_alloc_busy,
-	TP_PROTO(struct xfs_mount *mp, xfs_agnumber_t agno, xfs_agblock_t agbno,
-		 xfs_extlen_t len, int slot),
-	TP_ARGS(mp, agno, agbno, len, slot),
+	TP_PROTO(struct xfs_trans *trans, xfs_agnumber_t agno,
+		 xfs_agblock_t agbno, xfs_extlen_t len, int sync),
+	TP_ARGS(trans, agno, agbno, len, sync),
 	TP_STRUCT__entry(
 		__field(dev_t, dev)
+		__field(struct xfs_trans *, tp)
+		__field(int, tid)
 		__field(xfs_agnumber_t, agno)
 		__field(xfs_agblock_t, agbno)
 		__field(xfs_extlen_t, len)
-		__field(int, slot)
+		__field(int, sync)
 	),
 	TP_fast_assign(
-		__entry->dev = mp->m_super->s_dev;
+		__entry->dev = trans->t_mountp->m_super->s_dev;
+		__entry->tp = trans;
+		__entry->tid = trans->t_ticket->t_tid;
 		__entry->agno = agno;
 		__entry->agbno = agbno;
 		__entry->len = len;
-		__entry->slot = slot;
+		__entry->sync = sync;
 	),
-	TP_printk("dev %d:%d agno %u agbno %u len %u slot %d",
+	TP_printk("dev %d:%d trans 0x%p tid 0x%x agno %u agbno %u len %u %s",
 		  MAJOR(__entry->dev), MINOR(__entry->dev),
+		  __entry->tp,
+		  __entry->tid,
 		  __entry->agno,
 		  __entry->agbno,
 		  __entry->len,
-		  __entry->slot)
+		  __print_symbolic(__entry->sync, XFS_BUSY_SYNC))
 
 );
 
-#define XFS_BUSY_STATES \
-	{ 0,	"found" }, \
-	{ 1,	"missing" }
-
 TRACE_EVENT(xfs_alloc_unbusy,
 	TP_PROTO(struct xfs_mount *mp, xfs_agnumber_t agno,
-		 int slot, int found),
-	TP_ARGS(mp, agno, slot, found),
+		 xfs_agblock_t agbno, xfs_extlen_t len),
+	TP_ARGS(mp, agno, agbno, len),
 	TP_STRUCT__entry(
 		__field(dev_t, dev)
 		__field(xfs_agnumber_t, agno)
-		__field(int, slot)
-		__field(int, found)
+		__field(xfs_agblock_t, agbno)
+		__field(xfs_extlen_t, len)
 	),
 	TP_fast_assign(
 		__entry->dev = mp->m_super->s_dev;
 		__entry->agno = agno;
-		__entry->slot = slot;
-		__entry->found = found;
+		__entry->agbno = agbno;
+		__entry->len = len;
 	),
-	TP_printk("dev %d:%d agno %u slot %d %s",
+	TP_printk("dev %d:%d agno %u agbno %u len %u",
 		  MAJOR(__entry->dev), MINOR(__entry->dev),
 		  __entry->agno,
-		  __entry->slot,
-		  __print_symbolic(__entry->found, XFS_BUSY_STATES))
+		  __entry->agbno,
+		  __entry->len)
 );
 
+#define XFS_BUSY_STATES \
+	{ 0,	"missing" }, \
+	{ 1,	"found" }
+
 TRACE_EVENT(xfs_alloc_busysearch,
-	TP_PROTO(struct xfs_mount *mp, xfs_agnumber_t agno, xfs_agblock_t agbno,
-		 xfs_extlen_t len, xfs_lsn_t lsn),
-	TP_ARGS(mp, agno, agbno, len, lsn),
+	TP_PROTO(struct xfs_mount *mp, xfs_agnumber_t agno,
+		 xfs_agblock_t agbno, xfs_extlen_t len, int found),
+	TP_ARGS(mp, agno, agbno, len, found),
 	TP_STRUCT__entry(
 		__field(dev_t, dev)
 		__field(xfs_agnumber_t, agno)
 		__field(xfs_agblock_t, agbno)
 		__field(xfs_extlen_t, len)
-		__field(xfs_lsn_t, lsn)
+		__field(int, found)
 	),
 	TP_fast_assign(
 		__entry->dev = mp->m_super->s_dev;
 		__entry->agno = agno;
 		__entry->agbno = agbno;
 		__entry->len = len;
-		__entry->lsn = lsn;
+		__entry->found = found;
 	),
-	TP_printk("dev %d:%d agno %u agbno %u len %u force lsn 0x%llx",
+	TP_printk("dev %d:%d agno %u agbno %u len %u %s",
 		  MAJOR(__entry->dev), MINOR(__entry->dev),
 		  __entry->agno,
 		  __entry->agbno,
 		  __entry->len,
+		  __print_symbolic(__entry->found, XFS_BUSY_STATES))
+);
+
+TRACE_EVENT(xfs_trans_commit_lsn,
+	TP_PROTO(struct xfs_trans *trans),
+	TP_ARGS(trans),
+	TP_STRUCT__entry(
+		__field(dev_t, dev)
+		__field(struct xfs_trans *, tp)
+		__field(xfs_lsn_t, lsn)
+	),
+	TP_fast_assign(
+		__entry->dev = trans->t_mountp->m_super->s_dev;
+		__entry->tp = trans;
+		__entry->lsn = trans->t_commit_lsn;
+	),
+	TP_printk("dev %d:%d trans 0x%p commit_lsn 0x%llx",
+		  MAJOR(__entry->dev), MINOR(__entry->dev),
+		  __entry->tp,
 		  __entry->lsn)
 );
 
diff --git a/fs/xfs/quota/xfs_dquot.c b/fs/xfs/quota/xfs_dquot.c
index b89ec5df0129..585e7633dfc7 100644
--- a/fs/xfs/quota/xfs_dquot.c
+++ b/fs/xfs/quota/xfs_dquot.c
@@ -344,9 +344,9 @@ xfs_qm_init_dquot_blk(
 	for (i = 0; i < q->qi_dqperchunk; i++, d++, curid++)
 		xfs_qm_dqinit_core(curid, type, d);
 	xfs_trans_dquot_buf(tp, bp,
-			    (type & XFS_DQ_USER ? XFS_BLI_UDQUOT_BUF :
-			    ((type & XFS_DQ_PROJ) ? XFS_BLI_PDQUOT_BUF :
-			     XFS_BLI_GDQUOT_BUF)));
+			    (type & XFS_DQ_USER ? XFS_BLF_UDQUOT_BUF :
+			    ((type & XFS_DQ_PROJ) ? XFS_BLF_PDQUOT_BUF :
+			     XFS_BLF_GDQUOT_BUF)));
 	xfs_trans_log_buf(tp, bp, 0, BBTOB(q->qi_dqchunklen) - 1);
 }
 
diff --git a/fs/xfs/xfs_ag.h b/fs/xfs/xfs_ag.h
index abb8222b88c9..401f364ad36c 100644
--- a/fs/xfs/xfs_ag.h
+++ b/fs/xfs/xfs_ag.h
@@ -175,14 +175,20 @@ typedef struct xfs_agfl {
 } xfs_agfl_t;
 
 /*
- * Busy block/extent entry.  Used in perag to mark blocks that have been freed
- * but whose transactions aren't committed to disk yet.
+ * Busy block/extent entry.  Indexed by a rbtree in perag to mark blocks that
+ * have been freed but whose transactions aren't committed to disk yet.
+ *
+ * Note that we use the transaction ID to record the transaction, not the
+ * transaction structure itself. See xfs_alloc_busy_insert() for details.
  */
-typedef struct xfs_perag_busy {
-	xfs_agblock_t	busy_start;
-	xfs_extlen_t	busy_length;
-	struct xfs_trans *busy_tp;	/* transaction that did the free */
-} xfs_perag_busy_t;
+struct xfs_busy_extent {
+	struct rb_node	rb_node;	/* ag by-bno indexed search tree */
+	struct list_head list;		/* transaction busy extent list */
+	xfs_agnumber_t	agno;
+	xfs_agblock_t	bno;
+	xfs_extlen_t	length;
+	xlog_tid_t	tid;		/* transaction that created this */
+};
 
 /*
  * Per-ag incore structure, copies of information in agf and agi,
@@ -216,7 +222,8 @@ typedef struct xfs_perag {
 	xfs_agino_t	pagl_leftrec;
 	xfs_agino_t	pagl_rightrec;
 #ifdef __KERNEL__
-	spinlock_t	pagb_lock;	/* lock for pagb_list */
+	spinlock_t	pagb_lock;	/* lock for pagb_tree */
+	struct rb_root	pagb_tree;	/* ordered tree of busy extents */
 
 	atomic_t        pagf_fstrms;    /* # of filestreams active in this AG */
 
@@ -226,7 +233,6 @@ typedef struct xfs_perag {
 	int		pag_ici_reclaimable;	/* reclaimable inodes */
 #endif
 	int		pagb_count;	/* pagb slots in use */
-	xfs_perag_busy_t pagb_list[XFS_PAGB_NUM_SLOTS];	/* unstable blocks */
 } xfs_perag_t;
 
 /*
diff --git a/fs/xfs/xfs_alloc.c b/fs/xfs/xfs_alloc.c
index 94cddbfb2560..a7fbe8a99b12 100644
--- a/fs/xfs/xfs_alloc.c
+++ b/fs/xfs/xfs_alloc.c
@@ -46,11 +46,9 @@
 #define	XFSA_FIXUP_BNO_OK	1
 #define	XFSA_FIXUP_CNT_OK	2
 
-STATIC void
-xfs_alloc_search_busy(xfs_trans_t *tp,
-		    xfs_agnumber_t agno,
-		    xfs_agblock_t bno,
-		    xfs_extlen_t len);
+static int
+xfs_alloc_busy_search(struct xfs_mount *mp, xfs_agnumber_t agno,
+		    xfs_agblock_t bno, xfs_extlen_t len);
 
 /*
  * Prototypes for per-ag allocation routines
@@ -540,9 +538,16 @@ xfs_alloc_ag_vextent(
 				be32_to_cpu(agf->agf_length));
 			xfs_alloc_log_agf(args->tp, args->agbp,
 						XFS_AGF_FREEBLKS);
-			/* search the busylist for these blocks */
-			xfs_alloc_search_busy(args->tp, args->agno,
-					args->agbno, args->len);
+			/*
+			 * Search the busylist for these blocks and mark the
+			 * transaction as synchronous if blocks are found. This
+			 * avoids the need to block due to a synchronous log
+			 * force to ensure correct ordering as the synchronous
+			 * transaction will guarantee that for us.
+			 */
+			if (xfs_alloc_busy_search(args->mp, args->agno,
+						args->agbno, args->len))
+				xfs_trans_set_sync(args->tp);
 		}
 		if (!args->isfl)
 			xfs_trans_mod_sb(args->tp,
@@ -1693,7 +1698,7 @@ xfs_free_ag_extent(
 	 * when the iclog commits to disk.  If a busy block is allocated,
 	 * the iclog is pushed up to the LSN that freed the block.
 	 */
-	xfs_alloc_mark_busy(tp, agno, bno, len);
+	xfs_alloc_busy_insert(tp, agno, bno, len);
 	return 0;
 
  error0:
@@ -1989,14 +1994,20 @@ xfs_alloc_get_freelist(
 	*bnop = bno;
 
 	/*
-	 * As blocks are freed, they are added to the per-ag busy list
-	 * and remain there until the freeing transaction is committed to
-	 * disk.  Now that we have allocated blocks, this list must be
-	 * searched to see if a block is being reused.  If one is, then
-	 * the freeing transaction must be pushed to disk NOW by forcing
-	 * to disk all iclogs up that transaction's LSN.
+	 * As blocks are freed, they are added to the per-ag busy list and
+	 * remain there until the freeing transaction is committed to disk.
+	 * Now that we have allocated blocks, this list must be searched to see
+	 * if a block is being reused.  If one is, then the freeing transaction
+	 * must be pushed to disk before this transaction.
+	 *
+	 * We do this by setting the current transaction to a sync transaction
+	 * which guarantees that the freeing transaction is on disk before this
+	 * transaction. This is done instead of a synchronous log force here so
+	 * that we don't sit and wait with the AGF locked in the transaction
+	 * during the log force.
 	 */
-	xfs_alloc_search_busy(tp, be32_to_cpu(agf->agf_seqno), bno, 1);
+	if (xfs_alloc_busy_search(mp, be32_to_cpu(agf->agf_seqno), bno, 1))
+		xfs_trans_set_sync(tp);
 	return 0;
 }
 
@@ -2201,7 +2212,7 @@ xfs_alloc_read_agf(
 			be32_to_cpu(agf->agf_levels[XFS_BTNUM_CNTi]);
 		spin_lock_init(&pag->pagb_lock);
 		pag->pagb_count = 0;
-		memset(pag->pagb_list, 0, sizeof(pag->pagb_list));
+		pag->pagb_tree = RB_ROOT;
 		pag->pagf_init = 1;
 	}
 #ifdef DEBUG
@@ -2479,127 +2490,263 @@ error0:
  * list is reused, the transaction that freed it must be forced to disk
  * before continuing to use the block.
  *
- * xfs_alloc_mark_busy - add to the per-ag busy list
- * xfs_alloc_clear_busy - remove an item from the per-ag busy list
+ * xfs_alloc_busy_insert - add to the per-ag busy list
+ * xfs_alloc_busy_clear - remove an item from the per-ag busy list
+ * xfs_alloc_busy_search - search for a busy extent
+ */
+
+/*
+ * Insert a new extent into the busy tree.
+ *
+ * The busy extent tree is indexed by the start block of the busy extent.
+ * there can be multiple overlapping ranges in the busy extent tree but only
+ * ever one entry at a given start block. The reason for this is that
+ * multi-block extents can be freed, then smaller chunks of that extent
+ * allocated and freed again before the first transaction commit is on disk.
+ * If the exact same start block is freed a second time, we have to wait for
+ * that busy extent to pass out of the tree before the new extent is inserted.
+ * There are two main cases we have to handle here.
+ *
+ * The first case is a transaction that triggers a "free - allocate - free"
+ * cycle. This can occur during btree manipulations as a btree block is freed
+ * to the freelist, then allocated from the free list, then freed again. In
+ * this case, the second extxpnet free is what triggers the duplicate and as
+ * such the transaction IDs should match. Because the extent was allocated in
+ * this transaction, the transaction must be marked as synchronous. This is
+ * true for all cases where the free/alloc/free occurs in the one transaction,
+ * hence the addition of the ASSERT(tp->t_flags & XFS_TRANS_SYNC) to this case.
+ * This serves to catch violations of the second case quite effectively.
+ *
+ * The second case is where the free/alloc/free occur in different
+ * transactions. In this case, the thread freeing the extent the second time
+ * can't mark the extent busy immediately because it is already tracked in a
+ * transaction that may be committing.  When the log commit for the existing
+ * busy extent completes, the busy extent will be removed from the tree. If we
+ * allow the second busy insert to continue using that busy extent structure,
+ * it can be freed before this transaction is safely in the log.  Hence our
+ * only option in this case is to force the log to remove the existing busy
+ * extent from the list before we insert the new one with the current
+ * transaction ID.
+ *
+ * The problem we are trying to avoid in the free-alloc-free in separate
+ * transactions is most easily described with a timeline:
+ *
+ *      Thread 1	Thread 2	Thread 3	xfslogd
+ *	xact alloc
+ *	free X
+ *	mark busy
+ *	commit xact
+ *	free xact
+ *			xact alloc
+ *			alloc X
+ *			busy search
+ *			mark xact sync
+ *			commit xact
+ *			free xact
+ *			force log
+ *			checkpoint starts
+ *			....
+ *					xact alloc
+ *					free X
+ *					mark busy
+ *					finds match
+ *					*** KABOOM! ***
+ *					....
+ *							log IO completes
+ *							unbusy X
+ *			checkpoint completes
+ *
+ * By issuing a log force in thread 3 @ "KABOOM", the thread will block until
+ * the checkpoint completes, and the busy extent it matched will have been
+ * removed from the tree when it is woken. Hence it can then continue safely.
+ *
+ * However, to ensure this matching process is robust, we need to use the
+ * transaction ID for identifying transaction, as delayed logging results in
+ * the busy extent and transaction lifecycles being different. i.e. the busy
+ * extent is active for a lot longer than the transaction.  Hence the
+ * transaction structure can be freed and reallocated, then mark the same
+ * extent busy again in the new transaction. In this case the new transaction
+ * will have a different tid but can have the same address, and hence we need
+ * to check against the tid.
+ *
+ * Future: for delayed logging, we could avoid the log force if the extent was
+ * first freed in the current checkpoint sequence. This, however, requires the
+ * ability to pin the current checkpoint in memory until this transaction
+ * commits to ensure that both the original free and the current one combine
+ * logically into the one checkpoint. If the checkpoint sequences are
+ * different, however, we still need to wait on a log force.
  */
 void
-xfs_alloc_mark_busy(xfs_trans_t *tp,
-		    xfs_agnumber_t agno,
-		    xfs_agblock_t bno,
-		    xfs_extlen_t len)
+xfs_alloc_busy_insert(
+	struct xfs_trans	*tp,
+	xfs_agnumber_t		agno,
+	xfs_agblock_t		bno,
+	xfs_extlen_t		len)
 {
-	xfs_perag_busy_t	*bsy;
+	struct xfs_busy_extent	*new;
+	struct xfs_busy_extent	*busyp;
 	struct xfs_perag	*pag;
-	int			n;
+	struct rb_node		**rbp;
+	struct rb_node		*parent;
+	int			match;
 
-	pag = xfs_perag_get(tp->t_mountp, agno);
-	spin_lock(&pag->pagb_lock);
 
-	/* search pagb_list for an open slot */
-	for (bsy = pag->pagb_list, n = 0;
-	     n < XFS_PAGB_NUM_SLOTS;
-	     bsy++, n++) {
-		if (bsy->busy_tp == NULL) {
-			break;
-		}
+	new = kmem_zalloc(sizeof(struct xfs_busy_extent), KM_MAYFAIL);
+	if (!new) {
+		/*
+		 * No Memory!  Since it is now not possible to track the free
+		 * block, make this a synchronous transaction to insure that
+		 * the block is not reused before this transaction commits.
+		 */
+		trace_xfs_alloc_busy(tp, agno, bno, len, 1);
+		xfs_trans_set_sync(tp);
+		return;
 	}
 
-	trace_xfs_alloc_busy(tp->t_mountp, agno, bno, len, n);
+	new->agno = agno;
+	new->bno = bno;
+	new->length = len;
+	new->tid = xfs_log_get_trans_ident(tp);
 
-	if (n < XFS_PAGB_NUM_SLOTS) {
-		bsy = &pag->pagb_list[n];
-		pag->pagb_count++;
-		bsy->busy_start = bno;
-		bsy->busy_length = len;
-		bsy->busy_tp = tp;
-		xfs_trans_add_busy(tp, agno, n);
-	} else {
+	INIT_LIST_HEAD(&new->list);
+
+	/* trace before insert to be able to see failed inserts */
+	trace_xfs_alloc_busy(tp, agno, bno, len, 0);
+
+	pag = xfs_perag_get(tp->t_mountp, new->agno);
+restart:
+	spin_lock(&pag->pagb_lock);
+	rbp = &pag->pagb_tree.rb_node;
+	parent = NULL;
+	busyp = NULL;
+	match = 0;
+	while (*rbp && match >= 0) {
+		parent = *rbp;
+		busyp = rb_entry(parent, struct xfs_busy_extent, rb_node);
+
+		if (new->bno < busyp->bno) {
+			/* may overlap, but exact start block is lower */
+			rbp = &(*rbp)->rb_left;
+			if (new->bno + new->length > busyp->bno)
+				match = busyp->tid == new->tid ? 1 : -1;
+		} else if (new->bno > busyp->bno) {
+			/* may overlap, but exact start block is higher */
+			rbp = &(*rbp)->rb_right;
+			if (bno < busyp->bno + busyp->length)
+				match = busyp->tid == new->tid ? 1 : -1;
+		} else {
+			match = busyp->tid == new->tid ? 1 : -1;
+			break;
+		}
+	}
+	if (match < 0) {
+		/* overlap marked busy in different transaction */
+		spin_unlock(&pag->pagb_lock);
+		xfs_log_force(tp->t_mountp, XFS_LOG_SYNC);
+		goto restart;
+	}
+	if (match > 0) {
 		/*
-		 * The busy list is full!  Since it is now not possible to
-		 * track the free block, make this a synchronous transaction
-		 * to insure that the block is not reused before this
-		 * transaction commits.
+		 * overlap marked busy in same transaction. Update if exact
+		 * start block match, otherwise combine the busy extents into
+		 * a single range.
 		 */
-		xfs_trans_set_sync(tp);
-	}
+		if (busyp->bno == new->bno) {
+			busyp->length = max(busyp->length, new->length);
+			spin_unlock(&pag->pagb_lock);
+			ASSERT(tp->t_flags & XFS_TRANS_SYNC);
+			xfs_perag_put(pag);
+			kmem_free(new);
+			return;
+		}
+		rb_erase(&busyp->rb_node, &pag->pagb_tree);
+		new->length = max(busyp->bno + busyp->length,
+					new->bno + new->length) -
+				min(busyp->bno, new->bno);
+		new->bno = min(busyp->bno, new->bno);
+	} else
+		busyp = NULL;
 
+	rb_link_node(&new->rb_node, parent, rbp);
+	rb_insert_color(&new->rb_node, &pag->pagb_tree);
+
+	list_add(&new->list, &tp->t_busy);
 	spin_unlock(&pag->pagb_lock);
 	xfs_perag_put(pag);
+	kmem_free(busyp);
 }
 
-void
-xfs_alloc_clear_busy(xfs_trans_t *tp,
-		     xfs_agnumber_t agno,
-		     int idx)
+/*
+ * Search for a busy extent within the range of the extent we are about to
+ * allocate.  You need to be holding the busy extent tree lock when calling
+ * xfs_alloc_busy_search(). This function returns 0 for no overlapping busy
+ * extent, -1 for an overlapping but not exact busy extent, and 1 for an exact
+ * match. This is done so that a non-zero return indicates an overlap that
+ * will require a synchronous transaction, but it can still be
+ * used to distinguish between a partial or exact match.
+ */
+static int
+xfs_alloc_busy_search(
+	struct xfs_mount	*mp,
+	xfs_agnumber_t		agno,
+	xfs_agblock_t		bno,
+	xfs_extlen_t		len)
 {
 	struct xfs_perag	*pag;
-	xfs_perag_busy_t	*list;
+	struct rb_node		*rbp;
+	struct xfs_busy_extent	*busyp;
+	int			match = 0;
 
-	ASSERT(idx < XFS_PAGB_NUM_SLOTS);
-	pag = xfs_perag_get(tp->t_mountp, agno);
+	pag = xfs_perag_get(mp, agno);
 	spin_lock(&pag->pagb_lock);
-	list = pag->pagb_list;
 
-	trace_xfs_alloc_unbusy(tp->t_mountp, agno, idx, list[idx].busy_tp == tp);
-
-	if (list[idx].busy_tp == tp) {
-		list[idx].busy_tp = NULL;
-		pag->pagb_count--;
+	rbp = pag->pagb_tree.rb_node;
+
+	/* find closest start bno overlap */
+	while (rbp) {
+		busyp = rb_entry(rbp, struct xfs_busy_extent, rb_node);
+		if (bno < busyp->bno) {
+			/* may overlap, but exact start block is lower */
+			if (bno + len > busyp->bno)
+				match = -1;
+			rbp = rbp->rb_left;
+		} else if (bno > busyp->bno) {
+			/* may overlap, but exact start block is higher */
+			if (bno < busyp->bno + busyp->length)
+				match = -1;
+			rbp = rbp->rb_right;
+		} else {
+			/* bno matches busyp, length determines exact match */
+			match = (busyp->length == len) ? 1 : -1;
+			break;
+		}
 	}
-
 	spin_unlock(&pag->pagb_lock);
+	trace_xfs_alloc_busysearch(mp, agno, bno, len, !!match);
 	xfs_perag_put(pag);
+	return match;
 }
 
-
-/*
- * If we find the extent in the busy list, force the log out to get the
- * extent out of the busy list so the caller can use it straight away.
- */
-STATIC void
-xfs_alloc_search_busy(xfs_trans_t *tp,
-		    xfs_agnumber_t agno,
-		    xfs_agblock_t bno,
-		    xfs_extlen_t len)
+void
+xfs_alloc_busy_clear(
+	struct xfs_mount	*mp,
+	struct xfs_busy_extent	*busyp)
 {
 	struct xfs_perag	*pag;
-	xfs_perag_busy_t	*bsy;
-	xfs_agblock_t		uend, bend;
-	xfs_lsn_t		lsn = 0;
-	int			cnt;
 
-	pag = xfs_perag_get(tp->t_mountp, agno);
-	spin_lock(&pag->pagb_lock);
-	cnt = pag->pagb_count;
+	trace_xfs_alloc_unbusy(mp, busyp->agno, busyp->bno,
+						busyp->length);
 
-	/*
-	 * search pagb_list for this slot, skipping open slots. We have to
-	 * search the entire array as there may be multiple overlaps and
-	 * we have to get the most recent LSN for the log force to push out
-	 * all the transactions that span the range.
-	 */
-	uend = bno + len - 1;
-	for (cnt = 0; cnt < pag->pagb_count; cnt++) {
-		bsy = &pag->pagb_list[cnt];
-		if (!bsy->busy_tp)
-			continue;
+	ASSERT(xfs_alloc_busy_search(mp, busyp->agno, busyp->bno,
+						busyp->length) == 1);
 
-		bend = bsy->busy_start + bsy->busy_length - 1;
-		if (bno > bend || uend < bsy->busy_start)
-			continue;
+	list_del_init(&busyp->list);
 
-		/* (start1,length1) within (start2, length2) */
-		if (XFS_LSN_CMP(bsy->busy_tp->t_commit_lsn, lsn) > 0)
-			lsn = bsy->busy_tp->t_commit_lsn;
-	}
+	pag = xfs_perag_get(mp, busyp->agno);
+	spin_lock(&pag->pagb_lock);
+	rb_erase(&busyp->rb_node, &pag->pagb_tree);
 	spin_unlock(&pag->pagb_lock);
 	xfs_perag_put(pag);
-	trace_xfs_alloc_busysearch(tp->t_mountp, agno, bno, len, lsn);
 
-	/*
-	 * If a block was found, force the log through the LSN of the
-	 * transaction that freed the block
-	 */
-	if (lsn)
-		xfs_log_force_lsn(tp->t_mountp, lsn, XFS_LOG_SYNC);
+	kmem_free(busyp);
 }
diff --git a/fs/xfs/xfs_alloc.h b/fs/xfs/xfs_alloc.h
index 599bffa39784..6d05199b667c 100644
--- a/fs/xfs/xfs_alloc.h
+++ b/fs/xfs/xfs_alloc.h
@@ -22,6 +22,7 @@ struct xfs_buf;
 struct xfs_mount;
 struct xfs_perag;
 struct xfs_trans;
+struct xfs_busy_extent;
 
 /*
  * Freespace allocation types.  Argument to xfs_alloc_[v]extent.
@@ -119,15 +120,13 @@ xfs_alloc_longest_free_extent(struct xfs_mount *mp,
 #ifdef __KERNEL__
 
 void
-xfs_alloc_mark_busy(xfs_trans_t *tp,
+xfs_alloc_busy_insert(xfs_trans_t *tp,
 		xfs_agnumber_t agno,
 		xfs_agblock_t bno,
 		xfs_extlen_t len);
 
 void
-xfs_alloc_clear_busy(xfs_trans_t *tp,
-		xfs_agnumber_t ag,
-		int idx);
+xfs_alloc_busy_clear(struct xfs_mount *mp, struct xfs_busy_extent *busyp);
 
 #endif	/* __KERNEL__ */
 
diff --git a/fs/xfs/xfs_alloc_btree.c b/fs/xfs/xfs_alloc_btree.c
index b726e10d2c1c..83f494218759 100644
--- a/fs/xfs/xfs_alloc_btree.c
+++ b/fs/xfs/xfs_alloc_btree.c
@@ -134,7 +134,7 @@ xfs_allocbt_free_block(
 	 * disk. If a busy block is allocated, the iclog is pushed up to the
 	 * LSN that freed the block.
 	 */
-	xfs_alloc_mark_busy(cur->bc_tp, be32_to_cpu(agf->agf_seqno), bno, 1);
+	xfs_alloc_busy_insert(cur->bc_tp, be32_to_cpu(agf->agf_seqno), bno, 1);
 	xfs_trans_agbtree_delta(cur->bc_tp, -1);
 	return 0;
 }
diff --git a/fs/xfs/xfs_buf_item.c b/fs/xfs/xfs_buf_item.c
index 240340a4727b..02a80984aa05 100644
--- a/fs/xfs/xfs_buf_item.c
+++ b/fs/xfs/xfs_buf_item.c
@@ -64,7 +64,7 @@ xfs_buf_item_log_debug(
 	nbytes = last - first + 1;
 	bfset(bip->bli_logged, first, nbytes);
 	for (x = 0; x < nbytes; x++) {
-		chunk_num = byte >> XFS_BLI_SHIFT;
+		chunk_num = byte >> XFS_BLF_SHIFT;
 		word_num = chunk_num >> BIT_TO_WORD_SHIFT;
 		bit_num = chunk_num & (NBWORD - 1);
 		wordp = &(bip->bli_format.blf_data_map[word_num]);
@@ -166,7 +166,7 @@ xfs_buf_item_size(
 		 * cancel flag in it.
 		 */
 		trace_xfs_buf_item_size_stale(bip);
-		ASSERT(bip->bli_format.blf_flags & XFS_BLI_CANCEL);
+		ASSERT(bip->bli_format.blf_flags & XFS_BLF_CANCEL);
 		return 1;
 	}
 
@@ -197,9 +197,9 @@ xfs_buf_item_size(
 		} else if (next_bit != last_bit + 1) {
 			last_bit = next_bit;
 			nvecs++;
-		} else if (xfs_buf_offset(bp, next_bit * XFS_BLI_CHUNK) !=
-			   (xfs_buf_offset(bp, last_bit * XFS_BLI_CHUNK) +
-			    XFS_BLI_CHUNK)) {
+		} else if (xfs_buf_offset(bp, next_bit * XFS_BLF_CHUNK) !=
+			   (xfs_buf_offset(bp, last_bit * XFS_BLF_CHUNK) +
+			    XFS_BLF_CHUNK)) {
 			last_bit = next_bit;
 			nvecs++;
 		} else {
@@ -254,6 +254,20 @@ xfs_buf_item_format(
 	vecp++;
 	nvecs = 1;
 
+	/*
+	 * If it is an inode buffer, transfer the in-memory state to the
+	 * format flags and clear the in-memory state. We do not transfer
+	 * this state if the inode buffer allocation has not yet been committed
+	 * to the log as setting the XFS_BLI_INODE_BUF flag will prevent
+	 * correct replay of the inode allocation.
+	 */
+	if (bip->bli_flags & XFS_BLI_INODE_BUF) {
+		if (!((bip->bli_flags & XFS_BLI_INODE_ALLOC_BUF) &&
+		      xfs_log_item_in_current_chkpt(&bip->bli_item)))
+			bip->bli_format.blf_flags |= XFS_BLF_INODE_BUF;
+		bip->bli_flags &= ~XFS_BLI_INODE_BUF;
+	}
+
 	if (bip->bli_flags & XFS_BLI_STALE) {
 		/*
 		 * The buffer is stale, so all we need to log
@@ -261,7 +275,7 @@ xfs_buf_item_format(
 		 * cancel flag in it.
 		 */
 		trace_xfs_buf_item_format_stale(bip);
-		ASSERT(bip->bli_format.blf_flags & XFS_BLI_CANCEL);
+		ASSERT(bip->bli_format.blf_flags & XFS_BLF_CANCEL);
 		bip->bli_format.blf_size = nvecs;
 		return;
 	}
@@ -294,28 +308,28 @@ xfs_buf_item_format(
 		 * keep counting and scanning.
 		 */
 		if (next_bit == -1) {
-			buffer_offset = first_bit * XFS_BLI_CHUNK;
+			buffer_offset = first_bit * XFS_BLF_CHUNK;
 			vecp->i_addr = xfs_buf_offset(bp, buffer_offset);
-			vecp->i_len = nbits * XFS_BLI_CHUNK;
+			vecp->i_len = nbits * XFS_BLF_CHUNK;
 			vecp->i_type = XLOG_REG_TYPE_BCHUNK;
 			nvecs++;
 			break;
 		} else if (next_bit != last_bit + 1) {
-			buffer_offset = first_bit * XFS_BLI_CHUNK;
+			buffer_offset = first_bit * XFS_BLF_CHUNK;
 			vecp->i_addr = xfs_buf_offset(bp, buffer_offset);
-			vecp->i_len = nbits * XFS_BLI_CHUNK;
+			vecp->i_len = nbits * XFS_BLF_CHUNK;
 			vecp->i_type = XLOG_REG_TYPE_BCHUNK;
 			nvecs++;
 			vecp++;
 			first_bit = next_bit;
 			last_bit = next_bit;
 			nbits = 1;
-		} else if (xfs_buf_offset(bp, next_bit << XFS_BLI_SHIFT) !=
-			   (xfs_buf_offset(bp, last_bit << XFS_BLI_SHIFT) +
-			    XFS_BLI_CHUNK)) {
-			buffer_offset = first_bit * XFS_BLI_CHUNK;
+		} else if (xfs_buf_offset(bp, next_bit << XFS_BLF_SHIFT) !=
+			   (xfs_buf_offset(bp, last_bit << XFS_BLF_SHIFT) +
+			    XFS_BLF_CHUNK)) {
+			buffer_offset = first_bit * XFS_BLF_CHUNK;
 			vecp->i_addr = xfs_buf_offset(bp, buffer_offset);
-			vecp->i_len = nbits * XFS_BLI_CHUNK;
+			vecp->i_len = nbits * XFS_BLF_CHUNK;
 			vecp->i_type = XLOG_REG_TYPE_BCHUNK;
 /* You would think we need to bump the nvecs here too, but we do not
  * this number is used by recovery, and it gets confused by the boundary
@@ -341,10 +355,15 @@ xfs_buf_item_format(
 }
 
 /*
- * This is called to pin the buffer associated with the buf log
- * item in memory so it cannot be written out.  Simply call bpin()
- * on the buffer to do this.
+ * This is called to pin the buffer associated with the buf log item in memory
+ * so it cannot be written out.  Simply call bpin() on the buffer to do this.
+ *
+ * We also always take a reference to the buffer log item here so that the bli
+ * is held while the item is pinned in memory. This means that we can
+ * unconditionally drop the reference count a transaction holds when the
+ * transaction is completed.
  */
+
 STATIC void
 xfs_buf_item_pin(
 	xfs_buf_log_item_t	*bip)
@@ -356,6 +375,7 @@ xfs_buf_item_pin(
 	ASSERT(atomic_read(&bip->bli_refcount) > 0);
 	ASSERT((bip->bli_flags & XFS_BLI_LOGGED) ||
 	       (bip->bli_flags & XFS_BLI_STALE));
+	atomic_inc(&bip->bli_refcount);
 	trace_xfs_buf_item_pin(bip);
 	xfs_bpin(bp);
 }
@@ -393,7 +413,7 @@ xfs_buf_item_unpin(
 		ASSERT(XFS_BUF_VALUSEMA(bp) <= 0);
 		ASSERT(!(XFS_BUF_ISDELAYWRITE(bp)));
 		ASSERT(XFS_BUF_ISSTALE(bp));
-		ASSERT(bip->bli_format.blf_flags & XFS_BLI_CANCEL);
+		ASSERT(bip->bli_format.blf_flags & XFS_BLF_CANCEL);
 		trace_xfs_buf_item_unpin_stale(bip);
 
 		/*
@@ -489,20 +509,23 @@ xfs_buf_item_trylock(
 }
 
 /*
- * Release the buffer associated with the buf log item.
- * If there is no dirty logged data associated with the
- * buffer recorded in the buf log item, then free the
- * buf log item and remove the reference to it in the
- * buffer.
+ * Release the buffer associated with the buf log item.  If there is no dirty
+ * logged data associated with the buffer recorded in the buf log item, then
+ * free the buf log item and remove the reference to it in the buffer.
+ *
+ * This call ignores the recursion count.  It is only called when the buffer
+ * should REALLY be unlocked, regardless of the recursion count.
  *
- * This call ignores the recursion count.  It is only called
- * when the buffer should REALLY be unlocked, regardless
- * of the recursion count.
+ * We unconditionally drop the transaction's reference to the log item. If the
+ * item was logged, then another reference was taken when it was pinned, so we
+ * can safely drop the transaction reference now.  This also allows us to avoid
+ * potential races with the unpin code freeing the bli by not referencing the
+ * bli after we've dropped the reference count.
  *
- * If the XFS_BLI_HOLD flag is set in the buf log item, then
- * free the log item if necessary but do not unlock the buffer.
- * This is for support of xfs_trans_bhold(). Make sure the
- * XFS_BLI_HOLD field is cleared if we don't free the item.
+ * If the XFS_BLI_HOLD flag is set in the buf log item, then free the log item
+ * if necessary but do not unlock the buffer.  This is for support of
+ * xfs_trans_bhold(). Make sure the XFS_BLI_HOLD field is cleared if we don't
+ * free the item.
  */
 STATIC void
 xfs_buf_item_unlock(
@@ -514,73 +537,54 @@ xfs_buf_item_unlock(
 
 	bp = bip->bli_buf;
 
-	/*
-	 * Clear the buffer's association with this transaction.
-	 */
+	/* Clear the buffer's association with this transaction. */
 	XFS_BUF_SET_FSPRIVATE2(bp, NULL);
 
 	/*
-	 * If this is a transaction abort, don't return early.
-	 * Instead, allow the brelse to happen.
-	 * Normally it would be done for stale (cancelled) buffers
-	 * at unpin time, but we'll never go through the pin/unpin
-	 * cycle if we abort inside commit.
+	 * If this is a transaction abort, don't return early.  Instead, allow
+	 * the brelse to happen.  Normally it would be done for stale
+	 * (cancelled) buffers at unpin time, but we'll never go through the
+	 * pin/unpin cycle if we abort inside commit.
 	 */
 	aborted = (bip->bli_item.li_flags & XFS_LI_ABORTED) != 0;
 
 	/*
-	 * If the buf item is marked stale, then don't do anything.
-	 * We'll unlock the buffer and free the buf item when the
-	 * buffer is unpinned for the last time.
+	 * Before possibly freeing the buf item, determine if we should
+	 * release the buffer at the end of this routine.
 	 */
-	if (bip->bli_flags & XFS_BLI_STALE) {
-		bip->bli_flags &= ~XFS_BLI_LOGGED;
-		trace_xfs_buf_item_unlock_stale(bip);
-		ASSERT(bip->bli_format.blf_flags & XFS_BLI_CANCEL);
-		if (!aborted)
-			return;
-	}
+	hold = bip->bli_flags & XFS_BLI_HOLD;
+
+	/* Clear the per transaction state. */
+	bip->bli_flags &= ~(XFS_BLI_LOGGED | XFS_BLI_HOLD);
 
 	/*
-	 * Drop the transaction's reference to the log item if
-	 * it was not logged as part of the transaction.  Otherwise
-	 * we'll drop the reference in xfs_buf_item_unpin() when
-	 * the transaction is really through with the buffer.
+	 * If the buf item is marked stale, then don't do anything.  We'll
+	 * unlock the buffer and free the buf item when the buffer is unpinned
+	 * for the last time.
 	 */
-	if (!(bip->bli_flags & XFS_BLI_LOGGED)) {
-		atomic_dec(&bip->bli_refcount);
-	} else {
-		/*
-		 * Clear the logged flag since this is per
-		 * transaction state.
-		 */
-		bip->bli_flags &= ~XFS_BLI_LOGGED;
+	if (bip->bli_flags & XFS_BLI_STALE) {
+		trace_xfs_buf_item_unlock_stale(bip);
+		ASSERT(bip->bli_format.blf_flags & XFS_BLF_CANCEL);
+		if (!aborted) {
+			atomic_dec(&bip->bli_refcount);
+			return;
+		}
 	}
 
-	/*
-	 * Before possibly freeing the buf item, determine if we should
-	 * release the buffer at the end of this routine.
-	 */
-	hold = bip->bli_flags & XFS_BLI_HOLD;
 	trace_xfs_buf_item_unlock(bip);
 
 	/*
-	 * If the buf item isn't tracking any data, free it.
-	 * Otherwise, if XFS_BLI_HOLD is set clear it.
+	 * If the buf item isn't tracking any data, free it, otherwise drop the
+	 * reference we hold to it.
 	 */
 	if (xfs_bitmap_empty(bip->bli_format.blf_data_map,
-			     bip->bli_format.blf_map_size)) {
+			     bip->bli_format.blf_map_size))
 		xfs_buf_item_relse(bp);
-	} else if (hold) {
-		bip->bli_flags &= ~XFS_BLI_HOLD;
-	}
+	else
+		atomic_dec(&bip->bli_refcount);
 
-	/*
-	 * Release the buffer if XFS_BLI_HOLD was not set.
-	 */
-	if (!hold) {
+	if (!hold)
 		xfs_buf_relse(bp);
-	}
 }
 
 /*
@@ -717,12 +721,12 @@ xfs_buf_item_init(
 	}
 
 	/*
-	 * chunks is the number of XFS_BLI_CHUNK size pieces
+	 * chunks is the number of XFS_BLF_CHUNK size pieces
 	 * the buffer can be divided into. Make sure not to
 	 * truncate any pieces.  map_size is the size of the
 	 * bitmap needed to describe the chunks of the buffer.
 	 */
-	chunks = (int)((XFS_BUF_COUNT(bp) + (XFS_BLI_CHUNK - 1)) >> XFS_BLI_SHIFT);
+	chunks = (int)((XFS_BUF_COUNT(bp) + (XFS_BLF_CHUNK - 1)) >> XFS_BLF_SHIFT);
 	map_size = (int)((chunks + NBWORD) >> BIT_TO_WORD_SHIFT);
 
 	bip = (xfs_buf_log_item_t*)kmem_zone_zalloc(xfs_buf_item_zone,
@@ -790,8 +794,8 @@ xfs_buf_item_log(
 	/*
 	 * Convert byte offsets to bit numbers.
 	 */
-	first_bit = first >> XFS_BLI_SHIFT;
-	last_bit = last >> XFS_BLI_SHIFT;
+	first_bit = first >> XFS_BLF_SHIFT;
+	last_bit = last >> XFS_BLF_SHIFT;
 
 	/*
 	 * Calculate the total number of bits to be set.
diff --git a/fs/xfs/xfs_buf_item.h b/fs/xfs/xfs_buf_item.h
index df4454511f73..f20bb472d582 100644
--- a/fs/xfs/xfs_buf_item.h
+++ b/fs/xfs/xfs_buf_item.h
@@ -41,22 +41,22 @@ typedef struct xfs_buf_log_format {
  * This flag indicates that the buffer contains on disk inodes
  * and requires special recovery handling.
  */
-#define	XFS_BLI_INODE_BUF	0x1
+#define	XFS_BLF_INODE_BUF	0x1
 /*
  * This flag indicates that the buffer should not be replayed
  * during recovery because its blocks are being freed.
  */
-#define	XFS_BLI_CANCEL		0x2
+#define	XFS_BLF_CANCEL		0x2
 /*
  * This flag indicates that the buffer contains on disk
  * user or group dquots and may require special recovery handling.
  */
-#define	XFS_BLI_UDQUOT_BUF	0x4
-#define XFS_BLI_PDQUOT_BUF	0x8
-#define	XFS_BLI_GDQUOT_BUF	0x10
+#define	XFS_BLF_UDQUOT_BUF	0x4
+#define XFS_BLF_PDQUOT_BUF	0x8
+#define	XFS_BLF_GDQUOT_BUF	0x10
 
-#define	XFS_BLI_CHUNK		128
-#define	XFS_BLI_SHIFT		7
+#define	XFS_BLF_CHUNK		128
+#define	XFS_BLF_SHIFT		7
 #define	BIT_TO_WORD_SHIFT	5
 #define	NBWORD			(NBBY * sizeof(unsigned int))
 
@@ -69,6 +69,7 @@ typedef struct xfs_buf_log_format {
 #define	XFS_BLI_LOGGED		0x08
 #define	XFS_BLI_INODE_ALLOC_BUF	0x10
 #define XFS_BLI_STALE_INODE	0x20
+#define	XFS_BLI_INODE_BUF	0x40
 
 #define XFS_BLI_FLAGS \
 	{ XFS_BLI_HOLD,		"HOLD" }, \
@@ -76,7 +77,8 @@ typedef struct xfs_buf_log_format {
 	{ XFS_BLI_STALE,	"STALE" }, \
 	{ XFS_BLI_LOGGED,	"LOGGED" }, \
 	{ XFS_BLI_INODE_ALLOC_BUF, "INODE_ALLOC" }, \
-	{ XFS_BLI_STALE_INODE,	"STALE_INODE" }
+	{ XFS_BLI_STALE_INODE,	"STALE_INODE" }, \
+	{ XFS_BLI_INODE_BUF,	"INODE_BUF" }
 
 
 #ifdef __KERNEL__
diff --git a/fs/xfs/xfs_error.c b/fs/xfs/xfs_error.c
index ef96175c0744..047b8a8e5c29 100644
--- a/fs/xfs/xfs_error.c
+++ b/fs/xfs/xfs_error.c
@@ -170,7 +170,7 @@ xfs_cmn_err(int panic_tag, int level, xfs_mount_t *mp, char *fmt, ...)
 	va_list ap;
 
 #ifdef DEBUG
-	xfs_panic_mask |= XFS_PTAG_SHUTDOWN_CORRUPT;
+	xfs_panic_mask |= (XFS_PTAG_SHUTDOWN_CORRUPT | XFS_PTAG_LOGRES);
 #endif
 
 	if (xfs_panic_mask && (xfs_panic_mask & panic_tag)
diff --git a/fs/xfs/xfs_log.c b/fs/xfs/xfs_log.c
index 3038dd52c72a..5215abc8023a 100644
--- a/fs/xfs/xfs_log.c
+++ b/fs/xfs/xfs_log.c
@@ -54,9 +54,6 @@ STATIC xlog_t *  xlog_alloc_log(xfs_mount_t	*mp,
 STATIC int	 xlog_space_left(xlog_t *log, int cycle, int bytes);
 STATIC int	 xlog_sync(xlog_t *log, xlog_in_core_t *iclog);
 STATIC void	 xlog_dealloc_log(xlog_t *log);
-STATIC int	 xlog_write(struct log *log, struct xfs_log_vec *log_vector,
-			    struct xlog_ticket *tic, xfs_lsn_t *start_lsn,
-			    xlog_in_core_t **commit_iclog, uint flags);
 
 /* local state machine functions */
 STATIC void xlog_state_done_syncing(xlog_in_core_t *iclog, int);
@@ -86,14 +83,6 @@ STATIC int xlog_regrant_write_log_space(xlog_t		*log,
 STATIC void xlog_ungrant_log_space(xlog_t	 *log,
 				   xlog_ticket_t *ticket);
 
-
-/* local ticket functions */
-STATIC xlog_ticket_t	*xlog_ticket_alloc(xlog_t *log,
-					 int	unit_bytes,
-					 int	count,
-					 char	clientid,
-					 uint	flags);
-
 #if defined(DEBUG)
 STATIC void	xlog_verify_dest_ptr(xlog_t *log, char *ptr);
 STATIC void	xlog_verify_grant_head(xlog_t *log, int equals);
@@ -360,6 +349,15 @@ xfs_log_reserve(
 		ASSERT(flags & XFS_LOG_PERM_RESERV);
 		internal_ticket = *ticket;
 
+		/*
+		 * this is a new transaction on the ticket, so we need to
+		 * change the transaction ID so that the next transaction has a
+		 * different TID in the log. Just add one to the existing tid
+		 * so that we can see chains of rolling transactions in the log
+		 * easily.
+		 */
+		internal_ticket->t_tid++;
+
 		trace_xfs_log_reserve(log, internal_ticket);
 
 		xlog_grant_push_ail(mp, internal_ticket->t_unit_res);
@@ -367,7 +365,8 @@ xfs_log_reserve(
 	} else {
 		/* may sleep if need to allocate more tickets */
 		internal_ticket = xlog_ticket_alloc(log, unit_bytes, cnt,
-						  client, flags);
+						  client, flags,
+						  KM_SLEEP|KM_MAYFAIL);
 		if (!internal_ticket)
 			return XFS_ERROR(ENOMEM);
 		internal_ticket->t_trans_type = t_type;
@@ -452,6 +451,13 @@ xfs_log_mount(
 	/* Normal transactions can now occur */
 	mp->m_log->l_flags &= ~XLOG_ACTIVE_RECOVERY;
 
+	/*
+	 * Now the log has been fully initialised and we know were our
+	 * space grant counters are, we can initialise the permanent ticket
+	 * needed for delayed logging to work.
+	 */
+	xlog_cil_init_post_recovery(mp->m_log);
+
 	return 0;
 
 out_destroy_ail:
@@ -658,6 +664,10 @@ xfs_log_item_init(
 	item->li_ailp = mp->m_ail;
 	item->li_type = type;
 	item->li_ops = ops;
+	item->li_lv = NULL;
+
+	INIT_LIST_HEAD(&item->li_ail);
+	INIT_LIST_HEAD(&item->li_cil);
 }
 
 /*
@@ -1168,6 +1178,9 @@ xlog_alloc_log(xfs_mount_t	*mp,
 	*iclogp = log->l_iclog;			/* complete ring */
 	log->l_iclog->ic_prev = prev_iclog;	/* re-write 1st prev ptr */
 
+	error = xlog_cil_init(log);
+	if (error)
+		goto out_free_iclog;
 	return log;
 
 out_free_iclog:
@@ -1494,6 +1507,8 @@ xlog_dealloc_log(xlog_t *log)
 	xlog_in_core_t	*iclog, *next_iclog;
 	int		i;
 
+	xlog_cil_destroy(log);
+
 	iclog = log->l_iclog;
 	for (i=0; i<log->l_iclog_bufs; i++) {
 		sv_destroy(&iclog->ic_force_wait);
@@ -1536,8 +1551,10 @@ xlog_state_finish_copy(xlog_t		*log,
  * print out info relating to regions written which consume
  * the reservation
  */
-STATIC void
-xlog_print_tic_res(xfs_mount_t *mp, xlog_ticket_t *ticket)
+void
+xlog_print_tic_res(
+	struct xfs_mount	*mp,
+	struct xlog_ticket	*ticket)
 {
 	uint i;
 	uint ophdr_spc = ticket->t_res_num_ophdrs * (uint)sizeof(xlog_op_header_t);
@@ -1637,6 +1654,10 @@ xlog_print_tic_res(xfs_mount_t *mp, xlog_ticket_t *ticket)
 			    "bad-rtype" : res_type_str[r_type-1]),
 			    ticket->t_res_arr[i].r_len);
 	}
+
+	xfs_cmn_err(XFS_PTAG_LOGRES, CE_ALERT, mp,
+		"xfs_log_write: reservation ran out. Need to up reservation");
+	xfs_force_shutdown(mp, SHUTDOWN_CORRUPT_INCORE);
 }
 
 /*
@@ -1865,7 +1886,7 @@ xlog_write_copy_finish(
  *	we don't update ic_offset until the end when we know exactly how many
  *	bytes have been written out.
  */
-STATIC int
+int
 xlog_write(
 	struct log		*log,
 	struct xfs_log_vec	*log_vector,
@@ -1889,22 +1910,26 @@ xlog_write(
 	*start_lsn = 0;
 
 	len = xlog_write_calc_vec_length(ticket, log_vector);
-	if (ticket->t_curr_res < len) {
-		xlog_print_tic_res(log->l_mp, ticket);
-#ifdef DEBUG
-		xlog_panic(
-	"xfs_log_write: reservation ran out. Need to up reservation");
-#else
-		/* Customer configurable panic */
-		xfs_cmn_err(XFS_PTAG_LOGRES, CE_ALERT, log->l_mp,
-	"xfs_log_write: reservation ran out. Need to up reservation");
+	if (log->l_cilp) {
+		/*
+		 * Region headers and bytes are already accounted for.
+		 * We only need to take into account start records and
+		 * split regions in this function.
+		 */
+		if (ticket->t_flags & XLOG_TIC_INITED)
+			ticket->t_curr_res -= sizeof(xlog_op_header_t);
 
-		/* If we did not panic, shutdown the filesystem */
-		xfs_force_shutdown(log->l_mp, SHUTDOWN_CORRUPT_INCORE);
-#endif
-	}
+		/*
+		 * Commit record headers need to be accounted for. These
+		 * come in as separate writes so are easy to detect.
+		 */
+		if (flags & (XLOG_COMMIT_TRANS | XLOG_UNMOUNT_TRANS))
+			ticket->t_curr_res -= sizeof(xlog_op_header_t);
+	} else
+		ticket->t_curr_res -= len;
 
-	ticket->t_curr_res -= len;
+	if (ticket->t_curr_res < 0)
+		xlog_print_tic_res(log->l_mp, ticket);
 
 	index = 0;
 	lv = log_vector;
@@ -3000,6 +3025,8 @@ _xfs_log_force(
 
 	XFS_STATS_INC(xs_log_force);
 
+	xlog_cil_push(log, 1);
+
 	spin_lock(&log->l_icloglock);
 
 	iclog = log->l_iclog;
@@ -3149,6 +3176,12 @@ _xfs_log_force_lsn(
 
 	XFS_STATS_INC(xs_log_force);
 
+	if (log->l_cilp) {
+		lsn = xlog_cil_push_lsn(log, lsn);
+		if (lsn == NULLCOMMITLSN)
+			return 0;
+	}
+
 try_again:
 	spin_lock(&log->l_icloglock);
 	iclog = log->l_iclog;
@@ -3313,22 +3346,30 @@ xfs_log_ticket_get(
 	return ticket;
 }
 
+xlog_tid_t
+xfs_log_get_trans_ident(
+	struct xfs_trans	*tp)
+{
+	return tp->t_ticket->t_tid;
+}
+
 /*
  * Allocate and initialise a new log ticket.
  */
-STATIC xlog_ticket_t *
+xlog_ticket_t *
 xlog_ticket_alloc(
 	struct log	*log,
 	int		unit_bytes,
 	int		cnt,
 	char		client,
-	uint		xflags)
+	uint		xflags,
+	int		alloc_flags)
 {
 	struct xlog_ticket *tic;
 	uint		num_headers;
 	int		iclog_space;
 
-	tic = kmem_zone_zalloc(xfs_log_ticket_zone, KM_SLEEP|KM_MAYFAIL);
+	tic = kmem_zone_zalloc(xfs_log_ticket_zone, alloc_flags);
 	if (!tic)
 		return NULL;
 
@@ -3647,6 +3688,11 @@ xlog_state_ioerror(
  *	c. nothing new gets queued up after (a) and (b) are done.
  *	d. if !logerror, flush the iclogs to disk, then seal them off
  *	   for business.
+ *
+ * Note: for delayed logging the !logerror case needs to flush the regions
+ * held in memory out to the iclogs before flushing them to disk. This needs
+ * to be done before the log is marked as shutdown, otherwise the flush to the
+ * iclogs will fail.
  */
 int
 xfs_log_force_umount(
@@ -3680,6 +3726,16 @@ xfs_log_force_umount(
 		return 1;
 	}
 	retval = 0;
+
+	/*
+	 * Flush the in memory commit item list before marking the log as
+	 * being shut down. We need to do it in this order to ensure all the
+	 * completed transactions are flushed to disk with the xfs_log_force()
+	 * call below.
+	 */
+	if (!logerror && (mp->m_flags & XFS_MOUNT_DELAYLOG))
+		xlog_cil_push(log, 1);
+
 	/*
 	 * We must hold both the GRANT lock and the LOG lock,
 	 * before we mark the filesystem SHUTDOWN and wake
diff --git a/fs/xfs/xfs_log.h b/fs/xfs/xfs_log.h
index 229d1f36ba9a..04c78e642cc8 100644
--- a/fs/xfs/xfs_log.h
+++ b/fs/xfs/xfs_log.h
@@ -19,7 +19,6 @@
 #define __XFS_LOG_H__
 
 /* get lsn fields */
-
 #define CYCLE_LSN(lsn) ((uint)((lsn)>>32))
 #define BLOCK_LSN(lsn) ((uint)(lsn))
 
@@ -114,6 +113,9 @@ struct xfs_log_vec {
 	struct xfs_log_vec	*lv_next;	/* next lv in build list */
 	int			lv_niovecs;	/* number of iovecs in lv */
 	struct xfs_log_iovec	*lv_iovecp;	/* iovec array */
+	struct xfs_log_item	*lv_item;	/* owner */
+	char			*lv_buf;	/* formatted buffer */
+	int			lv_buf_len;	/* size of formatted buffer */
 };
 
 /*
@@ -134,6 +136,7 @@ struct xlog_in_core;
 struct xlog_ticket;
 struct xfs_log_item;
 struct xfs_item_ops;
+struct xfs_trans;
 
 void	xfs_log_item_init(struct xfs_mount	*mp,
 			struct xfs_log_item	*item,
@@ -187,9 +190,16 @@ int	  xfs_log_need_covered(struct xfs_mount *mp);
 
 void	  xlog_iodone(struct xfs_buf *);
 
-struct xlog_ticket * xfs_log_ticket_get(struct xlog_ticket *ticket);
+struct xlog_ticket *xfs_log_ticket_get(struct xlog_ticket *ticket);
 void	  xfs_log_ticket_put(struct xlog_ticket *ticket);
 
+xlog_tid_t xfs_log_get_trans_ident(struct xfs_trans *tp);
+
+int	xfs_log_commit_cil(struct xfs_mount *mp, struct xfs_trans *tp,
+				struct xfs_log_vec *log_vector,
+				xfs_lsn_t *commit_lsn, int flags);
+bool	xfs_log_item_in_current_chkpt(struct xfs_log_item *lip);
+
 #endif
 
 
diff --git a/fs/xfs/xfs_log_cil.c b/fs/xfs/xfs_log_cil.c
new file mode 100644
index 000000000000..bb17cc044bf3
--- /dev/null
+++ b/fs/xfs/xfs_log_cil.c
@@ -0,0 +1,725 @@
+/*
+ * Copyright (c) 2010 Red Hat, Inc. All Rights Reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it would be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write the Free Software Foundation,
+ * Inc.,  51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+ */
+
+#include "xfs.h"
+#include "xfs_fs.h"
+#include "xfs_types.h"
+#include "xfs_bit.h"
+#include "xfs_log.h"
+#include "xfs_inum.h"
+#include "xfs_trans.h"
+#include "xfs_trans_priv.h"
+#include "xfs_log_priv.h"
+#include "xfs_sb.h"
+#include "xfs_ag.h"
+#include "xfs_dir2.h"
+#include "xfs_dmapi.h"
+#include "xfs_mount.h"
+#include "xfs_error.h"
+#include "xfs_alloc.h"
+
+/*
+ * Perform initial CIL structure initialisation. If the CIL is not
+ * enabled in this filesystem, ensure the log->l_cilp is null so
+ * we can check this conditional to determine if we are doing delayed
+ * logging or not.
+ */
+int
+xlog_cil_init(
+	struct log	*log)
+{
+	struct xfs_cil	*cil;
+	struct xfs_cil_ctx *ctx;
+
+	log->l_cilp = NULL;
+	if (!(log->l_mp->m_flags & XFS_MOUNT_DELAYLOG))
+		return 0;
+
+	cil = kmem_zalloc(sizeof(*cil), KM_SLEEP|KM_MAYFAIL);
+	if (!cil)
+		return ENOMEM;
+
+	ctx = kmem_zalloc(sizeof(*ctx), KM_SLEEP|KM_MAYFAIL);
+	if (!ctx) {
+		kmem_free(cil);
+		return ENOMEM;
+	}
+
+	INIT_LIST_HEAD(&cil->xc_cil);
+	INIT_LIST_HEAD(&cil->xc_committing);
+	spin_lock_init(&cil->xc_cil_lock);
+	init_rwsem(&cil->xc_ctx_lock);
+	sv_init(&cil->xc_commit_wait, SV_DEFAULT, "cilwait");
+
+	INIT_LIST_HEAD(&ctx->committing);
+	INIT_LIST_HEAD(&ctx->busy_extents);
+	ctx->sequence = 1;
+	ctx->cil = cil;
+	cil->xc_ctx = ctx;
+
+	cil->xc_log = log;
+	log->l_cilp = cil;
+	return 0;
+}
+
+void
+xlog_cil_destroy(
+	struct log	*log)
+{
+	if (!log->l_cilp)
+		return;
+
+	if (log->l_cilp->xc_ctx) {
+		if (log->l_cilp->xc_ctx->ticket)
+			xfs_log_ticket_put(log->l_cilp->xc_ctx->ticket);
+		kmem_free(log->l_cilp->xc_ctx);
+	}
+
+	ASSERT(list_empty(&log->l_cilp->xc_cil));
+	kmem_free(log->l_cilp);
+}
+
+/*
+ * Allocate a new ticket. Failing to get a new ticket makes it really hard to
+ * recover, so we don't allow failure here. Also, we allocate in a context that
+ * we don't want to be issuing transactions from, so we need to tell the
+ * allocation code this as well.
+ *
+ * We don't reserve any space for the ticket - we are going to steal whatever
+ * space we require from transactions as they commit. To ensure we reserve all
+ * the space required, we need to set the current reservation of the ticket to
+ * zero so that we know to steal the initial transaction overhead from the
+ * first transaction commit.
+ */
+static struct xlog_ticket *
+xlog_cil_ticket_alloc(
+	struct log	*log)
+{
+	struct xlog_ticket *tic;
+
+	tic = xlog_ticket_alloc(log, 0, 1, XFS_TRANSACTION, 0,
+				KM_SLEEP|KM_NOFS);
+	tic->t_trans_type = XFS_TRANS_CHECKPOINT;
+
+	/*
+	 * set the current reservation to zero so we know to steal the basic
+	 * transaction overhead reservation from the first transaction commit.
+	 */
+	tic->t_curr_res = 0;
+	return tic;
+}
+
+/*
+ * After the first stage of log recovery is done, we know where the head and
+ * tail of the log are. We need this log initialisation done before we can
+ * initialise the first CIL checkpoint context.
+ *
+ * Here we allocate a log ticket to track space usage during a CIL push.  This
+ * ticket is passed to xlog_write() directly so that we don't slowly leak log
+ * space by failing to account for space used by log headers and additional
+ * region headers for split regions.
+ */
+void
+xlog_cil_init_post_recovery(
+	struct log	*log)
+{
+	if (!log->l_cilp)
+		return;
+
+	log->l_cilp->xc_ctx->ticket = xlog_cil_ticket_alloc(log);
+	log->l_cilp->xc_ctx->sequence = 1;
+	log->l_cilp->xc_ctx->commit_lsn = xlog_assign_lsn(log->l_curr_cycle,
+								log->l_curr_block);
+}
+
+/*
+ * Insert the log item into the CIL and calculate the difference in space
+ * consumed by the item. Add the space to the checkpoint ticket and calculate
+ * if the change requires additional log metadata. If it does, take that space
+ * as well. Remove the amount of space we addded to the checkpoint ticket from
+ * the current transaction ticket so that the accounting works out correctly.
+ *
+ * If this is the first time the item is being placed into the CIL in this
+ * context, pin it so it can't be written to disk until the CIL is flushed to
+ * the iclog and the iclog written to disk.
+ */
+static void
+xlog_cil_insert(
+	struct log		*log,
+	struct xlog_ticket	*ticket,
+	struct xfs_log_item	*item,
+	struct xfs_log_vec	*lv)
+{
+	struct xfs_cil		*cil = log->l_cilp;
+	struct xfs_log_vec	*old = lv->lv_item->li_lv;
+	struct xfs_cil_ctx	*ctx = cil->xc_ctx;
+	int			len;
+	int			diff_iovecs;
+	int			iclog_space;
+
+	if (old) {
+		/* existing lv on log item, space used is a delta */
+		ASSERT(!list_empty(&item->li_cil));
+		ASSERT(old->lv_buf && old->lv_buf_len && old->lv_niovecs);
+
+		len = lv->lv_buf_len - old->lv_buf_len;
+		diff_iovecs = lv->lv_niovecs - old->lv_niovecs;
+		kmem_free(old->lv_buf);
+		kmem_free(old);
+	} else {
+		/* new lv, must pin the log item */
+		ASSERT(!lv->lv_item->li_lv);
+		ASSERT(list_empty(&item->li_cil));
+
+		len = lv->lv_buf_len;
+		diff_iovecs = lv->lv_niovecs;
+		IOP_PIN(lv->lv_item);
+
+	}
+	len += diff_iovecs * sizeof(xlog_op_header_t);
+
+	/* attach new log vector to log item */
+	lv->lv_item->li_lv = lv;
+
+	spin_lock(&cil->xc_cil_lock);
+	list_move_tail(&item->li_cil, &cil->xc_cil);
+	ctx->nvecs += diff_iovecs;
+
+	/*
+	 * If this is the first time the item is being committed to the CIL,
+	 * store the sequence number on the log item so we can tell
+	 * in future commits whether this is the first checkpoint the item is
+	 * being committed into.
+	 */
+	if (!item->li_seq)
+		item->li_seq = ctx->sequence;
+
+	/*
+	 * Now transfer enough transaction reservation to the context ticket
+	 * for the checkpoint. The context ticket is special - the unit
+	 * reservation has to grow as well as the current reservation as we
+	 * steal from tickets so we can correctly determine the space used
+	 * during the transaction commit.
+	 */
+	if (ctx->ticket->t_curr_res == 0) {
+		/* first commit in checkpoint, steal the header reservation */
+		ASSERT(ticket->t_curr_res >= ctx->ticket->t_unit_res + len);
+		ctx->ticket->t_curr_res = ctx->ticket->t_unit_res;
+		ticket->t_curr_res -= ctx->ticket->t_unit_res;
+	}
+
+	/* do we need space for more log record headers? */
+	iclog_space = log->l_iclog_size - log->l_iclog_hsize;
+	if (len > 0 && (ctx->space_used / iclog_space !=
+				(ctx->space_used + len) / iclog_space)) {
+		int hdrs;
+
+		hdrs = (len + iclog_space - 1) / iclog_space;
+		/* need to take into account split region headers, too */
+		hdrs *= log->l_iclog_hsize + sizeof(struct xlog_op_header);
+		ctx->ticket->t_unit_res += hdrs;
+		ctx->ticket->t_curr_res += hdrs;
+		ticket->t_curr_res -= hdrs;
+		ASSERT(ticket->t_curr_res >= len);
+	}
+	ticket->t_curr_res -= len;
+	ctx->space_used += len;
+
+	spin_unlock(&cil->xc_cil_lock);
+}
+
+/*
+ * Format log item into a flat buffers
+ *
+ * For delayed logging, we need to hold a formatted buffer containing all the
+ * changes on the log item. This enables us to relog the item in memory and
+ * write it out asynchronously without needing to relock the object that was
+ * modified at the time it gets written into the iclog.
+ *
+ * This function builds a vector for the changes in each log item in the
+ * transaction. It then works out the length of the buffer needed for each log
+ * item, allocates them and formats the vector for the item into the buffer.
+ * The buffer is then attached to the log item are then inserted into the
+ * Committed Item List for tracking until the next checkpoint is written out.
+ *
+ * We don't set up region headers during this process; we simply copy the
+ * regions into the flat buffer. We can do this because we still have to do a
+ * formatting step to write the regions into the iclog buffer.  Writing the
+ * ophdrs during the iclog write means that we can support splitting large
+ * regions across iclog boundares without needing a change in the format of the
+ * item/region encapsulation.
+ *
+ * Hence what we need to do now is change the rewrite the vector array to point
+ * to the copied region inside the buffer we just allocated. This allows us to
+ * format the regions into the iclog as though they are being formatted
+ * directly out of the objects themselves.
+ */
+static void
+xlog_cil_format_items(
+	struct log		*log,
+	struct xfs_log_vec	*log_vector,
+	struct xlog_ticket	*ticket,
+	xfs_lsn_t		*start_lsn)
+{
+	struct xfs_log_vec *lv;
+
+	if (start_lsn)
+		*start_lsn = log->l_cilp->xc_ctx->sequence;
+
+	ASSERT(log_vector);
+	for (lv = log_vector; lv; lv = lv->lv_next) {
+		void	*ptr;
+		int	index;
+		int	len = 0;
+
+		/* build the vector array and calculate it's length */
+		IOP_FORMAT(lv->lv_item, lv->lv_iovecp);
+		for (index = 0; index < lv->lv_niovecs; index++)
+			len += lv->lv_iovecp[index].i_len;
+
+		lv->lv_buf_len = len;
+		lv->lv_buf = kmem_zalloc(lv->lv_buf_len, KM_SLEEP|KM_NOFS);
+		ptr = lv->lv_buf;
+
+		for (index = 0; index < lv->lv_niovecs; index++) {
+			struct xfs_log_iovec *vec = &lv->lv_iovecp[index];
+
+			memcpy(ptr, vec->i_addr, vec->i_len);
+			vec->i_addr = ptr;
+			ptr += vec->i_len;
+		}
+		ASSERT(ptr == lv->lv_buf + lv->lv_buf_len);
+
+		xlog_cil_insert(log, ticket, lv->lv_item, lv);
+	}
+}
+
+static void
+xlog_cil_free_logvec(
+	struct xfs_log_vec	*log_vector)
+{
+	struct xfs_log_vec	*lv;
+
+	for (lv = log_vector; lv; ) {
+		struct xfs_log_vec *next = lv->lv_next;
+		kmem_free(lv->lv_buf);
+		kmem_free(lv);
+		lv = next;
+	}
+}
+
+/*
+ * Commit a transaction with the given vector to the Committed Item List.
+ *
+ * To do this, we need to format the item, pin it in memory if required and
+ * account for the space used by the transaction. Once we have done that we
+ * need to release the unused reservation for the transaction, attach the
+ * transaction to the checkpoint context so we carry the busy extents through
+ * to checkpoint completion, and then unlock all the items in the transaction.
+ *
+ * For more specific information about the order of operations in
+ * xfs_log_commit_cil() please refer to the comments in
+ * xfs_trans_commit_iclog().
+ *
+ * Called with the context lock already held in read mode to lock out
+ * background commit, returns without it held once background commits are
+ * allowed again.
+ */
+int
+xfs_log_commit_cil(
+	struct xfs_mount	*mp,
+	struct xfs_trans	*tp,
+	struct xfs_log_vec	*log_vector,
+	xfs_lsn_t		*commit_lsn,
+	int			flags)
+{
+	struct log		*log = mp->m_log;
+	int			log_flags = 0;
+	int			push = 0;
+
+	if (flags & XFS_TRANS_RELEASE_LOG_RES)
+		log_flags = XFS_LOG_REL_PERM_RESERV;
+
+	if (XLOG_FORCED_SHUTDOWN(log)) {
+		xlog_cil_free_logvec(log_vector);
+		return XFS_ERROR(EIO);
+	}
+
+	/* lock out background commit */
+	down_read(&log->l_cilp->xc_ctx_lock);
+	xlog_cil_format_items(log, log_vector, tp->t_ticket, commit_lsn);
+
+	/* check we didn't blow the reservation */
+	if (tp->t_ticket->t_curr_res < 0)
+		xlog_print_tic_res(log->l_mp, tp->t_ticket);
+
+	/* attach the transaction to the CIL if it has any busy extents */
+	if (!list_empty(&tp->t_busy)) {
+		spin_lock(&log->l_cilp->xc_cil_lock);
+		list_splice_init(&tp->t_busy,
+					&log->l_cilp->xc_ctx->busy_extents);
+		spin_unlock(&log->l_cilp->xc_cil_lock);
+	}
+
+	tp->t_commit_lsn = *commit_lsn;
+	xfs_log_done(mp, tp->t_ticket, NULL, log_flags);
+	xfs_trans_unreserve_and_mod_sb(tp);
+
+	/* check for background commit before unlock */
+	if (log->l_cilp->xc_ctx->space_used > XLOG_CIL_SPACE_LIMIT(log))
+		push = 1;
+	up_read(&log->l_cilp->xc_ctx_lock);
+
+	/*
+	 * We need to push CIL every so often so we don't cache more than we
+	 * can fit in the log. The limit really is that a checkpoint can't be
+	 * more than half the log (the current checkpoint is not allowed to
+	 * overwrite the previous checkpoint), but commit latency and memory
+	 * usage limit this to a smaller size in most cases.
+	 */
+	if (push)
+		xlog_cil_push(log, 0);
+	return 0;
+}
+
+/*
+ * Mark all items committed and clear busy extents. We free the log vector
+ * chains in a separate pass so that we unpin the log items as quickly as
+ * possible.
+ */
+static void
+xlog_cil_committed(
+	void	*args,
+	int	abort)
+{
+	struct xfs_cil_ctx	*ctx = args;
+	struct xfs_log_vec	*lv;
+	int			abortflag = abort ? XFS_LI_ABORTED : 0;
+	struct xfs_busy_extent	*busyp, *n;
+
+	/* unpin all the log items */
+	for (lv = ctx->lv_chain; lv; lv = lv->lv_next ) {
+		xfs_trans_item_committed(lv->lv_item, ctx->start_lsn,
+							abortflag);
+	}
+
+	list_for_each_entry_safe(busyp, n, &ctx->busy_extents, list)
+		xfs_alloc_busy_clear(ctx->cil->xc_log->l_mp, busyp);
+
+	spin_lock(&ctx->cil->xc_cil_lock);
+	list_del(&ctx->committing);
+	spin_unlock(&ctx->cil->xc_cil_lock);
+
+	xlog_cil_free_logvec(ctx->lv_chain);
+	kmem_free(ctx);
+}
+
+/*
+ * Push the Committed Item List to the log. If the push_now flag is not set,
+ * then it is a background flush and so we can chose to ignore it.
+ */
+int
+xlog_cil_push(
+	struct log		*log,
+	int			push_now)
+{
+	struct xfs_cil		*cil = log->l_cilp;
+	struct xfs_log_vec	*lv;
+	struct xfs_cil_ctx	*ctx;
+	struct xfs_cil_ctx	*new_ctx;
+	struct xlog_in_core	*commit_iclog;
+	struct xlog_ticket	*tic;
+	int			num_lv;
+	int			num_iovecs;
+	int			len;
+	int			error = 0;
+	struct xfs_trans_header thdr;
+	struct xfs_log_iovec	lhdr;
+	struct xfs_log_vec	lvhdr = { NULL };
+	xfs_lsn_t		commit_lsn;
+
+	if (!cil)
+		return 0;
+
+	new_ctx = kmem_zalloc(sizeof(*new_ctx), KM_SLEEP|KM_NOFS);
+	new_ctx->ticket = xlog_cil_ticket_alloc(log);
+
+	/* lock out transaction commit, but don't block on background push */
+	if (!down_write_trylock(&cil->xc_ctx_lock)) {
+		if (!push_now)
+			goto out_free_ticket;
+		down_write(&cil->xc_ctx_lock);
+	}
+	ctx = cil->xc_ctx;
+
+	/* check if we've anything to push */
+	if (list_empty(&cil->xc_cil))
+		goto out_skip;
+
+	/* check for spurious background flush */
+	if (!push_now && cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log))
+		goto out_skip;
+
+	/*
+	 * pull all the log vectors off the items in the CIL, and
+	 * remove the items from the CIL. We don't need the CIL lock
+	 * here because it's only needed on the transaction commit
+	 * side which is currently locked out by the flush lock.
+	 */
+	lv = NULL;
+	num_lv = 0;
+	num_iovecs = 0;
+	len = 0;
+	while (!list_empty(&cil->xc_cil)) {
+		struct xfs_log_item	*item;
+		int			i;
+
+		item = list_first_entry(&cil->xc_cil,
+					struct xfs_log_item, li_cil);
+		list_del_init(&item->li_cil);
+		if (!ctx->lv_chain)
+			ctx->lv_chain = item->li_lv;
+		else
+			lv->lv_next = item->li_lv;
+		lv = item->li_lv;
+		item->li_lv = NULL;
+
+		num_lv++;
+		num_iovecs += lv->lv_niovecs;
+		for (i = 0; i < lv->lv_niovecs; i++)
+			len += lv->lv_iovecp[i].i_len;
+	}
+
+	/*
+	 * initialise the new context and attach it to the CIL. Then attach
+	 * the current context to the CIL committing lsit so it can be found
+	 * during log forces to extract the commit lsn of the sequence that
+	 * needs to be forced.
+	 */
+	INIT_LIST_HEAD(&new_ctx->committing);
+	INIT_LIST_HEAD(&new_ctx->busy_extents);
+	new_ctx->sequence = ctx->sequence + 1;
+	new_ctx->cil = cil;
+	cil->xc_ctx = new_ctx;
+
+	/*
+	 * The switch is now done, so we can drop the context lock and move out
+	 * of a shared context. We can't just go straight to the commit record,
+	 * though - we need to synchronise with previous and future commits so
+	 * that the commit records are correctly ordered in the log to ensure
+	 * that we process items during log IO completion in the correct order.
+	 *
+	 * For example, if we get an EFI in one checkpoint and the EFD in the
+	 * next (e.g. due to log forces), we do not want the checkpoint with
+	 * the EFD to be committed before the checkpoint with the EFI.  Hence
+	 * we must strictly order the commit records of the checkpoints so
+	 * that: a) the checkpoint callbacks are attached to the iclogs in the
+	 * correct order; and b) the checkpoints are replayed in correct order
+	 * in log recovery.
+	 *
+	 * Hence we need to add this context to the committing context list so
+	 * that higher sequences will wait for us to write out a commit record
+	 * before they do.
+	 */
+	spin_lock(&cil->xc_cil_lock);
+	list_add(&ctx->committing, &cil->xc_committing);
+	spin_unlock(&cil->xc_cil_lock);
+	up_write(&cil->xc_ctx_lock);
+
+	/*
+	 * Build a checkpoint transaction header and write it to the log to
+	 * begin the transaction. We need to account for the space used by the
+	 * transaction header here as it is not accounted for in xlog_write().
+	 *
+	 * The LSN we need to pass to the log items on transaction commit is
+	 * the LSN reported by the first log vector write. If we use the commit
+	 * record lsn then we can move the tail beyond the grant write head.
+	 */
+	tic = ctx->ticket;
+	thdr.th_magic = XFS_TRANS_HEADER_MAGIC;
+	thdr.th_type = XFS_TRANS_CHECKPOINT;
+	thdr.th_tid = tic->t_tid;
+	thdr.th_num_items = num_iovecs;
+	lhdr.i_addr = (xfs_caddr_t)&thdr;
+	lhdr.i_len = sizeof(xfs_trans_header_t);
+	lhdr.i_type = XLOG_REG_TYPE_TRANSHDR;
+	tic->t_curr_res -= lhdr.i_len + sizeof(xlog_op_header_t);
+
+	lvhdr.lv_niovecs = 1;
+	lvhdr.lv_iovecp = &lhdr;
+	lvhdr.lv_next = ctx->lv_chain;
+
+	error = xlog_write(log, &lvhdr, tic, &ctx->start_lsn, NULL, 0);
+	if (error)
+		goto out_abort;
+
+	/*
+	 * now that we've written the checkpoint into the log, strictly
+	 * order the commit records so replay will get them in the right order.
+	 */
+restart:
+	spin_lock(&cil->xc_cil_lock);
+	list_for_each_entry(new_ctx, &cil->xc_committing, committing) {
+		/*
+		 * Higher sequences will wait for this one so skip them.
+		 * Don't wait for own own sequence, either.
+		 */
+		if (new_ctx->sequence >= ctx->sequence)
+			continue;
+		if (!new_ctx->commit_lsn) {
+			/*
+			 * It is still being pushed! Wait for the push to
+			 * complete, then start again from the beginning.
+			 */
+			sv_wait(&cil->xc_commit_wait, 0, &cil->xc_cil_lock, 0);
+			goto restart;
+		}
+	}
+	spin_unlock(&cil->xc_cil_lock);
+
+	commit_lsn = xfs_log_done(log->l_mp, tic, &commit_iclog, 0);
+	if (error || commit_lsn == -1)
+		goto out_abort;
+
+	/* attach all the transactions w/ busy extents to iclog */
+	ctx->log_cb.cb_func = xlog_cil_committed;
+	ctx->log_cb.cb_arg = ctx;
+	error = xfs_log_notify(log->l_mp, commit_iclog, &ctx->log_cb);
+	if (error)
+		goto out_abort;
+
+	/*
+	 * now the checkpoint commit is complete and we've attached the
+	 * callbacks to the iclog we can assign the commit LSN to the context
+	 * and wake up anyone who is waiting for the commit to complete.
+	 */
+	spin_lock(&cil->xc_cil_lock);
+	ctx->commit_lsn = commit_lsn;
+	sv_broadcast(&cil->xc_commit_wait);
+	spin_unlock(&cil->xc_cil_lock);
+
+	/* release the hounds! */
+	return xfs_log_release_iclog(log->l_mp, commit_iclog);
+
+out_skip:
+	up_write(&cil->xc_ctx_lock);
+out_free_ticket:
+	xfs_log_ticket_put(new_ctx->ticket);
+	kmem_free(new_ctx);
+	return 0;
+
+out_abort:
+	xlog_cil_committed(ctx, XFS_LI_ABORTED);
+	return XFS_ERROR(EIO);
+}
+
+/*
+ * Conditionally push the CIL based on the sequence passed in.
+ *
+ * We only need to push if we haven't already pushed the sequence
+ * number given. Hence the only time we will trigger a push here is
+ * if the push sequence is the same as the current context.
+ *
+ * We return the current commit lsn to allow the callers to determine if a
+ * iclog flush is necessary following this call.
+ *
+ * XXX: Initially, just push the CIL unconditionally and return whatever
+ * commit lsn is there. It'll be empty, so this is broken for now.
+ */
+xfs_lsn_t
+xlog_cil_push_lsn(
+	struct log	*log,
+	xfs_lsn_t	push_seq)
+{
+	struct xfs_cil		*cil = log->l_cilp;
+	struct xfs_cil_ctx	*ctx;
+	xfs_lsn_t		commit_lsn = NULLCOMMITLSN;
+
+restart:
+	down_write(&cil->xc_ctx_lock);
+	ASSERT(push_seq <= cil->xc_ctx->sequence);
+
+	/* check to see if we need to force out the current context */
+	if (push_seq == cil->xc_ctx->sequence) {
+		up_write(&cil->xc_ctx_lock);
+		xlog_cil_push(log, 1);
+		goto restart;
+	}
+
+	/*
+	 * See if we can find a previous sequence still committing.
+	 * We can drop the flush lock as soon as we have the cil lock
+	 * because we are now only comparing contexts protected by
+	 * the cil lock.
+	 *
+	 * We need to wait for all previous sequence commits to complete
+	 * before allowing the force of push_seq to go ahead. Hence block
+	 * on commits for those as well.
+	 */
+	spin_lock(&cil->xc_cil_lock);
+	up_write(&cil->xc_ctx_lock);
+	list_for_each_entry(ctx, &cil->xc_committing, committing) {
+		if (ctx->sequence > push_seq)
+			continue;
+		if (!ctx->commit_lsn) {
+			/*
+			 * It is still being pushed! Wait for the push to
+			 * complete, then start again from the beginning.
+			 */
+			sv_wait(&cil->xc_commit_wait, 0, &cil->xc_cil_lock, 0);
+			goto restart;
+		}
+		if (ctx->sequence != push_seq)
+			continue;
+		/* found it! */
+		commit_lsn = ctx->commit_lsn;
+	}
+	spin_unlock(&cil->xc_cil_lock);
+	return commit_lsn;
+}
+
+/*
+ * Check if the current log item was first committed in this sequence.
+ * We can't rely on just the log item being in the CIL, we have to check
+ * the recorded commit sequence number.
+ *
+ * Note: for this to be used in a non-racy manner, it has to be called with
+ * CIL flushing locked out. As a result, it should only be used during the
+ * transaction commit process when deciding what to format into the item.
+ */
+bool
+xfs_log_item_in_current_chkpt(
+	struct xfs_log_item *lip)
+{
+	struct xfs_cil_ctx *ctx;
+
+	if (!(lip->li_mountp->m_flags & XFS_MOUNT_DELAYLOG))
+		return false;
+	if (list_empty(&lip->li_cil))
+		return false;
+
+	ctx = lip->li_mountp->m_log->l_cilp->xc_ctx;
+
+	/*
+	 * li_seq is written on the first commit of a log item to record the
+	 * first checkpoint it is written to. Hence if it is different to the
+	 * current sequence, we're in a new checkpoint.
+	 */
+	if (XFS_LSN_CMP(lip->li_seq, ctx->sequence) != 0)
+		return false;
+	return true;
+}
diff --git a/fs/xfs/xfs_log_priv.h b/fs/xfs/xfs_log_priv.h
index 9cf695154451..8c072618965c 100644
--- a/fs/xfs/xfs_log_priv.h
+++ b/fs/xfs/xfs_log_priv.h
@@ -152,8 +152,6 @@ static inline uint xlog_get_client_id(__be32 i)
 #define	XLOG_RECOVERY_NEEDED	0x4	/* log was recovered */
 #define XLOG_IO_ERROR		0x8	/* log hit an I/O error, and being
 					   shutdown */
-typedef __uint32_t xlog_tid_t;
-
 
 #ifdef __KERNEL__
 /*
@@ -379,6 +377,99 @@ typedef struct xlog_in_core {
 } xlog_in_core_t;
 
 /*
+ * The CIL context is used to aggregate per-transaction details as well be
+ * passed to the iclog for checkpoint post-commit processing.  After being
+ * passed to the iclog, another context needs to be allocated for tracking the
+ * next set of transactions to be aggregated into a checkpoint.
+ */
+struct xfs_cil;
+
+struct xfs_cil_ctx {
+	struct xfs_cil		*cil;
+	xfs_lsn_t		sequence;	/* chkpt sequence # */
+	xfs_lsn_t		start_lsn;	/* first LSN of chkpt commit */
+	xfs_lsn_t		commit_lsn;	/* chkpt commit record lsn */
+	struct xlog_ticket	*ticket;	/* chkpt ticket */
+	int			nvecs;		/* number of regions */
+	int			space_used;	/* aggregate size of regions */
+	struct list_head	busy_extents;	/* busy extents in chkpt */
+	struct xfs_log_vec	*lv_chain;	/* logvecs being pushed */
+	xfs_log_callback_t	log_cb;		/* completion callback hook. */
+	struct list_head	committing;	/* ctx committing list */
+};
+
+/*
+ * Committed Item List structure
+ *
+ * This structure is used to track log items that have been committed but not
+ * yet written into the log. It is used only when the delayed logging mount
+ * option is enabled.
+ *
+ * This structure tracks the list of committing checkpoint contexts so
+ * we can avoid the problem of having to hold out new transactions during a
+ * flush until we have a the commit record LSN of the checkpoint. We can
+ * traverse the list of committing contexts in xlog_cil_push_lsn() to find a
+ * sequence match and extract the commit LSN directly from there. If the
+ * checkpoint is still in the process of committing, we can block waiting for
+ * the commit LSN to be determined as well. This should make synchronous
+ * operations almost as efficient as the old logging methods.
+ */
+struct xfs_cil {
+	struct log		*xc_log;
+	struct list_head	xc_cil;
+	spinlock_t		xc_cil_lock;
+	struct xfs_cil_ctx	*xc_ctx;
+	struct rw_semaphore	xc_ctx_lock;
+	struct list_head	xc_committing;
+	sv_t			xc_commit_wait;
+};
+
+/*
+ * The amount of log space we should the CIL to aggregate is difficult to size.
+ * Whatever we chose we have to make we can get a reservation for the log space
+ * effectively, that it is large enough to capture sufficient relogging to
+ * reduce log buffer IO significantly, but it is not too large for the log or
+ * induces too much latency when writing out through the iclogs. We track both
+ * space consumed and the number of vectors in the checkpoint context, so we
+ * need to decide which to use for limiting.
+ *
+ * Every log buffer we write out during a push needs a header reserved, which
+ * is at least one sector and more for v2 logs. Hence we need a reservation of
+ * at least 512 bytes per 32k of log space just for the LR headers. That means
+ * 16KB of reservation per megabyte of delayed logging space we will consume,
+ * plus various headers.  The number of headers will vary based on the num of
+ * io vectors, so limiting on a specific number of vectors is going to result
+ * in transactions of varying size. IOWs, it is more consistent to track and
+ * limit space consumed in the log rather than by the number of objects being
+ * logged in order to prevent checkpoint ticket overruns.
+ *
+ * Further, use of static reservations through the log grant mechanism is
+ * problematic. It introduces a lot of complexity (e.g. reserve grant vs write
+ * grant) and a significant deadlock potential because regranting write space
+ * can block on log pushes. Hence if we have to regrant log space during a log
+ * push, we can deadlock.
+ *
+ * However, we can avoid this by use of a dynamic "reservation stealing"
+ * technique during transaction commit whereby unused reservation space in the
+ * transaction ticket is transferred to the CIL ctx commit ticket to cover the
+ * space needed by the checkpoint transaction. This means that we never need to
+ * specifically reserve space for the CIL checkpoint transaction, nor do we
+ * need to regrant space once the checkpoint completes. This also means the
+ * checkpoint transaction ticket is specific to the checkpoint context, rather
+ * than the CIL itself.
+ *
+ * With dynamic reservations, we can basically make up arbitrary limits for the
+ * checkpoint size so long as they don't violate any other size rules.  Hence
+ * the initial maximum size for the checkpoint transaction will be set to a
+ * quarter of the log or 8MB, which ever is smaller. 8MB is an arbitrary limit
+ * right now based on the latency of writing out a large amount of data through
+ * the circular iclog buffers.
+ */
+
+#define XLOG_CIL_SPACE_LIMIT(log)	\
+	(min((log->l_logsize >> 2), (8 * 1024 * 1024)))
+
+/*
  * The reservation head lsn is not made up of a cycle number and block number.
  * Instead, it uses a cycle number and byte number.  Logs don't expect to
  * overflow 31 bits worth of byte offset, so using a byte number will mean
@@ -388,6 +479,7 @@ typedef struct log {
 	/* The following fields don't need locking */
 	struct xfs_mount	*l_mp;	        /* mount point */
 	struct xfs_ail		*l_ailp;	/* AIL log is working with */
+	struct xfs_cil		*l_cilp;	/* CIL log is working with */
 	struct xfs_buf		*l_xbuf;        /* extra buffer for log
 						 * wrapping */
 	struct xfs_buftarg	*l_targ;        /* buftarg of log */
@@ -438,14 +530,17 @@ typedef struct log {
 
 #define XLOG_FORCED_SHUTDOWN(log)	((log)->l_flags & XLOG_IO_ERROR)
 
-
 /* common routines */
 extern xfs_lsn_t xlog_assign_tail_lsn(struct xfs_mount *mp);
 extern int	 xlog_recover(xlog_t *log);
 extern int	 xlog_recover_finish(xlog_t *log);
 extern void	 xlog_pack_data(xlog_t *log, xlog_in_core_t *iclog, int);
 
-extern kmem_zone_t	*xfs_log_ticket_zone;
+extern kmem_zone_t *xfs_log_ticket_zone;
+struct xlog_ticket *xlog_ticket_alloc(struct log *log, int unit_bytes,
+				int count, char client, uint xflags,
+				int alloc_flags);
+
 
 static inline void
 xlog_write_adv_cnt(void **ptr, int *len, int *off, size_t bytes)
@@ -455,6 +550,21 @@ xlog_write_adv_cnt(void **ptr, int *len, int *off, size_t bytes)
 	*off += bytes;
 }
 
+void	xlog_print_tic_res(struct xfs_mount *mp, struct xlog_ticket *ticket);
+int	xlog_write(struct log *log, struct xfs_log_vec *log_vector,
+				struct xlog_ticket *tic, xfs_lsn_t *start_lsn,
+				xlog_in_core_t **commit_iclog, uint flags);
+
+/*
+ * Committed Item List interfaces
+ */
+int	xlog_cil_init(struct log *log);
+void	xlog_cil_init_post_recovery(struct log *log);
+void	xlog_cil_destroy(struct log *log);
+
+int	xlog_cil_push(struct log *log, int push_now);
+xfs_lsn_t xlog_cil_push_lsn(struct log *log, xfs_lsn_t push_sequence);
+
 /*
  * Unmount record type is used as a pseudo transaction type for the ticket.
  * It's value must be outside the range of XFS_TRANS_* values.
diff --git a/fs/xfs/xfs_log_recover.c b/fs/xfs/xfs_log_recover.c
index 0de08e366315..14a69aec2c0b 100644
--- a/fs/xfs/xfs_log_recover.c
+++ b/fs/xfs/xfs_log_recover.c
@@ -1576,7 +1576,7 @@ xlog_recover_reorder_trans(
 
 		switch (ITEM_TYPE(item)) {
 		case XFS_LI_BUF:
-			if (!(buf_f->blf_flags & XFS_BLI_CANCEL)) {
+			if (!(buf_f->blf_flags & XFS_BLF_CANCEL)) {
 				trace_xfs_log_recover_item_reorder_head(log,
 							trans, item, pass);
 				list_move(&item->ri_list, &trans->r_itemq);
@@ -1638,7 +1638,7 @@ xlog_recover_do_buffer_pass1(
 	/*
 	 * If this isn't a cancel buffer item, then just return.
 	 */
-	if (!(flags & XFS_BLI_CANCEL)) {
+	if (!(flags & XFS_BLF_CANCEL)) {
 		trace_xfs_log_recover_buf_not_cancel(log, buf_f);
 		return;
 	}
@@ -1696,7 +1696,7 @@ xlog_recover_do_buffer_pass1(
  * Check to see whether the buffer being recovered has a corresponding
  * entry in the buffer cancel record table.  If it does then return 1
  * so that it will be cancelled, otherwise return 0.  If the buffer is
- * actually a buffer cancel item (XFS_BLI_CANCEL is set), then decrement
+ * actually a buffer cancel item (XFS_BLF_CANCEL is set), then decrement
  * the refcount on the entry in the table and remove it from the table
  * if this is the last reference.
  *
@@ -1721,7 +1721,7 @@ xlog_check_buffer_cancelled(
 		 * There is nothing in the table built in pass one,
 		 * so this buffer must not be cancelled.
 		 */
-		ASSERT(!(flags & XFS_BLI_CANCEL));
+		ASSERT(!(flags & XFS_BLF_CANCEL));
 		return 0;
 	}
 
@@ -1733,7 +1733,7 @@ xlog_check_buffer_cancelled(
 		 * There is no corresponding entry in the table built
 		 * in pass one, so this buffer has not been cancelled.
 		 */
-		ASSERT(!(flags & XFS_BLI_CANCEL));
+		ASSERT(!(flags & XFS_BLF_CANCEL));
 		return 0;
 	}
 
@@ -1752,7 +1752,7 @@ xlog_check_buffer_cancelled(
 			 * one in the table and remove it if this is the
 			 * last reference.
 			 */
-			if (flags & XFS_BLI_CANCEL) {
+			if (flags & XFS_BLF_CANCEL) {
 				bcp->bc_refcount--;
 				if (bcp->bc_refcount == 0) {
 					if (prevp == NULL) {
@@ -1772,7 +1772,7 @@ xlog_check_buffer_cancelled(
 	 * We didn't find a corresponding entry in the table, so
 	 * return 0 so that the buffer is NOT cancelled.
 	 */
-	ASSERT(!(flags & XFS_BLI_CANCEL));
+	ASSERT(!(flags & XFS_BLF_CANCEL));
 	return 0;
 }
 
@@ -1874,8 +1874,8 @@ xlog_recover_do_inode_buffer(
 			nbits = xfs_contig_bits(data_map, map_size,
 							 bit);
 			ASSERT(nbits > 0);
-			reg_buf_offset = bit << XFS_BLI_SHIFT;
-			reg_buf_bytes = nbits << XFS_BLI_SHIFT;
+			reg_buf_offset = bit << XFS_BLF_SHIFT;
+			reg_buf_bytes = nbits << XFS_BLF_SHIFT;
 			item_index++;
 		}
 
@@ -1889,7 +1889,7 @@ xlog_recover_do_inode_buffer(
 		}
 
 		ASSERT(item->ri_buf[item_index].i_addr != NULL);
-		ASSERT((item->ri_buf[item_index].i_len % XFS_BLI_CHUNK) == 0);
+		ASSERT((item->ri_buf[item_index].i_len % XFS_BLF_CHUNK) == 0);
 		ASSERT((reg_buf_offset + reg_buf_bytes) <= XFS_BUF_COUNT(bp));
 
 		/*
@@ -1955,9 +1955,9 @@ xlog_recover_do_reg_buffer(
 		nbits = xfs_contig_bits(data_map, map_size, bit);
 		ASSERT(nbits > 0);
 		ASSERT(item->ri_buf[i].i_addr != NULL);
-		ASSERT(item->ri_buf[i].i_len % XFS_BLI_CHUNK == 0);
+		ASSERT(item->ri_buf[i].i_len % XFS_BLF_CHUNK == 0);
 		ASSERT(XFS_BUF_COUNT(bp) >=
-		       ((uint)bit << XFS_BLI_SHIFT)+(nbits<<XFS_BLI_SHIFT));
+		       ((uint)bit << XFS_BLF_SHIFT)+(nbits<<XFS_BLF_SHIFT));
 
 		/*
 		 * Do a sanity check if this is a dquot buffer. Just checking
@@ -1966,7 +1966,7 @@ xlog_recover_do_reg_buffer(
 		 */
 		error = 0;
 		if (buf_f->blf_flags &
-		   (XFS_BLI_UDQUOT_BUF|XFS_BLI_PDQUOT_BUF|XFS_BLI_GDQUOT_BUF)) {
+		   (XFS_BLF_UDQUOT_BUF|XFS_BLF_PDQUOT_BUF|XFS_BLF_GDQUOT_BUF)) {
 			if (item->ri_buf[i].i_addr == NULL) {
 				cmn_err(CE_ALERT,
 					"XFS: NULL dquot in %s.", __func__);
@@ -1987,9 +1987,9 @@ xlog_recover_do_reg_buffer(
 		}
 
 		memcpy(xfs_buf_offset(bp,
-			(uint)bit << XFS_BLI_SHIFT),	/* dest */
+			(uint)bit << XFS_BLF_SHIFT),	/* dest */
 			item->ri_buf[i].i_addr,		/* source */
-			nbits<<XFS_BLI_SHIFT);		/* length */
+			nbits<<XFS_BLF_SHIFT);		/* length */
  next:
 		i++;
 		bit += nbits;
@@ -2148,11 +2148,11 @@ xlog_recover_do_dquot_buffer(
 	}
 
 	type = 0;
-	if (buf_f->blf_flags & XFS_BLI_UDQUOT_BUF)
+	if (buf_f->blf_flags & XFS_BLF_UDQUOT_BUF)
 		type |= XFS_DQ_USER;
-	if (buf_f->blf_flags & XFS_BLI_PDQUOT_BUF)
+	if (buf_f->blf_flags & XFS_BLF_PDQUOT_BUF)
 		type |= XFS_DQ_PROJ;
-	if (buf_f->blf_flags & XFS_BLI_GDQUOT_BUF)
+	if (buf_f->blf_flags & XFS_BLF_GDQUOT_BUF)
 		type |= XFS_DQ_GROUP;
 	/*
 	 * This type of quotas was turned off, so ignore this buffer
@@ -2173,7 +2173,7 @@ xlog_recover_do_dquot_buffer(
  * here which overlaps that may be stale.
  *
  * When meta-data buffers are freed at run time we log a buffer item
- * with the XFS_BLI_CANCEL bit set to indicate that previous copies
+ * with the XFS_BLF_CANCEL bit set to indicate that previous copies
  * of the buffer in the log should not be replayed at recovery time.
  * This is so that if the blocks covered by the buffer are reused for
  * file data before we crash we don't end up replaying old, freed
@@ -2207,7 +2207,7 @@ xlog_recover_do_buffer_trans(
 	if (pass == XLOG_RECOVER_PASS1) {
 		/*
 		 * In this pass we're only looking for buf items
-		 * with the XFS_BLI_CANCEL bit set.
+		 * with the XFS_BLF_CANCEL bit set.
 		 */
 		xlog_recover_do_buffer_pass1(log, buf_f);
 		return 0;
@@ -2244,7 +2244,7 @@ xlog_recover_do_buffer_trans(
 
 	mp = log->l_mp;
 	buf_flags = XBF_LOCK;
-	if (!(flags & XFS_BLI_INODE_BUF))
+	if (!(flags & XFS_BLF_INODE_BUF))
 		buf_flags |= XBF_MAPPED;
 
 	bp = xfs_buf_read(mp->m_ddev_targp, blkno, len, buf_flags);
@@ -2257,10 +2257,10 @@ xlog_recover_do_buffer_trans(
 	}
 
 	error = 0;
-	if (flags & XFS_BLI_INODE_BUF) {
+	if (flags & XFS_BLF_INODE_BUF) {
 		error = xlog_recover_do_inode_buffer(mp, item, bp, buf_f);
 	} else if (flags &
-		  (XFS_BLI_UDQUOT_BUF|XFS_BLI_PDQUOT_BUF|XFS_BLI_GDQUOT_BUF)) {
+		  (XFS_BLF_UDQUOT_BUF|XFS_BLF_PDQUOT_BUF|XFS_BLF_GDQUOT_BUF)) {
 		xlog_recover_do_dquot_buffer(mp, log, item, bp, buf_f);
 	} else {
 		xlog_recover_do_reg_buffer(mp, item, bp, buf_f);
diff --git a/fs/xfs/xfs_log_recover.h b/fs/xfs/xfs_log_recover.h
index 75d749207258..1c55ccbb379d 100644
--- a/fs/xfs/xfs_log_recover.h
+++ b/fs/xfs/xfs_log_recover.h
@@ -28,7 +28,7 @@
 #define XLOG_RHASH(tid)	\
 	((((__uint32_t)tid)>>XLOG_RHASH_SHIFT) & (XLOG_RHASH_SIZE-1))
 
-#define XLOG_MAX_REGIONS_IN_ITEM   (XFS_MAX_BLOCKSIZE / XFS_BLI_CHUNK / 2 + 1)
+#define XLOG_MAX_REGIONS_IN_ITEM   (XFS_MAX_BLOCKSIZE / XFS_BLF_CHUNK / 2 + 1)
 
 
 /*
diff --git a/fs/xfs/xfs_mount.h b/fs/xfs/xfs_mount.h
index 9ff48a16a7ee..1d2c7eed4eda 100644
--- a/fs/xfs/xfs_mount.h
+++ b/fs/xfs/xfs_mount.h
@@ -268,6 +268,7 @@ typedef struct xfs_mount {
 #define XFS_MOUNT_WSYNC		(1ULL << 0)	/* for nfs - all metadata ops
 						   must be synchronous except
 						   for space allocations */
+#define XFS_MOUNT_DELAYLOG	(1ULL << 1)	/* delayed logging is enabled */
 #define XFS_MOUNT_DMAPI		(1ULL << 2)	/* dmapi is enabled */
 #define XFS_MOUNT_WAS_CLEAN	(1ULL << 3)
 #define XFS_MOUNT_FS_SHUTDOWN	(1ULL << 4)	/* atomic stop of all filesystem
diff --git a/fs/xfs/xfs_trans.c b/fs/xfs/xfs_trans.c
index be578ecb4af2..ce558efa2ea0 100644
--- a/fs/xfs/xfs_trans.c
+++ b/fs/xfs/xfs_trans.c
@@ -44,6 +44,7 @@
 #include "xfs_trans_priv.h"
 #include "xfs_trans_space.h"
 #include "xfs_inode_item.h"
+#include "xfs_trace.h"
 
 kmem_zone_t	*xfs_trans_zone;
 
@@ -243,9 +244,8 @@ _xfs_trans_alloc(
 	tp->t_type = type;
 	tp->t_mountp = mp;
 	tp->t_items_free = XFS_LIC_NUM_SLOTS;
-	tp->t_busy_free = XFS_LBC_NUM_SLOTS;
 	xfs_lic_init(&(tp->t_items));
-	XFS_LBC_INIT(&(tp->t_busy));
+	INIT_LIST_HEAD(&tp->t_busy);
 	return tp;
 }
 
@@ -255,8 +255,13 @@ _xfs_trans_alloc(
  */
 STATIC void
 xfs_trans_free(
-	xfs_trans_t	*tp)
+	struct xfs_trans	*tp)
 {
+	struct xfs_busy_extent	*busyp, *n;
+
+	list_for_each_entry_safe(busyp, n, &tp->t_busy, list)
+		xfs_alloc_busy_clear(tp->t_mountp, busyp);
+
 	atomic_dec(&tp->t_mountp->m_active_trans);
 	xfs_trans_free_dqinfo(tp);
 	kmem_zone_free(xfs_trans_zone, tp);
@@ -285,9 +290,8 @@ xfs_trans_dup(
 	ntp->t_type = tp->t_type;
 	ntp->t_mountp = tp->t_mountp;
 	ntp->t_items_free = XFS_LIC_NUM_SLOTS;
-	ntp->t_busy_free = XFS_LBC_NUM_SLOTS;
 	xfs_lic_init(&(ntp->t_items));
-	XFS_LBC_INIT(&(ntp->t_busy));
+	INIT_LIST_HEAD(&ntp->t_busy);
 
 	ASSERT(tp->t_flags & XFS_TRANS_PERM_LOG_RES);
 	ASSERT(tp->t_ticket != NULL);
@@ -423,7 +427,6 @@ undo_blocks:
 	return error;
 }
 
-
 /*
  * Record the indicated change to the given field for application
  * to the file system's superblock when the transaction commits.
@@ -652,7 +655,7 @@ xfs_trans_apply_sb_deltas(
  * XFS_TRANS_SB_DIRTY will not be set when the transaction is updated but we
  * still need to update the incore superblock with the changes.
  */
-STATIC void
+void
 xfs_trans_unreserve_and_mod_sb(
 	xfs_trans_t	*tp)
 {
@@ -880,7 +883,7 @@ xfs_trans_fill_vecs(
  * they could be immediately flushed and we'd have to race with the flusher
  * trying to pull the item from the AIL as we add it.
  */
-static void
+void
 xfs_trans_item_committed(
 	struct xfs_log_item	*lip,
 	xfs_lsn_t		commit_lsn,
@@ -930,26 +933,6 @@ xfs_trans_item_committed(
 	IOP_UNPIN(lip);
 }
 
-/* Clear all the per-AG busy list items listed in this transaction */
-static void
-xfs_trans_clear_busy_extents(
-	struct xfs_trans	*tp)
-{
-	xfs_log_busy_chunk_t	*lbcp;
-	xfs_log_busy_slot_t	*lbsp;
-	int			i;
-
-	for (lbcp = &tp->t_busy; lbcp != NULL; lbcp = lbcp->lbc_next) {
-		i = 0;
-		for (lbsp = lbcp->lbc_busy; i < lbcp->lbc_unused; i++, lbsp++) {
-			if (XFS_LBC_ISFREE(lbcp, i))
-				continue;
-			xfs_alloc_clear_busy(tp, lbsp->lbc_ag, lbsp->lbc_idx);
-		}
-	}
-	xfs_trans_free_busy(tp);
-}
-
 /*
  * This is typically called by the LM when a transaction has been fully
  * committed to disk.  It needs to unpin the items which have
@@ -984,7 +967,6 @@ xfs_trans_committed(
 		kmem_free(licp);
 	}
 
-	xfs_trans_clear_busy_extents(tp);
 	xfs_trans_free(tp);
 }
 
@@ -1012,8 +994,7 @@ xfs_trans_uncommit(
 	xfs_trans_unreserve_and_mod_sb(tp);
 	xfs_trans_unreserve_and_mod_dquots(tp);
 
-	xfs_trans_free_items(tp, flags);
-	xfs_trans_free_busy(tp);
+	xfs_trans_free_items(tp, NULLCOMMITLSN, flags);
 	xfs_trans_free(tp);
 }
 
@@ -1075,6 +1056,8 @@ xfs_trans_commit_iclog(
 	*commit_lsn = xfs_log_done(mp, tp->t_ticket, &commit_iclog, log_flags);
 
 	tp->t_commit_lsn = *commit_lsn;
+	trace_xfs_trans_commit_lsn(tp);
+
 	if (nvec > XFS_TRANS_LOGVEC_COUNT)
 		kmem_free(log_vector);
 
@@ -1161,6 +1144,93 @@ xfs_trans_commit_iclog(
 	return xfs_log_release_iclog(mp, commit_iclog);
 }
 
+/*
+ * Walk the log items and allocate log vector structures for
+ * each item large enough to fit all the vectors they require.
+ * Note that this format differs from the old log vector format in
+ * that there is no transaction header in these log vectors.
+ */
+STATIC struct xfs_log_vec *
+xfs_trans_alloc_log_vecs(
+	xfs_trans_t	*tp)
+{
+	xfs_log_item_desc_t	*lidp;
+	struct xfs_log_vec	*lv = NULL;
+	struct xfs_log_vec	*ret_lv = NULL;
+
+	lidp = xfs_trans_first_item(tp);
+
+	/* Bail out if we didn't find a log item.  */
+	if (!lidp) {
+		ASSERT(0);
+		return NULL;
+	}
+
+	while (lidp != NULL) {
+		struct xfs_log_vec *new_lv;
+
+		/* Skip items which aren't dirty in this transaction. */
+		if (!(lidp->lid_flags & XFS_LID_DIRTY)) {
+			lidp = xfs_trans_next_item(tp, lidp);
+			continue;
+		}
+
+		/* Skip items that do not have any vectors for writing */
+		lidp->lid_size = IOP_SIZE(lidp->lid_item);
+		if (!lidp->lid_size) {
+			lidp = xfs_trans_next_item(tp, lidp);
+			continue;
+		}
+
+		new_lv = kmem_zalloc(sizeof(*new_lv) +
+				lidp->lid_size * sizeof(struct xfs_log_iovec),
+				KM_SLEEP);
+
+		/* The allocated iovec region lies beyond the log vector. */
+		new_lv->lv_iovecp = (struct xfs_log_iovec *)&new_lv[1];
+		new_lv->lv_niovecs = lidp->lid_size;
+		new_lv->lv_item = lidp->lid_item;
+		if (!ret_lv)
+			ret_lv = new_lv;
+		else
+			lv->lv_next = new_lv;
+		lv = new_lv;
+		lidp = xfs_trans_next_item(tp, lidp);
+	}
+
+	return ret_lv;
+}
+
+static int
+xfs_trans_commit_cil(
+	struct xfs_mount	*mp,
+	struct xfs_trans	*tp,
+	xfs_lsn_t		*commit_lsn,
+	int			flags)
+{
+	struct xfs_log_vec	*log_vector;
+	int			error;
+
+	/*
+	 * Get each log item to allocate a vector structure for
+	 * the log item to to pass to the log write code. The
+	 * CIL commit code will format the vector and save it away.
+	 */
+	log_vector = xfs_trans_alloc_log_vecs(tp);
+	if (!log_vector)
+		return ENOMEM;
+
+	error = xfs_log_commit_cil(mp, tp, log_vector, commit_lsn, flags);
+	if (error)
+		return error;
+
+	current_restore_flags_nested(&tp->t_pflags, PF_FSTRANS);
+
+	/* xfs_trans_free_items() unlocks them first */
+	xfs_trans_free_items(tp, *commit_lsn, 0);
+	xfs_trans_free(tp);
+	return 0;
+}
 
 /*
  * xfs_trans_commit
@@ -1221,7 +1291,11 @@ _xfs_trans_commit(
 		xfs_trans_apply_sb_deltas(tp);
 	xfs_trans_apply_dquot_deltas(tp);
 
-	error = xfs_trans_commit_iclog(mp, tp, &commit_lsn, flags);
+	if (mp->m_flags & XFS_MOUNT_DELAYLOG)
+		error = xfs_trans_commit_cil(mp, tp, &commit_lsn, flags);
+	else
+		error = xfs_trans_commit_iclog(mp, tp, &commit_lsn, flags);
+
 	if (error == ENOMEM) {
 		xfs_force_shutdown(mp, SHUTDOWN_LOG_IO_ERROR);
 		error = XFS_ERROR(EIO);
@@ -1259,8 +1333,7 @@ out_unreserve:
 			error = XFS_ERROR(EIO);
 	}
 	current_restore_flags_nested(&tp->t_pflags, PF_FSTRANS);
-	xfs_trans_free_items(tp, error ? XFS_TRANS_ABORT : 0);
-	xfs_trans_free_busy(tp);
+	xfs_trans_free_items(tp, NULLCOMMITLSN, error ? XFS_TRANS_ABORT : 0);
 	xfs_trans_free(tp);
 
 	XFS_STATS_INC(xs_trans_empty);
@@ -1338,8 +1411,7 @@ xfs_trans_cancel(
 	/* mark this thread as no longer being in a transaction */
 	current_restore_flags_nested(&tp->t_pflags, PF_FSTRANS);
 
-	xfs_trans_free_items(tp, flags);
-	xfs_trans_free_busy(tp);
+	xfs_trans_free_items(tp, NULLCOMMITLSN, flags);
 	xfs_trans_free(tp);
 }
 
diff --git a/fs/xfs/xfs_trans.h b/fs/xfs/xfs_trans.h
index c62beee0921e..8c69e7824f68 100644
--- a/fs/xfs/xfs_trans.h
+++ b/fs/xfs/xfs_trans.h
@@ -106,7 +106,8 @@ typedef struct xfs_trans_header {
 #define	XFS_TRANS_GROWFSRT_FREE		39
 #define	XFS_TRANS_SWAPEXT		40
 #define	XFS_TRANS_SB_COUNT		41
-#define	XFS_TRANS_TYPE_MAX		41
+#define	XFS_TRANS_CHECKPOINT		42
+#define	XFS_TRANS_TYPE_MAX		42
 /* new transaction types need to be reflected in xfs_logprint(8) */
 
 #define XFS_TRANS_TYPES \
@@ -148,6 +149,7 @@ typedef struct xfs_trans_header {
 	{ XFS_TRANS_GROWFSRT_FREE,	"GROWFSRT_FREE" }, \
 	{ XFS_TRANS_SWAPEXT,		"SWAPEXT" }, \
 	{ XFS_TRANS_SB_COUNT,		"SB_COUNT" }, \
+	{ XFS_TRANS_CHECKPOINT,		"CHECKPOINT" }, \
 	{ XFS_TRANS_DUMMY1,		"DUMMY1" }, \
 	{ XFS_TRANS_DUMMY2,		"DUMMY2" }, \
 	{ XLOG_UNMOUNT_REC_TYPE,	"UNMOUNT" }
@@ -813,6 +815,7 @@ struct xfs_log_item_desc;
 struct xfs_mount;
 struct xfs_trans;
 struct xfs_dquot_acct;
+struct xfs_busy_extent;
 
 typedef struct xfs_log_item {
 	struct list_head		li_ail;		/* AIL pointers */
@@ -828,6 +831,11 @@ typedef struct xfs_log_item {
 							/* buffer item iodone */
 							/* callback func */
 	struct xfs_item_ops		*li_ops;	/* function list */
+
+	/* delayed logging */
+	struct list_head		li_cil;		/* CIL pointers */
+	struct xfs_log_vec		*li_lv;		/* active log vector */
+	xfs_lsn_t			li_seq;		/* CIL commit seq */
 } xfs_log_item_t;
 
 #define	XFS_LI_IN_AIL	0x1
@@ -872,34 +880,6 @@ typedef struct xfs_item_ops {
 #define XFS_ITEM_PUSHBUF	3
 
 /*
- * This structure is used to maintain a list of block ranges that have been
- * freed in the transaction.  The ranges are listed in the perag[] busy list
- * between when they're freed and the transaction is committed to disk.
- */
-
-typedef struct xfs_log_busy_slot {
-	xfs_agnumber_t		lbc_ag;
-	ushort			lbc_idx;	/* index in perag.busy[] */
-} xfs_log_busy_slot_t;
-
-#define XFS_LBC_NUM_SLOTS	31
-typedef struct xfs_log_busy_chunk {
-	struct xfs_log_busy_chunk	*lbc_next;
-	uint				lbc_free;	/* free slots bitmask */
-	ushort				lbc_unused;	/* first unused */
-	xfs_log_busy_slot_t		lbc_busy[XFS_LBC_NUM_SLOTS];
-} xfs_log_busy_chunk_t;
-
-#define	XFS_LBC_MAX_SLOT	(XFS_LBC_NUM_SLOTS - 1)
-#define	XFS_LBC_FREEMASK	((1U << XFS_LBC_NUM_SLOTS) - 1)
-
-#define	XFS_LBC_INIT(cp)	((cp)->lbc_free = XFS_LBC_FREEMASK)
-#define	XFS_LBC_CLAIM(cp, slot)	((cp)->lbc_free &= ~(1 << (slot)))
-#define	XFS_LBC_SLOT(cp, slot)	(&((cp)->lbc_busy[(slot)]))
-#define	XFS_LBC_VACANCY(cp)	(((cp)->lbc_free) & XFS_LBC_FREEMASK)
-#define	XFS_LBC_ISFREE(cp, slot) ((cp)->lbc_free & (1 << (slot)))
-
-/*
  * This is the type of function which can be given to xfs_trans_callback()
  * to be called upon the transaction's commit to disk.
  */
@@ -950,8 +930,7 @@ typedef struct xfs_trans {
 	unsigned int		t_items_free;	/* log item descs free */
 	xfs_log_item_chunk_t	t_items;	/* first log item desc chunk */
 	xfs_trans_header_t	t_header;	/* header for in-log trans */
-	unsigned int		t_busy_free;	/* busy descs free */
-	xfs_log_busy_chunk_t	t_busy;		/* busy/async free blocks */
+	struct list_head	t_busy;		/* list of busy extents */
 	unsigned long		t_pflags;	/* saved process flags state */
 } xfs_trans_t;
 
@@ -1025,9 +1004,6 @@ int		_xfs_trans_commit(xfs_trans_t *,
 void		xfs_trans_cancel(xfs_trans_t *, int);
 int		xfs_trans_ail_init(struct xfs_mount *);
 void		xfs_trans_ail_destroy(struct xfs_mount *);
-xfs_log_busy_slot_t *xfs_trans_add_busy(xfs_trans_t *tp,
-					xfs_agnumber_t ag,
-					xfs_extlen_t idx);
 
 extern kmem_zone_t	*xfs_trans_zone;
 
diff --git a/fs/xfs/xfs_trans_buf.c b/fs/xfs/xfs_trans_buf.c
index 9cd809025f3a..63d81a22f4fd 100644
--- a/fs/xfs/xfs_trans_buf.c
+++ b/fs/xfs/xfs_trans_buf.c
@@ -114,7 +114,7 @@ _xfs_trans_bjoin(
 	xfs_buf_item_init(bp, tp->t_mountp);
 	bip = XFS_BUF_FSPRIVATE(bp, xfs_buf_log_item_t *);
 	ASSERT(!(bip->bli_flags & XFS_BLI_STALE));
-	ASSERT(!(bip->bli_format.blf_flags & XFS_BLI_CANCEL));
+	ASSERT(!(bip->bli_format.blf_flags & XFS_BLF_CANCEL));
 	ASSERT(!(bip->bli_flags & XFS_BLI_LOGGED));
 	if (reset_recur)
 		bip->bli_recur = 0;
@@ -511,7 +511,7 @@ xfs_trans_brelse(xfs_trans_t	*tp,
 	bip = XFS_BUF_FSPRIVATE(bp, xfs_buf_log_item_t *);
 	ASSERT(bip->bli_item.li_type == XFS_LI_BUF);
 	ASSERT(!(bip->bli_flags & XFS_BLI_STALE));
-	ASSERT(!(bip->bli_format.blf_flags & XFS_BLI_CANCEL));
+	ASSERT(!(bip->bli_format.blf_flags & XFS_BLF_CANCEL));
 	ASSERT(atomic_read(&bip->bli_refcount) > 0);
 
 	/*
@@ -619,7 +619,7 @@ xfs_trans_bhold(xfs_trans_t	*tp,
 
 	bip = XFS_BUF_FSPRIVATE(bp, xfs_buf_log_item_t *);
 	ASSERT(!(bip->bli_flags & XFS_BLI_STALE));
-	ASSERT(!(bip->bli_format.blf_flags & XFS_BLI_CANCEL));
+	ASSERT(!(bip->bli_format.blf_flags & XFS_BLF_CANCEL));
 	ASSERT(atomic_read(&bip->bli_refcount) > 0);
 	bip->bli_flags |= XFS_BLI_HOLD;
 	trace_xfs_trans_bhold(bip);
@@ -641,7 +641,7 @@ xfs_trans_bhold_release(xfs_trans_t	*tp,
 
 	bip = XFS_BUF_FSPRIVATE(bp, xfs_buf_log_item_t *);
 	ASSERT(!(bip->bli_flags & XFS_BLI_STALE));
-	ASSERT(!(bip->bli_format.blf_flags & XFS_BLI_CANCEL));
+	ASSERT(!(bip->bli_format.blf_flags & XFS_BLF_CANCEL));
 	ASSERT(atomic_read(&bip->bli_refcount) > 0);
 	ASSERT(bip->bli_flags & XFS_BLI_HOLD);
 	bip->bli_flags &= ~XFS_BLI_HOLD;
@@ -704,7 +704,7 @@ xfs_trans_log_buf(xfs_trans_t	*tp,
 		bip->bli_flags &= ~XFS_BLI_STALE;
 		ASSERT(XFS_BUF_ISSTALE(bp));
 		XFS_BUF_UNSTALE(bp);
-		bip->bli_format.blf_flags &= ~XFS_BLI_CANCEL;
+		bip->bli_format.blf_flags &= ~XFS_BLF_CANCEL;
 	}
 
 	lidp = xfs_trans_find_item(tp, (xfs_log_item_t*)bip);
@@ -762,8 +762,8 @@ xfs_trans_binval(
 		ASSERT(!(XFS_BUF_ISDELAYWRITE(bp)));
 		ASSERT(XFS_BUF_ISSTALE(bp));
 		ASSERT(!(bip->bli_flags & (XFS_BLI_LOGGED | XFS_BLI_DIRTY)));
-		ASSERT(!(bip->bli_format.blf_flags & XFS_BLI_INODE_BUF));
-		ASSERT(bip->bli_format.blf_flags & XFS_BLI_CANCEL);
+		ASSERT(!(bip->bli_format.blf_flags & XFS_BLF_INODE_BUF));
+		ASSERT(bip->bli_format.blf_flags & XFS_BLF_CANCEL);
 		ASSERT(lidp->lid_flags & XFS_LID_DIRTY);
 		ASSERT(tp->t_flags & XFS_TRANS_DIRTY);
 		return;
@@ -774,7 +774,7 @@ xfs_trans_binval(
 	 * in the buf log item.  The STALE flag will be used in
 	 * xfs_buf_item_unpin() to determine if it should clean up
 	 * when the last reference to the buf item is given up.
-	 * We set the XFS_BLI_CANCEL flag in the buf log format structure
+	 * We set the XFS_BLF_CANCEL flag in the buf log format structure
 	 * and log the buf item.  This will be used at recovery time
 	 * to determine that copies of the buffer in the log before
 	 * this should not be replayed.
@@ -792,9 +792,9 @@ xfs_trans_binval(
 	XFS_BUF_UNDELAYWRITE(bp);
 	XFS_BUF_STALE(bp);
 	bip->bli_flags |= XFS_BLI_STALE;
-	bip->bli_flags &= ~(XFS_BLI_LOGGED | XFS_BLI_DIRTY);
-	bip->bli_format.blf_flags &= ~XFS_BLI_INODE_BUF;
-	bip->bli_format.blf_flags |= XFS_BLI_CANCEL;
+	bip->bli_flags &= ~(XFS_BLI_INODE_BUF | XFS_BLI_LOGGED | XFS_BLI_DIRTY);
+	bip->bli_format.blf_flags &= ~XFS_BLF_INODE_BUF;
+	bip->bli_format.blf_flags |= XFS_BLF_CANCEL;
 	memset((char *)(bip->bli_format.blf_data_map), 0,
 	      (bip->bli_format.blf_map_size * sizeof(uint)));
 	lidp->lid_flags |= XFS_LID_DIRTY;
@@ -802,16 +802,16 @@ xfs_trans_binval(
 }
 
 /*
- * This call is used to indicate that the buffer contains on-disk
- * inodes which must be handled specially during recovery.  They
- * require special handling because only the di_next_unlinked from
- * the inodes in the buffer should be recovered.  The rest of the
- * data in the buffer is logged via the inodes themselves.
+ * This call is used to indicate that the buffer contains on-disk inodes which
+ * must be handled specially during recovery.  They require special handling
+ * because only the di_next_unlinked from the inodes in the buffer should be
+ * recovered.  The rest of the data in the buffer is logged via the inodes
+ * themselves.
  *
- * All we do is set the XFS_BLI_INODE_BUF flag in the buffer's log
- * format structure so that we'll know what to do at recovery time.
+ * All we do is set the XFS_BLI_INODE_BUF flag in the items flags so it can be
+ * transferred to the buffer's log format structure so that we'll know what to
+ * do at recovery time.
  */
-/* ARGSUSED */
 void
 xfs_trans_inode_buf(
 	xfs_trans_t	*tp,
@@ -826,7 +826,7 @@ xfs_trans_inode_buf(
 	bip = XFS_BUF_FSPRIVATE(bp, xfs_buf_log_item_t *);
 	ASSERT(atomic_read(&bip->bli_refcount) > 0);
 
-	bip->bli_format.blf_flags |= XFS_BLI_INODE_BUF;
+	bip->bli_flags |= XFS_BLI_INODE_BUF;
 }
 
 /*
@@ -908,9 +908,9 @@ xfs_trans_dquot_buf(
 	ASSERT(XFS_BUF_ISBUSY(bp));
 	ASSERT(XFS_BUF_FSPRIVATE2(bp, xfs_trans_t *) == tp);
 	ASSERT(XFS_BUF_FSPRIVATE(bp, void *) != NULL);
-	ASSERT(type == XFS_BLI_UDQUOT_BUF ||
-	       type == XFS_BLI_PDQUOT_BUF ||
-	       type == XFS_BLI_GDQUOT_BUF);
+	ASSERT(type == XFS_BLF_UDQUOT_BUF ||
+	       type == XFS_BLF_PDQUOT_BUF ||
+	       type == XFS_BLF_GDQUOT_BUF);
 
 	bip = XFS_BUF_FSPRIVATE(bp, xfs_buf_log_item_t *);
 	ASSERT(atomic_read(&bip->bli_refcount) > 0);
diff --git a/fs/xfs/xfs_trans_item.c b/fs/xfs/xfs_trans_item.c
index eb3fc57f9eef..f11d37d06dcc 100644
--- a/fs/xfs/xfs_trans_item.c
+++ b/fs/xfs/xfs_trans_item.c
@@ -299,6 +299,7 @@ xfs_trans_next_item(xfs_trans_t *tp, xfs_log_item_desc_t *lidp)
 void
 xfs_trans_free_items(
 	xfs_trans_t	*tp,
+	xfs_lsn_t	commit_lsn,
 	int		flags)
 {
 	xfs_log_item_chunk_t	*licp;
@@ -311,7 +312,7 @@ xfs_trans_free_items(
 	 * Special case the embedded chunk so we don't free it below.
 	 */
 	if (!xfs_lic_are_all_free(licp)) {
-		(void) xfs_trans_unlock_chunk(licp, 1, abort, NULLCOMMITLSN);
+		(void) xfs_trans_unlock_chunk(licp, 1, abort, commit_lsn);
 		xfs_lic_all_free(licp);
 		licp->lic_unused = 0;
 	}
@@ -322,7 +323,7 @@ xfs_trans_free_items(
 	 */
 	while (licp != NULL) {
 		ASSERT(!xfs_lic_are_all_free(licp));
-		(void) xfs_trans_unlock_chunk(licp, 1, abort, NULLCOMMITLSN);
+		(void) xfs_trans_unlock_chunk(licp, 1, abort, commit_lsn);
 		next_licp = licp->lic_next;
 		kmem_free(licp);
 		licp = next_licp;
@@ -438,112 +439,3 @@ xfs_trans_unlock_chunk(
 
 	return freed;
 }
-
-
-/*
- * This is called to add the given busy item to the transaction's
- * list of busy items.  It must find a free busy item descriptor
- * or allocate a new one and add the item to that descriptor.
- * The function returns a pointer to busy descriptor used to point
- * to the new busy entry.  The log busy entry will now point to its new
- * descriptor with its ???? field.
- */
-xfs_log_busy_slot_t *
-xfs_trans_add_busy(xfs_trans_t *tp, xfs_agnumber_t ag, xfs_extlen_t idx)
-{
-	xfs_log_busy_chunk_t	*lbcp;
-	xfs_log_busy_slot_t	*lbsp;
-	int			i=0;
-
-	/*
-	 * If there are no free descriptors, allocate a new chunk
-	 * of them and put it at the front of the chunk list.
-	 */
-	if (tp->t_busy_free == 0) {
-		lbcp = (xfs_log_busy_chunk_t*)
-		       kmem_alloc(sizeof(xfs_log_busy_chunk_t), KM_SLEEP);
-		ASSERT(lbcp != NULL);
-		/*
-		 * Initialize the chunk, and then
-		 * claim the first slot in the newly allocated chunk.
-		 */
-		XFS_LBC_INIT(lbcp);
-		XFS_LBC_CLAIM(lbcp, 0);
-		lbcp->lbc_unused = 1;
-		lbsp = XFS_LBC_SLOT(lbcp, 0);
-
-		/*
-		 * Link in the new chunk and update the free count.
-		 */
-		lbcp->lbc_next = tp->t_busy.lbc_next;
-		tp->t_busy.lbc_next = lbcp;
-		tp->t_busy_free = XFS_LIC_NUM_SLOTS - 1;
-
-		/*
-		 * Initialize the descriptor and the generic portion
-		 * of the log item.
-		 *
-		 * Point the new slot at this item and return it.
-		 * Also point the log item at its currently active
-		 * descriptor and set the item's mount pointer.
-		 */
-		lbsp->lbc_ag = ag;
-		lbsp->lbc_idx = idx;
-		return lbsp;
-	}
-
-	/*
-	 * Find the free descriptor. It is somewhere in the chunklist
-	 * of descriptors.
-	 */
-	lbcp = &tp->t_busy;
-	while (lbcp != NULL) {
-		if (XFS_LBC_VACANCY(lbcp)) {
-			if (lbcp->lbc_unused <= XFS_LBC_MAX_SLOT) {
-				i = lbcp->lbc_unused;
-				break;
-			} else {
-				/* out-of-order vacancy */
-				cmn_err(CE_DEBUG, "OOO vacancy lbcp 0x%p\n", lbcp);
-				ASSERT(0);
-			}
-		}
-		lbcp = lbcp->lbc_next;
-	}
-	ASSERT(lbcp != NULL);
-	/*
-	 * If we find a free descriptor, claim it,
-	 * initialize it, and return it.
-	 */
-	XFS_LBC_CLAIM(lbcp, i);
-	if (lbcp->lbc_unused <= i) {
-		lbcp->lbc_unused = i + 1;
-	}
-	lbsp = XFS_LBC_SLOT(lbcp, i);
-	tp->t_busy_free--;
-	lbsp->lbc_ag = ag;
-	lbsp->lbc_idx = idx;
-	return lbsp;
-}
-
-
-/*
- * xfs_trans_free_busy
- * Free all of the busy lists from a transaction
- */
-void
-xfs_trans_free_busy(xfs_trans_t *tp)
-{
-	xfs_log_busy_chunk_t	*lbcp;
-	xfs_log_busy_chunk_t	*lbcq;
-
-	lbcp = tp->t_busy.lbc_next;
-	while (lbcp != NULL) {
-		lbcq = lbcp->lbc_next;
-		kmem_free(lbcp);
-		lbcp = lbcq;
-	}
-
-	XFS_LBC_INIT(&tp->t_busy);
-	tp->t_busy.lbc_unused = 0;
-}
diff --git a/fs/xfs/xfs_trans_priv.h b/fs/xfs/xfs_trans_priv.h
index 73e2ad397432..c6e4f2c8de6e 100644
--- a/fs/xfs/xfs_trans_priv.h
+++ b/fs/xfs/xfs_trans_priv.h
@@ -35,13 +35,14 @@ struct xfs_log_item_desc	*xfs_trans_find_item(struct xfs_trans *,
 struct xfs_log_item_desc	*xfs_trans_first_item(struct xfs_trans *);
 struct xfs_log_item_desc	*xfs_trans_next_item(struct xfs_trans *,
 					     struct xfs_log_item_desc *);
-void				xfs_trans_free_items(struct xfs_trans *, int);
-void				xfs_trans_unlock_items(struct xfs_trans *,
-							xfs_lsn_t);
-void				xfs_trans_free_busy(xfs_trans_t *tp);
-xfs_log_busy_slot_t		*xfs_trans_add_busy(xfs_trans_t *tp,
-						    xfs_agnumber_t ag,
-						    xfs_extlen_t idx);
+
+void	xfs_trans_unlock_items(struct xfs_trans *tp, xfs_lsn_t commit_lsn);
+void	xfs_trans_free_items(struct xfs_trans *tp, xfs_lsn_t commit_lsn,
+				int flags);
+
+void	xfs_trans_item_committed(struct xfs_log_item *lip,
+				xfs_lsn_t commit_lsn, int aborted);
+void	xfs_trans_unreserve_and_mod_sb(struct xfs_trans *tp);
 
 /*
  * AIL traversal cursor.
diff --git a/fs/xfs/xfs_types.h b/fs/xfs/xfs_types.h
index b09904555d07..320775295e32 100644
--- a/fs/xfs/xfs_types.h
+++ b/fs/xfs/xfs_types.h
@@ -75,6 +75,8 @@ typedef	__uint32_t	xfs_dahash_t;	/* dir/attr hash value */
 
 typedef __uint16_t	xfs_prid_t;	/* prid_t truncated to 16bits in XFS */
 
+typedef __uint32_t	xlog_tid_t;	/* transaction ID type */
+
 /*
  * These types are 64 bits on disk but are either 32 or 64 bits in memory.
  * Disk based types: