// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//     * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//     * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
// Third-party and project dependencies used by the demo.
var async = require('async');
var fs = require('fs');
var GoogleAuth = require('googleauth');
var parseArgs = require('minimist');
var strftime = require('strftime');
var _ = require('underscore');
var grpc = require('../..');

// Path of the proto definition, resolved relative to this script's directory.
var PROTO_PATH = [__dirname, 'pubsub.proto'].join('/');

// Namespace loaded from the proto file; the service client constructors used
// in main() are read from it.
var pubsub = grpc.load(PROTO_PATH).tech.pubsub;
 
/**
 * Bundles the two service clients with the parsed command line arguments so
 * that the demo actions defined on the prototype can reach all three.
 * @constructor
 * @param {Object} pub Publisher service client
 * @param {Object} sub Subscriber service client
 * @param {Object} args Parsed command line arguments (the actions read
 *     project_id, topic_name and sub_name from it)
 */
function PubsubRunner(pub, sub, args) {
  // Stored as-is; nothing is validated or copied here.
  this.pub = pub;
  this.sub = sub;
  this.args = args;
}
 
/**
 * Builds the full name of the topic the demo operates on.
 *
 * When the topic_name argument is set it is used verbatim. Otherwise a name
 * is generated from the current user and the current time.
 * @return {string} A name of the form /topics/<project_id>/<topic>
 */
PubsubRunner.prototype.getTestTopicName = function() {
  var base_name = '/topics/' + this.args.project_id + '/';
  if (this.args.topic_name) {
    return base_name + this.args.topic_name;
  }
  // USER is not defined in every environment (Windows sets USERNAME instead).
  // Fall back so the generated name never embeds the text "undefined".
  var user = process.env.USER || process.env.USERNAME || 'unknown';
  var now_text = strftime('%Y%m%d%H%M%S%L');
  return base_name + user + '-' + now_text;
};
 
/**
 * Builds the full name of the subscription the demo operates on.
 *
 * When the sub_name argument is set it is used verbatim. Otherwise a name is
 * generated from the current user and the current time.
 * @return {string} A name of the form /subscriptions/<project_id>/<name>
 */
PubsubRunner.prototype.getTestSubName = function() {
  var base_name = '/subscriptions/' + this.args.project_id + '/';
  if (this.args.sub_name) {
    return base_name + this.args.sub_name;
  }
  // USER is not defined in every environment (Windows sets USERNAME instead).
  // Fall back so the generated name never embeds the text "undefined".
  var user = process.env.USER || process.env.USERNAME || 'unknown';
  var now_text = strftime('%Y%m%d%H%M%S%L');
  return base_name + user + '-' + now_text;
};
 
/**
 * Asks the publisher client for the topics of the configured project.
 * @param {function} callback Handed unchanged to the listTopics call
 */
PubsubRunner.prototype.listProjectTopics = function(callback) {
  var project_path = '/projects/' + this.args.project_id;
  var request = {
    query: 'cloud.googleapis.com/project in (' + project_path + ')'
  };
  this.pub.listTopics(request, callback);
};
 
/**
 * Checks whether a topic with the given name is among the project's topics.
 * @param {string} name Full topic name to look for
 * @param {function(?Error, boolean=)} callback Called with the listing error,
 *     or with null and whether any listed topic has exactly that name
 */
PubsubRunner.prototype.topicExists = function(name, callback) {
  function hasRequestedName(topic) {
    return topic.name === name;
  }

  this.listProjectTopics(function(err, response) {
    if (err) {
      callback(err);
      return;
    }
    callback(null, _.some(response.topic, hasRequestedName));
  });
};
 
/**
 * Creates the named topic unless topicExists reports that it already exists.
 *
 * The callback receives only an error argument on every path. The create RPC
 * response is deliberately not forwarded: the "already exists" path has no
 * response to give, and in an async.waterfall chain an extra result would
 * shift the arguments of the following task.
 * @param {string} name Full topic name
 * @param {function(?Error)} callback Called with the error, or with null once
 *     the topic exists
 */
PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) {
  var self = this;
  this.topicExists(name, function(err, exists) {
    if (err) {
      callback(err);
      return;
    }
    if (exists) {
      callback(null);
      return;
    }
    self.pub.createTopic({name: name}, function(create_err) {
      callback(create_err || null);
    });
  });
};
 
/**
 * Demo action: deletes the topic named by getTestTopicName and logs the
 * outcome.
 * @param {function(?Error)} callback Called with the RPC error, or with null
 *     after a successful delete
 */
PubsubRunner.prototype.removeTopic = function(callback) {
  var topic = this.getTestTopicName();
  console.log('... removing Topic', topic);
  this.pub.deleteTopic({topic: topic}, function(err) {
    if (err) {
      console.log('Could not delete a topic: rpc failed with', err);
      callback(err);
      return;
    }
    console.log('removed Topic', topic, 'OK');
    callback(null);
  });
};
 
/**
 * Demo action: creates the topic named by getTestTopicName and logs the
 * outcome.
 * @param {function(?Error)} callback Called with the RPC error, or with null
 *     after a successful create
 */
PubsubRunner.prototype.createTopic = function(callback) {
  var topic = this.getTestTopicName();
  console.log('... creating Topic', topic);
  this.pub.createTopic({name: topic}, function(err) {
    if (err) {
      console.log('Could not create a topic: rpc failed with', err);
      callback(err);
      return;
    }
    console.log('created Topic', topic, 'OK');
    callback(null);
  });
};
 
/**
 * Demo action: prints the name of every topic returned by listProjectTopics.
 * @param {function(?Error)} callback Called with the listing error, or with
 *     null once all names have been printed
 */
PubsubRunner.prototype.listSomeTopics = function(callback) {
  function printName(topic) {
    console.log(topic.name);
  }

  console.log('Listing topics');
  console.log('-------------_');
  this.listProjectTopics(function(err, response) {
    if (err) {
      console.log('Could not list topic: rpc failed with', err);
      callback(err);
      return;
    }
    _.each(response.topic, printName);
    callback(null);
  });
};
 
/**
 * Demo action: reports on the console whether the topic named by
 * getTestTopicName exists.
 * @param {function(?Error)} callback Called with the lookup error, or with
 *     null once the result has been printed
 */
PubsubRunner.prototype.checkExists = function(callback) {
  var topic = this.getTestTopicName();
  console.log('... checking for topic', topic);
  this.topicExists(topic, function(err, exists) {
    if (err) {
      console.log('Could not check for a topics: rpc failed with', err);
      callback(err);
      return;
    }
    console.log(topic, exists ? 'is a topic' : 'is not a topic');
    callback(null);
  });
};
 
/**
 * Demo action: runs a publish/subscribe round trip.
 *
 * Steps, run in sequence with async.waterfall:
 *   1. make sure the test topic exists,
 *   2. create the test subscription on it,
 *   3. publish a random number (10 to 30) of messages in parallel,
 *   4. pull a batch of up to that many messages,
 *   5. acknowledge whatever was pulled,
 *   6. delete the subscription.
 * @param {function(?Error, *=)} callback Called with the first error, or with
 *     the result of the final step
 */
PubsubRunner.prototype.randomPubSub = function(callback) {
  var self = this;
  var topic_name = this.getTestTopicName();
  var sub_name = this.getTestSubName();
  var subscription = {name: sub_name, topic: topic_name};
  async.waterfall([
    function(cb) {
      // Forward only the error. If createTopicIfNeeded passed an RPC response
      // along, waterfall would hand it to the next task and createSubscription
      // would receive it in the position where it expects its callback.
      self.createTopicIfNeeded(topic_name, function(err) {
        cb(err || null);
      });
    },
    _.bind(self.sub.createSubscription, self.sub, subscription),
    function(resp, cb) {
      var msg_count = _.random(10, 30);
      // Set up msg_count messages to publish
      var message_senders = _.times(msg_count, function(n) {
        return _.bind(self.pub.publish, self.pub, {
          topic: topic_name,
          message: {data: new Buffer('message ' + n)}
        });
      });
      async.parallel(message_senders, function(err, result) {
        // Carry the count forward so the pull step knows how many to ask for.
        cb(err, result, msg_count);
      });
    },
    function(result, msg_count, cb) {
      console.log('Sent', msg_count, 'messages to', topic_name + ',',
                  'checking for them now.');
      var batch_request = {
        subscription: sub_name,
        max_events: msg_count
      };
      self.sub.pullBatch(batch_request, cb);
    },
    function(batch, cb) {
      var ack_id = _.pluck(batch.pull_responses, 'ack_id');
      console.log('Got', ack_id.length, 'messages, acknowledging them...');
      var ack_request = {
        subscription: sub_name,
        ack_id: ack_id
      };
      self.sub.acknowledge(ack_request, cb);
    },
    function(result, cb) {
      console.log(
          'Test messages were acknowledged OK, deleting the subscription');
      self.sub.deleteSubscription({subscription: sub_name}, cb);
    }
  ], function(err, result) {
    if (err) {
      console.log('Could not do random pub sub: rpc failed with', err);
    }
    callback(err, result);
  });
};
 
/**
 * Parses the command line, builds authenticated publisher and subscriber
 * clients and runs the requested demo action.
 *
 * Recognized string options: host, oauth_scope, port, action, project_id,
 * topic_name, sub_name. The CA certificate is read from the file named by
 * the SSL_CERT_FILE environment variable.
 * @param {function(?Error, *=)} callback Called once: with an error when the
 *     action name is invalid, when credentials or the CA file cannot be
 *     obtained, or otherwise by the action itself
 */
function main(callback) {
  var argv = parseArgs(process.argv, {
    string: [
      'host',
      'oauth_scope',
      'port',
      'action',
      'project_id',
      'topic_name',
      'sub_name'
    ],
    default: {
      host: 'pubsub-staging.googleapis.com',
      oauth_scope: 'https://www.googleapis.com/auth/pubsub',
      port: 443,
      action: 'listSomeTopics',
      project_id: 'stoked-keyword-656'
    }
  });
  var valid_actions = [
    'createTopic',
    'removeTopic',
    'listSomeTopics',
    'checkExists',
    'randomPubSub'
  ];
  // Reject anything outside the whitelist and stop, so that the callback is
  // not invoked a second time and no unknown property of the runner is called.
  if (valid_actions.indexOf(argv.action) === -1) {
    callback(new Error('Action was not valid'));
    return;
  }
  var address = argv.host + ':' + argv.port;
  (new GoogleAuth()).getApplicationDefault(function(err, credential) {
    if (err) {
      callback(err);
      return;
    }
    if (credential.createScopedRequired()) {
      credential = credential.createScoped(argv.oauth_scope);
    }
    var updateMetadata = grpc.getGoogleAuthDelegate(credential);
    var ca_path = process.env.SSL_CERT_FILE;
    if (!ca_path) {
      // Report the missing setting through the callback instead of letting
      // fs.readFile reject an undefined path.
      callback(new Error(
          'SSL_CERT_FILE must be set to the path of the CA certificate file'));
      return;
    }
    fs.readFile(ca_path, function(err, ca_data) {
      if (err) {
        callback(err);
        return;
      }
      var ssl_creds = grpc.Credentials.createSsl(ca_data);
      var options = {
        credentials: ssl_creds,
        'grpc.ssl_target_name_override': argv.host
      };
      var pub = new pubsub.PublisherService(address, options, updateMetadata);
      var sub = new pubsub.SubscriberService(address, options, updateMetadata);
      var runner = new PubsubRunner(pub, sub, argv);
      runner[argv.action](callback);
    });
  });
}
 
// Run the demo only when this file is the program entry point, not when it is
// loaded with require(). Any error reported by main is rethrown.
if (require.main === module) {
  main(function onFinished(err) {
    if (err) {
      throw err;
    }
  });
}

module.exports = PubsubRunner;