-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathPubSubTelemetryEncoder.cs
More file actions
293 lines (250 loc) · 12.7 KB
/
Copy pathPubSubTelemetryEncoder.cs
File metadata and controls
293 lines (250 loc) · 12.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
namespace Opc.Ua.Cloud.Publisher
{
using Extensions;
using Microsoft.Extensions.Logging;
using Opc.Ua;
using Opc.Ua.Cloud.Publisher.Interfaces;
using Opc.Ua.Cloud.Publisher.Models;
using System;
using System.Collections.Generic;
public class PubSubTelemetryEncoder : IMessageEncoder
{
// OPC UA application; its Telemetry object is used to create the message contexts for header and status messages.
private readonly IUAApplication _app;

// Logger used to report encoding failures.
private readonly ILogger _logger;

/// <summary>
/// Creates the encoder.
/// </summary>
/// <param name="app">The OPC UA application whose telemetry context is used when encoding.</param>
/// <param name="loggerFactory">Factory used to create this encoder's logger.</param>
public PubSubTelemetryEncoder(IUAApplication app, ILoggerFactory loggerFactory)
{
    // the logger category is the class name
    _logger = loggerFactory.CreateLogger(nameof(PubSubTelemetryEncoder));
    _app = app;
}
/// <summary>
/// Produces the leading fragment of a PubSub JSON NetworkMessage. The fragment is deliberately
/// left unterminated so that the caller can append the message body and the closing tokens.
/// </summary>
/// <param name="messageID">Value written (as a string) into the "MessageId" field.</param>
/// <param name="isMetaData">True to emit a "ua-metadata" header, false to emit a "ua-data" header.</param>
/// <returns>
/// Either "[" (data message with the network message header omitted) or the JSON header text
/// with its trailing '}' characters removed.
/// </returns>
public string EncodeHeader(ulong messageID, bool isMetaData = false)
{
    // The network message header is only ever omitted for data messages; metadata messages always carry it.
    // See https://reference.opcfoundation.org/v105/Core/docs/Part14/7.2.5/#7.2.5.4
    if (isMetaData || !Settings.Instance.OmitNetworkMessageHeader)
    {
        JsonEncoder headerEncoder = new(new ServiceMessageContext(_app.Telemetry), Settings.Instance.ReversiblePubSubEncoding);

        headerEncoder.WriteString("MessageId", messageID.ToString());
        headerEncoder.WriteString("MessageType", isMetaData ? "ua-metadata" : "ua-data");
        headerEncoder.WriteString("PublisherId", Settings.Instance.PublisherName);

        // data messages are collected in the "Messages" array, which is opened here and left open
        if (!isMetaData)
        {
            headerEncoder.PushArray("Messages");
        }

        // strip the closing bracket; it is appended later, once the message body has been added
        string headerText = headerEncoder.CloseAndReturnText();
        return headerText.TrimEnd('}');
    }

    // Without a header, the NetworkMessage is just the JSON array of DataSetMessages:
    // only the opening bracket is emitted here, the closing bracket follows when the batch is finished.
    return "[";
}
/// <summary>
/// Encodes the body of a PubSub JSON metadata message (DataSetWriterId, Timestamp and MetaData)
/// for the given message.
/// </summary>
/// <param name="messageData">The message whose field type information is described.</param>
/// <returns>
/// The JSON text with its leading '{' removed so that it can be appended to a header fragment,
/// or <see cref="string.Empty"/> when encoding fails (the failure is logged).
/// </returns>
public string EncodeMetadata(MessageProcessorModel messageData)
{
    try
    {
        JsonEncoder metadataEncoder = new(messageData.MessageContext, Settings.Instance.ReversiblePubSubEncoding);

        // the writer id is the XOR of the application URI hash and the node id hash, truncated to 16 bits
        var applicationHash = messageData.ApplicationUri.GetDeterministicHashCode();
        var nodeHash = messageData.ExpandedNodeId.GetDeterministicHashCode();
        ushort dataSetWriterId = (ushort)(applicationHash ^ nodeHash);
        metadataEncoder.WriteUInt16("DataSetWriterId", dataSetWriterId);

        DataSetMetaDataType metaData = BuildDataSetMetaData(messageData);

        metadataEncoder.WriteDateTime("Timestamp", DateTime.UtcNow);
        metadataEncoder.WriteEncodeable("MetaData", metaData, typeof(DataSetMetaDataType));

        // drop the opening bracket: this text gets stitched together with the header
        string json = metadataEncoder.CloseAndReturnText();
        return json.TrimStart('{');
    }
    catch (Exception e)
    {
        _logger.LogError(e, "Generation of JSON PubSub metadata message failed.");
        return string.Empty;
    }
}
/// <summary>
/// Creates the DataSetMetaData (field type information) describing the given message. Used by both
/// the OPC UA PubSub metadata message and the CloudEvents metadata message.
/// </summary>
/// <param name="messageData">The data change (single value) or event (list of event values) to describe.</param>
/// <returns>
/// Metadata with one field per event value (or a single field for a data change), a fixed
/// configuration version of 1.1 and the namespace URIs referenced by the field data types.
/// </returns>
private DataSetMetaDataType BuildDataSetMetaData(MessageProcessorModel messageData)
{
    DataSetMetaDataType metaData = new()
    {
        Name = messageData.ApplicationUri + ";" + messageData.ExpandedNodeId,
        Fields = new FieldMetaDataCollection(),
        ConfigurationVersion = new ConfigurationVersionDataType()
        {
            MinorVersion = 1,
            MajorVersion = 1
        },
        Description = LocalizedText.Null
    };

    if (messageData.EventValues is { Count: > 0 })
    {
        // event: one field per event value; each field gets a freshly generated field id
        foreach (EventValueModel eventField in messageData.EventValues)
        {
            metaData.Fields.Add(new FieldMetaData()
            {
                Name = eventField.Name,
                DataSetFieldId = new Uuid(Guid.NewGuid()),
                BuiltInType = (byte)eventField.Value.WrappedValue.TypeInfo.BuiltInType,
                DataType = TypeInfo.GetDataTypeId(eventField.Value.WrappedValue),
                ValueRank = eventField.Value.WrappedValue.TypeInfo.ValueRank,
                Description = LocalizedText.Null
            });
        }
    }
    else
    {
        // data change: a single field, described by the message's data type string
        metaData.Fields.Add(new FieldMetaData()
        {
            Name = messageData.Name,
            DataSetFieldId = new Uuid(Guid.NewGuid()),
            BuiltInType = (byte)messageData.Value.WrappedValue.TypeInfo.BuiltInType,
            DataType = TypeInfo.GetDataTypeId(messageData.Value.WrappedValue),
            ValueRank = messageData.Value.WrappedValue.TypeInfo.ValueRank,
            Description = new LocalizedText(messageData.DataType)
        });
    }

    // Fill the schema header namespaces with the distinct namespace URIs referenced by the field
    // data types. They are resolved from the local (session) namespace table, so no server
    // round-trips are needed. The structure/enum/simple type descriptions are intentionally left
    // empty: built-in types don't need them, and describing custom types would require reading
    // their definitions from the server.
    foreach (FieldMetaData describedField in metaData.Fields)
    {
        if (describedField.DataType == null)
        {
            continue;
        }

        string uri = messageData.MessageContext.NamespaceUris.GetString(describedField.DataType.NamespaceIndex);

        // skip unresolved namespaces and those that are already listed
        bool skip = string.IsNullOrEmpty(uri) || metaData.Namespaces.Contains(uri);
        if (!skip)
        {
            metaData.Namespaces.Add(uri);
        }
    }

    return metaData;
}
/// <summary>
/// Encodes the payload of a CloudEvents (binary content mode) metadata message: the bare
/// DataSetMetaData JSON object, encoded non-reversibly.
/// </summary>
/// <param name="messageData">The message whose field type information is described.</param>
/// <returns>
/// The DataSetMetaData JSON object; the encoder output as-is if it does not have the expected
/// {"MetaData":...} shape; or <see cref="string.Empty"/> when encoding fails (the failure is logged).
/// </returns>
public string EncodeCloudEventMetadata(MessageProcessorModel messageData)
{
    // the encoder emits {"MetaData":{...}}; this is the part in front of the inner object
    const string wrapperPrefix = "{\"MetaData\":";

    try
    {
        // In CloudEvents binary mode the OPC UA NetworkMessage and DataSet headers travel as CloudEvents
        // attributes (transport headers), so only the DataSetMetaData object goes into the payload.
        // See https://github.com/cloudevents/spec/blob/main/cloudevents/extensions/opcua.md
        JsonEncoder payloadEncoder = new(messageData.MessageContext, false);
        payloadEncoder.WriteEncodeable("MetaData", BuildDataSetMetaData(messageData), typeof(DataSetMetaDataType));

        string encoded = payloadEncoder.CloseAndReturnText();

        bool hasWrapper = encoded.StartsWith(wrapperPrefix, StringComparison.Ordinal)
            && encoded.EndsWith("}", StringComparison.Ordinal);

        // unwrap: cut off the prefix and the final closing bracket, keeping just the inner object
        return hasWrapper ? encoded[wrapperPrefix.Length..^1] : encoded;
    }
    catch (Exception e)
    {
        _logger.LogError(e, "Generation of CloudEvents PubSub metadata message failed.");
        return string.Empty;
    }
}
/// <summary>
/// Creates the CloudEvents context attributes that accompany an OPC UA PubSub metadata message
/// sent in binary content mode.
/// See https://github.com/cloudevents/spec/blob/main/cloudevents/extensions/opcua.md
/// </summary>
/// <param name="messageId">Message id, used as the CloudEvents "id" attribute.</param>
/// <param name="dataSetWriterId">DataSetWriterId, used as the CloudEvents "subject" attribute.</param>
/// <returns>The attribute name/value pairs; "time" is the current UTC time in round-trip ("o") format.</returns>
public IReadOnlyDictionary<string, string> BuildCloudEventMetadataAttributes(ulong messageId, ushort dataSetWriterId)
{
    Dictionary<string, string> attributes = new()
    {
        { "specversion", "1.0" },
        { "type", "ua-metadata" },
        { "id", messageId.ToString() },
        { "source", Settings.Instance.PublisherName },
        { "subject", dataSetWriterId.ToString() },
        { "time", DateTime.UtcNow.ToString("o") },
        { "datacontenttype", "application/json" }
    };

    return attributes;
}
/// <summary>
/// Encodes one PubSub JSON DataSetMessage for either a data change (single value) or an event
/// (list of event values).
/// </summary>
/// <param name="messageData">
/// The message to encode. As a side effect, the source timestamps of the encoded values (and, for
/// data changes, the server timestamp as well) are reset to <see cref="DateTime.MinValue"/>.
/// </param>
/// <param name="hash">
/// Receives the DataSetWriterId written into the message (XOR of the application URI hash and the
/// node id hash, truncated to 16 bits), or 0 when encoding fails.
/// </param>
/// <returns>The JSON text, or <see cref="string.Empty"/> when encoding fails (the failure is logged).</returns>
public string EncodePayload(MessageProcessorModel messageData, out ushort hash)
{
    try
    {
        JsonEncoder encoder = new(messageData.MessageContext, Settings.Instance.ReversiblePubSubEncoding);

        hash = (ushort)(messageData.ApplicationUri.GetDeterministicHashCode() ^ messageData.ExpandedNodeId.GetDeterministicHashCode());
        encoder.WriteUInt16("DataSetWriterId", hash);

        bool hasEventValues = (messageData.EventValues != null) && (messageData.EventValues.Count > 0);

        if (!hasEventValues)
        {
            // data change: the server timestamp becomes the DataSetMessage timestamp
            encoder.WriteDateTime("Timestamp", messageData.Value.ServerTimestamp);
        }

        // Only non-good status codes are reported. Events are encoded from EventValues alone and
        // presumably carry no DataValue (verify against the producer of MessageProcessorModel), so
        // the value must be null-checked here; otherwise a missing value would abort the encoding
        // of every event message.
        if ((messageData.Value is not null) && (messageData.Value.StatusCode != StatusCodes.Good))
        {
            encoder.WriteUInt32("Status", messageData.Value.StatusCode.Code);
        }

        encoder.PushStructure("Payload");

        if (hasEventValues)
        {
            // process events
            foreach (EventValueModel eventValue in messageData.EventValues)
            {
                // filter source timestamp before encoding
                eventValue.Value.SourceTimestamp = DateTime.MinValue;

                if (Settings.Instance.ReversiblePubSubEncoding)
                {
                    // reversible encoding: write just the wrapped value
                    encoder.WriteVariant(eventValue.Name, eventValue.Value.WrappedValue);
                }
                else
                {
                    encoder.WriteVariant(eventValue.Name, eventValue.Value);
                }
            }
        }
        else
        {
            // filter timestamps before encoding as we already encoded the server timestamp above
            messageData.Value.SourceTimestamp = DateTime.MinValue;
            messageData.Value.ServerTimestamp = DateTime.MinValue;

            if (Settings.Instance.ReversiblePubSubEncoding)
            {
                // reversible encoding: write just the wrapped value
                encoder.WriteVariant(messageData.Name, messageData.Value.WrappedValue);
            }
            else
            {
                encoder.WriteVariant(messageData.Name, messageData.Value);
            }
        }

        encoder.PopStructure();

        return encoder.CloseAndReturnText();
    }
    catch (Exception e)
    {
        _logger.LogError(e, "Generation of JSON PubSub data message failed.");
        hash = 0;
        return string.Empty;
    }
}
/// <summary>
/// Encodes a PubSub JSON status message ("ua-status") reporting this publisher as operational.
/// </summary>
/// <param name="messageID">Value written (as a string) into the "MessageId" field.</param>
/// <returns>The JSON text, or <see cref="string.Empty"/> when encoding fails (the failure is logged).</returns>
public string EncodeStatus(ulong messageID)
{
    try
    {
        // Take a single snapshot of the current time so that NextReportTime is exactly
        // Timestamp + reporting interval (two separate UtcNow reads would drift apart).
        DateTime now = DateTime.UtcNow;

        // encode a PubSub JSON status message
        JsonEncoder encoder = new(new ServiceMessageContext(_app.Telemetry), Settings.Instance.ReversiblePubSubEncoding);
        encoder.WriteString("MessageId", messageID.ToString());
        encoder.WriteString("MessageType", "ua-status");
        encoder.WriteString("PublisherId", Settings.Instance.PublisherName);
        encoder.WriteDateTime("Timestamp", now);
        encoder.WriteBoolean("IsCyclic", true);
        encoder.WriteEnumerated("Status", PubSubState.Operational);

        // The diagnostics logging interval is given in seconds; adding it as seconds avoids the
        // integer overflow that multiplying a large interval by 1000 could cause.
        encoder.WriteDateTime("NextReportTime", now.AddSeconds(Settings.Instance.DiagnosticsLoggingInterval));

        return encoder.CloseAndReturnText();
    }
    catch (Exception e)
    {
        _logger.LogError(e, "Generation of JSON PubSub status message failed.");
        return string.Empty;
    }
}
}
}