orangefs: copy Orangefs-sized blocks into the pagecache if possible.

->readpage looks in file->private_data to try and find out how the
userspace program set "count" in read(2) or with "dd bs=" or whatever.

->readpage uses "count" and inode->i_size to calculate how much
data Orangefs should deposit in the Orangefs shared buffer, and
remembers which slot the data is in.

After copying data from the Orangefs shared buffer slot into
"the page", readpage tries to increment through the pagecache index
and fill as many pages as it can from the extra data in the shared
buffer. Hopefully these extra pages will soon be needed by the vfs,
and they'll be in the pagecache already.

Signed-off-by: Mike Marshall <hubcap@omnibond.com>
Signed-off-by: Martin Brandenburg <martin@omnibond.com>
This commit is contained in:
Mike Marshall 2019-03-25 18:59:29 -04:00
parent 4077a0f25b
commit dd59a6475c
5 changed files with 156 additions and 15 deletions

View File

@ -54,6 +54,7 @@ ssize_t wait_for_direct_io(enum ORANGEFS_io_type type, struct inode *inode,
struct orangefs_kernel_op_s *new_op = NULL;
int buffer_index = -1;
ssize_t ret;
size_t copy_amount;
new_op = op_alloc(ORANGEFS_VFS_OP_FILE_IO);
if (!new_op)
@ -212,8 +213,25 @@ ssize_t wait_for_direct_io(enum ORANGEFS_io_type type, struct inode *inode,
* can futher be kernel-space or user-space addresses.
* or it can pointers to struct page's
*/
/*
* When reading, readahead_size will only be zero when
* we're doing O_DIRECT, otherwise we got here from
* orangefs_readpage.
*
* If we got here from orangefs_readpage we want to
* copy either a page or the whole file into the io
* vector, whichever is smaller.
*/
if (readahead_size)
copy_amount =
min(new_op->downcall.resp.io.amt_complete,
(__s64)PAGE_SIZE);
else
copy_amount = new_op->downcall.resp.io.amt_complete;
ret = orangefs_bufmap_copy_to_iovec(iter, buffer_index,
new_op->downcall.resp.io.amt_complete);
copy_amount);
if (ret < 0) {
gossip_err("%s: Failed to copy-out buffers. Please make sure that the pvfs2-client is running (%ld)\n",
__func__, (long)ret);
@ -231,10 +249,19 @@ ssize_t wait_for_direct_io(enum ORANGEFS_io_type type, struct inode *inode,
out:
if (buffer_index >= 0) {
if ((readahead_size) && (type == ORANGEFS_IO_READ)) {
/* readpage */
*index_return = buffer_index;
gossip_debug(GOSSIP_FILE_DEBUG,
"%s: hold on to buffer_index :%d:\n",
__func__, buffer_index);
} else {
/* O_DIRECT */
orangefs_bufmap_put(buffer_index);
gossip_debug(GOSSIP_FILE_DEBUG,
"%s(%pU): PUT buffer_index %d\n",
__func__, handle, buffer_index);
}
buffer_index = -1;
}
op_release(new_op);

View File

@ -247,31 +247,80 @@ static int orangefs_writepages(struct address_space *mapping,
return ret;
}
static int orangefs_launder_page(struct page *);
static int orangefs_readpage(struct file *file, struct page *page)
{
struct inode *inode = page->mapping->host;
struct iov_iter iter;
struct bio_vec bv;
ssize_t ret;
loff_t off;
loff_t off; /* offset into this page */
pgoff_t index; /* which page */
struct page *next_page;
char *kaddr;
struct orangefs_read_options *ro = file->private_data;
loff_t read_size;
loff_t roundedup;
int buffer_index = -1; /* orangefs shared memory slot */
int slot_index; /* index into slot */
int remaining;
/*
* If they set some miniscule size for "count" in read(2)
* (for example) then let's try to read a page, or the whole file
* if it is smaller than a page. Once "count" goes over a page
* then lets round up to the highest page size multiple that is
* less than or equal to "count" and do that much orangefs IO and
* try to fill as many pages as we can from it.
*
* "count" should be represented in ro->blksiz.
*
* inode->i_size = file size.
*/
if (ro) {
if (ro->blksiz < PAGE_SIZE) {
if (inode->i_size < PAGE_SIZE)
read_size = inode->i_size;
else
read_size = PAGE_SIZE;
} else {
roundedup = ((PAGE_SIZE - 1) & ro->blksiz) ?
((ro->blksiz + PAGE_SIZE) & ~(PAGE_SIZE -1)) :
ro->blksiz;
if (roundedup > inode->i_size)
read_size = inode->i_size;
else
read_size = roundedup;
}
} else {
read_size = PAGE_SIZE;
}
if (!read_size)
read_size = PAGE_SIZE;
if (PageDirty(page))
orangefs_launder_page(page);
off = page_offset(page);
index = off >> PAGE_SHIFT;
bv.bv_page = page;
bv.bv_len = PAGE_SIZE;
bv.bv_offset = 0;
iov_iter_bvec(&iter, READ, &bv, 1, PAGE_SIZE);
if (PageDirty(page))
orangefs_launder_page(page);
ret = wait_for_direct_io(ORANGEFS_IO_READ, inode, &off, &iter,
PAGE_SIZE, inode->i_size, NULL, NULL);
read_size, inode->i_size, NULL, &buffer_index);
remaining = ret;
/* this will only zero remaining unread portions of the page data */
iov_iter_zero(~0U, &iter);
/* takes care of potential aliasing */
flush_dcache_page(page);
if (ret < 0) {
SetPageError(page);
unlock_page(page);
goto out;
} else {
SetPageUptodate(page);
if (PageError(page))
@ -280,10 +329,61 @@ static int orangefs_readpage(struct file *file, struct page *page)
}
/* unlock the page after the ->readpage() routine completes */
unlock_page(page);
return ret;
if (remaining > PAGE_SIZE) {
slot_index = 0;
while ((remaining - PAGE_SIZE) >= PAGE_SIZE) {
remaining -= PAGE_SIZE;
/*
* It is an optimization to try and fill more than one
* page... by now we've already gotten the single
* page we were after, if stuff doesn't seem to
* be going our way at this point just return
* and hope for the best.
*
* If we look for pages and they're already there is
* one reason to give up, and if they're not there
* and we can't create them is another reason.
*/
index++;
slot_index++;
next_page = find_get_page(inode->i_mapping, index);
if (next_page) {
gossip_debug(GOSSIP_FILE_DEBUG,
"%s: found next page, quitting\n",
__func__);
put_page(next_page);
goto out;
}
next_page = find_or_create_page(inode->i_mapping,
index,
GFP_KERNEL);
/*
* I've never hit this, leave it as a printk for
* now so it will be obvious.
*/
if (!next_page) {
printk("%s: can't create next page, quitting\n",
__func__);
goto out;
}
kaddr = kmap_atomic(next_page);
orangefs_bufmap_page_fill(kaddr,
buffer_index,
slot_index);
kunmap_atomic(kaddr);
SetPageUptodate(next_page);
unlock_page(next_page);
put_page(next_page);
}
}
static int orangefs_launder_page(struct page *);
out:
if (buffer_index != -1)
orangefs_bufmap_put(buffer_index);
return ret;
}
static int orangefs_write_begin(struct file *file,
struct address_space *mapping,
@ -326,7 +426,6 @@ static int orangefs_write_begin(struct file *file,
if (ret)
return ret;
}
}
wr = kmalloc(sizeof *wr, GFP_KERNEL);

View File

@ -538,3 +538,16 @@ int orangefs_bufmap_copy_to_iovec(struct iov_iter *iter,
}
return 0;
}
void orangefs_bufmap_page_fill(void *page_to,
int buffer_index,
int slot_index)
{
struct orangefs_bufmap_desc *from;
void *page_from;
from = &__orangefs_bufmap->desc_array[buffer_index];
page_from = kmap_atomic(from->page_array[slot_index]);
memcpy(page_to, page_from, PAGE_SIZE);
kunmap_atomic(page_from);
}

View File

@ -34,4 +34,6 @@ int orangefs_bufmap_copy_to_iovec(struct iov_iter *iter,
int buffer_index,
size_t size);
void orangefs_bufmap_page_fill(void *kaddr, int buffer_index, int slot_index);
#endif /* __ORANGEFS_BUFMAP_H */