 anduin revisou este gist 1 year ago. Ir para a revisão
                
                anduin revisou este gist 1 year ago. Ir para a revisão
                
            
            Sem alterações
                
                
                
                     anduin revisou este gist 1 year ago. Ir para a revisão
                
                anduin revisou este gist 1 year ago. Ir para a revisão
                
                    1 file changed, 19 insertions
Service.csproj(arquivo criado)
| @@ -0,0 +1,19 @@ | |||
| 1 | + | <Project Sdk="Microsoft.NET.Sdk"> | |
| 2 | + | <PropertyGroup> | |
| 3 | + | <OutputType>Exe</OutputType> | |
| 4 | + | <TargetFramework>net8.0</TargetFramework> | |
| 5 | + | <AssemblyName>LearnServiceBus</AssemblyName> | |
| 6 | + | <RootNamespace>LearnServiceBus</RootNamespace> | |
| 7 | + | <IsTestProject>false</IsTestProject> | |
| 8 | + | <IsPackable>false</IsPackable> | |
| 9 | + | <ImplicitUsings>enable</ImplicitUsings> | |
| 10 | + | <Nullable>enable</Nullable> | |
| 11 | + | </PropertyGroup> | |
| 12 | + | <ItemGroup> | |
| 13 | + | <PackageReference Include="Aiursoft.Canon" Version="8.0.4" /> | |
| 14 | + | <PackageReference Include="Azure.Messaging.ServiceBus" Version="7.17.5" /> | |
| 15 | + | <PackageReference Include="Microsoft.Azure.Kusto.Ingest" Version="12.2.3" /> | |
| 16 | + | <PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" /> | |
| 17 | + | <PackageReference Include="Microsoft.Extensions.Logging.console" Version="8.0.0" /> | |
| 18 | + | </ItemGroup> | |
| 19 | + | </Project> | |
                
                
                
                     anduin revisou este gist 1 year ago. Ir para a revisão
                
                anduin revisou este gist 1 year ago. Ir para a revisão
                
                    1 file changed, 99 insertions, 28 deletions
Program.cs
| @@ -1,6 +1,11 @@ | |||
| 1 | + | using System.ComponentModel; | |
| 2 | + | using System.Data; | |
| 1 | 3 | using System.Text; | |
| 4 | + | using Aiursoft.Canon; | |
| 2 | 5 | using Azure.Identity; | |
| 3 | 6 | using Azure.Messaging.ServiceBus; | |
| 7 | + | using Kusto.Data; | |
| 8 | + | using Kusto.Ingest; | |
| 4 | 9 | using Microsoft.Extensions.DependencyInjection; | |
| 5 | 10 | using Microsoft.Extensions.Logging; | |
| 6 | 11 | using Microsoft.Identity.Client; | |
| @@ -20,20 +25,23 @@ public abstract class Program | |||
| 20 | 25 | logging.SetMinimumLevel(LogLevel.Information); | |
| 21 | 26 | }); | |
| 22 | 27 | ||
| 28 | + | services.AddMemoryCache(); | |
| 29 | + | services.AddTaskCanon(); | |
| 23 | 30 | services.AddHttpClient(); | |
| 24 | - | services.AddSingleton<ServiceBusEventSender>(); | |
| 31 | + | services.AddSingleton<DemoWorker>(); | |
| 25 | 32 | ||
| 26 | 33 | var serviceProvider = services.BuildServiceProvider(); | |
| 27 | - | var entry = serviceProvider.GetRequiredService<ServiceBusEventSender>(); | |
| 34 | + | var entry = serviceProvider.GetRequiredService<DemoWorker>(); | |
| 28 | 35 | ||
| 29 | 36 | await Task.Factory.StartNew(async () => | |
| 30 | 37 | { | |
| 31 | 38 | try | |
| 32 | 39 | { | |
| 33 | - | await entry.Listen(async message => | |
| 40 | + | await entry.ListenFromServiceBusAsync(async message => | |
| 34 | 41 | { | |
| 35 | - | await Task.CompletedTask; | |
| 36 | - | Console.WriteLine($"==================Received message: {message}=================="); | |
| 42 | + | var messageObject = JsonConvert.DeserializeObject<TestEvent>(message); | |
| 43 | + | Console.WriteLine($"================Received message: {messageObject?.Name}================"); | |
| 44 | + | await entry.SendToKustoAsync(messageObject); | |
| 37 | 45 | }); | |
| 38 | 46 | } | |
| 39 | 47 | catch (Exception e) | |
| @@ -45,9 +53,9 @@ public abstract class Program | |||
| 45 | 53 | ||
| 46 | 54 | for (int i = 0; i < 1000; i++) | |
| 47 | 55 | { | |
| 48 | - | await entry.SendAsync(new TestEvent { Name = $"Test {i}" }); | |
| 56 | + | await entry.SendToServiceBusAsync(new TestEvent { Name = $"Test {i}" }); | |
| 49 | 57 | } | |
| 50 | - | ||
| 58 | + | ||
| 51 | 59 | await Task.Delay(int.MaxValue); | |
| 52 | 60 | } | |
| 53 | 61 | } | |
| @@ -58,17 +66,25 @@ public class TestEvent | |||
| 58 | 66 | public string? Name { get; set; } | |
| 59 | 67 | } | |
| 60 | 68 | ||
| 61 | - | public class ServiceBusEventSender(ILogger<ServiceBusEventSender> logger, IHttpClientFactory httpClientFactory) | |
| 69 | + | public class DemoWorker(ILogger<DemoWorker> logger, IHttpClientFactory httpClientFactory, CacheService cache) | |
| 62 | 70 | { | |
| 63 | 71 | private readonly HttpClient _httpClient = httpClientFactory.CreateClient(); | |
| 72 | + | ||
| 73 | + | // Authentication. | |
| 64 | 74 | private readonly string _tenantId = "538bf3b7-8deb-4179-9c63-f09d13f65838"; | |
| 65 | 75 | private readonly string _clientId = "a19850a3-cfef-4ced-8fcd-d632fec31bdc"; | |
| 66 | - | private readonly string _clientSecret = "YOUR_CLIENT_SECRET"; | |
| 76 | + | private readonly string _clientSecret = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; | |
| 67 | 77 | private readonly string _scope = "https://servicebus.azure.net/.default"; | |
| 68 | 78 | ||
| 79 | + | // Service Bus | |
| 69 | 80 | private readonly string _serviceBusNamespace = "anduinlearn"; | |
| 70 | - | private readonly string _topicName = "mytopic"; | |
| 71 | - | private readonly string _subscriptionName = "mysubscription"; | |
| 81 | + | private readonly string _serviceBusTopicName = "mytopic"; | |
| 82 | + | private readonly string _serviceBusTopicSubscriptionName = "mysubscription"; | |
| 83 | + | ||
| 84 | + | // Kusto database. | |
| 85 | + | private readonly string _kustoIngestionUri = "https://ingest-learnkustoanduin.eastasia.kusto.windows.net"; | |
| 86 | + | private readonly string _kustoDatabase = "MyDatabase"; | |
| 87 | + | private readonly string _kustoTable = "MyTable"; | |
| 72 | 88 | ||
| 73 | 89 | /// <summary> | |
| 74 | 90 | /// Sends an asynchronous request to the service bus endpoint. | |
| @@ -76,12 +92,12 @@ public class ServiceBusEventSender(ILogger<ServiceBusEventSender> logger, IHttpC | |||
| 76 | 92 | /// <typeparam name="T">The type of the request object.</typeparam> | |
| 77 | 93 | /// <param name="request">The request object.</param> | |
| 78 | 94 | /// <returns>A task representing the asynchronous operation that returns the response string.</returns> | |
| 79 | - | public async Task<string> SendAsync<T>(T request) | |
| 95 | + | public async Task<string> SendToServiceBusAsync<T>(T request) | |
| 80 | 96 | { | |
| 81 | 97 | var requestJson = JsonConvert.SerializeObject(request); | |
| 82 | 98 | logger.LogTrace("Sending request to service bus, with input {0}", request?.GetType().FullName); | |
| 83 | 99 | ||
| 84 | - | var endpoint = $"https://{_serviceBusNamespace}.servicebus.windows.net/{_topicName}/messages"; | |
| 100 | + | var endpoint = $"https://{_serviceBusNamespace}.servicebus.windows.net/{_serviceBusTopicName}/messages"; | |
| 85 | 101 | logger.LogTrace("Sending request to service bus, with endpoint {0}", endpoint); | |
| 86 | 102 | ||
| 87 | 103 | var token = await GetToken(); | |
| @@ -112,14 +128,14 @@ public class ServiceBusEventSender(ILogger<ServiceBusEventSender> logger, IHttpC | |||
| 112 | 128 | return responseString; | |
| 113 | 129 | } | |
| 114 | 130 | ||
| 115 | - | public async Task Listen(Func<string, Task> onMessage) | |
| 131 | + | public async Task ListenFromServiceBusAsync(Func<string, Task> onMessage) | |
| 116 | 132 | { | |
| 117 | 133 | // Create Service Bus client and processor | |
| 118 | - | logger.LogWarning("Listening to service bus topic {0} and subscription {1}", _topicName, _subscriptionName); | |
| 134 | + | logger.LogWarning("Listening to service bus topic {0} and subscription {1}", _serviceBusTopicName, _serviceBusTopicSubscriptionName); | |
| 119 | 135 | var credential = new ClientSecretCredential(_tenantId, _clientId, _clientSecret); | |
| 120 | 136 | var serviceBusClient = new ServiceBusClient(_serviceBusNamespace + ".servicebus.windows.net" | |
| 121 | 137 | , credential); | |
| 122 | - | var processor = serviceBusClient.CreateProcessor(_topicName, _subscriptionName, new ServiceBusProcessorOptions()); | |
| 138 | + | var processor = serviceBusClient.CreateProcessor(_serviceBusTopicName, _serviceBusTopicSubscriptionName, new ServiceBusProcessorOptions()); | |
| 123 | 139 | ||
| 124 | 140 | processor.ProcessMessageAsync += async args => | |
| 125 | 141 | { | |
| @@ -129,28 +145,83 @@ public class ServiceBusEventSender(ILogger<ServiceBusEventSender> logger, IHttpC | |||
| 129 | 145 | await args.CompleteMessageAsync(message); | |
| 130 | 146 | }; | |
| 131 | 147 | ||
| 132 | - | processor.ProcessErrorAsync += ErrorHandler; | |
| 148 | + | processor.ProcessErrorAsync += args => | |
| 149 | + | { | |
| 150 | + | logger.LogError(args.Exception, "Error processing message: {ExceptionMessage}", args.Exception.Message); | |
| 151 | + | return Task.CompletedTask; | |
| 152 | + | }; | |
| 133 | 153 | ||
| 134 | - | logger.LogWarning("Starting to listen to service bus topic {0} and subscription {1}", _topicName, _subscriptionName); | |
| 154 | + | logger.LogWarning("Starting to listen to service bus topic {0} and subscription {1}", _serviceBusTopicName, _serviceBusTopicSubscriptionName); | |
| 135 | 155 | // Start processing | |
| 136 | 156 | await processor.StartProcessingAsync(); | |
| 137 | 157 | await Task.Delay(int.MaxValue); | |
| 138 | 158 | } | |
| 139 | 159 | ||
| 140 | - | private Task ErrorHandler(ProcessErrorEventArgs args) | |
| 160 | + | public async Task SendToKustoAsync<T>(T request) | |
| 141 | 161 | { | |
| 142 | - | logger.LogError(args.Exception, "Error processing message: {ExceptionMessage}", args.Exception.Message); | |
| 143 | - | return Task.CompletedTask; | |
| 162 | + | var dataTable = new List<T> { request }.ToDataTable(); | |
| 163 | + | var kustoIngestService = new KustoIngestService(_kustoIngestionUri, _kustoDatabase, _clientId, _clientSecret, _tenantId); | |
| 164 | + | await kustoIngestService.IngestDataAsync(dataTable, _kustoTable); | |
| 144 | 165 | } | |
| 145 | 166 | ||
| 146 | - | private async Task<string> GetToken() | |
| 167 | + | private Task<string?> GetToken() | |
| 147 | 168 | { | |
| 148 | - | var app = ConfidentialClientApplicationBuilder.Create(_clientId) | |
| 149 | - | .WithClientSecret(_clientSecret) | |
| 150 | - | .WithAuthority(new Uri($"https://login.microsoftonline.com/{_tenantId}")) | |
| 151 | - | .Build(); | |
| 169 | + | return cache.RunWithCache("token", async () => | |
| 170 | + | { | |
| 171 | + | var app = ConfidentialClientApplicationBuilder.Create(_clientId) | |
| 172 | + | .WithClientSecret(_clientSecret) | |
| 173 | + | .WithAuthority(new Uri($"https://login.microsoftonline.com/{_tenantId}")) | |
| 174 | + | .Build(); | |
| 152 | 175 | ||
| 153 | - | var authResult = await app.AcquireTokenForClient(new[] { _scope }).ExecuteAsync(); | |
| 154 | - | return authResult.AccessToken; | |
| 176 | + | var authResult = await app.AcquireTokenForClient([_scope]).ExecuteAsync(); | |
| 177 | + | return authResult.AccessToken; | |
| 178 | + | }); | |
| 155 | 179 | } | |
| 156 | 180 | } | |
| 181 | + | ||
| 182 | + | public static class DataTableExtensions | |
| 183 | + | { | |
| 184 | + | public static DataTable ToDataTable<T>(this IList<T> data) | |
| 185 | + | { | |
| 186 | + | var properties = TypeDescriptor.GetProperties(typeof(T)); | |
| 187 | + | var table = new DataTable(); | |
| 188 | + | ||
| 189 | + | foreach (PropertyDescriptor prop in properties) | |
| 190 | + | { | |
| 191 | + | table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType); | |
| 192 | + | } | |
| 193 | + | ||
| 194 | + | foreach (T item in data) | |
| 195 | + | { | |
| 196 | + | var row = table.NewRow(); | |
| 197 | + | foreach (PropertyDescriptor prop in properties) | |
| 198 | + | { | |
| 199 | + | row[prop.Name] = prop.GetValue(item) ?? DBNull.Value; | |
| 200 | + | } | |
| 201 | + | table.Rows.Add(row); | |
| 202 | + | } | |
| 203 | + | return table; | |
| 204 | + | } | |
| 205 | + | } | |
| 206 | + | ||
| 207 | + | public class KustoIngestService | |
| 208 | + | { | |
| 209 | + | private IKustoIngestClient _kustoIngestClient; | |
| 210 | + | private string _database; | |
| 211 | + | ||
| 212 | + | public KustoIngestService(string kustoUri, string database, string appId, string appKey, string tenantId) | |
| 213 | + | { | |
| 214 | + | var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri) | |
| 215 | + | .WithAadApplicationKeyAuthentication(appId, appKey, tenantId); | |
| 216 | + | _kustoIngestClient = KustoIngestFactory.CreateDirectIngestClient(kustoConnectionStringBuilder); | |
| 217 | + | _database = database; | |
| 218 | + | } | |
| 219 | + | ||
| 220 | + | public async Task IngestDataAsync(DataTable dataTable, string tableName) | |
| 221 | + | { | |
| 222 | + | var ingestionProperties = new KustoIngestionProperties(_database, tableName); | |
| 223 | + | var dataStream = dataTable.CreateDataReader(); | |
| 224 | + | ||
| 225 | + | await _kustoIngestClient.IngestFromDataReaderAsync(dataStream, ingestionProperties); | |
| 226 | + | } | |
| 227 | + | } | |
                
                
                
                     anduin revisou este gist 1 year ago. Ir para a revisão
                
                anduin revisou este gist 1 year ago. Ir para a revisão
                
                    1 file changed, 156 insertions
Program.cs(arquivo criado)
| @@ -0,0 +1,156 @@ | |||
| 1 | + | using System.Text; | |
| 2 | + | using Azure.Identity; | |
| 3 | + | using Azure.Messaging.ServiceBus; | |
| 4 | + | using Microsoft.Extensions.DependencyInjection; | |
| 5 | + | using Microsoft.Extensions.Logging; | |
| 6 | + | using Microsoft.Identity.Client; | |
| 7 | + | using Newtonsoft.Json; | |
| 8 | + | using LogLevel = Microsoft.Extensions.Logging.LogLevel; | |
| 9 | + | ||
| 10 | + | namespace LabServiceBus; | |
| 11 | + | ||
| 12 | + | public abstract class Program | |
| 13 | + | { | |
| 14 | + | public static async Task Main(string[] args) | |
| 15 | + | { | |
| 16 | + | var services = new ServiceCollection(); | |
| 17 | + | services.AddLogging(logging => | |
| 18 | + | { | |
| 19 | + | logging.AddConsole(); | |
| 20 | + | logging.SetMinimumLevel(LogLevel.Information); | |
| 21 | + | }); | |
| 22 | + | ||
| 23 | + | services.AddHttpClient(); | |
| 24 | + | services.AddSingleton<ServiceBusEventSender>(); | |
| 25 | + | ||
| 26 | + | var serviceProvider = services.BuildServiceProvider(); | |
| 27 | + | var entry = serviceProvider.GetRequiredService<ServiceBusEventSender>(); | |
| 28 | + | ||
| 29 | + | await Task.Factory.StartNew(async () => | |
| 30 | + | { | |
| 31 | + | try | |
| 32 | + | { | |
| 33 | + | await entry.Listen(async message => | |
| 34 | + | { | |
| 35 | + | await Task.CompletedTask; | |
| 36 | + | Console.WriteLine($"==================Received message: {message}=================="); | |
| 37 | + | }); | |
| 38 | + | } | |
| 39 | + | catch (Exception e) | |
| 40 | + | { | |
| 41 | + | Console.WriteLine(e); | |
| 42 | + | throw; | |
| 43 | + | } | |
| 44 | + | }); | |
| 45 | + | ||
| 46 | + | for (int i = 0; i < 1000; i++) | |
| 47 | + | { | |
| 48 | + | await entry.SendAsync(new TestEvent { Name = $"Test {i}" }); | |
| 49 | + | } | |
| 50 | + | ||
| 51 | + | await Task.Delay(int.MaxValue); | |
| 52 | + | } | |
| 53 | + | } | |
| 54 | + | ||
| 55 | + | public class TestEvent | |
| 56 | + | { | |
| 57 | + | // ReSharper disable once UnusedAutoPropertyAccessor.Global | |
| 58 | + | public string? Name { get; set; } | |
| 59 | + | } | |
| 60 | + | ||
| 61 | + | public class ServiceBusEventSender(ILogger<ServiceBusEventSender> logger, IHttpClientFactory httpClientFactory) | |
| 62 | + | { | |
| 63 | + | private readonly HttpClient _httpClient = httpClientFactory.CreateClient(); | |
| 64 | + | private readonly string _tenantId = "538bf3b7-8deb-4179-9c63-f09d13f65838"; | |
| 65 | + | private readonly string _clientId = "a19850a3-cfef-4ced-8fcd-d632fec31bdc"; | |
| 66 | + | private readonly string _clientSecret = "YOUR_CLIENT_SECRET"; | |
| 67 | + | private readonly string _scope = "https://servicebus.azure.net/.default"; | |
| 68 | + | ||
| 69 | + | private readonly string _serviceBusNamespace = "anduinlearn"; | |
| 70 | + | private readonly string _topicName = "mytopic"; | |
| 71 | + | private readonly string _subscriptionName = "mysubscription"; | |
| 72 | + | ||
| 73 | + | /// <summary> | |
| 74 | + | /// Sends an asynchronous request to the service bus endpoint. | |
| 75 | + | /// </summary> | |
| 76 | + | /// <typeparam name="T">The type of the request object.</typeparam> | |
| 77 | + | /// <param name="request">The request object.</param> | |
| 78 | + | /// <returns>A task representing the asynchronous operation that returns the response string.</returns> | |
| 79 | + | public async Task<string> SendAsync<T>(T request) | |
| 80 | + | { | |
| 81 | + | var requestJson = JsonConvert.SerializeObject(request); | |
| 82 | + | logger.LogTrace("Sending request to service bus, with input {0}", request?.GetType().FullName); | |
| 83 | + | ||
| 84 | + | var endpoint = $"https://{_serviceBusNamespace}.servicebus.windows.net/{_topicName}/messages"; | |
| 85 | + | logger.LogTrace("Sending request to service bus, with endpoint {0}", endpoint); | |
| 86 | + | ||
| 87 | + | var token = await GetToken(); | |
| 88 | + | logger.LogTrace("Sending request to service bus, with token {0}", token.Substring(0, 20)); | |
| 89 | + | ||
| 90 | + | var httpRequest = new HttpRequestMessage | |
| 91 | + | { | |
| 92 | + | Method = HttpMethod.Post, | |
| 93 | + | RequestUri = new Uri(endpoint) | |
| 94 | + | }; | |
| 95 | + | httpRequest.Headers.Add("Authorization", $"Bearer {token}"); | |
| 96 | + | httpRequest.Headers.Add("accept", "application/json"); | |
| 97 | + | httpRequest.Content = new StringContent(requestJson, Encoding.UTF8, "application/json"); | |
| 98 | + | ||
| 99 | + | logger.LogTrace("Sending request to service bus, with content {0}", requestJson); | |
| 100 | + | var response = await _httpClient.SendAsync(httpRequest); | |
| 101 | + | ||
| 102 | + | logger.LogTrace("Received response from service bus, with status code {0}", response.StatusCode); | |
| 103 | + | var responseString = await response.Content.ReadAsStringAsync(); | |
| 104 | + | ||
| 105 | + | logger.LogTrace("Received response from service bus, with content {0} and status code {1}", responseString, | |
| 106 | + | response.StatusCode); | |
| 107 | + | if (!response.IsSuccessStatusCode) | |
| 108 | + | { | |
| 109 | + | throw new Exception($"Failed to send new service bus event. {responseString}"); | |
| 110 | + | } | |
| 111 | + | ||
| 112 | + | return responseString; | |
| 113 | + | } | |
| 114 | + | ||
| 115 | + | public async Task Listen(Func<string, Task> onMessage) | |
| 116 | + | { | |
| 117 | + | // Create Service Bus client and processor | |
| 118 | + | logger.LogWarning("Listening to service bus topic {0} and subscription {1}", _topicName, _subscriptionName); | |
| 119 | + | var credential = new ClientSecretCredential(_tenantId, _clientId, _clientSecret); | |
| 120 | + | var serviceBusClient = new ServiceBusClient(_serviceBusNamespace + ".servicebus.windows.net" | |
| 121 | + | , credential); | |
| 122 | + | var processor = serviceBusClient.CreateProcessor(_topicName, _subscriptionName, new ServiceBusProcessorOptions()); | |
| 123 | + | ||
| 124 | + | processor.ProcessMessageAsync += async args => | |
| 125 | + | { | |
| 126 | + | var message = args.Message; | |
| 127 | + | var body = Encoding.UTF8.GetString(message.Body.ToArray()); | |
| 128 | + | await onMessage(body); | |
| 129 | + | await args.CompleteMessageAsync(message); | |
| 130 | + | }; | |
| 131 | + | ||
| 132 | + | processor.ProcessErrorAsync += ErrorHandler; | |
| 133 | + | ||
| 134 | + | logger.LogWarning("Starting to listen to service bus topic {0} and subscription {1}", _topicName, _subscriptionName); | |
| 135 | + | // Start processing | |
| 136 | + | await processor.StartProcessingAsync(); | |
| 137 | + | await Task.Delay(int.MaxValue); | |
| 138 | + | } | |
| 139 | + | ||
| 140 | + | private Task ErrorHandler(ProcessErrorEventArgs args) | |
| 141 | + | { | |
| 142 | + | logger.LogError(args.Exception, "Error processing message: {ExceptionMessage}", args.Exception.Message); | |
| 143 | + | return Task.CompletedTask; | |
| 144 | + | } | |
| 145 | + | ||
| 146 | + | private async Task<string> GetToken() | |
| 147 | + | { | |
| 148 | + | var app = ConfidentialClientApplicationBuilder.Create(_clientId) | |
| 149 | + | .WithClientSecret(_clientSecret) | |
| 150 | + | .WithAuthority(new Uri($"https://login.microsoftonline.com/{_tenantId}")) | |
| 151 | + | .Build(); | |
| 152 | + | ||
| 153 | + | var authResult = await app.AcquireTokenForClient(new[] { _scope }).ExecuteAsync(); | |
| 154 | + | return authResult.AccessToken; | |
| 155 | + | } | |
| 156 | + | } | |