FreeBSD/Linux Kernel Cross Reference
sys/ofed/drivers/infiniband/ulp/sdp/sdp_zcopy.c


    1 /*-
    2  * SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0
    3  *
    4  * Copyright (c) 2006 Mellanox Technologies Ltd.  All rights reserved.
    5  *
    6  * This software is available to you under a choice of one of two
    7  * licenses.  You may choose to be licensed under the terms of the GNU
    8  * General Public License (GPL) Version 2, available from the file
    9  * COPYING in the main directory of this source tree, or the
   10  * OpenIB.org BSD license below:
   11  *
   12  *     Redistribution and use in source and binary forms, with or
   13  *     without modification, are permitted provided that the following
   14  *     conditions are met:
   15  *
   16  *      - Redistributions of source code must retain the above
   17  *        copyright notice, this list of conditions and the following
   18  *        disclaimer.
   19  *
   20  *      - Redistributions in binary form must reproduce the above
   21  *        copyright notice, this list of conditions and the following
   22  *        disclaimer in the documentation and/or other materials
   23  *        provided with the distribution.
   24  *
   25  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
   26  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
   27  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
   28  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
   29  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
   30  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   31  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
   32  * SOFTWARE.
   33  */
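
       /*
        * SDP zero-copy (ZCopy) send/receive path: large transfers are
        * advertised to the peer with SrcAvail messages and pulled by the
        * receiver using RDMA reads, which are acknowledged with RdmaRdCompl.
        * SendSM and SrcAvailCancel implement the abort paths.
        */
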
   34 #include <linux/tcp.h>
   35 #include <asm/ioctls.h>
   36 #include <linux/workqueue.h>
   37 #include <linux/net.h>
   38 #include <linux/socket.h>
   39 #include <net/protocol.h>
   40 #include <net/inet_common.h>
   41 #include <rdma/rdma_cm.h>
   42 #include <rdma/ib_verbs.h>
   43 #include <rdma/ib_fmr_pool.h>
   44 #include <rdma/ib_umem.h> 
   45 #include <net/tcp.h> /* for memcpy_toiovec */
   46 #include <asm/io.h>
   47 #include <asm/uaccess.h>
   48 #include <linux/delay.h>
   49 #include "sdp.h"
   50 
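       /*
        * Build a SrcAvail advertising the FMR-mapped user buffer in tx_sa,
        * inline the first piece of the payload (required in combined mode)
        * and queue the mbuf on the socket for transmission.
        */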
   51 static int sdp_post_srcavail(struct socket *sk, struct tx_srcavail_state *tx_sa)
   52 {
   53         struct sdp_sock *ssk = sdp_sk(sk);
   54         struct mbuf *mb;
   55         int payload_len;
   56         struct page *payload_pg;
   57         int off, len;
   58         struct ib_umem_chunk *chunk;
   59 
   60         WARN_ON(ssk->tx_sa);
   61 
   62         BUG_ON(!tx_sa);
   63         BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
   64         BUG_ON(!tx_sa->umem);
   65         BUG_ON(!tx_sa->umem->chunk_list.next);
   66 
   67         chunk = list_entry(tx_sa->umem->chunk_list.next, struct ib_umem_chunk, list);
   68         BUG_ON(!chunk->nmap);
   69 
   70         off = tx_sa->umem->offset;
   71         len = tx_sa->umem->length;
   72 
   73         tx_sa->bytes_sent = tx_sa->bytes_acked = 0;
   74 
   75         mb = sdp_alloc_mb_srcavail(sk, len, tx_sa->fmr->fmr->lkey, off, 0);
   76         if (!mb) {
   77                 return -ENOMEM;
   78         }
   79         sdp_dbg_data(sk, "sending SrcAvail\n");
   80                 
    81         TX_SRCAVAIL_STATE(mb) = tx_sa; /* tx_sa is hung on the mb
    82                                          * but continues to live after the mb is freed */
   83         ssk->tx_sa = tx_sa;
   84 
   85         /* must have payload inlined in SrcAvail packet in combined mode */
   86         payload_len = MIN(tx_sa->umem->page_size - off, len);
   87         payload_len = MIN(payload_len, ssk->xmit_size_goal - sizeof(struct sdp_srcah));
   88         payload_pg  = sg_page(&chunk->page_list[0]);
   89         get_page(payload_pg);
   90 
   91         sdp_dbg_data(sk, "payload: off: 0x%x, pg: %p, len: 0x%x\n",
   92                 off, payload_pg, payload_len);
   93 
   94         mb_fill_page_desc(mb, mb_shinfo(mb)->nr_frags,
   95                         payload_pg, off, payload_len);
   96 
   97         mb->len             += payload_len;
   98         mb->data_len         = payload_len;
   99         mb->truesize        += payload_len;
  100 //      sk->sk_wmem_queued   += payload_len;
  101 //      sk->sk_forward_alloc -= payload_len;
  102 
  103         mb_entail(sk, ssk, mb);
  104         
  105         ssk->write_seq += payload_len;
  106         SDP_SKB_CB(mb)->end_seq += payload_len;
  107 
  108         tx_sa->bytes_sent = tx_sa->umem->length;
  109         tx_sa->bytes_acked = payload_len;
  110 
  111         /* TODO: pushing the mb into the tx_queue should be enough */
  112 
  113         return 0;
  114 }
  115 
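       /*
        * Queue a SrcAvailCancel and arm the delayed work that resets the
        * connection if the peer never completes the cancel handshake.
        */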
  116 static int sdp_post_srcavail_cancel(struct socket *sk)
  117 {
  118         struct sdp_sock *ssk = sdp_sk(sk);
  119         struct mbuf *mb;
  120 
  121         sdp_dbg_data(ssk->socket, "Posting srcavail cancel\n");
  122 
  123         mb = sdp_alloc_mb_srcavail_cancel(sk, 0);
  124         mb_entail(sk, ssk, mb);
  125 
  126         sdp_post_sends(ssk, 0);
  127 
  128         schedule_delayed_work(&ssk->srcavail_cancel_work,
  129                         SDP_SRCAVAIL_CANCEL_TIMEOUT);
  130 
  131         return 0;
  132 }
  133 
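       /*
        * Delayed-work handler: neither the SrcAvail nor the SrcAvailCancel
        * was answered in time, so tear the connection down.
        */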
  134 void srcavail_cancel_timeout(struct work_struct *work)
  135 {
  136         struct sdp_sock *ssk =
  137                 container_of(work, struct sdp_sock, srcavail_cancel_work.work);
  138         struct socket *sk = ssk->socket;
  139 
  140         lock_sock(sk);
  141 
   142         sdp_dbg_data(sk, "both SrcAvail and SrcAvailCancel timed out."
  143                         " closing connection\n");
  144         sdp_set_error(sk, -ECONNRESET);
  145         wake_up(&ssk->wq);
  146 
  147         release_sock(sk);
  148 }
  149 
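       /*
        * Sleep until the peer acknowledges the entire SrcAvail with
        * RdmaRdCompl messages, or until a timeout, a SendSM, a pending
        * signal or a crossing SrcAvail aborts the transfer.
        */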
  150 static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p,
  151                 int ignore_signals)
  152 {
  153         struct socket *sk = ssk->socket;
  154         int err = 0;
  155         long vm_wait = 0;
  156         long current_timeo = *timeo_p;
  157         struct tx_srcavail_state *tx_sa = ssk->tx_sa;
  158         DEFINE_WAIT(wait);
  159 
  160         sdp_dbg_data(sk, "sleep till RdmaRdCompl. timeo = %ld.\n", *timeo_p);
  161         sdp_prf1(sk, NULL, "Going to sleep");
  162         while (ssk->qp_active) {
  163                 prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
  164 
  165                 if (unlikely(!*timeo_p)) {
  166                         err = -ETIME;
  167                         tx_sa->abort_flags |= TX_SA_TIMEDOUT;
  168                         sdp_prf1(sk, NULL, "timeout");
  169                         SDPSTATS_COUNTER_INC(zcopy_tx_timeout);
  170                         break;
  171                 }
  172 
  173                 else if (tx_sa->bytes_acked > tx_sa->bytes_sent) {
  174                         err = -EINVAL;
  175                         sdp_dbg_data(sk, "acked bytes > sent bytes\n");
  176                         tx_sa->abort_flags |= TX_SA_ERROR;
  177                         break;
  178                 }
  179 
  180                 if (tx_sa->abort_flags & TX_SA_SENDSM) {
  181                         sdp_prf1(sk, NULL, "Aborting SrcAvail sending");
  182                         SDPSTATS_COUNTER_INC(zcopy_tx_aborted);
  183                         err = -EAGAIN;
  184                         break ;
  185                 }
  186 
  187                 if (!ignore_signals) {
  188                         if (signal_pending(current)) {
  189                                 err = -EINTR;
  190                                 sdp_prf1(sk, NULL, "signalled");
  191                                 tx_sa->abort_flags |= TX_SA_INTRRUPTED;
  192                                 break;
  193                         }
  194 
  195                         if (ssk->rx_sa && (tx_sa->bytes_acked < tx_sa->bytes_sent)) {
  196                                 sdp_dbg_data(sk, "Crossing SrcAvail - aborting this\n");
  197                                 tx_sa->abort_flags |= TX_SA_CROSS_SEND;
  198                                 SDPSTATS_COUNTER_INC(zcopy_cross_send);
  199                                 err = -ETIME;
  200                                 break ;
  201                         }
  202                 }
  203 
  204                 posts_handler_put(ssk);
  205 
  206                 sk_wait_event(sk, &current_timeo,
  207                                 tx_sa->abort_flags &&
  208                                 ssk->rx_sa &&
  209                                 (tx_sa->bytes_acked < tx_sa->bytes_sent) && 
  210                                 vm_wait);
  211                 sdp_dbg_data(ssk->socket, "woke up sleepers\n");
  212 
  213                 posts_handler_get(ssk);
  214 
  215                 if (tx_sa->bytes_acked == tx_sa->bytes_sent)
  216                         break;
  217 
  218                 if (vm_wait) {
  219                         vm_wait -= current_timeo;
  220                         current_timeo = *timeo_p;
  221                         if (current_timeo != MAX_SCHEDULE_TIMEOUT &&
  222                             (current_timeo -= vm_wait) < 0)
  223                                 current_timeo = 0;
  224                         vm_wait = 0;
  225                 }
  226                 *timeo_p = current_timeo;
  227         }
  228 
  229         finish_wait(sk->sk_sleep, &wait);
  230 
  231         sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, flags: 0x%x\n",
  232                         tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort_flags);
  233 
  234         if (!ssk->qp_active) {
  235                 sdp_dbg(sk, "QP destroyed while waiting\n");
  236                 return -EINVAL;
  237         }
  238         return err;
  239 }
  240 
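       /*
        * Wait uninterruptibly (bounded by a 5 second timeout) for the
        * completion of the RDMA read posted against the peer's SrcAvail.
        */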
  241 static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk)
  242 {
  243         struct socket *sk = ssk->socket;
  244         long timeo = HZ * 5; /* Timeout for RDMA read */
  245         DEFINE_WAIT(wait);
  246 
  247         sdp_dbg_data(sk, "Sleep till RDMA wr finished.\n");
  248         while (1) {
  249                 prepare_to_wait(sk->sk_sleep, &wait, TASK_UNINTERRUPTIBLE);
  250 
  251                 if (!ssk->tx_ring.rdma_inflight->busy) {
  252                         sdp_dbg_data(sk, "got rdma cqe\n");
  253                         break;
  254                 }
  255 
  256                 if (!ssk->qp_active) {
  257                         sdp_dbg_data(sk, "QP destroyed\n");
  258                         break;
  259                 }
  260 
  261                 if (!timeo) {
  262                         sdp_warn(sk, "Panic: Timed out waiting for RDMA read\n");
  263                         WARN_ON(1);
  264                         break;
  265                 }
  266 
  267                 posts_handler_put(ssk);
  268 
  269                 sdp_prf1(sk, NULL, "Going to sleep");
  270                 sk_wait_event(sk, &timeo, 
  271                         !ssk->tx_ring.rdma_inflight->busy);
  272                 sdp_prf1(sk, NULL, "Woke up");
  273                 sdp_dbg_data(ssk->socket, "woke up sleepers\n");
  274 
  275                 posts_handler_get(ssk);
  276         }
  277 
  278         finish_wait(sk->sk_sleep, &wait);
  279 
  280         sdp_dbg_data(sk, "Finished waiting\n");
  281 }
  282 
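       /*
        * Tell the peer how many bytes of its SrcAvail have been consumed
        * since the last report by posting an RdmaRdCompl message.
        */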
  283 int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
  284                 struct rx_srcavail_state *rx_sa)
  285 {
  286         struct mbuf *mb;
  287         int copied = rx_sa->used - rx_sa->reported;
  288 
  289         if (rx_sa->used <= rx_sa->reported)
  290                 return 0;
  291 
  292         mb = sdp_alloc_mb_rdmardcompl(ssk->socket, copied, 0);
  293 
  294         rx_sa->reported += copied;
  295 
  296         /* TODO: What if no tx_credits available? */
  297         sdp_post_send(ssk, mb);
  298 
  299         return 0;
  300 }
  301 
  302 int sdp_post_sendsm(struct socket *sk)
  303 {
  304         struct mbuf *mb = sdp_alloc_mb_sendsm(sk, 0);
  305 
  306         sdp_post_send(sdp_sk(sk), mb);
  307 
  308         return 0;
  309 }
  310 
  311 static int sdp_update_iov_used(struct socket *sk, struct iovec *iov, int len)
  312 {
  313         sdp_dbg_data(sk, "updating consumed 0x%x bytes from iov\n", len);
  314         while (len > 0) {
  315                 if (iov->iov_len) {
  316                         int copy = min_t(unsigned int, iov->iov_len, len);
  317                         len -= copy;
  318                         iov->iov_len -= copy;
  319                         iov->iov_base += copy;
  320                 }
  321                 iov++;
  322         }
  323 
  324         return 0;
  325 }
  326 
  327 static inline int sge_bytes(struct ib_sge *sge, int sge_cnt)
  328 {
  329         int bytes = 0;
  330 
  331         while (sge_cnt > 0) {
  332                 bytes += sge->length;
  333                 sge++;
  334                 sge_cnt--;
  335         }
  336 
  337         return bytes;
  338 }
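
       /*
        * SendSM from the peer: cancel the pending SrcAvailCancel work,
        * flag the outstanding SrcAvail as aborted (unless the SendSM
        * refers to an older SrcAvail) and wake the sender.
        */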
  339 void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack)
  340 {
  341         struct socket *sk = ssk->socket;
  342         unsigned long flags;
  343 
  344         spin_lock_irqsave(&ssk->tx_sa_lock, flags);
  345 
  346         if (!ssk->tx_sa) {
  347                 sdp_prf1(sk, NULL, "SendSM for cancelled/finished SrcAvail");
  348                 goto out;
  349         }
  350 
  351         if (ssk->tx_sa->mseq > mseq_ack) {
  352                 sdp_dbg_data(sk, "SendSM arrived for old SrcAvail. "
  353                         "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
  354                         mseq_ack, ssk->tx_sa->mseq);
  355                 goto out;
  356         }
  357 
  358         sdp_dbg_data(sk, "Got SendSM - aborting SrcAvail\n");
  359 
  360         ssk->tx_sa->abort_flags |= TX_SA_SENDSM;
  361         cancel_delayed_work(&ssk->srcavail_cancel_work);
  362 
  363         wake_up(sk->sk_sleep);
  364         sdp_dbg_data(sk, "woke up sleepers\n");
  365 
  366 out:
  367         spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
  368 }
  369 
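       /*
        * RdmaRdCompl from the peer: credit the completed bytes to the
        * outstanding SrcAvail and wake the sender waiting in
        * sdp_wait_rdmardcompl().
        */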
  370 void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack,
  371                 u32 bytes_completed)
  372 {
  373         struct socket *sk = ssk->socket;
  374         unsigned long flags;
  375 
  376         sdp_prf1(sk, NULL, "RdmaRdCompl ssk=%p tx_sa=%p", ssk, ssk->tx_sa);
  377         sdp_dbg_data(sk, "RdmaRdCompl ssk=%p tx_sa=%p\n", ssk, ssk->tx_sa);
  378 
  379         spin_lock_irqsave(&ssk->tx_sa_lock, flags);
  380 
  381         BUG_ON(!ssk);
  382 
  383         if (!ssk->tx_sa) {
  384                 sdp_dbg_data(sk, "Got RdmaRdCompl for aborted SrcAvail\n");
  385                 goto out;
  386         }
  387 
  388         if (ssk->tx_sa->mseq > mseq_ack) {
  389                 sdp_dbg_data(sk, "RdmaRdCompl arrived for old SrcAvail. "
   390                         "RdmaRdCompl mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
  391                         mseq_ack, ssk->tx_sa->mseq);
  392                 goto out;
  393         }
  394 
  395         ssk->tx_sa->bytes_acked += bytes_completed;
  396 
  397         wake_up(sk->sk_sleep);
  398         sdp_dbg_data(sk, "woke up sleepers\n");
  399 
  400 out:
  401         spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
  402         return;
  403 }
  404 
  405 static unsigned long sdp_get_max_memlockable_bytes(unsigned long offset)
  406 {
  407         unsigned long avail;
  408         unsigned long lock_limit;
  409 
  410         if (capable(CAP_IPC_LOCK))
  411                 return ULONG_MAX;
  412 
  413         lock_limit = current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur;
  414         avail = lock_limit - (current->mm->locked_vm << PAGE_SHIFT);
  415 
  416         return avail - offset;
  417 }
  418 
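       /*
        * Pin the user buffer (clamped to SDP_MAX_RDMA_READ_LEN and to the
        * RLIMIT_MEMLOCK budget), gather its DMA page addresses and map
        * them through the device's FMR pool for use in the RDMA transfer.
        */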
  419 static int sdp_alloc_fmr(struct socket *sk, void *uaddr, size_t len,
  420         struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
  421 {
  422         struct ib_pool_fmr *fmr;
  423         struct ib_umem *umem;
  424         struct ib_device *dev;
  425         u64 *pages;
  426         struct ib_umem_chunk *chunk;
  427         int n, j, k;
  428         int rc = 0;
  429         unsigned long max_lockable_bytes;
  430 
  431         if (unlikely(len > SDP_MAX_RDMA_READ_LEN)) {
  432                 sdp_dbg_data(sk, "len:0x%lx > FMR_SIZE: 0x%lx\n",
  433                         len, SDP_MAX_RDMA_READ_LEN);
  434                 len = SDP_MAX_RDMA_READ_LEN;
  435         }
  436 
  437         max_lockable_bytes = sdp_get_max_memlockable_bytes((unsigned long)uaddr & ~PAGE_MASK);
  438         if (unlikely(len > max_lockable_bytes)) {
  439                 sdp_dbg_data(sk, "len:0x%lx > RLIMIT_MEMLOCK available: 0x%lx\n",
  440                         len, max_lockable_bytes);
  441                 len = max_lockable_bytes;
  442         }
  443 
  444         sdp_dbg_data(sk, "user buf: %p, len:0x%lx max_lockable_bytes: 0x%lx\n",
  445                         uaddr, len, max_lockable_bytes);
  446 
  447         umem = ib_umem_get(&sdp_sk(sk)->context, (unsigned long)uaddr, len,
  448                 IB_ACCESS_REMOTE_WRITE, 0);
  449 
  450         if (IS_ERR(umem)) {
  451                 rc = PTR_ERR(umem);
  452                 sdp_warn(sk, "Error doing umem_get 0x%lx bytes: %d\n", len, rc);
  453                 sdp_warn(sk, "RLIMIT_MEMLOCK: 0x%lx[cur] 0x%lx[max] CAP_IPC_LOCK: %d\n",
  454                                 current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur,
  455                                 current->signal->rlim[RLIMIT_MEMLOCK].rlim_max,
  456                                 capable(CAP_IPC_LOCK));
  457                 goto err_umem_get;
  458         }
  459 
  460         sdp_dbg_data(sk, "umem->offset = 0x%x, length = 0x%lx\n",
  461                 umem->offset, umem->length);
  462 
  463         pages = (u64 *) __get_free_page(GFP_KERNEL);
  464         if (!pages)
  465                 goto err_pages_alloc;
  466 
  467         n = 0;
  468 
  469         dev = sdp_sk(sk)->ib_device;
  470         list_for_each_entry(chunk, &umem->chunk_list, list) {
  471                 for (j = 0; j < chunk->nmap; ++j) {
  472                         len = ib_sg_dma_len(dev,
  473                                         &chunk->page_list[j]) >> PAGE_SHIFT;
  474 
  475                         for (k = 0; k < len; ++k) {
  476                                 pages[n++] = ib_sg_dma_address(dev,
  477                                                 &chunk->page_list[j]) +
  478                                         umem->page_size * k;
  479 
  480                         }
  481                 }
  482         }
  483 
  484         fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, pages, n, 0);
  485         if (IS_ERR(fmr)) {
  486                 sdp_warn(sk, "Error allocating fmr: %ld\n", PTR_ERR(fmr));
  487                 goto err_fmr_alloc;
  488         }
  489 
  490         free_page((unsigned long) pages);
  491 
  492         *_umem = umem;
  493         *_fmr = fmr;
  494 
  495         return 0;
  496 
  497 err_fmr_alloc:  
  498         free_page((unsigned long) pages);
  499 
  500 err_pages_alloc:
  501         ib_umem_release(umem);
  502 
  503 err_umem_get:
  504 
  505         return rc;
  506 }
  507 
  508 void sdp_free_fmr(struct socket *sk, struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
  509 {
  510         if (!sdp_sk(sk)->qp_active)
  511                 return;
  512 
  513         ib_fmr_pool_unmap(*_fmr);
  514         *_fmr = NULL;
  515 
  516         ib_umem_release(*_umem);
  517         *_umem = NULL;
  518 }
  519 
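       /*
        * Post a signalled RDMA read that pulls the buffer advertised in
        * the peer's SrcAvail (rkey/vaddr) into the locally FMR-mapped
        * umem described by rx_sa.
        */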
  520 static int sdp_post_rdma_read(struct socket *sk, struct rx_srcavail_state *rx_sa)
  521 {
  522         struct sdp_sock *ssk = sdp_sk(sk);
  523         struct ib_send_wr *bad_wr;
  524         struct ib_send_wr wr = { NULL };
  525         struct ib_sge sge;
  526 
  527         wr.opcode = IB_WR_RDMA_READ;
  528         wr.next = NULL;
  529         wr.wr_id = SDP_OP_RDMA;
  530         wr.wr.rdma.rkey = rx_sa->rkey;
  531         wr.send_flags = 0;
  532 
  533         ssk->tx_ring.rdma_inflight = rx_sa;
  534 
  535         sge.addr = rx_sa->umem->offset;
  536         sge.length = rx_sa->umem->length;
  537         sge.lkey = rx_sa->fmr->fmr->lkey;
  538 
  539         wr.wr.rdma.remote_addr = rx_sa->vaddr + rx_sa->used;
  540         wr.num_sge = 1;
  541         wr.sg_list = &sge;
  542         rx_sa->busy++;
  543 
  544         wr.send_flags = IB_SEND_SIGNALED;
  545 
  546         return ib_post_send(ssk->qp, &wr, &bad_wr);
  547 }
  548 
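       /*
        * Receive path for a SrcAvail: map the destination iovec with an
        * FMR, post the RDMA read, wait for its completion and account the
        * transferred bytes into the iovec and rcv_nxt.
        */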
  549 int sdp_rdma_to_iovec(struct socket *sk, struct iovec *iov, struct mbuf *mb,
  550                 unsigned long *used)
  551 {
  552         struct sdp_sock *ssk = sdp_sk(sk);
  553         struct rx_srcavail_state *rx_sa = RX_SRCAVAIL_STATE(mb);
  554         int got_srcavail_cancel;
  555         int rc = 0;
  556         int len = *used;
  557         int copied;
  558 
  559         sdp_dbg_data(ssk->socket, "preparing RDMA read."
  560                 " len: 0x%x. buffer len: 0x%lx\n", len, iov->iov_len);
  561 
  562         sock_hold(sk, SOCK_REF_RDMA_RD);
  563 
  564         if (len > rx_sa->len) {
  565                 sdp_warn(sk, "len:0x%x > rx_sa->len: 0x%x\n", len, rx_sa->len);
  566                 WARN_ON(1);
  567                 len = rx_sa->len;
  568         }
  569 
  570         rc = sdp_alloc_fmr(sk, iov->iov_base, len, &rx_sa->fmr, &rx_sa->umem);
  571         if (rc) {
  572                 sdp_warn(sk, "Error allocating fmr: %d\n", rc);
  573                 goto err_alloc_fmr;
  574         }
  575 
  576         rc = sdp_post_rdma_read(sk, rx_sa);
  577         if (unlikely(rc)) {
  578                 sdp_warn(sk, "ib_post_send failed with status %d.\n", rc);
  579                 sdp_set_error(ssk->socket, -ECONNRESET);
  580                 wake_up(&ssk->wq);
  581                 goto err_post_send;
  582         }
  583 
  584         sdp_prf(sk, mb, "Finished posting(rc=%d), now to wait", rc);
  585 
  586         got_srcavail_cancel = ssk->srcavail_cancel_mseq > rx_sa->mseq;
  587 
  588         sdp_arm_tx_cq(sk);
  589 
  590         sdp_wait_rdma_wr_finished(ssk);
  591 
  592         sdp_prf(sk, mb, "Finished waiting(rc=%d)", rc);
  593         if (!ssk->qp_active) {
  594                 sdp_dbg_data(sk, "QP destroyed during RDMA read\n");
  595                 rc = -EPIPE;
  596                 goto err_post_send;
  597         }
  598 
  599         copied = rx_sa->umem->length;
  600 
  601         sdp_update_iov_used(sk, iov, copied);
  602         rx_sa->used += copied;
  603         atomic_add(copied, &ssk->rcv_nxt);
  604         *used = copied;
  605 
  606         ssk->tx_ring.rdma_inflight = NULL;
  607 
  608 err_post_send:
  609         sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
  610 
  611 err_alloc_fmr:
  612         if (rc && ssk->qp_active) {
  613                 sdp_warn(sk, "Couldn't do RDMA - post sendsm\n");
  614                 rx_sa->flags |= RX_SA_ABORTED;
  615         }
  616 
  617         sock_put(sk, SOCK_REF_RDMA_RD);
  618 
  619         return rc;
  620 }
  621 
  622 static inline int wait_for_sndbuf(struct socket *sk, long *timeo_p)
  623 {
  624         struct sdp_sock *ssk = sdp_sk(sk);
  625         int ret = 0;
  626         int credits_needed = 1;
  627 
  628         sdp_dbg_data(sk, "Wait for mem\n");
  629 
  630         set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
  631 
  632         SDPSTATS_COUNTER_INC(send_wait_for_mem);
  633 
  634         sdp_do_posts(ssk);
  635 
  636         sdp_xmit_poll(ssk, 1);
  637 
  638         ret = sdp_tx_wait_memory(ssk, timeo_p, &credits_needed);
  639 
  640         return ret;
  641 }
  642 
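       /*
        * One zero-copy send attempt: map the iovec with an FMR, advertise
        * it with a SrcAvail and wait for RdmaRdCompl; on failure, cancel
        * the SrcAvail or reset the connection as appropriate, then account
        * the acknowledged bytes and release the FMR.
        */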
  643 static int do_sdp_sendmsg_zcopy(struct socket *sk, struct tx_srcavail_state *tx_sa,
  644                 struct iovec *iov, long *timeo)
  645 {
  646         struct sdp_sock *ssk = sdp_sk(sk);
  647         int rc = 0;
  648         unsigned long lock_flags;
  649 
  650         rc = sdp_alloc_fmr(sk, iov->iov_base, iov->iov_len,
  651                         &tx_sa->fmr, &tx_sa->umem);
  652         if (rc) {
  653                 sdp_warn(sk, "Error allocating fmr: %d\n", rc);
  654                 goto err_alloc_fmr;
  655         }
  656 
  657         if (tx_slots_free(ssk) == 0) {
  658                 rc = wait_for_sndbuf(sk, timeo);
  659                 if (rc) {
  660                         sdp_warn(sk, "Couldn't get send buffer\n");
  661                         goto err_no_tx_slots;
  662                 }
  663         }
  664 
  665         rc = sdp_post_srcavail(sk, tx_sa);
  666         if (rc) {
  667                 sdp_dbg(sk, "Error posting SrcAvail\n");
  668                 goto err_abort_send;
  669         }
  670 
  671         rc = sdp_wait_rdmardcompl(ssk, timeo, 0);
  672         if (unlikely(rc)) {
  673                 enum tx_sa_flag f = tx_sa->abort_flags;
  674 
  675                 if (f & TX_SA_SENDSM) {
  676                         sdp_dbg_data(sk, "Got SendSM. use SEND verb.\n");
  677                 } else if (f & TX_SA_ERROR) {
  678                         sdp_dbg_data(sk, "SrcAvail error completion\n");
  679                         sdp_reset(sk);
  680                         SDPSTATS_COUNTER_INC(zcopy_tx_error);
  681                 } else if (ssk->qp_active) {
  682                         sdp_post_srcavail_cancel(sk);
  683 
  684                         /* Wait for RdmaRdCompl/SendSM to
  685                          * finish the transaction */
  686                         *timeo = 2 * HZ;
  687                         sdp_dbg_data(sk, "Waiting for SendSM\n");
  688                         sdp_wait_rdmardcompl(ssk, timeo, 1);
  689                         sdp_dbg_data(sk, "finished waiting\n");
  690 
  691                         cancel_delayed_work(&ssk->srcavail_cancel_work);
  692                 } else {
  693                         sdp_dbg_data(sk, "QP was destroyed while waiting\n");
  694                 }
  695         } else {
  696                 sdp_dbg_data(sk, "got RdmaRdCompl\n");
  697         }
  698 
  699         spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
  700         ssk->tx_sa = NULL;
  701         spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);
  702 
  703 err_abort_send:
  704         sdp_update_iov_used(sk, iov, tx_sa->bytes_acked);
  705 
  706 err_no_tx_slots:
  707         sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);
  708 
  709 err_alloc_fmr:
  710         return rc;      
  711 }
  712 
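       /*
        * Zero-copy send entry point: keep posting SrcAvails for the iovec
        * until it is fully acknowledged, an error occurs or the remainder
        * drops below sdp_zcopy_thresh (bcopy takes over then).  Returns
        * the number of bytes consumed from the iovec, or a fatal error.
        */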
  713 int sdp_sendmsg_zcopy(struct kiocb *iocb, struct socket *sk, struct iovec *iov)
  714 {
  715         struct sdp_sock *ssk = sdp_sk(sk);
  716         int rc = 0;
  717         long timeo;
  718         struct tx_srcavail_state *tx_sa;
  719         int offset;
  720         size_t bytes_to_copy = 0;
  721         int copied = 0;
  722 
  723         sdp_dbg_data(sk, "Sending iov: %p, iov_len: 0x%lx\n",
  724                         iov->iov_base, iov->iov_len);
  725         sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy start");
  726         if (ssk->rx_sa) {
   727                 sdp_dbg_data(sk, "Deadlock prevention: crossing SrcAvail\n");
  728                 return 0;
  729         }
  730 
  731         sock_hold(ssk->socket, SOCK_REF_ZCOPY);
  732 
  733         SDPSTATS_COUNTER_INC(sendmsg_zcopy_segment);
  734 
  735         timeo = SDP_SRCAVAIL_ADV_TIMEOUT ;
  736 
  737         /* Ok commence sending. */
  738         offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);
  739 
  740         tx_sa = kmalloc(sizeof(struct tx_srcavail_state), GFP_KERNEL);
  741         if (!tx_sa) {
  742                 sdp_warn(sk, "Error allocating zcopy context\n");
  743                 rc = -EAGAIN; /* Buffer too big - fallback to bcopy */
  744                 goto err_alloc_tx_sa;
  745         }
  746 
  747         bytes_to_copy = iov->iov_len;
  748         do {
  749                 tx_sa_reset(tx_sa);
  750 
  751                 rc = do_sdp_sendmsg_zcopy(sk, tx_sa, iov, &timeo);
  752 
  753                 if (iov->iov_len && iov->iov_len < sdp_zcopy_thresh) {
  754                         sdp_dbg_data(sk, "0x%lx bytes left, switching to bcopy\n",
  755                                 iov->iov_len);
  756                         break;
  757                 }
  758         } while (!rc && iov->iov_len > 0 && !tx_sa->abort_flags);
  759 
  760         kfree(tx_sa);
  761 err_alloc_tx_sa:
  762         copied = bytes_to_copy - iov->iov_len;
  763 
  764         sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy end rc: %d copied: %d", rc, copied);
  765 
  766         sock_put(ssk->socket, SOCK_REF_ZCOPY);
  767 
  768         if (rc < 0 && rc != -EAGAIN && rc != -ETIME)
  769                 return rc;
  770 
  771         return copied;
  772 }
  773 
  774 void sdp_abort_srcavail(struct socket *sk)
  775 {
  776         struct sdp_sock *ssk = sdp_sk(sk);
  777         struct tx_srcavail_state *tx_sa = ssk->tx_sa;
  778         unsigned long flags;
  779 
  780         if (!tx_sa)
  781                 return;
  782 
  783         cancel_delayed_work(&ssk->srcavail_cancel_work);
  784         flush_scheduled_work();
  785 
  786         spin_lock_irqsave(&ssk->tx_sa_lock, flags);
  787 
  788         sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);
  789 
  790         ssk->tx_sa = NULL;
  791 
  792         spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
  793 }
  794 
  795 void sdp_abort_rdma_read(struct socket *sk)
  796 {
  797         struct sdp_sock *ssk = sdp_sk(sk);
  798         struct rx_srcavail_state *rx_sa = ssk->rx_sa;
  799 
  800         if (!rx_sa)
  801                 return;
  802 
  803         sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
  804 
  805         ssk->rx_sa = NULL;
  806 }
