rtime.felk.cvut.cz Git - linux-imx.git/blob - net/sunrpc/clnt.c
SUNRPC: If the rpcbind channel is disconnected, fail the call to unregister
[linux-imx.git] / net / sunrpc / clnt.c
1 /*
2  *  linux/net/sunrpc/clnt.c
3  *
4  *  This file contains the high-level RPC interface.
5  *  It is modeled as a finite state machine to support both synchronous
6  *  and asynchronous requests.
7  *
8  *  -   RPC header generation and argument serialization.
9  *  -   Credential refresh.
10  *  -   TCP connect handling.
11  *  -   Retry of operation when it is suspected the operation failed because
12  *      of uid squashing on the server, or when the credentials were stale
13  *      and need to be refreshed, or when a packet was damaged in transit.
14  *      This may have to be moved to the VFS layer.
15  *
16  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
17  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
18  */
19
20
21 #include <linux/module.h>
22 #include <linux/types.h>
23 #include <linux/kallsyms.h>
24 #include <linux/mm.h>
25 #include <linux/namei.h>
26 #include <linux/mount.h>
27 #include <linux/slab.h>
28 #include <linux/utsname.h>
29 #include <linux/workqueue.h>
30 #include <linux/in.h>
31 #include <linux/in6.h>
32 #include <linux/un.h>
33 #include <linux/rcupdate.h>
34
35 #include <linux/sunrpc/clnt.h>
36 #include <linux/sunrpc/addr.h>
37 #include <linux/sunrpc/rpc_pipe_fs.h>
38 #include <linux/sunrpc/metrics.h>
39 #include <linux/sunrpc/bc_xprt.h>
40 #include <trace/events/sunrpc.h>
41
42 #include "sunrpc.h"
43 #include "netns.h"
44
45 #ifdef RPC_DEBUG
46 # define RPCDBG_FACILITY        RPCDBG_CALL
47 #endif
48
/*
 * Log the name of the calling function together with the pid and the
 * current status of RPC task @t.  The argument is parenthesized in the
 * expansion so that any pointer-valued expression may be passed.
 */
#define dprint_status(t)					\
	dprintk("RPC: %5u %s (status %d)\n", (t)->tk_pid,	\
			__func__, (t)->tk_status)
52
/*
 * Wait queue used while shutting a client down: rpc_shutdown_client()
 * sleeps on it until the client's task list is empty, and
 * rpc_release_client() wakes it when it sees an empty task list.
 */

static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
58
59
60 static void     call_start(struct rpc_task *task);
61 static void     call_reserve(struct rpc_task *task);
62 static void     call_reserveresult(struct rpc_task *task);
63 static void     call_allocate(struct rpc_task *task);
64 static void     call_decode(struct rpc_task *task);
65 static void     call_bind(struct rpc_task *task);
66 static void     call_bind_status(struct rpc_task *task);
67 static void     call_transmit(struct rpc_task *task);
68 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
69 static void     call_bc_transmit(struct rpc_task *task);
70 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
71 static void     call_status(struct rpc_task *task);
72 static void     call_transmit_status(struct rpc_task *task);
73 static void     call_refresh(struct rpc_task *task);
74 static void     call_refreshresult(struct rpc_task *task);
75 static void     call_timeout(struct rpc_task *task);
76 static void     call_connect(struct rpc_task *task);
77 static void     call_connect_status(struct rpc_task *task);
78
79 static __be32   *rpc_encode_header(struct rpc_task *task);
80 static __be32   *rpc_verify_header(struct rpc_task *task);
81 static int      rpc_ping(struct rpc_clnt *clnt);
82
83 static void rpc_register_client(struct rpc_clnt *clnt)
84 {
85         struct net *net = rpc_net_ns(clnt);
86         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
87
88         spin_lock(&sn->rpc_client_lock);
89         list_add(&clnt->cl_clients, &sn->all_clients);
90         spin_unlock(&sn->rpc_client_lock);
91 }
92
93 static void rpc_unregister_client(struct rpc_clnt *clnt)
94 {
95         struct net *net = rpc_net_ns(clnt);
96         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
97
98         spin_lock(&sn->rpc_client_lock);
99         list_del(&clnt->cl_clients);
100         spin_unlock(&sn->rpc_client_lock);
101 }
102
103 static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
104 {
105         if (clnt->cl_dentry) {
106                 if (clnt->cl_auth && clnt->cl_auth->au_ops->pipes_destroy)
107                         clnt->cl_auth->au_ops->pipes_destroy(clnt->cl_auth);
108                 rpc_remove_client_dir(clnt->cl_dentry);
109         }
110         clnt->cl_dentry = NULL;
111 }
112
113 static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
114 {
115         struct net *net = rpc_net_ns(clnt);
116         struct super_block *pipefs_sb;
117
118         pipefs_sb = rpc_get_sb_net(net);
119         if (pipefs_sb) {
120                 __rpc_clnt_remove_pipedir(clnt);
121                 rpc_put_sb_net(net);
122         }
123 }
124
125 static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
126                                     struct rpc_clnt *clnt,
127                                     const char *dir_name)
128 {
129         static uint32_t clntid;
130         char name[15];
131         struct dentry *dir, *dentry;
132
133         dir = rpc_d_lookup_sb(sb, dir_name);
134         if (dir == NULL) {
135                 pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
136                 return dir;
137         }
138         for (;;) {
139                 snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
140                 name[sizeof(name) - 1] = '\0';
141                 dentry = rpc_create_client_dir(dir, name, clnt);
142                 if (!IS_ERR(dentry))
143                         break;
144                 if (dentry == ERR_PTR(-EEXIST))
145                         continue;
146                 printk(KERN_INFO "RPC: Couldn't create pipefs entry"
147                                 " %s/%s, error %ld\n",
148                                 dir_name, name, PTR_ERR(dentry));
149                 break;
150         }
151         dput(dir);
152         return dentry;
153 }
154
155 static int
156 rpc_setup_pipedir(struct rpc_clnt *clnt, const char *dir_name,
157                   struct super_block *pipefs_sb)
158 {
159         struct dentry *dentry;
160
161         clnt->cl_dentry = NULL;
162         if (dir_name == NULL)
163                 return 0;
164         dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt, dir_name);
165         if (IS_ERR(dentry))
166                 return PTR_ERR(dentry);
167         clnt->cl_dentry = dentry;
168         return 0;
169 }
170
171 static inline int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
172 {
173         if (((event == RPC_PIPEFS_MOUNT) && clnt->cl_dentry) ||
174             ((event == RPC_PIPEFS_UMOUNT) && !clnt->cl_dentry))
175                 return 1;
176         if ((event == RPC_PIPEFS_MOUNT) && atomic_read(&clnt->cl_count) == 0)
177                 return 1;
178         return 0;
179 }
180
181 static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
182                                    struct super_block *sb)
183 {
184         struct dentry *dentry;
185         int err = 0;
186
187         switch (event) {
188         case RPC_PIPEFS_MOUNT:
189                 dentry = rpc_setup_pipedir_sb(sb, clnt,
190                                               clnt->cl_program->pipe_dir_name);
191                 if (!dentry)
192                         return -ENOENT;
193                 if (IS_ERR(dentry))
194                         return PTR_ERR(dentry);
195                 clnt->cl_dentry = dentry;
196                 if (clnt->cl_auth->au_ops->pipes_create) {
197                         err = clnt->cl_auth->au_ops->pipes_create(clnt->cl_auth);
198                         if (err)
199                                 __rpc_clnt_remove_pipedir(clnt);
200                 }
201                 break;
202         case RPC_PIPEFS_UMOUNT:
203                 __rpc_clnt_remove_pipedir(clnt);
204                 break;
205         default:
206                 printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
207                 return -ENOTSUPP;
208         }
209         return err;
210 }
211
212 static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
213                                 struct super_block *sb)
214 {
215         int error = 0;
216
217         for (;; clnt = clnt->cl_parent) {
218                 if (!rpc_clnt_skip_event(clnt, event))
219                         error = __rpc_clnt_handle_event(clnt, event, sb);
220                 if (error || clnt == clnt->cl_parent)
221                         break;
222         }
223         return error;
224 }
225
226 static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
227 {
228         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
229         struct rpc_clnt *clnt;
230
231         spin_lock(&sn->rpc_client_lock);
232         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
233                 if (clnt->cl_program->pipe_dir_name == NULL)
234                         continue;
235                 if (rpc_clnt_skip_event(clnt, event))
236                         continue;
237                 spin_unlock(&sn->rpc_client_lock);
238                 return clnt;
239         }
240         spin_unlock(&sn->rpc_client_lock);
241         return NULL;
242 }
243
244 static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
245                             void *ptr)
246 {
247         struct super_block *sb = ptr;
248         struct rpc_clnt *clnt;
249         int error = 0;
250
251         while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
252                 error = __rpc_pipefs_event(clnt, event, sb);
253                 if (error)
254                         break;
255         }
256         return error;
257 }
258
/*
 * Notifier block that routes pipefs events to rpc_pipefs_event().
 * It is registered and unregistered by rpc_clients_notifier_register()
 * and rpc_clients_notifier_unregister().
 */
static struct notifier_block rpc_clients_block = {
	.notifier_call	= rpc_pipefs_event,
	.priority	= SUNRPC_PIPEFS_RPC_PRIO,
};
263
/*
 * Register rpc_clients_block with the pipefs notifier chain.
 * Returns whatever rpc_pipefs_notifier_register() returns.
 */
int rpc_clients_notifier_register(void)
{
	return rpc_pipefs_notifier_register(&rpc_clients_block);
}
268
269 void rpc_clients_notifier_unregister(void)
270 {
271         return rpc_pipefs_notifier_unregister(&rpc_clients_block);
272 }
273
/*
 * Copy @nodename into clnt->cl_nodename and record its length in
 * clnt->cl_nodelen.  At most UNX_MAXNODENAME bytes are copied; longer
 * names are truncated.  No terminating NUL is written by this function.
 */
static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
{
	clnt->cl_nodelen = strlen(nodename);
	/* clamp to the maximum length */
	if (clnt->cl_nodelen > UNX_MAXNODENAME)
		clnt->cl_nodelen = UNX_MAXNODENAME;
	memcpy(clnt->cl_nodename, nodename, clnt->cl_nodelen);
}
281
/*
 * Finish setting up a newly allocated client: create its pipefs directory
 * (when rpc_get_sb_net() yields a pipefs superblock for the namespace),
 * add it to the per-net client list and create its auth handle.
 *
 * Returns 0 on success, or the negative errno from rpc_setup_pipedir() or
 * rpcauth_create().  On failure the client is not left on the client list
 * and has no pipefs directory.
 */
static int rpc_client_register(const struct rpc_create_args *args,
			       struct rpc_clnt *clnt)
{
	const struct rpc_program *program = args->program;
	struct rpc_auth *auth;
	struct net *net = rpc_net_ns(clnt);
	struct super_block *pipefs_sb;
	int err;

	pipefs_sb = rpc_get_sb_net(net);
	if (pipefs_sb) {
		err = rpc_setup_pipedir(clnt, program->pipe_dir_name, pipefs_sb);
		if (err)
			goto out;
	}

	/*
	 * The client goes on the list before the superblock is put, i.e.
	 * while the rpc_get_sb_net()/rpc_put_sb_net() pair is still open.
	 */
	rpc_register_client(clnt);
	if (pipefs_sb)
		rpc_put_sb_net(net);

	auth = rpcauth_create(args->authflavor, clnt);
	if (IS_ERR(auth)) {
		dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
				args->authflavor);
		err = PTR_ERR(auth);
		goto err_auth;
	}
	return 0;
err_auth:
	/*
	 * The superblock was already put above, so get it again before
	 * undoing the registration and removing the pipefs directory.
	 */
	pipefs_sb = rpc_get_sb_net(net);
	rpc_unregister_client(clnt);
	__rpc_clnt_remove_pipedir(clnt);
out:
	if (pipefs_sb)
		rpc_put_sb_net(net);
	return err;
}
319
/*
 * Allocate and initialise an rpc_clnt for args->program at version index
 * args->version, bound to the transport @xprt.
 *
 * The new client starts out as its own parent with a reference count of 1.
 * On every failure path xprt_put() is called on @xprt, so the caller must
 * not use its transport reference after an error return.
 *
 * Returns the new client, or an ERR_PTR: the rpciod_up() error, -EINVAL
 * for an unknown version, -ENOMEM on allocation failure, or the error
 * from rpc_client_register().
 */
static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, struct rpc_xprt *xprt)
{
	const struct rpc_program *program = args->program;
	const struct rpc_version *version;
	struct rpc_clnt		*clnt = NULL;
	int err;

	/* NOTE(review): no validation of the names happens before this print */
	dprintk("RPC:       creating %s client for %s (xprt %p)\n",
			program->name, args->servername, xprt);

	err = rpciod_up();
	if (err)
		goto out_no_rpciod;

	/* args->version is an index into program->version[] */
	err = -EINVAL;
	if (args->version >= program->nrvers)
		goto out_err;
	version = program->version[args->version];
	if (version == NULL)
		goto out_err;

	err = -ENOMEM;
	clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
	if (!clnt)
		goto out_err;
	/* a client that is its own parent is the root of a clone chain */
	clnt->cl_parent = clnt;

	rcu_assign_pointer(clnt->cl_xprt, xprt);
	clnt->cl_procinfo = version->procs;
	clnt->cl_maxproc  = version->nrprocs;
	clnt->cl_protname = program->name;
	/* an explicit program number in @args overrides the program's own */
	clnt->cl_prog     = args->prognumber ? : program->number;
	clnt->cl_vers     = version->number;
	clnt->cl_stats    = program->stats;
	clnt->cl_metrics  = rpc_alloc_iostats(clnt);
	err = -ENOMEM;
	if (clnt->cl_metrics == NULL)
		goto out_no_stats;
	clnt->cl_program  = program;
	INIT_LIST_HEAD(&clnt->cl_tasks);
	spin_lock_init(&clnt->cl_lock);

	if (!xprt_bound(xprt))
		clnt->cl_autobind = 1;

	/* use the transport's timeout unless the caller supplied its own */
	clnt->cl_timeout = xprt->timeout;
	if (args->timeout != NULL) {
		memcpy(&clnt->cl_timeout_default, args->timeout,
				sizeof(clnt->cl_timeout_default));
		clnt->cl_timeout = &clnt->cl_timeout_default;
	}

	clnt->cl_rtt = &clnt->cl_rtt_default;
	rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
	clnt->cl_principal = NULL;
	if (args->client_name) {
		/* err is still -ENOMEM here */
		clnt->cl_principal = kstrdup(args->client_name, GFP_KERNEL);
		if (!clnt->cl_principal)
			goto out_no_principal;
	}

	atomic_set(&clnt->cl_count, 1);

	/* save the nodename */
	rpc_clnt_set_nodename(clnt, utsname()->nodename);

	err = rpc_client_register(args, clnt);
	if (err)
		goto out_no_path;
	return clnt;

	/* unwind in the reverse order of setup */
out_no_path:
	kfree(clnt->cl_principal);
out_no_principal:
	rpc_free_iostats(clnt->cl_metrics);
out_no_stats:
	kfree(clnt);
out_err:
	rpciod_down();
out_no_rpciod:
	xprt_put(xprt);
	return ERR_PTR(err);
}
404
/**
 * rpc_create - create an RPC client and transport with one call
 * @args: rpc_clnt create argument structure
 *
 * Creates and initializes an RPC transport and an RPC client.
 *
 * It can ping the server in order to determine if it is up, and to see if
 * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
 * this behavior so asynchronous tasks can also use rpc_create.
 *
 * Returns the new client, or an ERR_PTR: -EINVAL when no server name was
 * given and the address family is not recognized, or the error from
 * transport creation, client creation or the ping.
 */
struct rpc_clnt *rpc_create(struct rpc_create_args *args)
{
	struct rpc_xprt *xprt;
	struct rpc_clnt *clnt;
	struct xprt_create xprtargs = {
		.net = args->net,
		.ident = args->protocol,
		.srcaddr = args->saddress,
		.dstaddr = args->address,
		.addrlen = args->addrsize,
		.servername = args->servername,
		.bc_xprt = args->bc_xprt,
	};
	/* scratch buffer for a server name derived from the address */
	char servername[48];

	/* translate client-create flags into transport-create flags */
	if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
		xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
	if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
		xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
	/*
	 * If the caller chooses not to specify a hostname, whip
	 * up a string representation of the passed-in address.
	 */
	if (xprtargs.servername == NULL) {
		/* three typed views of the same address; sa_family picks one */
		struct sockaddr_un *sun =
				(struct sockaddr_un *)args->address;
		struct sockaddr_in *sin =
				(struct sockaddr_in *)args->address;
		struct sockaddr_in6 *sin6 =
				(struct sockaddr_in6 *)args->address;

		servername[0] = '\0';
		switch (args->address->sa_family) {
		case AF_LOCAL:
			snprintf(servername, sizeof(servername), "%s",
				 sun->sun_path);
			break;
		case AF_INET:
			snprintf(servername, sizeof(servername), "%pI4",
				 &sin->sin_addr.s_addr);
			break;
		case AF_INET6:
			snprintf(servername, sizeof(servername), "%pI6",
				 &sin6->sin6_addr);
			break;
		default:
			/* caller wants default server name, but
			 * address family isn't recognized. */
			return ERR_PTR(-EINVAL);
		}
		xprtargs.servername = servername;
	}

	xprt = xprt_create_transport(&xprtargs);
	if (IS_ERR(xprt))
		return (struct rpc_clnt *)xprt;

	/*
	 * By default, kernel RPC client connects from a reserved port.
	 * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
	 * but it is always enabled for rpciod, which handles the connect
	 * operation.
	 */
	xprt->resvport = 1;
	if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
		xprt->resvport = 0;

	/* rpc_new_client() puts the transport itself on failure */
	clnt = rpc_new_client(args, xprt);
	if (IS_ERR(clnt))
		return clnt;

	if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
		int err = rpc_ping(clnt);
		if (err != 0) {
			rpc_shutdown_client(clnt);
			return ERR_PTR(err);
		}
	}

	/* soft retry is the default; these flags are applied after the ping */
	clnt->cl_softrtry = 1;
	if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
		clnt->cl_softrtry = 0;

	if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
		clnt->cl_autobind = 1;
	if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
		clnt->cl_discrtry = 1;
	if (!(args->flags & RPC_CLNT_CREATE_QUIET))
		clnt->cl_chatty = 1;

	return clnt;
}
EXPORT_SYMBOL_GPL(rpc_create);
508
509 /*
510  * This function clones the RPC client structure. It allows us to share the
511  * same transport while varying parameters such as the authentication
512  * flavour.
513  */
514 static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
515                                            struct rpc_clnt *clnt)
516 {
517         struct rpc_xprt *xprt;
518         struct rpc_clnt *new;
519         int err;
520
521         err = -ENOMEM;
522         rcu_read_lock();
523         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
524         rcu_read_unlock();
525         if (xprt == NULL)
526                 goto out_err;
527         args->servername = xprt->servername;
528
529         new = rpc_new_client(args, xprt);
530         if (IS_ERR(new)) {
531                 err = PTR_ERR(new);
532                 goto out_err;
533         }
534
535         atomic_inc(&clnt->cl_count);
536         new->cl_parent = clnt;
537
538         /* Turn off autobind on clones */
539         new->cl_autobind = 0;
540         new->cl_softrtry = clnt->cl_softrtry;
541         new->cl_discrtry = clnt->cl_discrtry;
542         new->cl_chatty = clnt->cl_chatty;
543         return new;
544
545 out_err:
546         dprintk("RPC:       %s: returned error %d\n", __func__, err);
547         return ERR_PTR(err);
548 }
549
550 /**
551  * rpc_clone_client - Clone an RPC client structure
552  *
553  * @clnt: RPC client whose parameters are copied
554  *
555  * Returns a fresh RPC client or an ERR_PTR.
556  */
557 struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
558 {
559         struct rpc_create_args args = {
560                 .program        = clnt->cl_program,
561                 .prognumber     = clnt->cl_prog,
562                 .version        = clnt->cl_vers,
563                 .authflavor     = clnt->cl_auth->au_flavor,
564                 .client_name    = clnt->cl_principal,
565         };
566         return __rpc_clone_client(&args, clnt);
567 }
568 EXPORT_SYMBOL_GPL(rpc_clone_client);
569
570 /**
571  * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
572  *
573  * @clnt: RPC client whose parameters are copied
574  * @flavor: security flavor for new client
575  *
576  * Returns a fresh RPC client or an ERR_PTR.
577  */
578 struct rpc_clnt *
579 rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
580 {
581         struct rpc_create_args args = {
582                 .program        = clnt->cl_program,
583                 .prognumber     = clnt->cl_prog,
584                 .version        = clnt->cl_vers,
585                 .authflavor     = flavor,
586                 .client_name    = clnt->cl_principal,
587         };
588         return __rpc_clone_client(&args, clnt);
589 }
590 EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
591
592 /*
593  * Kill all tasks for the given client.
594  * XXX: kill their descendants as well?
595  */
596 void rpc_killall_tasks(struct rpc_clnt *clnt)
597 {
598         struct rpc_task *rovr;
599
600
601         if (list_empty(&clnt->cl_tasks))
602                 return;
603         dprintk("RPC:       killing all tasks for client %p\n", clnt);
604         /*
605          * Spin lock all_tasks to prevent changes...
606          */
607         spin_lock(&clnt->cl_lock);
608         list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
609                 if (!RPC_IS_ACTIVATED(rovr))
610                         continue;
611                 if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
612                         rovr->tk_flags |= RPC_TASK_KILLED;
613                         rpc_exit(rovr, -EIO);
614                         if (RPC_IS_QUEUED(rovr))
615                                 rpc_wake_up_queued_task(rovr->tk_waitqueue,
616                                                         rovr);
617                 }
618         }
619         spin_unlock(&clnt->cl_lock);
620 }
621 EXPORT_SYMBOL_GPL(rpc_killall_tasks);
622
/*
 * Properly shut down an RPC client, terminating all outstanding
 * requests.
 *
 * May sleep.  Tasks are killed repeatedly until the client's task list is
 * empty, then the caller's reference to the client is released.
 */
void rpc_shutdown_client(struct rpc_clnt *clnt)
{
	might_sleep();

	dprintk_rcu("RPC:       shutting down %s client for %s\n",
			clnt->cl_protname,
			rcu_dereference(clnt->cl_xprt)->servername);

	/*
	 * Kill whatever is on the list, wait up to one second for the list
	 * to drain (rpc_release_client() wakes destroy_wait), and retry.
	 */
	while (!list_empty(&clnt->cl_tasks)) {
		rpc_killall_tasks(clnt);
		wait_event_timeout(destroy_wait,
			list_empty(&clnt->cl_tasks), 1*HZ);
	}

	rpc_release_client(clnt);
}
EXPORT_SYMBOL_GPL(rpc_shutdown_client);
644
/*
 * Free an RPC client
 *
 * Releases everything the client owns: its reference on the parent (for
 * clones), its pipefs directory, its slot on the per-net client list, its
 * iostats and principal, its transport reference and its rpciod
 * reference, and finally the structure itself.
 */
static void
rpc_free_client(struct rpc_clnt *clnt)
{
	dprintk_rcu("RPC:       destroying %s client for %s\n",
			clnt->cl_protname,
			rcu_dereference(clnt->cl_xprt)->servername);
	/* a clone holds a reference on its parent; the root is its own parent */
	if (clnt->cl_parent != clnt)
		rpc_release_client(clnt->cl_parent);
	rpc_clnt_remove_pipedir(clnt);
	rpc_unregister_client(clnt);
	rpc_free_iostats(clnt->cl_metrics);
	kfree(clnt->cl_principal);
	clnt->cl_metrics = NULL;
	xprt_put(rcu_dereference_raw(clnt->cl_xprt));
	/* balances the rpciod_up() done in rpc_new_client() */
	rpciod_down();
	kfree(clnt);
}
665
/*
 * Release the client's auth handle, then free the client.
 *
 * Called once the client's reference count has dropped to zero.  If there
 * is no auth handle the client is freed right away.  Otherwise a temporary
 * reference is taken while the auth handle is released, and the client is
 * freed only if that temporary reference turns out to be the last one.
 */
static void
rpc_free_auth(struct rpc_clnt *clnt)
{
	if (clnt->cl_auth == NULL) {
		rpc_free_client(clnt);
		return;
	}

	/*
	 * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
	 *       release remaining GSS contexts. This mechanism ensures
	 *       that it can do so safely.
	 */
	atomic_inc(&clnt->cl_count);
	rpcauth_release(clnt->cl_auth);
	clnt->cl_auth = NULL;
	/* drop the temporary reference taken above */
	if (atomic_dec_and_test(&clnt->cl_count))
		rpc_free_client(clnt);
}
688
/*
 * Release reference to the RPC client
 *
 * If the client's task list is empty, sleepers on destroy_wait (see
 * rpc_shutdown_client()) are woken.  When the last reference is dropped
 * the client is torn down via rpc_free_auth().
 */
void
rpc_release_client(struct rpc_clnt *clnt)
{
	dprintk("RPC:       rpc_release_client(%p)\n", clnt);

	if (list_empty(&clnt->cl_tasks))
		wake_up(&destroy_wait);
	if (atomic_dec_and_test(&clnt->cl_count))
		rpc_free_auth(clnt);
}
EXPORT_SYMBOL_GPL(rpc_release_client);
703
704 /**
705  * rpc_bind_new_program - bind a new RPC program to an existing client
706  * @old: old rpc_client
707  * @program: rpc program to set
708  * @vers: rpc program version
709  *
710  * Clones the rpc client and sets up a new RPC program. This is mainly
711  * of use for enabling different RPC programs to share the same transport.
712  * The Sun NFSv2/v3 ACL protocol can do this.
713  */
714 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
715                                       const struct rpc_program *program,
716                                       u32 vers)
717 {
718         struct rpc_create_args args = {
719                 .program        = program,
720                 .prognumber     = program->number,
721                 .version        = vers,
722                 .authflavor     = old->cl_auth->au_flavor,
723                 .client_name    = old->cl_principal,
724         };
725         struct rpc_clnt *clnt;
726         int err;
727
728         clnt = __rpc_clone_client(&args, old);
729         if (IS_ERR(clnt))
730                 goto out;
731         err = rpc_ping(clnt);
732         if (err != 0) {
733                 rpc_shutdown_client(clnt);
734                 clnt = ERR_PTR(err);
735         }
736 out:
737         return clnt;
738 }
739 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
740
741 void rpc_task_release_client(struct rpc_task *task)
742 {
743         struct rpc_clnt *clnt = task->tk_client;
744
745         if (clnt != NULL) {
746                 /* Remove from client task list */
747                 spin_lock(&clnt->cl_lock);
748                 list_del(&task->tk_task);
749                 spin_unlock(&clnt->cl_lock);
750                 task->tk_client = NULL;
751
752                 rpc_release_client(clnt);
753         }
754 }
755
756 static
757 void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
758 {
759         if (clnt != NULL) {
760                 rpc_task_release_client(task);
761                 task->tk_client = clnt;
762                 atomic_inc(&clnt->cl_count);
763                 if (clnt->cl_softrtry)
764                         task->tk_flags |= RPC_TASK_SOFT;
765                 if (sk_memalloc_socks()) {
766                         struct rpc_xprt *xprt;
767
768                         rcu_read_lock();
769                         xprt = rcu_dereference(clnt->cl_xprt);
770                         if (xprt->swapper)
771                                 task->tk_flags |= RPC_TASK_SWAPPER;
772                         rcu_read_unlock();
773                 }
774                 /* Add to the client's list of all tasks */
775                 spin_lock(&clnt->cl_lock);
776                 list_add_tail(&task->tk_task, &clnt->cl_tasks);
777                 spin_unlock(&clnt->cl_lock);
778         }
779 }
780
/*
 * Move @task to @clnt: the task's current client (if any) is released,
 * then the task is attached to @clnt.  With a NULL @clnt the task is left
 * without a client, because rpc_task_set_client() ignores NULL.
 */
void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt)
{
	rpc_task_release_client(task);
	rpc_task_set_client(task, clnt);
}
EXPORT_SYMBOL_GPL(rpc_task_reset_client);
787
788
789 static void
790 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
791 {
792         if (msg != NULL) {
793                 task->tk_msg.rpc_proc = msg->rpc_proc;
794                 task->tk_msg.rpc_argp = msg->rpc_argp;
795                 task->tk_msg.rpc_resp = msg->rpc_resp;
796                 if (msg->rpc_cred != NULL)
797                         task->tk_msg.rpc_cred = get_rpccred(msg->rpc_cred);
798         }
799 }
800
/*
 * Default callback for async RPC calls
 *
 * Intentionally empty: it is installed as rpc_call_done in
 * rpc_default_ops below.
 */
static void
rpc_default_callback(struct rpc_task *task, void *data)
{
}

/* Call ops whose only hook is the empty callback; used by rpc_call_sync(). */
static const struct rpc_call_ops rpc_default_ops = {
	.rpc_call_done = rpc_default_callback,
};
812
813 /**
814  * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
815  * @task_setup_data: pointer to task initialisation data
816  */
817 struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
818 {
819         struct rpc_task *task;
820
821         task = rpc_new_task(task_setup_data);
822         if (IS_ERR(task))
823                 goto out;
824
825         rpc_task_set_client(task, task_setup_data->rpc_client);
826         rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
827
828         if (task->tk_action == NULL)
829                 rpc_call_start(task);
830
831         atomic_inc(&task->tk_count);
832         rpc_execute(task);
833 out:
834         return task;
835 }
836 EXPORT_SYMBOL_GPL(rpc_run_task);
837
838 /**
839  * rpc_call_sync - Perform a synchronous RPC call
840  * @clnt: pointer to RPC client
841  * @msg: RPC call parameters
842  * @flags: RPC call flags
843  */
844 int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
845 {
846         struct rpc_task *task;
847         struct rpc_task_setup task_setup_data = {
848                 .rpc_client = clnt,
849                 .rpc_message = msg,
850                 .callback_ops = &rpc_default_ops,
851                 .flags = flags,
852         };
853         int status;
854
855         WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
856         if (flags & RPC_TASK_ASYNC) {
857                 rpc_release_calldata(task_setup_data.callback_ops,
858                         task_setup_data.callback_data);
859                 return -EINVAL;
860         }
861
862         task = rpc_run_task(&task_setup_data);
863         if (IS_ERR(task))
864                 return PTR_ERR(task);
865         status = task->tk_status;
866         rpc_put_task(task);
867         return status;
868 }
869 EXPORT_SYMBOL_GPL(rpc_call_sync);
870
871 /**
872  * rpc_call_async - Perform an asynchronous RPC call
873  * @clnt: pointer to RPC client
874  * @msg: RPC call parameters
875  * @flags: RPC call flags
876  * @tk_ops: RPC call ops
877  * @data: user call data
878  */
879 int
880 rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
881                const struct rpc_call_ops *tk_ops, void *data)
882 {
883         struct rpc_task *task;
884         struct rpc_task_setup task_setup_data = {
885                 .rpc_client = clnt,
886                 .rpc_message = msg,
887                 .callback_ops = tk_ops,
888                 .callback_data = data,
889                 .flags = flags|RPC_TASK_ASYNC,
890         };
891
892         task = rpc_run_task(&task_setup_data);
893         if (IS_ERR(task))
894                 return PTR_ERR(task);
895         rpc_put_task(task);
896         return 0;
897 }
898 EXPORT_SYMBOL_GPL(rpc_call_async);
899
900 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
901 /**
902  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
903  * rpc_execute against it
904  * @req: RPC request
905  * @tk_ops: RPC call ops
906  */
907 struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
908                                 const struct rpc_call_ops *tk_ops)
909 {
910         struct rpc_task *task;
911         struct xdr_buf *xbufp = &req->rq_snd_buf;
912         struct rpc_task_setup task_setup_data = {
913                 .callback_ops = tk_ops,
914         };
915
916         dprintk("RPC: rpc_run_bc_task req= %p\n", req);
917         /*
918          * Create an rpc_task to send the data
919          */
920         task = rpc_new_task(&task_setup_data);
921         if (IS_ERR(task)) {
922                 xprt_free_bc_request(req);
923                 goto out;
924         }
925         task->tk_rqstp = req;
926
927         /*
928          * Set up the xdr_buf length.
929          * This also indicates that the buffer is XDR encoded already.
930          */
931         xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
932                         xbufp->tail[0].iov_len;
933
934         task->tk_action = call_bc_transmit;
935         atomic_inc(&task->tk_count);
936         WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
937         rpc_execute(task);
938
939 out:
940         dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
941         return task;
942 }
943 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
944
945 void
946 rpc_call_start(struct rpc_task *task)
947 {
948         task->tk_action = call_start;
949 }
950 EXPORT_SYMBOL_GPL(rpc_call_start);
951
952 /**
953  * rpc_peeraddr - extract remote peer address from clnt's xprt
954  * @clnt: RPC client structure
955  * @buf: target buffer
956  * @bufsize: length of target buffer
957  *
958  * Returns the number of bytes that are actually in the stored address.
959  */
960 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
961 {
962         size_t bytes;
963         struct rpc_xprt *xprt;
964
965         rcu_read_lock();
966         xprt = rcu_dereference(clnt->cl_xprt);
967
968         bytes = xprt->addrlen;
969         if (bytes > bufsize)
970                 bytes = bufsize;
971         memcpy(buf, &xprt->addr, bytes);
972         rcu_read_unlock();
973
974         return bytes;
975 }
976 EXPORT_SYMBOL_GPL(rpc_peeraddr);
977
978 /**
979  * rpc_peeraddr2str - return remote peer address in printable format
980  * @clnt: RPC client structure
981  * @format: address format
982  *
983  * NB: the lifetime of the memory referenced by the returned pointer is
984  * the same as the rpc_xprt itself.  As long as the caller uses this
985  * pointer, it must hold the RCU read lock.
986  */
987 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
988                              enum rpc_display_format_t format)
989 {
990         struct rpc_xprt *xprt;
991
992         xprt = rcu_dereference(clnt->cl_xprt);
993
994         if (xprt->address_strings[format] != NULL)
995                 return xprt->address_strings[format];
996         else
997                 return "unprintable";
998 }
999 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
1000
/*
 * Address templates used by rpc_sockname() (as the local bind address)
 * and rpc_anyaddr() (as the fallback result).
 *
 * NOTE: despite the "loopback" in their names, both carry the wildcard
 * (ANY) address, and the port is left zero-initialized.
 */
static const struct sockaddr_in rpc_inaddr_loopback = {
	.sin_family		= AF_INET,
	.sin_addr.s_addr	= htonl(INADDR_ANY),
};

static const struct sockaddr_in6 rpc_in6addr_loopback = {
	.sin6_family		= AF_INET6,
	.sin6_addr		= IN6ADDR_ANY_INIT,
};
1010
1011 /*
1012  * Try a getsockname() on a connected datagram socket.  Using a
1013  * connected datagram socket prevents leaving a socket in TIME_WAIT.
1014  * This conserves the ephemeral port number space.
1015  *
1016  * Returns zero and fills in "buf" if successful; otherwise, a
1017  * negative errno is returned.
1018  */
1019 static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1020                         struct sockaddr *buf, int buflen)
1021 {
1022         struct socket *sock;
1023         int err;
1024
1025         err = __sock_create(net, sap->sa_family,
1026                                 SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1027         if (err < 0) {
1028                 dprintk("RPC:       can't create UDP socket (%d)\n", err);
1029                 goto out;
1030         }
1031
1032         switch (sap->sa_family) {
1033         case AF_INET:
1034                 err = kernel_bind(sock,
1035                                 (struct sockaddr *)&rpc_inaddr_loopback,
1036                                 sizeof(rpc_inaddr_loopback));
1037                 break;
1038         case AF_INET6:
1039                 err = kernel_bind(sock,
1040                                 (struct sockaddr *)&rpc_in6addr_loopback,
1041                                 sizeof(rpc_in6addr_loopback));
1042                 break;
1043         default:
1044                 err = -EAFNOSUPPORT;
1045                 goto out;
1046         }
1047         if (err < 0) {
1048                 dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1049                 goto out_release;
1050         }
1051
1052         err = kernel_connect(sock, sap, salen, 0);
1053         if (err < 0) {
1054                 dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1055                 goto out_release;
1056         }
1057
1058         err = kernel_getsockname(sock, buf, &buflen);
1059         if (err < 0) {
1060                 dprintk("RPC:       getsockname failed (%d)\n", err);
1061                 goto out_release;
1062         }
1063
1064         err = 0;
1065         if (buf->sa_family == AF_INET6) {
1066                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1067                 sin6->sin6_scope_id = 0;
1068         }
1069         dprintk("RPC:       %s succeeded\n", __func__);
1070
1071 out_release:
1072         sock_release(sock);
1073 out:
1074         return err;
1075 }
1076
1077 /*
1078  * Scraping a connected socket failed, so we don't have a useable
1079  * local address.  Fallback: generate an address that will prevent
1080  * the server from calling us back.
1081  *
1082  * Returns zero and fills in "buf" if successful; otherwise, a
1083  * negative errno is returned.
1084  */
1085 static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1086 {
1087         switch (family) {
1088         case AF_INET:
1089                 if (buflen < sizeof(rpc_inaddr_loopback))
1090                         return -EINVAL;
1091                 memcpy(buf, &rpc_inaddr_loopback,
1092                                 sizeof(rpc_inaddr_loopback));
1093                 break;
1094         case AF_INET6:
1095                 if (buflen < sizeof(rpc_in6addr_loopback))
1096                         return -EINVAL;
1097                 memcpy(buf, &rpc_in6addr_loopback,
1098                                 sizeof(rpc_in6addr_loopback));
1099         default:
1100                 dprintk("RPC:       %s: address family not supported\n",
1101                         __func__);
1102                 return -EAFNOSUPPORT;
1103         }
1104         dprintk("RPC:       %s: succeeded\n", __func__);
1105         return 0;
1106 }
1107
1108 /**
1109  * rpc_localaddr - discover local endpoint address for an RPC client
1110  * @clnt: RPC client structure
1111  * @buf: target buffer
1112  * @buflen: size of target buffer, in bytes
1113  *
1114  * Returns zero and fills in "buf" and "buflen" if successful;
1115  * otherwise, a negative errno is returned.
1116  *
1117  * This works even if the underlying transport is not currently connected,
1118  * or if the upper layer never previously provided a source address.
1119  *
1120  * The result of this function call is transient: multiple calls in
1121  * succession may give different results, depending on how local
1122  * networking configuration changes over time.
1123  */
1124 int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1125 {
1126         struct sockaddr_storage address;
1127         struct sockaddr *sap = (struct sockaddr *)&address;
1128         struct rpc_xprt *xprt;
1129         struct net *net;
1130         size_t salen;
1131         int err;
1132
1133         rcu_read_lock();
1134         xprt = rcu_dereference(clnt->cl_xprt);
1135         salen = xprt->addrlen;
1136         memcpy(sap, &xprt->addr, salen);
1137         net = get_net(xprt->xprt_net);
1138         rcu_read_unlock();
1139
1140         rpc_set_port(sap, 0);
1141         err = rpc_sockname(net, sap, salen, buf, buflen);
1142         put_net(net);
1143         if (err != 0)
1144                 /* Couldn't discover local address, return ANYADDR */
1145                 return rpc_anyaddr(sap->sa_family, buf, buflen);
1146         return 0;
1147 }
1148 EXPORT_SYMBOL_GPL(rpc_localaddr);
1149
1150 void
1151 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1152 {
1153         struct rpc_xprt *xprt;
1154
1155         rcu_read_lock();
1156         xprt = rcu_dereference(clnt->cl_xprt);
1157         if (xprt->ops->set_buffer_size)
1158                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1159         rcu_read_unlock();
1160 }
1161 EXPORT_SYMBOL_GPL(rpc_setbufsize);
1162
1163 /**
1164  * rpc_protocol - Get transport protocol number for an RPC client
1165  * @clnt: RPC client to query
1166  *
1167  */
1168 int rpc_protocol(struct rpc_clnt *clnt)
1169 {
1170         int protocol;
1171
1172         rcu_read_lock();
1173         protocol = rcu_dereference(clnt->cl_xprt)->prot;
1174         rcu_read_unlock();
1175         return protocol;
1176 }
1177 EXPORT_SYMBOL_GPL(rpc_protocol);
1178
1179 /**
1180  * rpc_net_ns - Get the network namespace for this RPC client
1181  * @clnt: RPC client to query
1182  *
1183  */
1184 struct net *rpc_net_ns(struct rpc_clnt *clnt)
1185 {
1186         struct net *ret;
1187
1188         rcu_read_lock();
1189         ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1190         rcu_read_unlock();
1191         return ret;
1192 }
1193 EXPORT_SYMBOL_GPL(rpc_net_ns);
1194
1195 /**
1196  * rpc_max_payload - Get maximum payload size for a transport, in bytes
1197  * @clnt: RPC client to query
1198  *
1199  * For stream transports, this is one RPC record fragment (see RFC
1200  * 1831), as we don't support multi-record requests yet.  For datagram
1201  * transports, this is the size of an IP packet minus the IP, UDP, and
1202  * RPC header sizes.
1203  */
1204 size_t rpc_max_payload(struct rpc_clnt *clnt)
1205 {
1206         size_t ret;
1207
1208         rcu_read_lock();
1209         ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1210         rcu_read_unlock();
1211         return ret;
1212 }
1213 EXPORT_SYMBOL_GPL(rpc_max_payload);
1214
1215 /**
1216  * rpc_get_timeout - Get timeout for transport in units of HZ
1217  * @clnt: RPC client to query
1218  */
1219 unsigned long rpc_get_timeout(struct rpc_clnt *clnt)
1220 {
1221         unsigned long ret;
1222
1223         rcu_read_lock();
1224         ret = rcu_dereference(clnt->cl_xprt)->timeout->to_initval;
1225         rcu_read_unlock();
1226         return ret;
1227 }
1228 EXPORT_SYMBOL_GPL(rpc_get_timeout);
1229
1230 /**
1231  * rpc_force_rebind - force transport to check that remote port is unchanged
1232  * @clnt: client to rebind
1233  *
1234  */
1235 void rpc_force_rebind(struct rpc_clnt *clnt)
1236 {
1237         if (clnt->cl_autobind) {
1238                 rcu_read_lock();
1239                 xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1240                 rcu_read_unlock();
1241         }
1242 }
1243 EXPORT_SYMBOL_GPL(rpc_force_rebind);
1244
1245 /*
1246  * Restart an (async) RPC call from the call_prepare state.
1247  * Usually called from within the exit handler.
1248  */
1249 int
1250 rpc_restart_call_prepare(struct rpc_task *task)
1251 {
1252         if (RPC_ASSASSINATED(task))
1253                 return 0;
1254         task->tk_action = call_start;
1255         if (task->tk_ops->rpc_call_prepare != NULL)
1256                 task->tk_action = rpc_prepare_task;
1257         return 1;
1258 }
1259 EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1260
1261 /*
1262  * Restart an (async) RPC call. Usually called from within the
1263  * exit handler.
1264  */
1265 int
1266 rpc_restart_call(struct rpc_task *task)
1267 {
1268         if (RPC_ASSASSINATED(task))
1269                 return 0;
1270         task->tk_action = call_start;
1271         return 1;
1272 }
1273 EXPORT_SYMBOL_GPL(rpc_restart_call);
1274
1275 #ifdef RPC_DEBUG
1276 static const char *rpc_proc_name(const struct rpc_task *task)
1277 {
1278         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1279
1280         if (proc) {
1281                 if (proc->p_name)
1282                         return proc->p_name;
1283                 else
1284                         return "NULL";
1285         } else
1286                 return "no proc";
1287 }
1288 #endif
1289
1290 /*
1291  * 0.  Initial state
1292  *
1293  *     Other FSM states can be visited zero or more times, but
1294  *     this state is visited exactly once for each RPC.
1295  */
1296 static void
1297 call_start(struct rpc_task *task)
1298 {
1299         struct rpc_clnt *clnt = task->tk_client;
1300
1301         dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
1302                         clnt->cl_protname, clnt->cl_vers,
1303                         rpc_proc_name(task),
1304                         (RPC_IS_ASYNC(task) ? "async" : "sync"));
1305
1306         /* Increment call count */
1307         task->tk_msg.rpc_proc->p_count++;
1308         clnt->cl_stats->rpccnt++;
1309         task->tk_action = call_reserve;
1310 }
1311
1312 /*
1313  * 1.   Reserve an RPC call slot
1314  */
1315 static void
1316 call_reserve(struct rpc_task *task)
1317 {
1318         dprint_status(task);
1319
1320         task->tk_status  = 0;
1321         task->tk_action  = call_reserveresult;
1322         xprt_reserve(task);
1323 }
1324
1325 static void call_retry_reserve(struct rpc_task *task);
1326
/*
 * 1b.  Grok the result of xprt_reserve()
 *
 *      Success with a request slot moves on to call_refresh.  -ENOMEM
 *      (after a delay) and -EAGAIN retry via call_retry_reserve.  Every
 *      other outcome exits the task: with -EIO when success was reported
 *      but no slot is attached, otherwise with the error itself.
 */
static void
call_reserveresult(struct rpc_task *task)
{
	int status = task->tk_status;

	dprint_status(task);

	/*
	 * After a call to xprt_reserve(), we must have either
	 * a request slot or else an error status.
	 */
	task->tk_status = 0;
	if (status >= 0) {
		if (task->tk_rqstp) {
			task->tk_action = call_refresh;
			return;
		}

		printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
				__func__, status);
		rpc_exit(task, -EIO);
		return;
	}

	/*
	 * Even though there was an error, we may have acquired
	 * a request slot somehow.  Make sure not to leak it.
	 */
	if (task->tk_rqstp) {
		printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
				__func__, status);
		xprt_release(task);
	}

	switch (status) {
	case -ENOMEM:
		rpc_delay(task, HZ >> 2);
		/* fall through: retry once the delay has elapsed */
	case -EAGAIN:	/* woken up; retry */
		task->tk_action = call_retry_reserve;
		return;
	case -EIO:	/* probably a shutdown */
		break;
	default:
		printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
				__func__, status);
		break;
	}
	rpc_exit(task, status);
}
1379
1380 /*
1381  * 1c.  Retry reserving an RPC call slot
1382  */
1383 static void
1384 call_retry_reserve(struct rpc_task *task)
1385 {
1386         dprint_status(task);
1387
1388         task->tk_status  = 0;
1389         task->tk_action  = call_reserveresult;
1390         xprt_retry_reserve(task);
1391 }
1392
1393 /*
1394  * 2.   Bind and/or refresh the credentials
1395  */
1396 static void
1397 call_refresh(struct rpc_task *task)
1398 {
1399         dprint_status(task);
1400
1401         task->tk_action = call_refreshresult;
1402         task->tk_status = 0;
1403         task->tk_client->cl_stats->rpcauthrefresh++;
1404         rpcauth_refreshcred(task);
1405 }
1406
/*
 * 2a.  Process the results of a credential refresh
 *
 *      The next action defaults to another call_refresh.  On success
 *      with up-to-date credentials the task moves on to call_allocate.
 *      -ETIMEDOUT (after a delay), -EKEYEXPIRED and -EAGAIN consume one
 *      unit of tk_cred_retry and retry; once that budget is exhausted
 *      the task exits with -EACCES.  Any other error exits with that
 *      error.
 */
static void
call_refreshresult(struct rpc_task *task)
{
	int status = task->tk_status;

	dprint_status(task);

	task->tk_status = 0;
	task->tk_action = call_refresh;
	switch (status) {
	case 0:
		/* Stay on call_refresh unless the cred is now up to date. */
		if (rpcauth_uptodatecred(task))
			task->tk_action = call_allocate;
		return;
	case -ETIMEDOUT:
		rpc_delay(task, 3*HZ);
		/* fall through: a timeout is retried like the cases below */
	case -EKEYEXPIRED:
	case -EAGAIN:
		/* Reported as -EACCES if no retries are left. */
		status = -EACCES;
		if (!task->tk_cred_retry)
			break;
		task->tk_cred_retry--;
		dprintk("RPC: %5u %s: retry refresh creds\n",
				task->tk_pid, __func__);
		return;
	}
	dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
				task->tk_pid, __func__, status);
	rpc_exit(task, status);
}
1440
1441 /*
1442  * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1443  *      (Note: buffer memory is freed in xprt_release).
1444  */
1445 static void
1446 call_allocate(struct rpc_task *task)
1447 {
1448         unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
1449         struct rpc_rqst *req = task->tk_rqstp;
1450         struct rpc_xprt *xprt = req->rq_xprt;
1451         struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1452
1453         dprint_status(task);
1454
1455         task->tk_status = 0;
1456         task->tk_action = call_bind;
1457
1458         if (req->rq_buffer)
1459                 return;
1460
1461         if (proc->p_proc != 0) {
1462                 BUG_ON(proc->p_arglen == 0);
1463                 if (proc->p_decode != NULL)
1464                         BUG_ON(proc->p_replen == 0);
1465         }
1466
1467         /*
1468          * Calculate the size (in quads) of the RPC call
1469          * and reply headers, and convert both values
1470          * to byte sizes.
1471          */
1472         req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
1473         req->rq_callsize <<= 2;
1474         req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
1475         req->rq_rcvsize <<= 2;
1476
1477         req->rq_buffer = xprt->ops->buf_alloc(task,
1478                                         req->rq_callsize + req->rq_rcvsize);
1479         if (req->rq_buffer != NULL)
1480                 return;
1481
1482         dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
1483
1484         if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1485                 task->tk_action = call_allocate;
1486                 rpc_delay(task, HZ>>4);
1487                 return;
1488         }
1489
1490         rpc_exit(task, -ERESTARTSYS);
1491 }
1492
1493 static inline int
1494 rpc_task_need_encode(struct rpc_task *task)
1495 {
1496         return task->tk_rqstp->rq_snd_buf.len == 0;
1497 }
1498
1499 static inline void
1500 rpc_task_force_reencode(struct rpc_task *task)
1501 {
1502         task->tk_rqstp->rq_snd_buf.len = 0;
1503         task->tk_rqstp->rq_bytes_sent = 0;
1504 }
1505
1506 static inline void
1507 rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
1508 {
1509         buf->head[0].iov_base = start;
1510         buf->head[0].iov_len = len;
1511         buf->tail[0].iov_len = 0;
1512         buf->page_len = 0;
1513         buf->flags = 0;
1514         buf->len = 0;
1515         buf->buflen = len;
1516 }
1517
1518 /*
1519  * 3.   Encode arguments of an RPC call
1520  */
1521 static void
1522 rpc_xdr_encode(struct rpc_task *task)
1523 {
1524         struct rpc_rqst *req = task->tk_rqstp;
1525         kxdreproc_t     encode;
1526         __be32          *p;
1527
1528         dprint_status(task);
1529
1530         rpc_xdr_buf_init(&req->rq_snd_buf,
1531                          req->rq_buffer,
1532                          req->rq_callsize);
1533         rpc_xdr_buf_init(&req->rq_rcv_buf,
1534                          (char *)req->rq_buffer + req->rq_callsize,
1535                          req->rq_rcvsize);
1536
1537         p = rpc_encode_header(task);
1538         if (p == NULL) {
1539                 printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
1540                 rpc_exit(task, -EIO);
1541                 return;
1542         }
1543
1544         encode = task->tk_msg.rpc_proc->p_encode;
1545         if (encode == NULL)
1546                 return;
1547
1548         task->tk_status = rpcauth_wrap_req(task, encode, req, p,
1549                         task->tk_msg.rpc_argp);
1550 }
1551
1552 /*
1553  * 4.   Get the server port number if not yet set
1554  */
1555 static void
1556 call_bind(struct rpc_task *task)
1557 {
1558         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1559
1560         dprint_status(task);
1561
1562         task->tk_action = call_connect;
1563         if (!xprt_bound(xprt)) {
1564                 task->tk_action = call_bind_status;
1565                 task->tk_timeout = xprt->bind_timeout;
1566                 xprt->ops->rpcbind(task);
1567         }
1568 }
1569
/*
 * 4a.  Sort out bind result
 *
 *      A non-negative status moves on to call_connect.  Errors either
 *      route the task to call_timeout (the "retry_timeout" label),
 *      restart call_bind, or exit the task.  The exit status is -EIO
 *      unless a case below overrides it.
 */
static void
call_bind_status(struct rpc_task *task)
{
	/* Exit status used by every "break" that doesn't set its own. */
	int status = -EIO;

	if (task->tk_status >= 0) {
		dprint_status(task);
		task->tk_status = 0;
		task->tk_action = call_connect;
		return;
	}

	trace_rpc_bind_status(task);
	switch (task->tk_status) {
	case -ENOMEM:
		dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
		rpc_delay(task, HZ >> 2);
		goto retry_timeout;
	case -EACCES:
		dprintk("RPC: %5u remote rpcbind: RPC program/version "
				"unavailable\n", task->tk_pid);
		/* fail immediately if this is an RPC ping */
		if (task->tk_msg.rpc_proc->p_proc == 0) {
			status = -EOPNOTSUPP;
			break;
		}
		/* Out of rebind retries: exit with the default -EIO. */
		if (task->tk_rebind_retry == 0)
			break;
		task->tk_rebind_retry--;
		rpc_delay(task, 3*HZ);
		goto retry_timeout;
	case -ETIMEDOUT:
		dprintk("RPC: %5u rpcbind request timed out\n",
				task->tk_pid);
		goto retry_timeout;
	case -EPFNOSUPPORT:
		/* server doesn't support any rpcbind version we know of */
		dprintk("RPC: %5u unrecognized remote rpcbind service\n",
				task->tk_pid);
		break;
	case -EPROTONOSUPPORT:
		dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
				task->tk_pid);
		/* Go around again through call_bind with a clean status. */
		task->tk_status = 0;
		task->tk_action = call_bind;
		return;
	case -ECONNREFUSED:		/* connection problems */
	case -ECONNRESET:
	case -ENOTCONN:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPIPE:
		dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
				task->tk_pid, task->tk_status);
		/* Soft-connect tasks exit with the error instead of retrying. */
		if (!RPC_IS_SOFTCONN(task)) {
			rpc_delay(task, 5*HZ);
			goto retry_timeout;
		}
		status = task->tk_status;
		break;
	default:
		dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
				task->tk_pid, -task->tk_status);
	}

	rpc_exit(task, status);
	return;

retry_timeout:
	task->tk_action = call_timeout;
}
1645
1646 /*
1647  * 4b.  Connect to the RPC server
1648  */
1649 static void
1650 call_connect(struct rpc_task *task)
1651 {
1652         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1653
1654         dprintk("RPC: %5u call_connect xprt %p %s connected\n",
1655                         task->tk_pid, xprt,
1656                         (xprt_connected(xprt) ? "is" : "is not"));
1657
1658         task->tk_action = call_transmit;
1659         if (!xprt_connected(xprt)) {
1660                 task->tk_action = call_connect_status;
1661                 if (task->tk_status < 0)
1662                         return;
1663                 if (task->tk_flags & RPC_TASK_NOCONNECT) {
1664                         rpc_exit(task, -ENOTCONN);
1665                         return;
1666                 }
1667                 xprt_connect(task);
1668         }
1669 }
1670
/*
 * 4c.  Sort out connect result
 *
 *      -ETIMEDOUT goes to call_timeout.  Success and -EAGAIN proceed to
 *      call_transmit, as do connection-refused/reset/unreachable errors
 *      unless the task is soft-connect.  Everything else exits the task
 *      with the connect status.
 */
static void
call_connect_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	int status = task->tk_status;

	dprint_status(task);

	trace_rpc_connect_status(task, status);
	switch (status) {
		/* if soft mounted, test if we've timed out */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		return;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENETUNREACH:
		/* Soft-connect tasks exit below with this error. */
		if (RPC_IS_SOFTCONN(task))
			break;
		/* retry with existing socket, after a delay */
		/* fall through */
	case 0:
	case -EAGAIN:
		task->tk_status = 0;
		/* Counted on every pass through here, including status 0. */
		clnt->cl_stats->netreconn++;
		task->tk_action = call_transmit;
		return;
	}
	rpc_exit(task, status);
}
1703
/*
 * 5.   Transmit the RPC request, and wait for reply
 *
 *      A pending error, or a non-zero result from
 *      xprt_prepare_transmit(), is left for call_status.  Otherwise the
 *      request is encoded if needed and handed to xprt_transmit().  A
 *      task for which no reply is expected is set to finish via
 *      rpc_exit_task once transmission succeeds.
 */
static void
call_transmit(struct rpc_task *task)
{
	dprint_status(task);

	task->tk_action = call_status;
	if (task->tk_status < 0)
		return;
	task->tk_status = xprt_prepare_transmit(task);
	if (task->tk_status != 0)
		return;
	task->tk_action = call_transmit_status;
	/* Encode here so that rpcsec_gss can use correct sequence number. */
	if (rpc_task_need_encode(task)) {
		rpc_xdr_encode(task);
		/* Did the encode result in an error condition? */
		if (task->tk_status != 0) {
			/* Was the error nonfatal? */
			if (task->tk_status == -EAGAIN)
				rpc_delay(task, HZ >> 4);
			else
				rpc_exit(task, task->tk_status);
			return;
		}
	}
	xprt_transmit(task);
	/* On error, call_transmit_status runs as the next action. */
	if (task->tk_status < 0)
		return;
	/*
	 * On success, ensure that we call xprt_end_transmit() before sleeping
	 * in order to allow access to the socket to other RPC requests.
	 */
	call_transmit_status(task);
	if (rpc_reply_expected(task))
		return;
	/* No reply expected: finish the task and wake it from "pending". */
	task->tk_action = rpc_exit_task;
	rpc_wake_up_queued_task(&task->tk_rqstp->rq_xprt->pending, task);
}
1745
/*
 * 5a.  Handle cleanup after a transmission
 *
 *      The next action is always call_status.  What differs per status
 *      is whether xprt_end_transmit() is called here and whether the
 *      request is marked for re-encoding.
 */
static void
call_transmit_status(struct rpc_task *task)
{
	task->tk_action = call_status;

	/*
	 * Common case: success.  Force the compiler to put this
	 * test first.
	 */
	if (task->tk_status == 0) {
		xprt_end_transmit(task);
		rpc_task_force_reencode(task);
		return;
	}

	/* NB: "default" sits in the middle of this switch on purpose. */
	switch (task->tk_status) {
	case -EAGAIN:
		/* Nothing to do here; neither end-transmit nor re-encode. */
		break;
	default:
		dprint_status(task);
		xprt_end_transmit(task);
		rpc_task_force_reencode(task);
		break;
		/*
		 * Special cases: if we've been waiting on the
		 * socket's write_space() callback, or if the
		 * socket just returned a connection error,
		 * then hold onto the transport lock.
		 */
	case -ECONNREFUSED:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
		/* Soft-connect tasks give up immediately with this error. */
		if (RPC_IS_SOFTCONN(task)) {
			xprt_end_transmit(task);
			rpc_exit(task, task->tk_status);
			break;
		}
		/* fall through */
	case -ECONNRESET:
	case -ENOTCONN:
	case -EPIPE:
		rpc_task_force_reencode(task);
	}
}
1793
1794 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
/*
 * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
 * addition, disconnect on connectivity errors.
 *
 * If xprt_prepare_transmit() returns -EAGAIN, this state is re-entered
 * later.  In every other case the task's next action is rpc_exit_task.
 */
static void
call_bc_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	task->tk_status = xprt_prepare_transmit(task);
	if (task->tk_status == -EAGAIN) {
		/*
		 * Could not reserve the transport. Try again after the
		 * transport is released.
		 */
		task->tk_status = 0;
		task->tk_action = call_bc_transmit;
		return;
	}

	task->tk_action = rpc_exit_task;
	if (task->tk_status < 0) {
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		return;
	}

	xprt_transmit(task);
	/* Unlike call_transmit_status(), always end the transmit here. */
	xprt_end_transmit(task);
	dprint_status(task);
	switch (task->tk_status) {
	case 0:
		/* Success */
		break;
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -ETIMEDOUT:
		/*
		 * Problem reaching the server.  Disconnect and let the
		 * forechannel reestablish the connection.  The server will
		 * have to retransmit the backchannel request and we'll
		 * reprocess it.  Since these ops are idempotent, there's no
		 * need to cache our reply at this time.
		 */
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		xprt_conditional_disconnect(req->rq_xprt,
			req->rq_connect_cookie);
		break;
	default:
		/*
		 * We were unable to reply and will have to drop the
		 * request.  The server should reconnect and retransmit.
		 */
		WARN_ON_ONCE(task->tk_status == -EAGAIN);
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		break;
	}
	rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
}
1857 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1858
/*
 * 6.   Examine the outcome of the transmit/receive step and select the
 *      next state: decode the reply, retry, rebind, or fail the task.
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rqst *rqst = task->tk_rqstp;
	int err;

	/* A reply is in and nothing remains unsent: report its length. */
	if (rqst->rq_reply_bytes_recvd > 0 && rqst->rq_bytes_sent == 0)
		task->tk_status = rqst->rq_reply_bytes_recvd;

	dprint_status(task);

	err = task->tk_status;
	if (err >= 0) {
		task->tk_action = call_decode;
		return;
	}

	trace_rpc_call_status(task);
	task->tk_status = 0;

	switch (err) {
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -ETIMEDOUT:
		/*
		 * Unreachable host/network: hold off for 3 seconds first,
		 * then treat the failure exactly like a timeout.
		 */
		if (err != -ETIMEDOUT)
			rpc_delay(task, 3*HZ);
		task->tk_action = call_timeout;
		if (task->tk_client->cl_discrtry)
			xprt_conditional_disconnect(rqst->rq_xprt,
						    rqst->rq_connect_cookie);
		break;
	case -ECONNRESET:
	case -ECONNREFUSED:
	case -EPIPE:
	case -ENOTCONN:
		/*
		 * Reset/refused connections additionally force a rebind and
		 * wait 3 seconds; all four errors restart from call_bind.
		 */
		if (err == -ECONNRESET || err == -ECONNREFUSED) {
			rpc_force_rebind(clnt);
			rpc_delay(task, 3*HZ);
		}
		task->tk_action = call_bind;
		break;
	case -EAGAIN:
		task->tk_action = call_transmit;
		break;
	case -EIO:
		/* shutdown or soft timeout */
		rpc_exit(task, err);
		break;
	default:
		if (clnt->cl_chatty)
			printk("%s: RPC call returned error %d\n",
			       clnt->cl_protname, -err);
		rpc_exit(task, err);
		break;
	}
}
1919
1920 /*
1921  * 6a.  Handle RPC timeout
1922  *      We do not release the request slot, so we keep using the
1923  *      same XID for all retransmits.
1924  */
1925 static void
1926 call_timeout(struct rpc_task *task)
1927 {
1928         struct rpc_clnt *clnt = task->tk_client;
1929
1930         if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
1931                 dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
1932                 goto retry;
1933         }
1934
1935         dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
1936         task->tk_timeouts++;
1937
1938         if (RPC_IS_SOFTCONN(task)) {
1939                 rpc_exit(task, -ETIMEDOUT);
1940                 return;
1941         }
1942         if (RPC_IS_SOFT(task)) {
1943                 if (clnt->cl_chatty) {
1944                         rcu_read_lock();
1945                         printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
1946                                 clnt->cl_protname,
1947                                 rcu_dereference(clnt->cl_xprt)->servername);
1948                         rcu_read_unlock();
1949                 }
1950                 if (task->tk_flags & RPC_TASK_TIMEOUT)
1951                         rpc_exit(task, -ETIMEDOUT);
1952                 else
1953                         rpc_exit(task, -EIO);
1954                 return;
1955         }
1956
1957         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
1958                 task->tk_flags |= RPC_CALL_MAJORSEEN;
1959                 if (clnt->cl_chatty) {
1960                         rcu_read_lock();
1961                         printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
1962                         clnt->cl_protname,
1963                         rcu_dereference(clnt->cl_xprt)->servername);
1964                         rcu_read_unlock();
1965                 }
1966         }
1967         rpc_force_rebind(clnt);
1968         /*
1969          * Did our request time out due to an RPCSEC_GSS out-of-sequence
1970          * event? RFC2203 requires the server to drop all such requests.
1971          */
1972         rpcauth_invalcred(task);
1973
1974 retry:
1975         clnt->cl_stats->rpcretrans++;
1976         task->tk_action = call_bind;
1977         task->tk_status = 0;
1978 }
1979
1980 /*
1981  * 7.   Decode the RPC reply
1982  */
1983 static void
1984 call_decode(struct rpc_task *task)
1985 {
1986         struct rpc_clnt *clnt = task->tk_client;
1987         struct rpc_rqst *req = task->tk_rqstp;
1988         kxdrdproc_t     decode = task->tk_msg.rpc_proc->p_decode;
1989         __be32          *p;
1990
1991         dprint_status(task);
1992
1993         if (task->tk_flags & RPC_CALL_MAJORSEEN) {
1994                 if (clnt->cl_chatty) {
1995                         rcu_read_lock();
1996                         printk(KERN_NOTICE "%s: server %s OK\n",
1997                                 clnt->cl_protname,
1998                                 rcu_dereference(clnt->cl_xprt)->servername);
1999                         rcu_read_unlock();
2000                 }
2001                 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2002         }
2003
2004         /*
2005          * Ensure that we see all writes made by xprt_complete_rqst()
2006          * before it changed req->rq_reply_bytes_recvd.
2007          */
2008         smp_rmb();
2009         req->rq_rcv_buf.len = req->rq_private_buf.len;
2010
2011         /* Check that the softirq receive buffer is valid */
2012         WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2013                                 sizeof(req->rq_rcv_buf)) != 0);
2014
2015         if (req->rq_rcv_buf.len < 12) {
2016                 if (!RPC_IS_SOFT(task)) {
2017                         task->tk_action = call_bind;
2018                         clnt->cl_stats->rpcretrans++;
2019                         goto out_retry;
2020                 }
2021                 dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
2022                                 clnt->cl_protname, task->tk_status);
2023                 task->tk_action = call_timeout;
2024                 goto out_retry;
2025         }
2026
2027         p = rpc_verify_header(task);
2028         if (IS_ERR(p)) {
2029                 if (p == ERR_PTR(-EAGAIN))
2030                         goto out_retry;
2031                 return;
2032         }
2033
2034         task->tk_action = rpc_exit_task;
2035
2036         if (decode) {
2037                 task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
2038                                                       task->tk_msg.rpc_resp);
2039         }
2040         dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
2041                         task->tk_status);
2042         return;
2043 out_retry:
2044         task->tk_status = 0;
2045         /* Note: rpc_verify_header() may have freed the RPC slot */
2046         if (task->tk_rqstp == req) {
2047                 req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
2048                 if (task->tk_client->cl_discrtry)
2049                         xprt_conditional_disconnect(req->rq_xprt,
2050                                         req->rq_connect_cookie);
2051         }
2052 }
2053
2054 static __be32 *
2055 rpc_encode_header(struct rpc_task *task)
2056 {
2057         struct rpc_clnt *clnt = task->tk_client;
2058         struct rpc_rqst *req = task->tk_rqstp;
2059         __be32          *p = req->rq_svec[0].iov_base;
2060
2061         /* FIXME: check buffer size? */
2062
2063         p = xprt_skip_transport_header(req->rq_xprt, p);
2064         *p++ = req->rq_xid;             /* XID */
2065         *p++ = htonl(RPC_CALL);         /* CALL */
2066         *p++ = htonl(RPC_VERSION);      /* RPC version */
2067         *p++ = htonl(clnt->cl_prog);    /* program number */
2068         *p++ = htonl(clnt->cl_vers);    /* program version */
2069         *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
2070         p = rpcauth_marshcred(task, p);
2071         req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
2072         return p;
2073 }
2074
2075 static __be32 *
2076 rpc_verify_header(struct rpc_task *task)
2077 {
2078         struct rpc_clnt *clnt = task->tk_client;
2079         struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
2080         int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
2081         __be32  *p = iov->iov_base;
2082         u32 n;
2083         int error = -EACCES;
2084
2085         if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
2086                 /* RFC-1014 says that the representation of XDR data must be a
2087                  * multiple of four bytes
2088                  * - if it isn't pointer subtraction in the NFS client may give
2089                  *   undefined results
2090                  */
2091                 dprintk("RPC: %5u %s: XDR representation not a multiple of"
2092                        " 4 bytes: 0x%x\n", task->tk_pid, __func__,
2093                        task->tk_rqstp->rq_rcv_buf.len);
2094                 goto out_eio;
2095         }
2096         if ((len -= 3) < 0)
2097                 goto out_overflow;
2098
2099         p += 1; /* skip XID */
2100         if ((n = ntohl(*p++)) != RPC_REPLY) {
2101                 dprintk("RPC: %5u %s: not an RPC reply: %x\n",
2102                         task->tk_pid, __func__, n);
2103                 goto out_garbage;
2104         }
2105
2106         if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
2107                 if (--len < 0)
2108                         goto out_overflow;
2109                 switch ((n = ntohl(*p++))) {
2110                 case RPC_AUTH_ERROR:
2111                         break;
2112                 case RPC_MISMATCH:
2113                         dprintk("RPC: %5u %s: RPC call version mismatch!\n",
2114                                 task->tk_pid, __func__);
2115                         error = -EPROTONOSUPPORT;
2116                         goto out_err;
2117                 default:
2118                         dprintk("RPC: %5u %s: RPC call rejected, "
2119                                 "unknown error: %x\n",
2120                                 task->tk_pid, __func__, n);
2121                         goto out_eio;
2122                 }
2123                 if (--len < 0)
2124                         goto out_overflow;
2125                 switch ((n = ntohl(*p++))) {
2126                 case RPC_AUTH_REJECTEDCRED:
2127                 case RPC_AUTH_REJECTEDVERF:
2128                 case RPCSEC_GSS_CREDPROBLEM:
2129                 case RPCSEC_GSS_CTXPROBLEM:
2130                         if (!task->tk_cred_retry)
2131                                 break;
2132                         task->tk_cred_retry--;
2133                         dprintk("RPC: %5u %s: retry stale creds\n",
2134                                         task->tk_pid, __func__);
2135                         rpcauth_invalcred(task);
2136                         /* Ensure we obtain a new XID! */
2137                         xprt_release(task);
2138                         task->tk_action = call_reserve;
2139                         goto out_retry;
2140                 case RPC_AUTH_BADCRED:
2141                 case RPC_AUTH_BADVERF:
2142                         /* possibly garbled cred/verf? */
2143                         if (!task->tk_garb_retry)
2144                                 break;
2145                         task->tk_garb_retry--;
2146                         dprintk("RPC: %5u %s: retry garbled creds\n",
2147                                         task->tk_pid, __func__);
2148                         task->tk_action = call_bind;
2149                         goto out_retry;
2150                 case RPC_AUTH_TOOWEAK:
2151                         rcu_read_lock();
2152                         printk(KERN_NOTICE "RPC: server %s requires stronger "
2153                                "authentication.\n",
2154                                rcu_dereference(clnt->cl_xprt)->servername);
2155                         rcu_read_unlock();
2156                         break;
2157                 default:
2158                         dprintk("RPC: %5u %s: unknown auth error: %x\n",
2159                                         task->tk_pid, __func__, n);
2160                         error = -EIO;
2161                 }
2162                 dprintk("RPC: %5u %s: call rejected %d\n",
2163                                 task->tk_pid, __func__, n);
2164                 goto out_err;
2165         }
2166         if (!(p = rpcauth_checkverf(task, p))) {
2167                 dprintk("RPC: %5u %s: auth check failed\n",
2168                                 task->tk_pid, __func__);
2169                 goto out_garbage;               /* bad verifier, retry */
2170         }
2171         len = p - (__be32 *)iov->iov_base - 1;
2172         if (len < 0)
2173                 goto out_overflow;
2174         switch ((n = ntohl(*p++))) {
2175         case RPC_SUCCESS:
2176                 return p;
2177         case RPC_PROG_UNAVAIL:
2178                 dprintk_rcu("RPC: %5u %s: program %u is unsupported "
2179                                 "by server %s\n", task->tk_pid, __func__,
2180                                 (unsigned int)clnt->cl_prog,
2181                                 rcu_dereference(clnt->cl_xprt)->servername);
2182                 error = -EPFNOSUPPORT;
2183                 goto out_err;
2184         case RPC_PROG_MISMATCH:
2185                 dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported "
2186                                 "by server %s\n", task->tk_pid, __func__,
2187                                 (unsigned int)clnt->cl_prog,
2188                                 (unsigned int)clnt->cl_vers,
2189                                 rcu_dereference(clnt->cl_xprt)->servername);
2190                 error = -EPROTONOSUPPORT;
2191                 goto out_err;
2192         case RPC_PROC_UNAVAIL:
2193                 dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, "
2194                                 "version %u on server %s\n",
2195                                 task->tk_pid, __func__,
2196                                 rpc_proc_name(task),
2197                                 clnt->cl_prog, clnt->cl_vers,
2198                                 rcu_dereference(clnt->cl_xprt)->servername);
2199                 error = -EOPNOTSUPP;
2200                 goto out_err;
2201         case RPC_GARBAGE_ARGS:
2202                 dprintk("RPC: %5u %s: server saw garbage\n",
2203                                 task->tk_pid, __func__);
2204                 break;                  /* retry */
2205         default:
2206                 dprintk("RPC: %5u %s: server accept status: %x\n",
2207                                 task->tk_pid, __func__, n);
2208                 /* Also retry */
2209         }
2210
2211 out_garbage:
2212         clnt->cl_stats->rpcgarbage++;
2213         if (task->tk_garb_retry) {
2214                 task->tk_garb_retry--;
2215                 dprintk("RPC: %5u %s: retrying\n",
2216                                 task->tk_pid, __func__);
2217                 task->tk_action = call_bind;
2218 out_retry:
2219                 return ERR_PTR(-EAGAIN);
2220         }
2221 out_eio:
2222         error = -EIO;
2223 out_err:
2224         rpc_exit(task, error);
2225         dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
2226                         __func__, error);
2227         return ERR_PTR(error);
2228 out_overflow:
2229         dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
2230                         __func__);
2231         goto out_garbage;
2232 }
2233
/* Argument encoder for the NULL procedure: there is nothing to encode. */
static void
rpcproc_encode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
{
}
2237
/* Result decoder for the NULL procedure: no data to decode, always 0. */
static int
rpcproc_decode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
{
	return 0;
}
2242
2243 static struct rpc_procinfo rpcproc_null = {
2244         .p_encode = rpcproc_encode_null,
2245         .p_decode = rpcproc_decode_null,
2246 };
2247
2248 static int rpc_ping(struct rpc_clnt *clnt)
2249 {
2250         struct rpc_message msg = {
2251                 .rpc_proc = &rpcproc_null,
2252         };
2253         int err;
2254         msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2255         err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN);
2256         put_rpccred(msg.rpc_cred);
2257         return err;
2258 }
2259
2260 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2261 {
2262         struct rpc_message msg = {
2263                 .rpc_proc = &rpcproc_null,
2264                 .rpc_cred = cred,
2265         };
2266         struct rpc_task_setup task_setup_data = {
2267                 .rpc_client = clnt,
2268                 .rpc_message = &msg,
2269                 .callback_ops = &rpc_default_ops,
2270                 .flags = flags,
2271         };
2272         return rpc_run_task(&task_setup_data);
2273 }
2274 EXPORT_SYMBOL_GPL(rpc_call_null);
2275
2276 #ifdef RPC_DEBUG
2277 static void rpc_show_header(void)
2278 {
2279         printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
2280                 "-timeout ---ops--\n");
2281 }
2282
2283 static void rpc_show_task(const struct rpc_clnt *clnt,
2284                           const struct rpc_task *task)
2285 {
2286         const char *rpc_waitq = "none";
2287
2288         if (RPC_IS_QUEUED(task))
2289                 rpc_waitq = rpc_qname(task->tk_waitqueue);
2290
2291         printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
2292                 task->tk_pid, task->tk_flags, task->tk_status,
2293                 clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
2294                 clnt->cl_protname, clnt->cl_vers, rpc_proc_name(task),
2295                 task->tk_action, rpc_waitq);
2296 }
2297
2298 void rpc_show_tasks(struct net *net)
2299 {
2300         struct rpc_clnt *clnt;
2301         struct rpc_task *task;
2302         int header = 0;
2303         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
2304
2305         spin_lock(&sn->rpc_client_lock);
2306         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
2307                 spin_lock(&clnt->cl_lock);
2308                 list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
2309                         if (!header) {
2310                                 rpc_show_header();
2311                                 header++;
2312                         }
2313                         rpc_show_task(clnt, task);
2314                 }
2315                 spin_unlock(&clnt->cl_lock);
2316         }
2317         spin_unlock(&sn->rpc_client_lock);
2318 }
2319 #endif