call.cc 25 KB


  1. /*
  2. *
  3. * Copyright 2015, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. #include <memory>
  34. #include <vector>
  35. #include <map>
  36. #include <node.h>
  37. #include "grpc/support/log.h"
  38. #include "grpc/grpc.h"
  39. #include "grpc/grpc_security.h"
  40. #include "grpc/support/alloc.h"
  41. #include "grpc/support/time.h"
  42. #include "byte_buffer.h"
  43. #include "call.h"
  44. #include "channel.h"
  45. #include "completion_queue.h"
  46. #include "completion_queue_async_worker.h"
  47. #include "call_credentials.h"
  48. #include "slice.h"
  49. #include "timeval.h"
  50. using std::unique_ptr;
  51. using std::shared_ptr;
  52. using std::vector;
  53. namespace grpc {
  54. namespace node {
  55. using Nan::Callback;
  56. using Nan::EscapableHandleScope;
  57. using Nan::HandleScope;
  58. using Nan::Maybe;
  59. using Nan::MaybeLocal;
  60. using Nan::ObjectWrap;
  61. using Nan::Persistent;
  62. using Nan::Utf8String;
  63. using v8::Array;
  64. using v8::Boolean;
  65. using v8::Exception;
  66. using v8::External;
  67. using v8::Function;
  68. using v8::FunctionTemplate;
  69. using v8::Integer;
  70. using v8::Local;
  71. using v8::Number;
  72. using v8::Object;
  73. using v8::ObjectTemplate;
  74. using v8::Uint32;
  75. using v8::String;
  76. using v8::Value;
  77. Callback *Call::constructor;
  78. Persistent<FunctionTemplate> Call::fun_tpl;
  79. /**
  80. * Helper function for throwing errors with a grpc_call_error value.
  81. * Modified from the answer by Gus Goose to
  82. * http://stackoverflow.com/questions/31794200.
  83. */
  84. Local<Value> nanErrorWithCode(const char *msg, grpc_call_error code) {
  85. EscapableHandleScope scope;
  86. Local<Object> err = Nan::Error(msg).As<Object>();
  87. Nan::Set(err, Nan::New("code").ToLocalChecked(), Nan::New<Uint32>(code));
  88. return scope.Escape(err);
  89. }
  90. bool CreateMetadataArray(Local<Object> metadata, grpc_metadata_array *array) {
  91. HandleScope scope;
  92. grpc_metadata_array_init(array);
  93. Local<Array> keys = Nan::GetOwnPropertyNames(metadata).ToLocalChecked();
  94. for (unsigned int i = 0; i < keys->Length(); i++) {
  95. Local<String> current_key = Nan::To<String>(
  96. Nan::Get(keys, i).ToLocalChecked()).ToLocalChecked();
  97. Local<Value> value_array = Nan::Get(metadata, current_key).ToLocalChecked();
  98. if (!value_array->IsArray()) {
  99. return false;
  100. }
  101. array->capacity += Local<Array>::Cast(value_array)->Length();
  102. }
  103. array->metadata = reinterpret_cast<grpc_metadata*>(
  104. gpr_malloc(array->capacity * sizeof(grpc_metadata)));
  105. for (unsigned int i = 0; i < keys->Length(); i++) {
  106. Local<String> current_key(Nan::To<String>(keys->Get(i)).ToLocalChecked());
  107. Local<Array> values = Local<Array>::Cast(
  108. Nan::Get(metadata, current_key).ToLocalChecked());
  109. grpc_slice key_slice = grpc_slice_intern(CreateSliceFromString(current_key));
  110. for (unsigned int j = 0; j < values->Length(); j++) {
  111. Local<Value> value = Nan::Get(values, j).ToLocalChecked();
  112. grpc_metadata *current = &array->metadata[array->count];
  113. current->key = key_slice;
  114. // Only allow binary headers for "-bin" keys
  115. if (grpc_is_binary_header(key_slice)) {
  116. if (::node::Buffer::HasInstance(value)) {
  117. current->value = CreateSliceFromBuffer(value);
  118. } else {
  119. return false;
  120. }
  121. } else {
  122. if (value->IsString()) {
  123. Local<String> string_value = Nan::To<String>(value).ToLocalChecked();
  124. current->value = CreateSliceFromString(string_value);
  125. } else {
  126. return false;
  127. }
  128. }
  129. array->count += 1;
  130. }
  131. }
  132. return true;
  133. }
  134. Local<Value> ParseMetadata(const grpc_metadata_array *metadata_array) {
  135. EscapableHandleScope scope;
  136. grpc_metadata *metadata_elements = metadata_array->metadata;
  137. size_t length = metadata_array->count;
  138. Local<Object> metadata_object = Nan::New<Object>();
  139. for (unsigned int i = 0; i < length; i++) {
  140. grpc_metadata* elem = &metadata_elements[i];
  141. // TODO(murgatroid99): Use zero-copy string construction instead
  142. Local<String> key_string = CopyStringFromSlice(elem->key);
  143. Local<Array> array;
  144. MaybeLocal<Value> maybe_array = Nan::Get(metadata_object, key_string);
  145. if (maybe_array.IsEmpty() || !maybe_array.ToLocalChecked()->IsArray()) {
  146. array = Nan::New<Array>(0);
  147. Nan::Set(metadata_object, key_string, array);
  148. } else {
  149. array = Local<Array>::Cast(maybe_array.ToLocalChecked());
  150. }
  151. if (grpc_is_binary_header(elem->key)) {
  152. Nan::Set(array, array->Length(), CreateBufferFromSlice(elem->value));
  153. } else {
  154. // TODO(murgatroid99): Use zero-copy string construction instead
  155. Nan::Set(array, array->Length(), CopyStringFromSlice(elem->value));
  156. }
  157. }
  158. return scope.Escape(metadata_object);
  159. }
  160. Local<Value> Op::GetOpType() const {
  161. EscapableHandleScope scope;
  162. return scope.Escape(Nan::New(GetTypeString()).ToLocalChecked());
  163. }
  164. Op::~Op() {
  165. }
  166. class SendMetadataOp : public Op {
  167. public:
  168. Local<Value> GetNodeValue() const {
  169. EscapableHandleScope scope;
  170. return scope.Escape(Nan::True());
  171. }
  172. bool ParseOp(Local<Value> value, grpc_op *out) {
  173. if (!value->IsObject()) {
  174. return false;
  175. }
  176. grpc_metadata_array array;
  177. MaybeLocal<Object> maybe_metadata = Nan::To<Object>(value);
  178. if (maybe_metadata.IsEmpty()) {
  179. return false;
  180. }
  181. if (!CreateMetadataArray(maybe_metadata.ToLocalChecked(),
  182. &array)) {
  183. return false;
  184. }
  185. out->data.send_initial_metadata.count = array.count;
  186. out->data.send_initial_metadata.metadata = array.metadata;
  187. return true;
  188. }
  189. bool IsFinalOp() {
  190. return false;
  191. }
  192. protected:
  193. std::string GetTypeString() const {
  194. return "send_metadata";
  195. }
  196. };
  197. class SendMessageOp : public Op {
  198. public:
  199. SendMessageOp() {
  200. send_message = NULL;
  201. }
  202. ~SendMessageOp() {
  203. if (send_message != NULL) {
  204. grpc_byte_buffer_destroy(send_message);
  205. }
  206. }
  207. Local<Value> GetNodeValue() const {
  208. EscapableHandleScope scope;
  209. return scope.Escape(Nan::True());
  210. }
  211. bool ParseOp(Local<Value> value, grpc_op *out) {
  212. if (!::node::Buffer::HasInstance(value)) {
  213. return false;
  214. }
  215. Local<Object> object_value = Nan::To<Object>(value).ToLocalChecked();
  216. MaybeLocal<Value> maybe_flag_value = Nan::Get(
  217. object_value, Nan::New("grpcWriteFlags").ToLocalChecked());
  218. if (!maybe_flag_value.IsEmpty()) {
  219. Local<Value> flag_value = maybe_flag_value.ToLocalChecked();
  220. if (flag_value->IsUint32()) {
  221. Maybe<uint32_t> maybe_flag = Nan::To<uint32_t>(flag_value);
  222. out->flags = maybe_flag.FromMaybe(0) & GRPC_WRITE_USED_MASK;
  223. }
  224. }
  225. send_message = BufferToByteBuffer(value);
  226. out->data.send_message.send_message = send_message;
  227. return true;
  228. }
  229. bool IsFinalOp() {
  230. return false;
  231. }
  232. protected:
  233. std::string GetTypeString() const {
  234. return "send_message";
  235. }
  236. private:
  237. grpc_byte_buffer *send_message;
  238. };
  239. class SendClientCloseOp : public Op {
  240. public:
  241. Local<Value> GetNodeValue() const {
  242. EscapableHandleScope scope;
  243. return scope.Escape(Nan::True());
  244. }
  245. bool ParseOp(Local<Value> value, grpc_op *out) {
  246. return true;
  247. }
  248. bool IsFinalOp() {
  249. return false;
  250. }
  251. protected:
  252. std::string GetTypeString() const {
  253. return "client_close";
  254. }
  255. };
  256. class SendServerStatusOp : public Op {
  257. public:
  258. ~SendServerStatusOp() {
  259. grpc_slice_unref(details);
  260. }
  261. Local<Value> GetNodeValue() const {
  262. EscapableHandleScope scope;
  263. return scope.Escape(Nan::True());
  264. }
  265. bool ParseOp(Local<Value> value, grpc_op *out) {
  266. if (!value->IsObject()) {
  267. return false;
  268. }
  269. Local<Object> server_status = Nan::To<Object>(value).ToLocalChecked();
  270. MaybeLocal<Value> maybe_metadata = Nan::Get(
  271. server_status, Nan::New("metadata").ToLocalChecked());
  272. if (maybe_metadata.IsEmpty()) {
  273. return false;
  274. }
  275. if (!maybe_metadata.ToLocalChecked()->IsObject()) {
  276. return false;
  277. }
  278. Local<Object> metadata = Nan::To<Object>(
  279. maybe_metadata.ToLocalChecked()).ToLocalChecked();
  280. MaybeLocal<Value> maybe_code = Nan::Get(server_status,
  281. Nan::New("code").ToLocalChecked());
  282. if (maybe_code.IsEmpty()) {
  283. return false;
  284. }
  285. if (!maybe_code.ToLocalChecked()->IsUint32()) {
  286. return false;
  287. }
  288. uint32_t code = Nan::To<uint32_t>(maybe_code.ToLocalChecked()).FromJust();
  289. MaybeLocal<Value> maybe_details = Nan::Get(
  290. server_status, Nan::New("details").ToLocalChecked());
  291. if (maybe_details.IsEmpty()) {
  292. return false;
  293. }
  294. if (!maybe_details.ToLocalChecked()->IsString()) {
  295. return false;
  296. }
  297. Local<String> details = Nan::To<String>(
  298. maybe_details.ToLocalChecked()).ToLocalChecked();
  299. grpc_metadata_array array;
  300. if (!CreateMetadataArray(metadata, &array)) {
  301. return false;
  302. }
  303. out->data.send_status_from_server.trailing_metadata_count = array.count;
  304. out->data.send_status_from_server.trailing_metadata = array.metadata;
  305. out->data.send_status_from_server.status =
  306. static_cast<grpc_status_code>(code);
  307. this->details = CreateSliceFromString(details);
  308. out->data.send_status_from_server.status_details = &this->details;
  309. return true;
  310. }
  311. bool IsFinalOp() {
  312. return true;
  313. }
  314. protected:
  315. std::string GetTypeString() const {
  316. return "send_status";
  317. }
  318. private:
  319. grpc_slice details;
  320. };
  321. class GetMetadataOp : public Op {
  322. public:
  323. GetMetadataOp() {
  324. grpc_metadata_array_init(&recv_metadata);
  325. }
  326. ~GetMetadataOp() {
  327. grpc_metadata_array_destroy(&recv_metadata);
  328. }
  329. Local<Value> GetNodeValue() const {
  330. EscapableHandleScope scope;
  331. return scope.Escape(ParseMetadata(&recv_metadata));
  332. }
  333. bool ParseOp(Local<Value> value, grpc_op *out) {
  334. out->data.recv_initial_metadata.recv_initial_metadata = &recv_metadata;
  335. return true;
  336. }
  337. bool IsFinalOp() {
  338. return false;
  339. }
  340. protected:
  341. std::string GetTypeString() const {
  342. return "metadata";
  343. }
  344. private:
  345. grpc_metadata_array recv_metadata;
  346. };
  347. class ReadMessageOp : public Op {
  348. public:
  349. ReadMessageOp() {
  350. recv_message = NULL;
  351. }
  352. ~ReadMessageOp() {
  353. if (recv_message != NULL) {
  354. grpc_byte_buffer_destroy(recv_message);
  355. }
  356. }
  357. Local<Value> GetNodeValue() const {
  358. EscapableHandleScope scope;
  359. return scope.Escape(ByteBufferToBuffer(recv_message));
  360. }
  361. bool ParseOp(Local<Value> value, grpc_op *out) {
  362. out->data.recv_message.recv_message = &recv_message;
  363. return true;
  364. }
  365. bool IsFinalOp() {
  366. return false;
  367. }
  368. protected:
  369. std::string GetTypeString() const {
  370. return "read";
  371. }
  372. private:
  373. grpc_byte_buffer *recv_message;
  374. };
  375. class ClientStatusOp : public Op {
  376. public:
  377. ClientStatusOp() {
  378. grpc_metadata_array_init(&metadata_array);
  379. }
  380. ~ClientStatusOp() {
  381. grpc_metadata_array_destroy(&metadata_array);
  382. }
  383. bool ParseOp(Local<Value> value, grpc_op *out) {
  384. out->data.recv_status_on_client.trailing_metadata = &metadata_array;
  385. out->data.recv_status_on_client.status = &status;
  386. out->data.recv_status_on_client.status_details = &status_details;
  387. return true;
  388. }
  389. Local<Value> GetNodeValue() const {
  390. EscapableHandleScope scope;
  391. Local<Object> status_obj = Nan::New<Object>();
  392. Nan::Set(status_obj, Nan::New("code").ToLocalChecked(),
  393. Nan::New<Number>(status));
  394. Nan::Set(status_obj, Nan::New("details").ToLocalChecked(),
  395. CopyStringFromSlice(status_details));
  396. Nan::Set(status_obj, Nan::New("metadata").ToLocalChecked(),
  397. ParseMetadata(&metadata_array));
  398. return scope.Escape(status_obj);
  399. }
  400. bool IsFinalOp() {
  401. return true;
  402. }
  403. protected:
  404. std::string GetTypeString() const {
  405. return "status";
  406. }
  407. private:
  408. grpc_metadata_array metadata_array;
  409. grpc_status_code status;
  410. grpc_slice status_details;
  411. };
  412. class ServerCloseResponseOp : public Op {
  413. public:
  414. Local<Value> GetNodeValue() const {
  415. EscapableHandleScope scope;
  416. return scope.Escape(Nan::New<Boolean>(cancelled));
  417. }
  418. bool ParseOp(Local<Value> value, grpc_op *out) {
  419. out->data.recv_close_on_server.cancelled = &cancelled;
  420. return true;
  421. }
  422. bool IsFinalOp() {
  423. return false;
  424. }
  425. protected:
  426. std::string GetTypeString() const {
  427. return "cancelled";
  428. }
  429. private:
  430. int cancelled;
  431. };
  432. tag::tag(Callback *callback, OpVec *ops, Call *call) :
  433. callback(callback), ops(ops), call(call){
  434. }
  435. tag::~tag() {
  436. delete callback;
  437. delete ops;
  438. }
  439. Local<Value> GetTagNodeValue(void *tag) {
  440. EscapableHandleScope scope;
  441. struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
  442. Local<Object> tag_obj = Nan::New<Object>();
  443. for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin();
  444. it != tag_struct->ops->end(); ++it) {
  445. Op *op_ptr = it->get();
  446. Nan::Set(tag_obj, op_ptr->GetOpType(), op_ptr->GetNodeValue());
  447. }
  448. return scope.Escape(tag_obj);
  449. }
  450. Callback *GetTagCallback(void *tag) {
  451. struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
  452. return tag_struct->callback;
  453. }
  454. void CompleteTag(void *tag) {
  455. struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
  456. bool is_final_op = false;
  457. if (tag_struct->call == NULL) {
  458. return;
  459. }
  460. for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin();
  461. it != tag_struct->ops->end(); ++it) {
  462. Op *op_ptr = it->get();
  463. if (op_ptr->IsFinalOp()) {
  464. is_final_op = true;
  465. }
  466. }
  467. tag_struct->call->CompleteBatch(is_final_op);
  468. }
  469. void DestroyTag(void *tag) {
  470. struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
  471. delete tag_struct;
  472. }
  473. Call::Call(grpc_call *call) : wrapped_call(call),
  474. pending_batches(0),
  475. has_final_op_completed(false) {
  476. }
  477. Call::~Call() {
  478. if (wrapped_call != NULL) {
  479. grpc_call_destroy(wrapped_call);
  480. }
  481. }
  482. void Call::Init(Local<Object> exports) {
  483. HandleScope scope;
  484. Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
  485. tpl->SetClassName(Nan::New("Call").ToLocalChecked());
  486. tpl->InstanceTemplate()->SetInternalFieldCount(1);
  487. Nan::SetPrototypeMethod(tpl, "startBatch", StartBatch);
  488. Nan::SetPrototypeMethod(tpl, "cancel", Cancel);
  489. Nan::SetPrototypeMethod(tpl, "cancelWithStatus", CancelWithStatus);
  490. Nan::SetPrototypeMethod(tpl, "getPeer", GetPeer);
  491. Nan::SetPrototypeMethod(tpl, "setCredentials", SetCredentials);
  492. fun_tpl.Reset(tpl);
  493. Local<Function> ctr = Nan::GetFunction(tpl).ToLocalChecked();
  494. Nan::Set(exports, Nan::New("Call").ToLocalChecked(), ctr);
  495. constructor = new Callback(ctr);
  496. }
  497. bool Call::HasInstance(Local<Value> val) {
  498. HandleScope scope;
  499. return Nan::New(fun_tpl)->HasInstance(val);
  500. }
  501. Local<Value> Call::WrapStruct(grpc_call *call) {
  502. EscapableHandleScope scope;
  503. if (call == NULL) {
  504. return scope.Escape(Nan::Null());
  505. }
  506. const int argc = 1;
  507. Local<Value> argv[argc] = {Nan::New<External>(
  508. reinterpret_cast<void *>(call))};
  509. MaybeLocal<Object> maybe_instance = Nan::NewInstance(
  510. constructor->GetFunction(), argc, argv);
  511. if (maybe_instance.IsEmpty()) {
  512. return scope.Escape(Nan::Null());
  513. } else {
  514. return scope.Escape(maybe_instance.ToLocalChecked());
  515. }
  516. }
  517. void Call::CompleteBatch(bool is_final_op) {
  518. if (is_final_op) {
  519. this->has_final_op_completed = true;
  520. }
  521. this->pending_batches--;
  522. if (this->has_final_op_completed && this->pending_batches == 0) {
  523. grpc_call_destroy(this->wrapped_call);
  524. this->wrapped_call = NULL;
  525. }
  526. }
  527. NAN_METHOD(Call::New) {
  528. if (info.IsConstructCall()) {
  529. Call *call;
  530. if (info[0]->IsExternal()) {
  531. Local<External> ext = info[0].As<External>();
  532. // This option is used for wrapping an existing call
  533. grpc_call *call_value =
  534. reinterpret_cast<grpc_call *>(ext->Value());
  535. call = new Call(call_value);
  536. } else {
  537. if (!Channel::HasInstance(info[0])) {
  538. return Nan::ThrowTypeError("Call's first argument must be a Channel");
  539. }
  540. if (!info[1]->IsString()) {
  541. return Nan::ThrowTypeError("Call's second argument must be a string");
  542. }
  543. if (!(info[2]->IsNumber() || info[2]->IsDate())) {
  544. return Nan::ThrowTypeError(
  545. "Call's third argument must be a date or a number");
  546. }
  547. // These arguments are at the end because they are optional
  548. grpc_call *parent_call = NULL;
  549. if (Call::HasInstance(info[4])) {
  550. Call *parent_obj = ObjectWrap::Unwrap<Call>(
  551. Nan::To<Object>(info[4]).ToLocalChecked());
  552. parent_call = parent_obj->wrapped_call;
  553. } else if (!(info[4]->IsUndefined() || info[4]->IsNull())) {
  554. return Nan::ThrowTypeError(
  555. "Call's fifth argument must be another call, if provided");
  556. }
  557. uint32_t propagate_flags = GRPC_PROPAGATE_DEFAULTS;
  558. if (info[5]->IsUint32()) {
  559. propagate_flags = Nan::To<uint32_t>(info[5]).FromJust();
  560. } else if (!(info[5]->IsUndefined() || info[5]->IsNull())) {
  561. return Nan::ThrowTypeError(
  562. "Call's sixth argument must be propagate flags, if provided");
  563. }
  564. Local<Object> channel_object = Nan::To<Object>(info[0]).ToLocalChecked();
  565. Channel *channel = ObjectWrap::Unwrap<Channel>(channel_object);
  566. if (channel->GetWrappedChannel() == NULL) {
  567. return Nan::ThrowError("Call cannot be created from a closed channel");
  568. }
  569. double deadline = Nan::To<double>(info[2]).FromJust();
  570. grpc_channel *wrapped_channel = channel->GetWrappedChannel();
  571. grpc_call *wrapped_call;
  572. if (info[3]->IsString()) {
  573. grpc_slice *host = new grpc_slice;
  574. *host = CreateSliceFromString(
  575. Nan::To<String>(info[3]).ToLocalChecked());
  576. wrapped_call = grpc_channel_create_call(
  577. wrapped_channel, parent_call, propagate_flags,
  578. GetCompletionQueue(), CreateSliceFromString(
  579. Nan::To<String>(info[1]).ToLocalChecked()),
  580. host, MillisecondsToTimespec(deadline), NULL);
  581. delete host;
  582. } else if (info[3]->IsUndefined() || info[3]->IsNull()) {
  583. wrapped_call = grpc_channel_create_call(
  584. wrapped_channel, parent_call, propagate_flags,
  585. GetCompletionQueue(), CreateSliceFromString(
  586. Nan::To<String>(info[1]).ToLocalChecked()),
  587. NULL, MillisecondsToTimespec(deadline), NULL);
  588. } else {
  589. return Nan::ThrowTypeError("Call's fourth argument must be a string");
  590. }
  591. call = new Call(wrapped_call);
  592. Nan::Set(info.This(), Nan::New("channel_").ToLocalChecked(),
  593. channel_object);
  594. }
  595. call->Wrap(info.This());
  596. info.GetReturnValue().Set(info.This());
  597. } else {
  598. const int argc = 4;
  599. Local<Value> argv[argc] = {info[0], info[1], info[2], info[3]};
  600. MaybeLocal<Object> maybe_instance = Nan::NewInstance(
  601. constructor->GetFunction(), argc, argv);
  602. if (maybe_instance.IsEmpty()) {
  603. // There's probably a pending exception
  604. return;
  605. } else {
  606. info.GetReturnValue().Set(maybe_instance.ToLocalChecked());
  607. }
  608. }
  609. }
  610. NAN_METHOD(Call::StartBatch) {
  611. if (!Call::HasInstance(info.This())) {
  612. return Nan::ThrowTypeError("startBatch can only be called on Call objects");
  613. }
  614. if (!info[0]->IsObject()) {
  615. return Nan::ThrowError("startBatch's first argument must be an object");
  616. }
  617. if (!info[1]->IsFunction()) {
  618. return Nan::ThrowError("startBatch's second argument must be a callback");
  619. }
  620. Local<Function> callback_func = info[1].As<Function>();
  621. Call *call = ObjectWrap::Unwrap<Call>(info.This());
  622. Local<Object> obj = Nan::To<Object>(info[0]).ToLocalChecked();
  623. Local<Array> keys = Nan::GetOwnPropertyNames(obj).ToLocalChecked();
  624. size_t nops = keys->Length();
  625. vector<grpc_op> ops(nops);
  626. unique_ptr<OpVec> op_vector(new OpVec());
  627. for (unsigned int i = 0; i < nops; i++) {
  628. unique_ptr<Op> op;
  629. MaybeLocal<Value> maybe_key = Nan::Get(keys, i);
  630. if (maybe_key.IsEmpty() || (!maybe_key.ToLocalChecked()->IsUint32())) {
  631. return Nan::ThrowError(
  632. "startBatch's first argument's keys must be integers");
  633. }
  634. uint32_t type = Nan::To<uint32_t>(maybe_key.ToLocalChecked()).FromJust();
  635. ops[i].op = static_cast<grpc_op_type>(type);
  636. ops[i].flags = 0;
  637. ops[i].reserved = NULL;
  638. switch (type) {
  639. case GRPC_OP_SEND_INITIAL_METADATA:
  640. op.reset(new SendMetadataOp());
  641. break;
  642. case GRPC_OP_SEND_MESSAGE:
  643. op.reset(new SendMessageOp());
  644. break;
  645. case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
  646. op.reset(new SendClientCloseOp());
  647. break;
  648. case GRPC_OP_SEND_STATUS_FROM_SERVER:
  649. op.reset(new SendServerStatusOp());
  650. break;
  651. case GRPC_OP_RECV_INITIAL_METADATA:
  652. op.reset(new GetMetadataOp());
  653. break;
  654. case GRPC_OP_RECV_MESSAGE:
  655. op.reset(new ReadMessageOp());
  656. break;
  657. case GRPC_OP_RECV_STATUS_ON_CLIENT:
  658. op.reset(new ClientStatusOp());
  659. break;
  660. case GRPC_OP_RECV_CLOSE_ON_SERVER:
  661. op.reset(new ServerCloseResponseOp());
  662. break;
  663. default:
  664. return Nan::ThrowError("Argument object had an unrecognized key");
  665. }
  666. if (!op->ParseOp(obj->Get(type), &ops[i])) {
  667. return Nan::ThrowTypeError("Incorrectly typed arguments to startBatch");
  668. }
  669. op_vector->push_back(std::move(op));
  670. }
  671. Callback *callback = new Callback(callback_func);
  672. grpc_call_error error = grpc_call_start_batch(
  673. call->wrapped_call, &ops[0], nops, new struct tag(
  674. callback, op_vector.release(), call), NULL);
  675. if (error != GRPC_CALL_OK) {
  676. return Nan::ThrowError(nanErrorWithCode("startBatch failed", error));
  677. }
  678. call->pending_batches++;
  679. CompletionQueueNext();
  680. }
  681. NAN_METHOD(Call::Cancel) {
  682. if (!Call::HasInstance(info.This())) {
  683. return Nan::ThrowTypeError("cancel can only be called on Call objects");
  684. }
  685. Call *call = ObjectWrap::Unwrap<Call>(info.This());
  686. grpc_call_error error = grpc_call_cancel(call->wrapped_call, NULL);
  687. if (error != GRPC_CALL_OK) {
  688. return Nan::ThrowError(nanErrorWithCode("cancel failed", error));
  689. }
  690. }
  691. NAN_METHOD(Call::CancelWithStatus) {
  692. Nan::HandleScope scope;
  693. if (!HasInstance(info.This())) {
  694. return Nan::ThrowTypeError("cancel can only be called on Call objects");
  695. }
  696. if (!info[0]->IsUint32()) {
  697. return Nan::ThrowTypeError(
  698. "cancelWithStatus's first argument must be a status code");
  699. }
  700. if (!info[1]->IsString()) {
  701. return Nan::ThrowTypeError(
  702. "cancelWithStatus's second argument must be a string");
  703. }
  704. Call *call = ObjectWrap::Unwrap<Call>(info.This());
  705. grpc_status_code code = static_cast<grpc_status_code>(
  706. Nan::To<uint32_t>(info[0]).FromJust());
  707. if (code == GRPC_STATUS_OK) {
  708. return Nan::ThrowRangeError(
  709. "cancelWithStatus cannot be called with OK status");
  710. }
  711. Utf8String details(info[1]);
  712. grpc_call_cancel_with_status(call->wrapped_call, code, *details, NULL);
  713. }
  714. NAN_METHOD(Call::GetPeer) {
  715. Nan::HandleScope scope;
  716. if (!HasInstance(info.This())) {
  717. return Nan::ThrowTypeError("getPeer can only be called on Call objects");
  718. }
  719. Call *call = ObjectWrap::Unwrap<Call>(info.This());
  720. char *peer = grpc_call_get_peer(call->wrapped_call);
  721. Local<Value> peer_value = Nan::New(peer).ToLocalChecked();
  722. gpr_free(peer);
  723. info.GetReturnValue().Set(peer_value);
  724. }
  725. NAN_METHOD(Call::SetCredentials) {
  726. Nan::HandleScope scope;
  727. if (!HasInstance(info.This())) {
  728. return Nan::ThrowTypeError(
  729. "setCredentials can only be called on Call objects");
  730. }
  731. if (!CallCredentials::HasInstance(info[0])) {
  732. return Nan::ThrowTypeError(
  733. "setCredentials' first argument must be a CallCredentials");
  734. }
  735. Call *call = ObjectWrap::Unwrap<Call>(info.This());
  736. CallCredentials *creds_object = ObjectWrap::Unwrap<CallCredentials>(
  737. Nan::To<Object>(info[0]).ToLocalChecked());
  738. grpc_call_credentials *creds = creds_object->GetWrappedCredentials();
  739. grpc_call_error error = GRPC_CALL_ERROR;
  740. if (creds) {
  741. error = grpc_call_set_credentials(call->wrapped_call, creds);
  742. }
  743. info.GetReturnValue().Set(Nan::New<Uint32>(error));
  744. }
  745. } // namespace node
  746. } // namespace grpc