call.cc 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769
  1. /*
  2. *
  3. * Copyright 2015, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. #include <memory>
  34. #include <vector>
  35. #include <map>
  36. #include <node.h>
  37. #include "grpc/support/log.h"
  38. #include "grpc/grpc.h"
  39. #include "grpc/grpc_security.h"
  40. #include "grpc/support/alloc.h"
  41. #include "grpc/support/time.h"
  42. #include "byte_buffer.h"
  43. #include "call.h"
  44. #include "channel.h"
  45. #include "completion_queue_async_worker.h"
  46. #include "call_credentials.h"
  47. #include "timeval.h"
  48. using std::unique_ptr;
  49. using std::shared_ptr;
  50. using std::vector;
  51. namespace grpc {
  52. namespace node {
  53. using Nan::Callback;
  54. using Nan::EscapableHandleScope;
  55. using Nan::HandleScope;
  56. using Nan::Maybe;
  57. using Nan::MaybeLocal;
  58. using Nan::ObjectWrap;
  59. using Nan::Persistent;
  60. using Nan::Utf8String;
  61. using v8::Array;
  62. using v8::Boolean;
  63. using v8::Exception;
  64. using v8::External;
  65. using v8::Function;
  66. using v8::FunctionTemplate;
  67. using v8::Integer;
  68. using v8::Local;
  69. using v8::Number;
  70. using v8::Object;
  71. using v8::ObjectTemplate;
  72. using v8::Uint32;
  73. using v8::String;
  74. using v8::Value;
  75. Callback *Call::constructor;
  76. Persistent<FunctionTemplate> Call::fun_tpl;
  77. /**
  78. * Helper function for throwing errors with a grpc_call_error value.
  79. * Modified from the answer by Gus Goose to
  80. * http://stackoverflow.com/questions/31794200.
  81. */
  82. Local<Value> nanErrorWithCode(const char *msg, grpc_call_error code) {
  83. EscapableHandleScope scope;
  84. Local<Object> err = Nan::Error(msg).As<Object>();
  85. Nan::Set(err, Nan::New("code").ToLocalChecked(), Nan::New<Uint32>(code));
  86. return scope.Escape(err);
  87. }
  88. bool EndsWith(const char *str, const char *substr) {
  89. return strcmp(str+strlen(str)-strlen(substr), substr) == 0;
  90. }
  91. bool CreateMetadataArray(Local<Object> metadata, grpc_metadata_array *array,
  92. shared_ptr<Resources> resources) {
  93. HandleScope scope;
  94. grpc_metadata_array_init(array);
  95. Local<Array> keys = Nan::GetOwnPropertyNames(metadata).ToLocalChecked();
  96. for (unsigned int i = 0; i < keys->Length(); i++) {
  97. Local<String> current_key = Nan::To<String>(
  98. Nan::Get(keys, i).ToLocalChecked()).ToLocalChecked();
  99. Local<Value> value_array = Nan::Get(metadata, current_key).ToLocalChecked();
  100. if (!value_array->IsArray()) {
  101. return false;
  102. }
  103. array->capacity += Local<Array>::Cast(value_array)->Length();
  104. }
  105. array->metadata = reinterpret_cast<grpc_metadata*>(
  106. gpr_malloc(array->capacity * sizeof(grpc_metadata)));
  107. for (unsigned int i = 0; i < keys->Length(); i++) {
  108. Local<String> current_key(keys->Get(i)->ToString());
  109. Utf8String *utf8_key = new Utf8String(current_key);
  110. resources->strings.push_back(unique_ptr<Utf8String>(utf8_key));
  111. Local<Array> values = Local<Array>::Cast(
  112. Nan::Get(metadata, current_key).ToLocalChecked());
  113. for (unsigned int j = 0; j < values->Length(); j++) {
  114. Local<Value> value = Nan::Get(values, j).ToLocalChecked();
  115. grpc_metadata *current = &array->metadata[array->count];
  116. current->key = **utf8_key;
  117. // Only allow binary headers for "-bin" keys
  118. if (EndsWith(current->key, "-bin")) {
  119. if (::node::Buffer::HasInstance(value)) {
  120. current->value = ::node::Buffer::Data(value);
  121. current->value_length = ::node::Buffer::Length(value);
  122. PersistentValue *handle = new PersistentValue(value);
  123. resources->handles.push_back(unique_ptr<PersistentValue>(handle));
  124. } else {
  125. return false;
  126. }
  127. } else {
  128. if (value->IsString()) {
  129. Local<String> string_value = Nan::To<String>(value).ToLocalChecked();
  130. Utf8String *utf8_value = new Utf8String(string_value);
  131. resources->strings.push_back(unique_ptr<Utf8String>(utf8_value));
  132. current->value = **utf8_value;
  133. current->value_length = string_value->Length();
  134. } else {
  135. return false;
  136. }
  137. }
  138. array->count += 1;
  139. }
  140. }
  141. return true;
  142. }
  143. Local<Value> ParseMetadata(const grpc_metadata_array *metadata_array) {
  144. EscapableHandleScope scope;
  145. grpc_metadata *metadata_elements = metadata_array->metadata;
  146. size_t length = metadata_array->count;
  147. std::map<const char*, size_t> size_map;
  148. std::map<const char*, size_t> index_map;
  149. for (unsigned int i = 0; i < length; i++) {
  150. const char *key = metadata_elements[i].key;
  151. if (size_map.count(key)) {
  152. size_map[key] += 1;
  153. } else {
  154. size_map[key] = 1;
  155. }
  156. index_map[key] = 0;
  157. }
  158. Local<Object> metadata_object = Nan::New<Object>();
  159. for (unsigned int i = 0; i < length; i++) {
  160. grpc_metadata* elem = &metadata_elements[i];
  161. Local<String> key_string = Nan::New(elem->key).ToLocalChecked();
  162. Local<Array> array;
  163. MaybeLocal<Value> maybe_array = Nan::Get(metadata_object, key_string);
  164. if (maybe_array.IsEmpty() || !maybe_array.ToLocalChecked()->IsArray()) {
  165. array = Nan::New<Array>(size_map[elem->key]);
  166. Nan::Set(metadata_object, key_string, array);
  167. } else {
  168. array = Local<Array>::Cast(maybe_array.ToLocalChecked());
  169. }
  170. if (EndsWith(elem->key, "-bin")) {
  171. Nan::Set(array, index_map[elem->key],
  172. MakeFastBuffer(
  173. Nan::CopyBuffer(elem->value,
  174. elem->value_length).ToLocalChecked()));
  175. } else {
  176. Nan::Set(array, index_map[elem->key],
  177. Nan::New(elem->value).ToLocalChecked());
  178. }
  179. index_map[elem->key] += 1;
  180. }
  181. return scope.Escape(metadata_object);
  182. }
  183. Local<Value> Op::GetOpType() const {
  184. EscapableHandleScope scope;
  185. return scope.Escape(Nan::New(GetTypeString()).ToLocalChecked());
  186. }
  187. Op::~Op() {
  188. }
  189. class SendMetadataOp : public Op {
  190. public:
  191. Local<Value> GetNodeValue() const {
  192. EscapableHandleScope scope;
  193. return scope.Escape(Nan::True());
  194. }
  195. bool ParseOp(Local<Value> value, grpc_op *out,
  196. shared_ptr<Resources> resources) {
  197. if (!value->IsObject()) {
  198. return false;
  199. }
  200. grpc_metadata_array array;
  201. MaybeLocal<Object> maybe_metadata = Nan::To<Object>(value);
  202. if (maybe_metadata.IsEmpty()) {
  203. return false;
  204. }
  205. if (!CreateMetadataArray(maybe_metadata.ToLocalChecked(),
  206. &array, resources)) {
  207. return false;
  208. }
  209. out->data.send_initial_metadata.count = array.count;
  210. out->data.send_initial_metadata.metadata = array.metadata;
  211. return true;
  212. }
  213. protected:
  214. std::string GetTypeString() const {
  215. return "send_metadata";
  216. }
  217. };
  218. class SendMessageOp : public Op {
  219. public:
  220. Local<Value> GetNodeValue() const {
  221. EscapableHandleScope scope;
  222. return scope.Escape(Nan::True());
  223. }
  224. bool ParseOp(Local<Value> value, grpc_op *out,
  225. shared_ptr<Resources> resources) {
  226. if (!::node::Buffer::HasInstance(value)) {
  227. return false;
  228. }
  229. Local<Object> object_value = Nan::To<Object>(value).ToLocalChecked();
  230. MaybeLocal<Value> maybe_flag_value = Nan::Get(
  231. object_value, Nan::New("grpcWriteFlags").ToLocalChecked());
  232. if (!maybe_flag_value.IsEmpty()) {
  233. Local<Value> flag_value = maybe_flag_value.ToLocalChecked();
  234. if (flag_value->IsUint32()) {
  235. Maybe<uint32_t> maybe_flag = Nan::To<uint32_t>(flag_value);
  236. out->flags = maybe_flag.FromMaybe(0) & GRPC_WRITE_USED_MASK;
  237. }
  238. }
  239. out->data.send_message = BufferToByteBuffer(value);
  240. PersistentValue *handle = new PersistentValue(value);
  241. resources->handles.push_back(unique_ptr<PersistentValue>(handle));
  242. return true;
  243. }
  244. protected:
  245. std::string GetTypeString() const {
  246. return "send_message";
  247. }
  248. };
  249. class SendClientCloseOp : public Op {
  250. public:
  251. Local<Value> GetNodeValue() const {
  252. EscapableHandleScope scope;
  253. return scope.Escape(Nan::True());
  254. }
  255. bool ParseOp(Local<Value> value, grpc_op *out,
  256. shared_ptr<Resources> resources) {
  257. return true;
  258. }
  259. protected:
  260. std::string GetTypeString() const {
  261. return "client_close";
  262. }
  263. };
  264. class SendServerStatusOp : public Op {
  265. public:
  266. Local<Value> GetNodeValue() const {
  267. EscapableHandleScope scope;
  268. return scope.Escape(Nan::True());
  269. }
  270. bool ParseOp(Local<Value> value, grpc_op *out,
  271. shared_ptr<Resources> resources) {
  272. if (!value->IsObject()) {
  273. return false;
  274. }
  275. Local<Object> server_status = Nan::To<Object>(value).ToLocalChecked();
  276. MaybeLocal<Value> maybe_metadata = Nan::Get(
  277. server_status, Nan::New("metadata").ToLocalChecked());
  278. if (maybe_metadata.IsEmpty()) {
  279. return false;
  280. }
  281. if (!maybe_metadata.ToLocalChecked()->IsObject()) {
  282. return false;
  283. }
  284. Local<Object> metadata = Nan::To<Object>(
  285. maybe_metadata.ToLocalChecked()).ToLocalChecked();
  286. MaybeLocal<Value> maybe_code = Nan::Get(server_status,
  287. Nan::New("code").ToLocalChecked());
  288. if (maybe_code.IsEmpty()) {
  289. return false;
  290. }
  291. if (!maybe_code.ToLocalChecked()->IsUint32()) {
  292. return false;
  293. }
  294. uint32_t code = Nan::To<uint32_t>(maybe_code.ToLocalChecked()).FromJust();
  295. MaybeLocal<Value> maybe_details = Nan::Get(
  296. server_status, Nan::New("details").ToLocalChecked());
  297. if (maybe_details.IsEmpty()) {
  298. return false;
  299. }
  300. if (!maybe_details.ToLocalChecked()->IsString()) {
  301. return false;
  302. }
  303. Local<String> details = Nan::To<String>(
  304. maybe_details.ToLocalChecked()).ToLocalChecked();
  305. grpc_metadata_array array;
  306. if (!CreateMetadataArray(metadata, &array, resources)) {
  307. return false;
  308. }
  309. out->data.send_status_from_server.trailing_metadata_count = array.count;
  310. out->data.send_status_from_server.trailing_metadata = array.metadata;
  311. out->data.send_status_from_server.status =
  312. static_cast<grpc_status_code>(code);
  313. Utf8String *str = new Utf8String(details);
  314. resources->strings.push_back(unique_ptr<Utf8String>(str));
  315. out->data.send_status_from_server.status_details = **str;
  316. return true;
  317. }
  318. protected:
  319. std::string GetTypeString() const {
  320. return "send_status";
  321. }
  322. };
  323. class GetMetadataOp : public Op {
  324. public:
  325. GetMetadataOp() {
  326. grpc_metadata_array_init(&recv_metadata);
  327. }
  328. ~GetMetadataOp() {
  329. grpc_metadata_array_destroy(&recv_metadata);
  330. }
  331. Local<Value> GetNodeValue() const {
  332. EscapableHandleScope scope;
  333. return scope.Escape(ParseMetadata(&recv_metadata));
  334. }
  335. bool ParseOp(Local<Value> value, grpc_op *out,
  336. shared_ptr<Resources> resources) {
  337. out->data.recv_initial_metadata = &recv_metadata;
  338. return true;
  339. }
  340. protected:
  341. std::string GetTypeString() const {
  342. return "metadata";
  343. }
  344. private:
  345. grpc_metadata_array recv_metadata;
  346. };
  347. class ReadMessageOp : public Op {
  348. public:
  349. ReadMessageOp() {
  350. recv_message = NULL;
  351. }
  352. ~ReadMessageOp() {
  353. if (recv_message != NULL) {
  354. grpc_byte_buffer_destroy(recv_message);
  355. }
  356. }
  357. Local<Value> GetNodeValue() const {
  358. EscapableHandleScope scope;
  359. return scope.Escape(ByteBufferToBuffer(recv_message));
  360. }
  361. bool ParseOp(Local<Value> value, grpc_op *out,
  362. shared_ptr<Resources> resources) {
  363. out->data.recv_message = &recv_message;
  364. return true;
  365. }
  366. protected:
  367. std::string GetTypeString() const {
  368. return "read";
  369. }
  370. private:
  371. grpc_byte_buffer *recv_message;
  372. };
  373. class ClientStatusOp : public Op {
  374. public:
  375. ClientStatusOp() {
  376. grpc_metadata_array_init(&metadata_array);
  377. status_details = NULL;
  378. details_capacity = 0;
  379. }
  380. ~ClientStatusOp() {
  381. grpc_metadata_array_destroy(&metadata_array);
  382. gpr_free(status_details);
  383. }
  384. bool ParseOp(Local<Value> value, grpc_op *out,
  385. shared_ptr<Resources> resources) {
  386. out->data.recv_status_on_client.trailing_metadata = &metadata_array;
  387. out->data.recv_status_on_client.status = &status;
  388. out->data.recv_status_on_client.status_details = &status_details;
  389. out->data.recv_status_on_client.status_details_capacity = &details_capacity;
  390. return true;
  391. }
  392. Local<Value> GetNodeValue() const {
  393. EscapableHandleScope scope;
  394. Local<Object> status_obj = Nan::New<Object>();
  395. Nan::Set(status_obj, Nan::New("code").ToLocalChecked(),
  396. Nan::New<Number>(status));
  397. if (status_details != NULL) {
  398. Nan::Set(status_obj, Nan::New("details").ToLocalChecked(),
  399. Nan::New(status_details).ToLocalChecked());
  400. }
  401. Nan::Set(status_obj, Nan::New("metadata").ToLocalChecked(),
  402. ParseMetadata(&metadata_array));
  403. return scope.Escape(status_obj);
  404. }
  405. protected:
  406. std::string GetTypeString() const {
  407. return "status";
  408. }
  409. private:
  410. grpc_metadata_array metadata_array;
  411. grpc_status_code status;
  412. char *status_details;
  413. size_t details_capacity;
  414. };
  415. class ServerCloseResponseOp : public Op {
  416. public:
  417. Local<Value> GetNodeValue() const {
  418. EscapableHandleScope scope;
  419. return scope.Escape(Nan::New<Boolean>(cancelled));
  420. }
  421. bool ParseOp(Local<Value> value, grpc_op *out,
  422. shared_ptr<Resources> resources) {
  423. out->data.recv_close_on_server.cancelled = &cancelled;
  424. return true;
  425. }
  426. protected:
  427. std::string GetTypeString() const {
  428. return "cancelled";
  429. }
  430. private:
  431. int cancelled;
  432. };
  433. tag::tag(Callback *callback, OpVec *ops,
  434. shared_ptr<Resources> resources) :
  435. callback(callback), ops(ops), resources(resources){
  436. }
  437. tag::~tag() {
  438. delete callback;
  439. delete ops;
  440. }
  441. Local<Value> GetTagNodeValue(void *tag) {
  442. EscapableHandleScope scope;
  443. struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
  444. Local<Object> tag_obj = Nan::New<Object>();
  445. for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin();
  446. it != tag_struct->ops->end(); ++it) {
  447. Op *op_ptr = it->get();
  448. Nan::Set(tag_obj, op_ptr->GetOpType(), op_ptr->GetNodeValue());
  449. }
  450. return scope.Escape(tag_obj);
  451. }
  452. Callback *GetTagCallback(void *tag) {
  453. struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
  454. return tag_struct->callback;
  455. }
  456. void DestroyTag(void *tag) {
  457. struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
  458. delete tag_struct;
  459. }
  460. Call::Call(grpc_call *call) : wrapped_call(call) {
  461. }
  462. Call::~Call() {
  463. grpc_call_destroy(wrapped_call);
  464. }
  465. void Call::Init(Local<Object> exports) {
  466. HandleScope scope;
  467. Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
  468. tpl->SetClassName(Nan::New("Call").ToLocalChecked());
  469. tpl->InstanceTemplate()->SetInternalFieldCount(1);
  470. Nan::SetPrototypeMethod(tpl, "startBatch", StartBatch);
  471. Nan::SetPrototypeMethod(tpl, "cancel", Cancel);
  472. Nan::SetPrototypeMethod(tpl, "cancelWithStatus", CancelWithStatus);
  473. Nan::SetPrototypeMethod(tpl, "getPeer", GetPeer);
  474. Nan::SetPrototypeMethod(tpl, "setCredentials", SetCredentials);
  475. fun_tpl.Reset(tpl);
  476. Local<Function> ctr = Nan::GetFunction(tpl).ToLocalChecked();
  477. Nan::Set(exports, Nan::New("Call").ToLocalChecked(), ctr);
  478. constructor = new Callback(ctr);
  479. }
  480. bool Call::HasInstance(Local<Value> val) {
  481. HandleScope scope;
  482. return Nan::New(fun_tpl)->HasInstance(val);
  483. }
  484. Local<Value> Call::WrapStruct(grpc_call *call) {
  485. EscapableHandleScope scope;
  486. if (call == NULL) {
  487. return scope.Escape(Nan::Null());
  488. }
  489. const int argc = 1;
  490. Local<Value> argv[argc] = {Nan::New<External>(
  491. reinterpret_cast<void *>(call))};
  492. MaybeLocal<Object> maybe_instance = Nan::NewInstance(
  493. constructor->GetFunction(), argc, argv);
  494. if (maybe_instance.IsEmpty()) {
  495. return scope.Escape(Nan::Null());
  496. } else {
  497. return scope.Escape(maybe_instance.ToLocalChecked());
  498. }
  499. }
  500. NAN_METHOD(Call::New) {
  501. if (info.IsConstructCall()) {
  502. Call *call;
  503. if (info[0]->IsExternal()) {
  504. Local<External> ext = info[0].As<External>();
  505. // This option is used for wrapping an existing call
  506. grpc_call *call_value =
  507. reinterpret_cast<grpc_call *>(ext->Value());
  508. call = new Call(call_value);
  509. } else {
  510. if (!Channel::HasInstance(info[0])) {
  511. return Nan::ThrowTypeError("Call's first argument must be a Channel");
  512. }
  513. if (!info[1]->IsString()) {
  514. return Nan::ThrowTypeError("Call's second argument must be a string");
  515. }
  516. if (!(info[2]->IsNumber() || info[2]->IsDate())) {
  517. return Nan::ThrowTypeError(
  518. "Call's third argument must be a date or a number");
  519. }
  520. // These arguments are at the end because they are optional
  521. grpc_call *parent_call = NULL;
  522. if (Call::HasInstance(info[4])) {
  523. Call *parent_obj = ObjectWrap::Unwrap<Call>(
  524. Nan::To<Object>(info[4]).ToLocalChecked());
  525. parent_call = parent_obj->wrapped_call;
  526. } else if (!(info[4]->IsUndefined() || info[4]->IsNull())) {
  527. return Nan::ThrowTypeError(
  528. "Call's fifth argument must be another call, if provided");
  529. }
  530. gpr_uint32 propagate_flags = GRPC_PROPAGATE_DEFAULTS;
  531. if (info[5]->IsUint32()) {
  532. propagate_flags = Nan::To<uint32_t>(info[5]).FromJust();
  533. } else if (!(info[5]->IsUndefined() || info[5]->IsNull())) {
  534. return Nan::ThrowTypeError(
  535. "Call's sixth argument must be propagate flags, if provided");
  536. }
  537. Local<Object> channel_object = Nan::To<Object>(info[0]).ToLocalChecked();
  538. Channel *channel = ObjectWrap::Unwrap<Channel>(channel_object);
  539. if (channel->GetWrappedChannel() == NULL) {
  540. return Nan::ThrowError("Call cannot be created from a closed channel");
  541. }
  542. Utf8String method(info[1]);
  543. double deadline = Nan::To<double>(info[2]).FromJust();
  544. grpc_channel *wrapped_channel = channel->GetWrappedChannel();
  545. grpc_call *wrapped_call;
  546. if (info[3]->IsString()) {
  547. Utf8String host_override(info[3]);
  548. wrapped_call = grpc_channel_create_call(
  549. wrapped_channel, parent_call, propagate_flags,
  550. CompletionQueueAsyncWorker::GetQueue(), *method,
  551. *host_override, MillisecondsToTimespec(deadline), NULL);
  552. } else if (info[3]->IsUndefined() || info[3]->IsNull()) {
  553. wrapped_call = grpc_channel_create_call(
  554. wrapped_channel, parent_call, propagate_flags,
  555. CompletionQueueAsyncWorker::GetQueue(), *method,
  556. NULL, MillisecondsToTimespec(deadline), NULL);
  557. } else {
  558. return Nan::ThrowTypeError("Call's fourth argument must be a string");
  559. }
  560. call = new Call(wrapped_call);
  561. info.This()->SetHiddenValue(Nan::New("channel_").ToLocalChecked(),
  562. channel_object);
  563. }
  564. call->Wrap(info.This());
  565. info.GetReturnValue().Set(info.This());
  566. } else {
  567. const int argc = 4;
  568. Local<Value> argv[argc] = {info[0], info[1], info[2], info[3]};
  569. MaybeLocal<Object> maybe_instance = constructor->GetFunction()->NewInstance(
  570. argc, argv);
  571. if (maybe_instance.IsEmpty()) {
  572. // There's probably a pending exception
  573. return;
  574. } else {
  575. info.GetReturnValue().Set(maybe_instance.ToLocalChecked());
  576. }
  577. }
  578. }
  579. NAN_METHOD(Call::StartBatch) {
  580. if (!Call::HasInstance(info.This())) {
  581. return Nan::ThrowTypeError("startBatch can only be called on Call objects");
  582. }
  583. if (!info[0]->IsObject()) {
  584. return Nan::ThrowError("startBatch's first argument must be an object");
  585. }
  586. if (!info[1]->IsFunction()) {
  587. return Nan::ThrowError("startBatch's second argument must be a callback");
  588. }
  589. Local<Function> callback_func = info[1].As<Function>();
  590. Call *call = ObjectWrap::Unwrap<Call>(info.This());
  591. shared_ptr<Resources> resources(new Resources);
  592. Local<Object> obj = Nan::To<Object>(info[0]).ToLocalChecked();
  593. Local<Array> keys = Nan::GetOwnPropertyNames(obj).ToLocalChecked();
  594. size_t nops = keys->Length();
  595. vector<grpc_op> ops(nops);
  596. unique_ptr<OpVec> op_vector(new OpVec());
  597. for (unsigned int i = 0; i < nops; i++) {
  598. unique_ptr<Op> op;
  599. MaybeLocal<Value> maybe_key = Nan::Get(keys, i);
  600. if (maybe_key.IsEmpty() || (!maybe_key.ToLocalChecked()->IsUint32())) {
  601. return Nan::ThrowError(
  602. "startBatch's first argument's keys must be integers");
  603. }
  604. uint32_t type = Nan::To<uint32_t>(maybe_key.ToLocalChecked()).FromJust();
  605. ops[i].op = static_cast<grpc_op_type>(type);
  606. ops[i].flags = 0;
  607. ops[i].reserved = NULL;
  608. switch (type) {
  609. case GRPC_OP_SEND_INITIAL_METADATA:
  610. op.reset(new SendMetadataOp());
  611. break;
  612. case GRPC_OP_SEND_MESSAGE:
  613. op.reset(new SendMessageOp());
  614. break;
  615. case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
  616. op.reset(new SendClientCloseOp());
  617. break;
  618. case GRPC_OP_SEND_STATUS_FROM_SERVER:
  619. op.reset(new SendServerStatusOp());
  620. break;
  621. case GRPC_OP_RECV_INITIAL_METADATA:
  622. op.reset(new GetMetadataOp());
  623. break;
  624. case GRPC_OP_RECV_MESSAGE:
  625. op.reset(new ReadMessageOp());
  626. break;
  627. case GRPC_OP_RECV_STATUS_ON_CLIENT:
  628. op.reset(new ClientStatusOp());
  629. break;
  630. case GRPC_OP_RECV_CLOSE_ON_SERVER:
  631. op.reset(new ServerCloseResponseOp());
  632. break;
  633. default:
  634. return Nan::ThrowError("Argument object had an unrecognized key");
  635. }
  636. if (!op->ParseOp(obj->Get(type), &ops[i], resources)) {
  637. return Nan::ThrowTypeError("Incorrectly typed arguments to startBatch");
  638. }
  639. op_vector->push_back(std::move(op));
  640. }
  641. Callback *callback = new Callback(callback_func);
  642. grpc_call_error error = grpc_call_start_batch(
  643. call->wrapped_call, &ops[0], nops, new struct tag(
  644. callback, op_vector.release(), resources), NULL);
  645. if (error != GRPC_CALL_OK) {
  646. return Nan::ThrowError(nanErrorWithCode("startBatch failed", error));
  647. }
  648. CompletionQueueAsyncWorker::Next();
  649. }
  650. NAN_METHOD(Call::Cancel) {
  651. if (!Call::HasInstance(info.This())) {
  652. return Nan::ThrowTypeError("cancel can only be called on Call objects");
  653. }
  654. Call *call = ObjectWrap::Unwrap<Call>(info.This());
  655. grpc_call_error error = grpc_call_cancel(call->wrapped_call, NULL);
  656. if (error != GRPC_CALL_OK) {
  657. return Nan::ThrowError(nanErrorWithCode("cancel failed", error));
  658. }
  659. }
  660. NAN_METHOD(Call::CancelWithStatus) {
  661. Nan::HandleScope scope;
  662. if (!HasInstance(info.This())) {
  663. return Nan::ThrowTypeError("cancel can only be called on Call objects");
  664. }
  665. if (!info[0]->IsUint32()) {
  666. return Nan::ThrowTypeError(
  667. "cancelWithStatus's first argument must be a status code");
  668. }
  669. if (!info[1]->IsString()) {
  670. return Nan::ThrowTypeError(
  671. "cancelWithStatus's second argument must be a string");
  672. }
  673. Call *call = ObjectWrap::Unwrap<Call>(info.This());
  674. grpc_status_code code = static_cast<grpc_status_code>(
  675. Nan::To<uint32_t>(info[0]).FromJust());
  676. if (code == GRPC_STATUS_OK) {
  677. return Nan::ThrowRangeError(
  678. "cancelWithStatus cannot be called with OK status");
  679. }
  680. Utf8String details(info[1]);
  681. grpc_call_cancel_with_status(call->wrapped_call, code, *details, NULL);
  682. }
  683. NAN_METHOD(Call::GetPeer) {
  684. Nan::HandleScope scope;
  685. if (!HasInstance(info.This())) {
  686. return Nan::ThrowTypeError("getPeer can only be called on Call objects");
  687. }
  688. Call *call = ObjectWrap::Unwrap<Call>(info.This());
  689. char *peer = grpc_call_get_peer(call->wrapped_call);
  690. Local<Value> peer_value = Nan::New(peer).ToLocalChecked();
  691. gpr_free(peer);
  692. info.GetReturnValue().Set(peer_value);
  693. }
  694. NAN_METHOD(Call::SetCredentials) {
  695. Nan::HandleScope scope;
  696. if (!HasInstance(info.This())) {
  697. return Nan::ThrowTypeError(
  698. "setCredentials can only be called on Call objects");
  699. }
  700. if (!CallCredentials::HasInstance(info[0])) {
  701. return Nan::ThrowTypeError(
  702. "setCredentials' first argument must be a CallCredentials");
  703. }
  704. Call *call = ObjectWrap::Unwrap<Call>(info.This());
  705. CallCredentials *creds_object = ObjectWrap::Unwrap<CallCredentials>(
  706. Nan::To<Object>(info[0]).ToLocalChecked());
  707. grpc_call_credentials *creds = creds_object->GetWrappedCredentials();
  708. grpc_call_error error = GRPC_CALL_ERROR;
  709. if (creds) {
  710. error = grpc_call_set_credentials(call->wrapped_call, creds);
  711. }
  712. info.GetReturnValue().Set(Nan::New<Uint32>(error));
  713. }
  714. } // namespace node
  715. } // namespace grpc