Merge branch 'delayed-logging-for-2.6.35' into for-linus

This commit is contained in:
Alex Elder 2010-05-24 11:57:36 -05:00
commit 88e88374ee
27 changed files with 2411 additions and 542 deletions

View File

@ -0,0 +1,816 @@
XFS Delayed Logging Design
--------------------------
Introduction to Re-logging in XFS
---------------------------------
XFS logging is a combination of logical and physical logging. Some objects,
such as inodes and dquots, are logged in logical format where the details
logged are made up of the changes to in-core structures rather than on-disk
structures. Other objects - typically buffers - have their physical changes
logged. The reason for these differences is to reduce the amount of log space
required for objects that are frequently logged. Some parts of inodes are more
frequently logged than others, and inodes are typically more frequently logged
than any other object (except maybe the superblock buffer) so keeping the
amount of metadata logged low is of prime importance.
The reason that this is such a concern is that XFS allows multiple separate
modifications to a single object to be carried in the log at any given time.
This allows the log to avoid needing to flush each change to disk before
recording a new change to the object. XFS does this via a method called
"re-logging". Conceptually, this is quite simple - all it requires is that any
new change to the object is recorded with a *new copy* of all the existing
changes in the new transaction that is written to the log.
That is, if we have a sequence of changes A through to F, and the object was
written to disk after change D, we would see in the log the following series
of transactions, their contents and the log sequence number (LSN) of the
transaction:
Transaction Contents LSN
A A X
B A+B X+n
C A+B+C X+n+m
D A+B+C+D X+n+m+o
<object written to disk>
E E Y (> X+n+m+o)
F E+F Yٍ+p
In other words, each time an object is relogged, the new transaction contains
the aggregation of all the previous changes currently held only in the log.
This relogging technique also allows objects to be moved forward in the log so
that an object being relogged does not prevent the tail of the log from ever
moving forward. This can be seen in the table above by the changing
(increasing) LSN of each subsquent transaction - the LSN is effectively a
direct encoding of the location in the log of the transaction.
This relogging is also used to implement long-running, multiple-commit
transactions. These transaction are known as rolling transactions, and require
a special log reservation known as a permanent transaction reservation. A
typical example of a rolling transaction is the removal of extents from an
inode which can only be done at a rate of two extents per transaction because
of reservation size limitations. Hence a rolling extent removal transaction
keeps relogging the inode and btree buffers as they get modified in each
removal operation. This keeps them moving forward in the log as the operation
progresses, ensuring that current operation never gets blocked by itself if the
log wraps around.
Hence it can be seen that the relogging operation is fundamental to the correct
working of the XFS journalling subsystem. From the above description, most
people should be able to see why the XFS metadata operations writes so much to
the log - repeated operations to the same objects write the same changes to
the log over and over again. Worse is the fact that objects tend to get
dirtier as they get relogged, so each subsequent transaction is writing more
metadata into the log.
Another feature of the XFS transaction subsystem is that most transactions are
asynchronous. That is, they don't commit to disk until either a log buffer is
filled (a log buffer can hold multiple transactions) or a synchronous operation
forces the log buffers holding the transactions to disk. This means that XFS is
doing aggregation of transactions in memory - batching them, if you like - to
minimise the impact of the log IO on transaction throughput.
The limitation on asynchronous transaction throughput is the number and size of
log buffers made available by the log manager. By default there are 8 log
buffers available and the size of each is 32kB - the size can be increased up
to 256kB by use of a mount option.
Effectively, this gives us the maximum bound of outstanding metadata changes
that can be made to the filesystem at any point in time - if all the log
buffers are full and under IO, then no more transactions can be committed until
the current batch completes. It is now common for a single current CPU core to
be to able to issue enough transactions to keep the log buffers full and under
IO permanently. Hence the XFS journalling subsystem can be considered to be IO
bound.
Delayed Logging: Concepts
-------------------------
The key thing to note about the asynchronous logging combined with the
relogging technique XFS uses is that we can be relogging changed objects
multiple times before they are committed to disk in the log buffers. If we
return to the previous relogging example, it is entirely possible that
transactions A through D are committed to disk in the same log buffer.
That is, a single log buffer may contain multiple copies of the same object,
but only one of those copies needs to be there - the last one "D", as it
contains all the changes from the previous changes. In other words, we have one
necessary copy in the log buffer, and three stale copies that are simply
wasting space. When we are doing repeated operations on the same set of
objects, these "stale objects" can be over 90% of the space used in the log
buffers. It is clear that reducing the number of stale objects written to the
log would greatly reduce the amount of metadata we write to the log, and this
is the fundamental goal of delayed logging.
From a conceptual point of view, XFS is already doing relogging in memory (where
memory == log buffer), only it is doing it extremely inefficiently. It is using
logical to physical formatting to do the relogging because there is no
infrastructure to keep track of logical changes in memory prior to physically
formatting the changes in a transaction to the log buffer. Hence we cannot avoid
accumulating stale objects in the log buffers.
Delayed logging is the name we've given to keeping and tracking transactional
changes to objects in memory outside the log buffer infrastructure. Because of
the relogging concept fundamental to the XFS journalling subsystem, this is
actually relatively easy to do - all the changes to logged items are already
tracked in the current infrastructure. The big problem is how to accumulate
them and get them to the log in a consistent, recoverable manner.
Describing the problems and how they have been solved is the focus of this
document.
One of the key changes that delayed logging makes to the operation of the
journalling subsystem is that it disassociates the amount of outstanding
metadata changes from the size and number of log buffers available. In other
words, instead of there only being a maximum of 2MB of transaction changes not
written to the log at any point in time, there may be a much greater amount
being accumulated in memory. Hence the potential for loss of metadata on a
crash is much greater than for the existing logging mechanism.
It should be noted that this does not change the guarantee that log recovery
will result in a consistent filesystem. What it does mean is that as far as the
recovered filesystem is concerned, there may be many thousands of transactions
that simply did not occur as a result of the crash. This makes it even more
important that applications that care about their data use fsync() where they
need to ensure application level data integrity is maintained.
It should be noted that delayed logging is not an innovative new concept that
warrants rigorous proofs to determine whether it is correct or not. The method
of accumulating changes in memory for some period before writing them to the
log is used effectively in many filesystems including ext3 and ext4. Hence
no time is spent in this document trying to convince the reader that the
concept is sound. Instead it is simply considered a "solved problem" and as
such implementing it in XFS is purely an exercise in software engineering.
The fundamental requirements for delayed logging in XFS are simple:
1. Reduce the amount of metadata written to the log by at least
an order of magnitude.
2. Supply sufficient statistics to validate Requirement #1.
3. Supply sufficient new tracing infrastructure to be able to debug
problems with the new code.
4. No on-disk format change (metadata or log format).
5. Enable and disable with a mount option.
6. No performance regressions for synchronous transaction workloads.
Delayed Logging: Design
-----------------------
Storing Changes
The problem with accumulating changes at a logical level (i.e. just using the
existing log item dirty region tracking) is that when it comes to writing the
changes to the log buffers, we need to ensure that the object we are formatting
is not changing while we do this. This requires locking the object to prevent
concurrent modification. Hence flushing the logical changes to the log would
require us to lock every object, format them, and then unlock them again.
This introduces lots of scope for deadlocks with transactions that are already
running. For example, a transaction has object A locked and modified, but needs
the delayed logging tracking lock to commit the transaction. However, the
flushing thread has the delayed logging tracking lock already held, and is
trying to get the lock on object A to flush it to the log buffer. This appears
to be an unsolvable deadlock condition, and it was solving this problem that
was the barrier to implementing delayed logging for so long.
The solution is relatively simple - it just took a long time to recognise it.
Put simply, the current logging code formats the changes to each item into an
vector array that points to the changed regions in the item. The log write code
simply copies the memory these vectors point to into the log buffer during
transaction commit while the item is locked in the transaction. Instead of
using the log buffer as the destination of the formatting code, we can use an
allocated memory buffer big enough to fit the formatted vector.
If we then copy the vector into the memory buffer and rewrite the vector to
point to the memory buffer rather than the object itself, we now have a copy of
the changes in a format that is compatible with the log buffer writing code.
that does not require us to lock the item to access. This formatting and
rewriting can all be done while the object is locked during transaction commit,
resulting in a vector that is transactionally consistent and can be accessed
without needing to lock the owning item.
Hence we avoid the need to lock items when we need to flush outstanding
asynchronous transactions to the log. The differences between the existing
formatting method and the delayed logging formatting can be seen in the
diagram below.
Current format log vector:
Object +---------------------------------------------+
Vector 1 +----+
Vector 2 +----+
Vector 3 +----------+
After formatting:
Log Buffer +-V1-+-V2-+----V3----+
Delayed logging vector:
Object +---------------------------------------------+
Vector 1 +----+
Vector 2 +----+
Vector 3 +----------+
After formatting:
Memory Buffer +-V1-+-V2-+----V3----+
Vector 1 +----+
Vector 2 +----+
Vector 3 +----------+
The memory buffer and associated vector need to be passed as a single object,
but still need to be associated with the parent object so if the object is
relogged we can replace the current memory buffer with a new memory buffer that
contains the latest changes.
The reason for keeping the vector around after we've formatted the memory
buffer is to support splitting vectors across log buffer boundaries correctly.
If we don't keep the vector around, we do not know where the region boundaries
are in the item, so we'd need a new encapsulation method for regions in the log
buffer writing (i.e. double encapsulation). This would be an on-disk format
change and as such is not desirable. It also means we'd have to write the log
region headers in the formatting stage, which is problematic as there is per
region state that needs to be placed into the headers during the log write.
Hence we need to keep the vector, but by attaching the memory buffer to it and
rewriting the vector addresses to point at the memory buffer we end up with a
self-describing object that can be passed to the log buffer write code to be
handled in exactly the same manner as the existing log vectors are handled.
Hence we avoid needing a new on-disk format to handle items that have been
relogged in memory.
Tracking Changes
Now that we can record transactional changes in memory in a form that allows
them to be used without limitations, we need to be able to track and accumulate
them so that they can be written to the log at some later point in time. The
log item is the natural place to store this vector and buffer, and also makes sense
to be the object that is used to track committed objects as it will always
exist once the object has been included in a transaction.
The log item is already used to track the log items that have been written to
the log but not yet written to disk. Such log items are considered "active"
and as such are stored in the Active Item List (AIL) which is a LSN-ordered
double linked list. Items are inserted into this list during log buffer IO
completion, after which they are unpinned and can be written to disk. An object
that is in the AIL can be relogged, which causes the object to be pinned again
and then moved forward in the AIL when the log buffer IO completes for that
transaction.
Essentially, this shows that an item that is in the AIL can still be modified
and relogged, so any tracking must be separate to the AIL infrastructure. As
such, we cannot reuse the AIL list pointers for tracking committed items, nor
can we store state in any field that is protected by the AIL lock. Hence the
committed item tracking needs it's own locks, lists and state fields in the log
item.
Similar to the AIL, tracking of committed items is done through a new list
called the Committed Item List (CIL). The list tracks log items that have been
committed and have formatted memory buffers attached to them. It tracks objects
in transaction commit order, so when an object is relogged it is removed from
it's place in the list and re-inserted at the tail. This is entirely arbitrary
and done to make it easy for debugging - the last items in the list are the
ones that are most recently modified. Ordering of the CIL is not necessary for
transactional integrity (as discussed in the next section) so the ordering is
done for convenience/sanity of the developers.
Delayed Logging: Checkpoints
When we have a log synchronisation event, commonly known as a "log force",
all the items in the CIL must be written into the log via the log buffers.
We need to write these items in the order that they exist in the CIL, and they
need to be written as an atomic transaction. The need for all the objects to be
written as an atomic transaction comes from the requirements of relogging and
log replay - all the changes in all the objects in a given transaction must
either be completely replayed during log recovery, or not replayed at all. If
a transaction is not replayed because it is not complete in the log, then
no later transactions should be replayed, either.
To fulfill this requirement, we need to write the entire CIL in a single log
transaction. Fortunately, the XFS log code has no fixed limit on the size of a
transaction, nor does the log replay code. The only fundamental limit is that
the transaction cannot be larger than just under half the size of the log. The
reason for this limit is that to find the head and tail of the log, there must
be at least one complete transaction in the log at any given time. If a
transaction is larger than half the log, then there is the possibility that a
crash during the write of a such a transaction could partially overwrite the
only complete previous transaction in the log. This will result in a recovery
failure and an inconsistent filesystem and hence we must enforce the maximum
size of a checkpoint to be slightly less than a half the log.
Apart from this size requirement, a checkpoint transaction looks no different
to any other transaction - it contains a transaction header, a series of
formatted log items and a commit record at the tail. From a recovery
perspective, the checkpoint transaction is also no different - just a lot
bigger with a lot more items in it. The worst case effect of this is that we
might need to tune the recovery transaction object hash size.
Because the checkpoint is just another transaction and all the changes to log
items are stored as log vectors, we can use the existing log buffer writing
code to write the changes into the log. To do this efficiently, we need to
minimise the time we hold the CIL locked while writing the checkpoint
transaction. The current log write code enables us to do this easily with the
way it separates the writing of the transaction contents (the log vectors) from
the transaction commit record, but tracking this requires us to have a
per-checkpoint context that travels through the log write process through to
checkpoint completion.
Hence a checkpoint has a context that tracks the state of the current
checkpoint from initiation to checkpoint completion. A new context is initiated
at the same time a checkpoint transaction is started. That is, when we remove
all the current items from the CIL during a checkpoint operation, we move all
those changes into the current checkpoint context. We then initialise a new
context and attach that to the CIL for aggregation of new transactions.
This allows us to unlock the CIL immediately after transfer of all the
committed items and effectively allow new transactions to be issued while we
are formatting the checkpoint into the log. It also allows concurrent
checkpoints to be written into the log buffers in the case of log force heavy
workloads, just like the existing transaction commit code does. This, however,
requires that we strictly order the commit records in the log so that
checkpoint sequence order is maintained during log replay.
To ensure that we can be writing an item into a checkpoint transaction at
the same time another transaction modifies the item and inserts the log item
into the new CIL, then checkpoint transaction commit code cannot use log items
to store the list of log vectors that need to be written into the transaction.
Hence log vectors need to be able to be chained together to allow them to be
detatched from the log items. That is, when the CIL is flushed the memory
buffer and log vector attached to each log item needs to be attached to the
checkpoint context so that the log item can be released. In diagrammatic form,
the CIL would look like this before the flush:
CIL Head
|
V
Log Item <-> log vector 1 -> memory buffer
| -> vector array
V
Log Item <-> log vector 2 -> memory buffer
| -> vector array
V
......
|
V
Log Item <-> log vector N-1 -> memory buffer
| -> vector array
V
Log Item <-> log vector N -> memory buffer
-> vector array
And after the flush the CIL head is empty, and the checkpoint context log
vector list would look like:
Checkpoint Context
|
V
log vector 1 -> memory buffer
| -> vector array
| -> Log Item
V
log vector 2 -> memory buffer
| -> vector array
| -> Log Item
V
......
|
V
log vector N-1 -> memory buffer
| -> vector array
| -> Log Item
V
log vector N -> memory buffer
-> vector array
-> Log Item
Once this transfer is done, the CIL can be unlocked and new transactions can
start, while the checkpoint flush code works over the log vector chain to
commit the checkpoint.
Once the checkpoint is written into the log buffers, the checkpoint context is
attached to the log buffer that the commit record was written to along with a
completion callback. Log IO completion will call that callback, which can then
run transaction committed processing for the log items (i.e. insert into AIL
and unpin) in the log vector chain and then free the log vector chain and
checkpoint context.
Discussion Point: I am uncertain as to whether the log item is the most
efficient way to track vectors, even though it seems like the natural way to do
it. The fact that we walk the log items (in the CIL) just to chain the log
vectors and break the link between the log item and the log vector means that
we take a cache line hit for the log item list modification, then another for
the log vector chaining. If we track by the log vectors, then we only need to
break the link between the log item and the log vector, which means we should
dirty only the log item cachelines. Normally I wouldn't be concerned about one
vs two dirty cachelines except for the fact I've seen upwards of 80,000 log
vectors in one checkpoint transaction. I'd guess this is a "measure and
compare" situation that can be done after a working and reviewed implementation
is in the dev tree....
Delayed Logging: Checkpoint Sequencing
One of the key aspects of the XFS transaction subsystem is that it tags
committed transactions with the log sequence number of the transaction commit.
This allows transactions to be issued asynchronously even though there may be
future operations that cannot be completed until that transaction is fully
committed to the log. In the rare case that a dependent operation occurs (e.g.
re-using a freed metadata extent for a data extent), a special, optimised log
force can be issued to force the dependent transaction to disk immediately.
To do this, transactions need to record the LSN of the commit record of the
transaction. This LSN comes directly from the log buffer the transaction is
written into. While this works just fine for the existing transaction
mechanism, it does not work for delayed logging because transactions are not
written directly into the log buffers. Hence some other method of sequencing
transactions is required.
As discussed in the checkpoint section, delayed logging uses per-checkpoint
contexts, and as such it is simple to assign a sequence number to each
checkpoint. Because the switching of checkpoint contexts must be done
atomically, it is simple to ensure that each new context has a monotonically
increasing sequence number assigned to it without the need for an external
atomic counter - we can just take the current context sequence number and add
one to it for the new context.
Then, instead of assigning a log buffer LSN to the transaction commit LSN
during the commit, we can assign the current checkpoint sequence. This allows
operations that track transactions that have not yet completed know what
checkpoint sequence needs to be committed before they can continue. As a
result, the code that forces the log to a specific LSN now needs to ensure that
the log forces to a specific checkpoint.
To ensure that we can do this, we need to track all the checkpoint contexts
that are currently committing to the log. When we flush a checkpoint, the
context gets added to a "committing" list which can be searched. When a
checkpoint commit completes, it is removed from the committing list. Because
the checkpoint context records the LSN of the commit record for the checkpoint,
we can also wait on the log buffer that contains the commit record, thereby
using the existing log force mechanisms to execute synchronous forces.
It should be noted that the synchronous forces may need to be extended with
mitigation algorithms similar to the current log buffer code to allow
aggregation of multiple synchronous transactions if there are already
synchronous transactions being flushed. Investigation of the performance of the
current design is needed before making any decisions here.
The main concern with log forces is to ensure that all the previous checkpoints
are also committed to disk before the one we need to wait for. Therefore we
need to check that all the prior contexts in the committing list are also
complete before waiting on the one we need to complete. We do this
synchronisation in the log force code so that we don't need to wait anywhere
else for such serialisation - it only matters when we do a log force.
The only remaining complexity is that a log force now also has to handle the
case where the forcing sequence number is the same as the current context. That
is, we need to flush the CIL and potentially wait for it to complete. This is a
simple addition to the existing log forcing code to check the sequence numbers
and push if required. Indeed, placing the current sequence checkpoint flush in
the log force code enables the current mechanism for issuing synchronous
transactions to remain untouched (i.e. commit an asynchronous transaction, then
force the log at the LSN of that transaction) and so the higher level code
behaves the same regardless of whether delayed logging is being used or not.
Delayed Logging: Checkpoint Log Space Accounting
The big issue for a checkpoint transaction is the log space reservation for the
transaction. We don't know how big a checkpoint transaction is going to be
ahead of time, nor how many log buffers it will take to write out, nor the
number of split log vector regions are going to be used. We can track the
amount of log space required as we add items to the commit item list, but we
still need to reserve the space in the log for the checkpoint.
A typical transaction reserves enough space in the log for the worst case space
usage of the transaction. The reservation accounts for log record headers,
transaction and region headers, headers for split regions, buffer tail padding,
etc. as well as the actual space for all the changed metadata in the
transaction. While some of this is fixed overhead, much of it is dependent on
the size of the transaction and the number of regions being logged (the number
of log vectors in the transaction).
An example of the differences would be logging directory changes versus logging
inode changes. If you modify lots of inode cores (e.g. chmod -R g+w *), then
there are lots of transactions that only contain an inode core and an inode log
format structure. That is, two vectors totaling roughly 150 bytes. If we modify
10,000 inodes, we have about 1.5MB of metadata to write in 20,000 vectors. Each
vector is 12 bytes, so the total to be logged is approximately 1.75MB. In
comparison, if we are logging full directory buffers, they are typically 4KB
each, so we in 1.5MB of directory buffers we'd have roughly 400 buffers and a
buffer format structure for each buffer - roughly 800 vectors or 1.51MB total
space. From this, it should be obvious that a static log space reservation is
not particularly flexible and is difficult to select the "optimal value" for
all workloads.
Further, if we are going to use a static reservation, which bit of the entire
reservation does it cover? We account for space used by the transaction
reservation by tracking the space currently used by the object in the CIL and
then calculating the increase or decrease in space used as the object is
relogged. This allows for a checkpoint reservation to only have to account for
log buffer metadata used such as log header records.
However, even using a static reservation for just the log metadata is
problematic. Typically log record headers use at least 16KB of log space per
1MB of log space consumed (512 bytes per 32k) and the reservation needs to be
large enough to handle arbitrary sized checkpoint transactions. This
reservation needs to be made before the checkpoint is started, and we need to
be able to reserve the space without sleeping. For a 8MB checkpoint, we need a
reservation of around 150KB, which is a non-trivial amount of space.
A static reservation needs to manipulate the log grant counters - we can take a
permanent reservation on the space, but we still need to make sure we refresh
the write reservation (the actual space available to the transaction) after
every checkpoint transaction completion. Unfortunately, if this space is not
available when required, then the regrant code will sleep waiting for it.
The problem with this is that it can lead to deadlocks as we may need to commit
checkpoints to be able to free up log space (refer back to the description of
rolling transactions for an example of this). Hence we *must* always have
space available in the log if we are to use static reservations, and that is
very difficult and complex to arrange. It is possible to do, but there is a
simpler way.
The simpler way of doing this is tracking the entire log space used by the
items in the CIL and using this to dynamically calculate the amount of log
space required by the log metadata. If this log metadata space changes as a
result of a transaction commit inserting a new memory buffer into the CIL, then
the difference in space required is removed from the transaction that causes
the change. Transactions at this level will *always* have enough space
available in their reservation for this as they have already reserved the
maximal amount of log metadata space they require, and such a delta reservation
will always be less than or equal to the maximal amount in the reservation.
Hence we can grow the checkpoint transaction reservation dynamically as items
are added to the CIL and avoid the need for reserving and regranting log space
up front. This avoids deadlocks and removes a blocking point from the
checkpoint flush code.
As mentioned early, transactions can't grow to more than half the size of the
log. Hence as part of the reservation growing, we need to also check the size
of the reservation against the maximum allowed transaction size. If we reach
the maximum threshold, we need to push the CIL to the log. This is effectively
a "background flush" and is done on demand. This is identical to
a CIL push triggered by a log force, only that there is no waiting for the
checkpoint commit to complete. This background push is checked and executed by
transaction commit code.
If the transaction subsystem goes idle while we still have items in the CIL,
they will be flushed by the periodic log force issued by the xfssyncd. This log
force will push the CIL to disk, and if the transaction subsystem stays idle,
allow the idle log to be covered (effectively marked clean) in exactly the same
manner that is done for the existing logging method. A discussion point is
whether this log force needs to be done more frequently than the current rate
which is once every 30s.
Delayed Logging: Log Item Pinning
Currently log items are pinned during transaction commit while the items are
still locked. This happens just after the items are formatted, though it could
be done any time before the items are unlocked. The result of this mechanism is
that items get pinned once for every transaction that is committed to the log
buffers. Hence items that are relogged in the log buffers will have a pin count
for every outstanding transaction they were dirtied in. When each of these
transactions is completed, they will unpin the item once. As a result, the item
only becomes unpinned when all the transactions complete and there are no
pending transactions. Thus the pinning and unpinning of a log item is symmetric
as there is a 1:1 relationship with transaction commit and log item completion.
For delayed logging, however, we have an assymetric transaction commit to
completion relationship. Every time an object is relogged in the CIL it goes
through the commit process without a corresponding completion being registered.
That is, we now have a many-to-one relationship between transaction commit and
log item completion. The result of this is that pinning and unpinning of the
log items becomes unbalanced if we retain the "pin on transaction commit, unpin
on transaction completion" model.
To keep pin/unpin symmetry, the algorithm needs to change to a "pin on
insertion into the CIL, unpin on checkpoint completion". In other words, the
pinning and unpinning becomes symmetric around a checkpoint context. We have to
pin the object the first time it is inserted into the CIL - if it is already in
the CIL during a transaction commit, then we do not pin it again. Because there
can be multiple outstanding checkpoint contexts, we can still see elevated pin
counts, but as each checkpoint completes the pin count will retain the correct
value according to it's context.
Just to make matters more slightly more complex, this checkpoint level context
for the pin count means that the pinning of an item must take place under the
CIL commit/flush lock. If we pin the object outside this lock, we cannot
guarantee which context the pin count is associated with. This is because of
the fact pinning the item is dependent on whether the item is present in the
current CIL or not. If we don't pin the CIL first before we check and pin the
object, we have a race with CIL being flushed between the check and the pin
(or not pinning, as the case may be). Hence we must hold the CIL flush/commit
lock to guarantee that we pin the items correctly.
Delayed Logging: Concurrent Scalability
A fundamental requirement for the CIL is that accesses through transaction
commits must scale to many concurrent commits. The current transaction commit
code does not break down even when there are transactions coming from 2048
processors at once. The current transaction code does not go any faster than if
there was only one CPU using it, but it does not slow down either.
As a result, the delayed logging transaction commit code needs to be designed
for concurrency from the ground up. It is obvious that there are serialisation
points in the design - the three important ones are:
1. Locking out new transaction commits while flushing the CIL
2. Adding items to the CIL and updating item space accounting
3. Checkpoint commit ordering
Looking at the transaction commit and CIL flushing interactions, it is clear
that we have a many-to-one interaction here. That is, the only restriction on
the number of concurrent transactions that can be trying to commit at once is
the amount of space available in the log for their reservations. The practical
limit here is in the order of several hundred concurrent transactions for a
128MB log, which means that it is generally one per CPU in a machine.
The amount of time a transaction commit needs to hold out a flush is a
relatively long period of time - the pinning of log items needs to be done
while we are holding out a CIL flush, so at the moment that means it is held
across the formatting of the objects into memory buffers (i.e. while memcpy()s
are in progress). Ultimately a two pass algorithm where the formatting is done
separately to the pinning of objects could be used to reduce the hold time of
the transaction commit side.
Because of the number of potential transaction commit side holders, the lock
really needs to be a sleeping lock - if the CIL flush takes the lock, we do not
want every other CPU in the machine spinning on the CIL lock. Given that
flushing the CIL could involve walking a list of tens of thousands of log
items, it will get held for a significant time and so spin contention is a
significant concern. Preventing lots of CPUs spinning doing nothing is the
main reason for choosing a sleeping lock even though nothing in either the
transaction commit or CIL flush side sleeps with the lock held.
It should also be noted that CIL flushing is also a relatively rare operation
compared to transaction commit for asynchronous transaction workloads - only
time will tell if using a read-write semaphore for exclusion will limit
transaction commit concurrency due to cache line bouncing of the lock on the
read side.
The second serialisation point is on the transaction commit side where items
are inserted into the CIL. Because transactions can enter this code
concurrently, the CIL needs to be protected separately from the above
commit/flush exclusion. It also needs to be an exclusive lock but it is only
held for a very short time and so a spin lock is appropriate here. It is
possible that this lock will become a contention point, but given the short
hold time once per transaction I think that contention is unlikely.
The final serialisation point is the checkpoint commit record ordering code
that is run as part of the checkpoint commit and log force sequencing. The code
path that triggers a CIL flush (i.e. whatever triggers the log force) will enter
an ordering loop after writing all the log vectors into the log buffers but
before writing the commit record. This loop walks the list of committing
checkpoints and needs to block waiting for checkpoints to complete their commit
record write. As a result it needs a lock and a wait variable. Log force
sequencing also requires the same lock, list walk, and blocking mechanism to
ensure completion of checkpoints.
These two sequencing operations can use the mechanism even though the
events they are waiting for are different. The checkpoint commit record
sequencing needs to wait until checkpoint contexts contain a commit LSN
(obtained through completion of a commit record write) while log force
sequencing needs to wait until previous checkpoint contexts are removed from
the committing list (i.e. they've completed). A simple wait variable and
broadcast wakeups (thundering herds) has been used to implement these two
serialisation queues. They use the same lock as the CIL, too. If we see too
much contention on the CIL lock, or too many context switches as a result of
the broadcast wakeups these operations can be put under a new spinlock and
given separate wait lists to reduce lock contention and the number of processes
woken by the wrong event.
Lifecycle Changes
The existing log item life cycle is as follows:
1. Transaction allocate
2. Transaction reserve
3. Lock item
4. Join item to transaction
If not already attached,
Allocate log item
Attach log item to owner item
Attach log item to transaction
5. Modify item
Record modifications in log item
6. Transaction commit
Pin item in memory
Format item into log buffer
Write commit LSN into transaction
Unlock item
Attach transaction to log buffer
<log buffer IO dispatched>
<log buffer IO completes>
7. Transaction completion
Mark log item committed
Insert log item into AIL
Write commit LSN into log item
Unpin log item
8. AIL traversal
Lock item
Mark log item clean
Flush item to disk
<item IO completion>
9. Log item removed from AIL
Moves log tail
Item unlocked
Essentially, steps 1-6 operate independently from step 7, which is also
independent of steps 8-9. An item can be locked in steps 1-6 or steps 8-9
at the same time step 7 is occurring, but only steps 1-6 or 8-9 can occur
at the same time. If the log item is in the AIL or between steps 6 and 7
and steps 1-6 are re-entered, then the item is relogged. Only when steps 8-9
are entered and completed is the object considered clean.
With delayed logging, there are new steps inserted into the life cycle:
1. Transaction allocate
2. Transaction reserve
3. Lock item
4. Join item to transaction
If not already attached,
Allocate log item
Attach log item to owner item
Attach log item to transaction
5. Modify item
Record modifications in log item
6. Transaction commit
Pin item in memory if not pinned in CIL
Format item into log vector + buffer
Attach log vector and buffer to log item
Insert log item into CIL
Write CIL context sequence into transaction
Unlock item
<next log force>
7. CIL push
lock CIL flush
Chain log vectors and buffers together
Remove items from CIL
unlock CIL flush
write log vectors into log
sequence commit records
attach checkpoint context to log buffer
<log buffer IO dispatched>
<log buffer IO completes>
8. Checkpoint completion
Mark log item committed
Insert item into AIL
Write commit LSN into log item
Unpin log item
9. AIL traversal
Lock item
Mark log item clean
Flush item to disk
<item IO completion>
10. Log item removed from AIL
Moves log tail
Item unlocked
From this, it can be seen that the only life cycle differences between the two
logging methods are in the middle of the life cycle - they still have the same
beginning and end and execution constraints. The only differences are in the
commiting of the log items to the log itself and the completion processing.
Hence delayed logging should not introduce any constraints on log item
behaviour, allocation or freeing that don't already exist.
As a result of this zero-impact "insertion" of delayed logging infrastructure
and the design of the internal structures to avoid on disk format changes, we
can basically switch between delayed logging and the existing mechanism with a
mount option. Fundamentally, there is no reason why the log manager would not
be able to swap methods automatically and transparently depending on load
characteristics, but this should not be necessary if delayed logging works as
designed.
Roadmap:
2.6.35 Inclusion in mainline as an experimental mount option
=> approximately 2-3 months to merge window
=> needs to be in xfs-dev tree in 4-6 weeks
=> code is nearing readiness for review
2.6.37 Remove experimental tag from mount option
=> should be roughly 6 months after initial merge
=> enough time to:
=> gain confidence and fix problems reported by early
adopters (a.k.a. guinea pigs)
=> address worst performance regressions and undesired
behaviours
=> start tuning/optimising code for parallelism
=> start tuning/optimising algorithms consuming
excessive CPU time
2.6.39 Switch default mount option to use delayed logging
=> should be roughly 12 months after initial merge
=> enough time to shake out remaining problems before next round of
enterprise distro kernel rebases

View File

@ -77,6 +77,7 @@ xfs-y += xfs_alloc.o \
xfs_itable.o \
xfs_dfrag.o \
xfs_log.o \
xfs_log_cil.o \
xfs_log_recover.o \
xfs_mount.o \
xfs_mru_cache.o \

View File

@ -37,6 +37,7 @@
#include "xfs_sb.h"
#include "xfs_inum.h"
#include "xfs_log.h"
#include "xfs_ag.h"
#include "xfs_dmapi.h"
#include "xfs_mount.h"
@ -850,6 +851,12 @@ xfs_buf_lock_value(
* Note that this in no way locks the underlying pages, so it is only
* useful for synchronizing concurrent use of buffer objects, not for
* synchronizing independent access to the underlying pages.
*
* If we come across a stale, pinned, locked buffer, we know that we
* are being asked to lock a buffer that has been reallocated. Because
* it is pinned, we know that the log has not been pushed to disk and
* hence it will still be locked. Rather than sleeping until someone
* else pushes the log, push it ourselves before trying to get the lock.
*/
void
xfs_buf_lock(
@ -857,6 +864,8 @@ xfs_buf_lock(
{
trace_xfs_buf_lock(bp, _RET_IP_);
if (atomic_read(&bp->b_pin_count) && (bp->b_flags & XBF_STALE))
xfs_log_force(bp->b_mount, 0);
if (atomic_read(&bp->b_io_remaining))
blk_run_address_space(bp->b_target->bt_mapping);
down(&bp->b_sema);

View File

@ -19,6 +19,7 @@
#include "xfs_dmapi.h"
#include "xfs_sb.h"
#include "xfs_inum.h"
#include "xfs_log.h"
#include "xfs_ag.h"
#include "xfs_mount.h"
#include "xfs_quota.h"

View File

@ -119,6 +119,8 @@ mempool_t *xfs_ioend_pool;
#define MNTOPT_DMAPI "dmapi" /* DMI enabled (DMAPI / XDSM) */
#define MNTOPT_XDSM "xdsm" /* DMI enabled (DMAPI / XDSM) */
#define MNTOPT_DMI "dmi" /* DMI enabled (DMAPI / XDSM) */
#define MNTOPT_DELAYLOG "delaylog" /* Delayed loging enabled */
#define MNTOPT_NODELAYLOG "nodelaylog" /* Delayed loging disabled */
/*
* Table driven mount option parser.
@ -374,6 +376,13 @@ xfs_parseargs(
mp->m_flags |= XFS_MOUNT_DMAPI;
} else if (!strcmp(this_char, MNTOPT_DMI)) {
mp->m_flags |= XFS_MOUNT_DMAPI;
} else if (!strcmp(this_char, MNTOPT_DELAYLOG)) {
mp->m_flags |= XFS_MOUNT_DELAYLOG;
cmn_err(CE_WARN,
"Enabling EXPERIMENTAL delayed logging feature "
"- use at your own risk.\n");
} else if (!strcmp(this_char, MNTOPT_NODELAYLOG)) {
mp->m_flags &= ~XFS_MOUNT_DELAYLOG;
} else if (!strcmp(this_char, "ihashsize")) {
cmn_err(CE_WARN,
"XFS: ihashsize no longer used, option is deprecated.");
@ -535,6 +544,7 @@ xfs_showargs(
{ XFS_MOUNT_FILESTREAMS, "," MNTOPT_FILESTREAM },
{ XFS_MOUNT_DMAPI, "," MNTOPT_DMAPI },
{ XFS_MOUNT_GRPID, "," MNTOPT_GRPID },
{ XFS_MOUNT_DELAYLOG, "," MNTOPT_DELAYLOG },
{ 0, NULL }
};
static struct proc_xfs_info xfs_info_unset[] = {
@ -1755,7 +1765,7 @@ xfs_init_zones(void)
* but it is much faster.
*/
xfs_buf_item_zone = kmem_zone_init((sizeof(xfs_buf_log_item_t) +
(((XFS_MAX_BLOCKSIZE / XFS_BLI_CHUNK) /
(((XFS_MAX_BLOCKSIZE / XFS_BLF_CHUNK) /
NBWORD) * sizeof(int))), "xfs_buf_item");
if (!xfs_buf_item_zone)
goto out_destroy_trans_zone;

View File

@ -1059,83 +1059,112 @@ TRACE_EVENT(xfs_bunmap,
);
#define XFS_BUSY_SYNC \
{ 0, "async" }, \
{ 1, "sync" }
TRACE_EVENT(xfs_alloc_busy,
TP_PROTO(struct xfs_mount *mp, xfs_agnumber_t agno, xfs_agblock_t agbno,
xfs_extlen_t len, int slot),
TP_ARGS(mp, agno, agbno, len, slot),
TP_PROTO(struct xfs_trans *trans, xfs_agnumber_t agno,
xfs_agblock_t agbno, xfs_extlen_t len, int sync),
TP_ARGS(trans, agno, agbno, len, sync),
TP_STRUCT__entry(
__field(dev_t, dev)
__field(struct xfs_trans *, tp)
__field(int, tid)
__field(xfs_agnumber_t, agno)
__field(xfs_agblock_t, agbno)
__field(xfs_extlen_t, len)
__field(int, sync)
),
TP_fast_assign(
__entry->dev = trans->t_mountp->m_super->s_dev;
__entry->tp = trans;
__entry->tid = trans->t_ticket->t_tid;
__entry->agno = agno;
__entry->agbno = agbno;
__entry->len = len;
__entry->sync = sync;
),
TP_printk("dev %d:%d trans 0x%p tid 0x%x agno %u agbno %u len %u %s",
MAJOR(__entry->dev), MINOR(__entry->dev),
__entry->tp,
__entry->tid,
__entry->agno,
__entry->agbno,
__entry->len,
__print_symbolic(__entry->sync, XFS_BUSY_SYNC))
);
TRACE_EVENT(xfs_alloc_unbusy,
TP_PROTO(struct xfs_mount *mp, xfs_agnumber_t agno,
xfs_agblock_t agbno, xfs_extlen_t len),
TP_ARGS(mp, agno, agbno, len),
TP_STRUCT__entry(
__field(dev_t, dev)
__field(xfs_agnumber_t, agno)
__field(xfs_agblock_t, agbno)
__field(xfs_extlen_t, len)
__field(int, slot)
),
TP_fast_assign(
__entry->dev = mp->m_super->s_dev;
__entry->agno = agno;
__entry->agbno = agbno;
__entry->len = len;
__entry->slot = slot;
),
TP_printk("dev %d:%d agno %u agbno %u len %u slot %d",
TP_printk("dev %d:%d agno %u agbno %u len %u",
MAJOR(__entry->dev), MINOR(__entry->dev),
__entry->agno,
__entry->agbno,
__entry->len,
__entry->slot)
__entry->len)
);
#define XFS_BUSY_STATES \
{ 0, "found" }, \
{ 1, "missing" }
{ 0, "missing" }, \
{ 1, "found" }
TRACE_EVENT(xfs_alloc_unbusy,
TRACE_EVENT(xfs_alloc_busysearch,
TP_PROTO(struct xfs_mount *mp, xfs_agnumber_t agno,
int slot, int found),
TP_ARGS(mp, agno, slot, found),
xfs_agblock_t agbno, xfs_extlen_t len, int found),
TP_ARGS(mp, agno, agbno, len, found),
TP_STRUCT__entry(
__field(dev_t, dev)
__field(xfs_agnumber_t, agno)
__field(int, slot)
__field(xfs_agblock_t, agbno)
__field(xfs_extlen_t, len)
__field(int, found)
),
TP_fast_assign(
__entry->dev = mp->m_super->s_dev;
__entry->agno = agno;
__entry->slot = slot;
__entry->found = found;
),
TP_printk("dev %d:%d agno %u slot %d %s",
MAJOR(__entry->dev), MINOR(__entry->dev),
__entry->agno,
__entry->slot,
__print_symbolic(__entry->found, XFS_BUSY_STATES))
);
TRACE_EVENT(xfs_alloc_busysearch,
TP_PROTO(struct xfs_mount *mp, xfs_agnumber_t agno, xfs_agblock_t agbno,
xfs_extlen_t len, xfs_lsn_t lsn),
TP_ARGS(mp, agno, agbno, len, lsn),
TP_STRUCT__entry(
__field(dev_t, dev)
__field(xfs_agnumber_t, agno)
__field(xfs_agblock_t, agbno)
__field(xfs_extlen_t, len)
__field(xfs_lsn_t, lsn)
),
TP_fast_assign(
__entry->dev = mp->m_super->s_dev;
__entry->agno = agno;
__entry->agbno = agbno;
__entry->len = len;
__entry->lsn = lsn;
__entry->found = found;
),
TP_printk("dev %d:%d agno %u agbno %u len %u force lsn 0x%llx",
TP_printk("dev %d:%d agno %u agbno %u len %u %s",
MAJOR(__entry->dev), MINOR(__entry->dev),
__entry->agno,
__entry->agbno,
__entry->len,
__print_symbolic(__entry->found, XFS_BUSY_STATES))
);
TRACE_EVENT(xfs_trans_commit_lsn,
TP_PROTO(struct xfs_trans *trans),
TP_ARGS(trans),
TP_STRUCT__entry(
__field(dev_t, dev)
__field(struct xfs_trans *, tp)
__field(xfs_lsn_t, lsn)
),
TP_fast_assign(
__entry->dev = trans->t_mountp->m_super->s_dev;
__entry->tp = trans;
__entry->lsn = trans->t_commit_lsn;
),
TP_printk("dev %d:%d trans 0x%p commit_lsn 0x%llx",
MAJOR(__entry->dev), MINOR(__entry->dev),
__entry->tp,
__entry->lsn)
);

View File

@ -344,9 +344,9 @@ xfs_qm_init_dquot_blk(
for (i = 0; i < q->qi_dqperchunk; i++, d++, curid++)
xfs_qm_dqinit_core(curid, type, d);
xfs_trans_dquot_buf(tp, bp,
(type & XFS_DQ_USER ? XFS_BLI_UDQUOT_BUF :
((type & XFS_DQ_PROJ) ? XFS_BLI_PDQUOT_BUF :
XFS_BLI_GDQUOT_BUF)));
(type & XFS_DQ_USER ? XFS_BLF_UDQUOT_BUF :
((type & XFS_DQ_PROJ) ? XFS_BLF_PDQUOT_BUF :
XFS_BLF_GDQUOT_BUF)));
xfs_trans_log_buf(tp, bp, 0, BBTOB(q->qi_dqchunklen) - 1);
}

View File

@ -175,14 +175,20 @@ typedef struct xfs_agfl {
} xfs_agfl_t;
/*
* Busy block/extent entry. Used in perag to mark blocks that have been freed
* but whose transactions aren't committed to disk yet.
* Busy block/extent entry. Indexed by a rbtree in perag to mark blocks that
* have been freed but whose transactions aren't committed to disk yet.
*
* Note that we use the transaction ID to record the transaction, not the
* transaction structure itself. See xfs_alloc_busy_insert() for details.
*/
typedef struct xfs_perag_busy {
xfs_agblock_t busy_start;
xfs_extlen_t busy_length;
struct xfs_trans *busy_tp; /* transaction that did the free */
} xfs_perag_busy_t;
struct xfs_busy_extent {
struct rb_node rb_node; /* ag by-bno indexed search tree */
struct list_head list; /* transaction busy extent list */
xfs_agnumber_t agno;
xfs_agblock_t bno;
xfs_extlen_t length;
xlog_tid_t tid; /* transaction that created this */
};
/*
* Per-ag incore structure, copies of information in agf and agi,
@ -216,7 +222,8 @@ typedef struct xfs_perag {
xfs_agino_t pagl_leftrec;
xfs_agino_t pagl_rightrec;
#ifdef __KERNEL__
spinlock_t pagb_lock; /* lock for pagb_list */
spinlock_t pagb_lock; /* lock for pagb_tree */
struct rb_root pagb_tree; /* ordered tree of busy extents */
atomic_t pagf_fstrms; /* # of filestreams active in this AG */
@ -226,7 +233,6 @@ typedef struct xfs_perag {
int pag_ici_reclaimable; /* reclaimable inodes */
#endif
int pagb_count; /* pagb slots in use */
xfs_perag_busy_t pagb_list[XFS_PAGB_NUM_SLOTS]; /* unstable blocks */
} xfs_perag_t;
/*

View File

@ -46,11 +46,9 @@
#define XFSA_FIXUP_BNO_OK 1
#define XFSA_FIXUP_CNT_OK 2
STATIC void
xfs_alloc_search_busy(xfs_trans_t *tp,
xfs_agnumber_t agno,
xfs_agblock_t bno,
xfs_extlen_t len);
static int
xfs_alloc_busy_search(struct xfs_mount *mp, xfs_agnumber_t agno,
xfs_agblock_t bno, xfs_extlen_t len);
/*
* Prototypes for per-ag allocation routines
@ -540,9 +538,16 @@ xfs_alloc_ag_vextent(
be32_to_cpu(agf->agf_length));
xfs_alloc_log_agf(args->tp, args->agbp,
XFS_AGF_FREEBLKS);
/* search the busylist for these blocks */
xfs_alloc_search_busy(args->tp, args->agno,
args->agbno, args->len);
/*
* Search the busylist for these blocks and mark the
* transaction as synchronous if blocks are found. This
* avoids the need to block due to a synchronous log
* force to ensure correct ordering as the synchronous
* transaction will guarantee that for us.
*/
if (xfs_alloc_busy_search(args->mp, args->agno,
args->agbno, args->len))
xfs_trans_set_sync(args->tp);
}
if (!args->isfl)
xfs_trans_mod_sb(args->tp,
@ -1693,7 +1698,7 @@ xfs_free_ag_extent(
* when the iclog commits to disk. If a busy block is allocated,
* the iclog is pushed up to the LSN that freed the block.
*/
xfs_alloc_mark_busy(tp, agno, bno, len);
xfs_alloc_busy_insert(tp, agno, bno, len);
return 0;
error0:
@ -1989,14 +1994,20 @@ xfs_alloc_get_freelist(
*bnop = bno;
/*
* As blocks are freed, they are added to the per-ag busy list
* and remain there until the freeing transaction is committed to
* disk. Now that we have allocated blocks, this list must be
* searched to see if a block is being reused. If one is, then
* the freeing transaction must be pushed to disk NOW by forcing
* to disk all iclogs up that transaction's LSN.
* As blocks are freed, they are added to the per-ag busy list and
* remain there until the freeing transaction is committed to disk.
* Now that we have allocated blocks, this list must be searched to see
* if a block is being reused. If one is, then the freeing transaction
* must be pushed to disk before this transaction.
*
* We do this by setting the current transaction to a sync transaction
* which guarantees that the freeing transaction is on disk before this
* transaction. This is done instead of a synchronous log force here so
* that we don't sit and wait with the AGF locked in the transaction
* during the log force.
*/
xfs_alloc_search_busy(tp, be32_to_cpu(agf->agf_seqno), bno, 1);
if (xfs_alloc_busy_search(mp, be32_to_cpu(agf->agf_seqno), bno, 1))
xfs_trans_set_sync(tp);
return 0;
}
@ -2201,7 +2212,7 @@ xfs_alloc_read_agf(
be32_to_cpu(agf->agf_levels[XFS_BTNUM_CNTi]);
spin_lock_init(&pag->pagb_lock);
pag->pagb_count = 0;
memset(pag->pagb_list, 0, sizeof(pag->pagb_list));
pag->pagb_tree = RB_ROOT;
pag->pagf_init = 1;
}
#ifdef DEBUG
@ -2479,127 +2490,263 @@ xfs_free_extent(
* list is reused, the transaction that freed it must be forced to disk
* before continuing to use the block.
*
* xfs_alloc_mark_busy - add to the per-ag busy list
* xfs_alloc_clear_busy - remove an item from the per-ag busy list
* xfs_alloc_busy_insert - add to the per-ag busy list
* xfs_alloc_busy_clear - remove an item from the per-ag busy list
* xfs_alloc_busy_search - search for a busy extent
*/
/*
* Insert a new extent into the busy tree.
*
* The busy extent tree is indexed by the start block of the busy extent.
* there can be multiple overlapping ranges in the busy extent tree but only
* ever one entry at a given start block. The reason for this is that
* multi-block extents can be freed, then smaller chunks of that extent
* allocated and freed again before the first transaction commit is on disk.
* If the exact same start block is freed a second time, we have to wait for
* that busy extent to pass out of the tree before the new extent is inserted.
* There are two main cases we have to handle here.
*
* The first case is a transaction that triggers a "free - allocate - free"
* cycle. This can occur during btree manipulations as a btree block is freed
* to the freelist, then allocated from the free list, then freed again. In
* this case, the second extxpnet free is what triggers the duplicate and as
* such the transaction IDs should match. Because the extent was allocated in
* this transaction, the transaction must be marked as synchronous. This is
* true for all cases where the free/alloc/free occurs in the one transaction,
* hence the addition of the ASSERT(tp->t_flags & XFS_TRANS_SYNC) to this case.
* This serves to catch violations of the second case quite effectively.
*
* The second case is where the free/alloc/free occur in different
* transactions. In this case, the thread freeing the extent the second time
* can't mark the extent busy immediately because it is already tracked in a
* transaction that may be committing. When the log commit for the existing
* busy extent completes, the busy extent will be removed from the tree. If we
* allow the second busy insert to continue using that busy extent structure,
* it can be freed before this transaction is safely in the log. Hence our
* only option in this case is to force the log to remove the existing busy
* extent from the list before we insert the new one with the current
* transaction ID.
*
* The problem we are trying to avoid in the free-alloc-free in separate
* transactions is most easily described with a timeline:
*
* Thread 1 Thread 2 Thread 3 xfslogd
* xact alloc
* free X
* mark busy
* commit xact
* free xact
* xact alloc
* alloc X
* busy search
* mark xact sync
* commit xact
* free xact
* force log
* checkpoint starts
* ....
* xact alloc
* free X
* mark busy
* finds match
* *** KABOOM! ***
* ....
* log IO completes
* unbusy X
* checkpoint completes
*
* By issuing a log force in thread 3 @ "KABOOM", the thread will block until
* the checkpoint completes, and the busy extent it matched will have been
* removed from the tree when it is woken. Hence it can then continue safely.
*
* However, to ensure this matching process is robust, we need to use the
* transaction ID for identifying transaction, as delayed logging results in
* the busy extent and transaction lifecycles being different. i.e. the busy
* extent is active for a lot longer than the transaction. Hence the
* transaction structure can be freed and reallocated, then mark the same
* extent busy again in the new transaction. In this case the new transaction
* will have a different tid but can have the same address, and hence we need
* to check against the tid.
*
* Future: for delayed logging, we could avoid the log force if the extent was
* first freed in the current checkpoint sequence. This, however, requires the
* ability to pin the current checkpoint in memory until this transaction
* commits to ensure that both the original free and the current one combine
* logically into the one checkpoint. If the checkpoint sequences are
* different, however, we still need to wait on a log force.
*/
void
xfs_alloc_mark_busy(xfs_trans_t *tp,
xfs_agnumber_t agno,
xfs_agblock_t bno,
xfs_extlen_t len)
xfs_alloc_busy_insert(
struct xfs_trans *tp,
xfs_agnumber_t agno,
xfs_agblock_t bno,
xfs_extlen_t len)
{
xfs_perag_busy_t *bsy;
struct xfs_busy_extent *new;
struct xfs_busy_extent *busyp;
struct xfs_perag *pag;
int n;
struct rb_node **rbp;
struct rb_node *parent;
int match;
pag = xfs_perag_get(tp->t_mountp, agno);
new = kmem_zalloc(sizeof(struct xfs_busy_extent), KM_MAYFAIL);
if (!new) {
/*
* No Memory! Since it is now not possible to track the free
* block, make this a synchronous transaction to insure that
* the block is not reused before this transaction commits.
*/
trace_xfs_alloc_busy(tp, agno, bno, len, 1);
xfs_trans_set_sync(tp);
return;
}
new->agno = agno;
new->bno = bno;
new->length = len;
new->tid = xfs_log_get_trans_ident(tp);
INIT_LIST_HEAD(&new->list);
/* trace before insert to be able to see failed inserts */
trace_xfs_alloc_busy(tp, agno, bno, len, 0);
pag = xfs_perag_get(tp->t_mountp, new->agno);
restart:
spin_lock(&pag->pagb_lock);
rbp = &pag->pagb_tree.rb_node;
parent = NULL;
busyp = NULL;
match = 0;
while (*rbp && match >= 0) {
parent = *rbp;
busyp = rb_entry(parent, struct xfs_busy_extent, rb_node);
/* search pagb_list for an open slot */
for (bsy = pag->pagb_list, n = 0;
n < XFS_PAGB_NUM_SLOTS;
bsy++, n++) {
if (bsy->busy_tp == NULL) {
if (new->bno < busyp->bno) {
/* may overlap, but exact start block is lower */
rbp = &(*rbp)->rb_left;
if (new->bno + new->length > busyp->bno)
match = busyp->tid == new->tid ? 1 : -1;
} else if (new->bno > busyp->bno) {
/* may overlap, but exact start block is higher */
rbp = &(*rbp)->rb_right;
if (bno < busyp->bno + busyp->length)
match = busyp->tid == new->tid ? 1 : -1;
} else {
match = busyp->tid == new->tid ? 1 : -1;
break;
}
}
trace_xfs_alloc_busy(tp->t_mountp, agno, bno, len, n);
if (n < XFS_PAGB_NUM_SLOTS) {
bsy = &pag->pagb_list[n];
pag->pagb_count++;
bsy->busy_start = bno;
bsy->busy_length = len;
bsy->busy_tp = tp;
xfs_trans_add_busy(tp, agno, n);
} else {
/*
* The busy list is full! Since it is now not possible to
* track the free block, make this a synchronous transaction
* to insure that the block is not reused before this
* transaction commits.
*/
xfs_trans_set_sync(tp);
if (match < 0) {
/* overlap marked busy in different transaction */
spin_unlock(&pag->pagb_lock);
xfs_log_force(tp->t_mountp, XFS_LOG_SYNC);
goto restart;
}
if (match > 0) {
/*
* overlap marked busy in same transaction. Update if exact
* start block match, otherwise combine the busy extents into
* a single range.
*/
if (busyp->bno == new->bno) {
busyp->length = max(busyp->length, new->length);
spin_unlock(&pag->pagb_lock);
ASSERT(tp->t_flags & XFS_TRANS_SYNC);
xfs_perag_put(pag);
kmem_free(new);
return;
}
rb_erase(&busyp->rb_node, &pag->pagb_tree);
new->length = max(busyp->bno + busyp->length,
new->bno + new->length) -
min(busyp->bno, new->bno);
new->bno = min(busyp->bno, new->bno);
} else
busyp = NULL;
rb_link_node(&new->rb_node, parent, rbp);
rb_insert_color(&new->rb_node, &pag->pagb_tree);
list_add(&new->list, &tp->t_busy);
spin_unlock(&pag->pagb_lock);
xfs_perag_put(pag);
kmem_free(busyp);
}
/*
* Search for a busy extent within the range of the extent we are about to
* allocate. You need to be holding the busy extent tree lock when calling
* xfs_alloc_busy_search(). This function returns 0 for no overlapping busy
* extent, -1 for an overlapping but not exact busy extent, and 1 for an exact
* match. This is done so that a non-zero return indicates an overlap that
* will require a synchronous transaction, but it can still be
* used to distinguish between a partial or exact match.
*/
static int
xfs_alloc_busy_search(
struct xfs_mount *mp,
xfs_agnumber_t agno,
xfs_agblock_t bno,
xfs_extlen_t len)
{
struct xfs_perag *pag;
struct rb_node *rbp;
struct xfs_busy_extent *busyp;
int match = 0;
pag = xfs_perag_get(mp, agno);
spin_lock(&pag->pagb_lock);
rbp = pag->pagb_tree.rb_node;
/* find closest start bno overlap */
while (rbp) {
busyp = rb_entry(rbp, struct xfs_busy_extent, rb_node);
if (bno < busyp->bno) {
/* may overlap, but exact start block is lower */
if (bno + len > busyp->bno)
match = -1;
rbp = rbp->rb_left;
} else if (bno > busyp->bno) {
/* may overlap, but exact start block is higher */
if (bno < busyp->bno + busyp->length)
match = -1;
rbp = rbp->rb_right;
} else {
/* bno matches busyp, length determines exact match */
match = (busyp->length == len) ? 1 : -1;
break;
}
}
spin_unlock(&pag->pagb_lock);
trace_xfs_alloc_busysearch(mp, agno, bno, len, !!match);
xfs_perag_put(pag);
return match;
}
void
xfs_alloc_clear_busy(xfs_trans_t *tp,
xfs_agnumber_t agno,
int idx)
xfs_alloc_busy_clear(
struct xfs_mount *mp,
struct xfs_busy_extent *busyp)
{
struct xfs_perag *pag;
xfs_perag_busy_t *list;
ASSERT(idx < XFS_PAGB_NUM_SLOTS);
pag = xfs_perag_get(tp->t_mountp, agno);
trace_xfs_alloc_unbusy(mp, busyp->agno, busyp->bno,
busyp->length);
ASSERT(xfs_alloc_busy_search(mp, busyp->agno, busyp->bno,
busyp->length) == 1);
list_del_init(&busyp->list);
pag = xfs_perag_get(mp, busyp->agno);
spin_lock(&pag->pagb_lock);
list = pag->pagb_list;
trace_xfs_alloc_unbusy(tp->t_mountp, agno, idx, list[idx].busy_tp == tp);
if (list[idx].busy_tp == tp) {
list[idx].busy_tp = NULL;
pag->pagb_count--;
}
rb_erase(&busyp->rb_node, &pag->pagb_tree);
spin_unlock(&pag->pagb_lock);
xfs_perag_put(pag);
}
/*
* If we find the extent in the busy list, force the log out to get the
* extent out of the busy list so the caller can use it straight away.
*/
STATIC void
xfs_alloc_search_busy(xfs_trans_t *tp,
xfs_agnumber_t agno,
xfs_agblock_t bno,
xfs_extlen_t len)
{
struct xfs_perag *pag;
xfs_perag_busy_t *bsy;
xfs_agblock_t uend, bend;
xfs_lsn_t lsn = 0;
int cnt;
pag = xfs_perag_get(tp->t_mountp, agno);
spin_lock(&pag->pagb_lock);
cnt = pag->pagb_count;
/*
* search pagb_list for this slot, skipping open slots. We have to
* search the entire array as there may be multiple overlaps and
* we have to get the most recent LSN for the log force to push out
* all the transactions that span the range.
*/
uend = bno + len - 1;
for (cnt = 0; cnt < pag->pagb_count; cnt++) {
bsy = &pag->pagb_list[cnt];
if (!bsy->busy_tp)
continue;
bend = bsy->busy_start + bsy->busy_length - 1;
if (bno > bend || uend < bsy->busy_start)
continue;
/* (start1,length1) within (start2, length2) */
if (XFS_LSN_CMP(bsy->busy_tp->t_commit_lsn, lsn) > 0)
lsn = bsy->busy_tp->t_commit_lsn;
}
spin_unlock(&pag->pagb_lock);
xfs_perag_put(pag);
trace_xfs_alloc_busysearch(tp->t_mountp, agno, bno, len, lsn);
/*
* If a block was found, force the log through the LSN of the
* transaction that freed the block
*/
if (lsn)
xfs_log_force_lsn(tp->t_mountp, lsn, XFS_LOG_SYNC);
kmem_free(busyp);
}

View File

@ -22,6 +22,7 @@ struct xfs_buf;
struct xfs_mount;
struct xfs_perag;
struct xfs_trans;
struct xfs_busy_extent;
/*
* Freespace allocation types. Argument to xfs_alloc_[v]extent.
@ -119,15 +120,13 @@ xfs_alloc_longest_free_extent(struct xfs_mount *mp,
#ifdef __KERNEL__
void
xfs_alloc_mark_busy(xfs_trans_t *tp,
xfs_alloc_busy_insert(xfs_trans_t *tp,
xfs_agnumber_t agno,
xfs_agblock_t bno,
xfs_extlen_t len);
void
xfs_alloc_clear_busy(xfs_trans_t *tp,
xfs_agnumber_t ag,
int idx);
xfs_alloc_busy_clear(struct xfs_mount *mp, struct xfs_busy_extent *busyp);
#endif /* __KERNEL__ */

View File

@ -134,7 +134,7 @@ xfs_allocbt_free_block(
* disk. If a busy block is allocated, the iclog is pushed up to the
* LSN that freed the block.
*/
xfs_alloc_mark_busy(cur->bc_tp, be32_to_cpu(agf->agf_seqno), bno, 1);
xfs_alloc_busy_insert(cur->bc_tp, be32_to_cpu(agf->agf_seqno), bno, 1);
xfs_trans_agbtree_delta(cur->bc_tp, -1);
return 0;
}

View File

@ -64,7 +64,7 @@ xfs_buf_item_log_debug(
nbytes = last - first + 1;
bfset(bip->bli_logged, first, nbytes);
for (x = 0; x < nbytes; x++) {
chunk_num = byte >> XFS_BLI_SHIFT;
chunk_num = byte >> XFS_BLF_SHIFT;
word_num = chunk_num >> BIT_TO_WORD_SHIFT;
bit_num = chunk_num & (NBWORD - 1);
wordp = &(bip->bli_format.blf_data_map[word_num]);
@ -166,7 +166,7 @@ xfs_buf_item_size(
* cancel flag in it.
*/
trace_xfs_buf_item_size_stale(bip);
ASSERT(bip->bli_format.blf_flags & XFS_BLI_CANCEL);
ASSERT(bip->bli_format.blf_flags & XFS_BLF_CANCEL);
return 1;
}
@ -197,9 +197,9 @@ xfs_buf_item_size(
} else if (next_bit != last_bit + 1) {
last_bit = next_bit;
nvecs++;
} else if (xfs_buf_offset(bp, next_bit * XFS_BLI_CHUNK) !=
(xfs_buf_offset(bp, last_bit * XFS_BLI_CHUNK) +
XFS_BLI_CHUNK)) {
} else if (xfs_buf_offset(bp, next_bit * XFS_BLF_CHUNK) !=
(xfs_buf_offset(bp, last_bit * XFS_BLF_CHUNK) +
XFS_BLF_CHUNK)) {
last_bit = next_bit;
nvecs++;
} else {
@ -254,6 +254,20 @@ xfs_buf_item_format(
vecp++;
nvecs = 1;
/*
* If it is an inode buffer, transfer the in-memory state to the
* format flags and clear the in-memory state. We do not transfer
* this state if the inode buffer allocation has not yet been committed
* to the log as setting the XFS_BLI_INODE_BUF flag will prevent
* correct replay of the inode allocation.
*/
if (bip->bli_flags & XFS_BLI_INODE_BUF) {
if (!((bip->bli_flags & XFS_BLI_INODE_ALLOC_BUF) &&
xfs_log_item_in_current_chkpt(&bip->bli_item)))
bip->bli_format.blf_flags |= XFS_BLF_INODE_BUF;
bip->bli_flags &= ~XFS_BLI_INODE_BUF;
}
if (bip->bli_flags & XFS_BLI_STALE) {
/*
* The buffer is stale, so all we need to log
@ -261,7 +275,7 @@ xfs_buf_item_format(
* cancel flag in it.
*/
trace_xfs_buf_item_format_stale(bip);
ASSERT(bip->bli_format.blf_flags & XFS_BLI_CANCEL);
ASSERT(bip->bli_format.blf_flags & XFS_BLF_CANCEL);
bip->bli_format.blf_size = nvecs;
return;
}
@ -294,28 +308,28 @@ xfs_buf_item_format(
* keep counting and scanning.
*/
if (next_bit == -1) {
buffer_offset = first_bit * XFS_BLI_CHUNK;
buffer_offset = first_bit * XFS_BLF_CHUNK;
vecp->i_addr = xfs_buf_offset(bp, buffer_offset);
vecp->i_len = nbits * XFS_BLI_CHUNK;
vecp->i_len = nbits * XFS_BLF_CHUNK;
vecp->i_type = XLOG_REG_TYPE_BCHUNK;
nvecs++;
break;
} else if (next_bit != last_bit + 1) {
buffer_offset = first_bit * XFS_BLI_CHUNK;
buffer_offset = first_bit * XFS_BLF_CHUNK;
vecp->i_addr = xfs_buf_offset(bp, buffer_offset);
vecp->i_len = nbits * XFS_BLI_CHUNK;
vecp->i_len = nbits * XFS_BLF_CHUNK;
vecp->i_type = XLOG_REG_TYPE_BCHUNK;
nvecs++;
vecp++;
first_bit = next_bit;
last_bit = next_bit;
nbits = 1;
} else if (xfs_buf_offset(bp, next_bit << XFS_BLI_SHIFT) !=
(xfs_buf_offset(bp, last_bit << XFS_BLI_SHIFT) +
XFS_BLI_CHUNK)) {
buffer_offset = first_bit * XFS_BLI_CHUNK;
} else if (xfs_buf_offset(bp, next_bit << XFS_BLF_SHIFT) !=
(xfs_buf_offset(bp, last_bit << XFS_BLF_SHIFT) +
XFS_BLF_CHUNK)) {
buffer_offset = first_bit * XFS_BLF_CHUNK;
vecp->i_addr = xfs_buf_offset(bp, buffer_offset);
vecp->i_len = nbits * XFS_BLI_CHUNK;
vecp->i_len = nbits * XFS_BLF_CHUNK;
vecp->i_type = XLOG_REG_TYPE_BCHUNK;
/* You would think we need to bump the nvecs here too, but we do not
* this number is used by recovery, and it gets confused by the boundary
@ -341,10 +355,15 @@ xfs_buf_item_format(
}
/*
* This is called to pin the buffer associated with the buf log
* item in memory so it cannot be written out. Simply call bpin()
* on the buffer to do this.
* This is called to pin the buffer associated with the buf log item in memory
* so it cannot be written out. Simply call bpin() on the buffer to do this.
*
* We also always take a reference to the buffer log item here so that the bli
* is held while the item is pinned in memory. This means that we can
* unconditionally drop the reference count a transaction holds when the
* transaction is completed.
*/
STATIC void
xfs_buf_item_pin(
xfs_buf_log_item_t *bip)
@ -356,6 +375,7 @@ xfs_buf_item_pin(
ASSERT(atomic_read(&bip->bli_refcount) > 0);
ASSERT((bip->bli_flags & XFS_BLI_LOGGED) ||
(bip->bli_flags & XFS_BLI_STALE));
atomic_inc(&bip->bli_refcount);
trace_xfs_buf_item_pin(bip);
xfs_bpin(bp);
}
@ -393,7 +413,7 @@ xfs_buf_item_unpin(
ASSERT(XFS_BUF_VALUSEMA(bp) <= 0);
ASSERT(!(XFS_BUF_ISDELAYWRITE(bp)));
ASSERT(XFS_BUF_ISSTALE(bp));
ASSERT(bip->bli_format.blf_flags & XFS_BLI_CANCEL);
ASSERT(bip->bli_format.blf_flags & XFS_BLF_CANCEL);
trace_xfs_buf_item_unpin_stale(bip);
/*
@ -489,20 +509,23 @@ xfs_buf_item_trylock(
}
/*
* Release the buffer associated with the buf log item.
* If there is no dirty logged data associated with the
* buffer recorded in the buf log item, then free the
* buf log item and remove the reference to it in the
* buffer.
* Release the buffer associated with the buf log item. If there is no dirty
* logged data associated with the buffer recorded in the buf log item, then
* free the buf log item and remove the reference to it in the buffer.
*
* This call ignores the recursion count. It is only called
* when the buffer should REALLY be unlocked, regardless
* of the recursion count.
* This call ignores the recursion count. It is only called when the buffer
* should REALLY be unlocked, regardless of the recursion count.
*
* If the XFS_BLI_HOLD flag is set in the buf log item, then
* free the log item if necessary but do not unlock the buffer.
* This is for support of xfs_trans_bhold(). Make sure the
* XFS_BLI_HOLD field is cleared if we don't free the item.
* We unconditionally drop the transaction's reference to the log item. If the
* item was logged, then another reference was taken when it was pinned, so we
* can safely drop the transaction reference now. This also allows us to avoid
* potential races with the unpin code freeing the bli by not referencing the
* bli after we've dropped the reference count.
*
* If the XFS_BLI_HOLD flag is set in the buf log item, then free the log item
* if necessary but do not unlock the buffer. This is for support of
* xfs_trans_bhold(). Make sure the XFS_BLI_HOLD field is cleared if we don't
* free the item.
*/
STATIC void
xfs_buf_item_unlock(
@ -514,73 +537,54 @@ xfs_buf_item_unlock(
bp = bip->bli_buf;
/*
* Clear the buffer's association with this transaction.
*/
/* Clear the buffer's association with this transaction. */
XFS_BUF_SET_FSPRIVATE2(bp, NULL);
/*
* If this is a transaction abort, don't return early.
* Instead, allow the brelse to happen.
* Normally it would be done for stale (cancelled) buffers
* at unpin time, but we'll never go through the pin/unpin
* cycle if we abort inside commit.
* If this is a transaction abort, don't return early. Instead, allow
* the brelse to happen. Normally it would be done for stale
* (cancelled) buffers at unpin time, but we'll never go through the
* pin/unpin cycle if we abort inside commit.
*/
aborted = (bip->bli_item.li_flags & XFS_LI_ABORTED) != 0;
/*
* If the buf item is marked stale, then don't do anything.
* We'll unlock the buffer and free the buf item when the
* buffer is unpinned for the last time.
*/
if (bip->bli_flags & XFS_BLI_STALE) {
bip->bli_flags &= ~XFS_BLI_LOGGED;
trace_xfs_buf_item_unlock_stale(bip);
ASSERT(bip->bli_format.blf_flags & XFS_BLI_CANCEL);
if (!aborted)
return;
}
/*
* Drop the transaction's reference to the log item if
* it was not logged as part of the transaction. Otherwise
* we'll drop the reference in xfs_buf_item_unpin() when
* the transaction is really through with the buffer.
*/
if (!(bip->bli_flags & XFS_BLI_LOGGED)) {
atomic_dec(&bip->bli_refcount);
} else {
/*
* Clear the logged flag since this is per
* transaction state.
*/
bip->bli_flags &= ~XFS_BLI_LOGGED;
}
/*
* Before possibly freeing the buf item, determine if we should
* release the buffer at the end of this routine.
*/
hold = bip->bli_flags & XFS_BLI_HOLD;
/* Clear the per transaction state. */
bip->bli_flags &= ~(XFS_BLI_LOGGED | XFS_BLI_HOLD);
/*
* If the buf item is marked stale, then don't do anything. We'll
* unlock the buffer and free the buf item when the buffer is unpinned
* for the last time.
*/
if (bip->bli_flags & XFS_BLI_STALE) {
trace_xfs_buf_item_unlock_stale(bip);
ASSERT(bip->bli_format.blf_flags & XFS_BLF_CANCEL);
if (!aborted) {
atomic_dec(&bip->bli_refcount);
return;
}
}
trace_xfs_buf_item_unlock(bip);
/*
* If the buf item isn't tracking any data, free it.
* Otherwise, if XFS_BLI_HOLD is set clear it.
* If the buf item isn't tracking any data, free it, otherwise drop the
* reference we hold to it.
*/
if (xfs_bitmap_empty(bip->bli_format.blf_data_map,
bip->bli_format.blf_map_size)) {
bip->bli_format.blf_map_size))
xfs_buf_item_relse(bp);
} else if (hold) {
bip->bli_flags &= ~XFS_BLI_HOLD;
}
else
atomic_dec(&bip->bli_refcount);
/*
* Release the buffer if XFS_BLI_HOLD was not set.
*/
if (!hold) {
if (!hold)
xfs_buf_relse(bp);
}
}
/*
@ -717,12 +721,12 @@ xfs_buf_item_init(
}
/*
* chunks is the number of XFS_BLI_CHUNK size pieces
* chunks is the number of XFS_BLF_CHUNK size pieces
* the buffer can be divided into. Make sure not to
* truncate any pieces. map_size is the size of the
* bitmap needed to describe the chunks of the buffer.
*/
chunks = (int)((XFS_BUF_COUNT(bp) + (XFS_BLI_CHUNK - 1)) >> XFS_BLI_SHIFT);
chunks = (int)((XFS_BUF_COUNT(bp) + (XFS_BLF_CHUNK - 1)) >> XFS_BLF_SHIFT);
map_size = (int)((chunks + NBWORD) >> BIT_TO_WORD_SHIFT);
bip = (xfs_buf_log_item_t*)kmem_zone_zalloc(xfs_buf_item_zone,
@ -790,8 +794,8 @@ xfs_buf_item_log(
/*
* Convert byte offsets to bit numbers.
*/
first_bit = first >> XFS_BLI_SHIFT;
last_bit = last >> XFS_BLI_SHIFT;
first_bit = first >> XFS_BLF_SHIFT;
last_bit = last >> XFS_BLF_SHIFT;
/*
* Calculate the total number of bits to be set.

View File

@ -41,22 +41,22 @@ typedef struct xfs_buf_log_format {
* This flag indicates that the buffer contains on disk inodes
* and requires special recovery handling.
*/
#define XFS_BLI_INODE_BUF 0x1
#define XFS_BLF_INODE_BUF 0x1
/*
* This flag indicates that the buffer should not be replayed
* during recovery because its blocks are being freed.
*/
#define XFS_BLI_CANCEL 0x2
#define XFS_BLF_CANCEL 0x2
/*
* This flag indicates that the buffer contains on disk
* user or group dquots and may require special recovery handling.
*/
#define XFS_BLI_UDQUOT_BUF 0x4
#define XFS_BLI_PDQUOT_BUF 0x8
#define XFS_BLI_GDQUOT_BUF 0x10
#define XFS_BLF_UDQUOT_BUF 0x4
#define XFS_BLF_PDQUOT_BUF 0x8
#define XFS_BLF_GDQUOT_BUF 0x10
#define XFS_BLI_CHUNK 128
#define XFS_BLI_SHIFT 7
#define XFS_BLF_CHUNK 128
#define XFS_BLF_SHIFT 7
#define BIT_TO_WORD_SHIFT 5
#define NBWORD (NBBY * sizeof(unsigned int))
@ -69,6 +69,7 @@ typedef struct xfs_buf_log_format {
#define XFS_BLI_LOGGED 0x08
#define XFS_BLI_INODE_ALLOC_BUF 0x10
#define XFS_BLI_STALE_INODE 0x20
#define XFS_BLI_INODE_BUF 0x40
#define XFS_BLI_FLAGS \
{ XFS_BLI_HOLD, "HOLD" }, \
@ -76,7 +77,8 @@ typedef struct xfs_buf_log_format {
{ XFS_BLI_STALE, "STALE" }, \
{ XFS_BLI_LOGGED, "LOGGED" }, \
{ XFS_BLI_INODE_ALLOC_BUF, "INODE_ALLOC" }, \
{ XFS_BLI_STALE_INODE, "STALE_INODE" }
{ XFS_BLI_STALE_INODE, "STALE_INODE" }, \
{ XFS_BLI_INODE_BUF, "INODE_BUF" }
#ifdef __KERNEL__

View File

@ -170,7 +170,7 @@ xfs_cmn_err(int panic_tag, int level, xfs_mount_t *mp, char *fmt, ...)
va_list ap;
#ifdef DEBUG
xfs_panic_mask |= XFS_PTAG_SHUTDOWN_CORRUPT;
xfs_panic_mask |= (XFS_PTAG_SHUTDOWN_CORRUPT | XFS_PTAG_LOGRES);
#endif
if (xfs_panic_mask && (xfs_panic_mask & panic_tag)

View File

@ -54,9 +54,6 @@ STATIC xlog_t * xlog_alloc_log(xfs_mount_t *mp,
STATIC int xlog_space_left(xlog_t *log, int cycle, int bytes);
STATIC int xlog_sync(xlog_t *log, xlog_in_core_t *iclog);
STATIC void xlog_dealloc_log(xlog_t *log);
STATIC int xlog_write(struct log *log, struct xfs_log_vec *log_vector,
struct xlog_ticket *tic, xfs_lsn_t *start_lsn,
xlog_in_core_t **commit_iclog, uint flags);
/* local state machine functions */
STATIC void xlog_state_done_syncing(xlog_in_core_t *iclog, int);
@ -86,14 +83,6 @@ STATIC int xlog_regrant_write_log_space(xlog_t *log,
STATIC void xlog_ungrant_log_space(xlog_t *log,
xlog_ticket_t *ticket);
/* local ticket functions */
STATIC xlog_ticket_t *xlog_ticket_alloc(xlog_t *log,
int unit_bytes,
int count,
char clientid,
uint flags);
#if defined(DEBUG)
STATIC void xlog_verify_dest_ptr(xlog_t *log, char *ptr);
STATIC void xlog_verify_grant_head(xlog_t *log, int equals);
@ -360,6 +349,15 @@ xfs_log_reserve(
ASSERT(flags & XFS_LOG_PERM_RESERV);
internal_ticket = *ticket;
/*
* this is a new transaction on the ticket, so we need to
* change the transaction ID so that the next transaction has a
* different TID in the log. Just add one to the existing tid
* so that we can see chains of rolling transactions in the log
* easily.
*/
internal_ticket->t_tid++;
trace_xfs_log_reserve(log, internal_ticket);
xlog_grant_push_ail(mp, internal_ticket->t_unit_res);
@ -367,7 +365,8 @@ xfs_log_reserve(
} else {
/* may sleep if need to allocate more tickets */
internal_ticket = xlog_ticket_alloc(log, unit_bytes, cnt,
client, flags);
client, flags,
KM_SLEEP|KM_MAYFAIL);
if (!internal_ticket)
return XFS_ERROR(ENOMEM);
internal_ticket->t_trans_type = t_type;
@ -452,6 +451,13 @@ xfs_log_mount(
/* Normal transactions can now occur */
mp->m_log->l_flags &= ~XLOG_ACTIVE_RECOVERY;
/*
* Now the log has been fully initialised and we know were our
* space grant counters are, we can initialise the permanent ticket
* needed for delayed logging to work.
*/
xlog_cil_init_post_recovery(mp->m_log);
return 0;
out_destroy_ail:
@ -658,6 +664,10 @@ xfs_log_item_init(
item->li_ailp = mp->m_ail;
item->li_type = type;
item->li_ops = ops;
item->li_lv = NULL;
INIT_LIST_HEAD(&item->li_ail);
INIT_LIST_HEAD(&item->li_cil);
}
/*
@ -1168,6 +1178,9 @@ xlog_alloc_log(xfs_mount_t *mp,
*iclogp = log->l_iclog; /* complete ring */
log->l_iclog->ic_prev = prev_iclog; /* re-write 1st prev ptr */
error = xlog_cil_init(log);
if (error)
goto out_free_iclog;
return log;
out_free_iclog:
@ -1494,6 +1507,8 @@ xlog_dealloc_log(xlog_t *log)
xlog_in_core_t *iclog, *next_iclog;
int i;
xlog_cil_destroy(log);
iclog = log->l_iclog;
for (i=0; i<log->l_iclog_bufs; i++) {
sv_destroy(&iclog->ic_force_wait);
@ -1536,8 +1551,10 @@ xlog_state_finish_copy(xlog_t *log,
* print out info relating to regions written which consume
* the reservation
*/
STATIC void
xlog_print_tic_res(xfs_mount_t *mp, xlog_ticket_t *ticket)
void
xlog_print_tic_res(
struct xfs_mount *mp,
struct xlog_ticket *ticket)
{
uint i;
uint ophdr_spc = ticket->t_res_num_ophdrs * (uint)sizeof(xlog_op_header_t);
@ -1637,6 +1654,10 @@ xlog_print_tic_res(xfs_mount_t *mp, xlog_ticket_t *ticket)
"bad-rtype" : res_type_str[r_type-1]),
ticket->t_res_arr[i].r_len);
}
xfs_cmn_err(XFS_PTAG_LOGRES, CE_ALERT, mp,
"xfs_log_write: reservation ran out. Need to up reservation");
xfs_force_shutdown(mp, SHUTDOWN_CORRUPT_INCORE);
}
/*
@ -1865,7 +1886,7 @@ xlog_write_copy_finish(
* we don't update ic_offset until the end when we know exactly how many
* bytes have been written out.
*/
STATIC int
int
xlog_write(
struct log *log,
struct xfs_log_vec *log_vector,
@ -1889,22 +1910,26 @@ xlog_write(
*start_lsn = 0;
len = xlog_write_calc_vec_length(ticket, log_vector);
if (ticket->t_curr_res < len) {
if (log->l_cilp) {
/*
* Region headers and bytes are already accounted for.
* We only need to take into account start records and
* split regions in this function.
*/
if (ticket->t_flags & XLOG_TIC_INITED)
ticket->t_curr_res -= sizeof(xlog_op_header_t);
/*
* Commit record headers need to be accounted for. These
* come in as separate writes so are easy to detect.
*/
if (flags & (XLOG_COMMIT_TRANS | XLOG_UNMOUNT_TRANS))
ticket->t_curr_res -= sizeof(xlog_op_header_t);
} else
ticket->t_curr_res -= len;
if (ticket->t_curr_res < 0)
xlog_print_tic_res(log->l_mp, ticket);
#ifdef DEBUG
xlog_panic(
"xfs_log_write: reservation ran out. Need to up reservation");
#else
/* Customer configurable panic */
xfs_cmn_err(XFS_PTAG_LOGRES, CE_ALERT, log->l_mp,
"xfs_log_write: reservation ran out. Need to up reservation");
/* If we did not panic, shutdown the filesystem */
xfs_force_shutdown(log->l_mp, SHUTDOWN_CORRUPT_INCORE);
#endif
}
ticket->t_curr_res -= len;
index = 0;
lv = log_vector;
@ -3000,6 +3025,8 @@ _xfs_log_force(
XFS_STATS_INC(xs_log_force);
xlog_cil_push(log, 1);
spin_lock(&log->l_icloglock);
iclog = log->l_iclog;
@ -3149,6 +3176,12 @@ _xfs_log_force_lsn(
XFS_STATS_INC(xs_log_force);
if (log->l_cilp) {
lsn = xlog_cil_push_lsn(log, lsn);
if (lsn == NULLCOMMITLSN)
return 0;
}
try_again:
spin_lock(&log->l_icloglock);
iclog = log->l_iclog;
@ -3313,22 +3346,30 @@ xfs_log_ticket_get(
return ticket;
}
xlog_tid_t
xfs_log_get_trans_ident(
struct xfs_trans *tp)
{
return tp->t_ticket->t_tid;
}
/*
* Allocate and initialise a new log ticket.
*/
STATIC xlog_ticket_t *
xlog_ticket_t *
xlog_ticket_alloc(
struct log *log,
int unit_bytes,
int cnt,
char client,
uint xflags)
uint xflags,
int alloc_flags)
{
struct xlog_ticket *tic;
uint num_headers;
int iclog_space;
tic = kmem_zone_zalloc(xfs_log_ticket_zone, KM_SLEEP|KM_MAYFAIL);
tic = kmem_zone_zalloc(xfs_log_ticket_zone, alloc_flags);
if (!tic)
return NULL;
@ -3647,6 +3688,11 @@ xlog_state_ioerror(
* c. nothing new gets queued up after (a) and (b) are done.
* d. if !logerror, flush the iclogs to disk, then seal them off
* for business.
*
* Note: for delayed logging the !logerror case needs to flush the regions
* held in memory out to the iclogs before flushing them to disk. This needs
* to be done before the log is marked as shutdown, otherwise the flush to the
* iclogs will fail.
*/
int
xfs_log_force_umount(
@ -3680,6 +3726,16 @@ xfs_log_force_umount(
return 1;
}
retval = 0;
/*
* Flush the in memory commit item list before marking the log as
* being shut down. We need to do it in this order to ensure all the
* completed transactions are flushed to disk with the xfs_log_force()
* call below.
*/
if (!logerror && (mp->m_flags & XFS_MOUNT_DELAYLOG))
xlog_cil_push(log, 1);
/*
* We must hold both the GRANT lock and the LOG lock,
* before we mark the filesystem SHUTDOWN and wake

View File

@ -19,7 +19,6 @@
#define __XFS_LOG_H__
/* get lsn fields */
#define CYCLE_LSN(lsn) ((uint)((lsn)>>32))
#define BLOCK_LSN(lsn) ((uint)(lsn))
@ -114,6 +113,9 @@ struct xfs_log_vec {
struct xfs_log_vec *lv_next; /* next lv in build list */
int lv_niovecs; /* number of iovecs in lv */
struct xfs_log_iovec *lv_iovecp; /* iovec array */
struct xfs_log_item *lv_item; /* owner */
char *lv_buf; /* formatted buffer */
int lv_buf_len; /* size of formatted buffer */
};
/*
@ -134,6 +136,7 @@ struct xlog_in_core;
struct xlog_ticket;
struct xfs_log_item;
struct xfs_item_ops;
struct xfs_trans;
void xfs_log_item_init(struct xfs_mount *mp,
struct xfs_log_item *item,
@ -187,9 +190,16 @@ int xfs_log_need_covered(struct xfs_mount *mp);
void xlog_iodone(struct xfs_buf *);
struct xlog_ticket * xfs_log_ticket_get(struct xlog_ticket *ticket);
struct xlog_ticket *xfs_log_ticket_get(struct xlog_ticket *ticket);
void xfs_log_ticket_put(struct xlog_ticket *ticket);
xlog_tid_t xfs_log_get_trans_ident(struct xfs_trans *tp);
int xfs_log_commit_cil(struct xfs_mount *mp, struct xfs_trans *tp,
struct xfs_log_vec *log_vector,
xfs_lsn_t *commit_lsn, int flags);
bool xfs_log_item_in_current_chkpt(struct xfs_log_item *lip);
#endif

725
fs/xfs/xfs_log_cil.c Normal file
View File

@ -0,0 +1,725 @@
/*
* Copyright (c) 2010 Red Hat, Inc. All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation.
*
* This program is distributed in the hope that it would be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "xfs.h"
#include "xfs_fs.h"
#include "xfs_types.h"
#include "xfs_bit.h"
#include "xfs_log.h"
#include "xfs_inum.h"
#include "xfs_trans.h"
#include "xfs_trans_priv.h"
#include "xfs_log_priv.h"
#include "xfs_sb.h"
#include "xfs_ag.h"
#include "xfs_dir2.h"
#include "xfs_dmapi.h"
#include "xfs_mount.h"
#include "xfs_error.h"
#include "xfs_alloc.h"
/*
* Perform initial CIL structure initialisation. If the CIL is not
* enabled in this filesystem, ensure the log->l_cilp is null so
* we can check this conditional to determine if we are doing delayed
* logging or not.
*/
int
xlog_cil_init(
struct log *log)
{
struct xfs_cil *cil;
struct xfs_cil_ctx *ctx;
log->l_cilp = NULL;
if (!(log->l_mp->m_flags & XFS_MOUNT_DELAYLOG))
return 0;
cil = kmem_zalloc(sizeof(*cil), KM_SLEEP|KM_MAYFAIL);
if (!cil)
return ENOMEM;
ctx = kmem_zalloc(sizeof(*ctx), KM_SLEEP|KM_MAYFAIL);
if (!ctx) {
kmem_free(cil);
return ENOMEM;
}
INIT_LIST_HEAD(&cil->xc_cil);
INIT_LIST_HEAD(&cil->xc_committing);
spin_lock_init(&cil->xc_cil_lock);
init_rwsem(&cil->xc_ctx_lock);
sv_init(&cil->xc_commit_wait, SV_DEFAULT, "cilwait");
INIT_LIST_HEAD(&ctx->committing);
INIT_LIST_HEAD(&ctx->busy_extents);
ctx->sequence = 1;
ctx->cil = cil;
cil->xc_ctx = ctx;
cil->xc_log = log;
log->l_cilp = cil;
return 0;
}
void
xlog_cil_destroy(
struct log *log)
{
if (!log->l_cilp)
return;
if (log->l_cilp->xc_ctx) {
if (log->l_cilp->xc_ctx->ticket)
xfs_log_ticket_put(log->l_cilp->xc_ctx->ticket);
kmem_free(log->l_cilp->xc_ctx);
}
ASSERT(list_empty(&log->l_cilp->xc_cil));
kmem_free(log->l_cilp);
}
/*
* Allocate a new ticket. Failing to get a new ticket makes it really hard to
* recover, so we don't allow failure here. Also, we allocate in a context that
* we don't want to be issuing transactions from, so we need to tell the
* allocation code this as well.
*
* We don't reserve any space for the ticket - we are going to steal whatever
* space we require from transactions as they commit. To ensure we reserve all
* the space required, we need to set the current reservation of the ticket to
* zero so that we know to steal the initial transaction overhead from the
* first transaction commit.
*/
static struct xlog_ticket *
xlog_cil_ticket_alloc(
struct log *log)
{
struct xlog_ticket *tic;
tic = xlog_ticket_alloc(log, 0, 1, XFS_TRANSACTION, 0,
KM_SLEEP|KM_NOFS);
tic->t_trans_type = XFS_TRANS_CHECKPOINT;
/*
* set the current reservation to zero so we know to steal the basic
* transaction overhead reservation from the first transaction commit.
*/
tic->t_curr_res = 0;
return tic;
}
/*
* After the first stage of log recovery is done, we know where the head and
* tail of the log are. We need this log initialisation done before we can
* initialise the first CIL checkpoint context.
*
* Here we allocate a log ticket to track space usage during a CIL push. This
* ticket is passed to xlog_write() directly so that we don't slowly leak log
* space by failing to account for space used by log headers and additional
* region headers for split regions.
*/
void
xlog_cil_init_post_recovery(
struct log *log)
{
if (!log->l_cilp)
return;
log->l_cilp->xc_ctx->ticket = xlog_cil_ticket_alloc(log);
log->l_cilp->xc_ctx->sequence = 1;
log->l_cilp->xc_ctx->commit_lsn = xlog_assign_lsn(log->l_curr_cycle,
log->l_curr_block);
}
/*
* Insert the log item into the CIL and calculate the difference in space
* consumed by the item. Add the space to the checkpoint ticket and calculate
* if the change requires additional log metadata. If it does, take that space
* as well. Remove the amount of space we addded to the checkpoint ticket from
* the current transaction ticket so that the accounting works out correctly.
*
* If this is the first time the item is being placed into the CIL in this
* context, pin it so it can't be written to disk until the CIL is flushed to
* the iclog and the iclog written to disk.
*/
static void
xlog_cil_insert(
struct log *log,
struct xlog_ticket *ticket,
struct xfs_log_item *item,
struct xfs_log_vec *lv)
{
struct xfs_cil *cil = log->l_cilp;
struct xfs_log_vec *old = lv->lv_item->li_lv;
struct xfs_cil_ctx *ctx = cil->xc_ctx;
int len;
int diff_iovecs;
int iclog_space;
if (old) {
/* existing lv on log item, space used is a delta */
ASSERT(!list_empty(&item->li_cil));
ASSERT(old->lv_buf && old->lv_buf_len && old->lv_niovecs);
len = lv->lv_buf_len - old->lv_buf_len;
diff_iovecs = lv->lv_niovecs - old->lv_niovecs;
kmem_free(old->lv_buf);
kmem_free(old);
} else {
/* new lv, must pin the log item */
ASSERT(!lv->lv_item->li_lv);
ASSERT(list_empty(&item->li_cil));
len = lv->lv_buf_len;
diff_iovecs = lv->lv_niovecs;
IOP_PIN(lv->lv_item);
}
len += diff_iovecs * sizeof(xlog_op_header_t);
/* attach new log vector to log item */
lv->lv_item->li_lv = lv;
spin_lock(&cil->xc_cil_lock);
list_move_tail(&item->li_cil, &cil->xc_cil);
ctx->nvecs += diff_iovecs;
/*
* If this is the first time the item is being committed to the CIL,
* store the sequence number on the log item so we can tell
* in future commits whether this is the first checkpoint the item is
* being committed into.
*/
if (!item->li_seq)
item->li_seq = ctx->sequence;
/*
* Now transfer enough transaction reservation to the context ticket
* for the checkpoint. The context ticket is special - the unit
* reservation has to grow as well as the current reservation as we
* steal from tickets so we can correctly determine the space used
* during the transaction commit.
*/
if (ctx->ticket->t_curr_res == 0) {
/* first commit in checkpoint, steal the header reservation */
ASSERT(ticket->t_curr_res >= ctx->ticket->t_unit_res + len);
ctx->ticket->t_curr_res = ctx->ticket->t_unit_res;
ticket->t_curr_res -= ctx->ticket->t_unit_res;
}
/* do we need space for more log record headers? */
iclog_space = log->l_iclog_size - log->l_iclog_hsize;
if (len > 0 && (ctx->space_used / iclog_space !=
(ctx->space_used + len) / iclog_space)) {
int hdrs;
hdrs = (len + iclog_space - 1) / iclog_space;
/* need to take into account split region headers, too */
hdrs *= log->l_iclog_hsize + sizeof(struct xlog_op_header);
ctx->ticket->t_unit_res += hdrs;
ctx->ticket->t_curr_res += hdrs;
ticket->t_curr_res -= hdrs;
ASSERT(ticket->t_curr_res >= len);
}
ticket->t_curr_res -= len;
ctx->space_used += len;
spin_unlock(&cil->xc_cil_lock);
}
/*
* Format log item into a flat buffers
*
* For delayed logging, we need to hold a formatted buffer containing all the
* changes on the log item. This enables us to relog the item in memory and
* write it out asynchronously without needing to relock the object that was
* modified at the time it gets written into the iclog.
*
* This function builds a vector for the changes in each log item in the
* transaction. It then works out the length of the buffer needed for each log
* item, allocates them and formats the vector for the item into the buffer.
* The buffer is then attached to the log item are then inserted into the
* Committed Item List for tracking until the next checkpoint is written out.
*
* We don't set up region headers during this process; we simply copy the
* regions into the flat buffer. We can do this because we still have to do a
* formatting step to write the regions into the iclog buffer. Writing the
* ophdrs during the iclog write means that we can support splitting large
* regions across iclog boundares without needing a change in the format of the
* item/region encapsulation.
*
* Hence what we need to do now is change the rewrite the vector array to point
* to the copied region inside the buffer we just allocated. This allows us to
* format the regions into the iclog as though they are being formatted
* directly out of the objects themselves.
*/
static void
xlog_cil_format_items(
struct log *log,
struct xfs_log_vec *log_vector,
struct xlog_ticket *ticket,
xfs_lsn_t *start_lsn)
{
struct xfs_log_vec *lv;
if (start_lsn)
*start_lsn = log->l_cilp->xc_ctx->sequence;
ASSERT(log_vector);
for (lv = log_vector; lv; lv = lv->lv_next) {
void *ptr;
int index;
int len = 0;
/* build the vector array and calculate it's length */
IOP_FORMAT(lv->lv_item, lv->lv_iovecp);
for (index = 0; index < lv->lv_niovecs; index++)
len += lv->lv_iovecp[index].i_len;
lv->lv_buf_len = len;
lv->lv_buf = kmem_zalloc(lv->lv_buf_len, KM_SLEEP|KM_NOFS);
ptr = lv->lv_buf;
for (index = 0; index < lv->lv_niovecs; index++) {
struct xfs_log_iovec *vec = &lv->lv_iovecp[index];
memcpy(ptr, vec->i_addr, vec->i_len);
vec->i_addr = ptr;
ptr += vec->i_len;
}
ASSERT(ptr == lv->lv_buf + lv->lv_buf_len);
xlog_cil_insert(log, ticket, lv->lv_item, lv);
}
}
static void
xlog_cil_free_logvec(
struct xfs_log_vec *log_vector)
{
struct xfs_log_vec *lv;
for (lv = log_vector; lv; ) {
struct xfs_log_vec *next = lv->lv_next;
kmem_free(lv->lv_buf);
kmem_free(lv);
lv = next;
}
}
/*
* Commit a transaction with the given vector to the Committed Item List.
*
* To do this, we need to format the item, pin it in memory if required and
* account for the space used by the transaction. Once we have done that we
* need to release the unused reservation for the transaction, attach the
* transaction to the checkpoint context so we carry the busy extents through
* to checkpoint completion, and then unlock all the items in the transaction.
*
* For more specific information about the order of operations in
* xfs_log_commit_cil() please refer to the comments in
* xfs_trans_commit_iclog().
*
* Called with the context lock already held in read mode to lock out
* background commit, returns without it held once background commits are
* allowed again.
*/
int
xfs_log_commit_cil(
struct xfs_mount *mp,
struct xfs_trans *tp,
struct xfs_log_vec *log_vector,
xfs_lsn_t *commit_lsn,
int flags)
{
struct log *log = mp->m_log;
int log_flags = 0;
int push = 0;
if (flags & XFS_TRANS_RELEASE_LOG_RES)
log_flags = XFS_LOG_REL_PERM_RESERV;
if (XLOG_FORCED_SHUTDOWN(log)) {
xlog_cil_free_logvec(log_vector);
return XFS_ERROR(EIO);
}
/* lock out background commit */
down_read(&log->l_cilp->xc_ctx_lock);
xlog_cil_format_items(log, log_vector, tp->t_ticket, commit_lsn);
/* check we didn't blow the reservation */
if (tp->t_ticket->t_curr_res < 0)
xlog_print_tic_res(log->l_mp, tp->t_ticket);
/* attach the transaction to the CIL if it has any busy extents */
if (!list_empty(&tp->t_busy)) {
spin_lock(&log->l_cilp->xc_cil_lock);
list_splice_init(&tp->t_busy,
&log->l_cilp->xc_ctx->busy_extents);
spin_unlock(&log->l_cilp->xc_cil_lock);
}
tp->t_commit_lsn = *commit_lsn;
xfs_log_done(mp, tp->t_ticket, NULL, log_flags);
xfs_trans_unreserve_and_mod_sb(tp);
/* check for background commit before unlock */
if (log->l_cilp->xc_ctx->space_used > XLOG_CIL_SPACE_LIMIT(log))
push = 1;
up_read(&log->l_cilp->xc_ctx_lock);
/*
* We need to push CIL every so often so we don't cache more than we
* can fit in the log. The limit really is that a checkpoint can't be
* more than half the log (the current checkpoint is not allowed to
* overwrite the previous checkpoint), but commit latency and memory
* usage limit this to a smaller size in most cases.
*/
if (push)
xlog_cil_push(log, 0);
return 0;
}
/*
* Mark all items committed and clear busy extents. We free the log vector
* chains in a separate pass so that we unpin the log items as quickly as
* possible.
*/
static void
xlog_cil_committed(
void *args,
int abort)
{
struct xfs_cil_ctx *ctx = args;
struct xfs_log_vec *lv;
int abortflag = abort ? XFS_LI_ABORTED : 0;
struct xfs_busy_extent *busyp, *n;
/* unpin all the log items */
for (lv = ctx->lv_chain; lv; lv = lv->lv_next ) {
xfs_trans_item_committed(lv->lv_item, ctx->start_lsn,
abortflag);
}
list_for_each_entry_safe(busyp, n, &ctx->busy_extents, list)
xfs_alloc_busy_clear(ctx->cil->xc_log->l_mp, busyp);
spin_lock(&ctx->cil->xc_cil_lock);
list_del(&ctx->committing);
spin_unlock(&ctx->cil->xc_cil_lock);
xlog_cil_free_logvec(ctx->lv_chain);
kmem_free(ctx);
}
/*
* Push the Committed Item List to the log. If the push_now flag is not set,
* then it is a background flush and so we can chose to ignore it.
*/
int
xlog_cil_push(
struct log *log,
int push_now)
{
struct xfs_cil *cil = log->l_cilp;
struct xfs_log_vec *lv;
struct xfs_cil_ctx *ctx;
struct xfs_cil_ctx *new_ctx;
struct xlog_in_core *commit_iclog;
struct xlog_ticket *tic;
int num_lv;
int num_iovecs;
int len;
int error = 0;
struct xfs_trans_header thdr;
struct xfs_log_iovec lhdr;
struct xfs_log_vec lvhdr = { NULL };
xfs_lsn_t commit_lsn;
if (!cil)
return 0;
new_ctx = kmem_zalloc(sizeof(*new_ctx), KM_SLEEP|KM_NOFS);
new_ctx->ticket = xlog_cil_ticket_alloc(log);
/* lock out transaction commit, but don't block on background push */
if (!down_write_trylock(&cil->xc_ctx_lock)) {
if (!push_now)
goto out_free_ticket;
down_write(&cil->xc_ctx_lock);
}
ctx = cil->xc_ctx;
/* check if we've anything to push */
if (list_empty(&cil->xc_cil))
goto out_skip;
/* check for spurious background flush */
if (!push_now && cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log))
goto out_skip;
/*
* pull all the log vectors off the items in the CIL, and
* remove the items from the CIL. We don't need the CIL lock
* here because it's only needed on the transaction commit
* side which is currently locked out by the flush lock.
*/
lv = NULL;
num_lv = 0;
num_iovecs = 0;
len = 0;
while (!list_empty(&cil->xc_cil)) {
struct xfs_log_item *item;
int i;
item = list_first_entry(&cil->xc_cil,
struct xfs_log_item, li_cil);
list_del_init(&item->li_cil);
if (!ctx->lv_chain)
ctx->lv_chain = item->li_lv;
else
lv->lv_next = item->li_lv;
lv = item->li_lv;
item->li_lv = NULL;
num_lv++;
num_iovecs += lv->lv_niovecs;
for (i = 0; i < lv->lv_niovecs; i++)
len += lv->lv_iovecp[i].i_len;
}
/*
* initialise the new context and attach it to the CIL. Then attach
* the current context to the CIL committing lsit so it can be found
* during log forces to extract the commit lsn of the sequence that
* needs to be forced.
*/
INIT_LIST_HEAD(&new_ctx->committing);
INIT_LIST_HEAD(&new_ctx->busy_extents);
new_ctx->sequence = ctx->sequence + 1;
new_ctx->cil = cil;
cil->xc_ctx = new_ctx;
/*
* The switch is now done, so we can drop the context lock and move out
* of a shared context. We can't just go straight to the commit record,
* though - we need to synchronise with previous and future commits so
* that the commit records are correctly ordered in the log to ensure
* that we process items during log IO completion in the correct order.
*
* For example, if we get an EFI in one checkpoint and the EFD in the
* next (e.g. due to log forces), we do not want the checkpoint with
* the EFD to be committed before the checkpoint with the EFI. Hence
* we must strictly order the commit records of the checkpoints so
* that: a) the checkpoint callbacks are attached to the iclogs in the
* correct order; and b) the checkpoints are replayed in correct order
* in log recovery.
*
* Hence we need to add this context to the committing context list so
* that higher sequences will wait for us to write out a commit record
* before they do.
*/
spin_lock(&cil->xc_cil_lock);
list_add(&ctx->committing, &cil->xc_committing);
spin_unlock(&cil->xc_cil_lock);
up_write(&cil->xc_ctx_lock);
/*
* Build a checkpoint transaction header and write it to the log to
* begin the transaction. We need to account for the space used by the
* transaction header here as it is not accounted for in xlog_write().
*
* The LSN we need to pass to the log items on transaction commit is
* the LSN reported by the first log vector write. If we use the commit
* record lsn then we can move the tail beyond the grant write head.
*/
tic = ctx->ticket;
thdr.th_magic = XFS_TRANS_HEADER_MAGIC;
thdr.th_type = XFS_TRANS_CHECKPOINT;
thdr.th_tid = tic->t_tid;
thdr.th_num_items = num_iovecs;
lhdr.i_addr = (xfs_caddr_t)&thdr;
lhdr.i_len = sizeof(xfs_trans_header_t);
lhdr.i_type = XLOG_REG_TYPE_TRANSHDR;
tic->t_curr_res -= lhdr.i_len + sizeof(xlog_op_header_t);
lvhdr.lv_niovecs = 1;
lvhdr.lv_iovecp = &lhdr;
lvhdr.lv_next = ctx->lv_chain;
error = xlog_write(log, &lvhdr, tic, &ctx->start_lsn, NULL, 0);
if (error)
goto out_abort;
/*
* now that we've written the checkpoint into the log, strictly
* order the commit records so replay will get them in the right order.
*/
restart:
spin_lock(&cil->xc_cil_lock);
list_for_each_entry(new_ctx, &cil->xc_committing, committing) {
/*
* Higher sequences will wait for this one so skip them.
* Don't wait for own own sequence, either.
*/
if (new_ctx->sequence >= ctx->sequence)
continue;
if (!new_ctx->commit_lsn) {
/*
* It is still being pushed! Wait for the push to
* complete, then start again from the beginning.
*/
sv_wait(&cil->xc_commit_wait, 0, &cil->xc_cil_lock, 0);
goto restart;
}
}
spin_unlock(&cil->xc_cil_lock);
commit_lsn = xfs_log_done(log->l_mp, tic, &commit_iclog, 0);
if (error || commit_lsn == -1)
goto out_abort;
/* attach all the transactions w/ busy extents to iclog */
ctx->log_cb.cb_func = xlog_cil_committed;
ctx->log_cb.cb_arg = ctx;
error = xfs_log_notify(log->l_mp, commit_iclog, &ctx->log_cb);
if (error)
goto out_abort;
/*
* now the checkpoint commit is complete and we've attached the
* callbacks to the iclog we can assign the commit LSN to the context
* and wake up anyone who is waiting for the commit to complete.
*/
spin_lock(&cil->xc_cil_lock);
ctx->commit_lsn = commit_lsn;
sv_broadcast(&cil->xc_commit_wait);
spin_unlock(&cil->xc_cil_lock);
/* release the hounds! */
return xfs_log_release_iclog(log->l_mp, commit_iclog);
out_skip:
up_write(&cil->xc_ctx_lock);
out_free_ticket:
xfs_log_ticket_put(new_ctx->ticket);
kmem_free(new_ctx);
return 0;
out_abort:
xlog_cil_committed(ctx, XFS_LI_ABORTED);
return XFS_ERROR(EIO);
}
/*
* Conditionally push the CIL based on the sequence passed in.
*
* We only need to push if we haven't already pushed the sequence
* number given. Hence the only time we will trigger a push here is
* if the push sequence is the same as the current context.
*
* We return the current commit lsn to allow the callers to determine if a
* iclog flush is necessary following this call.
*
* XXX: Initially, just push the CIL unconditionally and return whatever
* commit lsn is there. It'll be empty, so this is broken for now.
*/
xfs_lsn_t
xlog_cil_push_lsn(
struct log *log,
xfs_lsn_t push_seq)
{
struct xfs_cil *cil = log->l_cilp;
struct xfs_cil_ctx *ctx;
xfs_lsn_t commit_lsn = NULLCOMMITLSN;
restart:
down_write(&cil->xc_ctx_lock);
ASSERT(push_seq <= cil->xc_ctx->sequence);
/* check to see if we need to force out the current context */
if (push_seq == cil->xc_ctx->sequence) {
up_write(&cil->xc_ctx_lock);
xlog_cil_push(log, 1);
goto restart;
}
/*
* See if we can find a previous sequence still committing.
* We can drop the flush lock as soon as we have the cil lock
* because we are now only comparing contexts protected by
* the cil lock.
*
* We need to wait for all previous sequence commits to complete
* before allowing the force of push_seq to go ahead. Hence block
* on commits for those as well.
*/
spin_lock(&cil->xc_cil_lock);
up_write(&cil->xc_ctx_lock);
list_for_each_entry(ctx, &cil->xc_committing, committing) {
if (ctx->sequence > push_seq)
continue;
if (!ctx->commit_lsn) {
/*
* It is still being pushed! Wait for the push to
* complete, then start again from the beginning.
*/
sv_wait(&cil->xc_commit_wait, 0, &cil->xc_cil_lock, 0);
goto restart;
}
if (ctx->sequence != push_seq)
continue;
/* found it! */
commit_lsn = ctx->commit_lsn;
}
spin_unlock(&cil->xc_cil_lock);
return commit_lsn;
}
/*
* Check if the current log item was first committed in this sequence.
* We can't rely on just the log item being in the CIL, we have to check
* the recorded commit sequence number.
*
* Note: for this to be used in a non-racy manner, it has to be called with
* CIL flushing locked out. As a result, it should only be used during the
* transaction commit process when deciding what to format into the item.
*/
bool
xfs_log_item_in_current_chkpt(
struct xfs_log_item *lip)
{
struct xfs_cil_ctx *ctx;
if (!(lip->li_mountp->m_flags & XFS_MOUNT_DELAYLOG))
return false;
if (list_empty(&lip->li_cil))
return false;
ctx = lip->li_mountp->m_log->l_cilp->xc_ctx;
/*
* li_seq is written on the first commit of a log item to record the
* first checkpoint it is written to. Hence if it is different to the
* current sequence, we're in a new checkpoint.
*/
if (XFS_LSN_CMP(lip->li_seq, ctx->sequence) != 0)
return false;
return true;
}

View File

@ -152,8 +152,6 @@ static inline uint xlog_get_client_id(__be32 i)
#define XLOG_RECOVERY_NEEDED 0x4 /* log was recovered */
#define XLOG_IO_ERROR 0x8 /* log hit an I/O error, and being
shutdown */
typedef __uint32_t xlog_tid_t;
#ifdef __KERNEL__
/*
@ -378,6 +376,99 @@ typedef struct xlog_in_core {
#define ic_header ic_data->hic_header
} xlog_in_core_t;
/*
* The CIL context is used to aggregate per-transaction details as well be
* passed to the iclog for checkpoint post-commit processing. After being
* passed to the iclog, another context needs to be allocated for tracking the
* next set of transactions to be aggregated into a checkpoint.
*/
struct xfs_cil;
struct xfs_cil_ctx {
struct xfs_cil *cil;
xfs_lsn_t sequence; /* chkpt sequence # */
xfs_lsn_t start_lsn; /* first LSN of chkpt commit */
xfs_lsn_t commit_lsn; /* chkpt commit record lsn */
struct xlog_ticket *ticket; /* chkpt ticket */
int nvecs; /* number of regions */
int space_used; /* aggregate size of regions */
struct list_head busy_extents; /* busy extents in chkpt */
struct xfs_log_vec *lv_chain; /* logvecs being pushed */
xfs_log_callback_t log_cb; /* completion callback hook. */
struct list_head committing; /* ctx committing list */
};
/*
* Committed Item List structure
*
* This structure is used to track log items that have been committed but not
* yet written into the log. It is used only when the delayed logging mount
* option is enabled.
*
* This structure tracks the list of committing checkpoint contexts so
* we can avoid the problem of having to hold out new transactions during a
* flush until we have a the commit record LSN of the checkpoint. We can
* traverse the list of committing contexts in xlog_cil_push_lsn() to find a
* sequence match and extract the commit LSN directly from there. If the
* checkpoint is still in the process of committing, we can block waiting for
* the commit LSN to be determined as well. This should make synchronous
* operations almost as efficient as the old logging methods.
*/
struct xfs_cil {
struct log *xc_log;
struct list_head xc_cil;
spinlock_t xc_cil_lock;
struct xfs_cil_ctx *xc_ctx;
struct rw_semaphore xc_ctx_lock;
struct list_head xc_committing;
sv_t xc_commit_wait;
};
/*
* The amount of log space we should the CIL to aggregate is difficult to size.
* Whatever we chose we have to make we can get a reservation for the log space
* effectively, that it is large enough to capture sufficient relogging to
* reduce log buffer IO significantly, but it is not too large for the log or
* induces too much latency when writing out through the iclogs. We track both
* space consumed and the number of vectors in the checkpoint context, so we
* need to decide which to use for limiting.
*
* Every log buffer we write out during a push needs a header reserved, which
* is at least one sector and more for v2 logs. Hence we need a reservation of
* at least 512 bytes per 32k of log space just for the LR headers. That means
* 16KB of reservation per megabyte of delayed logging space we will consume,
* plus various headers. The number of headers will vary based on the num of
* io vectors, so limiting on a specific number of vectors is going to result
* in transactions of varying size. IOWs, it is more consistent to track and
* limit space consumed in the log rather than by the number of objects being
* logged in order to prevent checkpoint ticket overruns.
*
* Further, use of static reservations through the log grant mechanism is
* problematic. It introduces a lot of complexity (e.g. reserve grant vs write
* grant) and a significant deadlock potential because regranting write space
* can block on log pushes. Hence if we have to regrant log space during a log
* push, we can deadlock.
*
* However, we can avoid this by use of a dynamic "reservation stealing"
* technique during transaction commit whereby unused reservation space in the
* transaction ticket is transferred to the CIL ctx commit ticket to cover the
* space needed by the checkpoint transaction. This means that we never need to
* specifically reserve space for the CIL checkpoint transaction, nor do we
* need to regrant space once the checkpoint completes. This also means the
* checkpoint transaction ticket is specific to the checkpoint context, rather
* than the CIL itself.
*
* With dynamic reservations, we can basically make up arbitrary limits for the
* checkpoint size so long as they don't violate any other size rules. Hence
* the initial maximum size for the checkpoint transaction will be set to a
* quarter of the log or 8MB, which ever is smaller. 8MB is an arbitrary limit
* right now based on the latency of writing out a large amount of data through
* the circular iclog buffers.
*/
#define XLOG_CIL_SPACE_LIMIT(log) \
(min((log->l_logsize >> 2), (8 * 1024 * 1024)))
/*
* The reservation head lsn is not made up of a cycle number and block number.
* Instead, it uses a cycle number and byte number. Logs don't expect to
@ -388,6 +479,7 @@ typedef struct log {
/* The following fields don't need locking */
struct xfs_mount *l_mp; /* mount point */
struct xfs_ail *l_ailp; /* AIL log is working with */
struct xfs_cil *l_cilp; /* CIL log is working with */
struct xfs_buf *l_xbuf; /* extra buffer for log
* wrapping */
struct xfs_buftarg *l_targ; /* buftarg of log */
@ -438,14 +530,17 @@ typedef struct log {
#define XLOG_FORCED_SHUTDOWN(log) ((log)->l_flags & XLOG_IO_ERROR)
/* common routines */
extern xfs_lsn_t xlog_assign_tail_lsn(struct xfs_mount *mp);
extern int xlog_recover(xlog_t *log);
extern int xlog_recover_finish(xlog_t *log);
extern void xlog_pack_data(xlog_t *log, xlog_in_core_t *iclog, int);
extern kmem_zone_t *xfs_log_ticket_zone;
extern kmem_zone_t *xfs_log_ticket_zone;
struct xlog_ticket *xlog_ticket_alloc(struct log *log, int unit_bytes,
int count, char client, uint xflags,
int alloc_flags);
static inline void
xlog_write_adv_cnt(void **ptr, int *len, int *off, size_t bytes)
@ -455,6 +550,21 @@ xlog_write_adv_cnt(void **ptr, int *len, int *off, size_t bytes)
*off += bytes;
}
void xlog_print_tic_res(struct xfs_mount *mp, struct xlog_ticket *ticket);
int xlog_write(struct log *log, struct xfs_log_vec *log_vector,
struct xlog_ticket *tic, xfs_lsn_t *start_lsn,
xlog_in_core_t **commit_iclog, uint flags);
/*
* Committed Item List interfaces
*/
int xlog_cil_init(struct log *log);
void xlog_cil_init_post_recovery(struct log *log);
void xlog_cil_destroy(struct log *log);
int xlog_cil_push(struct log *log, int push_now);
xfs_lsn_t xlog_cil_push_lsn(struct log *log, xfs_lsn_t push_sequence);
/*
* Unmount record type is used as a pseudo transaction type for the ticket.
* It's value must be outside the range of XFS_TRANS_* values.

View File

@ -1576,7 +1576,7 @@ xlog_recover_reorder_trans(
switch (ITEM_TYPE(item)) {
case XFS_LI_BUF:
if (!(buf_f->blf_flags & XFS_BLI_CANCEL)) {
if (!(buf_f->blf_flags & XFS_BLF_CANCEL)) {
trace_xfs_log_recover_item_reorder_head(log,
trans, item, pass);
list_move(&item->ri_list, &trans->r_itemq);
@ -1638,7 +1638,7 @@ xlog_recover_do_buffer_pass1(
/*
* If this isn't a cancel buffer item, then just return.
*/
if (!(flags & XFS_BLI_CANCEL)) {
if (!(flags & XFS_BLF_CANCEL)) {
trace_xfs_log_recover_buf_not_cancel(log, buf_f);
return;
}
@ -1696,7 +1696,7 @@ xlog_recover_do_buffer_pass1(
* Check to see whether the buffer being recovered has a corresponding
* entry in the buffer cancel record table. If it does then return 1
* so that it will be cancelled, otherwise return 0. If the buffer is
* actually a buffer cancel item (XFS_BLI_CANCEL is set), then decrement
* actually a buffer cancel item (XFS_BLF_CANCEL is set), then decrement
* the refcount on the entry in the table and remove it from the table
* if this is the last reference.
*
@ -1721,7 +1721,7 @@ xlog_check_buffer_cancelled(
* There is nothing in the table built in pass one,
* so this buffer must not be cancelled.
*/
ASSERT(!(flags & XFS_BLI_CANCEL));
ASSERT(!(flags & XFS_BLF_CANCEL));
return 0;
}
@ -1733,7 +1733,7 @@ xlog_check_buffer_cancelled(
* There is no corresponding entry in the table built
* in pass one, so this buffer has not been cancelled.
*/
ASSERT(!(flags & XFS_BLI_CANCEL));
ASSERT(!(flags & XFS_BLF_CANCEL));
return 0;
}
@ -1752,7 +1752,7 @@ xlog_check_buffer_cancelled(
* one in the table and remove it if this is the
* last reference.
*/
if (flags & XFS_BLI_CANCEL) {
if (flags & XFS_BLF_CANCEL) {
bcp->bc_refcount--;
if (bcp->bc_refcount == 0) {
if (prevp == NULL) {
@ -1772,7 +1772,7 @@ xlog_check_buffer_cancelled(
* We didn't find a corresponding entry in the table, so
* return 0 so that the buffer is NOT cancelled.
*/
ASSERT(!(flags & XFS_BLI_CANCEL));
ASSERT(!(flags & XFS_BLF_CANCEL));
return 0;
}
@ -1874,8 +1874,8 @@ xlog_recover_do_inode_buffer(
nbits = xfs_contig_bits(data_map, map_size,
bit);
ASSERT(nbits > 0);
reg_buf_offset = bit << XFS_BLI_SHIFT;
reg_buf_bytes = nbits << XFS_BLI_SHIFT;
reg_buf_offset = bit << XFS_BLF_SHIFT;
reg_buf_bytes = nbits << XFS_BLF_SHIFT;
item_index++;
}
@ -1889,7 +1889,7 @@ xlog_recover_do_inode_buffer(
}
ASSERT(item->ri_buf[item_index].i_addr != NULL);
ASSERT((item->ri_buf[item_index].i_len % XFS_BLI_CHUNK) == 0);
ASSERT((item->ri_buf[item_index].i_len % XFS_BLF_CHUNK) == 0);
ASSERT((reg_buf_offset + reg_buf_bytes) <= XFS_BUF_COUNT(bp));
/*
@ -1955,9 +1955,9 @@ xlog_recover_do_reg_buffer(
nbits = xfs_contig_bits(data_map, map_size, bit);
ASSERT(nbits > 0);
ASSERT(item->ri_buf[i].i_addr != NULL);
ASSERT(item->ri_buf[i].i_len % XFS_BLI_CHUNK == 0);
ASSERT(item->ri_buf[i].i_len % XFS_BLF_CHUNK == 0);
ASSERT(XFS_BUF_COUNT(bp) >=
((uint)bit << XFS_BLI_SHIFT)+(nbits<<XFS_BLI_SHIFT));
((uint)bit << XFS_BLF_SHIFT)+(nbits<<XFS_BLF_SHIFT));
/*
* Do a sanity check if this is a dquot buffer. Just checking
@ -1966,7 +1966,7 @@ xlog_recover_do_reg_buffer(
*/
error = 0;
if (buf_f->blf_flags &
(XFS_BLI_UDQUOT_BUF|XFS_BLI_PDQUOT_BUF|XFS_BLI_GDQUOT_BUF)) {
(XFS_BLF_UDQUOT_BUF|XFS_BLF_PDQUOT_BUF|XFS_BLF_GDQUOT_BUF)) {
if (item->ri_buf[i].i_addr == NULL) {
cmn_err(CE_ALERT,
"XFS: NULL dquot in %s.", __func__);
@ -1987,9 +1987,9 @@ xlog_recover_do_reg_buffer(
}
memcpy(xfs_buf_offset(bp,
(uint)bit << XFS_BLI_SHIFT), /* dest */
(uint)bit << XFS_BLF_SHIFT), /* dest */
item->ri_buf[i].i_addr, /* source */
nbits<<XFS_BLI_SHIFT); /* length */
nbits<<XFS_BLF_SHIFT); /* length */
next:
i++;
bit += nbits;
@ -2148,11 +2148,11 @@ xlog_recover_do_dquot_buffer(
}
type = 0;
if (buf_f->blf_flags & XFS_BLI_UDQUOT_BUF)
if (buf_f->blf_flags & XFS_BLF_UDQUOT_BUF)
type |= XFS_DQ_USER;
if (buf_f->blf_flags & XFS_BLI_PDQUOT_BUF)
if (buf_f->blf_flags & XFS_BLF_PDQUOT_BUF)
type |= XFS_DQ_PROJ;
if (buf_f->blf_flags & XFS_BLI_GDQUOT_BUF)
if (buf_f->blf_flags & XFS_BLF_GDQUOT_BUF)
type |= XFS_DQ_GROUP;
/*
* This type of quotas was turned off, so ignore this buffer
@ -2173,7 +2173,7 @@ xlog_recover_do_dquot_buffer(
* here which overlaps that may be stale.
*
* When meta-data buffers are freed at run time we log a buffer item
* with the XFS_BLI_CANCEL bit set to indicate that previous copies
* with the XFS_BLF_CANCEL bit set to indicate that previous copies
* of the buffer in the log should not be replayed at recovery time.
* This is so that if the blocks covered by the buffer are reused for
* file data before we crash we don't end up replaying old, freed
@ -2207,7 +2207,7 @@ xlog_recover_do_buffer_trans(
if (pass == XLOG_RECOVER_PASS1) {
/*
* In this pass we're only looking for buf items
* with the XFS_BLI_CANCEL bit set.
* with the XFS_BLF_CANCEL bit set.
*/
xlog_recover_do_buffer_pass1(log, buf_f);
return 0;
@ -2244,7 +2244,7 @@ xlog_recover_do_buffer_trans(
mp = log->l_mp;
buf_flags = XBF_LOCK;
if (!(flags & XFS_BLI_INODE_BUF))
if (!(flags & XFS_BLF_INODE_BUF))
buf_flags |= XBF_MAPPED;
bp = xfs_buf_read(mp->m_ddev_targp, blkno, len, buf_flags);
@ -2257,10 +2257,10 @@ xlog_recover_do_buffer_trans(
}
error = 0;
if (flags & XFS_BLI_INODE_BUF) {
if (flags & XFS_BLF_INODE_BUF) {
error = xlog_recover_do_inode_buffer(mp, item, bp, buf_f);
} else if (flags &
(XFS_BLI_UDQUOT_BUF|XFS_BLI_PDQUOT_BUF|XFS_BLI_GDQUOT_BUF)) {
(XFS_BLF_UDQUOT_BUF|XFS_BLF_PDQUOT_BUF|XFS_BLF_GDQUOT_BUF)) {
xlog_recover_do_dquot_buffer(mp, log, item, bp, buf_f);
} else {
xlog_recover_do_reg_buffer(mp, item, bp, buf_f);

View File

@ -28,7 +28,7 @@
#define XLOG_RHASH(tid) \
((((__uint32_t)tid)>>XLOG_RHASH_SHIFT) & (XLOG_RHASH_SIZE-1))
#define XLOG_MAX_REGIONS_IN_ITEM (XFS_MAX_BLOCKSIZE / XFS_BLI_CHUNK / 2 + 1)
#define XLOG_MAX_REGIONS_IN_ITEM (XFS_MAX_BLOCKSIZE / XFS_BLF_CHUNK / 2 + 1)
/*

View File

@ -268,6 +268,7 @@ typedef struct xfs_mount {
#define XFS_MOUNT_WSYNC (1ULL << 0) /* for nfs - all metadata ops
must be synchronous except
for space allocations */
#define XFS_MOUNT_DELAYLOG (1ULL << 1) /* delayed logging is enabled */
#define XFS_MOUNT_DMAPI (1ULL << 2) /* dmapi is enabled */
#define XFS_MOUNT_WAS_CLEAN (1ULL << 3)
#define XFS_MOUNT_FS_SHUTDOWN (1ULL << 4) /* atomic stop of all filesystem

View File

@ -44,6 +44,7 @@
#include "xfs_trans_priv.h"
#include "xfs_trans_space.h"
#include "xfs_inode_item.h"
#include "xfs_trace.h"
kmem_zone_t *xfs_trans_zone;
@ -243,9 +244,8 @@ _xfs_trans_alloc(
tp->t_type = type;
tp->t_mountp = mp;
tp->t_items_free = XFS_LIC_NUM_SLOTS;
tp->t_busy_free = XFS_LBC_NUM_SLOTS;
xfs_lic_init(&(tp->t_items));
XFS_LBC_INIT(&(tp->t_busy));
INIT_LIST_HEAD(&tp->t_busy);
return tp;
}
@ -255,8 +255,13 @@ _xfs_trans_alloc(
*/
STATIC void
xfs_trans_free(
xfs_trans_t *tp)
struct xfs_trans *tp)
{
struct xfs_busy_extent *busyp, *n;
list_for_each_entry_safe(busyp, n, &tp->t_busy, list)
xfs_alloc_busy_clear(tp->t_mountp, busyp);
atomic_dec(&tp->t_mountp->m_active_trans);
xfs_trans_free_dqinfo(tp);
kmem_zone_free(xfs_trans_zone, tp);
@ -285,9 +290,8 @@ xfs_trans_dup(
ntp->t_type = tp->t_type;
ntp->t_mountp = tp->t_mountp;
ntp->t_items_free = XFS_LIC_NUM_SLOTS;
ntp->t_busy_free = XFS_LBC_NUM_SLOTS;
xfs_lic_init(&(ntp->t_items));
XFS_LBC_INIT(&(ntp->t_busy));
INIT_LIST_HEAD(&ntp->t_busy);
ASSERT(tp->t_flags & XFS_TRANS_PERM_LOG_RES);
ASSERT(tp->t_ticket != NULL);
@ -423,7 +427,6 @@ xfs_trans_reserve(
return error;
}
/*
* Record the indicated change to the given field for application
* to the file system's superblock when the transaction commits.
@ -652,7 +655,7 @@ xfs_trans_apply_sb_deltas(
* XFS_TRANS_SB_DIRTY will not be set when the transaction is updated but we
* still need to update the incore superblock with the changes.
*/
STATIC void
void
xfs_trans_unreserve_and_mod_sb(
xfs_trans_t *tp)
{
@ -880,7 +883,7 @@ xfs_trans_fill_vecs(
* they could be immediately flushed and we'd have to race with the flusher
* trying to pull the item from the AIL as we add it.
*/
static void
void
xfs_trans_item_committed(
struct xfs_log_item *lip,
xfs_lsn_t commit_lsn,
@ -930,26 +933,6 @@ xfs_trans_item_committed(
IOP_UNPIN(lip);
}
/* Clear all the per-AG busy list items listed in this transaction */
static void
xfs_trans_clear_busy_extents(
struct xfs_trans *tp)
{
xfs_log_busy_chunk_t *lbcp;
xfs_log_busy_slot_t *lbsp;
int i;
for (lbcp = &tp->t_busy; lbcp != NULL; lbcp = lbcp->lbc_next) {
i = 0;
for (lbsp = lbcp->lbc_busy; i < lbcp->lbc_unused; i++, lbsp++) {
if (XFS_LBC_ISFREE(lbcp, i))
continue;
xfs_alloc_clear_busy(tp, lbsp->lbc_ag, lbsp->lbc_idx);
}
}
xfs_trans_free_busy(tp);
}
/*
* This is typically called by the LM when a transaction has been fully
* committed to disk. It needs to unpin the items which have
@ -984,7 +967,6 @@ xfs_trans_committed(
kmem_free(licp);
}
xfs_trans_clear_busy_extents(tp);
xfs_trans_free(tp);
}
@ -1012,8 +994,7 @@ xfs_trans_uncommit(
xfs_trans_unreserve_and_mod_sb(tp);
xfs_trans_unreserve_and_mod_dquots(tp);
xfs_trans_free_items(tp, flags);
xfs_trans_free_busy(tp);
xfs_trans_free_items(tp, NULLCOMMITLSN, flags);
xfs_trans_free(tp);
}
@ -1075,6 +1056,8 @@ xfs_trans_commit_iclog(
*commit_lsn = xfs_log_done(mp, tp->t_ticket, &commit_iclog, log_flags);
tp->t_commit_lsn = *commit_lsn;
trace_xfs_trans_commit_lsn(tp);
if (nvec > XFS_TRANS_LOGVEC_COUNT)
kmem_free(log_vector);
@ -1161,6 +1144,93 @@ xfs_trans_commit_iclog(
return xfs_log_release_iclog(mp, commit_iclog);
}
/*
* Walk the log items and allocate log vector structures for
* each item large enough to fit all the vectors they require.
* Note that this format differs from the old log vector format in
* that there is no transaction header in these log vectors.
*/
STATIC struct xfs_log_vec *
xfs_trans_alloc_log_vecs(
xfs_trans_t *tp)
{
xfs_log_item_desc_t *lidp;
struct xfs_log_vec *lv = NULL;
struct xfs_log_vec *ret_lv = NULL;
lidp = xfs_trans_first_item(tp);
/* Bail out if we didn't find a log item. */
if (!lidp) {
ASSERT(0);
return NULL;
}
while (lidp != NULL) {
struct xfs_log_vec *new_lv;
/* Skip items which aren't dirty in this transaction. */
if (!(lidp->lid_flags & XFS_LID_DIRTY)) {
lidp = xfs_trans_next_item(tp, lidp);
continue;
}
/* Skip items that do not have any vectors for writing */
lidp->lid_size = IOP_SIZE(lidp->lid_item);
if (!lidp->lid_size) {
lidp = xfs_trans_next_item(tp, lidp);
continue;
}
new_lv = kmem_zalloc(sizeof(*new_lv) +
lidp->lid_size * sizeof(struct xfs_log_iovec),
KM_SLEEP);
/* The allocated iovec region lies beyond the log vector. */
new_lv->lv_iovecp = (struct xfs_log_iovec *)&new_lv[1];
new_lv->lv_niovecs = lidp->lid_size;
new_lv->lv_item = lidp->lid_item;
if (!ret_lv)
ret_lv = new_lv;
else
lv->lv_next = new_lv;
lv = new_lv;
lidp = xfs_trans_next_item(tp, lidp);
}
return ret_lv;
}
static int
xfs_trans_commit_cil(
struct xfs_mount *mp,
struct xfs_trans *tp,
xfs_lsn_t *commit_lsn,
int flags)
{
struct xfs_log_vec *log_vector;
int error;
/*
* Get each log item to allocate a vector structure for
* the log item to to pass to the log write code. The
* CIL commit code will format the vector and save it away.
*/
log_vector = xfs_trans_alloc_log_vecs(tp);
if (!log_vector)
return ENOMEM;
error = xfs_log_commit_cil(mp, tp, log_vector, commit_lsn, flags);
if (error)
return error;
current_restore_flags_nested(&tp->t_pflags, PF_FSTRANS);
/* xfs_trans_free_items() unlocks them first */
xfs_trans_free_items(tp, *commit_lsn, 0);
xfs_trans_free(tp);
return 0;
}
/*
* xfs_trans_commit
@ -1221,7 +1291,11 @@ _xfs_trans_commit(
xfs_trans_apply_sb_deltas(tp);
xfs_trans_apply_dquot_deltas(tp);
error = xfs_trans_commit_iclog(mp, tp, &commit_lsn, flags);
if (mp->m_flags & XFS_MOUNT_DELAYLOG)
error = xfs_trans_commit_cil(mp, tp, &commit_lsn, flags);
else
error = xfs_trans_commit_iclog(mp, tp, &commit_lsn, flags);
if (error == ENOMEM) {
xfs_force_shutdown(mp, SHUTDOWN_LOG_IO_ERROR);
error = XFS_ERROR(EIO);
@ -1259,8 +1333,7 @@ _xfs_trans_commit(
error = XFS_ERROR(EIO);
}
current_restore_flags_nested(&tp->t_pflags, PF_FSTRANS);
xfs_trans_free_items(tp, error ? XFS_TRANS_ABORT : 0);
xfs_trans_free_busy(tp);
xfs_trans_free_items(tp, NULLCOMMITLSN, error ? XFS_TRANS_ABORT : 0);
xfs_trans_free(tp);
XFS_STATS_INC(xs_trans_empty);
@ -1338,8 +1411,7 @@ xfs_trans_cancel(
/* mark this thread as no longer being in a transaction */
current_restore_flags_nested(&tp->t_pflags, PF_FSTRANS);
xfs_trans_free_items(tp, flags);
xfs_trans_free_busy(tp);
xfs_trans_free_items(tp, NULLCOMMITLSN, flags);
xfs_trans_free(tp);
}

View File

@ -106,7 +106,8 @@ typedef struct xfs_trans_header {
#define XFS_TRANS_GROWFSRT_FREE 39
#define XFS_TRANS_SWAPEXT 40
#define XFS_TRANS_SB_COUNT 41
#define XFS_TRANS_TYPE_MAX 41
#define XFS_TRANS_CHECKPOINT 42
#define XFS_TRANS_TYPE_MAX 42
/* new transaction types need to be reflected in xfs_logprint(8) */
#define XFS_TRANS_TYPES \
@ -148,6 +149,7 @@ typedef struct xfs_trans_header {
{ XFS_TRANS_GROWFSRT_FREE, "GROWFSRT_FREE" }, \
{ XFS_TRANS_SWAPEXT, "SWAPEXT" }, \
{ XFS_TRANS_SB_COUNT, "SB_COUNT" }, \
{ XFS_TRANS_CHECKPOINT, "CHECKPOINT" }, \
{ XFS_TRANS_DUMMY1, "DUMMY1" }, \
{ XFS_TRANS_DUMMY2, "DUMMY2" }, \
{ XLOG_UNMOUNT_REC_TYPE, "UNMOUNT" }
@ -813,6 +815,7 @@ struct xfs_log_item_desc;
struct xfs_mount;
struct xfs_trans;
struct xfs_dquot_acct;
struct xfs_busy_extent;
typedef struct xfs_log_item {
struct list_head li_ail; /* AIL pointers */
@ -828,6 +831,11 @@ typedef struct xfs_log_item {
/* buffer item iodone */
/* callback func */
struct xfs_item_ops *li_ops; /* function list */
/* delayed logging */
struct list_head li_cil; /* CIL pointers */
struct xfs_log_vec *li_lv; /* active log vector */
xfs_lsn_t li_seq; /* CIL commit seq */
} xfs_log_item_t;
#define XFS_LI_IN_AIL 0x1
@ -871,34 +879,6 @@ typedef struct xfs_item_ops {
#define XFS_ITEM_LOCKED 2
#define XFS_ITEM_PUSHBUF 3
/*
* This structure is used to maintain a list of block ranges that have been
* freed in the transaction. The ranges are listed in the perag[] busy list
* between when they're freed and the transaction is committed to disk.
*/
typedef struct xfs_log_busy_slot {
xfs_agnumber_t lbc_ag;
ushort lbc_idx; /* index in perag.busy[] */
} xfs_log_busy_slot_t;
#define XFS_LBC_NUM_SLOTS 31
typedef struct xfs_log_busy_chunk {
struct xfs_log_busy_chunk *lbc_next;
uint lbc_free; /* free slots bitmask */
ushort lbc_unused; /* first unused */
xfs_log_busy_slot_t lbc_busy[XFS_LBC_NUM_SLOTS];
} xfs_log_busy_chunk_t;
#define XFS_LBC_MAX_SLOT (XFS_LBC_NUM_SLOTS - 1)
#define XFS_LBC_FREEMASK ((1U << XFS_LBC_NUM_SLOTS) - 1)
#define XFS_LBC_INIT(cp) ((cp)->lbc_free = XFS_LBC_FREEMASK)
#define XFS_LBC_CLAIM(cp, slot) ((cp)->lbc_free &= ~(1 << (slot)))
#define XFS_LBC_SLOT(cp, slot) (&((cp)->lbc_busy[(slot)]))
#define XFS_LBC_VACANCY(cp) (((cp)->lbc_free) & XFS_LBC_FREEMASK)
#define XFS_LBC_ISFREE(cp, slot) ((cp)->lbc_free & (1 << (slot)))
/*
* This is the type of function which can be given to xfs_trans_callback()
* to be called upon the transaction's commit to disk.
@ -950,8 +930,7 @@ typedef struct xfs_trans {
unsigned int t_items_free; /* log item descs free */
xfs_log_item_chunk_t t_items; /* first log item desc chunk */
xfs_trans_header_t t_header; /* header for in-log trans */
unsigned int t_busy_free; /* busy descs free */
xfs_log_busy_chunk_t t_busy; /* busy/async free blocks */
struct list_head t_busy; /* list of busy extents */
unsigned long t_pflags; /* saved process flags state */
} xfs_trans_t;
@ -1025,9 +1004,6 @@ int _xfs_trans_commit(xfs_trans_t *,
void xfs_trans_cancel(xfs_trans_t *, int);
int xfs_trans_ail_init(struct xfs_mount *);
void xfs_trans_ail_destroy(struct xfs_mount *);
xfs_log_busy_slot_t *xfs_trans_add_busy(xfs_trans_t *tp,
xfs_agnumber_t ag,
xfs_extlen_t idx);
extern kmem_zone_t *xfs_trans_zone;

View File

@ -114,7 +114,7 @@ _xfs_trans_bjoin(
xfs_buf_item_init(bp, tp->t_mountp);
bip = XFS_BUF_FSPRIVATE(bp, xfs_buf_log_item_t *);
ASSERT(!(bip->bli_flags & XFS_BLI_STALE));
ASSERT(!(bip->bli_format.blf_flags & XFS_BLI_CANCEL));
ASSERT(!(bip->bli_format.blf_flags & XFS_BLF_CANCEL));
ASSERT(!(bip->bli_flags & XFS_BLI_LOGGED));
if (reset_recur)
bip->bli_recur = 0;
@ -511,7 +511,7 @@ xfs_trans_brelse(xfs_trans_t *tp,
bip = XFS_BUF_FSPRIVATE(bp, xfs_buf_log_item_t *);
ASSERT(bip->bli_item.li_type == XFS_LI_BUF);
ASSERT(!(bip->bli_flags & XFS_BLI_STALE));
ASSERT(!(bip->bli_format.blf_flags & XFS_BLI_CANCEL));
ASSERT(!(bip->bli_format.blf_flags & XFS_BLF_CANCEL));
ASSERT(atomic_read(&bip->bli_refcount) > 0);
/*
@ -619,7 +619,7 @@ xfs_trans_bhold(xfs_trans_t *tp,
bip = XFS_BUF_FSPRIVATE(bp, xfs_buf_log_item_t *);
ASSERT(!(bip->bli_flags & XFS_BLI_STALE));
ASSERT(!(bip->bli_format.blf_flags & XFS_BLI_CANCEL));
ASSERT(!(bip->bli_format.blf_flags & XFS_BLF_CANCEL));
ASSERT(atomic_read(&bip->bli_refcount) > 0);
bip->bli_flags |= XFS_BLI_HOLD;
trace_xfs_trans_bhold(bip);
@ -641,7 +641,7 @@ xfs_trans_bhold_release(xfs_trans_t *tp,
bip = XFS_BUF_FSPRIVATE(bp, xfs_buf_log_item_t *);
ASSERT(!(bip->bli_flags & XFS_BLI_STALE));
ASSERT(!(bip->bli_format.blf_flags & XFS_BLI_CANCEL));
ASSERT(!(bip->bli_format.blf_flags & XFS_BLF_CANCEL));
ASSERT(atomic_read(&bip->bli_refcount) > 0);
ASSERT(bip->bli_flags & XFS_BLI_HOLD);
bip->bli_flags &= ~XFS_BLI_HOLD;
@ -704,7 +704,7 @@ xfs_trans_log_buf(xfs_trans_t *tp,
bip->bli_flags &= ~XFS_BLI_STALE;
ASSERT(XFS_BUF_ISSTALE(bp));
XFS_BUF_UNSTALE(bp);
bip->bli_format.blf_flags &= ~XFS_BLI_CANCEL;
bip->bli_format.blf_flags &= ~XFS_BLF_CANCEL;
}
lidp = xfs_trans_find_item(tp, (xfs_log_item_t*)bip);
@ -762,8 +762,8 @@ xfs_trans_binval(
ASSERT(!(XFS_BUF_ISDELAYWRITE(bp)));
ASSERT(XFS_BUF_ISSTALE(bp));
ASSERT(!(bip->bli_flags & (XFS_BLI_LOGGED | XFS_BLI_DIRTY)));
ASSERT(!(bip->bli_format.blf_flags & XFS_BLI_INODE_BUF));
ASSERT(bip->bli_format.blf_flags & XFS_BLI_CANCEL);
ASSERT(!(bip->bli_format.blf_flags & XFS_BLF_INODE_BUF));
ASSERT(bip->bli_format.blf_flags & XFS_BLF_CANCEL);
ASSERT(lidp->lid_flags & XFS_LID_DIRTY);
ASSERT(tp->t_flags & XFS_TRANS_DIRTY);
return;
@ -774,7 +774,7 @@ xfs_trans_binval(
* in the buf log item. The STALE flag will be used in
* xfs_buf_item_unpin() to determine if it should clean up
* when the last reference to the buf item is given up.
* We set the XFS_BLI_CANCEL flag in the buf log format structure
* We set the XFS_BLF_CANCEL flag in the buf log format structure
* and log the buf item. This will be used at recovery time
* to determine that copies of the buffer in the log before
* this should not be replayed.
@ -792,9 +792,9 @@ xfs_trans_binval(
XFS_BUF_UNDELAYWRITE(bp);
XFS_BUF_STALE(bp);
bip->bli_flags |= XFS_BLI_STALE;
bip->bli_flags &= ~(XFS_BLI_LOGGED | XFS_BLI_DIRTY);
bip->bli_format.blf_flags &= ~XFS_BLI_INODE_BUF;
bip->bli_format.blf_flags |= XFS_BLI_CANCEL;
bip->bli_flags &= ~(XFS_BLI_INODE_BUF | XFS_BLI_LOGGED | XFS_BLI_DIRTY);
bip->bli_format.blf_flags &= ~XFS_BLF_INODE_BUF;
bip->bli_format.blf_flags |= XFS_BLF_CANCEL;
memset((char *)(bip->bli_format.blf_data_map), 0,
(bip->bli_format.blf_map_size * sizeof(uint)));
lidp->lid_flags |= XFS_LID_DIRTY;
@ -802,16 +802,16 @@ xfs_trans_binval(
}
/*
* This call is used to indicate that the buffer contains on-disk
* inodes which must be handled specially during recovery. They
* require special handling because only the di_next_unlinked from
* the inodes in the buffer should be recovered. The rest of the
* data in the buffer is logged via the inodes themselves.
* This call is used to indicate that the buffer contains on-disk inodes which
* must be handled specially during recovery. They require special handling
* because only the di_next_unlinked from the inodes in the buffer should be
* recovered. The rest of the data in the buffer is logged via the inodes
* themselves.
*
* All we do is set the XFS_BLI_INODE_BUF flag in the buffer's log
* format structure so that we'll know what to do at recovery time.
* All we do is set the XFS_BLI_INODE_BUF flag in the items flags so it can be
* transferred to the buffer's log format structure so that we'll know what to
* do at recovery time.
*/
/* ARGSUSED */
void
xfs_trans_inode_buf(
xfs_trans_t *tp,
@ -826,7 +826,7 @@ xfs_trans_inode_buf(
bip = XFS_BUF_FSPRIVATE(bp, xfs_buf_log_item_t *);
ASSERT(atomic_read(&bip->bli_refcount) > 0);
bip->bli_format.blf_flags |= XFS_BLI_INODE_BUF;
bip->bli_flags |= XFS_BLI_INODE_BUF;
}
/*
@ -908,9 +908,9 @@ xfs_trans_dquot_buf(
ASSERT(XFS_BUF_ISBUSY(bp));
ASSERT(XFS_BUF_FSPRIVATE2(bp, xfs_trans_t *) == tp);
ASSERT(XFS_BUF_FSPRIVATE(bp, void *) != NULL);
ASSERT(type == XFS_BLI_UDQUOT_BUF ||
type == XFS_BLI_PDQUOT_BUF ||
type == XFS_BLI_GDQUOT_BUF);
ASSERT(type == XFS_BLF_UDQUOT_BUF ||
type == XFS_BLF_PDQUOT_BUF ||
type == XFS_BLF_GDQUOT_BUF);
bip = XFS_BUF_FSPRIVATE(bp, xfs_buf_log_item_t *);
ASSERT(atomic_read(&bip->bli_refcount) > 0);

View File

@ -299,6 +299,7 @@ xfs_trans_next_item(xfs_trans_t *tp, xfs_log_item_desc_t *lidp)
void
xfs_trans_free_items(
xfs_trans_t *tp,
xfs_lsn_t commit_lsn,
int flags)
{
xfs_log_item_chunk_t *licp;
@ -311,7 +312,7 @@ xfs_trans_free_items(
* Special case the embedded chunk so we don't free it below.
*/
if (!xfs_lic_are_all_free(licp)) {
(void) xfs_trans_unlock_chunk(licp, 1, abort, NULLCOMMITLSN);
(void) xfs_trans_unlock_chunk(licp, 1, abort, commit_lsn);
xfs_lic_all_free(licp);
licp->lic_unused = 0;
}
@ -322,7 +323,7 @@ xfs_trans_free_items(
*/
while (licp != NULL) {
ASSERT(!xfs_lic_are_all_free(licp));
(void) xfs_trans_unlock_chunk(licp, 1, abort, NULLCOMMITLSN);
(void) xfs_trans_unlock_chunk(licp, 1, abort, commit_lsn);
next_licp = licp->lic_next;
kmem_free(licp);
licp = next_licp;
@ -438,112 +439,3 @@ xfs_trans_unlock_chunk(
return freed;
}
/*
* This is called to add the given busy item to the transaction's
* list of busy items. It must find a free busy item descriptor
* or allocate a new one and add the item to that descriptor.
* The function returns a pointer to busy descriptor used to point
* to the new busy entry. The log busy entry will now point to its new
* descriptor with its ???? field.
*/
xfs_log_busy_slot_t *
xfs_trans_add_busy(xfs_trans_t *tp, xfs_agnumber_t ag, xfs_extlen_t idx)
{
xfs_log_busy_chunk_t *lbcp;
xfs_log_busy_slot_t *lbsp;
int i=0;
/*
* If there are no free descriptors, allocate a new chunk
* of them and put it at the front of the chunk list.
*/
if (tp->t_busy_free == 0) {
lbcp = (xfs_log_busy_chunk_t*)
kmem_alloc(sizeof(xfs_log_busy_chunk_t), KM_SLEEP);
ASSERT(lbcp != NULL);
/*
* Initialize the chunk, and then
* claim the first slot in the newly allocated chunk.
*/
XFS_LBC_INIT(lbcp);
XFS_LBC_CLAIM(lbcp, 0);
lbcp->lbc_unused = 1;
lbsp = XFS_LBC_SLOT(lbcp, 0);
/*
* Link in the new chunk and update the free count.
*/
lbcp->lbc_next = tp->t_busy.lbc_next;
tp->t_busy.lbc_next = lbcp;
tp->t_busy_free = XFS_LIC_NUM_SLOTS - 1;
/*
* Initialize the descriptor and the generic portion
* of the log item.
*
* Point the new slot at this item and return it.
* Also point the log item at its currently active
* descriptor and set the item's mount pointer.
*/
lbsp->lbc_ag = ag;
lbsp->lbc_idx = idx;
return lbsp;
}
/*
* Find the free descriptor. It is somewhere in the chunklist
* of descriptors.
*/
lbcp = &tp->t_busy;
while (lbcp != NULL) {
if (XFS_LBC_VACANCY(lbcp)) {
if (lbcp->lbc_unused <= XFS_LBC_MAX_SLOT) {
i = lbcp->lbc_unused;
break;
} else {
/* out-of-order vacancy */
cmn_err(CE_DEBUG, "OOO vacancy lbcp 0x%p\n", lbcp);
ASSERT(0);
}
}
lbcp = lbcp->lbc_next;
}
ASSERT(lbcp != NULL);
/*
* If we find a free descriptor, claim it,
* initialize it, and return it.
*/
XFS_LBC_CLAIM(lbcp, i);
if (lbcp->lbc_unused <= i) {
lbcp->lbc_unused = i + 1;
}
lbsp = XFS_LBC_SLOT(lbcp, i);
tp->t_busy_free--;
lbsp->lbc_ag = ag;
lbsp->lbc_idx = idx;
return lbsp;
}
/*
* xfs_trans_free_busy
* Free all of the busy lists from a transaction
*/
void
xfs_trans_free_busy(xfs_trans_t *tp)
{
xfs_log_busy_chunk_t *lbcp;
xfs_log_busy_chunk_t *lbcq;
lbcp = tp->t_busy.lbc_next;
while (lbcp != NULL) {
lbcq = lbcp->lbc_next;
kmem_free(lbcp);
lbcp = lbcq;
}
XFS_LBC_INIT(&tp->t_busy);
tp->t_busy.lbc_unused = 0;
}

View File

@ -35,13 +35,14 @@ struct xfs_log_item_desc *xfs_trans_find_item(struct xfs_trans *,
struct xfs_log_item_desc *xfs_trans_first_item(struct xfs_trans *);
struct xfs_log_item_desc *xfs_trans_next_item(struct xfs_trans *,
struct xfs_log_item_desc *);
void xfs_trans_free_items(struct xfs_trans *, int);
void xfs_trans_unlock_items(struct xfs_trans *,
xfs_lsn_t);
void xfs_trans_free_busy(xfs_trans_t *tp);
xfs_log_busy_slot_t *xfs_trans_add_busy(xfs_trans_t *tp,
xfs_agnumber_t ag,
xfs_extlen_t idx);
void xfs_trans_unlock_items(struct xfs_trans *tp, xfs_lsn_t commit_lsn);
void xfs_trans_free_items(struct xfs_trans *tp, xfs_lsn_t commit_lsn,
int flags);
void xfs_trans_item_committed(struct xfs_log_item *lip,
xfs_lsn_t commit_lsn, int aborted);
void xfs_trans_unreserve_and_mod_sb(struct xfs_trans *tp);
/*
* AIL traversal cursor.

View File

@ -75,6 +75,8 @@ typedef __uint32_t xfs_dahash_t; /* dir/attr hash value */
typedef __uint16_t xfs_prid_t; /* prid_t truncated to 16bits in XFS */
typedef __uint32_t xlog_tid_t; /* transaction ID type */
/*
* These types are 64 bits on disk but are either 32 or 64 bits in memory.
* Disk based types: