// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//     * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//     * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

var async = require('async');
var fs = require('fs');
var GoogleAuth = require('googleauth');
var parseArgs = require('minimist');
var strftime = require('strftime');
var _ = require('underscore');
var grpc = require('../..');
var PROTO_PATH = __dirname + '/pubsub.proto';
var pubsub = grpc.load(PROTO_PATH).tech.pubsub;

/**
 * Bundles a publisher client, a subscriber client and the parsed command line
 * arguments, and exposes one method per demo action.
 * @constructor
 * @param {Object} pub Publisher service client
 * @param {Object} sub Subscriber service client
 * @param {Object} args Parsed arguments; the methods read project_id,
 *     topic_name and sub_name from it
 */
function PubsubRunner(pub, sub, args) {
  this.pub = pub;
  this.sub = sub;
  this.args = args;
}

/**
 * Builds the topic name used by the demo actions. Uses args.topic_name when it
 * is set; otherwise combines process.env.USER with a timestamp, so each call
 * without an explicit topic_name can produce a different name.
 * @return {string} Name of the form '/topics/<project_id>/<suffix>'
 */
PubsubRunner.prototype.getTestTopicName = function() {
  var base_name = '/topics/' + this.args.project_id + '/';
  if (this.args.topic_name) {
    return base_name + this.args.topic_name;
  }
  var now_text = strftime('%Y%m%d%H%M%S%L');
  return base_name + process.env.USER + '-' + now_text;
};

/**
 * Builds the subscription name used by the demo actions. Uses args.sub_name
 * when it is set; otherwise combines process.env.USER with a timestamp.
 * @return {string} Name of the form '/subscriptions/<project_id>/<suffix>'
 */
PubsubRunner.prototype.getTestSubName = function() {
  var base_name = '/subscriptions/' + this.args.project_id + '/';
  if (this.args.sub_name) {
    return base_name + this.args.sub_name;
  }
  var now_text = strftime('%Y%m%d%H%M%S%L');
  return base_name + process.env.USER + '-' + now_text;
};

/**
 * Issues a listTopics call restricted to the configured project.
 * @param {function(?Error, Object=)} callback Passed straight to
 *     pub.listTopics
 */
PubsubRunner.prototype.listProjectTopics = function(callback) {
  var q = ('cloud.googleapis.com/project in (/projects/' +
      this.args.project_id + ')');
  this.pub.listTopics({query: q}, callback);
};

/**
 * Checks whether a topic with exactly the given name appears in the project's
 * topic listing.
 * @param {string} name Full topic name to look for
 * @param {function(?Error, boolean=)} callback Called with the error, or with
 *     null and whether a matching topic was listed
 */
PubsubRunner.prototype.topicExists = function(name, callback) {
  this.listProjectTopics(function(err, response) {
    if (err) {
      callback(err);
    } else {
      callback(null, _.some(response.topic, function(t) {
        return t.name === name;
      }));
    }
  });
};

/**
 * Creates the named topic unless topicExists reports that it is already there.
 * Note that the callback is invoked with no result value when the topic
 * already exists, and is otherwise handed directly to pub.createTopic, so
 * callers should not rely on the number of arguments it receives.
 * @param {string} name Full topic name
 * @param {function(?Error, Object=)} callback Completion callback
 */
PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) {
  var self = this;
  this.topicExists(name, function(err, exists) {
    if (err) {
      callback(err);
    } else {
      if (exists) {
        callback(null);
      } else {
        self.pub.createTopic({name: name}, callback);
      }
    }
  });
};

/**
 * Demo action: deletes the test topic and logs the outcome.
 * @param {function(?Error)} callback Called with the RPC error, or null
 */
PubsubRunner.prototype.removeTopic = function(callback) {
  var name = this.getTestTopicName();
  console.log('... removing Topic', name);
  this.pub.deleteTopic({topic: name}, function(err, value) {
    if (err) {
      console.log('Could not delete a topic: rpc failed with', err);
      callback(err);
    } else {
      console.log('removed Topic', name, 'OK');
      callback(null);
    }
  });
};

/**
 * Demo action: creates the test topic and logs the outcome.
 * @param {function(?Error)} callback Called with the RPC error, or null
 */
PubsubRunner.prototype.createTopic = function(callback) {
  var name = this.getTestTopicName();
  console.log('... creating Topic', name);
  this.pub.createTopic({name: name}, function(err, value) {
    if (err) {
      console.log('Could not create a topic: rpc failed with', err);
      callback(err);
    } else {
      console.log('created Topic', name, 'OK');
      callback(null);
    }
  });
};

/**
 * Demo action: prints the name of every topic returned by listProjectTopics.
 * @param {function(?Error)} callback Called with the RPC error, or null
 */
PubsubRunner.prototype.listSomeTopics = function(callback) {
  console.log('Listing topics');
  console.log('-------------_');
  this.listProjectTopics(function(err, response) {
    if (err) {
      console.log('Could not list topic: rpc failed with', err);
      callback(err);
    } else {
      _.each(response.topic, function(t) {
        console.log(t.name);
      });
      callback(null);
    }
  });
};

/**
 * Demo action: reports on the console whether the test topic exists.
 * @param {function(?Error)} callback Called with the RPC error, or null
 */
PubsubRunner.prototype.checkExists = function(callback) {
  var name = this.getTestTopicName();
  console.log('... checking for topic', name);
  this.topicExists(name, function(err, exists) {
    if (err) {
      console.log('Could not check for a topics: rpc failed with', err);
      callback(err);
    } else {
      if (exists) {
        console.log(name, 'is a topic');
      } else {
        console.log(name, 'is not a topic');
      }
      callback(null);
    }
  });
};

/**
 * Demo action: ensures the test topic exists, creates a subscription on it,
 * publishes a random number (10 to 30) of messages, pulls a batch,
 * acknowledges the pulled messages and finally deletes the subscription.
 * @param {function(?Error, *=)} callback Called with the first error, or with
 *     null and the result of the last step
 */
PubsubRunner.prototype.randomPubSub = function(callback) {
  var self = this;
  var topic_name = this.getTestTopicName();
  var sub_name = this.getTestSubName();
  var subscription = {name: sub_name, topic: topic_name};
  async.waterfall([
    function(cb) {
      // async.waterfall forwards every result value of a step as leading
      // arguments of the next step. createTopicIfNeeded may or may not pass a
      // result depending on whether the topic already existed, so only the
      // error is forwarded here to give the next step a fixed arity.
      self.createTopicIfNeeded(topic_name, function(err) {
        cb(err);
      });
    },
    function(cb) {
      self.sub.createSubscription(subscription, cb);
    },
    function(resp, cb) {
      var msg_count = _.random(10, 30);
      // Set up msg_count messages to publish
      var message_senders = _.times(msg_count, function(n) {
        return _.bind(self.pub.publish, self.pub, {
          topic: topic_name,
          message: {data: new Buffer('message ' + n)}
        });
      });
      async.parallel(message_senders, function(err, result) {
        cb(err, result, msg_count);
      });
    },
    function(result, msg_count, cb) {
      console.log('Sent', msg_count, 'messages to', topic_name + ',',
                  'checking for them now.');
      var batch_request = {
        subscription: sub_name,
        max_events: msg_count
      };
      self.sub.pullBatch(batch_request, cb);
    },
    function(batch, cb) {
      var ack_id = _.pluck(batch.pull_responses, 'ack_id');
      console.log('Got', ack_id.length, 'messages, acknowledging them...');
      var ack_request = {
        subscription: sub_name,
        ack_id: ack_id
      };
      self.sub.acknowledge(ack_request, cb);
    },
    function(result, cb) {
      console.log(
          'Test messages were acknowledged OK, deleting the subscription');
      self.sub.deleteSubscription({subscription: sub_name}, cb);
    }
  ], function (err, result) {
    if (err) {
      console.log('Could not do random pub sub: rpc failed with', err);
    }
    callback(err, result);
  });
};

/**
 * Parses the command line, obtains application default credentials, builds
 * the publisher and subscriber clients over SSL and runs the requested action.
 * The CA certificate is read from the file named by the SSL_CERT_FILE
 * environment variable.
 * @param {function(?Error, *=)} callback Called with an error if the action
 *     name is not recognised or setup fails; otherwise handed to the action
 */
function main(callback) {
  var argv = parseArgs(process.argv, {
    string: [
      'host',
      'oauth_scope',
      'port',
      'action',
      'project_id',
      'topic_name',
      'sub_name'
    ],
    default: {
      host: 'pubsub-staging.googleapis.com',
      oauth_scope: 'https://www.googleapis.com/auth/pubsub',
      port: 443,
      action: 'listSomeTopics',
      project_id: 'stoked-keyword-656'
    }
  });
  var valid_actions = [
    'createTopic',
    'removeTopic',
    'listSomeTopics',
    'checkExists',
    'randomPubSub'
  ];
  // Reject anything that is not a known action, and stop here so that the
  // callback is not invoked a second time by the code below.
  if (valid_actions.indexOf(argv.action) === -1) {
    callback(new Error('Action was not valid'));
    return;
  }
  var address = argv.host + ':' + argv.port;
  (new GoogleAuth()).getApplicationDefault(function(err, credential) {
    if (err) {
      callback(err);
      return;
    }
    if (credential.createScopedRequired()) {
      credential = credential.createScoped(argv.oauth_scope);
    }
    var updateMetadata = grpc.getGoogleAuthDelegate(credential);
    var ca_path = process.env.SSL_CERT_FILE;
    if (!ca_path) {
      // Report a missing certificate path through the callback rather than
      // passing undefined to fs.readFile.
      callback(new Error('SSL_CERT_FILE environment variable is not set'));
      return;
    }
    fs.readFile(ca_path, function(err, ca_data) {
      if (err) {
        callback(err);
        return;
      }
      var ssl_creds = grpc.Credentials.createSsl(ca_data);
      var options = {
        credentials: ssl_creds,
        'grpc.ssl_target_name_override': argv.host
      };
      var pub = new pubsub.PublisherService(address, options, updateMetadata);
      var sub = new pubsub.SubscriberService(address, options, updateMetadata);
      var runner = new PubsubRunner(pub, sub, argv);
      runner[argv.action](callback);
    });
  });
}

if (require.main === module) {
  main(function(err) {
    if (err) throw err;
  });
}

module.exports = PubsubRunner;