/**
* @namespace cam
* @description Events section for Cam class
* @author Andrew D.Laptev <a.d.laptev@gmail.com>
* @license MIT
*/
module.exports = function(Cam) {
/**
* @typedef {object} Cam~CreatePullPointSubscriptionResponse
* @property {object} subscriptionReference
* @property {string|object} subscriptionReference.address
* @property {Date} currentTime
* @property {Date} terminationTime
*/
/**
* Events namespace for the device, stores all information about device events
* @name Cam#events
* @type object
* @property {Cam~EventProperties} properties
* @property {Cam~CreatePullPointSubscriptionResponse} subscription
* @property {Date} terminationTime Time when pull-point subscription is over
* @property {number} messageLimit Pull message count
*/
const linerase = require('./utils').linerase;
const parseSOAPString = require('./utils').parseSOAPString;
// Error codes after which the event loop schedules a delayed restart instead of stopping
const retryErrorCodes = [
	'ECONNREFUSED',
	'ECONNRESET',
	'ETIMEDOUT',
	'ENETUNREACH'
];
// Two minutes in milliseconds: once the reconnect delay reaches this value it stops growing
const maxEventReconnectMs = 120000;
/**
* Event properties object
* @typedef {object} Cam~EventProperties
* @property {array} topicNamespaceLocation
* @property {object} topicSet
* @property {array} topicExpressionDialect
*/
/**
* @callback Cam~GetEventPropertiesCallback
* @property {?Error} err
* @property {Cam~EventProperties} response
* @property {string} xml Raw SOAP response
*/
/**
 * Request the event properties of the device and cache them in `this.events.properties`
 * @param {Cam~GetEventPropertiesCallback} callback Invoked as (err, properties, xml); properties is null when err is set
 */
Cam.prototype.getEventProperties = function(callback) {
	const header = this._envelopeHeader();
	const footer = this._envelopeFooter();
	const request = {
		service: 'events',
		body: header + '<GetEventProperties xmlns="http://www.onvif.org/ver10/events/wsdl"/>' + footer
	};
	this._request(request, (err, res, xml) => {
		if (err) {
			// Nothing is cached on failure; the previous properties (if any) stay in place
			callback.call(this, err, null, xml);
			return;
		}
		this.events.properties = linerase(res).getEventPropertiesResponse;
		callback.call(this, err, this.events.properties, xml);
	});
};
/**
 * Request the capabilities of the event service
 * @param {function} callback Invoked as (err, capabilities, xml); capabilities is undefined when err is set
 */
Cam.prototype.getEventServiceCapabilities = function(callback) {
	const header = this._envelopeHeader();
	const footer = this._envelopeFooter();
	const request = {
		service: 'events',
		body: header + '<GetServiceCapabilities xmlns="http://www.onvif.org/ver10/events/wsdl"/>' + footer
	};
	this._request(request, (err, res, xml) => {
		let capabilities;
		if (!err) {
			// The capabilities are carried as XML attributes (`$`) of the first Capabilities element
			const response = res[0].getServiceCapabilitiesResponse[0];
			capabilities = linerase(response.capabilities[0].$);
		}
		callback.call(this, err, capabilities, xml);
	});
};
/**
 * Escape a value for use as XML character data.
 * An ampersand that already starts an entity reference (`&amp;`, `&#38;`, ...) is left alone,
 * so a caller that pre-escaped its URL does not get it escaped twice.
 * @param {*} text
 * @returns {string}
 * @private
 */
function _escapeXml(text) {
	return String(text)
		.replace(/&(?!(?:amp|lt|gt|quot|apos|#\d+|#x[0-9a-fA-F]+);)/g, '&amp;')
		.replace(/</g, '&lt;')
		.replace(/>/g, '&gt;')
		.replace(/"/g, '&quot;')
		.replace(/'/g, '&apos;');
}
/**
 * Create Base Subscription
 * This allows Cameras and NVTs to send events to a URL via a POST message.
 * On success the response is stored in `this.events.subscription` (with the subscription address
 * run through `this._parseUrl`) and `this.events.terminationTime` is updated.
 * TODO - Make the termination time configurable (currently fixed to PT2M)
 * @param {object} options
 * @param {string} options.url URL the device should deliver notifications to; XML-escaped before sending
 * @param {function} callback Invoked as (err, subscription, xml); subscription is null when err is set
 */
Cam.prototype.subscribe = function(options, callback) {
	let sendXml = this._envelopeHeader(true);
	// A raw '&' (common in query strings) or '<' would otherwise make the SOAP envelope malformed
	const consumerUrl = _escapeXml(options.url);
	// TODO Add a:Action ??
	// TODO Add a:MessageID ??
	sendXml += '<a:ReplyTo><a:Address>' + consumerUrl + '</a:Address></a:ReplyTo>';
	sendXml += '</s:Header>' +
		'<s:Body xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema">' +
		'<Subscribe xmlns="http://docs.oasis-open.org/wsn/b-2">' +
		'<ConsumerReference><a:Address>' + consumerUrl + '</a:Address></ConsumerReference>' +
		'<InitialTerminationTime>PT2M</InitialTerminationTime>' + // 2 mins (a value greater than the 1 min Pull Timeout)
		'</Subscribe>' +
		this._envelopeFooter();
	this._request({
		service: 'events',
		body: sendXml
	}, function(err, res, xml) {
		if (!err) {
			this.events.subscription = linerase(res[0].subscribeResponse[0]);
			this.events.subscription.subscriptionReference.address =
				this._parseUrl(this.events.subscription.subscriptionReference.address);
			this.events.terminationTime = _terminationTime(this.events.subscription);
		}
		callback.call(this, err, err ? null : this.events.subscription, xml);
	}.bind(this));
};
/**
 * Create a pull-point subscription with an initial termination time of two minutes.
 * On success the response is stored in `this.events.subscription` (its address run through
 * `this._parseUrl`) and `this.events.terminationTime` is recomputed.
 * @param {function} callback Invoked as (err, subscription, xml); subscription is null when err is set
 */
Cam.prototype.createPullPointSubscription = function(callback) {
	const header = this._envelopeHeader();
	const footer = this._envelopeFooter();
	const soapBody = header +
		'<CreatePullPointSubscription xmlns="http://www.onvif.org/ver10/events/wsdl">' +
		'<InitialTerminationTime>PT2M</InitialTerminationTime>' +
		'</CreatePullPointSubscription>' +
		footer;
	this._request({service: 'events', body: soapBody}, (err, res, xml) => {
		if (err) {
			callback.call(this, err, null, xml);
			return;
		}
		const subscription = linerase(res[0].createPullPointSubscriptionResponse[0]);
		// Publish the subscription before post-processing it, so it is visible even if parsing fails
		this.events.subscription = subscription;
		subscription.subscriptionReference.address = this._parseUrl(subscription.subscriptionReference.address);
		this.events.terminationTime = _terminationTime(subscription);
		callback.call(this, err, this.events.subscription, xml);
	});
};
/**
 * Renew the current subscription for another two minutes.
 * Requires `this.events.subscription` to exist; otherwise the callback (if any) receives an Error
 * and no request is sent.
 * @param {Object|Function} [options] Currently unused; may be the callback when called with one argument
 * @param {Function} [callback] Invoked as (err, renewResponse, xml); renewResponse is undefined when err is set
 */
Cam.prototype.renew = function(options, callback) {
	if (!callback) {
		callback = options;
	}
	// The callback is optional: without this guard a missing callback throws inside the response handler
	const hasCallback = Boolean(callback && callback.call);
	let urlAddress = null;
	let subscriptionId = null;
	try {
		urlAddress = this.events.subscription.subscriptionReference.address;
	} catch (e) {
		if (hasCallback) {
			callback.call(this, new Error('You should create pull-point subscription first!'));
		}
		return;
	}
	try {
		subscriptionId = this.events.subscription.subscriptionReference.referenceParameters.subscriptionId;
	} catch (e) {
		// Reference parameters are optional; their absence just selects the plain addressing header below
		subscriptionId = null;
	}
	let sendXml = this._envelopeHeader(true);
	if (!subscriptionId) {
		sendXml += '<a:To>' + urlAddress.href + '</a:To>';
	} else {
		// Axis Cameras use a PullPoint URL and the Subscription ID
		sendXml += '<a:To mustUnderstand="1">' + urlAddress.href + '</a:To>' +
			'<SubscriptionId xmlns="http://www.axis.com/2009/event" a:IsReferenceParameter="true">' + subscriptionId + '</SubscriptionId>';
	}
	sendXml += '</s:Header>' +
		'<s:Body xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema">' +
		'<Renew xmlns="http://docs.oasis-open.org/wsn/b-2">' +
		'<TerminationTime>PT2M</TerminationTime>' + // 2 mins (larger than the 1 min Pull timeout)
		'</Renew>' +
		this._envelopeFooter();
	this._request({
		url: urlAddress,
		body: sendXml
	}, function(err, res, xml) {
		let data;
		if (!err) {
			data = linerase(res).renewResponse;
		}
		if (hasCallback) {
			callback.call(this, err, data, xml);
		}
	}.bind(this));
};
/**
* @typedef {object} Cam~Event
* @property {Date} currentTime
* @property {Date} terminationTime
* @property {Cam~NotificationMessage|Array.<Cam~NotificationMessage>} [notificationMessage]
*/
/**
* @typedef {object} Cam~NotificationMessage
* @property {string} subscriptionReference.address Pull-point address
* @property {string} topic._ Namespace of message topic
* @property {object} message Message object
*/
/**
* @callback Cam~PullMessagesResponse
* @property {?Error} error
* @property {Cam~Event} response Message
* @property {string} xml Raw SOAP response
*/
/**
 * Pull messages from pull-point subscription.
 * The device is asked to wait up to one minute (PT1M) for messages, so the request uses an 80 second reply timeout.
 * @param {object|Cam~PullMessagesResponse} [options] May be omitted or null; may be the callback when called with one argument
 * @param {number} [options.messageLimit=10]
 * @param {Cam~PullMessagesResponse} [callback]
 * @throws {Error} If {@link Cam#events.subscription} does not exist and no callback was given to receive the error
 */
Cam.prototype.pullMessages = function(options, callback) {
	// Accept pullMessages(callback) as shorthand, the same way renew() does
	if (typeof options === 'function' && !callback) {
		callback = options;
		options = null;
	}
	// options may legitimately be missing; fall back to the documented default limit
	const messageLimit = (options && options.messageLimit) || 10;
	const hasCallback = Boolean(callback && callback.call);
	let urlAddress = null;
	let subscriptionId = null;
	try {
		urlAddress = this.events.subscription.subscriptionReference.address;
	} catch (e) {
		const missing = new Error('You should create pull-point subscription first!');
		if (!hasCallback) {
			throw missing;
		}
		callback.call(this, missing);
		return;
	}
	try {
		subscriptionId = this.events.subscription.subscriptionReference.referenceParameters.subscriptionId;
	} catch (e) {
		// Reference parameters are optional; their absence just selects the plain addressing header below
		subscriptionId = null;
	}
	let sendXml = this._envelopeHeader(true);
	if (!subscriptionId) {
		sendXml += '<a:To>' + urlAddress.href + '</a:To>';
	} else {
		// Axis Cameras use a PullPoint URL and the Subscription ID
		sendXml += '<a:To mustUnderstand="1">' + urlAddress.href + '</a:To>' +
			'<SubscriptionId xmlns="http://www.axis.com/2009/event" a:IsReferenceParameter="true">' + subscriptionId + '</SubscriptionId>';
	}
	sendXml += '</s:Header>' +
		'<s:Body xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema">' +
		'<PullMessages xmlns="http://www.onvif.org/ver10/events/wsdl">' +
		'<Timeout>PT1M</Timeout>' + // ONVIF Spec says cameras must support 1 Minute wait times. Ensure network socket has a replyTimeout that is larger than 1 minute
		'<MessageLimit>' + messageLimit + '</MessageLimit>' +
		'</PullMessages>' +
		this._envelopeFooter();
	this._request({
		url: urlAddress,
		body: sendXml,
		replyTimeout: (80 * 1000) // 80 seconds - ensures the socket does not get closed too early while the camera has up to 1 minute to reply
	}, function(err, res, xml) {
		let data;
		if (!err) {
			data = linerase(res).pullMessagesResponse;
		}
		if (hasCallback) {
			callback.call(this, err, data, xml);
		}
	}.bind(this));
};
/**
 * Unsubscribe from the current subscription.
 * `this.events.subscription` and `this.events.terminationTime` are removed before the request is sent.
 * When there is no subscription, the callback (if any) receives an Error and no request is sent.
 * @param {function} [callback] Invoked as (err, unsubscribeResponse, xml)
 * @param {boolean} [preserveListeners=false] Don't remove listeners on 'event'
 */
Cam.prototype.unsubscribe = function(callback, preserveListeners) {
	// The callback is optional everywhere in this method, so route every notification through one guard
	const notify = (...args) => {
		if (callback && callback.call) {
			callback.call(this, ...args);
		}
	};
	let pullPoint;
	try {
		pullPoint = this.events.subscription.subscriptionReference.address;
	} catch (e) {
		notify(new Error('You should create pull-point subscription first!'));
		return;
	}
	let axisSubscriptionId;
	try {
		axisSubscriptionId = this.events.subscription.subscriptionReference.referenceParameters.subscriptionId;
	} catch (e) {
		axisSubscriptionId = null;
	}
	// Forget the subscription locally before the device has answered
	delete this.events.subscription;
	delete this.events.terminationTime;

	const header = this._envelopeHeader(true);
	// Axis Cameras use a PullPoint URL and the Subscription ID
	const addressing = axisSubscriptionId
		? '<a:To mustUnderstand="1">' + pullPoint.href + '</a:To>' +
			'<SubscriptionId xmlns="http://www.axis.com/2009/event" a:IsReferenceParameter="true">' + axisSubscriptionId + '</SubscriptionId>'
		: '<a:To>' + pullPoint.href + '</a:To>';
	const payload = '</s:Header>' +
		'<s:Body xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema">' +
		'<Unsubscribe xmlns="http://docs.oasis-open.org/wsn/b-2"/>' +
		this._envelopeFooter();

	this._request({url: pullPoint, body: header + addressing + payload}, (err, res, xml) => {
		let response;
		if (!err) {
			if (!preserveListeners) {
				this.removeAllListeners('event'); // We can subscribe again only if there is no 'event' listener
			}
			response = linerase(res).unsubscribeResponse;
		}
		notify(err, response, xml);
	});
};
/**
 * Translate the termination time reported in a response into local clock time:
 * the span between the response's `currentTime` and `terminationTime` is added to the local "now".
 * @param {Cam~CreatePullPointSubscriptionResponse} response Must carry Date values in `currentTime` and `terminationTime`
 * @returns {Date}
 * @private
 */
function _terminationTime(response) {
	const localNow = Date.now();
	const reportedNow = response.currentTime.getTime();
	const reportedEnd = response.terminationTime.getTime();
	return new Date(localNow + (reportedEnd - reportedNow));
}
/**
 * One turn of the event loop: make sure a live pull-point subscription exists, then pull from it.
 * With no 'event' listeners left the loop ends and the subscription is dropped.
 * @private
 */
Cam.prototype._eventRequest = function() {
	if (!this.listeners('event').length) {
		// Nobody is listening any more: stop pulling
		delete this.events.terminationTime;
		this.unsubscribe();
		return;
	}
	this.events.messageLimit = this.events.messageLimit || 10; // setting message limit
	const needsSubscription = !this.events.subscription ||
		!this.events.terminationTime ||
		(Date.now() > this.events.terminationTime);
	if (!needsSubscription) {
		this._eventPull();
		return;
	}
	// No pull-point subscription, or it has expired: create a new one first
	this.createPullPointSubscription((error) => {
		if (error) {
			this.emit('eventsError', error);
			if (typeof error === 'object' && retryErrorCodes.includes(error.code)) {
				// connection-level failure on creation - schedule another turn of the loop
				this._restartEventRequest();
			}
			return;
		}
		delete this._eventReconnectms;
		this._eventPull();
	});
};
/**
 * Pull step of the event loop: fetch pending messages, emit each one as an 'event',
 * renew the subscription and hand control back to `_eventRequest`.
 * Stops (and unsubscribes if needed) when there are no 'event' listeners or no subscription.
 * @private
 */
Cam.prototype._eventPull = function() {
	const keepPulling = this.listeners('event').length && this.events.subscription;
	if (!keepPulling) {
		delete this.events.terminationTime;
		if (this.events.subscription) {
			this.unsubscribe();
		}
		return;
	}
	const pullOptions = {messageLimit: this.events.messageLimit};
	this.pullMessages(pullOptions, (error, data, xml) => {
		if (error) {
			this.emit('eventsError', error);
			const retryable = typeof error === 'object' && retryErrorCodes.includes(error.code);
			if (retryable) {
				// connection-level failure - schedule another turn of the event loop
				this._restartEventRequest();
			} else {
				// any other pull error: drop the subscription but keep the listeners
				this.unsubscribe(function(_err, _data, _xml) {
					// once the unsubscribe has completed (even if it failed), go around the loop again
					this._eventRequest();
				}, true);
			}
			return;
		}
		delete this._eventReconnectms;
		if (data.notificationMessage) {
			// A single message arrives as a bare object; normalise to an array
			if (!Array.isArray(data.notificationMessage)) {
				data.notificationMessage = [data.notificationMessage];
			}
			data.notificationMessage.forEach((message) => {
				/**
				 * Indicates message from device.
				 * @event Cam#event
				 * @type {Cam~NotificationMessage}
				 */
				this.emit('event', message, xml);
			});
		}
		// Axis does not increment the termination time, and Vista returns a termination time equal to
		// the current time (ie already expired) even if there was time left over. So always RENEW.
		this.events.terminationTime = _terminationTime(data);
		// Axis cameras require us to Renew the Pull Point Subscription
		this.renew({}, function(renewError, renewed) {
			if (!renewError) {
				this.events.terminationTime = _terminationTime(renewed);
			}
			// go around the loop again, once the RENEW has completed (and terminationTime updated)
			this._eventRequest();
		});
	});
};
/**
 * Restart the event request after a delay that grows with every consecutive call.
 * The delay starts at 10 ms, is multiplied by 1.111 after each scheduling and never exceeds
 * `maxEventReconnectMs`. The delay is kept in `this._eventReconnectms`.
 * @private
 */
Cam.prototype._restartEventRequest = function() {
	// TODO maybe stop trying to connect after some time
	if (!this._eventReconnectms) {
		this._eventReconnectms = 10;
	}
	setTimeout(this._eventRequest.bind(this), this._eventReconnectms);
	// Clamp instead of "grow while below the maximum", which let the last step overshoot the cap
	this._eventReconnectms = Math.min(this._eventReconnectms * 1.111, maxEventReconnectMs);
};
/**
 * Helper Function to Parse XML Event data received by an external TCP port and
 * a camera in Event PUSH mode (ie not in subscribe mode)
 * @param {string} xml Raw SOAP notification
 * @param {function} callback Invoked as (err, notificationMessage). A parse failure, or a payload
 * without a Notify element, is reported through err instead of being thrown.
 */
Cam.prototype.parseEventXML = function(xml, callback) {
	parseSOAPString(xml, function(err, data, _xml, _statusCode) {
		if (err) {
			// There is no parsed document to read from; just report the failure
			callback(err);
			return;
		}
		const parsed = linerase(data);
		const notify = parsed && parsed.notify;
		if (!notify) {
			callback(new Error('No Notify element found in event XML'));
			return;
		}
		callback(err, notify.notificationMessage);
	}, 0);
};
};