Skip to content

jays-codes/reactive-microservices

Repository files navigation

reactive-microservices

Jay's project/practice repo for Reactive Microservices : WebFlux + Project Reactor

proj: Stock Trading Services

  • added index.html front end; updated README2.md
  • Important: refer to [BP]README2.md [added] for Stock Trading Microservices details
  • Major Refactor of aggregator-service and customer-service to remove parent relationship with reactive-microservices: modified individual project pom and submodule poms
  • [BP] Major Refactor of aggregator-service and customer-service to conform to DDD principles: separate microservice into server and client submodules; expose clean API contract, common-client module for shared domain logic, Bounded Context implementation

proj: aggregator-service

  • created CustomerTradeTest: testCustomerTrade(), testCustomerTradeInsufficientBalance(), testInputValidation(); created /customer-service/customer-trade-400.json, customer-trade-200.json; helpers - mockCustomerTrade(path, respCode), mockStockPriceResponse(path, ticker, respCode), postTrade(tradeRequest, expectedStatus):WebTestClient.BodyContentSpec
  • created StockPriceStreamTest: testStockPriceStream() - .withContentType(MediaType.parse("application/x-ndjson")), .returnResult(), .accept(MediaType.TEXT_EVENT_STREAM); updated controller api for price stream to use MediaType.TEXT_EVENT_STREAM; created /stock-service/stock-price-stream-200.jsonl test response data file
  • refactored CIT: added mockCustomerInfo(path, respCode), getCustomerInfo(HttpStatus), testCustomerInfo() to call previous two methods
  • created CustomerInformationTest (CIT): testCustomerInfo(). response data json file; modified AbstractIntegrationTest - added resourceToString()
  • created AbstractIntegrationTest (aggregator-service/src/test/java): uses mock-server dependency. Uses @MockServerTest, @AutoConfigureWebTestClient, @SpringBootTest, MockServerClient, WebTestClient
  • created ServiceClientsConfig (aggregator-server/config) for StockServiceClient and CustomerServiceClient; passes in values from app.yaml: customer-service.url, stock-service.url
  • Added Exception handling via @ControllerAdvice, aggregator-server/advice.ApplicationExceptionHandler
  • created CustomerPortfolioControoler, StockPriceStreamController, <> and Impl
  • created <>, impl class (@Service) with getCustomperPortfolio():Mono, processTrade(custId, TradeReq):Mono - which calls stockClient.getStockPrice(ticker), and custClient.processTrade(custId, StockTradeRequest); created StockTradeMapper class with .toStockTradeRequest(TradeRequest, price):StockTradeRequest
  • created CustomerServiceClient (pkg: aggregator-server/aggregator.client): reactive WebClient to communicate with Customer-Service: getCustomerIno(custId):Mono, processTrade(custId, StockTradeRequest):Mono, handleException(BadRequest): Mono
  • StockServiceClient: Modified StockServiceClient into a Hot Publisher -> getPriceUpdates(); added cache(1) and retry logic: retryWhen(), retry(), Retry.fixedDelay(), doBeforeRetry(), RetrySignal.failure(); added attribute priceUpdateFlux : Flux and initialization method calling getPriceUpdates()
  • created StockServiceClient(pkg: aggregator-server/aggregator.client): reactive WebClient to connect to Stock Service; getStockPrice(Ticker):Mono, getPriceUpdates():Flux
  • [BP] Added in aggregator.exceptions package: InvalidTradeRequestException, ApplicationExceptions with factory methods returning Mono where t is exception type; RequestValidator with HOFs, validate(), returning UnaryOperator<Mono> or Mono via use of AppExceptions factory methods. for validation error related to TradeRequest
  • create packages: advice, client, config, controller, domain, dto, exceptions, service, validator; application.yaml, logback.xml, data.sql
  • initial proj create, SpringBoot 3.5.0, jdk 21, jar; dep: Spring Reactive Web, Lombok

proj: customer-service

  • CustomerServiceApplicationTests: additional Tests: buyAndSell(), negative tests: customerNotFound(), insufficientShares(), insufficientBalance(); used .jsonPath("$.detail") to evaluate string in returned ProblemDetail
  • CustomerServiceApplicationTests (src/test/java): used WebTestClient to test endpoints; created helper methods for getCustomer(), processTrade; test methods call helper methods and focus on asserting results
  • created ExceptionHandler, handleException():ProblemDetail methods for each exception; refactored build():ProblemDetail for use by handleException() methods; updated Sequence Diagram
  • created CustomerController (@RestController), 2 endpoints: getCustomerInfo(id):Mono GET /{id}, processTrade(id, Mono):Mono POST /{id}/trade; updated Sequence Diagram to reflect Controller
  • added to TradeServiceImpl: sellStock(), .executeSell(), refactored balance and portfolio update in executeBuy() to updateBalanceAndPortfolio() to reuse for executeSell(); updated UML sequence diagram for TradeService use cases
  • [BP] created <>, TradeServiceImpl with references to repos, (+)processTrade(custId, tradeReq):Mono, with private methods buyStock(), sellStock(), executeBuy(), executeSell(). buyStock() preps a Mono and Mono prior to calling executeBuy. Used switchIfEmpty(), filter(), defaultIfEmpty(), ApplicationExceptions, mapper call to get default new PortfolioItem; executeBuy() sets fields accordingly and does repos call, calls to repo done in parallel via Mono.zip; Updated Mapper class; update UML sequence diagram
  • refactored DTO naming; renamed private method in Service impl. [BP] template for entities with entity collection inside entity, with dto containing both parent entity and entity collection info. e.g. Order/OrderItem
  • created <>, CustomerServiceImpl with (+)findCustomerInfoById(id):Mono making a lambda call to this method ->, (-) getCustomerInfo(customer):Mono; create CustomerMapper with toCustomerInfo(customer, List):CustomerInfo
  • created Exception classes: CustomerNotFoundException, InsufficientBalanceException, InsufficientSharesException, ApplicationExceptions with factory methods
  • created domain: (enum) Ticker, TradeAction; entity: Customer, PortfolioItem; dto: (record) Holding, CustomerInfo, StockTradeRequest, StockTradeResponse; repository: <>, <>
  • create packages: advice, controller, domain, dto, entity, exceptions, mapper, repository, service; application.yaml, logback.xml, data.sql
  • initial proj create, SpringBoot 3.5.0, jdk 21, jar; dep: Spring Data R2DBC, Spring Reactive Web, H2, Lombok

proj: webflux-sandbox

Best Practices

  • [BP] HTTP2Test; Enabled HTTP2 in WebClient instance setup with pool size only 1. Processed 30,000 request in 5 secs

  • [BP] Customized Connection Pool to handle more request: Consumer<WebClient.Builder>, ConnectionProvider, .lifo(), .maxConnections(poolsize), .pendingAcquireMaxCount(), .compress(), .clientConnector(), ReactorClientHttpConnector+

  • [BP] ConnectPoolTest; test concurrent requests from client to slow service (demo03). created section11 on test package

  • Server Sent Events

  • added ServerSentEventTest: @SpringBootTest, tested streamProduct endpoint

  • Added filtering to Streaming. Created DataSetupService (@Service) implementing <> to emit 1,000 ProductDTO in 1 second interval with Random prices (via ThreadLocalRandom); new streaming endpoint in ProductController that accepts a @PathVariable (price) and passed to .filter() for filtering the streaming data; used template UI page

  • updated ProductController, created 2 endpoints saveProduct(), streamProduct() with produces=MediaType.TEXT_EVENT_STREAM_VALUE; test via browser (streamProduct endpoint), postman/console curl (saveProduct)

  • Update service class (interface and impl) to autowire productSink bean; added two api saveProduct(Mono):Mono that calls tryEmitNext() to save dto to productSink, and streamProduct():Flux calling productSink.asFlux()

  • create config pkg; created ApplicationConfig (@Configuration); created @Bean productSink():Sinks.Many

  • created section11 package

  • Streaming - Uploading million Products

  • FluxFileWriter - enables writing to a file as a Flux publisher is being processed; modified test class to call existing downloadProducts() with FluxFileWriter

  • Added client method to write records to a file(products.txt) as the Flux publisher is being processed; [BP] modified test to process 1,000,000 records

  • created service and related API, endpoint/methods for retrieving (download) all products. Updated ProductClient and DownloadTest as well;

  • Create ProductsUploadDownloadTest (src/test/java) to use ProductClient. testUpload() creates Flux of 10 products and calls client; to run, have springboot service running prior to executing test

  • Created ProductClient in src/test/java, defined WebClient reference, uploadProducts() to call appropriate service endpoint. Set .contentType(MediaType.APPLICATION_NDJSON)

  • Created UploadResponseDTO; ProductController using NDJSON format (MediaType.APPLICATION_NDJSON_VALUE), uploadProducts() post endpoint method

  • Created section10 pkg; created pkgs dto, entity, mapper, repository; create Product entity, ProductDTO record, Mapper, ProductRepository; enabled lombok; created ProductService class (Interface and Impl) with saveProducts(Flux):Flux and getProductsCount():Mono

  • WebClient - Non-Blocking HTTP Client

  • modified LoggingFilter to execute only given the value of an attribute (ClientRequest.attributes()), attribute set in call to WebClient via RequestHeader.attribute(); calls logging only for even Id numbers passed in request

  • Section09AssignmentLoggingFilter - created separate logging filter to log request info; added new filter to filter chain call

  • [BP] Section09ExchangeFilterTest - generate token for each request sent; used ExchangeFilterFunction, tokenGenerator():ExchangeFilterFunction, ClientRequest.from(), ExchangeFunction.exchange(), filter(ExchangeFilterFunction):Builder

  • Section09BearerAuthTest - for sending request requiring bearer token; call defaultHeaders(consumer), setBearerAuth()

  • Section09BasicAuthTest - for sending request requiring basic auth in header; call createWebClient(consumer) calling defaultHeaders(consumer), setBasicAuth()

  • Section09QueryParamTest - for sending HTTP requests having query params; used UriBuilder (w/variables and [BP] with Map) - uri(), .path(), .query(), build(10,5,"+"), build(map)

  • [BP] Section09ErrorHandlingTest: exchangeTest() using .exchangeToMono(this::decode), custom eval logic of ClientResponse

  • [BP] created Section09ErrorHandlingTest; doOnError(Ex, consumer) to log exception in ProblemDetail format via ex.getResponseBodyAs(), onErrorReturn(WebClientResponseException, obj) for error recovery returning obj appropriate for given WCRException Type: InternalServerError, BadRequest

  • created Section09HeaderTest - request requiring header values. WebClient.Builder.defaultHeader() - set via factory method of WebClient impl, .header(), .headers(consumer) (h->h.setAll(map))

  • created Section09FluxTest - send POST request to external service, via 2 options: passing in bodyValue (bodyValue(ProductDTO)), passing in Body Publisher (body(Mono))

  • created Section09StreamTest - process streaming response from external service endpoint; used bodyToFlux()

  • Section09MonoTest: added concurrentRequest() (lecturers code) and provided my improved alternative [BP] concurrentRequestsTest using Flux.range(), flatMap(function), getProduct(id):Mono - execute concurrent requests

  • created test Section09MonoTest to connect to external service via AbstractWebClient. tested GET /lec01/product/{id}, retrieved a Mono; created ProductDTO record to capture response data from external service

  • created section9 package in src/test/java; created AbstractWebClient [for demo02 endpoints - ensure external service is running]: WebClient, createWebClient(consumer<>):WebClient, createWebClient():WebClient, print(): Consumer; ensure external service (docker) is running

  • Functional Endpoints

  • answered assignment #92: [BP] on use of higher order functions to write RouterFunctions and Handlers; used RequestPredicate, used other impl of RouterFunctions.route().GET()

  • created RouterFunction Filters: filter()

  • modified RouterConfiguration to nest route functions, use path()

  • modified order of route for "GET /customers/{id}", and "GET /customers/paginated" to prevent calling the 1st GET when paginated GET is expected

  • modified handler getAllCustomersPaginated() to return complete collection instead of stream as is appropriate with pagination

  • created UML sequence and class diagrams to include error handling structure and flow

  • Created ApplicationExceptionHandler (@Service), handleException() for each exception type; wire handler to RouterConfiguration, used onError() and chained for each exception type. Take note of [BP] using high-order functions encapsulating common handleException() logic.

  • Created RouterConfiguration, implemented route mappings for endpoints - customerRoutes():RouterFunction, each route mapping is a map of endpoint url and respective handler method

  • created UML diagram for section 8: class diagram and sequence diagram centered on RouterConfiguration and CustomerRequestHandler

  • created CustomerRequestHandler, implemented handler methods returning Mono. called Service using inputs from ServerRequest

  • created RouterConfiguration, CustomerHandler.getAllCustomers()

  • added section8 package set for Functional Endpoints study

  • WebFilter

  • added section7.CustomerServiceTest to test stamdard amd prime category and HttpStatus: OK 200, FORBIDDEN 403, UNAUTHORIZED 401

  • created FilterErrorHandler to handle errors thrown from WebFilter returning ProblemDetail wrapped in a Mono. WebFilters call handler's, sendProblemDetail() to do return

  • created UML diagrams for section 7: class diagram and sequence diagram for filter behavior

  • modified controller getAll() to get category via @RequestAttribute

  • created AuthorizationWebFilter, retrieved category via exchange.getAttributeOrDefault(), provide method handler for enum category values, do a switch in filter() calling appropriate method handler for case representing enum bategory val; modified AuthenticationWebFilter to save category as an attribute

  • created AuthenticationWebFilter that checks for auth-token (determines access) in header; extracted token from header via exchange.getRequest().getHeaders().getFirst(); setStatusCode in response via exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED); reorganized uml diagrams

  • Added package section6 for integrating filters; created 2 x classes implementing <>, used filter(), @Order

  • Input Validation and Error Handling

  • Added CustomerServiceTest for section6 to test modifications to error handling

  • modified sequence diagram for update and save customer flows

  • Added ExceptionHandler (@ControllerAdvice), added handleException()s (@ExceptionHandler) for CustomerNotFoundException/InvalidInputException.class; modified classes to remove lombok references

  • included sequence diagrams (plantuml/mermaid) for saveCustomer(), updateCustomer() validation flow. note: please use plantuml or mermaid preview plugin or app to view.

  • created RequestValidator which encapsulate validation logic and throws approriate Error monos. CustomerController.update()/save() (note: request body) invokes transform() on Mono using RequestValidator as an arg, transform() ultimately returns an error mono or dto mono.

  • created custom Exception types ext RuntimeException: CustomerNotFoundException and InvalidInputException; created factory class ApplicationExceptions, with factory methods returning Mono.error enclosing specific exception type mentioned

  • created new package(section6) for demoing Input Validation/Error Handling. Added package placeholders: advice, exceptions, validator

package jayslabs.core.practice:

  • HigherOrderFunctions class: wrote closure and callback functions; refer to HOF notes in core java notes

proj: webflux-sandbox

  • added tests to CustomerServiceTest: validateCustomerNotFound() 404 using isNotFound() and 4xxClientError() for get,put,delete endpoints
  • added tests to CustomerServiceTest: getCustomerById(), createCustomer(), deleteCustomer(), updateCustomer()
  • modified CustomerController - deleteCustomer API to return 204;
  • modified CustomerController - saveCustomer API to return 201;
  • added getCustomerId test
  • created CustomerServiceTest: uses WebTestClient and test getAllUsers() api and getAllUsers paginated api
  • created paginated get API - uses @RequestParam for page and size; defined findBy() in <> which uses Pageable. defined Service method which uses PageRequest.Of()
  • modified deleteCustomer() api to return 200 and run map() only on successful repo delete
  • added Query method to repository to return Mono for deleteCustomer() and return 204 instead of 200 for successful delete; modified controller and service
  • [BP] updated APIs getCustomerById() and updateCustomer() to use ResponseEntity to return 404. modified return to Mono<ResponseEntity>
  • fixed issue with autowiring: added missing mapstruct-processor to pom, constructor for repo and mapper in ServiceImpl
  • created controller tailored for reactive pipeline, with CRUD APIs. Take note of Mono<> in @RequestBody for saveCustomer() and updateCustomer()
  • create <> and CustomerServiceImpl:
    • getAllCustomers():Flux
    • getCustomerById(Integer id):Mono
    • saveCustomer(Mono):Mono
    • updateCustomer(Integer id, Mono mono):Mono
    • deleteCustomer(Integer id):Mono
  • created Customer Mapper: basic java class, MapStruct interface
  • created demo section for REST API with Webflux; created packages structure: controller; service, repository, dto, mapper, entity; CustomerDTO, Customer, <>

proj: [BP][Template Quality] consumption-throughput-demo

  • created: parent mvn proj, pom, docker compose file, customer.sql, Makefile: 2 sub modules, reactive/traditional: customer entity, repository interfaces, 2x runners (CommandLineRunner - Efficiency/Throughput TestRunner), app.yml
  • demo project to test R2DBC vs JPA on efficiency (bulk querying) and throughput (speed of query execution) on 10,000,000 records on postgresql (setup via docker)

proj: webflux-sandbox

  • used DatabaseClient instead of r2dbc repo to execute custom sql: dbClient.sql(), bind(param, val), mapProperties(dto class), all()
  • updated CustomerOrderRepo to add method to return DTO from three tables; added test getCustomerOrdersByProductDescription(desc)
  • [BP] updated CustomerOrderRepository to add method for custom query: @Query, getProductsOrderedByCustomerName(name):Flux; added CustomerOrderRepoTest, getProductsOrderedByCustomerName()
  • Created CustomerOrder entity: used UUID for Id, [BP] java.time.Instant for orderDate (timestamp); created <>; refactored repositories to add @Repository
  • Pagination in r2dbc: modified ProductRepository, added findAllBy(Pageable):Flux; added test findPageableAllBy - test to return page 1 with 3 items per pg: used PageRequest, Sort.by("price"), ascending()
  • done Assignment25: create <>, add findByPriceBetween(), create ProductRepositoryTest, create testFindByPriceBetween()
  • tested setting logging level on app.yml and via @SpringBootTest property
  • created test to update; demo update logic: findByX(str), doOnNext(function to modify), flatMap(repo.save)
  • created test for Customer repo insert and delete: save(cust):Mono, deleteById(id), count()
  • Assignment21: Modified CustomerRepository to add custom query method: finByEmailEndingWith(str):Flux, added test: findByEmailEndingWith()
  • created Customer entity: used Lombok; created <> extending ReactiveCrudRepository<Customer,Integer>, added repo method findByName(name):Flux; created Test Class to test CustRepo, @SpringBootTest, 3x testcases (@Test) - testFindAll(), testFindById(), testFindByName()
  • added code to use R2DBC with H2; app.yml, data.sql; modified WebFluxSandboxApplication: used property scanBasePackages, added @EnableR2dbcRepositories/basePackages; refactored package naming
  • demoed Resiliency for Reactive endpoints; used new External service endpt to throw error, added onErrorComplete() to enable completion of emit
  • Created Streaming endpoint on reactive controller; used org.springframework.http.MediaType, MediaType.TEXT_EVENT_STREAM_VALUE
  • TraditionalProductController, ReactiveProductController (Traditional and Reactive controllers); used web.client.RestClient, reactive.function.client.WebClient, .builder().requestFactory().baseUrl().build(), .get().uri().retrieve(), .body(), .bodyToFlux(), doOnNext(). Flux, Product record
  • created package, tradvsreactive02 for demo on traditional vs reactive service; Product record,
  • initial proj commit: generate via initialzr (jdk 21, springboot 3.4.4; dep: Spring Reactive Web, Spring Data R2DBC, H2); created dockerfile, image for exeternal-service.jar

repo:

  • initial proj commit
  • created local and gh repo : reactive-microservices

About

Jay's project/practice repo for Reactive Microservices : WebFlux + Project Reactor

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages